chiark / gitweb /
wip
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
e2d41dc1
IJ
3import sys
4import os
5
5bae5ba3 6import twisted
e2d41dc1
IJ
7import twisted.internet
8import twisted.internet.endpoints
9from twisted.internet import reactor
10from twisted.web.server import NOT_DONE_YET
11from twisted.logger import LogLevel
12
13import ipaddress
14from ipaddress import AddressValueError
5bae5ba3 15
5da7763e
IJ
16#import twisted.web.server import Site
17#from twisted.web.resource import Resource
3fba9787 18
e75e9c17
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
3fba9787 22
0ac316c8
IJ
23import collections
24
c4b6d990
IJ
25import syslog
26
3fba9787
IJ
27clients = { }
28
e2d41dc1 29def ipaddr(input):
3fba9787 30 try:
ec88b1f1 31 r = ipaddress.IPv4Address(input)
3fba9787 32 except AddressValueError:
ec88b1f1 33 r = ipaddress.IPv6Address(input)
3fba9787
IJ
34 return r
35
36def ipnetwork(input):
37 try:
ec88b1f1 38 r = ipaddress.IPv4Network(input)
3fba9787 39 except NetworkValueError:
ec88b1f1 40 r = ipaddress.IPv6Network(input)
3fba9787
IJ
41 return r
42
e75e9c17 43defcfg = '''
094ee3a2
IJ
44[DEFAULT]
45max_batch_down = 65536
46max_queue_time = 10
47max_request_time = 54
48
e75e9c17
IJ
49[virtual]
50mtu = 1500
51# network
52# [host]
53# [relay]
54
55[server]
e2d41dc1 56ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
5da7763e
IJ
57addrs = 127.0.0.1 ::1
58port = 80
e75e9c17 59
094ee3a2
IJ
60[limits]
61max_batch_down = 262144
62max_queue_time = 121
63max_request_time = 121
ec88b1f1
IJ
64'''
65
5da7763e
IJ
66#---------- "router" ----------
67
ec0c4d95
IJ
68def route(packet, saddr, daddr):
69 print('TRACE ', saddr, daddr, packet)
5da7763e
IJ
70 try: client = clients[daddr]
71 except KeyError: dclient = None
72 if dclient is not None:
73 dclient.queue_outbound(packet)
ec0c4d95
IJ
74 elif daddr.is_multicast:
75 log_discard(packet, saddr, daddr, 'multicast')
76 elif daddr.is_link_local:
77 log_discard(packet, saddr, daddr, 'link-local')
e2d41dc1 78 elif daddr == host or daddr not in network:
ec0c4d95 79 print('TRACE INBOUND ', saddr, daddr, packet)
5da7763e 80 queue_inbound(packet)
e2d41dc1 81 elif daddr == relay:
5da7763e
IJ
82 log_discard(packet, saddr, daddr, 'relay')
83 else:
84 log_discard(packet, saddr, daddr, 'no client')
85
86def log_discard(packet, saddr, daddr, why):
ec0c4d95
IJ
87 print('DROP ', saddr, daddr, why, packet)
88# syslog.syslog(syslog.LOG_DEBUG,
89# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
5da7763e
IJ
90
91#---------- ipif (slip subprocess) ----------
92
5bae5ba3
IJ
93class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
94 def __init__(self):
95 self._buffer = b''
96 def connectionMade(self): pass
97 def outReceived(self, data):
ce7f1431 98 #print('RECV ', repr(data))
2b95da16
IJ
99 self._buffer += data
100 packets = slip_decode(self._buffer)
101 self._buffer = packets.pop()
5bae5ba3 102 for packet in packets:
ec0c4d95 103 if not len(packet): continue
5bae5ba3 104 (saddr, daddr) = packet_addrs(packet)
ec0c4d95 105 route(packet, saddr, daddr)
5da7763e
IJ
106 def processEnded(self, status):
107 status.raiseException()
5bae5ba3
IJ
108
109def start_ipif():
5da7763e
IJ
110 global ipif
111 ipif = IpifProcessProtocol()
112 reactor.spawnProcess(ipif,
ce7f1431 113 '/bin/sh',['sh','-xc', ipif_command],
5bae5ba3
IJ
114 childFDs={0:'w', 1:'r', 2:2})
115
5da7763e
IJ
116def queue_inbound(packet):
117 ipif.transport.write(slip_delimiter)
118 ipif.transport.write(slip_encode(packet))
119 ipif.transport.write(slip_delimiter)
5bae5ba3 120
ce7f1431
IJ
121#---------- SLIP handling ----------
122
123slip_end = b'\300'
124slip_esc = b'\333'
125slip_esc_end = b'\334'
126slip_esc_esc = b'\335'
127slip_delimiter = slip_end
128
129def slip_encode(packet):
130 return (packet
131 .replace(slip_esc, slip_esc + slip_esc_esc)
132 .replace(slip_end, slip_esc + slip_esc_end))
133
134def slip_decode(data):
135 print('DECODE ', repr(data))
136 out = []
137 for packet in data.split(slip_end):
138 pdata = b''
139 while True:
140 eix = packet.find(slip_esc)
141 if eix == -1:
142 pdata += packet
143 break
144 #print('ESC ', repr((pdata, packet, eix)))
145 pdata += packet[0 : eix]
146 ck = packet[eix+1]
147 if ck == slip_esc_esc: pdata += slip_esc
148 elif ck == slip_esc_end: pdata += slip_end
149 else: raise ValueError('invalid SLIP escape')
150 packet = packet[eix+2 : ]
151 out.append(pdata)
152 print('DECODED ', repr(out))
153 return out
154
155#---------- packet parsing ----------
156
157def packet_addrs(packet):
ec0c4d95
IJ
158 version = packet[0] >> 4
159 if version == 4:
160 addrlen = 4
161 saddroff = 3*4
162 factory = ipaddress.IPv4Address
163 elif version == 6:
164 addrlen = 16
165 saddroff = 2*4
166 factory = ipaddress.IPv6Address
167 else:
168 raise ValueError('unsupported IP version %d' % version)
169 saddr = factory(packet[ saddroff : saddroff + addrlen ])
170 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
171 return (saddr, daddr)
ce7f1431 172
5da7763e 173#---------- client ----------
c4b6d990 174
ec88b1f1 175class Client():
c4b6d990 176 def __init__(self, ip, cs):
ec88b1f1
IJ
177 # instance data members
178 self._ip = ip
179 self._cs = cs
180 self.pw = cfg.get(cs, 'password')
0ac316c8
IJ
181 self._rq = collections.deque() # requests
182 self._pq = collections.deque() # packets
c4b6d990
IJ
183 # plus from config:
184 # .max_batch_down
185 # .max_queue_time
186 # .max_request_time
ec88b1f1
IJ
187 for k in ('max_batch_down','max_queue_time','max_request_time'):
188 req = cfg.getint(cs, k)
094ee3a2 189 limit = cfg.getint('limits',k)
c4b6d990
IJ
190 self.__dict__[k] = min(req, limit)
191
192 def process_arriving_data(self, d):
193 for packet in slip_decode(d):
5bae5ba3 194 (saddr, daddr) = packet_addrs(packet)
c4b6d990
IJ
195 if saddr != self._ip:
196 raise ValueError('wrong source address %s' % saddr)
ec0c4d95 197 route(packet, saddr, daddr)
ec88b1f1 198
c4b6d990
IJ
199 def _req_cancel(self, request):
200 request.finish()
201
202 def _req_error(self, err, request):
203 self._req_cancel(request)
204
0ac316c8 205 def queue_outbound(self, packet):
094ee3a2 206 self._pq.append((time.monotonic(), packet))
0ac316c8 207
c4b6d990
IJ
208 def http_request(self, request):
209 request.setHeader('Content-Type','application/octet-stream')
210 reactor.callLater(self.max_request_time, self._req_cancel, request)
211 request.notifyFinish().addErrback(self._req_error, request)
0ac316c8
IJ
212 self._rq.append(request)
213 self._check_outbound()
214
215 def _check_outbound(self):
216 while True:
217 try: request = self._rq[0]
218 except IndexError: request = None
219 if request and request.finished:
220 self._rq.popleft()
221 continue
222
223 # now request is an unfinished request, or None
224 try: (queuetime, packet) = self._pq[0]
e2d41dc1 225 except IndexError:
0ac316c8 226 # no packets, oh well
094ee3a2
IJ
227 break
228
229 age = time.monotonic() - queuetime
230 if age > self.max_queue_time:
231 self._pq.popleft()
0ac316c8
IJ
232 continue
233
094ee3a2
IJ
234 if request is None:
235 # no request
236 break
237
238 # request, and also some non-expired packets
239 while True:
240 try: (dummy, packet) = self._pq[0]
241 except IndexError: break
242
243 encoded = slip_encode(packet)
244
245 if request.sentLength > 0:
246 if (request.sentLength + len(slip_delimiter)
247 + len(encoded) > self.max_batch_down):
248 break
249 request.write(slip_delimiter)
250
251 request.write(encoded)
252 self._pq.popLeft()
253
254 assert(request.sentLength)
255 self._rq.popLeft()
256 request.finish()
257 # round again, looking for more to do
ec88b1f1 258
5da7763e
IJ
259class IphttpResource(twisted.web.resource.Resource):
260 def render_POST(self, request):
261 # find client, update config, etc.
e2d41dc1 262 ci = ipaddr(request.args['i'])
5da7763e
IJ
263 c = clients[ci]
264 pw = request.args['pw']
265 if pw != c.pw: raise ValueError('bad password')
266
267 # update config
268 for r, w in (('mbd', 'max_batch_down'),
269 ('mqt', 'max_queue_time'),
270 ('mrt', 'max_request_time')):
271 try: v = request.args[r]
272 except KeyError: continue
273 v = int(v)
274 c.__dict__[w] = v
275
276 try: d = request.args['d']
277 except KeyError: d = ''
278
279 c.process_arriving_data(d)
280 c.new_request(request)
281
282def start_http():
283 resource = IphttpResource()
284 sitefactory = twisted.web.server.Site(resource)
e2d41dc1 285 for addrspec in cfg.get('server','addrs').split():
5da7763e
IJ
286 try:
287 addr = ipaddress.IPv4Address(addrspec)
288 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
289 except AddressValueError:
290 addr = ipaddress.IPv6Address(addrspec)
291 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
292 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
293 ep.listen(sitefactory)
294
295#---------- config and setup ----------
296
3fba9787
IJ
297def process_cfg():
298 global network
e75e9c17
IJ
299 global host
300 global relay
5bae5ba3 301 global ipif_command
3fba9787 302
ec88b1f1 303 network = ipnetwork(cfg.get('virtual','network'))
e75e9c17
IJ
304 if network.num_addresses < 3 + 2:
305 raise ValueError('network needs at least 2^3 addresses')
306
3fba9787 307 try:
e75e9c17
IJ
308 host = cfg.get('virtual','host')
309 except NoOptionError:
e2d41dc1 310 host = next(network.hosts())
e75e9c17
IJ
311
312 try:
313 relay = cfg.get('virtual','relay')
e2d41dc1 314 except NoOptionError:
e75e9c17 315 for search in network.hosts():
e2d41dc1 316 if search == host: continue
e75e9c17
IJ
317 relay = search
318 break
3fba9787 319
ec88b1f1
IJ
320 for cs in cfg.sections():
321 if not (':' in cs or '.' in cs): continue
e2d41dc1 322 ci = ipaddr(cs)
ec88b1f1
IJ
323 if ci not in network:
324 raise ValueError('client %s not in network' % ci)
325 if ci in clients:
326 raise ValueError('multiple client cfg sections for %s' % ci)
327 clients[ci] = Client(ci, cs)
3fba9787 328
e2d41dc1
IJ
329 global mtu
330 mtu = cfg.get('virtual','mtu')
331
5bae5ba3
IJ
332 iic_vars = { }
333 for k in ('host','relay','mtu','network'):
334 iic_vars[k] = globals()[k]
335
336 ipif_command = cfg.get('server','ipif', vars=iic_vars)
337
e2d41dc1
IJ
338def crash_on_critical(event):
339 if event.get('log_level') >= LogLevel.critical:
340 print('crashing: ', twisted.logger.formatEvent(event), file=sys.stderr)
341 #print('crashing!', file=sys.stderr)
342 #os._exit(1)
343 try: reactor.stop()
344 except twisted.internet.error.ReactorNotRunning: pass
345
e75e9c17 346def startup():
e2d41dc1
IJ
347 global cfg
348
e75e9c17
IJ
349 op = OptionParser()
350 op.add_option('-c', '--config', dest='configfile',
351 default='/etc/hippottd/server.conf')
352 global opts
353 (opts, args) = op.parse_args()
354 if len(args): op.error('no non-option arguments please')
355
e2d41dc1
IJ
356 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
357
e75e9c17 358 cfg = ConfigParser()
5bae5ba3 359 cfg.read_string(defcfg)
e2d41dc1 360 cfg.read(opts.configfile)
5bae5ba3
IJ
361 process_cfg()
362
363 start_ipif()
364 start_http()
e2d41dc1
IJ
365
366startup()
367reactor.run()