4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 import twisted.internet.endpoints
12 from twisted.logger import LogLevel
13 import twisted.python.constants
14 from twisted.python.constants import NamedConstant
17 from ipaddress import AddressValueError
19 from optparse import OptionParser
20 from configparser import ConfigParser
21 from configparser import NoOptionError
29 import hippotat.slip as slip
class DBG(twisted.python.constants.Names):
    """Debug-category flags, passed as the `dflag' argument of log_debug()."""

    ROUTE = NamedConstant()
    FLOW = NamedConstant()
    HTTP = NamedConstant()
    HTTP_CTRL = NamedConstant()
    INIT = NamedConstant()        # startup progress (e.g. the 'ready' message)
    QUEUE = NamedConstant()       # packet queue: append / nonempty / process
    QUEUE_CTRL = NamedConstant()  # packet queue: per-packet batching detail
# Hex encoder.  Following the codecs encoder protocol, calling it on a
# bytes object returns an (encoded, length_consumed) pair.
_hexcodec = codecs.getencoder('hex_codec')

# Module-wide twisted logger; log_debug() reports through log.info().
log = twisted.logger.Logger()
44 def log_debug(dflag, msg, idof=None, d=None):
46 msg = '[%d] %s' % (id(idof), msg)
51 log.info('{dflag} {msgcore}', dflag=dflag, msgcore=msg)
56 max_batch_down = 65536 # used by server, subject to [limits]
57 max_queue_time = 10 # used by server, subject to [limits]
58 max_request_time = 54 # used by server, subject to [limits]
59 target_requests_outstanding = 3 # must match; subject to [limits] on server
60 max_requests_outstanding = 4 # used by client
61 max_batch_up = 4000 # used by client
62 http_timeout = 30 # used by client
63 http_retry = 5 # used by client
65 #[server] or [<client>] overrides
66 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnets)s
68 # obtained on server [virtual]server [virtual]relay [virtual]network
69 # from on client <client> [virtual]server [virtual]routes
74 # network = <prefix>/<len> # mandatory for server
75 # server = <ipaddr> # used by both, default is computed from `network'
76 # relay = <ipaddr> # used by server, default from `network' and `server'
77 # default server is first host in network
78 # default relay is first host which is not server
81 # addrs = 127.0.0.1 ::1 # mandatory for server
82 port = 80 # used by server
83 # url # used by client; default from first `addrs' and `port'
85 # [<client-ip4-or-ipv6-address>]
86 # password = <password> # used by both, must match
89 max_batch_down = 262144 # used by server
90 max_queue_time = 121 # used by server
91 max_request_time = 121 # used by server
92 target_requests_outstanding = 10 # used by server
95 # these need to be defined here so that they can be imported by import *
97 optparser = OptionParser()
# Translation table exchanging the byte `-' with slip.esc
# (a plain two-way swap, assuming slip.esc is a single byte -- TODO confirm).
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return `s' with `-' and ESC exchanged, using _mimetrans.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC
    have been swapped the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
106 def __init__(self, d = { }):
109 return 'ConfigResults('+repr(self.__dict__)+')'
def log_discard(packet, saddr, daddr, why):
    """Report that a packet is being dropped.

    Prints one line to stdout with the source address, the destination
    address and the reason.

    packet: the dropped packet; accepted but not included in the output.
    saddr, daddr: source and destination addresses (anything printable).
    why: short text giving the reason for the drop.
    """
    # The names here must be the lower-case parameters and the builtin
    # print(); capitalised spellings are undefined and raise NameError.
    print('Drop ', saddr, daddr, why)
    # Possible syslog variant, kept for reference:
    #   syslog.syslog(syslog.LOG_DEBUG,
    #                 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
119 #---------- packet parsing ----------
def packet_addrs(packet):
    """Extract the source and destination addresses from a raw IP packet.

    The IP version is the high nibble of the first byte; it selects the
    address length, the offset of the source address within the header,
    and the ipaddress class used to build the results.  The destination
    address immediately follows the source address.

    packet: bytes holding the packet, starting at the IP header.

    Returns a tuple (saddr, daddr) of ipaddress.IPv4Address or
    ipaddress.IPv6Address objects.

    Raises ValueError for an IP version other than 4 or 6.  A packet too
    short to contain both addresses makes the ipaddress constructor raise
    AddressValueError (a ValueError subclass); an empty packet raises
    IndexError.
    """
    version = packet[0] >> 4
    if version == 4:
        # IPv4 header: 4-byte addresses, source at byte offset 12.
        addrlen = 4
        saddroff = 12
        factory = ipaddress.IPv4Address
    elif version == 6:
        # IPv6 header: 16-byte addresses, source at byte offset 8.
        addrlen = 16
        saddroff = 8
        factory = ipaddress.IPv6Address
    else:
        raise ValueError('unsupported IP version %d' % version)
    daddroff = saddroff + addrlen
    saddr = factory(packet[saddroff:daddroff])
    daddr = factory(packet[daddroff:daddroff + addrlen])
    return (saddr, daddr)
