chiark / gitweb /
python: Refactoring of I/O events and suchlike.
authorMark Wooding <mdw@distorted.org.uk>
Sat, 8 May 2010 13:46:52 +0000 (14:46 +0100)
committerMark Wooding <mdw@distorted.org.uk>
Sun, 9 May 2010 14:07:37 +0000 (15:07 +0100)
There are a number of problems with the current arrangement.

  * The service manager main loop is tied to getting I/O events through
    mLib.  There's a new service coming up which wants to collect D-Bus
    signals, and that involves using the glib event loop instead.

  * Some services seem to (still) have coroutine scheduling bugs:
    they're calling functions from the wrong coroutine.  The utilities
    in tripemon would be useful, but they're tied to the glib event
    loop.

So here's the proposed fix.

  * SelCommandDispatcher is a mistake.  Split its functionality out into
    a separate class which can be plugged into TripeConnection or one of
    its descendants, so that clients which want a glib-based dispatcher
    can replace it without having to overturn the class hierarchy.

  * Move the scheduling utilities (`idly', `aside') from tripemon into
    tripe.py.  Make them available from services by providing a main
    loop in TripeConnection.

A few more wrinkles.

  * `idly' is renamed `defer' because that's what it does now: rather
    than waiting until there are no events incoming, it clears the decks
    of all deferred operations before looking for more events.

  * The interface of `spawn' has changed.  You provide a function to
    call in a new coroutine rather than a freshly minted coroutine that
    needs hacking.  All uses of `spawn' fit this pattern, so it's not a
    big deal, but it's worth mentioning.

mon/tripemon.in
py/tripe.py.in
svc/watch.in

index e666ab9..b898ec9 100644 (file)
@@ -56,46 +56,6 @@ def uncaught():
   """Report an uncaught exception."""
   excepthook(*exc_info())
 
-_idles = []
-def _runidles():
-  """Invoke the functions on the idles queue."""
-  global _idles
-  while _idles:
-    old = _idles
-    _idles = []
-    for func, args, kw in old:
-      try:
-        func(*args, **kw)
-      except:
-        uncaught()
-  return False
-
-def idly(func, *args, **kw):
-  """Invoke FUNC(*ARGS, **KW) at some later point in time."""
-  if not _idles:
-    GO.idle_add(_runidles)
-  _idles.append((func, args, kw))
-
-_asides = T.Queue()
-def _runasides():
-  """
-  Coroutine function: reads (FUNC, ARGS, KW) triples from a queue and invokes
-  FUNC(*ARGS, **KW)
-  """
-  while True:
-    func, args, kw = _asides.get()
-    try:
-      func(*args, **kw)
-    except:
-      uncaught()
-
-def aside(func, *args, **kw):
-  """
-  Arrange for FUNC(*ARGS, **KW) to be performed at some point in the future,
-  and not from the main coroutine.
-  """
-  idly(_asides.put, (func, args, kw))
-
 def xwrap(func):
   """
   Return a function which behaves like FUNC, but reports exceptions via
@@ -170,12 +130,6 @@ class HookList (object):
       if rc is not None: return rc
     return None
 
-  def runidly(me, *args, **kw):
-    """
-    Invoke the hook functions as for run, but at some point in the future.
-    """
-    idly(me.run, *args, **kw)
-
 class HookClient (object):
   """
   Mixin for classes which are clients of hooks.
@@ -215,6 +169,23 @@ rx_time = RX.compile(r'^(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)$')
 ###--------------------------------------------------------------------------
 ### Connections.
 
+class GIOWatcher (object):
+  """
+  Monitor I/O events using glib.
+  """
+  def __init__(me, conn, mc = GO.main_context_default()):
+    me._conn = conn
+    me._watch = None
+    me._mc = mc
+  def connected(me, sock):
+    me._watch = GO.io_add_watch(sock, GO.IO_IN,
+                                lambda *hunoz: me._conn.receive())
+  def disconnected(me):
+    GO.source_remove(me._watch)
+    me._watch = None
+  def iterate(me):
+    me._mc.iteration(True)
+
 class Connection (T.TripeCommandDispatcher):
   """
   The main connection to the server.
@@ -245,18 +216,15 @@ class Connection (T.TripeCommandDispatcher):
     me.handler['NOTE'] = lambda _, *rest: me.notehook.run(*rest)
     me.handler['WARN'] = lambda _, *rest: me.warnhook.run(*rest)
     me.handler['TRACE'] = lambda _, *rest: me.tracehook.run(*rest)
-    me._watch = None
+    me.iowatch = GIOWatcher(me)
 
   def connected(me):
     """Handles reconnection to the server, and signals the hook."""
     T.TripeCommandDispatcher.connected(me)
-    me._watch = GO.io_add_watch(me.sock, GO.IO_IN, invoker(me.receive))
     me.connecthook.run()
 
   def disconnected(me, reason):
     """Handles disconnection from the server, and signals the hook."""
-    GO.source_remove(me._watch)
-    me._watch = None
     me.disconnecthook.run(reason)
     T.TripeCommandDispatcher.disconnected(me, reason)
 
@@ -498,27 +466,27 @@ class Monitor (HookClient):
     the auto-peers list.
     """
     if code == 'ADD':
-      aside(me.peers.add, rest[0], None)
+      T.aside(me.peers.add, rest[0], None)
     elif code == 'KILL':
-      aside(me.peers.remove, rest[0])
+      T.aside(me.peers.remove, rest[0])
     elif code == 'NEWIFNAME':
       try:
         me.peers[rest[0]].setifname(rest[2])
       except KeyError:
         pass
     elif code == 'SVCCLAIM':
-      aside(me.services.add, rest[0], rest[1])
+      T.aside(me.services.add, rest[0], rest[1])
       if rest[0] == 'connect':
-        aside(me._updateautopeers)
+        T.aside(me._updateautopeers)
     elif code == 'SVCRELEASE':
-      aside(me.services.remove, rest[0])
+      T.aside(me.services.remove, rest[0])
       if rest[0] == 'connect':
-        aside(me._updateautopeers)
+        T.aside(me._updateautopeers)
     elif code == 'USER':
       if not rest: return
       if rest[0] == 'watch' and \
          rest[1] == 'peerdb-update':
-        aside(me._updateautopeers)
+        T.aside(me._updateautopeers)
 
 ###--------------------------------------------------------------------------
 ### Window management cruft.
@@ -949,9 +917,9 @@ class Pinger (T.Coroutine, HookClient):
     me.hook(conn.connecthook, me._connected)
     me.hook(conn.disconnecthook, me._disconnected)
     me.hook(monitor.peers.addhook,
-            lambda p: idly(me._q.put, (p, 'ADD', None)))
+            lambda p: T.defer(me._q.put, (p, 'ADD', None)))
     me.hook(monitor.peers.delhook,
-            lambda p: idly(me._q.put, (p, 'KILL', None)))
+            lambda p: T.defer(me._q.put, (p, 'KILL', None)))
     if conn.connectedp(): me.connected()
 
   def _connected(me):
