### -*-python-*-
###
### Administration connection with tripe server
###
### (c) 2006 Straylight/Edgeware
###

###----- Licensing notice ---------------------------------------------------
###
### This file is part of Trivial IP Encryption (TrIPE).
###
### TrIPE is free software; you can redistribute it and/or modify
### it under the terms of the GNU General Public License as published by
### the Free Software Foundation; either version 2 of the License, or
### (at your option) any later version.
###
### TrIPE is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
### GNU General Public License for more details.
###
### You should have received a copy of the GNU General Public License
### along with TrIPE; if not, write to the Free Software Foundation,
### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

"""
This module provides classes and functions for connecting to a running tripe
server, sending it commands, receiving and processing replies, and
implementing services.

Rather than end up lost in a storm of little event-driven classes, or a
morass of concurrent threads, the module uses coroutines to present a fairly
simple function call/return interface to potentially long-running commands
which must run without blocking the main process.  It assumes a coroutine
module presenting a subset of the `greenlet' interface: if actual greenlets
are available, they are used; otherwise there's an implementation in terms of
threads (with lots of locking) which will do instead.

The simple rule governing the coroutines used here is this:

  * The root coroutine never cares what values are passed to it when it
    resumes: it just discards them.

  * Other, non-root, coroutines are presumed to be waiting for some specific
    thing.

Configuration variables:
  configdir
  socketdir
  PACKAGE
  VERSION
  tripesock
  peerdb

Other useful variables:
  rootcr
  svcmgr

Other tweakables:
  _debug

Exceptions:
  Exception
    StandardError
      TripeConnectionError
      TripeError
      TripeInternalError
    TripeJobCancelled
    TripeJobError
    TripeSyntaxError

Classes:
  _Coroutine
    Coroutine
      TripeServiceJob
  OptParse
  Queue
  SelIOWatcher
  TripeCommand
    TripeSynchronousCommand
    TripeAsynchronousCommand
  TripeCommandIterator
  TripeConnection
    TripeCommandDispatcher
      TripeServiceManager
  TripeService
  TripeServiceCommand

Utility functions:
  quotify
  runservices
  spawn
  svcinfo
  timespec
"""

__pychecker__ = 'self=me no-constCond no-argsused'

## When true, coroutine switches and protocol lines are printed as they
## happen.
_debug = False

###--------------------------------------------------------------------------
### External dependencies.

import socket as S
import errno as E
import mLib as M
import re as RX
import sys as SYS
import os as OS

## Prefer real greenlets.  Setting `TRIPE_FORCE_RMCR' in the environment (to
## any value) forces the `rmcr' implementation instead, as does failure to
## import `py.magic'.
try:
  if OS.getenv('TRIPE_FORCE_RMCR') is not None:
    raise ImportError
  from py.magic import greenlet as _Coroutine
except ImportError:
  from rmcr import Coroutine as _Coroutine

###--------------------------------------------------------------------------
### Coroutine hacking.

## The coroutine which is current while this module is being imported is
## treated as the root.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
  """
  A coroutine class which can only be invoked by the root coroutine.

  The root, by construction, cannot be an instance of this class.
  """

  def switch(me, *args, **kw):
    """
    Switch to this coroutine, passing it ARGS and KW.

    Must be called from the root coroutine (checked by assertion).  Whatever
    is passed back when the root is resumed is discarded: nothing is
    returned.  If `_debug' is set, the coroutine being entered, and then the
    root, are printed.
    """
    assert _Coroutine.getcurrent() is rootcr
    if _debug: print '* %s' % me
    _Coroutine.switch(me, *args, **kw)
    if _debug: print '* %s' % rootcr

###--------------------------------------------------------------------------
### Default places for things.

## The `@...@' values look like build-time substitution placeholders --
## TODO confirm against the build system.
configdir = OS.environ.get('TRIPEDIR', "@configdir@")
socketdir = "@socketdir@"
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"

## The environment variables `TRIPESOCK' and `TRIPEPEERDB' override these
## defaults.
tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')

###--------------------------------------------------------------------------
### Connection to the server.

