4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
30 import hippotat.slip as slip
32 class DBG(twisted.python.constants.Names):
33 ROUTE = NamedConstant()
34 DROP = NamedConstant()
35 FLOW = NamedConstant()
36 HTTP = NamedConstant()
37 HTTP_CTRL = NamedConstant()
38 INIT = NamedConstant()
39 QUEUE = NamedConstant()
40 QUEUE_CTRL = NamedConstant()
41 HTTP_FULL = NamedConstant()
42 SLIP_FULL = NamedConstant()
43 CTRL_DUMP = NamedConstant()
45 _hex_codec = codecs.getencoder('hex_codec')
47 log = twisted.logger.Logger()
49 def log_debug(dflag, msg, idof=None, d=None):
50 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
52 msg = '[%#x] %s' % (id(idof), msg)
55 d = _hex_codec(d)[0].decode('ascii')
57 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
62 max_batch_down = 65536 # used by server, subject to [limits]
63 max_queue_time = 10 # used by server, subject to [limits]
64 max_request_time = 54 # used by server, subject to [limits]
65 target_requests_outstanding = 3 # must match; subject to [limits] on server
66 max_requests_outstanding = 4 # used by client
67 max_batch_up = 4000 # used by client
68 http_timeout = 30 # used by client
69 http_retry = 5 # used by client
71 #[server] or [<client>] overrides
72 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
73 # extra interpolations: %(local)s %(peer)s %(rnet)s
74 # obtained on server [virtual]server [virtual]relay [virtual]network
75 # from on client <client> [virtual]server [virtual]routes
80 # network = <prefix>/<len> # mandatory for server
81 # server = <ipaddr> # used by both, default is computed from `network'
82 # relay = <ipaddr> # used by server, default from `network' and `server'
83 # default server is first host in network
84 # default relay is first host which is not server
87 # addrs = 127.0.0.1 ::1 # mandatory for server
88 port = 80 # used by server
89 # url # used by client; default from first `addrs' and `port'
91 # [<client-ip4-or-ipv6-address>]
92 # password = <password> # used by both, must match
95 max_batch_down = 262144 # used by server
96 max_queue_time = 121 # used by server
97 max_request_time = 121 # used by server
98 target_requests_outstanding = 10 # used by server
101 # these need to be defined here so that they can be imported by import *
103 optparser = OptionParser()
105 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
106 def mime_translate(s):
107 # SLIP-encoded packets cannot contain ESC ESC.
108 # Swap `-' and ESC. The result cannot contain `--'
109 return s.translate(_mimetrans)
112 def __init__(self, d = { }):
115 return 'ConfigResults('+repr(self.__dict__)+')'
119 def log_discard(packet, iface, saddr, daddr, why):
121 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
124 #---------- packet parsing ----------
126 def packet_addrs(packet):
127 version = packet[0] >> 4
131 factory = ipaddress.IPv4Address
135 factory = ipaddress.IPv6Address
137 raise ValueError('unsupported IP version %d' % version)
138 saddr = factory(packet[ saddroff : saddroff + addrlen ])
139 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
140 return (saddr, daddr)
142 #---------- address handling ----------
146 r = ipaddress.IPv4Address(input)
147 except AddressValueError:
148 r = ipaddress.IPv6Address(input)
151 def ipnetwork(input):
153 r = ipaddress.IPv4Network(input)
154 except NetworkValueError:
155 r = ipaddress.IPv6Network(input)
158 #---------- ipif (SLIP) subprocess ----------
160 class SlipStreamDecoder():
161 def __init__(self, desc, on_packet):
163 self._on_packet = on_packet
165 self._log('__init__')
167 def _log(self, msg, **kwargs):
168 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
170 def inputdata(self, data):
171 self._log('inputdata', d=data)
172 data = self._buffer + data
174 packets = slip.decode(data)
175 self._buffer = packets.pop()
176 for packet in packets:
177 self._maybe_packet(packet)
178 self._log('bufremain', d=self._buffer)
180 def _maybe_packet(self, packet):
181 self._log('maybepacket', d=packet)
183 self._on_packet(packet)
187 self._maybe_packet(self._buffer)
190 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
191 def __init__(self, router):
192 self._router = router
193 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
194 def connectionMade(self): pass
195 def outReceived(self, data):
196 self._decoder.inputdata(data)
197 def slip_on_packet(self, packet):
198 (saddr, daddr) = packet_addrs(packet)
199 if saddr.is_link_local or daddr.is_link_local:
200 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
202 self._router(packet, saddr, daddr)
203 def processEnded(self, status):
204 status.raiseException()
206 def start_ipif(command, router):
208 ipif = _IpifProcessProtocol(router)
209 reactor.spawnProcess(ipif,
210 '/bin/sh',['sh','-xc', command],
211 childFDs={0:'w', 1:'r', 2:2},
214 def queue_inbound(packet):
215 log_debug(DBG.FLOW, "queue_inbound", d=packet)
216 ipif.transport.write(slip.delimiter)
217 ipif.transport.write(slip.encode(packet))
218 ipif.transport.write(slip.delimiter)
220 #---------- packet queue ----------
223 def __init__(self, desc, max_queue_time):
226 self._max_queue_time = max_queue_time
227 self._pq = collections.deque() # packets
229 def _log(self, dflag, msg, **kwargs):
230 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
232 def append(self, packet):
233 self._log(DBG.QUEUE, 'append', d=packet)
234 self._pq.append((time.monotonic(), packet))
237 self._log(DBG.QUEUE, 'nonempty ?')
239 try: (queuetime, packet) = self._pq[0]
241 self._log(DBG.QUEUE, 'nonempty ? empty.')
244 age = time.monotonic() - queuetime
245 if age > self._max_queue_time:
246 # strip old packets off the front
247 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
251 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
254 def process(self, sizequery, moredata, max_batch):
255 # sizequery() should return size of batch so far
256 # moredata(s) should add s to batch
257 self._log(DBG.QUEUE, 'process...')
259 try: (dummy, packet) = self._pq[0]
261 self._log(DBG.QUEUE, 'process... empty')
264 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
266 encoded = slip.encode(packet)
269 self._log(DBG.QUEUE_CTRL,
270 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
274 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
275 self._log(DBG.QUEUE_CTRL, 'process... overflow')
277 moredata(slip.delimiter)
282 #---------- error handling ----------
289 print('========== CRASH ==========', err,
290 '===========================', file=sys.stderr)
292 except twisted.internet.error.ReactorNotRunning: pass
294 def crash_on_defer(defer):
295 defer.addErrback(lambda err: crash(err))
297 def crash_on_critical(event):
298 if event.get('log_level') >= LogLevel.critical:
299 crash(twisted.logger.formatEvent(event))
301 #---------- config processing ----------
303 def process_cfg_common_always():
305 c.mtu = cfg.get('virtual','mtu')
307 def process_cfg_ipif(section, varmap):
309 try: v = getattr(c, s)
310 except AttributeError: continue
315 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
317 def process_cfg_network():
318 c.network = ipnetwork(cfg.get('virtual','network'))
319 if c.network.num_addresses < 3 + 2:
320 raise ValueError('network needs at least 2^3 addresses')
322 def process_cfg_server():
324 c.server = cfg.get('virtual','server')
325 except NoOptionError:
326 process_cfg_network()
327 c.server = next(c.network.hosts())
330 def __init__(self, port, addrspec):
334 self.addr = ipaddress.IPv4Address(addrspec)
335 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
337 except AddressValueError:
338 self.addr = ipaddress.IPv6Address(addrspec)
339 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
340 self._inurl = b'[%s]'
341 def make_endpoint(self):
342 return self._endpointfactory(reactor, self.port, self.addr)
344 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
345 if self.port != 80: url += b':%d' % self.port
349 def process_cfg_saddrs():
350 try: port = cfg.getint('server','port')
351 except NoOptionError: port = 80
354 for addrspec in cfg.get('server','addrs').split():
355 sa = ServerAddr(port, addrspec)
358 def process_cfg_clients(constructor):
360 for cs in cfg.sections():
361 if not (':' in cs or '.' in cs): continue
363 pw = cfg.get(cs, 'password')
364 pw = pw.encode('utf-8')
365 constructor(ci,cs,pw)
367 #---------- startup ----------
369 def common_startup():
370 log_formatter = twisted.logger.formatEventAsClassicLogText
371 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
372 twisted.logger.globalLogBeginner.beginLoggingTo(
373 [ log_observer, crash_on_critical ]
376 optparser.add_option('-c', '--config', dest='configfile',
377 default='/etc/hippotat/config')
378 (opts, args) = optparser.parse_args()
379 if len(args): optparser.error('no non-option arguments please')
381 re = regexp.compile('#.*')
382 cfg.read_string(re.sub('', defcfg))
383 cfg.read(opts.configfile)
386 log_debug(DBG.INIT, 'entering reactor')
387 if not _crashing: reactor.run()
388 print('CRASHED (end)', file=sys.stderr)