@@ -1093,7 +1061,7 @@ class AddPeerDialog (MyDialog):
       conn.add(name, addr, port, keepalive = keepalive, tunnel = tunnel)
       me.destroy()
     except T.TripeError, exc:
-      idly(moanbox, ' '.join(exc))
+      T.defer(moanbox, ' '.join(exc))
 
 class ServInfo (MyWindow):
   """
@@ -1349,7 +1317,7 @@ class CryptoInfo (MyWindow):
   def __init__(me):
     MyWindow.__init__(me)
     me.set_title('Cryptographic algorithms')
-    aside(me.populate)
+    T.aside(me.populate)
   def populate(me):
     table = GridPacker()
     me.add(table)
@@ -1655,7 +1623,7 @@ class MonitorWindow (MyWindow):
     try:
       T._simple(conn.svcsubmit('connect', 'active', peer))
     except T.TripeError, exc:
-      idly(moanbox, ' '.join(exc.args))
+      T.defer(moanbox, ' '.join(exc.args))
     me.apchange()
 
   def activate(me, l, path, col):
@@ -1780,9 +1748,6 @@ def init(opts):
 
   global conn, monitor, pinger
 
-  ## Run jobs put off for later.
-  T.Coroutine(_runasides).switch()
-
   ## Try to establish a connection.
   conn = Connection(opts.tripesock)
 
@@ -1800,7 +1765,7 @@ def main():
 
   ## Main loop.
   HookClient().hook(root.closehook, exit)
-  G.main()
+  conn.mainloop()
 
 if __name__ == '__main__':
   opts = parse_options()
index e88f379..2e21885 100644 (file)
@@ -75,14 +75,14 @@ Classes:
             TripeServiceJob
         OptParse
         Queue
+        SelIOWatcher
         TripeCommand
           TripeSynchronousCommand
           TripeAsynchronousCommand
         TripeCommandIterator
         TripeConnection
           TripeCommandDispatcher
-            SelCommandDispatcher
-              TripeServiceManager
+            TripeServiceManager
         TripeService
         TripeServiceCommand
 
@@ -187,6 +187,7 @@ class TripeConnection (object):
     me.socket = socket
     me.sock = None
     me.lbuf = None
+    me.iowatch = SelIOWatcher(me)
 
   def connect(me):
     """
