chiark / gitweb /
server/admin.c (a_vformat): Fix uses of `va_arg' to dereference `ap'.
[tripe] / svc / connect.in
index 37241bbce9fcd2b37c231b6b03b190b8df562800..047139678a9904ef805ebb1b75e764693233701f 100644 (file)
@@ -1,9 +1,9 @@
 #! @PYTHON@
 ### -*-python-*-
 ###
 #! @PYTHON@
 ### -*-python-*-
 ###
-### Service for establishing dynamic connections
+### Connect to remote peers, and keep track of them
 ###
 ###
-### (c) 2006 Straylight/Edgeware
+### (c) 2007 Straylight/Edgeware
 ###
 
 ###----- Licensing notice ---------------------------------------------------
 ###
 
 ###----- Licensing notice ---------------------------------------------------
@@ -32,14 +32,243 @@ VERSION = '@VERSION@'
 from optparse import OptionParser
 import tripe as T
 import os as OS
 from optparse import OptionParser
 import tripe as T
 import os as OS
+import signal as SIG
+import errno as E
 import cdb as CDB
 import mLib as M
 import cdb as CDB
 import mLib as M
+import re as RX
 from time import time
 from time import time
+import subprocess as PROC
 
 S = T.svcmgr
 
 ###--------------------------------------------------------------------------
 
 S = T.svcmgr
 
 ###--------------------------------------------------------------------------
