3 ### Administration connection with tripe server
5 ### (c) 2006 Straylight/Edgeware
8 ###----- Licensing notice ---------------------------------------------------
10 ### This file is part of Trivial IP Encryption (TrIPE).
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 ### GNU General Public License for more details.
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
31 Rather than end up lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process. It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
39 The simple rule governing the coroutines used here is this:
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
44 * Other, non-root, coroutines are presumed to be waiting for some specific
47 Configuration variables:
55 Other useful variables:
79 TripeSynchronousCommand
80 TripeAsynchronousCommand
83 TripeCommandDispatcher
97 __pychecker__ = 'self=me no-constCond no-argsused'
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
114 from py.magic import greenlet as _Coroutine
116 from rmcr import Coroutine as _Coroutine
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
121 rootcr = _Coroutine.getcurrent()
123 class Coroutine (_Coroutine):
125 A coroutine class which can only be invoked by the root coroutine.
127 The root, by construction, cannot be an instance of this class.
def switch(me, *args, **kw):
  """
  Transfer control to this coroutine, handing it ARGS and KW.

  Only the root coroutine is allowed to do this: the assertion checks that
  the caller is `rootcr'.  Whatever the underlying switch produces is
  discarded rather than returned.
  """
  caller = _Coroutine.getcurrent()
  assert caller is rootcr
  _Coroutine.switch(me, *args, **kw)
133 ###--------------------------------------------------------------------------
134 ### Default places for things.
## The `@...@' strings look like placeholders filled in when the module is
## installed -- TODO confirm against the build system.
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"
socketdir = "@socketdir@"

