4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from zope.interface import implementer
12 from twisted.internet import reactor
13 import twisted.internet.endpoints
15 from twisted.logger import LogLevel
16 import twisted.python.constants
17 from twisted.python.constants import NamedConstant
20 from ipaddress import AddressValueError
22 from optparse import OptionParser
24 from configparser import ConfigParser
25 from configparser import NoOptionError
27 from functools import partial
36 import hippotat.slip as slip
# Debug-message categories; log_debug() and the log filter test these
# flags for membership in debug_set to decide what is shown.
# Declaration order is significant: dfs_less_detailed() compares flags
# with `<=', and Twisted orders NamedConstants by definition order, so a
# flag declared earlier compares as "less detailed" than a later one.
38 class DBG(twisted.python.constants.Names):
39 INIT = NamedConstant()
40 CONFIG = NamedConstant()
41 ROUTE = NamedConstant()
42 DROP = NamedConstant()
43 FLOW = NamedConstant()
44 HTTP = NamedConstant()
45 TWISTED = NamedConstant()
46 QUEUE = NamedConstant()
47 HTTP_CTRL = NamedConstant()
48 QUEUE_CTRL = NamedConstant()
49 HTTP_FULL = NamedConstant()
50 CTRL_DUMP = NamedConstant()
51 SLIP_FULL = NamedConstant()
# When this flag is not in debug_set, log_debug() takes a separate
# branch before hex-dumping packet data -- presumably truncation; confirm.
52 DATA_COMPLETE = NamedConstant()
54 _hex_codec = codecs.getencoder('hex_codec')
56 #---------- logging ----------
58 org_stderr = sys.stderr
60 log = twisted.logger.Logger()
63 debug_def_detail = DBG.HTTP
65 def log_debug(dflag, msg, idof=None, d=None):
66 if dflag not in debug_set: return
67 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
69 msg = '[%#x] %s' % (id(idof), msg)
72 if not DBG.DATA_COMPLETE in debug_set:
76 d = _hex_codec(d)[0].decode('ascii')
77 msg += ' ' + d + trunc
78 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
80 @implementer(twisted.logger.ILogFilterPredicate)
81 class LogNotBoringTwisted:
82 def __call__(self, event):
83 yes = twisted.logger.PredicateResult.yes
84 no = twisted.logger.PredicateResult.no
86 if event.get('log_level') != LogLevel.info:
88 dflag = event.get('dflag')
89 if dflag in debug_set: return yes
90 if dflag is None and DBG.TWISTED in debug_set: return yes
93 print(traceback.format_exc(), file=org_stderr)
96 #---------- default config ----------
100 max_batch_down = 65536
102 target_requests_outstanding = 3
104 http_timeout_grace = 5
105 max_requests_outstanding = 6
110 #[server] or [<client>] overrides
111 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
113 # relating to virtual network
118 # addrs = 127.0.0.1 ::1
121 # relating to virtual network
123 vnetwork = 172.24.230.192
124 # network = <prefix>/<len>
129 # [<client-ip4-or-ipv6-address>]
130 # password = <password> # used by both, must match
133 max_batch_down = 262144
136 target_requests_outstanding = 10
139 # these need to be defined here so that they can be imported by import *
140 cfg = ConfigParser(strict=False)
141 optparser = OptionParser()
143 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
def mime_translate(s):
    """Return `s` with every `-' byte and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC
    have been swapped the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
153 return 'ConfigResults('+repr(self.__dict__)+')'
155 def log_discard(packet, iface, saddr, daddr, why):
157 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
160 #---------- packet parsing ----------
162 def packet_addrs(packet):
163 version = packet[0] >> 4
167 factory = ipaddress.IPv4Address
171 factory = ipaddress.IPv6Address
173 raise ValueError('unsupported IP version %d' % version)
174 saddr = factory(packet[ saddroff : saddroff + addrlen ])
175 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
176 return (saddr, daddr)
178 #---------- address handling ----------
182 r = ipaddress.IPv4Address(input)
183 except AddressValueError:
184 r = ipaddress.IPv6Address(input)
187 def ipnetwork(input):
189 r = ipaddress.IPv4Network(input)
190 except NetworkValueError:
191 r = ipaddress.IPv6Network(input)
194 #---------- ipif (SLIP) subprocess ----------
196 class SlipStreamDecoder():
197 def __init__(self, desc, on_packet):
199 self._on_packet = on_packet
201 self._log('__init__')
def _log(self, msg, **kwargs):
    """Emit `msg` at DBG.SLIP_FULL, prefixed with this decoder's description.

    Extra keyword arguments are passed straight through to log_debug().
    """
    text = 'slip %s: %s' % (self._desc, msg)
    log_debug(DBG.SLIP_FULL, text, **kwargs)
def inputdata(self, data):
    """Feed a chunk of received SLIP data into the decoder.

    The chunk is split with slip.decode(); the bytes buffered from the
    previous call are prepended to the first piece, every piece except
    the last is handed to _maybe_packet(), and the last piece becomes
    the new buffer.
    """
    self._log('inputdata', d=data)
    pieces = slip.decode(data)
    # Join the carried-over partial packet onto the start of this chunk.
    pieces[0] = self._buffer + pieces[0]
    # The final piece is not yet terminated: keep it for the next call.
    *finished, self._buffer = pieces
    for pkt in finished:
        self._maybe_packet(pkt)
    self._log('bufremain', d=self._buffer)
215 def _maybe_packet(self, packet):
216 self._log('maybepacket', d=packet)
218 self._on_packet(packet)
222 self._maybe_packet(self._buffer)
225 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
226 def __init__(self, router):
227 self._router = router
228 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
229 def connectionMade(self): pass
230 def outReceived(self, data):
231 self._decoder.inputdata(data)
232 def slip_on_packet(self, packet):
233 (saddr, daddr) = packet_addrs(packet)
234 if saddr.is_link_local or daddr.is_link_local:
235 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
237 self._router(packet, saddr, daddr)
238 def processEnded(self, status):
239 status.raiseException()
241 def start_ipif(command, router):
243 ipif = _IpifProcessProtocol(router)
244 reactor.spawnProcess(ipif,
245 '/bin/sh',['sh','-xc', command],
246 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Log `packet` and write it to the ipif transport as one SLIP frame.

    The frame is written as three pieces: a delimiter, the SLIP-encoded
    packet, and a closing delimiter.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
255 #---------- packet queue ----------
258 def __init__(self, desc, max_queue_time):
261 self._max_queue_time = max_queue_time
262 self._pq = collections.deque() # packets
def _log(self, dflag, msg, **kwargs):
    """Emit `msg` under debug flag `dflag`, prefixed with this queue's description.

    Extra keyword arguments are passed straight through to log_debug().
    """
    text = self._desc + ' pq: ' + msg
    log_debug(dflag, text, **kwargs)
def append(self, packet):
    """Add `packet` to the back of the queue, stamped with the current monotonic time."""
    self._log(DBG.QUEUE, 'append', d=packet)
    entry = (time.monotonic(), packet)
    self._pq.append(entry)
272 self._log(DBG.QUEUE, 'nonempty ?')
274 try: (queuetime, packet) = self._pq[0]
276 self._log(DBG.QUEUE, 'nonempty ? empty.')
279 age = time.monotonic() - queuetime
280 if age > self._max_queue_time:
281 # strip old packets off the front
282 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
286 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
289 def process(self, sizequery, moredata, max_batch):
290 # sizequery() should return size of batch so far
291 # moredata(s) should add s to batch
292 self._log(DBG.QUEUE, 'process...')
294 try: (dummy, packet) = self._pq[0]
296 self._log(DBG.QUEUE, 'process... empty')
299 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
301 encoded = slip.encode(packet)
304 self._log(DBG.QUEUE_CTRL,
305 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
309 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
310 self._log(DBG.QUEUE_CTRL, 'process... overflow')
312 moredata(slip.delimiter)
317 #---------- error handling ----------
324 print('========== CRASH ==========', err,
325 '===========================', file=sys.stderr)
327 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to `defer` that hands any failure to crash()."""
    def _failed(failure):
        # crash is resolved at call time, exactly as in a lambda.
        return crash(failure)
    defer.addErrback(_failed)
