4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope.interface import implementer
12 from twisted.internet import reactor
13 import twisted.internet.endpoints
15 from twisted.logger import LogLevel
16 import twisted.python.constants
17 from twisted.python.constants import NamedConstant
20 from ipaddress import AddressValueError
22 from optparse import OptionParser
24 from configparser import ConfigParser
25 from configparser import NoOptionError
27 from functools import partial
36 import hippotat.slip as slip
class DBG(twisted.python.constants.Names):
    """Debug flags passed to log_debug() and looked up in debug_set.

    The order of the definitions below is significant:
    dfs_less_detailed() selects flags by comparing them with `<=`, so
    moving a constant changes which flags `-D` and the `+` suffix of
    --debug-select enable.
    """
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    # debug_def_detail points here: the default debug level.
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
54 _hex_codec = codecs.getencoder('hex_codec')
56 #---------- logging ----------
58 org_stderr = sys.stderr
60 log = twisted.logger.Logger()
63 debug_def_detail = DBG.HTTP
65 def log_debug(dflag, msg, idof=None, d=None):
66 if dflag not in debug_set: return
67 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
69 msg = '[%#x] %s' % (id(idof), msg)
72 if not DBG.DATA_COMPLETE in debug_set:
76 d = _hex_codec(d)[0].decode('ascii')
77 msg += ' ' + d + trunc
78 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
80 @implementer(twisted.logger.ILogFilterPredicate)
81 class LogNotBoringTwisted:
82 def __call__(self, event):
83 yes = twisted.logger.PredicateResult.yes
84 no = twisted.logger.PredicateResult.no
86 if event.get('log_level') != LogLevel.info:
88 dflag = event.get('dflag')
89 if dflag is False : return yes
90 if dflag in debug_set: return yes
91 if dflag is None and DBG.TWISTED in debug_set: return yes
94 print(traceback.format_exc(), file=org_stderr)
97 #---------- default config ----------
101 max_batch_down = 65536
103 target_requests_outstanding = 3
105 http_timeout_grace = 5
106 max_requests_outstanding = 6
112 #[server] or [<client>] overrides
113 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
115 # relating to virtual network
120 # addrs = 127.0.0.1 ::1
123 # relating to virtual network
124 vvnetwork = 172.24.230.192
125 # vnetwork = <prefix>/<len>
130 # [<client-ip4-or-ipv6-address>]
131 # password = <password> # used by both, must match
134 max_batch_down = 262144
137 target_requests_outstanding = 10
140 # these need to be defined here so that they can be imported by import *
141 cfg = ConfigParser(strict=False)
142 optparser = OptionParser()
144 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
def mime_translate(s):
    """Return `s` with every `-' byte and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC
    have been swapped the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
154 return 'ConfigResults('+repr(self.__dict__)+')'
156 def log_discard(packet, iface, saddr, daddr, why):
158 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
161 #---------- packet parsing ----------
163 def packet_addrs(packet):
164 version = packet[0] >> 4
168 factory = ipaddress.IPv4Address
172 factory = ipaddress.IPv6Address
174 raise ValueError('unsupported IP version %d' % version)
175 saddr = factory(packet[ saddroff : saddroff + addrlen ])
176 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
177 return (saddr, daddr)
179 #---------- address handling ----------
183 r = ipaddress.IPv4Address(input)
184 except AddressValueError:
185 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse `input` as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises a ValueError subclass from the ipaddress module when `input`
    is not a valid network in either family.
    """
    try:
        return ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The address part is not IPv4, so give IPv6 a chance.  Other
        # failures (bad netmask, host bits set) are real IPv4 errors and
        # are deliberately left to propagate instead of being masked by
        # a confusing IPv6 parse error.
        return ipaddress.IPv6Network(input)
