4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
23 from functools import partial
32 import hippotat.slip as slip
34 class DBG(twisted.python.constants.Names):
35 ROUTE = NamedConstant()
36 DROP = NamedConstant()
37 FLOW = NamedConstant()
38 HTTP = NamedConstant()
39 HTTP_CTRL = NamedConstant()
40 INIT = NamedConstant()
41 QUEUE = NamedConstant()
42 QUEUE_CTRL = NamedConstant()
43 HTTP_FULL = NamedConstant()
44 SLIP_FULL = NamedConstant()
45 CTRL_DUMP = NamedConstant()
47 _hex_codec = codecs.getencoder('hex_codec')
49 log = twisted.logger.Logger()
51 def log_debug(dflag, msg, idof=None, d=None):
52 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
54 msg = '[%#x] %s' % (id(idof), msg)
57 d = _hex_codec(d)[0].decode('ascii')
59 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
64 max_batch_down = 65536 # used by server, subject to [limits]
65 max_queue_time = 10 # used by server, subject to [limits]
66 target_requests_outstanding = 3 # must match; subject to [limits] on server
67 http_timeout = 30 # used by both } must be
68 http_timeout_grace = 5 # used by both } compatible
69 max_requests_outstanding = 4 # used by client
70 max_batch_up = 4000 # used by client
71 http_retry = 5 # used by client
73 #[server] or [<client>] overrides
74 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
75 # extra interpolations: %(local)s %(peer)s %(rnet)s
76 # obtained on server [virtual]server [virtual]relay [virtual]network
77 # from on client <client> [virtual]server [virtual]routes
82 # network = <prefix>/<len> # mandatory for server
83 # server = <ipaddr> # used by both, default is computed from `network'
84 # relay = <ipaddr> # used by server, default from `network' and `server'
85 # default server is first host in network
86 # default relay is first host which is not server
89 # addrs = 127.0.0.1 ::1 # mandatory for server
90 port = 80 # used by server
91 # url # used by client; default from first `addrs' and `port'
93 # [<client-ip4-or-ipv6-address>]
94 # password = <password> # used by both, must match
97 max_batch_down = 262144 # used by server
98 max_queue_time = 121 # used by server
99 http_timeout = 121 # used by server
100 target_requests_outstanding = 10 # used by server
103 # these need to be defined here so that they can be imported by import *
105 optparser = OptionParser()
107 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
108 def mime_translate(s):
109 # SLIP-encoded packets cannot contain ESC ESC.
110 # Swap `-' and ESC. The result cannot contain `--'
111 return s.translate(_mimetrans)
114 def __init__(self, d = { }):
117 return 'ConfigResults('+repr(self.__dict__)+')'
121 def log_discard(packet, iface, saddr, daddr, why):
123 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
126 #---------- packet parsing ----------
128 def packet_addrs(packet):
129 version = packet[0] >> 4
133 factory = ipaddress.IPv4Address
137 factory = ipaddress.IPv6Address
139 raise ValueError('unsupported IP version %d' % version)
140 saddr = factory(packet[ saddroff : saddroff + addrlen ])
141 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
142 return (saddr, daddr)
144 #---------- address handling ----------
148 r = ipaddress.IPv4Address(input)
149 except AddressValueError:
150 r = ipaddress.IPv6Address(input)
153 def ipnetwork(input):
155 r = ipaddress.IPv4Network(input)
156 except NetworkValueError:
157 r = ipaddress.IPv6Network(input)
160 #---------- ipif (SLIP) subprocess ----------
162 class SlipStreamDecoder():
163 def __init__(self, desc, on_packet):
165 self._on_packet = on_packet
167 self._log('__init__')
169 def _log(self, msg, **kwargs):
170 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
172 def inputdata(self, data):
173 self._log('inputdata', d=data)
174 packets = slip.decode(data)
175 packets[0] = self._buffer + packets[0]
176 self._buffer = packets.pop()
177 for packet in packets:
178 self._maybe_packet(packet)
179 self._log('bufremain', d=self._buffer)
181 def _maybe_packet(self, packet):
182 self._log('maybepacket', d=packet)
184 self._on_packet(packet)
188 self._maybe_packet(self._buffer)
191 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
192 def __init__(self, router):
193 self._router = router
194 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
195 def connectionMade(self): pass
196 def outReceived(self, data):
197 self._decoder.inputdata(data)
198 def slip_on_packet(self, packet):
199 (saddr, daddr) = packet_addrs(packet)
200 if saddr.is_link_local or daddr.is_link_local:
201 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
203 self._router(packet, saddr, daddr)
204 def processEnded(self, status):
205 status.raiseException()
207 def start_ipif(command, router):
209 ipif = _IpifProcessProtocol(router)
210 reactor.spawnProcess(ipif,
211 '/bin/sh',['sh','-xc', command],
212 childFDs={0:'w', 1:'r', 2:2},
215 def queue_inbound(packet):
216 log_debug(DBG.FLOW, "queue_inbound", d=packet)
217 ipif.transport.write(slip.delimiter)
218 ipif.transport.write(slip.encode(packet))
219 ipif.transport.write(slip.delimiter)
221 #---------- packet queue ----------
224 def __init__(self, desc, max_queue_time):
227 self._max_queue_time = max_queue_time
228 self._pq = collections.deque() # packets
230 def _log(self, dflag, msg, **kwargs):
231 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
233 def append(self, packet):
234 self._log(DBG.QUEUE, 'append', d=packet)
235 self._pq.append((time.monotonic(), packet))
238 self._log(DBG.QUEUE, 'nonempty ?')
240 try: (queuetime, packet) = self._pq[0]
242 self._log(DBG.QUEUE, 'nonempty ? empty.')
245 age = time.monotonic() - queuetime
246 if age > self._max_queue_time:
247 # strip old packets off the front
248 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
252 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
255 def process(self, sizequery, moredata, max_batch):
256 # sizequery() should return size of batch so far
257 # moredata(s) should add s to batch
258 self._log(DBG.QUEUE, 'process...')
260 try: (dummy, packet) = self._pq[0]
262 self._log(DBG.QUEUE, 'process... empty')
265 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
267 encoded = slip.encode(packet)
270 self._log(DBG.QUEUE_CTRL,
271 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
275 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
276 self._log(DBG.QUEUE_CTRL, 'process... overflow')
278 moredata(slip.delimiter)
283 #---------- error handling ----------
290 print('========== CRASH ==========', err,
291 '===========================', file=sys.stderr)
293 except twisted.internet.error.ReactorNotRunning: pass
295 def crash_on_defer(defer):
296 defer.addErrback(lambda err: crash(err))
298 def crash_on_critical(event):
299 if event.get('log_level') >= LogLevel.critical:
300 crash(twisted.logger.formatEvent(event))
302 #---------- config processing ----------
304 def process_cfg_common_always():
306 c.mtu = cfg.get('virtual','mtu')
308 def process_cfg_ipif(section, varmap):
310 try: v = getattr(c, s)
311 except AttributeError: continue
316 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
318 def process_cfg_network():
319 c.network = ipnetwork(cfg.get('virtual','network'))
320 if c.network.num_addresses < 3 + 2:
321 raise ValueError('network needs at least 2^3 addresses')
323 def process_cfg_server():
325 c.server = cfg.get('virtual','server')
326 except NoOptionError:
327 process_cfg_network()
328 c.server = next(c.network.hosts())
331 def __init__(self, port, addrspec):
335 self.addr = ipaddress.IPv4Address(addrspec)
336 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
338 except AddressValueError:
339 self.addr = ipaddress.IPv6Address(addrspec)
340 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
341 self._inurl = b'[%s]'
342 def make_endpoint(self):
343 return self._endpointfactory(reactor, self.port, self.addr)
345 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
346 if self.port != 80: url += b':%d' % self.port
350 def process_cfg_saddrs():
351 try: port = cfg.getint('server','port')
352 except NoOptionError: port = 80
355 for addrspec in cfg.get('server','addrs').split():
356 sa = ServerAddr(port, addrspec)
359 def process_cfg_clients(constructor):
361 for cs in cfg.sections():
362 if not (':' in cs or '.' in cs): continue
364 pw = cfg.get(cs, 'password')
365 pw = pw.encode('utf-8')
366 constructor(ci,cs,pw)
368 #---------- startup ----------
370 def common_startup():
371 log_formatter = twisted.logger.formatEventAsClassicLogText
372 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
373 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
374 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
375 log_observer = twisted.logger.FilteringLogObserver(
376 stderr_obs, [pred], stdout_obs
378 twisted.logger.globalLogBeginner.beginLoggingTo(
379 [ log_observer, crash_on_critical ]
382 optparser.add_option('-c', '--config', dest='configfile',
383 default='/etc/hippotat/config')
384 (opts, args) = optparser.parse_args()
385 if len(args): optparser.error('no non-option arguments please')
387 re = regexp.compile('#.*')
388 cfg.read_string(re.sub('', defcfg))
389 cfg.read(opts.configfile)
392 log_debug(DBG.INIT, 'entering reactor')
393 if not _crashing: reactor.run()
394 print('CRASHED (end)', file=sys.stderr)