chiark / gitweb /
775a0c34362458b93dffc0ec3fb6ade28b054e35
[hippotat.git] / server
1 #!/usr/bin/python3
2
import sys
import os
import time

import collections
import ipaddress
from ipaddress import AddressValueError
import syslog

from optparse import OptionParser
from configparser import ConfigParser
from configparser import NoOptionError

import twisted
import twisted.internet
import twisted.internet.endpoints
from twisted.internet import reactor
from twisted.web.server import NOT_DONE_YET
from twisted.logger import LogLevel

#import twisted.web.server import Site
#from twisted.web.resource import Resource
26
# Registry of configured clients, keyed by the client's virtual IP
# address (the ipaddress object produced by ipaddr()).  Each value is
# the corresponding Client.  process_cfg() fills it in; route() and the
# HTTP resource look clients up in it.
clients = { }
28
def ipaddr(input):
  '''Parse `input` as an IP address, trying IPv4 before IPv6.

  Returns an ipaddress.IPv4Address when `input` is acceptable as one;
  otherwise returns ipaddress.IPv6Address(input), which itself raises
  AddressValueError if `input` is not a valid IPv6 address either.
  '''
  try:
    return ipaddress.IPv4Address(input)
  except AddressValueError:
    return ipaddress.IPv6Address(input)
35
def ipnetwork(input):
  '''Parse `input` as an IP network, trying IPv4 before IPv6.

  Returns an ipaddress.IPv4Network when the address part of `input`
  is a valid IPv4 address, and otherwise ipaddress.IPv6Network(input).

  Raises ipaddress.AddressValueError if `input` is not a valid IPv6
  network either.  Other errors from the IPv4 attempt (for example a
  bad netmask on a valid IPv4 address) propagate unchanged rather than
  being masked by an IPv6 retry.
  '''
  try:
    r = ipaddress.IPv4Network(input)
  # ipaddress has no NetworkValueError; a non-IPv4 address part is
  # reported as AddressValueError, exactly as in ipaddr().
  except ipaddress.AddressValueError:
    r = ipaddress.IPv6Network(input)
  return r
42
# Built-in default configuration.  startup() loads this text into the
# ConfigParser before reading the user's config file.
#
# [virtual] has no default for `network`, which process_cfg() reads
# unconditionally; `host` and `relay` are optional there and are
# picked from the network when absent.
#
# [limits] holds the upper bounds that Client.__init__ applies (via
# min()) to the same-named per-client settings.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54

[virtual]
mtu = 1500
# network
# [host]
# [relay]

[server]
ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
addrs = 127.0.0.1 ::1
port = 80

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
'''
65
66 #---------- "router" ----------
67
def route(packet, daddr, saddr=None):
  '''Deliver `packet` according to its destination address `daddr`.

  - If `daddr` belongs to a configured client, the packet is queued
    on that client for download.
  - If `daddr` is the server's own virtual address (`host`), or lies
    outside the virtual `network`, the packet is handed to the ipif
    subprocess via queue_inbound().
  - Anything else (the `relay` address, or an address in the network
    with no client) is discarded and the discard is logged.

  `saddr` is optional and is used only in the discard log message.
  When it is not supplied it is recovered from the packet itself with
  packet_addrs(), and only if the packet is actually discarded.
  '''
  dclient = clients.get(daddr)
  if dclient is not None:
    dclient.queue_outbound(packet)
    return

  if daddr == host or daddr not in network:
    queue_inbound(packet)
    return

  # Discard path: we need a source address purely for the log line.
  if saddr is None:
    (saddr, _) = packet_addrs(packet)
  why = 'relay' if daddr == relay else 'no client'
  log_discard(packet, saddr, daddr, why)
79
def log_discard(packet, saddr, daddr, why):
  '''Record at syslog debug level that a packet was dropped.

  The message names the source and destination addresses and the
  reason `why`; `packet` itself is accepted but not used.
  '''
  message = 'discarded packet %s -> %s (%s)' % (saddr, daddr, why)
  syslog.syslog(syslog.LOG_DEBUG, message)
83
84 #---------- ipif (slip subprocess) ----------
85
class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
  '''Process protocol for the ipif subprocess.

  The subprocess's stdout carries SLIP-framed packets.  Incoming bytes
  are accumulated until at least one frame delimiter has been seen;
  every complete frame is then decoded and passed to route().
  '''
  def __init__(self):
    # Raw (still SLIP-encoded) bytes of the frame currently being
    # received, i.e. everything after the last delimiter seen so far.
    self._buffer = b''

  def connectionMade(self): pass

  def outReceived(self, data):
    '''Consume a chunk of the subprocess's stdout.'''
    self._buffer += data
    # Split at the LAST delimiter: everything before it consists of
    # complete frames, everything after it is an unfinished frame.
    # The unfinished part is kept undecoded, so that it is decoded
    # exactly once (when complete) and so that an escape sequence
    # split across two reads is not misread as invalid.
    (complete, delim, partial) = self._buffer.rpartition(slip_end)
    if not delim:
      return # no complete frame yet
    self._buffer = partial
    for packet in slip_decode(complete):
      # Back-to-back delimiters produce empty frames; they carry no
      # packet and are skipped.
      if not packet: continue
      (saddr, daddr) = packet_addrs(packet)
      route(packet, daddr)

  def processEnded(self, status):
    '''Re-raise the exception carried by `status` when the child ends.'''
    status.raiseException()
