3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
6 # Copyright 2017 Ian Jackson
10 # This program is free software: you can redistribute it and/or modify
11 # it under the terms of the GNU General Public License as published by
12 # the Free Software Foundation, either version 3 of the License, or
13 # (at your option) any later version.
15 # This program is distributed in the hope that it will be useful,
16 # but WITHOUT ANY WARRANTY; without even the implied warranty of
17 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 # GNU General Public License for more details.
20 # You should have received a copy of the GNU General Public License
21 # along with this program, in the file GPLv3. If not,
22 # see <http://www.gnu.org/licenses/>.
26 signal.signal(signal.SIGINT, signal.SIG_DFL)
31 from zope.interface import implementer
34 from twisted.internet import reactor
35 import twisted.internet.endpoints
37 from twisted.logger import LogLevel
38 import twisted.python.constants
39 from twisted.python.constants import NamedConstant
42 from ipaddress import AddressValueError
44 from optparse import OptionParser
46 from configparser import ConfigParser
47 from configparser import NoOptionError
49 from functools import partial
58 import hippotatlib.slip as slip
class DBG(twisted.python.constants.Names):
    """Names of the debug flags that select which debug messages are logged.

    log_debug() drops a message unless its flag is in debug_set, and the
    --debug-select help text lists these names via DBG.iterconstants().

    The order of definition is significant: flags are compared with `<='
    when a "this flag and everything less detailed" set is built, and the
    default debug level (debug_def_detail) is DBG.HTTP.  Do not reorder
    these lines without checking those comparisons.
    NOTE(review): this relies on twisted ordering Names constants by
    definition order -- confirm against the twisted documentation.
    """
    INIT = NamedConstant()
    CONFIG = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    OWNSOURCE = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()  # default detail level, see debug_def_detail
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
    DATA_COMPLETE = NamedConstant()
77 _hex_codec = codecs.getencoder('hex_codec')
79 #---------- logging ----------
81 org_stderr = sys.stderr
83 log = twisted.logger.Logger()
86 debug_def_detail = DBG.HTTP
88 def log_debug(dflag, msg, idof=None, d=None):
89 if dflag not in debug_set: return
90 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
92 msg = '[%#x] %s' % (id(idof), msg)
95 if not DBG.DATA_COMPLETE in debug_set:
99 d = _hex_codec(d)[0].decode('ascii')
100 msg += ' ' + d + trunc
101 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
103 @implementer(twisted.logger.ILogFilterPredicate)
104 class LogNotBoringTwisted:
105 def __call__(self, event):
106 yes = twisted.logger.PredicateResult.yes
107 no = twisted.logger.PredicateResult.no
109 if event.get('log_level') != LogLevel.info:
111 dflag = event.get('dflag')
112 if dflag is False : return yes
113 if dflag in debug_set: return yes
114 if dflag is None and DBG.TWISTED in debug_set: return yes
117 print(traceback.format_exc(), file=org_stderr)
120 #---------- default config ----------
124 max_batch_down = 65536
126 target_requests_outstanding = 3
128 http_timeout_grace = 5
129 max_requests_outstanding = 6
135 #[server] or [<client>] overrides
136 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
138 # relating to virtual network
143 # addrs = 127.0.0.1 ::1
146 # relating to virtual network
147 vvnetwork = 172.24.230.192
148 # vnetwork = <prefix>/<len>
153 # [<client-ip4-or-ipv6-address>]
154 # password = <password> # used by both, must match
157 max_batch_down = 262144
160 target_requests_outstanding = 10
163 # these need to be defined here so that they can be imported by import *
164 cfg = ConfigParser(strict=False)
165 optparser = OptionParser()
167 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
def mime_translate(s):
    """Return *s* with every `-' byte and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after swapping `-'
    and ESC the result cannot contain `--'.  The byte mapping itself is
    the module-level table _mimetrans.
    """
    swapped = s.translate(_mimetrans)
    return swapped