-### Main service machinery.
+### Running auxiliary commands.
+
+class SelLineQueue (M.SelLineBuffer):
+  """Glues the select-line-buffer into the coroutine queue system."""
+
+  def __new__(cls, file, queue, tag, kind):
+    """See __init__ for documentation."""
+    return M.SelLineBuffer.__new__(cls, file.fileno())
+
+  def __init__(me, file, queue, tag, kind):
+    """
+    Initialize a new line-reading adaptor.
+
+    The adaptor reads lines from FILE.  Each line is inserted as a message of
+    the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
+    represented as None.
+    """
+    me._q = queue
+    me._file = file
+    me._tag = tag
+    me._kind = kind
+    me.enable()
+
+  @T._callback
+  def line(me, line):
+    me._q.put((me._tag, me._kind, line))
+
+  @T._callback
+  def eof(me):
+    me.disable()
+    me._q.put((me._tag, me._kind, None))
+
+class ErrorWatch (T.Coroutine):
+  """
+  An object which watches stderr streams for errors and converts them into
+  warnings of the form
+
+    WARN connect INFO stderr LINE
+
+  The INFO is a list of tokens associated with the file when it was
+  registered.
+
+  Usually there is a single ErrorWatch object, called errorwatch.
+  """
+
+  def __init__(me):
+    """Initialization: there are no arguments."""
+    T.Coroutine.__init__(me)
+    me._q = T.Queue()
+    me._map = {}
+    me._seq = 1
+
+  def watch(me, file, info):
+    """
+    Adds FILE to the collection of files to watch.
+
+    INFO will be written in the warning messages from this FILE.  Returns a
+    sequence number which can be used to unregister the file again.
+    """
+    seq = me._seq
+    me._seq += 1
+    me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
+    return seq
+
+  def unwatch(me, seq):
+    """Stop watching the file with sequence number SEQ."""
+    del me._map[seq]
+    return me
+
+  def run(me):
+    """
+    Coroutine function: read items from the queue and report them.
+
+    Unregisters files automatically when they reach EOF.
+    """
+    while True:
+      seq, _, line = me._q.get()
+      if line is None:
+        me.unwatch(seq)
+      else:
+        S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
+
+def dbwatch():
+  """
+  Coroutine function: wake up every minute and notice changes to the
+  database.  When a change happens, tell the Pinger (q.v.) to rescan its
+  peers.
+  """
+  cr = T.Coroutine.getcurrent()
+  main = cr.parent
+  fw = M.FWatch(opts.cdb)
+  while True:
+    timer = M.SelTimer(time() + 60, lambda: cr.switch())
+    main.switch()
+    if fw.update():
+      pinger.rescan(False)
+      S.notify('connect', 'peerdb-update')
+
+class ChildWatch (M.SelSignal):
+  """
+  An object which watches for specified processes exiting and reports
+  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
+
+  There is usually only one ChildWatch object, called childwatch.
+  """
+
+  def __new__(cls):
+    """Initialize the child-watcher."""
+    return M.SelSignal.__new__(cls, SIG.SIGCHLD)
+
+  def __init__(me):
+    """Initialize the child-watcher."""
+    me._pid = {}
+    me.enable()
+
+  def watch(me, pid, queue, tag):
+    """
+    Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
+    to the QUEUE, where CODE is one of
+
+      * None (successful termination)
+      * ['exit-nonzero', CODE] (CODE is a string!)
+      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
+      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
+    """
+    me._pid[pid] = queue, tag
+    return me
+
+  def unwatch(me, pid):
+    """Unregister PID as a child to watch."""
+    del me._pid[pid]
+    return me
+
+  @T._callback
+  def signalled(me):
+    """
+    Called when child processes exit: collect exit statuses and report
+    failures.
+    """
+    while True:
+      try:
+        pid, status = OS.waitpid(-1, OS.WNOHANG)
+      except OSError, exc:
+        if exc.errno == E.ECHILD:
+          break
+      if pid == 0:
+        break
+      if pid not in me._pid:
+        continue
+      queue, tag = me._pid[pid]
+      if OS.WIFEXITED(status):
+        exit = OS.WEXITSTATUS(status)
+        if exit == 0:
+          code = None
+        else:
+          code = ['exit-nonzero', str(exit)]
+      elif OS.WIFSIGNALED(status):
+        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
+      else:
+        code = ['exit-unknown', hex(status)]
+      queue.put((tag, 'exit', code))
+
+class Command (object):
+  """
+  Represents a running command.
+
+  This class is the main interface to the machery provided by the ChildWatch
+  and ErrorWatch objects.  See also potwatch.
+  """
+
+  def __init__(me, info, queue, tag, args, env):
+    """
+    Start a new child process.
+
+    The ARGS are a list of arguments to be given to the child process.  The
+    ENV is either None or a dictionary of environment variable assignments to
+    override the extant environment.  INFO is a list of tokens to be included
+    in warnings about the child's stderr output.  If the child writes a line
+    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
+    child exits, write (TAG, 'exit', CODE) to the QUEUE.
+    """
+    me._info = info
+    me._q = queue
+    me._tag = tag
+    myenv = OS.environ.copy()
+    if env: myenv.update(env)
+    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
+                          stdout = PROC.PIPE, stderr = PROC.PIPE)
+    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
+    errorwatch.watch(me._proc.stderr, info)
+    childwatch.watch(me._proc.pid, queue, tag)
+
+  def __del__(me):
+    """
+    If I've been forgotten then stop watching for termination.
+    """
+    childwatch.unwatch(me._proc.pid)
+
+def potwatch(what, name, q):
+  """
+  Watch the queue Q for activity as reported by a Command object.
+
+  Information from the process's stdout is reported as
+
+    NOTE WHAT NAME stdout LINE
+
+  abnormal termination is reported as
+
+    WARN WHAT NAME CODE
+
+  where CODE is what the ChildWatch wrote.
+  """
+  eofp = deadp = False
+  while not deadp or not eofp:
+    _, kind, more = q.get()
+    if kind == 'stdout':
+      if more is None:
+        eofp = True
+      else:
+        S.notify('connect', what, name, 'stdout', more)
+    elif kind == 'exit':
+      if more: S.warn('connect', what, name, *more)
+      deadp = True
+
+###--------------------------------------------------------------------------
+### Peer database utilities.
 
 _magic = ['_magic']                     # An object distinct from all others
 
 
 _magic = ['_magic']                     # An object distinct from all others
 
@@ -54,30 +283,382 @@ class Peer (object):
     one given on the command-line.
     """
     me.name = peer
     one given on the command-line.
     """
     me.name = peer
-    try:
-      record = (cdb or CDB.init(opts.cdb))['P' + peer]
-    except KeyError:
-      raise T.TripeJobError('unknown-peer', peer)
+    record = (cdb or CDB.init(opts.cdb))['P' + peer]
     me.__dict__.update(M.URLDecode(record, semip = True))
 
     me.__dict__.update(M.URLDecode(record, semip = True))
 
