#! @PYTHON@
### -*-python-*-
###
### Connect to remote peers, and keep track of them
###
### (c) 2007 Straylight/Edgeware
###

###----- Licensing notice ---------------------------------------------------
###
### This file is part of Trivial IP Encryption (TrIPE).
###
### TrIPE is free software; you can redistribute it and/or modify
### it under the terms of the GNU General Public License as published by
### the Free Software Foundation; either version 2 of the License, or
### (at your option) any later version.
###
### TrIPE is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
### GNU General Public License for more details.
###
### You should have received a copy of the GNU General Public License
### along with TrIPE; if not, write to the Free Software Foundation,
### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

VERSION = '@VERSION@'

###--------------------------------------------------------------------------
### External dependencies.

from optparse import OptionParser
import tripe as T
import os as OS
import signal as SIG
import errno as E
import cdb as CDB
import mLib as M
import re as RX
from time import time
import subprocess as PROC

S = T.svcmgr

###--------------------------------------------------------------------------
### Running auxiliary commands.

class SelLineQueue (M.SelLineBuffer):
  """Glues the select-line-buffer into the coroutine queue system."""

  def __new__(cls, file, queue, tag, kind):
    """See __init__ for documentation."""
    return M.SelLineBuffer.__new__(cls, file.fileno())

  def __init__(me, file, queue, tag, kind):
    """
    Initialize a new line-reading adaptor.

    The adaptor reads lines from FILE.  Each line is inserted as a message of
    the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
    represented as None.
    """
    me._q = queue
    me._file = file                     # keep FILE referenced alongside us
    me._tag = tag
    me._kind = kind
    me.enable()

  @T._callback
  def line(me, line):
    """Post a (TAG, KIND, LINE) message for a line read from the file."""
    me._q.put((me._tag, me._kind, line))

  @T._callback
  def eof(me):
    """Stop reading and post a (TAG, KIND, None) end-of-file message."""
    me.disable()
    me._q.put((me._tag, me._kind, None))

