chiark / gitweb /
wip, and move PacketQueue
[hippotat.git] / server
1 #!/usr/bin/python3
2
3 import signal
4 signal.signal(signal.SIGINT, signal.SIG_DFL)
5
6 import sys
7 import os
8
9 import twisted.internet
10 import twisted.internet.endpoints
11 from twisted.web.server import NOT_DONE_YET
12 from twisted.logger import LogLevel
13
14 #import twisted.web.server import Site
15 #from twisted.web.resource import Resource
16
17 from optparse import OptionParser
18 from configparser import ConfigParser
19 from configparser import NoOptionError
20
21 import collections
22
23 import syslog
24
25 from hippotat import *
26
27 clients = { }
28
29 defcfg = '''
30 [DEFAULT]
31 max_batch_down = 65536
32 max_queue_time = 10
33 max_request_time = 54
34 target_requests_outstanding = 3
35
36 [virtual]
37 mtu = 1500
38 # network
39 # [host]
40 # [relay]
41
42 [server]
43 ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
44 addrs = 127.0.0.1 ::1
45 port = 8099
46
47 [limits]
48 max_batch_down = 262144
49 max_queue_time = 121
50 max_request_time = 121
51 target_requests_outstanding = 10
52 '''
53
54 #---------- error handling ----------
55
56 def crash(err):
57   print('CRASH ', err, file=sys.stderr)
58   try: reactor.stop()
59   except twisted.internet.error.ReactorNotRunning: pass
60
61 def crash_on_defer(defer):
62   defer.addErrback(lambda err: crash(err))
63
64 def crash_on_critical(event):
65   if event.get('log_level') >= LogLevel.critical:
66     crash(twisted.logger.formatEvent(event))
67
68 #---------- "router" ----------
69
70 def route(packet, saddr, daddr):
71   print('TRACE ', saddr, daddr, packet)
72   try: client = clients[daddr]
73   except KeyError: dclient = None
74   if dclient is not None:
75     dclient.queue_outbound(packet)
76   elif saddr.is_link_local or daddr.is_link_local:
77     log_discard(packet, saddr, daddr, 'link-local')
78   elif daddr == host or daddr not in network:
79     print('TRACE INBOUND ', saddr, daddr, packet)
80     queue_inbound(packet)
81   elif daddr == relay:
82     log_discard(packet, saddr, daddr, 'relay')
83   else:
84     log_discard(packet, saddr, daddr, 'no client')
85
86 def log_discard(packet, saddr, daddr, why):
87   print('DROP ', saddr, daddr, why)
88 #  syslog.syslog(syslog.LOG_DEBUG,
89 #                'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
90
91 #---------- client ----------
92
93 class Client():
94   def __init__(self, ip, cs):
95     # instance data members
96     self._ip = ip
97     self._cs = cs
98     self.pw = cfg.get(cs, 'password')
99     self._rq = collections.deque() # requests
100     # self._pq = PacketQueue(...)
101     # plus from config:
102     #  .max_batch_down
103     #  .max_queue_time
104     #  .max_request_time
105     #  .target_requests_outstanding
106     for k in ('max_batch_down','max_queue_time','max_request_time',
107               'target_requests_outstanding'):
108       req = cfg.getint(cs, k)
109       limit = cfg.getint('limits',k)
110       self.__dict__[k] = min(req, limit)
111     self._pq = PacketQueue(self.max_queue_time)
112
113     def process_arriving_data(self, d):
114       for packet in slip.decode(d):
115         (saddr, daddr) = packet_addrs(packet)
116         if saddr != self._ip:
117           raise ValueError('wrong source address %s' % saddr)
118         route(packet, saddr, daddr)
119
120     def _req_cancel(self, request):
121       request.finish()
122
123     def _req_error(self, err, request):
124       self._req_cancel(request)
125
126     def queue_outbound(self, packet):
127       self._pq.append(packet)
128
129     def http_request(self, request):
130       request.setHeader('Content-Type','application/octet-stream')
131       reactor.callLater(self.max_request_time, self._req_cancel, request)
132       request.notifyFinish().addErrback(self._req_error, request)
133       self._rq.append(request)
134       self._check_outbound()
135
136     def _check_outbound(self):
137       while True:
138         try: request = self._rq[0]
139         except IndexError: request = None
140         if request and request.finished:
141           self._rq.popleft()
142           continue
143
144         if not self._pq.nonempty():
145           # no packets, oh well
146           continue
147
148         if request is None:
149           # no request
150           break
151
152         # request, and also some non-expired packets
153         while True:
154           packet = self.pq.popleft()
155           if packet is None: break
156
157           encoded = slip.encode(packet)
158           
159           if request.sentLength > 0:
160             if (request.sentLength + len(slip.delimiter)
161                 + len(encoded) > self.max_batch_down):
162               break
163             request.write(slip.delimiter)
164
165           request.write(encoded)
166           self._pq.popLeft()
167
168         assert(request.sentLength)
169         self._rq.popLeft()
170         request.finish()
171         # round again, looking for more to do
172
173       while len(self._rq) > self.target_requests_outstanding:
174         request = self._rq.popleft()
175         request.finish()
176
177 class IphttpResource(twisted.web.resource.Resource):
178   isLeaf = True
179   def render_POST(self, request):
180     # find client, update config, etc.
181     ci = ipaddr(request.args['i'])
182     c = clients[ci]
183     pw = request.args['pw']
184     if pw != c.pw: raise ValueError('bad password')
185
186     # update config
187     for r, w in (('mbd', 'max_batch_down'),
188                  ('mqt', 'max_queue_time'),
189                  ('mrt', 'max_request_time'),
190                  ('tro', 'target_requests_outstanding')):
191       try: v = request.args[r]
192       except KeyError: continue
193       v = int(v)
194       c.__dict__[w] = v
195
196     try: d = request.args['d']
197     except KeyError: d = ''
198
199     c.process_arriving_data(d)
200     c.new_request(request)
201
202   def render_GET(self, request):
203     return b'<html><body>hippotat</body></html>'
204
205 def start_http():
206   resource = IphttpResource()
207   site = twisted.web.server.Site(resource)
208   for addrspec in cfg.get('server','addrs').split():
209     try:
210       addr = ipaddress.IPv4Address(addrspec)
211       endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
212     except AddressValueError:
213       addr = ipaddress.IPv6Address(addrspec)
214       endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
215     ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
216     crash_on_defer(ep.listen(site))
217
218 #---------- config and setup ----------
219         
220 def process_cfg():
221   global network
222   global host
223   global relay
224   global ipif_command
225
226   network = ipnetwork(cfg.get('virtual','network'))
227   if network.num_addresses < 3 + 2:
228     raise ValueError('network needs at least 2^3 addresses')
229
230   try:
231     host = cfg.get('virtual','host')
232   except NoOptionError:
233     host = next(network.hosts())
234
235   try:
236     relay = cfg.get('virtual','relay')
237   except NoOptionError:
238     for search in network.hosts():
239       if search == host: continue
240       relay = search
241       break
242
243   for cs in cfg.sections():
244     if not (':' in cs or '.' in cs): continue
245     ci = ipaddr(cs)
246     if ci not in network:
247       raise ValueError('client %s not in network' % ci)
248     if ci in clients:
249       raise ValueError('multiple client cfg sections for %s' % ci)
250     clients[ci] = Client(ci, cs)
251
252   global mtu
253   mtu = cfg.get('virtual','mtu')
254
255   iic_vars = { }
256   for k in ('host','relay','mtu','network'):
257     iic_vars[k] = globals()[k]
258
259   ipif_command = cfg.get('server','ipif', vars=iic_vars)
260
261 def startup():
262   global cfg
263
264   op = OptionParser()
265   op.add_option('-c', '--config', dest='configfile',
266                 default='/etc/hippottd/server.conf')
267   global opts
268   (opts, args) = op.parse_args()
269   if len(args): op.error('no non-option arguments please')
270
271   twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
272
273   cfg = ConfigParser()
274   cfg.read_string(defcfg)
275   cfg.read(opts.configfile)
276   process_cfg()
277
278   start_ipif(ipif_command, route)
279   start_http()
280
281 startup()
282 reactor.run()
283 print('CRASHED (end)', file=sys.stderr)