100
def start_ipif():
  '''Launch the ipif subprocess and publish its protocol as `ipif`.

  The configured `ipif_command` is run through `/bin/sh -xc`.  The
  child's fd 0 is set up for us to write to, fd 1 for us to read
  from, and fd 2 is mapped to our own fd 2.
  '''
  global ipif
  ipif = IpifProcessProtocol()
  argv = ['sh','-xc', ipif_command]
  fds = {0:'w', 1:'r', 2:2}
  reactor.spawnProcess(ipif, '/bin/sh', argv, childFDs=fds)
107
def queue_inbound(packet):
  '''Send `packet` to the ipif subprocess as one SLIP frame.

  The encoded packet is written with a delimiter both before and
  after it.
  '''
  write = ipif.transport.write
  write(slip_delimiter)
  payload = slip_encode(packet)
  write(payload)
  write(slip_delimiter)
112
113 #---------- SLIP handling ----------
114
# SLIP special bytes.  slip_end separates frames; slip_esc introduces a
# two-byte escape sequence standing for a literal slip_end
# (slip_esc + slip_esc_end) or a literal slip_esc
# (slip_esc + slip_esc_esc).
slip_end = b'\300'
slip_esc = b'\333'
slip_esc_end = b'\334'
slip_esc_esc = b'\335'
slip_delimiter = slip_end

def slip_encode(packet):
  '''Return `packet` (bytes) with SLIP escaping applied.

  No delimiter is added.  slip_esc is escaped first, so that the
  escape bytes introduced for slip_end are not themselves re-escaped.
  '''
  return (packet
          .replace(slip_esc, slip_esc + slip_esc_esc)
          .replace(slip_end, slip_esc + slip_esc_end))

def slip_decode(data):
  '''Split `data` (bytes) at SLIP delimiters and unescape each piece.

  Returns a list of bytes objects, one per delimiter-separated piece
  of `data`, in order.  The list always has at least one element; the
  pieces may be empty (e.g. for `data` ending in a delimiter the last
  element is b'').

  Raises ValueError('invalid SLIP escape') if an escape byte is
  followed by anything other than slip_esc_esc or slip_esc_end,
  including when it is the last byte of a piece.

  Prints the input and the result to stdout as a debugging trace.
  '''
  print('DECODE ', repr(data))
  out = []
  for packet in data.split(slip_end):
    pdata = b''
    while True:
      eix = packet.find(slip_esc)
      if eix == -1:
        pdata += packet
        break
      #print('ESC ', repr((pdata, packet, eix)))
      pdata += packet[0 : eix]
      # Take a one-byte SLICE, not packet[eix+1]: indexing bytes gives
      # an int, which never compares equal to the bytes constants.  A
      # slice is also simply empty (hence "invalid") when the escape
      # byte is the last byte, instead of raising IndexError.
      ck = packet[eix+1 : eix+2]
      if   ck == slip_esc_esc: pdata += slip_esc
      elif ck == slip_esc_end: pdata += slip_end
      else: raise ValueError('invalid SLIP escape')
      packet = packet[eix+2 : ]
    out.append(pdata)
  print('DECODED ', repr(out))
  return out