@@ -274,13 +275,13 @@ class TripeConnection (object):
     To be overridden by subclasses to react to a connection being
     established.
     """
-    pass
+    me.iowatch.connected(me.sock)
 
   def disconnected(me, reason):
     """
     To be overridden by subclasses to react to a connection being severed.
     """
-    pass
+    me.iowatch.disconnected()
 
   def eof(me):
     """To be overridden by subclasses to handle end-of-file."""
@@ -291,6 +292,110 @@ class TripeConnection (object):
     pass
 
 ###--------------------------------------------------------------------------
+### I/O loop integration.
+
+class SelIOWatcher (object):
+  """
+  Integration with mLib's I/O event system.
+
+  You can replace this object with a different one for integration with,
+  e.g., glib's main loop, by setting CONN.iowatcher to a different object
+  while the CONN is disconnected.
+  """
+
+  def __init__(me, conn):
+    me._conn = conn
+    me._selfile = None
+
+  def connected(me, sock):
+    """
+    Called when a connection is made.
+
+    SOCK is the socket.  The watcher must arrange to call CONN.receive when
+    data is available.
+    """
+    me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
+    me._selfile.enable()
+
+  def disconnected(me):
+    """
+    Called when the connection is lost.
+    """
+    me._selfile = None
+
+  def iterate(me):
+    """
+    Wait for something interesting to happen, and issue events.
+
+    That is, basically, do one iteration of a main select loop, processing
+    all of the events, and then return.  This isn't needed for
+    TripeCommandDispatcher, but runservices wants it.
+    """
+    M.select()
+
+###--------------------------------------------------------------------------
+### Inter-coroutine communication.
+
+class Queue (object):
+  """
+  A queue of things arriving asynchronously.
+
+  This is a very simple single-reader multiple-writer queue.  It's useful for
+  more complex coroutines which need to cope with a variety of possible
+  incoming events.
+  """
+
+  def __init__(me):
+    """Create a new empty queue."""
+    me.contents = M.Array()
+    me.waiter = None
+
+  def _wait(me):
+    """
+    Internal: wait for an item to arrive in the queue.
+
+    Complain if someone is already waiting, because this is just a
+    single-reader queue.
+    """
+    if me.waiter:
+      raise ValueError('queue already being waited on')
+    try:
+      me.waiter = Coroutine.getcurrent()
+      while not me.contents:
+        me.waiter.parent.switch()
+    finally:
+      me.waiter = None
+
+  def get(me):
+    """
+    Remove and return the item at the head of the queue.
+
+    If the queue is empty, wait until an item arrives.
+    """
+    me._wait()
+    return me.contents.shift()
+
+  def peek(me):
+    """
+    Return the item at the head of the queue without removing it.
+
+    If the queue is empty, wait until an item arrives.
+    """
+    me._wait()
+    return me.contents[0]
+
+  def put(me, thing):
+    """
+    Write THING to the queue.
+
+    If someone is waiting on the queue, wake him up immediately; otherwise
+    just leave the item there for later.
+    """
+    me.contents.push(thing)
+    if me.waiter:
+      me.waiter.switch()
+
+###--------------------------------------------------------------------------
 ### Dispatching coroutine.
 
 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
@@ -499,6 +604,33 @@ def _kwopts(kw, allowed):
   opts.append('--')
   return opts
 
+## Deferral.
+_deferq = []
+def defer(func, *args, **kw):
+  """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
+  _deferq.append((func, args, kw))
+
+def spawn(func, *args, **kw):
+  """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
+  defer(lambda: Coroutine(func).switch(*args, **kw))
+
+## Asides.
+_asideq = Queue()
+def _runasides():
+  """
+  Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
+  """
+  while True:
+    func, args, kw = _asideq.get()
+    try:
+      func(*args, **kw)
+    except:
+      SYS.excepthook(*SYS.exc_info())
+
+def aside(func, *args, **kw):
+  """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
+  defer(_asideq.put, (func, args, kw))
+
 class TripeCommandDispatcher (TripeConnection):
   """
   Command dispatcher.
@@ -557,6 +689,28 @@ class TripeCommandDispatcher (TripeConnection):
     for i in 'OK', 'INFO', 'FAIL':
       me.handler[i] = me._fgresponse
 
+  def quitp(me):
+    """Should we quit the main loop?  Subclasses should override."""
+    return False
+
+  def mainloop(me, quitp = None):
+    """
+    Iterate the I/O watcher until QUITP returns true.
+
+    Arranges for asides and deferred calls to be made at the right times.
+    """
+
+    global _deferq
+    assert _Coroutine.getcurrent() is rootcr
+    Coroutine(_runasides).switch()
+    while not quitp():
+      while _deferq:
+        q = _deferq
+        _deferq = []
+        for func, args, kw in q:
+          func(*args, **kw)
+      me.iowatch.iterate()
+
   def connected(me):
     """
     Connection hook.
