Commit | Line | Data |
---|---|---|
c55f394e IJ |
1 | #!/usr/bin/python3 |
2 | ||
3 | from hippotat import * | |
4 | ||
c0c90673 IJ |
5 | import twisted.web |
6 | import twisted.web.client | |
7 | ||
dd6665ee IJ |
8 | import io |
9 | ||
class GeneralResponseConsumer(twisted.internet.protocol.Protocol):
    """Common base for the protocols that consume an HTTP response body.

    Stores the owning client, the request the response belongs to, and a
    short description used as a prefix on every log message.
    """

    def __init__(self, cl, req, desc):
        """Remember the client `cl`, the request `req` and the log tag `desc`."""
        self._cl = cl
        self._req = req
        self._desc = desc

    def _log(self, dflag, msg, **kwargs):
        """Log `msg` via the client, prefixed with this consumer's description.

        The request is passed as `idof` so the client's logger can tag the
        message with it; any extra keyword arguments are forwarded unchanged.
        """
        # The client is stored as self._cl in __init__ (there is no self.cl).
        self._cl.log(dflag, '%s: %s' % (self._desc, msg),
                     idof=self._req, **kwargs)

    def connectionMade(self):
        """Twisted callback: note that body delivery has started."""
        self._log(DBG.HTTP_CTRL, 'connectionMade')
21 | ||
class ResponseConsumer(GeneralResponseConsumer):
    """Consume the body of a successful response and decode it as a SLIP stream.

    Received data is fed to a SlipStreamDecoder.  When the body ends cleanly
    the request is reported finished via the client's req_fin; any failure is
    reported via the client's req_err.
    """

    def __init__(self, cl, req):
        """Set up the decoder for request `req` belonging to client `cl`."""
        super().__init__(cl, req, 'RC')
        ssddesc = '[%s] %s' % (id(req), self._desc)
        # NOTE(review): cl.queue_inbound is not defined on Client in the part
        # of the file visible here -- confirm it exists.
        self._ssd = SlipStreamDecoder(ssddesc, cl.queue_inbound)
        self._log(DBG.HTTP_CTRL, '__init__')

    def dataReceived(self, data):
        """Twisted callback: pass a chunk of body data to the decoder."""
        self._log(DBG.HTTP, 'dataReceived', d=data)
        try:
            self._ssd.inputdata(data)
        except Exception:
            self._handleexception()

    def connectionLost(self, reason):
        """Twisted callback: the body has ended, cleanly or otherwise.

        Anything other than ResponseDone is treated as a late failure of the
        request.  On ResponseDone the decoder is flushed and the request is
        reported finished.
        """
        self._log(DBG.HTTP_CTRL, 'connectionLost ' + str(reason))
        if not reason.check(twisted.web.client.ResponseDone):
            # Was `self.latefailure()`: wrong name and missing the reason.
            self._latefailure(reason)
            return
        try:
            self._log(DBG.HTTP, 'ResponseDone')
            self._ssd.flush()
            self._cl.req_fin(self._req)
        except Exception:
            self._handleexception()

    def _handleexception(self):
        """Report the exception currently being handled as a late failure."""
        self._latefailure(traceback.format_exc())

    def _latefailure(self, reason):
        """Log `reason` and hand the request to the client's error path."""
        self._log(DBG.HTTP_CTRL, '_latefailure ' + str(reason))
        self._cl.req_err(self._req, reason)
bd9e77fb | 54 | |
class ErrorResponseConsumer(GeneralResponseConsumer):
    """Collect the body of a non-200 response and report it as an error.

    The whole body is accumulated in memory; when delivery ends, a single
    "FAILED <code> <phrase> | <body>" message is passed to the client's
    req_err.
    """

    def __init__(self, cl, req, resp):
        """Record the failing response `resp` for request `req` of client `cl`."""
        # The base class must be GeneralResponseConsumer: this call and the
        # _log calls below rely on its __init__ signature and helper.
        super().__init__(cl, req, 'ERROR-RC')
        self._resp = resp
        self._m = b''
        try:
            self._phrase = resp.phrase.decode('utf-8')
        except Exception:
            # Fall back to a printable representation of the raw phrase.
            self._phrase = repr(resp.phrase)
        self._log(DBG.HTTP_CTRL, '__init__ %d %s' % (resp.code, self._phrase))

    def dataReceived(self, data):
        """Twisted callback: append a chunk of the error body."""
        self._log(DBG.HTTP_CTRL, 'dataReceived ' + repr(data))
        self._m += data

    def connectionLost(self, reason):
        """Twisted callback: body finished; report the failure to the client.

        If delivery did not end with ResponseDone, the reason is appended to
        the reported message.
        """
        try:
            mbody = self._m.decode('utf-8')
        except Exception:
            mbody = repr(self._m)
        if not reason.check(twisted.web.client.ResponseDone):
            mbody += ' || ' + str(reason)
        self._cl.req_err(self._req,
                         "FAILED %d %s | %s"
                         % (self._resp.code, self._phrase, mbody))
