+#! @PYTHON@
+### -*-python-*-
+###
+### Watch arrival and departure of peers
+###
+### (c) 2007 Straylight/Edgeware
+###
+
+###----- Licensing notice ---------------------------------------------------
+###
+### This file is part of Trivial IP Encryption (TrIPE).
+###
+### TrIPE is free software; you can redistribute it and/or modify
+### it under the terms of the GNU General Public License as published by
+### the Free Software Foundation; either version 2 of the License, or
+### (at your option) any later version.
+###
+### TrIPE is distributed in the hope that it will be useful,
+### but WITHOUT ANY WARRANTY; without even the implied warranty of
+### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+### GNU General Public License for more details.
+###
+### You should have received a copy of the GNU General Public License
+### along with TrIPE; if not, write to the Free Software Foundation,
+### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
## Program version string.  Like `@PYTHON@' in the `#!' line, the
## `@VERSION@' placeholder is presumably substituted at build time.
VERSION = '@VERSION@'
+
+###--------------------------------------------------------------------------
+### External dependencies.
+
+from optparse import OptionParser
+import tripe as T
+import os as OS
+import signal as SIG
+import errno as E
+import cdb as CDB
+import mLib as M
+import re as RX
+from time import time
+import subprocess as PROC
+
## Shorthand for the tripe module's service-manager object.  This file issues
## its server commands through it (S.warn, S.notify, S.list, S.kill, ...);
## presumably it wraps the connection to the server's admin socket.
S = T.svcmgr
+
+###--------------------------------------------------------------------------
+### Running auxiliary commands.
+
class SelLineQueue (M.SelLineBuffer):
    """
    Adaptor which feeds lines read by an mLib select-line-buffer into one of
    the coroutine queues.

    Every line read becomes a queue item (TAG, KIND, LINE); end-of-file is
    reported as (TAG, KIND, None), and reading is then disabled.
    """

    def __new__(cls, file, queue, tag, kind):
        """Create the underlying line buffer on FILE's file descriptor."""
        fd = file.fileno()
        return M.SelLineBuffer.__new__(cls, fd)

    def __init__(me, file, queue, tag, kind):
        """
        Set up the adaptor and start reading.

        Lines read from FILE are put on QUEUE as (TAG, KIND, LINE) items; the
        end of the file is put as (TAG, KIND, None).
        """
        me._tag = tag
        me._kind = kind
        me._q = queue
        ## Nothing reads me._file; presumably the reference is kept so that
        ## the file object (and its descriptor) is not collected while the
        ## buffer is still reading from it.
        me._file = file
        me.enable()

    @T._callback
    def line(me, line):
        """Callback for each complete line: queue it."""
        item = me._tag, me._kind, line
        me._q.put(item)

    @T._callback
    def eof(me):
        """Callback at end-of-file: stop reading, then queue the EOF marker."""
        me.disable()
        me._q.put((me._tag, me._kind, None))
+
class ErrorWatch (T.Coroutine):
    """
    Watches the stderr streams of child processes and turns every line written
    there into a warning of the form

      WARN watch INFO stderr LINE

    where INFO is the list of tokens supplied when the stream was registered.

    Usually there is a single ErrorWatch object, called errorwatch.
    """

    def __init__(me):
        """Create an ErrorWatch with nothing registered; takes no arguments."""
        T.Coroutine.__init__(me)
        me._q = T.Queue()
        ## Sequence number -> (INFO, SelLineQueue) for each watched stream.
        me._map = {}
        me._seq = 1

    def watch(me, file, info):
        """
        Start watching FILE, labelling its warnings with INFO (a list).

        Returns a sequence number which unwatch accepts.
        """
        seq = me._seq
        me._seq = seq + 1
        reader = SelLineQueue(file, me._q, seq, 'stderr')
        me._map[seq] = (info, reader)
        return seq

    def unwatch(me, seq):
        """Forget the stream registered under SEQ.  Returns the ErrorWatch."""
        del me._map[seq]
        return me

    def run(me):
        """
        Coroutine body: turn queued stderr lines into warnings, and drop a
        stream's registration once its end-of-file marker arrives.
        """
        while True:
            seq, _, line = me._q.get()
            if line is None:
                me.unwatch(seq)
                continue
            info = me._map[seq][0]
            S.warn(*(['watch'] + info + ['stderr', line]))
