chiark / gitweb /
Upgrade licence to GPLv3+.
[tripe] / svc / connect.in
index 0ae539f5123c4b46ad9f520d71e44d63f6c3b0ee..e3a5022331c1f1eaa8c4f880dc238c90a31ea433 100644 (file)
@@ -1,28 +1,27 @@
 #! @PYTHON@
 ### -*-python-*-
 ###
-### Service for establishing dynamic connections
+### Connect to remote peers, and keep track of them
 ###
-### (c) 2006 Straylight/Edgeware
+### (c) 2007 Straylight/Edgeware
 ###
 
 ###----- Licensing notice ---------------------------------------------------
 ###
 ### This file is part of Trivial IP Encryption (TrIPE).
 ###
-### TrIPE is free software; you can redistribute it and/or modify
-### it under the terms of the GNU General Public License as published by
-### the Free Software Foundation; either version 2 of the License, or
-### (at your option) any later version.
+### TrIPE is free software: you can redistribute it and/or modify it under
+### the terms of the GNU General Public License as published by the Free
+### Software Foundation; either version 3 of the License, or (at your
+### option) any later version.
 ###
-### TrIPE is distributed in the hope that it will be useful,
-### but WITHOUT ANY WARRANTY; without even the implied warranty of
-### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-### GNU General Public License for more details.
+### TrIPE is distributed in the hope that it will be useful, but WITHOUT
+### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+### for more details.
 ###
 ### You should have received a copy of the GNU General Public License
-### along with TrIPE; if not, write to the Free Software Foundation,
-### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
 
 VERSION = '@VERSION@'
 
@@ -32,14 +31,243 @@ VERSION = '@VERSION@'
 from optparse import OptionParser
 import tripe as T
 import os as OS
+import signal as SIG
+import errno as E
 import cdb as CDB
 import mLib as M
+import re as RX
 from time import time
+import subprocess as PROC
 
 S = T.svcmgr
 
 ###--------------------------------------------------------------------------
