#! @PYTHON@
### -*-python-*-
###
### Watch arrival and departure of peers
###
### (c) 2007 Straylight/Edgeware
###

###----- Licensing notice ---------------------------------------------------
###
### This file is part of Trivial IP Encryption (TrIPE).
###
### TrIPE is free software; you can redistribute it and/or modify
### it under the terms of the GNU General Public License as published by
### the Free Software Foundation; either version 2 of the License, or
### (at your option) any later version.
###
### TrIPE is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
### GNU General Public License for more details.
###
### You should have received a copy of the GNU General Public License
### along with TrIPE; if not, write to the Free Software Foundation,
### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

VERSION = '@VERSION@'

###--------------------------------------------------------------------------
### External dependencies.

from optparse import OptionParser
import tripe as T
import os as OS
import signal as SIG
import errno as E
import cdb as CDB
import mLib as M
import re as RX
from time import time
import subprocess as PROC

S = T.svcmgr

###--------------------------------------------------------------------------
### Running auxiliary commands.

class SelLineQueue (M.SelLineBuffer):
  """Glues the select-line-buffer into the coroutine queue system."""

  def __new__(cls, file, queue, tag, kind):
    """See __init__ for documentation."""
    return M.SelLineBuffer.__new__(cls, file.fileno())

  def __init__(me, file, queue, tag, kind):
    """
    Initialize a new line-reading adaptor.

    The adaptor reads lines from FILE. Each line is inserted as a message of
    the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
    represented as None.
    """
    me._q = queue
    ## Hold on to the file object as well as the descriptor we were built
    ## from.
    me._file = file
    me._tag = tag
    me._kind = kind
    me.enable()

  @T._callback
  def line(me, line):
    """Post LINE to the queue as a (TAG, KIND, LINE) message."""
    me._q.put((me._tag, me._kind, line))

  @T._callback
  def eof(me):
    """Stop reading, and post (TAG, KIND, None) to mark end-of-file."""
    me.disable()
    me._q.put((me._tag, me._kind, None))