137 #---------- address handling ----------
141 r = ipaddress.IPv4Address(input)
142 except AddressValueError:
143 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse `input' as an IP network, trying IPv4 first and then IPv6.

    input: network in <prefix>/<len> form.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises ValueError (or one of its ipaddress subclasses) if the text is
    valid as neither; a syntactically IPv4 network with a bad prefix
    length is reported as such rather than being retried as IPv6.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # Not an IPv4 address at all.  ipaddress has no NetworkValueError;
        # AddressValueError is what IPv4Network raises for this case.
        r = ipaddress.IPv6Network(input)
    return r
153 #---------- ipif (SLIP) subprocess ----------
155 class SlipStreamDecoder():
156 def __init__(self, on_packet):
157 # we will call packet(<packet>)
159 self._on_packet = on_packet
161 def inputdata(self, data):
162 #print('SLIP-GOT ', repr(data))
164 packets = slip.decode(self._buffer)
165 self._buffer = packets.pop()
166 for packet in packets:
167 self._maybe_packet(packet)
169 def _maybe_packet(self, packet):
171 self._on_packet(packet)
174 self._maybe_packet(self._buffer)
class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
    """Process protocol that turns the child's SLIP output into packets.

    Data received from the child's stdout is fed to a SlipStreamDecoder;
    each packet the decoder yields is passed to slip_on_packet(), which
    drops link-local traffic and hands everything else to the router.
    """

    def __init__(self, router):
        # router is called as router(packet, saddr, daddr).
        self._router = router
        self._decoder = SlipStreamDecoder(self.slip_on_packet)

    def connectionMade(self):
        pass

    def outReceived(self, data):
        self._decoder.inputdata(data)

    def slip_on_packet(self, packet):
        """Route one decoded packet, unless either address is link-local."""
        (saddr, daddr) = packet_addrs(packet)
        if saddr.is_link_local or daddr.is_link_local:
            log_discard(packet, saddr, daddr, 'link-local')
            # A discarded packet must not also reach the router.
            return
        self._router(packet, saddr, daddr)

    def processEnded(self, status):
        # Re-raise the reason the process ended, as carried by `status'.
        status.raiseException()
