4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
23 from functools import partial
32 import hippotat.slip as slip
# Debug-message categories, passed as the `dflag' argument of log_debug.
#
# Declaration order matters: log_debug compares the flag it is given
# against DBG.HTTP with `>', which relies on twisted ordering the
# constants of a Names container by definition order.  Do not reorder
# the members without checking that comparison.
class DBG(twisted.python.constants.Names):
    INIT = NamedConstant()
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    # Everything from here on compares greater than DBG.HTTP.
    TWISTED = NamedConstant()
    QUEUE = NamedConstant()
    HTTP_CTRL = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
    CTRL_DUMP = NamedConstant()
    SLIP_FULL = NamedConstant()
48 _hex_codec = codecs.getencoder('hex_codec')
50 log = twisted.logger.Logger()
52 def log_debug(dflag, msg, idof=None, d=None):
53 if dflag > DBG.HTTP: return
54 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
56 msg = '[%#x] %s' % (id(idof), msg)
59 d = _hex_codec(d)[0].decode('ascii')
61 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
66 max_batch_down = 65536 # used by server, subject to [limits]
67 max_queue_time = 10 # used by server, subject to [limits]
68 target_requests_outstanding = 3 # must match; subject to [limits] on server
69 http_timeout = 30 # used by both } must be
70 http_timeout_grace = 5 # used by both } compatible
71 max_requests_outstanding = 4 # used by client
72 max_batch_up = 4000 # used by client
73 http_retry = 5 # used by client
75 #[server] or [<client>] overrides
76 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
77 # extra interpolations: %(local)s %(peer)s %(rnet)s
78 # obtained on server [virtual]server [virtual]relay [virtual]network
79 # from on client <client> [virtual]server [virtual]routes
84 # network = <prefix>/<len> # mandatory for server
85 # server = <ipaddr> # used by both, default is computed from `network'
86 # relay = <ipaddr> # used by server, default from `network' and `server'
87 # default server is first host in network
88 # default relay is first host which is not server
91 # addrs = 127.0.0.1 ::1 # mandatory for server
92 port = 80 # used by server
93 # url # used by client; default from first `addrs' and `port'
95 # [<client-ip4-or-ipv6-address>]
96 # password = <password> # used by both, must match
99 max_batch_down = 262144 # used by server
100 max_queue_time = 121 # used by server
101 http_timeout = 121 # used by server
102 target_requests_outstanding = 10 # used by server
105 # these need to be defined here so that they can be imported by import *
107 optparser = OptionParser()
# Byte translation table that exchanges `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return *s* with every `-' and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.  The table is its own inverse, so
    applying the function twice gives back the input.
    """
    swapped = s.translate(_mimetrans)
    return swapped
116 def __init__(self, d = { }):
119 return 'ConfigResults('+repr(self.__dict__)+')'
123 def log_discard(packet, iface, saddr, daddr, why):
125 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
128 #---------- packet parsing ----------
130 def packet_addrs(packet):
131 version = packet[0] >> 4
135 factory = ipaddress.IPv4Address
139 factory = ipaddress.IPv6Address
141 raise ValueError('unsupported IP version %d' % version)
142 saddr = factory(packet[ saddroff : saddroff + addrlen ])
143 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
144 return (saddr, daddr)
146 #---------- address handling ----------
150 r = ipaddress.IPv4Address(input)
151 except AddressValueError:
152 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises ValueError (possibly one of its ipaddress subclasses) if
    *input* is neither a valid IPv4 nor a valid IPv6 network.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # The address part is not IPv4, so try IPv6.  Only
        # AddressValueError triggers the fallback: a bad netmask or
        # stray host bits on a genuine IPv4 address are reported as-is
        # rather than being masked by a confusing IPv6 parse error.
        return ipaddress.IPv6Network(input)
162 #---------- ipif (SLIP) subprocess ----------
164 class SlipStreamDecoder():
165 def __init__(self, desc, on_packet):
167 self._on_packet = on_packet
169 self._log('__init__')
def _log(self, msg, **kwargs):
    """Log *msg* under DBG.SLIP_FULL, prefixed with this decoder's description.

    Extra keyword arguments are passed straight through to log_debug.
    """
    tagged = 'slip %s: %s' % (self._desc, msg)
    log_debug(DBG.SLIP_FULL, tagged, **kwargs)
def inputdata(self, data):
    """Feed a chunk of raw SLIP stream data into the decoder.

    Each piece that is now complete is handed to _maybe_packet; the
    final piece is kept in self._buffer to be joined with later data.
    """
    self._log('inputdata', d=data)
    # Presumably slip.decode splits at the SLIP delimiter, so the last
    # element is the not-yet-terminated tail -- confirm against slip.
    pieces = slip.decode(data)
    # The first piece continues whatever was left over last time.
    pieces[0] = self._buffer + pieces[0]
    self._buffer = pieces.pop()
    for complete in pieces:
        self._maybe_packet(complete)
    self._log('bufremain', d=self._buffer)
183 def _maybe_packet(self, packet):
184 self._log('maybepacket', d=packet)
186 self._on_packet(packet)
190 self._maybe_packet(self._buffer)
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data from the subprocess's stdout is SLIP-decoded; each resulting
    packet is passed to *router* as router(packet, saddr, daddr), unless
    its source or destination address is link-local, in which case it
    is logged as discarded and dropped.
    """

    def __init__(self, router):
        self._router = router
        self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)

    def connectionMade(self): pass

    def outReceived(self, data):
        # Raw bytes from the subprocess; the decoder reassembles packets.
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Handle one decoded packet: discard link-local, route the rest."""
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            # A discarded packet must not also be routed.
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise whatever exception *status* carries for the exit.
        status.raiseException()
209 def start_ipif(command, router):
211 ipif = _IpifProcessProtocol(router)
212 reactor.spawnProcess(ipif,
213 '/bin/sh',['sh','-xc', command],
214 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Write *packet* to the ipif subprocess, SLIP-encoded.

    The encoded packet is framed with a SLIP delimiter before and
    after it.  Uses the module-level `ipif' protocol object
    (presumably set up by start_ipif -- confirm).
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    send = ipif.transport.write
    send(slip.delimiter)
    send(slip.encode(packet))
    send(slip.delimiter)
223 #---------- packet queue ----------
226 def __init__(self, desc, max_queue_time):
229 self._max_queue_time = max_queue_time
230 self._pq = collections.deque() # packets
def _log(self, dflag, msg, **kwargs):
    """Log *msg* at debug category *dflag*, prefixed with this queue's description.

    Extra keyword arguments are passed straight through to log_debug.
    """
    tagged = self._desc + ' pq: ' + msg
    log_debug(dflag, tagged, **kwargs)
def append(self, packet):
    """Add *packet* to the tail of the queue, stamped with the current time.

    The stamp comes from time.monotonic(), the same clock the age
    check elsewhere in this class compares against.
    """
    self._log(DBG.QUEUE, 'append', d=packet)
    stamped = (time.monotonic(), packet)
    self._pq.append(stamped)
240 self._log(DBG.QUEUE, 'nonempty ?')
242 try: (queuetime, packet) = self._pq[0]
244 self._log(DBG.QUEUE, 'nonempty ? empty.')
247 age = time.monotonic() - queuetime
248 if age > self._max_queue_time:
249 # strip old packets off the front
250 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
254 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
257 def process(self, sizequery, moredata, max_batch):
258 # sizequery() should return size of batch so far
259 # moredata(s) should add s to batch
260 self._log(DBG.QUEUE, 'process...')
262 try: (dummy, packet) = self._pq[0]
264 self._log(DBG.QUEUE, 'process... empty')
267 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
269 encoded = slip.encode(packet)
272 self._log(DBG.QUEUE_CTRL,
273 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
277 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
278 self._log(DBG.QUEUE_CTRL, 'process... overflow')
280 moredata(slip.delimiter)
285 #---------- error handling ----------
292 print('========== CRASH ==========', err,
293 '===========================', file=sys.stderr)
295 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to *defer* that passes any failure to crash()."""
    def _errback(err):
        # crash is looked up when the errback fires, not when it is attached.
        return crash(err)
    defer.addErrback(_errback)