+
def dbwatch():
    """
    Coroutine function: wake up every second and notice changes to the
    database. When a change happens, tell the Pinger (q.v.) to rescan its
    peers.
    """
    ## The coroutine we're running in, and the one which started us; we hand
    ## control back to the latter while waiting for the timer.
    cr = T.Coroutine.getcurrent()
    main = cr.parent
    ## Watch the peer database file named by the --peerdb option.
    fw = M.FWatch(opts.cdb)
    while True:
        ## Arrange to be switched back to in a second's time.  `timer' is
        ## never read; presumably it is bound so that the timer object stays
        ## referenced (and armed) while we sleep.
        timer = M.SelTimer(time() + 1, lambda: cr.switch())
        main.switch()
        ## Presumably fw.update() is true when the file has changed since the
        ## last check.
        if fw.update():
            pinger.rescan(False)
            S.notify('watch', 'peerdb-update')
+
class ChildWatch (M.SelSignal):
    """
    An object which watches for specified processes exiting and reports
    terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.

    There is usually only one ChildWatch object, called childwatch.
    """

    def __new__(cls):
        """Create the child-watcher as a handler for SIGCHLD."""
        return M.SelSignal.__new__(cls, SIG.SIGCHLD)

    def __init__(me):
        """Initialize the child-watcher with no children registered."""
        me._pid = {}
        me.enable()

    def watch(me, pid, queue, tag):
        """
        Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
        to the QUEUE, where CODE is one of

          * None (successful termination)
          * ['exit-nonzero', CODE] (CODE is a string!)
          * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
          * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)

        Returns the ChildWatch.
        """
        me._pid[pid] = queue, tag
        return me

    def unwatch(me, pid):
        """
        Unregister PID as a child to watch.  Raises KeyError if PID is not
        registered.  Returns the ChildWatch.
        """
        del me._pid[pid]
        return me

    @staticmethod
    def _exitcode(status):
        """Convert a wait(2) STATUS into the CODE described under watch."""
        if OS.WIFEXITED(status):
            rc = OS.WEXITSTATUS(status)
            if rc == 0:
                return None
            return ['exit-nonzero', str(rc)]
        if OS.WIFSIGNALED(status):
            return ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
        return ['exit-unknown', hex(status)]

    @T._callback
    def signalled(me):
        """
        Called when child processes exit: collect exit statuses and report
        them to whoever registered the child.

        Children which were never registered are reaped silently.
        """
        while True:
            try:
                pid, status = OS.waitpid(-1, OS.WNOHANG)
            except OSError as exc:
                if exc.errno == E.ECHILD:
                    ## No children left at all.
                    break
                if exc.errno == E.EINTR:
                    ## Interrupted before reaping anything: just try again.
                    continue
                ## Anything else is a real error.  Previously this fell
                ## through and used an unbound (or stale) PID, reporting the
                ## previous child's exit twice.
                raise
            if pid == 0:
                ## Children remain, but none has changed state.
                break
            if pid not in me._pid:
                continue
            queue, tag = me._pid[pid]
            queue.put((tag, 'exit', me._exitcode(status)))
