chiark / gitweb /
29321e9435bab3f8deff72824246a0ff849f24de
[hippotat.git] / server
1 #!/usr/bin/python3
2
3 from hippotat import *
4
5 import sys
6 import os
7
8 import twisted.internet
9 import twisted.internet.endpoints
10 from twisted.web.server import NOT_DONE_YET
11 from twisted.logger import LogLevel
12
13 #import twisted.web.server import Site
14 #from twisted.web.resource import Resource
15
16 from optparse import OptionParser
17 from configparser import ConfigParser
18 from configparser import NoOptionError
19
20 import collections
21
22 import syslog
23
24 clients = { }
25
26 defcfg = '''
27 [DEFAULT]
28 max_batch_down = 65536
29 max_queue_time = 10
30 max_request_time = 54
31 target_requests_outstanding = 3
32
33 [virtual]
34 mtu = 1500
35 # network
36 # [host]
37 # [relay]
38
39 [server]
40 ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
41 addrs = 127.0.0.1 ::1
42 port = 8099
43
44 [limits]
45 max_batch_down = 262144
46 max_queue_time = 121
47 max_request_time = 121
48 target_requests_outstanding = 10
49 '''
50
51 #---------- error handling ----------
52
53 def crash(err):
54   print('CRASH ', err, file=sys.stderr)
55   try: reactor.stop()
56   except twisted.internet.error.ReactorNotRunning: pass
57
58 def crash_on_defer(defer):
59   defer.addErrback(lambda err: crash(err))
60
61 def crash_on_critical(event):
62   if event.get('log_level') >= LogLevel.critical:
63     crash(twisted.logger.formatEvent(event))
64
65 #---------- "router" ----------
66
67 def route(packet, saddr, daddr):
68   print('TRACE ', saddr, daddr, packet)
69   try: client = clients[daddr]
70   except KeyError: dclient = None
71   if dclient is not None:
72     dclient.queue_outbound(packet)
73   elif saddr.is_link_local or daddr.is_link_local:
74     log_discard(packet, saddr, daddr, 'link-local')
75   elif daddr == host or daddr not in network:
76     print('TRACE INBOUND ', saddr, daddr, packet)
77     queue_inbound(packet)
78   elif daddr == relay:
79     log_discard(packet, saddr, daddr, 'relay')
80   else:
81     log_discard(packet, saddr, daddr, 'no client')
82
83 def log_discard(packet, saddr, daddr, why):
84   print('DROP ', saddr, daddr, why)
85 #  syslog.syslog(syslog.LOG_DEBUG,
86 #                'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
87
88 #---------- client ----------
89
90 class Client():
91   def __init__(self, ip, cs):
92     # instance data members
93     self._ip = ip
94     self._cs = cs
95     self.pw = cfg.get(cs, 'password')
96     self._rq = collections.deque() # requests
97     # self._pq = PacketQueue(...)
98     # plus from config:
99     #  .max_batch_down
100     #  .max_queue_time
101     #  .max_request_time
102     #  .target_requests_outstanding
103     for k in ('max_batch_down','max_queue_time','max_request_time',
104               'target_requests_outstanding'):
105       req = cfg.getint(cs, k)
106       limit = cfg.getint('limits',k)
107       self.__dict__[k] = min(req, limit)
108     self._pq = PacketQueue(self.max_queue_time)
109
110     def process_arriving_data(self, d):
111       for packet in slip.decode(d):
112         (saddr, daddr) = packet_addrs(packet)
113         if saddr != self._ip:
114           raise ValueError('wrong source address %s' % saddr)
115         route(packet, saddr, daddr)
116
117     def _req_cancel(self, request):
118       request.finish()
119
120     def _req_error(self, err, request):
121       self._req_cancel(request)
122
123     def queue_outbound(self, packet):
124       self._pq.append(packet)
125
126     def http_request(self, request):
127       request.setHeader('Content-Type','application/octet-stream')
128       reactor.callLater(self.max_request_time, self._req_cancel, request)
129       request.notifyFinish().addErrback(self._req_error, request)
130       self._rq.append(request)
131       self._check_outbound()
132
133     def _check_outbound(self):
134       while True:
135         try: request = self._rq[0]
136         except IndexError: request = None
137         if request and request.finished:
138           self._rq.popleft()
139           continue
140
141         if not self._pq.nonempty():
142           # no packets, oh well
143           continue
144
145         if request is None:
146           # no request
147           break
148
149         # request, and also some non-expired packets
150         while True:
151           packet = self.pq.popleft()
152           if packet is None: break
153
154           encoded = slip.encode(packet)
155           
156           if request.sentLength > 0:
157             if (request.sentLength + len(slip.delimiter)
158                 + len(encoded) > self.max_batch_down):
159               break
160             request.write(slip.delimiter)
161
162           request.write(encoded)
163           self._pq.popLeft()
164
165         assert(request.sentLength)
166         self._rq.popLeft()
167         request.finish()
168         # round again, looking for more to do
169
170       while len(self._rq) > self.target_requests_outstanding:
171         request = self._rq.popleft()
172         request.finish()
173
174 class IphttpResource(twisted.web.resource.Resource):
175   isLeaf = True
176   def render_POST(self, request):
177     # find client, update config, etc.
178     ci = ipaddr(request.args['i'])
179     c = clients[ci]
180     pw = request.args['pw']
181     if pw != c.pw: raise ValueError('bad password')
182
183     # update config
184     for r, w in (('mbd', 'max_batch_down'),
185                  ('mqt', 'max_queue_time'),
186                  ('mrt', 'max_request_time'),
187                  ('tro', 'target_requests_outstanding')):
188       try: v = request.args[r]
189       except KeyError: continue
190       v = int(v)
191       c.__dict__[w] = v
192
193     try: d = request.args['d']
194     except KeyError: d = ''
195
196     c.process_arriving_data(d)
197     c.new_request(request)
198
199   def render_GET(self, request):
200     return b'<html><body>hippotat</body></html>'
201
202 def start_http():
203   resource = IphttpResource()
204   site = twisted.web.server.Site(resource)
205   for addrspec in cfg.get('server','addrs').split():
206     try:
207       addr = ipaddress.IPv4Address(addrspec)
208       endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
209     except AddressValueError:
210       addr = ipaddress.IPv6Address(addrspec)
211       endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
212     ep = endpointfactory(reactor, cfg.getint('server','port'), addr)
213     crash_on_defer(ep.listen(site))
214
215 #---------- config and setup ----------
216         
217 def process_cfg():
218   global network
219   global host
220   global relay
221   global ipif_command
222
223   network = ipnetwork(cfg.get('virtual','network'))
224   if network.num_addresses < 3 + 2:
225     raise ValueError('network needs at least 2^3 addresses')
226
227   try:
228     host = cfg.get('virtual','host')
229   except NoOptionError:
230     host = next(network.hosts())
231
232   try:
233     relay = cfg.get('virtual','relay')
234   except NoOptionError:
235     for search in network.hosts():
236       if search == host: continue
237       relay = search
238       break
239
240   for cs in cfg.sections():
241     if not (':' in cs or '.' in cs): continue
242     ci = ipaddr(cs)
243     if ci not in network:
244       raise ValueError('client %s not in network' % ci)
245     if ci in clients:
246       raise ValueError('multiple client cfg sections for %s' % ci)
247     clients[ci] = Client(ci, cs)
248
249   global mtu
250   mtu = cfg.get('virtual','mtu')
251
252   iic_vars = { }
253   for k in ('host','relay','mtu','network'):
254     iic_vars[k] = globals()[k]
255
256   ipif_command = cfg.get('server','ipif', vars=iic_vars)
257
258 def startup():
259   global cfg
260
261   op = OptionParser()
262   op.add_option('-c', '--config', dest='configfile',
263                 default='/etc/hippottd/server.conf')
264   global opts
265   (opts, args) = op.parse_args()
266   if len(args): op.error('no non-option arguments please')
267
268   twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
269
270   cfg = ConfigParser()
271   cfg.read_string(defcfg)
272   cfg.read(opts.configfile)
273   process_cfg()
274
275   start_ipif(ipif_command, route)
276   start_http()
277
278 startup()
279 reactor.run()
280 print('CRASHED (end)', file=sys.stderr)