chiark / gitweb /
svc/watch.in: Use the `Peer' object from `connect' to carry information.
authorMark Wooding <mdw@distorted.org.uk>
Sat, 13 Jul 2013 15:34:40 +0000 (16:34 +0100)
committerMark Wooding <mdw@distorted.org.uk>
Tue, 14 Jan 2014 10:40:17 +0000 (10:40 +0000)
The `get' method is slightly improved to do type conversions.

This replaces the ad-hoc use of dictionaries.  The code is somewhat
cleaner as a result, and very slightly longer.  But most importantly it
paves the way for a merge of these two services.

svc/watch.in

index 4aaf20b..606d9cc 100644 (file)
@@ -270,80 +270,91 @@ def potwatch(what, name, q):
 ###--------------------------------------------------------------------------
 ### Peer database utilities.
 
-def timespec(info, key, default):
-  """Parse INFO[KEY] as a timespec, or return DEFAULT."""
-  try:
-    return T.timespec(info[key])
-  except (KeyError, T.TripeJobError):
-    return default
+_magic = ['_magic']                     # An object distinct from all others
 
-def integer(info, key, default):
-  """Parse INFO[KEY] as an integer, or return DEFAULT."""
-  try:
-    return int(info[key])
-  except (KeyError, ValueError):
-    return default
+class Peer (object):
+  """Representation of a peer in the database."""
 
-def boolean(info, key, default):
-  """Parse INFO[KEY] as a boolean, or return DEFAULT."""
-  try:
-    return info[key] in ['t', 'true', 'y', 'yes', 'on']
-  except (KeyError, ValueError):
-    return default
+  def __init__(me, peer, cdb = None):
+    """
+    Create a new peer, named PEER.
 
-def peerinfo(peer):
-  """
-  Return a dictionary containing information about PEER from the database.
-  """
-  return dict(M.URLDecode(CDB.init(opts.cdb)['P' + peer], semip = True))
+    Information about the peer is read from the database CDB, or the default
+    one given on the command-line.
+    """
+    me.name = peer
+    record = (cdb or CDB.init(opts.cdb))['P' + peer]
+    me.__dict__.update(M.URLDecode(record, semip = True))
+
+  def get(me, key, default = _magic, filter = None):
+    """
+    Get the information stashed under KEY from the peer's database record.
+
+    If DEFAULT is given, then use it if the database doesn't contain the
+    necessary information.  If no DEFAULT is given, then report an error.  If
+    a FILTER function is given then apply it to the information from the
+    database before returning it.
+    """
+    attr = me.__dict__.get(key, default)
+    if attr is _magic:
+      raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
+    elif filter is not None:
+      attr = filter(attr)
+    return attr
+
+  def has(me, key):
+    """
+    Return whether the peer's database record has the KEY.
+    """
+    return key in me.__dict__
+
+  def list(me):
+    """
+    Iterate over the available keys in the peer's database record.
+    """
+    return me.__dict__.iterkeys()
+
+def boolean(value):
+  """Parse VALUE as a boolean."""
+  return value in ['t', 'true', 'y', 'yes', 'on']
 
 ###--------------------------------------------------------------------------
 ### Waking up and watching peers.
 
-def connect(peer, conn = None):
+def connect(peer, cmd):
   """
   Start the job of connecting to the passive PEER.
 
-  The CONN string is a shell command which will connect to the peer (via some
+  The CMD string is a shell command which will connect to the peer (via some
   back-channel, say ssh and userv), issue a command
 
     SVCSUBMIT connect passive [OPTIONS] USER
 
   and write the resulting challenge to standard error.
   """
-  if conn is None:
-    try:
-      conn = peerinfo(peer)['connect']
-    except KeyError:
-      return
   q = T.Queue()
-  cmd = Command(['connect', peer], q, 'connect',
-                ['/bin/sh', '-c', conn], None)
+  cmd = Command(['connect', peer.name], q, 'connect',
+                ['/bin/sh', '-c', cmd], None)
   _, kind, more = q.peek()
   if kind == 'stdout':
     if more is None:
-      S.warn('watch', 'connect', peer, 'unexpected-eof')
+      S.warn('watch', 'connect', peer.name, 'unexpected-eof')
     else:
       chal = more
-      S.greet(peer, chal)
+      S.greet(peer.name, chal)
       q.get()
-  potwatch('connect', peer, q)
+  potwatch('connect', peer.name, q)
 
-def disconnect(peer, disconn = None):
+def disconnect(peer, cmd):
   """
   Start the job of disconnecting from a passive PEER.
 
-  The DISCONN string is a shell command which will disconnect from the peer.
+  The CMD string is a shell command which will disconnect from the peer.
   """
-  if disconn is None:
-    try:
-      conn = peerinfo(peer)['disconnect']
-    except KeyError:
-      return
   q = T.Queue()
-  cmd = Command(['disconnect', peer], q, 'disconnect',
-                ['/bin/sh', '-c', disconn], None)
-  potwatch('disconnect', peer, q)
+  cmd = Command(['disconnect', peer.name], q, 'disconnect',
+                ['/bin/sh', '-c', cmd], None)
+  potwatch('disconnect', peer.name, q)
 
 _pingseq = 0
 class PingPeer (object):
@@ -362,7 +373,7 @@ class PingPeer (object):
   otherwise defunct instances.)
   """
 
-  def __init__(me, pinger, queue, peer, info, pingnow):
+  def __init__(me, pinger, queue, peer, pingnow):
     """
     Create a new PingPeer.
 
@@ -372,11 +383,7 @@ class PingPeer (object):
     The QUEUE is the event queue on which timer and ping-command events
     should be written.
 
-    The PEER is just the peer's name, as a string.
-
-    The INFO is the database record for the peer, as a dictionary, or None if
-    it's not readily available.  (This is just a tweak to save multiple
-    probes if we don't really need them.)
+    The PEER is a `Peer' object describing the peer.
 
     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
     wait until the usual retry interval.
