4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 from twisted.logger import LogLevel
11 import twisted.internet.endpoints
14 from ipaddress import AddressValueError
16 from optparse import OptionParser
17 from configparser import ConfigParser
18 from configparser import NoOptionError
25 from twisted.python.constants import NamedConstant
27 import hippotat.slip as slip
# Debug categories: a twisted `Names' container of NamedConstants.
# Elsewhere in this file a member is passed as the first argument of
# log_debug() / log_data() (e.g. DBG.QUEUE by the packet queue).
# NOTE(review): the meaning of each individual category is not visible
# here; presumably each names the subsystem whose messages it tags --
# confirm against the logging helpers.
29 class DBG(twisted.python.constants.Names):
30 ROUTE = NamedConstant()
31 FLOW = NamedConstant()
32 HTTP = NamedConstant()
33 HTTP_CTRL = NamedConstant()
34 INIT = NamedConstant()
35 QUEUE = NamedConstant()
36 QUEUE_CTRL = NamedConstant()
41 max_batch_down = 65536 # used by server, subject to [limits]
42 max_queue_time = 10 # used by server, subject to [limits]
43 max_request_time = 54 # used by server, subject to [limits]
44 target_requests_outstanding = 3 # must match; subject to [limits] on server
45 max_requests_outstanding = 4 # used by client
46 max_batch_up = 4000 # used by client
47 http_timeout = 30 # used by client
48 http_retry = 5 # used by client
50 #[server] or [<client>] overrides
51 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
52 # extra interpolations: %(local)s %(peer)s %(rnet)s
53 # obtained on server [virtual]server [virtual]relay [virtual]network
54 # from on client <client> [virtual]server [virtual]routes
59 # network = <prefix>/<len> # mandatory for server
60 # server = <ipaddr> # used by both, default is computed from `network'
61 # relay = <ipaddr> # used by server, default from `network' and `server'
62 # default server is first host in network
63 # default relay is first host which is not server
66 # addrs = 127.0.0.1 ::1 # mandatory for server
67 port = 80 # used by server
68 # url # used by client; default from first `addrs' and `port'
70 # [<client-ip4-or-ipv6-address>]
71 # password = <password> # used by both, must match
74 max_batch_down = 262144 # used by server
75 max_queue_time = 121 # used by server
76 max_request_time = 121 # used by server
77 target_requests_outstanding = 10 # used by server
80 # these need to be defined here so that they can be imported by import *
82 optparser = OptionParser()
# Translation table exchanging `-' and slip.esc (in both directions).
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return *s* with `-' and the SLIP ESC value swapped.

    SLIP-encoded packets cannot contain ESC ESC, so once `-' and ESC
    have traded places the result cannot contain `--'.
    """
    swapped = s.translate(_mimetrans)
    return swapped
91 def __init__(self, d = { }):
94 return 'ConfigResults('+repr(self.__dict__)+')'
def log_discard(packet, saddr, daddr, why):
    """Report a dropped packet on stdout.

    Prints the tag 'DROP ' followed by the source address, the
    destination address and the reason.  *packet* itself is accepted
    but not printed.
    """
    fields = ('DROP ', saddr, daddr, why)
    print(*fields)
    # Disabled alternative kept from the original implementation:
    # syslog.syslog(syslog.LOG_DEBUG,
    #               'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
103 #---------- packet parsing ----------
105 def packet_addrs(packet):
106 version = packet[0] >> 4
110 factory = ipaddress.IPv4Address
114 factory = ipaddress.IPv6Address
116 raise ValueError('unsupported IP version %d' % version)
117 saddr = factory(packet[ saddroff : saddroff + addrlen ])
118 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
119 return (saddr, daddr)
121 #---------- address handling ----------
125 r = ipaddress.IPv4Address(input)
126 except AddressValueError:
127 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Only an unparseable IPv4 *address part* triggers the IPv6 attempt, so
    a well-formed IPv4 address with a bad prefix length reports the IPv4
    error instead of a misleading IPv6 one.  Any ValueError from the
    final attempt propagates to the caller.
    """
    try:
        return ipaddress.IPv4Network(input)
    # The ipaddress module has no `NetworkValueError'; naming it here
    # turned every IPv4 parse failure into a NameError, so the IPv6
    # fallback could never run.
    except ipaddress.AddressValueError:
        return ipaddress.IPv6Network(input)