-### Main service machinery.
+### Running auxiliary commands.
+
+class SelLineQueue (M.SelLineBuffer):
+  """Glues the select-line-buffer into the coroutine queue system."""
+
+  def __new__(cls, file, queue, tag, kind):
+    """See __init__ for documentation."""
+    return M.SelLineBuffer.__new__(cls, file.fileno())
+
+  def __init__(me, file, queue, tag, kind):
+    """
+    Initialize a new line-reading adaptor.
+
+    The adaptor reads lines from FILE.  Each line is inserted as a message of
+    the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
+    represented as None.
+    """
+    me._q = queue
+    me._file = file
+    me._tag = tag
+    me._kind = kind
+    me.enable()
+
+  @T._callback
+  def line(me, line):
+    me._q.put((me._tag, me._kind, line))
+
+  @T._callback
+  def eof(me):
+    me.disable()
+    me._q.put((me._tag, me._kind, None))
+
+class ErrorWatch (T.Coroutine):
+  """
+  An object which watches stderr streams for errors and converts them into
+  warnings of the form
+
+    WARN connect INFO stderr LINE
+
+  The INFO is a list of tokens associated with the file when it was
+  registered.
+
+  Usually there is a single ErrorWatch object, called errorwatch.
+  """
+
+  def __init__(me):
+    """Initialization: there are no arguments."""
+    T.Coroutine.__init__(me)
+    me._q = T.Queue()
+    me._map = {}
+    me._seq = 1
+
+  def watch(me, file, info):
+    """
+    Adds FILE to the collection of files to watch.
+
+    INFO will be written in the warning messages from this FILE.  Returns a
+    sequence number which can be used to unregister the file again.
+    """
+    seq = me._seq
+    me._seq += 1
+    me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
+    return seq
+
+  def unwatch(me, seq):
+    """Stop watching the file with sequence number SEQ."""
+    del me._map[seq]
+    return me
+
+  def run(me):
+    """
+    Coroutine function: read items from the queue and report them.
+
+    Unregisters files automatically when they reach EOF.
+    """
+    while True:
+      seq, _, line = me._q.get()
+      if line is None:
+        me.unwatch(seq)
+      else:
+        S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
+
+def dbwatch():
+  """
+  Coroutine function: wake up every minute and notice changes to the
+  database.  When a change happens, tell the Pinger (q.v.) to rescan its
+  peers.
+  """
+  cr = T.Coroutine.getcurrent()
+  main = cr.parent
+  fw = M.FWatch(opts.cdb)
+  while True:
+    timer = M.SelTimer(time() + 60, lambda: cr.switch())
+    main.switch()
+    if fw.update():
+      pinger.rescan(False)
+      S.notify('connect', 'peerdb-update')
+
+class ChildWatch (M.SelSignal):
+  """
+  An object which watches for specified processes exiting and reports
+  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
+
+  There is usually only one ChildWatch object, called childwatch.
+  """
+
+  def __new__(cls):
+    """Initialize the child-watcher."""
+    return M.SelSignal.__new__(cls, SIG.SIGCHLD)
+
+  def __init__(me):
+    """Initialize the child-watcher."""
+    me._pid = {}
+    me.enable()
+
+  def watch(me, pid, queue, tag):
+    """
+    Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
+    to the QUEUE, where CODE is one of
+
+      * None (successful termination)
+      * ['exit-nonzero', CODE] (CODE is a string!)
+      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
+      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
+    """
+    me._pid[pid] = queue, tag
+    return me
+
+  def unwatch(me, pid):
+    """Unregister PID as a child to watch."""
+    del me._pid[pid]
+    return me
+
+  @T._callback
+  def signalled(me):
+    """
+    Called when child processes exit: collect exit statuses and report
+    failures.
+    """
+    while True:
+      try:
+        pid, status = OS.waitpid(-1, OS.WNOHANG)
+      except OSError, exc:
+        if exc.errno == E.ECHILD:
+          break
+      if pid == 0:
+        break
+      if pid not in me._pid:
+        continue
+      queue, tag = me._pid[pid]
+      if OS.WIFEXITED(status):
+        exit = OS.WEXITSTATUS(status)
+        if exit == 0:
+          code = None
+        else:
+          code = ['exit-nonzero', str(exit)]
+      elif OS.WIFSIGNALED(status):
+        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
+      else:
+        code = ['exit-unknown', hex(status)]
+      queue.put((tag, 'exit', code))
+
+class Command (object):
+  """
+  Represents a running command.
+
+  This class is the main interface to the machery provided by the ChildWatch
+  and ErrorWatch objects.  See also potwatch.
+  """
+
+  def __init__(me, info, queue, tag, args, env):
+    """
+    Start a new child process.
+
+    The ARGS are a list of arguments to be given to the child process.  The
+    ENV is either None or a dictionary of environment variable assignments to
+    override the extant environment.  INFO is a list of tokens to be included
+    in warnings about the child's stderr output.  If the child writes a line
+    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
+    child exits, write (TAG, 'exit', CODE) to the QUEUE.
+    """
+    me._info = info
+    me._q = queue
+    me._tag = tag
+    myenv = OS.environ.copy()
+    if env: myenv.update(env)
+    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
+                          stdout = PROC.PIPE, stderr = PROC.PIPE)
+    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
+    errorwatch.watch(me._proc.stderr, info)
+    childwatch.watch(me._proc.pid, queue, tag)
+
+  def __del__(me):
+    """
+    If I've been forgotten then stop watching for termination.
+    """
+    childwatch.unwatch(me._proc.pid)
+
+def potwatch(what, name, q):
+  """
+  Watch the queue Q for activity as reported by a Command object.
+
+  Information from the process's stdout is reported as
+
+    NOTE WHAT NAME stdout LINE
+
+  abnormal termination is reported as
+
+    WARN WHAT NAME CODE
+
+  where CODE is what the ChildWatch wrote.
+  """
+  eofp = deadp = False
+  while not deadp or not eofp:
+    _, kind, more = q.get()
+    if kind == 'stdout':
+      if more is None:
+        eofp = True
+      else:
+        S.notify('connect', what, name, 'stdout', more)
+    elif kind == 'exit':
+      if more: S.warn('connect', what, name, *more)
+      deadp = True
+
+###--------------------------------------------------------------------------
+### Peer database utilities.
 
 _magic = ['_magic']                     # An object distinct from all others
 
