X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/tripe/blobdiff_plain/6005ef9bfba49124a25825a5b044d4f4cbf02792..a4f886c34289b7acc6a700d774a2f4f583719ae2:/py/tripe.py.in diff --git a/py/tripe.py.in b/py/tripe.py.in index 79060106..37de5a40 100644 --- a/py/tripe.py.in +++ b/py/tripe.py.in @@ -31,7 +31,7 @@ implementing services. Rather than end up in lost in a storm of little event-driven classes, or a morass of concurrent threads, the module uses coroutines to present a fairly simple function call/return interface to potentially long-running commands -which must run without blocking the main process. It sassumes a coroutine +which must run without blocking the main process. It assumes a coroutine module presenting a subset of the `greenlet' interface: if actual greenlets are available, they are used; otherwise there's an implementation in terms of threads (with lots of locking) which will do instead. @@ -75,14 +75,14 @@ Classes: TripeServiceJob OptParse Queue + SelIOWatcher TripeCommand TripeSynchronousCommand TripeAsynchronousCommand TripeCommandIterator TripeConnection TripeCommandDispatcher - SelCommandDispatcher - TripeServiceManager + TripeServiceManager TripeService TripeServiceCommand @@ -128,7 +128,9 @@ class Coroutine (_Coroutine): """ def switch(me, *args, **kw): assert _Coroutine.getcurrent() is rootcr + if _debug: print '* %s' % me _Coroutine.switch(me, *args, **kw) + if _debug: print '* %s' % rootcr ###-------------------------------------------------------------------------- ### Default places for things. @@ -187,6 +189,7 @@ class TripeConnection (object): me.socket = socket me.sock = None me.lbuf = None + me.iowatch = SelIOWatcher(me) def connect(me): """ @@ -274,13 +277,13 @@ class TripeConnection (object): To be overridden by subclasses to react to a connection being established. """ - pass + me.iowatch.connected(me.sock) def disconnected(me, reason): """ To be overridden by subclasses to react to a connection being severed. 
""" - pass + me.iowatch.disconnected() def eof(me): """To be overridden by subclasses to handle end-of-file.""" @@ -290,6 +293,110 @@ class TripeConnection (object): """To be overridden by subclasses to handle incoming lines.""" pass +###-------------------------------------------------------------------------- +### I/O loop integration. + +class SelIOWatcher (object): + """ + Integration with mLib's I/O event system. + + You can replace this object with a different one for integration with, + e.g., glib's main loop, by setting CONN.iowatch to a different object + while the CONN is disconnected. + """ + + def __init__(me, conn): + me._conn = conn + me._selfile = None + + def connected(me, sock): + """ + Called when a connection is made. + + SOCK is the socket. The watcher must arrange to call CONN.receive when + data is available. + """ + me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive) + me._selfile.enable() + + def disconnected(me): + """ + Called when the connection is lost. + """ + me._selfile = None + + def iterate(me): + """ + Wait for something interesting to happen, and issue events. + + That is, basically, do one iteration of a main select loop, processing + all of the events, and then return. This isn't needed for + TripeCommandDispatcher, but runservices wants it. + """ + M.select() + +###-------------------------------------------------------------------------- +### Inter-coroutine communication. + +class Queue (object): + """ + A queue of things arriving asynchronously. + + This is a very simple single-reader multiple-writer queue. It's useful for + more complex coroutines which need to cope with a variety of possible + incoming events. + """ + + def __init__(me): + """Create a new empty queue.""" + me.contents = M.Array() + me.waiter = None + + def _wait(me): + """ + Internal: wait for an item to arrive in the queue. + + Complain if someone is already waiting, because this is just a + single-reader queue. 
+ """ + if me.waiter: + raise ValueError('queue already being waited on') + try: + me.waiter = Coroutine.getcurrent() + while not me.contents: + me.waiter.parent.switch() + finally: + me.waiter = None + + def get(me): + """ + Remove and return the item at the head of the queue. + + If the queue is empty, wait until an item arrives. + """ + me._wait() + return me.contents.shift() + + def peek(me): + """ + Return the item at the head of the queue without removing it. + + If the queue is empty, wait until an item arrives. + """ + me._wait() + return me.contents[0] + + def put(me, thing): + """ + Write THING to the queue. + + If someone is waiting on the queue, wake him up immediately; otherwise + just leave the item there for later. + """ + me.contents.push(thing) + if me.waiter: + me.waiter.switch() + ###-------------------------------------------------------------------------- ### Dispatching coroutine. @@ -499,6 +606,40 @@ def _kwopts(kw, allowed): opts.append('--') return opts +## Deferral. +_deferq = [] +def defer(func, *args, **kw): + """Call FUNC(*ARGS, **KW) later, in the root coroutine.""" + _deferq.append((func, args, kw)) + +def funargstr(func, args, kw): + items = [repr(a) for a in args] + for k, v in kw.iteritems(): + items.append('%s = %r' % (k, v)) + return '%s(%s)' % (func.__name__, ', '.join(items)) + +def spawn(func, *args, **kw): + """Call FUNC, passing ARGS and KW, in a fresh coroutine.""" + defer(lambda: (Coroutine(func, name = funargstr(func, args, kw)) + .switch(*args, **kw))) + +## Asides. +_asideq = Queue() +def _runasides(): + """ + Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW). + """ + while True: + func, args, kw = _asideq.get() + try: + func(*args, **kw) + except: + SYS.excepthook(*SYS.exc_info()) + +def aside(func, *args, **kw): + """Call FUNC(*ARGS, **KW) later, in a non-root coroutine.""" + defer(_asideq.put, (func, args, kw)) + class TripeCommandDispatcher (TripeConnection): """ Command dispatcher. 
@@ -557,6 +698,30 @@ class TripeCommandDispatcher (TripeConnection): for i in 'OK', 'INFO', 'FAIL': me.handler[i] = me._fgresponse + def quitp(me): + """Should we quit the main loop? Subclasses should override.""" + return False + + def mainloop(me, quitp = None): + """ + Iterate the I/O watcher until QUITP returns true. + + Arranges for asides and deferred calls to be made at the right times. + """ + + global _deferq + assert _Coroutine.getcurrent() is rootcr + Coroutine(_runasides, name = '_runasides').switch() + if quitp is None: + quitp = me.quitp + while not quitp(): + while _deferq: + q = _deferq + _deferq = [] + for func, args, kw in q: + func(*args, **kw) + me.iowatch.iterate() + def connected(me): """ Connection hook. @@ -566,6 +731,7 @@ class TripeCommandDispatcher (TripeConnection): """ me.queue = M.Array() me.cmd = {} + TripeConnection.connected(me) def disconnected(me, reason): """ @@ -574,6 +740,7 @@ class TripeCommandDispatcher (TripeConnection): If a subclass hooks overrides this method, it must call us; sends a special CONNERR code to all incomplete commands. """ + TripeConnection.disconnected(me, reason) for cmd in me.cmd.itervalues(): cmd.response('CONNERR', reason) for cmd in me.queue: @@ -663,7 +830,8 @@ class TripeCommandDispatcher (TripeConnection): def add(me, peer, *addr, **kw): return _simple(me.command(bg = True, *['ADD'] + - _kwopts(kw, ['tunnel', 'keepalive', 'cork']) + + _kwopts(kw, ['tunnel', 'keepalive', + 'key', 'cork']) + [peer] + list(addr))) def addr(me, peer): @@ -751,65 +919,6 @@ class TripeCommandDispatcher (TripeConnection): ###-------------------------------------------------------------------------- ### Asynchronous commands. -class Queue (object): - """ - A queue of things arriving asynchronously. - - This is a very simple single-reader multiple-writer queue. It's useful for - more complex coroutines which need to cope with a variety of possible - incoming events. 
- """ - - def __init__(me): - """Create a new empty queue.""" - me.contents = M.Array() - me.waiter = None - - def _wait(me): - """ - Internal: wait for an item to arrive in the queue. - - Complain if someone is already waiting, because this is just a - single-reader queue. - """ - if me.waiter: - raise ValueError('queue already being waited on') - try: - me.waiter = Coroutine.getcurrent() - while not me.contents: - me.waiter.parent.switch() - finally: - me.waiter = None - - def get(me): - """ - Remove and return the item at the head of the queue. - - If the queue is empty, wait until an item arrives. - """ - me._wait() - return me.contents.shift() - - def peek(me): - """ - Return the item at the head of the queue without removing it. - - If the queue is empty, wait until an item arrives. - """ - me._wait() - return me.contents[0] - - def put(me, thing): - """ - Write THING to the queue. - - If someone is waiting on the queue, wake him up immediately; otherwise - just leave the item there for later. - """ - me.contents.push(thing) - if me.waiter: - me.waiter.switch() - class TripeAsynchronousCommand (TripeCommand): """ Asynchronous commands. @@ -836,37 +945,6 @@ class TripeAsynchronousCommand (TripeCommand): """Handle a server response by writing it to the caller's queue.""" me.queue.put((me.tag, code, list(stuff))) -###-------------------------------------------------------------------------- -### Selecting command dispatcher. - -class SelCommandDispatcher (TripeCommandDispatcher): - """ - A command dispatcher which integrates with mLib's I/O-event system. - - To use, simply create an instance and run mLib.select in a loop in your - main coroutine. - """ - - def __init__(me, socket): - """ - Create an instance; SOCKET is the admin socket to connect to. - - Note that no connection is made initially. 
- """ - TripeCommandDispatcher.__init__(me, socket) - me.selfile = None - - def connected(me): - """Connection hook: wires itself into the mLib select machinery.""" - TripeCommandDispatcher.connected(me) - me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive) - me.selfile.enable() - - def disconnected(me, reason): - """Disconnection hook: removes itself from the mLib select machinery.""" - TripeCommandDispatcher.disconnected(me, reason) - me.selfile = None - ###-------------------------------------------------------------------------- ### Services. @@ -894,7 +972,7 @@ class TripeSyntaxError (Exception): """ pass -class TripeServiceManager (SelCommandDispatcher): +class TripeServiceManager (TripeCommandDispatcher): """ A command dispatcher with added handling for incoming service requests. @@ -940,7 +1018,7 @@ class TripeServiceManager (SelCommandDispatcher): SOCKET is the administration socket to connect to. """ - SelCommandDispatcher.__init__(me, socket) + TripeCommandDispatcher.__init__(me, socket) me.svc = {} me.job = {} me.runningp = False @@ -996,7 +1074,7 @@ class TripeServiceManager (SelCommandDispatcher): process can quit without anyone caring). """ return me._quitp or (me.runningp and ((not me.svc and not me.job) or - not me.selfile)) + not me.sock)) def quit(me): """Forces the quit flag (returned by quitp) on.""" @@ -1189,7 +1267,6 @@ def _setupsvc(tab, func): svcmgr.running() svcmgr = TripeServiceManager(None) -_spawnq = [] def runservices(socket, tab, init = None, setup = None, daemon = False): """ Function to start a service provider. @@ -1223,7 +1300,6 @@ def runservices(socket, tab, init = None, setup = None, daemon = False): quit. 
""" - global _spawnq svcmgr.socket = socket svcmgr.connect() svcs = [] @@ -1240,22 +1316,8 @@ def runservices(socket, tab, init = None, setup = None, daemon = False): M.daemonize() if init is not None: init() - Coroutine(_setupsvc).switch(svcs, setup) - while not svcmgr.quitp(): - for cr, args, kw in _spawnq: - cr.switch(*args, **kw) - _spawnq = [] - M.select() - -def spawn(cr, *args, **kw): - """ - Utility for spawning coroutines. - - The coroutine CR is made to be a direct child of the root coroutine, and - invoked by it with the given arguments. - """ - cr.parent = rootcr - _spawnq.append((cr, args, kw)) + spawn(_setupsvc, svcs, setup) + svcmgr.mainloop() ###-------------------------------------------------------------------------- ### Utilities for services.