chiark / gitweb /
move ipif slip process handling
[hippotat] / server
CommitLineData
094ee3a2 1#!/usr/bin/python3
3fba9787 2
aa663282
IJ
3import signal
4signal.signal(signal.SIGINT, signal.SIG_DFL)
5
e2d41dc1
IJ
6import sys
7import os
8
5bae5ba3 9import twisted
e2d41dc1
IJ
10import twisted.internet
11import twisted.internet.endpoints
12from twisted.internet import reactor
13from twisted.web.server import NOT_DONE_YET
14from twisted.logger import LogLevel
15
5da7763e
IJ
16#import twisted.web.server import Site
17#from twisted.web.resource import Resource
3fba9787 18
e75e9c17
IJ
19from optparse import OptionParser
20from configparser import ConfigParser
21from configparser import NoOptionError
3fba9787 22
0ac316c8
IJ
23import collections
24
c4b6d990
IJ
25import syslog
26
040ff511 27from hippotat import *
3fba9787 28
b0cfbfce 29clients = { }
3fba9787 30
e75e9c17 31defcfg = '''
094ee3a2
IJ
32[DEFAULT]
33max_batch_down = 65536
34max_queue_time = 10
35max_request_time = 54
36
e75e9c17
IJ
37[virtual]
38mtu = 1500
39# network
40# [host]
41# [relay]
42
43[server]
e2d41dc1 44ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
5da7763e 45addrs = 127.0.0.1 ::1
aa663282 46port = 8099
e75e9c17 47
094ee3a2
IJ
48[limits]
49max_batch_down = 262144
50max_queue_time = 121
51max_request_time = 121
ec88b1f1
IJ
52'''
53
aa663282
IJ
54#---------- error handling ----------
55
56def crash(err):
57 print('CRASH ', err, file=sys.stderr)
58 try: reactor.stop()
59 except twisted.internet.error.ReactorNotRunning: pass
60
61def crash_on_defer(defer):
62 defer.addErrback(lambda err: crash(err))
63
64def crash_on_critical(event):
65 if event.get('log_level') >= LogLevel.critical:
66 crash(twisted.logger.formatEvent(event))
67
5da7763e
IJ
68#---------- "router" ----------
69
ec0c4d95
IJ
70def route(packet, saddr, daddr):
71 print('TRACE ', saddr, daddr, packet)
5da7763e
IJ
72 try: client = clients[daddr]
73 except KeyError: dclient = None
74 if dclient is not None:
75 dclient.queue_outbound(packet)
3a6076b4 76 elif saddr.is_link_local or daddr.is_link_local:
ec0c4d95 77 log_discard(packet, saddr, daddr, 'link-local')
e2d41dc1 78 elif daddr == host or daddr not in network:
ec0c4d95 79 print('TRACE INBOUND ', saddr, daddr, packet)
5da7763e 80 queue_inbound(packet)
e2d41dc1 81 elif daddr == relay:
5da7763e
IJ
82 log_discard(packet, saddr, daddr, 'relay')
83 else:
84 log_discard(packet, saddr, daddr, 'no client')
85
86def log_discard(packet, saddr, daddr, why):
3a6076b4 87 print('DROP ', saddr, daddr, why)
ec0c4d95
IJ
88# syslog.syslog(syslog.LOG_DEBUG,
89# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
5da7763e 90
5da7763e 91#---------- client ----------
c4b6d990 92
ec88b1f1 93class Client():
c4b6d990 94 def __init__(self, ip, cs):
ec88b1f1
IJ
95 # instance data members
96 self._ip = ip
97 self._cs = cs
98 self.pw = cfg.get(cs, 'password')
0ac316c8
IJ
99 self._rq = collections.deque() # requests
100 self._pq = collections.deque() # packets
c4b6d990
IJ
101 # plus from config:
102 # .max_batch_down
103 # .max_queue_time
104 # .max_request_time
ec88b1f1
IJ
105 for k in ('max_batch_down','max_queue_time','max_request_time'):
106 req = cfg.getint(cs, k)
094ee3a2 107 limit = cfg.getint('limits',k)
c4b6d990
IJ
108 self.__dict__[k] = min(req, limit)
109
110 def process_arriving_data(self, d):
b0cfbfce 111 for packet in slip.decode(d):
5bae5ba3 112 (saddr, daddr) = packet_addrs(packet)
c4b6d990
IJ
113 if saddr != self._ip:
114 raise ValueError('wrong source address %s' % saddr)
ec0c4d95 115 route(packet, saddr, daddr)
ec88b1f1 116
c4b6d990
IJ
117 def _req_cancel(self, request):
118 request.finish()
119
120 def _req_error(self, err, request):
121 self._req_cancel(request)
122
0ac316c8 123 def queue_outbound(self, packet):
094ee3a2 124 self._pq.append((time.monotonic(), packet))
0ac316c8 125
c4b6d990
IJ
126 def http_request(self, request):
127 request.setHeader('Content-Type','application/octet-stream')
128 reactor.callLater(self.max_request_time, self._req_cancel, request)
129 request.notifyFinish().addErrback(self._req_error, request)
0ac316c8
IJ
130 self._rq.append(request)
131 self._check_outbound()
132
133 def _check_outbound(self):
134 while True:
135 try: request = self._rq[0]
136 except IndexError: request = None
137 if request and request.finished:
138 self._rq.popleft()
139 continue
140
141 # now request is an unfinished request, or None
142 try: (queuetime, packet) = self._pq[0]
e2d41dc1 143 except IndexError:
0ac316c8 144 # no packets, oh well
094ee3a2
IJ
145 break
146
147 age = time.monotonic() - queuetime
148 if age > self.max_queue_time:
149 self._pq.popleft()
0ac316c8
IJ
150 continue
151
094ee3a2
IJ
152 if request is None:
153 # no request
154 break
155
156 # request, and also some non-expired packets
157 while True:
158 try: (dummy, packet) = self._pq[0]
159 except IndexError: break
160
b0cfbfce 161 encoded = slip.encode(packet)
094ee3a2
IJ
162
163 if request.sentLength > 0:
b0cfbfce 164 if (request.sentLength + len(slip.delimiter)
094ee3a2
IJ
165 + len(encoded) > self.max_batch_down):
166 break
b0cfbfce 167 request.write(slip.delimiter)
094ee3a2
IJ
168
169 request.write(encoded)
170 self._pq.popLeft()
171
172 assert(request.sentLength)
173 self._rq.popLeft()
174 request.finish()
175 # round again, looking for more to do
ec88b1f1 176
5da7763e 177class IphttpResource(twisted.web.resource.Resource):
c1e4910b 178 isLeaf = True
5da7763e
IJ
179 def render_POST(self, request):
180 # find client, update config, etc.
e2d41dc1 181 ci = ipaddr(request.args['i'])
5da7763e
IJ
182 c = clients[ci]
183 pw = request.args['pw']
184 if pw != c.pw: raise ValueError('bad password')
185
186 # update config
187 for r, w in (('mbd', 'max_batch_down'),
188 ('mqt', 'max_queue_time'),
189 ('mrt', 'max_request_time')):
190 try: v = request.args[r]
191 except KeyError: continue
192 v = int(v)
193 c.__dict__[w] = v
194
195 try: d = request.args['d']
196 except KeyError: d = ''
197
198 c.process_arriving_data(d)
199 c.new_request(request)
200
8e279651 201 def render_GET(self, request):
040ff511 202 return b'<html><body>hippotat</body></html>'
8e279651 203
5da7763e
IJ
204def start_http():
205 resource = IphttpResource()
b11c6e7a 206 site = twisted.web.server.Site(resource)
e2d41dc1 207 for addrspec in cfg.get('server','addrs').split():
5da7763e
IJ
208 try:
209 addr = ipaddress.IPv4Address(addrspec)
210 endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
211 except AddressValueError:
212 addr = ipaddress.IPv6Address(addrspec)
213 endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
214 ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
b11c6e7a 215 crash_on_defer(ep.listen(site))
5da7763e
IJ
216
217#---------- config and setup ----------
218
3fba9787
IJ
219def process_cfg():
220 global network
e75e9c17
IJ
221 global host
222 global relay
5bae5ba3 223 global ipif_command
3fba9787 224
ec88b1f1 225 network = ipnetwork(cfg.get('virtual','network'))
e75e9c17
IJ
226 if network.num_addresses < 3 + 2:
227 raise ValueError('network needs at least 2^3 addresses')
228
3fba9787 229 try:
e75e9c17
IJ
230 host = cfg.get('virtual','host')
231 except NoOptionError:
e2d41dc1 232 host = next(network.hosts())
e75e9c17
IJ
233
234 try:
235 relay = cfg.get('virtual','relay')
e2d41dc1 236 except NoOptionError:
e75e9c17 237 for search in network.hosts():
e2d41dc1 238 if search == host: continue
e75e9c17
IJ
239 relay = search
240 break
3fba9787 241
ec88b1f1
IJ
242 for cs in cfg.sections():
243 if not (':' in cs or '.' in cs): continue
e2d41dc1 244 ci = ipaddr(cs)
ec88b1f1
IJ
245 if ci not in network:
246 raise ValueError('client %s not in network' % ci)
247 if ci in clients:
248 raise ValueError('multiple client cfg sections for %s' % ci)
249 clients[ci] = Client(ci, cs)
3fba9787 250
e2d41dc1
IJ
251 global mtu
252 mtu = cfg.get('virtual','mtu')
253
5bae5ba3
IJ
254 iic_vars = { }
255 for k in ('host','relay','mtu','network'):
256 iic_vars[k] = globals()[k]
257
258 ipif_command = cfg.get('server','ipif', vars=iic_vars)
259
e75e9c17 260def startup():
e2d41dc1
IJ
261 global cfg
262
e75e9c17
IJ
263 op = OptionParser()
264 op.add_option('-c', '--config', dest='configfile',
265 default='/etc/hippottd/server.conf')
266 global opts
267 (opts, args) = op.parse_args()
268 if len(args): op.error('no non-option arguments please')
269
e2d41dc1
IJ
270 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
271
e75e9c17 272 cfg = ConfigParser()
5bae5ba3 273 cfg.read_string(defcfg)
e2d41dc1 274 cfg.read(opts.configfile)
5bae5ba3
IJ
275 process_cfg()
276
040ff511 277 start_ipif(ipif_command, route)
5bae5ba3 278 start_http()
e2d41dc1
IJ
279
280startup()
281reactor.run()
aa663282 282print('CRASHED (end)', file=sys.stderr)