137 #---------- ipif (SLIP) subprocess ----------
139 class SlipStreamDecoder():
140 def __init__(self, on_packet):
141 # we will call packet(<packet>)
143 self._on_packet = on_packet
145 def inputdata(self, data):
146 #print('SLIP-GOT ', repr(data))
148 packets = slip.decode(self._buffer)
149 self._buffer = packets.pop()
150 for packet in packets:
151 self._maybe_packet(packet)
153 def _maybe_packet(self, packet):
155 self._on_packet(packet)
158 self._maybe_packet(self._buffer)
161 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
162 def __init__(self, router):
163 self._router = router
164 self._decoder = SlipStreamDecoder(self.slip_on_packet)
165 def connectionMade(self): pass
166 def outReceived(self, data):
167 self._decoder.inputdata(data)
168 def slip_on_packet(self, packet):
169 (saddr, daddr) = packet_addrs(packet)
170 if saddr.is_link_local or daddr.is_link_local:
171 log_discard(packet, saddr, daddr, 'link-local')
173 self._router(packet, saddr, daddr)
174 def processEnded(self, status):
175 status.raiseException()
177 def start_ipif(command, router):
179 ipif = _IpifProcessProtocol(router)
180 reactor.spawnProcess(ipif,
181 '/bin/sh',['sh','-xc', command],
182 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Send *packet* to the ipif subprocess as one SLIP frame.

    The SLIP-encoded packet is written to the process transport with a
    delimiter both before and after it.
    """
    transport = ipif.transport
    transport.write(slip.delimiter)
    # Encode only after the leading delimiter has gone out, as before.
    frame = slip.encode(packet)
    transport.write(frame)
    transport.write(slip.delimiter)
190 #---------- packet queue ----------
193 def __init__(self, desc, max_queue_time):
195 self._max_queue_time = max_queue_time
196 self._pq = collections.deque() # packets
198 def _log_debug(self, fn, pri, msg)
def append(self, packet):
    """Add *packet* to the tail of the queue, stamped with the current
    monotonic time."""
    log_data(DBG.QUEUE, packet, 'pq %s: append' % self._desc)
    entry = (time.monotonic(), packet)
    self._pq.append(entry)
206 log_debug(DBG.QUEUE, 'pq %s: nonempty ?' % self._desc)
208 try: (queuetime, packet) = self._pq[0]
209 except IndexError: return False
211 age = time.monotonic() - queuetime
212 if age > self._max_queue_time:
213 # strip old packets off the front
219 def process(self, sizequery, moredata, max_batch):
220 # sizequery() should return size of batch so far
221 # moredata(s) should add s to batch
223 try: (dummy, packet) = self._pq[0]
224 except IndexError: break
226 encoded = slip.encode(packet)
230 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
232 moredata(slip.delimiter)
237 #---------- error handling ----------
240 print('CRASH ', err, file=sys.stderr)
242 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to *defer* that passes any failure to crash()."""
    def _on_failure(err):
        # crash is looked up when the errback fires, not when it is added.
        return crash(err)
    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer: crash the program when a critical event is seen.

    *event* is a log event mapping.  If it carries a 'log_level' at or
    above LogLevel.critical, the formatted event is handed to crash().
    Events with no level (key absent or None) are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # Comparing None with a LogLevel constant raises TypeError, which
        # would blow up inside the observer; an event with no level
        # cannot be critical, so there is nothing to do.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
251 #---------- config processing ----------
253 def process_cfg_common_always():
255 c.mtu = cfg.get('virtual','mtu')
257 def process_cfg_ipif(section, varmap):
259 try: v = getattr(c, s)
260 except AttributeError: continue
265 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Read [virtual] network from the config into c.network and check
    that it is large enough.

    Raises ValueError if the network has fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(spec)
    # NOTE(review): presumably 3 usable hosts plus the network and
    # broadcast addresses -- confirm.
    minimum = 3 + 2
    if c.network.num_addresses >= minimum:
        return
    raise ValueError('network needs at least 2^3 addresses')
272 def process_cfg_server():
274 c.server = cfg.get('virtual','server')
275 except NoOptionError:
276 process_cfg_network()
277 c.server = next(c.network.hosts())
280 def __init__(self, port, addrspec):
284 self.addr = ipaddress.IPv4Address(addrspec)
285 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
287 except AddressValueError:
288 self.addr = ipaddress.IPv6Address(addrspec)
289 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
290 self._inurl = b'[%s]'
291 def make_endpoint(self):
292 return self._endpointfactory(reactor, self.port, self.addr)
294 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
295 if self.port != 80: url += b':%d' % self.port
299 def process_cfg_saddrs():
300 try: port = cfg.getint('server','port')
301 except NoOptionError: port = 80
304 for addrspec in cfg.get('server','addrs').split():
305 sa = ServerAddr(port, addrspec)
308 def process_cfg_clients(constructor):
310 for cs in cfg.sections():
311 if not (':' in cs or '.' in cs): continue
313 pw = cfg.get(cs, 'password')
314 pw = pw.encode('utf-8')
315 constructor(ci,cs,pw)
317 #---------- startup ----------
319 def common_startup():
320 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
322 optparser.add_option('-c', '--config', dest='configfile',
323 default='/etc/hippotat/config')
324 (opts, args) = optparser.parse_args()
325 if len(args): optparser.error('no non-option arguments please')
327 re = regexp.compile('#.*')
328 cfg.read_string(re.sub('', defcfg))
329 cfg.read(opts.configfile)
333 print('CRASHED (end)', file=sys.stderr)