+
class Command (object):
    """
    Represents a running command.

    This class is the main interface to the machinery provided by the
    ChildWatch and ErrorWatch objects. See also potwatch.

    Keep a reference to the Command for as long as you want to hear about the
    child's exit: when the Command is destroyed it stops the ChildWatch
    reporting it.
    """

    def __init__(me, info, queue, tag, args, env):
        """
        Start a new child process.

        The ARGS are a list of arguments to be given to the child process. The
        ENV is either None or a dictionary of environment variable assignments
        to override the extant environment. INFO is a list of tokens to be
        included in warnings about the child's stderr output. Each line the
        child writes to standard output is put on the QUEUE as
        (TAG, 'stdout', LINE), and end of its output as (TAG, 'stdout', None).
        When the child exits, (TAG, 'exit', CODE) is written to the QUEUE.
        """
        me._info = info
        me._q = queue
        me._tag = tag
        myenv = OS.environ.copy()
        if env:
            myenv.update(env)
        me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                              stdout = PROC.PIPE, stderr = PROC.PIPE)
        me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
        errorwatch.watch(me._proc.stderr, info)
        childwatch.watch(me._proc.pid, queue, tag)

    def __del__(me):
        """
        If I've been forgotten then stop watching for termination.

        This is safe on a partially-constructed Command: if Popen failed there
        is no process, and if construction stopped before the child was
        registered there is nothing to unregister.
        """
        proc = getattr(me, '_proc', None)
        if proc is None:
            return
        try:
            childwatch.unwatch(proc.pid)
        except KeyError:
            ## The child was never registered with the ChildWatch.
            pass
+
def potwatch(what, name, q):
    """
    Consume a Command's events from the queue Q until the command has both
    closed its standard output and exited.

    Each line of the process's stdout is reported as

      NOTE WHAT NAME stdout LINE

    and abnormal termination as

      WARN WHAT NAME CODE

    where CODE is what the ChildWatch wrote.  Events of any other kind are
    ignored.
    """
    saw_eof = False
    saw_exit = False
    while not (saw_eof and saw_exit):
        _, kind, more = q.get()
        if kind == 'exit':
            if more:
                S.warn('watch', what, name, *more)
            saw_exit = True
        elif kind == 'stdout':
            if more is None:
                saw_eof = True
            else:
                S.notify('watch', what, name, 'stdout', more)
+
+###--------------------------------------------------------------------------
+### Peer database utilities.
+
def timespec(info, key, default):
    """
    Look up KEY in the peer record INFO and parse it with T.timespec.

    Returns DEFAULT if KEY is absent or T.timespec rejects the value.
    """
    parsed = default
    try:
        parsed = T.timespec(info[key])
    except (KeyError, T.TripeJobError):
        pass
    return parsed
+
def integer(info, key, default):
    """
    Look up KEY in the peer record INFO and convert it to an int.

    Returns DEFAULT if KEY is absent or its value is not a valid integer.
    """
    result = default
    try:
        result = int(info[key])
    except (KeyError, ValueError):
        pass
    return result
+
def boolean(info, key, default):
    """
    Parse INFO[KEY] as a boolean, or return DEFAULT if KEY is absent.

    The value is true if it is one of `t', `true', `y', `yes', `on' or `1',
    compared case-insensitively; any other value is false.
    """
    try:
        value = info[key]
    except KeyError:
        return default
    ## Case-insensitive, so that e.g. `Yes' or `ON' in the database are not
    ## silently treated as false.
    return value.lower() in ('t', 'true', 'y', 'yes', 'on', '1')
+
def peerinfo(peer):
    """
    Fetch PEER's record from the peer database (the --peerdb file) and
    return it as a dictionary.

    The record is stored under the key `P' + PEER.  Callers in this file
    treat a KeyError from here as meaning the peer has no record.
    """
    db = CDB.init(opts.cdb)
    record = db['P' + peer]
    return dict(M.URLDecode(record, semip = True))