def crash_on_critical(event):
    """Log observer: call crash() for any event at critical level or above.

    *event* is a twisted.logger event dict.  Events without a
    'log_level' entry (or with it set to None) are ignored, since None
    cannot be ordered against a LogLevel and the comparison would
    raise TypeError inside the observer.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
304 #---------- config processing ----------
306 def process_cfg_common_always():
308 c.mtu = cfg.get('virtual','mtu')
310 def process_cfg_ipif(section, varmap):
312 try: v = getattr(c, s)
313 except AttributeError: continue
318 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Parse [virtual] network from the config into c.network and check its size.

    c.network is assigned before the size check, so it stays set even
    when the check fails.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    network_spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(network_spec)
    minimum_addresses = 3 + 2
    if c.network.num_addresses >= minimum_addresses:
        return
    raise ValueError('network needs at least 2^3 addresses')
325 def process_cfg_server():
327 c.server = cfg.get('virtual','server')
328 except NoOptionError:
329 process_cfg_network()
330 c.server = next(c.network.hosts())
333 def __init__(self, port, addrspec):
337 self.addr = ipaddress.IPv4Address(addrspec)
338 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
340 except AddressValueError:
341 self.addr = ipaddress.IPv6Address(addrspec)
342 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
343 self._inurl = b'[%s]'
def make_endpoint(self):
    """Return a Twisted server endpoint listening on self.addr and self.port.

    The address is passed as the `interface' keyword, as a string.
    The third positional parameter of TCP4ServerEndpoint and
    TCP6ServerEndpoint is `backlog', so passing the address
    positionally would not bind to it.
    """
    return self._endpointfactory(reactor, self.port,
                                 interface=str(self.addr))
347 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
348 if self.port != 80: url += b':%d' % self.port
352 def process_cfg_saddrs():
353 try: port = cfg.getint('server','port')
354 except NoOptionError: port = 80
357 for addrspec in cfg.get('server','addrs').split():
358 sa = ServerAddr(port, addrspec)
361 def process_cfg_clients(constructor):
363 for cs in cfg.sections():
364 if not (':' in cs or '.' in cs): continue
366 pw = cfg.get(cs, 'password')
367 pw = pw.encode('utf-8')
368 constructor(ci,cs,pw)
370 #---------- startup ----------
372 def common_startup():
373 log_formatter = twisted.logger.formatEventAsClassicLogText
374 stdout_obs = twisted.logger.FileLogObserver(sys.stdout, log_formatter)
375 stderr_obs = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
376 pred = twisted.logger.LogLevelFilterPredicate(LogLevel.error)
377 log_observer = twisted.logger.FilteringLogObserver(
378 stderr_obs, [pred], stdout_obs
380 twisted.logger.globalLogBeginner.beginLoggingTo(
381 [ log_observer, crash_on_critical ]
384 optparser.add_option('-c', '--config', dest='configfile',
385 default='/etc/hippotat/config')
386 (opts, args) = optparser.parse_args()
387 if len(args): optparser.error('no non-option arguments please')
389 re = regexp.compile('#.*')
390 cfg.read_string(re.sub('', defcfg))
391 cfg.read(opts.configfile)
394 log_debug(DBG.INIT, 'entering reactor')
395 if not _crashing: reactor.run()
396 print('CRASHED (end)', file=sys.stderr)