From 690a6ec147f76213263b58a8a7fd82b28dde349a Mon Sep 17 00:00:00 2001 Message-Id: <690a6ec147f76213263b58a8a7fd82b28dde349a.1714469568.git.mdw@distorted.org.uk> From: Mark Wooding Date: Sat, 8 May 2010 14:46:52 +0100 Subject: [PATCH 1/1] python: Refactoring of I/O events and suchlike. Organization: Straylight/Edgeware From: Mark Wooding There are a number of problems with the current arrangement. * The service manager main loop is tied to getting I/O events through mLib. There's a new service coming up which wants to collect D-Bus signals, and that involves using the glib event loop instead. * Some services seem to (still) have coroutine scheduling bugs: they're calling functions from the wrong coroutine. The utilities in tripemon would be useful, but they're tied to the glib event loop. So here's the proposed fix. * SelCommandDispatcher is a mistake. Split its functionality out into a separate class which can be plugged into TripeConnection or one of its descendants, so that clients which want a glib-based dispatcher can replace it without having to overturn the class hierarchy. * Move the scheduling utilities (`idly', `aside') from tripemon into tripe.py. Make them available from services by providing a main loop in TripeConnection. A few more wrinkles. * `idly' is renamed `defer' because that's what it does now: rather than waiting until there are no events incoming, it clears the decks of all deferred operations before looking for more events. * The interface of `spawn' has changed. You provide a function to call in a new coroutine rather than a freshly minted coroutine that needs hacking. All uses of `spawn' fit this pattern, so it's not a big deal, but it's worth mentioning. --- mon/tripemon.in | 97 ++++++----------- py/tripe.py.in | 280 ++++++++++++++++++++++++++++-------------------- svc/watch.in | 21 ++-- 3 files changed, 208 insertions(+), 190 deletions(-) diff --git a/mon/tripemon.in b/mon/tripemon.in index e666ab9f..b898ec98 100644 --- a/mon/tripemon.in +++ b/mon/tripemon.in @@ -56,46 +56,6 @@ def uncaught(): """Report an uncaught exception.""" excepthook(*exc_info()) -_idles = [] -def _runidles(): - """Invoke the functions on the idles queue.""" - global _idles - while _idles: - old = _idles - _idles = [] - for func, args, kw in old: - try: - func(*args, **kw) - except: - uncaught() - return False - -def idly(func, *args, **kw): - """Invoke FUNC(*ARGS, **KW) at some later point in time.""" - if not _idles: - GO.idle_add(_runidles) - _idles.append((func, args, kw)) - -_asides = T.Queue() -def _runasides(): - """ - Coroutine function: reads (FUNC, ARGS, KW) triples from a queue and invokes - FUNC(*ARGS, **KW) - """ - while True: - func, args, kw = _asides.get() - try: - func(*args, **kw) - except: - uncaught() - -def aside(func, *args, **kw): - """ - Arrange for FUNC(*ARGS, **KW) to be performed at some point in the future, - and not from the main coroutine. - """ - idly(_asides.put, (func, args, kw)) - def xwrap(func): """ Return a function which behaves like FUNC, but reports exceptions via @@ -170,12 +130,6 @@ class HookList (object): if rc is not None: return rc return None - def runidly(me, *args, **kw): - """ - Invoke the hook functions as for run, but at some point in the future. - """ - idly(me.run, *args, **kw) - class HookClient (object): """ Mixin for classes which are clients of hooks. @@ -215,6 +169,23 @@ rx_time = RX.compile(r'^(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)$') ###-------------------------------------------------------------------------- ### Connections. +class GIOWatcher (object): + """ + Monitor I/O events using glib. + """ + def __init__(me, conn, mc = GO.main_context_default()): + me._conn = conn + me._watch = None + me._mc = mc + def connected(me, sock): + me._watch = GO.io_add_watch(sock, GO.IO_IN, + lambda *hunoz: me._conn.receive()) + def disconnected(me): + GO.source_remove(me._watch) + me._watch = None + def iterate(me): + me._mc.iteration(True) + class Connection (T.TripeCommandDispatcher): """ The main connection to the server. @@ -245,18 +216,15 @@ class Connection (T.TripeCommandDispatcher): me.handler['NOTE'] = lambda _, *rest: me.notehook.run(*rest) me.handler['WARN'] = lambda _, *rest: me.warnhook.run(*rest) me.handler['TRACE'] = lambda _, *rest: me.tracehook.run(*rest) - me._watch = None + me.iowatch = GIOWatcher(me) def connected(me): """Handles reconnection to the server, and signals the hook.""" T.TripeCommandDispatcher.connected(me) - me._watch = GO.io_add_watch(me.sock, GO.IO_IN, invoker(me.receive)) me.connecthook.run() def disconnected(me, reason): """Handles disconnection from the server, and signals the hook.""" - GO.source_remove(me._watch) - me._watch = None me.disconnecthook.run(reason) T.TripeCommandDispatcher.disconnected(me, reason) @@ -498,27 +466,27 @@ class Monitor (HookClient): the auto-peers list. """ if code == 'ADD': - aside(me.peers.add, rest[0], None) + T.aside(me.peers.add, rest[0], None) elif code == 'KILL': - aside(me.peers.remove, rest[0]) + T.aside(me.peers.remove, rest[0]) elif code == 'NEWIFNAME': try: me.peers[rest[0]].setifname(rest[2]) except KeyError: pass elif code == 'SVCCLAIM': - aside(me.services.add, rest[0], rest[1]) + T.aside(me.services.add, rest[0], rest[1]) if rest[0] == 'connect': - aside(me._updateautopeers) + T.aside(me._updateautopeers) elif code == 'SVCRELEASE': - aside(me.services.remove, rest[0]) + T.aside(me.services.remove, rest[0]) if rest[0] == 'connect': - aside(me._updateautopeers) + T.aside(me._updateautopeers) elif code == 'USER': if not rest: return if rest[0] == 'watch' and \ rest[1] == 'peerdb-update': - aside(me._updateautopeers) + T.aside(me._updateautopeers) ###-------------------------------------------------------------------------- ### Window management cruft. @@ -949,9 +917,9 @@ class Pinger (T.Coroutine, HookClient): me.hook(conn.connecthook, me._connected) me.hook(conn.disconnecthook, me._disconnected) me.hook(monitor.peers.addhook, - lambda p: idly(me._q.put, (p, 'ADD', None))) + lambda p: T.defer(me._q.put, (p, 'ADD', None))) me.hook(monitor.peers.delhook, - lambda p: idly(me._q.put, (p, 'KILL', None))) + lambda p: T.defer(me._q.put, (p, 'KILL', None))) if conn.connectedp(): me.connected() def _connected(me): @@ -1093,7 +1061,7 @@ class AddPeerDialog (MyDialog): conn.add(name, addr, port, keepalive = keepalive, tunnel = tunnel) me.destroy() except T.TripeError, exc: - idly(moanbox, ' '.join(exc)) + T.defer(moanbox, ' '.join(exc)) class ServInfo (MyWindow): """ @@ -1349,7 +1317,7 @@ class CryptoInfo (MyWindow): def __init__(me): MyWindow.__init__(me) me.set_title('Cryptographic algorithms') - aside(me.populate) + T.aside(me.populate) def populate(me): table = GridPacker() me.add(table) @@ -1655,7 +1623,7 @@ class MonitorWindow (MyWindow): try: T._simple(conn.svcsubmit('connect', 'active', peer)) except T.TripeError, exc: - idly(moanbox, ' '.join(exc.args)) + T.defer(moanbox, ' '.join(exc.args)) me.apchange() def activate(me, l, path, col): @@ -1780,9 +1748,6 @@ def init(opts): global conn, monitor, pinger - ## Run jobs put off for later. - T.Coroutine(_runasides).switch() - ## Try to establish a connection. conn = Connection(opts.tripesock) @@ -1800,7 +1765,7 @@ def main(): ## Main loop. HookClient().hook(root.closehook, exit) - G.main() + conn.mainloop() if __name__ == '__main__': opts = parse_options() diff --git a/py/tripe.py.in b/py/tripe.py.in index e88f379f..2e21885b 100644 --- a/py/tripe.py.in +++ b/py/tripe.py.in @@ -75,14 +75,14 @@ Classes: TripeServiceJob OptParse Queue + SelIOWatcher TripeCommand TripeSynchronousCommand TripeAsynchronousCommand TripeCommandIterator TripeConnection TripeCommandDispatcher - SelCommandDispatcher - TripeServiceManager + TripeServiceManager TripeService TripeServiceCommand @@ -187,6 +187,7 @@ class TripeConnection (object): me.socket = socket me.sock = None me.lbuf = None + me.iowatch = SelIOWatcher(me) def connect(me): """ @@ -274,13 +275,13 @@ class TripeConnection (object): To be overridden by subclasses to react to a connection being established. """ - pass + me.iowatch.connected(me.sock) def disconnected(me, reason): """ To be overridden by subclasses to react to a connection being severed. """ - pass + me.iowatch.disconnected() def eof(me): """To be overridden by subclasses to handle end-of-file.""" @@ -290,6 +291,110 @@ class TripeConnection (object): """To be overridden by subclasses to handle incoming lines.""" pass +###-------------------------------------------------------------------------- +### I/O loop integration. + +class SelIOWatcher (object): + """ + Integration with mLib's I/O event system. + + You can replace this object with a different one for integration with, + e.g., glib's main loop, by setting CONN.iowatcher to a different object + while the CONN is disconnected. + """ + + def __init__(me, conn): + me._conn = conn + me._selfile = None + + def connected(me, sock): + """ + Called when a connection is made. + + SOCK is the socket. The watcher must arrange to call CONN.receive when + data is available. + """ + me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive) + me._selfile.enable() + + def disconnected(me): + """ + Called when the connection is lost. + """ + me._selfile = None + + def iterate(me): + """ + Wait for something interesting to happen, and issue events. + + That is, basically, do one iteration of a main select loop, processing + all of the events, and then return. This isn't needed for + TripeCommandDispatcher, but runservices wants it. + """ + M.select() + +###-------------------------------------------------------------------------- +### Inter-coroutine communication. + +class Queue (object): + """ + A queue of things arriving asynchronously. + + This is a very simple single-reader multiple-writer queue. It's useful for + more complex coroutines which need to cope with a variety of possible + incoming events. + """ + + def __init__(me): + """Create a new empty queue.""" + me.contents = M.Array() + me.waiter = None + + def _wait(me): + """ + Internal: wait for an item to arrive in the queue. + + Complain if someone is already waiting, because this is just a + single-reader queue. + """ + if me.waiter: + raise ValueError('queue already being waited on') + try: + me.waiter = Coroutine.getcurrent() + while not me.contents: + me.waiter.parent.switch() + finally: + me.waiter = None + + def get(me): + """ + Remove and return the item at the head of the queue. + + If the queue is empty, wait until an item arrives. + """ + me._wait() + return me.contents.shift() + + def peek(me): + """ + Return the item at the head of the queue without removing it. + + If the queue is empty, wait until an item arrives. + """ + me._wait() + return me.contents[0] + + def put(me, thing): + """ + Write THING to the queue. + + If someone is waiting on the queue, wake him up immediately; otherwise + just leave the item there for later. + """ + me.contents.push(thing) + if me.waiter: + me.waiter.switch() + ###-------------------------------------------------------------------------- ### Dispatching coroutine. @@ -499,6 +604,33 @@ def _kwopts(kw, allowed): opts.append('--') return opts +## Deferral. +_deferq = [] +def defer(func, *args, **kw): + """Call FUNC(*ARGS, **KW) later, in the root coroutine.""" + _deferq.append((func, args, kw)) + +def spawn(func, *args, **kw): + """Call FUNC, passing ARGS and KW, in a fresh coroutine.""" + defer(lambda: Coroutine(func).switch(*args, **kw)) + +## Asides. +_asideq = Queue() +def _runasides(): + """ + Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW). + """ + while True: + func, args, kw = _asideq.get() + try: + func(*args, **kw) + except: + SYS.excepthook(*SYS.exc_info()) + +def aside(func, *args, **kw): + """Call FUNC(*ARGS, **KW) later, in a non-root coroutine.""" + defer(_asideq.put, (func, args, kw)) + class TripeCommandDispatcher (TripeConnection): """ Command dispatcher. @@ -557,6 +689,28 @@ class TripeCommandDispatcher (TripeConnection): for i in 'OK', 'INFO', 'FAIL': me.handler[i] = me._fgresponse + def quitp(me): + """Should we quit the main loop? Subclasses should override.""" + return False + + def mainloop(me, quitp = None): + """ + Iterate the I/O watcher until QUITP returns true. + + Arranges for asides and deferred calls to be made at the right times. + """ + + global _deferq + assert _Coroutine.getcurrent() is rootcr + Coroutine(_runasides).switch() + while not quitp(): + while _deferq: + q = _deferq + _deferq = [] + for func, args, kw in q: + func(*args, **kw) + me.iowatch.iterate() + def connected(me): """ Connection hook. @@ -566,6 +720,7 @@ class TripeCommandDispatcher (TripeConnection): """ me.queue = M.Array() me.cmd = {} + TripeConnection.connected(me) def disconnected(me, reason): """ @@ -574,6 +729,7 @@ class TripeCommandDispatcher (TripeConnection): If a subclass hooks overrides this method, it must call us; sends a special CONNERR code to all incomplete commands. """ + TripeConnection.disconnected(me, reason) for cmd in me.cmd.itervalues(): cmd.response('CONNERR', reason) for cmd in me.queue: @@ -751,65 +907,6 @@ class TripeCommandDispatcher (TripeConnection): ###-------------------------------------------------------------------------- ### Asynchronous commands. -class Queue (object): - """ - A queue of things arriving asynchronously. - - This is a very simple single-reader multiple-writer queue. It's useful for - more complex coroutines which need to cope with a variety of possible - incoming events. - """ - - def __init__(me): - """Create a new empty queue.""" - me.contents = M.Array() - me.waiter = None - - def _wait(me): - """ - Internal: wait for an item to arrive in the queue. - - Complain if someone is already waiting, because this is just a - single-reader queue. - """ - if me.waiter: - raise ValueError('queue already being waited on') - try: - me.waiter = Coroutine.getcurrent() - while not me.contents: - me.waiter.parent.switch() - finally: - me.waiter = None - - def get(me): - """ - Remove and return the item at the head of the queue. - - If the queue is empty, wait until an item arrives. - """ - me._wait() - return me.contents.shift() - - def peek(me): - """ - Return the item at the head of the queue without removing it. - - If the queue is empty, wait until an item arrives. - """ - me._wait() - return me.contents[0] - - def put(me, thing): - """ - Write THING to the queue. - - If someone is waiting on the queue, wake him up immediately; otherwise - just leave the item there for later. - """ - me.contents.push(thing) - if me.waiter: - me.waiter.switch() - class TripeAsynchronousCommand (TripeCommand): """ Asynchronous commands. @@ -836,37 +933,6 @@ class TripeAsynchronousCommand (TripeCommand): """Handle a server response by writing it to the caller's queue.""" me.queue.put((me.tag, code, list(stuff))) -###-------------------------------------------------------------------------- -### Selecting command dispatcher. - -class SelCommandDispatcher (TripeCommandDispatcher): - """ - A command dispatcher which integrates with mLib's I/O-event system. - - To use, simply create an instance and run mLib.select in a loop in your - main coroutine. - """ - - def __init__(me, socket): - """ - Create an instance; SOCKET is the admin socket to connect to. - - Note that no connection is made initially. - """ - TripeCommandDispatcher.__init__(me, socket) - me.selfile = None - - def connected(me): - """Connection hook: wires itself into the mLib select machinery.""" - TripeCommandDispatcher.connected(me) - me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive) - me.selfile.enable() - - def disconnected(me, reason): - """Disconnection hook: removes itself from the mLib select machinery.""" - TripeCommandDispatcher.disconnected(me, reason) - me.selfile = None - ###-------------------------------------------------------------------------- ### Services. @@ -894,7 +960,7 @@ class TripeSyntaxError (Exception): """ pass -class TripeServiceManager (SelCommandDispatcher): +class TripeServiceManager (TripeCommandDispatcher): """ A command dispatcher with added handling for incoming service requests. @@ -940,7 +1006,7 @@ class TripeServiceManager (SelCommandDispatcher): SOCKET is the administration socket to connect to. """ - SelCommandDispatcher.__init__(me, socket) + TripeCommandDispatcher.__init__(me, socket) me.svc = {} me.job = {} me.runningp = False @@ -996,7 +1062,7 @@ class TripeServiceManager (SelCommandDispatcher): process can quit without anyone caring). """ return me._quitp or (me.runningp and ((not me.svc and not me.job) or - not me.selfile)) + not me.sock)) def quit(me): """Forces the quit flag (returned by quitp) on.""" @@ -1189,7 +1255,6 @@ def _setupsvc(tab, func): svcmgr.running() svcmgr = TripeServiceManager(None) -_spawnq = [] def runservices(socket, tab, init = None, setup = None, daemon = False): """ Function to start a service provider. @@ -1223,7 +1288,6 @@ def runservices(socket, tab, init = None, setup = None, daemon = False): quit. """ - global _spawnq svcmgr.socket = socket svcmgr.connect() svcs = [] @@ -1240,22 +1304,8 @@ def runservices(socket, tab, init = None, setup = None, daemon = False): M.daemonize() if init is not None: init() - Coroutine(_setupsvc).switch(svcs, setup) - while not svcmgr.quitp(): - for cr, args, kw in _spawnq: - cr.switch(*args, **kw) - _spawnq = [] - M.select() - -def spawn(cr, *args, **kw): - """ - Utility for spawning coroutines. - - The coroutine CR is made to be a direct child of the root coroutine, and - invoked by it with the given arguments. - """ - cr.parent = rootcr - _spawnq.append((cr, args, kw)) + spawn(_setupsvc, svcs, setup) + svcmgr.mainloop() ###-------------------------------------------------------------------------- ### Utilities for services. diff --git a/svc/watch.in b/svc/watch.in index bfad160c..be184b47 100644 --- a/svc/watch.in +++ b/svc/watch.in @@ -405,6 +405,16 @@ class PingPeer (object): '--', me._peer])) + def _reconnect(me): + info = peerinfo(me._peer) + if 'connect' in info: + S.warn('watch', 'reconnecting', me._peer) + S.forcekx(me._peer) + T.spawn(connect, me._peer) + me._timer = M.SelTimer(time() + me._every, me._time) + else: + S.kill(me._peer) + def event(me, code, stuff): """ Respond to an event which happened to this peer. @@ -436,14 +446,7 @@ class PingPeer (object): if me._failures < me._retries: me._ping() else: - info = peerinfo(me._peer) - if 'connect' in info: - S.warn('watch', 'reconnecting', me._peer) - S.forcekx(me._peer) - T.spawn(T.Coroutine(connect), me._peer) - me._timer = M.SelTimer(time() + me._every, me._time) - else: - S.kill(me._peer) + me._reconnect() elif stuff[0] == 'ping-peer-died': me._pinger.kill(me._peer) @@ -650,7 +653,7 @@ def cmd_kick(peer): """ if peer not in pinger.adopted(): raise T.TripeJobError('peer-not-adopted', peer) - T.spawn(T.Coroutine(connect), peer) + T.spawn(connect, peer) def cmd_adopted(): """ -- [mdw]