X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/tripe/blobdiff_plain/22b4755242c39bc8e948e4b714bea80433882a11..1171524232347e4c8fbd47409577c5e988ebc6d6:/svc/conntrack.in diff --git a/svc/conntrack.in b/svc/conntrack.in index eabdc2b1..a7431321 100644 --- a/svc/conntrack.in +++ b/svc/conntrack.in @@ -214,6 +214,7 @@ def localaddr(peer): _kick = T.Queue() def kickpeers(): + lastip = {} while True: upness, reason = _kick.get() @@ -233,35 +234,60 @@ def kickpeers(): addr = localaddr(CF.testaddr) if addr is None: upness = False + else: + addr = None ## Now decide what to do. changes = [] for g, pp in CF.groups: ## Find out which peer in the group ought to be active. - want = None # unequal to any string - if upness: - for t, p, a, m in pp: - if p is None: - aq = addr + ip = None + map = {} + want = None + for t, p, a, m in pp: + if p is None or not upness: + ipq = addr + else: + ipq = localaddr(p) + if upness and ip is None and \ + ipq is not None and (ipq & m) == a: + map[t] = 'up' + if t == 'down' or t.startswith('down/'): + want = None else: - aq = localaddr(p) - if aq is not None and (aq & m) == a: want = t - break + ip = ipq + else: + map[t] = 'down' ## Shut down the wrong ones. found = False for p in peers: - if p == want: + what = map.get(p, 'leave') + if what == 'up': found = True - elif p.startswith(g) and p != want: - changes.append(lambda p=p: SM.kill(p)) + elif what == 'down': + def _(p = p): + try: + SM.kill(p) + except T.TripeError, exc: + if exc.args[0] == 'unknown-peer': + ## Inherently racy; don't worry about this. + pass + else: + raise + changes.append(_) ## Start the right one if necessary. - if want is not None and not found: - changes.append(lambda: T._simple(SM.svcsubmit('connect', 'active', - want))) + if want is not None and (not found or ip != lastip.get(g, None)): + def _(want = want): + try: + SM.svcsubmit('connect', 'active', want) + except T.TripeError, exc: + SM.warn('conntrack', 'connect-failed', want, *exc.args) + changes.append(_) + lastip[g] = ip ## Commit the changes. if changes: @@ -388,6 +414,7 @@ class DBusMonitor (object): """ me._mons = [] me._loop = D.mainloop.glib.DBusGMainLoop() + me._state = 'startup' me._reconnect() def addmon(me, mon): @@ -402,12 +429,20 @@ class DBusMonitor (object): if me._bus is not None: mon.attach(me._bus) - def _reconnect(me): + def _reconnect(me, hunoz = None): """ Start connecting to the bus. If we fail the first time, retry periodically. """ + if me._state == 'startup': + T.aside(SM.notify, 'conntrack', 'dbus-connection', 'startup') + elif me._state == 'connected': + T.aside(SM.notify, 'conntrack', 'dbus-connection', 'lost') + else: + T.aside(SM.notify, 'conntrack', 'dbus-connection', + 'state=%s' % me._state) + me._state == 'reconnecting' me._bus = None if me._try_connect(): G.timeout_add_seconds(5, me._try_connect) @@ -419,13 +454,21 @@ class DBusMonitor (object): If we succeed, attach the monitors. """ try: - bus = D.SystemBus(mainloop = me._loop, private = True) - except D.DBusException: + addr = OS.getenv('TRIPE_CONNTRACK_BUS') + if addr == 'SESSION': + bus = D.SessionBus(mainloop = me._loop, private = True) + elif addr is not None: + bus = D.bus.BusConnection(addr, mainloop = me._loop) + else: + bus = D.SystemBus(mainloop = me._loop, private = True) + for m in me._mons: + m.attach(bus) + except D.DBusException, e: return True me._bus = bus + me._state = 'connected' bus.call_on_disconnection(me._reconnect) - for m in me._mons: - m.attach(bus) + T.aside(SM.notify, 'conntrack', 'dbus-connection', 'connected') return False ###-------------------------------------------------------------------------- @@ -457,12 +500,13 @@ def init(): Add the D-Bus monitor here, because we might send commands off immediately, and we want to make sure the server connection is up. """ + global DBM T.Coroutine(kickpeers, name = 'kickpeers').switch() - dbm = DBusMonitor() - dbm.addmon(NetworkManagerMonitor()) - dbm.addmon(MaemoICdMonitor()) - G.timeout_add_seconds(300, lambda: (netupdown(True, ['interval-timer']) - or True)) + DBM = DBusMonitor() + DBM.addmon(NetworkManagerMonitor()) + DBM.addmon(MaemoICdMonitor()) + G.timeout_add_seconds(30, lambda: (netupdown(True, ['interval-timer']) + or True)) def parse_options(): """