@@ -566,6 +720,7 @@ class TripeCommandDispatcher (TripeConnection):
     """
     me.queue = M.Array()
     me.cmd = {}
+    TripeConnection.connected(me)
 
   def disconnected(me, reason):
     """
@@ -574,6 +729,7 @@ class TripeCommandDispatcher (TripeConnection):
     If a subclass hooks overrides this method, it must call us; sends a
     special CONNERR code to all incomplete commands.
     """
+    TripeConnection.disconnected(me, reason)
     for cmd in me.cmd.itervalues():
       cmd.response('CONNERR', reason)
     for cmd in me.queue:
@@ -751,65 +907,6 @@ class TripeCommandDispatcher (TripeConnection):
 ###--------------------------------------------------------------------------
 ### Asynchronous commands.
 
-class Queue (object):
-  """
-  A queue of things arriving asynchronously.
-
-  This is a very simple single-reader multiple-writer queue.  It's useful for
-  more complex coroutines which need to cope with a variety of possible
-  incoming events.
-  """
-
-  def __init__(me):
-    """Create a new empty queue."""
-    me.contents = M.Array()
-    me.waiter = None
-
-  def _wait(me):
-    """
-    Internal: wait for an item to arrive in the queue.
-
-    Complain if someone is already waiting, because this is just a
-    single-reader queue.
-    """
-    if me.waiter:
-      raise ValueError('queue already being waited on')
-    try:
-      me.waiter = Coroutine.getcurrent()
-      while not me.contents:
-        me.waiter.parent.switch()
-    finally:
-      me.waiter = None
-
-  def get(me):
-    """
-    Remove and return the item at the head of the queue.
-
-    If the queue is empty, wait until an item arrives.
-    """
-    me._wait()
-    return me.contents.shift()
-
-  def peek(me):
-    """
-    Return the item at the head of the queue without removing it.
-
-    If the queue is empty, wait until an item arrives.
-    """
-    me._wait()
-    return me.contents[0]
-
-  def put(me, thing):
-    """
-    Write THING to the queue.
-
-    If someone is waiting on the queue, wake him up immediately; otherwise
-    just leave the item there for later.
-    """
-    me.contents.push(thing)
-    if me.waiter:
-      me.waiter.switch()
-
 class TripeAsynchronousCommand (TripeCommand):
   """
   Asynchronous commands.
@@ -837,37 +934,6 @@ class TripeAsynchronousCommand (TripeCommand):
     me.queue.put((me.tag, code, list(stuff)))
 
 ###--------------------------------------------------------------------------
