TripeServiceJob
OptParse
Queue
+ SelIOWatcher
TripeCommand
TripeSynchronousCommand
TripeAsynchronousCommand
TripeCommandIterator
TripeConnection
TripeCommandDispatcher
- SelCommandDispatcher
- TripeServiceManager
+ TripeServiceManager
TripeService
TripeServiceCommand
me.socket = socket
me.sock = None
me.lbuf = None
+ me.iowatch = SelIOWatcher(me)
def connect(me):
"""
To be overridden by subclasses to react to a connection being
established.
"""
- pass
+ me.iowatch.connected(me.sock)
def disconnected(me, reason):
"""
To be overridden by subclasses to react to a connection being severed.
"""
- pass
+ me.iowatch.disconnected()
def eof(me):
"""To be overridden by subclasses to handle end-of-file."""
"""To be overridden by subclasses to handle incoming lines."""
pass
+###--------------------------------------------------------------------------
+### I/O loop integration.
+
+class SelIOWatcher (object):
+ """
+ Integration with mLib's I/O event system.
+
+ You can replace this object with a different one for integration with,
+ e.g., glib's main loop, by setting CONN.iowatch to a different object
+ while the CONN is disconnected.
+ """
+
+ def __init__(me, conn):
+ """Remember CONN; no socket is being watched yet."""
+ me._conn = conn
+ me._selfile = None
+
+ def connected(me, sock):
+ """
+ Called when a connection is made.
+
+ SOCK is the socket. The watcher must arrange to call CONN.receive when
+ data is available.
+ """
+ # Register SOCK's descriptor with CONN.receive as the handler, then
+ # switch the watch on.
+ me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
+ me._selfile.enable()
+
+ def disconnected(me):
+ """
+ Called when the connection is lost.
+
+ Drops the watcher's reference to its SelFile object.
+ """
+ me._selfile = None
+
+ def iterate(me):
+ """
+ Wait for something interesting to happen, and issue events.
+
+ That is, basically, do one iteration of a main select loop, processing
+ all of the events, and then return. This isn't needed for
+ TripeCommandDispatcher, but runservices wants it.
+ """
+ M.select()
+
+###--------------------------------------------------------------------------
+### Inter-coroutine communication.
+
+class Queue (object):
+ """
+ A queue of things arriving asynchronously.
+
+ This is a very simple single-reader multiple-writer queue. It's useful for
+ more complex coroutines which need to cope with a variety of possible
+ incoming events.
+ """
+
+ def __init__(me):
+ """Create a new empty queue."""
+ me.contents = M.Array()
+ # The coroutine currently blocked in _wait, or None if there isn't one.
+ me.waiter = None
+
+ def _wait(me):
+ """
+ Internal: wait for an item to arrive in the queue.
+
+ Complain if someone is already waiting, because this is just a
+ single-reader queue.
+
+ Raises ValueError if a waiter is already registered.
+ """
+ if me.waiter:
+ raise ValueError('queue already being waited on')
+ try:
+ me.waiter = Coroutine.getcurrent()
+ # Hand control to the parent coroutine until the queue is nonempty; put
+ # switches back here when it adds an item.
+ while not me.contents:
+ me.waiter.parent.switch()
+ finally:
+ # Always deregister, even if the wait is cut short by an exception.
+ me.waiter = None
+
+ def get(me):
+ """
+ Remove and return the item at the head of the queue.
+
+ If the queue is empty, wait until an item arrives.
+ """
+ me._wait()
+ return me.contents.shift()
+
+ def peek(me):
+ """
+ Return the item at the head of the queue without removing it.
+
+ If the queue is empty, wait until an item arrives.
+ """
+ me._wait()
+ return me.contents[0]
+
+ def put(me, thing):
+ """
+ Write THING to the queue.
+
+ If someone is waiting on the queue, wake him up immediately; otherwise
+ just leave the item there for later.
+ """
+ me.contents.push(thing)
+ # Switch straight to the blocked reader, if there is one.
+ if me.waiter:
+ me.waiter.switch()
+
###--------------------------------------------------------------------------
### Dispatching coroutine.
opts.append('--')
return opts
+## Deferral.
+# Pending (FUNC, ARGS, KW) triples, in the order they were deferred.
+_deferq = []
+def defer(func, *args, **kw):
+ """
+ Call FUNC(*ARGS, **KW) later, in the root coroutine.
+
+ The call is only queued here; TripeCommandDispatcher.mainloop makes the
+ queued calls.
+ """
+ _deferq.append((func, args, kw))
+
+def spawn(func, *args, **kw):
+ """
+ Call FUNC, passing ARGS and KW, in a fresh coroutine.
+
+ The coroutine is created and entered later, by way of defer, rather than
+ immediately.
+ """
+ defer(lambda: Coroutine(func).switch(*args, **kw))
+
+## Asides.
+# Queue of (FUNC, ARGS, KW) triples: written by aside, read by _runasides.
+_asideq = Queue()
+def _runasides():
+ """
+ Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
+
+ Loops forever; an exception raised by FUNC is reported through
+ SYS.excepthook rather than propagated.
+ """
+ while True:
+ func, args, kw = _asideq.get()
+ try:
+ func(*args, **kw)
+ except:
+ SYS.excepthook(*SYS.exc_info())
+
+def aside(func, *args, **kw):
+ """
+ Call FUNC(*ARGS, **KW) later, in a non-root coroutine.
+
+ The triple is added to the asides queue by a deferred call, not
+ immediately.
+ """
+ defer(_asideq.put, (func, args, kw))
+
class TripeCommandDispatcher (TripeConnection):
"""
Command dispatcher.
for i in 'OK', 'INFO', 'FAIL':
me.handler[i] = me._fgresponse
+ def quitp(me):
+ """
+ Should we quit the main loop? Subclasses should override.
+
+ This base implementation always answers false.
+ """
+ return False
+
+ def mainloop(me, quitp = None):
+ """
+ Iterate the I/O watcher until QUITP returns true.
+
+ QUITP is a function of no arguments; if it is omitted or None, the
+ dispatcher's own quitp method is used.
+
+ Arranges for asides and deferred calls to be made at the right times.
+ """
+
+ global _deferq
+ assert _Coroutine.getcurrent() is rootcr
+ # Start the coroutine which services the asides queue.
+ Coroutine(_runasides).switch()
+ # Without this fallback, mainloop() called with no argument would try to
+ # call None.
+ if quitp is None:
+ quitp = me.quitp
+ while not quitp():
+ # Deferred calls may themselves defer more work: take the whole batch,
+ # install a fresh list, and go round again until nothing is pending.
+ while _deferq:
+ q = _deferq
+ _deferq = []
+ for func, args, kw in q:
+ func(*args, **kw)
+ me.iowatch.iterate()
+
def connected(me):
"""
Connection hook.
"""
me.queue = M.Array()
me.cmd = {}
+ TripeConnection.connected(me)
def disconnected(me, reason):
"""
If a subclass hooks overrides this method, it must call us; sends a
special CONNERR code to all incomplete commands.
"""
+ TripeConnection.disconnected(me, reason)
for cmd in me.cmd.itervalues():
cmd.response('CONNERR', reason)
for cmd in me.queue:
###--------------------------------------------------------------------------
### Asynchronous commands.
-class Queue (object):
- """
- A queue of things arriving asynchronously.
-
- This is a very simple single-reader multiple-writer queue. It's useful for
- more complex coroutines which need to cope with a variety of possible
- incoming events.
- """
-
- def __init__(me):
- """Create a new empty queue."""
- me.contents = M.Array()
- me.waiter = None
-
- def _wait(me):
- """
- Internal: wait for an item to arrive in the queue.
-
- Complain if someone is already waiting, because this is just a
- single-reader queue.
- """
- if me.waiter:
- raise ValueError('queue already being waited on')
- try:
- me.waiter = Coroutine.getcurrent()
- while not me.contents:
- me.waiter.parent.switch()
- finally:
- me.waiter = None
-
- def get(me):
- """
- Remove and return the item at the head of the queue.
-
- If the queue is empty, wait until an item arrives.
- """
- me._wait()
- return me.contents.shift()
-
- def peek(me):
- """
- Return the item at the head of the queue without removing it.
-
- If the queue is empty, wait until an item arrives.
- """
- me._wait()
- return me.contents[0]
-
- def put(me, thing):
- """
- Write THING to the queue.
-
- If someone is waiting on the queue, wake him up immediately; otherwise
- just leave the item there for later.
- """
- me.contents.push(thing)
- if me.waiter:
- me.waiter.switch()
-
class TripeAsynchronousCommand (TripeCommand):
"""
Asynchronous commands.
"""Handle a server response by writing it to the caller's queue."""
me.queue.put((me.tag, code, list(stuff)))
-###--------------------------------------------------------------------------
-### Selecting command dispatcher.
-
-class SelCommandDispatcher (TripeCommandDispatcher):
- """
- A command dispatcher which integrates with mLib's I/O-event system.
-
- To use, simply create an instance and run mLib.select in a loop in your
- main coroutine.
- """
-
- def __init__(me, socket):
- """
- Create an instance; SOCKET is the admin socket to connect to.
-
- Note that no connection is made initially.
- """
- TripeCommandDispatcher.__init__(me, socket)
- me.selfile = None
-
- def connected(me):
- """Connection hook: wires itself into the mLib select machinery."""
- TripeCommandDispatcher.connected(me)
- me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive)
- me.selfile.enable()
-
- def disconnected(me, reason):
- """Disconnection hook: removes itself from the mLib select machinery."""
- TripeCommandDispatcher.disconnected(me, reason)
- me.selfile = None
-
###--------------------------------------------------------------------------
### Services.
"""
pass
-class TripeServiceManager (SelCommandDispatcher):
+class TripeServiceManager (TripeCommandDispatcher):
"""
A command dispatcher with added handling for incoming service requests.
SOCKET is the administration socket to connect to.
"""
- SelCommandDispatcher.__init__(me, socket)
+ TripeCommandDispatcher.__init__(me, socket)
me.svc = {}
me.job = {}
me.runningp = False
process can quit without anyone caring).
"""
return me._quitp or (me.runningp and ((not me.svc and not me.job) or
- not me.selfile))
+ not me.sock))
def quit(me):
"""Forces the quit flag (returned by quitp) on."""
svcmgr.running()
svcmgr = TripeServiceManager(None)
-_spawnq = []
def runservices(socket, tab, init = None, setup = None, daemon = False):
"""
Function to start a service provider.
quit.
"""
- global _spawnq
svcmgr.socket = socket
svcmgr.connect()
svcs = []
M.daemonize()
if init is not None:
init()
- Coroutine(_setupsvc).switch(svcs, setup)
- while not svcmgr.quitp():
- for cr, args, kw in _spawnq:
- cr.switch(*args, **kw)
- _spawnq = []
- M.select()
-
-def spawn(cr, *args, **kw):
- """
- Utility for spawning coroutines.
-
- The coroutine CR is made to be a direct child of the root coroutine, and
- invoked by it with the given arguments.
- """
- cr.parent = rootcr
- _spawnq.append((cr, args, kw))
+ spawn(_setupsvc, svcs, setup)
+ svcmgr.mainloop()
###--------------------------------------------------------------------------
### Utilities for services.