## Each of these can be overridden from the environment.
configdir = OS.getenv('TRIPEDIR', "@configdir@")
tripesock = OS.getenv('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
peerdb = OS.getenv('TRIPEPEERDB', 'peers.cdb')
144 ###--------------------------------------------------------------------------
145 ### Connection to the server.
147 def readnonblockingly(sock, len):
149 Nonblocking read from SOCK.
151 Try to return LEN bytes. If couldn't read anything, return None. EOF is
152 returned as an empty string.
156 return sock.recv(len)
158 if exc[0] == E.EWOULDBLOCK:
class TripeConnectionError (StandardError):
  """
  The connection with the server has gone away or cannot be used.

  Raised, for example, when the connection is lost or closed, or when the
  user terminates it.
  """
class TripeInternalError (StandardError):
  """
  This program is very confused.

  Raised when the server's replies do not have the shape this module
  expected: an unexpected response code, or more or fewer reply lines than
  the command should produce.
  """
169 class TripeConnection (object):
171 A logical connection to the tripe administration socket.
173 There may or may not be a physical connection. (This is needed for the
174 monitor, for example.)
176 This class isn't very useful on its own, but it has useful subclasses. At
177 this level, the class is agnostic about I/O multiplexing schemes; that gets
181 def __init__(me, socket):
183 Make a connection to the named SOCKET.
185 No physical connection is made initially.
193 Ensure that there's a physical connection.
195 Do nothing if we're already connected. Invoke the `connected' method if
199 sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
200 sock.connect(me.socket)
202 me.lbuf = M.LineBuffer(me.line, me._eof)
207 def disconnect(me, reason):
209 Disconnect the physical connection.
211 Invoke the `disconnected' method, giving the provided REASON, which
212 should be either None or an exception.
214 if not me.sock: return
215 me.disconnected(reason)
224 Return true if there's a current, believed-good physical connection.
226 return me.sock is not None
228 __nonzero__ = connectedp
232 Send the LINE to the connection's socket.
234 All output is done through this method; it can be overridden to provide
235 proper nonblocking writing, though this seems generally unnecessary.
238 me.sock.setblocking(1)
239 me.sock.send(line + '\n')
240 except Exception, exc:
247 Receive whatever's ready from the connection's socket.
249 Call `line' on each complete line, and `eof' if the connection closed.
250 Subclasses which attach this class to an I/O-event system should call
251 this method when the socket (CONN.sock) is ready for reading.
253 while me.sock is not None:
255 buf = readnonblockingly(me.sock, 16384)
256 except Exception, exc:
268 """Internal end-of-file handler."""
269 me.disconnect(TripeConnectionError('connection lost'))
274 To be overridden by subclasses to react to a connection being
279 def disconnected(me, reason):
281 To be overridden by subclasses to react to a connection being severed.
286 """To be overridden by subclasses to handle end-of-file."""
290 """To be overridden by subclasses to handle incoming lines."""
293 ###--------------------------------------------------------------------------
294 ### Dispatching coroutine.
296 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
297 ## contain backslashes, quotes or whitespace.
298 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
300 ## Match characters which need to be escaped, even in quoted text.
301 rx_weird = RX.compile(r'([\\\'])')
304 """Quote S according to the tripe-admin(5) rules."""
305 m = rx_ordinary.match(s)
306 if m and m.end() == len(s):
309 return "'" + rx_weird.sub(r'\\\1', s) + "'"
313 Return a wrapper for FUNC which reports exceptions thrown by it.
315 Useful in the case of callbacks invoked by C functions which ignore
320 return func(*a, **kw)
322 SYS.excepthook(*SYS.exc_info())
326 class TripeCommand (object):
328 This abstract class represents a command in progress.
330 The `words' attribute contains the list of tokens which make up the
333 Subclasses must implement a method to handle server responses:
335 * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
336 'FAIL'; ARGS are the remaining tokens from the server's response.
339 def __init__(me, words):
340 """Make a new command consisting of the given list of WORDS."""
343 class TripeSynchronousCommand (TripeCommand):
345 A simple command, processed apparently synchronously.
347 Must be invoked from a coroutine other than the root (or whichever one is
348 running the dispatcher); in reality, other coroutines carry on running
349 while we wait for a response from the server.
351 Each server response causes the calling coroutine to be resumed with the
352 pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
353 or `FAIL') and REST is a list of the server's other response tokens. The
354 calling coroutine must continue switching back to the dispatcher until a
355 terminating response (`OK' or `FAIL') is received or become very
358 Mostly it's better to use the TripeCommandIterator to do this
def __init__(me, words):
  """
  Set up the command from its WORDS and note who is waiting for it.

  The coroutine which is current at construction time is recorded as the
  command's `owner'; server responses are later forwarded to it.
  """
  TripeCommand.__init__(me, words)
  creator = Coroutine.getcurrent()
  me.owner = creator
def response(me, code, *rest):
  """
  Pass a server response on to the coroutine which owns this command.

  The owner is resumed with the pair (CODE, REST), where REST is the tuple
  of remaining response tokens.
  """
  reply = (code, rest)
  me.owner.switch(reply)
371 class TripeError (StandardError):
373 A tripe command failed with an error (a FAIL code). The args attribute
374 contains a list of the server's message tokens.
378 class TripeCommandIterator (object):
380 Iterator interface to a tripe command.
382 The values returned by the iterator are lists of tokens from the server's
383 INFO lines, as processed by the given filter function, if any. The
384 iterator completes normally (by raising StopIteration) if the server
385 reported OK, and raises an exception if the command failed for some reason.
387 A TripeError is raised if the server issues a FAIL code. If the connection
388 failed, some other exception is raised.
391 def __init__(me, dispatcher, words, bg = False, filter = None):
393 Create a new command iterator.
395 The command is submitted to the DISPATCHER; it consists of the given
396 WORDS. If BG is true, then an option is inserted to request that the
397 server run the command in the background. The FILTER is applied to the
398 token lists with which the server responds, and the filter's outputs are the
399 items returned by the iterator.
401 me.dcr = Coroutine.getcurrent().parent
403 raise ValueError, 'must invoke from coroutine'
404 me.filter = filter or (lambda x: x)
406 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
407 dispatcher.rawcommand(TripeSynchronousCommand(words))
410 """Iterator protocol: I am my own iterator."""
415 Iterator protocol: return the next piece of information from the server.
417 INFO responses are filtered and returned as the values of the iteration.
418 FAIL and CONNERR responses are turned into exceptions and raised.
419 Finally, OK is turned into StopIteration, which should cause a normal end
420 to the iteration process.
422 thing = me.dcr.switch()
425 return me.filter(rest)
428 elif code == 'CONNERR':
430 raise TripeConnectionError, 'connection terminated by user'
434 raise TripeError(*rest)
436 raise TripeInternalError \
437 ('unexpected tripe response %r' % ([code] + rest))
439 ### Simple utility functions for the TripeCommandIterator convenience
def _tokenjoin(words):
  """Filter function: glue the tokens in WORDS together with single spaces."""
  separator = ' '
  return separator.join(words)
447 """Return a dictionary formed from the KEY=VALUE pairs returned by the
453 kv[w[:q]] = w[q + 1:]
457 """Raise an error if ITER contains any item."""
460 raise TripeInternalError('expected no response')
464 """If ITER contains a single item, return it; otherwise raise an error."""
467 raise TripeInternalError('expected only one line of response')
470 def _tracelike(iter):
471 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
472 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
473 disabled, `+' if enabled, maybe something else later), and DESC is the
474 human-readable description."""
479 desc = ' '.join(ww[1:])
480 stuff.append((ch, st, desc))
483 def _kwopts(kw, allowed):
484 """Parse keyword arguments into options. ALLOWED is a list of allowable
485 keywords; raise errors if other keywords are present. KEY = VALUE becomes
486 an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
487 VALUE is a true non-string, or nothing if VALUE is false. Insert a `--'
488 at the end to stop the parser getting confused."""
491 for a in allowed: amap[a] = True
492 for k, v in kw.iteritems():
494 raise ValueError('option %s not allowed here' % k)
495 if isinstance(v, str):
502 class TripeCommandDispatcher (TripeConnection):
506 The command dispatcher is a connection which knows how to handle commands.
507 This is probably the most important class in this module to understand.
509 Lines from the server are parsed into tokens. The first token is a code
510 (OK or NOTE or something) explaining what kind of line this is. The
511 `handler' attribute is a dictionary mapping server line codes to handler
512 functions, which are applied to the words of the line as individual
513 arguments. *Exception*: the content of TRACE lines is not tokenized.
515 There are default handlers for server codes which respond to commands.
516 Commands arrive as TripeCommand instances through the `rawcommand'
517 interface. The dispatcher keeps track of which command objects represent
518 which jobs, and sends responses on to the appropriate command objects by
519 invoking their `response' methods. Command objects don't see the
520 BG... codes, because the dispatcher has already transformed them into
521 regular codes when it was looking up job code.
523 The dispatcher also has a special response code of its own: CONNERR
524 indicates that the connection failed and the command has therefore been
528 ## --- Infrastructure ---
530 ## We will get confused if we pipeline commands. Send them one at a time.
531 ## Only send a command when the previous one detaches or completes.
533 ## The following attributes are interesting:
535 ## tagseq Sequence number for next background job (for bgtag)
537 ## queue Commands awaiting submission.
539 ## cmd Mapping from job tags to commands: cmd[None] is the
540 ## foreground command.
542 ## handler Mapping from server codes to handler functions.
544 def __init__(me, socket):
546 Initialize the dispatcher.
548 The SOCKET is the filename of the administration socket to connect to,
549 for TripeConnection.__init__.
551 TripeConnection.__init__(me, socket)
554 me.handler['BGDETACH'] = me._detach
555 for i in 'BGOK', 'BGINFO', 'BGFAIL':
556 me.handler[i] = me._response
557 for i in 'OK', 'INFO', 'FAIL':
558 me.handler[i] = me._fgresponse
564 If a subclass overrides this method, it must call us; clears out the
565 command queue and job map.
570 def disconnected(me, reason):
574 If a subclass overrides this method, it must call us; sends a
575 special CONNERR code to all incomplete commands.
577 for cmd in me.cmd.itervalues():
578 cmd.response('CONNERR', reason)
580 cmd.response('CONNERR', reason)
584 """Handle an incoming line, sending it to the right place."""
585 if _debug: print '<', line
586 code, rest = M.word(line, quotep = True)
587 func = me.handler.get(code)
592 func(code, *M.split(rest, quotep = True)[0])
597 Pull the oldest command off the queue and try to send it to the server.
599 if not me.queue or None in me.cmd: return
600 cmd = me.queue.shift()
601 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
602 me.send(' '.join([quotify(w) for w in cmd.words]))
607 Return an unused job tag.
609 May be of use when composing commands by hand.
611 tag = 'J%05d' % me.tagseq
615 ## --- Built-in handler functions for server responses ---
617 def _detach(me, _, tag):
619 Respond to a BGDETACH TAG message.
621 Move the current foreground command to the background.
623 assert tag not in me.cmd
624 me.cmd[tag] = me.cmd[None]
627 def _response(me, code, tag, *w):
629 Respond to an OK, INFO or FAIL message.
631 If this is a message for a background job, find the tag; then dispatch
632 the result to the command object.
634 if code.startswith('BG'):
639 cmd.response(code, *w)
def _fgresponse(me, code, *w):
  """
  Handle a response to the foreground command.

  Foreground responses carry no job tag, so hand the response to
  `_response' with None as the tag.
  """
  foreground = None
  me._response(code, foreground, *w)
645 ## --- Interface methods ---
647 def rawcommand(me, cmd):
649 Submit the TripeCommand CMD to the server, and look after it until it
652 if not me.connectedp():
653 raise TripeConnectionError('connection closed')
def command(me, *cmd, **kw):
  """
  Convenience wrapper: build a TripeCommandIterator on this dispatcher.

  The positional arguments CMD become the command's words; keyword
  arguments are passed through to the iterator's constructor unchanged.
  """
  iterator = TripeCommandIterator(me, cmd, **kw)
  return iterator
661 ## --- Convenience methods for server commands ---
663 def add(me, peer, *addr, **kw):
664 return _simple(me.command(bg = True,
666 _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
670 return _oneline(me.command('ADDR', peer))
672 return _keyvals(me.command('ALGS'))
def checkchal(me, chal):
  """Send CHECKCHAL with CHAL and pass the reply iterator to `_simple'."""
  reply = me.command('CHECKCHAL', chal)
  return _simple(reply)
676 return _simple(me.command('DAEMON'))
677 def eping(me, peer, **kw):
678 return _oneline(me.command(bg = True,
680 _kwopts(kw, ['timeout']) +
def forcekx(me, peer):
  """Send FORCEKX for PEER and pass the reply iterator to `_simple'."""
  reply = me.command('FORCEKX', peer)
  return _simple(reply)
685 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
def greet(me, peer, chal):
  """Send GREET with PEER and CHAL; pass the reply iterator to `_simple'."""
  reply = me.command('GREET', peer, chal)
  return _simple(reply)
689 return list(me.command('HELP', filter = _tokenjoin))
def ifname(me, peer):
  """
  Send IFNAME for PEER and return what `_oneline' makes of the reply.

  Reply tokens are joined with spaces by the `_tokenjoin' filter.
  """
  reply = me.command('IFNAME', peer, filter = _tokenjoin)
  return _oneline(reply)
693 return _simple(me.command('KILL', peer))
695 return list(me.command('LIST', filter = _tokenjoin))
def notify(me, *msg):
  """Send NOTIFY with the tokens MSG; pass the reply iterator to `_simple'."""
  reply = me.command('NOTIFY', *msg)
  return _simple(reply)
def peerinfo(me, peer):
  """Send PEERINFO for PEER and return `_keyvals' applied to the reply."""
  reply = me.command('PEERINFO', peer)
  return _keyvals(reply)
700 def ping(me, peer, **kw):
701 return _oneline(me.command(bg = True,
703 _kwopts(kw, ['timeout']) +
706 return _oneline(me.command('PORT', filter = _tokenjoin))
708 return _simple(me.command('QUIT'))
710 return _simple(me.command('RELOAD'))
712 return _keyvals(me.command('SERVINFO'))
def setifname(me, new):
  """Send SETIFNAME with NEW and pass the reply iterator to `_simple'."""
  reply = me.command('SETIFNAME', new)
  return _simple(reply)
def svcclaim(me, service, version):
  """
  Send SVCCLAIM for SERVICE at VERSION; pass the reply iterator to
  `_simple'.
  """
  reply = me.command('SVCCLAIM', service, version)
  return _simple(reply)
def svcensure(me, service, version = None):
  """
  Send SVCENSURE for SERVICE; pass the reply iterator to `_simple'.

  VERSION is added as a further argument only if it is not None.
  """
  extra = []
  if version is not None:
    extra.append(version)
  reply = me.command('SVCENSURE', service, *extra)
  return _simple(reply)
def svcfail(me, job, *msg):
  """
  Send SVCFAIL for JOB with the tokens MSG; pass the reply iterator to
  `_simple'.
  """
  reply = me.command('SVCFAIL', job, *msg)
  return _simple(reply)
def svcinfo(me, job, *msg):
  """
  Send SVCINFO for JOB with the tokens MSG; pass the reply iterator to
  `_simple'.
  """
  reply = me.command('SVCINFO', job, *msg)
  return _simple(reply)
725 return list(me.command('SVCLIST'))
727 return _simple(me.command('SVCOK', job))
def svcquery(me, service):
  """Send SVCQUERY for SERVICE and return `_keyvals' applied to the reply."""
  reply = me.command('SVCQUERY', service)
  return _keyvals(reply)
def svcrelease(me, service):
  """Send SVCRELEASE for SERVICE; pass the reply iterator to `_simple'."""
  reply = me.command('SVCRELEASE', service)
  return _simple(reply)
732 def svcsubmit(me, service, *args, **kw):
733 return me.command(bg = True,
735 _kwopts(kw, ['version']) +
739 return _keyvals(me.command('STATS', peer))
def trace(me, *args):
  """Send TRACE with ARGS and return `_tracelike' applied to the reply."""
  reply = me.command('TRACE', *args)
  return _tracelike(reply)
743 return list(me.command('TUNNELS', filter = _tokenjoin))
745 return _oneline(me.command('VERSION', filter = _tokenjoin))
747 return _simple(me.command('WARN', *msg))
def watch(me, *args):
  """Send WATCH with ARGS and return `_tracelike' applied to the reply."""
  reply = me.command('WATCH', *args)
  return _tracelike(reply)
751 ###--------------------------------------------------------------------------
752 ### Asynchronous commands.
754 class Queue (object):
756 A queue of things arriving asynchronously.
758 This is a very simple single-reader multiple-writer queue. It's useful for
759 more complex coroutines which need to cope with a variety of possible
764 """Create a new empty queue."""
765 me.contents = M.Array()
770 Internal: wait for an item to arrive in the queue.
772 Complain if someone is already waiting, because this is just a
776 raise ValueError('queue already being waited on')
778 me.waiter = Coroutine.getcurrent()
779 while not me.contents:
780 me.waiter.parent.switch()
786 Remove and return the item at the head of the queue.
788 If the queue is empty, wait until an item arrives.
791 return me.contents.shift()
795 Return the item at the head of the queue without removing it.
797 If the queue is empty, wait until an item arrives.
800 return me.contents[0]
804 Write THING to the queue.
806 If someone is waiting on the queue, wake him up immediately; otherwise
807 just leave the item there for later.
809 me.contents.push(thing)
813 class TripeAsynchronousCommand (TripeCommand):
815 Asynchronous commands.
817 This is the complicated way of issuing commands. You must set up a queue,
818 and associate the command with the queue. Responses arriving for the
819 command will be put on the queue as a triple of the form (TAG, CODE, REST)
820 -- where TAG is an object of your choice, not interpreted by this class,
821 CODE is the server's response code (OK, INFO, FAIL), and REST is the list
822 of the rest of the server's tokens.
824 Using this, you can write coroutines which process many commands (and
825 possibly other events) simultaneously.
828 def __init__(me, queue, tag, words):
829 """Make an asynchronous command consisting of the given WORDS, which
830 sends responses to QUEUE, labelled with TAG."""
831 TripeCommand.__init__(me, words)
def response(me, code, *stuff):
  """
  Record a server response on the caller's queue.

  The item written is the triple (TAG, CODE, REST): TAG is this command's
  tag, and REST is a fresh list of the remaining response tokens.
  """
  tokens = list(stuff)
  me.queue.put((me.tag, code, tokens))
839 ###--------------------------------------------------------------------------
840 ### Selecting command dispatcher.
842 class SelCommandDispatcher (TripeCommandDispatcher):
844 A command dispatcher which integrates with mLib's I/O-event system.
846 To use, simply create an instance and run mLib.select in a loop in your
850 def __init__(me, socket):
852 Create an instance; SOCKET is the admin socket to connect to.
854 Note that no connection is made initially.
856 TripeCommandDispatcher.__init__(me, socket)
860 """Connection hook: wires itself into the mLib select machinery."""
861 TripeCommandDispatcher.connected(me)
862 me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive)
865 def disconnected(me, reason):
866 """Disconnection hook: removes itself from the mLib select machinery."""
867 TripeCommandDispatcher.disconnected(me, reason)
870 ###--------------------------------------------------------------------------
873 class TripeJobCancelled (Exception):
875 Exception sent to job handler if the client kills the job.
877 Not propagated further.
881 class TripeJobError (Exception):
883 Exception to cause failure report for running job.
885 Sends an SVCFAIL code back.
889 class TripeSyntaxError (Exception):
891 Exception to report a syntax error for a job.
893 Sends an SVCFAIL bad-svc-syntax message back.
897 class TripeServiceManager (SelCommandDispatcher):
899 A command dispatcher with added handling for incoming service requests.
901 There is usually only one instance of this class, called svcmgr. Some of
902 the support functions in this module assume that this is the case.
904 To use, run mLib.select in a loop until the quitp method returns true;
905 then, in a non-root coroutine, register your services by calling `add', and
906 then call `running' when you've finished setting up.
908 The instance handles server service messages SVCJOB, SVCCANCEL and
909 SVCCLAIM. It maintains a table of running services. Incoming jobs cause
910 the service's `job' method to be invoked; SVCCANCEL sends a
911 TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
912 the relevant service to be deregistered.
914 There is no base class for jobs, but a job must implement two methods:
916 start() Begin processing; might be a no-op.
918 cancel() Stop processing; the original client has killed the
921 The life of a service manager is divided into two parts: setup and running;
922 you tell the manager that you've finished setting up by calling the
923 `running' method. If, at any point after setup is finished, there are no
924 remaining services or jobs, `quitp' will return true, ending the process.
927 ## --- Attributes ---
929 ## svc Mapping name -> service object
931 ## job Mapping jobid -> job handler coroutine
933 ## runningp True when setup is finished
935 ## _quitp True if explicit quit has been requested
937 def __init__(me, socket):
939 Initialize the service manager.
941 SOCKET is the administration socket to connect to.
943 SelCommandDispatcher.__init__(me, socket)
947 me.handler['SVCCANCEL'] = me._cancel
948 me.handler['SVCJOB'] = me._job
949 me.handler['SVCCLAIM'] = me._claim
953 """Register a new service; SVC is a TripeService instance."""
954 assert svc.name not in me.svc
955 me.svcclaim(svc.name, svc.version)
956 me.svc[svc.name] = svc
958 def _cancel(me, _, jid):
960 Called when the server cancels a job; invokes the job's `cancel' method.
966 def _claim(me, _, svc, __):
967 """Called when another program claims our service at a higher version."""
970 def _job(me, _, jid, svc, cmd, *args):
972 Called when the server sends us a job to do.
974 Calls the service to collect a job, and begins processing it.
976 assert jid not in me.job
977 svc = me.svc[svc.lower()]
978 job = svc.job(jid, cmd, args)
983 """Answer true if setup is finished."""
986 def jobdone(me, jid):
987 """Informs the service manager that the job with id JID has finished."""
995 Return true if no services or jobs are active (and, therefore, if this
996 process can quit without anyone caring).
998 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1002 """Forces the quit flag (returned by quitp) on."""
1005 class TripeService (object):
1009 The NAME and VERSION are passed on to the server. The CMDTAB is a
1010 dictionary mapping command names (in lowercase) to command objects.
1012 If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults
1015 TripeService itself is mostly agnostic about the nature of command objects,
1016 but the TripeServiceJob class (below) has some requirements. The built-in
1017 HELP command requires command objects to have `usage' attributes.
1020 def __init__(me, name, version, cmdtab):
1022 Create and register a new service with the given NAME and VERSION.
1024 CMDTAB maps command names (in lower-case) to command objects.
1027 me.version = version
1030 me.cmd.setdefault('help',
1031 TripeServiceCommand('help', 0, 0, '', me._help))
1032 me.cmd.setdefault('quit',
1033 TripeServiceCommand('quit', 0, 0, '', me._quit))
1035 def job(me, jid, cmd, args):
1037 Called by the service manager: a job arrived with id JID.
1039 It asks for command CMD with argument list ARGS. Creates a new job,
1040 passing it the information needed.
1042 return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1044 ## Simple default command handlers, complying with the spec in
1045 ## tripe-service(7).
1048 """Send a help summary to the user."""
1049 cmds = me.cmd.items()
1051 for name, cmd in cmds:
1052 svcinfo(name, *cmd.usage)
1055 """Terminate the service manager."""
1056 svcmgr.notify('svc-quit', me.name, 'admin-request')
1059 class TripeServiceCommand (object):
1060 """A simple service command."""
1062 def __init__(me, name, min, max, usage, func):
1064 Creates a new command.
1066 NAME is the command's name (in lowercase).
1068 MIN and MAX are the minimum and maximum number of allowed arguments (used
1069 for checking); either may be None to indicate no minimum or maximum.
1071 USAGE is a usage string, used for generating help and error messages.
1073 FUNC is the function to invoke.
1078 me.usage = usage.split()
1083 Called when the command is invoked.
1085 Does minimal checking of the arguments and calls the supplied function.
1087 if (me.min is not None and len(args) < me.min) or \
1088 (me.max is not None and len(args) > me.max):
1089 raise TripeSyntaxError
1092 class TripeServiceJob (Coroutine):
1094 Job handler coroutine.
1096 A standard TripeService invokes a TripeServiceJob for each incoming job
1097 request, passing it the jobid, command and arguments, and a command
1098 object. The command object needs the following attributes.
1100 usage A usage list (excluding the command name) showing
1101 arguments and options.
1103 run(*ARGS) Function to react to the command with ARGS split into
1104 separate arguments. Invoked in a coroutine. The
1105 svcinfo function (not the TripeCommandDispatcher
1106 method) may be used to send INFO lines. The function
1107 may raise TripeJobError to send a FAIL response back,
1108 or TripeSyntaxError to send a generic usage error.
1109 TripeJobCancelled exceptions are trapped silently.
1110 Other exceptions are translated into a generic
1111 internal-error message.
1113 This class automatically takes care of sending some closing response to the
1114 job, and for informing the service manager that the job is completed.
1116 The `jid' attribute stores the job's id.
1119 def __init__(me, jid, svc, cmd, command, args):
1123 The job is created with id JID, for service SVC, processing command name
1124 CMD (which the service resolved into the command object COMMAND, or
1125 None), and with the arguments ARGS.
1127 Coroutine.__init__(me)
1131 me.command = command
1136 Main body of the coroutine.
1138 Does the tedious exception handling boilerplate and invokes the command's
1143 if me.command is None:
1144 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1146 me.command.run(*me.args)
1147 svcmgr.svcok(me.jid)
1148 except TripeJobError, exc:
1149 svcmgr.svcfail(me.jid, *exc.args)
1150 except TripeSyntaxError:
1151 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1152 me.svc.name, me.command.name,
1154 except TripeJobCancelled:
1156 except Exception, exc:
1157 svcmgr.svcfail(me.jid, 'svc-internal-error',
1158 exc.__class__.__name__, str(exc))
1160 svcmgr.jobdone(me.jid)
1163 """Invoked by the service manager to start running the coroutine."""
1167 """Invoked by the service manager to cancel the job."""
1168 me.throw(TripeJobCancelled())
1172 If invoked from a TripeServiceJob coroutine, sends an INFO line to the
1173 job's sender, automatically using the correct job id.
1175 svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1177 def _setupsvc(tab, func):
1179 Setup coroutine for setting up service programs.
1181 Register the given services.
1185 svcmgr.addsvc(service)
1191 svcmgr = TripeServiceManager(None)
1193 def runservices(socket, tab, init = None, setup = None, daemon = False):
1195 Function to start a service provider.
1197 SOCKET is the socket to connect to, usually tripesock.
1199 TAB is a list of entries. An entry may be either a tuple
1201 (NAME, VERSION, COMMANDS)
1203 or a service object (e.g., a TripeService instance).
1205 COMMANDS is a dictionary mapping command names to tuples
1207 (MIN, MAX, USAGE, FUNC)
1209 of arguments for a TripeServiceCommand object.
1211 If DAEMON is true, then the process is forked into the background before we
1212 start. If INIT is given, it is called in the main coroutine, immediately
1213 after forking. If SETUP is given, it is called in a coroutine, after
1214 calling INIT and setting up the services but before marking the service
1217 It is a really bad idea to do any initialization, particularly setting up
1218 coroutines, outside of the INIT or SETUP functions. In particular, if
1219 we're using rmcr for fake coroutines, the daemonizing fork will kill off
1220 the currently established coroutines in a most surprising way.
1222 The function runs a main select loop until the service manager decides to
1227 svcmgr.socket = socket
1231 if not isinstance(service, tuple):
1232 svcs.append(service)
1234 name, version, commands = service
1236 for cmd, stuff in commands.iteritems():
1237 cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1238 svcs.append(TripeService(name, version, cmdmap))
1241 if init is not None:
1243 Coroutine(_setupsvc).switch(svcs, setup)
1244 while not svcmgr.quitp():
1245 for cr, args, kw in _spawnq:
1246 cr.switch(*args, **kw)
1250 def spawn(cr, *args, **kw):
1252 Utility for spawning coroutines.
1254 The coroutine CR is made to be a direct child of the root coroutine, and
1255 invoked by it with the given arguments.
1258 _spawnq.append((cr, args, kw))
1260 ###--------------------------------------------------------------------------
1261 ### Utilities for services.
1263 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1265 """Parse the timespec SPEC, returning a number of seconds."""
1267 if len(spec) > 1 and spec[-1] in _timeunits:
1268 mul = _timeunits[spec[-1]]
1273 raise TripeJobError('bad-time-spec', spec)
1275 raise TripeJobError('bad-time-spec', spec)
1276 return mul * int(spec)
1278 class OptParse (object):
1280 Parse options from a command list in the conventional fashion.
1282 ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
1283 options. The returned values are the option tags. During parsing, the
1284 `arg' method may be used to retrieve the argument for the most recent
1285 option. Afterwards, `rest' may be used to retrieve the remaining
1286 non-option arguments, and do a simple check on how many there are.
1288 The parser correctly handles `--' option terminators.
1291 def __init__(me, args, allowed):
1293 Create a new option parser.
1295 The parser will scan the ARGS for options given in the sequence ALLOWED
1296 (which are expected to include the `-' prefix).
1300 me.allowed[a] = True
1301 me.args = list(args)
1304 """Iterator protocol: I am my own iterator."""
1309 Iterator protocol: return the next option.
1311 If we've run out, raise StopIteration.
1313 if len(me.args) == 0 or \
1314 len(me.args[0]) < 2 or \
1315 not me.args[0].startswith('-'):
1317 opt = me.args.pop(0)
1320 if opt not in me.allowed:
1321 raise TripeSyntaxError
1326 Return the argument for the most recent option.
1328 If none is available, raise TripeSyntaxError.
1330 if len(me.args) == 0:
1331 raise TripeSyntaxError
1332 return me.args.pop(0)
1334 def rest(me, min = None, max = None):
1336 After option parsing is done, return the remaining arguments.
1338 Check that there are at least MIN and at most MAX arguments remaining --
1339 either may be None to suppress the check.
1341 if (min is not None and len(me.args) < min) or \
1342 (max is not None and len(me.args) > max):
1343 raise TripeSyntaxError
1346 ###----- That's all, folks --------------------------------------------------