X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/tripe/blobdiff_plain/984b6d3310be68c87d1f278102bc4a5ef61645ff..29b6a378ad93267a4de0863ffe042e432a70d353:/mon/tripemon.in diff --git a/mon/tripemon.in b/mon/tripemon.in index e666ab9f..413c8abb 100644 --- a/mon/tripemon.in +++ b/mon/tripemon.in @@ -56,46 +56,6 @@ def uncaught(): """Report an uncaught exception.""" excepthook(*exc_info()) -_idles = [] -def _runidles(): - """Invoke the functions on the idles queue.""" - global _idles - while _idles: - old = _idles - _idles = [] - for func, args, kw in old: - try: - func(*args, **kw) - except: - uncaught() - return False - -def idly(func, *args, **kw): - """Invoke FUNC(*ARGS, **KW) at some later point in time.""" - if not _idles: - GO.idle_add(_runidles) - _idles.append((func, args, kw)) - -_asides = T.Queue() -def _runasides(): - """ - Coroutine function: reads (FUNC, ARGS, KW) triples from a queue and invokes - FUNC(*ARGS, **KW) - """ - while True: - func, args, kw = _asides.get() - try: - func(*args, **kw) - except: - uncaught() - -def aside(func, *args, **kw): - """ - Arrange for FUNC(*ARGS, **KW) to be performed at some point in the future, - and not from the main coroutine. - """ - idly(_asides.put, (func, args, kw)) - def xwrap(func): """ Return a function which behaves like FUNC, but reports exceptions via @@ -123,13 +83,15 @@ def invoker(func, *args, **kw): def cr(func, *args, **kw): """Return a function which invokes FUNC(*ARGS, **KW) in a coroutine.""" - def _(*hunoz, **hukairz): - T.Coroutine(xwrap(func)).switch(*args, **kw) - return _ + name = T.funargstr(func, args, kw) + return lambda *hunoz, **hukairz: \ + T.Coroutine(xwrap(func), name = name).switch(*args, **kw) def incr(func): """Decorator: runs its function in a coroutine of its own.""" - return lambda *args, **kw: T.Coroutine(func).switch(*args, **kw) + return lambda *args, **kw: \ + (T.Coroutine(func, name = T.funargstr(func, args, kw)) + .switch(*args, **kw)) ###-------------------------------------------------------------------------- ### Random bits of infrastructure. @@ -170,12 +132,6 @@ class HookList (object): if rc is not None: return rc return None - def runidly(me, *args, **kw): - """ - Invoke the hook functions as for run, but at some point in the future. - """ - idly(me.run, *args, **kw) - class HookClient (object): """ Mixin for classes which are clients of hooks. @@ -215,6 +171,23 @@ rx_time = RX.compile(r'^(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)$') ###-------------------------------------------------------------------------- ### Connections. +class GIOWatcher (object): + """ + Monitor I/O events using glib. + """ + def __init__(me, conn, mc = GO.main_context_default()): + me._conn = conn + me._watch = None + me._mc = mc + def connected(me, sock): + me._watch = GO.io_add_watch(sock, GO.IO_IN, + lambda *hunoz: me._conn.receive()) + def disconnected(me): + GO.source_remove(me._watch) + me._watch = None + def iterate(me): + me._mc.iteration(True) + class Connection (T.TripeCommandDispatcher): """ The main connection to the server. @@ -245,18 +218,15 @@ class Connection (T.TripeCommandDispatcher): me.handler['NOTE'] = lambda _, *rest: me.notehook.run(*rest) me.handler['WARN'] = lambda _, *rest: me.warnhook.run(*rest) me.handler['TRACE'] = lambda _, *rest: me.tracehook.run(*rest) - me._watch = None + me.iowatch = GIOWatcher(me) def connected(me): """Handles reconnection to the server, and signals the hook.""" T.TripeCommandDispatcher.connected(me) - me._watch = GO.io_add_watch(me.sock, GO.IO_IN, invoker(me.receive)) me.connecthook.run() def disconnected(me, reason): """Handles disconnection from the server, and signals the hook.""" - GO.source_remove(me._watch) - me._watch = None me.disconnecthook.run(reason) T.TripeCommandDispatcher.disconnected(me, reason) @@ -498,27 +468,27 @@ class Monitor (HookClient): the auto-peers list. """ if code == 'ADD': - aside(me.peers.add, rest[0], None) + T.aside(me.peers.add, rest[0], None) elif code == 'KILL': - aside(me.peers.remove, rest[0]) + T.aside(me.peers.remove, rest[0]) elif code == 'NEWIFNAME': try: me.peers[rest[0]].setifname(rest[2]) except KeyError: pass elif code == 'SVCCLAIM': - aside(me.services.add, rest[0], rest[1]) + T.aside(me.services.add, rest[0], rest[1]) if rest[0] == 'connect': - aside(me._updateautopeers) + T.aside(me._updateautopeers) elif code == 'SVCRELEASE': - aside(me.services.remove, rest[0]) + T.aside(me.services.remove, rest[0]) if rest[0] == 'connect': - aside(me._updateautopeers) + T.aside(me._updateautopeers) elif code == 'USER': if not rest: return if rest[0] == 'watch' and \ rest[1] == 'peerdb-update': - aside(me._updateautopeers) + T.aside(me._updateautopeers) ###-------------------------------------------------------------------------- ### Window management cruft. @@ -949,9 +919,9 @@ class Pinger (T.Coroutine, HookClient): me.hook(conn.connecthook, me._connected) me.hook(conn.disconnecthook, me._disconnected) me.hook(monitor.peers.addhook, - lambda p: idly(me._q.put, (p, 'ADD', None))) + lambda p: T.defer(me._q.put, (p, 'ADD', None))) me.hook(monitor.peers.delhook, - lambda p: idly(me._q.put, (p, 'KILL', None))) + lambda p: T.defer(me._q.put, (p, 'KILL', None))) if conn.connectedp(): me.connected() def _connected(me): @@ -1093,7 +1063,7 @@ class AddPeerDialog (MyDialog): conn.add(name, addr, port, keepalive = keepalive, tunnel = tunnel) me.destroy() except T.TripeError, exc: - idly(moanbox, ' '.join(exc)) + T.defer(moanbox, ' '.join(exc)) class ServInfo (MyWindow): """ @@ -1319,7 +1289,8 @@ class PeerWindow (MyWindow): def tryupdate(me): """Start the updater coroutine, if it's not going already.""" if me.cr is None: - me.cr = T.Coroutine(me._update) + me.cr = T.Coroutine(me._update, + name = 'update-peer-window %s' % me.peer.name) me.cr.switch() def stopupdate(me, *hunoz, **hukairz): @@ -1349,7 +1320,7 @@ class CryptoInfo (MyWindow): def __init__(me): MyWindow.__init__(me) me.set_title('Cryptographic algorithms') - aside(me.populate) + T.aside(me.populate) def populate(me): table = GridPacker() me.add(table) @@ -1646,7 +1617,8 @@ class MonitorWindow (MyWindow): """ if me._kidding: return - T.Coroutine(me._addautopeer_hack).switch(peer) + T.Coroutine(me._addautopeer_hack, + name = '_addautopeerhack %s' % peer).switch(peer) def _addautopeer_hack(me, peer): """Make an automated connection to PEER in response to a user click.""" @@ -1655,7 +1627,7 @@ class MonitorWindow (MyWindow): try: T._simple(conn.svcsubmit('connect', 'active', peer)) except T.TripeError, exc: - idly(moanbox, ' '.join(exc.args)) + T.defer(moanbox, ' '.join(exc.args)) me.apchange() def activate(me, l, path, col): @@ -1780,9 +1752,6 @@ def init(opts): global conn, monitor, pinger - ## Run jobs put off for later. - T.Coroutine(_runasides).switch() - ## Try to establish a connection. conn = Connection(opts.tripesock) @@ -1800,7 +1769,7 @@ def main(): ## Main loop. HookClient().hook(root.closehook, exit) - G.main() + conn.mainloop() if __name__ == '__main__': opts = parse_options()