From 88487243bc0be906c63258005df75b96bc8165a5 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Tue, 21 Mar 2017 20:40:28 +0000 Subject: [PATCH] wip, config reorg --- client | 22 +++++- hippotat/__init__.py | 58 ++++++++++++++ server | 183 ++++++++++++++++++++----------------------- test.cfg | 2 +- 4 files changed, 162 insertions(+), 103 deletions(-) diff --git a/client b/client index a77600e..b0250de 100755 --- a/client +++ b/client @@ -8,7 +8,7 @@ max_requests_outstanding = 4 [virtual] mtu = 1500 -# [host] } maybe computed from `network' (see server defaults) +# [server] } maybe computed from `network' (see server defaults) [server] # url } maybe computed from `addrs' and `port' (see server defaults) @@ -17,12 +17,30 @@ mtu = 1500 # password = ''' +c.clientv = None +c.max_outstanding = + +def set_client(ci,cs,pw): + global password + assert(c.clientv is None) + c.clientv = ci + c.max_outstanding = cfg.getint(cs, 'max_requests_outstanding') + password = pw + def process_cfg(): global url global max_requests_outstanding process_cfg_common_always() - + process_cfg_server() + + try: + c.url = cfg.get('server','url') + except NoOptionError: + process_cfg_saddrs() + sa = c.saddrs[1].url() + + process_cfg_clients(set_client) common_startup(defcfg) process_cfg() diff --git a/hippotat/__init__.py b/hippotat/__init__.py index 8219238..6f0d3e8 100644 --- a/hippotat/__init__.py +++ b/hippotat/__init__.py @@ -144,6 +144,64 @@ def process_cfg_common_always(): global mtu c.mtu = cfg.get('virtual','mtu') +def process_cfg_ipif(section, varmap): + for d, s in varmap: + try: v = getattr(c, s) + except KeyError: pass + setattr(c, d, v) + + print(repr(c)) + + c.ipif_command = cfg.get(section,'ipif', vars=c.__dict__) + +def process_cfg_network(): + c.network = ipnetwork(cfg.get('virtual','network')) + if c.network.num_addresses < 3 + 2: + raise ValueError('network needs at least 2^3 addresses') + +def process_cfg_server(): + try: + c.server = cfg.get('virtual','server') + except NoOptionError: + process_cfg_network() + c.server = next(c.network.hosts()) + +class ServerAddr(): + def __init__(self, port, addrspec): + self.port = port + # also self.addr + try: + self.addr = ipaddress.IPv4Address(addrspec) + self._endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint + self._inurl = '%s' + except AddressValueError: + self.addr = ipaddress.IPv6Address(addrspec) + self._endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint + self._inurl = '[%s]' + def make_endpoint(self): + return self._endpointfactory(reactor, self.port, self.addr) + def url(self): + url = 'http://' + (self._inurl % self.addr) + if self.port != 80: url += ':%d' % self.port + url += '/' + return url + +def process_cfg_saddrs(): + port = cfg.getint('server','port') + + c.saddrs = [ ] + for addrspec in cfg.get('server','addrs').split(): + sa = ServerAddr(port, addrspec) + c.saddrs.append(sa) + +def process_cfg_clients(constructor): + c.clients = [ ] + for cs in cfg.sections(): + if not (':' in cs or '.' in cs): continue + ci = ipaddr(cs) + pw = cfg.get(cs, 'password') + constructor(ci,cs,pw) + #---------- startup ---------- def common_startup(defcfg): diff --git a/server b/server index a34cc3c..fc149c0 100755 --- a/server +++ b/server @@ -27,7 +27,7 @@ ipif = userv root ipif %(local)s,%(peer)s,%(mtu)s,slip %(rnets)s [virtual] mtu = 1500 # network -# [host] +# [server] # [relay] [server] @@ -51,7 +51,7 @@ def route(packet, saddr, daddr): dclient.queue_outbound(packet) elif saddr.is_link_local or daddr.is_link_local: log_discard(packet, saddr, daddr, 'link-local') - elif daddr == c.host or daddr not in c.network: + elif daddr == c.server or daddr not in c.network: print('TRACE INBOUND ', saddr, daddr, packet) queue_inbound(packet) elif daddr == relay: @@ -67,11 +67,11 @@ def log_discard(packet, saddr, daddr, why): #---------- client ---------- class Client(): - def __init__(self, ip, cs): + def __init__(self, ip, cs, pw): # instance data members self._ip = ip self._cs = cs - self.pw = cfg.get(cs, 'password') + self.pw = pw self._rq = collections.deque() # requests # self._pq = PacketQueue(...) # plus from config: @@ -79,6 +79,10 @@ class Client(): # .max_queue_time # .max_request_time # .target_requests_outstanding + + if ip not in c.network: + raise ValueError('client %s not in network' % ip) + for k in ('max_batch_down','max_queue_time','max_request_time', 'target_requests_outstanding'): req = cfg.getint(cs, k) @@ -86,69 +90,73 @@ class Client(): self.__dict__[k] = min(req, limit) self._pq = PacketQueue(self.max_queue_time) - def process_arriving_data(self, d): - for packet in slip.decode(d): - (saddr, daddr) = packet_addrs(packet) - if saddr != self._ip: - raise ValueError('wrong source address %s' % saddr) - route(packet, saddr, daddr) - - def _req_cancel(self, request): - request.finish() + if ip in clients: + raise ValueError('multiple client cfg sections for %s' % ip) + clients[ip] = self + + def process_arriving_data(self, d): + for packet in slip.decode(d): + (saddr, daddr) = packet_addrs(packet) + if saddr != self._ip: + raise ValueError('wrong source address %s' % saddr) + route(packet, saddr, daddr) + + def _req_cancel(self, request): + request.finish() + + def _req_error(self, err, request): + self._req_cancel(request) + + def queue_outbound(self, packet): + self._pq.append(packet) + + def http_request(self, request): + request.setHeader('Content-Type','application/octet-stream') + reactor.callLater(self.max_request_time, self._req_cancel, request) + request.notifyFinish().addErrback(self._req_error, request) + self._rq.append(request) + self._check_outbound() + + def _check_outbound(self): + while True: + try: request = self._rq[0] + except IndexError: request = None + if request and request.finished: + self._rq.popleft() + continue + + if not self._pq.nonempty(): + # no packets, oh well + continue + + if request is None: + # no request + break + + # request, and also some non-expired packets + while True: + packet = self.pq.popleft() + if packet is None: break - def _req_error(self, err, request): - self._req_cancel(request) + encoded = slip.encode(packet) + + if request.sentLength > 0: + if (request.sentLength + len(slip.delimiter) + + len(encoded) > self.max_batch_down): + break + request.write(slip.delimiter) - def queue_outbound(self, packet): - self._pq.append(packet) + request.write(encoded) + self._pq.popLeft() - def http_request(self, request): - request.setHeader('Content-Type','application/octet-stream') - reactor.callLater(self.max_request_time, self._req_cancel, request) - request.notifyFinish().addErrback(self._req_error, request) - self._rq.append(request) - self._check_outbound() + assert(request.sentLength) + self._rq.popLeft() + request.finish() + # round again, looking for more to do - def _check_outbound(self): - while True: - try: request = self._rq[0] - except IndexError: request = None - if request and request.finished: - self._rq.popleft() - continue - - if not self._pq.nonempty(): - # no packets, oh well - continue - - if request is None: - # no request - break - - # request, and also some non-expired packets - while True: - packet = self.pq.popleft() - if packet is None: break - - encoded = slip.encode(packet) - - if request.sentLength > 0: - if (request.sentLength + len(slip.delimiter) - + len(encoded) > self.max_batch_down): - break - request.write(slip.delimiter) - - request.write(encoded) - self._pq.popLeft() - - assert(request.sentLength) - self._rq.popLeft() - request.finish() - # round again, looking for more to do - - while len(self._rq) > self.target_requests_outstanding: - request = self._rq.popleft() - request.finish() + while len(self._rq) > self.target_requests_outstanding: + request = self._rq.popleft() + request.finish() class IphttpResource(twisted.web.resource.Resource): isLeaf = True @@ -181,57 +189,32 @@ class IphttpResource(twisted.web.resource.Resource): def start_http(): resource = IphttpResource() site = twisted.web.server.Site(resource) - for addrspec in cfg.get('server','addrs').split(): - try: - addr = ipaddress.IPv4Address(addrspec) - endpointfactory = twisted.internet.endpoints.TCP4ServerEndpoint - except AddressValueError: - addr = ipaddress.IPv6Address(addrspec) - endpointfactory = twisted.internet.endpoints.TCP6ServerEndpoint - ep = endpointfactory(reactor, cfg.getint('server','port'), addr) + for sa in c.saddrs: + ep = sa.make_endpoint() crash_on_defer(ep.listen(site)) #---------- config and setup ---------- def process_cfg(): process_cfg_common_always() - - c.network = ipnetwork(cfg.get('virtual','network')) - if c.network.num_addresses < 3 + 2: - raise ValueError('network needs at least 2^3 addresses') - - try: - c.host = cfg.get('virtual','host') - except NoOptionError: - c.host = next(c.network.hosts()) + process_cfg_server() + process_cfg_network() try: c.relay = cfg.get('virtual','relay') except NoOptionError: for search in c.network.hosts(): - if search == c.host: continue + if search == c.server: continue c.relay = search break - for cs in cfg.sections(): - if not (':' in cs or '.' in cs): continue - ci = ipaddr(cs) - if ci not in c.network: - raise ValueError('client %s not in network' % ci) - if ci in clients: - raise ValueError('multiple client cfg sections for %s' % ci) - clients[ci] = Client(ci, cs) - - for d, s in (('local', 'host'), - ('peer','relay'), - ('rnets','network')): - try: v = getattr(c, s) - except KeyError: pass - setattr(c, d, v) - - print(repr(c)) - - c.ipif_command = cfg.get('server','ipif', vars=c.__dict__) + process_cfg_saddrs() + process_cfg_clients(Client) + + process_cfg_ipif('server', + (('local','server'), + ('peer','relay'), + ('rnets','network'))) common_startup(defcfg) process_cfg() diff --git a/test.cfg b/test.cfg index 51bd968..62afbf0 100644 --- a/test.cfg +++ b/test.cfg @@ -1,6 +1,6 @@ [server] addrs = 127.0.0.1 -ipif = PATH=/usr/local/sbin:/sbin:/usr/sbin:$PATH really /home/ian/things/Userv/userv-utils.git/ipif/service \* -- %(host)s,%(relay)s,%(mtu)s,slip %(network)s +ipif = PATH=/usr/local/sbin:/sbin:/usr/sbin:$PATH really /home/ian/things/Userv/userv-utils.git/ipif/service \* -- %(local)s,%(relay)s,%(mtu)s,slip %(network)s [virtual] network = 192.0.2.0/24 -- 2.30.2