@@ -384,8 +391,8 @@ class PingPeer (object):
     global _pingseq
     me._pinger = pinger
     me._q = queue
-    me._peer = peer
-    me.update(info)
+    me._peer = peer.name
+    me.update(peer)
     me.seq = _pingseq
     _pingseq += 1
     me._failures = 0
@@ -395,18 +402,18 @@ class PingPeer (object):
     else:
       me._timer = M.SelTimer(time() + me._every, me._time)
 
-  def update(me, info):
+  def update(me, peer):
     """
     Refreshes the timer parameters for this peer.  We don't, however,
     immediately reschedule anything: that will happen next time anything
     interesting happens.
     """
-    if info is None:
-      info = peerinfo(me._peer)
-    me._every = timespec(info, 'every', 120)
-    me._timeout = timespec(info, 'timeout', 10)
-    me._retries = integer(info, 'retries', 5)
-    me._connectp = 'connect' in info
+    if peer is None: peer = Peer(me._peer)
+    assert peer.name == me._peer
+    me._every = peer.get('every', filter = T.timespec, default = 120)
+    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
+    me._retries = peer.get('retries', filter = int, default = 5)
+    me._connectp = peer.has('connect')
     return me
 
   def _ping(me):
@@ -422,11 +429,11 @@ class PingPeer (object):
        me._peer]))
 
   def _reconnect(me):
-    info = peerinfo(me._peer)
-    if 'connect' in info:
+    peer = Peer(me._peer)
+    if peer.has('connect'):
       S.warn('watch', 'reconnecting', me._peer)
       S.forcekx(me._peer)
-      T.spawn(connect, me._peer)
+      T.spawn(connect, peer, peer.get('connect'))
       me._timer = M.SelTimer(time() + me._every, me._time)
     else:
       S.kill(me._peer)
@@ -512,17 +519,17 @@ class Pinger (T.Coroutine):
       if peer in me._peers and seq == me._peers[peer].seq:
         me._peers[peer].event(code, stuff)
 
