4 signal.signal(signal.SIGINT, signal.SIG_DFL)
8 from zope.interface import implementer
11 from twisted.internet import reactor
12 import twisted.internet.endpoints
14 from twisted.logger import LogLevel
15 import twisted.python.constants
16 from twisted.python.constants import NamedConstant
19 from ipaddress import AddressValueError
21 from optparse import OptionParser
22 from configparser import ConfigParser
23 from configparser import NoOptionError
25 from functools import partial
34 import hippotat.slip as slip
class DBG(twisted.python.constants.Names):
    """Debug flags: log_debug() only emits a message if its flag is enabled.

    The declaration order below is significant.  Elsewhere in this file
    flags are compared with ``<=`` to select every flag up to and
    including a chosen one, so reordering these lines changes which
    messages a given debug level enables.
    """
    INIT = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    # HTTP is the default level of detail (see debug_def_detail).
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    # Checked by log_debug() when deciding how to render attached data.
    DATA_COMPLETE = NamedConstant()
51 _hex_codec = codecs.getencoder('hex_codec')
53 #---------- logging ----------
55 org_stderr = sys.stderr
57 log = twisted.logger.Logger()
60 debug_def_detail = DBG.HTTP
62 def log_debug(dflag, msg, idof=None, d=None):
63 if dflag not in debug_set: return
64 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
66 msg = '[%#x] %s' % (id(idof), msg)
69 if not DBG.DATA_COMPLETE in debug_set:
73 d = _hex_codec(d)[0].decode('ascii')
74 msg += ' ' + d + trunc
75 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
77 @implementer(twisted.logger.ILogFilterPredicate)
78 class LogNotBoringTwisted:
79 def __call__(self, event):
80 yes = twisted.logger.PredicateResult.yes
81 no = twisted.logger.PredicateResult.no
83 if event.get('log_level') != LogLevel.info:
85 dflag = event.get('dflag')
86 if dflag in debug_set: return yes
87 if dflag is None and DBG.TWISTED in debug_set: return yes
90 print(traceback.format_exc(), file=org_stderr)
93 #---------- default config ----------
98 max_batch_down = 65536 # used by server, subject to [limits]
99 max_queue_time = 10 # used by server, subject to [limits]
100 target_requests_outstanding = 3 # must match; subject to [limits] on server
101 http_timeout = 30 # used by both } must be
102 http_timeout_grace = 5 # used by both } compatible
103 max_requests_outstanding = 4 # used by client
104 max_batch_up = 4000 # used by client
105 http_retry = 5 # used by client
107 #[server] or [<client>] overrides
108 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
109 # extra interpolations: %(local)s %(peer)s %(rnet)s
110 # obtained on server [virtual]server [virtual]relay [virtual]network
111 # from on client <client> [virtual]server [virtual]routes
116 # network = <prefix>/<len> # mandatory for server
117 # server = <ipaddr> # used by both, default is computed from `network'
118 # relay = <ipaddr> # used by server, default from `network' and `server'
119 # default server is first host in network
120 # default relay is first host which is not server
123 # addrs = 127.0.0.1 ::1 # mandatory for server
124 port = 80 # used by server
125 # url # used by client; default from first `addrs' and `port'
127 # [<client-ip4-or-ipv6-address>]
128 # password = <password> # used by both, must match
131 max_batch_down = 262144 # used by server
132 max_queue_time = 121 # used by server
133 http_timeout = 121 # used by server
134 target_requests_outstanding = 10 # used by server
137 # these need to be defined here so that they can be imported by import *
139 optparser = OptionParser()
# Byte translation table which exchanges `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return *s* with every `-' and SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after the exchange
    the result cannot contain `--'.  The mapping is its own inverse.
    """
    swapped = s.translate(_mimetrans)
    return swapped
148 def __init__(self, d = { }):
151 return 'ConfigResults('+repr(self.__dict__)+')'
155 def log_discard(packet, iface, saddr, daddr, why):
157 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
160 #---------- packet parsing ----------
162 def packet_addrs(packet):
163 version = packet[0] >> 4
167 factory = ipaddress.IPv4Address
171 factory = ipaddress.IPv6Address
173 raise ValueError('unsupported IP version %d' % version)
174 saddr = factory(packet[ saddroff : saddroff + addrlen ])
175 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
176 return (saddr, daddr)
178 #---------- address handling ----------
182 r = ipaddress.IPv4Address(input)
183 except AddressValueError:
184 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises a ValueError (or one of the ipaddress module's ValueError
    subclasses) if *input* is not acceptable as either kind of network.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # The address part is not IPv4, so retry as IPv6.  Only
        # AddressValueError triggers the fallback: other problems with
        # an IPv4 network (bad prefix length, host bits set) propagate
        # so the caller sees the relevant IPv4 error.
        return ipaddress.IPv6Network(input)
194 #---------- ipif (SLIP) subprocess ----------
196 class SlipStreamDecoder():
197 def __init__(self, desc, on_packet):
199 self._on_packet = on_packet
201 self._log('__init__')
203 def _log(self, msg, **kwargs):
204 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
206 def inputdata(self, data):
207 self._log('inputdata', d=data)
208 packets = slip.decode(data)
209 packets[0] = self._buffer + packets[0]
210 self._buffer = packets.pop()
211 for packet in packets:
212 self._maybe_packet(packet)
213 self._log('bufremain', d=self._buffer)
215 def _maybe_packet(self, packet):
216 self._log('maybepacket', d=packet)
218 self._on_packet(packet)
222 self._maybe_packet(self._buffer)
225 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
226 def __init__(self, router):
227 self._router = router
228 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
229 def connectionMade(self): pass
230 def outReceived(self, data):
231 self._decoder.inputdata(data)
232 def slip_on_packet(self, packet):
233 (saddr, daddr) = packet_addrs(packet)
234 if saddr.is_link_local or daddr.is_link_local:
235 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
237 self._router(packet, saddr, daddr)
238 def processEnded(self, status):
239 status.raiseException()
241 def start_ipif(command, router):
243 ipif = _IpifProcessProtocol(router)
244 reactor.spawnProcess(ipif,
245 '/bin/sh',['sh','-xc', command],
246 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Send *packet* to ipif, SLIP-encoded with a delimiter on each side.

    The data is written through ipif.transport as three separate writes:
    delimiter, encoded packet, delimiter.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
255 #---------- packet queue ----------
258 def __init__(self, desc, max_queue_time):
261 self._max_queue_time = max_queue_time
262 self._pq = collections.deque() # packets
264 def _log(self, dflag, msg, **kwargs):
265 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
267 def append(self, packet):
268 self._log(DBG.QUEUE, 'append', d=packet)
269 self._pq.append((time.monotonic(), packet))
272 self._log(DBG.QUEUE, 'nonempty ?')
274 try: (queuetime, packet) = self._pq[0]
276 self._log(DBG.QUEUE, 'nonempty ? empty.')
279 age = time.monotonic() - queuetime
280 if age > self._max_queue_time:
281 # strip old packets off the front
282 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
286 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
289 def process(self, sizequery, moredata, max_batch):
290 # sizequery() should return size of batch so far
291 # moredata(s) should add s to batch
292 self._log(DBG.QUEUE, 'process...')
294 try: (dummy, packet) = self._pq[0]
296 self._log(DBG.QUEUE, 'process... empty')
299 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
301 encoded = slip.encode(packet)
304 self._log(DBG.QUEUE_CTRL,
305 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
309 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
310 self._log(DBG.QUEUE_CTRL, 'process... overflow')
312 moredata(slip.delimiter)
317 #---------- error handling ----------
324 print('========== CRASH ==========', err,
325 '===========================', file=sys.stderr)
327 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Add an errback to *defer* which hands any failure to crash()."""
    def _to_crash(err):
        # crash is looked up when the errback fires, not when it is added.
        return crash(err)

    defer.addErrback(_to_crash)
def crash_on_critical(event):
    """Log observer: call crash() for events at critical level or above.

    event is a log event mapping.  Events without a 'log_level' entry
    are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # Ordering None against a LogLevel constant raises TypeError, so
        # an event with no level must not reach the comparison below.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
336 #---------- config processing ----------
338 def process_cfg_common_always():
340 c.mtu = cfg.get('virtual','mtu')
342 def process_cfg_ipif(section, varmap):
344 try: v = getattr(c, s)
345 except AttributeError: continue
350 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Set c.network from the [virtual] `network' setting and check its size.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual','network')
    c.network = ipnetwork(spec)
    # presumably 3 hosts plus the network and broadcast addresses -- TODO
    # confirm.  Network sizes are powers of two, so the smallest size
    # that passes this check is 8, hence "2^3" in the message.
    too_small = c.network.num_addresses < 3 + 2
    if too_small:
        raise ValueError('network needs at least 2^3 addresses')