146
147 #---------- packet parsing ----------
148
def packet_addrs(packet):
  '''Extract the source and destination addresses of a raw IP packet.

  `packet` is a bytes object starting with the IP header.  The IP
  version is taken from the top four bits of the first byte:

  - version 4: addresses are the 4-byte fields at offsets 12 and 16;
  - version 6: addresses are the 16-byte fields at offsets 8 and 24.

  Returns (saddr, daddr) as ipaddress.IPv4Address or
  ipaddress.IPv6Address objects.

  Raises ValueError if the packet is empty, is too short to contain
  both addresses, or has any other version number.
  '''
  if not packet:
    raise ValueError('empty packet')

  version = packet[0] >> 4
  if version == 4:
    (factory, offset, addrlen) = (ipaddress.IPv4Address, 12, 4)
  elif version == 6:
    (factory, offset, addrlen) = (ipaddress.IPv6Address, 8, 16)
  else:
    raise ValueError('unsupported IP version %d' % version)

  end = offset + 2 * addrlen
  if len(packet) < end:
    raise ValueError('packet too short for IPv%d header' % version)

  # The ipaddress constructors accept the packed (big-endian) bytes.
  saddr = factory(packet[offset : offset + addrlen])
  daddr = factory(packet[offset + addrlen : end])
  return (saddr, daddr)
151
152 #---------- client ----------
153
class Client():
  '''State for one configured client.

  Holds the client's pending HTTP requests and the packets waiting to
  be sent down to it, and pairs the two up in _check_outbound().

  Instance attributes:
    _ip   the client's virtual IP address, as passed to __init__
    _cs   name of the client's config section
    pw    the client's password, as read from the config (a str)
    _rq   deque of HTTP requests still waiting for a response
    _pq   deque of (queue time, packet) pairs awaiting download;
          queue time is a time.monotonic() value
    max_batch_down, max_queue_time, max_request_time
          per-client settings from the config, each capped by the
          same-named value in the [limits] section
  '''
  def __init__(self, ip, cs):
    self._ip = ip
    self._cs = cs
    self.pw = cfg.get(cs, 'password')
    self._rq = collections.deque() # requests
    self._pq = collections.deque() # packets
    for k in ('max_batch_down','max_queue_time','max_request_time'):
      req = cfg.getint(cs, k)
      limit = cfg.getint('limits',k)
      setattr(self, k, min(req, limit))

  def process_arriving_data(self, d):
    '''Decode SLIP data uploaded by this client and route each packet.

    Raises ValueError if a packet's source address is not this
    client's own address.
    '''
    if not d:
      return # nothing uploaded with this request
    for packet in slip_decode(d):
      # Leading, trailing or doubled delimiters yield empty frames,
      # which carry no packet.
      if not packet: continue
      (saddr, daddr) = packet_addrs(packet)
      if saddr != self._ip:
        raise ValueError('wrong source address %s' % saddr)
      route(packet, daddr)

  def _req_cancel(self, request):
    '''Timeout handler: answer `request` (empty) if still pending.

    A request that is no longer in the queue has already been
    answered, or was dropped by _req_error(); it is left alone.
    '''
    try: self._rq.remove(request)
    except ValueError: return
    if not request.finished:
      request.finish()

  def _req_error(self, err, request):
    '''Errback for request.notifyFinish(): the request failed.

    The request is only forgotten, never finished: Twisted does not
    permit finish() on a request whose connection has been lost.
    '''
    try: self._rq.remove(request)
    except ValueError: pass

  def queue_outbound(self, packet):
    '''Queue `packet` for download to this client.'''
    self._pq.append((time.monotonic(), packet))
    # A request may already be waiting for exactly this packet.
    self._check_outbound()

  def http_request(self, request):
    '''Register a newly arrived HTTP request as able to carry packets.

    The request is answered as soon as packets are available, or
    after max_request_time seconds, whichever comes first.
    '''
    request.setHeader('Content-Type','application/octet-stream')
    reactor.callLater(self.max_request_time, self._req_cancel, request)
    request.notifyFinish().addErrback(self._req_error, request)
    self._rq.append(request)
    self._check_outbound()

  # The HTTP resource calls this method by the name new_request.
  new_request = http_request

  def _check_outbound(self):
    '''Send queued packets down waiting requests, as far as possible.

    Each round takes the oldest pending request and writes into it as
    many queued packets as fit within max_batch_down (always at least
    one), separated by SLIP delimiters, then finishes the request.
    Packets at the head of the queue older than max_queue_time are
    dropped.  Stops when either queue runs out.
    '''
    while True:
      try: request = self._rq[0]
      except IndexError: request = None
      if request and request.finished:
        self._rq.popleft()
        continue

      # now request is an unfinished request, or None
      try: (queuetime, packet) = self._pq[0]
      except IndexError:
        # no packets, oh well
        break

      age = time.monotonic() - queuetime
      if age > self.max_queue_time:
        self._pq.popleft()
        continue

      if request is None:
        # no request
        break

      # request, and also some non-expired packets
      while True:
        try: (dummy, packet) = self._pq[0]
        except IndexError: break

        encoded = slip_encode(packet)

        if request.sentLength > 0:
          if (request.sentLength + len(slip_delimiter)
              + len(encoded) > self.max_batch_down):
            break
          request.write(slip_delimiter)

        request.write(encoded)
        self._pq.popleft()

      assert(request.sentLength)
      self._rq.popleft()
      request.finish()
      # round again, looking for more to do