-  def add(me, peer, info, pingnow):
+  def add(me, peer, pingnow):
     """
     Add PEER to the collection of peers under the Pinger's watchful eye.
     The arguments are as for PingPeer: see above.
     """
-    me._peers[peer] = PingPeer(me, me._q, peer, info, pingnow)
+    me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
     return me
 
-  def kill(me, peer):
+  def kill(me, peername):
     """Remove PEER from the peers being watched by the Pinger."""
-    del me._peers[peer]
+    del me._peers[peername]
     return me
 
   def rescan(me, startup):
@@ -544,33 +551,33 @@ class Pinger (T.Coroutine):
     if T._debug: print '# rescan peers'
     correct = {}
     start = {}
-    for peer in S.list():
-      try:
-        info = peerinfo(peer)
-      except KeyError:
-        continue
-      if boolean(info, 'watch', False):
+    for name in S.list():
+      try: peer = Peer(name)
+      except KeyError: continue
+      if peer.get('watch', filter = boolean, default = False):
         if T._debug: print '# interesting peer %s' % peer
-        correct[peer] = start[peer] = info
+        correct[peer.name] = start[peer.name] = peer
       elif startup:
         if T._debug: print '# peer %s ready for adoption' % peer
-        start[peer] = info
-    for peer, obj in me._peers.items():
-      if peer in correct:
-        obj.update(correct[peer])
+        start[peer.name] = peer
+    for name, obj in me._peers.items():
+      try:
+        peer = correct[name]
+      except KeyError:
+        if T._debug: print '# peer %s vanished' % name
+        del me._peers[name]
       else:
-        if T._debug: print '# peer %s vanished' % peer
-        del me._peers[peer]
-    for peer, info in start.iteritems():
-      if peer not in me._peers:
-        if startup:
-          if T._debug: print '# setting up peer %s' % peer
-          ifname = S.ifname(peer)
-          addr = S.addr(peer)
-          T.defer(addpeer, info, peer, ifname, *addr)
-        else:
-          if T._debug: print '# adopting new peer %s' % peer
-          me.add(peer, info, True)
+        obj.update(peer)
+    for name, peer in start.iteritems():
+      if name in me._peers: continue
+      if startup:
+        if T._debug: print '# setting up peer %s' % name
+        ifname = S.ifname(name)
+        addr = S.addr(name)
+        T.defer(addpeer, peer, ifname, *addr)
+      else:
+        if T._debug: print '# adopting new peer %s' % name
+        me.add(peer, True)
     return me
 
   def adopted(me):
@@ -593,76 +600,64 @@ def encode_envvars(env, prefix, vars):
     env[prefix + r_bad.sub('_', k.upper())] = v
 
 r_bad = RX.compile(r'[\W_]+')
-def envvars(peer, info):
+def envvars(peer):
   """
-  Translate the database INFO dictionary for a PEER into a dictionary of
+  Translate the database information for a PEER into a dictionary of
   environment variables with plausible upper-case names and a P_ prefix.
   Also collect the crypto information into A_ variables.
   """
   env = {}
-  encode_envvars(env, 'P_', info)
-  encode_envvars(env, 'A_', S.algs(peer))
+  encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
+  encode_envvars(env, 'A_', S.algs(peer.name))
   return env
 
-def ifupdown(what, peer, info, *args):
+def ifupdown(what, peer, *args):
   """
   Run the interface up/down script for a peer.
 
-  WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  INFO is the
-  database record dictionary.  ARGS is a list of arguments to pass to the
-  script, in addition to the peer name.
+  WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
+  list of arguments to pass to the script, in addition to the peer name.
 
   The command is run and watched in the background by potwatch.
   """
   q = T.Queue()
-  c = Command([what, peer], q, what,
-              M.split(info[what], quotep = True)[0] +
-              [peer] + list(args),
-              envvars(peer, info))
-  potwatch(what, peer, q)
+  c = Command([what, peer.name], q, what,
+              M.split(peer.get(what), quotep = True)[0] +
+              [peer.name] + list(args),
+              envvars(peer))
+  potwatch(what, peer.name, q)
 
