4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope.interface import implementer
12 from twisted.internet import reactor
13 import twisted.internet.endpoints
15 from twisted.logger import LogLevel
16 import twisted.python.constants
17 from twisted.python.constants import NamedConstant
20 from ipaddress import AddressValueError
22 from optparse import OptionParser
24 from configparser import ConfigParser
25 from configparser import NoOptionError
27 from functools import partial
36 import hippotat.slip as slip
class DBG(twisted.python.constants.Names):
    """Debug flags naming the categories of message passed to log_debug().

    A message is emitted only if its flag is in the module's debug_set.
    Definition order is significant: common_startup's dfs_less_detailed()
    selects every flag `<=` a given one, and plain --debug enables all
    flags up to debug_def_detail (DBG.HTTP).  Keep new flags in their
    intended place in this sequence.
    """
    # NOTE(review): relies on NamedConstants comparing in definition
    # order -- confirm against the twisted.python.constants docs.
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    # When absent from debug_set, log_debug() takes a separate path for
    # the data it logs (see the DATA_COMPLETE test there).
    DATA_COMPLETE = NamedConstant()
54 _hex_codec = codecs.getencoder('hex_codec')
56 #---------- logging ----------
58 org_stderr = sys.stderr
60 log = twisted.logger.Logger()
63 debug_def_detail = DBG.HTTP
65 def log_debug(dflag, msg, idof=None, d=None):
66 if dflag not in debug_set: return
67 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
69 msg = '[%#x] %s' % (id(idof), msg)
72 if not DBG.DATA_COMPLETE in debug_set:
76 d = _hex_codec(d)[0].decode('ascii')
77 msg += ' ' + d + trunc
78 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
80 @implementer(twisted.logger.ILogFilterPredicate)
81 class LogNotBoringTwisted:
82 def __call__(self, event):
83 yes = twisted.logger.PredicateResult.yes
84 no = twisted.logger.PredicateResult.no
86 if event.get('log_level') != LogLevel.info:
88 dflag = event.get('dflag')
89 if dflag in debug_set: return yes
90 if dflag is None and DBG.TWISTED in debug_set: return yes
93 print(traceback.format_exc(), file=org_stderr)
96 #---------- default config ----------
100 #[<client>] overrides
101 max_batch_down = 65536 # used by server, subject to [limits]
102 max_queue_time = 10 # used by server, subject to [limits]
103 target_requests_outstanding = 3 # must match; subject to [limits] on server
104 http_timeout = 30 # used by both } must be
105 http_timeout_grace = 5 # used by both } compatible
106 max_requests_outstanding = 4 # used by client
107 max_batch_up = 4000 # used by client
108 http_retry = 5 # used by client
110 #[server] or [<client>] overrides
111 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
112 # extra interpolations: %(local)s %(peer)s %(rnet)s
113 # obtained on server [virtual]server [virtual]relay [virtual]network
114 # from on client <client> [virtual]server [virtual]routes
119 # network = <prefix>/<len> # mandatory for server
120 # server = <ipaddr> # used by both, default is computed from `network'
121 # relay = <ipaddr> # used by server, default from `network' and `server'
122 # default server is first host in network
123 # default relay is first host which is not server
126 # addrs = 127.0.0.1 ::1 # mandatory for server
127 port = 80 # used by server
128 # url # used by client; default from first `addrs' and `port'
130 # [<client-ip4-or-ipv6-address>]
131 # password = <password> # used by both, must match
134 max_batch_down = 262144 # used by server
135 max_queue_time = 121 # used by server
136 http_timeout = 121 # used by server
137 target_requests_outstanding = 10 # used by server
140 # these need to be defined here so that they can be imported by import *
141 cfg = ConfigParser(strict=False)
142 optparser = OptionParser()
# Byte translation table that exchanges `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return *s* with every `-' and every SLIP ESC byte swapped.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.  The mapping is its own inverse: applying
    it twice gives back the original bytes.
    """
    swapped = s.translate(_mimetrans)
    return swapped
151 def __init__(self, d = { }):
154 return 'ConfigResults('+repr(self.__dict__)+')'
158 def log_discard(packet, iface, saddr, daddr, why):
160 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
163 #---------- packet parsing ----------
165 def packet_addrs(packet):
166 version = packet[0] >> 4
170 factory = ipaddress.IPv4Address
174 factory = ipaddress.IPv6Address
176 raise ValueError('unsupported IP version %d' % version)
177 saddr = factory(packet[ saddroff : saddroff + addrlen ])
178 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
179 return (saddr, daddr)
181 #---------- address handling ----------
185 r = ipaddress.IPv4Address(input)
186 except AddressValueError:
187 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.
    Raises ValueError (or a subclass) if *input* is valid as neither,
    or if it is an IPv4 network with a bad prefix length / host bits.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # Not an IPv4 address part, so retry as IPv6.  The handler used
        # to name `NetworkValueError', which the ipaddress module does
        # not define, so this fallback died with NameError instead.
        r = ipaddress.IPv6Network(input)
    return r
197 #---------- ipif (SLIP) subprocess ----------
199 class SlipStreamDecoder():
200 def __init__(self, desc, on_packet):
202 self._on_packet = on_packet
204 self._log('__init__')
206 def _log(self, msg, **kwargs):
207 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
209 def inputdata(self, data):
210 self._log('inputdata', d=data)
211 packets = slip.decode(data)
212 packets[0] = self._buffer + packets[0]
213 self._buffer = packets.pop()
214 for packet in packets:
215 self._maybe_packet(packet)
216 self._log('bufremain', d=self._buffer)
218 def _maybe_packet(self, packet):
219 self._log('maybepacket', d=packet)
221 self._on_packet(packet)
225 self._maybe_packet(self._buffer)
228 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
229 def __init__(self, router):
230 self._router = router
231 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
232 def connectionMade(self): pass
233 def outReceived(self, data):
234 self._decoder.inputdata(data)
235 def slip_on_packet(self, packet):
236 (saddr, daddr) = packet_addrs(packet)
237 if saddr.is_link_local or daddr.is_link_local:
238 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
240 self._router(packet, saddr, daddr)
241 def processEnded(self, status):
242 status.raiseException()
244 def start_ipif(command, router):
246 ipif = _IpifProcessProtocol(router)
247 reactor.spawnProcess(ipif,
248 '/bin/sh',['sh','-xc', command],
249 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Hand one packet to the ipif subprocess as a delimited SLIP frame.

    The frame is written to the transport of the module-level `ipif'
    protocol object as three pieces: a SLIP delimiter, the SLIP-encoded
    packet, and a closing delimiter.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
258 #---------- packet queue ----------
261 def __init__(self, desc, max_queue_time):
264 self._max_queue_time = max_queue_time
265 self._pq = collections.deque() # packets
267 def _log(self, dflag, msg, **kwargs):
268 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
270 def append(self, packet):
271 self._log(DBG.QUEUE, 'append', d=packet)
272 self._pq.append((time.monotonic(), packet))
275 self._log(DBG.QUEUE, 'nonempty ?')
277 try: (queuetime, packet) = self._pq[0]
279 self._log(DBG.QUEUE, 'nonempty ? empty.')
282 age = time.monotonic() - queuetime
283 if age > self._max_queue_time:
284 # strip old packets off the front
285 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
289 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
292 def process(self, sizequery, moredata, max_batch):
293 # sizequery() should return size of batch so far
294 # moredata(s) should add s to batch
295 self._log(DBG.QUEUE, 'process...')
297 try: (dummy, packet) = self._pq[0]
299 self._log(DBG.QUEUE, 'process... empty')
302 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
304 encoded = slip.encode(packet)
307 self._log(DBG.QUEUE_CTRL,
308 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
312 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
313 self._log(DBG.QUEUE_CTRL, 'process... overflow')
315 moredata(slip.delimiter)
320 #---------- error handling ----------
327 print('========== CRASH ==========', err,
328 '===========================', file=sys.stderr)
330 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Register an errback on *defer* that passes any failure to crash()."""
    def _on_failure(err):
        # crash is looked up when the failure arrives, not at registration.
        return crash(err)
    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer that crashes the program on critical-level events.

    *event* is a log event dict.  If its 'log_level' is at least
    LogLevel.critical, the formatted event text is passed to crash().
    Events with no 'log_level' are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # Ordering None against a LogLevel constant raises TypeError,
        # which would make this observer itself fail on such events.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
