4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope.interface import implementer
12 from twisted.internet import reactor
13 import twisted.internet.endpoints
15 from twisted.logger import LogLevel
16 import twisted.python.constants
17 from twisted.python.constants import NamedConstant
20 from ipaddress import AddressValueError
22 from optparse import OptionParser
24 from configparser import ConfigParser
25 from configparser import NoOptionError
27 from functools import partial
36 import hippotat.slip as slip
38 class DBG(twisted.python.constants.Names):
39 INIT = NamedConstant()
40 CONFIG = NamedConstant()
41 ROUTE = NamedConstant()
42 DROP = NamedConstant()
43 FLOW = NamedConstant()
44 HTTP = NamedConstant()
45 TWISTED = NamedConstant()
46 QUEUE = NamedConstant()
47 HTTP_CTRL = NamedConstant()
48 QUEUE_CTRL = NamedConstant()
49 HTTP_FULL = NamedConstant()
50 CTRL_DUMP = NamedConstant()
51 SLIP_FULL = NamedConstant()
52 DATA_COMPLETE = NamedConstant()
54 _hex_codec = codecs.getencoder('hex_codec')
56 #---------- logging ----------
58 org_stderr = sys.stderr
60 log = twisted.logger.Logger()
63 debug_def_detail = DBG.HTTP
65 def log_debug(dflag, msg, idof=None, d=None):
66 if dflag not in debug_set: return
67 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
69 msg = '[%#x] %s' % (id(idof), msg)
72 if not DBG.DATA_COMPLETE in debug_set:
76 d = _hex_codec(d)[0].decode('ascii')
77 msg += ' ' + d + trunc
78 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
80 @implementer(twisted.logger.ILogFilterPredicate)
81 class LogNotBoringTwisted:
82 def __call__(self, event):
83 yes = twisted.logger.PredicateResult.yes
84 no = twisted.logger.PredicateResult.no
86 if event.get('log_level') != LogLevel.info:
88 dflag = event.get('dflag')
89 if dflag in debug_set: return yes
90 if dflag is None and DBG.TWISTED in debug_set: return yes
93 print(traceback.format_exc(), file=org_stderr)
96 #---------- default config ----------
100 max_batch_down = 65536
102 target_requests_outstanding = 3
104 http_timeout_grace = 5
105 max_requests_outstanding = 6
111 #[server] or [<client>] overrides
112 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
114 # relating to virtual network
119 # addrs = 127.0.0.1 ::1
122 # relating to virtual network
123 vvnetwork = 172.24.230.192
124 # vnetwork = <prefix>/<len>
129 # [<client-ip4-or-ipv6-address>]
130 # password = <password> # used by both, must match
133 max_batch_down = 262144
136 target_requests_outstanding = 10
139 # these need to be defined here so that they can be imported by import *
140 cfg = ConfigParser(strict=False)
141 optparser = OptionParser()
143 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
144 def mime_translate(s):
145 # SLIP-encoded packets cannot contain ESC ESC.
146 # Swap `-' and ESC. The result cannot contain `--'
147 return s.translate(_mimetrans)
153 return 'ConfigResults('+repr(self.__dict__)+')'
155 def log_discard(packet, iface, saddr, daddr, why):
157 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
160 #---------- packet parsing ----------
162 def packet_addrs(packet):
163 version = packet[0] >> 4
167 factory = ipaddress.IPv4Address
171 factory = ipaddress.IPv6Address
173 raise ValueError('unsupported IP version %d' % version)
174 saddr = factory(packet[ saddroff : saddroff + addrlen ])
175 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
176 return (saddr, daddr)
178 #---------- address handling ----------
182 r = ipaddress.IPv4Address(input)
183 except AddressValueError:
184 r = ipaddress.IPv6Address(input)
187 def ipnetwork(input):
189 r = ipaddress.IPv4Network(input)
190 except NetworkValueError:
191 r = ipaddress.IPv6Network(input)
194 #---------- ipif (SLIP) subprocess ----------
196 class SlipStreamDecoder():
197 def __init__(self, desc, on_packet):
199 self._on_packet = on_packet
201 self._log('__init__')
203 def _log(self, msg, **kwargs):
204 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
206 def inputdata(self, data):
207 self._log('inputdata', d=data)
208 packets = slip.decode(data)
209 packets[0] = self._buffer + packets[0]
210 self._buffer = packets.pop()
211 for packet in packets:
212 self._maybe_packet(packet)
213 self._log('bufremain', d=self._buffer)
215 def _maybe_packet(self, packet):
216 self._log('maybepacket', d=packet)
218 self._on_packet(packet)
222 self._maybe_packet(self._buffer)
225 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
226 def __init__(self, router):
227 self._router = router
228 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
229 def connectionMade(self): pass
230 def outReceived(self, data):
231 self._decoder.inputdata(data)
232 def slip_on_packet(self, packet):
233 (saddr, daddr) = packet_addrs(packet)
234 if saddr.is_link_local or daddr.is_link_local:
235 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
237 self._router(packet, saddr, daddr)
238 def processEnded(self, status):
239 status.raiseException()
241 def start_ipif(command, router):
243 ipif = _IpifProcessProtocol(router)
244 reactor.spawnProcess(ipif,
245 '/bin/sh',['sh','-xc', command],
246 childFDs={0:'w', 1:'r', 2:2},
249 def queue_inbound(packet):
250 log_debug(DBG.FLOW, "queue_inbound", d=packet)
251 ipif.transport.write(slip.delimiter)
252 ipif.transport.write(slip.encode(packet))
253 ipif.transport.write(slip.delimiter)
255 #---------- packet queue ----------
258 def __init__(self, desc, max_queue_time):
261 self._max_queue_time = max_queue_time
262 self._pq = collections.deque() # packets
264 def _log(self, dflag, msg, **kwargs):
265 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
267 def append(self, packet):
268 self._log(DBG.QUEUE, 'append', d=packet)
269 self._pq.append((time.monotonic(), packet))
272 self._log(DBG.QUEUE, 'nonempty ?')
274 try: (queuetime, packet) = self._pq[0]
276 self._log(DBG.QUEUE, 'nonempty ? empty.')
279 age = time.monotonic() - queuetime
280 if age > self._max_queue_time:
281 # strip old packets off the front
282 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
286 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
289 def process(self, sizequery, moredata, max_batch):
290 # sizequery() should return size of batch so far
291 # moredata(s) should add s to batch
292 self._log(DBG.QUEUE, 'process...')
294 try: (dummy, packet) = self._pq[0]
296 self._log(DBG.QUEUE, 'process... empty')
299 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
301 encoded = slip.encode(packet)
304 self._log(DBG.QUEUE_CTRL,
305 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
309 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
310 self._log(DBG.QUEUE_CTRL, 'process... overflow')
312 moredata(slip.delimiter)
317 #---------- error handling ----------
324 print('========== CRASH ==========', err,
325 '===========================', file=sys.stderr)
327 except twisted.internet.error.ReactorNotRunning: pass
329 def crash_on_defer(defer):
330 defer.addErrback(lambda err: crash(err))
332 def crash_on_critical(event):
333 if event.get('log_level') >= LogLevel.critical:
334 crash(twisted.logger.formatEvent(event))
336 #---------- config processing ----------
338 def _cfg_process_putatives():
341 # maps from abstract object to canonical name for cs's
343 def putative(cmap, abstract, canoncs):
345 current_canoncs = cmap[abstract]
349 assert(current_canoncs == canoncs)
350 cmap[abstract] = canoncs
352 server_pat = r'[-.0-9A-Za-z]+'
353 client_pat = r'[.:0-9a-f]+'
354 server_re = regexp.compile(server_pat)
355 serverclient_re = regexp.compile(server_pat + r' ' + client_pat)
357 for cs in cfg.sections():
363 # plan B "[<client>]" part 1
365 except AddressValueError:
367 if server_re.fullmatch(cs):
368 # plan C "[<servername>]"
369 putative(servers, cs, cs)
372 if serverclient_re.fullmatch(cs):
373 # plan D "[<servername> <client>]" part 1
374 (pss,pcs) = cs.split(' ')
377 # plan E "[<servername> LIMIT]"
381 # plan D "[<servername> <client>]" part 2
383 except AddressValueError:
384 # plan F "[<some thing we do not understand>]"
385 # well, we ignore this
386 print('warning: ignoring config section %s' % cs, file=sys.stderr)
389 else: # no AddressValueError
390 # plan D "[<servername> <client]" part 3
391 putative(clients, ci, pcs)
392 putative(servers, pss, pss)
395 else: # no AddressValueError
396 # plan B "[<client>" part 2
397 putative(clients, ci, cs)
400 return (servers, clients)
402 def cfg_process_common(ss):
403 c.mtu = cfg.getint(ss, 'mtu')
405 def cfg_process_saddrs(c, ss):
407 def __init__(self, port, addrspec):
411 self.addr = ipaddress.IPv4Address(addrspec)
412 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
414 except AddressValueError:
415 self.addr = ipaddress.IPv6Address(addrspec)
416 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
417 self._inurl = b'[%s]'
418 def make_endpoint(self):
419 return self._endpointfactory(reactor, self.port, self.addr)
421 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
422 if self.port != 80: url += b':%d' % self.port
426 c.port = cfg.getint(ss,'port')
428 for addrspec in cfg.get(ss, 'addrs').split():
429 sa = ServerAddr(c.port, addrspec)
432 def cfg_process_vnetwork(c, ss):
433 c.network = ipnetwork(cfg.get(ss,'network'))
434 if c.network.num_addresses < 3 + 2:
435 raise ValueError('network needs at least 2^3 addresses')
437 def cfg_process_vaddr(c, ss):
439 c.vaddr = cfg.get(ss,'server')
440 except NoOptionError:
441 cfg_process_vnetwork(c, ss)
442 c.vaddr = next(c.network.hosts())
444 def cfg_search_section(key,sections):
445 for section in sections:
446 if cfg.has_option(section, key):
448 raise NoOptionError(key, repr(sections))
450 def cfg_search(getter,key,sections):
451 section = cfg_search_section(key,sections)
452 return getter(section, key)
454 def cfg_process_client_limited(cc,ss,sections,key):
455 val = cfg_search(cfg.getint, key, sections)
456 lim = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT'])
457 cc.__dict__[key] = min(val,lim)
459 def cfg_process_client_common(cc,ss,cs,ci):
460 # returns sections to search in, iff password is defined, otherwise None
463 sections = ['%s %s' % (ss,cs),
468 try: pwsection = cfg_search_section('password', sections)
469 except NoOptionError: return None
471 pw = cfg.get(pwsection, 'password')
472 pw = pw.encode('utf-8')
474 cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
475 cfg_process_client_limited(cc,ss,sections,'http_timeout')
479 def cfg_process_ipif(c, sections, varmap):
481 try: v = getattr(c, s)
482 except AttributeError: continue
485 print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
487 section = cfg_search_section('ipif', sections)
488 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
490 #---------- startup ----------
492 def common_startup(process_cfg):
493 # calls process_cfg(putative_clients, putative_servers)
495 # ConfigParser hates #-comments after values
496 trailingcomments_re = regexp.compile(r'#.*')
497 cfg.read_string(trailingcomments_re.sub('', defcfg))
500 def readconfig(pathname, mandatory=True):
501 def log(m, p=pathname):
502 if not DBG.CONFIG in debug_set: return
503 print('DBG.CONFIG: %s: %s' % (m, pathname))
506 files = os.listdir(pathname)
508 except FileNotFoundError:
513 except NotADirectoryError:
520 re = regexp.compile('[^-A-Za-z0-9_]')
521 for f in os.listdir(cdir):
522 if re.search(f): continue
523 subpath = pathname + '/' + f
526 except FileNotFoundError:
527 log('entry skipped', subpath)
530 log('entry read', subpath)
532 def oc_config(od,os, value, op):
537 def dfs_less_detailed(dl):
538 return [df for df in DBG.iterconstants() if df <= dl]
540 def ds_default(od,os,dl,op):
542 debug_set = set(dfs_less_detailed(debug_def_detail))
544 def ds_select(od,os, spec, op):
545 for it in spec.split(','):
547 if it.startswith('-'):
548 mutator = debug_set.discard
551 mutator = debug_set.add
554 dfs = DBG.iterconstants()
558 mapper = dfs_less_detailed
561 mapper = lambda x: [x]
564 dfspec = DBG.lookupByName(it)
566 optparser.error('unknown debug flag %s in --debug-select' % it)
573 optparser.add_option('-D', '--debug',
576 help='enable default debug (to stdout)',
577 callback= ds_default)
579 optparser.add_option('--debug-select',
582 metavar='[-]DFLAG[+]|[-]+,...',
584 '''enable (`-': disable) each specified DFLAG;
585 `+': do same for all "more interesting" DFLAGSs;
586 just `+': all DFLAGs.
587 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
591 optparser.add_option('-c', '--config',
594 metavar='CONFIGFILE',
599 (opts, args) = optparser.parse_args()
600 if len(args): optparser.error('no non-option arguments please')
603 readconfig('/etc/hippotat/config', False)
604 readconfig('/etc/hippotat/config.d', False)
607 (pss, pcs) = _cfg_process_putatives()
608 process_cfg(pss, pcs)
609 except (configparser.Error, ValueError):
610 traceback.print_exc(file=sys.stderr)
611 print('\nInvalid configuration, giving up.', file=sys.stderr)
614 #print(repr(debug_set), file=sys.stderr)
616 log_formatter = twisted.logger.formatEventAsClassicLogText
617 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
618 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
619 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
620 stdsomething_obs = twisted.logger.FilteringLogObserver(
621 stderr_obs, [pred], stdout_obs
623 log_observer = twisted.logger.FilteringLogObserver(
624 stdsomething_obs, [LogNotBoringTwisted()]
626 #log_observer = stdsomething_obs
627 twisted.logger.globalLogBeginner.beginLoggingTo(
628 [ log_observer, crash_on_critical ]
632 log_debug(DBG.INIT, 'entering reactor')
633 if not _crashing: reactor.run()
634 print('CRASHED (end)', file=sys.stderr)