class ErrorWatch (T.Coroutine):
  """
  An object which watches stderr streams for errors and converts them into
  warnings of the form

    WARN watch INFO stderr LINE

  The INFO is a list of tokens associated with the file when it was
  registered.

  Usually there is a single ErrorWatch object, called errorwatch.
  """

  def __init__(me):
    """Initialization: there are no arguments."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._map = {}
    me._seq = 1

  def watch(me, file, info):
    """
    Adds FILE to the collection of files to watch.

    INFO will be written in the warning messages from this FILE. Returns a
    sequence number which can be used to unregister the file again.
    """
    seq = me._seq
    me._seq += 1
    me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
    return seq

  def unwatch(me, seq):
    """Stop watching the file with sequence number SEQ."""
    del me._map[seq]
    return me

  def run(me):
    """
    Coroutine function: read items from the queue and report them.

    Unregisters files automatically when they reach EOF.
    """
    while True:
      seq, _, line = me._q.get()
      if line is None:
        me.unwatch(seq)
      else:
        S.warn(*['watch'] + me._map[seq][0] + ['stderr', line])

def dbwatch():
  """
  Coroutine function: wake up every second and notice changes to the
  database. When a change happens, tell the Pinger (q.v.) to rescan its
  peers.
  """
  cr = T.Coroutine.getcurrent()
  main = cr.parent
  fw = M.FWatch(opts.cdb)
  while True:
    ## Keep the timer in a local variable so that it stays referenced while
    ## we're switched away.
    timer = M.SelTimer(time() + 1, lambda: cr.switch())
    main.switch()
    if fw.update():
      pinger.rescan(False)
      S.notify('watch', 'peerdb-update')

class ChildWatch (M.SelSignal):
  """
  An object which watches for specified processes exiting and reports
  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.

  There is usually only one ChildWatch object, called childwatch.
  """

  def __new__(cls):
    """Initialize the child-watcher."""
    return M.SelSignal.__new__(cls, SIG.SIGCHLD)

  def __init__(me):
    """Initialize the child-watcher."""
    me._pid = {}
    me.enable()

  def watch(me, pid, queue, tag):
    """
    Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
    to the QUEUE, where CODE is one of

      * None (successful termination)
      * ['exit-nonzero', CODE] (CODE is a string!)
      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
    """
    me._pid[pid] = queue, tag
    return me

  def unwatch(me, pid):
    """Unregister PID as a child to watch."""
    del me._pid[pid]
    return me

  @T._callback
  def signalled(me):
    """
    Called when child processes exit: collect exit statuses and report
    failures.

    Reaps every child which has already terminated, without blocking.
    Children which weren't registered with `watch' are reaped and otherwise
    ignored. An OSError from waitpid other than ECHILD is propagated to the
    caller.
    """
    while True:
      try:
        pid, status = OS.waitpid(-1, OS.WNOHANG)
      except OSError as exc:
        ## No children left at all: we're done.
        if exc.errno == E.ECHILD:
          break
        ## Anything else is unexpected. Don't fall through: `pid' and
        ## `status' would be unset, or left over from the previous iteration,
        ## in which case we'd report the same exit twice.
        raise
      if pid == 0:
        ## There are children, but none of them has exited yet.
        break
      if pid not in me._pid:
        continue
      queue, tag = me._pid[pid]
      if OS.WIFEXITED(status):
        rc = OS.WEXITSTATUS(status)
        if rc == 0:
          code = None
        else:
          code = ['exit-nonzero', str(rc)]
      elif OS.WIFSIGNALED(status):
        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
      else:
        code = ['exit-unknown', hex(status)]
      queue.put((tag, 'exit', code))

class Command (object):
  """
  Represents a running command.

  This class is the main interface to the machinery provided by the
  ChildWatch and ErrorWatch objects. See also potwatch.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Start a new child process.

    The ARGS are a list of arguments to be given to the child process. The
    ENV is either None or a dictionary of environment variable assignments to
    override the extant environment. INFO is a list of tokens to be included
    in warnings about the child's stderr output. If the child writes a line
    to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
    child exits, write (TAG, 'exit', CODE) to the QUEUE.
    """
    me._info = info
    me._q = queue
    me._tag = tag
    myenv = OS.environ.copy()
    if env:
      myenv.update(env)
    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(me._proc.stderr, info)
    childwatch.watch(me._proc.pid, queue, tag)

  def __del__(me):
    """
    If I've been forgotten then stop watching for termination.

    This must cope with a partially constructed object: if starting the
    child failed then there is no process, and possibly no registration, to
    undo.
    """
    proc = getattr(me, '_proc', None)
    if proc is None:
      return
    try:
      childwatch.unwatch(proc.pid)
    except KeyError:
      ## Never registered: nothing to do.
      pass

def potwatch(what, name, q):
  """
  Watch the queue Q for activity as reported by a Command object.

  Information from the process's stdout is reported as

    NOTE WHAT NAME stdout LINE

  abnormal termination is reported as

    WARN WHAT NAME CODE

  where CODE is what the ChildWatch wrote.

  Returns once both end-of-file on stdout and the exit report have been
  seen.
  """
  eofp = deadp = False
  while not deadp or not eofp:
    _, kind, more = q.get()
    if kind == 'stdout':
      if more is None:
        eofp = True
      else:
        S.notify('watch', what, name, 'stdout', more)
    elif kind == 'exit':
      if more:
        S.warn('watch', what, name, *more)
      deadp = True

###--------------------------------------------------------------------------
### Peer database utilities.

def timespec(info, key, default):
  """Parse INFO[KEY] as a timespec, or return DEFAULT."""
  try:
    return T.timespec(info[key])
  except (KeyError, T.TripeJobError):
    return default

def integer(info, key, default):
  """Parse INFO[KEY] as an integer, or return DEFAULT."""
  try:
    return int(info[key])
  except (KeyError, ValueError):
    return default

def boolean(info, key, default):
  """Parse INFO[KEY] as a boolean, or return DEFAULT."""
  try:
    return info[key] in ['t', 'true', 'y', 'yes', 'on']
  except (KeyError, ValueError):
    return default

def peerinfo(peer):
  """
  Return a dictionary containing information about PEER from the database.

  Raises KeyError if the database has no record for PEER.
  """
  return dict(M.URLDecode(CDB.init(opts.cdb)['P' + peer], semip = True))

###--------------------------------------------------------------------------
### Waking up and watching peers.

def connect(peer, conn = None):
  """
  Start the job of connecting to the passive PEER.

  The CONN string is a shell command which will connect to the peer (via some
  back-channel, say ssh and userv), issue a command

    SVCSUBMIT connect passive [OPTIONS] USER

  and write the resulting challenge as the first line of its standard
  output. If CONN is None, it is looked up in the peer's database record; if
  there is no such record, or no `connect' entry, nothing happens.
  """
  if conn is None:
    try:
      conn = peerinfo(peer)['connect']
    except KeyError:
      return
  q = T.Queue()
  ## Hold a reference to the Command for as long as we're interested in it:
  ## it stops watching for the child's exit when it's collected.
  cmd = Command(['connect', peer], q, 'connect',
                ['/bin/sh', '-c', conn],
                None)
  _, kind, more = q.peek()
  if kind == 'stdout':
    if more is None:
      ## Leave the end-of-file marker on the queue for potwatch to find.
      S.warn('watch', 'connect', peer, 'unexpected-eof')
    else:
      chal = more
      S.greet(peer, chal)
      q.get()
  potwatch('connect', peer, q)

_pingseq = 0
class PingPeer (object):
  """
  Object representing a peer which we are pinging to ensure that it is still
  present.

  PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
  event queue -- which saves us from having an enormous swarm of coroutines
  -- but most of the actual work is done here.

  In order to avoid confusion between different PingPeer instances for the
  same actual peer, each PingPeer has a sequence number (its `seq'
  attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
  (Using the PingPeer instance itself will prevent garbage collection of
  otherwise defunct instances.)
  """

  def __init__(me, pinger, queue, peer, info, pingnow):
    """
    Create a new PingPeer.

    The PINGER is the Pinger object we should send the results to. This is
    used when we remove ourselves, if the peer has been explicitly removed.

    The QUEUE is the event queue on which timer and ping-command events
    should be written.

    The PEER is just the peer's name, as a string.

    The INFO is the database record for the peer, as a dictionary, or None if
    it's not readily available. (This is just a tweak to save multiple
    probes if we don't really need them.)

    If PINGNOW is true, then immediately start pinging the peer. Otherwise
    wait until the usual retry interval.
    """
    global _pingseq
    me._pinger = pinger
    me._q = queue
    me._peer = peer
    me.update(info)
    me.seq = _pingseq
    _pingseq += 1
    me._failures = 0
    if pingnow:
      me._timer = None
      me._ping()
    else:
      me._timer = M.SelTimer(time() + me._every, me._time)

  def update(me, info):
    """
    Refreshes the timer parameters for this peer. We don't, however,
    immediately reschedule anything: that will happen next time anything
    interesting happens.

    If INFO is None then the peer's record is fetched from the database.
    """
    if info is None:
      info = peerinfo(me._peer)
    me._every = timespec(info, 'every', 120)
    me._timeout = timespec(info, 'timeout', 10)
    me._retries = integer(info, 'retries', 5)
    me._connectp = 'connect' in info
    return me

  def _ping(me):
    """
    Send a ping to the peer; the result is sent to the Pinger's event queue.
    """
    S.rawcommand(T.TripeAsynchronousCommand(
      me._q, (me._peer, me.seq),
      ['EPING',
       '-background', S.bgtag(),
       '-timeout', str(me._timeout),
       '--',
       me._peer]))

  def _reconnect(me):
    """
    Try to re-establish contact with a peer which has stopped responding.

    If the peer's database record has a `connect' entry, force a fresh key
    exchange, start the connect job, and schedule the next ping; otherwise
    tell the server to kill the peer.
    """
    info = peerinfo(me._peer)
    if 'connect' in info:
      S.warn('watch', 'reconnecting', me._peer)
      S.forcekx(me._peer)
      T.spawn(connect, me._peer)
      me._timer = M.SelTimer(time() + me._every, me._time)
    else:
      S.kill(me._peer)

  def event(me, code, stuff):
    """
    Respond to an event which happened to this peer.

    Timer events indicate that we should start a new ping. (The server has
    its own timeout which detects lost packets.)

    We trap unknown-peer responses and detach from the Pinger.

    If the ping fails and we run out of retries, we attempt to restart the
    connection.
    """
    if code == 'TIMER':
      me._failures = 0
      me._ping()
    elif code == 'FAIL':
      S.notify('watch', 'ping-failed', me._peer, *stuff)
      if not stuff:
        pass
      elif stuff[0] == 'unknown-peer':
        me._pinger.kill(me._peer)
      elif stuff[0] == 'ping-send-failed':
        me._reconnect()
    elif code == 'INFO':
      if stuff[0] == 'ping-ok':
        ## Only mention success if we'd previously complained.
        if me._failures > 0:
          S.warn('watch', 'ping-ok', me._peer)
        me._timer = M.SelTimer(time() + me._every, me._time)
      elif stuff[0] == 'ping-timeout':
        me._failures += 1
        S.warn('watch', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures < me._retries:
          me._ping()
        else:
          me._reconnect()
      elif stuff[0] == 'ping-peer-died':
        me._pinger.kill(me._peer)

  @T._callback
  def _time(me):
    """
    Handle timer callbacks by posting a timeout event on the queue.
    """
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    """Describe the peer name, sequence number and failure count."""
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)

  def __repr__(me):
    """Same as __str__."""
    return str(me)

class Pinger (T.Coroutine):
  """
  The Pinger keeps track of the peers which we expect to be connected and
  takes action if they seem to stop responding.

  There is usually only one Pinger, called pinger.

  The Pinger maintains a collection of PingPeer objects, and an event queue.
  The PingPeers direct the results of their pings, and timer events, to the
  event queue. The Pinger's coroutine picks items off the queue and
  dispatches them back to the PingPeers as appropriate.
  """

  def __init__(me):
    """Initialize the Pinger."""
    T.Coroutine.__init__(me)
    me._peers = {}
    me._q = T.Queue()

  def run(me):
    """
    Coroutine function: reads the pinger queue and sends events to the
    PingPeer objects they correspond to.
    """
    while True:
      (peer, seq), code, stuff = me._q.get()
      ## Drop events addressed to PingPeers which have since been removed or
      ## replaced.
      if peer in me._peers and seq == me._peers[peer].seq:
        me._peers[peer].event(code, stuff)

  def add(me, peer, info, pingnow):
    """
    Add PEER to the collection of peers under the Pinger's watchful eye.
    The arguments are as for PingPeer: see above.
    """
    me._peers[peer] = PingPeer(me, me._q, peer, info, pingnow)
    return me

  def kill(me, peer):
    """
    Remove PEER from the peers being watched by the Pinger.

    Raises KeyError if PEER isn't being watched.
    """
    del me._peers[peer]
    return me

  def rescan(me, startup):
    """
    General resynchronization method.

    We scan the list of peers (with connect scripts) known at the server.
    Any which are known to the Pinger but aren't known to the server are
    removed from our list; newly arrived peers are added. (Note that a peer
    can change state here either due to the server sneakily changing its list
    without issuing notifications or, more likely, the database changing its
    idea of whether a peer is interesting.) Finally, PingPeers which are
    still present are prodded to update their timing parameters.

    This method is called once at startup to pick up the peers already
    installed, and again by the dbwatcher coroutine when it detects a change
    to the database.
    """
    correct = {}
    for peer in S.list():
      try:
        info = peerinfo(peer)
      except KeyError:
        continue
      if boolean(info, 'watch', False):
        correct[peer] = info
    ## Iterate over a snapshot, because we delete entries as we go.
    for peer, obj in list(me._peers.items()):
      if peer in correct:
        obj.update(correct[peer])
      else:
        del me._peers[peer]
    for peer, info in correct.iteritems():
      if peer not in me._peers:
        if startup:
          ifname = S.ifname(peer)
          addr = S.addr(peer)
          T.defer(addpeer, info, peer, ifname, *addr)
        else:
          me.add(peer, info, True)
    return me

  def adopted(me):
    """
    Returns the list of peers being watched by the Pinger.
    """
    return me._peers.keys()

###--------------------------------------------------------------------------
### New connections.

def encode_envvars(env, prefix, vars):
  """
  Encode the variables in VARS suitably for including in a program
  environment. Lowercase letters in variable names are forced to uppercase;
  runs of non-alphanumeric characters are replaced by single underscores; and
  the PREFIX is prepended. The resulting variables are written to ENV.
  """
  for k, v in vars.iteritems():
    env[prefix + r_bad.sub('_', k.upper())] = v

r_bad = RX.compile(r'[\W_]+')
def envvars(info):
  """
  Translate the database INFO dictionary for a peer into a dictionary of
  environment variables with plausible upper-case names and a P_ prefix.
  Also collect the crypto information into A_ variables.
  """
  env = {}
  encode_envvars(env, 'P_', info)
  encode_envvars(env, 'A_', S.algs())
  return env

def ifupdown(what, peer, info, *args):
  """
  Run the interface up/down script for a peer.

  WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. INFO is the
  database record dictionary. ARGS is a list of arguments to pass to the
  script, in addition to the peer name.

  The command is run and watched in the background by potwatch.
  """
  q = T.Queue()
  ## Hold a reference to the Command until potwatch has finished with it.
  c = Command([what, peer], q, what,
              M.split(info[what], quotep = True)[0] +
              [peer] + list(args),
              envvars(info))
  potwatch(what, peer, q)

def addpeer(info, peer, ifname, *addr):
  """
  Add a new peer to our collection.

  INFO is the peer information dictionary, or None if we don't have one yet.

  PEER names the peer; IFNAME is the interface name for its tunnel; and ADDR
  is the list of tokens representing its address.

  We try to bring up the interface and provoke a connection to the peer if
  it's passive.
  """
  if info is None:
    try:
      info = peerinfo(peer)
    except KeyError:
      return
  if 'ifup' in info:
    T.Coroutine(ifupdown, name = 'ifup %s' % peer) \
        .switch('ifup', peer, info, ifname, *addr)
  if 'connect' in info:
    T.Coroutine(connect, name = 'connect %s' % peer) \
        .switch(peer, info['connect'])
  if boolean(info, 'watch', False):
    pinger.add(peer, info, False)

def delpeer(peer):
  """Drop the PEER from the Pinger and put its interface to bed."""
  try:
    info = peerinfo(peer)
  except KeyError:
    return
  try:
    pinger.kill(peer)
  except KeyError:
    pass
  if 'ifdown' in info:
    T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \
        .switch('ifdown', peer, info)

def notify(_, code, *rest):
  """
  Watch for notifications.

  We trap ADD and KILL notifications, and send them straight to addpeer and
  delpeer respectively.
  """
  if code == 'ADD':
    addpeer(None, *rest)
  elif code == 'KILL':
    delpeer(*rest)

###--------------------------------------------------------------------------
### Command stubs.

def cmd_stub(*args):
  """Placeholder command: always fails with `not-implemented'."""
  raise T.TripeJobError('not-implemented')

def cmd_kick(peer):
  """
  kick PEER: Force a new connection attempt for PEER
  """
  if peer not in pinger.adopted():
    raise T.TripeJobError('peer-not-adopted', peer)
  T.spawn(connect, peer)

def cmd_adopted():
  """
  adopted: Report a list of adopted peers.
  """
  for peer in pinger.adopted():
    T.svcinfo(peer)

###--------------------------------------------------------------------------
### Start up.

def setup():
  """
  Service setup.

  Register the notification watcher, and rescan the peers.
  """
  S.handler['NOTE'] = notify
  S.watch('+n')
  pinger.rescan(opts.startup)

def init():
  """
  Initialization to be done before service startup.
  """
  global errorwatch, childwatch, pinger
  errorwatch = ErrorWatch()
  childwatch = ChildWatch()
  pinger = Pinger()
  T.Coroutine(dbwatch, name = 'dbwatch').switch()
  errorwatch.switch()
  pinger.switch()

def parse_options():
  """
  Parse the command-line options.

  Automatically changes directory to the requested configdir, and turns on
  debugging. Returns the options object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args:
    op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts

## Service table, for running manually.
service_info = [('watch', T.VERSION, {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick)
})]

if __name__ == '__main__':
  opts = parse_options()
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup,
                daemon = opts.daemon)

###----- That's all, folks --------------------------------------------------