6e4af0a2 | 80 | |
class Client():
    """One configured client/server association and its in-flight requests.

    Methods in this class name their instance parameter `cl` instead of
    `self`.

    Attributes:
      c            configuration object passed to the constructor
      outstanding  dict keyed by request Deferred; the value is the payload
                   length (an int) while the request is in flight, or the
                   error text once the request has failed
      desc         prefix put on every log message from this client
      queue, agent created by start()
    """

    def __init__(cl, c, ss, cs):
        """Store configuration `c`; `ss` and `cs` are only used in the log prefix."""
        cl.c = c
        cl.outstanding = {}
        cl.desc = '[%s %s] ' % (ss, cs)

    def log(cl, dflag, msg, **kwargs):
        """Emit a debug message prefixed with this client's description."""
        log_debug(dflag, cl.desc + msg, **kwargs)

    def log_outstanding(cl):
        """Dump the table of outstanding requests to the debug log."""
        cl.log(DBG.CTRL_DUMP, 'OS %s' % cl.outstanding)

    def start(cl):
        """Create the upload packet queue and the HTTP agent."""
        cl.queue = PacketQueue('up', cl.c.max_queue_time)
        cl.agent = twisted.web.client.Agent(
            reactor, connectTimeout=cl.c.http_timeout)

    def outbound(cl, packet, saddr, daddr):
        """Queue `packet` for upload and try to send; the addresses are unused."""
        cl.queue.append(packet)
        cl.check_outbound()

    def req_ok(cl, req, resp):
        """Callback for a request whose response headers have arrived.

        Attaches a body consumer to the response: ResponseConsumer for
        status 200, ErrorResponseConsumer otherwise.
        """
        cl.log(DBG.HTTP_CTRL,
               'req_ok %d %s %s' % (resp.code, repr(resp.phrase), str(resp)),
               idof=req)
        if resp.code == 200:
            rc = ResponseConsumer(cl, req)
        else:
            rc = ErrorResponseConsumer(cl, req, resp)

        resp.deliverBody(rc)
        # now rc is responsible for calling req_fin

    def req_err(cl, req, err):
        """Record a failed request and schedule its removal after http_retry.

        Called when the Deferred fails, or (if it completes) later, by
        ResponseConsumer or ErrorResponseConsumer.  Any exception raised
        while handling the error is passed to crash().
        """
        try:
            cl.log(DBG.HTTP_CTRL, 'req_err ' + str(err), idof=req)
            if isinstance(err, twisted.python.failure.Failure):
                err = err.getTraceback()
            print('[%#x] %s' % (id(req), err), file=sys.stderr)
            # An int entry means "still in flight"; anything else means an
            # error was already recorded for this request.
            if not isinstance(cl.outstanding[req], int):
                raise RuntimeError('[%#x] previously %s'
                                   % (id(req), cl.outstanding[req]))
            cl.outstanding[req] = err
            cl.log_outstanding()
            reactor.callLater(cl.c.http_retry, partial(cl.req_fin, req))
        except Exception:
            # err may not be a str here (e.g. if logging itself failed
            # before the Failure was converted), so stringify it explicitly.
            crash(traceback.format_exc()
                  + '\n----- handling -----\n' + str(err))

    def req_fin(cl, req):
        """Forget request `req` and see whether another can be started."""
        del cl.outstanding[req]
        cl.log(DBG.HTTP_CTRL, 'req_fin OS=%d' % len(cl.outstanding), idof=req)
        cl.check_outbound()

    def check_outbound(cl):
        """Start new POST requests until the configured limits are reached.

        Stops once max_outstanding requests are in flight, or once the queue
        is empty and at least target_outstanding requests are in flight.
        Each request carries a multipart/form-data body: a metadata part
        "m" and, when there is queued data, a data part "d".
        """
        while True:
            if len(cl.outstanding) >= cl.c.max_outstanding:
                break

            if (not cl.queue.nonempty() and
                    len(cl.outstanding) >= cl.c.target_outstanding):
                break

            d = b''

            def moredata(s):
                nonlocal d
                d += s

            # The queue is given a callable for the current batch size, a
            # callable that appends to the batch, and the batch size limit.
            cl.queue.process((lambda: len(d)),
                             moredata,
                             cl.c.max_batch_up)

            d = mime_translate(d)

            crlf = b'\r\n'
            mime = (b'--b' + crlf +
                    b'Content-Type: text/plain; charset="utf-8"' + crlf +
                    b'Content-Disposition: form-data; name="m"' + crlf + crlf +
                    str(cl.c.client).encode('ascii') + crlf +
                    cl.c.password + crlf +
                    str(cl.c.target_outstanding).encode('ascii') + crlf +
                    str(cl.c.http_timeout).encode('ascii') + crlf +
                    ((
                        b'--b' + crlf +
                        b'Content-Type: application/octet-stream' + crlf +
                        b'Content-Disposition: form-data; name="d"'
                        + crlf + crlf +
                        d + crlf
                    ) if len(d) else b'') +
                    b'--b--' + crlf)

            # To replay a request by hand, dump `mime` to a file and use e.g.:
            # POST -use -c 'multipart/form-data; boundary="b"' http://localhost:8099/ <data.dump.dbg

            cl.log(DBG.HTTP_FULL, 'requesting: ' + str(mime))

            hh = {'User-Agent': ['hippotat'],
                  'Content-Type': ['multipart/form-data; boundary="b"'],
                  'Content-Length': [str(len(mime))]}

            bytesreader = io.BytesIO(mime)
            producer = twisted.web.client.FileBodyProducer(bytesreader)

            req = cl.agent.request(b'POST',
                                   cl.c.url,
                                   twisted.web.client.Headers(hh),
                                   producer)

            # Register the request before attaching callbacks so that
            # req_ok/req_err always find an entry for it.
            cl.outstanding[req] = len(d)
            cl.log(DBG.HTTP_CTRL,
                   'request OS=%d' % len(cl.outstanding),
                   idof=req, d=d)
            req.addTimeout(cl.c.http_timeout, reactor)
            req.addCallback(partial(cl.req_ok, req))
            req.addErrback(partial(cl.req_err, req))

        # NOTE(review): placed after the loop (one dump per call); confirm
        # this matches the intended nesting.
        cl.log_outstanding()
