4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 from twisted.logger import LogLevel
11 import twisted.internet.endpoints
14 from ipaddress import AddressValueError
16 from optparse import OptionParser
17 from configparser import ConfigParser
18 from configparser import NoOptionError
25 import hippotat.slip as slip
30 max_batch_down = 65536 # used by server, subject to [limits]
31 max_queue_time = 10 # used by server, subject to [limits]
32 max_request_time = 54 # used by server, subject to [limits]
33 target_requests_outstanding = 3 # must match; subject to [limits] on server
34 max_requests_outstanding = 4 # used by client
35 max_batch_up = 4000 # used by client
36 http_timeout = 30 # used by client
37 http_retry = 5 # used by client
39 #[server] or [<client>] overrides
40 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
41 # extra interpolations: %(local)s %(peer)s %(rnets)s
42 # obtained on server [virtual]server [virtual]relay [virtual]network
43 # from on client <client> [virtual]server [virtual]routes
48 # network = <prefix>/<len> # mandatory for server
49 # server = <ipaddr> # used by both, default is computed from `network'
50 # relay = <ipaddr> # used by server, default from `network' and `server'
51 # default server is first host in network
52 # default relay is first host which is not server
55 # addrs = 127.0.0.1 ::1 # mandatory for server
56 port = 80 # used by server
57 # url # used by client; default from first `addrs' and `port'
59 # [<client-ip4-or-ipv6-address>]
60 # password = <password> # used by both, must match
63 max_batch_down = 262144 # used by server
64 max_queue_time = 121 # used by server
65 max_request_time = 121 # used by server
66 target_requests_outstanding = 10 # used by server
69 # these need to be defined here so that they can be imported by import *
71 optparser = OptionParser()
# Byte translation table that exchanges `-' with the SLIP ESC byte.
_mimetrans = bytes.maketrans(b'-' + slip.esc, slip.esc + b'-')


def mime_translate(s):
    """Return *s* with every `-' and SLIP ESC byte exchanged.

    SLIP-encoded packets cannot contain ESC ESC, so after the swap the
    result cannot contain `--'.  The table maps each byte to the other,
    so applying the function twice gives back the input.
    """
    swapped = s.translate(_mimetrans)
    return swapped
80 def __init__(self, d = { }):
83 return 'ConfigResults('+repr(self.__dict__)+')'
def log_discard(packet, saddr, daddr, why):
    """Report a dropped packet on standard output.

    Prints the source address, destination address and reason after a
    'DROP ' tag.  *packet* is accepted but is not part of the output.
    """
    fields = ('DROP ', saddr, daddr, why)
    print(*fields)
    # Disabled syslog variant of the same report:
    # syslog.syslog(syslog.LOG_DEBUG,
    #               'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