339 #---------- config processing ----------
341 def process_cfg_common_always():
343 c.mtu = cfg.get('virtual','mtu')
345 def process_cfg_ipif(section, varmap):
347 try: v = getattr(c, s)
348 except AttributeError: continue
353 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Parse [virtual] network from the config into c.network.

    Raises ValueError if the network holds fewer than 3 + 2 addresses.
    Network sizes are powers of two, so that bound means at least 2^3.
    """
    network = ipnetwork(cfg.get('virtual', 'network'))
    c.network = network
    # NOTE(review): presumably 3 usable hosts plus the network and
    # broadcast addresses -- confirm the intent of `3 + 2'.
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
def process_cfg_server():
    """Set c.server from [virtual] server, defaulting from the network.

    If the option is absent, process_cfg_network() is run and the first
    host address of c.network is used instead.
    """
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    # NOTE(review): a configured value is stored as the raw config string,
    # while the default is an ipaddress object -- confirm callers cope.
    c.server = server
368 def __init__(self, port, addrspec):
372 self.addr = ipaddress.IPv4Address(addrspec)
373 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
375 except AddressValueError:
376 self.addr = ipaddress.IPv6Address(addrspec)
377 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
378 self._inurl = b'[%s]'
379 def make_endpoint(self):
380 return self._endpointfactory(reactor, self.port, self.addr)
382 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
383 if self.port != 80: url += b':%d' % self.port
387 def process_cfg_saddrs():
388 try: port = cfg.getint('server','port')
389 except NoOptionError: port = 80
392 for addrspec in cfg.get('server','addrs').split():
393 sa = ServerAddr(port, addrspec)
396 def process_cfg_clients(constructor):
398 for cs in cfg.sections():
399 if not (':' in cs or '.' in cs): continue
401 pw = cfg.get(cs, 'password')
402 pw = pw.encode('utf-8')
403 constructor(ci,cs,pw)
405 #---------- startup ----------
407 def common_startup(process_cfg):
408 re = regexp.compile('#.*')
409 cfg.read_string(re.sub('', defcfg))
412 def readconfig(pathname, mandatory=True):
413 def log(m, p=pathname):
414 if not DBG.CONFIG in debug_set: return
415 print('DBG.CONFIG: %s: %s' % (m, pathname))
418 files = os.listdir(pathname)
420 except FileNotFoundError:
425 except NotADirectoryError:
432 re = regexp.compile('[^-A-Za-z0-9_]')
433 for f in os.listdir(cdir):
434 if re.search(f): continue
435 subpath = pathname + '/' + f
438 except FileNotFoundError:
439 log('entry skipped', subpath)
442 log('entry read', subpath)
444 def oc_config(od,os, value, op):
449 def dfs_less_detailed(dl):
450 return [df for df in DBG.iterconstants() if df <= dl]
452 def ds_default(od,os,dl,op):
454 debug_set = set(dfs_less_detailed(debug_def_detail))
456 def ds_select(od,os, spec, op):
457 for it in spec.split(','):
459 if it.startswith('-'):
460 mutator = debug_set.discard
463 mutator = debug_set.add
466 dfs = DBG.iterconstants()
470 mapper = dfs_less_detailed
473 mapper = lambda x: [x]
476 dfspec = DBG.lookupByName(it)
478 optparser.error('unknown debug flag %s in --debug-select' % it)
485 optparser.add_option('-D', '--debug',
488 help='enable default debug (to stdout)',
489 callback= ds_default)
491 optparser.add_option('--debug-select',
494 metavar='[-]DFLAG[+]|[-]+,...',
496 '''enable (`-': disable) each specified DFLAG;
497 `+': do same for all "more interesting" DFLAGSs;
498 just `+': all DFLAGs.
499 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
503 optparser.add_option('-c', '--config',
506 metavar='CONFIGFILE',
511 (opts, args) = optparser.parse_args()
512 if len(args): optparser.error('no non-option arguments please')
515 readconfig('/etc/hippotat/config', False)
516 readconfig('/etc/hippotat/config.d', False)
519 except (configparser.Error, ValueError):
520 traceback.print_exc(file=sys.stderr)
521 print('\nInvalid configuration, giving up.', file=sys.stderr)
524 #print(repr(debug_set), file=sys.stderr)
526 log_formatter = twisted.logger.formatEventAsClassicLogText
527 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
528 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
529 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
530 stdsomething_obs = twisted.logger.FilteringLogObserver(
531 stderr_obs, [pred], stdout_obs
533 log_observer = twisted.logger.FilteringLogObserver(
534 stdsomething_obs, [LogNotBoringTwisted()]
536 #log_observer = stdsomething_obs
537 twisted.logger.globalLogBeginner.beginLoggingTo(
538 [ log_observer, crash_on_critical ]
542 log_debug(DBG.INIT, 'entering reactor')
543 if not _crashing: reactor.run()
544 print('CRASHED (end)', file=sys.stderr)