def readnonblockingly(sock, len): """ Nonblocking read from SOCK. Try to return LEN bytes. If couldn't read anything, return `None'. EOF is returned as an empty string. """ try: sock.setblocking(0) return sock.recv(len) except S.error, exc: if exc[0] == E.EWOULDBLOCK: return None raise class TripeConnectionError (StandardError): """Something happened to the connection with the server.""" pass class TripeInternalError (StandardError): """This program is very confused.""" pass class TripeConnection (object): """ A logical connection to the tripe administration socket. There may or may not be a physical connection. (This is needed for the monitor, for example.) This class isn't very useful on its own, but it has useful subclasses. At this level, the class is agnostic about I/O multiplexing schemes; that gets added later. """ def __init__(me, socket): """ Make a connection to the named SOCKET. No physical connection is made initially. """ me.socket = socket me.sock = None me.lbuf = None me.iowatch = SelIOWatcher(me) def connect(me): """ Ensure that there's a physical connection. Do nothing if we're already connected. Invoke the `connected' method if successful. """ if me.sock: return sock = S.socket(S.AF_UNIX, S.SOCK_STREAM) sock.connect(me.socket) me.sock = sock me.lbuf = M.LineBuffer(me.line, me._eof) me.lbuf.size = 1024 me.connected() return me def disconnect(me, reason): """ Disconnect the physical connection. Invoke the `disconnected' method, giving the provided REASON, which should be either `None' or an exception. """ if not me.sock: return me.disconnected(reason) me.sock.close() me.sock = None me.lbuf.disable() me.lbuf = None return me def connectedp(me): """ Return true if there's a current, believed-good physical connection. """ return me.sock is not None __nonzero__ = connectedp def send(me, line): """ Send the LINE to the connection's socket. 
All output is done through this method; it can be overridden to provide proper nonblocking writing, though this seems generally unnecessary. """ try: me.sock.setblocking(1) me.sock.send(line + '\n') except Exception, exc: me.disconnect(exc) raise return me def receive(me): """ Receive whatever's ready from the connection's socket. Call `line' on each complete line, and `eof' if the connection closed. Subclasses which attach this class to an I/O-event system should call this method when the socket (the `sock' attribute) is ready for reading. """ while me.sock is not None: try: buf = readnonblockingly(me.sock, 16384) except Exception, exc: me.disconnect(exc) raise if buf is None: return me if buf == '': me._eof() return me me.lbuf.flush(buf) return me def _eof(me): """Internal end-of-file handler.""" me.disconnect(TripeConnectionError('connection lost')) me.eof() def connected(me): """ To be overridden by subclasses to react to a connection being established. """ me.iowatch.connected(me.sock) def disconnected(me, reason): """ To be overridden by subclasses to react to a connection being severed. """ me.iowatch.disconnected() def eof(me): """To be overridden by subclasses to handle end-of-file.""" pass def line(me, line): """To be overridden by subclasses to handle incoming lines.""" pass ###-------------------------------------------------------------------------- ### I/O loop integration. class SelIOWatcher (object): """ Integration with mLib's I/O event system. You can replace this object with a different one for integration with, e.g., glib's main loop, by setting `CONN.iowatcher' to a different object while the CONN is disconnected. """ def __init__(me, conn): me._conn = conn me._selfile = None def connected(me, sock): """ Called when a connection is made. SOCK is the socket. The watcher must arrange to call `CONN.receive' when data is available. 
""" me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive) me._selfile.enable() def disconnected(me): """ Called when the connection is lost. """ me._selfile = None def iterate(me): """ Wait for something interesting to happen, and issue events. That is, basically, do one iteration of a main select loop, processing all of the events, and then return. This is used in the method `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of `runservices'; if your I/O watcher has a different main loop, you can drive it yourself. """ M.select() ###-------------------------------------------------------------------------- ### Inter-coroutine communication. class Queue (object): """ A queue of things arriving asynchronously. This is a very simple single-reader multiple-writer queue. It's useful for more complex coroutines which need to cope with a variety of possible incoming events. """ def __init__(me): """Create a new empty queue.""" me.contents = M.Array() me.waiter = None def _wait(me): """ Internal: wait for an item to arrive in the queue. Complain if someone is already waiting, because this is just a single-reader queue. """ if me.waiter: raise ValueError('queue already being waited on') try: me.waiter = Coroutine.getcurrent() while not me.contents: me.waiter.parent.switch() finally: me.waiter = None def get(me): """ Remove and return the item at the head of the queue. If the queue is empty, wait until an item arrives. """ me._wait() return me.contents.shift() def peek(me): """ Return the item at the head of the queue without removing it. If the queue is empty, wait until an item arrives. """ me._wait() return me.contents[0] def put(me, thing): """ Write THING to the queue. If someone is waiting on the queue, wake him up immediately; otherwise just leave the item there for later. 
""" me.contents.push(thing) if me.waiter: me.waiter.switch() ###-------------------------------------------------------------------------- ### Dispatching coroutine. ## Match a string if it can stand on its own as a bareword: i.e., it doesn't ## contain backslashes, quotes or whitespace. rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$') ## Match characters which need to be escaped, even in quoted text. rx_weird = RX.compile(r'([\\\'])') def quotify(s): """Quote S according to the tripe-admin(5) rules.""" m = rx_ordinary.match(s) if m and m.end() == len(s): return s else: return "'" + rx_weird.sub(r'\\\1', s) + "'" def _callback(func): """ Return a wrapper for FUNC which reports exceptions thrown by it. Useful in the case of callbacks invoked by C functions which ignore exceptions. """ def _(*a, **kw): try: return func(*a, **kw) except: SYS.excepthook(*SYS.exc_info()) raise return _ class TripeCommand (object): """ This abstract class represents a command in progress. The `words' attribute contains the list of tokens which make up the command. Subclasses must implement a method to handle server responses: * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or `FAIL'; ARGS are the remaining tokens from the server's response. """ def __init__(me, words): """Make a new command consisting of the given list of WORDS.""" me.words = words class TripeSynchronousCommand (TripeCommand): """ A simple command, processed apparently synchronously. Must be invoked from a coroutine other than the root (or whichever one is running the dispatcher); in reality, other coroutines carry on running while we wait for a response from the server. Each server response causes the calling coroutine to be resumed with the pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO' or `FAIL') and REST is a list of the server's other response tokens. 
The calling coroutine must continue switching back to the dispatcher until a terminating response (`OK' or `FAIL') is received or become very confused. Mostly it's better to use the `TripeCommandIterator' to do this automatically. """ def __init__(me, words): """Initialize the command, specifying the WORDS to send to the server.""" TripeCommand.__init__(me, words) me.owner = Coroutine.getcurrent() def response(me, code, *rest): """Handle a server response by forwarding it to the calling coroutine.""" me.owner.switch((code, rest)) class TripeError (StandardError): """ A tripe command failed with an error (a `FAIL' code). The args attribute contains a list of the server's message tokens. """ pass class TripeCommandIterator (object): """ Iterator interface to a tripe command. The values returned by the iterator are lists of tokens from the server's `INFO' lines, as processed by the given filter function, if any. The iterator completes normally (by raising `StopIteration') if the server reported `OK', and raises an exception if the command failed for some reason. A `TripeError' is raised if the server issues a `FAIL' code. If the connection failed, some other exception is raised. """ def __init__(me, dispatcher, words, bg = False, filter = None): """ Create a new command iterator. The command is submitted to the DISPATCHER; it consists of the given WORDS. If BG is true, then an option is inserted to request that the server run the command in the background. The FILTER is applied to the token lists which the server responds, and the filter's output are the items returned by the iterator. 
""" me.dcr = Coroutine.getcurrent().parent if me.dcr is None: raise ValueError, 'must invoke from coroutine' me.filter = filter or (lambda x: x) if bg: words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:]) dispatcher.rawcommand(TripeSynchronousCommand(words)) def __iter__(me): """Iterator protocol: I am my own iterator.""" return me def next(me): """ Iterator protocol: return the next piece of information from the server. `INFO' responses are filtered and returned as the values of the iteration. `FAIL' and `CONNERR' responses are turned into exceptions and raised. Finally, `OK' is turned into `StopIteration', which should cause a normal end to the iteration process. """ thing = me.dcr.switch() code, rest = thing if code == 'INFO': return me.filter(rest) elif code == 'OK': raise StopIteration elif code == 'CONNERR': if rest is None: raise TripeConnectionError, 'connection terminated by user' else: raise rest elif code == 'FAIL': raise TripeError(*rest) else: raise TripeInternalError \ ('unexpected tripe response %r' % ([code] + rest)) ### Simple utility functions for the TripeCommandIterator convenience ### methods. def _tokenjoin(words): """Filter function: simply join the given tokens with spaces between.""" return ' '.join(words) def _keyvals(iter): """Return a dictionary formed from the `KEY=VALUE' pairs returned by the iterator ITER.""" kv = {} for ww in iter: for w in ww: q = w.index('=') kv[w[:q]] = w[q + 1:] return kv def _simple(iter): """Raise an error if ITER contains any item.""" stuff = list(iter) if len(stuff) != 0: raise TripeInternalError('expected no response') return None def _oneline(iter): """If ITER contains a single item, return it; otherwise raise an error.""" stuff = list(iter) if len(stuff) != 1: raise TripeInternalError('expected only one line of response') return stuff[0] def _tracelike(iter): """Handle a TRACE-like command. 
The result is a list of tuples (CHAR, STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if disabled, `+' if enabled, maybe something else later), and DESC is the human-readable description.""" stuff = [] for ww in iter: ch = ww[0][0] st = ww[0][1:] desc = ' '.join(ww[1:]) stuff.append((ch, st, desc)) return stuff def _kwopts(kw, allowed): """Parse keyword arguments into options. ALLOWED is a list of allowable keywords; raise errors if other keywords are present. `KEY = VALUE' becomes an option pair `-KEY VALUE' if VALUE is a string, just the option `-KEY' if VALUE is a true non-string, or nothing if VALUE is false. Insert a `--' at the end to stop the parser getting confused.""" opts = [] amap = {} for a in allowed: amap[a] = True for k, v in kw.iteritems(): if k not in amap: raise ValueError('option %s not allowed here' % k) if isinstance(v, str): opts += ['-' + k, v] elif v: opts += ['-' + k] opts.append('--') return opts ## Deferral. _deferq = [] def defer(func, *args, **kw): """Call FUNC(*ARGS, **KW) later, in the root coroutine.""" _deferq.append((func, args, kw)) def funargstr(func, args, kw): items = [repr(a) for a in args] for k, v in kw.iteritems(): items.append('%s = %r' % (k, v)) return '%s(%s)' % (func.__name__, ', '.join(items)) def spawn(func, *args, **kw): """Call FUNC, passing ARGS and KW, in a fresh coroutine.""" defer(lambda: (Coroutine(func, name = funargstr(func, args, kw)) .switch(*args, **kw))) ## Asides. _asideq = Queue() def _runasides(): """ Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW). """ while True: func, args, kw = _asideq.get() try: func(*args, **kw) except: SYS.excepthook(*SYS.exc_info()) def aside(func, *args, **kw): """Call FUNC(*ARGS, **KW) later, in a non-root coroutine.""" defer(_asideq.put, (func, args, kw)) class TripeCommandDispatcher (TripeConnection): """ Command dispatcher. The command dispatcher is a connection which knows how to handle commands. 
This is probably the most important class in this module to understand. Lines from the server are parsed into tokens. The first token is a code (`OK' or `NOTE' or something) explaining what kind of line this is. The `handler' attribute is a dictionary mapping server line codes to handler functions, which are applied to the words of the line as individual arguments. *Exception*: the content of `TRACE' lines is not tokenized. There are default handlers for server codes which respond to commands. Commands arrive as `TripeCommand' instances through the `rawcommand' interface. The dispatcher keeps track of which command objects represent which jobs, and sends responses on to the appropriate command objects by invoking their `response' methods. Command objects don't see the `BG...' codes, because the dispatcher has already transformed them into regular codes when it was looking up the job tag. The dispatcher also has a special response code of its own: `CONNERR' indicates that the connection failed and the command has therefore been lost. This is sent to all outstanding commands when a connection error is encountered: rather than a token list, it is accompanied by an exception object which is the cause of the disconnection, which may be `None' if the disconnection is expected (e.g., the direct result of a user request). """ ## --- Infrastructure --- ## ## We will get confused if we pipeline commands. Send them one at a time. ## Only send a command when the previous one detaches or completes. ## ## The following attributes are interesting: ## ## tagseq Sequence number for next background job (for bgtag) ## ## queue Commands awaiting submission. ## ## cmd Mapping from job tags to commands: cmd[None] is the ## foreground command. ## ## handler Mapping from server codes to handler functions. def __init__(me, socket): """ Initialize the dispatcher. The SOCKET is the filename of the administration socket to connect to, for TripeConnection.__init__. 
""" TripeConnection.__init__(me, socket) me.tagseq = 0 me.handler = {} me.handler['BGDETACH'] = me._detach for i in 'BGOK', 'BGINFO', 'BGFAIL': me.handler[i] = me._response for i in 'OK', 'INFO', 'FAIL': me.handler[i] = me._fgresponse def quitp(me): """Should we quit the main loop? Subclasses should override.""" return False def mainloop(me, quitp = None): """ Iterate the I/O watcher until QUITP returns true. Arranges for asides and deferred calls to be made at the right times. """ global _deferq assert _Coroutine.getcurrent() is rootcr Coroutine(_runasides, name = '_runasides').switch() if quitp is None: quitp = me.quitp while not quitp(): while _deferq: q = _deferq _deferq = [] for func, args, kw in q: func(*args, **kw) me.iowatch.iterate() def connected(me): """ Connection hook. If a subclass overrides this method, it must call us; clears out the command queue and job map. """ me.queue = M.Array() me.cmd = {} TripeConnection.connected(me) def disconnected(me, reason): """ Disconnection hook. If a subclass hooks overrides this method, it must call us; sends a special `CONNERR' code to all incomplete commands. """ TripeConnection.disconnected(me, reason) for cmd in me.cmd.itervalues(): cmd.response('CONNERR', reason) for cmd in me.queue: cmd.response('CONNERR', reason) @_callback def line(me, line): """Handle an incoming line, sending it to the right place.""" if _debug: print '<', line code, rest = M.word(line, quotep = True) func = me.handler.get(code) if func is not None: if code == 'TRACE': func(code, rest) else: func(code, *M.split(rest, quotep = True)[0]) me.dequeue() def dequeue(me): """ Pull the oldest command off the queue and try to send it to the server. """ if not me.queue or None in me.cmd: return cmd = me.queue.shift() if _debug: print '>', ' '.join([quotify(w) for w in cmd.words]) me.send(' '.join([quotify(w) for w in cmd.words])) me.cmd[None] = cmd def bgtag(me): """ Return an unused job tag. May be of use when composing commands by hand. 
""" tag = 'J%05d' % me.tagseq me.tagseq += 1 return tag ## --- Built-in handler functions for server responses --- def _detach(me, _, tag): """ Respond to a `BGDETACH' TAG message. Move the current foreground command to the background. """ assert tag not in me.cmd me.cmd[tag] = me.cmd[None] del me.cmd[None] def _response(me, code, tag, *w): """ Respond to an `OK', `INFO' or `FAIL' message. If this is a message for a background job, find the tag; then dispatch the result to the command object. This is also called by `_fgresponse' (wth TAG set to `None') to handle responses for foreground commands, and is therefore a useful method to extend or override in subclasses. """ if code.startswith('BG'): code = code[2:] cmd = me.cmd[tag] if code != 'INFO': del me.cmd[tag] cmd.response(code, *w) def _fgresponse(me, code, *w): """Process responses to the foreground command.""" me._response(code, None, *w) ## --- Interface methods --- def rawcommand(me, cmd): """ Submit the `TripeCommand' CMD to the server, and look after it until it completes. 
""" if not me.connectedp(): raise TripeConnectionError('connection closed') me.queue.push(cmd) me.dequeue() def command(me, *cmd, **kw): """Convenience wrapper for creating a TripeCommandIterator object.""" return TripeCommandIterator(me, cmd, **kw) ## --- Convenience methods for server commands --- def add(me, peer, *addr, **kw): return _simple(me.command(bg = True, *['ADD'] + _kwopts(kw, ['tunnel', 'keepalive', 'key', 'priv', 'cork', 'mobile']) + [peer] + list(addr))) def addr(me, peer): return _oneline(me.command('ADDR', peer)) def algs(me, peer = None): return _keyvals(me.command('ALGS', *((peer is not None and [peer]) or []))) def checkchal(me, chal): return _simple(me.command('CHECKCHAL', chal)) def daemon(me): return _simple(me.command('DAEMON')) def eping(me, peer, **kw): return _oneline(me.command(bg = True, *['PING'] + _kwopts(kw, ['timeout']) + [peer])) def forcekx(me, peer): return _simple(me.command('FORCEKX', peer)) def getchal(me): return _oneline(me.command('GETCHAL', filter = _tokenjoin)) def greet(me, peer, chal): return _simple(me.command('GREET', peer, chal)) def help(me): return list(me.command('HELP', filter = _tokenjoin)) def ifname(me, peer): return _oneline(me.command('IFNAME', peer, filter = _tokenjoin)) def kill(me, peer): return _simple(me.command('KILL', peer)) def list(me): return list(me.command('LIST', filter = _tokenjoin)) def notify(me, *msg): return _simple(me.command('NOTIFY', *msg)) def peerinfo(me, peer): return _keyvals(me.command('PEERINFO', peer)) def ping(me, peer, **kw): return _oneline(me.command(bg = True, *['PING'] + _kwopts(kw, ['timeout']) + [peer])) def port(me): return _oneline(me.command('PORT', filter = _tokenjoin)) def quit(me): return _simple(me.command('QUIT')) def reload(me): return _simple(me.command('RELOAD')) def servinfo(me): return _keyvals(me.command('SERVINFO')) def setifname(me, new): return _simple(me.command('SETIFNAME', new)) def svcclaim(me, service, version): return _simple(me.command('SVCCLAIM', 
service, version)) def svcensure(me, service, version = None): return _simple(me.command('SVCENSURE', service, *((version is not None and [version]) or []))) def svcfail(me, job, *msg): return _simple(me.command('SVCFAIL', job, *msg)) def svcinfo(me, job, *msg): return _simple(me.command('SVCINFO', job, *msg)) def svclist(me): return list(me.command('SVCLIST')) def svcok(me, job): return _simple(me.command('SVCOK', job)) def svcquery(me, service): return _keyvals(me.command('SVCQUERY', service)) def svcrelease(me, service): return _simple(me.command('SVCRELEASE', service)) def svcsubmit(me, service, *args, **kw): return me.command(bg = True, *['SVCSUBMIT'] + _kwopts(kw, ['version']) + [service] + list(args)) def stats(me, peer): return _keyvals(me.command('STATS', peer)) def trace(me, *args): return _tracelike(me.command('TRACE', *args)) def tunnels(me): return list(me.command('TUNNELS', filter = _tokenjoin)) def version(me): return _oneline(me.command('VERSION', filter = _tokenjoin)) def warn(me, *msg): return _simple(me.command('WARN', *msg)) def watch(me, *args): return _tracelike(me.command('WATCH', *args)) ###-------------------------------------------------------------------------- ### Asynchronous commands. class TripeAsynchronousCommand (TripeCommand): """ Asynchronous commands. This is the complicated way of issuing commands. You must set up a queue, and associate the command with the queue. Responses arriving for the command will be put on the queue as an triple of the form (TAG, CODE, REST) -- where TAG is an object of your choice, not interpreted by this class, CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'), and REST is the list of the rest of the server's tokens. Using this, you can write coroutines which process many commands (and possibly other events) simultaneously. 
""" def __init__(me, queue, tag, words): """Make an asynchronous command consisting of the given WORDS, which sends responses to QUEUE, labelled with TAG.""" TripeCommand.__init__(me, words) me.queue = queue me.tag = tag def response(me, code, *stuff): """Handle a server response by writing it to the caller's queue.""" me.queue.put((me.tag, code, list(stuff))) ###-------------------------------------------------------------------------- ### Services. class TripeJobCancelled (Exception): """ Exception sent to job handler if the client kills the job. Not propagated further. """ pass class TripeJobError (Exception): """ Exception to cause failure report for running job. Sends an SVCFAIL code back. """ pass class TripeSyntaxError (Exception): """ Exception to report a syntax error for a job. Sends an SVCFAIL bad-svc-syntax message back. """ pass class TripeServiceManager (TripeCommandDispatcher): """ A command dispatcher with added handling for incoming service requests. There is usually only one instance of this class, called svcmgr. Some of the support functions in this module assume that this is the case. To use, run `mLib.select' in a loop until the quitp method returns true; then, in a non-root coroutine, register your services by calling `add', and then call `running' when you've finished setting up. The instance handles server service messages `SVCJOB', `SVCCANCEL' and `SVCCLAIM'. It maintains a table of running services. Incoming jobs cause the service's `job' method to be invoked; `SVCCANCEL' sends a `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM' causes the relevant service to be deregistered. There is no base class for jobs, but a job must implement two methods: start() Begin processing; might be a no-op. cancel() Stop processing; the original client has killed the job. The life of a service manager is divided into two parts: setup and running; you tell the manager that you've finished setting up by calling the `running' method. 
If, at any point after setup is finished, there are no remaining services or jobs, `quitp' will return true, ending the process. """ ## --- Attributes --- ## ## svc Mapping name -> service object ## ## job Mapping jobid -> job handler coroutine ## ## runningp True when setup is finished ## ## _quitp True if explicit quit has been requested def __init__(me, socket): """ Initialize the service manager. SOCKET is the administration socket to connect to. """ TripeCommandDispatcher.__init__(me, socket) me.svc = {} me.job = {} me.runningp = False me.handler['SVCCANCEL'] = me._cancel me.handler['SVCJOB'] = me._job me.handler['SVCCLAIM'] = me._claim me._quitp = 0 def addsvc(me, svc): """Register a new service; SVC is a `TripeService' instance.""" assert svc.name not in me.svc me.svcclaim(svc.name, svc.version) me.svc[svc.name] = svc def _cancel(me, _, jid): """ Called when the server cancels a job; invokes the job's `cancel' method. """ job = me.job[jid] del me.job[jid] job.cancel() def _claim(me, _, svc, __): """Called when another program claims our service at a higher version.""" del me.svc[svc] def _job(me, _, jid, svc, cmd, *args): """ Called when the server sends us a job to do. Calls the service to collect a job, and begins processing it. """ assert jid not in me.job svc = me.svc[svc.lower()] job = svc.job(jid, cmd, args) me.job[jid] = job job.start() def running(me): """Answer true if setup is finished.""" me.runningp = True def jobdone(me, jid): """Informs the service manager that the job with id JID has finished.""" try: del me.job[jid] except KeyError: pass def quitp(me): """ Return true if no services or jobs are active (and, therefore, if this process can quit without anyone caring). """ return me._quitp or (me.runningp and ((not me.svc and not me.job) or not me.sock)) def quit(me): """Forces the quit flag (returned by quitp) on.""" me._quitp = True class TripeService (object): """ A standard service. The NAME and VERSION are passed on to the server. 
The CMDTAB is a dictionary mapping command names (in lowercase) to command objects. If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then defaults are provided. TripeService itself is mostly agnostic about the nature of command objects, but the TripeServiceJob class (below) has some requirements. The built-in HELP command requires command objects to have `usage' attributes. """ def __init__(me, name, version, cmdtab): """ Create and register a new service with the given NAME and VERSION. CMDTAB maps command names (in lower-case) to command objects. """ me.name = name me.version = version me.cmd = cmdtab me.activep = True me.cmd.setdefault('help', TripeServiceCommand('help', 0, 0, '', me._help)) me.cmd.setdefault('quit', TripeServiceCommand('quit', 0, 0, '', me._quit)) def job(me, jid, cmd, args): """ Called by the service manager: a job arrived with id JID. It asks for comamnd CMD with argument list ARGS. Creates a new job, passing it the information needed. """ return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args) ## Simple default command handlers, complying with the spec in ## tripe-service(7). def _help(me): """Send a help summary to the user.""" cmds = me.cmd.items() cmds.sort() for name, cmd in cmds: svcinfo(name, *cmd.usage) def _quit(me): """Terminate the service manager.""" svcmgr.notify('svc-quit', me.name, 'admin-request') svcmgr.quit() class TripeServiceCommand (object): """A simple service command.""" def __init__(me, name, min, max, usage, func): """ Creates a new command. NAME is the command's name (in lowercase). MIN and MAX are the minimum and maximum number of allowed arguments (used for checking); either may be None to indicate no minimum or maximum. USAGE is a usage string, used for generating help and error messages. FUNC is the function to invoke. """ me.name = name me.min = min me.max = max me.usage = usage.split() me.func = func def run(me, *args): """ Called when the command is invoked. 
Does minimal checking of the arguments and calls the supplied function. """ if (me.min is not None and len(args) < me.min) or \ (me.max is not None and len(args) > me.max): raise TripeSyntaxError me.func(*args) class TripeServiceJob (Coroutine): """ Job handler coroutine. A standard `TripeService' invokes a `TripeServiceJob' for each incoming job request, passing it the jobid, command and arguments, and a command object. The command object needs the following attributes. usage A usage list (excluding the command name) showing arguments and options. run(*ARGS) Function to react to the command with ARGS split into separate arguments. Invoked in a coroutine. The `svcinfo function (not the `TripeCommandDispatcher' method) may be used to send `INFO' lines. The function may raise `TripeJobError' to send a `FAIL' response back, or `TripeSyntaxError' to send a generic usage error. `TripeJobCancelled' exceptions are trapped silently. Other exceptions are translated into a generic internal-error message. This class automatically takes care of sending some closing response to the job, and for informing the service manager that the job is completed. The `jid' attribute stores the job's id. """ def __init__(me, jid, svc, cmd, command, args): """ Start a new job. The job is created with id JID, for service SVC, processing command name CMD (which the service resolved into the command object COMMAND, or `None'), and with the arguments ARGS. """ Coroutine.__init__(me) me.jid = jid me.svc = svc me.cmd = cmd me.command = command me.args = args def run(me): """ Main body of the coroutine. Does the tedious exception handling boilerplate and invokes the command's run method. 
""" try: try: if me.command is None: svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd) else: me.command.run(*me.args) svcmgr.svcok(me.jid) except TripeJobError, exc: svcmgr.svcfail(me.jid, *exc.args) except TripeSyntaxError: svcmgr.svcfail(me.jid, 'bad-svc-syntax', me.svc.name, me.command.name, *me.command.usage) except TripeJobCancelled: pass except Exception, exc: svcmgr.svcfail(me.jid, 'svc-internal-error', exc.__class__.__name__, str(exc)) finally: svcmgr.jobdone(me.jid) def start(me): """Invoked by the service manager to start running the coroutine.""" me.switch() def cancel(me): """Invoked by the service manager to cancel the job.""" me.throw(TripeJobCancelled()) def svcinfo(*args): """ If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the job's sender, automatically using the correct job id. """ svcmgr.svcinfo(Coroutine.getcurrent().jid, *args) def _setupsvc(tab, func): """ Setup coroutine for setting up service programs. Register the given services. """ try: for service in tab: svcmgr.addsvc(service) if func: func() finally: svcmgr.running() svcmgr = TripeServiceManager(None) def runservices(socket, tab, init = None, setup = None, daemon = False): """ Function to start a service provider. SOCKET is the socket to connect to, usually tripesock. TAB is a list of entries. An entry may be either a tuple (NAME, VERSION, COMMANDS) or a service object (e.g., a `TripeService' instance). COMMANDS is a dictionary mapping command names to tuples (MIN, MAX, USAGE, FUNC) of arguments for a `TripeServiceCommand' object. If DAEMON is true, then the process is forked into the background before we start. If INIT is given, it is called in the main coroutine, immediately after forking. If SETUP is given, it is called in a coroutine, after calling INIT and setting up the services but before marking the service manager as running. It is a really bad idea to do any initialization, particularly setting up coroutines, outside of the INIT or SETUP functions. 
In particular, if we're using rmcr for fake coroutines, the daemonizing fork will kill off the currently established coroutines in a most surprising way. The function runs a main select loop until the service manager decides to quit. """ svcmgr.socket = socket svcmgr.connect() svcs = [] for service in tab: if not isinstance(service, tuple): svcs.append(service) else: name, version, commands = service cmdmap = {} for cmd, stuff in commands.iteritems(): cmdmap[cmd] = TripeServiceCommand(cmd, *stuff) svcs.append(TripeService(name, version, cmdmap)) if daemon: M.daemonize() if init is not None: init() spawn(_setupsvc, svcs, setup) svcmgr.mainloop() ###-------------------------------------------------------------------------- ### Utilities for services. _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400} def timespec(spec): """Parse the timespec SPEC, returning a number of seconds.""" mul = 1 if len(spec) > 1 and spec[-1] in _timeunits: mul = _timeunits[spec[-1]] spec = spec[:-1] try: t = int(spec) except: raise TripeJobError('bad-time-spec', spec) if t < 0: raise TripeJobError('bad-time-spec', spec) return mul * int(spec) class OptParse (object): """ Parse options from a command list in the conventional fashion. ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed options. The returned values are the option tags. During parsing, the `arg' method may be used to retrieve the argument for the most recent option. Afterwards, `rest' may be used to retrieve the remaining non-option arguments, and do a simple check on how many there are. The parser correctly handles `--' option terminators. """ def __init__(me, args, allowed): """ Create a new option parser. The parser will scan the ARGS for options given in the sequence ALLOWED (which are expected to include the `-' prefix). 
""" me.allowed = {} for a in allowed: me.allowed[a] = True me.args = list(args) def __iter__(me): """Iterator protocol: I am my own iterator.""" return me def next(me): """ Iterator protocol: return the next option. If we've run out, raise `StopIteration'. """ if len(me.args) == 0 or \ len(me.args[0]) < 2 or \ not me.args[0].startswith('-'): raise StopIteration opt = me.args.pop(0) if opt == '--': raise StopIteration if opt not in me.allowed: raise TripeSyntaxError return opt def arg(me): """ Return the argument for the most recent option. If none is available, raise `TripeSyntaxError'. """ if len(me.args) == 0: raise TripeSyntaxError return me.args.pop(0) def rest(me, min = None, max = None): """ After option parsing is done, return the remaining arguments. Check that there are at least MIN and at most MAX arguments remaining -- either may be None to suppress the check. """ if (min is not None and len(me.args) < min) or \ (max is not None and len(me.args) > max): raise TripeSyntaxError return me.args ###----- That's all, folks --------------------------------------------------