4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
29 import hippotat.slip as slip
31 class DBG(twisted.python.constants.Names):
32 ROUTE = NamedConstant()
33 DROP = NamedConstant()
34 FLOW = NamedConstant()
35 HTTP = NamedConstant()
36 HTTP_CTRL = NamedConstant()
37 INIT = NamedConstant()
38 QUEUE = NamedConstant()
39 QUEUE_CTRL = NamedConstant()
40 HTTP_FULL = NamedConstant()
42 _hex_codec = codecs.getencoder('hex_codec')
44 log = twisted.logger.Logger()
46 def log_debug(dflag, msg, idof=None, d=None):
47 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
49 msg = '[%d] %s' % (id(idof), msg)
52 d = _hex_codec(d)[0].decode('ascii')
54 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
59 max_batch_down = 65536 # used by server, subject to [limits]
60 max_queue_time = 10 # used by server, subject to [limits]
61 max_request_time = 54 # used by server, subject to [limits]
62 target_requests_outstanding = 3 # must match; subject to [limits] on server
63 max_requests_outstanding = 4 # used by client
64 max_batch_up = 4000 # used by client
65 http_timeout = 30 # used by client
66 http_retry = 5 # used by client
68 #[server] or [<client>] overrides
69 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
70 # extra interpolations: %(local)s %(peer)s %(rnet)s
71 # obtained on server [virtual]server [virtual]relay [virtual]network
72 # from on client <client> [virtual]server [virtual]routes
77 # network = <prefix>/<len> # mandatory for server
78 # server = <ipaddr> # used by both, default is computed from `network'
79 # relay = <ipaddr> # used by server, default from `network' and `server'
80 # default server is first host in network
81 # default relay is first host which is not server
84 # addrs = 127.0.0.1 ::1 # mandatory for server
85 port = 80 # used by server
86 # url # used by client; default from first `addrs' and `port'
88 # [<client-ip4-or-ipv6-address>]
89 # password = <password> # used by both, must match
92 max_batch_down = 262144 # used by server
93 max_queue_time = 121 # used by server
94 max_request_time = 121 # used by server
95 target_requests_outstanding = 10 # used by server
98 # these need to be defined here so that they can be imported by import *
100 optparser = OptionParser()
102 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
103 def mime_translate(s):
104 # SLIP-encoded packets cannot contain ESC ESC.
105 # Swap `-' and ESC. The result cannot contain `--'
106 return s.translate(_mimetrans)
109 def __init__(self, d = { }):
112 return 'ConfigResults('+repr(self.__dict__)+')'
116 def log_discard(packet, iface, saddr, daddr, why):
118 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
121 #---------- packet parsing ----------
123 def packet_addrs(packet):
124 version = packet[0] >> 4
128 factory = ipaddress.IPv4Address
132 factory = ipaddress.IPv6Address
134 raise ValueError('unsupported IP version %d' % version)
135 saddr = factory(packet[ saddroff : saddroff + addrlen ])
136 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
137 return (saddr, daddr)
139 #---------- address handling ----------
143 r = ipaddress.IPv4Address(input)
144 except AddressValueError:
145 r = ipaddress.IPv6Address(input)
148 def ipnetwork(input):
150 r = ipaddress.IPv4Network(input)
151 except NetworkValueError:
152 r = ipaddress.IPv6Network(input)
155 #---------- ipif (SLIP) subprocess ----------
157 class SlipStreamDecoder():
158 def __init__(self, on_packet):
159 # we will call packet(<packet>)
161 self._on_packet = on_packet
163 def inputdata(self, data):
164 #print('SLIP-GOT ', repr(data))
166 packets = slip.decode(self._buffer)
167 self._buffer = packets.pop()
168 for packet in packets:
169 self._maybe_packet(packet)
171 def _maybe_packet(self, packet):
173 self._on_packet(packet)
176 self._maybe_packet(self._buffer)
179 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
180 def __init__(self, router):
181 self._router = router
182 self._decoder = SlipStreamDecoder(self.slip_on_packet)
183 def connectionMade(self): pass
184 def outReceived(self, data):
185 self._decoder.inputdata(data)
186 def slip_on_packet(self, packet):
187 (saddr, daddr) = packet_addrs(packet)
188 if saddr.is_link_local or daddr.is_link_local:
189 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
191 self._router(packet, saddr, daddr)
192 def processEnded(self, status):
193 status.raiseException()
195 def start_ipif(command, router):
197 ipif = _IpifProcessProtocol(router)
198 reactor.spawnProcess(ipif,
199 '/bin/sh',['sh','-xc', command],
200 childFDs={0:'w', 1:'r', 2:2},
203 def queue_inbound(packet):
204 log_debug(DBG.FLOW, "queue_inbound", d=packet)
205 ipif.transport.write(slip.delimiter)
206 ipif.transport.write(slip.encode(packet))
207 ipif.transport.write(slip.delimiter)
209 #---------- packet queue ----------
212 def __init__(self, desc, max_queue_time):
215 self._max_queue_time = max_queue_time
216 self._pq = collections.deque() # packets
218 def _log(self, dflag, msg, **kwargs):
219 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
221 def append(self, packet):
222 self._log(DBG.QUEUE, 'append', d=packet)
223 self._pq.append((time.monotonic(), packet))
226 self._log(DBG.QUEUE, 'nonempty ?')
228 try: (queuetime, packet) = self._pq[0]
230 self._log(DBG.QUEUE, 'nonempty ? empty.')
233 age = time.monotonic() - queuetime
234 if age > self._max_queue_time:
235 # strip old packets off the front
236 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
240 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
243 def process(self, sizequery, moredata, max_batch):
244 # sizequery() should return size of batch so far
245 # moredata(s) should add s to batch
246 self._log(DBG.QUEUE, 'process...')
248 try: (dummy, packet) = self._pq[0]
250 self._log(DBG.QUEUE, 'process... empty')
253 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
255 encoded = slip.encode(packet)
258 self._log(DBG.QUEUE_CTRL,
259 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
263 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
264 self._log(DBG.QUEUE_CTRL, 'process... overflow')
266 moredata(slip.delimiter)
271 #---------- error handling ----------
278 print('CRASH ', err, file=sys.stderr)
280 except twisted.internet.error.ReactorNotRunning: pass
282 def crash_on_defer(defer):
283 defer.addErrback(lambda err: crash(err))
285 def crash_on_critical(event):
286 if event.get('log_level') >= LogLevel.critical:
287 crash(twisted.logger.formatEvent(event))
289 #---------- config processing ----------
291 def process_cfg_common_always():
293 c.mtu = cfg.get('virtual','mtu')
295 def process_cfg_ipif(section, varmap):
297 try: v = getattr(c, s)
298 except AttributeError: continue
303 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
305 def process_cfg_network():
306 c.network = ipnetwork(cfg.get('virtual','network'))
307 if c.network.num_addresses < 3 + 2:
308 raise ValueError('network needs at least 2^3 addresses')
310 def process_cfg_server():
312 c.server = cfg.get('virtual','server')
313 except NoOptionError:
314 process_cfg_network()
315 c.server = next(c.network.hosts())
318 def __init__(self, port, addrspec):
322 self.addr = ipaddress.IPv4Address(addrspec)
323 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
325 except AddressValueError:
326 self.addr = ipaddress.IPv6Address(addrspec)
327 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
328 self._inurl = b'[%s]'
329 def make_endpoint(self):
330 return self._endpointfactory(reactor, self.port, self.addr)
332 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
333 if self.port != 80: url += b':%d' % self.port
337 def process_cfg_saddrs():
338 try: port = cfg.getint('server','port')
339 except NoOptionError: port = 80
342 for addrspec in cfg.get('server','addrs').split():
343 sa = ServerAddr(port, addrspec)
346 def process_cfg_clients(constructor):
348 for cs in cfg.sections():
349 if not (':' in cs or '.' in cs): continue
351 pw = cfg.get(cs, 'password')
352 pw = pw.encode('utf-8')
353 constructor(ci,cs,pw)
355 #---------- startup ----------
357 def common_startup():
358 log_formatter = twisted.logger.formatEventAsClassicLogText
359 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
360 twisted.logger.globalLogBeginner.beginLoggingTo(
361 [ log_observer, crash_on_critical ]
364 optparser.add_option('-c', '--config', dest='configfile',
365 default='/etc/hippotat/config')
366 (opts, args) = optparser.parse_args()
367 if len(args): optparser.error('no non-option arguments please')
369 re = regexp.compile('#.*')
370 cfg.read_string(re.sub('', defcfg))
371 cfg.read(opts.configfile)
374 log_debug(DBG.INIT, 'entering reactor')
375 if not _crashing: reactor.run()
376 print('CRASHED (end)', file=sys.stderr)