@@ -54,30 +282,382 @@ class Peer (object):
     one given on the command-line.
     """
     me.name = peer
-    try:
-      record = (cdb or CDB.init(opts.cdb))['P' + peer]
-    except KeyError:
-      raise T.TripeJobError('unknown-peer', peer)
+    record = (cdb or CDB.init(opts.cdb))['P' + peer]
     me.__dict__.update(M.URLDecode(record, semip = True))
 
-  def get(me, key, default = _magic):
+  def get(me, key, default = _magic, filter = None):
     """
     Get the information stashed under KEY from the peer's database record.
 
     If DEFAULT is given, then use it if the database doesn't contain the
-    necessary information.  If no DEFAULT is given, then report an error.
+    necessary information.  If no DEFAULT is given, then report an error.  If
+    a FILTER function is given then apply it to the information from the
+    database before returning it.
     """
     attr = me.__dict__.get(key, default)
     if attr is _magic:
       raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
+    elif filter is not None:
+      attr = filter(attr)
     return attr
 
+  def has(me, key):
+    """
+    Return whether the peer's database record has the KEY.
+    """
+    return key in me.__dict__
+
   def list(me):
     """
     Iterate over the available keys in the peer's database record.
     """
     return me.__dict__.iterkeys()
 
+def boolean(value):
+  """Parse VALUE as a boolean."""
+  return value in ['t', 'true', 'y', 'yes', 'on']
+
+###--------------------------------------------------------------------------
+### Waking up and watching peers.
+
+def run_connect(peer, cmd):
+  """
+  Start the job of connecting to the passive PEER.
+
+  The CMD string is a shell command which will connect to the peer (via some
+  back-channel, say ssh and userv), issue a command
+
+    SVCSUBMIT connect passive [OPTIONS] USER
+
+  and write the resulting challenge to standard error.
+  """
+  q = T.Queue()
+  cmd = Command(['connect', peer.name], q, 'connect',
+                ['/bin/sh', '-c', cmd], None)
+  _, kind, more = q.peek()
+  if kind == 'stdout':
+    if more is None:
+      S.warn('connect', 'connect', peer.name, 'unexpected-eof')
+    else:
+      chal = more
+      S.greet(peer.name, chal)
+      q.get()
+  potwatch('connect', peer.name, q)
+
+def run_disconnect(peer, cmd):
+  """
+  Start the job of disconnecting from a passive PEER.
+
+  The CMD string is a shell command which will disconnect from the peer.
+  """
+  q = T.Queue()
+  cmd = Command(['disconnect', peer.name], q, 'disconnect',
+                ['/bin/sh', '-c', cmd], None)
+  potwatch('disconnect', peer.name, q)
+
+_pingseq = 0
+class PingPeer (object):
+  """
+  Object representing a peer which we are pinging to ensure that it is still
+  present.
+
+  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
+  event queue -- which saves us from having an enormous swarm of coroutines
+  -- but most of the actual work is done here.
+
+  In order to avoid confusion between different PingPeer instances for the
+  same actual peer, each PingPeer has a sequence number (its `seq'
+  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
+  (Using the PingPeer instance itself will prevent garbage collection of
+  otherwise defunct instances.)
+  """
+
+  def __init__(me, pinger, queue, peer, pingnow):
+    """
+    Create a new PingPeer.
+
+    The PINGER is the Pinger object we should send the results to.  This is
+    used when we remove ourselves, if the peer has been explicitly removed.
+
+    The QUEUE is the event queue on which timer and ping-command events
+    should be written.
+
+    The PEER is a `Peer' object describing the peer.
+
+    If PINGNOW is true, then immediately start pinging the peer.  Otherwise
+    wait until the usual retry interval.
+    """
+    global _pingseq
+    me._pinger = pinger
+    me._q = queue
+    me._peer = peer.name
+    me.update(peer)
+    me.seq = _pingseq
+    _pingseq += 1
+    me._failures = 0
+    if pingnow:
+      me._timer = None
+      me._ping()
+    else:
+      me._timer = M.SelTimer(time() + me._every, me._time)
+
+  def update(me, peer):
+    """
+    Refreshes the timer parameters for this peer.  We don't, however,
+    immediately reschedule anything: that will happen next time anything
+    interesting happens.
+    """
+    if peer is None: peer = Peer(me._peer)
+    assert peer.name == me._peer
+    me._every = peer.get('every', filter = T.timespec, default = 120)
+    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
+    me._retries = peer.get('retries', filter = int, default = 5)
+    me._connectp = peer.has('connect')
+    return me
+
+  def _ping(me):
+    """
+    Send a ping to the peer; the result is sent to the Pinger's event queue.
+    """
+    S.rawcommand(T.TripeAsynchronousCommand(
+      me._q, (me._peer, me.seq),
+      ['EPING',
+       '-background', S.bgtag(),
+       '-timeout', str(me._timeout),
+       '--',
+       me._peer]))
+
+  def _reconnect(me):
+    peer = Peer(me._peer)
+    if me._connectp:
+      S.warn('connect', 'reconnecting', me._peer)
+      S.forcekx(me._peer)
+      T.spawn(run_connect, peer, peer.get('connect'))
+      me._timer = M.SelTimer(time() + me._every, me._time)
+    else:
+      S.kill(me._peer)
+
+  def event(me, code, stuff):
+    """
+    Respond to an event which happened to this peer.
+
+    Timer events indicate that we should start a new ping.  (The server has
+    its own timeout which detects lost packets.)
+
+    We trap unknown-peer responses and detach from the Pinger.
+
+    If the ping fails and we run out of retries, we attempt to restart the
+    connection.
+    """
+    if code == 'TIMER':
+      me._failures = 0
+      me._ping()
+    elif code == 'FAIL':
+      S.notify('connect', 'ping-failed', me._peer, *stuff)
+      if not stuff:
+        pass
+      elif stuff[0] == 'unknown-peer':
+        me._pinger.kill(me._peer)
+      elif stuff[0] == 'ping-send-failed':
+        me._reconnect()
+    elif code == 'INFO':
+      if stuff[0] == 'ping-ok':
+        if me._failures > 0:
+          S.warn('connect', 'ping-ok', me._peer)
+        me._timer = M.SelTimer(time() + me._every, me._time)
+      elif stuff[0] == 'ping-timeout':
+        me._failures += 1
+        S.warn('connect', 'ping-timeout', me._peer,
+               'attempt', str(me._failures), 'of', str(me._retries))
+        if me._failures < me._retries:
+          me._ping()
+        else:
+          me._reconnect()
+      elif stuff[0] == 'ping-peer-died':
+        me._pinger.kill(me._peer)
+
+  @T._callback
+  def _time(me):
+    """
+    Handle timer callbacks by posting a timeout event on the queue.
+    """
+    me._timer = None
+    me._q.put(((me._peer, me.seq), 'TIMER', None))
+
+  def __str__(me):
+    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
+  def __repr__(me):
+    return str(me)
+
+class Pinger (T.Coroutine):
+  """
+  The Pinger keeps track of the peers which we expect to be connected and
+  takes action if they seem to stop responding.
+
+  There is usually only one Pinger, called pinger.
+
+  The Pinger maintains a collection of PingPeer objects, and an event queue.
+  The PingPeers direct the results of their pings, and timer events, to the
+  event queue.  The Pinger's coroutine picks items off the queue and
+  dispatches them back to the PingPeers as appropriate.
+  """
+
+  def __init__(me):
+    """Initialize the Pinger."""
+    T.Coroutine.__init__(me)
+    me._peers = {}
+    me._q = T.Queue()
+
+  def run(me):
+    """
+    Coroutine function: reads the pinger queue and sends events to the
+    PingPeer objects they correspond to.
+    """
+    while True:
+      (peer, seq), code, stuff = me._q.get()
+      if peer in me._peers and seq == me._peers[peer].seq:
+        me._peers[peer].event(code, stuff)
+
+  def add(me, peer, pingnow):
+    """
+    Add PEER to the collection of peers under the Pinger's watchful eye.
+    The arguments are as for PingPeer: see above.
+    """
+    me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
+    return me
+
+  def kill(me, peername):
+    """Remove PEER from the peers being watched by the Pinger."""
+    del me._peers[peername]
+    return me
+
+  def rescan(me, startup):
+    """
+    General resynchronization method.
+
+    We scan the list of peers (with connect scripts) known at the server.
+    Any which are known to the Pinger but aren't known to the server are
+    removed from our list; newly arrived peers are added.  (Note that a peer
+    can change state here either due to the server sneakily changing its list
+    without issuing notifications or, more likely, the database changing its
+    idea of whether a peer is interesting.)  Finally, PingPeers which are
+    still present are prodded to update their timing parameters.
+
+    This method is called once at startup to pick up the peers already
+    installed, and again by the dbwatcher coroutine when it detects a change
+    to the database.
+    """
+    if T._debug: print '# rescan peers'
+    correct = {}
+    start = {}
+    for name in S.list():
+      try: peer = Peer(name)
+      except KeyError: continue
+      if peer.get('watch', filter = boolean, default = False):
+        if T._debug: print '# interesting peer %s' % peer
+        correct[peer.name] = start[peer.name] = peer
+      elif startup:
+        if T._debug: print '# peer %s ready for adoption' % peer
+        start[peer.name] = peer
+    for name, obj in me._peers.items():
+      try:
+        peer = correct[name]
+      except KeyError:
+        if T._debug: print '# peer %s vanished' % name
+        del me._peers[name]
+      else:
+        obj.update(peer)
+    for name, peer in start.iteritems():
+      if name in me._peers: continue
+      if startup:
+        if T._debug: print '# setting up peer %s' % name
+        ifname = S.ifname(name)
+        addr = S.addr(name)
+        T.defer(adoptpeer, peer, ifname, *addr)
+      else:
+        if T._debug: print '# adopting new peer %s' % name
+        me.add(peer, True)
+    return me
+
+  def adopted(me):
+    """
+    Returns the list of peers being watched by the Pinger.
+    """
+    return me._peers.keys()
+
+###--------------------------------------------------------------------------
+### New connections.
+
+def encode_envvars(env, prefix, vars):
+  """
+  Encode the variables in VARS suitably for including in a program
+  environment.  Lowercase letters in variable names are forced to uppercase;
+  runs of non-alphanumeric characters are replaced by single underscores; and
+  the PREFIX is prepended.  The resulting variables are written to ENV.
+  """
+  for k, v in vars.iteritems():
+    env[prefix + r_bad.sub('_', k.upper())] = v
+
+r_bad = RX.compile(r'[\W_]+')
+def envvars(peer):
+  """
+  Translate the database information for a PEER into a dictionary of
+  environment variables with plausible upper-case names and a P_ prefix.
+  Also collect the crypto information into A_ variables.
+  """
+  env = {}
+  encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
+  encode_envvars(env, 'A_', S.algs(peer.name))
+  return env
+
+def run_ifupdown(what, peer, *args):
+  """
+  Run the interface up/down script for a peer.
+
+  WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
+  list of arguments to pass to the script, in addition to the peer name.
+
+  The command is run and watched in the background by potwatch.
+  """
+  q = T.Queue()
+  c = Command([what, peer.name], q, what,
+              M.split(peer.get(what), quotep = True)[0] +
+              [peer.name] + list(args),
+              envvars(peer))
+  potwatch(what, peer.name, q)
+
+def adoptpeer(peer, ifname, *addr):
+  """
+  Add a new peer to our collection.
+
+  PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
+  ADDR is the list of tokens representing its address.
+
+  We try to bring up the interface and provoke a connection to the peer if
+  it's passive.
+  """
+  if peer.has('ifup'):
+    T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
+        .switch('ifup', peer, ifname, *addr)
+  cmd = peer.get('connect', default = None)
+  if cmd is not None:
+    T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
+        .switch(peer, cmd)
+  if peer.get('watch', filter = boolean, default = False):
+    pinger.add(peer, False)
+
+def disownpeer(peer):
+  """Drop the PEER from the Pinger and put its interface to bed."""
+  try: pinger.kill(peer)
+  except KeyError: pass
+  cmd = peer.get('disconnect', default = None)
+  if cmd is not None:
+    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
+        .switch(peer, cmd)
+  if peer.has('ifdown'):
+    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
+        .switch('ifdown', peer)
+
 def addpeer(peer, addr):
   """
   Process a connect request from a new peer PEER on address ADDR.
@@ -92,25 +672,72 @@ def addpeer(peer, addr):
           tunnel = peer.get('tunnel', None),
           keepalive = peer.get('keepalive', None),
           key = peer.get('key', None),
+          priv = peer.get('priv', None),
           mobile = peer.get('mobile', 'nil') in booltrue,
           cork = peer.get('cork', 'nil') in booltrue,
           *addr)
   except T.TripeError, exc:
     raise T.TripeJobError(*exc.args)
 