-  def get(me, key, default = _magic):
+  def get(me, key, default = _magic, filter = None):
     """
     Get the information stashed under KEY from the peer's database record.
 
     If DEFAULT is given, then use it if the database doesn't contain the
     """
     Get the information stashed under KEY from the peer's database record.
 
     If DEFAULT is given, then use it if the database doesn't contain the
-    necessary information.  If no DEFAULT is given, then report an error.
+    necessary information.  If no DEFAULT is given, then report an error.  If
+    a FILTER function is given then apply it to the information from the
+    database before returning it.
     """
     attr = me.__dict__.get(key, default)
     if attr is _magic:
       raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
     """
     attr = me.__dict__.get(key, default)
     if attr is _magic:
       raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
+    elif filter is not None:
+      attr = filter(attr)
     return attr
 
     return attr
 
+  def has(me, key):
+    """
+    Return whether the peer's database record has the KEY.
+    """
+    return key in me.__dict__
+
   def list(me):
     """
     Iterate over the available keys in the peer's database record.
     """
     return me.__dict__.iterkeys()
 
   def list(me):
     """
     Iterate over the available keys in the peer's database record.
     """
     return me.__dict__.iterkeys()
 
+def boolean(value):
+  """Parse VALUE as a boolean."""
+  return value in ['t', 'true', 'y', 'yes', 'on']
+
+###--------------------------------------------------------------------------
+### Waking up and watching peers.
+
+def run_connect(peer, cmd):
+  """
+  Start the job of connecting to the passive PEER.
+
+  The CMD string is a shell command which will connect to the peer (via some
+  back-channel, say ssh and userv), issue a command
+
+    SVCSUBMIT connect passive [OPTIONS] USER
+
+  and write the resulting challenge to standard error.
+  """
+  q = T.Queue()
+  cmd = Command(['connect', peer.name], q, 'connect',
+                ['/bin/sh', '-c', cmd], None)
+  _, kind, more = q.peek()
+  if kind == 'stdout':
+    if more is None:
+      S.warn('connect', 'connect', peer.name, 'unexpected-eof')
+    else:
+      chal = more
+      S.greet(peer.name, chal)
+      q.get()
+  potwatch('connect', peer.name, q)
+
+def run_disconnect(peer, cmd):
+  """
+  Start the job of disconnecting from a passive PEER.
+
+  The CMD string is a shell command which will disconnect from the peer.
+  """
+  q = T.Queue()
+  cmd = Command(['disconnect', peer.name], q, 'disconnect',
+                ['/bin/sh', '-c', cmd], None)
+  potwatch('disconnect', peer.name, q)
+
+_pingseq = 0
+class PingPeer (object):
+  """
+  Object representing a peer which we are pinging to ensure that it is still
+  present.
+
+  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
+  event queue -- which saves us from having an enormous swarm of coroutines
+  -- but most of the actual work is done here.
+
+  In order to avoid confusion between different PingPeer instances for the
+  same actual peer, each PingPeer has a sequence number (its `seq'
+  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
+  (Using the PingPeer instance itself will prevent garbage collection of
+  otherwise defunct instances.)
+  """
+
+  def __init__(me, pinger, queue, peer, pingnow):
+    """
+    Create a new PingPeer.
+
+    The PINGER is the Pinger object we should send the results to.  This is
+    used when we remove ourselves, if the peer has been explicitly removed.
+
+    The QUEUE is the event queue on which timer and ping-command events
+    should be written.
+
+    The PEER is a `Peer' object describing the peer.
+
+    If PINGNOW is true, then immediately start pinging the peer.  Otherwise
+    wait until the usual retry interval.
+    """
+    global _pingseq
+    me._pinger = pinger
+    me._q = queue
+    me._peer = peer.name
+    me.update(peer)
+    me.seq = _pingseq
+    _pingseq += 1
+    me._failures = 0
+    if pingnow:
+      me._timer = None
+      me._ping()
+    else:
+      me._timer = M.SelTimer(time() + me._every, me._time)
+
+  def update(me, peer):
+    """
+    Refreshes the timer parameters for this peer.  We don't, however,
+    immediately reschedule anything: that will happen next time anything
+    interesting happens.
+    """
+    if peer is None: peer = Peer(me._peer)
+    assert peer.name == me._peer
+    me._every = peer.get('every', filter = T.timespec, default = 120)
+    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
+    me._retries = peer.get('retries', filter = int, default = 5)
+    me._connectp = peer.has('connect')
+    return me
+
+  def _ping(me):
+    """
+    Send a ping to the peer; the result is sent to the Pinger's event queue.
+    """
+    S.rawcommand(T.TripeAsynchronousCommand(
+      me._q, (me._peer, me.seq),
+      ['EPING',
+       '-background', S.bgtag(),
+       '-timeout', str(me._timeout),
+       '--',
+       me._peer]))
+
+  def _reconnect(me):
+    peer = Peer(me._peer)
+    if me._connectp:
+      S.warn('connect', 'reconnecting', me._peer)
+      S.forcekx(me._peer)
+      T.spawn(run_connect, peer, peer.get('connect'))
+      me._timer = M.SelTimer(time() + me._every, me._time)
+    else:
+      S.kill(me._peer)
+
+  def event(me, code, stuff):
+    """
+    Respond to an event which happened to this peer.
+
+    Timer events indicate that we should start a new ping.  (The server has
+    its own timeout which detects lost packets.)
+
+    We trap unknown-peer responses and detach from the Pinger.
+
+    If the ping fails and we run out of retries, we attempt to restart the
+    connection.
+    """
+    if code == 'TIMER':
+      me._failures = 0
+      me._ping()
+    elif code == 'FAIL':
+      S.notify('connect', 'ping-failed', me._peer, *stuff)
+      if not stuff:
+        pass
+      elif stuff[0] == 'unknown-peer':
+        me._pinger.kill(me._peer)
+      elif stuff[0] == 'ping-send-failed':
+        me._reconnect()
+    elif code == 'INFO':
+      if stuff[0] == 'ping-ok':
+        if me._failures > 0:
+          S.warn('connect', 'ping-ok', me._peer)
+        me._timer = M.SelTimer(time() + me._every, me._time)
+      elif stuff[0] == 'ping-timeout':
+        me._failures += 1
+        S.warn('connect', 'ping-timeout', me._peer,
+               'attempt', str(me._failures), 'of', str(me._retries))
+        if me._failures < me._retries:
+          me._ping()
+        else:
+          me._reconnect()
+      elif stuff[0] == 'ping-peer-died':
+        me._pinger.kill(me._peer)
+
+  @T._callback
+  def _time(me):
+    """
+    Handle timer callbacks by posting a timeout event on the queue.
+    """
+    me._timer = None
+    me._q.put(((me._peer, me.seq), 'TIMER', None))
+
+  def __str__(me):
+    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
+  def __repr__(me):
+    return str(me)
+
+class Pinger (T.Coroutine):
+  """
+  The Pinger keeps track of the peers which we expect to be connected and
+  takes action if they seem to stop responding.
+
+  There is usually only one Pinger, called pinger.
+
+  The Pinger maintains a collection of PingPeer objects, and an event queue.
+  The PingPeers direct the results of their pings, and timer events, to the
+  event queue.  The Pinger's coroutine picks items off the queue and
+  dispatches them back to the PingPeers as appropriate.
+  """
+
+  def __init__(me):
+    """Initialize the Pinger."""
+    T.Coroutine.__init__(me)
+    me._peers = {}
+    me._q = T.Queue()
+
+  def run(me):
+    """
+    Coroutine function: reads the pinger queue and sends events to the
+    PingPeer objects they correspond to.
+    """
+    while True:
+      (peer, seq), code, stuff = me._q.get()
+      if peer in me._peers and seq == me._peers[peer].seq:
+        me._peers[peer].event(code, stuff)
+
+  def add(me, peer, pingnow):
+    """
+    Add PEER to the collection of peers under the Pinger's watchful eye.
+    The arguments are as for PingPeer: see above.
+    """
+    me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
+    return me
+
+  def kill(me, peername):
+    """Remove PEER from the peers being watched by the Pinger."""
+    del me._peers[peername]
+    return me
+
+  def rescan(me, startup):
+    """
+    General resynchronization method.
+
+    We scan the list of peers (with connect scripts) known at the server.
+    Any which are known to the Pinger but aren't known to the server are
+    removed from our list; newly arrived peers are added.  (Note that a peer
+    can change state here either due to the server sneakily changing its list
+    without issuing notifications or, more likely, the database changing its
+    idea of whether a peer is interesting.)  Finally, PingPeers which are
+    still present are prodded to update their timing parameters.
+
+    This method is called once at startup to pick up the peers already
+    installed, and again by the dbwatcher coroutine when it detects a change
+    to the database.
+    """
+    if T._debug: print '# rescan peers'
+    correct = {}
+    start = {}
+    for name in S.list():
+      try: peer = Peer(name)
+      except KeyError: continue
+      if peer.get('watch', filter = boolean, default = False):
+        if T._debug: print '# interesting peer %s' % peer
+        correct[peer.name] = start[peer.name] = peer
+      elif startup:
+        if T._debug: print '# peer %s ready for adoption' % peer
+        start[peer.name] = peer
+    for name, obj in me._peers.items():
+      try:
+        peer = correct[name]
+      except KeyError:
+        if T._debug: print '# peer %s vanished' % name
+        del me._peers[name]
+      else:
+        obj.update(peer)
+    for name, peer in start.iteritems():
+      if name in me._peers: continue
+      if startup:
+        if T._debug: print '# setting up peer %s' % name
+        ifname = S.ifname(name)
+        addr = S.addr(name)
+        T.defer(adoptpeer, peer, ifname, *addr)
+      else:
+        if T._debug: print '# adopting new peer %s' % name
+        me.add(peer, True)
+    return me
+
+  def adopted(me):
+    """
+    Returns the list of peers being watched by the Pinger.
+    """
+    return me._peers.keys()
+
+###--------------------------------------------------------------------------
+### New connections.
+
+def encode_envvars(env, prefix, vars):
+  """
+  Encode the variables in VARS suitably for including in a program
+  environment.  Lowercase letters in variable names are forced to uppercase;
+  runs of non-alphanumeric characters are replaced by single underscores; and
+  the PREFIX is prepended.  The resulting variables are written to ENV.
+  """
+  for k, v in vars.iteritems():
+    env[prefix + r_bad.sub('_', k.upper())] = v
+
+r_bad = RX.compile(r'[\W_]+')
+def envvars(peer):
+  """
+  Translate the database information for a PEER into a dictionary of
+  environment variables with plausible upper-case names and a P_ prefix.
+  Also collect the crypto information into A_ variables.
+  """
+  env = {}
+  encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
+  encode_envvars(env, 'A_', S.algs(peer.name))
+  return env
+
+def run_ifupdown(what, peer, *args):
+  """
+  Run the interface up/down script for a peer.
+
+  WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
+  list of arguments to pass to the script, in addition to the peer name.
+
+  The command is run and watched in the background by potwatch.
+  """
+  q = T.Queue()
+  c = Command([what, peer.name], q, what,
+              M.split(peer.get(what), quotep = True)[0] +
+              [peer.name] + list(args),
+              envvars(peer))
+  potwatch(what, peer.name, q)
+
+def adoptpeer(peer, ifname, *addr):
+  """
+  Add a new peer to our collection.
+
+  PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
+  ADDR is the list of tokens representing its address.
+
+  We try to bring up the interface and provoke a connection to the peer if
+  it's passive.
+  """
+  if peer.has('ifup'):
+    T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
+        .switch('ifup', peer, ifname, *addr)
+  cmd = peer.get('connect', default = None)
+  if cmd is not None:
+    T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
+        .switch(peer, cmd)
+  if peer.get('watch', filter = boolean, default = False):
+    pinger.add(peer, False)
+
+def disownpeer(peer):
+  """Drop the PEER from the Pinger and put its interface to bed."""
+  try: pinger.kill(peer)
+  except KeyError: pass
+  cmd = peer.get('disconnect', default = None)
+  if cmd is not None:
+    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
+        .switch(peer, cmd)
+  if peer.has('ifdown'):
+    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
+        .switch('ifdown', peer)
+
 def addpeer(peer, addr):
   """
   Process a connect request from a new peer PEER on address ADDR.
 def addpeer(peer, addr):
   """
   Process a connect request from a new peer PEER on address ADDR.