357 def process_cfg_server():
359 c.server = cfg.get('virtual','server')
360 except NoOptionError:
361 process_cfg_network()
362 c.server = next(c.network.hosts())
365 def __init__(self, port, addrspec):
369 self.addr = ipaddress.IPv4Address(addrspec)
370 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
372 except AddressValueError:
373 self.addr = ipaddress.IPv6Address(addrspec)
374 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
375 self._inurl = b'[%s]'
376 def make_endpoint(self):
377 return self._endpointfactory(reactor, self.port, self.addr)
379 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
380 if self.port != 80: url += b':%d' % self.port
384 def process_cfg_saddrs():
385 try: port = cfg.getint('server','port')
386 except NoOptionError: port = 80
389 for addrspec in cfg.get('server','addrs').split():
390 sa = ServerAddr(port, addrspec)
393 def process_cfg_clients(constructor):
395 for cs in cfg.sections():
396 if not (':' in cs or '.' in cs): continue
398 pw = cfg.get(cs, 'password')
399 pw = pw.encode('utf-8')
400 constructor(ci,cs,pw)
402 #---------- startup ----------
404 def common_startup():
405 optparser.add_option('-c', '--config', dest='configfile',
406 default='/etc/hippotat/config')
408 def dfs_less_detailed(dl):
409 return [df for df in DBG.iterconstants() if df <= dl]
411 def ds_default(od,os,dl,op):
413 debug_set = set(dfs_less_detailed(debug_def_detail))
415 def ds_select(od,os, spec, op):
416 for it in spec.split(','):
418 if it.startswith('-'):
419 mutator = debug_set.discard
422 mutator = debug_set.add
425 dfs = DBG.iterconstants()
429 mapper = dfs_less_detailed
432 mapper = lambda x: [x]
435 dfspec = DBG.lookupByName(it)
437 optparser.error('unknown debug flag %s in --debug-select' % it)
444 optparser.add_option('-D', '--debug',
447 help='enable default debug (to stdout)',
448 callback= ds_default)
450 optparser.add_option('--debug-select',
453 metavar='[-]DFLAG[+]|[-]+,...',
455 '''enable (`-': disable) each specified DFLAG;
456 `+': do same for all "more interesting" DFLAGSs;
457 just `+': all DFLAGs.
458 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
462 (opts, args) = optparser.parse_args()
463 if len(args): optparser.error('no non-option arguments please')
465 print(repr(debug_set), file=sys.stderr)
467 re = regexp.compile('#.*')
468 cfg.read_string(re.sub('', defcfg))
469 cfg.read(opts.configfile)
471 log_formatter = twisted.logger.formatEventAsClassicLogText
472 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
473 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
474 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
475 stdsomething_obs = twisted.logger.FilteringLogObserver(
476 stderr_obs, [pred], stdout_obs
478 log_observer = twisted.logger.FilteringLogObserver(
479 stdsomething_obs, [LogNotBoringTwisted()]
481 #log_observer = stdsomething_obs
482 twisted.logger.globalLogBeginner.beginLoggingTo(
483 [ log_observer, crash_on_critical ]
487 log_debug(DBG.INIT, 'entering reactor')
488 if not _crashing: reactor.run()
489 print('CRASHED (end)', file=sys.stderr)