+## Dictionary mapping challenges to waiting passive-connection coroutines.
+chalmap = {}
+
+def notify(_, code, *rest):
+  """
+  Watch for notifications.
+
+  We trap ADD and KILL notifications, and send them straight to adoptpeer and
+  disownpeer respectively; and dispatch GREET notifications to the
+  corresponding waiting coroutine.
+  """
+  if code == 'ADD':
+    try: p = Peer(rest[0])
+    except KeyError: return
+    adoptpeer(p, *rest[1:])
+  elif code == 'KILL':
+    try: p = Peer(rest[0])
+    except KeyError: return
+    disownpeer(p, *rest[1:])
+  elif code == 'GREET':
+    chal = rest[0]
+    try: cr = chalmap[chal]
+    except KeyError: pass
+    else: cr.switch(rest[1:])
+
+###--------------------------------------------------------------------------
+### Command implementation.
+
+def cmd_kick(name):
+  """
+  kick NAME: Force a new connection attempt for the NAMEd peer.
+  """
+  if name not in pinger.adopted():
+    raise T.TripeJobError('peer-not-adopted', name)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
+  T.spawn(run_connect, peer, peer.get('connect'))
+
+def cmd_adopted():
+  """
+  adopted: Report a list of adopted peers.
+  """
+  for name in pinger.adopted():
+    T.svcinfo(name)
+
 def cmd_active(name):
   """
   active NAME: Handle an active connection request for the peer called NAME.
 
   The appropriate address is read from the database automatically.
   """
