4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 from twisted.logger import LogLevel
11 import twisted.internet.endpoints
14 from ipaddress import AddressValueError
16 from optparse import OptionParser
17 from configparser import ConfigParser
18 from configparser import NoOptionError
25 import hippotat.slip as slip
30 max_batch_down = 65536 # used by server, subject to [limits]
31 max_queue_time = 10 # used by server, subject to [limits]
32 max_request_time = 54 # used by server, subject to [limits]
33 target_requests_outstanding = 3 # must match; subject to [limits] on server
34 max_requests_outstanding = 4 # used by client
35 max_batch_up = 4000 # used by client
36 http_timeout = 30 # used by client
38 #[server] or [<client>] overrides
39 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
40 # extra interpolations: %(local)s %(peer)s %(rnet)s
41 # obtained on server [virtual]server [virtual]relay [virtual]network
42 # from on client <client> [virtual]server [virtual]routes
47 # network = <prefix>/<len> # mandatory for server
48 # server = <ipaddr> # used by both, default is computed from `network'
49 # relay = <ipaddr> # used by server, default from `network' and `server'
50 # default server is first host in network
51 # default relay is first host which is not server
54 # addrs = 127.0.0.1 ::1 # mandatory for server
55 port = 80 # used by server
56 # url # used by client; default from first `addrs' and `port'
58 # [<client-ip4-or-ipv6-address>]
59 # password = <password> # used by both, must match
62 max_batch_down = 262144 # used by server
63 max_queue_time = 121 # used by server
64 max_request_time = 121 # used by server
65 target_requests_outstanding = 10 # used by server
68 # these need to be defined here so that they can be imported by import *
70 optparser = OptionParser()
72 _mimetrans = bytes.maketrans(b'-'+slip.esc, slip.esc+b'-')
73 def mime_translate(s):
74 # SLIP-encoded packets cannot contain ESC ESC.
75 # Swap `-' and ESC. The result cannot contain `--'
76 return s.translate(_mimetrans)
79 def __init__(self, d = { }):
82 return 'ConfigResults('+repr(self.__dict__)+')'
86 def log_discard(packet, saddr, daddr, why):
87 print('DROP ', saddr, daddr, why)
88 # syslog.syslog(syslog.LOG_DEBUG,
89 # 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
91 #---------- packet parsing ----------
93 def packet_addrs(packet):
94 version = packet[0] >> 4
98 factory = ipaddress.IPv4Address
102 factory = ipaddress.IPv6Address
104 raise ValueError('unsupported IP version %d' % version)
105 saddr = factory(packet[ saddroff : saddroff + addrlen ])
106 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
107 return (saddr, daddr)
109 #---------- address handling ----------
113 r = ipaddress.IPv4Address(input)
114 except AddressValueError:
115 r = ipaddress.IPv6Address(input)
118 def ipnetwork(input):
120 r = ipaddress.IPv4Network(input)
121 except NetworkValueError:
122 r = ipaddress.IPv6Network(input)
125 #---------- ipif (SLIP) subprocess ----------
127 class SlipStreamDecoder():
128 def __init__(self, on_packet):
129 # we will call packet(<packet>)
131 self._on_packet = on_packet
133 def inputdata(self, data):
134 #print('SLIP-GOT ', repr(data))
136 packets = slip.decode(self._buffer)
137 self._buffer = packets.pop()
138 for packet in packets:
139 self._maybe_packet(packet)
141 def _maybe_packet(self, packet):
143 self._on_packet(packet)
146 self._maybe_packet(self._buffer)
149 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
150 def __init__(self, router):
151 self._router = router
152 self._decoder = SlipStreamDecoder(self.slip_on_packet)
153 def connectionMade(self): pass
154 def outReceived(self, data):
155 self._decoder.inputdata(data)
156 def slip_on_packet(self, packet):
157 (saddr, daddr) = packet_addrs(packet)
158 if saddr.is_link_local or daddr.is_link_local:
159 log_discard(packet, saddr, daddr, 'link-local')
161 self._router(packet, saddr, daddr)
162 def processEnded(self, status):
163 status.raiseException()
165 def start_ipif(command, router):
167 ipif = _IpifProcessProtocol(router)
168 reactor.spawnProcess(ipif,
169 '/bin/sh',['sh','-xc', command],
170 childFDs={0:'w', 1:'r', 2:2},
173 def queue_inbound(packet):
174 ipif.transport.write(slip.delimiter)
175 ipif.transport.write(slip.encode(packet))
176 ipif.transport.write(slip.delimiter)
178 #---------- packet queue ----------
181 def __init__(self, max_queue_time):
182 self._max_queue_time = max_queue_time
183 self._pq = collections.deque() # packets
185 def append(self, packet):
186 self._pq.append((time.monotonic(), packet))
190 try: (queuetime, packet) = self._pq[0]
191 except IndexError: return False
193 age = time.monotonic() - queuetime
194 if age > self._max_queue_time:
195 # strip old packets off the front
201 def process(self, sizequery, moredata, max_batch):
202 # sizequery() should return size of batch so far
203 # moredata(s) should add s to batch
205 try: (dummy, packet) = self._pq[0]
206 except IndexError: break
208 encoded = slip.encode(packet)
212 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
214 moredata(slip.delimiter)
219 #---------- error handling ----------
222 print('CRASH ', err, file=sys.stderr)
224 except twisted.internet.error.ReactorNotRunning: pass
226 def crash_on_defer(defer):
227 defer.addErrback(lambda err: crash(err))
229 def crash_on_critical(event):
230 if event.get('log_level') >= LogLevel.critical:
231 crash(twisted.logger.formatEvent(event))
233 #---------- config processing ----------
235 def process_cfg_common_always():
237 c.mtu = cfg.get('virtual','mtu')
239 def process_cfg_ipif(section, varmap):
241 try: v = getattr(c, s)
242 except AttributeError: continue
247 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
249 def process_cfg_network():
250 c.network = ipnetwork(cfg.get('virtual','network'))
251 if c.network.num_addresses < 3 + 2:
252 raise ValueError('network needs at least 2^3 addresses')
254 def process_cfg_server():
256 c.server = cfg.get('virtual','server')
257 except NoOptionError:
258 process_cfg_network()
259 c.server = next(c.network.hosts())
262 def __init__(self, port, addrspec):
266 self.addr = ipaddress.IPv4Address(addrspec)
267 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
269 except AddressValueError:
270 self.addr = ipaddress.IPv6Address(addrspec)
271 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
272 self._inurl = b'[%s]'
273 def make_endpoint(self):
274 return self._endpointfactory(reactor, self.port, self.addr)
276 url = b'http://' + (self._inurl % str(self.addr).encode('ascii'))
277 if self.port != 80: url += b':%d' % self.port
281 def process_cfg_saddrs():
282 try: port = cfg.getint('server','port')
283 except NoOptionError: port = 80
286 for addrspec in cfg.get('server','addrs').split():
287 sa = ServerAddr(port, addrspec)
290 def process_cfg_clients(constructor):
292 for cs in cfg.sections():
293 if not (':' in cs or '.' in cs): continue
295 pw = cfg.get(cs, 'password')
296 pw = pw.encode('utf-8')
297 constructor(ci,cs,pw)
299 #---------- startup ----------
301 def common_startup():
302 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
304 optparser.add_option('-c', '--config', dest='configfile',
305 default='/etc/hippotat/config')
306 (opts, args) = optparser.parse_args()
307 if len(args): optparser.error('no non-option arguments please')
309 re = regexp.compile('#.*')
310 cfg.read_string(re.sub('', defcfg))
311 cfg.read(opts.configfile)
315 print('CRASHED (end)', file=sys.stderr)