X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/tripe/blobdiff_plain/a62f8e8a94bf56194539f7140a1215bc74309b36..1b9a78583fbff3967b91e128dd7f57fcbcdc8db9:/svc/watch.in diff --git a/svc/watch.in b/svc/watch.in index bfad160c..4aaf20be 100644 --- a/svc/watch.in +++ b/svc/watch.in @@ -329,6 +329,22 @@ def connect(peer, conn = None): q.get() potwatch('connect', peer, q) +def disconnect(peer, disconn = None): + """ + Start the job of disconnecting from a passive PEER. + + The DISCONN string is a shell command which will disconnect from the peer. + """ + if disconn is None: + try: + conn = peerinfo(peer)['disconnect'] + except KeyError: + return + q = T.Queue() + cmd = Command(['disconnect', peer], q, 'disconnect', + ['/bin/sh', '-c', disconn], None) + potwatch('disconnect', peer, q) + _pingseq = 0 class PingPeer (object): """ @@ -399,12 +415,22 @@ class PingPeer (object): """ S.rawcommand(T.TripeAsynchronousCommand( me._q, (me._peer, me.seq), - ['PING', + ['EPING', '-background', S.bgtag(), '-timeout', str(me._timeout), '--', me._peer])) + def _reconnect(me): + info = peerinfo(me._peer) + if 'connect' in info: + S.warn('watch', 'reconnecting', me._peer) + S.forcekx(me._peer) + T.spawn(connect, me._peer) + me._timer = M.SelTimer(time() + me._every, me._time) + else: + S.kill(me._peer) + def event(me, code, stuff): """ Respond to an event which happened to this peer. @@ -422,8 +448,12 @@ class PingPeer (object): me._ping() elif code == 'FAIL': S.notify('watch', 'ping-failed', me._peer, *stuff) - if stuff and stuff[0] == 'unknown-peer': + if not stuff: + pass + elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer) + elif stuff[0] == 'ping-send-failed': + me._reconnect() elif code == 'INFO': if stuff[0] == 'ping-ok': if me._failures > 0: @@ -436,14 +466,7 @@ class PingPeer (object): if me._failures < me._retries: me._ping() else: - info = peerinfo(me._peer) - if 'connect' in info: - S.warn('watch', 'reconnecting', me._peer) - S.forcekx(me._peer) - T.spawn(T.Coroutine(connect), me._peer) - me._timer = M.SelTimer(time() + me._every, me._time) - else: - S.kill(me._peer) + me._reconnect() elif stuff[0] == 'ping-peer-died': me._pinger.kill(me._peer) @@ -518,26 +541,35 @@ class Pinger (T.Coroutine): installed, and again by the dbwatcher coroutine when it detects a change to the database. """ + if T._debug: print '# rescan peers' correct = {} + start = {} for peer in S.list(): try: info = peerinfo(peer) except KeyError: continue if boolean(info, 'watch', False): - correct[peer] = info + if T._debug: print '# interesting peer %s' % peer + correct[peer] = start[peer] = info + elif startup: + if T._debug: print '# peer %s ready for adoption' % peer + start[peer] = info for peer, obj in me._peers.items(): if peer in correct: obj.update(correct[peer]) else: + if T._debug: print '# peer %s vanished' % peer del me._peers[peer] - for peer, info in correct.iteritems(): + for peer, info in start.iteritems(): if peer not in me._peers: if startup: + if T._debug: print '# setting up peer %s' % peer ifname = S.ifname(peer) addr = S.addr(peer) - addpeer(info, peer, ifname, *addr) + T.defer(addpeer, info, peer, ifname, *addr) else: + if T._debug: print '# adopting new peer %s' % peer me.add(peer, info, True) return me @@ -561,15 +593,15 @@ def encode_envvars(env, prefix, vars): env[prefix + r_bad.sub('_', k.upper())] = v r_bad = RX.compile(r'[\W_]+') -def envvars(info): +def envvars(peer, info): """ - Translate the database INFO dictionary for a peer into a dictionary of + Translate the database INFO dictionary for a PEER into a dictionary of environment variables with plausible upper-case names and a P_ prefix. Also collect the crypto information into A_ variables. """ env = {} encode_envvars(env, 'P_', info) - encode_envvars(env, 'A_', S.algs()) + encode_envvars(env, 'A_', S.algs(peer)) return env def ifupdown(what, peer, info, *args): @@ -586,7 +618,7 @@ def ifupdown(what, peer, info, *args): c = Command([what, peer], q, what, M.split(info[what], quotep = True)[0] + [peer] + list(args), - envvars(info)) + envvars(peer, info)) potwatch(what, peer, q) def addpeer(info, peer, ifname, *addr): @@ -607,9 +639,11 @@ def addpeer(info, peer, ifname, *addr): except KeyError: return if 'ifup' in info: - T.Coroutine(ifupdown).switch('ifup', peer, info, ifname, *addr) + T.Coroutine(ifupdown, name = 'ifup %s' % peer) \ + .switch('ifup', peer, info, ifname, *addr) if 'connect' in info: - T.Coroutine(connect).switch(peer, info['connect']) + T.Coroutine(connect, name = 'connect %s' % peer) \ + .switch(peer, info['connect']) if boolean(info, 'watch', False): pinger.add(peer, info, False) @@ -623,8 +657,12 @@ def delpeer(peer): pinger.kill(peer) except KeyError: pass + if 'disconnect' in info: + T.Coroutine(disconnect, name = 'disconnect %s' % peer) \ + .switch(peer, info['disconnect']) if 'ifdown' in info: - T.Coroutine(ifupdown).switch('ifdown', peer, info) + T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \ + .switch('ifdown', peer, info) def notify(_, code, *rest): """ @@ -650,7 +688,7 @@ def cmd_kick(peer): """ if peer not in pinger.adopted(): raise T.TripeJobError('peer-not-adopted', peer) - T.spawn(T.Coroutine(connect), peer) + T.spawn(connect, peer) def cmd_adopted(): """ @@ -680,7 +718,7 @@ def init(): errorwatch = ErrorWatch() childwatch = ChildWatch() pinger = Pinger() - T.Coroutine(dbwatch).switch() + T.Coroutine(dbwatch, name = 'dbwatch').switch() errorwatch.switch() pinger.switch()