chiark / gitweb /
break out process_request
[hippotat.git] / client
1 #!/usr/bin/python3
2
3 from hippotat import *
4
5 import twisted.web
6 import twisted.web.client
7
# Config section name of the one configured client.  Starts as None and
# is assigned exactly once by set_client(), which asserts it is still
# unset; process_cfg() then reads this client's options from it.
client_cs = None
9
def set_client(ci,cs,pw):
  """Record the single client this process acts as.

  Handed to process_cfg_clients() as its callback by process_cfg().

  ci -- stored as c.client
  cs -- config section name; remembered in the global client_cs and used
        to look up this client's request limits
  pw -- stored in the global password

  May only be called once: asserts that no client was recorded before.
  """
  global client_cs, password

  # A second call would mean more than one client was configured.
  assert client_cs is None

  client_cs, c.client = cs, ci

  # Request limits are read from this client's own config section.
  c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding')
  c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding')

  password = pw
19
def process_cfg():
  """Read the client configuration from cfg into the shared c object.

  Sets c.url, c.routes, c.max_queue_time, c.max_batch_up and
  c.http_timeout, and (via set_client) the per-client values.  Finishes
  by calling process_cfg_ipif() for this client's section.
  """
  process_cfg_common_always()
  process_cfg_server()

  # An explicit [server] url wins; without one, fall back to the url of
  # the first configured server address.
  try:
    server_url = cfg.get('server','url')
  except NoOptionError:
    process_cfg_saddrs()
    server_url = c.saddrs[0].url()
  c.url = server_url

  # Calls set_client(), which assigns client_cs used below.
  process_cfg_clients(set_client)

  c.routes = cfg.get('virtual','routes')

  # Integer options from this client's section, stored on c under the
  # same name as the option.
  for option in ('max_queue_time', 'max_batch_up', 'http_timeout'):
    setattr(c, option, cfg.getint(client_cs, option))

  ipif_substitutions = (('local', 'client'),
                        ('peer',  'server'),
                        ('rnets', 'routes'))
  process_cfg_ipif(client_cs, ipif_substitutions)
44
# Number of HTTP requests currently in flight: incremented by
# check_outbound() for each request it starts, decremented by req_fin()
# when a request's Deferred fires.
outstanding = 0
46
def start_client():
  """Create the module-wide packet queue and HTTP agent.

  Must run after process_cfg(), since it reads c.max_queue_time and
  c.http_timeout.
  """
  global queue, agent

  # Packets waiting to be uploaded; the queue is given c.max_queue_time.
  queue = PacketQueue(c.max_queue_time)

  # One shared Agent for all requests; connection attempts time out
  # after c.http_timeout.
  agent = twisted.web.client.Agent(
    reactor,
    connectTimeout = c.http_timeout,
  )
52
def outbound(packet, saddr, daddr):
  """Accept one outgoing packet: enqueue it and try to start an upload.

  saddr and daddr are accepted but not used here; only the packet itself
  is queued.
  """
  # Debugging aid: print('OUT ', saddr, daddr, repr(packet))
  queue.append(packet)

  # A request may be startable now that there is something to send.
  check_outbound()
57
class ResponseConsumer(twisted.internet.protocol.Protocol):
  """Body protocol for one HTTP response (see req_ok / deliverBody).

  Each chunk of body data is passed through mime_translate() and fed to
  a SlipStreamDecoder constructed with queue_inbound.  When the body
  ends cleanly the decoder is flushed; any other ending is reported on
  stderr.
  """

  def __init__(self):
    self._ssd = SlipStreamDecoder(queue_inbound)

  def dataReceived(self, data):
    self._ssd.inputdata(mime_translate(data))

  def connectionMade(self): pass

  def connectionLost(self, reason):
    # Twisted passes a Failure that *wraps* the exception, so an
    # isinstance() test against the exception class never matches; use
    # Failure.check().  A body delivered through Response.deliverBody
    # ends with ResponseDone on success; ConnectionDone is accepted too
    # so a plain clean transport close is still treated as success.
    if reason.check(twisted.web.client.ResponseDone,
                    twisted.internet.error.ConnectionDone):
      self._ssd.flush()
    else:
      print(reason, file=sys.stderr)