193 def start_ipif(command, router):
195 ipif = _IpifProcessProtocol(router)
196 reactor.spawnProcess(ipif,
197 '/bin/sh',['sh','-xc', command],
198 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Write `packet' to the ipif transport as one SLIP frame.

    Three writes are made, in order: a SLIP delimiter, the SLIP encoding
    of `packet', and a closing delimiter.
    """
    write = ipif.transport.write
    write(slip.delimiter)
    write(slip.encode(packet))
    write(slip.delimiter)
206 #---------- packet queue ----------
def __init__(self, desc, max_queue_time):
    """Create an empty packet queue.

    desc: text that _log() prefixes to this queue's debug messages.
    max_queue_time: age limit, in time.monotonic() units, beyond which
      a queued packet counts as old.
    """
    # _log() reads self._desc, so it has to be stored here.
    self._desc = desc
    self._max_queue_time = max_queue_time
    self._pq = collections.deque()  # (enqueue time, packet) pairs
def _log(self, dflag, msg, **kwargs):
    """Emit a debug message for this queue through log_debug().

    dflag: debug category passed straight to log_debug().
    msg: message text; it is prefixed with this queue's description
      followed by ' pq: '.
    kwargs: forwarded to log_debug() unchanged (callers pass d=packet).
    """
    log_debug(dflag, self._desc + ' pq: ' + msg, **kwargs)
def append(self, packet):
    """Log the packet, then queue it stamped with the current monotonic time."""
    self._log(DBG.QUEUE, 'append', d=packet)
    entry = (time.monotonic(), packet)
    self._pq.append(entry)
222 self._log(DBG.QUEUE, 'nonempty ?')
224 try: (queuetime, packet) = self._pq[0]
226 self._log(DBG.QUEUE, 'nonempty ? empty.')
229 age = time.monotonic() - queuetime
230 if age > self._max_queue_time:
231 # strip old packets off the front
232 self._log(DBG.QUEUE, 'dropping (old)', d=packet)
236 self._log(DBG.QUEUE, 'nonempty ? nonempty.')
239 def process(self, sizequery, moredata, max_batch):
240 # sizequery() should return size of batch so far
241 # moredata(s) should add s to batch
242 self._log(DBG.QUEUE, 'process...')
244 try: (dummy, packet) = self._pq[0]
246 self._log(DBG.QUEUE, 'process... empty')
249 self._log(DBG.QUEUE_CTRL, 'process... packet', d=packet)
251 encoded = slip.encode(packet)
254 self._log(DBG.QUEUE_CTRL,
255 'process... (sofar=%d, max=%d) encoded' % (sofar, max_batch),
259 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
260 self._log(DBG.QUEUE_CTRL, 'process... overflow')
262 moredata(slip.delimiter)
267 #---------- error handling ----------
270 print('CRASH ', err, file=sys.stderr)
272 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to `defer' that hands any failure to crash()."""
    def _report(err):
        # crash is looked up when the errback fires, not when it is attached.
        return crash(err)
    defer.addErrback(_report)
def crash_on_critical(event):
    """Log observer: call crash() when an event is at critical level.

    event: a log event dict.  Its 'log_level' entry, if present, is
      compared with LogLevel.critical; when it is at least that level the
      formatted event text is passed to crash().

    Events without a 'log_level' are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # Comparing None with a LogLevel constant raises TypeError; an
        # event that carries no level cannot be critical anyway.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
281 #---------- config processing ----------
def process_cfg_common_always():
    """Copy the `mtu' option of section [virtual] from cfg to c.mtu.

    The value is stored exactly as cfg.get() returns it.
    """
    mtu_value = cfg.get('virtual', 'mtu')
    c.mtu = mtu_value
287 def process_cfg_ipif(section, varmap):
289 try: v = getattr(c, s)
290 except AttributeError: continue
295 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Parse [virtual] `network' into c.network and check that it is big enough.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual', 'network')
    net = ipnetwork(spec)
    c.network = net
    # Network sizes are powers of two, so "fewer than 3 + 2" means at most
    # 4 addresses; the smallest network that passes has 2^3.
    if net.num_addresses < 3 + 2:
        raise ValueError('network needs at least 2^3 addresses')
def process_cfg_server():
    """Set c.server from [virtual] `server', defaulting from the network.

    If the option is absent, process_cfg_network() is run to populate
    c.network, and the first host address of that network is used.
    """
    try:
        c.server = cfg.get('virtual', 'server')
    except NoOptionError:
        process_cfg_network()
        c.server = next(c.network.hosts())
    # NOTE(review): the configured value is whatever cfg.get() returns,
    # while the default is an ipaddress object -- confirm callers accept both.
310 def __init__(self, port, addrspec):
314 self.addr = ipaddress.IPv4Address(addrspec)
315 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
317 except AddressValueError:
318 self.addr = ipaddress.IPv6Address(addrspec)
319 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
320 self._inurl = b'[%s]'
def make_endpoint(self):
    """Return a twisted server endpoint for this address and port.

    The endpoint class is the one chosen in __init__
    (TCP4ServerEndpoint or TCP6ServerEndpoint).
    """
    # The endpoint constructors take (reactor, port, backlog, interface),
    # so the address has to be given by keyword, and as text; passed
    # positionally it would be taken as the listen backlog.
    return self._endpointfactory(reactor, self.port,
                                 interface=str(self.addr))
324 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
325 if self.port != 80: url += b':%d' % self.port
329 def process_cfg_saddrs():
330 try: port = cfg.getint('server','port')
331 except NoOptionError: port = 80
334 for addrspec in cfg.get('server','addrs').split():
335 sa = ServerAddr(port, addrspec)
338 def process_cfg_clients(constructor):
340 for cs in cfg.sections():
341 if not (':' in cs or '.' in cs): continue
343 pw = cfg.get(cs, 'password')
344 pw = pw.encode('utf-8')
345 constructor(ci,cs,pw)
347 #---------- startup ----------
349 def common_startup():
350 log_formatter = twisted.logger.formatEventAsClassicLogText
351 log_observer = twisted.logger.FileLogObserver(sys.stderr, log_formatter)
352 twisted.logger.globalLogBeginner.beginLoggingTo(
353 [ log_observer, crash_on_critical ]
356 optparser.add_option('-c', '--config', dest='configfile',
357 default='/etc/hippotat/config')
358 (opts, args) = optparser.parse_args()
359 if len(args): optparser.error('no non-option arguments please')
361 re = regexp.compile('#.*')
362 cfg.read_string(re.sub('', defcfg))
363 cfg.read(opts.configfile)
366 log_debug(DBG.INIT, 'ready')
368 print('CRASHED (end)', file=sys.stderr)