4 signal.signal(signal.SIGINT, signal.SIG_DFL)
8 from zope.interface import implementer
11 from twisted.internet import reactor
12 import twisted.internet.endpoints
14 from twisted.logger import LogLevel
15 import twisted.python.constants
16 from twisted.python.constants import NamedConstant
19 from ipaddress import AddressValueError
21 from optparse import OptionParser
22 from configparser import ConfigParser
23 from configparser import NoOptionError
25 from functools import partial
34 import hippotat.slip as slip
class DBG(twisted.python.constants.Names):
    """Categories used to tag debug messages.

    Every member is a ``NamedConstant``.  The order of definition is
    significant and must not be changed: the ``-D``/``--debug`` option
    enables each flag that compares ``<=`` the requested detail level,
    and constants of a ``Names`` container compare in definition order.
    """

    INIT = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
50 _hex_codec = codecs.getencoder('hex_codec')
52 #---------- logging ----------
54 org_stderr = sys.stderr
56 log = twisted.logger.Logger()
59 debug_def_detail = DBG.HTTP
61 def log_debug(dflag, msg, idof=None, d=None):
62 if dflag not in debug_set: return
63 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
65 msg = '[%#x] %s' % (id(idof), msg)
68 d = _hex_codec(d)[0].decode('ascii')
70 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
72 @implementer(twisted.logger.ILogFilterPredicate)
73 class LogNotBoringTwisted:
74 def __call__(self, event):
75 yes = twisted.logger.PredicateResult.yes
76 no = twisted.logger.PredicateResult.no
79 if event.get('log_level') != LogLevel.info:
82 dflag = event.get('dflag')
85 return yes if (dflag in debug_set) else no
87 print(traceback.format_exc(), file=org_stderr)
90 #---------- default config ----------
95 max_batch_down = 65536 # used by server, subject to [limits]
96 max_queue_time = 10 # used by server, subject to [limits]
97 target_requests_outstanding = 3 # must match; subject to [limits] on server
98 http_timeout = 30 # used by both } must be
99 http_timeout_grace = 5 # used by both } compatible
100 max_requests_outstanding = 4 # used by client
101 max_batch_up = 4000 # used by client
102 http_retry = 5 # used by client
104 #[server] or [<client>] overrides
105 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
106 # extra interpolations: %(local)s %(peer)s %(rnets)s
107 # obtained on server [virtual]server [virtual]relay [virtual]network
108 # from on client <client> [virtual]server [virtual]routes
113 # network = <prefix>/<len> # mandatory for server
114 # server = <ipaddr> # used by both, default is computed from `network'
115 # relay = <ipaddr> # used by server, default from `network' and `server'
116 # default server is first host in network
117 # default relay is first host which is not server
120 # addrs = 127.0.0.1 ::1 # mandatory for server
121 port = 80 # used by server
122 # url # used by client; default from first `addrs' and `port'
124 # [<client-ip4-or-ipv6-address>]
125 # password = <password> # used by both, must match
128 max_batch_down = 262144 # used by server
129 max_queue_time = 121 # used by server
130 http_timeout = 121 # used by server
131 target_requests_outstanding = 10 # used by server
134 # these need to be defined here so that they can be imported by import *
136 optparser = OptionParser()
# Translation table that exchanges `-' with the SLIP ESC value
# (presumably a single byte; see hippotat.slip) and vice versa.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return a copy of *s* with `-' and SLIP ESC swapped.

    SLIP-encoded packets cannot contain ESC ESC, so once the two values
    have been exchanged the result cannot contain `--'.
    """
    translated = s.translate(_mimetrans)
    return translated
145 def __init__(self, d = { }):
148 return 'ConfigResults('+repr(self.__dict__)+')'
152 def log_discard(packet, iface, saddr, daddr, why):
154 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
157 #---------- packet parsing ----------
159 def packet_addrs(packet):
160 version = packet[0] >> 4
164 factory = ipaddress.IPv4Address
168 factory = ipaddress.IPv6Address
170 raise ValueError('unsupported IP version %d' % version)
171 saddr = factory(packet[ saddroff : saddroff + addrlen ])
172 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
173 return (saddr, daddr)
175 #---------- address handling ----------
179 r = ipaddress.IPv4Address(input)
180 except AddressValueError:
181 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ``ipaddress.IPv4Network`` or ``ipaddress.IPv6Network``.

    Raises whatever the ``ipaddress`` constructors raise for bad input:
    an IPv4 netmask/host-bits error propagates directly, and input that
    is not an IPv4 address at all gets the IPv6 constructor's error.
    """
    try:
        return ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The address part is not IPv4.  (The original named a
        # non-existent `NetworkValueError' here, so this fallback could
        # only ever produce a NameError.)
        return ipaddress.IPv6Network(input)
