4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
30 import hippotat.slip as slip
class DBG(twisted.python.constants.Names):
    """Debug-message categories.

    One of these is passed as the `dflag' argument of log_debug to say
    which kind of debug message is being emitted.
    """
    # Declaration order is kept exactly as it was: Twisted orders the
    # constants of a Names container by the order in which they are defined.
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
    HTTP_FULL = NamedConstant()
43 _hex_codec = codecs.getencoder('hex_codec')
45 log = twisted.logger.Logger()
47 def log_debug(dflag, msg, idof=None, d=None):
48 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
50 msg = '[%d] %s' % (id(idof), msg)
53 d = _hex_codec(d)[0].decode('ascii')
55 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
60 max_batch_down = 65536 # used by server, subject to [limits]
61 max_queue_time = 10 # used by server, subject to [limits]
62 max_request_time = 54 # used by server, subject to [limits]
63 target_requests_outstanding = 3 # must match; subject to [limits] on server
64 max_requests_outstanding = 4 # used by client
65 max_batch_up = 4000 # used by client
66 http_timeout = 30 # used by client
67 http_retry = 5 # used by client
69 #[server] or [<client>] overrides
70 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
71 # extra interpolations: %(local)s %(peer)s %(rnet)s
72 # obtained on server [virtual]server [virtual]relay [virtual]network
73 # from on client <client> [virtual]server [virtual]routes
78 # network = <prefix>/<len> # mandatory for server
79 # server = <ipaddr> # used by both, default is computed from `network'
80 # relay = <ipaddr> # used by server, default from `network' and `server'
81 # default server is first host in network
82 # default relay is first host which is not server
85 # addrs = 127.0.0.1 ::1 # mandatory for server
86 port = 80 # used by server
87 # url # used by client; default from first `addrs' and `port'
89 # [<client-ip4-or-ipv6-address>]
90 # password = <password> # used by both, must match
93 max_batch_down = 262144 # used by server
94 max_queue_time = 121 # used by server
95 max_request_time = 121 # used by server
96 target_requests_outstanding = 10 # used by server
99 # these need to be defined here so that they can be imported by import *
101 optparser = OptionParser()
103 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
def mime_translate(s):
    """Return s with every `-' and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.  The swap is its own inverse.
    """
    swapped = s.translate(_mimetrans)
    return swapped
110 def __init__(self, d = { }):
113 return 'ConfigResults('+repr(self.__dict__)+')'
117 def log_discard(packet, iface, saddr, daddr, why):
119 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
122 #---------- packet parsing ----------
124 def packet_addrs(packet):
125 version = packet[0] >> 4
129 factory = ipaddress.IPv4Address
133 factory = ipaddress.IPv6Address
135 raise ValueError('unsupported IP version %d' % version)
136 saddr = factory(packet[ saddroff : saddroff + addrlen ])
137 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
138 return (saddr, daddr)
140 #---------- address handling ----------
144 r = ipaddress.IPv4Address(input)
145 except AddressValueError:
146 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse input as an IP network and return the network object.

    IPv4 is tried first; if the address part is not valid IPv4 the input
    is parsed again as IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.
    Raises ValueError (or one of its ipaddress subclasses) if input is
    not a valid network of either family.
    """
    try:
        return ipaddress.IPv4Network(input)
    except AddressValueError:
        # Not an IPv4 address: fall back to IPv6, in the same way that
        # plain addresses are parsed.  Other errors (bad prefix length,
        # host bits set) are real IPv4 mistakes and are left to propagate.
        return ipaddress.IPv6Network(input)
156 #---------- ipif (SLIP) subprocess ----------
158 class SlipStreamDecoder():
159 def __init__(self, on_packet):
160 # we will call packet(<packet>)
162 self._on_packet = on_packet
164 def inputdata(self, data):
165 #print('SLIP-GOT ', repr(data))
166 data = self._buffer + data
168 packets = slip.decode(data)
169 self._buffer = packets.pop()
170 for packet in packets:
171 self._maybe_packet(packet)
173 def _maybe_packet(self, packet):
175 self._on_packet(packet)
178 self._maybe_packet(self._buffer)
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif subprocess.

    Data received from the subprocess is fed through a SlipStreamDecoder;
    each decoded packet is handed to router(packet, saddr, daddr), except
    for packets with a link-local source or destination, which are
    logged and dropped.
    """

    def __init__(self, router):
        # router: callable taking (packet, saddr, daddr)
        self._router = router
        self._decoder = SlipStreamDecoder(self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        # Raw bytes from the subprocess; the decoder calls slip_on_packet
        # for each packet it extracts.
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, 'ipif', saddr, daddr, 'link-local')
            # A packet logged as discarded must not reach the router.
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise the reason the subprocess ended rather than ignoring it.
        status.raiseException()
