X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/tripe/blobdiff_plain/a62f8e8a94bf56194539f7140a1215bc74309b36..1171524232347e4c8fbd47409577c5e988ebc6d6:/svc/watch.in diff --git a/svc/watch.in b/svc/watch.in index bfad160c..e058ed8e 100644 --- a/svc/watch.in +++ b/svc/watch.in @@ -399,12 +399,22 @@ class PingPeer (object): """ S.rawcommand(T.TripeAsynchronousCommand( me._q, (me._peer, me.seq), - ['PING', + ['EPING', '-background', S.bgtag(), '-timeout', str(me._timeout), '--', me._peer])) + def _reconnect(me): + info = peerinfo(me._peer) + if 'connect' in info: + S.warn('watch', 'reconnecting', me._peer) + S.forcekx(me._peer) + T.spawn(connect, me._peer) + me._timer = M.SelTimer(time() + me._every, me._time) + else: + S.kill(me._peer) + def event(me, code, stuff): """ Respond to an event which happened to this peer. @@ -422,8 +432,12 @@ class PingPeer (object): me._ping() elif code == 'FAIL': S.notify('watch', 'ping-failed', me._peer, *stuff) - if stuff and stuff[0] == 'unknown-peer': + if not stuff: + pass + elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer) + elif stuff[0] == 'ping-send-failed': + me._reconnect() elif code == 'INFO': if stuff[0] == 'ping-ok': if me._failures > 0: @@ -436,14 +450,7 @@ class PingPeer (object): if me._failures < me._retries: me._ping() else: - info = peerinfo(me._peer) - if 'connect' in info: - S.warn('watch', 'reconnecting', me._peer) - S.forcekx(me._peer) - T.spawn(T.Coroutine(connect), me._peer) - me._timer = M.SelTimer(time() + me._every, me._time) - else: - S.kill(me._peer) + me._reconnect() elif stuff[0] == 'ping-peer-died': me._pinger.kill(me._peer) @@ -536,7 +543,7 @@ class Pinger (T.Coroutine): if startup: ifname = S.ifname(peer) addr = S.addr(peer) - addpeer(info, peer, ifname, *addr) + T.defer(addpeer, info, peer, ifname, *addr) else: me.add(peer, info, True) return me @@ -607,9 +614,11 @@ def addpeer(info, peer, ifname, *addr): except KeyError: return if 'ifup' in info: - T.Coroutine(ifupdown).switch('ifup', peer, info, ifname, *addr) + T.Coroutine(ifupdown, name = 'ifup %s' % peer) \ + .switch('ifup', peer, info, ifname, *addr) if 'connect' in info: - T.Coroutine(connect).switch(peer, info['connect']) + T.Coroutine(connect, name = 'connect %s' % peer) \ + .switch(peer, info['connect']) if boolean(info, 'watch', False): pinger.add(peer, info, False) @@ -624,7 +633,8 @@ def delpeer(peer): except KeyError: pass if 'ifdown' in info: - T.Coroutine(ifupdown).switch('ifdown', peer, info) + T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \ + .switch('ifdown', peer, info) def notify(_, code, *rest): """ @@ -650,7 +660,7 @@ def cmd_kick(peer): """ if peer not in pinger.adopted(): raise T.TripeJobError('peer-not-adopted', peer) - T.spawn(T.Coroutine(connect), peer) + T.spawn(connect, peer) def cmd_adopted(): """ @@ -680,7 +690,7 @@ def init(): errorwatch = ErrorWatch() childwatch = ChildWatch() pinger = Pinger() - T.Coroutine(dbwatch).switch() + T.Coroutine(dbwatch, name = 'dbwatch').switch() errorwatch.switch() pinger.switch()