chiark / gitweb /
reorg SlipStreamDecoder again
[hippotat.git] / hippotat / __init__.py
index c57767ae1c2c3045def4875cdaa1efd7c7c61191..e48083a3ae85f8cec7c7c8d1e64f6b84a134b99d 100644 (file)
@@ -32,6 +32,7 @@ max_request_time = 54            # used by server, subject to [limits]
 target_requests_outstanding = 3  # must match; subject to [limits] on server
 max_requests_outstanding = 4     # used by client
 max_batch_up = 4000              # used by client
+http_timeout = 30                # used by client
 
 #[server] or [<client>] overrides
 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
@@ -67,6 +68,12 @@ target_requests_outstanding = 10  # used by server
 cfg = ConfigParser()
 optparser = OptionParser()
 
+_mimetrans = str.maketrans(b'-'+slip.esc, slip.esc+'-')
+def mime_translate(s):
+  # SLIP-encoded packets cannot contain ESC ESC.
+  # Swap `-' and ESC.  The result cannot contain `--'
+  return s.translate(_mimetrans)
+
 class ConfigResults:
   def __init__(self, d = { }):
     self.__dict__ = d
@@ -116,23 +123,41 @@ def ipnetwork(input):
 
 #---------- ipif (SLIP) subprocess ----------
 
-class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
-  def __init__(self, router):
+class SlipStreamDecoder():
+  def __init__(self, on_packet):
+    # we will call packet(<packet>)
     self._buffer = b''
-    self._router = router
-  def connectionMade(self): pass
-  def outReceived(self, data):
-    #print('IPIF-GOT ', repr(data))
+    self._on_packet = on_packet
+
+  def inputdata(self, data):
+    #print('SLIP-GOT ', repr(data))
     self._buffer += data
     packets = slip.decode(self._buffer)
     self._buffer = packets.pop()
     for packet in packets:
-      if not len(packet): continue
-      (saddr, daddr) = packet_addrs(packet)
-      if saddr.is_link_local or daddr.is_link_local:
-        log_discard(packet, saddr, daddr, 'link-local')
-        continue
-      self._router(packet, saddr, daddr)
+      self._maybe_packet(packet)
+
+  def _maybe_packet(self, packet):
+      if len(packet):
+        self._on_packet(packet)
+
+  def flush(self):
+    self._maybe_packet(self._buffer)
+    self._buffer = b''
+
+class _IpifProcessProtocol(SlipProtocol):
+  def __init__(self, router):
+    self._router = router
+    self._decoder = SlipStreamDecoder(self.slip_on_packet)
+  def connectionMade(self): pass
+  def outReceived(self, data):
+    self._decoder.inputdata(data)
+  def slip_on_packet(self, packet):
+    (saddr, daddr) = packet_addrs(packet)
+    if saddr.is_link_local or daddr.is_link_local:
+      log_discard(packet, saddr, daddr, 'link-local')
+      return
+    self._router(packet, saddr, daddr)
   def processEnded(self, status):
     status.raiseException()
 
@@ -171,11 +196,23 @@ class PacketQueue():
 
       return True
 
-  def popleft(self):
-    # caller must have checked nonempty
-    try: (dummy, packet) = self._pq[0]
-    except IndexError: return None
-    return packet
+  def process(self, sizequery, moredata, max_batch):
+    # sizequery() should return size of batch so far
+    # moredata(s) should add s to batch
+    while True:
+      try: (dummy, packet) = self._pq[0]
+      except IndexError: break
+
+      encoded = slip.encode(packet)
+      sofar = sizequery()  
+
+      if sofar > 0:
+        if sofar + len(slip.delimiter) + len(encoded) > max_batch:
+          break
+        moredata(slip.delimiter)
+
+      moredata(encoded)
+      self._pq.popLeft()
 
 #---------- error handling ----------
 
@@ -187,7 +224,7 @@ def crash(err):
 def crash_on_defer(defer):
   defer.addErrback(lambda err: crash(err))
 
-def crash_on_critical(event):
+vdef crash_on_critical(event):
   if event.get('log_level') >= LogLevel.critical:
     crash(twisted.logger.formatEvent(event))