chiark / gitweb /
server/admin.c: Remove spurious `ping' in usage message.
[tripe] / svc / connect.in
index 917366d15eb0b86553311c798072ede0133fee5e..25c80b0fcc620ebe1d43d00a6d135acfc64f9889 100644 (file)
@@ -33,9 +33,11 @@ import tripe as T
 import os as OS
 import signal as SIG
 import errno as E
+from math import sqrt
 import cdb as CDB
 import mLib as M
 import re as RX
+import sys as SYS
 from time import time
 import subprocess as PROC
 
@@ -294,12 +296,15 @@ class Peer (object):
     a FILTER function is given then apply it to the information from the
     database before returning it.
     """
-    attr = me.__dict__.get(key, default)
-    if attr is _magic:
-      raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
-    elif filter is not None:
-      attr = filter(attr)
-    return attr
+    try:
+      attr = me.__dict__[key]
+    except KeyError:
+      if default is _magic:
+        raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
+      return default
+    else:
+      if filter is not None: attr = filter(attr)
+      return attr
 
   def has(me, key):
     """
@@ -395,6 +400,13 @@ class PingPeer (object):
     me.seq = _pingseq
     _pingseq += 1
     me._failures = 0
+    me._sabotage = False
+    me._last = '-'
+    me._nping = 0
+    me._nlost = 0
+    me._sigma_t = 0
+    me._sigma_t2 = 0
+    me._min = me._max = '-'
     if pingnow:
       me._timer = None
       me._ping()
@@ -413,6 +425,7 @@ class PingPeer (object):
     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
     me._retries = peer.get('retries', filter = int, default = 5)
     me._connectp = peer.has('connect')
+    me._knockp = peer.has('knock')
     return me
 
   def _ping(me):
@@ -428,14 +441,18 @@ class PingPeer (object):
        me._peer]))
 
   def _reconnect(me):
-    peer = Peer(me._peer)
-    if me._connectp:
-      S.warn('connect', 'reconnecting', me._peer)
-      S.forcekx(me._peer)
-      T.spawn(run_connect, peer, peer.get('connect'))
-      me._timer = M.SelTimer(time() + me._every, me._time)
-    else:
-      S.kill(me._peer)
+    try:
+      peer = Peer(me._peer)
+      if me._connectp or me._knockp:
+        S.warn('connect', 'reconnecting', me._peer)
+        S.forcekx(me._peer)
+        if me._connectp: T.spawn(run_connect, peer, peer.get('connect'))
+        me._timer = M.SelTimer(time() + me._every, me._time)
+        me._sabotage = False
+      else:
+        S.kill(me._peer)
+    except TripeError, e:
+      if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
 
   def event(me, code, stuff):
     """
@@ -454,28 +471,63 @@ class PingPeer (object):
       me._ping()
     elif code == 'FAIL':
       S.notify('connect', 'ping-failed', me._peer, *stuff)
-      if not stuff:
-        pass
-      elif stuff[0] == 'unknown-peer':
-        me._pinger.kill(me._peer)
-      elif stuff[0] == 'ping-send-failed':
-        me._reconnect()
+      if not stuff: pass
+      elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
+      elif stuff[0] == 'ping-send-failed': me._reconnect()
     elif code == 'INFO':
-      if stuff[0] == 'ping-ok':
-        if me._failures > 0:
-          S.warn('connect', 'ping-ok', me._peer)
+      outcome = stuff[0]
+      if outcome == 'ping-ok' and me._sabotage:
+        outcome = 'ping-timeout'
+      if outcome == 'ping-ok':
+        if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
+        t = float(stuff[1])
+        me._last = '%.1fms' % t
+        me._sigma_t += t
+        me._sigma_t2 += t*t
+        me._nping += 1
+        if me._min == '-' or t < me._min: me._min = t
+        if me._max == '-' or t > me._max: me._max = t
         me._timer = M.SelTimer(time() + me._every, me._time)
-      elif stuff[0] == 'ping-timeout':
+      elif outcome == 'ping-timeout':
         me._failures += 1
+        me._nlost += 1
         S.warn('connect', 'ping-timeout', me._peer,
                'attempt', str(me._failures), 'of', str(me._retries))
         if me._failures < me._retries:
           me._ping()
+          me._last = 'timeout'
         else:
           me._reconnect()
-      elif stuff[0] == 'ping-peer-died':
+          me._last = 'reconnect'
+      elif outcome == 'ping-peer-died':
         me._pinger.kill(me._peer)
 
+  def sabotage(me):
+    """Sabotage the peer, for testing purposes."""
+    me._sabotage = True
+    if me._timer: me._timer.kill()
+    T.defer(me._time)
+
+  def info(me):
+    if not me._nping:
+      mean = sd = '-'
+    else:
+      mean = me._sigma_t/me._nping
+      sd = sqrt(me._sigma_t2/me._nping - mean*mean)
+    n = me._nping + me._nlost
+    if not n: pclost = '-'
+    else: pclost = '%d' % ((100*me._nlost + n//2)//n)
+    return { 'last-ping': me._last,
+             'mean-ping': '%.1fms' % mean,
+             'sd-ping': '%.1fms' % sd,
+             'n-ping': '%d' % me._nping,
+             'n-lost': '%d' % me._nlost,
+             'percent-lost': pclost,
+             'min-ping': '%.1fms' % me._min,
+             'max-ping': '%.1fms' % me._max,
+             'state': me._timer and 'idle' or 'check',
+             'failures': str(me._failures) }
+
   @T._callback
   def _time(me):
     """
@@ -516,7 +568,9 @@ class Pinger (T.Coroutine):
     while True:
       (peer, seq), code, stuff = me._q.get()
       if peer in me._peers and seq == me._peers[peer].seq:
-        me._peers[peer].event(code, stuff)
+        try: me._peers[peer].event(code, stuff)
+        except Exception, e:
+          SYS.excepthook(*SYS.exc_info())
 
   def add(me, peer, pingnow):
     """
@@ -663,23 +717,26 @@ def disownpeer(peer):
     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
         .switch('ifdown', peer)
 
-def addpeer(peer, addr):
+def addpeer(peer, addr, ephemp):
   """
   Process a connect request from a new peer PEER on address ADDR.
 
-  Any existing peer with this name is disconnected from the server.
+  Any existing peer with this name is disconnected from the server.  EPHEMP
+  is the default ephemeral-ness state for the new peer.
   """
   if peer.name in S.list():
     S.kill(peer.name)
   try:
-    booltrue = ['t', 'true', 'y', 'yes', 'on']
     S.add(peer.name,
-          tunnel = peer.get('tunnel', None),
-          keepalive = peer.get('keepalive', None),
-          key = peer.get('key', None),
-          priv = peer.get('priv', None),
-          mobile = peer.get('mobile', 'nil') in booltrue,
-          cork = peer.get('cork', 'nil') in booltrue,
+          tunnel = peer.get('tunnel', default = None),
+          keepalive = peer.get('keepalive', default = None),
+          key = peer.get('key', default = None),
+          priv = peer.get('priv', default = None),
+          mobile = peer.get('mobile', filter = boolean, default = False),
+          knock = peer.get('knock', default = None),
+          cork = peer.get('cork', filter = boolean, default = False),
+          ephemeral = peer.get('ephemeral', filter = boolean,
+                               default = ephemp),
           *addr)
   except T.TripeError, exc:
     raise T.TripeJobError(*exc.args)
@@ -708,6 +765,23 @@ def notify(_, code, *rest):
     try: cr = chalmap[chal]
     except KeyError: pass
     else: cr.switch(rest[1:])
+  elif code == 'KNOCK':
+    try: p = Peer(rest[0])
+    except KeyError:
+      S.warn(['connect', 'knock-unknown-peer', rest[0]])
+      return
+    if p.get('peer') != 'PASSIVE':
+      S.warn(['connect', 'knock-active-peer', p.name])
+      return
+    dot = p.name.find('.')
+    if dot >= 0: kname = p.name[dot + 1:]
+    else: kname = p.name
+    ktag = p.get('key', p.name)
+    if kname != ktag:
+      S.warn(['connect', 'knock-tag-mismatch',
+              'peer', pname, 'public-key-tag', ktag])
+      return
+    T.spawn(addpeer, p, rest[1:], True)
 
 ###--------------------------------------------------------------------------
 ### Command implementation.
@@ -720,7 +794,9 @@ def cmd_kick(name):
   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
   try: peer = Peer(name)
   except KeyError: raise T.TripeJobError('unknown-peer', name)
-  T.spawn(run_connect, peer, peer.get('connect'))
+  conn = peer.get('connect', None)
+  if conn: T.spawn(run_connect, peer, peer.get('connect'))
+  else: T.spawn(lambda p: S.forcekx(p.name), peer)
 
 def cmd_adopted():
   """
@@ -740,7 +816,7 @@ def cmd_active(name):
   addr = peer.get('peer')
   if addr == 'PASSIVE':
     raise T.TripeJobError('passive-peer', name)
-  addpeer(peer, M.split(addr, quotep = True)[0])
+  addpeer(peer, M.split(addr, quotep = True)[0], True)
 
 def cmd_listactive():
   """
@@ -757,10 +833,16 @@ def cmd_info(name):
   """
   try: peer = Peer(name)
   except KeyError: raise T.TripeJobError('unknown-peer', name)
-  items = list(peer.list())
+  d = {}
+  try: pp = pinger.find(name)
+  except KeyError: pass
+  else: d.update(pp.info())
+  items = list(peer.list()) + d.keys()
   items.sort()
   for i in items:
-    T.svcinfo('%s=%s' % (i, peer.get(i)))
+    try: v = d[i]
+    except KeyError: v = peer.get(i)
+    T.svcinfo('%s=%s' % (i, v.replace('\n', ' ')))
 
 def cmd_userpeer(user):
   """
@@ -796,10 +878,18 @@ def cmd_passive(*args):
     addr = cr.parent.switch()
     if addr is None:
       raise T.TripeJobError('connect-timeout')
-    addpeer(peer, addr)
+    addpeer(peer, addr, True)
   finally:
     del chalmap[chal]
 
+def cmd_sabotage(name):
+  """
+  sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
+  """
+  try: pp = pinger.find(name)
+  except KeyError: raise T.TripeJobError('unknown-peer', name)
+  pp.sabotage()
+
 ###--------------------------------------------------------------------------
 ### Start up.
 
@@ -824,7 +914,7 @@ def setup():
     for name in M.split(autos)[0]:
       try:
         peer = Peer(name, cdb)
-        addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
+        addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False)
       except T.TripeJobError, err:
         S.warn('connect', 'auto-add-failed', name, *err.args)
 
@@ -883,11 +973,13 @@ service_info = [('connect', T.VERSION, {
   'active': (1, 1, 'PEER', cmd_active),
   'info': (1, 1, 'PEER', cmd_info),
   'list-active': (0, 0, '', cmd_listactive),
-  'userpeer': (1, 1, 'USER', cmd_userpeer)
+  'userpeer': (1, 1, 'USER', cmd_userpeer),
+  'sabotage': (1, 1, 'PEER', cmd_sabotage)
 })]
 
 if __name__ == '__main__':
   opts = parse_options()
+  OS.environ['TRIPESOCK'] = opts.tripesock
   T.runservices(opts.tripesock, service_info,
                 init = init, setup = setup,
                 daemon = opts.daemon)