3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
26 signal.signal(signal.SIGINT, signal.SIG_DFL)
31 from zope.interface import implementer
34 from twisted.internet import reactor
35 import twisted.internet.endpoints
37 from twisted.logger import LogLevel
38 import twisted.python.constants
39 from twisted.python.constants import NamedConstant
42 from ipaddress import AddressValueError
44 from optparse import OptionParser
46 from configparser import ConfigParser
47 from configparser import NoOptionError
49 from functools import partial
61 import hippotatlib.slip as slip
class DBG(twisted.python.constants.Names):
    # Debug flags ("DFLAGs").  log_debug() only emits a message when its
    # flag is a member of debug_set, and --debug-select names these flags
    # on the command line.
    #
    # Declaration order is significant: dfs_less_detailed() selects flags
    # with `df <= dl', so flags declared earlier are treated as "less
    # detailed" than those declared later.
    # NOTE(review): this relies on twisted NamedConstants comparing by
    # definition order within their container -- confirm against the
    # twisted.python.constants documentation before reordering.
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    OWNSOURCE = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
80 _hex_codec = codecs.getencoder('hex_codec')
82 #---------- logging ----------
84 org_stderr = sys.stderr
86 log = twisted.logger.Logger()
89 debug_def_detail = DBG.HTTP
91 def log_debug(dflag, msg, idof=None, d=None):
92 if dflag not in debug_set: return
93 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
95 msg = '[%#x] %s' % (id(idof), msg)
98 if not DBG.DATA_COMPLETE in debug_set:
102 d = _hex_codec(d)[0].decode('ascii')
103 msg += ' ' + d + trunc
104 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
106 def logevent_is_boringtwisted(event):
108 if event.get('log_level') != LogLevel.info:
110 dflag = event.get('dflag')
111 if dflag is False : return False
112 if dflag in debug_set: return False
113 if dflag is None and DBG.TWISTED in debug_set: return False
116 print('EXCEPTION (IN BORINGTWISTED CHECK)',
117 traceback.format_exc(), file=org_stderr)
120 @implementer(twisted.logger.ILogFilterPredicate)
121 class LogNotBoringTwisted:
122 def __call__(self, event):
124 twisted.logger.PredicateResult.no
125 if logevent_is_boringtwisted(event) else
126 twisted.logger.PredicateResult.yes
129 #---------- default config ----------
133 max_batch_down = 65536
135 target_requests_outstanding = 3
137 http_timeout_grace = 5
138 max_requests_outstanding = 6
143 ifname_client = hippo%%d
144 ifname_server = shippo%%d
147 #[server] or [<client>] overrides
148 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip,%(ifname)s %(rnets)s
150 # relating to virtual network
153 # addrs = 127.0.0.1 ::1
156 # relating to virtual network
157 vvnetwork = 172.24.230.192
158 # vnetwork = <prefix>/<len>
163 # [<client-ip4-or-ipv6-address>]
164 # secret = <secret> # used by both, must match
167 max_batch_down = 262144
170 target_requests_outstanding = 10
173 # these need to be defined here so that they can be imported by import *
174 cfg = ConfigParser(strict=False)
175 optparser = OptionParser()
# Byte translation table that exchanges `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')

def mime_translate(s):
    # Returns a copy of the bytes s with `-' and SLIP ESC swapped.
    # SLIP-encoded packets cannot contain ESC ESC, so the swapped
    # result cannot contain `--'.
    swapped = s.translate(_mimetrans)
    return swapped
