From 7b07f0b5fd215702dc58c53bd1cd7c63767f5710 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Wed, 22 Mar 2017 23:01:42 +0000 Subject: [PATCH] wip --- PROTOCOL | 2 +- client | 49 +++++++++++++++++++++++++++++++++++++------- hippotat/__init__.py | 31 ++++++++++++++++++++++------ server | 17 +++------------ 4 files changed, 71 insertions(+), 28 deletions(-) diff --git a/PROTOCOL b/PROTOCOL index 6b12e4f..b597b9a 100644 --- a/PROTOCOL +++ b/PROTOCOL @@ -17,7 +17,7 @@ Client form parameters (multipart/form-data): client ip address (textual) password target_requests_outstanding - d data (SLIP format) + d data (SLIP format, with SLIP_ESC and `-' swapped) diff --git a/client b/client index 49a10ef..39e04c5 100755 --- a/client +++ b/client @@ -11,6 +11,7 @@ def set_client(ci,cs,pw): client_cs = cs c.client = ci c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding') + c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding') password = pw def process_cfg(): @@ -29,7 +30,8 @@ def process_cfg(): process_cfg_clients(set_client) c.routes = cfg.get('virtual','routes') - c.max_queue_time = cfg.get(client_cs, 'max_queue_time') + c.max_queue_time = cfg.getint(client_cs, 'max_queue_time') + c.max_batch_up = cfg.getint(client_cs, 'max_batch_up') process_cfg_ipif(client_cs, (('local', 'client'), @@ -40,24 +42,57 @@ outstanding = 0 def start_client(): global queue + global agent queue = PacketQueue(c.max_queue_time) + agent = twisted.web.client.Agent(reactor, connectTimeout = c.http_timeout) def outbound(packet, saddr, daddr): #print('OUT ', saddr, daddr, repr(packet)) queue.append(packet) check_outbound() +def req_ok(data) + +def req_err(err): + print(err, >>sys.stderr) + outstanding-- + +def req_fin(*args): + def check_outbound(): while True: - if outstanding >= c.max_outstanding: break - elements = { } - if not queue.nonempty(): - if outstanding >= c.target_ + if outstanding >= c.max_outstanding : break + if not queue.nonempty() && outstanding >= c.target_outstanding: break + + d = b'' + queue.process((lambda: len(d)), + (lambda s: d += s), + c.max_batch_up) + assert(len(d)) + + crlf = b'\r\n' + mime = (b'--b' + crlf + + b'Content-Disposition: form-data; name="m"' + crlf + + password + crlf + + c.client + crlf + + c.target_outstanding + crlf + + b'--b' + crlf + + b'Content-Disposition: form-data; name="d"' + crlf + + mime_translate(d) + crlf + + b'--b--' + crlf) - while (outstanding < and - (queue.notempty() or outstanding < c. + hh = { 'User-Agent': ['hippotat'], + 'Content-Type': ['multipart/form-data; boundary="b"'] } + req = agent.request('POST', + c.url, + twisted.web.client.Headers(hh)) + req.addTimeout(c.http_timeout, + req.addCallbacks(req_ok, req_err) + req.addBoth(req_fin) + outstanding++ common_startup() process_cfg() +start_client() start_ipif(c.ipif_command, outbound) common_run() diff --git a/hippotat/__init__.py b/hippotat/__init__.py index c57767a..8f8a447 100644 --- a/hippotat/__init__.py +++ b/hippotat/__init__.py @@ -32,6 +32,7 @@ max_request_time = 54 # used by server, subject to [limits] target_requests_outstanding = 3 # must match; subject to [limits] on server max_requests_outstanding = 4 # used by client max_batch_up = 4000 # used by client +http_timeout = 30 # used by client #[server] or [] overrides ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s @@ -67,6 +68,12 @@ target_requests_outstanding = 10 # used by server cfg = ConfigParser() optparser = OptionParser() +_mimetrans = str.maketrans(b'-'+slip.esc, slip.esc+'-') +def mime_translate(s): + # SLIP-encoded packets cannot contain ESC ESC. + # Swap `-' and ESC. The result cannot contain `--' + return s.translate(_mimetrans) + class ConfigResults: def __init__(self, d = { }): self.__dict__ = d @@ -171,11 +178,23 @@ class PacketQueue(): return True - def popleft(self): - # caller must have checked nonempty - try: (dummy, packet) = self._pq[0] - except IndexError: return None - return packet + def process(self, sizequery, moredata, max_batch): + # sizequery() should return size of batch so far + # moredata(s) should add s to batch + while True: + try: (dummy, packet) = self._pq[0] + except IndexError: break + + encoded = slip.encode(packet) + sofar = sizequery() + + if sofar > 0: + if sofar + len(slip.delimiter) + len(encoded) > max_batch: + break + moredata(slip.delimiter) + + moredata(encoded) + self._pq.popLeft() #---------- error handling ---------- @@ -187,7 +206,7 @@ def crash(err): def crash_on_defer(defer): defer.addErrback(lambda err: crash(err)) -def crash_on_critical(event): +vdef crash_on_critical(event): if event.get('log_level') >= LogLevel.critical: crash(twisted.logger.formatEvent(event)) diff --git a/server b/server index 2daf0b0..e3e28ce 100755 --- a/server +++ b/server @@ -102,20 +102,9 @@ class Client(): break # request, and also some non-expired packets - while True: - packet = self.pq.popleft() - if packet is None: break - - encoded = slip.encode(packet) - - if request.sentLength > 0: - if (request.sentLength + len(slip.delimiter) - + len(encoded) > self.max_batch_down): - break - request.write(slip.delimiter) - - request.write(encoded) - self._pq.popLeft() + self._pq.process((lambda: request.sentLength), + request.write, + self.max_batch_down) assert(request.sentLength) self._rq.popLeft() -- 2.30.2