chiark / gitweb /
process putatives: Add many log_debug_config calls
[hippotat.git] / hippotatlib / __init__.py
1 # -*- python -*-
2 #
3 # Hippotat - Asinine IP Over HTTP program
4 # hippotatlib/__init__.py - common library code
5 #
6 # Copyright 2017 Ian Jackson
7 #
8 # GPLv3+
9 #
10 #    This program is free software: you can redistribute it and/or modify
11 #    it under the terms of the GNU General Public License as published by
12 #    the Free Software Foundation, either version 3 of the License, or
13 #    (at your option) any later version.
14 #
15 #    This program is distributed in the hope that it will be useful,
16 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
17 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18 #    GNU General Public License for more details.
19 #
20 #    You should have received a copy of the GNU General Public License
21 #    along with this program, in the file GPLv3.  If not,
22 #    see <http://www.gnu.org/licenses/>.
23
24
25 import signal
26 signal.signal(signal.SIGINT, signal.SIG_DFL)
27
28 import sys
29 import os
30
31 from zope.interface import implementer
32
33 import twisted
34 from twisted.internet import reactor
35 import twisted.internet.endpoints
36 import twisted.logger
37 from twisted.logger import LogLevel
38 import twisted.python.constants
39 from twisted.python.constants import NamedConstant
40
41 import ipaddress
42 from ipaddress import AddressValueError
43
44 from optparse import OptionParser
45 import configparser
46 from configparser import ConfigParser
47 from configparser import NoOptionError
48
49 from functools import partial
50
51 import collections
52 import time
53 import codecs
54 import traceback
55
56 import re as regexp
57
58 import hippotatlib.slip as slip
59
60 class DBG(twisted.python.constants.Names):
61   INIT = NamedConstant()
62   CONFIG = NamedConstant()
63   ROUTE = NamedConstant()
64   DROP = NamedConstant()
65   OWNSOURCE = NamedConstant()
66   FLOW = NamedConstant()
67   HTTP = NamedConstant()
68   TWISTED = NamedConstant()
69   QUEUE = NamedConstant()
70   HTTP_CTRL = NamedConstant()
71   QUEUE_CTRL = NamedConstant()
72   HTTP_FULL = NamedConstant()
73   CTRL_DUMP = NamedConstant()
74   SLIP_FULL = NamedConstant()
75   DATA_COMPLETE = NamedConstant()
76
77 _hex_codec = codecs.getencoder('hex_codec')
78
79 #---------- logging ----------
80
81 org_stderr = sys.stderr
82
83 log = twisted.logger.Logger()
84
85 debug_set = set()
86 debug_def_detail = DBG.HTTP
87
88 def log_debug(dflag, msg, idof=None, d=None):
89   if dflag not in debug_set: return
90   #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
91   if idof is not None:
92     msg = '[%#x] %s' % (id(idof), msg)
93   if d is not None:
94     trunc = ''
95     if not DBG.DATA_COMPLETE in debug_set:
96       if len(d) > 64:
97         d = d[0:64]
98         trunc = '...'
99     d = _hex_codec(d)[0].decode('ascii')
100     msg += ' ' + d + trunc
101   log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
102
103 def logevent_is_boringtwisted(event):
104   try:
105     if event.get('log_level') != LogLevel.info:
106       return False
107     dflag = event.get('dflag')
108     if dflag is False                            : return False
109     if dflag                         in debug_set: return False
110     if dflag is None and DBG.TWISTED in debug_set: return False
111     return True
112   except Exception:
113     print('EXCEPTION (IN BORINGTWISTED CHECK)',
114           traceback.format_exc(), file=org_stderr)
115     return False
116
117 @implementer(twisted.logger.ILogFilterPredicate)
118 class LogNotBoringTwisted:
119   def __call__(self, event):
120     return (
121       twisted.logger.PredicateResult.no
122       if logevent_is_boringtwisted(event) else
123       twisted.logger.PredicateResult.yes
124     )
125
126 #---------- default config ----------
127
128 defcfg = '''
129 [COMMON]
130 max_batch_down = 65536
131 max_queue_time = 10
132 target_requests_outstanding = 3
133 http_timeout = 30
134 http_timeout_grace = 5
135 max_requests_outstanding = 6
136 max_batch_up = 4000
137 http_retry = 5
138 port = 80
139 vroutes = ''
140 ifname_client = hippo%%d
141 ifname_server = shippo%%d
142
143 #[server] or [<client>] overrides
144 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip,%(ifname)s %(rnets)s
145
146 # relating to virtual network
147 mtu = 1500
148
149 # addrs = 127.0.0.1 ::1
150 # url
151
152 # relating to virtual network
153 vvnetwork = 172.24.230.192
154 # vnetwork = <prefix>/<len>
155 # vaddr    = <ipaddr>
156 # vrelay   = <ipaddr>
157
158
159 # [<client-ip4-or-ipv6-address>]
160 # password = <password>    # used by both, must match
161
162 [LIMIT]
163 max_batch_down = 262144
164 max_queue_time = 121
165 http_timeout = 121
166 target_requests_outstanding = 10
167 '''
168
169 # these need to be defined here so that they can be imported by import *
170 cfg = ConfigParser(strict=False)
171 optparser = OptionParser()
172
173 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
174 def mime_translate(s):
175   # SLIP-encoded packets cannot contain ESC ESC.
176   # Swap `-' and ESC.  The result cannot contain `--'
177   return s.translate(_mimetrans)
178
179 class ConfigResults:
180   def __init__(self):
181     pass
182   def __repr__(self):
183     return 'ConfigResults('+repr(self.__dict__)+')'
184
185 def log_discard(packet, iface, saddr, daddr, why):
186   log_debug(DBG.DROP,
187             'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
188             d=packet)
189
190 #---------- packet parsing ----------
191
192 def packet_addrs(packet):
193   version = packet[0] >> 4
194   if version == 4:
195     addrlen = 4
196     saddroff = 3*4
197     factory = ipaddress.IPv4Address
198   elif version == 6:
199     addrlen = 16
200     saddroff = 2*4
201     factory = ipaddress.IPv6Address
202   else:
203     raise ValueError('unsupported IP version %d' % version)
204   saddr = factory(packet[ saddroff           : saddroff + addrlen   ])
205   daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
206   return (saddr, daddr)
207
208 #---------- address handling ----------
209
210 def ipaddr(input):
211   try:
212     r = ipaddress.IPv4Address(input)
213   except AddressValueError:
214     r = ipaddress.IPv6Address(input)
215   return r
216
217 def ipnetwork(input):
218   try:
219     r = ipaddress.IPv4Network(input)
220   except NetworkValueError:
221     r = ipaddress.IPv6Network(input)
222   return r
223
224 #---------- ipif (SLIP) subprocess ----------
225
226 class SlipStreamDecoder():
227   def __init__(self, desc, on_packet):
228     self._buffer = b''
229     self._on_packet = on_packet
230     self._desc = desc
231     self._log('__init__')
232
233   def _log(self, msg, **kwargs):
234     log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
235
236   def inputdata(self, data):
237     self._log('inputdata', d=data)
238     data = self._buffer + data
239     self._buffer = b''
240     packets = slip.decode(data, True)
241     self._buffer = packets.pop()
242     for packet in packets:
243       self._maybe_packet(packet)
244     self._log('bufremain', d=self._buffer)
245
246   def _maybe_packet(self, packet):
247     self._log('maybepacket', d=packet)
248     if len(packet):
249       self._on_packet(packet)
250
251   def flush(self):
252     self._log('flush')
253     data = self._buffer
254     self._buffer = b''
255     packets = slip.decode(data)
256     assert(len(packets) == 1)
257     self._maybe_packet(packets[0])
258
259 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
260   def __init__(self, router):
261     self._router = router
262     self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
263   def connectionMade(self): pass
264   def outReceived(self, data):
265     self._decoder.inputdata(data)
266   def slip_on_packet(self, packet):
267     (saddr, daddr) = packet_addrs(packet)
268     if saddr.is_link_local or daddr.is_link_local:
269       log_discard(packet, 'ipif', saddr, daddr, 'link-local')
270       return
271     self._router(packet, saddr, daddr)
272   def processEnded(self, status):
273     status.raiseException()
274
275 def start_ipif(command, router):
276   ipif = _IpifProcessProtocol(router)
277   reactor.spawnProcess(ipif,
278                        '/bin/sh',['sh','-xc', command],
279                        childFDs={0:'w', 1:'r', 2:2},
280                        env=None)
281   return ipif
282
283 def queue_inbound(ipif, packet):
284   log_debug(DBG.FLOW, "queue_inbound", d=packet)
285   ipif.transport.write(slip.delimiter)
286   ipif.transport.write(slip.encode(packet))
287   ipif.transport.write(slip.delimiter)
288
289 #---------- packet queue ----------
290
291 class PacketQueue():
292   def __init__(self, desc, max_queue_time):
293     self._desc = desc
294     assert(desc + '')
295     self._max_queue_time = max_queue_time
296     self._pq = collections.deque() # packets
297
298   def _log(self, dflag, msg, **kwargs):
299     log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
300
301   def append(self, packet):
302     self._log(DBG.QUEUE, 'append', d=packet)
303     self._pq.append((time.monotonic(), packet))
304
305   def nonempty(self):
306     self._log(DBG.QUEUE, 'nonempty ?')
307     while True:
308       try: (queuetime, packet) = self._pq[0]
309       except IndexError:
310         self._log(DBG.QUEUE, 'nonempty ? empty.')
311         return False
312
313       age = time.monotonic() - queuetime
314       if age > self._max_queue_time:
315         # strip old packets off the front
316         self._log(DBG.QUEUE, 'dropping (old)', d=packet)
317         self._pq.popleft()
318         continue
319
320       self._log(DBG.QUEUE, 'nonempty ? nonempty.')
321       return True
322
323   def process(self, sizequery, moredata, max_batch):
324     # sizequery() should return size of batch so far
325     # moredata(s) should add s to batch
326     self._log(DBG.QUEUE, 'process...')
327     while True:
328       try: (dummy, packet) = self._pq[0]
329       except IndexError:
330         self._log(DBG.QUEUE, 'process... empty')
331         break
332
333       self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
334
335       encoded = slip.encode(packet)
336       sofar = sizequery()  
337
338       self._log(DBG.QUEUE_CTRL,
339                 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
340                 d=encoded)
341
342       if sofar > 0:
343         if sofar + len(slip.delimiter) + len(encoded) > max_batch:
344           self._log(DBG.QUEUE_CTRL, 'process... overflow')
345           break
346         moredata(slip.delimiter)
347
348       moredata(encoded)
349       self._pq.popleft()
350
351 #---------- error handling ----------
352
353 _crashing = False
354
355 def crash(err):
356   global _crashing
357   _crashing = True
358   print('========== CRASH ==========', err,
359         '===========================', file=sys.stderr)
360   try: reactor.stop()
361   except twisted.internet.error.ReactorNotRunning: pass
362
363 def crash_on_defer(defer):
364   defer.addErrback(lambda err: crash(err))
365
366 def crash_on_critical(event):
367   if event.get('log_level') >= LogLevel.critical:
368     crash(twisted.logger.formatEvent(event))
369
370 #---------- config processing ----------
371
372 def _cfg_process_putatives():
373   servers = { }
374   clients = { }
375   # maps from abstract object to canonical name for cs's
376
377   def putative(cmap, abstract, canoncs):
378     try:
379       current_canoncs = cmap[abstract]
380     except KeyError:
381       pass
382     else:
383       assert(current_canoncs == canoncs)
384     cmap[abstract] = canoncs
385
386   server_pat = r'[-.0-9A-Za-z]+'
387   client_pat = r'[.:0-9a-f]+'
388   server_re = regexp.compile(server_pat)
389   serverclient_re = regexp.compile(
390         server_pat + r' ' + '(?:' + client_pat + '|LIMIT)')
391
392   for cs in cfg.sections():
393     log_debug_config('putatives: section [%s]...' % (cs))
394
395     def log_ignore(why):
396       log_debug_config('putatives: section [%s] X ignore: %s' % (cs, why))
397       print('warning: ignoring config section [%s] (%s)' % (cs, why),
398             file=sys.stderr)
399
400     if cs == 'LIMIT' or cs == 'COMMON':
401       # plan A "[LIMIT]" or "[COMMON]"
402       log_debug_config('putatives: section [%s] A ignore' % (cs))
403       continue
404
405     try:
406       # plan B "[<client>]" part 1
407       ci = ipaddr(cs)
408     except AddressValueError:
409
410       if server_re.fullmatch(cs):
411         # plan C "[<servername>]"
412         log_debug_config('putatives: section [%s] C <server>' % (cs))
413         putative(servers, cs, cs)
414         continue
415
416       if serverclient_re.fullmatch(cs):
417         # plan D "[<servername> <client>]" part 1
418         (pss,pcs) = cs.split(' ')
419
420         if pcs == 'LIMIT':
421           # plan E "[<servername> LIMIT]"
422           log_debug_config('putatives: section [%s] E <server> LIMIT' % (cs))
423           continue
424
425         try:
426           # plan D "[<servername> <client>]" part 2
427           ci = ipaddr(pc)
428         except AddressValueError:
429           # plan F "[<some thing we do not understand>]"
430           log_ignore('bad-addr')
431           continue
432
433         else: # no AddressValueError
434           # plan D "[<servername> <client>]" part 3
435           log_debug_config('putatives: section [%s] D <server> <client>'
436                            % (cs))
437           putative(clients, ci, pcs)
438           putative(servers, pss, pss)
439           continue
440
441     else: # no AddressValueError
442       # plan B "[<client>" part 2
443       log_debug_config('putatives: section [%s] B <client>' % (cs))
444       putative(clients, ci, cs)
445       continue
446
447   return (servers, clients)
448
449 def cfg_process_general(c, ss):
450   c.mtu = cfg1getint(ss, 'mtu')
451
452 def cfg_process_saddrs(c, ss):
453   class ServerAddr():
454     def __init__(self, port, addrspec):
455       self.port = port
456       # also self.addr
457       try:
458         self.addr = ipaddress.IPv4Address(addrspec)
459         self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
460         self._inurl = b'%s'
461       except AddressValueError:
462         self.addr = ipaddress.IPv6Address(addrspec)
463         self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
464         self._inurl = b'[%s]'
465     def make_endpoint(self):
466       return self._endpointfactory(reactor, self.port,
467                                    interface= '%s' % self.addr)
468     def url(self):
469       url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
470       if self.port != 80: url += b':%d' % self.port
471       url += b'/'
472       return url
473     def __repr__(self):
474       return 'ServerAddr'+repr((self.port,self.addr))
475
476   c.port = cfg1getint(ss,'port')
477   c.saddrs = [ ]
478   for addrspec in cfg1get(ss, 'addrs').split():
479     sa = ServerAddr(c.port, addrspec)
480     c.saddrs.append(sa)
481
482 def cfg_process_vnetwork(c, ss):
483   c.vnetwork = ipnetwork(cfg1get(ss,'vnetwork'))
484   if c.vnetwork.num_addresses < 3 + 2:
485     raise ValueError('vnetwork needs at least 2^3 addresses')
486
487 def cfg_process_vaddr(c, ss):
488   try:
489     c.vaddr = cfg1get(ss,'vaddr')
490   except NoOptionError:
491     cfg_process_vnetwork(c, ss)
492     c.vaddr = next(c.vnetwork.hosts())
493
494 def cfg_search_section(key,sections):
495   for section in sections:
496     if cfg.has_option(section, key):
497       return section
498   raise NoOptionError(key, repr(sections))
499
500 def cfg_get_raw(*args, **kwargs):
501   # for passing to cfg_search
502   return cfg.get(*args, raw=True, **kwargs)
503
504 def cfg_search(getter,key,sections):
505   section = cfg_search_section(key,sections)
506   return getter(section, key)
507
508 def cfg1get(section,key, getter=cfg.get,**kwargs):
509   section = cfg_search_section(key,[section,'COMMON'])
510   return getter(section,key,**kwargs)
511
512 def cfg1getint(section,key, **kwargs):
513   return cfg1get(section,key, getter=cfg.getint,**kwargs);
514
515 def cfg_process_client_limited(cc,ss,sections,key):
516   val = cfg_search(cfg1getint, key, sections)
517   lim = cfg_search(cfg1getint, key, ['%s LIMIT' % ss, 'LIMIT'])
518   cc.__dict__[key] = min(val,lim)
519
520 def cfg_process_client_common(cc,ss,cs,ci):
521   # returns sections to search in, iff password is defined, otherwise None
522   cc.ci = ci
523
524   sections = ['%s %s' % (ss,cs),
525               cs,
526               ss,
527               'COMMON']
528
529   try: pwsection = cfg_search_section('password', sections)
530   except NoOptionError: return None
531     
532   pw = cfg1get(pwsection, 'password')
533   cc.password = pw.encode('utf-8')
534
535   cfg_process_client_limited(cc,ss,sections,'target_requests_outstanding')
536   cfg_process_client_limited(cc,ss,sections,'http_timeout')
537
538   return sections
539
540 def cfg_process_ipif(c, sections, varmap):
541   for d, s in varmap:
542     try: v = getattr(c, s)
543     except AttributeError: continue
544     setattr(c, d, v)
545
546   #print('CFGIPIF',repr((varmap, sections, c.__dict__)),file=sys.stderr)
547
548   section = cfg_search_section('ipif', sections)
549   c.ipif_command = cfg1get(section,'ipif', vars=c.__dict__)
550
551 #---------- startup ----------
552
553 def log_debug_config(m):
554   if not DBG.CONFIG in debug_set: return
555   print('DBG.CONFIG:', m)
556
557 def common_startup(process_cfg):
558   # calls process_cfg(putative_clients, putative_servers)
559
560   # ConfigParser hates #-comments after values
561   trailingcomments_re = regexp.compile(r'#.*')
562   cfg.read_string(trailingcomments_re.sub('', defcfg))
563   need_defcfg = True
564
565   def readconfig(pathname, mandatory=True):
566     def log(m, p=pathname):
567       if not DBG.CONFIG in debug_set: return
568       log_debug_config('%s: %s' % (m, p))
569
570     try:
571       files = os.listdir(pathname)
572
573     except FileNotFoundError:
574       if mandatory: raise
575       log('skipped')
576       return
577
578     except NotADirectoryError:
579       cfg.read(pathname)
580       log('read file')
581       return
582
583     # is a directory
584     log('directory')
585     re = regexp.compile('[^-A-Za-z0-9_]')
586     for f in os.listdir(pathname):
587       if re.search(f): continue
588       subpath = pathname + '/' + f
589       try:
590         os.stat(subpath)
591       except FileNotFoundError:
592         log('entry skipped', subpath)
593         continue
594       cfg.read(subpath)
595       log('entry read', subpath)
596       
597   def oc_config(od,os, value, op):
598     nonlocal need_defcfg
599     need_defcfg = False
600     readconfig(value)
601
602   def oc_extra_config(od,os, value, op):
603     readconfig(value)
604
605   def read_defconfig():
606     readconfig('/etc/hippotat/config.d', False)
607     readconfig('/etc/hippotat/passwords.d', False)
608     readconfig('/etc/hippotat/master.cfg',   False)
609
610   def oc_defconfig(od,os, value, op):
611     nonlocal need_defcfg
612     need_defcfg = False
613     read_defconfig(value)
614
615   def dfs_less_detailed(dl):
616     return [df for df in DBG.iterconstants() if df <= dl]
617
618   def ds_default(od,os,dl,op):
619     global debug_set
620     debug_set.clear
621     debug_set |= set(dfs_less_detailed(debug_def_detail))
622
623   def ds_select(od,os, spec, op):
624     for it in spec.split(','):
625
626       if it.startswith('-'):
627         mutator = debug_set.discard
628         it = it[1:]
629       else:
630         mutator = debug_set.add
631
632       if it == '+':
633         dfs = DBG.iterconstants()
634
635       else:
636         if it.endswith('+'):
637           mapper = dfs_less_detailed
638           it = it[0:len(it)-1]
639         else:
640           mapper = lambda x: [x]
641
642           try:
643             dfspec = DBG.lookupByName(it)
644           except ValueError:
645             optparser.error('unknown debug flag %s in --debug-select' % it)
646
647         dfs = mapper(dfspec)
648
649       for df in dfs:
650         mutator(df)
651
652   optparser.add_option('-D', '--debug',
653                        nargs=0,
654                        action='callback',
655                        help='enable default debug (to stdout)',
656                        callback= ds_default)
657
658   optparser.add_option('--debug-select',
659                        nargs=1,
660                        type='string',
661                        metavar='[-]DFLAG[+]|[-]+,...',
662                        help=
663 '''enable (`-': disable) each specified DFLAG;
664 `+': do same for all "more interesting" DFLAGSs;
665 just `+': all DFLAGs.
666   DFLAGS: ''' + ' '.join([df.name for df in DBG.iterconstants()]),
667                        action='callback',
668                        callback= ds_select)
669
670   optparser.add_option('-c', '--config',
671                        nargs=1,
672                        type='string',
673                        metavar='CONFIGFILE',
674                        dest='configfile',
675                        action='callback',
676                        callback= oc_config)
677
678   optparser.add_option('--extra-config',
679                        nargs=1,
680                        type='string',
681                        metavar='CONFIGFILE',
682                        dest='configfile',
683                        action='callback',
684                        callback= oc_extra_config)
685
686   optparser.add_option('--default-config',
687                        action='callback',
688                        callback= oc_defconfig)
689
690   (opts, args) = optparser.parse_args()
691   if len(args): optparser.error('no non-option arguments please')
692
693   if need_defcfg:
694     read_defconfig()
695
696   try:
697     (pss, pcs) = _cfg_process_putatives()
698     process_cfg(opts, pss, pcs)
699   except (configparser.Error, ValueError):
700     traceback.print_exc(file=sys.stderr)
701     print('\nInvalid configuration, giving up.', file=sys.stderr)
702     sys.exit(12)
703
704
705   #print('X', debug_set, file=sys.stderr)
706
707   log_formatter = twisted.logger.formatEventAsClassicLogText
708   stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
709   stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
710   pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
711   stdsomething_obs = twisted.logger.FilteringLogObserver(
712     stderr_obs, [pred], stdout_obs
713   )
714   global file_log_observer
715   file_log_observer = twisted.logger.FilteringLogObserver(
716     stdsomething_obs, [LogNotBoringTwisted()]
717   )
718   #log_observer = stdsomething_obs
719   twisted.logger.globalLogBeginner.beginLoggingTo(
720     [ file_log_observer, crash_on_critical ]
721     )
722
723 def common_run():
724   log_debug(DBG.INIT, 'entering reactor')
725   if not _crashing: reactor.run()
726   print('ENDED', file=sys.stderr)
727   sys.exit(16)