X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=hippotat.git;a=blobdiff_plain;f=hippotat%2F__init__.py;h=8a21966e279c175af286cd6855e7255339996d33;hp=6de4d14c33e659b1bc91daa236ab86096c7b323d;hb=c7f134ce5802cb6a36371a5130d3a678518e9152;hpb=3297cac10f5726598246c6db6cf0efc4c51f355d diff --git a/hippotat/__init__.py b/hippotat/__init__.py index 6de4d14..8a21966 100644 --- a/hippotat/__init__.py +++ b/hippotat/__init__.py @@ -4,6 +4,9 @@ import signal signal.signal(signal.SIGINT, signal.SIG_DFL) import sys +import os + +from zope.interface import implementer import twisted from twisted.internet import reactor @@ -17,9 +20,12 @@ import ipaddress from ipaddress import AddressValueError from optparse import OptionParser +import configparser from configparser import ConfigParser from configparser import NoOptionError +from functools import partial + import collections import time import codecs @@ -30,75 +36,109 @@ import re as regexp import hippotat.slip as slip class DBG(twisted.python.constants.Names): + INIT = NamedConstant() + CONFIG = NamedConstant() ROUTE = NamedConstant() DROP = NamedConstant() FLOW = NamedConstant() HTTP = NamedConstant() - HTTP_CTRL = NamedConstant() - INIT = NamedConstant() + TWISTED = NamedConstant() QUEUE = NamedConstant() + HTTP_CTRL = NamedConstant() QUEUE_CTRL = NamedConstant() HTTP_FULL = NamedConstant() + CTRL_DUMP = NamedConstant() SLIP_FULL = NamedConstant() + DATA_COMPLETE = NamedConstant() _hex_codec = codecs.getencoder('hex_codec') +#---------- logging ---------- + +org_stderr = sys.stderr + log = twisted.logger.Logger() +debug_set = set() +debug_def_detail = DBG.HTTP + def log_debug(dflag, msg, idof=None, d=None): + if dflag not in debug_set: return #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr) if idof is not None: - msg = '[%d] %s' % (id(idof), msg) + msg = '[%#x] %s' % (id(idof), msg) if d is not None: - #d = d[0:64] + trunc = '' + if not DBG.DATA_COMPLETE in debug_set: + if len(d) > 64: + d = d[0:64] + trunc = '...' d = _hex_codec(d)[0].decode('ascii') - msg += ' ' + d + msg += ' ' + d + trunc log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg) +@implementer(twisted.logger.ILogFilterPredicate) +class LogNotBoringTwisted: + def __call__(self, event): + yes = twisted.logger.PredicateResult.yes + no = twisted.logger.PredicateResult.no + try: + if event.get('log_level') != LogLevel.info: + return yes + dflag = event.get('dflag') + if dflag is False : return yes + if dflag in debug_set: return yes + if dflag is None and DBG.TWISTED in debug_set: return yes + return no + except Exception: + print(traceback.format_exc(), file=org_stderr) + return yes + +#---------- default config ---------- + defcfg = ''' [DEFAULT] -#[] overrides -max_batch_down = 65536 # used by server, subject to [limits] -max_queue_time = 10 # used by server, subject to [limits] -max_request_time = 54 # used by server, subject to [limits] -target_requests_outstanding = 3 # must match; subject to [limits] on server -max_requests_outstanding = 4 # used by client -max_batch_up = 4000 # used by client -http_timeout = 30 # used by client -http_retry = 5 # used by client +max_batch_down = 65536 +max_queue_time = 10 +target_requests_outstanding = 3 +http_timeout = 30 +http_timeout_grace = 5 +max_requests_outstanding = 6 +max_batch_up = 4000 +http_retry = 5 +port = 80 +vroutes = '' #[server] or [] overrides ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s -# extra interpolations: %(local)s %(peer)s %(rnet)s -# obtained on server [virtual]server [virtual]relay [virtual]network -# from on client [virtual]server [virtual]routes -[virtual] +# relating to virtual network mtu = 1500 -routes = '' -# network = / # mandatory for server -# server = # used by both, default is computed from `network' -# relay = # used by server, default from `network' and `server' -# default server is first host in network -# default relay is first host which is not server - -[server] -# addrs = 127.0.0.1 ::1 # mandatory for server -port = 80 # used by server -# url # used by client; default from first `addrs' and `port' + +[SERVER] +server = SERVER +# addrs = 127.0.0.1 ::1 +# url + +# relating to virtual network +vvnetwork = 172.24.230.192 +# vnetwork = / +# vadd r = +# vrelay = + # [] # password = # used by both, must match -[limits] -max_batch_down = 262144 # used by server -max_queue_time = 121 # used by server -max_request_time = 121 # used by server -target_requests_outstanding = 10 # used by server +[LIMIT] +max_batch_down = 262144 +max_queue_time = 121 +http_timeout = 121 +target_requests_outstanding = 10 ''' # these need to be defined here so that they can be imported by import * -cfg = ConfigParser() +cfg = ConfigParser(strict=False) optparser = OptionParser() _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-') @@ -108,13 +148,11 @@ def mime_translate(s): return s.translate(_mimetrans) class ConfigResults: - def __init__(self, d = { }): - self.__dict__ = d + def __init__(self): + pass def __repr__(self): return 'ConfigResults('+repr(self.__dict__)+')' -c = ConfigResults() - def log_discard(packet, iface, saddr, daddr, why): log_debug(DBG.DROP, 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why), @@ -168,21 +206,20 @@ class SlipStreamDecoder(): def inputdata(self, data): self._log('inputdata', d=data) - data = self._buffer + data - self._buffer = b'' packets = slip.decode(data) + packets[0] = self._buffer + packets[0] self._buffer = packets.pop() for packet in packets: self._maybe_packet(packet) - self._log('inputdata bufremain', d=self._buffer) + self._log('bufremain', d=self._buffer) def _maybe_packet(self, packet): - self._log('inputdata maybepacket', d=packet) + self._log('maybepacket', d=packet) if len(packet): self._on_packet(packet) def flush(self): - self._log('inputdata flush') + self._log('flush') self._maybe_packet(self._buffer) self._buffer = b'' @@ -285,7 +322,8 @@ _crashing = False def crash(err): global _crashing _crashing = True - print('CRASH ', err, file=sys.stderr) + print('========== CRASH ==========', err, + '===========================', file=sys.stderr) try: reactor.stop() except twisted.internet.error.ReactorNotRunning: pass @@ -298,88 +336,299 @@ def crash_on_critical(event): #---------- config processing ---------- -def process_cfg_common_always(): - global mtu - c.mtu = cfg.get('virtual','mtu') +def _cfg_process_putatives(): + servers = { } + clients = { } + # maps from abstract object to canonical name for cs's + + def putative(cmap, abstract, canoncs): + try: + current_canoncs = cmap[abstract] + except KeyError: + pass + else: + assert(current_canoncs == canoncs) + cmap[abstract] = canoncs + + server_pat = r'[-.0-9A-Za-z]+' + client_pat = r'[.:0-9a-f]+' + server_re = regexp.compile(server_pat) + serverclient_re = regexp.compile(server_pat + r' ' + client_pat) + + for cs in cfg.sections(): + if cs == 'LIMIT': + # plan A "[LIMIT]" + continue + + try: + # plan B "[]" part 1 + ci = ipaddr(cs) + except AddressValueError: + + if server_re.fullmatch(cs): + # plan C "[]" + putative(servers, cs, cs) + continue + + if serverclient_re.fullmatch(cs): + # plan D "[ ]" part 1 + (pss,pcs) = cs.split(' ') + + if pcs == 'LIMIT': + # plan E "[ LIMIT]" + continue + + try: + # plan D "[ ]" part 2 + ci = ipaddr(pc) + except AddressValueError: + # plan F "[]" + # well, we ignore this + print('warning: ignoring config section %s' % cs, file=sys.stderr) + continue + + else: # no AddressValueError + # plan D "[ " part 2 + putative(clients, ci, cs) + continue + + return (servers, clients) + +def cfg_process_common(ss): + c.mtu = cfg.getint(ss, 'mtu') + +def cfg_process_saddrs(c, ss): + class ServerAddr(): + def __init__(self, port, addrspec): + self.port = port + # also self.addr + try: + self.addr = ipaddress.IPv4Address(addrspec) + self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint + self._inurl = b'%s' + except AddressValueError: + self.addr = ipaddress.IPv6Address(addrspec) + self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint + self._inurl = b'[%s]' + def make_endpoint(self): + return self._endpointfactory(reactor, self.port, self.addr) + def url(self): + url = b'http://' + (self._inurl % str(self.addr).encode('ascii')) + if self.port != 80: url += b':%d' % self.port + url += b'/' + return url + + c.port = cfg.getint(ss,'port') + c.saddrs = [ ] + for addrspec in cfg.get(ss, 'addrs').split(): + sa = ServerAddr(c.port, addrspec) + c.saddrs.append(sa) + +def cfg_process_vnetwork(c, ss): + c.vnetwork = ipnetwork(cfg.get(ss,'vnetwork')) + if c.vnetwork.num_addresses < 3 + 2: + raise ValueError('vnetwork needs at least 2^3 addresses') + +def cfg_process_vaddr(c, ss): + try: + c.vaddr = cfg.get(ss,'vaddr') + except NoOptionError: + cfg_process_vnetwork(c, ss) + c.vaddr = next(c.vnetwork.hosts()) + +def cfg_search_section(key,sections): + for section in sections: + if cfg.has_option(section, key): + return section + raise NoOptionError(key, repr(sections)) + +def cfg_search(getter,key,sections): + section = cfg_search_section(key,sections) + return getter(section, key) + +def cfg_process_client_limited(cc,ss,sections,key): + val = cfg_search(cfg.getint, key, sections) + lim = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT']) + cc.__dict__[key] = min(val,lim) + +def cfg_process_client_common(cc,ss,cs,ci): + # returns sections to search in, iff password is defined, otherwise None + cc.ci = ci + + sections = ['%s %s' % (ss,cs), + cs, + ss, + 'DEFAULT'] + + try: pwsection = cfg_search_section('password', sections) + except NoOptionError: return None + + pw = cfg.get(pwsection, 'password') + cc.password = pw.encode('utf-8') + + cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding') + cfg_process_client_limited(cc,ss,sections,'http_timeout') -def process_cfg_ipif(section, varmap): + return sections + +def cfg_process_ipif(c, sections, varmap): for d, s in varmap: try: v = getattr(c, s) except AttributeError: continue setattr(c, d, v) - #print(repr(c)) + #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr) + section = cfg_search_section('ipif', sections) c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__) -def process_cfg_network(): - c.network = ipnetwork(cfg.get('virtual','network')) - if c.network.num_addresses < 3 + 2: - raise ValueError('network needs at least 2^3 addresses') +#---------- startup ---------- -def process_cfg_server(): - try: - c.server = cfg.get('virtual','server') - except NoOptionError: - process_cfg_network() - c.server = next(c.network.hosts()) +def common_startup(process_cfg): + # calls process_cfg(putative_clients, putative_servers) + + # ConfigParser hates #-comments after values + trailingcomments_re = regexp.compile(r'#.*') + cfg.read_string(trailingcomments_re.sub('', defcfg)) + need_defcfg = True + + def readconfig(pathname, mandatory=True): + def log(m, p=pathname): + if not DBG.CONFIG in debug_set: return + print('DBG.CONFIG: %s: %s' % (m, pathname)) -class ServerAddr(): - def __init__(self, port, addrspec): - self.port = port - # also self.addr try: - self.addr = ipaddress.IPv4Address(addrspec) - self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint - self._inurl = b'%s' - except AddressValueError: - self.addr = ipaddress.IPv6Address(addrspec) - self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint - self._inurl = b'[%s]' - def make_endpoint(self): - return self._endpointfactory(reactor, self.port, self.addr) - def url(self): - url = b'http://' + (self._inurl % str(self.addr).encode('ascii')) - if self.port != 80: url += b':%d' % self.port - url += b'/' - return url - -def process_cfg_saddrs(): - try: port = cfg.getint('server','port') - except NoOptionError: port = 80 + files = os.listdir(pathname) - c.saddrs = [ ] - for addrspec in cfg.get('server','addrs').split(): - sa = ServerAddr(port, addrspec) - c.saddrs.append(sa) + except FileNotFoundError: + if mandatory: raise + log('skipped') + return -def process_cfg_clients(constructor): - c.clients = [ ] - for cs in cfg.sections(): - if not (':' in cs or '.' in cs): continue - ci = ipaddr(cs) - pw = cfg.get(cs, 'password') - pw = pw.encode('utf-8') - constructor(ci,cs,pw) + except NotADirectoryError: + cfg.read(pathname) + log('read file') + return -#---------- startup ---------- + # is a directory + log('directory') + re = regexp.compile('[^-A-Za-z0-9_]') + for f in os.listdir(cdir): + if re.search(f): continue + subpath = pathname + '/' + f + try: + os.stat(subpath) + except FileNotFoundError: + log('entry skipped', subpath) + continue + cfg.read(subpath) + log('entry read', subpath) + + def oc_config(od,os, value, op): + nonlocal need_defcfg + need_defcfg = False + readconfig(value) + + def dfs_less_detailed(dl): + return [df for df in DBG.iterconstants() if df <= dl] + + def ds_default(od,os,dl,op): + global debug_set + debug_set = set(dfs_less_detailed(debug_def_detail)) + + def ds_select(od,os, spec, op): + for it in spec.split(','): + + if it.startswith('-'): + mutator = debug_set.discard + it = it[1:] + else: + mutator = debug_set.add + + if it == '+': + dfs = DBG.iterconstants() + + else: + if it.endswith('+'): + mapper = dfs_less_detailed + it = it[0:len(it)-1] + else: + mapper = lambda x: [x] + + try: + dfspec = DBG.lookupByName(it) + except ValueError: + optparser.error('unknown debug flag %s in --debug-select' % it) + + dfs = mapper(dfspec) + + for df in dfs: + mutator(df) + + optparser.add_option('-D', '--debug', + nargs=0, + action='callback', + help='enable default debug (to stdout)', + callback= ds_default) + + optparser.add_option('--debug-select', + nargs=1, + type='string', + metavar='[-]DFLAG[+]|[-]+,...', + help= +'''enable (`-': disable) each specified DFLAG; +`+': do same for all "more interesting" DFLAGSs; +just `+': all DFLAGs. + DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]), + action='callback', + callback= ds_select) + + optparser.add_option('-c', '--config', + nargs=1, + type='string', + metavar='CONFIGFILE', + dest='configfile', + action='callback', + callback= oc_config) + + (opts, args) = optparser.parse_args() + if len(args): optparser.error('no non-option arguments please') + + if need_defcfg: + readconfig('/etc/hippotat/config', False) + readconfig('/etc/hippotat/config.d', False) + + try: + (pss, pcs) = _cfg_process_putatives() + process_cfg(pss, pcs) + except (configparser.Error, ValueError): + traceback.print_exc(file=sys.stderr) + print('\nInvalid configuration, giving up.', file=sys.stderr) + sys.exit(12) + + #print(repr(debug_set), file=sys.stderr) -def common_startup(): log_formatter = twisted.logger.formatEventAsClassicLogText - log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter) + stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter) + stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter) + pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error) + stdsomething_obs = twisted.logger.FilteringLogObserver( + stderr_obs, [pred], stdout_obs + ) + log_observer = twisted.logger.FilteringLogObserver( + stdsomething_obs, [LogNotBoringTwisted()] + ) + #log_observer = stdsomething_obs twisted.logger.globalLogBeginner.beginLoggingTo( [ log_observer, crash_on_critical ] ) - optparser.add_option('-c', '--config', dest='configfile', - default='/etc/hippotat/config') - (opts, args) = optparser.parse_args() - if len(args): optparser.error('no non-option arguments please') - - re = regexp.compile('#.*') - cfg.read_string(re.sub('', defcfg)) - cfg.read(opts.configfile) - def common_run(): log_debug(DBG.INIT, 'entering reactor') if not _crashing: reactor.run()