92 #---------- packet parsing ----------
94 def packet_addrs(packet):
95 version = packet[0] >> 4
99 factory = ipaddress.IPv4Address
103 factory = ipaddress.IPv6Address
105 raise ValueError('unsupported IP version %d' % version)
106 saddr = factory(packet[ saddroff : saddroff + addrlen ])
107 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
108 return (saddr, daddr)
110 #---------- address handling ----------
114 r = ipaddress.IPv4Address(input)
115 except AddressValueError:
116 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse *input* as an IP network, trying IPv4 first and then IPv6.

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    If the address part of *input* is not valid IPv4, the IPv6 parse is
    attempted and any exception it raises propagates to the caller.
    Other IPv4 errors (for example a bad prefix length) propagate
    directly without an IPv6 attempt.
    """
    try:
        return ipaddress.IPv4Network(input)
    except ipaddress.AddressValueError:
        # The ipaddress module has no `NetworkValueError'; a non-IPv4
        # address is reported as AddressValueError, so that is what
        # triggers the IPv6 fallback.
        return ipaddress.IPv6Network(input)
126 #---------- ipif (SLIP) subprocess ----------
128 class SlipStreamDecoder():
129 def __init__(self, on_packet):
130 # we will call packet(<packet>)
132 self._on_packet = on_packet
134 def inputdata(self, data):
135 #print('SLIP-GOT ', repr(data))
137 packets = slip.decode(self._buffer)
138 self._buffer = packets.pop()
139 for packet in packets:
140 self._maybe_packet(packet)
142 def _maybe_packet(self, packet):
144 self._on_packet(packet)
147 self._maybe_packet(self._buffer)
150 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
151 def __init__(self, router):
152 self._router = router
153 self._decoder = SlipStreamDecoder(self.slip_on_packet)
154 def connectionMade(self): pass
155 def outReceived(self, data):
156 self._decoder.inputdata(data)
157 def slip_on_packet(self, packet):
158 (saddr, daddr) = packet_addrs(packet)
159 if saddr.is_link_local or daddr.is_link_local:
160 log_discard(packet, saddr, daddr, 'link-local')
162 self._router(packet, saddr, daddr)
163 def processEnded(self, status):
164 status.raiseException()
166 def start_ipif(command, router):
168 ipif = _IpifProcessProtocol(router)
169 reactor.spawnProcess(ipif,
170 '/bin/sh',['sh','-xc', command],
171 childFDs={0:'w', 1:'r', 2:2},
def queue_inbound(packet):
    """Write *packet* to the ipif transport as a delimited SLIP frame.

    The SLIP-encoded packet is written with a delimiter before and
    after it, as three separate writes.
    """
    transport = ipif.transport
    transport.write(slip.delimiter)
    # Encode only after the leading delimiter has been written, so the
    # order of operations matches three plain back-to-back writes.
    framed_body = slip.encode(packet)
    transport.write(framed_body)
    transport.write(slip.delimiter)
179 #---------- packet queue ----------
182 def __init__(self, max_queue_time):
183 self._max_queue_time = max_queue_time
184 self._pq = collections.deque() # packets
186 def append(self, packet):
187 self._pq.append((time.monotonic(), packet))
191 try: (queuetime, packet) = self._pq[0]
192 except IndexError: return False
194 age = time.monotonic() - queuetime
195 if age > self._max_queue_time:
196 # strip old packets off the front
202 def process(self, sizequery, moredata, max_batch):
203 # sizequery() should return size of batch so far
204 # moredata(s) should add s to batch
206 try: (dummy, packet) = self._pq[0]
207 except IndexError: break
209 encoded = slip.encode(packet)
213 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
215 moredata(slip.delimiter)
220 #---------- error handling ----------
223 print('CRASH ', err, file=sys.stderr)
225 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to *defer* that hands any failure to crash()."""
    def _on_failure(failure):
        # `crash' is looked up at call time, not at registration time.
        return crash(failure)

    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer that crashes the program on critical log events.

    *event* is a log event mapping.  When its 'log_level' is
    LogLevel.critical or higher, the event is formatted and passed to
    crash().  Events that carry no log level are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # An event without a level cannot be compared against LogLevel;
        # comparing None would raise TypeError inside the observer.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
234 #---------- config processing ----------
236 def process_cfg_common_always():
238 c.mtu = cfg.get('virtual','mtu')
240 def process_cfg_ipif(section, varmap):
242 try: v = getattr(c, s)
243 except AttributeError: continue
248 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Read [virtual] network from the config into c.network.

    Raises ValueError when the network has fewer than 3 + 2 addresses.
    """
    network_spec = cfg.get('virtual', 'network')
    c.network = ipnetwork(network_spec)
    minimum_addresses = 3 + 2
    if c.network.num_addresses < minimum_addresses:
        raise ValueError('network needs at least 2^3 addresses')
255 def process_cfg_server():
257 c.server = cfg.get('virtual','server')
258 except NoOptionError:
259 process_cfg_network()
260 c.server = next(c.network.hosts())
263 def __init__(self, port, addrspec):
267 self.addr = ipaddress.IPv4Address(addrspec)
268 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
270 except AddressValueError:
271 self.addr = ipaddress.IPv6Address(addrspec)
272 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
273 self._inurl = b'[%s]'
274 def make_endpoint(self):
275 return self._endpointfactory(reactor, self.port, self.addr)
277 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
278 if self.port != 80: url += b':%d' % self.port
282 def process_cfg_saddrs():
283 try: port = cfg.getint('server','port')
284 except NoOptionError: port = 80
287 for addrspec in cfg.get('server','addrs').split():
288 sa = ServerAddr(port, addrspec)
291 def process_cfg_clients(constructor):
293 for cs in cfg.sections():
294 if not (':' in cs or '.' in cs): continue
296 pw = cfg.get(cs, 'password')
297 pw = pw.encode('utf-8')
298 constructor(ci,cs,pw)
300 #---------- startup ----------
302 def common_startup():
303 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
305 optparser.add_option('-c', '--config', dest='configfile',
306 default='/etc/hippotat/config')
307 (opts, args) = optparser.parse_args()
308 if len(args): optparser.error('no non-option arguments please')
310 re = regexp.compile('#.*')
311 cfg.read_string(re.sub('', defcfg))
312 cfg.read(opts.configfile)
316 print('CRASHED (end)', file=sys.stderr)