# Reset SIGINT to its default disposition (SIG_DFL); this runs at import time.
4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
29 import hippotat.slip as slip
class DBG(twisted.python.constants.Names):
    """Debug-flag constants, passed as the ``dflag`` argument of log_debug()."""

    # Packet handling.
    ROUTE = NamedConstant()
    DROP = NamedConstant()
    FLOW = NamedConstant()

    # HTTP layer.
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()

    # Startup.
    INIT = NamedConstant()

    # Packet queue.
    QUEUE = NamedConstant()
    QUEUE_CTRL = NamedConstant()
# Encoder function for the 'hex_codec' codec; log_debug() uses it to render
# data as ASCII hex text.
41 _hex_codec = codecs.getencoder('hex_codec')
# Module-wide twisted Logger through which log_debug() emits its events.
43 log = twisted.logger.Logger()
# Emit a debug message through the module logger at info level.
#   dflag: debug-flag value, interpolated as {dflag} in the log format
#   msg:   message text, interpolated as {msgcore}
#   idof:  optional object; msg is prefixed with '[<id(idof)>] '
#   d:     optional data; converted to ASCII hex text via _hex_codec
# NOTE(review): the conditions guarding the idof and d handling, and the way
# the hex text is joined onto msg, are not visible in this view -- presumably
# `is not None` checks; confirm.
45 def log_debug(dflag, msg, idof=None, d=None):
47 msg = '[%d] %s' % (id(idof), msg)
50 d = _hex_codec(d)[0].decode('ascii')
52 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
57 max_batch_down = 65536 # used by server, subject to [limits]
58 max_queue_time = 10 # used by server, subject to [limits]
59 max_request_time = 54 # used by server, subject to [limits]
60 target_requests_outstanding = 3 # must match; subject to [limits] on server
61 max_requests_outstanding = 4 # used by client
62 max_batch_up = 4000 # used by client
63 http_timeout = 30 # used by client
64 http_retry = 5 # used by client
66 #[server] or [<client>] overrides
67 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
68 # extra interpolations: %(local)s %(peer)s %(rnet)s
69 # obtained on server [virtual]server [virtual]relay [virtual]network
70 # from on client <client> [virtual]server [virtual]routes
75 # network = <prefix>/<len> # mandatory for server
76 # server = <ipaddr> # used by both, default is computed from `network'
77 # relay = <ipaddr> # used by server, default from `network' and `server'
78 # default server is first host in network
79 # default relay is first host which is not server
82 # addrs = 127.0.0.1 ::1 # mandatory for server
83 port = 80 # used by server
84 # url # used by client; default from first `addrs' and `port'
86 # [<client-ip4-or-ipv6-address>]
87 # password = <password> # used by both, must match
90 max_batch_down = 262144 # used by server
91 max_queue_time = 121 # used by server
92 max_request_time = 121 # used by server
93 target_requests_outstanding = 10 # used by server
96 # these need to be defined here so that they can be imported by import *
# Shared command-line option parser; common_startup() adds --config to it
# and parses the command line with it.
98 optparser = OptionParser()
# Byte translation table that exchanges `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return *s* with every `-' and every SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC are
    swapped the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
# Constructor taking an optional dict `d`.
# NOTE(review): the default `{ }` is one dict object created when the function
# is defined and shared by every call that omits `d`.  How `d` is used is not
# visible in this view -- confirm that state cannot leak between instances.
107 def __init__(self, d = { }):
# Representation text: 'ConfigResults(' + repr of the instance __dict__ + ')'.
# (The header of the method containing this return is not visible in this view.)
110 return 'ConfigResults('+repr(self.__dict__)+')'
# Report that `packet` was discarded.
#   iface:        label for where the packet came from (e.g. 'ipif')
#   saddr, daddr: source and destination addresses, shown in the message
#   why:          short reason text
# NOTE(review): the call that receives this message (presumably log_debug
# with a DBG flag) and its remaining arguments are not visible in this view.
114 def log_discard(packet, iface, saddr, daddr, why):
116 'discarded packet [%s] %s -> %s: %s' % (iface, saddr, daddr, why),
119 #---------- packet parsing ----------
def packet_addrs(packet):
    """Extract the source and destination addresses from a raw IP packet.

    packet: bytes-like object holding an IP packet, starting at the IP header.

    Returns (saddr, daddr) as ipaddress.IPv4Address or ipaddress.IPv6Address
    objects, according to the version nibble in the first byte.

    Raises ValueError if the version is neither 4 nor 6, or (via the
    ipaddress constructors) if the packet is too short to hold both
    addresses.  An empty packet raises IndexError.
    """
    version = packet[0] >> 4
    if version == 4:
        # IPv4 header: 4-byte addresses, source address at byte offset 12.
        addrlen = 4
        saddroff = 3*4
        factory = ipaddress.IPv4Address
    elif version == 6:
        # IPv6 header: 16-byte addresses, source address at byte offset 8.
        addrlen = 16
        saddroff = 2*4
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)
    daddroff = saddroff + addrlen
    # The ipaddress constructors only treat real `bytes` as packed addresses,
    # so convert the slices (this lets bytearray/memoryview packets work too).
    saddr = factory(bytes(packet[saddroff:daddroff]))
    daddr = factory(bytes(packet[daddroff:daddroff + addrlen]))
    return (saddr, daddr)
