chiark / gitweb /
wip client
[hippotat] / server
1 #!/usr/bin/python3
2
3 from hippotat import *
4
5 import sys
6 import os
7
8 import twisted.internet
9 import twisted.internet.endpoints
10 from twisted.web.server import NOT_DONE_YET
11
12 #import twisted.web.server import Site
13 #from twisted.web.resource import Resource
14
15 import syslog
16
# Registry of configured clients: maps each client's address (as returned
# by ipaddr()) to its Client object.  Filled in by process_cfg() and
# consulted by route() and IphttpResource.render_POST().
clients = { }
18
# Built-in default configuration text, handed to common_startup() by
# startup().  The [limits] section holds the server-side caps that
# Client.__init__ applies (via min()) to the per-client values of the
# same names.  The 'ipif' option in [server] is interpolated with the
# host, relay, mtu and network values by process_cfg().
defcfg = '''
[DEFAULT]
max_batch_down = 65536
max_queue_time = 10
max_request_time = 54
target_requests_outstanding = 3

[virtual]
mtu = 1500
# network
# [host]
# [relay]

[server]
ipif = userv root ipif %(host)s,%(relay)s,%(mtu)s,slip %(network)s
addrs = 127.0.0.1 ::1
port = 8099

[limits]
max_batch_down = 262144
max_queue_time = 121
max_request_time = 121
target_requests_outstanding = 10
'''
43
44 #---------- "router" ----------
45
def route(packet, saddr, daddr):
  '''Decide what to do with one packet, based on its addresses.

  packet -- the packet itself, passed on unmodified
  saddr  -- source address of the packet
  daddr  -- destination address of the packet

  In order of precedence:
   * destination is a configured client: queue it for that client;
   * either address is link-local: discard;
   * destination is the server's own virtual address (host), or lies
     outside the virtual network: pass it to queue_inbound();
   * destination is the relay address: discard;
   * anything else is inside the network but has no client: discard.
  '''
  print('TRACE ', saddr, daddr, packet)
  # The original bound the lookup result to `client` but then tested
  # `dclient`, so every packet for a known client raised NameError.
  dclient = clients.get(daddr)
  if dclient is not None:
    dclient.queue_outbound(packet)
  elif saddr.is_link_local or daddr.is_link_local:
    log_discard(packet, saddr, daddr, 'link-local')
  elif daddr == host or daddr not in network:
    print('TRACE INBOUND ', saddr, daddr, packet)
    queue_inbound(packet)
  elif daddr == relay:
    log_discard(packet, saddr, daddr, 'relay')
  else:
    log_discard(packet, saddr, daddr, 'no client')
61
def log_discard(packet, saddr, daddr, why):
  '''Report that a packet was dropped.

  Prints a 'DROP' line giving the source address, destination address
  and the reason.  The packet itself is accepted for interface
  symmetry with route() but is not included in the output.
  '''
  report = ('DROP ', saddr, daddr, why)
  print(*report)
  # Reporting via syslog instead is currently disabled:
