4 signal.signal(signal.SIGINT, signal.SIG_DFL)
9 from twisted.internet import reactor
10 from twisted.logger import LogLevel
11 import twisted.internet.endpoints
14 from ipaddress import AddressValueError
16 from optparse import OptionParser
17 from configparser import ConfigParser
18 from configparser import NoOptionError
24 import hippotat.slip as slip
29 max_batch_down = 65536 # used by server, subject to [limits]
30 max_queue_time = 10 # used by server, subject to [limits]
31 max_request_time = 54 # used by server, subject to [limits]
32 target_requests_outstanding = 3 # must match; subject to [limits] on server
33 max_requests_outstanding = 4 # used by client
34 max_batch_up = 4000 # used by client
36 #[server] or [<client>] overrides
37 ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s
# extra interpolations: %(local)s %(peer)s %(rnets)s
39 # obtained on server [virtual]server [virtual]relay [virtual]network
40 # from on client <client> [virtual]server [virtual]routes
45 # network = <prefix>/<len> # mandatory for server
46 # server = <ipaddr> # used by both, default is computed from `network'
47 # relay = <ipaddr> # used by server, default from `network' and `server'
48 # default server is first host in network
49 # default relay is first host which is not server
52 # addrs = 127.0.0.1 ::1 # mandatory for server
53 port = 80 # used by server
54 # url # used by client; default from first `addrs' and `port'
56 # [<client-ip4-or-ipv6-address>]
57 # password = <password> # used by both, must match
60 max_batch_down = 262144 # used by server
61 max_queue_time = 121 # used by server
62 max_request_time = 121 # used by server
63 target_requests_outstanding = 10 # used by server
66 # these need to be defined here so that they can be imported by import *
68 optparser = OptionParser()
71 def __init__(self, d = { }):
74 return 'ConfigResults('+repr(self.__dict__)+')'
def log_discard(packet, saddr, daddr, why):
    """Report a discarded packet on standard output.

    packet -- the dropped packet itself (accepted but not printed)
    saddr  -- source address of the packet
    daddr  -- destination address of the packet
    why    -- short reason for the drop
    """
    fields = ('DROP ', saddr, daddr, why)
    print(*fields)
    # A syslog-based variant is kept here, disabled:
    #  syslog.syslog(syslog.LOG_DEBUG,
    #         'discarded packet %s -> %s (%s)' % (saddr, daddr, why))
83 #---------- packet parsing ----------
85 def packet_addrs(packet):
86 version = packet[0] >> 4
90 factory = ipaddress.IPv4Address
94 factory = ipaddress.IPv6Address
96 raise ValueError('unsupported IP version %d' % version)
97 saddr = factory(packet[ saddroff : saddroff + addrlen ])
98 daddr = factory(packet[ saddroff + addrlen : saddroff + addrlen*2 ])
101 #---------- address handling ----------
105 r = ipaddress.IPv4Address(input)
106 except AddressValueError:
107 r = ipaddress.IPv6Address(input)
def ipnetwork(input):
    """Parse `input` as an IP network, trying IPv4 first and then IPv6.

    input -- network specification such as '192.0.2.0/24' or
             '2001:db8::/32'

    Returns an ipaddress.IPv4Network or ipaddress.IPv6Network.

    Raises whatever ipaddress.IPv6Network raises when `input` is not a
    valid network of either family.  Errors other than a bad IPv4
    address part (e.g. an out-of-range IPv4 prefix length) propagate
    from the IPv4 attempt unchanged.
    """
    try:
        r = ipaddress.IPv4Network(input)
    except AddressValueError:
        # The address part is not IPv4.  The previous handler named
        # `NetworkValueError', which does not exist, so this fallback
        # raised NameError instead of trying IPv6.
        r = ipaddress.IPv6Network(input)
    return r
117 #---------- ipif (SLIP) subprocess ----------
119 class _IpifProcessProtocol(twisted.internet.protocol.ProcessProtocol):
120 def __init__(self, router):
122 self._router = router
123 def connectionMade(self): pass
124 def outReceived(self, data):
125 #print('IPIF-GOT ', repr(data))
127 packets = slip.decode(self._buffer)
128 self._buffer = packets.pop()
129 for packet in packets:
130 if not len(packet): continue
131 (saddr, daddr) = packet_addrs(packet)
132 if saddr.is_link_local or daddr.is_link_local:
133 log_discard(packet, saddr, daddr, 'link-local')
135 self._router(packet, saddr, daddr)
136 def processEnded(self, status):
137 status.raiseException()
139 def start_ipif(command, router):
141 ipif = _IpifProcessProtocol(router)
142 reactor.spawnProcess(ipif,
143 '/bin/sh',['sh','-xc', command],
144 childFDs={0:'w', 1:'r', 2:2})
def queue_inbound(packet):
    """Send one packet to the ipif subprocess, SLIP-framed.

    The packet is written to ipif.transport as: delimiter, encoded
    packet, delimiter.
    """
    transport = ipif.transport
    transport.write(slip.delimiter)
    # Encode only after the leading delimiter has been written, so the
    # order of effects matches a plain three-write sequence.
    encoded = slip.encode(packet)
    transport.write(encoded)
    transport.write(slip.delimiter)