195 #---------- ipif (SLIP) subprocess ----------
197 class SlipStreamDecoder():
198 def __init__(self, desc, on_packet):
200 self._on_packet = on_packet
202 self._log('__init__')
204 def _log(self, msg, **kwargs):
205 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
207 def inputdata(self, data):
208 self._log('inputdata', d=data)
209 data = self._buffer + data
211 packets = slip.decode(data, True)
212 self._buffer = packets.pop()
213 for packet in packets:
214 self._maybe_packet(packet)
215 self._log('bufremain', d=self._buffer)
217 def _maybe_packet(self, packet):
218 self._log('maybepacket', d=packet)
220 self._on_packet(packet)
226 packets = slip.decode(data)
227 assert(len(packets) == 1)
228 self._maybe_packet(packets[0])
230 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
231 def __init__(self, router):
232 self._router = router
233 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
234 def connectionMade(self): pass
235 def outReceived(self, data):
236 self._decoder.inputdata(data)
237 def slip_on_packet(self, packet):
238 (saddr, daddr) = packet_addrs(packet)
239 if saddr.is_link_local or daddr.is_link_local:
240 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
242 self._router(packet, saddr, daddr)
243 def processEnded(self, status):
244 status.raiseException()
246 def start_ipif(command, router):
247 ipif = _IpifProcessProtocol(router)
248 reactor.spawnProcess(ipif,
249 '/bin/sh',['sh','-xc', command],
250 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(ipif, packet):
    """Write `packet` to the ipif transport as one delimited SLIP frame.

    The encoded packet is sent with a SLIP delimiter both before and
    after it.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
260 #---------- packet queue ----------
263 def __init__(self, desc, max_queue_time):
266 self._max_queue_time = max_queue_time
267 self._pq = collections.deque() # packets
269 def _log(self, dflag, msg, **kwargs):
270 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
272 def append(self, packet):
273 self._log(DBG.QUEUE, 'append', d=packet)
274 self._pq.append((time.monotonic(), packet))
277 self._log(DBG.QUEUE, 'nonempty ?')
279 try: (queuetime, packet) = self._pq[0]
281 self._log(DBG.QUEUE, 'nonempty ? empty.')
284 age = time.monotonic() - queuetime
285 if age > self._max_queue_time:
286 # strip old packets off the front
287 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
291 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
294 def process(self, sizequery, moredata, max_batch):
295 # sizequery() should return size of batch so far
296 # moredata(s) should add s to batch
297 self._log(DBG.QUEUE, 'process...')
299 try: (dummy, packet) = self._pq[0]
301 self._log(DBG.QUEUE, 'process... empty')
304 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
306 encoded = slip.encode(packet)
309 self._log(DBG.QUEUE_CTRL,
310 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
314 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
315 self._log(DBG.QUEUE_CTRL, 'process... overflow')
317 moredata(slip.delimiter)
322 #---------- error handling ----------
329 print('========== CRASH ==========', err,
330 '===========================', file=sys.stderr)
332 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to `defer` that passes any failure to crash()."""
    def _report_failure(failure):
        # crash is looked up when the errback fires, not when it is added.
        return crash(failure)

    defer.addErrback(_report_failure)
def crash_on_critical(event):
    """Log observer: call crash() for any event at critical level or above.

    `event` is a log event mapping.  Events that carry no 'log_level'
    key are ignored.
    """
    level = event.get('log_level')
    # A missing level comes back as None, and ordering None against a
    # LogLevel constant would raise TypeError inside the observer.
    if level is not None and level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
341 #---------- config processing ----------
343 def _cfg_process_putatives():
346 # maps from abstract object to canonical name for cs's
348 def putative(cmap, abstract, canoncs):
350 current_canoncs = cmap[abstract]
354 assert(current_canoncs == canoncs)
355 cmap[abstract] = canoncs
357 server_pat = r'[-.0-9A-Za-z]+'
358 client_pat = r'[.:0-9a-f]+'
359 server_re = regexp.compile(server_pat)
360 serverclient_re = regexp.compile(server_pat + r' ' + client_pat)
362 for cs in cfg.sections():
368 # plan B "[<client>]" part 1
370 except AddressValueError:
372 if server_re.fullmatch(cs):
373 # plan C "[<servername>]"
374 putative(servers, cs, cs)
377 if serverclient_re.fullmatch(cs):
378 # plan D "[<servername> <client>]" part 1
379 (pss,pcs) = cs.split(' ')
382 # plan E "[<servername> LIMIT]"
386 # plan D "[<servername> <client>]" part 2
388 except AddressValueError:
389 # plan F "[<some thing we do not understand>]"
390 # well, we ignore this
391 print('warning: ignoring config section %s' % cs, file=sys.stderr)
394 else: # no AddressValueError
395 # plan D "[<servername> <client>]" part 3
396 putative(clients, ci, pcs)
397 putative(servers, pss, pss)
400 else: # no AddressValueError
401 # plan B "[<client>]" part 2
402 putative(clients, ci, cs)
405 return (servers, clients)
def cfg_process_common(c, ss):
    """Store the integer `mtu` option of config section `ss` as `c.mtu`."""
    mtu = cfg.getint(ss, 'mtu')
    c.mtu = mtu
410 def cfg_process_saddrs(c, ss):
412 def __init__(self, port, addrspec):
416 self.addr = ipaddress.IPv4Address(addrspec)
417 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
419 except AddressValueError:
420 self.addr = ipaddress.IPv6Address(addrspec)
421 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
422 self._inurl = b'[%s]'
423 def make_endpoint(self):
424 return self._endpointfactory(reactor, self.port, self.addr)
426 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
427 if self.port != 80: url += b':%d' % self.port
431 c.port = cfg.getint(ss,'port')
433 for addrspec in cfg.get(ss, 'addrs').split():
434 sa = ServerAddr(c.port, addrspec)
def cfg_process_vnetwork(c, ss):
    """Parse the `vnetwork` option of section `ss` into `c.vnetwork`.

    Raises ValueError when the network holds fewer than 3 + 2 addresses.
    `c.vnetwork` is assigned before the size check is made.
    """
    network = ipnetwork(cfg.get(ss, 'vnetwork'))
    c.vnetwork = network
    too_small = network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('vnetwork needs at least 2^3 addresses')
442 def cfg_process_vaddr(c, ss):
444 c.vaddr = cfg.get(ss,'vaddr')
445 except NoOptionError:
446 cfg_process_vnetwork(c, ss)
447 c.vaddr = next(c.vnetwork.hosts())
449 def cfg_search_section(key,sections):
450 for section in sections:
451 if cfg.has_option(section, key):
453 raise NoOptionError(key, repr(sections))
def cfg_search(getter,key,sections):
    """Read `key` with `getter` from the section chosen by cfg_search_section.

    `getter` is called as getter(section, key); its result is returned.
    """
    return getter(cfg_search_section(key, sections), key)
def cfg_process_client_limited(cc,ss,sections,key):
    """Set attribute `key` on `cc` to the configured integer, capped by a limit.

    The value is searched for in `sections`; the cap is searched for in
    the '<ss> LIMIT' section and then in 'LIMIT'.  The smaller of the
    two is stored directly in the instance dictionary of `cc`.
    """
    configured = cfg_search(cfg.getint, key, sections)
    ceiling = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT'])
    vars(cc)[key] = configured if configured <= ceiling else ceiling
464 def cfg_process_client_common(cc,ss,cs,ci):
465 # returns sections to search in, iff password is defined, otherwise None
468 sections = ['%s %s' % (ss,cs),
473 try: pwsection = cfg_search_section('password', sections)
474 except NoOptionError: return None
476 pw = cfg.get(pwsection, 'password')
477 cc.password = pw.encode('utf-8')
479 cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
480 cfg_process_client_limited(cc,ss,sections,'http_timeout')
484 def cfg_process_ipif(c, sections, varmap):
486 try: v = getattr(c, s)
487 except AttributeError: continue
490 #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
492 section = cfg_search_section('ipif', sections)
493 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
495 #---------- startup ----------
497 def common_startup(process_cfg):
498 # calls process_cfg(putative_servers, putative_clients)
500 # ConfigParser hates #-comments after values
501 trailingcomments_re = regexp.compile(r'#.*')
502 cfg.read_string(trailingcomments_re.sub('', defcfg))
505 def readconfig(pathname, mandatory=True):
def log(m, p=pathname):
    """Print a DBG.CONFIG trace line `m` about path `p`.

    `p` defaults to the pathname being read; callers pass a different
    path to report on an individual directory entry.  Nothing is
    printed unless DBG.CONFIG is in debug_set.
    """
    if DBG.CONFIG not in debug_set: return
    # Report the path the caller asked about, not always the default.
    print('DBG.CONFIG: %s: %s' % (m, p))
511 files = os.listdir(pathname)
513 except FileNotFoundError:
518 except NotADirectoryError:
525 re = regexp.compile('[^-A-Za-z0-9_]')
526 for f in os.listdir(cdir):
527 if re.search(f): continue
528 subpath = pathname + '/' + f
531 except FileNotFoundError:
532 log('entry skipped', subpath)
535 log('entry read', subpath)
537 def oc_config(od,os, value, op):
def dfs_less_detailed(dl):
    """Return a list of the DBG flags that compare `<= dl`.

    Flags appear in the order DBG.iterconstants() yields them.
    """
    selected = []
    for flag in DBG.iterconstants():
        if flag <= dl:
            selected.append(flag)
    return selected
545 def ds_default(od,os,dl,op):
547 debug_set = set(dfs_less_detailed(debug_def_detail))
549 def ds_select(od,os, spec, op):
550 for it in spec.split(','):
552 if it.startswith('-'):
553 mutator = debug_set.discard
556 mutator = debug_set.add
559 dfs = DBG.iterconstants()
563 mapper = dfs_less_detailed
566 mapper = lambda x: [x]
569 dfspec = DBG.lookupByName(it)
571 optparser.error('unknown debug flag %s in --debug-select' % it)
578 optparser.add_option('-D', '--debug',
581 help='enable default debug (to stdout)',
582 callback= ds_default)
584 optparser.add_option('--debug-select',
587 metavar='[-]DFLAG[+]|[-]+,...',
589 '''enable (`-': disable) each specified DFLAG;
590 `+': do same for all "more interesting" DFLAGSs;
591 just `+': all DFLAGs.
592 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
596 optparser.add_option('-c', '--config',
599 metavar='CONFIGFILE',
604 (opts, args) = optparser.parse_args()
605 if len(args): optparser.error('no non-option arguments please')
608 readconfig('/etc/hippotat/config', False)
609 readconfig('/etc/hippotat/config.d', False)
612 (pss, pcs) = _cfg_process_putatives()
613 process_cfg(pss, pcs)
614 except (configparser.Error, ValueError):
615 traceback.print_exc(file=sys.stderr)
616 print('\nInvalid configuration, giving up.', file=sys.stderr)
619 #print(repr(debug_set), file=sys.stderr)
621 log_formatter = twisted.logger.formatEventAsClassicLogText
622 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
623 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
624 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
625 stdsomething_obs = twisted.logger.FilteringLogObserver(
626 stderr_obs, [pred], stdout_obs
628 log_observer = twisted.logger.FilteringLogObserver(
629 stdsomething_obs, [LogNotBoringTwisted()]
631 #log_observer = stdsomething_obs
632 twisted.logger.globalLogBeginner.beginLoggingTo(
633 [ log_observer, crash_on_critical ]
637 log_debug(DBG.INIT, 'entering reactor')
638 if not _crashing: reactor.run()
639 print('CRASHED (end)', file=sys.stderr)