-  peer = Peer(name)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
   addr = peer.get('peer')
   if addr == 'PASSIVE':
     raise T.TripeJobError('passive-peer', name)
   addpeer(peer, M.split(addr, quotep = True)[0])
 
-def cmd_list():
+def cmd_listactive():
   """
   list: Report a list of the available active peers.
   """
@@ -123,14 +750,20 @@ def cmd_info(name):
   """
   info NAME: Report the database entries for the named peer.
   """
-  peer = Peer(name)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
   items = list(peer.list())
   items.sort()
   for i in items:
     T.svcinfo('%s=%s' % (i, peer.get(i)))
 
-## Dictionary mapping challenges to waiting passive-connection coroutines.
-chalmap = {}
+def cmd_userpeer(user):
+  """
+  userpeer USER: Report the peer name for the named user.
+  """
+  try: name = CDB.init(opts.cdb)['U' + user]
+  except KeyError: raise T.TripeJobError('unknown-user', user)
+  T.svcinfo(name)
 
 def cmd_passive(*args):
   """
@@ -145,10 +778,10 @@ def cmd_passive(*args):
     if opt == '-timeout':
       timeout = T.timespec(op.arg())
   user, = op.rest(1, 1)
-  try:
-    peer = CDB.init(opts.cdb)['U' + user]
-  except KeyError:
-    raise T.TripeJobError('unknown-user', user)
+  try: name = CDB.init(opts.cdb)['U' + user]
+  except KeyError: raise T.TripeJobError('unknown-user', user)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
   chal = S.getchal()
   cr = T.Coroutine.getcurrent()
   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
@@ -158,24 +791,10 @@ def cmd_passive(*args):
     addr = cr.parent.switch()
     if addr is None:
       raise T.TripeJobError('connect-timeout')
-    addpeer(Peer(peer), addr)
+    addpeer(peer, addr)
   finally:
     del chalmap[chal]
 
-def notify(_, code, *rest):
-  """
-  Watch for notifications.
-
-  In particular, if a GREETing appears quoting a challenge in the chalmap
-  then wake up the corresponding coroutine.
-  """
-  if code != 'GREET':
-    return
-  chal = rest[0]
-  addr = rest[1:]
-  if chal in chalmap:
-    chalmap[chal].switch(addr)
-
 ###--------------------------------------------------------------------------
 ### Start up.
 
@@ -183,10 +802,14 @@ def setup():
   """
   Service setup.
 