def crash_on_critical(event):
    """Log observer: crash the program when a critical event is logged.

    `event` is a log event dictionary.  If its 'log_level' is
    LogLevel.critical or higher, the formatted event text is passed to
    crash().  Events that carry no 'log_level' are ignored: ordering
    None against a LogLevel constant would raise TypeError from inside
    the observer.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
336 #---------- config processing ----------
338 def _cfg_process_putatives():
341 # maps from abstract object to canonical name for cs's
343 def putative(cmap, abstract, canoncs):
345 current_canoncs = cmap[abstract]
349 assert(current_canoncs == canoncs)
350 cmap[abstract] = canoncs
352 server_pat = r'[-.0-9A-Za-z]+'
353 client_pat = r'[.:0-9a-f]+'
354 server_re = regexp.compile(server_pat)
355 serverclient_re = regexp.compile(server_pat + r' ' + client_pat)
357 for cs in cfg.sections():
363 # plan B "[<client>]" part 1
365 except AddressValueError:
367 if server_re.fullmatch(cs):
368 # plan C "[<servername>]"
369 putative(servers, cs, cs)
372 if serverclient_re.fullmatch(cs):
373 # plan D "[<servername> <client>]" part 1
374 (pss,pcs) = cs.split(' ')
377 # plan E "[<servername> LIMIT]"
381 # plan D "[<servername> <client>]" part 2
383 except AddressValueError:
384 # plan F "[<some thing we do not understand>]"
385 # well, we ignore this
386 print('warning: ignoring config section %s' % cs, file=sys.stderr)
389 else: # no AddressValueError
390 # plan D "[<servername> <client>]" part 3
391 putative(clients, ci, pcs)
392 putative(servers, pss, pss)
395 else: # no AddressValueError
396 # plan B "[<client>]" part 2
397 putative(clients, ci, cs)
400 return (servers, clients)
# Read the integer `mtu' option from config section `ss' and store it
# as c.mtu.
# NOTE(review): `c' is not a parameter here, unlike the siblings
# cfg_process_saddrs(c, ss) and cfg_process_vnetwork(c, ss).  This
# presumably relies on a module-level `c' -- confirm, or pass it in.
402 def cfg_process_common(ss):
403 c.mtu = cfg.getint(ss, 'mtu')
405 def cfg_process_saddrs(c, ss):
407 def __init__(self, port, addrspec):
411 self.addr = ipaddress.IPv4Address(addrspec)
412 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
414 except AddressValueError:
415 self.addr = ipaddress.IPv6Address(addrspec)
416 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
417 self._inurl = b'[%s]'
418 def make_endpoint(self):
419 return self._endpointfactory(reactor, self.port, self.addr)
421 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
422 if self.port != 80: url += b':%d' % self.port
426 c.port = cfg.getint(ss,'port')
428 for addrspec in cfg.get(ss, 'addrs').split():
429 sa = ServerAddr(c.port, addrspec)
def cfg_process_vnetwork(c, ss):
    """Parse the `network' option of config section `ss` into c.network.

    Raises ValueError if the parsed network has fewer than 5 addresses.
    """
    c.network = ipnetwork(cfg.get(ss, 'network'))
    if c.network.num_addresses >= 3 + 2:
        return
    raise ValueError('network needs at least 2^3 addresses')
437 def cfg_process_vaddr():
439 c.server = cfg.get('virtual','server')
440 except NoOptionError:
441 process_cfg_network()
442 c.server = next(c.network.hosts())
444 def cfg_search_section(key,sections):
445 for section in sections:
446 if cfg.has_option(section, key):
448 raise NoOptionError('missing %s %s' % (key, repr(sections)))
def cfg_search(getter, key, sections):
    """Fetch option `key` with `getter` from the section chosen by cfg_search_section().

    `getter` is called as getter(section, key) and its result is returned.
    """
    return getter(cfg_search_section(key, sections), key)
def cfg_process_client_limited(cc, ss, sections, key):
    """Store integer option `key` on `cc`, capped by the configured limit.

    The value is searched for in `sections`; the cap is searched for in
    the '<ss> LIMIT' and 'LIMIT' sections.  The smaller of the two is
    stored as attribute `key` of `cc`.

    Raises NoOptionError (from cfg_search_section) if either lookup
    finds no section defining `key`.
    """
    val = cfg_search(cfg.getint, key, sections)
    # cfg_search() takes a single sequence of section names; passing the
    # two names as separate positional arguments raised TypeError.
    lim = cfg_search(cfg.getint, key, ['%s LIMIT' % ss, 'LIMIT'])
    cc.__dict__[key] = min(val, lim)
459 def cfg_process_client_common(cc,ss,cs,ci):
460 # returns sections to search in, iff password is defined, otherwise None
463 sections = ['%s %s' % section,
468 try: pwsection = cfg_search_section('password', sections)
469 except NoOptionError: return None
471 pw = cfg.get(pwsection, 'password')
472 pw = pw.encode('utf-8')
474 cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
475 cfg_process_client_limited(cc,ss,sections,'http_timeout')
479 def process_cfg_ipif(c, sections, varmap):
481 try: v = getattr(c, s)
482 except AttributeError: continue
485 section = cfg_search_section('ipif', sections)
486 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
488 #---------- startup ----------
490 def common_startup(process_cfg):
491 # calls process_cfg(putative_clients, putative_servers)
493 # ConfigParser hates #-comments after values
494 trailingcomments_re = regexp.compile(r'#.*')
495 cfg.read_string(trailingcomments_re.sub('', defcfg))
498 def readconfig(pathname, mandatory=True):
499 def log(m, p=pathname):
500 if not DBG.CONFIG in debug_set: return
501 print('DBG.CONFIG: %s: %s' % (m, pathname))
504 files = os.listdir(pathname)
506 except FileNotFoundError:
511 except NotADirectoryError:
518 re = regexp.compile('[^-A-Za-z0-9_]')
519 for f in os.listdir(cdir):
520 if re.search(f): continue
521 subpath = pathname + '/' + f
524 except FileNotFoundError:
525 log('entry skipped', subpath)
528 log('entry read', subpath)
530 def oc_config(od,os, value, op):
def dfs_less_detailed(dl):
    """Return a list of every DBG flag that compares `<=` to `dl`."""
    selected = []
    for flag in DBG.iterconstants():
        if flag <= dl:
            selected.append(flag)
    return selected
538 def ds_default(od,os,dl,op):
540 debug_set = set(dfs_less_detailed(debug_def_detail))
542 def ds_select(od,os, spec, op):
543 for it in spec.split(','):
545 if it.startswith('-'):
546 mutator = debug_set.discard
549 mutator = debug_set.add
552 dfs = DBG.iterconstants()
556 mapper = dfs_less_detailed
559 mapper = lambda x: [x]
562 dfspec = DBG.lookupByName(it)
564 optparser.error('unknown debug flag %s in --debug-select' % it)
571 optparser.add_option('-D', '--debug',
574 help='enable default debug (to stdout)',
575 callback= ds_default)
577 optparser.add_option('--debug-select',
580 metavar='[-]DFLAG[+]|[-]+,...',
582 '''enable (`-': disable) each specified DFLAG;
583 `+': do same for all "more interesting" DFLAGSs;
584 just `+': all DFLAGs.
585 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
589 optparser.add_option('-c', '--config',
592 metavar='CONFIGFILE',
597 (opts, args) = optparser.parse_args()
598 if len(args): optparser.error('no non-option arguments please')
601 readconfig('/etc/hippotat/config', False)
602 readconfig('/etc/hippotat/config.d', False)
605 (pss, pcs) = process_cfg_putatives()
606 process_cfg(pss, pcs)
607 except (configparser.Error, ValueError):
608 traceback.print_exc(file=sys.stderr)
609 print('\nInvalid configuration, giving up.', file=sys.stderr)
612 #print(repr(debug_set), file=sys.stderr)
614 log_formatter = twisted.logger.formatEventAsClassicLogText
615 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
616 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
617 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
618 stdsomething_obs = twisted.logger.FilteringLogObserver(
619 stderr_obs, [pred], stdout_obs
621 log_observer = twisted.logger.FilteringLogObserver(
622 stdsomething_obs, [LogNotBoringTwisted()]
624 #log_observer = stdsomething_obs
625 twisted.logger.globalLogBeginner.beginLoggingTo(
626 [ log_observer, crash_on_critical ]
630 log_debug(DBG.INIT, 'entering reactor')
631 if not _crashing: reactor.run()
632 print('CRASHED (end)', file=sys.stderr)