4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope.interface import implementer
12 from twisted.internet import reactor
13 import twisted.internet.endpoints
15 from twisted.logger import LogLevel
16 import twisted.python.constants
17 from twisted.python.constants import NamedConstant
20 from ipaddress import AddressValueError
22 from optparse import OptionParser
24 from configparser import ConfigParser
25 from configparser import NoOptionError
27 from functools import partial
36 import hippotat.slip as slip
38 class DBG(twisted.python.constants.Names):
39 INIT = NamedConstant()
40 CONFIG = NamedConstant()
41 ROUTE = NamedConstant()
42 DROP = NamedConstant()
43 FLOW = NamedConstant()
44 HTTP = NamedConstant()
45 TWISTED = NamedConstant()
46 QUEUE = NamedConstant()
47 HTTP_CTRL = NamedConstant()
48 QUEUE_CTRL = NamedConstant()
49 HTTP_FULL = NamedConstant()
50 CTRL_DUMP = NamedConstant()
51 SLIP_FULL = NamedConstant()
52 DATA_COMPLETE = NamedConstant()
54 _hex_codec = codecs.getencoder('hex_codec')
56 #---------- logging ----------
58 org_stderr = sys.stderr
60 log = twisted.logger.Logger()
63 debug_def_detail = DBG.HTTP
65 def log_debug(dflag, msg, idof=None, d=None):
66 if dflag not in debug_set: return
67 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
69 msg = '[%#x] %s' % (id(idof), msg)
72 if not DBG.DATA_COMPLETE in debug_set:
76 d = _hex_codec(d)[0].decode('ascii')
77 msg += ' ' + d + trunc
78 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
80 @implementer(twisted.logger.ILogFilterPredicate)
81 class LogNotBoringTwisted:
82 def __call__(self, event):
83 yes = twisted.logger.PredicateResult.yes
84 no = twisted.logger.PredicateResult.no
86 if event.get('log_level') != LogLevel.info:
88 dflag = event.get('dflag')
89 if dflag in debug_set: return yes
90 if dflag is None and DBG.TWISTED in debug_set: return yes
93 print(traceback.format_exc(), file=org_stderr)
96 #---------- default config ----------
100 #[<client>] overrides
101 max_batch_down = 65536 # used by server, subject to [limits]
102 max_queue_time = 10 # used by server, subject to [limits]
103 target_requests_outstanding = 3 # must match; subject to [limits] on server
104 http_timeout = 30 # used by both } must be
105 http_timeout_grace = 5 # used by both } compatible
106 max_requests_outstanding = 4 # used by client
107 max_batch_up = 4000 # used by client
108 http_retry = 5 # used by client
110 #[server] or [<client>] overrides
111 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
112 # extra interpolations: %(local)s %(peer)s %(rnet)s
113 # obtained on server [virtual]server [virtual]relay [virtual]network
114 # from on client <client> [virtual]server [virtual]routes
119 # network = <prefix>/<len> # mandatory for server
120 # server = <ipaddr> # used by both, default is computed from `network'
121 # relay = <ipaddr> # used by server, default from `network' and `server'
122 # default server is first host in network
123 # default relay is first host which is not server
126 # addrs = 127.0.0.1 ::1 # mandatory for server
127 port = 80 # used by server
128 # url # used by client; default from first `addrs' and `port'
130 # [<client-ip4-or-ipv6-address>]
131 # password = <password> # used by both, must match
134 max_batch_down = 262144 # used by server
135 max_queue_time = 121 # used by server
136 http_timeout = 121 # used by server
137 target_requests_outstanding = 10 # used by server
140 # these need to be defined here so that they can be imported by import *
141 cfg = ConfigParser(strict=False)
142 optparser = OptionParser()
144 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
145 def mime_translate(s):
146 # SLIP-encoded packets cannot contain ESC ESC.
147 # Swap `-' and ESC. The result cannot contain `--'
148 return s.translate(_mimetrans)
151 def __init__(self, d = { }):
154 return 'ConfigResults('+repr(self.__dict__)+')'
158 def log_discard(packet, iface, saddr, daddr, why):
160 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
163 #---------- packet parsing ----------
165 def packet_addrs(packet):
166 version = packet[0] >> 4
170 factory = ipaddress.IPv4Address
174 factory = ipaddress.IPv6Address
176 raise ValueError('unsupported IP version %d' % version)
177 saddr = factory(packet[ saddroff : saddroff + addrlen ])
178 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
179 return (saddr, daddr)
181 #---------- address handling ----------
185 r = ipaddress.IPv4Address(input)
186 except AddressValueError:
187 r = ipaddress.IPv6Address(input)
190 def ipnetwork(input):
192 r = ipaddress.IPv4Network(input)
193 except NetworkValueError:
194 r = ipaddress.IPv6Network(input)
197 #---------- ipif (SLIP) subprocess ----------
199 class SlipStreamDecoder():
200 def __init__(self, desc, on_packet):
202 self._on_packet = on_packet
204 self._log('__init__')
206 def _log(self, msg, **kwargs):
207 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
209 def inputdata(self, data):
210 self._log('inputdata', d=data)
211 packets = slip.decode(data)
212 packets[0] = self._buffer + packets[0]
213 self._buffer = packets.pop()
214 for packet in packets:
215 self._maybe_packet(packet)
216 self._log('bufremain', d=self._buffer)
218 def _maybe_packet(self, packet):
219 self._log('maybepacket', d=packet)
221 self._on_packet(packet)
225 self._maybe_packet(self._buffer)
228 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
229 def __init__(self, router):
230 self._router = router
231 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
232 def connectionMade(self): pass
233 def outReceived(self, data):
234 self._decoder.inputdata(data)
235 def slip_on_packet(self, packet):
236 (saddr, daddr) = packet_addrs(packet)
237 if saddr.is_link_local or daddr.is_link_local:
238 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
240 self._router(packet, saddr, daddr)
241 def processEnded(self, status):
242 status.raiseException()
244 def start_ipif(command, router):
246 ipif = _IpifProcessProtocol(router)
247 reactor.spawnProcess(ipif,
248 '/bin/sh',['sh','-xc', command],
249 childFDs={0:'w', 1:'r', 2:2},
252 def queue_inbound(packet):
253 log_debug(DBG.FLOW, "queue_inbound", d=packet)
254 ipif.transport.write(slip.delimiter)
255 ipif.transport.write(slip.encode(packet))
256 ipif.transport.write(slip.delimiter)
258 #---------- packet queue ----------
261 def __init__(self, desc, max_queue_time):
264 self._max_queue_time = max_queue_time
265 self._pq = collections.deque() # packets
267 def _log(self, dflag, msg, **kwargs):
268 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
270 def append(self, packet):
271 self._log(DBG.QUEUE, 'append', d=packet)
272 self._pq.append((time.monotonic(), packet))
275 self._log(DBG.QUEUE, 'nonempty ?')
277 try: (queuetime, packet) = self._pq[0]
279 self._log(DBG.QUEUE, 'nonempty ? empty.')
282 age = time.monotonic() - queuetime
283 if age > self._max_queue_time:
284 # strip old packets off the front
285 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
289 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
292 def process(self, sizequery, moredata, max_batch):
293 # sizequery() should return size of batch so far
294 # moredata(s) should add s to batch
295 self._log(DBG.QUEUE, 'process...')
297 try: (dummy, packet) = self._pq[0]
299 self._log(DBG.QUEUE, 'process... empty')
302 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
304 encoded = slip.encode(packet)
307 self._log(DBG.QUEUE_CTRL,
308 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
312 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
313 self._log(DBG.QUEUE_CTRL, 'process... overflow')
315 moredata(slip.delimiter)
320 #---------- error handling ----------
327 print('========== CRASH ==========', err,
328 '===========================', file=sys.stderr)
330 except twisted.internet.error.ReactorNotRunning: pass
332 def crash_on_defer(defer):
333 defer.addErrback(lambda err: crash(err))
335 def crash_on_critical(event):
336 if event.get('log_level') >= LogLevel.critical:
337 crash(twisted.logger.formatEvent(event))
339 #---------- config processing ----------
341 def process_cfg_common_always():
343 c.mtu = cfg.get('virtual','mtu')
345 def process_cfg_ipif(section, varmap):
347 try: v = getattr(c, s)
348 except AttributeError: continue
353 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
355 def process_cfg_network():
356 c.network = ipnetwork(cfg.get('virtual','network'))
357 if c.network.num_addresses < 3 + 2:
358 raise ValueError('network needs at least 2^3 addresses')
360 def process_cfg_server():
362 c.server = cfg.get('virtual','server')
363 except NoOptionError:
364 process_cfg_network()
365 c.server = next(c.network.hosts())
368 def __init__(self, port, addrspec):
372 self.addr = ipaddress.IPv4Address(addrspec)
373 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
375 except AddressValueError:
376 self.addr = ipaddress.IPv6Address(addrspec)
377 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
378 self._inurl = b'[%s]'
379 def make_endpoint(self):
380 return self._endpointfactory(reactor, self.port, self.addr)
382 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
383 if self.port != 80: url += b':%d' % self.port
387 def process_cfg_saddrs():
388 try: port = cfg.getint('server','port')
389 except NoOptionError: port = 80
392 for addrspec in cfg.get('server','addrs').split():
393 sa = ServerAddr(port, addrspec)
396 def process_cfg_clients(constructor):
398 for cs in cfg.sections():
399 if not (':' in cs or '.' in cs): continue
401 pw = cfg.get(cs, 'password')
402 pw = pw.encode('utf-8')
403 constructor(ci,cs,pw)
405 #---------- startup ----------
407 def common_startup(process_cfg):
408 # ConfigParser hates #-comments after values
409 trailingcomments_re = regexp.compile('#.*')
410 cfg.read_string(trailingcomments_re.sub('', defcfg))
413 def readconfig(pathname, mandatory=True):
414 def log(m, p=pathname):
415 if not DBG.CONFIG in debug_set: return
416 print('DBG.CONFIG: %s: %s' % (m, pathname))
419 files = os.listdir(pathname)
421 except FileNotFoundError:
426 except NotADirectoryError:
433 re = regexp.compile('[^-A-Za-z0-9_]')
434 for f in os.listdir(cdir):
435 if re.search(f): continue
436 subpath = pathname + '/' + f
439 except FileNotFoundError:
440 log('entry skipped', subpath)
443 log('entry read', subpath)
445 def oc_config(od,os, value, op):
450 def dfs_less_detailed(dl):
451 return [df for df in DBG.iterconstants() if df <= dl]
453 def ds_default(od,os,dl,op):
455 debug_set = set(dfs_less_detailed(debug_def_detail))
457 def ds_select(od,os, spec, op):
458 for it in spec.split(','):
460 if it.startswith('-'):
461 mutator = debug_set.discard
464 mutator = debug_set.add
467 dfs = DBG.iterconstants()
471 mapper = dfs_less_detailed
474 mapper = lambda x: [x]
477 dfspec = DBG.lookupByName(it)
479 optparser.error('unknown debug flag %s in --debug-select' % it)
486 optparser.add_option('-D', '--debug',
489 help='enable default debug (to stdout)',
490 callback= ds_default)
492 optparser.add_option('--debug-select',
495 metavar='[-]DFLAG[+]|[-]+,...',
497 '''enable (`-': disable) each specified DFLAG;
498 `+': do same for all "more interesting" DFLAGSs;
499 just `+': all DFLAGs.
500 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
504 optparser.add_option('-c', '--config',
507 metavar='CONFIGFILE',
512 (opts, args) = optparser.parse_args()
513 if len(args): optparser.error('no non-option arguments please')
516 readconfig('/etc/hippotat/config', False)
517 readconfig('/etc/hippotat/config.d', False)
520 except (configparser.Error, ValueError):
521 traceback.print_exc(file=sys.stderr)
522 print('\nInvalid configuration, giving up.', file=sys.stderr)
525 #print(repr(debug_set), file=sys.stderr)
527 log_formatter = twisted.logger.formatEventAsClassicLogText
528 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
529 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
530 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
531 stdsomething_obs = twisted.logger.FilteringLogObserver(
532 stderr_obs, [pred], stdout_obs
534 log_observer = twisted.logger.FilteringLogObserver(
535 stdsomething_obs, [LogNotBoringTwisted()]
537 #log_observer = stdsomething_obs
538 twisted.logger.globalLogBeginner.beginLoggingTo(
539 [ log_observer, crash_on_critical ]
543 log_debug(DBG.INIT, 'entering reactor')
544 if not _crashing: reactor.run()
545 print('CRASHED (end)', file=sys.stderr)