177 return 'ConfigResults('+repr(self.__dict__)+')'
179 def log_discard(packet, iface, saddr, daddr, why):
181 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
184 #---------- packet parsing ----------
186 def packet_addrs(packet):
187 version = packet[0] >> 4
191 factory = ipaddress.IPv4Address
195 factory = ipaddress.IPv6Address
197 raise ValueError('unsupported IP version %d' % version)
198 saddr = factory(packet[ saddroff : saddroff + addrlen ])
199 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
200 return (saddr, daddr)
202 #---------- address handling ----------
206 r = ipaddress.IPv4Address(input)
207 except AddressValueError:
208 r = ipaddress.IPv6Address(input)
211 def ipnetwork(input):
213 r = ipaddress.IPv4Network(input)
214 except NetworkValueError:
215 r = ipaddress.IPv6Network(input)
218 #---------- ipif (SLIP) subprocess ----------
220 class SlipStreamDecoder():
221 def __init__(self, desc, on_packet):
223 self._on_packet = on_packet
225 self._log('__init__')
227 def _log(self, msg, **kwargs):
228 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
230 def inputdata(self, data):
231 self._log('inputdata', d=data)
232 data = self._buffer + data
234 packets = slip.decode(data, True)
235 self._buffer = packets.pop()
236 for packet in packets:
237 self._maybe_packet(packet)
238 self._log('bufremain', d=self._buffer)
240 def _maybe_packet(self, packet):
241 self._log('maybepacket', d=packet)
243 self._on_packet(packet)
249 packets = slip.decode(data)
250 assert(len(packets) == 1)
251 self._maybe_packet(packets[0])
253 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
254 def __init__(self, router):
255 self._router = router
256 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
257 def connectionMade(self): pass
258 def outReceived(self, data):
259 self._decoder.inputdata(data)
260 def slip_on_packet(self, packet):
261 (saddr, daddr) = packet_addrs(packet)
262 if saddr.is_link_local or daddr.is_link_local:
263 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
265 self._router(packet, saddr, daddr)
266 def processEnded(self, status):
267 status.raiseException()
269 def start_ipif(command, router):
270 ipif = _IpifProcessProtocol(router)
271 reactor.spawnProcess(ipif,
272 '/bin/sh',['sh','-xc', command],
273 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(ipif, packet):
    """Write *packet* to the ipif transport as one delimited SLIP frame.

    ipif   -- object whose .transport.write() accepts the bytes
    packet -- the raw packet; it is SLIP-encoded here

    A delimiter is written before as well as after the encoded packet.
    The three pieces are written with separate write() calls, in order.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    transport = ipif.transport
    transport.write(slip.delimiter)
    frame = slip.encode(packet)
    transport.write(frame)
    transport.write(slip.delimiter)
283 #---------- packet queue ----------
286 def __init__(self, desc, max_queue_time):
289 self._max_queue_time = max_queue_time
290 self._pq = collections.deque() # packets
292 def _log(self, dflag, msg, **kwargs):
293 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
295 def append(self, packet):
296 self._log(DBG.QUEUE, 'append', d=packet)
297 self._pq.append((time.monotonic(), packet))
300 self._log(DBG.QUEUE, 'nonempty ?')
302 try: (queuetime, packet) = self._pq[0]
304 self._log(DBG.QUEUE, 'nonempty ? empty.')
307 age = time.monotonic() - queuetime
308 if age > self._max_queue_time:
309 # strip old packets off the front
310 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
314 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
317 def process(self, sizequery, moredata, max_batch):
318 # sizequery() should return size of batch so far
319 # moredata(s) should add s to batch
320 self._log(DBG.QUEUE, 'process...')
322 try: (dummy, packet) = self._pq[0]
324 self._log(DBG.QUEUE, 'process... empty')
327 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
329 encoded = slip.encode(packet)
332 self._log(DBG.QUEUE_CTRL,
333 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
337 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
338 self._log(DBG.QUEUE_CTRL, 'process... overflow')
340 moredata(slip.delimiter)
345 #---------- error handling ----------
352 print('========== CRASH ==========', err,
353 '===========================', file=sys.stderr)
355 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Arrange for any failure of *defer* to be passed to crash().

    defer -- object providing addErrback(callable)

    crash is looked up only when the errback actually fires.
    """
    def _failed(err):
        return crash(err)

    defer.addErrback(_failed)
def crash_on_critical(event):
    """Log observer: crash when an event of critical level is seen.

    event -- log event mapping; its 'log_level' entry is compared with
             LogLevel.critical

    When the level is at least critical, the event is formatted with
    twisted.logger.formatEvent and the text is handed to crash().
    """
    level = event.get('log_level')
    if not (level >= LogLevel.critical):
        return
    description = twisted.logger.formatEvent(event)
    crash(description)
364 #---------- config processing ----------
366 def _cfg_process_putatives():
369 # maps from abstract object to canonical name for cs's
371 def putative(cmap, abstract, canoncs):
373 current_canoncs = cmap[abstract]
377 assert(current_canoncs == canoncs)
378 cmap[abstract] = canoncs
380 server_pat = r'[-.0-9A-Za-z]+'
381 client_pat = r'[.:0-9a-f]+'
382 server_re = regexp.compile(server_pat)
383 serverclient_re = regexp.compile(server_pat + r' ' + client_pat)
385 for cs in cfg.sections():
391 # plan B "[<client>]" part 1
393 except AddressValueError:
395 if server_re.fullmatch(cs):
396 # plan C "[<servername>]"
397 putative(servers, cs, cs)
400 if serverclient_re.fullmatch(cs):
401 # plan D "[<servername> <client>]" part 1
402 (pss,pcs) = cs.split(' ')
405 # plan E "[<servername> LIMIT]"
409 # plan D "[<servername> <client>]" part 2
411 except AddressValueError:
412 # plan F "[<some thing we do not understand>]"
413 # well, we ignore this
414 print('warning: ignoring config section %s' % cs, file=sys.stderr)
417 else: # no AddressValueError
# plan D "[<servername> <client>]" part 3
419 putative(clients, ci, pcs)
420 putative(servers, pss, pss)
423 else: # no AddressValueError
# plan B "[<client>]" part 2
425 putative(clients, ci, cs)
428 return (servers, clients)
def cfg_process_common(c, ss):
    """Read the integer option 'mtu' from config section *ss* into c.mtu."""
    mtu = cfg.getint(ss, 'mtu')
    c.mtu = mtu
433 def cfg_process_saddrs(c, ss):
435 def __init__(self, port, addrspec):
439 self.addr = ipaddress.IPv4Address(addrspec)
440 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
442 except AddressValueError:
443 self.addr = ipaddress.IPv6Address(addrspec)
444 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
445 self._inurl = b'[%s]'
446 def make_endpoint(self):
447 return self._endpointfactory(reactor, self.port, self.addr)
449 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
450 if self.port != 80: url += b':%d' % self.port
454 c.port = cfg.getint(ss,'port')
456 for addrspec in cfg.get(ss, 'addrs').split():
457 sa = ServerAddr(c.port, addrspec)
def cfg_process_vnetwork(c, ss):
    """Parse the 'vnetwork' option of section *ss* and store it as c.vnetwork.

    The option text is converted with ipnetwork().

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    fewest_addresses = 3 + 2
    network = ipnetwork(cfg.get(ss, 'vnetwork'))
    c.vnetwork = network
    if network.num_addresses >= fewest_addresses:
        return
    raise ValueError('vnetwork needs at least 2^3 addresses')
465 def cfg_process_vaddr(c, ss):
467 c.vaddr = cfg.get(ss,'vaddr')
468 except NoOptionError:
469 cfg_process_vnetwork(c, ss)
470 c.vaddr = next(c.vnetwork.hosts())
472 def cfg_search_section(key,sections):
473 for section in sections:
474 if cfg.has_option(section, key):
476 raise NoOptionError(key, repr(sections))
def cfg_search(getter, key, sections):
    """Fetch option *key* from whichever of *sections* provides it.

    getter   -- callable invoked as getter(section, key), e.g. cfg.getint
    key      -- option name
    sections -- candidate section names, handed to cfg_search_section()

    Returns whatever getter returns.  Errors from cfg_search_section()
    are not caught here.
    """
    return getter(cfg_search_section(key, sections), key)
def cfg_process_client_limited(cc, ss, sections, key):
    """Set attribute *key* of *cc* to the configured value, capped by a limit.

    cc       -- result object; the value is stored in its __dict__
    ss       -- server name, used to build the '<ss> LIMIT' section name
    sections -- sections searched for the requested value
    key      -- name of an integer option

    The limit is searched for in '<ss> LIMIT' and 'LIMIT'.  The smaller
    of the requested value and the limit is stored.
    """
    limit_sections = ['%s LIMIT' % ss, 'LIMIT']
    requested = cfg_search(cfg.getint, key, sections)
    ceiling = cfg_search(cfg.getint, key, limit_sections)
    vars(cc)[key] = min(requested, ceiling)
487 def cfg_process_client_common(cc,ss,cs,ci):
488 # returns sections to search in, iff password is defined, otherwise None
491 sections = ['%s %s' % (ss,cs),
496 try: pwsection = cfg_search_section('password', sections)
497 except NoOptionError: return None
499 pw = cfg.get(pwsection, 'password')
500 cc.password = pw.encode('utf-8')
502 cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
503 cfg_process_client_limited(cc,ss,sections,'http_timeout')
507 def cfg_process_ipif(c, sections, varmap):
509 try: v = getattr(c, s)
510 except AttributeError: continue
513 #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
515 section = cfg_search_section('ipif', sections)
516 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
518 #---------- startup ----------
520 def common_startup(process_cfg):
# calls process_cfg(opts, putative_servers, putative_clients)
523 # ConfigParser hates #-comments after values
524 trailingcomments_re = regexp.compile(r'#.*')
525 cfg.read_string(trailingcomments_re.sub('', defcfg))
528 def readconfig(pathname, mandatory=True):
529 def log(m, p=pathname):
530 if not DBG.CONFIG in debug_set: return
531 print('DBG.CONFIG: %s: %s' % (m, pathname))
534 files = os.listdir(pathname)
536 except FileNotFoundError:
541 except NotADirectoryError:
548 re = regexp.compile('[^-A-Za-z0-9_]')
549 for f in os.listdir(cdir):
550 if re.search(f): continue
551 subpath = pathname + '/' + f
554 except FileNotFoundError:
555 log('entry skipped', subpath)
558 log('entry read', subpath)
560 def oc_config(od,os, value, op):
565 def read_defconfig():
566 readconfig('/etc/hippotat/config.d', False)
567 readconfig('/etc/hippotat/passwords.d', False)
568 readconfig('/etc/hippotat/master.cfg', False)
570 def dfs_less_detailed(dl):
571 return [df for df in DBG.iterconstants() if df <= dl]
573 def ds_default(od,os,dl,op):
576 debug_set |= set(dfs_less_detailed(debug_def_detail))
578 def ds_select(od,os, spec, op):
579 for it in spec.split(','):
581 if it.startswith('-'):
582 mutator = debug_set.discard
585 mutator = debug_set.add
588 dfs = DBG.iterconstants()
592 mapper = dfs_less_detailed
595 mapper = lambda x: [x]
598 dfspec = DBG.lookupByName(it)
600 optparser.error('unknown debug flag %s in --debug-select' % it)
607 optparser.add_option('-D', '--debug',
610 help='enable default debug (to stdout)',
611 callback= ds_default)
613 optparser.add_option('--debug-select',
616 metavar='[-]DFLAG[+]|[-]+,...',
618 '''enable (`-': disable) each specified DFLAG;
619 `+': do same for all "more interesting" DFLAGSs;
620 just `+': all DFLAGs.
621 DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
625 optparser.add_option('-c', '--config',
628 metavar='CONFIGFILE',
637 (opts, args) = optparser.parse_args()
638 if len(args): optparser.error('no non-option arguments please')
641 (pss, pcs) = _cfg_process_putatives()
642 process_cfg(opts, pss, pcs)
643 except (configparser.Error, ValueError):
644 traceback.print_exc(file=sys.stderr)
645 print('\nInvalid configuration, giving up.', file=sys.stderr)
649 #print('X', debug_set, file=sys.stderr)
651 log_formatter = twisted.logger.formatEventAsClassicLogText
652 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
653 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
654 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
655 stdsomething_obs = twisted.logger.FilteringLogObserver(
656 stderr_obs, [pred], stdout_obs
658 global file_log_observer
659 file_log_observer = twisted.logger.FilteringLogObserver(
660 stdsomething_obs, [LogNotBoringTwisted()]
662 #log_observer = stdsomething_obs
663 twisted.logger.globalLogBeginner.beginLoggingTo(
664 [ file_log_observer, crash_on_critical ]
668 log_debug(DBG.INIT, 'entering reactor')
669 if not _crashing: reactor.run()
670 print('ENDED', file=sys.stderr)