X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=hippotat%2F__init__.py;h=8694e978a436efc06effbbb5b4e3b81c5868d06f;hb=e4006ac49fd7aa465361d86d079c30b4cabf4593;hp=194dc9971bd7b3ede7573ae720e76c374b5f04e0;hpb=1d023c89e4379225b5d71e565b93f537e119ab75;p=hippotat.git diff --git a/hippotat/__init__.py b/hippotat/__init__.py index 194dc99..8694e97 100644 --- a/hippotat/__init__.py +++ b/hippotat/__init__.py @@ -3,6 +3,8 @@ import signal signal.signal(signal.SIGINT, signal.SIG_DFL) +import sys + import twisted from twisted.internet import reactor from twisted.logger import LogLevel @@ -11,18 +13,67 @@ import twisted.internet.endpoints import ipaddress from ipaddress import AddressValueError -import hippotat.slip as slip - from optparse import OptionParser from configparser import ConfigParser from configparser import NoOptionError import collections +import re as regexp + +import hippotat.slip as slip + +defcfg = ''' +[DEFAULT] +#[] overrides +max_batch_down = 65536 # used by server, subject to [limits] +max_queue_time = 10 # used by server, subject to [limits] +max_request_time = 54 # used by server, subject to [limits] +target_requests_outstanding = 3 # must match; subject to [limits] on server +max_requests_outstanding = 4 # used by client +max_batch_up = 4000 # used by client +http_timeout = 30 # used by client + +#[server] or [] overrides +ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s +# extra interpolations: %(local)s %(peer)s %(rnet)s +# obtained on server [virtual]server [virtual]relay [virtual]network +# from on client [virtual]server [virtual]routes + +[virtual] +mtu = 1500 +routes = '' +# network = / # mandatory for server +# server = # used by both, default is computed from `network' +# relay = # used by server, default from `network' and `server' +# default server is first host in network +# default relay is first host which is not server + +[server] +# addrs = 127.0.0.1 ::1 # mandatory for server +port = 80 # used by server +# url # used by client; default from first `addrs' and `port' + +# [] +# password = # used by both, must match + +[limits] +max_batch_down = 262144 # used by server +max_queue_time = 121 # used by server +max_request_time = 121 # used by server +target_requests_outstanding = 10 # used by server +''' + # these need to be defined here so that they can be imported by import * cfg = ConfigParser() optparser = OptionParser() +_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-') +def mime_translate(s): + # SLIP-encoded packets cannot contain ESC ESC. + # Swap `-' and ESC. The result cannot contain `--' + return s.translate(_mimetrans) + class ConfigResults: def __init__(self, d = { }): self.__dict__ = d @@ -31,6 +82,11 @@ class ConfigResults: c = ConfigResults() +def log_discard(packet, saddr, daddr, why): + print('DROP ', saddr, daddr, why) +# syslog.syslog(syslog.LOG_DEBUG, +# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why)) + #---------- packet parsing ---------- def packet_addrs(packet): @@ -67,20 +123,41 @@ def ipnetwork(input): #---------- ipif (SLIP) subprocess ---------- -class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol): - def __init__(self, router): +class SlipStreamDecoder(): + def __init__(self, on_packet): + # we will call packet() self._buffer = b'' - self._router = router - def connectionMade(self): pass - def outReceived(self, data): - #print('RECV ', repr(data)) + self._on_packet = on_packet + + def inputdata(self, data): + #print('SLIP-GOT ', repr(data)) self._buffer += data packets = slip.decode(self._buffer) self._buffer = packets.pop() for packet in packets: - if not len(packet): continue - (saddr, daddr) = packet_addrs(packet) - self._router(packet, saddr, daddr) + self._maybe_packet(packet) + + def _maybe_packet(self, packet): + if len(packet): + self._on_packet(packet) + + def flush(self): + self._maybe_packet(self._buffer) + self._buffer = b'' + +class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol): + def __init__(self, router): + self._router = router + self._decoder = SlipStreamDecoder(self.slip_on_packet) + def connectionMade(self): pass + def outReceived(self, data): + self._decoder.inputdata(data) + def slip_on_packet(self, packet): + (saddr, daddr) = packet_addrs(packet) + if saddr.is_link_local or daddr.is_link_local: + log_discard(packet, saddr, daddr, 'link-local') + return + self._router(packet, saddr, daddr) def processEnded(self, status): status.raiseException() @@ -119,11 +196,23 @@ class PacketQueue(): return True - def popleft(self): - # caller must have checked nonempty - try: (dummy, packet) = self._pq[0] - except IndexError: return None - return packet + def process(self, sizequery, moredata, max_batch): + # sizequery() should return size of batch so far + # moredata(s) should add s to batch + while True: + try: (dummy, packet) = self._pq[0] + except IndexError: break + + encoded = slip.encode(packet) + sofar = sizequery() + + if sofar > 0: + if sofar + len(slip.delimiter) + len(encoded) > max_batch: + break + moredata(slip.delimiter) + + moredata(encoded) + self._pq.popLeft() #---------- error handling ---------- @@ -148,7 +237,7 @@ def process_cfg_common_always(): def process_cfg_ipif(section, varmap): for d, s in varmap: try: v = getattr(c, s) - except KeyError: pass + except AttributeError: continue setattr(c, d, v) print(repr(c)) @@ -206,7 +295,7 @@ def process_cfg_clients(constructor): #---------- startup ---------- -def common_startup(defcfg): +def common_startup(): twisted.logger.globalLogPublisher.addObserver(crash_on_critical) optparser.add_option('-c', '--config', dest='configfile', @@ -214,7 +303,8 @@ def common_startup(defcfg): (opts, args) = optparser.parse_args() if len(args): optparser.error('no non-option arguments please') - cfg.read_string(defcfg) + re = regexp.compile('#.*') + cfg.read_string(re.sub('', defcfg)) cfg.read(opts.configfile) def common_run():