+
+###--------------------------------------------------------------------------
+### Waking up and watching peers.
+
def connect(peer, conn = None):
    """
    Start the job of connecting to the passive PEER.

    The CONN string is a shell command which will connect to the peer (via
    some back-channel, say ssh and userv), issue a command

      SVCSUBMIT connect passive [OPTIONS] USER

    and write the resulting challenge to standard output.  If CONN is None it
    is taken from the peer's `connect' database entry; if the peer has no
    record, or no such entry, nothing is done.

    The first line the command writes is passed to the server as the peer's
    challenge (via S.greet).  The rest of its output, and its exit, are
    reported by potwatch.
    """
    if conn is None:
        try:
            conn = peerinfo(peer)['connect']
        except KeyError:
            return
    q = T.Queue()
    ## Keep the Command bound for the rest of this function: if it were
    ## dropped, its destructor would unregister the child from the ChildWatch,
    ## its exit would never be queued, and potwatch below would wait forever.
    cmd = Command(['connect', peer], q, 'connect',
                  ['/bin/sh', '-c', conn], None)
    ## Look at, but don't yet consume, the first event.
    _, kind, more = q.peek()
    if kind == 'stdout':
        if more is None:
            ## Leave the end-of-file marker on the queue: potwatch needs to
            ## see it before it will finish.
            S.warn('watch', 'connect', peer, 'unexpected-eof')
        else:
            S.greet(peer, more)
            ## The challenge line has been dealt with; consume it.
            q.get()
    potwatch('connect', peer, q)
+
## Sequence number to give to the next PingPeer created (see PingPeer).
_pingseq = 0

class PingPeer (object):
    """
    Object representing a peer which we are pinging to ensure that it is still
    present.

    PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
    event queue -- which saves us from having an enormous swarm of coroutines
    -- but most of the actual work is done here.

    In order to avoid confusion between different PingPeer instances for the
    same actual peer, each PingPeer has a sequence number (its `seq'
    attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
    (Using the PingPeer instance itself will prevent garbage collection of
    otherwise defunct instances.)
    """

    def __init__(me, pinger, queue, peer, info, pingnow):
        """
        Create a new PingPeer.

        The PINGER is the Pinger object we should send the results to. This is
        used when we remove ourselves, if the peer has been explicitly
        removed.

        The QUEUE is the event queue on which timer and ping-command events
        should be written.

        The PEER is just the peer's name, as a string.

        The INFO is the database record for the peer, as a dictionary, or None
        if it's not readily available. (This is just a tweak to save multiple
        probes if we don't really need them.)

        If PINGNOW is true, then immediately start pinging the peer. Otherwise
        wait until the usual retry interval.
        """
        global _pingseq
        me._pinger = pinger
        me._q = queue
        me._peer = peer
        me.update(info)
        me.seq = _pingseq
        _pingseq += 1
        me._failures = 0
        if pingnow:
            me._timer = None
            me._ping()
        else:
            me._timer = M.SelTimer(time() + me._every, me._time)

    def update(me, info):
        """
        Refresh this peer's timing parameters and `connect' flag from INFO, or
        from the database if INFO is None.

        Nothing is rescheduled immediately: the new values take effect the
        next time anything interesting happens.  Returns the PingPeer.
        """
        if info is None:
            info = peerinfo(me._peer)
        me._every = timespec(info, 'every', 120)
        me._timeout = timespec(info, 'timeout', 10)
        me._retries = integer(info, 'retries', 5)
        ## Whether the peer has a connect script: decides how event recovers
        ## once pings have timed out too often.
        me._connectp = 'connect' in info
        return me

    def _ping(me):
        """
        Send a ping to the peer; the result is sent to the Pinger's event
        queue, tagged with (PEER, SEQ).
        """
        S.rawcommand(T.TripeAsynchronousCommand(
            me._q, (me._peer, me.seq),
            ['PING',
             '-background', S.bgtag(),
             '-timeout', str(me._timeout),
             '--',
             me._peer]))

    def event(me, code, stuff):
        """
        Respond to an event which happened to this peer.

        Timer events indicate that we should start a new ping. (The server has
        its own timeout which detects lost packets.)

        We trap unknown-peer responses and detach from the Pinger.

        If the ping fails and we run out of retries, we attempt to restart the
        connection if the peer has a connect script, and otherwise kill the
        peer at the server.
        """
        if code == 'TIMER':
            me._failures = 0
            me._ping()
        elif code == 'FAIL':
            S.notify('watch', 'ping-failed', me._peer, *stuff)
            if stuff and stuff[0] == 'unknown-peer':
                me._pinger.kill(me._peer)
        elif code == 'INFO':
            what = stuff[0]
            if what == 'ping-ok':
                if me._failures > 0:
                    S.warn('watch', 'ping-ok', me._peer)
                me._timer = M.SelTimer(time() + me._every, me._time)
            elif what == 'ping-timeout':
                me._failures += 1
                S.warn('watch', 'ping-timeout', me._peer,
                       'attempt', str(me._failures), 'of', str(me._retries))
                if me._failures < me._retries:
                    me._ping()
                elif me._connectp:
                    ## Use the flag cached by update() rather than re-reading
                    ## the database here: a vanished record would raise
                    ## KeyError straight out of Pinger.run.  The flag is
                    ## refreshed whenever the database changes (dbwatch ->
                    ## Pinger.rescan -> update).
                    S.warn('watch', 'reconnecting', me._peer)
                    S.forcekx(me._peer)
                    T.spawn(T.Coroutine(connect), me._peer)
                    me._timer = M.SelTimer(time() + me._every, me._time)
                else:
                    S.kill(me._peer)
            elif what == 'ping-peer-died':
                me._pinger.kill(me._peer)

    @T._callback
    def _time(me):
        """
        Handle timer callbacks by posting a timeout event on the queue.
        """
        me._timer = None
        me._q.put(((me._peer, me.seq), 'TIMER', None))

    def __str__(me):
        return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)

    def __repr__(me):
        return str(me)