-  Register the notification-watcher, and add the automatic active peers.
+  Register the notification watcher, rescan the peers, and add automatic
+  active peers.
   """
   S.handler['NOTE'] = notify
   S.watch('+n')
+
+  pinger.rescan(opts.startup)
+
   if opts.startup:
     cdb = CDB.init(opts.cdb)
     try:
@@ -200,6 +823,18 @@ def setup():
       except T.TripeJobError, err:
         S.warn('connect', 'auto-add-failed', name, *err.args)
 
+def init():
+  """
+  Initialization to be done before service startup.
+  """
+  global errorwatch, childwatch, pinger
+  errorwatch = ErrorWatch()
+  childwatch = ChildWatch()
+  pinger = Pinger()
+  T.Coroutine(dbwatch, name = 'dbwatch').switch()
+  errorwatch.switch()
+  pinger.switch()
+
 def parse_options():
   """
   Parse the command-line options.
@@ -236,17 +871,20 @@ def parse_options():
   return opts
 
 ## Service table, for running manually.
-service_info = [('connect', VERSION, {
+service_info = [('connect', T.VERSION, {
+  'adopted': (0, 0, '', cmd_adopted),
+  'kick': (1, 1, 'PEER', cmd_kick),
   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
   'active': (1, 1, 'PEER', cmd_active),
   'info': (1, 1, 'PEER', cmd_info),
-  'list': (0, 0, '', cmd_list)
+  'list-active': (0, 0, '', cmd_listactive),
+  'userpeer': (1, 1, 'USER', cmd_userpeer)
 })]
 
 if __name__ == '__main__':
   opts = parse_options()
   T.runservices(opts.tripesock, service_info,
-                setup = setup,
+                init = init, setup = setup,
                 daemon = opts.daemon)
 
 ###----- That's all, folks --------------------------------------------------