@@ -87,28 +668,77 @@ def addpeer(peer, addr):
   if peer.name in S.list():
     S.kill(peer.name)
   try:
   if peer.name in S.list():
     S.kill(peer.name)
   try:
+    booltrue = ['t', 'true', 'y', 'yes', 'on']
     S.add(peer.name,
           tunnel = peer.get('tunnel', None),
           keepalive = peer.get('keepalive', None),
           key = peer.get('key', None),
     S.add(peer.name,
           tunnel = peer.get('tunnel', None),
           keepalive = peer.get('keepalive', None),
           key = peer.get('key', None),
-          cork = peer.get('cork', 'nil') in ['t', 'true', 'y', 'yes', 'on'],
+          priv = peer.get('priv', None),
+          mobile = peer.get('mobile', 'nil') in booltrue,
+          cork = peer.get('cork', 'nil') in booltrue,
           *addr)
   except T.TripeError, exc:
     raise T.TripeJobError(*exc.args)
 
           *addr)
   except T.TripeError, exc:
     raise T.TripeJobError(*exc.args)
 
+## Dictionary mapping challenges to waiting passive-connection coroutines.
+chalmap = {}
+
+def notify(_, code, *rest):
+  """
+  Watch for notifications.
+
+  We trap ADD and KILL notifications, and send them straight to adoptpeer and
+  disownpeer respectively; and dispatch GREET notifications to the
+  corresponding waiting coroutine.
+  """
+  if code == 'ADD':
+    try: p = Peer(rest[0])
+    except KeyError: return
+    adoptpeer(p, *rest[1:])
+  elif code == 'KILL':
+    try: p = Peer(rest[0])
+    except KeyError: return
+    disownpeer(p, *rest[1:])
+  elif code == 'GREET':
+    chal = rest[0]
+    try: cr = chalmap[chal]
+    except KeyError: pass
+    else: cr.switch(rest[1:])
+
+###--------------------------------------------------------------------------
+### Command implementation.
+
+def cmd_kick(name):
+  """
+  kick NAME: Force a new connection attempt for the NAMEd peer.
+  """
+  if name not in pinger.adopted():
+    raise T.TripeJobError('peer-not-adopted', name)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
+  T.spawn(run_connect, peer, peer.get('connect'))
+
+def cmd_adopted():
+  """
+  adopted: Report a list of adopted peers.
+  """
+  for name in pinger.adopted():
+    T.svcinfo(name)
+
 def cmd_active(name):
   """
   active NAME: Handle an active connection request for the peer called NAME.
 
   The appropriate address is read from the database automatically.
   """
 def cmd_active(name):
   """
   active NAME: Handle an active connection request for the peer called NAME.
 
   The appropriate address is read from the database automatically.
   """