69
def req_ok(resp):
  """Success callback for a request: start consuming the response body.

  resp is the response object the request Deferred fired with; its body
  is delivered to a fresh ResponseConsumer.
  """
  consumer = ResponseConsumer()
  resp.deliverBody(consumer)
72
def req_err(err):
  """Failure callback for a request: report the error on stderr.

  Returns None, so the error is not propagated further along the
  callback chain.
  """
  sys.stderr.write(str(err) + '\n')
75
def req_fin(*args):
  """Final callback for every request, successful or not.

  Ignores its arguments and just notes that one fewer request is in
  flight.
  """
  # NOTE(review): this does not itself call check_outbound(), so a
  # replacement request is only started on the next outbound() packet --
  # confirm that is intended.
  global outstanding
  outstanding = outstanding - 1
79
def check_outbound():
  """Start HTTP POST requests until the outstanding-request limits are met.

  Keeps starting requests while fewer than c.max_outstanding are in
  flight and either the queue has data or fewer than
  c.target_outstanding are in flight.  Each request uploads up to
  c.max_batch_up bytes taken from the queue, wrapped in a two-part
  multipart/form-data body:

    part "m": password, client and target_requests_outstanding, one
              per line
    part "d": the queued data, passed through mime_translate()

  Every request started increments the global outstanding; req_fin()
  decrements it again when the request completes or fails.
  """
  import io  # function-scope: only needed to wrap the body for Twisted
  global outstanding

  while True:
    if                          outstanding >= c.max_outstanding   : break
    if not queue.nonempty() and outstanding >= c.target_outstanding: break

    # Pull queued data into d; queue.process() is given the current
    # batch size, a callback that appends more data, and the size limit.
    d = b''
    def moredata(s): nonlocal d; d += s
    queue.process((lambda: len(d)),
                  moredata,
                  c.max_batch_up)

    crlf = b'\r\n'
    mime = crlf.join((
      b'--b',
      b'Content-Disposition: form-data; name="m"',
      password,
      str(c.client)             .encode('ascii'),
      str(c.target_outstanding) .encode('ascii'),
      b'--b',
      b'Content-Disposition: form-data; name="d"',
      mime_translate(d),
      b'--b--',
    )) + crlf

    # Agent.request() wants the URI as bytes; a value read from the
    # config file may be a str.
    url = c.url
    if isinstance(url, str):
      url = url.encode('ascii')

    hh = { 'User-Agent': ['hippotat'],
           'Content-Type': ['multipart/form-data; boundary="b"'] }

    # The body has to be handed over as a body producer; without it the
    # POST would be sent with no body at all and mime would be unused.
    producer = twisted.web.client.FileBodyProducer(io.BytesIO(mime))

    req = agent.request(b'POST',
                        url,
                        twisted.web.client.Headers(hh),
                        producer)
    req.addTimeout(c.http_timeout, reactor)
    req.addCallbacks(req_ok, req_err)
    req.addBoth(req_fin)
    outstanding += 1
112
# Script entry sequence.  The order matters: start_client() reads values
# that process_cfg() stores on c, and outbound() uses the queue that
# start_client() creates.
common_startup()   # from the hippotat module; not defined in this file
process_cfg()      # fills in c.* and client_cs from the configuration
start_client()     # creates the global packet queue and HTTP agent
# outbound is handed over as a callback; c.ipif_command is not set in
# this file -- presumably by process_cfg_ipif(), TODO confirm.
start_ipif(c.ipif_command, outbound)
common_run()       # presumably enters the main loop -- TODO confirm