X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/tripe/blobdiff_plain/48b845698dcf3ec4b9f8b9f1848a157f0245d7cc..844abf94571ffdd3e113c518947073ff5375d748:/svc/connect.in diff --git a/svc/connect.in b/svc/connect.in index 37241bbc..04713967 100644 --- a/svc/connect.in +++ b/svc/connect.in @@ -1,9 +1,9 @@ #! @PYTHON@ ### -*-python-*- ### -### Service for establishing dynamic connections +### Connect to remote peers, and keep track of them ### -### (c) 2006 Straylight/Edgeware +### (c) 2007 Straylight/Edgeware ### ###----- Licensing notice --------------------------------------------------- @@ -32,14 +32,243 @@ VERSION = '@VERSION@' from optparse import OptionParser import tripe as T import os as OS +import signal as SIG +import errno as E import cdb as CDB import mLib as M +import re as RX from time import time +import subprocess as PROC S = T.svcmgr ###-------------------------------------------------------------------------- -### Main service machinery. +### Running auxiliary commands. + +class SelLineQueue (M.SelLineBuffer): + """Glues the select-line-buffer into the coroutine queue system.""" + + def __new__(cls, file, queue, tag, kind): + """See __init__ for documentation.""" + return M.SelLineBuffer.__new__(cls, file.fileno()) + + def __init__(me, file, queue, tag, kind): + """ + Initialize a new line-reading adaptor. + + The adaptor reads lines from FILE. Each line is inserted as a message of + the stated KIND, bearing the TAG, into the QUEUE. End-of-file is + represented as None. + """ + me._q = queue + me._file = file + me._tag = tag + me._kind = kind + me.enable() + + @T._callback + def line(me, line): + me._q.put((me._tag, me._kind, line)) + + @T._callback + def eof(me): + me.disable() + me._q.put((me._tag, me._kind, None)) + +class ErrorWatch (T.Coroutine): + """ + An object which watches stderr streams for errors and converts them into + warnings of the form + + WARN connect INFO stderr LINE + + The INFO is a list of tokens associated with the file when it was + registered. + + Usually there is a single ErrorWatch object, called errorwatch. + """ + + def __init__(me): + """Initialization: there are no arguments.""" + T.Coroutine.__init__(me) + me._q = T.Queue() + me._map = {} + me._seq = 1 + + def watch(me, file, info): + """ + Adds FILE to the collection of files to watch. + + INFO will be written in the warning messages from this FILE. Returns a + sequence number which can be used to unregister the file again. + """ + seq = me._seq + me._seq += 1 + me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr') + return seq + + def unwatch(me, seq): + """Stop watching the file with sequence number SEQ.""" + del me._map[seq] + return me + + def run(me): + """ + Coroutine function: read items from the queue and report them. + + Unregisters files automatically when they reach EOF. + """ + while True: + seq, _, line = me._q.get() + if line is None: + me.unwatch(seq) + else: + S.warn(*['connect'] + me._map[seq][0] + ['stderr', line]) + +def dbwatch(): + """ + Coroutine function: wake up every minute and notice changes to the + database. When a change happens, tell the Pinger (q.v.) to rescan its + peers. + """ + cr = T.Coroutine.getcurrent() + main = cr.parent + fw = M.FWatch(opts.cdb) + while True: + timer = M.SelTimer(time() + 60, lambda: cr.switch()) + main.switch() + if fw.update(): + pinger.rescan(False) + S.notify('connect', 'peerdb-update') + +class ChildWatch (M.SelSignal): + """ + An object which watches for specified processes exiting and reports + terminations by writing items of the form (TAG, 'exit', RESULT) to a queue. + + There is usually only one ChildWatch object, called childwatch. + """ + + def __new__(cls): + """Initialize the child-watcher.""" + return M.SelSignal.__new__(cls, SIG.SIGCHLD) + + def __init__(me): + """Initialize the child-watcher.""" + me._pid = {} + me.enable() + + def watch(me, pid, queue, tag): + """ + Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE) + to the QUEUE, where CODE is one of + + * None (successful termination) + * ['exit-nonzero', CODE] (CODE is a string!) + * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string) + * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex) + """ + me._pid[pid] = queue, tag + return me + + def unwatch(me, pid): + """Unregister PID as a child to watch.""" + del me._pid[pid] + return me + + @T._callback + def signalled(me): + """ + Called when child processes exit: collect exit statuses and report + failures. + """ + while True: + try: + pid, status = OS.waitpid(-1, OS.WNOHANG) + except OSError, exc: + if exc.errno == E.ECHILD: + break + if pid == 0: + break + if pid not in me._pid: + continue + queue, tag = me._pid[pid] + if OS.WIFEXITED(status): + exit = OS.WEXITSTATUS(status) + if exit == 0: + code = None + else: + code = ['exit-nonzero', str(exit)] + elif OS.WIFSIGNALED(status): + code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))] + else: + code = ['exit-unknown', hex(status)] + queue.put((tag, 'exit', code)) + +class Command (object): + """ + Represents a running command. + + This class is the main interface to the machery provided by the ChildWatch + and ErrorWatch objects. See also potwatch. + """ + + def __init__(me, info, queue, tag, args, env): + """ + Start a new child process. + + The ARGS are a list of arguments to be given to the child process. The + ENV is either None or a dictionary of environment variable assignments to + override the extant environment. INFO is a list of tokens to be included + in warnings about the child's stderr output. If the child writes a line + to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the + child exits, write (TAG, 'exit', CODE) to the QUEUE. + """ + me._info = info + me._q = queue + me._tag = tag + myenv = OS.environ.copy() + if env: myenv.update(env) + me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1, + stdout = PROC.PIPE, stderr = PROC.PIPE) + me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout') + errorwatch.watch(me._proc.stderr, info) + childwatch.watch(me._proc.pid, queue, tag) + + def __del__(me): + """ + If I've been forgotten then stop watching for termination. + """ + childwatch.unwatch(me._proc.pid) + +def potwatch(what, name, q): + """ + Watch the queue Q for activity as reported by a Command object. + + Information from the process's stdout is reported as + + NOTE WHAT NAME stdout LINE + + abnormal termination is reported as + + WARN WHAT NAME CODE + + where CODE is what the ChildWatch wrote. + """ + eofp = deadp = False + while not deadp or not eofp: + _, kind, more = q.get() + if kind == 'stdout': + if more is None: + eofp = True + else: + S.notify('connect', what, name, 'stdout', more) + elif kind == 'exit': + if more: S.warn('connect', what, name, *more) + deadp = True + +###-------------------------------------------------------------------------- +### Peer database utilities. _magic = ['_magic'] # An object distinct from all others @@ -54,30 +283,382 @@ class Peer (object): one given on the command-line. """ me.name = peer - try: - record = (cdb or CDB.init(opts.cdb))['P' + peer] - except KeyError: - raise T.TripeJobError('unknown-peer', peer) + record = (cdb or CDB.init(opts.cdb))['P' + peer] me.__dict__.update(M.URLDecode(record, semip = True)) - def get(me, key, default = _magic): + def get(me, key, default = _magic, filter = None): """ Get the information stashed under KEY from the peer's database record. If DEFAULT is given, then use it if the database doesn't contain the - necessary information. If no DEFAULT is given, then report an error. + necessary information. If no DEFAULT is given, then report an error. If + a FILTER function is given then apply it to the information from the + database before returning it. """ attr = me.__dict__.get(key, default) if attr is _magic: raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key) + elif filter is not None: + attr = filter(attr) return attr + def has(me, key): + """ + Return whether the peer's database record has the KEY. + """ + return key in me.__dict__ + def list(me): """ Iterate over the available keys in the peer's database record. """ return me.__dict__.iterkeys() +def boolean(value): + """Parse VALUE as a boolean.""" + return value in ['t', 'true', 'y', 'yes', 'on'] + +###-------------------------------------------------------------------------- +### Waking up and watching peers. + +def run_connect(peer, cmd): + """ + Start the job of connecting to the passive PEER. + + The CMD string is a shell command which will connect to the peer (via some + back-channel, say ssh and userv), issue a command + + SVCSUBMIT connect passive [OPTIONS] USER + + and write the resulting challenge to standard error. + """ + q = T.Queue() + cmd = Command(['connect', peer.name], q, 'connect', + ['/bin/sh', '-c', cmd], None) + _, kind, more = q.peek() + if kind == 'stdout': + if more is None: + S.warn('connect', 'connect', peer.name, 'unexpected-eof') + else: + chal = more + S.greet(peer.name, chal) + q.get() + potwatch('connect', peer.name, q) + +def run_disconnect(peer, cmd): + """ + Start the job of disconnecting from a passive PEER. + + The CMD string is a shell command which will disconnect from the peer. + """ + q = T.Queue() + cmd = Command(['disconnect', peer.name], q, 'disconnect', + ['/bin/sh', '-c', cmd], None) + potwatch('disconnect', peer.name, q) + +_pingseq = 0 +class PingPeer (object): + """ + Object representing a peer which we are pinging to ensure that it is still + present. + + PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an + event queue -- which saves us from having an enormous swarm of coroutines + -- but most of the actual work is done here. + + In order to avoid confusion between different PingPeer instances for the + same actual peer, each PingPeer has a sequence number (its `seq' + attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair. + (Using the PingPeer instance itself will prevent garbage collection of + otherwise defunct instances.) + """ + + def __init__(me, pinger, queue, peer, pingnow): + """ + Create a new PingPeer. + + The PINGER is the Pinger object we should send the results to. This is + used when we remove ourselves, if the peer has been explicitly removed. + + The QUEUE is the event queue on which timer and ping-command events + should be written. + + The PEER is a `Peer' object describing the peer. + + If PINGNOW is true, then immediately start pinging the peer. Otherwise + wait until the usual retry interval. + """ + global _pingseq + me._pinger = pinger + me._q = queue + me._peer = peer.name + me.update(peer) + me.seq = _pingseq + _pingseq += 1 + me._failures = 0 + if pingnow: + me._timer = None + me._ping() + else: + me._timer = M.SelTimer(time() + me._every, me._time) + + def update(me, peer): + """ + Refreshes the timer parameters for this peer. We don't, however, + immediately reschedule anything: that will happen next time anything + interesting happens. + """ + if peer is None: peer = Peer(me._peer) + assert peer.name == me._peer + me._every = peer.get('every', filter = T.timespec, default = 120) + me._timeout = peer.get('timeout', filter = T.timespec, default = 10) + me._retries = peer.get('retries', filter = int, default = 5) + me._connectp = peer.has('connect') + return me + + def _ping(me): + """ + Send a ping to the peer; the result is sent to the Pinger's event queue. + """ + S.rawcommand(T.TripeAsynchronousCommand( + me._q, (me._peer, me.seq), + ['EPING', + '-background', S.bgtag(), + '-timeout', str(me._timeout), + '--', + me._peer])) + + def _reconnect(me): + peer = Peer(me._peer) + if me._connectp: + S.warn('connect', 'reconnecting', me._peer) + S.forcekx(me._peer) + T.spawn(run_connect, peer, peer.get('connect')) + me._timer = M.SelTimer(time() + me._every, me._time) + else: + S.kill(me._peer) + + def event(me, code, stuff): + """ + Respond to an event which happened to this peer. + + Timer events indicate that we should start a new ping. (The server has + its own timeout which detects lost packets.) + + We trap unknown-peer responses and detach from the Pinger. + + If the ping fails and we run out of retries, we attempt to restart the + connection. + """ + if code == 'TIMER': + me._failures = 0 + me._ping() + elif code == 'FAIL': + S.notify('connect', 'ping-failed', me._peer, *stuff) + if not stuff: + pass + elif stuff[0] == 'unknown-peer': + me._pinger.kill(me._peer) + elif stuff[0] == 'ping-send-failed': + me._reconnect() + elif code == 'INFO': + if stuff[0] == 'ping-ok': + if me._failures > 0: + S.warn('connect', 'ping-ok', me._peer) + me._timer = M.SelTimer(time() + me._every, me._time) + elif stuff[0] == 'ping-timeout': + me._failures += 1 + S.warn('connect', 'ping-timeout', me._peer, + 'attempt', str(me._failures), 'of', str(me._retries)) + if me._failures < me._retries: + me._ping() + else: + me._reconnect() + elif stuff[0] == 'ping-peer-died': + me._pinger.kill(me._peer) + + @T._callback + def _time(me): + """ + Handle timer callbacks by posting a timeout event on the queue. + """ + me._timer = None + me._q.put(((me._peer, me.seq), 'TIMER', None)) + + def __str__(me): + return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures) + def __repr__(me): + return str(me) + +class Pinger (T.Coroutine): + """ + The Pinger keeps track of the peers which we expect to be connected and + takes action if they seem to stop responding. + + There is usually only one Pinger, called pinger. + + The Pinger maintains a collection of PingPeer objects, and an event queue. + The PingPeers direct the results of their pings, and timer events, to the + event queue. The Pinger's coroutine picks items off the queue and + dispatches them back to the PingPeers as appropriate. + """ + + def __init__(me): + """Initialize the Pinger.""" + T.Coroutine.__init__(me) + me._peers = {} + me._q = T.Queue() + + def run(me): + """ + Coroutine function: reads the pinger queue and sends events to the + PingPeer objects they correspond to. + """ + while True: + (peer, seq), code, stuff = me._q.get() + if peer in me._peers and seq == me._peers[peer].seq: + me._peers[peer].event(code, stuff) + + def add(me, peer, pingnow): + """ + Add PEER to the collection of peers under the Pinger's watchful eye. + The arguments are as for PingPeer: see above. + """ + me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow) + return me + + def kill(me, peername): + """Remove PEER from the peers being watched by the Pinger.""" + del me._peers[peername] + return me + + def rescan(me, startup): + """ + General resynchronization method. + + We scan the list of peers (with connect scripts) known at the server. + Any which are known to the Pinger but aren't known to the server are + removed from our list; newly arrived peers are added. (Note that a peer + can change state here either due to the server sneakily changing its list + without issuing notifications or, more likely, the database changing its + idea of whether a peer is interesting.) Finally, PingPeers which are + still present are prodded to update their timing parameters. + + This method is called once at startup to pick up the peers already + installed, and again by the dbwatcher coroutine when it detects a change + to the database. + """ + if T._debug: print '# rescan peers' + correct = {} + start = {} + for name in S.list(): + try: peer = Peer(name) + except KeyError: continue + if peer.get('watch', filter = boolean, default = False): + if T._debug: print '# interesting peer %s' % peer + correct[peer.name] = start[peer.name] = peer + elif startup: + if T._debug: print '# peer %s ready for adoption' % peer + start[peer.name] = peer + for name, obj in me._peers.items(): + try: + peer = correct[name] + except KeyError: + if T._debug: print '# peer %s vanished' % name + del me._peers[name] + else: + obj.update(peer) + for name, peer in start.iteritems(): + if name in me._peers: continue + if startup: + if T._debug: print '# setting up peer %s' % name + ifname = S.ifname(name) + addr = S.addr(name) + T.defer(adoptpeer, peer, ifname, *addr) + else: + if T._debug: print '# adopting new peer %s' % name + me.add(peer, True) + return me + + def adopted(me): + """ + Returns the list of peers being watched by the Pinger. + """ + return me._peers.keys() + +###-------------------------------------------------------------------------- +### New connections. + +def encode_envvars(env, prefix, vars): + """ + Encode the variables in VARS suitably for including in a program + environment. Lowercase letters in variable names are forced to uppercase; + runs of non-alphanumeric characters are replaced by single underscores; and + the PREFIX is prepended. The resulting variables are written to ENV. + """ + for k, v in vars.iteritems(): + env[prefix + r_bad.sub('_', k.upper())] = v + +r_bad = RX.compile(r'[\W_]+') +def envvars(peer): + """ + Translate the database information for a PEER into a dictionary of + environment variables with plausible upper-case names and a P_ prefix. + Also collect the crypto information into A_ variables. + """ + env = {} + encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()])) + encode_envvars(env, 'A_', S.algs(peer.name)) + return env + +def run_ifupdown(what, peer, *args): + """ + Run the interface up/down script for a peer. + + WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a + list of arguments to pass to the script, in addition to the peer name. + + The command is run and watched in the background by potwatch. + """ + q = T.Queue() + c = Command([what, peer.name], q, what, + M.split(peer.get(what), quotep = True)[0] + + [peer.name] + list(args), + envvars(peer)) + potwatch(what, peer.name, q) + +def adoptpeer(peer, ifname, *addr): + """ + Add a new peer to our collection. + + PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and + ADDR is the list of tokens representing its address. + + We try to bring up the interface and provoke a connection to the peer if + it's passive. + """ + if peer.has('ifup'): + T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \ + .switch('ifup', peer, ifname, *addr) + cmd = peer.get('connect', default = None) + if cmd is not None: + T.Coroutine(run_connect, name = 'connect %s' % peer.name) \ + .switch(peer, cmd) + if peer.get('watch', filter = boolean, default = False): + pinger.add(peer, False) + +def disownpeer(peer): + """Drop the PEER from the Pinger and put its interface to bed.""" + try: pinger.kill(peer) + except KeyError: pass + cmd = peer.get('disconnect', default = None) + if cmd is not None: + T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \ + .switch(peer, cmd) + if peer.has('ifdown'): + T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \ + .switch('ifdown', peer) + def addpeer(peer, addr): """ Process a connect request from a new peer PEER on address ADDR. @@ -87,28 +668,77 @@ def addpeer(peer, addr): if peer.name in S.list(): S.kill(peer.name) try: + booltrue = ['t', 'true', 'y', 'yes', 'on'] S.add(peer.name, tunnel = peer.get('tunnel', None), keepalive = peer.get('keepalive', None), key = peer.get('key', None), - cork = peer.get('cork', 'nil') in ['t', 'true', 'y', 'yes', 'on'], + priv = peer.get('priv', None), + mobile = peer.get('mobile', 'nil') in booltrue, + cork = peer.get('cork', 'nil') in booltrue, *addr) except T.TripeError, exc: raise T.TripeJobError(*exc.args) +## Dictionary mapping challenges to waiting passive-connection coroutines. +chalmap = {} + +def notify(_, code, *rest): + """ + Watch for notifications. + + We trap ADD and KILL notifications, and send them straight to adoptpeer and + disownpeer respectively; and dispatch GREET notifications to the + corresponding waiting coroutine. + """ + if code == 'ADD': + try: p = Peer(rest[0]) + except KeyError: return + adoptpeer(p, *rest[1:]) + elif code == 'KILL': + try: p = Peer(rest[0]) + except KeyError: return + disownpeer(p, *rest[1:]) + elif code == 'GREET': + chal = rest[0] + try: cr = chalmap[chal] + except KeyError: pass + else: cr.switch(rest[1:]) + +###-------------------------------------------------------------------------- +### Command implementation. + +def cmd_kick(name): + """ + kick NAME: Force a new connection attempt for the NAMEd peer. + """ + if name not in pinger.adopted(): + raise T.TripeJobError('peer-not-adopted', name) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) + T.spawn(run_connect, peer, peer.get('connect')) + +def cmd_adopted(): + """ + adopted: Report a list of adopted peers. + """ + for name in pinger.adopted(): + T.svcinfo(name) + def cmd_active(name): """ active NAME: Handle an active connection request for the peer called NAME. The appropriate address is read from the database automatically. """ - peer = Peer(name) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) addr = peer.get('peer') if addr == 'PASSIVE': raise T.TripeJobError('passive-peer', name) addpeer(peer, M.split(addr, quotep = True)[0]) -def cmd_list(): +def cmd_listactive(): """ list: Report a list of the available active peers. """ @@ -121,14 +751,20 @@ def cmd_info(name): """ info NAME: Report the database entries for the named peer. """ - peer = Peer(name) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) items = list(peer.list()) items.sort() for i in items: T.svcinfo('%s=%s' % (i, peer.get(i))) -## Dictionary mapping challenges to waiting passive-connection coroutines. -chalmap = {} +def cmd_userpeer(user): + """ + userpeer USER: Report the peer name for the named user. + """ + try: name = CDB.init(opts.cdb)['U' + user] + except KeyError: raise T.TripeJobError('unknown-user', user) + T.svcinfo(name) def cmd_passive(*args): """ @@ -143,10 +779,10 @@ def cmd_passive(*args): if opt == '-timeout': timeout = T.timespec(op.arg()) user, = op.rest(1, 1) - try: - peer = CDB.init(opts.cdb)['U' + user] - except KeyError: - raise T.TripeJobError('unknown-user', user) + try: name = CDB.init(opts.cdb)['U' + user] + except KeyError: raise T.TripeJobError('unknown-user', user) + try: peer = Peer(name) + except KeyError: raise T.TripeJobError('unknown-peer', name) chal = S.getchal() cr = T.Coroutine.getcurrent() timer = M.SelTimer(time() + timeout, lambda: cr.switch(None)) @@ -156,24 +792,10 @@ def cmd_passive(*args): addr = cr.parent.switch() if addr is None: raise T.TripeJobError('connect-timeout') - addpeer(Peer(peer), addr) + addpeer(peer, addr) finally: del chalmap[chal] -def notify(_, code, *rest): - """ - Watch for notifications. - - In particular, if a GREETing appears quoting a challenge in the chalmap - then wake up the corresponding coroutine. - """ - if code != 'GREET': - return - chal = rest[0] - addr = rest[1:] - if chal in chalmap: - chalmap[chal].switch(addr) - ###-------------------------------------------------------------------------- ### Start up. @@ -181,10 +803,14 @@ def setup(): """ Service setup. - Register the notification-watcher, and add the automatic active peers. + Register the notification watcher, rescan the peers, and add automatic + active peers. """ S.handler['NOTE'] = notify S.watch('+n') + + pinger.rescan(opts.startup) + if opts.startup: cdb = CDB.init(opts.cdb) try: @@ -198,6 +824,18 @@ def setup(): except T.TripeJobError, err: S.warn('connect', 'auto-add-failed', name, *err.args) +def init(): + """ + Initialization to be done before service startup. + """ + global errorwatch, childwatch, pinger + errorwatch = ErrorWatch() + childwatch = ChildWatch() + pinger = Pinger() + T.Coroutine(dbwatch, name = 'dbwatch').switch() + errorwatch.switch() + pinger.switch() + def parse_options(): """ Parse the command-line options. @@ -234,17 +872,20 @@ def parse_options(): return opts ## Service table, for running manually. -service_info = [('connect', VERSION, { +service_info = [('connect', T.VERSION, { + 'adopted': (0, 0, '', cmd_adopted), + 'kick': (1, 1, 'PEER', cmd_kick), 'passive': (1, None, '[OPTIONS] USER', cmd_passive), 'active': (1, 1, 'PEER', cmd_active), 'info': (1, 1, 'PEER', cmd_info), - 'list': (0, 0, '', cmd_list) + 'list-active': (0, 0, '', cmd_listactive), + 'userpeer': (1, 1, 'USER', cmd_userpeer) })] if __name__ == '__main__': opts = parse_options() T.runservices(opts.tripesock, service_info, - setup = setup, + init = init, setup = setup, daemon = opts.daemon) ###----- That's all, folks --------------------------------------------------