-### Selecting command dispatcher.
-
-class SelCommandDispatcher (TripeCommandDispatcher):
-  """
-  A command dispatcher which integrates with mLib's I/O-event system.
-
-  To use, simply create an instance and run mLib.select in a loop in your
-  main coroutine.
-  """
-
-  def __init__(me, socket):
-    """
-    Create an instance; SOCKET is the admin socket to connect to.
-
-    Note that no connection is made initially.
-    """
-    TripeCommandDispatcher.__init__(me, socket)
-    me.selfile = None
-
-  def connected(me):
-    """Connection hook: wires itself into the mLib select machinery."""
-    TripeCommandDispatcher.connected(me)
-    me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive)
-    me.selfile.enable()
-
-  def disconnected(me, reason):
-    """Disconnection hook: removes itself from the mLib select machinery."""
-    TripeCommandDispatcher.disconnected(me, reason)
-    me.selfile = None
-
-###--------------------------------------------------------------------------
 ### Services.
 
 class TripeJobCancelled (Exception):
@@ -894,7 +960,7 @@ class TripeSyntaxError (Exception):
   """
   pass
 
-class TripeServiceManager (SelCommandDispatcher):
+class TripeServiceManager (TripeCommandDispatcher):
   """
   A command dispatcher with added handling for incoming service requests.
 
@@ -940,7 +1006,7 @@ class TripeServiceManager (SelCommandDispatcher):
 
     SOCKET is the administration socket to connect to.
     """
-    SelCommandDispatcher.__init__(me, socket)
+    TripeCommandDispatcher.__init__(me, socket)
     me.svc = {}
     me.job = {}
     me.runningp = False
@@ -996,7 +1062,7 @@ class TripeServiceManager (SelCommandDispatcher):
     process can quit without anyone caring).
     """
     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
-                                          not me.selfile))
+                                          not me.sock))
 
   def quit(me):
     """Forces the quit flag (returned by quitp) on."""
@@ -1189,7 +1255,6 @@ def _setupsvc(tab, func):
     svcmgr.running()
 
 svcmgr = TripeServiceManager(None)
-_spawnq = []
 def runservices(socket, tab, init = None, setup = None, daemon = False):
   """
   Function to start a service provider.
@@ -1223,7 +1288,6 @@ def runservices(socket, tab, init = None, setup = None, daemon = False):
   quit.
   """
 
-  global _spawnq
   svcmgr.socket = socket
   svcmgr.connect()
   svcs = []
@@ -1240,22 +1304,8 @@ def runservices(socket, tab, init = None, setup = None, daemon = False):
     M.daemonize()
   if init is not None:
     init()
-  Coroutine(_setupsvc).switch(svcs, setup)
-  while not svcmgr.quitp():
-    for cr, args, kw in _spawnq:
-      cr.switch(*args, **kw)
-    _spawnq = []
-    M.select()
-
-def spawn(cr, *args, **kw):
-  """
-  Utility for spawning coroutines.
-
-  The coroutine CR is made to be a direct child of the root coroutine, and
-  invoked by it with the given arguments.
-  """
-  cr.parent = rootcr
-  _spawnq.append((cr, args, kw))
+  spawn(_setupsvc, svcs, setup)
+  svcmgr.mainloop()
 
 ###--------------------------------------------------------------------------
 ### Utilities for services.
index bfad160..be184b4 100644 (file)
@@ -405,6 +405,16 @@ class PingPeer (object):
        '--',
        me._peer]))
 
+  def _reconnect(me):
+    info = peerinfo(me._peer)
+    if 'connect' in info:
+      S.warn('watch', 'reconnecting', me._peer)
+      S.forcekx(me._peer)
+      T.spawn(connect, me._peer)
+      me._timer = M.SelTimer(time() + me._every, me._time)
+    else:
+      S.kill(me._peer)
+
   def event(me, code, stuff):
     """
     Respond to an event which happened to this peer.
@@ -436,14 +446,7 @@ class PingPeer (object):
         if me._failures < me._retries:
           me._ping()
         else:
-          info = peerinfo(me._peer)
-          if 'connect' in info:
-            S.warn('watch', 'reconnecting', me._peer)
-            S.forcekx(me._peer)
-            T.spawn(T.Coroutine(connect), me._peer)
-            me._timer = M.SelTimer(time() + me._every, me._time)
-          else:
-            S.kill(me._peer)
+          me._reconnect()
       elif stuff[0] == 'ping-peer-died':
         me._pinger.kill(me._peer)
 
@@ -650,7 +653,7 @@ def cmd_kick(peer):
   """
   if peer not in pinger.adopted():
     raise T.TripeJobError('peer-not-adopted', peer)
-  T.spawn(T.Coroutine(connect), peer)
+  T.spawn(connect, peer)
 
 def cmd_adopted():
   """