#  syslog.syslog(syslog.LOG_DEBUG,
#                'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
66
67 #---------- client ----------
68
class Client():
  '''Server-side state for one configured client.

  Holds the client's password, a queue of packets waiting to be sent
  to the client (self._pq) and a queue of HTTP requests from the
  client that have not been answered yet (self._rq).  Packets are
  delivered by writing them, SLIP-encoded, into the body of the oldest
  outstanding request.

  Public attributes taken from the configuration, each capped by the
  value of the same name in the [limits] section:
    .max_batch_down
    .max_queue_time
    .max_request_time
    .target_requests_outstanding
  '''

  def __init__(self, ip, cs):
    '''ip is the client's address; cs is the name of its config section.'''
    # instance data members
    self._ip = ip
    self._cs = cs
    self.pw = cfg.get(cs, 'password')
    self._rq = collections.deque() # requests
    for k in ('max_batch_down','max_queue_time','max_request_time',
              'target_requests_outstanding'):
      req = cfg.getint(cs, k)
      limit = cfg.getint('limits',k)
      setattr(self, k, min(req, limit))
    self._pq = PacketQueue(self.max_queue_time)

  # NB: in the original every method below was indented inside __init__,
  # which made them local functions of __init__ rather than methods.

  def process_arriving_data(self, d):
    '''Decode SLIP data d sent by the client and route each packet.

    Raises ValueError if a packet's source address is not this
    client's own address.
    '''
    for packet in slip.decode(d):
      (saddr, daddr) = packet_addrs(packet)
      if saddr != self._ip:
        raise ValueError('wrong source address %s' % saddr)
      route(packet, saddr, daddr)

  def _req_cancel(self, request):
    '''Finish request; used as the max_request_time timer callback.'''
    request.finish()

  def _req_error(self, err, request):
    '''Errback for request.notifyFinish(): just finish the request.'''
    self._req_cancel(request)

  def queue_outbound(self, packet):
    '''Queue packet for delivery to this client.'''
    self._pq.append(packet)

  def new_request(self, request):
    '''Take ownership of a newly arrived HTTP request from this client.

    The request is held open until there are packets to send in its
    response, until max_request_time has elapsed, or until there are
    more than target_requests_outstanding requests pending.
    '''
    request.setHeader('Content-Type','application/octet-stream')
    reactor.callLater(self.max_request_time, self._req_cancel, request)
    request.notifyFinish().addErrback(self._req_error, request)
    self._rq.append(request)
    self._check_outbound()

  # IphttpResource.render_POST calls new_request(); the original class
  # only defined http_request(), so both names are provided.
  http_request = new_request

  def _check_outbound(self):
    '''Send queued packets in pending requests; trim surplus requests.'''
    while True:
      try: request = self._rq[0]
      except IndexError: request = None
      if request is not None and request.finished:
        # already answered (e.g. by the timeout); forget it
        self._rq.popleft()
        continue

      if not self._pq.nonempty():
        # no packets, oh well
        # (the original said `continue` here, which loops forever)
        break

      if request is None:
        # no request
        break

      # request, and also some non-expired packets
      while True:
        # NOTE(review): PacketQueue is defined outside this file.  The
        # original fetched the head packet with popleft() and then
        # called popLeft() again once the packet had been written,
        # which only makes sense if the first call leaves the packet
        # on the queue.  That two-step sequence is kept here with the
        # capitalisation fixed -- confirm against PacketQueue.
        packet = self._pq.popleft()
        if packet is None: break

        encoded = slip.encode(packet)

        if request.sentLength > 0:
          # not the first packet in this response: it must fit, along
          # with its delimiter, within max_batch_down
          if (request.sentLength + len(slip.delimiter)
              + len(encoded) > self.max_batch_down):
            break
          request.write(slip.delimiter)

        request.write(encoded)
        self._pq.popleft()

      assert(request.sentLength)
      self._rq.popleft()
      request.finish()
      # round again, looking for more to do

    # answer (with nothing) any requests beyond the number we hold open
    while len(self._rq) > self.target_requests_outstanding:
      request = self._rq.popleft()
      request.finish()
152
class IphttpResource(twisted.web.resource.Resource):
  '''The single HTTP resource through which clients talk to the server.

  POST carries the client's identity, password, optional tuning
  parameters and optional upstream data; the response is produced
  later by the Client object.  GET returns a fixed placeholder page.
  '''
  isLeaf = True

  @staticmethod
  def _arg(request, name):
    '''Return the first value of form argument name (a bytes object).

    Twisted's request.args maps bytes names to lists of bytes values.
    Raises KeyError if the argument is absent.
    '''
    return request.args[name][0]

  def render_POST(self, request):
    '''Authenticate the client, accept its data and park the request.

    Raises KeyError for a missing argument or unknown client, and
    ValueError for a bad password or malformed value.
    '''
    import hmac

    # find client, update config, etc.
    # NOTE(review): ipaddr() is defined outside this file; presumably
    # it wants text rather than bytes -- confirm.
    ci = ipaddr(self._arg(request, b'i').decode('ascii'))
    c = clients[ci]
    pw = self._arg(request, b'pw')
    # constant-time comparison, so the check does not leak how much of
    # the password matched
    if not hmac.compare_digest(pw, c.pw.encode('utf-8')):
      raise ValueError('bad password')

    # update config
    for r, w in ((b'mbd', 'max_batch_down'),
                 (b'mqt', 'max_queue_time'),
                 (b'mrt', 'max_request_time'),
                 (b'tro', 'target_requests_outstanding')):
      try: v = self._arg(request, r)
      except KeyError: continue
      # Client.__init__ caps these with the [limits] section; apply
      # the same cap to values supplied over the wire.
      setattr(c, w, min(int(v), cfg.getint('limits', w)))

    try: d = self._arg(request, b'd')
    except KeyError: d = b''

    c.process_arriving_data(d)
    c.new_request(request)
    # The Client finishes the request later; tell twisted not to treat
    # our return value as the response body.
    return NOT_DONE_YET

  def render_GET(self, request):
    '''Return a fixed placeholder page.'''
    return b'<html><body>hippotat</body></html>'
