4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 from twisted.logger import LogLevel
11 import twisted.internet.endpoints
14 from ipaddress import AddressValueError
16 from optparse import OptionParser
17 from configparser import ConfigParser
18 from configparser import NoOptionError
24 import hippotat.slip as slip
29 max_batch_down = 65536 # used by server, subject to [limits]
30 max_queue_time = 10 # used by server, subject to [limits]
31 max_request_time = 54 # used by server, subject to [limits]
32 target_requests_outstanding = 3 # must match; subject to [limits] on server
33 max_requests_outstanding = 4 # used by client
34 max_batch_up = 4000 # used by client
35 http_timeout = 30 # used by client
37 #[server] or [<client>] overrides
38 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
39 # extra interpolations: %(local)s %(peer)s %(rnet)s
40 # obtained on server [virtual]server [virtual]relay [virtual]network
41 # from on client <client> [virtual]server [virtual]routes
46 # network = <prefix>/<len> # mandatory for server
47 # server = <ipaddr> # used by both, default is computed from `network'
48 # relay = <ipaddr> # used by server, default from `network' and `server'
49 # default server is first host in network
50 # default relay is first host which is not server
53 # addrs = 127.0.0.1 ::1 # mandatory for server
54 port = 80 # used by server
55 # url # used by client; default from first `addrs' and `port'
57 # [<client-ip4-or-ipv6-address>]
58 # password = <password> # used by both, must match
61 max_batch_down = 262144 # used by server
62 max_queue_time = 121 # used by server
63 max_request_time = 121 # used by server
64 target_requests_outstanding = 10 # used by server
67 # these need to be defined here so that they can be imported by import *
69 optparser = OptionParser()
71 _mimetrans = str.maketrans(b'-'+slip.esc, slip.esc+'-')
72 def mime_translate(s):
73 # SLIP-encoded packets cannot contain ESC ESC.
74 # Swap `-' and ESC. The result cannot contain `--'
75 return s.translate(_mimetrans)
78 def __init__(self, d = { }):
81 return 'ConfigResults('+repr(self.__dict__)+')'
85 def log_discard(packet, saddr, daddr, why):
86 print('DROP ', saddr, daddr, why)
87 # syslog.syslog(syslog.LOG_DEBUG,
88 # 'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
90 #---------- packet parsing ----------
92 def packet_addrs(packet):
93 version = packet[0] >> 4
97 factory = ipaddress.IPv4Address
101 factory = ipaddress.IPv6Address
103 raise ValueError('unsupported IP version %d' % version)
104 saddr = factory(packet[ saddroff : saddroff + addrlen ])
105 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
106 return (saddr, daddr)
108 #---------- address handling ----------
112 r = ipaddress.IPv4Address(input)
113 except AddressValueError:
114 r = ipaddress.IPv6Address(input)
117 def ipnetwork(input):
119 r = ipaddress.IPv4Network(input)
120 except NetworkValueError:
121 r = ipaddress.IPv6Network(input)
124 #---------- ipif (SLIP) subprocess ----------
126 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
127 def __init__(self, router):
129 self._router = router
130 def connectionMade(self): pass
131 def outReceived(self, data):
132 #print('IPIF-GOT ', repr(data))
134 packets = slip.decode(self._buffer)
135 self._buffer = packets.pop()
136 for packet in packets:
137 if not len(packet): continue
138 (saddr, daddr) = packet_addrs(packet)
139 if saddr.is_link_local or daddr.is_link_local:
140 log_discard(packet, saddr, daddr, 'link-local')
142 self._router(packet, saddr, daddr)
143 def processEnded(self, status):
144 status.raiseException()
146 def start_ipif(command, router):
148 ipif = _IpifProcessProtocol(router)
149 reactor.spawnProcess(ipif,
150 '/bin/sh',['sh','-xc', command],
151 childFDs={0:'w', 1:'r', 2:2})
153 def queue_inbound(packet):
154 ipif.transport.write(slip.delimiter)
155 ipif.transport.write(slip.encode(packet))
156 ipif.transport.write(slip.delimiter)
158 #---------- packet queue ----------
161 def __init__(self, max_queue_time):
162 self._max_queue_time = max_queue_time
163 self._pq = collections.deque() # packets
165 def append(self, packet):
166 self._pq.append((time.monotonic(), packet))
170 try: (queuetime, packet) = self._pq[0]
171 except IndexError: return False
173 age = time.monotonic() - queuetime
174 if age > self.max_queue_time:
175 # strip old packets off the front
181 def process(self, sizequery, moredata, max_batch):
182 # sizequery() should return size of batch so far
183 # moredata(s) should add s to batch
185 try: (dummy, packet) = self._pq[0]
186 except IndexError: break
188 encoded = slip.encode(packet)
192 if sofar + len(slip.delimiter) + len(encoded) > max_batch:
194 moredata(slip.delimiter)
199 #---------- error handling ----------
202 print('CRASH ', err, file=sys.stderr)
204 except twisted.internet.error.ReactorNotRunning: pass
206 def crash_on_defer(defer):
207 defer.addErrback(lambda err: crash(err))
209 vdef crash_on_critical(event):
210 if event.get('log_level') >= LogLevel.critical:
211 crash(twisted.logger.formatEvent(event))
213 #---------- config processing ----------
215 def process_cfg_common_always():
217 c.mtu = cfg.get('virtual','mtu')
219 def process_cfg_ipif(section, varmap):
221 try: v = getattr(c, s)
222 except AttributeError: continue
227 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
229 def process_cfg_network():
230 c.network = ipnetwork(cfg.get('virtual','network'))
231 if c.network.num_addresses < 3 + 2:
232 raise ValueError('network needs at least 2^3 addresses')
234 def process_cfg_server():
236 c.server = cfg.get('virtual','server')
237 except NoOptionError:
238 process_cfg_network()
239 c.server = next(c.network.hosts())
242 def __init__(self, port, addrspec):
246 self.addr = ipaddress.IPv4Address(addrspec)
247 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
249 except AddressValueError:
250 self.addr = ipaddress.IPv6Address(addrspec)
251 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
253 def make_endpoint(self):
254 return self._endpointfactory(reactor, self.port, self.addr)
256 url = 'http://' + (self._inurl % self.addr)
257 if self.port != 80: url += ':%d' % self.port
261 def process_cfg_saddrs():
262 try: port = cfg.getint('server','port')
263 except NoOptionError: port = 80
266 for addrspec in cfg.get('server','addrs').split():
267 sa = ServerAddr(port, addrspec)
270 def process_cfg_clients(constructor):
272 for cs in cfg.sections():
273 if not (':' in cs or '.' in cs): continue
275 pw = cfg.get(cs, 'password')
276 constructor(ci,cs,pw)
278 #---------- startup ----------
280 def common_startup():
281 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
283 optparser.add_option('-c', '--config', dest='configfile',
284 default='/etc/hippotat/config')
285 (opts, args) = optparser.parse_args()
286 if len(args): optparser.error('no non-option arguments please')
288 re = regexp.compile('#.*')
289 cfg.read_string(re.sub('', defcfg))
290 cfg.read(opts.configfile)
294 print('CRASHED (end)', file=sys.stderr)