q.get()
potwatch('connect', peer, q)
+def disconnect(peer, disconn = None):
+    """
+    Start the job of disconnecting from a passive PEER.
+
+    The DISCONN string is a shell command which will disconnect from the peer.
+    If DISCONN is None, the command is taken from the `disconnect' entry of
+    the information returned by peerinfo(PEER); if that lookup raises
+    KeyError there is nothing to run, and the function returns without doing
+    anything.
+    """
+    if disconn is None:
+        try:
+            # Store the looked-up command in DISCONN itself: that is the value
+            # handed to the shell below.  (Assigning it to a different local
+            # would leave DISCONN as None in the command line.)
+            disconn = peerinfo(peer)['disconnect']
+        except KeyError:
+            return
+    q = T.Queue()
+    # Run the command through `/bin/sh -c'.  The Command object is given the
+    # queue Q, which is then handed to potwatch -- presumably so that the
+    # command's progress is reported under the `disconnect' tag; TODO confirm.
+    cmd = Command(['disconnect', peer], q, 'disconnect',
+                  ['/bin/sh', '-c', disconn], None)
+    potwatch('disconnect', peer, q)
+
_pingseq = 0
class PingPeer (object):
"""
"""
S.rawcommand(T.TripeAsynchronousCommand(
me._q, (me._peer, me.seq),
- ['PING',
+ ['EPING',
'-background', S.bgtag(),
'-timeout', str(me._timeout),
'--',
me._peer]))
+ def _reconnect(me):
+ """
+ Try to re-establish contact with this peer, or give up on it.
+
+ If the peer's database record (as returned by peerinfo) has a `connect'
+ entry, issue a `reconnecting' warning, call S.forcekx on the peer, spawn
+ the connect function for it, and set a new timer to call me._time after
+ me._every seconds. Otherwise, call S.kill on the peer.
+ """
+ # NOTE(review): peerinfo raises KeyError elsewhere in this file when a
+ # peer has no record; that is not caught here -- confirm callers expect it.
+ info = peerinfo(me._peer)
+ if 'connect' in info:
+ S.warn('watch', 'reconnecting', me._peer)
+ S.forcekx(me._peer)
+ T.spawn(connect, me._peer)
+ # Re-arm the timer so that me._time runs again after the usual interval.
+ me._timer = M.SelTimer(time() + me._every, me._time)
+ else:
+ # No `connect' entry in the record, so there is no command to run.
+ S.kill(me._peer)
+
def event(me, code, stuff):
"""
Respond to an event which happened to this peer.
me._ping()
elif code == 'FAIL':
S.notify('watch', 'ping-failed', me._peer, *stuff)
- if stuff and stuff[0] == 'unknown-peer':
+ if not stuff:
+ pass
+ elif stuff[0] == 'unknown-peer':
me._pinger.kill(me._peer)
+ elif stuff[0] == 'ping-send-failed':
+ me._reconnect()
elif code == 'INFO':
if stuff[0] == 'ping-ok':
if me._failures > 0:
if me._failures < me._retries:
me._ping()
else:
- info = peerinfo(me._peer)
- if 'connect' in info:
- S.warn('watch', 'reconnecting', me._peer)
- S.forcekx(me._peer)
- T.spawn(T.Coroutine(connect), me._peer)
- me._timer = M.SelTimer(time() + me._every, me._time)
- else:
- S.kill(me._peer)
+ me._reconnect()
elif stuff[0] == 'ping-peer-died':
me._pinger.kill(me._peer)
installed, and again by the dbwatcher coroutine when it detects a change
to the database.
"""
+ if T._debug: print '# rescan peers'
correct = {}
+ start = {}
for peer in S.list():
try:
info = peerinfo(peer)
except KeyError:
continue
if boolean(info, 'watch', False):
- correct[peer] = info
+ if T._debug: print '# interesting peer %s' % peer
+ correct[peer] = start[peer] = info
+ elif startup:
+ if T._debug: print '# peer %s ready for adoption' % peer
+ start[peer] = info
for peer, obj in me._peers.items():
if peer in correct:
obj.update(correct[peer])
else:
+ if T._debug: print '# peer %s vanished' % peer
del me._peers[peer]
- for peer, info in correct.iteritems():
+ for peer, info in start.iteritems():
if peer not in me._peers:
if startup:
+ if T._debug: print '# setting up peer %s' % peer
ifname = S.ifname(peer)
addr = S.addr(peer)
- addpeer(info, peer, ifname, *addr)
+ T.defer(addpeer, info, peer, ifname, *addr)
else:
+ if T._debug: print '# adopting new peer %s' % peer
me.add(peer, info, True)
return me
env[prefix + r_bad.sub('_', k.upper())] = v
r_bad = RX.compile(r'[\W_]+')
-def envvars(info):
+def envvars(peer, info):
 """
- Translate the database INFO dictionary for a peer into a dictionary of
+ Translate the database INFO dictionary for a PEER into a dictionary of
 environment variables with plausible upper-case names and a P_ prefix.
 Also collect the crypto information into A_ variables.
+
+ The A_ variables are encoded from the result of S.algs(PEER).
 """
 env = {}
 encode_envvars(env, 'P_', info)
- encode_envvars(env, 'A_', S.algs())
+ encode_envvars(env, 'A_', S.algs(peer))
 return env
def ifupdown(what, peer, info, *args):
c = Command([what, peer], q, what,
M.split(info[what], quotep = True)[0] +
[peer] + list(args),
- envvars(info))
+ envvars(peer, info))
potwatch(what, peer, q)
def addpeer(info, peer, ifname, *addr):
except KeyError:
return
if 'ifup' in info:
- T.Coroutine(ifupdown).switch('ifup', peer, info, ifname, *addr)
+ T.Coroutine(ifupdown, name = 'ifup %s' % peer) \
+ .switch('ifup', peer, info, ifname, *addr)
if 'connect' in info:
- T.Coroutine(connect).switch(peer, info['connect'])
+ T.Coroutine(connect, name = 'connect %s' % peer) \
+ .switch(peer, info['connect'])
if boolean(info, 'watch', False):
pinger.add(peer, info, False)
pinger.kill(peer)
except KeyError:
pass
+ if 'disconnect' in info:
+ T.Coroutine(disconnect, name = 'disconnect %s' % peer) \
+ .switch(peer, info['disconnect'])
if 'ifdown' in info:
- T.Coroutine(ifupdown).switch('ifdown', peer, info)
+ T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \
+ .switch('ifdown', peer, info)
def notify(_, code, *rest):
"""
"""
if peer not in pinger.adopted():
raise T.TripeJobError('peer-not-adopted', peer)
- T.spawn(T.Coroutine(connect), peer)
+ T.spawn(connect, peer)
def cmd_adopted():
"""
errorwatch = ErrorWatch()
childwatch = ChildWatch()
pinger = Pinger()
- T.Coroutine(dbwatch).switch()
+ T.Coroutine(dbwatch, name = 'dbwatch').switch()
errorwatch.switch()
pinger.switch()