chiark / gitweb /
wip
[hippotat.git] / client
1 #!/usr/bin/python3
2
3 from hippotat import *
4
5 import twisted.web
6 import twisted.web.client
7
# Config section of the one client this process acts as.  Stays None
# until set_client() stores the section it is called with.
client_cs = None
9
def set_client(ci,cs,pw):
  """Record the client identity, its config section and its password.

  ci is stored as c.client, cs is remembered in client_cs and used as
  the config section for the outstanding-request limits, and pw is kept
  as the module-level password.  Asserts that no client section has
  been recorded yet.
  """
  global client_cs, password
  assert client_cs is None
  client_cs = cs
  c.client = ci
  # Copy the per-client request limits from the config into c.
  for attr, option in (('max_outstanding',    'max_requests_outstanding'),
                       ('target_outstanding', 'target_requests_outstanding')):
    setattr(c, attr, cfg.getint(cs, option))
  password = pw
19
def process_cfg():
  """Read the client's settings from cfg into c and the module globals.

  Sets c.url (from the [server] 'url' option, or failing that from the
  first entry of c.saddrs), c.routes, c.max_queue_time and
  c.max_batch_up, and runs the shared server/client/ipif config steps.
  process_cfg_clients() is handed set_client, which is what records
  client_cs; the lookups after that call depend on it.
  """
  process_cfg_common_always()
  process_cfg_server()

  try:
    c.url = cfg.get('server','url')
  except NoOptionError:
    # No explicit URL configured: fall back to the first server address.
    # (Previously the result was stored in an unused local, leaving
    # c.url unset for check_outbound().)
    process_cfg_saddrs()
    c.url = c.saddrs[0].url()

  process_cfg_clients(set_client)

  c.routes = cfg.get('virtual','routes')
  c.max_queue_time = cfg.getint(client_cs, 'max_queue_time')
  c.max_batch_up   = cfg.getint(client_cs, 'max_batch_up')

  process_cfg_ipif(client_cs,
                   (('local', 'client'),
                    ('peer',  'server'),
                    ('rnets', 'routes')))
43
# Number of HTTP requests currently in flight: check_outbound() adds one
# per request it starts and req_fin() subtracts one per finished request.
outstanding = 0
45
def start_client():
  """Create the module-level packet queue and the Twisted HTTP agent.

  The queue is built with c.max_queue_time and the agent uses
  c.http_timeout as its connect timeout, so the configuration must have
  been processed before this is called.
  """
  global queue, agent
  queue = PacketQueue(c.max_queue_time)
  agent = twisted.web.client.Agent(
    reactor,
    connectTimeout = c.http_timeout,
  )
51
def outbound(packet, saddr, daddr):
  """Queue one packet for upload and see whether a request can be started.

  saddr and daddr are accepted but not used here.
  """
  # Debugging aid:
  #print('OUT ', saddr, daddr, repr(packet))
  queue.append(packet)
  check_outbound()
56
class ResponseConsumer(twisted.internet.protocol.Protocol):
  """Protocol that receives one HTTP response body.

  Every chunk of body data is passed through mime_translate() and fed
  to a SlipStreamDecoder constructed with queue_inbound.
  """

  def __init__(self):
    self._ssd = SlipStreamDecoder(queue_inbound)

  def dataReceived(self, data):
    """Feed one chunk of the response body to the decoder."""
    self._ssd.inputdata(mime_translate(data))

  def connectionMade(self): pass

  def connectionLost(self, reason):
    """Flush the decoder on a clean end of body; otherwise report reason.

    reason is a Failure, so it has to be inspected with .check() rather
    than isinstance() (which never matched, so the decoder was never
    flushed).  A body delivered via deliverBody() ends cleanly with
    ResponseDone; ConnectionDone is still accepted as before.
    """
    if reason.check(twisted.web.client.ResponseDone,
                    twisted.internet.error.ConnectionDone):
      self._ssd.flush()
    else:
      print(reason, file=sys.stderr)
68
def req_ok(resp):
  """Success callback: stream the response body into a new ResponseConsumer."""
  consumer = ResponseConsumer()
  resp.deliverBody(consumer)
71
def req_err(err):
  """Failure callback: print err to standard error."""
  sink = sys.stderr
  print(err, file=sink)
74
def req_fin(*args):
  """Note that one request has finished, whatever its outcome.

  Attached with addBoth(), so it runs after either req_ok or req_err;
  the arguments it is called with are ignored.
  """
  # Without this declaration the augmented assignment makes
  # `outstanding` a local and raises UnboundLocalError.
  global outstanding
  outstanding -= 1
77
def check_outbound():
  """Start HTTP POST requests until the outstanding-request limits are met.

  Requests are started while fewer than c.max_outstanding are in flight,
  stopping early once the queue is empty and at least
  c.target_outstanding are already in flight.  Each request body is a
  multipart/form-data document with boundary "b": part "m" carries the
  password, c.client and c.target_outstanding on separate lines, and
  part "d" carries the mime_translate()d data that queue.process()
  handed over (with c.max_batch_up as its limit).
  """
  import io  # local import: only needed to wrap the body for Twisted

  # Assigned below, so it must be declared global; otherwise every read
  # of it in this function raises UnboundLocalError.
  global outstanding

  while True:
    if                          outstanding >= c.max_outstanding   : break
    if not queue.nonempty() and outstanding >= c.target_outstanding: break

    d = b''
    def moredata(s):
      # d belongs to check_outbound(), so this needs nonlocal, not global.
      nonlocal d
      d += s
    queue.process((lambda: len(d)),
                  moredata,
                  c.max_batch_up)
    # d may legitimately be empty: the loop condition above deliberately
    # starts requests with nothing queued while fewer than
    # c.target_outstanding are in flight.  (An assertion that d is
    # non-empty used to sit here and contradicted that.)

    crlf = b'\r\n'
    # Each part needs an empty line between its headers and its content.
    # NOTE(review): password and c.client are assumed to be bytes
    # already -- confirm against process_cfg_clients().
    mime = (b'--b'                                      + crlf +
            b'Content-Disposition: form-data; name="m"' + crlf + crlf +
            password                                    + crlf +
            c.client                                    + crlf +
            # c.target_outstanding comes from cfg.getint(), so it is an int.
            str(c.target_outstanding).encode('ascii')   + crlf +
            b'--b'                                      + crlf +
            b'Content-Disposition: form-data; name="d"' + crlf + crlf +
            mime_translate(d)                           + crlf +
            b'--b--'                                    + crlf)

    hh = { 'User-Agent': ['hippotat'],
           'Content-Type': ['multipart/form-data; boundary="b"'] }
    # The body has to be handed to the agent as a body producer;
    # previously it was built but never sent.
    producer = twisted.web.client.FileBodyProducer(io.BytesIO(mime))
    # NOTE(review): Agent.request() wants the URI as bytes -- confirm
    # that c.url is bytes by the time we get here.
    req = agent.request(b'POST',
                        c.url,
                        twisted.web.client.Headers(hh),
                        producer)
    # Deferred.addTimeout() requires the clock to schedule the timeout on.
    req.addTimeout(c.http_timeout, reactor)
    req.addCallbacks(req_ok, req_err)
    req.addBoth(req_fin)
    outstanding += 1
110
# Script entry point.  The order matters: process_cfg() fills in the c.*
# settings that start_client() reads, and start_client() creates the
# queue and agent that outbound() / check_outbound() rely on.
common_startup()
process_cfg()
start_client()
# outbound is passed as the callback; presumably start_ipif() invokes it
# for each packet read from the interface -- confirm in hippotat.
start_ipif(c.ipif_command, outbound)
common_run()