chiark / gitweb /
018da51da4bdc4bb77d43b025971e7e62b5a1d86
[hippotat.git] / client
1 #!/usr/bin/python3
2
3 from hippotat import *
4
5 import twisted.web
6 import twisted.web.client
7
8 client_cs = None
9
10 def set_client(ci,cs,pw):
11   global client_cs
12   global password
13   assert(client_cs is None)
14   client_cs = cs
15   c.client = ci
16   c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding')
17   c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding')
18   password = pw
19
20 def process_cfg():
21   global url
22   global max_requests_outstanding
23
24   process_cfg_common_always()
25   process_cfg_server()
26
27   try:
28     c.url = cfg.get('server','url')
29   except NoOptionError:
30     process_cfg_saddrs()
31     sa = c.saddrs[0].url()
32
33   process_cfg_clients(set_client)
34
35   c.routes = cfg.get('virtual','routes')
36   c.max_queue_time = cfg.getint(client_cs, 'max_queue_time')
37   c.max_batch_up   = cfg.getint(client_cs, 'max_batch_up')
38   c.http_timeout   = cfg.getint(client_cs, 'http_timeout')
39
40   process_cfg_ipif(client_cs,
41                    (('local', 'client'),
42                     ('peer',  'server'),
43                     ('rnets', 'routes')))
44
45 outstanding = 0
46
47 def start_client():
48   global queue
49   global agent
50   queue = PacketQueue(c.max_queue_time)
51   agent = twisted.web.client.Agent(reactor, connectTimeout = c.http_timeout)
52
53 def outbound(packet, saddr, daddr):
54   #print('OUT ', saddr, daddr, repr(packet))
55   queue.append(packet)
56   check_outbound()
57
58 class ResponseConsumer(twisted.internet.protocol.Protocol):
59   def __init__(self):
60     self._ssd = SlipStreamDecoder(queue_inbound)
61   def dataReceived(self, data):
62     self._ssd.inputdata(mime_translate(data))
63   def connectionMade(self): pass
64   def connectionLost(self, reason):
65     if isinstance(reason, twisted.internet.error.ConnectionDone):
66       self._ssd.flush()
67     else:
68       print(reason, file=sys.stderr)
69
70 def req_ok(resp):
71   resp.deliverBody(ResponseConsumer())
72
73 def req_err(err):
74   print(err, file=sys.stderr)
75
76 def req_fin(*args):  
77   outstanding -= 1
78
79 def check_outbound():
80   while True:
81     if                          outstanding >= c.max_outstanding   : break
82     if not queue.nonempty() and outstanding >= c.target_outstanding: break
83
84     d = b''
85     def moredata(s): global d; d += s
86     queue.process((lambda: len(d)),
87                   moredata,
88                   c.max_batch_up)
89     assert(len(d))
90     
91     crlf = b'\r\n'
92     mime = (b'--b'                                      + crlf +
93             b'Content-Disposition: form-data; name="m"' + crlf +
94             password                                    + crlf +
95             c.client                                    + crlf +
96             c.target_outstanding                        + crlf +
97             b'--b'                                      + crlf +
98             b'Content-Disposition: form-data; name="d"' + crlf +
99             mime_translate(d)                           + crlf +
100             b'--b--'                                    + crlf)
101
102     hh = { 'User-Agent': ['hippotat'],
103            'Content-Type': ['multipart/form-data; boundary="b"'] }
104     req = agent.request(b'POST',
105                         c.url,
106                         twisted.web.client.Headers(hh))
107     req.addTimeout(c.http_timeout)
108     req.addCallbacks(req_ok, req_err)
109     req.addBoth(req_fin)
110     outstanding += 1
111
112 common_startup()
113 process_cfg()
114 start_client()
115 start_ipif(c.ipif_command, outbound)
116 common_run()