chiark / gitweb /
wip
[hippotat] / server
... / ...
CommitLineData
1#!/usr/bin/python3
2
import collections
import ipaddress
import syslog
import time
from configparser import ConfigParser
from configparser import NoOptionError
from optparse import OptionParser

from twisted.internet import reactor
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.server import Site
16
17clients = { }
18
def ipaddress(input):
    """Parse *input* as an IPv4 address, falling back to IPv6.

    Returns an ``IPv4Address`` or ``IPv6Address`` object from the standard
    library.  Raises ``AddressValueError`` (a ``ValueError`` subclass) when
    *input* is valid as neither.
    """
    # This function has the same name as the stdlib module, so once this
    # def has run the global name `ipaddress` is the function, not the
    # module.  Import the module under a private alias to reach its classes.
    import ipaddress as _ipaddr

    try:
        return _ipaddr.IPv4Address(input)
    except _ipaddr.AddressValueError:
        return _ipaddr.IPv6Address(input)
25
def ipnetwork(input):
    """Parse *input* as an IPv4 network, falling back to IPv6.

    Returns an ``IPv4Network`` or ``IPv6Network`` object from the standard
    library.  Only a failure to parse the address part triggers the IPv6
    attempt; other IPv4 errors (for example a bad prefix length, or host
    bits set) propagate unchanged so the message stays relevant.
    """
    # The global name `ipaddress` is the helper function defined in this
    # file, which shadows the stdlib module; import the module privately.
    import ipaddress as _ipaddr

    try:
        return _ipaddr.IPv4Network(input)
    except _ipaddr.AddressValueError:
        return _ipaddr.IPv6Network(input)
32
# Built-in default configuration, in ConfigParser (INI) syntax.
#
# [DEFAULT] supplies fallback values for every section, so a per-client
# section that omits one of the max_* options still yields a value.
# [limits] holds the caps that Client.__init__ applies (it takes the
# minimum of the client's value and the [limits] value).
#
# The ipif_program references use the `%(name)s` form: both ConfigParser
# interpolation and Python %-formatting reject `%(name)` without the
# trailing conversion character.
# NOTE(review): host, relay, mtu and network are not options of [server]
# or [DEFAULT], so they presumably have to be supplied via `vars=` (or
# equivalent) when ipif_program is read - confirm against the caller.
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54

[virtual]
mtu = 1500
# network
# [host]
# [relay]

