chiark / gitweb /
Disassociate public key tags from peer names.
[tripe] / py / tripe.py.in
index 790601067e89efda6cb9d1998aea7def082f963f..37de5a404de6aff7f87037aee85ff742db52cb39 100644 (file)
@@ -31,7 +31,7 @@ implementing services.
 Rather than end up in lost in a storm of little event-driven classes, or a
 morass of concurrent threads, the module uses coroutines to present a fairly
 simple function call/return interface to potentially long-running commands
-which must run without blocking the main process.  It sassumes a coroutine
+which must run without blocking the main process.  It assumes a coroutine
 module presenting a subset of the `greenlet' interface: if actual greenlets
 are available, they are used; otherwise there's an implementation in terms of
 threads (with lots of locking) which will do instead.
@@ -75,14 +75,14 @@ Classes:
             TripeServiceJob
         OptParse
         Queue
+        SelIOWatcher
         TripeCommand
           TripeSynchronousCommand
           TripeAsynchronousCommand
         TripeCommandIterator
         TripeConnection
           TripeCommandDispatcher
-            SelCommandDispatcher
-              TripeServiceManager
+            TripeServiceManager
         TripeService
         TripeServiceCommand
 
@@ -128,7 +128,9 @@ class Coroutine (_Coroutine):
   """
   def switch(me, *args, **kw):
     assert _Coroutine.getcurrent() is rootcr
+    if _debug: print '* %s' % me
     _Coroutine.switch(me, *args, **kw)
+    if _debug: print '* %s' % rootcr
 
 ###--------------------------------------------------------------------------
 ### Default places for things.
@@ -187,6 +189,7 @@ class TripeConnection (object):
     me.socket = socket
     me.sock = None
     me.lbuf = None
+    me.iowatch = SelIOWatcher(me)
 
   def connect(me):
     """
@@ -274,13 +277,13 @@ class TripeConnection (object):
     To be overridden by subclasses to react to a connection being
     established.
     """
-    pass
+    me.iowatch.connected(me.sock)
 
   def disconnected(me, reason):
     """
     To be overridden by subclasses to react to a connection being severed.
     """
-    pass
+    me.iowatch.disconnected()
 
   def eof(me):
     """To be overridden by subclasses to handle end-of-file."""
@@ -290,6 +293,110 @@ class TripeConnection (object):
     """To be overridden by subclasses to handle incoming lines."""
     pass
 
+###--------------------------------------------------------------------------
+### I/O loop integration.
+
+class SelIOWatcher (object):
+  """
+  Integration with mLib's I/O event system.
+
+  You can replace this object with a different one for integration with,
+  e.g., glib's main loop, by setting CONN.iowatcher to a different object
+  while the CONN is disconnected.
+  """
+
+  def __init__(me, conn):
+    me._conn = conn
+    me._selfile = None
+
+  def connected(me, sock):
+    """
+    Called when a connection is made.
+
+    SOCK is the socket.  The watcher must arrange to call CONN.receive when
+    data is available.
+    """
+    me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
+    me._selfile.enable()
+
+  def disconnected(me):
+    """
+    Called when the connection is lost.
+    """
+    me._selfile = None
+
+  def iterate(me):
+    """
+    Wait for something interesting to happen, and issue events.
+
+    That is, basically, do one iteration of a main select loop, processing
+    all of the events, and then return.  This isn't needed for
+    TripeCommandDispatcher, but runservices wants it.
+    """
+    M.select()
+
+###--------------------------------------------------------------------------
+### Inter-coroutine communication.
+
+class Queue (object):
+  """
+  A queue of things arriving asynchronously.
+
+  This is a very simple single-reader multiple-writer queue.  It's useful for
+  more complex coroutines which need to cope with a variety of possible
+  incoming events.
+  """
+
+  def __init__(me):
+    """Create a new empty queue."""
+    me.contents = M.Array()
+    me.waiter = None
+
+  def _wait(me):
+    """
+    Internal: wait for an item to arrive in the queue.
+
+    Complain if someone is already waiting, because this is just a
+    single-reader queue.
+    """
+    if me.waiter:
+      raise ValueError('queue already being waited on')
+    try:
+      me.waiter = Coroutine.getcurrent()
+      while not me.contents:
+        me.waiter.parent.switch()
+    finally:
+      me.waiter = None
+
+  def get(me):
+    """
+    Remove and return the item at the head of the queue.
+
+    If the queue is empty, wait until an item arrives.
+    """
+    me._wait()
+    return me.contents.shift()
+
+  def peek(me):
+    """
+    Return the item at the head of the queue without removing it.
+
+    If the queue is empty, wait until an item arrives.
+    """
+    me._wait()
+    return me.contents[0]
+
+  def put(me, thing):
+    """
+    Write THING to the queue.
+
+    If someone is waiting on the queue, wake him up immediately; otherwise
+    just leave the item there for later.
+    """
+    me.contents.push(thing)
+    if me.waiter:
+      me.waiter.switch()
+
 ###--------------------------------------------------------------------------
 ### Dispatching coroutine.
 
@@ -499,6 +606,40 @@ def _kwopts(kw, allowed):
   opts.append('--')
   return opts
 
+## Deferral.
+_deferq = []
+def defer(func, *args, **kw):
+  """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
+  _deferq.append((func, args, kw))
+
+def funargstr(func, args, kw):
+  items = [repr(a) for a in args]
+  for k, v in kw.iteritems():
+    items.append('%s = %r' % (k, v))
+  return '%s(%s)' % (func.__name__, ', '.join(items))
+
+def spawn(func, *args, **kw):
+  """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
+  defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
+                 .switch(*args, **kw)))
+
+## Asides.
+_asideq = Queue()
+def _runasides():
+  """
+  Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
+  """
+  while True:
+    func, args, kw = _asideq.get()
+    try:
+      func(*args, **kw)
+    except:
+      SYS.excepthook(*SYS.exc_info())
+
+def aside(func, *args, **kw):
+  """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
+  defer(_asideq.put, (func, args, kw))
+
 class TripeCommandDispatcher (TripeConnection):
   """
   Command dispatcher.
@@ -557,6 +698,30 @@ class TripeCommandDispatcher (TripeConnection):
     for i in 'OK', 'INFO', 'FAIL':
       me.handler[i] = me._fgresponse
 
+  def quitp(me):
+    """Should we quit the main loop?  Subclasses should override."""
+    return False
+
+  def mainloop(me, quitp = None):
+    """
+    Iterate the I/O watcher until QUITP returns true.
+
+    Arranges for asides and deferred calls to be made at the right times.
+    """
+
+    global _deferq
+    assert _Coroutine.getcurrent() is rootcr
+    Coroutine(_runasides, name = '_runasides').switch()
+    if quitp is None:
+      quitp = me.quitp
+    while not quitp():
+      while _deferq:
+        q = _deferq
+        _deferq = []
+        for func, args, kw in q:
+          func(*args, **kw)
+      me.iowatch.iterate()
+
   def connected(me):
     """
     Connection hook.
@@ -566,6 +731,7 @@ class TripeCommandDispatcher (TripeConnection):
     """
     me.queue = M.Array()
     me.cmd = {}
+    TripeConnection.connected(me)
 
   def disconnected(me, reason):
     """
@@ -574,6 +740,7 @@ class TripeCommandDispatcher (TripeConnection):
     If a subclass hooks overrides this method, it must call us; sends a
     special CONNERR code to all incomplete commands.
     """
+    TripeConnection.disconnected(me, reason)
     for cmd in me.cmd.itervalues():
       cmd.response('CONNERR', reason)
     for cmd in me.queue:
@@ -663,7 +830,8 @@ class TripeCommandDispatcher (TripeConnection):
   def add(me, peer, *addr, **kw):
     return _simple(me.command(bg = True,
                               *['ADD'] +
-                              _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
+                              _kwopts(kw, ['tunnel', 'keepalive',
+                                           'key', 'cork']) +
                               [peer] +
                               list(addr)))
   def addr(me, peer):
@@ -751,65 +919,6 @@ class TripeCommandDispatcher (TripeConnection):
 ###--------------------------------------------------------------------------
 ### Asynchronous commands.
 
-class Queue (object):
-  """
-  A queue of things arriving asynchronously.
-
-  This is a very simple single-reader multiple-writer queue.  It's useful for
-  more complex coroutines which need to cope with a variety of possible
-  incoming events.
-  """
-
-  def __init__(me):
-    """Create a new empty queue."""
-    me.contents = M.Array()
-    me.waiter = None
-
-  def _wait(me):
-    """
-    Internal: wait for an item to arrive in the queue.
-
-    Complain if someone is already waiting, because this is just a
-    single-reader queue.
-    """
-    if me.waiter:
-      raise ValueError('queue already being waited on')
-    try:
-      me.waiter = Coroutine.getcurrent()
-      while not me.contents:
-        me.waiter.parent.switch()
-    finally:
-      me.waiter = None
-
-  def get(me):
-    """
-    Remove and return the item at the head of the queue.
-
-    If the queue is empty, wait until an item arrives.
-    """
-    me._wait()
-    return me.contents.shift()
-
-  def peek(me):
-    """
-    Return the item at the head of the queue without removing it.
-
-    If the queue is empty, wait until an item arrives.
-    """
-    me._wait()
-    return me.contents[0]
-
-  def put(me, thing):
-    """
-    Write THING to the queue.
-
-    If someone is waiting on the queue, wake him up immediately; otherwise
-    just leave the item there for later.
-    """
-    me.contents.push(thing)
-    if me.waiter:
-      me.waiter.switch()
-
 class TripeAsynchronousCommand (TripeCommand):
   """
   Asynchronous commands.
@@ -836,37 +945,6 @@ class TripeAsynchronousCommand (TripeCommand):
     """Handle a server response by writing it to the caller's queue."""
     me.queue.put((me.tag, code, list(stuff)))
 
-###--------------------------------------------------------------------------
-### Selecting command dispatcher.
-
-class SelCommandDispatcher (TripeCommandDispatcher):
-  """
-  A command dispatcher which integrates with mLib's I/O-event system.
-
-  To use, simply create an instance and run mLib.select in a loop in your
-  main coroutine.
-  """
-
-  def __init__(me, socket):
-    """
-    Create an instance; SOCKET is the admin socket to connect to.
-
-    Note that no connection is made initially.
-    """
-    TripeCommandDispatcher.__init__(me, socket)
-    me.selfile = None
-
-  def connected(me):
-    """Connection hook: wires itself into the mLib select machinery."""
-    TripeCommandDispatcher.connected(me)
-    me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive)
-    me.selfile.enable()
-
-  def disconnected(me, reason):
-    """Disconnection hook: removes itself from the mLib select machinery."""
-    TripeCommandDispatcher.disconnected(me, reason)
-    me.selfile = None
-
 ###--------------------------------------------------------------------------
 ### Services.
 
@@ -894,7 +972,7 @@ class TripeSyntaxError (Exception):
   """
   pass
 
-class TripeServiceManager (SelCommandDispatcher):
+class TripeServiceManager (TripeCommandDispatcher):
   """
   A command dispatcher with added handling for incoming service requests.
 
@@ -940,7 +1018,7 @@ class TripeServiceManager (SelCommandDispatcher):
 
     SOCKET is the administration socket to connect to.
     """
-    SelCommandDispatcher.__init__(me, socket)
+    TripeCommandDispatcher.__init__(me, socket)
     me.svc = {}
     me.job = {}
     me.runningp = False
@@ -996,7 +1074,7 @@ class TripeServiceManager (SelCommandDispatcher):
     process can quit without anyone caring).
     """
     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
-                                          not me.selfile))
+                                          not me.sock))
 
   def quit(me):
     """Forces the quit flag (returned by quitp) on."""
@@ -1189,7 +1267,6 @@ def _setupsvc(tab, func):
     svcmgr.running()
 
 svcmgr = TripeServiceManager(None)
-_spawnq = []
 def runservices(socket, tab, init = None, setup = None, daemon = False):
   """
   Function to start a service provider.
@@ -1223,7 +1300,6 @@ def runservices(socket, tab, init = None, setup = None, daemon = False):
   quit.
   """
 
-  global _spawnq
   svcmgr.socket = socket
   svcmgr.connect()
   svcs = []
@@ -1240,22 +1316,8 @@ def runservices(socket, tab, init = None, setup = None, daemon = False):
     M.daemonize()
   if init is not None:
     init()
-  Coroutine(_setupsvc).switch(svcs, setup)
-  while not svcmgr.quitp():
-    for cr, args, kw in _spawnq:
-      cr.switch(*args, **kw)
-    _spawnq = []
-    M.select()
-
-def spawn(cr, *args, **kw):
-  """
-  Utility for spawning coroutines.
-
-  The coroutine CR is made to be a direct child of the root coroutine, and
-  invoked by it with the given arguments.
-  """
-  cr.parent = rootcr
-  _spawnq.append((cr, args, kw))
+  spawn(_setupsvc, svcs, setup)
+  svcmgr.mainloop()
 
 ###--------------------------------------------------------------------------
 ### Utilities for services.