X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=hippotat.git;a=blobdiff_plain;f=hippotat%2F__init__.py;h=0b059e59e2384d0b20cf388ec17887198e66c8ae;hp=8694e978a436efc06effbbb5b4e3b81c5868d06f;hb=9acb0ecadba89a0d715c19c38865ddf28802ebb3;hpb=e4006ac49fd7aa465361d86d079c30b4cabf4593 diff --git a/hippotat/__init__.py b/hippotat/__init__.py index 8694e97..0b059e5 100644 --- a/hippotat/__init__.py +++ b/hippotat/__init__.py @@ -5,10 +5,15 @@ signal.signal(signal.SIGINT, signal.SIG_DFL) import sys +from zope.interface import implementer + import twisted from twisted.internet import reactor -from twisted.logger import LogLevel import twisted.internet.endpoints +import twisted.logger +from twisted.logger import LogLevel +import twisted.python.constants +from twisted.python.constants import NamedConstant import ipaddress from ipaddress import AddressValueError @@ -17,22 +22,87 @@ from optparse import OptionParser from configparser import ConfigParser from configparser import NoOptionError +from functools import partial + import collections +import time +import codecs +import traceback import re as regexp import hippotat.slip as slip +class DBG(twisted.python.constants.Names): + INIT = NamedConstant() + ROUTE = NamedConstant() + DROP = NamedConstant() + FLOW = NamedConstant() + HTTP = NamedConstant() + TWISTED = NamedConstant() + QUEUE = NamedConstant() + HTTP_CTRL = NamedConstant() + QUEUE_CTRL = NamedConstant() + HTTP_FULL = NamedConstant() + CTRL_DUMP = NamedConstant() + SLIP_FULL = NamedConstant() + DATA_COMPLETE = NamedConstant() + +_hex_codec = codecs.getencoder('hex_codec') + +#---------- logging ---------- + +org_stderr = sys.stderr + +log = twisted.logger.Logger() + +debug_set = set() +debug_def_detail = DBG.HTTP + +def log_debug(dflag, msg, idof=None, d=None): + if dflag not in debug_set: return + #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr) + if idof is not None: + msg = '[%#x] %s' % (id(idof), msg) + if d is not None: + trunc = '' + if not DBG.DATA_COMPLETE in debug_set: + if len(d) > 64: + d = d[0:64] + trunc = '...' + d = _hex_codec(d)[0].decode('ascii') + msg += ' ' + d + trunc + log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg) + +@implementer(twisted.logger.ILogFilterPredicate) +class LogNotBoringTwisted: + def __call__(self, event): + yes = twisted.logger.PredicateResult.yes + no = twisted.logger.PredicateResult.no + try: + if event.get('log_level') != LogLevel.info: + return yes + dflag = event.get('dflag') + if dflag in debug_set: return yes + if dflag is None and DBG.TWISTED in debug_set: return yes + return no + except Exception: + print(traceback.format_exc(), file=org_stderr) + return yes + +#---------- default config ---------- + defcfg = ''' [DEFAULT] #[] overrides max_batch_down = 65536 # used by server, subject to [limits] max_queue_time = 10 # used by server, subject to [limits] -max_request_time = 54 # used by server, subject to [limits] target_requests_outstanding = 3 # must match; subject to [limits] on server +http_timeout = 30 # used by both } must be +http_timeout_grace = 5 # used by both } compatible max_requests_outstanding = 4 # used by client max_batch_up = 4000 # used by client -http_timeout = 30 # used by client +http_retry = 5 # used by client #[server] or [] overrides ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s @@ -60,7 +130,7 @@ port = 80 # used by server [limits] max_batch_down = 262144 # used by server max_queue_time = 121 # used by server -max_request_time = 121 # used by server +http_timeout = 121 # used by server target_requests_outstanding = 10 # used by server ''' @@ -82,10 +152,10 @@ class ConfigResults: c = ConfigResults() -def log_discard(packet, saddr, daddr, why): - print('DROP ', saddr, daddr, why) -# syslog.syslog(syslog.LOG_DEBUG, -# 'discarded packet %s -> %s (%s)' % (saddr, daddr, why)) +def log_discard(packet, iface, saddr, daddr, why): + log_debug(DBG.DROP, + 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why), + d=packet) #---------- packet parsing ---------- @@ -124,38 +194,45 @@ def ipnetwork(input): #---------- ipif (SLIP) subprocess ---------- class SlipStreamDecoder(): - def __init__(self, on_packet): - # we will call packet() + def __init__(self, desc, on_packet): self._buffer = b'' self._on_packet = on_packet + self._desc = desc + self._log('__init__') + + def _log(self, msg, **kwargs): + log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs) def inputdata(self, data): - #print('SLIP-GOT ', repr(data)) - self._buffer += data - packets = slip.decode(self._buffer) + self._log('inputdata', d=data) + packets = slip.decode(data) + packets[0] = self._buffer + packets[0] self._buffer = packets.pop() for packet in packets: self._maybe_packet(packet) + self._log('bufremain', d=self._buffer) def _maybe_packet(self, packet): - if len(packet): - self._on_packet(packet) + self._log('maybepacket', d=packet) + if len(packet): + self._on_packet(packet) def flush(self): + self._log('flush') self._maybe_packet(self._buffer) self._buffer = b'' class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol): def __init__(self, router): self._router = router - self._decoder = SlipStreamDecoder(self.slip_on_packet) + self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet) def connectionMade(self): pass def outReceived(self, data): self._decoder.inputdata(data) def slip_on_packet(self, packet): (saddr, daddr) = packet_addrs(packet) if saddr.is_link_local or daddr.is_link_local: - log_discard(packet, saddr, daddr, 'link-local') + log_discard(packet, 'ipif', saddr, daddr, 'link-local') return self._router(packet, saddr, daddr) def processEnded(self, status): @@ -166,9 +243,11 @@ def start_ipif(command, router): ipif = _IpifProcessProtocol(router) reactor.spawnProcess(ipif, '/bin/sh',['sh','-xc', command], - childFDs={0:'w', 1:'r', 2:2}) + childFDs={0:'w', 1:'r', 2:2}, + env=None) def queue_inbound(packet): + log_debug(DBG.FLOW, "queue_inbound", d=packet) ipif.transport.write(slip.delimiter) ipif.transport.write(slip.encode(packet)) ipif.transport.write(slip.delimiter) @@ -176,48 +255,74 @@ def queue_inbound(packet): #---------- packet queue ---------- class PacketQueue(): - def __init__(self, max_queue_time): + def __init__(self, desc, max_queue_time): + self._desc = desc + assert(desc + '') self._max_queue_time = max_queue_time self._pq = collections.deque() # packets + def _log(self, dflag, msg, **kwargs): + log_debug(dflag, self._desc+' pq: '+msg, **kwargs) + def append(self, packet): + self._log(DBG.QUEUE, 'append', d=packet) self._pq.append((time.monotonic(), packet)) def nonempty(self): + self._log(DBG.QUEUE, 'nonempty ?') while True: try: (queuetime, packet) = self._pq[0] - except IndexError: return False + except IndexError: + self._log(DBG.QUEUE, 'nonempty ? empty.') + return False age = time.monotonic() - queuetime - if age > self.max_queue_time: + if age > self._max_queue_time: # strip old packets off the front + self._log(DBG.QUEUE, 'dropping (old)', d=packet) self._pq.popleft() continue + self._log(DBG.QUEUE, 'nonempty ? nonempty.') return True def process(self, sizequery, moredata, max_batch): # sizequery() should return size of batch so far # moredata(s) should add s to batch + self._log(DBG.QUEUE, 'process...') while True: try: (dummy, packet) = self._pq[0] - except IndexError: break + except IndexError: + self._log(DBG.QUEUE, 'process... empty') + break + + self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet) encoded = slip.encode(packet) sofar = sizequery() + self._log(DBG.QUEUE_CTRL, + 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch), + d=encoded) + if sofar > 0: if sofar + len(slip.delimiter) + len(encoded) > max_batch: + self._log(DBG.QUEUE_CTRL, 'process... overflow') break moredata(slip.delimiter) moredata(encoded) - self._pq.popLeft() + self._pq.popleft() #---------- error handling ---------- +_crashing = False + def crash(err): - print('CRASH ', err, file=sys.stderr) + global _crashing + _crashing = True + print('========== CRASH ==========', err, + '===========================', file=sys.stderr) try: reactor.stop() except twisted.internet.error.ReactorNotRunning: pass @@ -240,7 +345,7 @@ def process_cfg_ipif(section, varmap): except AttributeError: continue setattr(c, d, v) - print(repr(c)) + #print(repr(c)) c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__) @@ -263,17 +368,17 @@ class ServerAddr(): try: self.addr = ipaddress.IPv4Address(addrspec) self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint - self._inurl = '%s' + self._inurl = b'%s' except AddressValueError: self.addr = ipaddress.IPv6Address(addrspec) self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint - self._inurl = '[%s]' + self._inurl = b'[%s]' def make_endpoint(self): return self._endpointfactory(reactor, self.port, self.addr) def url(self): - url = 'http://' + (self._inurl % self.addr) - if self.port != 80: url += ':%d' % self.port - url += '/' + url = b'http://' + (self._inurl % str(self.addr).encode('ascii')) + if self.port != 80: url += b':%d' % self.port + url += b'/' return url def process_cfg_saddrs(): @@ -291,22 +396,88 @@ def process_cfg_clients(constructor): if not (':' in cs or '.' in cs): continue ci = ipaddr(cs) pw = cfg.get(cs, 'password') + pw = pw.encode('utf-8') constructor(ci,cs,pw) #---------- startup ---------- def common_startup(): - twisted.logger.globalLogPublisher.addObserver(crash_on_critical) - optparser.add_option('-c', '--config', dest='configfile', default='/etc/hippotat/config') + + def dfs_less_detailed(dl): + return [df for df in DBG.iterconstants() if df <= dl] + + def ds_default(od,os,dl,op): + global debug_set + debug_set = set(dfs_less_detailed(debug_def_detail)) + + def ds_select(od,os, spec, op): + last_df = next(DBG.iterconstants()) + mutator = debug_set.add + + for it in spec.split(','): + + if not len(it): + for df in dfs_less_detailed(last_df): + mutator(df) + continue + + if it.startswith('-'): + mutator = debug_set.discard + it = it[1:] + else: + mutator = debug_set.add + + try: + df = DBG.lookupByName(it) + except ValueError: + optparser.error('unknown debug flag %s in --debug-select' % it) + mutator(df) + last_df = df + + optparser.add_option('-D', '--debug', + nargs=0, + action='callback', + help='enable default debug (to stdout)', + callback= ds_default) + + optparser.add_option('--debug-select', + nargs=1, + type='string', + metavar='[-]DFLAG[,],...', + help= +'''enable (or with -, disable) each specified DFLAG; +empty entry means do the same for all DFLAGS "more interesting" +than the last (or, all DFLAGs)''', + action='callback', + callback= ds_select) + (opts, args) = optparser.parse_args() if len(args): optparser.error('no non-option arguments please') + print(repr(debug_set), file=sys.stderr) + re = regexp.compile('#.*') cfg.read_string(re.sub('', defcfg)) cfg.read(opts.configfile) + log_formatter = twisted.logger.formatEventAsClassicLogText + stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter) + stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter) + pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error) + stdsomething_obs = twisted.logger.FilteringLogObserver( + stderr_obs, [pred], stdout_obs + ) + log_observer = twisted.logger.FilteringLogObserver( + stdsomething_obs, [LogNotBoringTwisted()] + ) + #log_observer = stdsomething_obs + twisted.logger.globalLogBeginner.beginLoggingTo( + [ log_observer, crash_on_critical ] + ) + def common_run(): - reactor.run() + log_debug(DBG.INIT, 'entering reactor') + if not _crashing: reactor.run() print('CRASHED (end)', file=sys.stderr)