chiark / gitweb /
rename hippotat module to hippotatlib
[hippotat.git] / hippotat / __init__.py
diff --git a/hippotat/__init__.py b/hippotat/__init__.py
deleted file mode 100644 (file)
index 654a5c5..0000000
+++ /dev/null
@@ -1,639 +0,0 @@
-# -*- python -*-
-
-import signal
-signal.signal(signal.SIGINT, signal.SIG_DFL)
-
-import sys
-import os
-
-from zope.interface import implementer
-
-import twisted
-from twisted.internet import reactor
-import twisted.internet.endpoints
-import twisted.logger
-from twisted.logger import LogLevel
-import twisted.python.constants
-from twisted.python.constants import NamedConstant
-
-import ipaddress
-from ipaddress import AddressValueError
-
-from optparse import OptionParser
-import configparser
-from configparser import ConfigParser
-from configparser import NoOptionError
-
-from functools import partial
-
-import collections
-import time
-import codecs
-import traceback
-
-import re as regexp
-
-import hippotat.slip as slip
-
-class DBG(twisted.python.constants.Names):
-  INIT = NamedConstant()
-  CONFIG = NamedConstant()
-  ROUTE = NamedConstant()
-  DROP = NamedConstant()
-  FLOW = NamedConstant()
-  HTTP = NamedConstant()
-  TWISTED = NamedConstant()
-  QUEUE = NamedConstant()
-  HTTP_CTRL = NamedConstant()
-  QUEUE_CTRL = NamedConstant()
-  HTTP_FULL = NamedConstant()
-  CTRL_DUMP = NamedConstant()
-  SLIP_FULL = NamedConstant()
-  DATA_COMPLETE = NamedConstant()
-
-_hex_codec = codecs.getencoder('hex_codec')
-
-#---------- logging ----------
-
-org_stderr = sys.stderr
-
-log = twisted.logger.Logger()
-
-debug_set = set()
-debug_def_detail = DBG.HTTP
-
-def log_debug(dflag, msg, idof=None, d=None):
-  if dflag not in debug_set: return
-  #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
-  if idof is not None:
-    msg = '[%#x] %s' % (id(idof), msg)
-  if d is not None:
-    trunc = ''
-    if not DBG.DATA_COMPLETE in debug_set:
-      if len(d) > 64:
-        d = d[0:64]
-        trunc = '...'
-    d = _hex_codec(d)[0].decode('ascii')
-    msg += ' ' + d + trunc
-  log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
-
-@implementer(twisted.logger.ILogFilterPredicate)
-class LogNotBoringTwisted:
-  def __call__(self, event):
-    yes = twisted.logger.PredicateResult.yes
-    no  = twisted.logger.PredicateResult.no
-    try:
-      if event.get('log_level') != LogLevel.info:
-        return yes
-      dflag = event.get('dflag')
-      if dflag is False                            : return yes
-      if dflag                         in debug_set: return yes
-      if dflag is None and DBG.TWISTED in debug_set: return yes
-      return no
-    except Exception:
-      print(traceback.format_exc(), file=org_stderr)
-      return yes
-
-#---------- default config ----------
-
-defcfg = '''
-[DEFAULT]
-max_batch_down = 65536
-max_queue_time = 10
-target_requests_outstanding = 3
-http_timeout = 30
-http_timeout_grace = 5
-max_requests_outstanding = 6
-max_batch_up = 4000
-http_retry = 5
-port = 80
-vroutes = ''
-
-#[server] or [<client>] overrides
-ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
-
-# relating to virtual network
-mtu = 1500
-
-[SERVER]
-server = SERVER
-# addrs = 127.0.0.1 ::1
-# url
-
-# relating to virtual network
-vvnetwork = 172.24.230.192
-# vnetwork = <prefix>/<len>
-# vadd  r  = <ipaddr>
-# vrelay   = <ipaddr>
-
-
-# [<client-ip4-or-ipv6-address>]
-# password = <password>    # used by both, must match
-
-[LIMIT]
-max_batch_down = 262144
-max_queue_time = 121
-http_timeout = 121
-target_requests_outstanding = 10
-'''
-
-# these need to be defined here so that they can be imported by import *
-cfg = ConfigParser(strict=False)
-optparser = OptionParser()
-
-_mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
-def mime_translate(s):
-  # SLIP-encoded packets cannot contain ESC ESC.
-  # Swap `-' and ESC.  The result cannot contain `--'
-  return s.translate(_mimetrans)
-
-class ConfigResults:
-  def __init__(self):
-    pass
-  def __repr__(self):
-    return 'ConfigResults('+repr(self.__dict__)+')'
-
-def log_discard(packet, iface, saddr, daddr, why):
-  log_debug(DBG.DROP,
-            'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
-            d=packet)
-
-#---------- packet parsing ----------
-
-def packet_addrs(packet):
-  version = packet[0] >> 4
-  if version == 4:
-    addrlen = 4
-    saddroff = 3*4
-    factory = ipaddress.IPv4Address
-  elif version == 6:
-    addrlen = 16
-    saddroff = 2*4
-    factory = ipaddress.IPv6Address
-  else:
-    raise ValueError('unsupported IP version %d' % version)
-  saddr = factory(packet[ saddroff           : saddroff + addrlen   ])
-  daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
-  return (saddr, daddr)
-
-#---------- address handling ----------
-
-def ipaddr(input):
-  try:
-    r = ipaddress.IPv4Address(input)
-  except AddressValueError:
-    r = ipaddress.IPv6Address(input)
-  return r
-
-def ipnetwork(input):
-  try:
-    r = ipaddress.IPv4Network(input)
-  except NetworkValueError:
-    r = ipaddress.IPv6Network(input)
-  return r
-
-#---------- ipif (SLIP) subprocess ----------
-
-class SlipStreamDecoder():
-  def __init__(self, desc, on_packet):
-    self._buffer = b''
-    self._on_packet = on_packet
-    self._desc = desc
-    self._log('__init__')
-
-  def _log(self, msg, **kwargs):
-    log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
-
-  def inputdata(self, data):
-    self._log('inputdata', d=data)
-    data = self._buffer + data
-    self._buffer = b''
-    packets = slip.decode(data, True)
-    self._buffer = packets.pop()
-    for packet in packets:
-      self._maybe_packet(packet)
-    self._log('bufremain', d=self._buffer)
-
-  def _maybe_packet(self, packet):
-    self._log('maybepacket', d=packet)
-    if len(packet):
-      self._on_packet(packet)
-
-  def flush(self):
-    self._log('flush')
-    data = self._buffer
-    self._buffer = b''
-    packets = slip.decode(data)
-    assert(len(packets) == 1)
-    self._maybe_packet(packets[0])
-
-class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
-  def __init__(self, router):
-    self._router = router
-    self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
-  def connectionMade(self): pass
-  def outReceived(self, data):
-    self._decoder.inputdata(data)
-  def slip_on_packet(self, packet):
-    (saddr, daddr) = packet_addrs(packet)
-    if saddr.is_link_local or daddr.is_link_local:
-      log_discard(packet, 'ipif', saddr, daddr, 'link-local')
-      return
-    self._router(packet, saddr, daddr)
-  def processEnded(self, status):
-    status.raiseException()
-
-def start_ipif(command, router):
-  ipif = _IpifProcessProtocol(router)
-  reactor.spawnProcess(ipif,
-                       '/bin/sh',['sh','-xc', command],
-                       childFDs={0:'w', 1:'r', 2:2},
-                       env=None)
-  return ipif
-
-def queue_inbound(ipif, packet):
-  log_debug(DBG.FLOW, "queue_inbound", d=packet)
-  ipif.transport.write(slip.delimiter)
-  ipif.transport.write(slip.encode(packet))
-  ipif.transport.write(slip.delimiter)
-
-#---------- packet queue ----------
-
-class PacketQueue():
-  def __init__(self, desc, max_queue_time):
-    self._desc = desc
-    assert(desc + '')
-    self._max_queue_time = max_queue_time
-    self._pq = collections.deque() # packets
-
-  def _log(self, dflag, msg, **kwargs):
-    log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
-
-  def append(self, packet):
-    self._log(DBG.QUEUE, 'append', d=packet)
-    self._pq.append((time.monotonic(), packet))
-
-  def nonempty(self):
-    self._log(DBG.QUEUE, 'nonempty ?')
-    while True:
-      try: (queuetime, packet) = self._pq[0]
-      except IndexError:
-        self._log(DBG.QUEUE, 'nonempty ? empty.')
-        return False
-
-      age = time.monotonic() - queuetime
-      if age > self._max_queue_time:
-        # strip old packets off the front
-        self._log(DBG.QUEUE, 'dropping (old)', d=packet)
-        self._pq.popleft()
-        continue
-
-      self._log(DBG.QUEUE, 'nonempty ? nonempty.')
-      return True
-
-  def process(self, sizequery, moredata, max_batch):
-    # sizequery() should return size of batch so far
-    # moredata(s) should add s to batch
-    self._log(DBG.QUEUE, 'process...')
-    while True:
-      try: (dummy, packet) = self._pq[0]
-      except IndexError:
-        self._log(DBG.QUEUE, 'process... empty')
-        break
-
-      self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
-
-      encoded = slip.encode(packet)
-      sofar = sizequery()  
-
-      self._log(DBG.QUEUE_CTRL,
-                'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
-                d=encoded)
-
-      if sofar > 0:
-        if sofar + len(slip.delimiter) + len(encoded) > max_batch:
-          self._log(DBG.QUEUE_CTRL, 'process... overflow')
-          break
-        moredata(slip.delimiter)
-
-      moredata(encoded)
-      self._pq.popleft()
-
-#---------- error handling ----------
-
-_crashing = False
-
-def crash(err):
-  global _crashing
-  _crashing = True
-  print('========== CRASH ==========', err,
-        '===========================', file=sys.stderr)
-  try: reactor.stop()
-  except twisted.internet.error.ReactorNotRunning: pass
-
-def crash_on_defer(defer):
-  defer.addErrback(lambda err: crash(err))
-
-def crash_on_critical(event):
-  if event.get('log_level') >= LogLevel.critical:
-    crash(twisted.logger.formatEvent(event))
-
-#---------- config processing ----------
-
-def _cfg_process_putatives():
-  servers = { }
-  clients = { }
-  # maps from abstract object to canonical name for cs's
-
-  def putative(cmap, abstract, canoncs):
-    try:
-      current_canoncs = cmap[abstract]
-    except KeyError:
-      pass
-    else:
-      assert(current_canoncs == canoncs)
-    cmap[abstract] = canoncs
-
-  server_pat = r'[-.0-9A-Za-z]+'
-  client_pat = r'[.:0-9a-f]+'
-  server_re = regexp.compile(server_pat)
-  serverclient_re = regexp.compile(server_pat + r' ' + client_pat)
-
-  for cs in cfg.sections():
-    if cs == 'LIMIT':
-      # plan A "[LIMIT]"
-      continue
-
-    try:
-      # plan B "[<client>]" part 1
-      ci = ipaddr(cs)
-    except AddressValueError:
-
-      if server_re.fullmatch(cs):
-        # plan C "[<servername>]"
-        putative(servers, cs, cs)
-        continue
-
-      if serverclient_re.fullmatch(cs):
-        # plan D "[<servername> <client>]" part 1
-        (pss,pcs) = cs.split(' ')
-
-        if pcs == 'LIMIT':
-          # plan E "[<servername> LIMIT]"
-          continue
-
-        try:
-          # plan D "[<servername> <client>]" part 2
-          ci = ipaddr(pc)
-        except AddressValueError:
-          # plan F "[<some thing we do not understand>]"
-          # well, we ignore this
-          print('warning: ignoring config section %s' % cs, file=sys.stderr)
-          continue
-
-        else: # no AddressValueError
-          # plan D "[<servername> <client]" part 3
-          putative(clients, ci, pcs)
-          putative(servers, pss, pss)
-          continue
-
-    else: # no AddressValueError
-      # plan B "[<client>" part 2
-      putative(clients, ci, cs)
-      continue
-
-  return (servers, clients)
-
-def cfg_process_common(c, ss):
-  c.mtu = cfg.getint(ss, 'mtu')
-
-def cfg_process_saddrs(c, ss):
-  class ServerAddr():
-    def __init__(self, port, addrspec):
-      self.port = port
-      # also self.addr
-      try:
-        self.addr = ipaddress.IPv4Address(addrspec)
-        self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
-        self._inurl = b'%s'
-      except AddressValueError:
-        self.addr = ipaddress.IPv6Address(addrspec)
-        self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
-        self._inurl = b'[%s]'
-    def make_endpoint(self):
-      return self._endpointfactory(reactor, self.port, self.addr)
-    def url(self):
-      url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
-      if self.port != 80: url += b':%d' % self.port
-      url += b'/'
-      return url
-
-  c.port = cfg.getint(ss,'port')
-  c.saddrs = [ ]
-  for addrspec in cfg.get(ss, 'addrs').split():
-    sa = ServerAddr(c.port, addrspec)
-    c.saddrs.append(sa)
-
-def cfg_process_vnetwork(c, ss):
-  c.vnetwork = ipnetwork(cfg.get(ss,'vnetwork'))
-  if c.vnetwork.num_addresses < 3 + 2:
-    raise ValueError('vnetwork needs at least 2^3 addresses')
-
-def cfg_process_vaddr(c, ss):
-  try:
-    c.vaddr = cfg.get(ss,'vaddr')
-  except NoOptionError:
-    cfg_process_vnetwork(c, ss)
-    c.vaddr = next(c.vnetwork.hosts())
-
-def cfg_search_section(key,sections):
-  for section in sections:
-    if cfg.has_option(section, key):
-      return section
-  raise NoOptionError(key, repr(sections))
-
-def cfg_search(getter,key,sections):
-  section = cfg_search_section(key,sections)
-  return getter(section, key)
-
-def cfg_process_client_limited(cc,ss,sections,key):
-  val = cfg_search(cfg.getint, key, sections)
-  lim = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT'])
-  cc.__dict__[key] = min(val,lim)
-
-def cfg_process_client_common(cc,ss,cs,ci):
-  # returns sections to search in, iff password is defined, otherwise None
-  cc.ci = ci
-
-  sections = ['%s %s' % (ss,cs),
-              cs,
-              ss,
-              'DEFAULT']
-
-  try: pwsection = cfg_search_section('password', sections)
-  except NoOptionError: return None
-    
-  pw = cfg.get(pwsection, 'password')
-  cc.password = pw.encode('utf-8')
-
-  cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
-  cfg_process_client_limited(cc,ss,sections,'http_timeout')
-
-  return sections
-
-def cfg_process_ipif(c, sections, varmap):
-  for d, s in varmap:
-    try: v = getattr(c, s)
-    except AttributeError: continue
-    setattr(c, d, v)
-
-  #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
-
-  section = cfg_search_section('ipif', sections)
-  c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
-
-#---------- startup ----------
-
-def common_startup(process_cfg):
-  # calls process_cfg(putative_clients, putative_servers)
-
-  # ConfigParser hates #-comments after values
-  trailingcomments_re = regexp.compile(r'#.*')
-  cfg.read_string(trailingcomments_re.sub('', defcfg))
-  need_defcfg = True
-
-  def readconfig(pathname, mandatory=True):
-    def log(m, p=pathname):
-      if not DBG.CONFIG in debug_set: return
-      print('DBG.CONFIG: %s: %s' % (m, pathname))
-
-    try:
-      files = os.listdir(pathname)
-
-    except FileNotFoundError:
-      if mandatory: raise
-      log('skipped')
-      return
-
-    except NotADirectoryError:
-      cfg.read(pathname)
-      log('read file')
-      return
-
-    # is a directory
-    log('directory')
-    re = regexp.compile('[^-A-Za-z0-9_]')
-    for f in os.listdir(cdir):
-      if re.search(f): continue
-      subpath = pathname + '/' + f
-      try:
-        os.stat(subpath)
-      except FileNotFoundError:
-        log('entry skipped', subpath)
-        continue
-      cfg.read(subpath)
-      log('entry read', subpath)
-      
-  def oc_config(od,os, value, op):
-    nonlocal need_defcfg
-    need_defcfg = False
-    readconfig(value)
-
-  def dfs_less_detailed(dl):
-    return [df for df in DBG.iterconstants() if df <= dl]
-
-  def ds_default(od,os,dl,op):
-    global debug_set
-    debug_set = set(dfs_less_detailed(debug_def_detail))
-
-  def ds_select(od,os, spec, op):
-    for it in spec.split(','):
-
-      if it.startswith('-'):
-        mutator = debug_set.discard
-        it = it[1:]
-      else:
-        mutator = debug_set.add
-
-      if it == '+':
-        dfs = DBG.iterconstants()
-
-      else:
-        if it.endswith('+'):
-          mapper = dfs_less_detailed
-          it = it[0:len(it)-1]
-        else:
-          mapper = lambda x: [x]
-
-          try:
-            dfspec = DBG.lookupByName(it)
-          except ValueError:
-            optparser.error('unknown debug flag %s in --debug-select' % it)
-
-        dfs = mapper(dfspec)
-
-      for df in dfs:
-        mutator(df)
-
-  optparser.add_option('-D', '--debug',
-                       nargs=0,
-                       action='callback',
-                       help='enable default debug (to stdout)',
-                       callback= ds_default)
-
-  optparser.add_option('--debug-select',
-                       nargs=1,
-                       type='string',
-                       metavar='[-]DFLAG[+]|[-]+,...',
-                       help=
-'''enable (`-': disable) each specified DFLAG;
-`+': do same for all "more interesting" DFLAGSs;
-just `+': all DFLAGs.
-  DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
-                       action='callback',
-                       callback= ds_select)
-
-  optparser.add_option('-c', '--config',
-                       nargs=1,
-                       type='string',
-                       metavar='CONFIGFILE',
-                       dest='configfile',
-                       action='callback',
-                       callback= oc_config)
-
-  (opts, args) = optparser.parse_args()
-  if len(args): optparser.error('no non-option arguments please')
-
-  if need_defcfg:
-    readconfig('/etc/hippotat/config',   False)
-    readconfig('/etc/hippotat/config.d', False)
-
-  try:
-    (pss, pcs) = _cfg_process_putatives()
-    process_cfg(pss, pcs)
-  except (configparser.Error, ValueError):
-    traceback.print_exc(file=sys.stderr)
-    print('\nInvalid configuration, giving up.', file=sys.stderr)
-    sys.exit(12)
-
-  #print(repr(debug_set), file=sys.stderr)
-
-  log_formatter = twisted.logger.formatEventAsClassicLogText
-  stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
-  stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
-  pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
-  stdsomething_obs = twisted.logger.FilteringLogObserver(
-    stderr_obs, [pred], stdout_obs
-  )
-  log_observer = twisted.logger.FilteringLogObserver(
-    stdsomething_obs, [LogNotBoringTwisted()]
-  )
-  #log_observer = stdsomething_obs
-  twisted.logger.globalLogBeginner.beginLoggingTo(
-    [ log_observer, crash_on_critical ]
-    )
-
-def common_run():
-  log_debug(DBG.INIT, 'entering reactor')
-  if not _crashing: reactor.run()
-  print('CRASHED (end)', file=sys.stderr)