+
class Pinger (T.Coroutine):
    """
    Tracks the peers we expect to stay connected, and acts when they stop
    answering pings.

    There is usually only one Pinger, called pinger.

    The Pinger owns one PingPeer per watched peer and a single event queue.
    PingPeers arrange for their ping results and timer expiries to be posted
    to that queue; the Pinger's coroutine takes events off it and hands each
    to the PingPeer it belongs to.
    """

    def __init__(me):
        """Set up an empty Pinger."""
        T.Coroutine.__init__(me)
        me._peers = {}
        me._q = T.Queue()

    def run(me):
        """
        Coroutine body: dispatch queued events to their PingPeers.

        Events are tagged (PEER, SEQ).  An event is dropped if PEER is no
        longer watched, or if SEQ belongs to a superseded PingPeer.
        """
        while True:
            (peer, seq), code, stuff = me._q.get()
            target = me._peers.get(peer)
            if target is not None and target.seq == seq:
                target.event(code, stuff)

    def add(me, peer, info, pingnow):
        """
        Start watching PEER, replacing any existing PingPeer for it.

        INFO and PINGNOW are passed on to PingPeer.  Returns the Pinger.
        """
        me._peers[peer] = PingPeer(me, me._q, peer, info, pingnow)
        return me

    def kill(me, peer):
        """
        Stop watching PEER.  Raises KeyError if it isn't being watched.
        Returns the Pinger.
        """
        del me._peers[peer]
        return me

    def rescan(me, startup):
        """
        Bring the set of watched peers back into line with the server and the
        database.

        The wanted set is the set of peers the server lists whose database
        record has a true `watch' entry.  Watched peers outside that set are
        dropped.  The rest have their parameters refreshed from their records.
        Wanted peers not yet watched are added: when STARTUP is true, addpeer
        runs their ifup/connect actions as well; otherwise they are pinged
        straight away.

        Called once from setup, and again by dbwatch whenever the database
        changes.  Returns the Pinger.
        """
        correct = {}
        for peer in S.list():
            try:
                info = peerinfo(peer)
            except KeyError:
                continue
            if boolean(info, 'watch', False):
                correct[peer] = info

        ## Walk a snapshot, since entries are deleted as we go.
        for peer, obj in list(me._peers.items()):
            if peer not in correct:
                del me._peers[peer]
            else:
                obj.update(correct[peer])

        for peer, info in correct.items():
            if peer in me._peers:
                continue
            if startup:
                ifname = S.ifname(peer)
                addr = S.addr(peer)
                addpeer(info, peer, ifname, *addr)
            else:
                me.add(peer, info, True)
        return me

    def adopted(me):
        """Return a list of the names of the peers currently being watched."""
        return list(me._peers)