-  peer = Peer(name)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
   addr = peer.get('peer')
   if addr == 'PASSIVE':
     raise T.TripeJobError('passive-peer', name)
   addpeer(peer, M.split(addr, quotep = True)[0])
 
   addr = peer.get('peer')
   if addr == 'PASSIVE':
     raise T.TripeJobError('passive-peer', name)
   addpeer(peer, M.split(addr, quotep = True)[0])
 
-def cmd_list():
+def cmd_listactive():
   """
   list: Report a list of the available active peers.
   """
   """
   list: Report a list of the available active peers.
   """
@@ -121,14 +751,20 @@ def cmd_info(name):
   """
   info NAME: Report the database entries for the named peer.
   """
   """
   info NAME: Report the database entries for the named peer.
   """
-  peer = Peer(name)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
   items = list(peer.list())
   items.sort()
   for i in items:
     T.svcinfo('%s=%s' % (i, peer.get(i)))
 
   items = list(peer.list())
   items.sort()
   for i in items:
     T.svcinfo('%s=%s' % (i, peer.get(i)))
 
-## Dictionary mapping challenges to waiting passive-connection coroutines.
-chalmap = {}
+def cmd_userpeer(user):
+  """
+  userpeer USER: Report the peer name for the named user.
+  """
+  try: name = CDB.init(opts.cdb)['U' + user]
+  except KeyError: raise T.TripeJobError('unknown-user', user)
+  T.svcinfo(name)
 
 def cmd_passive(*args):
   """
 
 def cmd_passive(*args):
   """