-def addpeer(info, peer, ifname, *addr):
+def addpeer(peer, ifname, *addr):
   """
   Add a new peer to our collection.
 
-  INFO is the peer information dictionary, or None if we don't have one yet.
-
-  PEER names the peer; IFNAME is the interface name for its tunnel; and ADDR
-  is the list of tokens representing its address.
+  PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
+  ADDR is the list of tokens representing its address.
 
   We try to bring up the interface and provoke a connection to the peer if
   it's passive.
   """
-  if info is None:
-    try:
-      info = peerinfo(peer)
-    except KeyError:
-      return
-  if 'ifup' in info:
-    T.Coroutine(ifupdown, name = 'ifup %s' % peer) \
-                          .switch('ifup', peer, info, ifname, *addr)
-  if 'connect' in info:
-    T.Coroutine(connect, name = 'connect %s' % peer) \
-                         .switch(peer, info['connect'])
-  if boolean(info, 'watch', False):
-    pinger.add(peer, info, False)
+  if peer.has('ifup'):
+    T.Coroutine(ifupdown, name = 'ifup %s' % peer.name) \
+        .switch('ifup', peer, ifname, *addr)
+  cmd = peer.get('connect', default = None)
+  if cmd is not None:
+    T.Coroutine(connect, name = 'connect %s' % peer.name) \
+        .switch(peer, cmd)
+  if peer.get('watch', filter = boolean, default = False):
+    pinger.add(peer, False)
 
 def delpeer(peer):
   """Drop the PEER from the Pinger and put its interface to bed."""
-  try:
-    info = peerinfo(peer)
-  except KeyError:
-    return
-  try:
-    pinger.kill(peer)
-  except KeyError:
-    pass
-  if 'disconnect' in info:
-    T.Coroutine(disconnect, name = 'disconnect %s' % peer) \
-                            .switch(peer, info['disconnect'])
-  if 'ifdown' in info:
-    T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \
-                          .switch('ifdown', peer, info)
+  try: pinger.kill(peer)
+  except KeyError: pass
+  cmd = peer.get('disconnect', default = None)
+  if cmd is not None:
+    T.Coroutine(disconnect, name = 'disconnect %s' % peer.name) \
+        .switch(peer, cmd)
+  if peer.has('ifdown'):
+    T.Coroutine(ifupdown, name = 'ifdown %s' % peer.name) \
+        .switch('ifdown', peer)
 
 def notify(_, code, *rest):
   """
@@ -672,9 +667,13 @@ def notify(_, code, *rest):
   delpeer respectively.
   """
   if code == 'ADD':
-    addpeer(None, *rest)
+    try: p = Peer(rest[0])
+    except KeyError: return
+    addpeer(p, *rest[1:])
   elif code == 'KILL':
-    delpeer(*rest)
+    try: p = Peer(rest[0])
+    except KeyError: return
+    delpeer(p, *rest[1:])
 
 ###--------------------------------------------------------------------------
 ### Command stubs.
@@ -682,20 +681,22 @@ def notify(_, code, *rest):
 def cmd_stub(*args):
   raise T.TripeJobError('not-implemented')
 
-def cmd_kick(peer):
+def cmd_kick(name):
   """
-  kick PEER: Force a new connection attempt for PEER
+  kick NAME: Force a new connection attempt for the NAMEd peer.
   """
-  if peer not in pinger.adopted():
-    raise T.TripeJobError('peer-not-adopted', peer)
+  if name not in pinger.adopted():
+    raise T.TripeJobError('peer-not-adopted', name)
+  try: peer = Peer(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
   T.spawn(connect, peer)
 
 def cmd_adopted():
   """
   adopted: Report a list of adopted peers.
   """
-  for peer in pinger.adopted():
-    T.svcinfo(peer)
+  for name in pinger.adopted():
+    T.svcinfo(name)
 
 ###--------------------------------------------------------------------------
 ### Start up.