chiark / gitweb /
Fix some format and other varargs errors.
[tripe] / svc / watch.in
index bfad160cf7178c1441fe05df5757623bcc9247a9..4aaf20be713f1198c62ce10366b1bf8542d3870d 100644 (file)
@@ -329,6 +329,22 @@ def connect(peer, conn = None):
       q.get()
   potwatch('connect', peer, q)
 
+def disconnect(peer, disconn = None):
+  """
+  Start the job of disconnecting from a passive PEER.
+
+  The DISCONN string is a shell command which will disconnect from the peer.
+  """
+  if disconn is None:
+    try:
+      conn = peerinfo(peer)['disconnect']
+    except KeyError:
+      return
+  q = T.Queue()
+  cmd = Command(['disconnect', peer], q, 'disconnect',
+                ['/bin/sh', '-c', disconn], None)
+  potwatch('disconnect', peer, q)
+
 _pingseq = 0
 class PingPeer (object):
   """
@@ -399,12 +415,22 @@ class PingPeer (object):
     """
     S.rawcommand(T.TripeAsynchronousCommand(
       me._q, (me._peer, me.seq),
-      ['PING',
+      ['EPING',
        '-background', S.bgtag(),
        '-timeout', str(me._timeout),
        '--',
        me._peer]))
 
+  def _reconnect(me):
+    info = peerinfo(me._peer)
+    if 'connect' in info:
+      S.warn('watch', 'reconnecting', me._peer)
+      S.forcekx(me._peer)
+      T.spawn(connect, me._peer)
+      me._timer = M.SelTimer(time() + me._every, me._time)
+    else:
+      S.kill(me._peer)
+
   def event(me, code, stuff):
     """
     Respond to an event which happened to this peer.
@@ -422,8 +448,12 @@ class PingPeer (object):
       me._ping()
     elif code == 'FAIL':
       S.notify('watch', 'ping-failed', me._peer, *stuff)
-      if stuff and stuff[0] == 'unknown-peer':
+      if not stuff:
+        pass
+      elif stuff[0] == 'unknown-peer':
         me._pinger.kill(me._peer)
+      elif stuff[0] == 'ping-send-failed':
+        me._reconnect()
     elif code == 'INFO':
       if stuff[0] == 'ping-ok':
         if me._failures > 0:
@@ -436,14 +466,7 @@ class PingPeer (object):
         if me._failures < me._retries:
           me._ping()
         else:
-          info = peerinfo(me._peer)
-          if 'connect' in info:
-            S.warn('watch', 'reconnecting', me._peer)
-            S.forcekx(me._peer)
-            T.spawn(T.Coroutine(connect), me._peer)
-            me._timer = M.SelTimer(time() + me._every, me._time)
-          else:
-            S.kill(me._peer)
+          me._reconnect()
       elif stuff[0] == 'ping-peer-died':
         me._pinger.kill(me._peer)
 
@@ -518,26 +541,35 @@ class Pinger (T.Coroutine):
     installed, and again by the dbwatcher coroutine when it detects a change
     to the database.
     """
+    if T._debug: print '# rescan peers'
     correct = {}
+    start = {}
     for peer in S.list():
       try:
         info = peerinfo(peer)
       except KeyError:
         continue
       if boolean(info, 'watch', False):
-        correct[peer] = info
+        if T._debug: print '# interesting peer %s' % peer
+        correct[peer] = start[peer] = info
+      elif startup:
+        if T._debug: print '# peer %s ready for adoption' % peer
+        start[peer] = info
     for peer, obj in me._peers.items():
       if peer in correct:
         obj.update(correct[peer])
       else:
+        if T._debug: print '# peer %s vanished' % peer
         del me._peers[peer]
-    for peer, info in correct.iteritems():
+    for peer, info in start.iteritems():
       if peer not in me._peers:
         if startup:
+          if T._debug: print '# setting up peer %s' % peer
           ifname = S.ifname(peer)
           addr = S.addr(peer)
-          addpeer(info, peer, ifname, *addr)
+          T.defer(addpeer, info, peer, ifname, *addr)
         else:
+          if T._debug: print '# adopting new peer %s' % peer
           me.add(peer, info, True)
     return me
 
@@ -561,15 +593,15 @@ def encode_envvars(env, prefix, vars):
     env[prefix + r_bad.sub('_', k.upper())] = v
 
 r_bad = RX.compile(r'[\W_]+')
-def envvars(info):
+def envvars(peer, info):
   """
-  Translate the database INFO dictionary for a peer into a dictionary of
+  Translate the database INFO dictionary for a PEER into a dictionary of
   environment variables with plausible upper-case names and a P_ prefix.
   Also collect the crypto information into A_ variables.
   """
   env = {}
   encode_envvars(env, 'P_', info)
-  encode_envvars(env, 'A_', S.algs())
+  encode_envvars(env, 'A_', S.algs(peer))
   return env
 
 def ifupdown(what, peer, info, *args):
@@ -586,7 +618,7 @@ def ifupdown(what, peer, info, *args):
   c = Command([what, peer], q, what,
               M.split(info[what], quotep = True)[0] +
               [peer] + list(args),
-              envvars(info))
+              envvars(peer, info))
   potwatch(what, peer, q)
 
 def addpeer(info, peer, ifname, *addr):
@@ -607,9 +639,11 @@ def addpeer(info, peer, ifname, *addr):
     except KeyError:
       return
   if 'ifup' in info:
-    T.Coroutine(ifupdown).switch('ifup', peer, info, ifname, *addr)
+    T.Coroutine(ifupdown, name = 'ifup %s' % peer) \
+                          .switch('ifup', peer, info, ifname, *addr)
   if 'connect' in info:
-    T.Coroutine(connect).switch(peer, info['connect'])
+    T.Coroutine(connect, name = 'connect %s' % peer) \
+                         .switch(peer, info['connect'])
   if boolean(info, 'watch', False):
     pinger.add(peer, info, False)
 
@@ -623,8 +657,12 @@ def delpeer(peer):
     pinger.kill(peer)
   except KeyError:
     pass
+  if 'disconnect' in info:
+    T.Coroutine(disconnect, name = 'disconnect %s' % peer) \
+                            .switch(peer, info['disconnect'])
   if 'ifdown' in info:
-    T.Coroutine(ifupdown).switch('ifdown', peer, info)
+    T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \
+                          .switch('ifdown', peer, info)
 
 def notify(_, code, *rest):
   """
@@ -650,7 +688,7 @@ def cmd_kick(peer):
   """
   if peer not in pinger.adopted():
     raise T.TripeJobError('peer-not-adopted', peer)
-  T.spawn(T.Coroutine(connect), peer)
+  T.spawn(connect, peer)
 
 def cmd_adopted():
   """
@@ -680,7 +718,7 @@ def init():
   errorwatch = ErrorWatch()
   childwatch = ChildWatch()
   pinger = Pinger()
-  T.Coroutine(dbwatch).switch()
+  T.Coroutine(dbwatch, name = 'dbwatch').switch()
   errorwatch.switch()
   pinger.switch()