@@ -143,10 +779,10 @@ def cmd_passive(*args):
     if opt == '-timeout':
       timeout = T.timespec(op.arg())
   user, = op.rest(1, 1)
     if opt == '-timeout':
       timeout = T.timespec(op.arg())
   user, = op.rest(1, 1)
-  try:
-    peer = CDB.init(opts.cdb)['U' + user]
-  except KeyError:
-    raise T.TripeJobError('unknown-user', user)
+  try: name = CDB.init(opts.cdb)['U' + user]
+  except KeyError: raise T.TripeJobError('unknown-user', user)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
   chal = S.getchal()
   cr = T.Coroutine.getcurrent()
   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
   chal = S.getchal()
   cr = T.Coroutine.getcurrent()
   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
@@ -156,24 +792,10 @@ def cmd_passive(*args):
     addr = cr.parent.switch()
     if addr is None:
       raise T.TripeJobError('connect-timeout')
     addr = cr.parent.switch()
     if addr is None:
       raise T.TripeJobError('connect-timeout')
-    addpeer(Peer(peer), addr)
+    addpeer(peer, addr)
   finally:
     del chalmap[chal]
 
   finally:
     del chalmap[chal]
 
-def notify(_, code, *rest):
-  """
-  Watch for notifications.
-
-  In particular, if a GREETing appears quoting a challenge in the chalmap
-  then wake up the corresponding coroutine.
-  """
-  if code != 'GREET':
-    return
-  chal = rest[0]
-  addr = rest[1:]
-  if chal in chalmap:
-    chalmap[chal].switch(addr)
-
 ###--------------------------------------------------------------------------
 ### Start up.
 
 ###--------------------------------------------------------------------------
 ### Start up.
 