237
class IphttpResource(twisted.web.resource.Resource):
  '''HTTP resource through which clients exchange packets.

  A POST carries these form arguments:
    i    the client's virtual IP address (required)
    pw   the client's password (required)
    mbd, mqt, mrt
         optional new values for the client's max_batch_down,
         max_queue_time and max_request_time
    d    optional SLIP-encoded packets sent by the client
  '''
  # This resource answers for every path itself; it has no children.
  isLeaf = True

  @staticmethod
  def _arg(request, name):
    '''Return the first value (bytes) of form argument `name` (bytes).

    Raises KeyError if the argument is absent.
    '''
    # request.args maps bytes names to lists of bytes values.
    return request.args[name][0]

  def render_POST(self, request):
    '''Authenticate the client, take its upload, and park the request.

    Raises KeyError for a missing argument or unknown client, and
    ValueError for a bad address, bad password or bad number.
    '''
    # find client, update config, etc.
    ci = ipaddr(self._arg(request, b'i').decode('ascii'))
    c = clients[ci]
    pw = self._arg(request, b'pw')
    # c.pw comes from the config file as str; the argument is bytes.
    if pw != c.pw.encode('utf-8'): raise ValueError('bad password')

    # update config
    for r, w in ((b'mbd', 'max_batch_down'),
                 (b'mqt', 'max_queue_time'),
                 (b'mrt', 'max_request_time')):
      try: v = self._arg(request, r)
      except KeyError: continue
      v = int(v)
      # Apply the same [limits] cap that Client.__init__ applies, so a
      # request cannot raise a setting beyond the configured maximum.
      setattr(c, w, min(v, cfg.getint('limits', w)))

    try: d = self._arg(request, b'd')
    except KeyError: d = b''

    c.process_arriving_data(d)
    c.new_request(request)
    # The response is written later, when there is something to send.
    return NOT_DONE_YET
