chiark / gitweb /
wip
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 22 Mar 2017 23:01:42 +0000 (23:01 +0000)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 22 Mar 2017 23:01:42 +0000 (23:01 +0000)
PROTOCOL
client
hippotat/__init__.py
server

index 6b12e4f01a6ee114e6247024e3e82b3349083cf8..b597b9a003f917a607ca64fd10cbee798d3f7773 100644 (file)
--- a/PROTOCOL
+++ b/PROTOCOL
@@ -17,7 +17,7 @@ Client form parameters (multipart/form-data):
                        client ip address (textual)
                        password
                        target_requests_outstanding
- d              data (SLIP format)
+ d              data (SLIP format, with SLIP_ESC and `-' swapped)
 
 
 
diff --git a/client b/client
index 49a10efecf0b7dd81cca7c6de293ce88d2dae16f..39e04c56640e7eeac6a4c0ec01a0310a8f314bd1 100755 (executable)
--- a/client
+++ b/client
@@ -11,6 +11,7 @@ def set_client(ci,cs,pw):
   client_cs = cs
   c.client = ci
   c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding')
+  c.target_outstanding = cfg.getint(cs, 'target_requests_outstanding')
   password = pw
 
 def process_cfg():
@@ -29,7 +30,8 @@ def process_cfg():
   process_cfg_clients(set_client)
 
   c.routes = cfg.get('virtual','routes')
-  c.max_queue_time = cfg.get(client_cs, 'max_queue_time')
+  c.max_queue_time = cfg.getint(client_cs, 'max_queue_time')
+  c.max_batch_up   = cfg.getint(client_cs, 'max_batch_up')
 
   process_cfg_ipif(client_cs,
                    (('local', 'client'),
@@ -40,24 +42,57 @@ outstanding = 0
 
 def start_client():
   global queue
+  global agent
   queue = PacketQueue(c.max_queue_time)
+  agent = twisted.web.client.Agent(reactor, connectTimeout = c.http_timeout)
 
 def outbound(packet, saddr, daddr):
   #print('OUT ', saddr, daddr, repr(packet))
   queue.append(packet)
   check_outbound()
 
+def req_ok(data)
+
+def req_err(err):
+  print(err, >>sys.stderr)
+  outstanding--
+
+def req_fin(*args):  
+
 def check_outbound():
   while True:
-    if outstanding >= c.max_outstanding: break
-    elements = { }
-    if not queue.nonempty():
-      if outstanding >= c.target_
+    if                         outstanding >= c.max_outstanding   : break
+    if not queue.nonempty() && outstanding >= c.target_outstanding: break
+
+    d = b''
+    queue.process((lambda: len(d)),
+                  (lambda s: d += s),
+                  c.max_batch_up)
+    assert(len(d))
+    
+    crlf = b'\r\n'
+    mime = (b'--b'                                      + crlf +
+            b'Content-Disposition: form-data; name="m"' + crlf +
+            password                                    + crlf +
+            c.client                                    + crlf +
+            c.target_outstanding                        + crlf +
+            b'--b'                                      + crlf +
+            b'Content-Disposition: form-data; name="d"' + crlf +
+            mime_translate(d)                           + crlf +
+            b'--b--'                                    + crlf)
 
-  while (outstanding <  and
-         (queue.notempty() or outstanding < c.
+    hh = { 'User-Agent': ['hippotat'],
+           'Content-Type': ['multipart/form-data; boundary="b"'] }
+    req = agent.request('POST',
+                        c.url,
+                        twisted.web.client.Headers(hh))
+    req.addTimeout(c.http_timeout, 
+    req.addCallbacks(req_ok, req_err)
+    req.addBoth(req_fin)
+    outstanding++
 
 common_startup()
 process_cfg()
+start_client()
 start_ipif(c.ipif_command, outbound)
 common_run()
index c57767ae1c2c3045def4875cdaa1efd7c7c61191..8f8a4479a1619727b3b633cf55a5c1889f16bef7 100644 (file)
@@ -32,6 +32,7 @@ max_request_time = 54            # used by server, subject to [limits]
 target_requests_outstanding = 3  # must match; subject to [limits] on server
 max_requests_outstanding = 4     # used by client
 max_batch_up = 4000              # used by client
+http_timeout = 30                # used by client
 
 #[server] or [<client>] overrides
 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
@@ -67,6 +68,12 @@ target_requests_outstanding = 10  # used by server
 cfg = ConfigParser()
 optparser = OptionParser()
 
+_mimetrans = str.maketrans(b'-'+slip.esc, slip.esc+'-')
+def mime_translate(s):
+  # SLIP-encoded packets cannot contain ESC ESC.
+  # Swap `-' and ESC.  The result cannot contain `--'
+  return s.translate(_mimetrans)
+
 class ConfigResults:
   def __init__(self, d = { }):
     self.__dict__ = d
@@ -171,11 +178,23 @@ class PacketQueue():
 
       return True
 
-  def popleft(self):
-    # caller must have checked nonempty
-    try: (dummy, packet) = self._pq[0]
-    except IndexError: return None
-    return packet
+  def process(self, sizequery, moredata, max_batch):
+    # sizequery() should return size of batch so far
+    # moredata(s) should add s to batch
+    while True:
+      try: (dummy, packet) = self._pq[0]
+      except IndexError: break
+
+      encoded = slip.encode(packet)
+      sofar = sizequery()  
+
+      if sofar > 0:
+        if sofar + len(slip.delimiter) + len(encoded) > max_batch:
+          break
+        moredata(slip.delimiter)
+
+      moredata(encoded)
+      self._pq.popLeft()
 
 #---------- error handling ----------
 
@@ -187,7 +206,7 @@ def crash(err):
 def crash_on_defer(defer):
   defer.addErrback(lambda err: crash(err))
 
-def crash_on_critical(event):
+vdef crash_on_critical(event):
   if event.get('log_level') >= LogLevel.critical:
     crash(twisted.logger.formatEvent(event))
 
diff --git a/server b/server
index 2daf0b0223a2f692a1494629cd4db39890440f3c..e3e28cebada5e77c3186f47d2a3748fa90f8e1d7 100755 (executable)
--- a/server
+++ b/server
@@ -102,20 +102,9 @@ class Client():
         break
 
       # request, and also some non-expired packets
-      while True:
-        packet = self.pq.popleft()
-        if packet is None: break
-
-        encoded = slip.encode(packet)
-        
-        if request.sentLength > 0:
-          if (request.sentLength + len(slip.delimiter)
-              + len(encoded) > self.max_batch_down):
-            break
-          request.write(slip.delimiter)
-
-        request.write(encoded)
-        self._pq.popLeft()
+      self._pq.process((lambda: request.sentLength),
+                       request.write,
+                       self.max_batch_down)
 
       assert(request.sentLength)
       self._rq.popLeft()