+
+###--------------------------------------------------------------------------
+### New connections.
+
## Runs of characters which may not appear in the generated variable names:
## anything other than letters and digits (underscores included, so that runs
## of them collapse to one).
r_bad = RX.compile(r'[\W_]+')

def encode_envvars(env, prefix, vars):
    """
    Encode the variables in VARS suitably for including in a program
    environment.

    Lowercase letters in variable names are forced to uppercase; runs of
    non-alphanumeric characters are replaced by single underscores; and the
    PREFIX is prepended.  The resulting variables are written to ENV, which
    is modified in place.
    """
    ## items() rather than the Python-2-only iteritems(): the iteration is
    ## identical and it also works under Python 3.
    for k, v in vars.items():
        env[prefix + r_bad.sub('_', k.upper())] = v
def envvars(info):
    """
    Build the environment-variable overrides for a peer's scripts.

    Entries of the database record INFO become P_ variables, and the server's
    algorithm information (from S.algs) becomes A_ variables; names are
    normalized as described under encode_envvars.
    """
    env = {}
    encode_envvars(env, 'P_', info)
    algs = S.algs()
    encode_envvars(env, 'A_', algs)
    return env
+
def ifupdown(what, peer, info, *args):
    """
    Run a peer's interface up/down script and watch it until it finishes.

    WHAT is 'ifup' or 'ifdown' and names the database entry holding the
    command line.  PEER names the peer; INFO is its database record.  The
    script is run with the peer name and then ARGS appended, in an
    environment extended by envvars(INFO).  Output and failures are reported
    by potwatch.
    """
    q = T.Queue()
    script = M.split(info[what], quotep = True)[0]
    argv = script + [peer] + list(args)
    env = envvars(info)
    ## Keep the Command bound until potwatch returns; dropping it would
    ## unregister the child from the ChildWatch.
    proc = Command([what, peer], q, what, argv, env)
    potwatch(what, peer, q)
+
def addpeer(info, peer, ifname, *addr):
    """
    Take on a newly added peer.

    INFO is the peer's database record, or None to look it up; a peer with no
    record is ignored.  PEER names the peer, IFNAME is its tunnel interface
    and ADDR the tokens of its address.

    If the record has an `ifup' entry the interface script is started; if it
    has a `connect' entry a connection attempt is started; and if its `watch'
    entry is true the peer is handed to the Pinger.
    """
    if info is None:
        try:
            info = peerinfo(peer)
        except KeyError:
            return
    if 'ifup' in info:
        up = T.Coroutine(ifupdown)
        up.switch('ifup', peer, info, ifname, *addr)
    if 'connect' in info:
        conn = T.Coroutine(connect)
        conn.switch(peer, info['connect'])
    if boolean(info, 'watch', False):
        pinger.add(peer, info, False)
+
def delpeer(peer):
    """
    Drop the PEER from the Pinger and put its interface to bed.

    The peer is removed from the Pinger even if its database record has
    already gone.  Otherwise a stale PingPeer would linger until the next
    rescan or failed ping.  The `ifdown' script can only be run if the record
    still exists and names one.
    """
    try:
        pinger.kill(peer)
    except KeyError:
        ## Not being watched: nothing to drop.
        pass
    try:
        info = peerinfo(peer)
    except KeyError:
        return
    if 'ifdown' in info:
        T.Coroutine(ifupdown).switch('ifdown', peer, info)
+
def notify(_, code, *rest):
    """
    Handler for server notifications.

    ADD notifications are passed to addpeer (with no database record yet),
    and KILL notifications to delpeer; everything else is ignored.
    """
    handler = {
        'ADD': lambda: addpeer(None, *rest),
        'KILL': lambda: delpeer(*rest),
    }.get(code)
    if handler is not None:
        handler()
+
+###--------------------------------------------------------------------------
+### Command stubs.
+
def cmd_stub(*args):
    """Placeholder command handler: ignores ARGS and always fails."""
    err = T.TripeJobError('not-implemented')
    raise err