260
def start_http():
  '''Start listening for HTTP on every configured server address.

  Each whitespace-separated entry of [server] addrs is parsed as an
  IPv4 address, or failing that an IPv6 address, and a TCP server
  endpoint of the matching family is opened on [server] port, bound
  to that address.  All endpoints serve the same site.
  '''
  resource = IphttpResource()
  sitefactory = twisted.web.server.Site(resource)
  port = cfg.getint('server','port')
  for addrspec in cfg.get('server','addrs').split():
    try:
      addr = ipaddress.IPv4Address(addrspec)
      endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
    except AddressValueError:
      addr = ipaddress.IPv6Address(addrspec)
      endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
    # The address must go in by keyword, as a string: the endpoints'
    # third positional parameter is the listen backlog, not the
    # interface to bind.
    ep = endpointfactory(reactor, port, interface=str(addr))
    ep.listen(sitefactory)
273
274 #---------- config and setup ----------
275         
def process_cfg():
  '''Turn the loaded configuration into the module's global state.

  Sets the globals:
    network       the virtual network ([virtual] network)
    host          the server's address in it ([virtual] host, or by
                  default the network's first host address)
    relay         the relay address ([virtual] relay, or by default
                  the first host address other than `host`)
    mtu           [virtual] mtu, as the string from the config
    ipif_command  [server] ipif with host, relay, mtu and network
                  substituted in

  and registers a Client in `clients` for every config section whose
  name contains ':' or '.', the name being the client's address.

  Raises ValueError if the network is too small, if a client address
  is outside the network, or if two sections name the same client.
  '''
  global network
  global host
  global relay
  global mtu
  global ipif_command

  network = ipnetwork(cfg.get('virtual','network'))
  if network.num_addresses < 3 + 2:
    raise ValueError('network needs at least 2^3 addresses')

  # host and relay are compared with packet addresses and with the
  # members of `network`, so configured values must be parsed into
  # address objects just like the defaults taken from network.hosts().
  try:
    host = ipaddr(cfg.get('virtual','host'))
  except NoOptionError:
    host = next(network.hosts())

  try:
    relay = ipaddr(cfg.get('virtual','relay'))
  except NoOptionError:
    relay = next(a for a in network.hosts() if a != host)

  for cs in cfg.sections():
    if not (':' in cs or '.' in cs): continue
    ci = ipaddr(cs)
    if ci not in network:
      raise ValueError('client %s not in network' % ci)
    if ci in clients:
      raise ValueError('multiple client cfg sections for %s' % ci)
    clients[ci] = Client(ci, cs)

  mtu = cfg.get('virtual','mtu')

  # Values offered to the %(...)s substitutions in the ipif command.
  iic_vars = { k: globals()[k] for k in ('host','relay','mtu','network') }

  ipif_command = cfg.get('server','ipif', vars=iic_vars)
316
def crash_on_critical(event):
  '''Log observer: stop the reactor when a critical event is logged.

  Events whose 'log_level' is below LogLevel.critical are ignored.
  Otherwise the formatted event is printed to stderr and the reactor
  is stopped; a reactor that is not running is tolerated.
  '''
  if not (event.get('log_level') >= LogLevel.critical):
    return
  print('crashing: ', twisted.logger.formatEvent(event), file=sys.stderr)
  #print('crashing!', file=sys.stderr)
  #os._exit(1)
  try:
    reactor.stop()
  except twisted.internet.error.ReactorNotRunning:
    pass
324
def startup():
  '''Parse the command line, load the configuration, start services.

  Sets the globals `opts` (parsed options) and `cfg` (the
  ConfigParser, holding the built-in defaults overlaid with the file
  named by -c/--config).  Installs crash_on_critical as a log
  observer, then starts the ipif subprocess and the HTTP listeners.
  '''
  global cfg
  global opts

  parser = OptionParser()
  parser.add_option('-c', '--config', dest='configfile',
                    default='/etc/hippottd/server.conf')
  (opts, args) = parser.parse_args()
  if args:
    parser.error('no non-option arguments please')

  twisted.logger.globalLogPublisher.addObserver(crash_on_critical)

  cfg = ConfigParser()
  cfg.read_string(defcfg)
  cfg.read(opts.configfile)
  process_cfg()

  start_ipif()
  start_http()
344
# Script entry point: set everything up, then hand control to the
# Twisted reactor.
startup()
reactor.run()