137 #---------- address handling ----------
# Parse `input` as an IPv4 address; if that raises AddressValueError, parse it
# as an IPv6 address instead.
# (The enclosing def and try lines, and whatever is done with `r` afterwards,
# are not visible in this view.)
141 r = ipaddress.IPv4Address(input)
142 except AddressValueError:
143 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    input: network specification such as '192.0.2.0/24' or '2001:db8::/32'.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises ValueError (or a subclass from the ipaddress module) if *input*
    is not a valid network of either family.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The address part is not IPv4.  (The name previously caught here,
        # NetworkValueError, does not exist, so this fallback raised
        # NameError instead of being taken.)
        r = ipaddress.IPv6Network(input)
    return r
153 #---------- ipif (SLIP) subprocess ----------
# Incremental SLIP decoder: takes stream data as it arrives and hands decoded
# packets to the on_packet callback supplied at construction.
155 class SlipStreamDecoder():
156 def __init__(self, on_packet):
157 # we will call packet(<packet>)
# on_packet: callable invoked (from _maybe_packet) with one decoded packet.
159 self._on_packet = on_packet
# Feed newly received stream data to the decoder.
# NOTE(review): how `data` is merged into self._buffer is not visible in
# this view.
161 def inputdata(self, data):
162 #print('SLIP-GOT ', repr(data))
164 packets = slip.decode(self._buffer)
# The last element returned by slip.decode() becomes the new buffer
# (presumably the trailing, still-incomplete packet -- confirm against
# slip.decode); the elements before it are delivered in order.
165 self._buffer = packets.pop()
166 for packet in packets:
167 self._maybe_packet(packet)
# Pass `packet` to the on_packet callback.
# NOTE(review): any condition guarding the call is not visible in this view.
169 def _maybe_packet(self, packet):
171 self._on_packet(packet)
# Hand whatever is currently buffered to _maybe_packet().
# (The header of the method containing this call is not visible in this view.)
174 self._maybe_packet(self._buffer)
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol for the ipif child process.

    Data the child writes to its stdout is SLIP-decoded; each decoded packet
    is passed to the router callback together with its source and
    destination addresses, except packets involving link-local addresses,
    which are logged as discarded.
    """

    def __init__(self, router):
        # router: callable taking (packet, saddr, daddr).
        self._router = router
        self._decoder = SlipStreamDecoder(self.slip_on_packet)

    def connectionMade(self):
        """Nothing to do when the child process has been started."""

    def outReceived(self, data):
        """Feed data from the child's stdout into the SLIP decoder."""
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, or discard it if it is link-local."""
        src, dst = packet_addrs(packet)
        if not (src.is_link_local or dst.is_link_local):
            self._router(packet, src, dst)
            return
        log_discard(packet, 'ipif', src, dst, 'link-local')

    def processEnded(self, status):
        """Re-raise the failure describing how the child process ended."""
        status.raiseException()
# Start the ipif helper process.
#   command: shell command text, run as `/bin/sh -xc <command>`
#   router:  callback handed to _IpifProcessProtocol, which calls it as
#            router(packet, saddr, daddr) for packets read from the child
# NOTE(review): the scope of `ipif` (queue_inbound() reads a name `ipif` that
# it does not define itself) and the end of the spawnProcess() call are not
# visible in this view.
193 def start_ipif(command, router):
195 ipif = _IpifProcessProtocol(router)
# childFDs: child fd 0 is a pipe we write to, child fd 1 is a pipe we read
# from, and child fd 2 is our own fd 2.
196 reactor.spawnProcess(ipif,
197 '/bin/sh',['sh','-xc', command],
198 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Write *packet* to the ipif process as one SLIP frame.

    The encoded packet is sent with a SLIP delimiter both before and after it.
    """
    write = ipif.transport.write
    write(slip.delimiter)
    write(slip.encode(packet))
    write(slip.delimiter)
206 #---------- packet queue ----------
# Set up an empty packet queue.
#   desc:           description text; NOTE(review): its storage is not visible
#                   in this view (self._desc is read by _log()) -- confirm
#   max_queue_time: maximum age of a queued packet, compared against
#                   differences of time.monotonic() readings (seconds)
209 def __init__(self, desc, max_queue_time):
211 self._max_queue_time = max_queue_time
# Entries are (time.monotonic() at append, packet) tuples; see append().
212 self._pq = collections.deque() # packets
def _log(self, dflag, msg, **kwargs):
    """Forward to log_debug(), prefixing *msg* with this queue's description."""
    prefixed = self._desc + ' pq: ' + msg
    log_debug(dflag, prefixed, **kwargs)
def append(self, packet):
    """Add *packet* to the back of the queue, stamped with the current monotonic time."""
    self._log(DBG.QUEUE, 'append', d=packet)
    entry = (time.monotonic(), packet)
    self._pq.append(entry)
# Check whether the queue currently holds a packet; head entries older than
# self._max_queue_time are logged as dropped.
# NOTE(review): the method header, the loop structure, the return statements
# and the actual removal of old entries are not visible in this view.
222 self._log(DBG.QUEUE, 'nonempty ?')
# Look at the entry at the front of the deque: (enqueue time, packet).
224 try: (queuetime, packet) = self._pq[0]
226 self._log(DBG.QUEUE, 'nonempty ? empty.')
# Time the head packet has spent queued, as a difference of
# time.monotonic() readings.
229 age = time.monotonic() - queuetime
230 if age > self._max_queue_time:
231 # strip old packets off the front
232 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
236 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
# Move queued packets, SLIP-encoded, into a batch that the caller accumulates.
#   sizequery: callable returning the size of the batch so far
#   moredata:  callable that adds the given bytes to the batch
#   max_batch: size limit; a packet is not added if delimiter + encoded packet
#              would take the batch past it
# NOTE(review): the loop structure, how `sofar` is obtained (presumably from
# sizequery()), the removal of processed packets from the queue and the call
# that adds the encoded packet are not visible in this view.
239 def process(self, sizequery, moredata, max_batch):
240 # sizequery() should return size of batch so far
241 # moredata(s) should add s to batch
242 self._log(DBG.QUEUE, 'process...')
# Look at the entry at the front of the deque; its timestamp is not used here.
244 try: (dummy, packet) = self._pq[0]
246 self._log(DBG.QUEUE, 'process... empty')
249 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
251 encoded = slip.encode(packet)
254 self._log(DBG.QUEUE_CTRL,
255 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
# Would adding a delimiter plus this packet push the batch past max_batch?
259 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
260 self._log(DBG.QUEUE_CTRL, 'process... overflow')
262 moredata(slip.delimiter)
267 #---------- error handling ----------
# Report the error on stderr with a 'CRASH ' prefix.
# (The function header and the surrounding statements are not visible in this
# view.)
274 print('CRASH ', err, file=sys.stderr)
# A ReactorNotRunning error from the guarded statement (not visible in this
# view) is ignored.
276 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to *defer* that passes any failure to crash()."""
    def _on_failure(err):
        return crash(err)
    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer that passes critical-level events to crash().

    event: log event dict.  When its 'log_level' is at least
    LogLevel.critical, the formatted event text is handed to crash().
    Events carrying no 'log_level' are ignored.
    """
    level = event.get('log_level')
    # event.get() yields None when the key is absent, and None cannot be
    # ordered against a LogLevel constant (TypeError), so such events are
    # treated as non-critical.
    if level is None:
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
285 #---------- config processing ----------
# Copy the `mtu` option of the [virtual] config section into c.mtu.
# NOTE(review): assuming cfg is a ConfigParser, cfg.get() returns the value
# as a string; no integer conversion is visible here.  One line of this
# function is not visible in this view.
287 def process_cfg_common_always():
289 c.mtu = cfg.get('virtual','mtu')
# Compute the ipif command line for `section` and store it in c.ipif_command.
#   section: config section whose `ipif` option is read
#   varmap:  NOTE(review): the loop over it is not visible in this view;
#            presumably it names attributes of c to copy or rename for use as
#            interpolation variables -- confirm
291 def process_cfg_ipif(section, varmap):
# Entries whose attribute `s` is not set on c are skipped.
293 try: v = getattr(c, s)
294 except AttributeError: continue
# The attributes of c are supplied as extra interpolation variables when the
# `ipif` option is expanded.
299 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Parse the [virtual] `network` option into c.network and check its size.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    network_spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(network_spec)
    min_addresses = 3 + 2
    if c.network.num_addresses < min_addresses:
        raise ValueError('network needs at least 2^3 addresses')
def process_cfg_server():
    """Set c.server from the [virtual] `server` option.

    When the option is absent, the network configuration is processed and
    the first host address of c.network is used instead.
    """
    try:
        server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        server = next(c.network.hosts())
    c.server = server
# Describe one server listening address.
#   port:     TCP port number (read back as self.port elsewhere in the class)
#   addrspec: address text; parsed as IPv4 first, and as IPv6 if that raises
#             AddressValueError
# The matching twisted server endpoint class is remembered for make_endpoint().
# NOTE(review): several lines of this constructor (including the IPv4 value
# of self._inurl) are not visible in this view.
314 def __init__(self, port, addrspec):
318 self.addr = ipaddress.IPv4Address(addrspec)
319 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
321 except AddressValueError:
322 self.addr = ipaddress.IPv6Address(addrspec)
323 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
# An IPv6 address is wrapped in [...] when it is formatted into a URL.
324 self._inurl = b'[%s]'
def make_endpoint(self):
    """Return a twisted server endpoint listening on this address and port.

    The endpoint class is the TCP4/TCP6 server endpoint chosen by the
    constructor.
    """
    # The third positional parameter of TCP4ServerEndpoint/TCP6ServerEndpoint
    # is `backlog`, so the address has to be passed by keyword as `interface`
    # (and as text, not as an ipaddress object).
    return self._endpointfactory(reactor, self.port, interface=str(self.addr))
# Build the http:// URL for this address as bytes; the address text is
# formatted through self._inurl, and the port is appended only when it is
# not 80.
# (The method header and the lines after this are not visible in this view.)
328 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
329 if self.port != 80: url += b':%d' % self.port
# Create a ServerAddr for every whitespace-separated entry of the [server]
# `addrs` option, using the [server] `port` option (80 when it is absent).
# NOTE(review): where the ServerAddr objects are stored is not visible in
# this view.
333 def process_cfg_saddrs():
334 try: port = cfg.getint('server','port')
335 except NoOptionError: port = 80
338 for addrspec in cfg.get('server','addrs').split():
339 sa = ServerAddr(port, addrspec)
# Call constructor(ci, cs, pw) for each client section of the configuration.
#   constructor: callable taking (ci, cs, pw)
# A section counts as a client when its name contains ':' or '.'.
#   cs: the section name
#   pw: that section's `password` option, UTF-8 encoded
#   ci: NOTE(review): its assignment is not visible in this view -- presumably
#       the section name parsed as an IP address; confirm
342 def process_cfg_clients(constructor):
344 for cs in cfg.sections():
345 if not (':' in cs or '.' in cs): continue
347 pw = cfg.get(cs, 'password')
348 pw = pw.encode('utf-8')
349 constructor(ci,cs,pw)
351 #---------- startup ----------
# Startup steps: begin logging, parse the command line and load the
# configuration.
# NOTE(review): several lines of this function are not visible in this view.
353 def common_startup():
# Log to stderr in classic text format; crash_on_critical is installed as a
# second observer so that it sees every event as well.
354 log_formatter = twisted.logger.formatEventAsClassicLogText
355 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
356 twisted.logger.globalLogBeginner.beginLoggingTo(
357 [ log_observer, crash_on_critical ]
# -c/--config selects the config file; positional arguments are rejected.
360 optparser.add_option('-c', '--config', dest='configfile',
361 default='/etc/hippotat/config')
362 (opts, args) = optparser.parse_args()
363 if len(args): optparser.error('no non-option arguments please')
# Remove everything from `#' to end of line in the built-in default config
# text (defcfg) and load it first; the config file is read afterwards.
365 re = regexp.compile('#.*')
366 cfg.read_string(re.sub('', defcfg))
367 cfg.read(opts.configfile)
# Run the twisted reactor unless _crashing is already set, then print
# 'CRASHED (end)' on stderr once control gets past that point.
# (The enclosing function header is not visible in this view.)
370 log_debug(DBG.INIT, 'entering reactor')
371 if not _crashing: reactor.run()
372 print('CRASHED (end)', file=sys.stderr)