-# -*- python -*-
-
-import signal
-signal.signal(signal.SIGINT, signal.SIG_DFL)
-
-import sys
-import os
-
-from zope.interface import implementer
-
-import twisted
-from twisted.internet import reactor
-import twisted.internet.endpoints
-import twisted.logger
-from twisted.logger import LogLevel
-import twisted.python.constants
-from twisted.python.constants import NamedConstant
-
-import ipaddress
-from ipaddress import AddressValueError
-
-from optparse import OptionParser
-import configparser
-from configparser import ConfigParser
-from configparser import NoOptionError
-
-from functools import partial
-
-import collections
-import time
-import codecs
-import traceback
-
-import re as regexp
-
-import hippotat.slip as slip
-
-class DBG(twisted.python.constants.Names):
- INIT = NamedConstant()
- CONFIG = NamedConstant()
- ROUTE = NamedConstant()
- DROP = NamedConstant()
- FLOW = NamedConstant()
- HTTP = NamedConstant()
- TWISTED = NamedConstant()
- QUEUE = NamedConstant()
- HTTP_CTRL = NamedConstant()
- QUEUE_CTRL = NamedConstant()
- HTTP_FULL = NamedConstant()
- CTRL_DUMP = NamedConstant()
- SLIP_FULL = NamedConstant()
- DATA_COMPLETE = NamedConstant()
-
-_hex_codec = codecs.getencoder('hex_codec')
-
-#---------- logging ----------
-
-org_stderr = sys.stderr
-
-log = twisted.logger.Logger()
-
-debug_set = set()
-debug_def_detail = DBG.HTTP
-
-def log_debug(dflag, msg, idof=None, d=None):
- if dflag not in debug_set: return
- #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
- if idof is not None:
- msg = '[%#x] %s' % (id(idof), msg)
- if d is not None:
- trunc = ''
- if not DBG.DATA_COMPLETE in debug_set:
- if len(d) > 64:
- d = d[0:64]
- trunc = '...'
- d = _hex_codec(d)[0].decode('ascii')
- msg += ' ' + d + trunc
- log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
-
-@implementer(twisted.logger.ILogFilterPredicate)
-class LogNotBoringTwisted:
- def __call__(self, event):
- yes = twisted.logger.PredicateResult.yes
- no = twisted.logger.PredicateResult.no
- try:
- if event.get('log_level') != LogLevel.info:
- return yes
- dflag = event.get('dflag')
- if dflag is False : return yes
- if dflag in debug_set: return yes
- if dflag is None and DBG.TWISTED in debug_set: return yes
- return no
- except Exception:
- print(traceback.format_exc(), file=org_stderr)
- return yes
-
-#---------- default config ----------
-
-defcfg = '''
-[DEFAULT]
-max_batch_down = 65536
-max_queue_time = 10
-target_requests_outstanding = 3
-http_timeout = 30
-http_timeout_grace = 5
-max_requests_outstanding = 6
-max_batch_up = 4000
-http_retry = 5
-port = 80
-vroutes = ''
-
-#[server] or [<client>] overrides
-ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
-
-# relating to virtual network
-mtu = 1500
-
-[SERVER]
-server = SERVER
-# addrs = 127.0.0.1 ::1
-# url
-
-# relating to virtual network
-vvnetwork = 172.24.230.192
-# vnetwork = <prefix>/<len>
-# vadd r = <ipaddr>
-# vrelay = <ipaddr>
-
-
-# [<client-ip4-or-ipv6-address>]
-# password = <password> # used by both, must match
-
-[LIMIT]
-max_batch_down = 262144
-max_queue_time = 121
-http_timeout = 121
-target_requests_outstanding = 10
-'''
-
-# these need to be defined here so that they can be imported by import *
-cfg = ConfigParser(strict=False)
-optparser = OptionParser()
-
-_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
-def mime_translate(s):
- # SLIP-encoded packets cannot contain ESC ESC.
- # Swap `-' and ESC. The result cannot contain `--'
- return s.translate(_mimetrans)
-
-class ConfigResults:
- def __init__(self):
- pass
- def __repr__(self):
- return 'ConfigResults('+repr(self.__dict__)+')'
-
-def log_discard(packet, iface, saddr, daddr, why):
- log_debug(DBG.DROP,
- 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
- d=packet)
-
-#---------- packet parsing ----------
-
-def packet_addrs(packet):
- version = packet[0] >> 4
- if version == 4:
- addrlen = 4
- saddroff = 3*4
- factory = ipaddress.IPv4Address
- elif version == 6:
- addrlen = 16
- saddroff = 2*4
- factory = ipaddress.IPv6Address
- else:
- raise ValueError('unsupported IP version %d' % version)
- saddr = factory(packet[ saddroff : saddroff + addrlen ])
- daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
- return (saddr, daddr)
-
-#---------- address handling ----------
-
-def ipaddr(input):
- try:
- r = ipaddress.IPv4Address(input)
- except AddressValueError:
- r = ipaddress.IPv6Address(input)
- return r
-
-def ipnetwork(input):
- try:
- r = ipaddress.IPv4Network(input)
- except NetworkValueError:
- r = ipaddress.IPv6Network(input)
- return r
-
-#---------- ipif (SLIP) subprocess ----------
-
-class SlipStreamDecoder():
- def __init__(self, desc, on_packet):
- self._buffer = b''
- self._on_packet = on_packet
- self._desc = desc
- self._log('__init__')
-
- def _log(self, msg, **kwargs):
- log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
-
- def inputdata(self, data):
- self._log('inputdata', d=data)
- data = self._buffer + data
- self._buffer = b''
- packets = slip.decode(data, True)
- self._buffer = packets.pop()
- for packet in packets:
- self._maybe_packet(packet)
- self._log('bufremain', d=self._buffer)
-
- def _maybe_packet(self, packet):
- self._log('maybepacket', d=packet)
- if len(packet):
- self._on_packet(packet)
-
- def flush(self):
- self._log('flush')
- data = self._buffer
- self._buffer = b''
- packets = slip.decode(data)
- assert(len(packets) == 1)
- self._maybe_packet(packets[0])
-
-class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
- def __init__(self, router):
- self._router = router
- self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
- def connectionMade(self): pass
- def outReceived(self, data):
- self._decoder.inputdata(data)
- def slip_on_packet(self, packet):
- (saddr, daddr) = packet_addrs(packet)
- if saddr.is_link_local or daddr.is_link_local:
- log_discard(packet, 'ipif', saddr, daddr, 'link-local')
- return
- self._router(packet, saddr, daddr)
- def processEnded(self, status):
- status.raiseException()
-
-def start_ipif(command, router):
- ipif = _IpifProcessProtocol(router)
- reactor.spawnProcess(ipif,
- '/bin/sh',['sh','-xc', command],
- childFDs={0:'w', 1:'r', 2:2},
- env=None)
- return ipif
-
-def queue_inbound(ipif, packet):
- log_debug(DBG.FLOW, "queue_inbound", d=packet)
- ipif.transport.write(slip.delimiter)
- ipif.transport.write(slip.encode(packet))
- ipif.transport.write(slip.delimiter)
-
-#---------- packet queue ----------
-
-class PacketQueue():
- def __init__(self, desc, max_queue_time):
- self._desc = desc
- assert(desc + '')
- self._max_queue_time = max_queue_time
- self._pq = collections.deque() # packets
-
- def _log(self, dflag, msg, **kwargs):
- log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
-
- def append(self, packet):
- self._log(DBG.QUEUE, 'append', d=packet)
- self._pq.append((time.monotonic(), packet))
-
- def nonempty(self):
- self._log(DBG.QUEUE, 'nonempty ?')
- while True:
- try: (queuetime, packet) = self._pq[0]
- except IndexError:
- self._log(DBG.QUEUE, 'nonempty ? empty.')
- return False
-
- age = time.monotonic() - queuetime
- if age > self._max_queue_time:
- # strip old packets off the front
- self._log(DBG.QUEUE, 'dropping (old)', d=packet)
- self._pq.popleft()
- continue
-
- self._log(DBG.QUEUE, 'nonempty ? nonempty.')
- return True
-
- def process(self, sizequery, moredata, max_batch):
- # sizequery() should return size of batch so far
- # moredata(s) should add s to batch
- self._log(DBG.QUEUE, 'process...')
- while True:
- try: (dummy, packet) = self._pq[0]
- except IndexError:
- self._log(DBG.QUEUE, 'process... empty')
- break
-
- self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
-
- encoded = slip.encode(packet)
- sofar = sizequery()
-
- self._log(DBG.QUEUE_CTRL,
- 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
- d=encoded)
-
- if sofar > 0:
- if sofar + len(slip.delimiter) + len(encoded) > max_batch:
- self._log(DBG.QUEUE_CTRL, 'process... overflow')
- break
- moredata(slip.delimiter)
-
- moredata(encoded)
- self._pq.popleft()
-
-#---------- error handling ----------
-
-_crashing = False
-
-def crash(err):
- global _crashing
- _crashing = True
- print('========== CRASH ==========', err,
- '===========================', file=sys.stderr)
- try: reactor.stop()
- except twisted.internet.error.ReactorNotRunning: pass
-
-def crash_on_defer(defer):
- defer.addErrback(lambda err: crash(err))
-
-def crash_on_critical(event):
- if event.get('log_level') >= LogLevel.critical:
- crash(twisted.logger.formatEvent(event))
-
-#---------- config processing ----------
-
-def _cfg_process_putatives():
- servers = { }
- clients = { }
- # maps from abstract object to canonical name for cs's
-
- def putative(cmap, abstract, canoncs):
- try:
- current_canoncs = cmap[abstract]
- except KeyError:
- pass
- else:
- assert(current_canoncs == canoncs)
- cmap[abstract] = canoncs
-
- server_pat = r'[-.0-9A-Za-z]+'
- client_pat = r'[.:0-9a-f]+'
- server_re = regexp.compile(server_pat)
- serverclient_re = regexp.compile(server_pat + r' ' + client_pat)
-
- for cs in cfg.sections():
- if cs == 'LIMIT':
- # plan A "[LIMIT]"
- continue
-
- try:
- # plan B "[<client>]" part 1
- ci = ipaddr(cs)
- except AddressValueError:
-
- if server_re.fullmatch(cs):
- # plan C "[<servername>]"
- putative(servers, cs, cs)
- continue
-
- if serverclient_re.fullmatch(cs):
- # plan D "[<servername> <client>]" part 1
- (pss,pcs) = cs.split(' ')
-
- if pcs == 'LIMIT':
- # plan E "[<servername> LIMIT]"
- continue
-
- try:
- # plan D "[<servername> <client>]" part 2
- ci = ipaddr(pc)
- except AddressValueError:
- # plan F "[<some thing we do not understand>]"
- # well, we ignore this
- print('warning: ignoring config section %s' % cs, file=sys.stderr)
- continue
-
- else: # no AddressValueError
- # plan D "[<servername> <client]" part 3
- putative(clients, ci, pcs)
- putative(servers, pss, pss)
- continue
-
- else: # no AddressValueError
- # plan B "[<client>" part 2
- putative(clients, ci, cs)
- continue
-
- return (servers, clients)
-
-def cfg_process_common(c, ss):
- c.mtu = cfg.getint(ss, 'mtu')
-
-def cfg_process_saddrs(c, ss):
- class ServerAddr():
- def __init__(self, port, addrspec):
- self.port = port
- # also self.addr
- try:
- self.addr = ipaddress.IPv4Address(addrspec)
- self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
- self._inurl = b'%s'
- except AddressValueError:
- self.addr = ipaddress.IPv6Address(addrspec)
- self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
- self._inurl = b'[%s]'
- def make_endpoint(self):
- return self._endpointfactory(reactor, self.port, self.addr)
- def url(self):
- url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
- if self.port != 80: url += b':%d' % self.port
- url += b'/'
- return url
-
- c.port = cfg.getint(ss,'port')
- c.saddrs = [ ]
- for addrspec in cfg.get(ss, 'addrs').split():
- sa = ServerAddr(c.port, addrspec)
- c.saddrs.append(sa)
-
-def cfg_process_vnetwork(c, ss):
- c.vnetwork = ipnetwork(cfg.get(ss,'vnetwork'))
- if c.vnetwork.num_addresses < 3 + 2:
- raise ValueError('vnetwork needs at least 2^3 addresses')
-
-def cfg_process_vaddr(c, ss):
- try:
- c.vaddr = cfg.get(ss,'vaddr')
- except NoOptionError:
- cfg_process_vnetwork(c, ss)
- c.vaddr = next(c.vnetwork.hosts())
-
-def cfg_search_section(key,sections):
- for section in sections:
- if cfg.has_option(section, key):
- return section
- raise NoOptionError(key, repr(sections))
-
-def cfg_search(getter,key,sections):
- section = cfg_search_section(key,sections)
- return getter(section, key)
-
-def cfg_process_client_limited(cc,ss,sections,key):
- val = cfg_search(cfg.getint, key, sections)
- lim = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT'])
- cc.__dict__[key] = min(val,lim)
-
-def cfg_process_client_common(cc,ss,cs,ci):
- # returns sections to search in, iff password is defined, otherwise None
- cc.ci = ci
-
- sections = ['%s %s' % (ss,cs),
- cs,
- ss,
- 'DEFAULT']
-
- try: pwsection = cfg_search_section('password', sections)
- except NoOptionError: return None
-
- pw = cfg.get(pwsection, 'password')
- cc.password = pw.encode('utf-8')
-
- cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
- cfg_process_client_limited(cc,ss,sections,'http_timeout')
-
- return sections
-
-def cfg_process_ipif(c, sections, varmap):
- for d, s in varmap:
- try: v = getattr(c, s)
- except AttributeError: continue
- setattr(c, d, v)
-
- #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
-
- section = cfg_search_section('ipif', sections)
- c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
-
-#---------- startup ----------
-
-def common_startup(process_cfg):
- # calls process_cfg(putative_clients, putative_servers)
-
- # ConfigParser hates #-comments after values
- trailingcomments_re = regexp.compile(r'#.*')
- cfg.read_string(trailingcomments_re.sub('', defcfg))
- need_defcfg = True
-
- def readconfig(pathname, mandatory=True):
- def log(m, p=pathname):
- if not DBG.CONFIG in debug_set: return
- print('DBG.CONFIG: %s: %s' % (m, pathname))
-
- try:
- files = os.listdir(pathname)
-
- except FileNotFoundError:
- if mandatory: raise
- log('skipped')
- return
-
- except NotADirectoryError:
- cfg.read(pathname)
- log('read file')
- return
-
- # is a directory
- log('directory')
- re = regexp.compile('[^-A-Za-z0-9_]')
- for f in os.listdir(cdir):
- if re.search(f): continue
- subpath = pathname + '/' + f
- try:
- os.stat(subpath)
- except FileNotFoundError:
- log('entry skipped', subpath)
- continue
- cfg.read(subpath)
- log('entry read', subpath)
-
- def oc_config(od,os, value, op):
- nonlocal need_defcfg
- need_defcfg = False
- readconfig(value)
-
- def dfs_less_detailed(dl):
- return [df for df in DBG.iterconstants() if df <= dl]
-
- def ds_default(od,os,dl,op):
- global debug_set
- debug_set = set(dfs_less_detailed(debug_def_detail))
-
- def ds_select(od,os, spec, op):
- for it in spec.split(','):
-
- if it.startswith('-'):
- mutator = debug_set.discard
- it = it[1:]
- else:
- mutator = debug_set.add
-
- if it == '+':
- dfs = DBG.iterconstants()
-
- else:
- if it.endswith('+'):
- mapper = dfs_less_detailed
- it = it[0:len(it)-1]
- else:
- mapper = lambda x: [x]
-
- try:
- dfspec = DBG.lookupByName(it)
- except ValueError:
- optparser.error('unknown debug flag %s in --debug-select' % it)
-
- dfs = mapper(dfspec)
-
- for df in dfs:
- mutator(df)
-
- optparser.add_option('-D', '--debug',
- nargs=0,
- action='callback',
- help='enable default debug (to stdout)',
- callback= ds_default)
-
- optparser.add_option('--debug-select',
- nargs=1,
- type='string',
- metavar='[-]DFLAG[+]|[-]+,...',
- help=
-'''enable (`-': disable) each specified DFLAG;
-`+': do same for all "more interesting" DFLAGSs;
-just `+': all DFLAGs.
- DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
- action='callback',
- callback= ds_select)
-
- optparser.add_option('-c', '--config',
- nargs=1,
- type='string',
- metavar='CONFIGFILE',
- dest='configfile',
- action='callback',
- callback= oc_config)
-
- (opts, args) = optparser.parse_args()
- if len(args): optparser.error('no non-option arguments please')
-
- if need_defcfg:
- readconfig('/etc/hippotat/config', False)
- readconfig('/etc/hippotat/config.d', False)
-
- try:
- (pss, pcs) = _cfg_process_putatives()
- process_cfg(pss, pcs)
- except (configparser.Error, ValueError):
- traceback.print_exc(file=sys.stderr)
- print('\nInvalid configuration, giving up.', file=sys.stderr)
- sys.exit(12)
-
- #print(repr(debug_set), file=sys.stderr)
-
- log_formatter = twisted.logger.formatEventAsClassicLogText
- stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
- stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
- pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
- stdsomething_obs = twisted.logger.FilteringLogObserver(
- stderr_obs, [pred], stdout_obs
- )
- log_observer = twisted.logger.FilteringLogObserver(
- stdsomething_obs, [LogNotBoringTwisted()]
- )
- #log_observer = stdsomething_obs
- twisted.logger.globalLogBeginner.beginLoggingTo(
- [ log_observer, crash_on_critical ]
- )
-
-def common_run():
- log_debug(DBG.INIT, 'entering reactor')
- if not _crashing: reactor.run()
- print('CRASHED (end)', file=sys.stderr)