151 #---------- packet queue ----------
def __init__(self, max_queue_time):
    """Create an empty packet queue.

    max_queue_time -- stored as self._max_queue_time for the queue's
                      age checks
    """
    # Each entry is a (queue time, packet) pair; see append().
    self._pq = collections.deque()
    self._max_queue_time = max_queue_time
def append(self, packet):
    """Add `packet` to the back of the queue, stamped with the current
    monotonic time."""
    queued_at = time.monotonic()
    entry = (queued_at, packet)
    self._pq.append(entry)
163 try: (queuetime, packet) = self._pq[0]
164 except IndexError: return False
166 age = time.monotonic() - queuetime
167 if age > self.max_queue_time:
168 # strip old packets off the front
175 # caller must have checked nonempty
176 try: (dummy, packet) = self._pq[0]
177 except IndexError: return None
180 #---------- error handling ----------
183 print('CRASH ', err, file=sys.stderr)
185 except twisted.internet.error.ReactorNotRunning: pass
def crash_on_defer(defer):
    """Attach an errback to `defer` that hands any failure to crash()."""
    def _on_failure(err):
        # crash is looked up when the errback fires, not when it is
        # attached.
        return crash(err)
    defer.addErrback(_on_failure)
def crash_on_critical(event):
    """Log observer: crash when an event of critical level is seen.

    event -- a twisted.logger event dictionary

    Events at LogLevel.critical (or above) are formatted with
    twisted.logger.formatEvent and passed to crash().  Events that carry
    no 'log_level' are ignored.
    """
    level = event.get('log_level')
    if level is None:
        # Not every event has a level.  Ordering None against a
        # LogLevel is unsupported and would make the observer itself
        # raise.
        return
    if level >= LogLevel.critical:
        crash(twisted.logger.formatEvent(event))
194 #---------- config processing ----------
196 def process_cfg_common_always():
198 c.mtu = cfg.get('virtual','mtu')
200 def process_cfg_ipif(section, varmap):
202 try: v = getattr(c, s)
203 except AttributeError: continue
208 c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__)
def process_cfg_network():
    """Load [virtual] network from the config into c.network.

    The value is parsed with ipnetwork().  Raises ValueError when the
    network holds fewer than 3 + 2 addresses.
    """
    spec = cfg.get('virtual','network')
    c.network = ipnetwork(spec)
    enough = c.network.num_addresses >= 3 + 2
    if enough:
        return
    raise ValueError('network needs at least 2^3 addresses')
215 def process_cfg_server():
217 c.server = cfg.get('virtual','server')
218 except NoOptionError:
219 process_cfg_network()
220 c.server = next(c.network.hosts())
223 def __init__(self, port, addrspec):
227 self.addr = ipaddress.IPv4Address(addrspec)
228 self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint
230 except AddressValueError:
231 self.addr = ipaddress.IPv6Address(addrspec)
232 self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint
def make_endpoint(self):
    """Build the Twisted server endpoint for this address and port.

    Returns the object built by self._endpointfactory (a TCP4 or TCP6
    server endpoint class, chosen by the constructor), bound to
    self.port on self.addr.
    """
    # The third positional parameter of TCP4ServerEndpoint and
    # TCP6ServerEndpoint is `backlog'; the listening address must be
    # given as `interface', and as text rather than an ipaddress object.
    return self._endpointfactory(reactor, self.port,
                                 interface=str(self.addr))
237 url = 'http://' + (self._inurl % self.addr)
238 if self.port != 80: url += ':%d' % self.port
242 def process_cfg_saddrs():
243 try: port = cfg.getint('server','port')
244 except NoOptionError: port = 80
247 for addrspec in cfg.get('server','addrs').split():
248 sa = ServerAddr(port, addrspec)
251 def process_cfg_clients(constructor):
253 for cs in cfg.sections():
254 if not (':' in cs or '.' in cs): continue
256 pw = cfg.get(cs, 'password')
257 constructor(ci,cs,pw)
259 #---------- startup ----------
261 def common_startup():
262 twisted.logger.globalLogPublisher.addObserver(crash_on_critical)
264 optparser.add_option('-c', '--config', dest='configfile',
265 default='/etc/hippotat/config')
266 (opts, args) = optparser.parse_args()
267 if len(args): optparser.error('no non-option arguments please')
269 re = regexp.compile('#.*')
270 cfg.read_string(re.sub('', defcfg))
271 cfg.read(opts.configfile)
275 print('CRASHED (end)', file=sys.stderr)