198 | ||
# Every Client built by process_cfg; started by the script's main section.
clients = []


def process_cfg(putative_servers, putative_clients):
    """Build a Client for each usable (server, client) configuration pair.

    For every combination of a value of `putative_servers` and an item of
    `putative_clients`, a ConfigResults is filled in from the configuration
    and a Client built from it is appended to the module-level `clients`
    list.  Pairs for which process_cfg_client_common returns no sections
    are skipped.
    """
    global clients

    for ss in putative_servers.values():
        for (ci, cs) in putative_clients.items():
            c = ConfigResults()

            sections = process_cfg_client_common(c, ss, cs, ci)
            if not sections:
                continue

            def srch(getter, key):
                return cfg_search(getter, key, sections)

            # The grace period is added on top of the http_timeout already
            # present on c.
            c.http_timeout += srch(cfg.getint, 'http_timeout_grace')
            c.max_outstanding = srch(cfg.getint, 'max_requests_outstanding')
            c.max_batch_up = srch(cfg.getint, 'max_batch_up')
            c.http_retry = srch(cfg.getint, 'http_retry')
            c.vroutes = srch(cfg.get, 'vroutes')

            process_cfg_common(c, ss)
            try:
                c.url = srch(cfg.get, 'url')
            except NoOptionError:
                # No explicit url: fall back to the first server address.
                # NOTE(review): process_cfg_saddrs is called with no
                # arguments although c.saddrs is read next -- confirm its
                # signature.
                process_cfg_saddrs()
                c.url = c.saddrs[0].url()

            process_cfg_ipif(c,
                             sections,
                             (('local', 'client'),
                              ('peer', 'vaddr'),
                              ('rnets', 'vroutes')))

            clients.append(Client(c, ss, cs))
0accf0d3 | 232 | |
# Read the configuration; process_cfg fills in the `clients` list.
common_startup(process_cfg)

# Bring up each configured client: create its queue and agent, attach it to
# its ipif command, then let it start issuing requests.
for client in clients:
    client.start()
    start_ipif(client.c.ipif_command, client.outbound)
    client.check_outbound()

common_run()