180
def start_http():
  '''Start listening for HTTP on every address in [server] addrs.

  Each whitespace-separated entry is tried as an IPv4 address first
  and as an IPv6 address otherwise; an entry that is neither makes
  ipaddress.IPv6Address raise.  All listeners share one Site and use
  the port from [server] port.
  '''
  resource = IphttpResource()
  site = twisted.web.server.Site(resource)
  port = cfg.getint('server','port')
  for addrspec in cfg.get('server','addrs').split():
    try:
      addr = ipaddress.IPv4Address(addrspec)
      endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
    except ipaddress.AddressValueError:
      addr = ipaddress.IPv6Address(addrspec)
      endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
    # The third positional parameter of these endpoint constructors is
    # the listen backlog, so the address must go in by keyword, and as
    # a string.
    ep = endpointfactory(reactor, port, interface=str(addr))
    crash_on_defer(ep.listen(site))
193
194 #---------- config and setup ----------
195         
def process_cfg():
  '''Digest the loaded configuration into module-level state.

  Sets the globals network, host, relay, mtu and ipif_command, and
  creates a Client (registered in clients) for every config section
  whose name contains ':' or '.'.

  Raises ValueError if the network is too small, if a client's address
  is outside the network, or if two sections name the same client.
  '''
  global network
  global host
  global relay
  global ipif_command
  global mtu

  network = ipnetwork(cfg.get('virtual','network'))
  if network.num_addresses < 3 + 2:
    raise ValueError('network needs at least 2^3 addresses')

  # host and relay are compared with packet addresses in route(), so a
  # configured value is parsed with ipaddr() to match the type of the
  # network.hosts() fallbacks.  (The original kept the raw config
  # string, which never compares equal to an address.)
  try:
    host = ipaddr(cfg.get('virtual','host'))
  except NoOptionError:
    host = next(network.hosts())

  try:
    relay = ipaddr(cfg.get('virtual','relay'))
  except NoOptionError:
    # first usable address that is not the host
    for search in network.hosts():
      if search == host: continue
      relay = search
      break

  for cs in cfg.sections():
    # client sections are the ones named after an address
    if not (':' in cs or '.' in cs): continue
    ci = ipaddr(cs)
    if ci not in network:
      raise ValueError('client %s not in network' % ci)
    if ci in clients:
      raise ValueError('multiple client cfg sections for %s' % ci)
    clients[ci] = Client(ci, cs)

  mtu = cfg.get('virtual','mtu')

  # values substituted for %(...)s in the [server] ipif command
  iic_vars = {
    'host':    str(host),
    'relay':   str(relay),
    'mtu':     str(mtu),
    'network': str(network),
  }

  ipif_command = cfg.get('server','ipif', vars=iic_vars)
236
def startup():
  '''Bring the server up.

  In order: run common_startup() with the built-in default
  configuration, digest the configuration with process_cfg(), start
  the ipif command with route() as its packet handler, and start the
  HTTP listeners.
  '''
  common_startup(defcfg)
  process_cfg()   # sets ipif_command, used on the next line
  start_ipif(ipif_command, route)
  start_http()
242
# Script entry point: set everything up, then hand over to common_run()
# (defined outside this file; presumably runs the event loop -- confirm).
startup()
common_run()