chiark / gitweb /
850137304e1fd9447016d290fd3d90258ada2a81
[hippotat.git] / server
1 #!/usr/bin/python2
2
3 from twisted.web.server import Site
4 from twisted.web.resource import Resource
5 from twisted.web.server import NOT_DONE_YET
6 from twisted.internet import reactor
7
8 import ConfigParser
9 import ipaddress
10
11 import collections
12
13 import syslog
14
15 clients = { }
16
17 def ipaddress(input):
18   try:
19     r = ipaddress.IPv4Address(input)
20   except AddressValueError:
21     r = ipaddress.IPv6Address(input)
22   return r
23
24 def ipnetwork(input):
25   try:
26     r = ipaddress.IPv4Network(input)
27   except NetworkValueError:
28     r = ipaddress.IPv6Network(input)
29   return r
30
31 defcfg = u'''
32 [default]
33 max_batch_down: 65536
34 max_queue_time: 10
35 max_request_time: 54
36
37 [global]
38 max_batch_down: 262144
39 max_queue_time: 121
40 max_request_time: 121
41 '''
42
43 def route(packet. daddr):
44   try: client = clients[daddr]
45   except KeyError: dclient = None
46   if dclient is not None:
47     dclient.queue_outbound(packet)
48   else if daddr = server or daddr not in network:
49     queue_inbound(packet)
50   else:
51     syslog.syslog(syslog.LOG_DEBUG, 'no client for %s' % daddr)
52
53 class Client():
54   def __init__(self, ip, cs):
55     # instance data members
56     self._ip = ip
57     self._cs = cs
58     self.pw = cfg.get(cs, 'password')
59     self._rq = collections.deque() # requests
60     self._pq = collections.deque() # packets
61     # plus from config:
62     #  .max_batch_down
63     #  .max_queue_time
64     #  .max_request_time
65     for k in ('max_batch_down','max_queue_time','max_request_time'):
66       req = cfg.getint(cs, k)
67       limit = cfg.getint('global',k)
68       self.__dict__[k] = min(req, limit)
69
70     def process_arriving_data(self, d):
71       for packet in slip_decode(d):
72         (saddr, daddr) = ip_64_addrs(packet)
73         if saddr != self._ip:
74           raise ValueError('wrong source address %s' % saddr)
75         route(packet, daddr)
76
77     def _req_cancel(self, request):
78       request.finish()
79
80     def _req_error(self, err, request):
81       self._req_cancel(request)
82
83     def queue_outbound(self, packet):
84       self._pq.append((time.time, packet))
85
86     def http_request(self, request):
87       request.setHeader('Content-Type','application/octet-stream')
88       reactor.callLater(self.max_request_time, self._req_cancel, request)
89       request.notifyFinish().addErrback(self._req_error, request)
90       self._rq.append(request)
91       self._check_outbound()
92
93     def _check_outbound(self):
94       while True:
95         try: request = self._rq[0]
96         except IndexError: request = None
97         if request and request.finished:
98           self._rq.popleft()
99           continue
100
101         # now request is an unfinished request, or None
102         try: (queuetime, packet) = self._pq[0]
103         except: IndexError:
104           # no packets, oh well
105           continue
106
107         age = time.time() - queuetime
108
109 def process_cfg():
110   global network
111   global ourself
112
113   network = ipnetwork(cfg.get('virtual','network'))
114   try:
115     ourself = cfg.get('virtual','server')
116   except ConfigParser.NoOptionError:
117     ourself = network.hosts().next()
118
119   for cs in cfg.sections():
120     if not (':' in cs or '.' in cs): continue
121     ci = ipaddress(cs)
122     if ci not in network:
123       raise ValueError('client %s not in network' % ci)
124     if ci in clients:
125       raise ValueError('multiple client cfg sections for %s' % ci)
126     clients[ci] = Client(ci, cs)
127
128 class FormPage(Resource):
129   def render_POST(self, request):
130     # find client, update config, etc.
131     ci = ipaddress(request.args['i'])
132     c = clients[ci]
133     pw = request.args['pw']
134     if pw != c.pw: raise ValueError('bad password')
135
136     # update config
137     for r, w in (('mbd', 'max_batch_down'),
138                  ('mqt', 'max_queue_time'),
139                  ('mrt', 'max_request_time')):
140       try: v = request.args[r]
141       except KeyError: continue
142       v = int(v)
143       c.__dict__[w] = v
144
145     try: d = request.args['d']
146     except KeyError: d = ''
147
148     c.process_arriving_data(d)
149     c.new_request(request)