4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
29 import hippotat.slip as slip
31 class DBG(twisted.python.constants.Names):
32 ROUTE = NamedConstant()
33 DROP = NamedConstant()
34 FLOW = NamedConstant()
35 HTTP = NamedConstant()
36 HTTP_CTRL = NamedConstant()
37 INIT = NamedConstant()
38 QUEUE = NamedConstant()
39 QUEUE_CTRL = NamedConstant()
40 HTTP_FULL = NamedConstant()
42 _hex_codec = codecs.getencoder('hex_codec')
44 log = twisted.logger.Logger()
46 def log_debug(dflag, msg, idof=None, d=None):
48 msg = '[%d] %s' % (id(idof), msg)
51 d = _hex_codec(d)[0].decode('ascii')
53 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
58 max_batch_down = 65536 # used by server, subject to [limits]
59 max_queue_time = 10 # used by server, subject to [limits]
60 max_request_time = 54 # used by server, subject to [limits]
61 target_requests_outstanding = 3 # must match; subject to [limits] on server
62 max_requests_outstanding = 4 # used by client
63 max_batch_up = 4000 # used by client
64 http_timeout = 30 # used by client
65 http_retry = 5 # used by client
67 #[server] or [<client>] overrides
68 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
69 # extra interpolations: %(local)s %(peer)s %(rnet)s
70 # obtained on server [virtual]server [virtual]relay [virtual]network
71 # from on client <client> [virtual]server [virtual]routes
76 # network = <prefix>/<len> # mandatory for server
77 # server = <ipaddr> # used by both, default is computed from `network'
78 # relay = <ipaddr> # used by server, default from `network' and `server'
79 # default server is first host in network
80 # default relay is first host which is not server
83 # addrs = 127.0.0.1 ::1 # mandatory for server
84 port = 80 # used by server
85 # url # used by client; default from first `addrs' and `port'
87 # [<client-ip4-or-ipv6-address>]
88 # password = <password> # used by both, must match
91 max_batch_down = 262144 # used by server
92 max_queue_time = 121 # used by server
93 max_request_time = 121 # used by server
94 target_requests_outstanding = 10 # used by server
97 # these need to be defined here so that they can be imported by import *
99 optparser = OptionParser()
101 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
102 def mime_translate(s):
103 # SLIP-encoded packets cannot contain ESC ESC.
104 # Swap `-' and ESC. The result cannot contain `--'
105 return s.translate(_mimetrans)
108 def __init__(self, d = { }):
111 return 'ConfigResults('+repr(self.__dict__)+')'
115 def log_discard(packet, iface, saddr, daddr, why):
117 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
120 #---------- packet parsing ----------
122 def packet_addrs(packet):
123 version = packet[0] >> 4
127 factory = ipaddress.IPv4Address
131 factory = ipaddress.IPv6Address
133 raise ValueError('unsupported IP version %d' % version)
134 saddr = factory(packet[ saddroff : saddroff + addrlen ])
135 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
136 return (saddr, daddr)
138 #---------- address handling ----------
142 r = ipaddress.IPv4Address(input)
143 except AddressValueError:
144 r = ipaddress.IPv6Address(input)
147 def ipnetwork(input):
149 r = ipaddress.IPv4Network(input)
150 except NetworkValueError:
151 r = ipaddress.IPv6Network(input)
154 #---------- ipif (SLIP) subprocess ----------
156 class SlipStreamDecoder():
157 def __init__(self, on_packet):
158 # we will call packet(<packet>)
160 self._on_packet = on_packet
162 def inputdata(self, data):
163 #print('SLIP-GOT ', repr(data))
165 packets = slip.decode(self._buffer)
166 self._buffer = packets.pop()
167 for packet in packets:
168 self._maybe_packet(packet)
170 def _maybe_packet(self, packet):
172 self._on_packet(packet)
175 self._maybe_packet(self._buffer)
178 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
179 def __init__(self, router):
180 self._router = router
181 self._decoder = SlipStreamDecoder(self.slip_on_packet)
182 def connectionMade(self): pass
183 def outReceived(self, data):
184 self._decoder.inputdata(data)
185 def slip_on_packet(self, packet):
186 (saddr, daddr) = packet_addrs(packet)
187 if saddr.is_link_local or daddr.is_link_local:
188 log_discard(packet, 'ipif', saddr, daddr, 'link-local')
190 self._router(packet, saddr, daddr)
191 def processEnded(self, status):
192 status.raiseException()
194 def start_ipif(command, router):
196 ipif = _IpifProcessProtocol(router)
197 reactor.spawnProcess(ipif,
198 '/bin/sh',['sh','-xc', command],
199 childFDs={0:'w', 1:'r', 2:2},
202 def queue_inbound(packet):
203 ipif.transport.write(slip.delimiter)
204 ipif.transport.write(slip.encode(packet))
205 ipif.transport.write(slip.delimiter)
207 #---------- packet queue ----------
210 def __init__(self, desc, max_queue_time):
212 self._max_queue_time = max_queue_time
213 self._pq = collections.deque() # packets
215 def _log(self, dflag, msg, **kwargs):
216 log_debug(dflag, self._desc+' pq: '+msg, **kwargs)
218 def append(self, packet):
219 self._log(DBG.QUEUE, 'append', d=packet)
220 self._pq.append((time.monotonic(), packet))
223 self._log(DBG.QUEUE, 'nonempty ?')
225 try: (queuetime, packet) = self._pq[0]
227 self._log(DBG.QUEUE, 'nonempty ? empty.')
230 age = time.monotonic() - queuetime
231 if age > self._max_queue_time:
232 # strip old packets off the front
233 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
237 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
240 def process(self, sizequery, moredata, max_batch):
241 # sizequery() should return size of batch so far
242 # moredata(s) should add s to batch
243 self._log(DBG.QUEUE, 'process...')
245 try: (dummy, packet) = self._pq[0]
247 self._log(DBG.QUEUE, 'process... empty')
250 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
252 encoded = slip.encode(packet)
255 self._log(DBG.QUEUE_CTRL,
256 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
260 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
261 self._log(DBG.QUEUE_CTRL, 'process... overflow')
263 moredata(slip.delimiter)
268 #---------- error handling ----------
275 print('CRASH ', err, file=sys.stderr)
277 except twisted.internet.error.ReactorNotRunning: pass
279 def crash_on_defer(defer):
280 defer.addErrback(lambda err: crash(err))
282 def crash_on_critical(event):
283 if event.get('log_level') >= LogLevel.critical:
284 crash(twisted.logger.formatEvent(event))
286 #---------- config processing ----------
288 def process_cfg_common_always():
290 c.mtu = cfg.get('virtual','mtu')
292 def process_cfg_ipif(section, varmap):
294 try: v = getattr(c, s)
295 except AttributeError: continue
300 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
302 def process_cfg_network():
303 c.network = ipnetwork(cfg.get('virtual','network'))
304 if c.network.num_addresses < 3 + 2:
305 raise ValueError('network needs at least 2^3 addresses')
307 def process_cfg_server():
309 c.server = cfg.get('virtual','server')
310 except NoOptionError:
311 process_cfg_network()
312 c.server = next(c.network.hosts())
315 def __init__(self, port, addrspec):
319 self.addr = ipaddress.IPv4Address(addrspec)
320 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
322 except AddressValueError:
323 self.addr = ipaddress.IPv6Address(addrspec)
324 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
325 self._inurl = b'[%s]'
326 def make_endpoint(self):
327 return self._endpointfactory(reactor, self.port, self.addr)
329 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
330 if self.port != 80: url += b':%d' % self.port
334 def process_cfg_saddrs():
335 try: port = cfg.getint('server','port')
336 except NoOptionError: port = 80
339 for addrspec in cfg.get('server','addrs').split():
340 sa = ServerAddr(port, addrspec)
343 def process_cfg_clients(constructor):
345 for cs in cfg.sections():
346 if not (':' in cs or '.' in cs): continue
348 pw = cfg.get(cs, 'password')
349 pw = pw.encode('utf-8')
350 constructor(ci,cs,pw)
352 #---------- startup ----------
354 def common_startup():
355 log_formatter = twisted.logger.formatEventAsClassicLogText
356 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
357 twisted.logger.globalLogBeginner.beginLoggingTo(
358 [ log_observer, crash_on_critical ]
361 optparser.add_option('-c', '--config', dest='configfile',
362 default='/etc/hippotat/config')
363 (opts, args) = optparser.parse_args()
364 if len(args): optparser.error('no non-option arguments please')
366 re = regexp.compile('#.*')
367 cfg.read_string(re.sub('', defcfg))
368 cfg.read(opts.configfile)
371 log_debug(DBG.INIT, 'entering reactor')
372 if not _crashing: reactor.run()
373 print('CRASHED (end)', file=sys.stderr)