chiark / gitweb /
wip, python3
[hippotat.git] / server
1 #!/usr/bin/python3
2
3 import twisted.web.server import Site
4 from twisted.web.resource import Resource
5 from twisted.web.server import NOT_DONE_YET
6 from twisted.internet import reactor
7
8 import configparser
9 import ipaddress
10
11 import collections
12
13 import syslog
14
15 clients = { }
16
17 def ipaddress(input):
18   try:
19     r = ipaddress.IPv4Address(input)
20   except AddressValueError:
21     r = ipaddress.IPv6Address(input)
22   return r
23
24 def ipnetwork(input):
25   try:
26     r = ipaddress.IPv4Network(input)
27   except NetworkValueError:
28     r = ipaddress.IPv6Network(input)
29   return r
30
31 defcfg = u'''
32 [DEFAULT]
33 max_batch_down = 65536
34 max_queue_time = 10
35 max_request_time = 54
36
37 [limits]
38 max_batch_down = 262144
39 max_queue_time = 121
40 max_request_time = 121
41 '''
42
43 def route(packet. daddr):
44   try: client = clients[daddr]
45   except KeyError: dclient = None
46   if dclient is not None:
47     dclient.queue_outbound(packet)
48   else if daddr = server or daddr not in network:
49     queue_inbound(packet)
50   else:
51     syslog.syslog(syslog.LOG_DEBUG, 'no client for %s' % daddr)
52
53 class Client():
54   def __init__(self, ip, cs):
55     # instance data members
56     self._ip = ip
57     self._cs = cs
58     self.pw = cfg.get(cs, 'password')
59     self._rq = collections.deque() # requests
60     self._pq = collections.deque() # packets
61     # plus from config:
62     #  .max_batch_down
63     #  .max_queue_time
64     #  .max_request_time
65     for k in ('max_batch_down','max_queue_time','max_request_time'):
66       req = cfg.getint(cs, k)
67       limit = cfg.getint('limits',k)
68       self.__dict__[k] = min(req, limit)
69
70     def process_arriving_data(self, d):
71       for packet in slip_decode(d):
72         (saddr, daddr) = ip_64_addrs(packet)
73         if saddr != self._ip:
74           raise ValueError('wrong source address %s' % saddr)
75         route(packet, daddr)
76
77     def _req_cancel(self, request):
78       request.finish()
79
80     def _req_error(self, err, request):
81       self._req_cancel(request)
82
83     def queue_outbound(self, packet):
84       self._pq.append((time.monotonic(), packet))
85
86     def http_request(self, request):
87       request.setHeader('Content-Type','application/octet-stream')
88       reactor.callLater(self.max_request_time, self._req_cancel, request)
89       request.notifyFinish().addErrback(self._req_error, request)
90       self._rq.append(request)
91       self._check_outbound()
92
93     def _check_outbound(self):
94       while True:
95         try: request = self._rq[0]
96         except IndexError: request = None
97         if request and request.finished:
98           self._rq.popleft()
99           continue
100
101         # now request is an unfinished request, or None
102         try: (queuetime, packet) = self._pq[0]
103         except: IndexError:
104           # no packets, oh well
105           break
106
107         age = time.monotonic() - queuetime
108         if age > self.max_queue_time:
109           self._pq.popleft()
110           continue
111
112         if request is None:
113           # no request
114           break
115
116         # request, and also some non-expired packets
117         while True:
118           try: (dummy, packet) = self._pq[0]
119           except IndexError: break
120
121           encoded = slip_encode(packet)
122           
123           if request.sentLength > 0:
124             if (request.sentLength + len(slip_delimiter)
125                 + len(encoded) > self.max_batch_down):
126               break
127             request.write(slip_delimiter)
128
129           request.write(encoded)
130           self._pq.popLeft()
131
132         assert(request.sentLength)
133         self._rq.popLeft()
134         request.finish()
135         # round again, looking for more to do
136
137 def process_cfg():
138   global network
139   global ourself
140
141   network = ipnetwork(cfg.get('virtual','network'))
142   try:
143     ourself = cfg.get('virtual','server')
144   except ConfigParser.NoOptionError:
145     ourself = network.hosts().next()
146
147   for cs in cfg.sections():
148     if not (':' in cs or '.' in cs): continue
149     ci = ipaddress(cs)
150     if ci not in network:
151       raise ValueError('client %s not in network' % ci)
152     if ci in clients:
153       raise ValueError('multiple client cfg sections for %s' % ci)
154     clients[ci] = Client(ci, cs)
155
156 class FormPage(Resource):
157   def render_POST(self, request):
158     # find client, update config, etc.
159     ci = ipaddress(request.args['i'])
160     c = clients[ci]
161     pw = request.args['pw']
162     if pw != c.pw: raise ValueError('bad password')
163
164     # update config
165     for r, w in (('mbd', 'max_batch_down'),
166                  ('mqt', 'max_queue_time'),
167                  ('mrt', 'max_request_time')):
168       try: v = request.args[r]
169       except KeyError: continue
170       v = int(v)
171       c.__dict__[w] = v
172
173     try: d = request.args['d']
174     except KeyError: d = ''
175
176     c.process_arriving_data(d)
177     c.new_request(request)