-#! @PYTHON@
-### -*-python-*-
-###
-### Watch arrival and departure of peers
-###
-### (c) 2007 Straylight/Edgeware
-###
-
-###----- Licensing notice ---------------------------------------------------
-###
-### This file is part of Trivial IP Encryption (TrIPE).
-###
-### TrIPE is free software; you can redistribute it and/or modify
-### it under the terms of the GNU General Public License as published by
-### the Free Software Foundation; either version 2 of the License, or
-### (at your option) any later version.
-###
-### TrIPE is distributed in the hope that it will be useful,
-### but WITHOUT ANY WARRANTY; without even the implied warranty of
-### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-### GNU General Public License for more details.
-###
-### You should have received a copy of the GNU General Public License
-### along with TrIPE; if not, write to the Free Software Foundation,
-### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-
-## Program version string.  The `@VERSION@' token looks like a build-time
-## substitution placeholder (like `@PYTHON@' on the first line) -- confirm
-## against the build system.
-VERSION = '@VERSION@'
-
-###--------------------------------------------------------------------------
-### External dependencies.
-
-from optparse import OptionParser
-import tripe as T
-import os as OS
-import signal as SIG
-import errno as E
-import cdb as CDB
-import mLib as M
-import re as RX
-from time import time
-import subprocess as PROC
-
-## Short name for the tripe module's service-manager object, through which
-## this program talks to the server (S.warn, S.notify, S.list, ...).
-S = T.svcmgr
-
-###--------------------------------------------------------------------------
-### Running auxiliary commands.
-
-class SelLineQueue (M.SelLineBuffer):
- """Glues the select-line-buffer into the coroutine queue system."""
-
- def __new__(cls, file, queue, tag, kind):
- """See __init__ for documentation."""
- return M.SelLineBuffer.__new__(cls, file.fileno())
-
- def __init__(me, file, queue, tag, kind):
- """
- Initialize a new line-reading adaptor.
-
- The adaptor reads lines from FILE. Each line is inserted as a message of
- the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
- represented as None.
- """
- me._q = queue
- me._file = file
- me._tag = tag
- me._kind = kind
- me.enable()
-
- @T._callback
- def line(me, line):
- me._q.put((me._tag, me._kind, line))
-
- @T._callback
- def eof(me):
- me.disable()
- me._q.put((me._tag, me._kind, None))
-
-class ErrorWatch (T.Coroutine):
- """
- An object which watches stderr streams for errors and converts them into
- warnings of the form
-
- WARN watch INFO stderr LINE
-
- The INFO is a list of tokens associated with the file when it was
- registered.
-
- Usually there is a single ErrorWatch object, called errorwatch.
- """
-
- def __init__(me):
- """Initialization: there are no arguments."""
- T.Coroutine.__init__(me)
- me._q = T.Queue()
- me._map = {}
- me._seq = 1
-
- def watch(me, file, info):
- """
- Adds FILE to the collection of files to watch.
-
- INFO will be written in the warning messages from this FILE. Returns a
- sequence number which can be used to unregister the file again.
- """
- seq = me._seq
- me._seq += 1
- me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
- return seq
-
- def unwatch(me, seq):
- """Stop watching the file with sequence number SEQ."""
- del me._map[seq]
- return me
-
- def run(me):
- """
- Coroutine function: read items from the queue and report them.
-
- Unregisters files automatically when they reach EOF.
- """
- while True:
- seq, _, line = me._q.get()
- if line is None:
- me.unwatch(seq)
- else:
- S.warn(*['watch'] + me._map[seq][0] + ['stderr', line])
-
-def dbwatch():
- """
- Coroutine function: wake up every second and notice changes to the
- database. When a change happens, tell the Pinger (q.v.) to rescan its
- peers.
- """
- cr = T.Coroutine.getcurrent()
- main = cr.parent
- fw = M.FWatch(opts.cdb)
- while True:
- timer = M.SelTimer(time() + 1, lambda: cr.switch())
- main.switch()
- if fw.update():
- pinger.rescan(False)
- S.notify('watch', 'peerdb-update')
-
-class ChildWatch (M.SelSignal):
- """
- An object which watches for specified processes exiting and reports
- terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
-
- There is usually only one ChildWatch object, called childwatch.
- """
-
- def __new__(cls):
- """Initialize the child-watcher."""
- return M.SelSignal.__new__(cls, SIG.SIGCHLD)
-
- def __init__(me):
- """Initialize the child-watcher."""
- me._pid = {}
- me.enable()
-
- def watch(me, pid, queue, tag):
- """
- Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
- to the QUEUE, where CODE is one of
-
- * None (successful termination)
- * ['exit-nonzero', CODE] (CODE is a string!)
- * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
- * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
- """
- me._pid[pid] = queue, tag
- return me
-
- def unwatch(me, pid):
- """Unregister PID as a child to watch."""
- del me._pid[pid]
- return me
-
- @T._callback
- def signalled(me):
- """
- Called when child processes exit: collect exit statuses and report
- failures.
- """
- while True:
- try:
- pid, status = OS.waitpid(-1, OS.WNOHANG)
- except OSError, exc:
- if exc.errno == E.ECHILD:
- break
- if pid == 0:
- break
- if pid not in me._pid:
- continue
- queue, tag = me._pid[pid]
- if OS.WIFEXITED(status):
- exit = OS.WEXITSTATUS(status)
- if exit == 0:
- code = None
- else:
- code = ['exit-nonzero', str(exit)]
- elif OS.WIFSIGNALED(status):
- code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
- else:
- code = ['exit-unknown', hex(status)]
- queue.put((tag, 'exit', code))
-
-class Command (object):
- """
- Represents a running command.
-
- This class is the main interface to the machery provided by the ChildWatch
- and ErrorWatch objects. See also potwatch.
- """
-
- def __init__(me, info, queue, tag, args, env):
- """
- Start a new child process.
-
- The ARGS are a list of arguments to be given to the child process. The
- ENV is either None or a dictionary of environment variable assignments to
- override the extant environment. INFO is a list of tokens to be included
- in warnings about the child's stderr output. If the child writes a line
- to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
- child exits, write (TAG, 'exit', CODE) to the QUEUE.
- """
- me._info = info
- me._q = queue
- me._tag = tag
- myenv = OS.environ.copy()
- if env: myenv.update(env)
- me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
- stdout = PROC.PIPE, stderr = PROC.PIPE)
- me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
- errorwatch.watch(me._proc.stderr, info)
- childwatch.watch(me._proc.pid, queue, tag)
-
- def __del__(me):
- """
- If I've been forgotten then stop watching for termination.
- """
- childwatch.unwatch(me._proc.pid)
-
-def potwatch(what, name, q):
- """
- Watch the queue Q for activity as reported by a Command object.
-
- Information from the process's stdout is reported as
-
- NOTE WHAT NAME stdout LINE
-
- abnormal termination is reported as
-
- WARN WHAT NAME CODE
-
- where CODE is what the ChildWatch wrote.
- """
- eofp = deadp = False
- while not deadp or not eofp:
- _, kind, more = q.get()
- if kind == 'stdout':
- if more is None:
- eofp = True
- else:
- S.notify('watch', what, name, 'stdout', more)
- elif kind == 'exit':
- if more: S.warn('watch', what, name, *more)
- deadp = True
-
-###--------------------------------------------------------------------------
-### Peer database utilities.
-
-## Sentinel default for `Peer.get': a fresh list, compared by identity, so it
-## can't be confused with any value actually stored in a peer record.
-_magic = ['_magic'] # An object distinct from all others
-
-class Peer (object):
- """Representation of a peer in the database."""
-
- def __init__(me, peer, cdb = None):
- """
- Create a new peer, named PEER.
-
- Information about the peer is read from the database CDB, or the default
- one given on the command-line.
- """
- me.name = peer
- record = (cdb or CDB.init(opts.cdb))['P' + peer]
- me.__dict__.update(M.URLDecode(record, semip = True))
-
- def get(me, key, default = _magic, filter = None):
- """
- Get the information stashed under KEY from the peer's database record.
-
- If DEFAULT is given, then use it if the database doesn't contain the
- necessary information. If no DEFAULT is given, then report an error. If
- a FILTER function is given then apply it to the information from the
- database before returning it.
- """
- attr = me.__dict__.get(key, default)
- if attr is _magic:
- raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
- elif filter is not None:
- attr = filter(attr)
- return attr
-
- def has(me, key):
- """
- Return whether the peer's database record has the KEY.
- """
- return key in me.__dict__
-
- def list(me):
- """
- Iterate over the available keys in the peer's database record.
- """
- return me.__dict__.iterkeys()
-
-def boolean(value):
- """Parse VALUE as a boolean."""
- return value in ['t', 'true', 'y', 'yes', 'on']
-
-###--------------------------------------------------------------------------
-### Waking up and watching peers.
-
-def run_connect(peer, cmd):
- """
- Start the job of connecting to the passive PEER.
-
- The CMD string is a shell command which will connect to the peer (via some
- back-channel, say ssh and userv), issue a command
-
- SVCSUBMIT connect passive [OPTIONS] USER
-
- and write the resulting challenge to standard error.
- """
- q = T.Queue()
- cmd = Command(['connect', peer.name], q, 'connect',
- ['/bin/sh', '-c', cmd], None)
- _, kind, more = q.peek()
- if kind == 'stdout':
- if more is None:
- S.warn('watch', 'connect', peer.name, 'unexpected-eof')
- else:
- chal = more
- S.greet(peer.name, chal)
- q.get()
- potwatch('connect', peer.name, q)
-
-def run_disconnect(peer, cmd):
- """
- Start the job of disconnecting from a passive PEER.
-
- The CMD string is a shell command which will disconnect from the peer.
- """
- q = T.Queue()
- cmd = Command(['disconnect', peer.name], q, 'disconnect',
- ['/bin/sh', '-c', cmd], None)
- potwatch('disconnect', peer.name, q)
-
-## Counter giving each new PingPeer a distinct sequence number (its `seq'
-## attribute), so the Pinger can discard events meant for superseded ones.
-_pingseq = 0
-class PingPeer (object):
-  """
-  Object representing a peer which we are pinging to ensure that it is still
-  present.
-
-  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
-  event queue -- which saves us from having an enormous swarm of coroutines
-  -- but most of the actual work is done here.
-
-  In order to avoid confusion between different PingPeer instances for the
-  same actual peer, each PingPeer has a sequence number (its `seq'
-  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
-  (Using the PingPeer instance itself would prevent garbage collection of
-  otherwise defunct instances.)
-  """
-
-  def __init__(me, pinger, queue, peer, pingnow):
-    """
-    Create a new PingPeer.
-
-    The PINGER is the Pinger object which owns us.  It is used when we
-    remove ourselves, if the peer turns out to have been removed.
-
-    The QUEUE is the event queue on which timer and ping-command events
-    should be written.
-
-    The PEER is a `Peer' object describing the peer.
-
-    If PINGNOW is true, then immediately start pinging the peer.  Otherwise
-    wait until the usual retry interval.
-    """
-    global _pingseq
-    me._pinger = pinger
-    me._q = queue
-    me._peer = peer.name
-    me.update(peer)
-    ## Take a fresh sequence number: the Pinger ignores queued events whose
-    ## sequence number doesn't match its current PingPeer for this peer.
-    me.seq = _pingseq
-    _pingseq += 1
-    me._failures = 0
-    if pingnow:
-      me._timer = None
-      me._ping()
-    else:
-      me._timer = M.SelTimer(time() + me._every, me._time)
-
-  def update(me, peer):
-    """
-    Refreshes the timer parameters for this peer.  We don't, however,
-    immediately reschedule anything: that will happen next time anything
-    interesting happens.
-
-    PEER is a `Peer' object for this peer, or None to look it up afresh in
-    the database.  The defaults are every = 120, timeout = 10 and
-    retries = 5; note that Peer.get passes defaults through the filter too.
-    Returns the PingPeer.
-    """
-    if peer is None: peer = Peer(me._peer)
-    assert peer.name == me._peer
-    me._every = peer.get('every', filter = T.timespec, default = 120)
-    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
-    me._retries = peer.get('retries', filter = int, default = 5)
-    ## NOTE(review): `_connectp' is not read anywhere else in this file.
-    me._connectp = peer.has('connect')
-    return me
-
-  def _ping(me):
-    """
-    Send a ping to the peer; the result is sent to the Pinger's event queue.
-
-    This issues an asynchronous EPING command in the background, tagged
-    with (PEER, SEQ) so that the Pinger can route the responses back here.
-    """
-    S.rawcommand(T.TripeAsynchronousCommand(
-      me._q, (me._peer, me.seq),
-      ['EPING',
-       '-background', S.bgtag(),
-       '-timeout', str(me._timeout),
-       '--',
-       me._peer]))
-
-  def _reconnect(me):
-    """
-    Try to recover a peer which has stopped responding.
-
-    If the peer's database record has a `connect' command, force a key
-    exchange, run the connect command in a new coroutine, and schedule the
-    next ping.  Otherwise, kill the peer at the server.
-    """
-    ## NOTE(review): Peer() raises KeyError if the peer has vanished from the
-    ## database; nothing here catches it.
-    peer = Peer(me._peer)
-    if peer.has('connect'):
-      S.warn('watch', 'reconnecting', me._peer)
-      S.forcekx(me._peer)
-      T.spawn(run_connect, peer, peer.get('connect'))
-      me._timer = M.SelTimer(time() + me._every, me._time)
-    else:
-      S.kill(me._peer)
-
-  def event(me, code, stuff):
-    """
-    Respond to an event which happened to this peer.
-
-    Timer events indicate that we should start a new ping.  (The server has
-    its own timeout which detects lost packets.)
-
-    We trap unknown-peer responses and detach from the Pinger.
-
-    If the ping fails and we run out of retries, we attempt to restart the
-    connection.
-    """
-    if code == 'TIMER':
-      ## Start a fresh round of pinging.
-      me._failures = 0
-      me._ping()
-    elif code == 'FAIL':
-      S.notify('watch', 'ping-failed', me._peer, *stuff)
-      ## NOTE(review): for any other failure reason no new timer is set, so
-      ## pinging of this peer stops until something re-adds it.
-      if not stuff:
-        pass
-      elif stuff[0] == 'unknown-peer':
-        me._pinger.kill(me._peer)
-      elif stuff[0] == 'ping-send-failed':
-        me._reconnect()
-    elif code == 'INFO':
-      ## NOTE(review): unlike the FAIL case, this assumes STUFF is non-empty.
-      if stuff[0] == 'ping-ok':
-        ## Only announce recovery if we'd previously warned of a timeout.
-        if me._failures > 0:
-          S.warn('watch', 'ping-ok', me._peer)
-        me._timer = M.SelTimer(time() + me._every, me._time)
-      elif stuff[0] == 'ping-timeout':
-        me._failures += 1
-        S.warn('watch', 'ping-timeout', me._peer,
-               'attempt', str(me._failures), 'of', str(me._retries))
-        if me._failures < me._retries:
-          me._ping()
-        else:
-          me._reconnect()
-      elif stuff[0] == 'ping-peer-died':
-        me._pinger.kill(me._peer)
-
-  @T._callback
-  def _time(me):
-    """
-    Handle timer callbacks by posting a timeout event on the queue.
-    """
-    me._timer = None
-    me._q.put(((me._peer, me.seq), 'TIMER', None))
-
-  def __str__(me):
-    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
-  def __repr__(me):
-    return str(me)
-
-class Pinger (T.Coroutine):
- """
- The Pinger keeps track of the peers which we expect to be connected and
- takes action if they seem to stop responding.
-
- There is usually only one Pinger, called pinger.
-
- The Pinger maintains a collection of PingPeer objects, and an event queue.
- The PingPeers direct the results of their pings, and timer events, to the
- event queue. The Pinger's coroutine picks items off the queue and
- dispatches them back to the PingPeers as appropriate.
- """
-
- def __init__(me):
- """Initialize the Pinger."""
- T.Coroutine.__init__(me)
- me._peers = {}
- me._q = T.Queue()
-
- def run(me):
- """
- Coroutine function: reads the pinger queue and sends events to the
- PingPeer objects they correspond to.
- """
- while True:
- (peer, seq), code, stuff = me._q.get()
- if peer in me._peers and seq == me._peers[peer].seq:
- me._peers[peer].event(code, stuff)
-
- def add(me, peer, pingnow):
- """
- Add PEER to the collection of peers under the Pinger's watchful eye.
- The arguments are as for PingPeer: see above.
- """
- me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
- return me
-
- def kill(me, peername):
- """Remove PEER from the peers being watched by the Pinger."""
- del me._peers[peername]
- return me
-
- def rescan(me, startup):
- """
- General resynchronization method.
-
- We scan the list of peers (with connect scripts) known at the server.
- Any which are known to the Pinger but aren't known to the server are
- removed from our list; newly arrived peers are added. (Note that a peer
- can change state here either due to the server sneakily changing its list
- without issuing notifications or, more likely, the database changing its
- idea of whether a peer is interesting.) Finally, PingPeers which are
- still present are prodded to update their timing parameters.
-
- This method is called once at startup to pick up the peers already
- installed, and again by the dbwatcher coroutine when it detects a change
- to the database.
- """
- if T._debug: print '# rescan peers'
- correct = {}
- start = {}
- for name in S.list():
- try: peer = Peer(name)
- except KeyError: continue
- if peer.get('watch', filter = boolean, default = False):
- if T._debug: print '# interesting peer %s' % peer
- correct[peer.name] = start[peer.name] = peer
- elif startup:
- if T._debug: print '# peer %s ready for adoption' % peer
- start[peer.name] = peer
- for name, obj in me._peers.items():
- try:
- peer = correct[name]
- except KeyError:
- if T._debug: print '# peer %s vanished' % name
- del me._peers[name]
- else:
- obj.update(peer)
- for name, peer in start.iteritems():
- if name in me._peers: continue
- if startup:
- if T._debug: print '# setting up peer %s' % name
- ifname = S.ifname(name)
- addr = S.addr(name)
- T.defer(adoptpeer, peer, ifname, *addr)
- else:
- if T._debug: print '# adopting new peer %s' % name
- me.add(peer, True)
- return me
-
- def adopted(me):
- """
- Returns the list of peers being watched by the Pinger.
- """
- return me._peers.keys()
-
-###--------------------------------------------------------------------------
-### New connections.
-
-def encode_envvars(env, prefix, vars):
- """
- Encode the variables in VARS suitably for including in a program
- environment. Lowercase letters in variable names are forced to uppercase;
- runs of non-alphanumeric characters are replaced by single underscores; and
- the PREFIX is prepended. The resulting variables are written to ENV.
- """
- for k, v in vars.iteritems():
- env[prefix + r_bad.sub('_', k.upper())] = v
-
-r_bad = RX.compile(r'[\W_]+')
-def envvars(peer):
- """
- Translate the database information for a PEER into a dictionary of
- environment variables with plausible upper-case names and a P_ prefix.
- Also collect the crypto information into A_ variables.
- """
- env = {}
- encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
- encode_envvars(env, 'A_', S.algs(peer.name))
- return env
-
-def run_ifupdown(what, peer, *args):
- """
- Run the interface up/down script for a peer.
-
- WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
- list of arguments to pass to the script, in addition to the peer name.
-
- The command is run and watched in the background by potwatch.
- """
- q = T.Queue()
- c = Command([what, peer.name], q, what,
- M.split(peer.get(what), quotep = True)[0] +
- [peer.name] + list(args),
- envvars(peer))
- potwatch(what, peer.name, q)
-
-def adoptpeer(peer, ifname, *addr):
- """
- Add a new peer to our collection.
-
- PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
- ADDR is the list of tokens representing its address.
-
- We try to bring up the interface and provoke a connection to the peer if
- it's passive.
- """
- if peer.has('ifup'):
- T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
- .switch('ifup', peer, ifname, *addr)
- cmd = peer.get('connect', default = None)
- if cmd is not None:
- T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
- .switch(peer, cmd)
- if peer.get('watch', filter = boolean, default = False):
- pinger.add(peer, False)
-
-def disownpeer(peer):
- """Drop the PEER from the Pinger and put its interface to bed."""
- try: pinger.kill(peer)
- except KeyError: pass
- cmd = peer.get('disconnect', default = None)
- if cmd is not None:
- T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
- .switch(peer, cmd)
- if peer.has('ifdown'):
- T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
- .switch('ifdown', peer)
-
-def notify(_, code, *rest):
- """
- Watch for notifications.
-
- We trap ADD and KILL notifications, and send them straight to addpeer and
- delpeer respectively.
- """
- if code == 'ADD':
- try: p = Peer(rest[0])
- except KeyError: return
- adoptpeer(p, *rest[1:])
- elif code == 'KILL':
- try: p = Peer(rest[0])
- except KeyError: return
- disownpeer(p, *rest[1:])
-
-###--------------------------------------------------------------------------
-### Command stubs.
-
-def cmd_stub(*args):
- raise T.TripeJobError('not-implemented')
-
-def cmd_kick(name):
- """
- kick NAME: Force a new connection attempt for the NAMEd peer.
- """
- if name not in pinger.adopted():
- raise T.TripeJobError('peer-not-adopted', name)
- try: peer = Peer(name)
- except KeyError: raise T.TripeJobError('unknown-peer', name)
- T.spawn(connect, peer)
-
-def cmd_adopted():
- """
- adopted: Report a list of adopted peers.
- """
- for name in pinger.adopted():
- T.svcinfo(name)
-
-###--------------------------------------------------------------------------
-### Start up.
-
-def setup():
- """
- Service setup.
-
- Register the notification watcher, and rescan the peers.
- """
- S.handler['NOTE'] = notify
- S.watch('+n')
- pinger.rescan(opts.startup)
-
-def init():
- """
- Initialization to be done before service startup.
- """
- global errorwatch, childwatch, pinger
- errorwatch = ErrorWatch()
- childwatch = ChildWatch()
- pinger = Pinger()
- T.Coroutine(dbwatch, name = 'dbwatch').switch()
- errorwatch.switch()
- pinger.switch()
-
-def parse_options():
- """
- Parse the command-line options.
-
- Automatically changes directory to the requested configdir, and turns on
- debugging. Returns the options object.
- """
- op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
- version = '%%prog %s' % VERSION)
-
- op.add_option('-a', '--admin-socket',
- metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
- help = 'Select socket to connect to [default %default]')
- op.add_option('-d', '--directory',
- metavar = 'DIR', dest = 'dir', default = T.configdir,
- help = 'Select current diretory [default %default]')
- op.add_option('-p', '--peerdb',
- metavar = 'FILE', dest = 'cdb', default = T.peerdb,
- help = 'Select peers database [default %default]')
- op.add_option('--daemon', dest = 'daemon',
- default = False, action = 'store_true',
- help = 'Become a daemon after successful initialization')
- op.add_option('--debug', dest = 'debug',
- default = False, action = 'store_true',
- help = 'Emit debugging trace information')
- op.add_option('--startup', dest = 'startup',
- default = False, action = 'store_true',
- help = 'Being called as part of the server startup')
-
- opts, args = op.parse_args()
- if args: op.error('no arguments permitted')
- OS.chdir(opts.dir)
- T._debug = opts.debug
- return opts
-
-## Service table, for running manually.
-##
-## One entry per service: (NAME, VERSION, COMMANDS), where COMMANDS maps each
-## command name to a tuple (N1, N2, USAGE, HANDLER) -- presumably the minimum
-## and maximum argument counts and a usage string; confirm against
-## tripe.runservices.
-## NOTE(review): this advertises T.VERSION (the tripe module's version)
-## rather than this program's VERSION -- check that this is intended.
-service_info = [('watch', T.VERSION, {
-  'adopted': (0, 0, '', cmd_adopted),
-  'kick': (1, 1, 'PEER', cmd_kick)
-})]
-
-if __name__ == '__main__':
-  ## `opts' is deliberately a module-level global: Peer, dbwatch and setup
-  ## all read it.
-  opts = parse_options()
-  T.runservices(opts.tripesock, service_info,
-                init = init, setup = setup,
-                daemon = opts.daemon)
-
-###----- That's all, folks --------------------------------------------------