chiark / gitweb /
move ipif slip process handling
[hippotat.git] / server
1 #!/usr/bin/python3
2
3 import signal
4 signal.signal(signal.SIGINT, signal.SIG_DFL)
5
6 import sys
7 import os
8
9 import twisted
10 import twisted.internet
11 import twisted.internet.endpoints
12 from twisted.internet import reactor
13 from twisted.web.server import NOT_DONE_YET
14 from twisted.logger import LogLevel
15
16 #import twisted.web.server import Site
17 #from twisted.web.resource import Resource
18
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
22
23 import collections
24
25 import syslog
26
27 from hippotat import *
28
29 clients = { }
30
31 defcfg = '''
32 [DEFAULT]
33 max_batch_down = 65536
34 max_queue_time = 10
35 max_request_time = 54
36
37 [virtual]
38 mtu = 1500
39 # network
40 # [host]
41 # [relay]
42
43 [server]
44 ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
45 addrs = 127.0.0.1 ::1
46 port = 8099
47
48 [limits]
49 max_batch_down = 262144
50 max_queue_time = 121
51 max_request_time = 121
52 '''
53
54 #---------- error handling ----------
55
56 def crash(err):
57   print('CRASH ', err, file=sys.stderr)
58   try: reactor.stop()
59   except twisted.internet.error.ReactorNotRunning: pass
60
61 def crash_on_defer(defer):
62   defer.addErrback(lambda err: crash(err))
63
64 def crash_on_critical(event):
65   if event.get('log_level') >= LogLevel.critical:
66     crash(twisted.logger.formatEvent(event))
67
68 #---------- "router" ----------
69
70 def route(packet, saddr, daddr):
71   print('TRACE ', saddr, daddr, packet)
72   try: client = clients[daddr]
73   except KeyError: dclient = None
74   if dclient is not None:
75     dclient.queue_outbound(packet)
76   elif saddr.is_link_local or daddr.is_link_local:
77     log_discard(packet, saddr, daddr, 'link-local')
78   elif daddr == host or daddr not in network:
79     print('TRACE INBOUND ', saddr, daddr, packet)
80     queue_inbound(packet)
81   elif daddr == relay:
82     log_discard(packet, saddr, daddr, 'relay')
83   else:
84     log_discard(packet, saddr, daddr, 'no client')
85
86 def log_discard(packet, saddr, daddr, why):
87   print('DROP ', saddr, daddr, why)
88 #  syslog.syslog(syslog.LOG_DEBUG,
89 #                'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
90
91 #---------- client ----------
92
93 class Client():
94   def __init__(self, ip, cs):
95     # instance data members
96     self._ip = ip
97     self._cs = cs
98     self.pw = cfg.get(cs, 'password')
99     self._rq = collections.deque() # requests
100     self._pq = collections.deque() # packets
101     # plus from config:
102     #  .max_batch_down
103     #  .max_queue_time
104     #  .max_request_time
105     for k in ('max_batch_down','max_queue_time','max_request_time'):
106       req = cfg.getint(cs, k)
107       limit = cfg.getint('limits',k)
108       self.__dict__[k] = min(req, limit)
109
110     def process_arriving_data(self, d):
111       for packet in slip.decode(d):
112         (saddr, daddr) = packet_addrs(packet)
113         if saddr != self._ip:
114           raise ValueError('wrong source address %s' % saddr)
115         route(packet, saddr, daddr)
116
117     def _req_cancel(self, request):
118       request.finish()
119
120     def _req_error(self, err, request):
121       self._req_cancel(request)
122
123     def queue_outbound(self, packet):
124       self._pq.append((time.monotonic(), packet))
125
126     def http_request(self, request):
127       request.setHeader('Content-Type','application/octet-stream')
128       reactor.callLater(self.max_request_time, self._req_cancel, request)
129       request.notifyFinish().addErrback(self._req_error, request)
130       self._rq.append(request)
131       self._check_outbound()
132
133     def _check_outbound(self):
134       while True:
135         try: request = self._rq[0]
136         except IndexError: request = None
137         if request and request.finished:
138           self._rq.popleft()
139           continue
140
141         # now request is an unfinished request, or None
142         try: (queuetime, packet) = self._pq[0]
143         except IndexError:
144           # no packets, oh well
145           break
146
147         age = time.monotonic() - queuetime
148         if age > self.max_queue_time:
149           self._pq.popleft()
150           continue
151
152         if request is None:
153           # no request
154           break
155
156         # request, and also some non-expired packets
157         while True:
158           try: (dummy, packet) = self._pq[0]
159           except IndexError: break
160
161           encoded = slip.encode(packet)
162           
163           if request.sentLength > 0:
164             if (request.sentLength + len(slip.delimiter)
165                 + len(encoded) > self.max_batch_down):
166               break
167             request.write(slip.delimiter)
168
169           request.write(encoded)
170           self._pq.popLeft()
171
172         assert(request.sentLength)
173         self._rq.popLeft()
174         request.finish()
175         # round again, looking for more to do
176
177 class IphttpResource(twisted.web.resource.Resource):
178   isLeaf = True
179   def render_POST(self, request):
180     # find client, update config, etc.
181     ci = ipaddr(request.args['i'])
182     c = clients[ci]
183     pw = request.args['pw']
184     if pw != c.pw: raise ValueError('bad password')
185
186     # update config
187     for r, w in (('mbd', 'max_batch_down'),
188                  ('mqt', 'max_queue_time'),
189                  ('mrt', 'max_request_time')):
190       try: v = request.args[r]
191       except KeyError: continue
192       v = int(v)
193       c.__dict__[w] = v
194
195     try: d = request.args['d']
196     except KeyError: d = ''
197
198     c.process_arriving_data(d)
199     c.new_request(request)
200
201   def render_GET(self, request):
202     return b'<html><body>hippotat</body></html>'
203
204 def start_http():
205   resource = IphttpResource()
206   site = twisted.web.server.Site(resource)
207   for addrspec in cfg.get('server','addrs').split():
208     try:
209       addr = ipaddress.IPv4Address(addrspec)
210       endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
211     except AddressValueError:
212       addr = ipaddress.IPv6Address(addrspec)
213       endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
214     ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
215     crash_on_defer(ep.listen(site))
216
217 #---------- config and setup ----------
218         
219 def process_cfg():
220   global network
221   global host
222   global relay
223   global ipif_command
224
225   network = ipnetwork(cfg.get('virtual','network'))
226   if network.num_addresses < 3 + 2:
227     raise ValueError('network needs at least 2^3 addresses')
228
229   try:
230     host = cfg.get('virtual','host')
231   except NoOptionError:
232     host = next(network.hosts())
233
234   try:
235     relay = cfg.get('virtual','relay')
236   except NoOptionError:
237     for search in network.hosts():
238       if search == host: continue
239       relay = search
240       break
241
242   for cs in cfg.sections():
243     if not (':' in cs or '.' in cs): continue
244     ci = ipaddr(cs)
245     if ci not in network:
246       raise ValueError('client %s not in network' % ci)
247     if ci in clients:
248       raise ValueError('multiple client cfg sections for %s' % ci)
249     clients[ci] = Client(ci, cs)
250
251   global mtu
252   mtu = cfg.get('virtual','mtu')
253
254   iic_vars = { }
255   for k in ('host','relay','mtu','network'):
256     iic_vars[k] = globals()[k]
257
258   ipif_command = cfg.get('server','ipif', vars=iic_vars)
259
260 def startup():
261   global cfg
262
263   op = OptionParser()
264   op.add_option('-c', '--config', dest='configfile',
265                 default='/etc/hippottd/server.conf')
266   global opts
267   (opts, args) = op.parse_args()
268   if len(args): op.error('no non-option arguments please')
269
270   twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
271
272   cfg = ConfigParser()
273   cfg.read_string(defcfg)
274   cfg.read(opts.configfile)
275   process_cfg()
276
277   start_ipif(ipif_command, route)
278   start_http()
279
280 startup()
281 reactor.run()
282 print('CRASHED (end)', file=sys.stderr)