chiark / gitweb /
fixes
[hippotat.git] / client
1 #!/usr/bin/python3
2
3 from hippotat import *
4
5 import twisted.web
6 import twisted.web.client
7
8 client_cs = None
9
10 def set_client(ci,cs,pw):
11   global client_cs
12   global password
13   assert(client_cs is None)
14   client_cs = cs
15   c.client = ci
16   c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding')
17   c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding')
18   password = pw
19
20 def process_cfg():
21   global url
22   global max_requests_outstanding
23
24   process_cfg_common_always()
25   process_cfg_server()
26
27   try:
28     c.url = cfg.get('server','url')
29   except NoOptionError:
30     process_cfg_saddrs()
31     c.url = c.saddrs[0].url()
32
33   process_cfg_clients(set_client)
34
35   c.routes = cfg.get('virtual','routes')
36   c.max_queue_time = cfg.getint(client_cs, 'max_queue_time')
37   c.max_batch_up   = cfg.getint(client_cs, 'max_batch_up')
38   c.http_timeout   = cfg.getint(client_cs, 'http_timeout')
39   c.http_retry     = cfg.getint(client_cs, 'http_retry')
40
41   process_cfg_ipif(client_cs,
42                    (('local', 'client'),
43                     ('peer',  'server'),
44                     ('rnets', 'routes')))
45
46 outstanding = 0
47
48 def start_client():
49   global queue
50   global agent
51   queue = PacketQueue('up', c.max_queue_time)
52   agent = twisted.web.client.Agent(reactor, connectTimeout = c.http_timeout)
53
54 def outbound(packet, saddr, daddr):
55   #print('OUT ', saddr, daddr, repr(packet))
56   queue.append(packet)
57   check_outbound()
58
59 class ResponseConsumer(twisted.internet.protocol.Protocol):
60   def __init__(self):
61     self._ssd = SlipStreamDecoder(queue_inbound)
62   def dataReceived(self, data):
63     try: self._ssd.inputdata(mime_translate(data))
64     except Exception as e: asyncfailure(e)
65   def connectionMade(self): pass
66   def connectionLost(self, reason):
67     if isinstance(reason, twisted.internet.error.ConnectionDone):
68       try: self._ssd.flush()
69       except Exception as e: asyncfailure(e)
70     else:
71       asyncfailure(reason)
72
73 def req_ok(resp):
74   resp.deliverBody(ResponseConsumer())
75   req_fin()
76
77 def req_err(err):
78   print(err, file=sys.stderr)
79   reactor.callLater(c.http_retry, req_fin)
80
81 def req_fin(*args):
82   global outstanding
83   outstanding -= 1
84   check_outbound()
85
86 def asyncfailure(reason):
87   global outstanding
88   outstanding += 1
89   req_err(reason)
90
91 def check_outbound():
92   global outstanding
93
94   while True:
95     if                          outstanding >= c.max_outstanding   : break
96     if not queue.nonempty() and outstanding >= c.target_outstanding: break
97
98     d = b''
99     def moredata(s): nonlocal d; d += s
100     queue.process((lambda: len(d)),
101                   moredata,
102                   c.max_batch_up)
103     
104     crlf = b'\r\n'
105     lf   =   b'\n'
106     mime = (b'--b'                                        + crlf +
107             b'Content-Type: text/plain; charset="utf-8"'  + crlf +
108             b'Content-Disposition: form-data; name="m"'   + crlf + crlf +
109             str(c.client)             .encode('ascii')    + crlf +
110             password                                      + crlf +
111             str(c.target_outstanding) .encode('ascii')    + crlf +
112           ((
113             b'--b'                                        + crlf +
114             b'Content-Type: application/octet-stream'     + crlf +
115             b'Content-Disposition: form-data; name="d"'   + crlf + crlf +
116             mime_translate(d)                             + crlf
117            ) if len(d) else b'')                               +
118             b'--b--'                                      + crlf)
119
120     #df = open('data.dump.dbg', mode='wb')
121     #df.write(mime)
122     #df.close()
123     # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg
124
125     log_debug(DBG.HTTP_FULL, 'requesting: ' + str(mime))
126
127     hh = { 'User-Agent': ['hippotat'],
128            'Content-Type': ['multipart/form-data; boundary="b"'],
129            'Content-Length': [str(len(mime))] }
130     req = agent.request(b'POST',
131                         c.url,
132                         twisted.web.client.Headers(hh))
133     req.addTimeout(c.http_timeout, reactor)
134     req.addCallbacks(req_ok, req_err)
135     outstanding += 1
136
137 common_startup()
138 process_cfg()
139 start_client()
140 start_ipif(c.ipif_command, outbound)
141 check_outbound()
142 common_run()