187 return 'ConfigResults('+repr(self.__dict__)+')'
189 def log_discard(packet, iface, saddr, daddr, why):
191 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
194 #---------- packet parsing ----------
196 def packet_addrs(packet):
197 version = packet[0] >> 4
201 factory = ipaddress.IPv4Address
205 factory = ipaddress.IPv6Address
207 raise ValueError('unsupported IP version %d' % version)
208 saddr = factory(packet[ saddroff : saddroff + addrlen ])
209 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
210 return (saddr, daddr)
212 #---------- address handling ----------
216 r = ipaddress.IPv4Address(input)
217 except AddressValueError:
218 r = ipaddress.IPv6Address(input)
221 def ipnetwork(input):
223 r = ipaddress.IPv4Network(input)
224 except NetworkValueError:
225 r = ipaddress.IPv6Network(input)
228 #---------- ipif (SLIP) subprocess ----------
230 class SlipStreamDecoder():
231 def __init__(self, desc, on_packet, mtu):
233 self._on_packet = on_packet
236 self._log('__init__')
238 def _log(self, msg, **kwargs):
239 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
241 def inputdata(self, data):
242 self._log('inputdata', d=data)
243 data = self._buffer + data
245 packets = slip.decode(data, True)
246 self._buffer = packets.pop()
247 for packet in packets:
248 self._maybe_packet(packet)
249 self._log('bufremain', d=self._buffer)
251 def _maybe_packet(self, packet):
252 self._log('maybepacket', d=packet)
253 if len(packet) and len(packet) <= self._mtu:
254 self._on_packet(packet)
260 packets = slip.decode(data)
261 assert(len(packets) == 1)
262 self._maybe_packet(packets[0])
264 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
265 def __init__(self, router, mtu):
266 self._router = router
267 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet, mtu)
268 def connectionMade(self): pass
269 def outReceived(self, data):
270 self._decoder.inputdata(data)
271 def slip_on_packet(self, packet):
272 (saddr, daddr) = packet_addrs(packet)
273 if saddr.is_link_local or daddr.is_link_local:
274 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
276 self._router(packet, saddr, daddr)
277 def processEnded(self, status):
278 status.raiseException()
280 def start_ipif(command, router, mtu):
281 ipif = _IpifProcessProtocol(router, mtu)
282 reactor.spawnProcess(ipif,
283 '/bin/sh',['sh','-xc', command],
284 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(ipif, packet):
    # Writes one packet to ipif.transport, SLIP-encoded and bracketed by
    # a SLIP delimiter on each side.  The packet is logged under DBG.FLOW
    # before anything is written.
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
294 #---------- packet queue ----------
297 def __init__(self, desc, max_queue_time):
300 self._max_queue_time = max_queue_time
301 self._pq = collections.deque() # packets
303 def _log(self, dflag, msg, **kwargs):
304 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
306 def append(self, packet):
307 self._log(DBG.QUEUE, 'append', d=packet)
308 self._pq.append((time.monotonic(), packet))
311 self._log(DBG.QUEUE, 'nonempty ?')
313 try: (queuetime, packet) = self._pq[0]
315 self._log(DBG.QUEUE, 'nonempty ? empty.')
318 age = time.monotonic() - queuetime
319 if age > self._max_queue_time:
320 # strip old packets off the front
321 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
325 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
328 def process(self, sizequery, moredata, max_batch):
329 # sizequery() should return size of batch so far
330 # moredata(s) should add s to batch
331 self._log(DBG.QUEUE, 'process...')
333 try: (dummy, packet) = self._pq[0]
335 self._log(DBG.QUEUE, 'process... empty')
338 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
340 encoded = slip.encode(packet)
343 self._log(DBG.QUEUE_CTRL,
344 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
348 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
349 self._log(DBG.QUEUE_CTRL, 'process... overflow')
351 moredata(slip.delimiter)
356 #---------- error handling ----------
363 print('========== CRASH ==========', err,
364 '===========================', file=sys.stderr)
366 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    # Registers an errback on defer which hands any failure to crash().
    # crash is looked up when the errback fires, not when it is registered.
    def _escalate(err):
        return crash(err)
    defer.addErrback(_escalate)
def crash_on_critical(event):
    # Log observer: crashes the program (via crash()) when it sees an
    # event whose log_level is LogLevel.critical or higher.
    #
    # event is a log event dict.  Events that carry no log_level are
    # ignored: comparing None against a LogLevel constant would raise
    # TypeError from inside the observer instead of doing anything useful.
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        # Pass the formatted text of the event as the crash message.
        crash(twisted.logger.formatEvent(event))
375 #---------- authentication tokens ----------
# Digest constructor used for the authentication token HMAC.
_authtoken_digest = hashlib.sha256

def _authtoken_time():
    # Current wall-clock time as a whole number of seconds.
    now = time.time()
    return int(now)

def _authtoken_hmac(secret, hextime):
    # Raw HMAC (bytes) of hextime keyed by secret; both are bytes.
    mac = hmac.new(secret, msg=hextime, digestmod=_authtoken_digest)
    return mac.digest()

def authtoken_make(secret):
    # Builds a token of the form  b'<hex timestamp> <base64 HMAC>'
    # where the HMAC covers the ASCII hex timestamp, keyed by secret.
    hextime = format(_authtoken_time(), 'x').encode('ascii')
    mac64 = base64.b64encode(_authtoken_hmac(secret, hextime))
    return b' '.join((hextime, mac64))
390 def authtoken_check(secret, token, maxskew):
391 (hextime, theirmac64) = token.split(b' ')
392 now = _authtoken_time()
393 then = int(hextime, 16)
395 if (abs(skew) > maxskew):
396 raise ValueError('too much clock skew (client %ds ahead)' % skew)
397 theirmac = base64.b64decode(theirmac64)
398 ourmac = _authtoken_hmac(secret, hextime)
399 if not hmac.compare_digest(theirmac, ourmac):
400 raise ValueError('invalid token (wrong secret?)')
403 #---------- config processing ----------
405 def _cfg_process_putatives():
408 # maps from abstract object to canonical name for cs's
410 def putative(cmap, abstract, canoncs):
412 current_canoncs = cmap[abstract]
416 assert(current_canoncs == canoncs)
417 cmap[abstract] = canoncs
419 server_pat = r'[-.0-9A-Za-z]+'
420 client_pat = r'[.:0-9a-f]+'
421 server_re = regexp.compile(server_pat)
422 serverclient_re = regexp.compile(
423 server_pat + r' ' + '(?:' + client_pat + '|LIMIT)')
425 for cs in cfg.sections():
427 log_debug_config('putatives: section [%s] %s' % (cs, m))
430 dbg('X ignore: %s' % (why))
431 print('warning: ignoring config section [%s] (%s)' % (cs, why),
434 if cs == 'LIMIT' or cs == 'COMMON':
435 # plan A "[LIMIT]" or "[COMMON]"
440 # plan B "[<client>]" part 1
442 except AddressValueError:
444 if server_re.fullmatch(cs):
445 # plan C "[<servername>]"
447 putative(servers, cs, cs)
450 if serverclient_re.fullmatch(cs):
451 # plan D "[<servername> <client>]" part 1
452 (pss,pcs) = cs.split(' ')
455 # plan E "[<servername> LIMIT]"
456 dbg('E <server> LIMIT')
460 # plan D "[<servername> <client>]" part 2
462 except AddressValueError:
463 # plan F branch 1 "[<some thing we do not understand>]"
464 log_ignore('bad-addr')
467 else: # no AddressValueError
468 # plan D "[<servername> <client>]" part 3
469 dbg('D <server> <client>')
470 putative(clients, ci, pcs)
471 putative(servers, pss, pss)
474 # plan F branch 2 "[<some thing we do not understand>]"
475 log_ignore('nomatch '+ repr(serverclient_re))
477 else: # no AddressValueError
478 # plan B "[<client>" part 2
480 putative(clients, ci, cs)
483 return (servers, clients)
def cfg_process_general(c, ss):
    # Looks up the integer `mtu' setting for section ss (via cfg1getint)
    # and stores it as c.mtu.
    mtu = cfg1getint(ss, 'mtu')
    c.mtu = mtu
488 def cfg_process_saddrs(c, ss):
490 def __init__(self, port, addrspec):
494 self.addr = ipaddress.IPv4Address(addrspec)
495 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
497 except AddressValueError:
498 self.addr = ipaddress.IPv6Address(addrspec)
499 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
500 self._inurl = b'[%s]'
501 def make_endpoint(self):
502 return self._endpointfactory(reactor, self.port,
503 interface= '%s' % self.addr)
505 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
506 if self.port != 80: url += b':%d' % self.port
510 return 'ServerAddr'+repr((self.port,self.addr))
512 c.port = cfg1getint(ss,'port')
514 for addrspec in cfg1get(ss, 'addrs').split():
515 sa = ServerAddr(c.port, addrspec)
def cfg_process_vnetwork(c, ss):
    # Parses the `vnetwork' setting for section ss into c.vnetwork.
    # Raises ValueError if the network has fewer than 3 + 2 addresses;
    # c.vnetwork has already been assigned when that happens.
    network = ipnetwork(cfg1get(ss, 'vnetwork'))
    c.vnetwork = network
    if network.num_addresses >= 3 + 2:
        return
    raise ValueError('vnetwork needs at least 2^3 addresses')
523 def cfg_process_vaddr(c, ss):
525 c.vaddr = ipaddr(cfg1get(ss,'vaddr'))
526 except NoOptionError:
527 cfg_process_vnetwork(c, ss)
528 c.vaddr = next(c.vnetwork.hosts())
530 def cfg_search_section(key,sections):
531 for section in sections:
532 if cfg.has_option(section, key):
534 raise NoOptionError(key, repr(sections))
def cfg_get_raw(*args, **kwargs):
    # Getter suitable for passing to cfg_search: cfg.get with raw=True,
    # forwarding all other arguments unchanged.
    raw_value = cfg.get(*args, raw=True, **kwargs)
    return raw_value
def cfg_search(getter, key, sections):
    # Finds which of sections supplies key (cfg_search_section) and
    # returns getter(<that section>, key).
    return getter(cfg_search_section(key, sections), key)
def cfg1get(section, key, getter=cfg.get, **kwargs):
    # Fetches key from section, falling back to [COMMON]; the lookup is
    # done by getter (default cfg.get), which also receives **kwargs.
    found_in = cfg_search_section(key, [section, 'COMMON'])
    return getter(found_in, key, **kwargs)
def cfg1getint(section, key, **kwargs):
    # Integer variant of cfg1get: same section search, read via cfg.getint.
    value = cfg1get(section, key, getter=cfg.getint, **kwargs)
    return value
def cfg_process_client_limited(cc, ss, sections, key):
    # Sets attribute key on cc to the configured integer value (searched
    # in sections), capped by the limit found in "[<ss> LIMIT]" or
    # "[LIMIT]".
    configured = cfg_search(cfg1getint, key, sections)
    limit_sections = ['%s LIMIT' % ss, 'LIMIT']
    ceiling = cfg_search(cfg1getint, key, limit_sections)
    vars(cc)[key] = min(configured, ceiling)
556 def cfg_process_client_common(cc,ss,cs,ci):
557 # returns sections to search in, iff secret is defined, otherwise None
560 sections = ['%s %s' % (ss,cs),
565 try: pwsection = cfg_search_section('secret', sections)
566 except NoOptionError: return None
568 pw = cfg1get(pwsection, 'secret')
569 cc.secret = pw.encode('utf-8')
571 cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
572 cfg_process_client_limited(cc,ss,sections,'http_timeout')
576 def cfg_process_ipif(c, sections, varmap):
578 try: v = getattr(c, s)
579 except AttributeError: continue
582 v = cfg_search(cfg1getint, d, sections)
585 #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
587 section = cfg_search_section('ipif', sections)
588 c.ipif_command = cfg1get(section,'ipif', vars=c.__dict__)
590 #---------- startup ----------
def log_debug_config(m):
    # Prints m, prefixed with 'DBG.CONFIG:', but only when DBG.CONFIG
    # debugging is enabled in debug_set.
    if DBG.CONFIG in debug_set:
        print('DBG.CONFIG:', m)
596 def common_startup(process_cfg):
  # calls process_cfg(opts, putative_servers, putative_clients)
599 # ConfigParser hates #-comments after values
600 trailingcomments_re = regexp.compile(r'#.*')
601 cfg.read_string(trailingcomments_re.sub('', defcfg))
604 def readconfig(pathname, mandatory=True):
605 def log(m, p=pathname):
606 if not DBG.CONFIG in debug_set: return
607 log_debug_config('%s: %s' % (m, p))
610 files = os.listdir(pathname)
612 except FileNotFoundError:
617 except NotADirectoryError:
624 re = regexp.compile('[^-A-Za-z0-9_]')
625 for f in os.listdir(pathname):
626 if re.search(f): continue
627 subpath = pathname + '/' + f
630 except FileNotFoundError:
631 log('entry skipped', subpath)
634 log('entry read', subpath)
636 def oc_config(od,os, value, op):
641 def oc_extra_config(od,os, value, op):
644 def read_defconfig():
645 readconfig('/etc/hippotat/config.d', False)
646 readconfig('/etc/hippotat/secrets.d', False)
647 readconfig('/etc/hippotat/master.cfg', False)
649 def oc_defconfig(od,os, value, op):
652 read_defconfig(value)
654 def dfs_less_detailed(dl):
655 return [df for df in DBG.iterconstants() if df <= dl]
657 def ds_default(od,os,dl,op):
660 debug_set |= set(dfs_less_detailed(debug_def_detail))
662 def ds_select(od,os, spec, op):
663 for it in spec.split(','):
665 if it.startswith('-'):
666 mutator = debug_set.discard
669 mutator = debug_set.add
672 dfs = DBG.iterconstants()
676 mapper = dfs_less_detailed
679 mapper = lambda x: [x]
682 dfspec = DBG.lookupByName(it)
684 optparser.error('unknown debug flag %s in --debug-select' % it)
691 optparser.add_option('-D', '--debug',
694 help='enable default debug (to stdout)',
695 callback= ds_default)
697 optparser.add_option('--debug-select',
700 metavar='[-]DFLAG[+]|[-]+,...',
702 '''enable (`-': disable) each specified DFLAG;
703 `+': do same for all "more interesting" DFLAGSs;
704 just `+': all DFLAGs.
705 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
709 optparser.add_option('-c', '--config',
712 metavar='CONFIGFILE',
717 optparser.add_option('--extra-config',
720 metavar='CONFIGFILE',
723 callback= oc_extra_config)
725 optparser.add_option('--default-config',
727 callback= oc_defconfig)
729 (opts, args) = optparser.parse_args()
730 if len(args): optparser.error('no non-option arguments please')
736 (pss, pcs) = _cfg_process_putatives()
737 process_cfg(opts, pss, pcs)
738 except (configparser.Error, ValueError):
739 traceback.print_exc(file=sys.stderr)
740 print('\nInvalid configuration, giving up.', file=sys.stderr)
744 #print('X', debug_set, file=sys.stderr)
746 log_formatter = twisted.logger.formatEventAsClassicLogText
747 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
748 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
749 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
750 stdsomething_obs = twisted.logger.FilteringLogObserver(
751 stderr_obs, [pred], stdout_obs
753 global file_log_observer
754 file_log_observer = twisted.logger.FilteringLogObserver(
755 stdsomething_obs, [LogNotBoringTwisted()]
757 #log_observer = stdsomething_obs
758 twisted.logger.globalLogBeginner.beginLoggingTo(
759 [ file_log_observer, crash_on_critical ]
763 log_debug(DBG.INIT, 'entering reactor')
764 if not _crashing: reactor.run()
765 print('ENDED', file=sys.stderr)