import os as OS
import signal as SIG
import errno as E
+from math import sqrt
import cdb as CDB
import mLib as M
import re as RX
+import sys as SYS
from time import time
import subprocess as PROC
a FILTER function is given then apply it to the information from the
database before returning it.
"""
- attr = me.__dict__.get(key, default)
- if attr is _magic:
- raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
- elif filter is not None:
- attr = filter(attr)
- return attr
+ try:
+ attr = me.__dict__[key]
+ except KeyError:
+ if default is _magic:
+ raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
+ return default
+ else:
+ if filter is not None: attr = filter(attr)
+ return attr
def has(me, key):
"""
me.seq = _pingseq
_pingseq += 1
me._failures = 0
+ me._sabotage = False
+ me._last = '-'
+ me._nping = 0
+ me._nlost = 0
+ me._sigma_t = 0
+ me._sigma_t2 = 0
+ me._min = me._max = '-'
if pingnow:
me._timer = None
me._ping()
me._peer]))
def _reconnect(me):
- peer = Peer(me._peer)
- if me._connectp:
- S.warn('connect', 'reconnecting', me._peer)
- S.forcekx(me._peer)
- T.spawn(run_connect, peer, peer.get('connect'))
- me._timer = M.SelTimer(time() + me._every, me._time)
- else:
- S.kill(me._peer)
+ try:
+ peer = Peer(me._peer)
+ if me._connectp:
+ S.warn('connect', 'reconnecting', me._peer)
+ S.forcekx(me._peer)
+ T.spawn(run_connect, peer, peer.get('connect'))
+ me._timer = M.SelTimer(time() + me._every, me._time)
+ me._sabotage = False
+ else:
+ S.kill(me._peer)
+ except TripeError, e:
+ if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
def event(me, code, stuff):
"""
me._ping()
elif code == 'FAIL':
S.notify('connect', 'ping-failed', me._peer, *stuff)
- if not stuff:
- pass
- elif stuff[0] == 'unknown-peer':
- me._pinger.kill(me._peer)
- elif stuff[0] == 'ping-send-failed':
- me._reconnect()
+ if not stuff: pass
+ elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
+ elif stuff[0] == 'ping-send-failed': me._reconnect()
elif code == 'INFO':
- if stuff[0] == 'ping-ok':
- if me._failures > 0:
- S.warn('connect', 'ping-ok', me._peer)
+ outcome = stuff[0]
+ if outcome == 'ping-ok' and me._sabotage:
+ outcome = 'ping-timeout'
+ if outcome == 'ping-ok':
+ if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
+ t = float(stuff[1])
+ me._last = '%.1fms' % t
+ me._sigma_t += t
+ me._sigma_t2 += t*t
+ me._nping += 1
+ if me._min == '-' or t < me._min: me._min = t
+ if me._max == '-' or t > me._max: me._max = t
me._timer = M.SelTimer(time() + me._every, me._time)
- elif stuff[0] == 'ping-timeout':
+ elif outcome == 'ping-timeout':
me._failures += 1
+ me._nlost += 1
S.warn('connect', 'ping-timeout', me._peer,
'attempt', str(me._failures), 'of', str(me._retries))
if me._failures < me._retries:
me._ping()
+ me._last = 'timeout'
else:
me._reconnect()
- elif stuff[0] == 'ping-peer-died':
+ me._last = 'reconnect'
+ elif outcome == 'ping-peer-died':
me._pinger.kill(me._peer)
+ def sabotage(me):
+ """Sabotage the peer, for testing purposes."""
+ me._sabotage = True
+ if me._timer: me._timer.kill()
+ T.defer(me._time)
+
+ def info(me):
+ if not me._nping:
+ mean = sd = '-'
+ else:
+ mean = me._sigma_t/me._nping
+ sd = sqrt(me._sigma_t2/me._nping - mean*mean)
+ n = me._nping + me._nlost
+ if not n: pclost = '-'
+ else: pclost = '%d' % ((100*me._nlost + n//2)//n)
+ return { 'last-ping': me._last,
+ 'mean-ping': '%.1fms' % mean,
+ 'sd-ping': '%.1fms' % sd,
+ 'n-ping': '%d' % me._nping,
+ 'n-lost': '%d' % me._nlost,
+ 'percent-lost': pclost,
+ 'min-ping': '%.1fms' % me._min,
+ 'max-ping': '%.1fms' % me._max,
+ 'state': me._timer and 'idle' or 'check',
+ 'failures': me._failures }
+
@T._callback
def _time(me):
"""
while True:
(peer, seq), code, stuff = me._q.get()
if peer in me._peers and seq == me._peers[peer].seq:
- me._peers[peer].event(code, stuff)
+ try: me._peers[peer].event(code, stuff)
+ except Exception, e:
+ SYS.excepthook(*SYS.exc_info())
def add(me, peer, pingnow):
"""
def kill(me, peername):
"""Remove PEER from the peers being watched by the Pinger."""
- del me._peers[peername]
+ try: del me._peers[peername]
+ except KeyError: pass
return me
def rescan(me, startup):
"""
return me._peers.keys()
+ def find(me, name):
+ """Return the PingPeer with the given name."""
+ return me._peers[name]
+
###--------------------------------------------------------------------------
### New connections.
if peer.name in S.list():
S.kill(peer.name)
try:
- booltrue = ['t', 'true', 'y', 'yes', 'on']
S.add(peer.name,
- tunnel = peer.get('tunnel', None),
- keepalive = peer.get('keepalive', None),
- key = peer.get('key', None),
- priv = peer.get('priv', None),
- mobile = peer.get('mobile', 'nil') in booltrue,
- cork = peer.get('cork', 'nil') in booltrue,
+ tunnel = peer.get('tunnel', default = None),
+ keepalive = peer.get('keepalive', default = None),
+ key = peer.get('key', default = None),
+ priv = peer.get('priv', default = None),
+ mobile = peer.get('mobile', filter = boolean, default = False),
+ cork = peer.get('cork', filter = boolean, default = False),
*addr)
except T.TripeError, exc:
raise T.TripeJobError(*exc.args)
"""
kick NAME: Force a new connection attempt for the NAMEd peer.
"""
- if name not in pinger.adopted():
- raise T.TripeJobError('peer-not-adopted', name)
+ try: pp = pinger.find(name)
+ except KeyError: raise T.TripeJobError('peer-not-adopted', name)
try: peer = Peer(name)
except KeyError: raise T.TripeJobError('unknown-peer', name)
- T.spawn(run_connect, peer, peer.get('connect'))
+ conn = peer.get('connect', None)
+ if conn: T.spawn(run_connect, peer, peer.get('connect'))
+ else: T.spawn(lambda p: S.forcekx(p.name), peer)
def cmd_adopted():
"""
"""
try: peer = Peer(name)
except KeyError: raise T.TripeJobError('unknown-peer', name)
- items = list(peer.list())
+ d = {}
+ try: pp = pinger.find(name)
+ except KeyError: pass
+ else: d.update(pp.info())
+ items = list(peer.list()) + d.keys()
items.sort()
for i in items:
- T.svcinfo('%s=%s' % (i, peer.get(i)))
+ try: v = d[i]
+ except KeyError: v = peer.get(i)
+ T.svcinfo('%s=%s' % (i, v))
def cmd_userpeer(user):
"""
finally:
del chalmap[chal]
+def cmd_sabotage(name):
+ """
+ sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
+ """
+ try: pp = pinger.find(name)
+ except KeyError: raise T.TripeJobError('unknown-peer', name)
+ pp.sabotage()
+
###--------------------------------------------------------------------------
### Start up.
'active': (1, 1, 'PEER', cmd_active),
'info': (1, 1, 'PEER', cmd_info),
'list-active': (0, 0, '', cmd_listactive),
- 'userpeer': (1, 1, 'USER', cmd_userpeer)
+ 'userpeer': (1, 1, 'USER', cmd_userpeer),
+ 'sabotage': (1, 1, 'PEER', cmd_sabotage)
})]
if __name__ == '__main__':
opts = parse_options()
+ OS.environ['TRIPESOCK'] = opts.tripesock
T.runservices(opts.tripesock, service_info,
init = init, setup = setup,
daemon = opts.daemon)