191 #---------- ipif (SLIP) subprocess ----------
193 class SlipStreamDecoder():
194 def __init__(self, desc, on_packet):
196 self._on_packet = on_packet
198 self._log('__init__')
def _log(self, msg, **kwargs):
    """Emit a ``DBG.SLIP_FULL`` debug message prefixed with this decoder's
    description; extra keyword arguments are passed on to ``log_debug``."""
    text = 'slip %s: %s' % (self._desc, msg)
    log_debug(DBG.SLIP_FULL, text, **kwargs)
def inputdata(self, data):
    """Feed a chunk of raw stream data into the decoder.

    The chunk is split with ``slip.decode``.  The first piece is joined
    onto whatever was buffered from the previous call, the last piece
    becomes the new buffer, and every piece before the last is handed to
    ``_maybe_packet``.
    """
    self._log('inputdata', d=data)
    pieces = slip.decode(data)
    pieces[0] = self._buffer + pieces[0]
    # Everything except the final piece is complete; the final piece is
    # kept back until more data arrives.
    *complete, self._buffer = pieces
    for piece in complete:
        self._maybe_packet(piece)
    self._log('bufremain', d=self._buffer)
212 def _maybe_packet(self, packet):
213 self._log('maybepacket', d=packet)
215 self._on_packet(packet)
219 self._maybe_packet(self._buffer)
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data arriving on the child's stdout is run through a
    ``SlipStreamDecoder``; each decoded packet is passed to *router* as
    ``router(packet, saddr, daddr)`` unless it is discarded.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self):
        # Nothing to do when the child starts.
        pass

    def outReceived(self, data):
        # Raw bytes from the child's stdout: feed them to the SLIP decoder.
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, dropping link-local traffic."""
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            # A discarded packet must not also be handed to the router.
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # The subprocess going away is always treated as an error:
        # re-raise the termination reason.
        status.raiseException()
238 def start_ipif(command, router):
240 ipif = _IpifProcessProtocol(router)
241 reactor.spawnProcess(ipif,
242 '/bin/sh',['sh','-xc', command],
243 childFDs={0:'w', 1:'r', 2:2},
246 def queue_inbound(packet):
247 log_debug(DBG.FLOW, "queue_inbound", d=packet)
248 ipif.transport.write(slip.delimiter)
249 ipif.transport.write(slip.encode(packet))
250 ipif.transport.write(slip.delimiter)
252 #---------- packet queue ----------
255 def __init__(self, desc, max_queue_time):
258 self._max_queue_time = max_queue_time
259 self._pq = collections.deque() # packets
def _log(self, dflag, msg, **kwargs):
    """Emit a debug message under *dflag*, prefixed with this queue's
    description; extra keyword arguments are passed on to ``log_debug``."""
    prefixed = self._desc + ' pq: ' + msg
    log_debug(dflag, prefixed, **kwargs)
def append(self, packet):
    """Add *packet* to the end of the queue, stamped with the current
    monotonic time."""
    self._log(DBG.QUEUE, 'append', d=packet)
    entry = (time.monotonic(), packet)
    self._pq.append(entry)
269 self._log(DBG.QUEUE, 'nonempty ?')
271 try: (queuetime, packet) = self._pq[0]
273 self._log(DBG.QUEUE, 'nonempty ? empty.')
276 age = time.monotonic() - queuetime
277 if age > self._max_queue_time:
278 # strip old packets off the front
279 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
283 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
286 def process(self, sizequery, moredata, max_batch):
287 # sizequery() should return size of batch so far
288 # moredata(s) should add s to batch
289 self._log(DBG.QUEUE, 'process...')
291 try: (dummy, packet) = self._pq[0]
293 self._log(DBG.QUEUE, 'process... empty')
296 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
298 encoded = slip.encode(packet)
301 self._log(DBG.QUEUE_CTRL,
302 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
306 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
307 self._log(DBG.QUEUE_CTRL, 'process... overflow')
309 moredata(slip.delimiter)
314 #---------- error handling ----------
321 print('========== CRASH ==========', err,
322 '===========================', file=sys.stderr)
324 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Arrange for any failure reaching *defer* to be passed to ``crash``."""
    def _to_crash(failure):
        # `crash' is looked up only when the errback actually fires.
        return crash(failure)

    defer.addErrback(_to_crash)
