chiark / gitweb /
wip, and move PacketQueue
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 20 Mar 2017 19:45:20 +0000 (19:45 +0000)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 20 Mar 2017 19:46:42 +0000 (19:46 +0000)
hippotat/__init__.py
server

index 2b13003994fa6bf3464e8f039df0aa2d81f82736..c1ce08511aae7f43f9113512c1263ff7b54f5666 100644 (file)
@@ -73,3 +73,31 @@ def queue_inbound(packet):
   ipif.transport.write(slip.encode(packet))
   ipif.transport.write(slip.delimiter)
 
+#---------- packet queue ----------
+
+class PacketQueue():
+  def __init__(self, max_queue_time):
+    self._max_queue_time = max_queue_time
+    self._pq = collections.deque() # packets
+
+  def append(self, packet):
+    self._pq.append((time.monotonic(), packet))
+
+  def nonempty(self):
+    while True:
+      try: (queuetime, packet) = self._pq[0]
+      except IndexError: return False
+
+      age = time.monotonic() - queuetime
+      if age > self.max_queue_time:
+        # strip old packets off the front
+        self._pq.popleft()
+        continue
+
+      return True
+
+  def popleft(self):
+    # caller must have checked nonempty
+    try: (dummy, packet) = self._pq[0]
+    except IndexError: return None
+    return packet
diff --git a/server b/server
index 80bdd807202cdd8a20f09eb1a5a1cc431dd4202d..db13f25954d01ff4ad24fe6fbd6bb4305e0d2995 100755 (executable)
--- a/server
+++ b/server
@@ -6,10 +6,8 @@ signal.signal(signal.SIGINT, signal.SIG_DFL)
 import sys
 import os
 
-import twisted
 import twisted.internet
 import twisted.internet.endpoints
-from twisted.internet import reactor
 from twisted.web.server import NOT_DONE_YET
 from twisted.logger import LogLevel
 
@@ -33,6 +31,7 @@ defcfg = '''
 max_batch_down = 65536
 max_queue_time = 10
 max_request_time = 54
+target_requests_outstanding = 3
 
 [virtual]
 mtu = 1500
@@ -49,6 +48,7 @@ port = 8099
 max_batch_down = 262144
 max_queue_time = 121
 max_request_time = 121
+target_requests_outstanding = 10
 '''
 
 #---------- error handling ----------
@@ -97,15 +97,18 @@ class Client():
     self._cs = cs
     self.pw = cfg.get(cs, 'password')
     self._rq = collections.deque() # requests
-    self._pq = collections.deque() # packets
+    # self._pq = PacketQueue(...)
     # plus from config:
     #  .max_batch_down
     #  .max_queue_time
     #  .max_request_time
-    for k in ('max_batch_down','max_queue_time','max_request_time'):
+    #  .target_requests_outstanding
+    for k in ('max_batch_down','max_queue_time','max_request_time',
+              'target_requests_outstanding'):
       req = cfg.getint(cs, k)
       limit = cfg.getint('limits',k)
       self.__dict__[k] = min(req, limit)
+    self._pq = PacketQueue(self.max_queue_time)
 
     def process_arriving_data(self, d):
       for packet in slip.decode(d):
@@ -121,7 +124,7 @@ class Client():
       self._req_cancel(request)
 
     def queue_outbound(self, packet):
-      self._pq.append((time.monotonic(), packet))
+      self._pq.append(packet)
 
     def http_request(self, request):
       request.setHeader('Content-Type','application/octet-stream')
@@ -138,15 +141,8 @@ class Client():
           self._rq.popleft()
           continue
 
-        # now request is an unfinished request, or None
-        try: (queuetime, packet) = self._pq[0]
-        except IndexError:
+        if not self._pq.nonempty():
           # no packets, oh well
-          break
-
-        age = time.monotonic() - queuetime
-        if age > self.max_queue_time:
-          self._pq.popleft()
           continue
 
         if request is None:
@@ -155,8 +151,8 @@ class Client():
 
         # request, and also some non-expired packets
         while True:
-          try: (dummy, packet) = self._pq[0]
-          except IndexError: break
+          packet = self.pq.popleft()
+          if packet is None: break
 
           encoded = slip.encode(packet)
           
@@ -174,6 +170,10 @@ class Client():
         request.finish()
         # round again, looking for more to do
 
+      while len(self._rq) > self.target_requests_outstanding:
+        request = self._rq.popleft()
+        request.finish()
+
 class IphttpResource(twisted.web.resource.Resource):
   isLeaf = True
   def render_POST(self, request):
@@ -186,7 +186,8 @@ class IphttpResource(twisted.web.resource.Resource):
     # update config
     for r, w in (('mbd', 'max_batch_down'),
                  ('mqt', 'max_queue_time'),
-                 ('mrt', 'max_request_time')):
+                 ('mrt', 'max_request_time'),
+                 ('tro', 'target_requests_outstanding')):
       try: v = request.args[r]
       except KeyError: continue
       v = int(v)