197 def start_ipif(command, router):
199 ipif = _IpifProcessProtocol(router)
200 reactor.spawnProcess(ipif,
201 '/bin/sh',['sh','-xc', command],
202 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Write one packet to the ipif subprocess.

    The packet is SLIP-encoded and written with a SLIP delimiter on each
    side.  Uses the module-level `ipif' protocol object.
    """
    log_debug(DBG.FLOW, "queue_inbound", d=packet)
    write = ipif.transport.write
    write(slip.delimiter)
    write(slip.encode(packet))
    write(slip.delimiter)
211 #---------- packet queue ----------
214 def __init__(self, desc, max_queue_time):
217 self._max_queue_time = max_queue_time
218 self._pq = collections.deque() # packets
220 def _log(self, dflag, msg, **kwargs):
221 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
223 def append(self, packet):
224 self._log(DBG.QUEUE, 'append', d=packet)
225 self._pq.append((time.monotonic(), packet))
228 self._log(DBG.QUEUE, 'nonempty ?')
230 try: (queuetime, packet) = self._pq[0]
232 self._log(DBG.QUEUE, 'nonempty ? empty.')
235 age = time.monotonic() - queuetime
236 if age > self._max_queue_time:
237 # strip old packets off the front
238 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
242 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
245 def process(self, sizequery, moredata, max_batch):
246 # sizequery() should return size of batch so far
247 # moredata(s) should add s to batch
248 self._log(DBG.QUEUE, 'process...')
250 try: (dummy, packet) = self._pq[0]
252 self._log(DBG.QUEUE, 'process... empty')
255 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
257 encoded = slip.encode(packet)
260 self._log(DBG.QUEUE_CTRL,
261 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
265 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
266 self._log(DBG.QUEUE_CTRL, 'process... overflow')
268 moredata(slip.delimiter)
273 #---------- error handling ----------
280 print('CRASH ', err, file=sys.stderr)
282 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Arrange that a failure of the deferred `defer' is passed to crash()."""
    def _on_failure(failure):
        # crash is looked up when the errback fires, not when it is added.
        return crash(failure)
    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer: pass any critical-level event to crash().

    event is a log event dict.  If its `log_level' is at least
    LogLevel.critical, the formatted event text is handed to crash().
    Events that carry no `log_level' are ignored, because None cannot be
    ordered against a LogLevel constant.
    """
    level = event.get('log_level')
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
291 #---------- config processing ----------
293 def process_cfg_common_always():
295 c.mtu = cfg.get('virtual','mtu')
297 def process_cfg_ipif(section, varmap):
299 try: v = getattr(c, s)
300 except AttributeError: continue
305 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Read `network' from the [virtual] config section into c.network.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    network = ipnetwork(cfg.get('virtual','network'))
    c.network = network
    if network.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
312 def process_cfg_server():
314 c.server = cfg.get('virtual','server')
315 except NoOptionError:
316 process_cfg_network()
317 c.server = next(c.network.hosts())
320 def __init__(self, port, addrspec):
324 self.addr = ipaddress.IPv4Address(addrspec)
325 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
327 except AddressValueError:
328 self.addr = ipaddress.IPv6Address(addrspec)
329 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
330 self._inurl = b'[%s]'
def make_endpoint(self):
    """Return a server endpoint listening on self.port at self.addr.

    The address is passed by keyword as a string: the third positional
    parameter of the Twisted TCP server endpoints is `backlog', so
    passing the address positionally would not bind to it.
    """
    return self._endpointfactory(reactor, self.port,
                                 interface=str(self.addr))
334 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
335 if self.port != 80: url += b':%d' % self.port
339 def process_cfg_saddrs():
340 try: port = cfg.getint('server','port')
341 except NoOptionError: port = 80
344 for addrspec in cfg.get('server','addrs').split():
345 sa = ServerAddr(port, addrspec)
348 def process_cfg_clients(constructor):
350 for cs in cfg.sections():
351 if not (':' in cs or '.' in cs): continue
353 pw = cfg.get(cs, 'password')
354 pw = pw.encode('utf-8')
355 constructor(ci,cs,pw)
357 #---------- startup ----------
359 def common_startup():
360 log_formatter = twisted.logger.formatEventAsClassicLogText
361 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
362 twisted.logger.globalLogBeginner.beginLoggingTo(
363 [ log_observer, crash_on_critical ]
366 optparser.add_option('-c', '--config', dest='configfile',
367 default='/etc/hippotat/config')
368 (opts, args) = optparser.parse_args()
369 if len(args): optparser.error('no non-option arguments please')
371 re = regexp.compile('#.*')
372 cfg.read_string(re.sub('', defcfg))
373 cfg.read(opts.configfile)
376 log_debug(DBG.INIT, 'entering reactor')
377 if not _crashing: reactor.run()
378 print('CRASHED (end)', file=sys.stderr)