X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=hippotat.git;a=blobdiff_plain;f=hippotat%2F__init__.py;h=abc2a9fd687129eecd9732e258ac68168ed24711;hp=8f8a4479a1619727b3b633cf55a5c1889f16bef7;hb=d579a04817c1a141e715481a18fe7d71568c451d;hpb=7b07f0b5fd215702dc58c53bd1cd7c63767f5710 diff --git a/hippotat/__init__.py b/hippotat/__init__.py index 8f8a447..abc2a9f 100644 --- a/hippotat/__init__.py +++ b/hippotat/__init__.py @@ -18,11 +18,23 @@ from configparser import ConfigParser from configparser import NoOptionError import collections +import time import re as regexp +from twisted.python.constants import NamedConstant + import hippotat.slip as slip +class DBG(twisted.python.constants.Names): + ROUTE = NamedConstant() + FLOW = NamedConstant() + HTTP = NamedConstant() + HTTP_CTRL = NamedConstant() + INIT = NamedConstant() + QUEUE = NamedConstant() + QUEUE_CTRL = NamedConstant() + defcfg = ''' [DEFAULT] #[] overrides @@ -33,6 +45,7 @@ target_requests_outstanding = 3 # must match; subject to [limits] on server max_requests_outstanding = 4 # used by client max_batch_up = 4000 # used by client http_timeout = 30 # used by client +http_retry = 5 # used by client #[server] or [] overrides ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s @@ -68,7 +81,7 @@ target_requests_outstanding = 10 # used by server cfg = ConfigParser() optparser = OptionParser() -_mimetrans = str.maketrans(b'-'+slip.esc, slip.esc+'-') +_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-') def mime_translate(s): # SLIP-encoded packets cannot contain ESC ESC. # Swap `-' and ESC. The result cannot contain `--' @@ -123,23 +136,41 @@ def ipnetwork(input): #---------- ipif (SLIP) subprocess ---------- -class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol): - def __init__(self, router): +class SlipStreamDecoder(): + def __init__(self, on_packet): + # we will call packet() self._buffer = b'' - self._router = router - def connectionMade(self): pass - def outReceived(self, data): - #print('IPIF-GOT ', repr(data)) + self._on_packet = on_packet + + def inputdata(self, data): + #print('SLIP-GOT ', repr(data)) self._buffer += data packets = slip.decode(self._buffer) self._buffer = packets.pop() for packet in packets: - if not len(packet): continue - (saddr, daddr) = packet_addrs(packet) - if saddr.is_link_local or daddr.is_link_local: - log_discard(packet, saddr, daddr, 'link-local') - continue - self._router(packet, saddr, daddr) + self._maybe_packet(packet) + + def _maybe_packet(self, packet): + if len(packet): + self._on_packet(packet) + + def flush(self): + self._maybe_packet(self._buffer) + self._buffer = b'' + +class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol): + def __init__(self, router): + self._router = router + self._decoder = SlipStreamDecoder(self.slip_on_packet) + def connectionMade(self): pass + def outReceived(self, data): + self._decoder.inputdata(data) + def slip_on_packet(self, packet): + (saddr, daddr) = packet_addrs(packet) + if saddr.is_link_local or daddr.is_link_local: + log_discard(packet, saddr, daddr, 'link-local') + return + self._router(packet, saddr, daddr) def processEnded(self, status): status.raiseException() @@ -148,7 +179,8 @@ def start_ipif(command, router): ipif = _IpifProcessProtocol(router) reactor.spawnProcess(ipif, '/bin/sh',['sh','-xc', command], - childFDs={0:'w', 1:'r', 2:2}) + childFDs={0:'w', 1:'r', 2:2}, + env=None) def queue_inbound(packet): ipif.transport.write(slip.delimiter) @@ -158,20 +190,26 @@ def queue_inbound(packet): #---------- packet queue ---------- class PacketQueue(): - def __init__(self, max_queue_time): + def __init__(self, desc, max_queue_time): + self._desc = desc self._max_queue_time = max_queue_time self._pq = collections.deque() # packets + def _log_debug(self, fn, pri, msg) + log_debug(pri, + def append(self, packet): + log_data(DBG.QUEUE, packet, 'pq %s: append' % self._desc) self._pq.append((time.monotonic(), packet)) def nonempty(self): + log_debug(DBG.QUEUE, 'pq %s: nonempty ?' % self._desc) while True: try: (queuetime, packet) = self._pq[0] except IndexError: return False age = time.monotonic() - queuetime - if age > self.max_queue_time: + if age > self._max_queue_time: # strip old packets off the front self._pq.popleft() continue @@ -194,7 +232,7 @@ class PacketQueue(): moredata(slip.delimiter) moredata(encoded) - self._pq.popLeft() + self._pq.popleft() #---------- error handling ---------- @@ -206,7 +244,7 @@ def crash(err): def crash_on_defer(defer): defer.addErrback(lambda err: crash(err)) -vdef crash_on_critical(event): +def crash_on_critical(event): if event.get('log_level') >= LogLevel.critical: crash(twisted.logger.formatEvent(event)) @@ -245,17 +283,17 @@ class ServerAddr(): try: self.addr = ipaddress.IPv4Address(addrspec) self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint - self._inurl = '%s' + self._inurl = b'%s' except AddressValueError: self.addr = ipaddress.IPv6Address(addrspec) self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint - self._inurl = '[%s]' + self._inurl = b'[%s]' def make_endpoint(self): return self._endpointfactory(reactor, self.port, self.addr) def url(self): - url = 'http://' + (self._inurl % self.addr) - if self.port != 80: url += ':%d' % self.port - url += '/' + url = b'http://' + (self._inurl % str(self.addr).encode('ascii')) + if self.port != 80: url += b':%d' % self.port + url += b'/' return url def process_cfg_saddrs(): @@ -273,6 +311,7 @@ def process_cfg_clients(constructor): if not (':' in cs or '.' in cs): continue ci = ipaddr(cs) pw = cfg.get(cs, 'password') + pw = pw.encode('utf-8') constructor(ci,cs,pw) #---------- startup ----------