def crash_on_critical(event):
    """Log observer: call ``crash`` for any event at critical level or above.

    *event* is a log event dict.  Events that carry no ``log_level`` are
    ignored; comparing ``None`` with a ``LogLevel`` constant would raise
    ``TypeError`` inside the observer.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
333 #---------- config processing ----------
335 def process_cfg_common_always():
337 c.mtu = cfg.get('virtual','mtu')
339 def process_cfg_ipif(section, varmap):
341 try: v = getattr(c, s)
342 except AttributeError: continue
347 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Read ``[virtual] network`` from the config and store it in ``c.network``.

    Raises ``ValueError`` if the network has fewer than 3 + 2 addresses.
    Network sizes are powers of two, so in practice anything smaller
    than 2^3 addresses is rejected, which is what the message reports.
    """
    network = ipnetwork(cfg.get('virtual', 'network'))
    c.network = network
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
def process_cfg_server():
    """Set ``c.server`` from ``[virtual] server``.

    When the option is absent, the network configuration is processed
    and the first host of ``c.network`` is used instead.
    """
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
362 def __init__(self, port, addrspec):
366 self.addr = ipaddress.IPv4Address(addrspec)
367 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
369 except AddressValueError:
370 self.addr = ipaddress.IPv6Address(addrspec)
371 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
372 self._inurl = b'[%s]'
def make_endpoint(self):
    """Return a Twisted server endpoint listening on this address and port.

    ``self._endpointfactory`` is ``TCP4ServerEndpoint`` or
    ``TCP6ServerEndpoint``.  Their third positional parameter is
    ``backlog``, so the address has to be given as the ``interface``
    keyword; passed positionally it was taken as the backlog and the
    endpoint listened on every interface.
    """
    return self._endpointfactory(reactor, self.port,
                                 interface=str(self.addr))
376 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
377 if self.port != 80: url += b':%d' % self.port
381 def process_cfg_saddrs():
382 try: port = cfg.getint('server','port')
383 except NoOptionError: port = 80
386 for addrspec in cfg.get('server','addrs').split():
387 sa = ServerAddr(port, addrspec)
390 def process_cfg_clients(constructor):
392 for cs in cfg.sections():
393 if not (':' in cs or '.' in cs): continue
395 pw = cfg.get(cs, 'password')
396 pw = pw.encode('utf-8')
397 constructor(ci,cs,pw)
399 #---------- startup ----------
401 def common_startup():
402 optparser.add_option('-c', '--config', dest='configfile',
403 default='/etc/hippotat/config')
405 def ds_by_detail(od,os,detail_level,op):
407 debug_set = set([df for df in DBG.iterconstants() if df <= detail_level])
409 def ds_one(mutator,df, od,os,value,op):
412 optparser.add_option('-D', '--debug',
413 default=debug_def_detail.name,
415 choices=[dl.name for dl in DBG.iterconstants()],
417 callback= ds_by_detail)
419 optparser.add_option('--no-debug',
422 callback= partial(ds_by_detail,DBG.INIT))
424 for df in DBG.iterconstants():
425 optparser.add_option('--debug-'+df.name,
427 callback= partial(ds_one, debug_set.add, df))
428 optparser.add_option('--no-debug-'+df.name,
430 callback= partial(ds_one, debug_set.discard, df))
432 (opts, args) = optparser.parse_args()
433 if len(args): optparser.error('no non-option arguments please')
435 re = regexp.compile('#.*')
436 cfg.read_string(re.sub('', defcfg))
437 cfg.read(opts.configfile)
439 log_formatter = twisted.logger.formatEventAsClassicLogText
440 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
441 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
442 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
443 stdsomething_obs = twisted.logger.FilteringLogObserver(
444 stderr_obs, [pred], stdout_obs
446 log_observer = twisted.logger.FilteringLogObserver(
447 stdsomething_obs, [LogNotBoringTwisted()]
449 #log_observer = stdsomething_obs
450 twisted.logger.globalLogBeginner.beginLoggingTo(
451 [ log_observer, crash_on_critical ]
455 log_debug(DBG.INIT, 'entering reactor')
456 if not _crashing: reactor.run()
457 print('CRASHED (end)', file=sys.stderr)