class ErrorWatch (T.Coroutine):
  """
  An object which watches stderr streams for errors and converts them into
  warnings of the form

    WARN connect INFO stderr LINE

  The INFO is a list of tokens associated with the file when it was
  registered.

  Usually there is a single ErrorWatch object, called errorwatch.
  """

  def __init__(me):
    """Initialization: there are no arguments."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._map = {}                        # SEQ -> (INFO, SelLineQueue)
    me._seq = 1

  def watch(me, file, info):
    """
    Adds FILE to the collection of files to watch.

    INFO will be written in the warning messages from this FILE.  Returns a
    sequence number which can be used to unregister the file again.
    """
    seq = me._seq
    me._seq += 1
    me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
    return seq

  def unwatch(me, seq):
    """Stop watching the file with sequence number SEQ."""
    del me._map[seq]
    return me

  def run(me):
    """
    Coroutine function: read items from the queue and report them.

    Unregisters files automatically when they reach EOF.
    """
    while True:
      seq, _, line = me._q.get()
      if line is None:
        me.unwatch(seq)
      else:
        S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])

def dbwatch():
  """
  Coroutine function: wake up every second and notice changes to the
  database.  When a change happens, tell the Pinger (q.v.) to rescan its
  peers.
  """
  cr = T.Coroutine.getcurrent()
  main = cr.parent
  fw = M.FWatch(opts.cdb)
  while True:
    ## The timer is bound to a local so that it stays referenced while we
    ## are switched away waiting for it.
    timer = M.SelTimer(time() + 1, lambda: cr.switch())
    main.switch()
    if fw.update():
      pinger.rescan(False)
      S.notify('connect', 'peerdb-update')

class ChildWatch (M.SelSignal):
  """
  An object which watches for specified processes exiting and reports
  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.

  There is usually only one ChildWatch object, called childwatch.
  """

  def __new__(cls):
    """Initialize the child-watcher."""
    return M.SelSignal.__new__(cls, SIG.SIGCHLD)

  def __init__(me):
    """Initialize the child-watcher."""
    me._pid = {}                        # PID -> (QUEUE, TAG)
    me.enable()

  def watch(me, pid, queue, tag):
    """
    Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
    to the QUEUE, where CODE is one of

      * None (successful termination)
      * ['exit-nonzero', CODE] (CODE is a string!)
      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
    """
    me._pid[pid] = queue, tag
    return me

  def unwatch(me, pid):
    """Unregister PID as a child to watch."""
    del me._pid[pid]
    return me

  @T._callback
  def signalled(me):
    """
    Called when child processes exit: collect exit statuses and report
    failures.
    """
    while True:
      try:
        pid, status = OS.waitpid(-1, OS.WNOHANG)
      except OSError as exc:
        if exc.errno == E.ECHILD:
          break                         # no children left at all
        if exc.errno == E.EINTR:
          continue                      # interrupted: just try again
        ## Anything else is unexpected.  Don't carry on with an unset (or
        ## stale) PID and STATUS: let the error be reported.
        raise
      if pid == 0:
        break                           # children exist, but none has exited
      if pid not in me._pid:
        continue                        # not one of ours: ignore it
      queue, tag = me._pid[pid]
      if OS.WIFEXITED(status):
        exit = OS.WEXITSTATUS(status)
        if exit == 0:
          code = None
        else:
          code = ['exit-nonzero', str(exit)]
      elif OS.WIFSIGNALED(status):
        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
      else:
        code = ['exit-unknown', hex(status)]
      queue.put((tag, 'exit', code))

class Command (object):
  """
  Represents a running command.

  This class is the main interface to the machery provided by the ChildWatch
  and ErrorWatch objects.  See also potwatch.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Start a new child process.

    The ARGS are a list of arguments to be given to the child process.  The
    ENV is either None or a dictionary of environment variable assignments to
    override the extant environment.  INFO is a list of tokens to be included
    in warnings about the child's stderr output.  If the child writes a line
    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
    child exits, write (TAG, 'exit', CODE) to the QUEUE.
    """
    me._info = info
    me._q = queue
    me._tag = tag
    myenv = OS.environ.copy()
    if env:
      myenv.update(env)
    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(me._proc.stderr, info)
    childwatch.watch(me._proc.pid, queue, tag)

  def __del__(me):
    """
    If I've been forgotten then stop watching for termination.
    """
    ## The destructor runs even if __init__ failed part-way, in which case
    ## there may be no process, or it may never have been registered.
    proc = getattr(me, '_proc', None)
    if proc is None:
      return
    try:
      childwatch.unwatch(proc.pid)
    except KeyError:
      pass

def potwatch(what, name, q):
  """
  Watch the queue Q for activity as reported by a Command object.

  Information from the process's stdout is reported as

    NOTE WHAT NAME stdout LINE

  abnormal termination is reported as

    WARN WHAT NAME CODE

  where CODE is what the ChildWatch wrote.

  Returns once both the end of the child's stdout and its exit status have
  been seen.
  """
  eofp = deadp = False
  while not deadp or not eofp:
    _, kind, more = q.get()
    if kind == 'stdout':
      if more is None:
        eofp = True
      else:
        S.notify('connect', what, name, 'stdout', more)
    elif kind == 'exit':
      if more:
        S.warn('connect', what, name, *more)
      deadp = True

###--------------------------------------------------------------------------
### Peer database utilities.

_magic = ['_magic']                     # An object distinct from all others

class Peer (object):
  """Representation of a peer in the database."""

  def __init__(me, peer, cdb = None):
    """
    Create a new peer, named PEER.

    Information about the peer is read from the database CDB, or the default
    one given on the command-line.  Raises KeyError if the database has no
    record for PEER.
    """
    me.name = peer
    record = (cdb or CDB.init(opts.cdb))['P' + peer]
    ## The decoded fields become instance attributes directly.
    me.__dict__.update(M.URLDecode(record, semip = True))

  def get(me, key, default = _magic, filter = None):
    """
    Get the information stashed under KEY from the peer's database record.

    If DEFAULT is given, then use it if the database doesn't contain the
    necessary information.  If no DEFAULT is given, then report an error
    (a TripeJobError).  If a FILTER function is given then apply it to the
    information from the database before returning it.  Note that the FILTER
    is applied to the DEFAULT too, if that's what we end up using.
    """
    attr = me.__dict__.get(key, default)
    if attr is _magic:
      raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
    elif filter is not None:
      attr = filter(attr)
    return attr

  def has(me, key):
    """
    Return whether the peer's database record has the KEY.
    """
    return key in me.__dict__

  def list(me):
    """
    Iterate over the available keys in the peer's database record.
    """
    return iter(me.__dict__)

def boolean(value):
  """Parse VALUE as a boolean."""
  return value in ['t', 'true', 'y', 'yes', 'on']

###--------------------------------------------------------------------------
### Waking up and watching peers.

def run_connect(peer, cmd):
  """
  Start the job of connecting to the passive PEER.

  The CMD string is a shell command which will connect to the peer (via some
  back-channel, say ssh and userv), issue a command

    SVCSUBMIT connect passive [OPTIONS] USER

  and write the resulting challenge to standard output.  The first line of
  output is taken to be the challenge and used to greet the peer; anything
  further is reported by potwatch.
  """
  q = T.Queue()
  ## Rebinding CMD keeps the Command object alive until we're finished: if it
  ## were collected, it would stop watching for the child's exit.
  cmd = Command(['connect', peer.name], q, 'connect',
                ['/bin/sh', '-c', cmd],
                None)
  _, kind, more = q.peek()
  if kind == 'stdout':
    if more is None:
      S.warn('connect', 'connect', peer.name, 'unexpected-eof')
    else:
      chal = more
      S.greet(peer.name, chal)
      ## Consume the challenge line only.  An end-of-file marker must stay
      ## on the queue, because potwatch waits to see it.
      q.get()
  potwatch('connect', peer.name, q)

def run_disconnect(peer, cmd):
  """
  Start the job of disconnecting from a passive PEER.

  The CMD string is a shell command which will disconnect from the peer.
  """
  q = T.Queue()
  ## As in run_connect, hold on to the Command until potwatch is done.
  cmd = Command(['disconnect', peer.name], q, 'disconnect',
                ['/bin/sh', '-c', cmd],
                None)
  potwatch('disconnect', peer.name, q)

_pingseq = 0
class PingPeer (object):
  """
  Object representing a peer which we are pinging to ensure that it is still
  present.

  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
  event queue -- which saves us from having an enormous swarm of coroutines
  -- but most of the actual work is done here.

  In order to avoid confusion between different PingPeer instances for the
  same actual peer, each PingPeer has a sequence number (its `seq'
  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
  (Using the PingPeer instance itself will prevent garbage collection of
  otherwise defunct instances.)
  """

  def __init__(me, pinger, queue, peer, pingnow):
    """
    Create a new PingPeer.

    The PINGER is the Pinger object we should send the results to.  This is
    used when we remove ourselves, if the peer has been explicitly removed.

    The QUEUE is the event queue on which timer and ping-command events
    should be written.

    The PEER is a `Peer' object describing the peer.

    If PINGNOW is true, then immediately start pinging the peer.  Otherwise
    wait until the usual retry interval.
    """
    global _pingseq
    me._pinger = pinger
    me._q = queue
    me._peer = peer.name
    me.update(peer)
    me.seq = _pingseq
    _pingseq += 1
    me._failures = 0
    if pingnow:
      me._timer = None
      me._ping()
    else:
      me._timer = M.SelTimer(time() + me._every, me._time)

  def update(me, peer):
    """
    Refreshes the timer parameters for this peer.  We don't, however,
    immediately reschedule anything: that will happen next time anything
    interesting happens.

    If PEER is None then the peer's record is read afresh from the database.
    """
    if peer is None:
      peer = Peer(me._peer)
    assert peer.name == me._peer
    me._every = peer.get('every', filter = T.timespec, default = 120)
    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
    me._retries = peer.get('retries', filter = int, default = 5)
    me._connectp = peer.has('connect')
    return me

  def _ping(me):
    """
    Send a ping to the peer; the result is sent to the Pinger's event queue.
    """
    S.rawcommand(T.TripeAsynchronousCommand(
      me._q, (me._peer, me.seq),
      ['EPING',
       '-background', S.bgtag(),
       '-timeout', str(me._timeout),
       '--',
       me._peer]))

  def _reconnect(me):
    """
    Deal with a peer which has stopped responding.

    If the peer had a connect command when we last looked, then force a new
    key exchange, run the command again, and schedule the next ping;
    otherwise just kill the peer.
    """
    if me._connectp:
      ## Only consult the database when we actually need the command, so
      ## that a vanished record can't stop us killing the peer below.
      peer = Peer(me._peer)
      S.warn('connect', 'reconnecting', me._peer)
      S.forcekx(me._peer)
      T.spawn(run_connect, peer, peer.get('connect'))
      me._timer = M.SelTimer(time() + me._every, me._time)
    else:
      S.kill(me._peer)

  def event(me, code, stuff):
    """
    Respond to an event which happened to this peer.

    Timer events indicate that we should start a new ping.  (The server has
    its own timeout which detects lost packets.)

    We trap unknown-peer responses and detach from the Pinger.

    If the ping fails and we run out of retries, we attempt to restart the
    connection.
    """
    if code == 'TIMER':
      me._failures = 0
      me._ping()
    elif code == 'FAIL':
      S.notify('connect', 'ping-failed', me._peer, *stuff)
      if not stuff:
        pass
      elif stuff[0] == 'unknown-peer':
        me._pinger.kill(me._peer)
      elif stuff[0] == 'ping-send-failed':
        me._reconnect()
    elif code == 'INFO':
      if stuff[0] == 'ping-ok':
        if me._failures > 0:
          S.warn('connect', 'ping-ok', me._peer)
        ## Success or recovery: either way, schedule the next round.
        me._timer = M.SelTimer(time() + me._every, me._time)
      elif stuff[0] == 'ping-timeout':
        me._failures += 1
        S.warn('connect', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures < me._retries:
          me._ping()
        else:
          me._reconnect()
      elif stuff[0] == 'ping-peer-died':
        me._pinger.kill(me._peer)

  @T._callback
  def _time(me):
    """
    Handle timer callbacks by posting a timeout event on the queue.
    """
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
  def __repr__(me):
    return str(me)

class Pinger (T.Coroutine):
  """
  The Pinger keeps track of the peers which we expect to be connected and
  takes action if they seem to stop responding.

  There is usually only one Pinger, called pinger.

  The Pinger maintains a collection of PingPeer objects, and an event queue.
  The PingPeers direct the results of their pings, and timer events, to the
  event queue.  The Pinger's coroutine picks items off the queue and
  dispatches them back to the PingPeers as appropriate.
  """

  def __init__(me):
    """Initialize the Pinger."""
    T.Coroutine.__init__(me)
    me._peers = {}                      # peer name -> PingPeer
    me._q = T.Queue()

  def run(me):
    """
    Coroutine function: reads the pinger queue and sends events to the
    PingPeer objects they correspond to.
    """
    while True:
      (peer, seq), code, stuff = me._q.get()
      ## Events for PingPeers which have since been removed or replaced are
      ## recognized by their sequence numbers and quietly dropped.
      if peer in me._peers and seq == me._peers[peer].seq:
        me._peers[peer].event(code, stuff)

  def add(me, peer, pingnow):
    """
    Add PEER to the collection of peers under the Pinger's watchful eye.
    The arguments are as for PingPeer: see above.
    """
    me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
    return me

  def kill(me, peername):
    """
    Remove the peer named PEERNAME from the peers being watched by the
    Pinger.  Raises KeyError if no such peer is being watched.
    """
    del me._peers[peername]
    return me

  def rescan(me, startup):
    """
    General resynchronization method.

    We scan the list of peers (with connect scripts) known at the server.
    Any which are known to the Pinger but aren't known to the server are
    removed from our list; newly arrived peers are added.  (Note that a peer
    can change state here either due to the server sneakily changing its list
    without issuing notifications or, more likely, the database changing its
    idea of whether a peer is interesting.)  Finally, PingPeers which are
    still present are prodded to update their timing parameters.

    This method is called once at startup to pick up the peers already
    installed, and again by the dbwatcher coroutine when it detects a change
    to the database.
    """
    if T._debug: print('# rescan peers')
    correct = {}                        # peers which ought to be watched
    start = {}                          # peers to set up if not yet watched
    for name in S.list():
      try:
        peer = Peer(name)
      except KeyError:
        continue
      if peer.get('watch', filter = boolean, default = False):
        if T._debug: print('# interesting peer %s' % peer.name)
        correct[peer.name] = start[peer.name] = peer
      elif startup:
        if T._debug: print('# peer %s ready for adoption' % peer.name)
        start[peer.name] = peer
    ## Iterate over a snapshot, because we delete entries as we go.
    for name, obj in list(me._peers.items()):
      try:
        peer = correct[name]
      except KeyError:
        if T._debug: print('# peer %s vanished' % name)
        del me._peers[name]
      else:
        obj.update(peer)
    for name, peer in start.items():
      if name in me._peers:
        continue
      if startup:
        if T._debug: print('# setting up peer %s' % name)
        ifname = S.ifname(name)
        addr = S.addr(name)
        T.defer(adoptpeer, peer, ifname, *addr)
      else:
        if T._debug: print('# adopting new peer %s' % name)
        me.add(peer, True)
    return me

  def adopted(me):
    """
    Returns the list of peers being watched by the Pinger.
    """
    return list(me._peers)

###--------------------------------------------------------------------------
### New connections.

def encode_envvars(env, prefix, vars):
  """
  Encode the variables in VARS suitably for including in a program
  environment.  Lowercase letters in variable names are forced to uppercase;
  runs of non-alphanumeric characters are replaced by single underscores; and
  the PREFIX is prepended.  The resulting variables are written to ENV.
  """
  for k, v in vars.items():
    env[prefix + r_bad.sub('_', k.upper())] = v

r_bad = RX.compile(r'[\W_]+')
def envvars(peer):
  """
  Translate the database information for a PEER into a dictionary of
  environment variables with plausible upper-case names and a P_ prefix.
  Also collect the crypto information into A_ variables.
  """
  env = {}
  encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
  encode_envvars(env, 'A_', S.algs(peer.name))
  return env

def run_ifupdown(what, peer, *args):
  """
  Run the interface up/down script for a peer.

  WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
  list of arguments to pass to the script, in addition to the peer name.

  The command is run and watched in the background by potwatch.
  """
  q = T.Queue()
  ## Keep the Command referenced until potwatch has finished with it.
  c = Command([what, peer.name], q, what,
              M.split(peer.get(what), quotep = True)[0] +
              [peer.name] + list(args),
              envvars(peer))
  potwatch(what, peer.name, q)

def adoptpeer(peer, ifname, *addr):
  """
  Add a new peer to our collection.

  PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
  ADDR is the list of tokens representing its address.

  We try to bring up the interface and provoke a connection to the peer if
  it's passive.
  """
  if peer.has('ifup'):
    T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
        .switch('ifup', peer, ifname, *addr)
  cmd = peer.get('connect', default = None)
  if cmd is not None:
    T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
        .switch(peer, cmd)
  if peer.get('watch', filter = boolean, default = False):
    pinger.add(peer, False)

def disownpeer(peer):
  """Drop the PEER from the Pinger and put its interface to bed."""
  ## The Pinger indexes its peers by name, not by `Peer' object.  It's fine
  ## for the peer not to be there: not every peer is watched.
  try:
    pinger.kill(peer.name)
  except KeyError:
    pass
  cmd = peer.get('disconnect', default = None)
  if cmd is not None:
    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
        .switch(peer, cmd)
  if peer.has('ifdown'):
    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
        .switch('ifdown', peer)

def addpeer(peer, addr):
  """
  Process a connect request from a new peer PEER on address ADDR.

  Any existing peer with this name is disconnected from the server.  Errors
  from the server are reported as TripeJobError.
  """
  if peer.name in S.list():
    S.kill(peer.name)
  try:
    S.add(peer.name,
          tunnel = peer.get('tunnel', None),
          keepalive = peer.get('keepalive', None),
          key = peer.get('key', None),
          priv = peer.get('priv', None),
          mobile = peer.get('mobile', filter = boolean, default = False),
          cork = peer.get('cork', filter = boolean, default = False),
          *addr)
  except T.TripeError as exc:
    raise T.TripeJobError(*exc.args)

## Dictionary mapping challenges to waiting passive-connection coroutines.
chalmap = {}

def notify(_, code, *rest):
  """
  Watch for notifications.

  We trap ADD and KILL notifications, and send them straight to adoptpeer and
  disownpeer respectively; and dispatch GREET notifications to the
  corresponding waiting coroutine.  Notifications about peers which aren't in
  the database, and greetings with unrecognized challenges, are ignored.
  """
  if code == 'ADD':
    try:
      p = Peer(rest[0])
    except KeyError:
      return
    adoptpeer(p, *rest[1:])
  elif code == 'KILL':
    try:
      p = Peer(rest[0])
    except KeyError:
      return
    disownpeer(p, *rest[1:])
  elif code == 'GREET':
    chal = rest[0]
    try:
      cr = chalmap[chal]
    except KeyError:
      pass
    else:
      cr.switch(rest[1:])

###--------------------------------------------------------------------------
### Command implementation.

def cmd_kick(name):
  """
  kick NAME: Force a new connection attempt for the NAMEd peer.
  """
  if name not in pinger.adopted():
    raise T.TripeJobError('peer-not-adopted', name)
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  ## Run the peer's connect command.  A peer without one is reported as
  ## malformed by `Peer.get'.
  T.spawn(run_connect, peer, peer.get('connect'))

def cmd_adopted():
  """
  adopted: Report a list of adopted peers.
  """
  for name in pinger.adopted():
    T.svcinfo(name)

def cmd_active(name):
  """
  active NAME: Handle an active connection request for the peer called NAME.

  The appropriate address is read from the database automatically.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  addr = peer.get('peer')
  if addr == 'PASSIVE':
    raise T.TripeJobError('passive-peer', name)
  addpeer(peer, M.split(addr, quotep = True)[0])

def cmd_listactive():
  """
  list-active: Report a list of the available active peers.
  """
  cdb = CDB.init(opts.cdb)
  for key in cdb.keys():
    ## Peer records are the ones whose keys begin with `P'.  Hand the open
    ## database to `Peer' rather than reopening it for every record.
    if key.startswith('P') and \
       Peer(key[1:], cdb).get('peer', '') != 'PASSIVE':
      T.svcinfo(key[1:])

def cmd_info(name):
  """
  info NAME: Report the database entries for the named peer.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  for i in sorted(peer.list()):
    T.svcinfo('%s=%s' % (i, peer.get(i)))

def cmd_userpeer(user):
  """
  userpeer USER: Report the peer name for the named user.
  """
  try:
    name = CDB.init(opts.cdb)['U' + user]
  except KeyError:
    raise T.TripeJobError('unknown-user', user)
  T.svcinfo(name)

def cmd_passive(*args):
  """
  passive [OPTIONS] USER: Await the arrival of the named USER.

  Report a challenge; when (and if!) the server receives a greeting quoting
  this challenge, add the corresponding peer to the server.

  The only option is `-timeout TIME', which sets how long to wait for the
  greeting; the default is 30.
  """
  timeout = 30
  op = T.OptParse(args, ['-timeout'])
  for opt in op:
    if opt == '-timeout':
      timeout = T.timespec(op.arg())
  user, = op.rest(1, 1)
  try:
    name = CDB.init(opts.cdb)['U' + user]
  except KeyError:
    raise T.TripeJobError('unknown-user', user)
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  chal = S.getchal()
  cr = T.Coroutine.getcurrent()
  ## The timer wakes us with None if no greeting turns up in time.
  ## NOTE(review): the timer isn't explicitly cancelled when a greeting
  ## arrives first; presumably dropping the last reference does that --
  ## confirm against mLib.
  timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
  try:
    T.svcinfo(chal)
    chalmap[chal] = cr
    ## Sleep until `notify' hands us the address, or the timer fires.
    addr = cr.parent.switch()
    if addr is None:
      raise T.TripeJobError('connect-timeout')
    addpeer(peer, addr)
  finally:
    ## We might not have got as far as registering the challenge: don't let
    ## a KeyError here hide the real problem.
    chalmap.pop(chal, None)

###--------------------------------------------------------------------------
### Start up.

def setup():
  """
  Service setup.

  Register the notification watcher, rescan the peers, and add automatic
  active peers.
  """
  S.handler['NOTE'] = notify
  S.watch('+n')

  pinger.rescan(opts.startup)

  if opts.startup:
    cdb = CDB.init(opts.cdb)
    try:
      autos = cdb['%AUTO']
    except KeyError:
      autos = ''
    for name in M.split(autos)[0]:
      try:
        peer = Peer(name, cdb)
        addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
      except KeyError:
        ## A name listed in `%AUTO' has no peer record.  Complain, but carry
        ## on with the others rather than abandoning startup.
        S.warn('connect', 'auto-add-failed', name, 'unknown-peer', name)
      except T.TripeJobError as err:
        S.warn('connect', 'auto-add-failed', name, *err.args)

def init():
  """
  Initialization to be done before service startup.

  Creates the global errorwatch, childwatch and pinger objects, and starts
  the background coroutines.
  """
  global errorwatch, childwatch, pinger
  errorwatch = ErrorWatch()
  childwatch = ChildWatch()
  pinger = Pinger()
  T.Coroutine(dbwatch, name = 'dbwatch').switch()
  errorwatch.switch()
  pinger.switch()

def parse_options():
  """
  Parse the command-line options.

  Automatically changes directory to the requested configdir, and turns on
  debugging.  Returns the options object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args:
    op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts

## Service table, for running manually.
service_info = [('connect', T.VERSION, {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick),
  'passive': (1, None, '[OPTIONS] USER', cmd_passive),
  'active': (1, 1, 'PEER', cmd_active),
  'info': (1, 1, 'PEER', cmd_info),
  'list-active': (0, 0, '', cmd_listactive),
  'userpeer': (1, 1, 'USER', cmd_userpeer)
})]

if __name__ == '__main__':
  opts = parse_options()
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup,
                daemon = opts.daemon)

###----- That's all, folks --------------------------------------------------