[server]
ipif_program = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
'''
53
def route(packet, daddr):
    """Deliver *packet* according to its destination address *daddr*.

    - If *daddr* is a key of ``clients``, the packet is queued on that
      client via ``queue_outbound``.
    - Otherwise, if *daddr* equals ``server`` or is not in ``network``,
      the packet is handed to ``queue_inbound``.
    - Otherwise (an address inside ``network`` with no client) the packet
      is dropped and a debug message is logged.
    """
    # NOTE(review): `server` and `queue_inbound` are not defined in the
    # part of the file visible here; `server` may be intended to be the
    # `host` global set by process_cfg - confirm.
    dclient = clients.get(daddr)
    if dclient is not None:
        dclient.queue_outbound(packet)
    elif daddr == server or daddr not in network:
        queue_inbound(packet)
    else:
        syslog.syslog(syslog.LOG_DEBUG, 'no client for %s' % daddr)
63
class Client():
    """State for one configured client.

    Holds a queue of packets waiting to be sent to the client and a queue
    of the client's outstanding HTTP requests; packets are delivered as
    the bodies of those requests.
    """

    def __init__(self, ip, cs):
        """Create the client with address *ip* from config section *cs*.

        Reads ``password`` and the three ``max_*`` options from the global
        ``cfg``; each ``max_*`` value is capped by the same option in the
        ``limits`` section.
        """
        # instance data members
        self._ip = ip
        self._cs = cs
        self.pw = cfg.get(cs, 'password')
        self._rq = collections.deque()  # requests
        self._pq = collections.deque()  # (queue time, packet) pairs
        # plus from config:
        #  .max_batch_down
        #  .max_queue_time
        #  .max_request_time
        for k in ('max_batch_down', 'max_queue_time', 'max_request_time'):
            req = cfg.getint(cs, k)
            limit = cfg.getint('limits', k)
            setattr(self, k, min(req, limit))

    def process_arriving_data(self, d):
        """Decode the packets in *d* and route each one.

        Raises ValueError on the first packet whose source address is not
        this client's address; packets before it have already been routed.
        """
        for packet in slip_decode(d):
            (saddr, daddr) = ip_64_addrs(packet)
            if saddr != self._ip:
                raise ValueError('wrong source address %s' % saddr)
            route(packet, daddr)

    def _req_cancel(self, request):
        """Finish *request* unless it has already been finished."""
        # The timeout scheduled in http_request fires even for requests
        # that _check_outbound already answered; do not finish those twice.
        if not request.finished:
            request.finish()

    def _req_error(self, err, request):
        """Errback for the request's notifyFinish(): cancel the request."""
        # NOTE(review): if this fires because the connection was lost,
        # Twisted may refuse finish() on the request - confirm.
        self._req_cancel(request)

    def queue_outbound(self, packet):
        """Queue *packet* for this client, stamped with the current time."""
        self._pq.append((time.monotonic(), packet))

    def http_request(self, request):
        """Register an incoming HTTP *request* as available for replies.

        The request is cancelled after ``max_request_time`` seconds, and
        any packets already queued are sent on it straight away.
        """
        request.setHeader('Content-Type', 'application/octet-stream')
        reactor.callLater(self.max_request_time, self._req_cancel, request)
        request.notifyFinish().addErrback(self._req_error, request)
        self._rq.append(request)
        self._check_outbound()

    def _check_outbound(self):
        """Send queued packets on waiting requests until one queue runs dry.

        Each pass discards a finished request or an expired head packet,
        or fills the oldest unfinished request with as many packets as
        ``max_batch_down`` allows and finishes it.
        """
        while True:
            try:
                request = self._rq[0]
            except IndexError:
                request = None
            if request and request.finished:
                self._rq.popleft()
                continue

            # now request is an unfinished request, or None
            try:
                (queuetime, packet) = self._pq[0]
            except IndexError:
                # no packets, oh well
                break

            age = time.monotonic() - queuetime
            if age > self.max_queue_time:
                # head packet has waited too long: drop it
                self._pq.popleft()
                continue

            if request is None:
                # no request
                break

            # request, and also some non-expired packets
            while True:
                try:
                    (dummy, packet) = self._pq[0]
                except IndexError:
                    break

                encoded = slip_encode(packet)

                # The first packet is always written; later ones only if
                # delimiter plus packet still fit within max_batch_down.
                if request.sentLength > 0:
                    if (request.sentLength + len(slip_delimiter)
                            + len(encoded) > self.max_batch_down):
                        break
                    request.write(slip_delimiter)

                request.write(encoded)
                self._pq.popleft()

            assert(request.sentLength)
            self._rq.popleft()
            request.finish()
            # round again, looking for more to do
147
def process_cfg():
    """Derive the module globals and the client table from ``cfg``.

    Sets the globals ``network``, ``host`` and ``relay`` from the
    ``virtual`` section, then creates a ``Client`` in ``clients`` for every
    section whose name looks like an IP address (contains ':' or '.').

    ``host`` defaults to the first host address of the network and
    ``relay`` to the first host address that is not ``host``.  Configured
    values are parsed into address objects so that they have the same type
    as the defaults and compare correctly against them.

    Raises ValueError if the network is too small, if a client address is
    outside the network, or if two sections name the same client.
    """
    global network
    global host
    global relay

    network = ipnetwork(cfg.get('virtual', 'network'))
    # NOTE(review): 3 + 2 presumably counts host, relay and one client plus
    # the network and broadcast addresses - confirm.
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')

    try:
        host_cfg = cfg.get('virtual', 'host')
    except NoOptionError:
        host = next(iter(network.hosts()))
    else:
        host = ipaddress(host_cfg)

    try:
        relay_cfg = cfg.get('virtual', 'relay')
    except NoOptionError:
        # first usable address that is not already taken by host
        relay = next(addr for addr in network.hosts() if addr != host)
    else:
        relay = ipaddress(relay_cfg)

    for cs in cfg.sections():
        # only sections named after an IP address describe clients
        if not (':' in cs or '.' in cs):
            continue
        ci = ipaddress(cs)
        if ci not in network:
            raise ValueError('client %s not in network' % ci)
        if ci in clients:
            raise ValueError('multiple client cfg sections for %s' % ci)
        clients[ci] = Client(ci, cs)
178
class FormPage(Resource):
    """HTTP resource through which clients exchange packets by POST."""

    def render_POST(self, request):
        """Authenticate the posting client, take its data, queue the request.

        Form fields: ``i`` (client address), ``pw`` (password), optional
        ``mbd`` / ``mqt`` / ``mrt`` (requested max_batch_down,
        max_queue_time, max_request_time) and optional ``d`` (data).

        Returns NOT_DONE_YET because the response is written and finished
        later by the Client.  Raises KeyError for a missing required field
        or unknown client and ValueError for a bad password or number.
        """
        # Twisted maps each argument name (bytes) to a list of values
        # (bytes); use the first value of each.
        args = request.args

        # find client, update config, etc.
        ci = ipaddress(args[b'i'][0].decode('ascii'))
        c = clients[ci]
        pw = args[b'pw'][0]
        # NOTE(review): assumes the configured password is sent UTF-8
        # encoded - confirm.
        if pw != c.pw.encode('utf-8'):
            raise ValueError('bad password')

        # update config
        for r, w in (('mbd', 'max_batch_down'),
                     ('mqt', 'max_queue_time'),
                     ('mrt', 'max_request_time')):
            try:
                v = args[r.encode('ascii')][0]
            except KeyError:
                continue
            # The value comes from the client, so keep it within the
            # [limits] caps, as Client.__init__ does for configured values.
            setattr(c, w, min(int(v), cfg.getint('limits', w)))

        try:
            d = args[b'd'][0]
        except KeyError:
            d = b''  # same type as a posted value

        c.process_arriving_data(d)
        c.http_request(request)
        return NOT_DONE_YET
201
def startup():
    """Parse the command line and create the global configuration object.

    Sets the globals ``opts`` (parsed options; ``opts.configfile`` is the
    value of -c/--config) and ``cfg`` (a new, empty ConfigParser).  Exits
    via the option parser's error handling if any non-option arguments
    are given.
    """
    # cfg is read as a module global by the rest of the file, so it must
    # be declared global here rather than assigned as a local.
    global opts
    global cfg

    op = OptionParser()
    op.add_option('-c', '--config', dest='configfile',
                  default='/etc/hippottd/server.conf')
    (opts, args) = op.parse_args()
    if len(args):
        op.error('no non-option arguments please')

    # NOTE(review): nothing visible here loads defcfg or opts.configfile
    # into cfg yet - confirm where that is meant to happen.
    cfg = ConfigParser()
211