chiark / gitweb /
a139e6e240d3d19e24847c54ff77c1698bc52459
[hippotat.git] / server
1 #!/usr/bin/python3
2
3 import twisted
4
5 import twisted.web.server import Site
6 from twisted.web.resource import Resource
7 from twisted.web.server import NOT_DONE_YET
8 from twisted.internet import reactor
9
10 from optparse import OptionParser
11 from configparser import ConfigParser
12 from configparser import NoOptionError
13 import ipaddress
14
15 import collections
16
17 import syslog
18
19 clients = { }
20
21 def ipaddress(input):
22   try:
23     r = ipaddress.IPv4Address(input)
24   except AddressValueError:
25     r = ipaddress.IPv6Address(input)
26   return r
27
28 def ipnetwork(input):
29   try:
30     r = ipaddress.IPv4Network(input)
31   except NetworkValueError:
32     r = ipaddress.IPv6Network(input)
33   return r
34
35 defcfg = '''
36 [DEFAULT]
37 max_batch_down = 65536
38 max_queue_time = 10
39 max_request_time = 54
40
41 [virtual]
42 mtu = 1500
43 # network
44 # [host]
45 # [relay]
46
47 [server]
48 ipif = userv root ipif %(host),%(relay),%(mtu),slip %(network)
49
50 [limits]
51 max_batch_down = 262144
52 max_queue_time = 121
53 max_request_time = 121
54 '''
55
56 class IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
57   def __init__(self):
58     self._buffer = b''
59   def connectionMade(self): pass
60   def outReceived(self, data):
61     buffer += data
62     packets = slip_decode(buffer)
63     buffer = packets.pop()
64     for packet in packets:
65       (saddr, daddr) = packet_addrs(packet)
66       route(packet, daddr)
67
68 def start_ipif():
69   reactor.spawnProcess(IpifProcessProtocol(),
70                        '/bin/sh',['-c', ipif_command],
71                        childFDs={0:'w', 1:'r', 2:2})
72
73 def log_discard(packet, saddr, daddr, why):
74   syslog.syslog(syslog.LOG_DEBUG,
75                 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
76
77 def route(packet. daddr):
78   try: client = clients[daddr]
79   except KeyError: dclient = None
80   if dclient is not None:
81     dclient.queue_outbound(packet)
82   else if daddr = host or daddr not in network:
83     queue_inbound(packet)
84   else if daddr = relay:
85     log_discard(packet, saddr, daddr, 'relay')
86   else:
87     log_discard(packet, saddr, daddr, 'no client')
88
89 class Client():
90   def __init__(self, ip, cs):
91     # instance data members
92     self._ip = ip
93     self._cs = cs
94     self.pw = cfg.get(cs, 'password')
95     self._rq = collections.deque() # requests
96     self._pq = collections.deque() # packets
97     # plus from config:
98     #  .max_batch_down
99     #  .max_queue_time
100     #  .max_request_time
101     for k in ('max_batch_down','max_queue_time','max_request_time'):
102       req = cfg.getint(cs, k)
103       limit = cfg.getint('limits',k)
104       self.__dict__[k] = min(req, limit)
105
106     def process_arriving_data(self, d):
107       for packet in slip_decode(d):
108         (saddr, daddr) = packet_addrs(packet)
109         if saddr != self._ip:
110           raise ValueError('wrong source address %s' % saddr)
111         route(packet, daddr)
112
113     def _req_cancel(self, request):
114       request.finish()
115
116     def _req_error(self, err, request):
117       self._req_cancel(request)
118
119     def queue_outbound(self, packet):
120       self._pq.append((time.monotonic(), packet))
121
122     def http_request(self, request):
123       request.setHeader('Content-Type','application/octet-stream')
124       reactor.callLater(self.max_request_time, self._req_cancel, request)
125       request.notifyFinish().addErrback(self._req_error, request)
126       self._rq.append(request)
127       self._check_outbound()
128
129     def _check_outbound(self):
130       while True:
131         try: request = self._rq[0]
132         except IndexError: request = None
133         if request and request.finished:
134           self._rq.popleft()
135           continue
136
137         # now request is an unfinished request, or None
138         try: (queuetime, packet) = self._pq[0]
139         except: IndexError:
140           # no packets, oh well
141           break
142
143         age = time.monotonic() - queuetime
144         if age > self.max_queue_time:
145           self._pq.popleft()
146           continue
147
148         if request is None:
149           # no request
150           break
151
152         # request, and also some non-expired packets
153         while True:
154           try: (dummy, packet) = self._pq[0]
155           except IndexError: break
156
157           encoded = slip_encode(packet)
158           
159           if request.sentLength > 0:
160             if (request.sentLength + len(slip_delimiter)
161                 + len(encoded) > self.max_batch_down):
162               break
163             request.write(slip_delimiter)
164
165           request.write(encoded)
166           self._pq.popLeft()
167
168         assert(request.sentLength)
169         self._rq.popLeft()
170         request.finish()
171         # round again, looking for more to do
172
173 def process_cfg():
174   global network
175   global host
176   global relay
177   global ipif_command
178
179   network = ipnetwork(cfg.get('virtual','network'))
180   if network.num_addresses < 3 + 2:
181     raise ValueError('network needs at least 2^3 addresses')
182
183   try:
184     host = cfg.get('virtual','host')
185   except NoOptionError:
186     host = network.hosts().next()
187
188   try:
189     relay = cfg.get('virtual','relay')
190   except OptionError:
191     for search in network.hosts():
192       if search = host: continue
193       relay = search
194       break
195
196   for cs in cfg.sections():
197     if not (':' in cs or '.' in cs): continue
198     ci = ipaddress(cs)
199     if ci not in network:
200       raise ValueError('client %s not in network' % ci)
201     if ci in clients:
202       raise ValueError('multiple client cfg sections for %s' % ci)
203     clients[ci] = Client(ci, cs)
204
205   iic_vars = { }
206   for k in ('host','relay','mtu','network'):
207     iic_vars[k] = globals()[k]
208
209   ipif_command = cfg.get('server','ipif', vars=iic_vars)
210
211 class FormPage(Resource):
212   def render_POST(self, request):
213     # find client, update config, etc.
214     ci = ipaddress(request.args['i'])
215     c = clients[ci]
216     pw = request.args['pw']
217     if pw != c.pw: raise ValueError('bad password')
218
219     # update config
220     for r, w in (('mbd', 'max_batch_down'),
221                  ('mqt', 'max_queue_time'),
222                  ('mrt', 'max_request_time')):
223       try: v = request.args[r]
224       except KeyError: continue
225       v = int(v)
226       c.__dict__[w] = v
227
228     try: d = request.args['d']
229     except KeyError: d = ''
230
231     c.process_arriving_data(d)
232     c.new_request(request)
233
234 def startup():
235   op = OptionParser()
236   op.add_option('-c', '--config', dest='configfile',
237                 default='/etc/hippottd/server.conf')
238   global opts
239   (opts, args) = op.parse_args()
240   if len(args): op.error('no non-option arguments please')
241
242   cfg = ConfigParser()
243   cfg.read_string(defcfg)
244   cfg.read_file(opts['configfile'])
245   process_cfg()
246
247   start_ipif()
248   start_http()