+
def cmd_kick(peer):
    """
    kick PEER: Force a new connection attempt for PEER
    """
    ## (Docstring kept verbatim: it reads like command help text.)
    if peer in pinger.adopted():
        T.spawn(T.Coroutine(connect), peer)
        return
    raise T.TripeJobError('peer-not-adopted', peer)
+
def cmd_adopted():
    """
    adopted: Report a list of adopted peers.
    """
    ## (Docstring kept verbatim: it reads like command help text.)
    watched = pinger.adopted()
    for name in watched:
        T.svcinfo(name)
+
+###--------------------------------------------------------------------------
+### Start up.
+
def setup():
    """
    Service setup.

    Register the notification watcher, and rescan the peers.
    """
    ## Route the server's NOTE messages to notify (which handles ADD/KILL).
    S.handler['NOTE'] = notify
    ## Ask the server for notifications; presumably `+n' is the watch mask
    ## selecting them -- confirm against the server's WATCH command.
    S.watch('+n')
    ## Pick up the peers already present; when run at server startup
    ## (--startup), their ifup/connect actions are run too.
    pinger.rescan(opts.startup)
+
def init():
    """
    Initialization to be done before service startup.

    Creates the global errorwatch, childwatch and pinger objects and starts
    the background coroutines.
    """
    global errorwatch, childwatch, pinger
    errorwatch = ErrorWatch()
    childwatch = ChildWatch()
    pinger = Pinger()
    ## Start each coroutine.  Presumably each runs until it first waits
    ## (dbwatch switches back to its parent; the others block reading their
    ## queues), and control then returns here.
    T.Coroutine(dbwatch).switch()
    errorwatch.switch()
    pinger.switch()
+
def parse_options():
    """
    Parse the command-line options.

    Changes directory to the requested configuration directory (-d), and sets
    the tripe module's debugging flag according to --debug.  Returns the
    options object; reports a usage error (and exits) if any non-option
    arguments are given.
    """
    op = OptionParser(usage = '%prog [-a FILE] [-d DIR] [-p FILE]',
                      version = '%%prog %s' % VERSION)

    op.add_option('-a', '--admin-socket',
                  metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                  help = 'Select socket to connect to [default %default]')
    op.add_option('-d', '--directory',
                  metavar = 'DIR', dest = 'dir', default = T.configdir,
                  help = 'Select current directory [default %default]')
    op.add_option('-p', '--peerdb',
                  metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                  help = 'Select peers database [default %default]')
    op.add_option('--daemon', dest = 'daemon',
                  default = False, action = 'store_true',
                  help = 'Become a daemon after successful initialization')
    op.add_option('--debug', dest = 'debug',
                  default = False, action = 'store_true',
                  help = 'Emit debugging trace information')
    op.add_option('--startup', dest = 'startup',
                  default = False, action = 'store_true',
                  help = 'Being called as part of the server startup')

    opts, args = op.parse_args()
    if args:
        op.error('no arguments permitted')
    OS.chdir(opts.dir)
    T._debug = opts.debug
    return opts
+
## Service table, for running manually.
##
## One entry per service: (NAME, VERSION, COMMANDS).  COMMANDS maps each
## command name to a tuple which presumably reads (MIN-ARGS, MAX-ARGS, USAGE,
## HANDLER) -- inferred from the handlers' arities; confirm against
## T.runservices.
service_info = [('watch', T.VERSION, {
    'adopted': (0, 0, '', cmd_adopted),
    'kick': (1, 1, 'PEER', cmd_kick)
})]

if __name__ == '__main__':
    ## `opts' is global: dbwatch, peerinfo and setup read it.
    opts = parse_options()
    ## Connect to the server's admin socket and run the service, with init
    ## and setup supplied as hooks for T.runservices to call.
    T.runservices(opts.tripesock, service_info,
                  init = init, setup = setup,
                  daemon = opts.daemon)
+
+###----- That's all, folks --------------------------------------------------