From 650a3251620a44d22f30a4242cea6fff88ccfba5 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Mon, 20 Mar 2017 19:45:20 +0000 Subject: [PATCH 1/1] wip, and move PacketQueue --- hippotat/__init__.py | 28 ++++++++++++++++++++++++++++ server | 33 +++++++++++++++++---------------- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/hippotat/__init__.py b/hippotat/__init__.py index 2b13003..c1ce085 100644 --- a/hippotat/__init__.py +++ b/hippotat/__init__.py @@ -73,3 +73,31 @@ def queue_inbound(packet): ipif.transport.write(slip.encode(packet)) ipif.transport.write(slip.delimiter) +#---------- packet queue ---------- + +class PacketQueue(): + def __init__(self, max_queue_time): + self._max_queue_time = max_queue_time + self._pq = collections.deque() # packets + + def append(self, packet): + self._pq.append((time.monotonic(), packet)) + + def nonempty(self): + while True: + try: (queuetime, packet) = self._pq[0] + except IndexError: return False + + age = time.monotonic() - queuetime + if age > self.max_queue_time: + # strip old packets off the front + self._pq.popleft() + continue + + return True + + def popleft(self): + # caller must have checked nonempty + try: (dummy, packet) = self._pq[0] + except IndexError: return None + return packet diff --git a/server b/server index 80bdd80..db13f25 100755 --- a/server +++ b/server @@ -6,10 +6,8 @@ signal.signal(signal.SIGINT, signal.SIG_DFL) import sys import os -import twisted import twisted.internet import twisted.internet.endpoints -from twisted.internet import reactor from twisted.web.server import NOT_DONE_YET from twisted.logger import LogLevel @@ -33,6 +31,7 @@ defcfg = ''' max_batch_down = 65536 max_queue_time = 10 max_request_time = 54 +target_requests_outstanding = 3 [virtual] mtu = 1500 @@ -49,6 +48,7 @@ port = 8099 max_batch_down = 262144 max_queue_time = 121 max_request_time = 121 +target_requests_outstanding = 10 ''' #---------- error handling ---------- @@ -97,15 +97,18 @@ class Client(): self._cs = cs self.pw = cfg.get(cs, 'password') self._rq = collections.deque() # requests - self._pq = collections.deque() # packets + # self._pq = PacketQueue(...) # plus from config: # .max_batch_down # .max_queue_time # .max_request_time - for k in ('max_batch_down','max_queue_time','max_request_time'): + # .target_requests_outstanding + for k in ('max_batch_down','max_queue_time','max_request_time', + 'target_requests_outstanding'): req = cfg.getint(cs, k) limit = cfg.getint('limits',k) self.__dict__[k] = min(req, limit) + self._pq = PacketQueue(self.max_queue_time) def process_arriving_data(self, d): for packet in slip.decode(d): @@ -121,7 +124,7 @@ class Client(): self._req_cancel(request) def queue_outbound(self, packet): - self._pq.append((time.monotonic(), packet)) + self._pq.append(packet) def http_request(self, request): request.setHeader('Content-Type','application/octet-stream') @@ -138,15 +141,8 @@ class Client(): self._rq.popleft() continue - # now request is an unfinished request, or None - try: (queuetime, packet) = self._pq[0] - except IndexError: + if not self._pq.nonempty(): # no packets, oh well - break - - age = time.monotonic() - queuetime - if age > self.max_queue_time: - self._pq.popleft() continue if request is None: @@ -155,8 +151,8 @@ class Client(): # request, and also some non-expired packets while True: - try: (dummy, packet) = self._pq[0] - except IndexError: break + packet = self.pq.popleft() + if packet is None: break encoded = slip.encode(packet) @@ -174,6 +170,10 @@ class Client(): request.finish() # round again, looking for more to do + while len(self._rq) > self.target_requests_outstanding: + request = self._rq.popleft() + request.finish() + class IphttpResource(twisted.web.resource.Resource): isLeaf = True def render_POST(self, request): @@ -186,7 +186,8 @@ class IphttpResource(twisted.web.resource.Resource): # update config for r, w in (('mbd', 'max_batch_down'), ('mqt', 'max_queue_time'), - ('mrt', 'max_request_time')): + ('mrt', 'max_request_time'), + ('tro', 'target_requests_outstanding')): try: v = request.args[r] except KeyError: continue v = int(v) -- 2.30.2