From 2fa800105a5897cad85c51c48d1970b9061a33f5 Mon Sep 17 00:00:00 2001 Message-Id: <2fa800105a5897cad85c51c48d1970b9061a33f5.1714565093.git.mdw@distorted.org.uk> From: Mark Wooding Date: Mon, 19 Apr 2010 21:11:04 +0100 Subject: [PATCH] py: New Python module for writing services and suchlike Organization: Straylight/Edgeware From: Mark Wooding Also rmcr: coroutines in terms of threads. Other changes: * The new module uses Python 2.4 features, so make sure we have that version. --- Makefile.am | 4 + configure.ac | 1 + debian/.gitignore | 1 + debian/control | 22 +- debian/python-tripe.install | 1 + debian/rules | 6 + py/Makefile.am | 49 ++ py/rmcr.py | 208 ++++++ py/tripe.py.in | 1345 +++++++++++++++++++++++++++++++++++ 9 files changed, 1635 insertions(+), 2 deletions(-) create mode 100644 debian/python-tripe.install create mode 100644 py/Makefile.am create mode 100644 py/rmcr.py create mode 100644 py/tripe.py.in diff --git a/Makefile.am b/Makefile.am index d4af1ef4..6a814ef3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -53,6 +53,7 @@ endif ## Services. if HAVE_PYTHON SUBDIRS += svc +SUBDIRS += py endif ## Key-management. @@ -128,6 +129,9 @@ EXTRA_DIST += debian/tripe-uslip.install ## keys EXTRA_DIST += debian/tripe-keys.install +## modules +EXTRA_DIST += debian/python-tripe.install + ## monitor EXTRA_DIST += debian/tripemon.install diff --git a/configure.ac b/configure.ac index 2d70b948..608298b0 100644 --- a/configure.ac +++ b/configure.ac @@ -320,6 +320,7 @@ AC_CONFIG_FILES( [pkstream/Makefile] [wireshark/Makefile] [init/Makefile] + [py/Makefile] [keys/Makefile] [svc/Makefile] [mon/Makefile] diff --git a/debian/.gitignore b/debian/.gitignore index a993e57d..50597ec9 100644 --- a/debian/.gitignore +++ b/debian/.gitignore @@ -11,6 +11,7 @@ compat ## Individual packages pkstream pathmtu +python-tripe tripe tripe.default tripe.init diff --git a/debian/control b/debian/control index 61ed0f6b..91626c2f 100644 --- a/debian/control +++ b/debian/control @@ -2,6 +2,7 @@ Source: tripe Section: net Priority: extra Maintainer: Mark Wooding +XS-Python-Version: >= 2.4 Build-Depends: catacomb-dev (>= 2.1.1), mlib-dev (>= 2.0.4), tshark, wireshark-dev (>= 0.10.10), debhelper (>= 4.0.2) Standards-Version: 3.1.1 @@ -71,7 +72,7 @@ Description: Trivial IP Encryption: a simple virtual private network Package: tripemon Architecture: all -Depends: python (>= 2.3), python-gtk2 (>= 2.6), tripe +Depends: python (>= 2.4), python-gtk2 (>= 2.6), tripe Description: Trivial IP Encryption: a simple virtual private network TrIPE is a simple VPN protocol. It uses cryptography to ensure secrecy and authenticity of packets it sends and receives. @@ -79,9 +80,26 @@ Description: Trivial IP Encryption: a simple virtual private network This package contains a graphical monitor program for managing and keeping an eye on a TrIPE server. +Package: python-tripe +Architecture: all +Depends: ${python:Depends}, python-mlib, tripe +Recommends: python-codespeak-lib +XB-Python-Version: ${python:Versions} +Description: Trivial IP Encryption: a simple virtual private network + TrIPE is a simple VPN protocol. It uses cryptography to ensure secrecy + and authenticity of packets it sends and receives. + . + This package contains Python modules for communicating with the tripe + server. + . + The package python-codespeak-lib provides an efficient implementation of + coroutines (`greenlets') used by these modules. An inefficient + implementation, based on threads, is included, so small sites can probably + manage without. + Package: tripe-keys Architecture: all -Depends: python (>= 2.3), tripe, catacomb-bin, python-catacomb +Depends: python (>= 2.4), tripe, catacomb-bin, python-catacomb Description: Trivial IP Encryption: a simple virtual private network TrIPE is a simple VPN protocol. It uses cryptography to ensure secrecy and authenticity of packets it sends and receives. diff --git a/debian/python-tripe.install b/debian/python-tripe.install new file mode 100644 index 00000000..80f0c2aa --- /dev/null +++ b/debian/python-tripe.install @@ -0,0 +1 @@ +debian/tmp/usr/lib/python* diff --git a/debian/rules b/debian/rules index 6df30ce6..12de3ab1 100755 --- a/debian/rules +++ b/debian/rules @@ -27,6 +27,12 @@ DEB_CONFIGURE_EXTRA_FLAGS = \ --with-initconfig="/etc/default/tripe" \ --with-wireshark +###-------------------------------------------------------------------------- +### Python. + +binary-install/python-tripe:: + dh_pycentral -ppython-tripe + ###-------------------------------------------------------------------------- ### Install the startup scripts. diff --git a/py/Makefile.am b/py/Makefile.am new file mode 100644 index 00000000..46446560 --- /dev/null +++ b/py/Makefile.am @@ -0,0 +1,49 @@ +### -*-makefile-*- +### +### Makefile for Python core stuff +### +### (c) 2006 Straylight/Edgeware +### + +###----- Licensing notice --------------------------------------------------- +### +### This file is part of Trivial IP Encryption (TrIPE). +### +### TrIPE is free software; you can redistribute it and/or modify +### it under the terms of the GNU General Public License as published by +### the Free Software Foundation; either version 2 of the License, or +### (at your option) any later version. +### +### TrIPE is distributed in the hope that it will be useful, +### but WITHOUT ANY WARRANTY; without even the implied warranty of +### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +### GNU General Public License for more details. +### +### You should have received a copy of the GNU General Public License +### along with TrIPE; if not, write to the Free Software Foundation, +### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +include $(top_srcdir)/vars.am + +python_PYTHON = +nodist_python_PYTHON = + +###-------------------------------------------------------------------------- +### Python modules. + +## Coroutine infrastructure. +python_PYTHON += rmcr.py + +## Main support stuff. +nodist_python_PYTHON += tripe.py +CLEANFILES += tripe.py +EXTRA_DIST += tripe.py.in + +tripe.py: tripe.py.in Makefile + $(confsubst) $(srcdir)/tripe.py.in >$@.new $(SUBSTITUTIONS) && \ + mv $@.new $@ + +## Make sure the modules are compiled. +all: $(python_PYTHON) $(nodist_python_PYTHON) + +###----- That's all, folks -------------------------------------------------- diff --git a/py/rmcr.py b/py/rmcr.py new file mode 100644 index 00000000..61521b4d --- /dev/null +++ b/py/rmcr.py @@ -0,0 +1,208 @@ +### -*-python-*- +### +### Rich man's coroutines +### +### (c) 2006 Straylight/Edgeware +### + +###----- Licensing notice --------------------------------------------------- +### +### This file is part of Trivial IP Encryption (TrIPE). +### +### TrIPE is free software; you can redistribute it and/or modify +### it under the terms of the GNU General Public License as published by +### the Free Software Foundation; either version 2 of the License, or +### (at your option) any later version. +### +### TrIPE is distributed in the hope that it will be useful, +### but WITHOUT ANY WARRANTY; without even the implied warranty of +### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +### GNU General Public License for more details. +### +### You should have received a copy of the GNU General Public License +### along with TrIPE; if not, write to the Free Software Foundation, +### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +__pychecker__ = 'self=me' + +###-------------------------------------------------------------------------- +### External dependencies. + +import thread as T +from sys import exc_info + +###-------------------------------------------------------------------------- +### What's going on? +### +### A coroutine is just a thread. Each coroutine has a recursive lock +### associated with it. The lock is almost always held. In order to switch +### to a coroutine, one releases its lock and then (re)locks one's own. + +###-------------------------------------------------------------------------- +### Low-level machinery. + +__debug = False +def _debug(msg): + if __debug: + print '+++ %s: %s' % (T.get_ident(), msg) + +def _switchto(cr, arg = None, exc = None): + """Switch to coroutine CR.""" + global active + _debug('> _switchto(%s, %s, %s)' % (cr, arg, exc)) + if not cr.livep: + raise ValueError, 'coroutine is dead' + cr._arg = arg + cr._exc = exc + if cr is active: + _debug(' _switchto: self-switch to %s' % cr) + else: + _debug(' _switchto: switch from %s to %s' % (active, cr)) + active = cr + _debug(' _switchto: %s releasing' % cr) + assert cr._lk.locked() + cr._lk.release() + _debug('< _switchto') + +###-------------------------------------------------------------------------- +### Coroutine object. + +def findvictim(cr): + """Find an appropriate victim coroutine for something, starting from CR.""" + while not cr: + if cr is None: + return main + cr = cr.parent + return cr + +class Coroutine (object): + """Heard of lightweight threads? Well, this is a heavyweight coroutine.""" + + def __init__(me, func = None, name = None, parent = None, __tid = None): + """ + Create a new coroutine object. + + The new coroutine is immediately activated. FUNC is the function to run, + and defaults to the coroutine object's `run' method, so subclassing is a + reasonable thing to do; NAME is a friendly name for the coroutine, and + shows up in debug traces. The __TID argument is used internally for + special effects, and shouldn't be used by external callers. + """ + global active + _debug('> __init__(%s, func = %s, tid = %s)' % (name, func, __tid)) + me.name = name + me._lk = T.allocate_lock() + _debug(' __init__: %s locking' % me) + me._lk.acquire() + me.livep = True + me._exc = None + me._arg = None + me.parent = parent or active + me._onexit = [None, None] + if __tid is not None: + me._tid = __tid + _debug(' __init__: manufacture cr %s with existing thread %d' % + (me, __tid)) + me._startp = False + else: + me._func = func or me.run + me._tid = T.start_new_thread(me._start, ()) + me._startp = True + assert me._lk.locked() + _debug(' __init__: create %s with new thread %d' % (me, me._tid)) + _debug('< __init__(%s)' % me) + + def __str__(me): + """Stringify a coroutine using its name if possible.""" + if me.name is not None: + return '' % me.name + else: + return repr(me) + + def _start(me): + """ + Start up the coroutine. + + Wait for this coroutine to become active, and then run the user's + function. When (if) that returns, mark the coroutine as dead. + """ + _debug('> _start(%s)' % (me)) + args, kwargs = me._wait() + _debug(' start(%s): args = %s, kwargs = %s' % (me, args, kwargs)) + me._startp = False + try: + try: + _debug(' _start(%s): call user (args = %s, kwargs = %s)' % + (me, args, kwargs)) + me._func(*args, **kwargs) + except: + _switchto(findvictim(me.parent), None, exc_info()) + finally: + _debug(' _start(%s): finally' % me) + _debug(' _start(%s): _onexit = %s' % (me, me._onexit)) + me.livep = False + _switchto(findvictim(me.parent), *me._onexit) + _debug('< _start(%s)' % me) + + def _wait(me): + """Wait for this coroutine to become active.""" + global active + _debug('> _wait(%s)' % me) + me._lk.acquire() + while me is not active: + _debug(' _wait(%s): locking' % me) + me._lk.acquire() + _debug(' _wait(%s): active, arg = %s, exc = %s' % + (me, me._arg, me._exc)) + if me._exc: + raise me._exc[0], me._exc[1], me._exc[2] + _debug('< _wait(%s): %s' % (me, me._arg)) + return me._arg + + def switch(me, *args, **kwargs): + """Switch to this coroutine, passing it the object OBJ.""" + global active + _debug('> switch(%s, args = %s, kwargs = %s)' % (me, args, kwargs)) + if me._startp: + obj = args, kwargs + else: + obj, = args or (None,) + assert not kwargs + old = active + _switchto(me, obj) + _debug('< switch') + return old._wait() + + def getcurrent(): + return active + getcurrent = staticmethod(getcurrent) + + def __nonzero__(me): + return me.livep + + def throw(me, exc, arg = None, tb = None): + """ + Switch to this coroutine, throwing it an exception. + + The exception is given by EXC, ARG and TB, which form the exception, + argument, traceback triple. + """ + global active + _debug('> throw(%s, %s, args = %s)' % (me, exc, arg)) + old = active + me._exc = [exc, arg, tb] + _switchto(me, None) + _debug('< throw') + return old._wait() + + def run(me, *args, **kw): + raise Exception('Coroutine has no body to run') + +###-------------------------------------------------------------------------- +### Setup stuff. + +active = None +main = Coroutine(_Coroutine__tid = T.get_ident(), name = '_main') +active = main + +###----- That's all, folks -------------------------------------------------- diff --git a/py/tripe.py.in b/py/tripe.py.in new file mode 100644 index 00000000..d158d1f5 --- /dev/null +++ b/py/tripe.py.in @@ -0,0 +1,1345 @@ +### -*-python-*- +### +### Administration connection with tripe server +### +### (c) 2006 Straylight/Edgeware +### + +###----- Licensing notice --------------------------------------------------- +### +### This file is part of Trivial IP Encryption (TrIPE). +### +### TrIPE is free software; you can redistribute it and/or modify +### it under the terms of the GNU General Public License as published by +### the Free Software Foundation; either version 2 of the License, or +### (at your option) any later version. +### +### TrIPE is distributed in the hope that it will be useful, +### but WITHOUT ANY WARRANTY; without even the implied warranty of +### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +### GNU General Public License for more details. +### +### You should have received a copy of the GNU General Public License +### along with TrIPE; if not, write to the Free Software Foundation, +### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +""" +This module provides classes and functions for connecting to a running tripe +server, sending it commands, receiving and processing replies, and +implementing services. + +Rather than end up in lost in a storm of little event-driven classes, or a +morass of concurrent threads, the module uses coroutines to present a fairly +simple function call/return interface to potentially long-running commands +which must run without blocking the main process. It sassumes a coroutine +module presenting a subset of the `greenlet' interface: if actual greenlets +are available, they are used; otherwise there's an implementation in terms of +threads (with lots of locking) which will do instead. + +The simple rule governing the coroutines used here is this: + + * The root coroutine never cares what values are passed to it when it + resumes: it just discards them. + + * Other, non-root, coroutines are presumed to be waiting for some specific + thing. + +Configuration variables: + configdir + socketdir + PACKAGE + VERSION + tripesock + peerdb + +Other useful variables: + rootcr + svcmgr + +Other tweakables: + _debug + +Exceptions: + Exception + StandardError + TripeConnectionError + TripeError + TripeInternalError + TripeJobCancelled + TripeJobError + TripeSyntaxError + +Classes: + _Coroutine + Coroutine + TripeServiceJob + OptParse + Queue + TripeCommand + TripeSynchronousCommand + TripeAsynchronousCommand + TripeCommandIterator + TripeConnection + TripeCommandDispatcher + SelCommandDispatcher + TripeServiceManager + TripeService + TripeServiceCommand + +Utility functions: + quotify + runservices + spawn + svcinfo + timespec +""" + +__pychecker__ = 'self=me no-constCond no-argsused' + +_debug = False + +###-------------------------------------------------------------------------- +### External dependencies. + +import socket as S +import errno as E +import mLib as M +import re as RX +import sys as SYS +import os as OS + +try: + if OS.getenv('TRIPE_FORCE_RMCR') is not None: + raise ImportError + from py.magic import greenlet as _Coroutine +except ImportError: + from rmcr import Coroutine as _Coroutine + +###-------------------------------------------------------------------------- +### Coroutine hacking. + +rootcr = _Coroutine.getcurrent() + +class Coroutine (_Coroutine): + """ + A coroutine class which can only be invoked by the root coroutine. + + The root, by construction, cannot be an instance of this class. + """ + def switch(me, *args, **kw): + assert _Coroutine.getcurrent() is rootcr + _Coroutine.switch(me, *args, **kw) + +###-------------------------------------------------------------------------- +### Default places for things. + +configdir = OS.environ.get('TRIPEDIR', "@configdir@") +socketdir = "@socketdir@" +PACKAGE = "@PACKAGE@" +VERSION = "@VERSION@" + +tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock')) + +###-------------------------------------------------------------------------- +### Connection to the server. + +def readnonblockingly(sock, len): + """ + Nonblocking read from SOCK. + + Try to return LEN bytes. If couldn't read anything, return None. EOF is + returned as an empty string. + """ + try: + sock.setblocking(0) + return sock.recv(len) + except S.error, exc: + if exc[0] == E.EWOULDBLOCK: + return None + raise + +class TripeConnectionError (StandardError): + """Something happened to the connection with the server.""" + pass +class TripeInternalError (StandardError): + """This program is very confused.""" + pass + +class TripeConnection (object): + """ + A logical connection to the tripe administration socket. + + There may or may not be a physical connection. (This is needed for the + monitor, for example.) + + This class isn't very useful on its own, but it has useful subclasses. At + this level, the class is agnostic about I/O multiplexing schemes; that gets + added later. + """ + + def __init__(me, socket): + """ + Make a connection to the named SOCKET. + + No physical connection is made initially. + """ + me.socket = socket + me.sock = None + me.lbuf = None + + def connect(me): + """ + Ensure that there's a physical connection. + + Do nothing if we're already connected. Invoke the `connected' method if + successful. + """ + if me.sock: return + sock = S.socket(S.AF_UNIX, S.SOCK_STREAM) + sock.connect(me.socket) + me.sock = sock + me.lbuf = M.LineBuffer(me.line, me._eof) + me.lbuf.size = 1024 + me.connected() + return me + + def disconnect(me, reason): + """ + Disconnect the physical connection. + + Invoke the `disconnected' method, giving the provided REASON, which + should be either None or an exception. + """ + if not me.sock: return + me.disconnected(reason) + me.sock.close() + me.sock = None + me.lbuf.disable() + me.lbuf = None + return me + + def connectedp(me): + """ + Return true if there's a current, believed-good physical connection. + """ + return me.sock is not None + + __nonzero__ = connectedp + + def send(me, line): + """ + Send the LINE to the connection's socket. + + All output is done through this method; it can be overridden to provide + proper nonblocking writing, though this seems generally unnecessary. + """ + try: + me.sock.setblocking(1) + me.sock.send(line + '\n') + except Exception, exc: + me.disconnect(exc) + raise + return me + + def receive(me): + """ + Receive whatever's ready from the connection's socket. + + Call `line' on each complete line, and `eof' if the connection closed. + Subclasses which attach this class to an I/O-event system should call + this method when the socket (CONN.sock) is ready for reading. + """ + while me.sock is not None: + try: + buf = readnonblockingly(me.sock, 16384) + except Exception, exc: + me.disconnect(exc) + raise + if buf is None: + return me + if buf == '': + me._eof() + return me + me.lbuf.flush(buf) + return me + + def _eof(me): + """Internal end-of-file handler.""" + me.disconnect(TripeConnectionError('connection lost')) + me.eof() + + def connected(me): + """ + To be overridden by subclasses to react to a connection being + established. + """ + pass + + def disconnected(me, reason): + """ + To be overridden by subclasses to react to a connection being severed. + """ + pass + + def eof(me): + """To be overridden by subclasses to handle end-of-file.""" + pass + + def line(me, line): + """To be overridden by subclasses to handle incoming lines.""" + pass + +###-------------------------------------------------------------------------- +### Dispatching coroutine. + +## Match a string if it can stand on its own as a bareword: i.e., it doesn't +## contain backslashes, quotes or whitespace. +rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$') + +## Match characters which need to be escaped, even in quoted text. +rx_weird = RX.compile(r'([\\\'])') + +def quotify(s): + """Quote S according to the tripe-admin(5) rules.""" + m = rx_ordinary.match(s) + if m and m.end() == len(s): + return s + else: + return "'" + rx_weird.sub(r'\\\1', s) + "'" + +def _callback(func): + """ + Return a wrapper for FUNC which reports exceptions thrown by it. + + Useful in the case of callbacks invoked by C functions which ignore + exceptions. + """ + def _(*a, **kw): + try: + return func(*a, **kw) + except: + SYS.excepthook(*SYS.exc_info()) + raise + return _ + +class TripeCommand (object): + """ + This abstract class represents a command in progress. + + The `words' attribute contains the list of tokens which make up the + command. + + Subclasses must implement a method to handle server responses: + + * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or + 'FAIL'; ARGS are the remaining tokens from the server's response. + """ + + def __init__(me, words): + """Make a new command consisting of the given list of WORDS.""" + me.words = words + +class TripeSynchronousCommand (TripeCommand): + """ + A simple command, processed apparently synchronously. + + Must be invoked from a coroutine other than the root (or whichever one is + running the dispatcher); in reality, other coroutines carry on running + while we wait for a response from the server. + + Each server response causes the calling coroutine to be resumed with the + pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO' + or `FAIL') and REST is a list of the server's other response tokens. The + calling coroutine must continue switching back to the dispatcher until a + terminating response (`OK' or `FAIL') is received or become very + confused. + + Mostly it's better to use the TripeCommandIterator to do this + automatically. + """ + + def __init__(me, words): + """Initialize the command, specifying the WORDS to send to the server.""" + TripeCommand.__init__(me, words) + me.owner = Coroutine.getcurrent() + + def response(me, code, *rest): + """Handle a server response by forwarding it to the calling coroutine.""" + me.owner.switch((code, rest)) + +class TripeError (StandardError): + """ + A tripe command failed with an error (a FAIL code). The args attribute + contains a list of the server's message tokens. + """ + pass + +class TripeCommandIterator (object): + """ + Iterator interface to a tripe command. + + The values returned by the iterator are lists of tokens from the server's + INFO lines, as processed by the given filter function, if any. The + iterator completes normally (by raising StopIteration) if the server + reported OK, and raises an exception if the command failed for some reason. + + A TripeError is raised if the server issues a FAIL code. If the connection + failed, some other exception is raised. + """ + + def __init__(me, dispatcher, words, bg = False, filter = None): + """ + Create a new command iterator. + + The command is submitted to the DISPATCHER; it consists of the given + WORDS. If BG is true, then an option is inserted to request that the + server run the command in the background. The FILTER is applied to the + token lists which the server responds, and the filter's output are the + items returned by the iterator. + """ + me.dcr = Coroutine.getcurrent().parent + if me.dcr is None: + raise ValueError, 'must invoke from coroutine' + me.filter = filter or (lambda x: x) + if bg: + words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:]) + dispatcher.rawcommand(TripeSynchronousCommand(words)) + + def __iter__(me): + """Iterator protocol: I am my own iterator.""" + return me + + def next(me): + """ + Iterator protocol: return the next piece of information from the server. + + INFO responses are filtered and returned as the values of the iteration. + FAIL and CONNERR responses are turned into exceptions and raised. + Finally, OK is turned into StopIteration, which should cause a normal end + to the iteration process. + """ + thing = me.dcr.switch() + code, rest = thing + if code == 'INFO': + return me.filter(rest) + elif code == 'OK': + raise StopIteration + elif code == 'CONNERR': + if rest is None: + raise TripeConnectionError, 'connection terminated by user' + else: + raise rest + elif code == 'FAIL': + raise TripeError(*rest) + else: + raise TripeInternalError \ + ('unexpected tripe response %r' % ([code] + rest)) + +### Simple utility functions for the TripeCommandIterator convenience +### methods. + +def _tokenjoin(words): + """Filter function: simply join the given tokens with spaces between.""" + return ' '.join(words) + +def _keyvals(iter): + """Return a dictionary formed from the KEY=VALUE pairs returned by the + iterator ITER.""" + kv = {} + for ww in iter: + for w in ww: + q = w.index('=') + kv[w[:q]] = w[q + 1:] + return kv + +def _simple(iter): + """Raise an error if ITER contains any item.""" + stuff = list(iter) + if len(stuff) != 0: + raise TripeInternalError('expected no response') + return None + +def _oneline(iter): + """If ITER contains a single item, return it; otherwise raise an error.""" + stuff = list(iter) + if len(stuff) != 1: + raise TripeInternalError('expected only one line of response') + return stuff[0] + +def _tracelike(iter): + """Handle a TRACE-like command. The result is a list of tuples (CHAR, + STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if + disabled, `+' if enabled, maybe something else later), and DESC is the + human-readable description.""" + stuff = [] + for ww in iter: + ch = ww[0][0] + st = ww[0][1:] + desc = ' '.join(ww[1:]) + stuff.append((ch, st, desc)) + return stuff + +def _kwopts(kw, allowed): + """Parse keyword arguments into options. ALLOWED is a list of allowable + keywords; raise errors if other keywords are present. KEY = VALUE becomes + an option pair -KEY VALUE if VALUE is a string, just the option -KEY if + VALUE is a true non-string, or nothing if VALUE is false.. Insert a `--' + at the end to stop the parser getting confused.""" + opts = [] + amap = {} + for a in allowed: amap[a] = True + for k, v in kw.iteritems(): + if k not in amap: + raise ValueError('option %s not allowed here' % k) + if isinstance(v, str): + opts += ['-' + k, v] + elif v: + opts += ['-' + k] + opts.append('--') + return opts + +class TripeCommandDispatcher (TripeConnection): + """ + Command dispatcher. + + The command dispatcher is a connection which knows how to handle commands. + This is probably the most important class in this module to understand. + + Lines from the server are parsed into tokens. The first token is a code + (OK or NOTE or something) explaining what kind of line this is. The + `handler' attribute is a dictionary mapping server line codes to handler + functions, which are applied to the words of the line as individual + arguments. *Exception*: the content of TRACE lines is not tokenized. + + There are default handlers for server codes which respond to commands. + Commands arrive as TripeCommand instances through the `rawcommand' + interface. The dispatcher keeps track of which command objects represent + which jobs, and sends responses on to the appropriate command objects by + invoking their `response' methods. Command objects don't see the + BG... codes, because the dispatcher has already transformed them into + regular codes when it was looking up job code. + + The dispatcher also has a special response code of its own: CONNERR + indicates that the connection failed and the command has therefore been + lost; the + """ + + ## --- Infrastructure --- + ## + ## We will get confused if we pipeline commands. Send them one at a time. + ## Only send a command when the previous one detaches or completes. + ## + ## The following attributes are interesting: + ## + ## tagseq Sequence number for next background job (for bgtag) + ## + ## queue Commands awaiting submission. + ## + ## cmd Mapping from job tags to commands: cmd[None] is the + ## foreground command. + ## + ## handler Mapping from server codes to handler functions. + + def __init__(me, socket): + """ + Initialize the dispatcher. + + The SOCKET is the filename of the administration socket to connect to, + for TripeConnection.__init__. + """ + TripeConnection.__init__(me, socket) + me.tagseq = 0 + me.handler = {} + me.handler['BGDETACH'] = me._detach + for i in 'BGOK', 'BGINFO', 'BGFAIL': + me.handler[i] = me._response + for i in 'OK', 'INFO', 'FAIL': + me.handler[i] = me._fgresponse + + def connected(me): + """ + Connection hook. + + If a subclass overrides this method, it must call us; clears out the + command queue and job map. + """ + me.queue = M.Array() + me.cmd = {} + + def disconnected(me, reason): + """ + Disconnection hook. + + If a subclass hooks overrides this method, it must call us; sends a + special CONNERR code to all incomplete commands. + """ + for cmd in me.cmd.itervalues(): + cmd.response('CONNERR', reason) + for cmd in me.queue: + cmd.response('CONNERR', reason) + + @_callback + def line(me, line): + """Handle an incoming line, sending it to the right place.""" + if _debug: print '<', line + code, rest = M.word(line, quotep = True) + func = me.handler.get(code) + if func is not None: + if code == 'TRACE': + func(code, rest) + else: + func(code, *M.split(rest, quotep = True)[0]) + me.dequeue() + + def dequeue(me): + """ + Pull the oldest command off the queue and try to send it to the server. + """ + if not me.queue or None in me.cmd: return + cmd = me.queue.shift() + if _debug: print '>', ' '.join([quotify(w) for w in cmd.words]) + me.send(' '.join([quotify(w) for w in cmd.words])) + me.cmd[None] = cmd + + def bgtag(me): + """ + Return an unused job tag. + + May be of use when composing commands by hand. + """ + tag = 'J%05d' % me.tagseq + me.tagseq += 1 + return tag + + ## --- Built-in handler functions for server responses --- + + def _detach(me, _, tag): + """ + Respond to a BGDETACH TAG message. + + Move the current foreground command to the background. + """ + assert tag not in me.cmd + me.cmd[tag] = me.cmd[None] + del me.cmd[None] + + def _response(me, code, tag, *w): + """ + Respond to an OK, INFO or FAIL message. + + If this is a message for a background job, find the tag; then dispatch + the result to the command object. + """ + if code.startswith('BG'): + code = code[2:] + cmd = me.cmd[tag] + if code != 'INFO': + del me.cmd[tag] + cmd.response(code, *w) + + def _fgresponse(me, code, *w): + """Process responses to the foreground command.""" + me._response(code, None, *w) + + ## --- Interface methods --- + + def rawcommand(me, cmd): + """ + Submit the TripeCommand CMD to the server, and look after it until it + completes. + """ + if not me.connectedp(): + raise TripeConnectionError('connection closed') + me.queue.push(cmd) + me.dequeue() + + def command(me, *cmd, **kw): + """Convenience wrapper for creating a TripeCommandIterator object.""" + return TripeCommandIterator(me, cmd, **kw) + + ## --- Convenience methods for server commands --- + + def add(me, peer, *addr, **kw): + return _simple(me.command(bg = True, + *['ADD'] + + _kwopts(kw, ['tunnel', 'keepalive', 'cork']) + + [peer] + + list(addr))) + def addr(me, peer): + return _oneline(me.command('ADDR', peer)) + def algs(me): + return _keyvals(me.command('ALGS')) + def checkchal(me, chal): + return _simple(me.command('CHECKCHAL', chal)) + def daemon(me): + return _simple(me.command('DAEMON')) + def eping(me, peer, **kw): + return _oneline(me.command(bg = True, + *['PING'] + + _kwopts(kw, ['timeout']) + + [peer])) + def forcekx(me, peer): + return _simple(me.command('FORCEKX', peer)) + def getchal(me): + return _oneline(me.command('GETCHAL', filter = _tokenjoin)) + def greet(me, peer, chal): + return _simple(me.command('GREET', peer, chal)) + def help(me): + return list(me.command('HELP', filter = _tokenjoin)) + def ifname(me, peer): + return _oneline(me.command('IFNAME', peer, filter = _tokenjoin)) + def kill(me, peer): + return _simple(me.command('KILL', peer)) + def list(me): + return list(me.command('LIST', filter = _tokenjoin)) + def notify(me, *msg): + return _simple(me.command('NOTIFY', *msg)) + def peerinfo(me, peer): + return _keyvals(me.command('PEERINFO', peer)) + def ping(me, peer, **kw): + return _oneline(me.command(bg = True, + *['PING'] + + _kwopts(kw, ['timeout']) + + [peer])) + def port(me): + return _oneline(me.command('PORT', filter = _tokenjoin)) + def quit(me): + return _simple(me.command('QUIT')) + def reload(me): + return _simple(me.command('RELOAD')) + def servinfo(me): + return _keyvals(me.command('SERVINFO')) + def setifname(me, new): + return _simple(me.command('SETIFNAME', new)) + def svcclaim(me, service, version): + return _simple(me.command('SVCCLAIM', service, version)) + def svcensure(me, service, version = None): + return _simple(me.command('SVCENSURE', service, + *((version is not None and [version]) or []))) + def svcfail(me, job, *msg): + return _simple(me.command('SVCFAIL', job, *msg)) + def svcinfo(me, job, *msg): + return _simple(me.command('SVCINFO', job, *msg)) + def svclist(me): + return list(me.command('SVCLIST')) + def svcok(me, job): + return _simple(me.command('SVCOK', job)) + def svcquery(me, service): + return _keyvals(me.command('SVCQUERY', service)) + def svcrelease(me, service): + return _simple(me.command('SVCRELEASE', service)) + def svcsubmit(me, service, *args, **kw): + return me.command(bg = True, + *['SVCSUBMIT'] + + _kwopts(kw, ['version']) + + [service] + + list(args)) + def stats(me, peer): + return _keyvals(me.command('STATS', peer)) + def trace(me, *args): + return _tracelike(me.command('TRACE', *args)) + def tunnels(me): + return list(me.command('TUNNELS', filter = _tokenjoin)) + def version(me): + return _oneline(me.command('VERSION', filter = _tokenjoin)) + def warn(me, *msg): + return _simple(me.command('WARN', *msg)) + def watch(me, *args): + return _tracelike(me.command('WATCH', *args)) + +###-------------------------------------------------------------------------- +### Asynchronous commands. + +class Queue (object): + """ + A queue of things arriving asynchronously. + + This is a very simple single-reader multiple-writer queue. It's useful for + more complex coroutines which need to cope with a variety of possible + incoming events. + """ + + def __init__(me): + """Create a new empty queue.""" + me.contents = M.Array() + me.waiter = None + + def _wait(me): + """ + Internal: wait for an item to arrive in the queue. + + Complain if someone is already waiting, because this is just a + single-reader queue. + """ + if me.waiter: + raise ValueError('queue already being waited on') + try: + me.waiter = Coroutine.getcurrent() + while not me.contents: + me.waiter.parent.switch() + finally: + me.waiter = None + + def get(me): + """ + Remove and return the item at the head of the queue. + + If the queue is empty, wait until an item arrives. + """ + me._wait() + return me.contents.shift() + + def peek(me): + """ + Return the item at the head of the queue without removing it. + + If the queue is empty, wait until an item arrives. + """ + me._wait() + return me.contents[0] + + def put(me, thing): + """ + Write THING to the queue. + + If someone is waiting on the queue, wake him up immediately; otherwise + just leave the item there for later. + """ + me.contents.push(thing) + if me.waiter: + me.waiter.switch() + +class TripeAsynchronousCommand (TripeCommand): + """ + Asynchronous commands. + + This is the complicated way of issuing commands. You must set up a queue, + and associate the command with the queue. Responses arriving for the + command will be put on the queue as an triple of the form (TAG, CODE, REST) + -- where TAG is an object of your choice, not interpreted by this class, + CODE is the server's response code (OK, INFO, FAIL), and REST is the list + of the rest of the server's tokens. + + Using this, you can write coroutines which process many commands (and + possibly other events) simultaneously. + """ + + def __init__(me, queue, tag, words): + """Make an asynchronous command consisting of the given WORDS, which + sends responses to QUEUE, labelled with TAG.""" + TripeCommand.__init__(me, words) + me.queue = queue + me.tag = tag + + def response(me, code, *stuff): + """Handle a server response by writing it to the caller's queue.""" + me.queue.put((me.tag, code, list(stuff))) + +###-------------------------------------------------------------------------- +### Selecting command dispatcher. + +class SelCommandDispatcher (TripeCommandDispatcher): + """ + A command dispatcher which integrates with mLib's I/O-event system. + + To use, simply create an instance and run mLib.select in a loop in your + main coroutine. + """ + + def __init__(me, socket): + """ + Create an instance; SOCKET is the admin socket to connect to. + + Note that no connection is made initially. + """ + TripeCommandDispatcher.__init__(me, socket) + me.selfile = None + + def connected(me): + """Connection hook: wires itself into the mLib select machinery.""" + TripeCommandDispatcher.connected(me) + me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive) + me.selfile.enable() + + def disconnected(me, reason): + """Disconnection hook: removes itself from the mLib select machinery.""" + TripeCommandDispatcher.disconnected(me, reason) + me.selfile = None + +###-------------------------------------------------------------------------- +### Services. + +class TripeJobCancelled (Exception): + """ + Exception sent to job handler if the client kills the job. + + Not propagated further. + """ + pass + +class TripeJobError (Exception): + """ + Exception to cause failure report for running job. + + Sends an SVCFAIL code back. + """ + pass + +class TripeSyntaxError (Exception): + """ + Exception to report a syntax error for a job. + + Sends an SVCFAIL bad-svc-syntax message back. + """ + pass + +class TripeServiceManager (SelCommandDispatcher): + """ + A command dispatcher with added handling for incoming service requests. + + There is usually only one instance of this class, called svcmgr. Some of + the support functions in this module assume that this is the case. + + To use, run mLib.select in a loop until the quitp method returns true; + then, in a non-root coroutine, register your services by calling `add', and + then call `running' when you've finished setting up. + + The instance handles server service messages SVCJOB, SVCCANCEL and + SVCCLAIM. It maintains a table of running services. Incoming jobs cause + the service's `job' method to be invoked; SVCCANCEL sends a + TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes + the relevant service to be deregistered. + + There is no base class for jobs, but a job must implement two methods: + + start() Begin processing; might be a no-op. + + cancel() Stop processing; the original client has killed the + job. + + The life of a service manager is divided into two parts: setup and running; + you tell the manager that you've finished setting up by calling the + `running' method. If, at any point after setup is finished, there are no + remaining services or jobs, `quitp' will return true, ending the process. + """ + + ## --- Attributes --- + ## + ## svc Mapping name -> service object + ## + ## job Mapping jobid -> job handler coroutine + ## + ## runningp True when setup is finished + ## + ## _quitp True if explicit quit has been requested + + def __init__(me, socket): + """ + Initialize the service manager. + + SOCKET is the administration socket to connect to. + """ + SelCommandDispatcher.__init__(me, socket) + me.svc = {} + me.job = {} + me.runningp = False + me.handler['SVCCANCEL'] = me._cancel + me.handler['SVCJOB'] = me._job + me.handler['SVCCLAIM'] = me._claim + me._quitp = 0 + + def addsvc(me, svc): + """Register a new service; SVC is a TripeService instance.""" + assert svc.name not in me.svc + me.svcclaim(svc.name, svc.version) + me.svc[svc.name] = svc + + def _cancel(me, _, jid): + """ + Called when the server cancels a job; invokes the job's `cancel' method. + """ + job = me.job[jid] + del me.job[jid] + job.cancel() + + def _claim(me, _, svc, __): + """Called when another program claims our service at a higher version.""" + del me.svc[svc] + + def _job(me, _, jid, svc, cmd, *args): + """ + Called when the server sends us a job to do. + + Calls the service to collect a job, and begins processing it. + """ + assert jid not in me.job + svc = me.svc[svc.lower()] + job = svc.job(jid, cmd, args) + me.job[jid] = job + job.start() + + def running(me): + """Answer true if setup is finished.""" + me.runningp = True + + def jobdone(me, jid): + """Informs the service manager that the job with id JID has finished.""" + try: + del me.job[jid] + except KeyError: + pass + + def quitp(me): + """ + Return true if no services or jobs are active (and, therefore, if this + process can quit without anyone caring). + """ + return me._quitp or (me.runningp and ((not me.svc and not me.job) or + not me.selfile)) + + def quit(me): + """Forces the quit flag (returned by quitp) on.""" + me._quitp = True + +class TripeService (object): + """ + A standard service. + + The NAME and VERSION are passed on to the server. The CMDTAB is a + dictionary mapping command names (in lowercase) to command objects. + + If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults + are provided. + + TripeService itself is mostly agnostic about the nature of command objects, + but the TripeServiceJob class (below) has some requirements. The built-in + HELP command requires command objects to have `usage' attributes. + """ + + def __init__(me, name, version, cmdtab): + """ + Create and register a new service with the given NAME and VERSION. + + CMDTAB maps command names (in lower-case) to command objects. + """ + me.name = name + me.version = version + me.cmd = cmdtab + me.activep = True + me.cmd.setdefault('help', + TripeServiceCommand('help', 0, 0, '', me._help)) + me.cmd.setdefault('quit', + TripeServiceCommand('quit', 0, 0, '', me._quit)) + + def job(me, jid, cmd, args): + """ + Called by the service manager: a job arrived with id JID. + + It asks for comamnd CMD with argument list ARGS. Creates a new job, + passing it the information needed. + """ + return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args) + + ## Simple default command handlers, complying with the spec in + ## tripe-service(7). + + def _help(me): + """Send a help summary to the user.""" + cmds = me.cmd.items() + cmds.sort() + for name, cmd in cmds: + svcinfo(name, *cmd.usage) + + def _quit(me): + """Terminate the service manager.""" + svcmgr.notify('svc-quit', me.name, 'admin-request') + svcmgr.quit() + +class TripeServiceCommand (object): + """A simple service command.""" + + def __init__(me, name, min, max, usage, func): + """ + Creates a new command. + + NAME is the command's name (in lowercase). + + MIN and MAX are the minimum and maximum number of allowed arguments (used + for checking); either may be None to indicate no minimum or maximum. + + USAGE is a usage string, used for generating help and error messages. + + FUNC is the function to invoke. + """ + me.name = name + me.min = min + me.max = max + me.usage = usage.split() + me.func = func + + def run(me, *args): + """ + Called when the command is invoked. + + Does minimal checking of the arguments and calls the supplied function. + """ + if (me.min is not None and len(args) < me.min) or \ + (me.max is not None and len(args) > me.max): + raise TripeSyntaxError + me.func(*args) + +class TripeServiceJob (Coroutine): + """ + Job handler coroutine. + + A standard TripeService invokes a TripeServiceJob for each incoming job + request, passing it the jobid, command and arguments, and a command + object. The command object needs the following attributes. + + usage A usage list (excluding the command name) showing + arguments and options. + + run(*ARGS) Function to react to the command with ARGS split into + separate arguments. Invoked in a coroutine. The + svcinfo function (not the TripeCommandDispatcher + method) may be used to send INFO lines. The function + may raise TripeJobError to send a FAIL response back, + or TripeSyntaxError to send a generic usage error. + TripeJobCancelled exceptions are trapped silently. + Other exceptions are translated into a generic + internal-error message. + + This class automatically takes care of sending some closing response to the + job, and for informing the service manager that the job is completed. + + The `jid' attribute stores the job's id. + """ + + def __init__(me, jid, svc, cmd, command, args): + """ + Start a new job. + + The job is created with id JID, for service SVC, processing command name + CMD (which the service resolved into the command object COMMAND, or + None), and with the arguments ARGS. + """ + Coroutine.__init__(me) + me.jid = jid + me.svc = svc + me.cmd = cmd + me.command = command + me.args = args + + def run(me): + """ + Main body of the coroutine. + + Does the tedious exception handling boilerplate and invokes the command's + run method. + """ + try: + try: + if me.command is None: + svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd) + else: + me.command.run(*me.args) + svcmgr.svcok(me.jid) + except TripeJobError, exc: + svcmgr.svcfail(me.jid, *exc.args) + except TripeSyntaxError: + svcmgr.svcfail(me.jid, 'bad-svc-syntax', + me.svc.name, me.command.name, + *me.command.usage) + except TripeJobCancelled: + pass + except Exception, exc: + svcmgr.svcfail(me.jid, 'svc-internal-error', + exc.__class__.__name__, str(exc)) + finally: + svcmgr.jobdone(me.jid) + + def start(me): + """Invoked by the service manager to start running the coroutine.""" + me.switch() + + def cancel(me): + """Invoked by the service manager to cancel the job.""" + me.throw(TripeJobCancelled()) + +def svcinfo(*args): + """ + If invoked from a TripeServiceJob coroutine, sends an INFO line to the + job's sender, automatically using the correct job id. + """ + svcmgr.svcinfo(Coroutine.getcurrent().jid, *args) + +def _setupsvc(tab, func): + """ + Setup coroutine for setting up service programs. + + Register the given services. + """ + try: + for service in tab: + svcmgr.addsvc(service) + if func: + func() + finally: + svcmgr.running() + +svcmgr = TripeServiceManager(None) +_spawnq = [] +def runservices(socket, tab, init = None, setup = None, daemon = False): + """ + Function to start a service provider. + + SOCKET is the socket to connect to, usually tripesock. + + TAB is a list of entries. An entry may be either a tuple + + (NAME, VERSION, COMMANDS) + + or a service object (e.g., a TripeService instance). + + COMMANDS is a dictionary mapping command names to tuples + + (MIN, MAX, USAGE, FUNC) + + of arguments for a TripeServiceCommand object. + + If DAEMON is true, then the process is forked into the background before we + start. If INIT is given, it is called in the main coroutine, immediately + after forking. If SETUP is given, it is called in a coroutine, after + calling INIT and setting up the services but before marking the service + manager as running. + + It is a really bad idea to do any initialization, particularly setting up + coroutines, outside of the INIT or SETUP functions. In particular, if + we're using rmcr for fake coroutines, the daemonizing fork will kill off + the currently established coroutines in a most surprising way. + + The function runs a main select loop until the service manager decides to + quit. + """ + + global _spawnq + svcmgr.socket = socket + svcmgr.connect() + svcs = [] + for service in tab: + if not isinstance(service, tuple): + svcs.append(service) + else: + name, version, commands = service + cmdmap = {} + for cmd, stuff in commands.iteritems(): + cmdmap[cmd] = TripeServiceCommand(cmd, *stuff) + svcs.append(TripeService(name, version, cmdmap)) + if daemon: + M.daemonize() + if init is not None: + init() + Coroutine(_setupsvc).switch(svcs, setup) + while not svcmgr.quitp(): + for cr, args, kw in _spawnq: + cr.switch(*args, **kw) + _spawnq = [] + M.select() + +def spawn(cr, *args, **kw): + """ + Utility for spawning coroutines. + + The coroutine CR is made to be a direct child of the root coroutine, and + invoked by it with the given arguments. + """ + cr.parent = rootcr + _spawnq.append((cr, args, kw)) + +###-------------------------------------------------------------------------- +### Utilities for services. + +_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400} +def timespec(spec): + """Parse the timespec SPEC, returning a number of seconds.""" + mul = 1 + if len(spec) > 1 and spec[-1] in _timeunits: + mul = _timeunits[spec[-1]] + spec = spec[:-1] + try: + t = int(spec) + except: + raise TripeJobError('bad-time-spec', spec) + if t < 0: + raise TripeJobError('bad-time-spec', spec) + return mul * int(spec) + +class OptParse (object): + """ + Parse options from a command list in the conventional fashion. + + ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed + options. The returned values are the option tags. During parsing, the + `arg' method may be used to retrieve the argument for the most recent + option. Afterwards, `rest' may be used to retrieve the remaining + non-option arguments, and do a simple check on how many there are. + + The parser correctly handles `--' option terminators. + """ + + def __init__(me, args, allowed): + """ + Create a new option parser. + + The parser will scan the ARGS for options given in the sequence ALLOWED + (which are expected to include the `-' prefix). + """ + me.allowed = {} + for a in allowed: + me.allowed[a] = True + me.args = list(args) + + def __iter__(me): + """Iterator protocol: I am my own iterator.""" + return me + + def next(me): + """ + Iterator protocol: return the next option. + + If we've run out, raise StopIteration. + """ + if len(me.args) == 0 or \ + len(me.args[0]) < 2 or \ + not me.args[0].startswith('-'): + raise StopIteration + opt = me.args.pop(0) + if opt == '--': + raise StopIteration + if opt not in me.allowed: + raise TripeSyntaxError + return opt + + def arg(me): + """ + Return the argument for the most recent option. + + If none is available, raise TripeSyntaxError. + """ + if len(me.args) == 0: + raise TripeSyntaxError + return me.args.pop(0) + + def rest(me, min = None, max = None): + """ + After option parsing is done, return the remaining arguments. + + Check that there are at least MIN and at most MAX arguments remaining -- + either may be None to suppress the check. + """ + if (min is not None and len(me.args) < min) or \ + (max is not None and len(me.args) > max): + raise TripeSyntaxError + return me.args + +###----- That's all, folks -------------------------------------------------- -- [mdw]