4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
30 import hippotat.slip as slip
32 class DBG(twisted.python.constants.Names):
33 ROUTE = NamedConstant()
34 DROP = NamedConstant()
35 FLOW = NamedConstant()
36 HTTP = NamedConstant()
37 HTTP_CTRL = NamedConstant()
38 INIT = NamedConstant()
39 QUEUE = NamedConstant()
40 QUEUE_CTRL = NamedConstant()
41 HTTP_FULL = NamedConstant()
42 SLIP_FULL = NamedConstant()
44 _hex_codec = codecs.getencoder('hex_codec')
46 log = twisted.logger.Logger()
48 def log_debug(dflag, msg, idof=None, d=None):
49 #print('---------------->',repr((dflag, msg, idof, d)), file=sys.stderr)
51 msg = '[%d] %s' % (id(idof), msg)
54 d = _hex_codec(d)[0].decode('ascii')
56 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
61 max_batch_down = 65536 # used by server, subject to [limits]
62 max_queue_time = 10 # used by server, subject to [limits]
63 max_request_time = 54 # used by server, subject to [limits]
64 target_requests_outstanding = 3 # must match; subject to [limits] on server
65 max_requests_outstanding = 4 # used by client
66 max_batch_up = 4000 # used by client
67 http_timeout = 30 # used by client
68 http_retry = 5 # used by client
70 #[server] or [<client>] overrides
71 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
72 # extra interpolations: %(local)s %(peer)s %(rnet)s
73 # obtained on server [virtual]server [virtual]relay [virtual]network
74 # from on client <client> [virtual]server [virtual]routes
79 # network = <prefix>/<len> # mandatory for server
80 # server = <ipaddr> # used by both, default is computed from `network'
81 # relay = <ipaddr> # used by server, default from `network' and `server'
82 # default server is first host in network
83 # default relay is first host which is not server
86 # addrs = 127.0.0.1 ::1 # mandatory for server
87 port = 80 # used by server
88 # url # used by client; default from first `addrs' and `port'
90 # [<client-ip4-or-ipv6-address>]
91 # password = <password> # used by both, must match
94 max_batch_down = 262144 # used by server
95 max_queue_time = 121 # used by server
96 max_request_time = 121 # used by server
97 target_requests_outstanding = 10 # used by server
100 # these need to be defined here so that they can be imported by import *
102 optparser = OptionParser()
104 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
105 def mime_translate(s):
106 # SLIP-encoded packets cannot contain ESC ESC.
107 # Swap `-' and ESC. The result cannot contain `--'
108 return s.translate(_mimetrans)
111 def __init__(self, d = { }):
114 return 'ConfigResults('+repr(self.__dict__)+')'
118 def log_discard(packet, iface, saddr, daddr, why):
120 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
123 #---------- packet parsing ----------
125 def packet_addrs(packet):
126 version = packet[0] >> 4
130 factory = ipaddress.IPv4Address
134 factory = ipaddress.IPv6Address
136 raise ValueError('unsupported IP version %d' % version)
137 saddr = factory(packet[ saddroff : saddroff + addrlen ])
138 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
139 return (saddr, daddr)
141 #---------- address handling ----------
145 r = ipaddress.IPv4Address(input)
146 except AddressValueError:
147 r = ipaddress.IPv6Address(input)
150 def ipnetwork(input):
152 r = ipaddress.IPv4Network(input)
153 except NetworkValueError:
154 r = ipaddress.IPv6Network(input)
157 #---------- ipif (SLIP) subprocess ----------
159 class SlipStreamDecoder():
160 def __init__(self, desc, on_packet):
162 self._on_packet = on_packet
164 self._log('__init__')
166 def _log(self, msg, **kwargs):
167 log_debug(DBG.SLIP_FULL, 'slip %s: %s' % (self._desc, msg), **kwargs)
169 def inputdata(self, data):
170 self._log('inputdata', d=data)
171 data = self._buffer + data
173 packets = slip.decode(data)
174 self._buffer = packets.pop()
175 for packet in packets:
176 self._maybe_packet(packet)
177 self._log('bufremain', d=self._buffer)
179 def _maybe_packet(self, packet):
180 self._log('maybepacket', d=packet)
182 self._on_packet(packet)
186 self._maybe_packet(self._buffer)
189 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
190 def __init__(self, router):
191 self._router = router
192 self._decoder = SlipStreamDecoder('ipif', self.slip_on_packet)
193 def connectionMade(self): pass
194 def outReceived(self, data):
195 self._decoder.inputdata(data)
196 def slip_on_packet(self, packet):
197 (saddr, daddr) = packet_addrs(packet)
198 if saddr.is_link_local or daddr.is_link_local:
199 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
201 self._router(packet, saddr, daddr)
202 def processEnded(self, status):
203 status.raiseException()
205 def start_ipif(command, router):
207 ipif = _IpifProcessProtocol(router)
208 reactor.spawnProcess(ipif,
209 '/bin/sh',['sh','-xc', command],
210 childFDs={0:'w', 1:'r', 2:2},
213 def queue_inbound(packet):
214 log_debug(DBG.FLOW, "queue_inbound", d=packet)
215 ipif.transport.write(slip.delimiter)
216 ipif.transport.write(slip.encode(packet))
217 ipif.transport.write(slip.delimiter)
219 #---------- packet queue ----------
222 def __init__(self, desc, max_queue_time):
225 self._max_queue_time = max_queue_time
226 self._pq = collections.deque() # packets
228 def _log(self, dflag, msg, **kwargs):
229 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
231 def append(self, packet):
232 self._log(DBG.QUEUE, 'append', d=packet)
233 self._pq.append((time.monotonic(), packet))
236 self._log(DBG.QUEUE, 'nonempty ?')
238 try: (queuetime, packet) = self._pq[0]
240 self._log(DBG.QUEUE, 'nonempty ? empty.')
243 age = time.monotonic() - queuetime
244 if age > self._max_queue_time:
245 # strip old packets off the front
246 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
250 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
253 def process(self, sizequery, moredata, max_batch):
254 # sizequery() should return size of batch so far
255 # moredata(s) should add s to batch
256 self._log(DBG.QUEUE, 'process...')
258 try: (dummy, packet) = self._pq[0]
260 self._log(DBG.QUEUE, 'process... empty')
263 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
265 encoded = slip.encode(packet)
268 self._log(DBG.QUEUE_CTRL,
269 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
273 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
274 self._log(DBG.QUEUE_CTRL, 'process... overflow')
276 moredata(slip.delimiter)
281 #---------- error handling ----------
288 print('CRASH ', err, file=sys.stderr)
290 except twisted.internet.error.ReactorNotRunning: pass
292 def crash_on_defer(defer):
293 defer.addErrback(lambda err: crash(err))
295 def crash_on_critical(event):
296 if event.get('log_level') >= LogLevel.critical:
297 crash(twisted.logger.formatEvent(event))
299 #---------- config processing ----------
301 def process_cfg_common_always():
303 c.mtu = cfg.get('virtual','mtu')
305 def process_cfg_ipif(section, varmap):
307 try: v = getattr(c, s)
308 except AttributeError: continue
313 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
315 def process_cfg_network():
316 c.network = ipnetwork(cfg.get('virtual','network'))
317 if c.network.num_addresses < 3 + 2:
318 raise ValueError('network needs at least 2^3 addresses')
320 def process_cfg_server():
322 c.server = cfg.get('virtual','server')
323 except NoOptionError:
324 process_cfg_network()
325 c.server = next(c.network.hosts())
328 def __init__(self, port, addrspec):
332 self.addr = ipaddress.IPv4Address(addrspec)
333 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
335 except AddressValueError:
336 self.addr = ipaddress.IPv6Address(addrspec)
337 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
338 self._inurl = b'[%s]'
339 def make_endpoint(self):
340 return self._endpointfactory(reactor, self.port, self.addr)
342 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
343 if self.port != 80: url += b':%d' % self.port
347 def process_cfg_saddrs():
348 try: port = cfg.getint('server','port')
349 except NoOptionError: port = 80
352 for addrspec in cfg.get('server','addrs').split():
353 sa = ServerAddr(port, addrspec)
356 def process_cfg_clients(constructor):
358 for cs in cfg.sections():
359 if not (':' in cs or '.' in cs): continue
361 pw = cfg.get(cs, 'password')
362 pw = pw.encode('utf-8')
363 constructor(ci,cs,pw)
365 #---------- startup ----------
367 def common_startup():
368 log_formatter = twisted.logger.formatEventAsClassicLogText
369 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
370 twisted.logger.globalLogBeginner.beginLoggingTo(
371 [ log_observer, crash_on_critical ]
374 optparser.add_option('-c', '--config', dest='configfile',
375 default='/etc/hippotat/config')
376 (opts, args) = optparser.parse_args()
377 if len(args): optparser.error('no non-option arguments please')
379 re = regexp.compile('#.*')
380 cfg.read_string(re.sub('', defcfg))
381 cfg.read(opts.configfile)
384 log_debug(DBG.INIT, 'entering reactor')
385 if not _crashing: reactor.run()
386 print('CRASHED (end)', file=sys.stderr)