@@ -181,10 +803,14 @@ def setup():
   """
   Service setup.
 
   """
   Service setup.
 
-  Register the notification-watcher, and add the automatic active peers.
+  Register the notification watcher, rescan the peers, and add automatic
+  active peers.
   """
   S.handler['NOTE'] = notify
   S.watch('+n')
   """
   S.handler['NOTE'] = notify
   S.watch('+n')
+
+  pinger.rescan(opts.startup)
+
   if opts.startup:
     cdb = CDB.init(opts.cdb)
     try:
   if opts.startup:
     cdb = CDB.init(opts.cdb)
     try:
@@ -198,6 +824,18 @@ def setup():
       except T.TripeJobError, err:
         S.warn('connect', 'auto-add-failed', name, *err.args)
 
       except T.TripeJobError, err:
         S.warn('connect', 'auto-add-failed', name, *err.args)
 
+def init():
+  """
+  Initialization to be done before service startup.
+  """
+  global errorwatch, childwatch, pinger
+  errorwatch = ErrorWatch()
+  childwatch = ChildWatch()
+  pinger = Pinger()
+  T.Coroutine(dbwatch, name = 'dbwatch').switch()
+  errorwatch.switch()
+  pinger.switch()
+
 def parse_options():
   """
   Parse the command-line options.
 def parse_options():
   """
   Parse the command-line options.
@@ -234,17 +872,20 @@ def parse_options():
   return opts
 
 ## Service table, for running manually.
   return opts
 
 ## Service table, for running manually.
-service_info = [('connect', VERSION, {
+service_info = [('connect', T.VERSION, {
+  'adopted': (0, 0, '', cmd_adopted),
+  'kick': (1, 1, 'PEER', cmd_kick),
   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
   'active': (1, 1, 'PEER', cmd_active),
   'info': (1, 1, 'PEER', cmd_info),
   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
   'active': (1, 1, 'PEER', cmd_active),
   'info': (1, 1, 'PEER', cmd_info),
-  'list': (0, 0, '', cmd_list)
+  'list-active': (0, 0, '', cmd_listactive),
+  'userpeer': (1, 1, 'USER', cmd_userpeer)
 })]
 
 if __name__ == '__main__':
   opts = parse_options()
   T.runservices(opts.tripesock, service_info,
 })]
 
 if __name__ == '__main__':
   opts = parse_options()
   T.runservices(opts.tripesock, service_info,
-                setup = setup,
+                init = init, setup = setup,
                 daemon = opts.daemon)
 
 ###----- That's all, folks --------------------------------------------------
                 daemon = opts.daemon)
 
 ###----- That's all, folks --------------------------------------------------