4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope.interface import implementer
12 from twisted.internet import reactor
13 import twisted.internet.endpoints
15 from twisted.logger import LogLevel
16 import twisted.python.constants
17 from twisted.python.constants import NamedConstant
20 from ipaddress import AddressValueError
22 from optparse import OptionParser
24 from configparser import ConfigParser
25 from configparser import NoOptionError
27 from functools import partial
36 import hippotat.slip as slip
38 class DBG(twisted.python.constants.Names):
39 INIT = NamedConstant()
40 CONFIG = NamedConstant()
41 ROUTE = NamedConstant()
42 DROP = NamedConstant()
43 FLOW = NamedConstant()
44 HTTP = NamedConstant()
45 TWISTED = NamedConstant()
46 QUEUE = NamedConstant()
47 HTTP_CTRL = NamedConstant()
48 QUEUE_CTRL = NamedConstant()
49 HTTP_FULL = NamedConstant()
50 CTRL_DUMP = NamedConstant()
51 SLIP_FULL = NamedConstant()
52 DATA_COMPLETE = NamedConstant()
54 _hex_codec = codecs.getencoder('hex_codec')
56 #---------- logging ----------
58 org_stderr = sys.stderr
60 log = twisted.logger.Logger()
63 debug_def_detail = DBG.HTTP
65 def log_debug(dflag, msg, idof=None, d=None):
66 if dflag not in debug_set: return
67 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
69 msg = '[%#x] %s' % (id(idof), msg)
72 if not DBG.DATA_COMPLETE in debug_set:
76 d = _hex_codec(d)[0].decode('ascii')
77 msg += ' ' + d + trunc
78 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
80 @implementer(twisted.logger.ILogFilterPredicate)
81 class LogNotBoringTwisted:
82 def __call__(self, event):
83 yes = twisted.logger.PredicateResult.yes
84 no = twisted.logger.PredicateResult.no
86 if event.get('log_level') != LogLevel.info:
88 dflag = event.get('dflag')
89 if dflag in debug_set: return yes
90 if dflag is None and DBG.TWISTED in debug_set: return yes
93 print(traceback.format_exc(), file=org_stderr)
96 #---------- default config ----------
100 max_batch_down = 65536
102 target_requests_outstanding = 3
104 http_timeout_grace = 5
105 max_requests_outstanding = 6
109 #[server] or [<client>] overrides
110 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
112 # relating to virtual network
116 # addrs = 127.0.0.1 ::1
120 # relating to virtual network
122 vnetwork = 172.24.230.192
123 # network = <prefix>/<len>
128 # [<client-ip4-or-ipv6-address>]
129 # password = <password> # used by both, must match
132 max_batch_down = 262144
135 target_requests_outstanding = 10
138 # these need to be defined here so that they can be imported by import *
139 cfg = ConfigParser(strict=False)
140 optparser = OptionParser()
142 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
143 def mime_translate(s):
144 # SLIP-encoded packets cannot contain ESC ESC.
145 # Swap `-' and ESC. The result cannot contain `--'
146 return s.translate(_mimetrans)
149 def __init__(self, d = { }):
152 return 'ConfigResults('+repr(self.__dict__)+')'
156 def log_discard(packet, iface, saddr, daddr, why):
158 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
161 #---------- packet parsing ----------
163 def packet_addrs(packet):
164 version = packet[0] >> 4
168 factory = ipaddress.IPv4Address
172 factory = ipaddress.IPv6Address
174 raise ValueError('unsupported IP version %d' % version)
175 saddr = factory(packet[ saddroff : saddroff + addrlen ])
176 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
177 return (saddr, daddr)
179 #---------- address handling ----------
183 r = ipaddress.IPv4Address(input)
184 except AddressValueError:
185 r = ipaddress.IPv6Address(input)
188 def ipnetwork(input):
190 r = ipaddress.IPv4Network(input)
191 except NetworkValueError:
192 r = ipaddress.IPv6Network(input)
195 #---------- ipif (SLIP) subprocess ----------
197 class SlipStreamDecoder():
198 def __init__(self, desc, on_packet):
200 self._on_packet = on_packet
202 self._log('__init__')
204 def _log(self, msg, **kwargs):
205 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
207 def inputdata(self, data):
208 self._log('inputdata', d=data)
209 packets = slip.decode(data)
210 packets[0] = self._buffer + packets[0]
211 self._buffer = packets.pop()
212 for packet in packets:
213 self._maybe_packet(packet)
214 self._log('bufremain', d=self._buffer)
216 def _maybe_packet(self, packet):
217 self._log('maybepacket', d=packet)
219 self._on_packet(packet)
223 self._maybe_packet(self._buffer)
226 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
227 def __init__(self, router):
228 self._router = router
229 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
230 def connectionMade(self): pass
231 def outReceived(self, data):
232 self._decoder.inputdata(data)
233 def slip_on_packet(self, packet):
234 (saddr, daddr) = packet_addrs(packet)
235 if saddr.is_link_local or daddr.is_link_local:
236 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
238 self._router(packet, saddr, daddr)
239 def processEnded(self, status):
240 status.raiseException()
242 def start_ipif(command, router):
244 ipif = _IpifProcessProtocol(router)
245 reactor.spawnProcess(ipif,
246 '/bin/sh',['sh','-xc', command],
247 childFDs={0:'w', 1:'r', 2:2},
250 def queue_inbound(packet):
251 log_debug(DBG.FLOW, "queue_inbound", d=packet)
252 ipif.transport.write(slip.delimiter)
253 ipif.transport.write(slip.encode(packet))
254 ipif.transport.write(slip.delimiter)
256 #---------- packet queue ----------
259 def __init__(self, desc, max_queue_time):
262 self._max_queue_time = max_queue_time
263 self._pq = collections.deque() # packets
265 def _log(self, dflag, msg, **kwargs):
266 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
268 def append(self, packet):
269 self._log(DBG.QUEUE, 'append', d=packet)
270 self._pq.append((time.monotonic(), packet))
273 self._log(DBG.QUEUE, 'nonempty ?')
275 try: (queuetime, packet) = self._pq[0]
277 self._log(DBG.QUEUE, 'nonempty ? empty.')
280 age = time.monotonic() - queuetime
281 if age > self._max_queue_time:
282 # strip old packets off the front
283 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
287 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
290 def process(self, sizequery, moredata, max_batch):
291 # sizequery() should return size of batch so far
292 # moredata(s) should add s to batch
293 self._log(DBG.QUEUE, 'process...')
295 try: (dummy, packet) = self._pq[0]
297 self._log(DBG.QUEUE, 'process... empty')
300 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
302 encoded = slip.encode(packet)
305 self._log(DBG.QUEUE_CTRL,
306 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
310 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
311 self._log(DBG.QUEUE_CTRL, 'process... overflow')
313 moredata(slip.delimiter)
318 #---------- error handling ----------
325 print('========== CRASH ==========', err,
326 '===========================', file=sys.stderr)
328 except twisted.internet.error.ReactorNotRunning: pass
330 def crash_on_defer(defer):
331 defer.addErrback(lambda err: crash(err))
333 def crash_on_critical(event):
334 if event.get('log_level') >= LogLevel.critical:
335 crash(twisted.logger.formatEvent(event))
337 #---------- config processing ----------
339 def process_cfg_common_always():
341 c.mtu = cfg.get('virtual','mtu')
343 def process_cfg_ipif(section, varmap):
345 try: v = getattr(c, s)
346 except AttributeError: continue
351 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
353 def process_cfg_network():
354 c.network = ipnetwork(cfg.get('virtual','network'))
355 if c.network.num_addresses < 3 + 2:
356 raise ValueError('network needs at least 2^3 addresses')
358 def process_cfg_server():
360 c.server = cfg.get('virtual','server')
361 except NoOptionError:
362 process_cfg_network()
363 c.server = next(c.network.hosts())
366 def __init__(self, port, addrspec):
370 self.addr = ipaddress.IPv4Address(addrspec)
371 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
373 except AddressValueError:
374 self.addr = ipaddress.IPv6Address(addrspec)
375 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
376 self._inurl = b'[%s]'
377 def make_endpoint(self):
378 return self._endpointfactory(reactor, self.port, self.addr)
380 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
381 if self.port != 80: url += b':%d' % self.port
385 def process_cfg_saddrs():
386 try: port = cfg.getint('server','port')
387 except NoOptionError: port = 80
390 for addrspec in cfg.get('server','addrs').split():
391 sa = ServerAddr(port, addrspec)
394 def process_cfg_clients(constructor):
396 for cs in cfg.sections():
397 if not (':' in cs or '.' in cs): continue
399 pw = cfg.get(cs, 'password')
400 pw = pw.encode('utf-8')
401 constructor(ci,cs,pw)
403 #---------- startup ----------
405 def common_startup(process_cfg):
406 # ConfigParser hates #-comments after values
407 trailingcomments_re = regexp.compile('#.*')
408 cfg.read_string(trailingcomments_re.sub('', defcfg))
411 def readconfig(pathname, mandatory=True):
412 def log(m, p=pathname):
413 if not DBG.CONFIG in debug_set: return
414 print('DBG.CONFIG: %s: %s' % (m, pathname))
417 files = os.listdir(pathname)
419 except FileNotFoundError:
424 except NotADirectoryError:
431 re = regexp.compile('[^-A-Za-z0-9_]')
432 for f in os.listdir(cdir):
433 if re.search(f): continue
434 subpath = pathname + '/' + f
437 except FileNotFoundError:
438 log('entry skipped', subpath)
441 log('entry read', subpath)
443 def oc_config(od,os, value, op):
448 def dfs_less_detailed(dl):
449 return [df for df in DBG.iterconstants() if df <= dl]
451 def ds_default(od,os,dl,op):
453 debug_set = set(dfs_less_detailed(debug_def_detail))
455 def ds_select(od,os, spec, op):
456 for it in spec.split(','):
458 if it.startswith('-'):
459 mutator = debug_set.discard
462 mutator = debug_set.add
465 dfs = DBG.iterconstants()
469 mapper = dfs_less_detailed
472 mapper = lambda x: [x]
475 dfspec = DBG.lookupByName(it)
477 optparser.error('unknown debug flag %s in --debug-select' % it)
484 optparser.add_option('-D', '--debug',
487 help='enable default debug (to stdout)',
488 callback= ds_default)
490 optparser.add_option('--debug-select',
493 metavar='[-]DFLAG[+]|[-]+,...',
495 '''enable (`-': disable) each specified DFLAG;
496 `+': do same for all "more interesting" DFLAGSs;
497 just `+': all DFLAGs.
498 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
502 optparser.add_option('-c', '--config',
505 metavar='CONFIGFILE',
510 (opts, args) = optparser.parse_args()
511 if len(args): optparser.error('no non-option arguments please')
514 readconfig('/etc/hippotat/config', False)
515 readconfig('/etc/hippotat/config.d', False)
518 except (configparser.Error, ValueError):
519 traceback.print_exc(file=sys.stderr)
520 print('\nInvalid configuration, giving up.', file=sys.stderr)
523 #print(repr(debug_set), file=sys.stderr)
525 log_formatter = twisted.logger.formatEventAsClassicLogText
526 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
527 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
528 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
529 stdsomething_obs = twisted.logger.FilteringLogObserver(
530 stderr_obs, [pred], stdout_obs
532 log_observer = twisted.logger.FilteringLogObserver(
533 stdsomething_obs, [LogNotBoringTwisted()]
535 #log_observer = stdsomething_obs
536 twisted.logger.globalLogBeginner.beginLoggingTo(
537 [ log_observer, crash_on_critical ]
541 log_debug(DBG.INIT, 'entering reactor')
542 if not _crashing: reactor.run()
543 print('CRASHED (end)', file=sys.stderr)