3 ### Administration connection with tripe server
5 ### (c) 2006 Straylight/Edgeware
8 ###----- Licensing notice ---------------------------------------------------
10 ### This file is part of Trivial IP Encryption (TrIPE).
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 ### GNU General Public License for more details.
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
31 Rather than end up lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process. It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
39 The simple rule governing the coroutines used here is this:
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
44 * Other, non-root, coroutines are presumed to be waiting for some specific
47 Configuration variables:
55 Other useful variables:
80 TripeSynchronousCommand
81 TripeAsynchronousCommand
84 TripeCommandDispatcher
97 __pychecker__ = 'self=me no-constCond no-argsused'
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
114 from py.magic import greenlet as _Coroutine
116 from rmcr import Coroutine as _Coroutine
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
121 rootcr = _Coroutine.getcurrent()
123 class Coroutine (_Coroutine):
125 A coroutine class which can only be invoked by the root coroutine.
127 The root, by construction, cannot be an instance of this class.
129 def switch(me, *args, **kw):
130 assert _Coroutine.getcurrent() is rootcr
131 _Coroutine.switch(me, *args, **kw)
133 ###--------------------------------------------------------------------------
134 ### Default places for things.
136 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
137 socketdir = "@socketdir@"
138 PACKAGE = "@PACKAGE@"
139 VERSION = "@VERSION@"
141 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
142 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
144 ###--------------------------------------------------------------------------
145 ### Connection to the server.
147 def readnonblockingly(sock, len):
149 Nonblocking read from SOCK.
151 Try to return LEN bytes. If nothing could be read, return None. EOF is
152 returned as an empty string.
156 return sock.recv(len)
158 if exc[0] == E.EWOULDBLOCK:
162 class TripeConnectionError (StandardError):
163 """Something happened to the connection with the server."""
165 class TripeInternalError (StandardError):
166 """This program is very confused."""
169 class TripeConnection (object):
171 A logical connection to the tripe administration socket.
173 There may or may not be a physical connection. (This is needed for the
174 monitor, for example.)
176 This class isn't very useful on its own, but it has useful subclasses. At
177 this level, the class is agnostic about I/O multiplexing schemes; that gets
181 def __init__(me, socket):
183 Make a connection to the named SOCKET.
185 No physical connection is made initially.
190 me.iowatch = SelIOWatcher(me)
194 Ensure that there's a physical connection.
196 Do nothing if we're already connected. Invoke the `connected' method if
200 sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
201 sock.connect(me.socket)
203 me.lbuf = M.LineBuffer(me.line, me._eof)
208 def disconnect(me, reason):
210 Disconnect the physical connection.
212 Invoke the `disconnected' method, giving the provided REASON, which
213 should be either None or an exception.
215 if not me.sock: return
216 me.disconnected(reason)
225 Return true if there's a current, believed-good physical connection.
227 return me.sock is not None
229 __nonzero__ = connectedp
233 Send the LINE to the connection's socket.
235 All output is done through this method; it can be overridden to provide
236 proper nonblocking writing, though this seems generally unnecessary.
239 me.sock.setblocking(1)
240 me.sock.send(line + '\n')
241 except Exception, exc:
248 Receive whatever's ready from the connection's socket.
250 Call `line' on each complete line, and `eof' if the connection closed.
251 Subclasses which attach this class to an I/O-event system should call
252 this method when the socket (CONN.sock) is ready for reading.
254 while me.sock is not None:
256 buf = readnonblockingly(me.sock, 16384)
257 except Exception, exc:
269 """Internal end-of-file handler."""
270 me.disconnect(TripeConnectionError('connection lost'))
275 To be overridden by subclasses to react to a connection being
278 me.iowatch.connected(me.sock)
280 def disconnected(me, reason):
282 To be overridden by subclasses to react to a connection being severed.
284 me.iowatch.disconnected()
287 """To be overridden by subclasses to handle end-of-file."""
291 """To be overridden by subclasses to handle incoming lines."""
294 ###--------------------------------------------------------------------------
295 ### I/O loop integration.
297 class SelIOWatcher (object):
299 Integration with mLib's I/O event system.
301 You can replace this object with a different one for integration with,
302 e.g., glib's main loop, by setting CONN.iowatch to a different object
303 while the CONN is disconnected.
306 def __init__(me, conn):
310 def connected(me, sock):
312 Called when a connection is made.
314 SOCK is the socket. The watcher must arrange to call CONN.receive when
317 me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
320 def disconnected(me):
322 Called when the connection is lost.
328 Wait for something interesting to happen, and issue events.
330 That is, basically, do one iteration of a main select loop, processing
331 all of the events, and then return. This isn't needed for
332 TripeCommandDispatcher, but runservices wants it.
336 ###--------------------------------------------------------------------------
337 ### Inter-coroutine communication.
339 class Queue (object):
341 A queue of things arriving asynchronously.
343 This is a very simple single-reader multiple-writer queue. It's useful for
344 more complex coroutines which need to cope with a variety of possible
349 """Create a new empty queue."""
350 me.contents = M.Array()
355 Internal: wait for an item to arrive in the queue.
357 Complain if someone is already waiting, because this is just a
361 raise ValueError('queue already being waited on')
363 me.waiter = Coroutine.getcurrent()
364 while not me.contents:
365 me.waiter.parent.switch()
371 Remove and return the item at the head of the queue.
373 If the queue is empty, wait until an item arrives.
376 return me.contents.shift()
380 Return the item at the head of the queue without removing it.
382 If the queue is empty, wait until an item arrives.
385 return me.contents[0]
389 Write THING to the queue.
391 If someone is waiting on the queue, wake him up immediately; otherwise
392 just leave the item there for later.
394 me.contents.push(thing)
398 ###--------------------------------------------------------------------------
399 ### Dispatching coroutine.
401 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
402 ## contain backslashes, quotes or whitespace.
403 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
405 ## Match characters which need to be escaped, even in quoted text.
406 rx_weird = RX.compile(r'([\\\'])')
409 """Quote S according to the tripe-admin(5) rules."""
410 m = rx_ordinary.match(s)
411 if m and m.end() == len(s):
414 return "'" + rx_weird.sub(r'\\\1', s) + "'"
418 Return a wrapper for FUNC which reports exceptions thrown by it.
420 Useful in the case of callbacks invoked by C functions which ignore
425 return func(*a, **kw)
427 SYS.excepthook(*SYS.exc_info())
431 class TripeCommand (object):
433 This abstract class represents a command in progress.
435 The `words' attribute contains the list of tokens which make up the
438 Subclasses must implement a method to handle server responses:
440 * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
441 'FAIL'; ARGS are the remaining tokens from the server's response.
444 def __init__(me, words):
445 """Make a new command consisting of the given list of WORDS."""
448 class TripeSynchronousCommand (TripeCommand):
450 A simple command, processed apparently synchronously.
452 Must be invoked from a coroutine other than the root (or whichever one is
453 running the dispatcher); in reality, other coroutines carry on running
454 while we wait for a response from the server.
456 Each server response causes the calling coroutine to be resumed with the
457 pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
458 or `FAIL') and REST is a list of the server's other response tokens. The
459 calling coroutine must continue switching back to the dispatcher until a
460 terminating response (`OK' or `FAIL') is received or become very
463 Mostly it's better to use the TripeCommandIterator to do this
467 def __init__(me, words):
468 """Initialize the command, specifying the WORDS to send to the server."""
469 TripeCommand.__init__(me, words)
470 me.owner = Coroutine.getcurrent()
472 def response(me, code, *rest):
473 """Handle a server response by forwarding it to the calling coroutine."""
474 me.owner.switch((code, rest))
476 class TripeError (StandardError):
478 A tripe command failed with an error (a FAIL code). The args attribute
479 contains a list of the server's message tokens.
483 class TripeCommandIterator (object):
485 Iterator interface to a tripe command.
487 The values returned by the iterator are lists of tokens from the server's
488 INFO lines, as processed by the given filter function, if any. The
489 iterator completes normally (by raising StopIteration) if the server
490 reported OK, and raises an exception if the command failed for some reason.
492 A TripeError is raised if the server issues a FAIL code. If the connection
493 failed, some other exception is raised.
496 def __init__(me, dispatcher, words, bg = False, filter = None):
498 Create a new command iterator.
500 The command is submitted to the DISPATCHER; it consists of the given
501 WORDS. If BG is true, then an option is inserted to request that the
502 server run the command in the background. The FILTER is applied to the
503 token lists in the server's responses, and the filter's outputs are the
504 items returned by the iterator.
506 me.dcr = Coroutine.getcurrent().parent
508 raise ValueError, 'must invoke from coroutine'
509 me.filter = filter or (lambda x: x)
511 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
512 dispatcher.rawcommand(TripeSynchronousCommand(words))
515 """Iterator protocol: I am my own iterator."""
520 Iterator protocol: return the next piece of information from the server.
522 INFO responses are filtered and returned as the values of the iteration.
523 FAIL and CONNERR responses are turned into exceptions and raised.
524 Finally, OK is turned into StopIteration, which should cause a normal end
525 to the iteration process.
527 thing = me.dcr.switch()
530 return me.filter(rest)
533 elif code == 'CONNERR':
535 raise TripeConnectionError, 'connection terminated by user'
539 raise TripeError(*rest)
541 raise TripeInternalError \
542 ('unexpected tripe response %r' % ([code] + rest))
544 ### Simple utility functions for the TripeCommandIterator convenience
547 def _tokenjoin(words):
548 """Filter function: simply join the given tokens with spaces between."""
549 return ' '.join(words)
552 """Return a dictionary formed from the KEY=VALUE pairs returned by the
558 kv[w[:q]] = w[q + 1:]
562 """Raise an error if ITER contains any item."""
565 raise TripeInternalError('expected no response')
569 """If ITER contains a single item, return it; otherwise raise an error."""
572 raise TripeInternalError('expected only one line of response')
575 def _tracelike(iter):
576 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
577 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
578 disabled, `+' if enabled, maybe something else later), and DESC is the
579 human-readable description."""
584 desc = ' '.join(ww[1:])
585 stuff.append((ch, st, desc))
588 def _kwopts(kw, allowed):
589 """Parse keyword arguments into options. ALLOWED is a list of allowable
590 keywords; raise errors if other keywords are present. KEY = VALUE becomes
591 an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
592 VALUE is a true non-string, or nothing if VALUE is false. Insert a `--'
593 at the end to stop the parser getting confused."""
596 for a in allowed: amap[a] = True
597 for k, v in kw.iteritems():
599 raise ValueError('option %s not allowed here' % k)
600 if isinstance(v, str):
609 def defer(func, *args, **kw):
610 """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
611 _deferq.append((func, args, kw))
613 def spawn(func, *args, **kw):
614 """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
615 defer(lambda: Coroutine(func).switch(*args, **kw))
621 Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
624 func, args, kw = _asideq.get()
628 SYS.excepthook(*SYS.exc_info())
630 def aside(func, *args, **kw):
631 """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
632 defer(_asideq.put, (func, args, kw))
634 class TripeCommandDispatcher (TripeConnection):
638 The command dispatcher is a connection which knows how to handle commands.
639 This is probably the most important class in this module to understand.
641 Lines from the server are parsed into tokens. The first token is a code
642 (OK or NOTE or something) explaining what kind of line this is. The
643 `handler' attribute is a dictionary mapping server line codes to handler
644 functions, which are applied to the words of the line as individual
645 arguments. *Exception*: the content of TRACE lines is not tokenized.
647 There are default handlers for server codes which respond to commands.
648 Commands arrive as TripeCommand instances through the `rawcommand'
649 interface. The dispatcher keeps track of which command objects represent
650 which jobs, and sends responses on to the appropriate command objects by
651 invoking their `response' methods. Command objects don't see the
652 BG... codes, because the dispatcher has already transformed them into
653 regular codes when it was looking up job code.
655 The dispatcher also has a special response code of its own: CONNERR
656 indicates that the connection failed and the command has therefore been
660 ## --- Infrastructure ---
662 ## We will get confused if we pipeline commands. Send them one at a time.
663 ## Only send a command when the previous one detaches or completes.
665 ## The following attributes are interesting:
667 ## tagseq Sequence number for next background job (for bgtag)
669 ## queue Commands awaiting submission.
671 ## cmd Mapping from job tags to commands: cmd[None] is the
672 ## foreground command.
674 ## handler Mapping from server codes to handler functions.
676 def __init__(me, socket):
678 Initialize the dispatcher.
680 The SOCKET is the filename of the administration socket to connect to,
681 for TripeConnection.__init__.
683 TripeConnection.__init__(me, socket)
686 me.handler['BGDETACH'] = me._detach
687 for i in 'BGOK', 'BGINFO', 'BGFAIL':
688 me.handler[i] = me._response
689 for i in 'OK', 'INFO', 'FAIL':
690 me.handler[i] = me._fgresponse
693 """Should we quit the main loop? Subclasses should override."""
696 def mainloop(me, quitp = None):
698 Iterate the I/O watcher until QUITP returns true.
700 Arranges for asides and deferred calls to be made at the right times.
704 assert _Coroutine.getcurrent() is rootcr
705 Coroutine(_runasides).switch()
710 for func, args, kw in q:
718 If a subclass overrides this method, it must call us; clears out the
719 command queue and job map.
723 TripeConnection.connected(me)
725 def disconnected(me, reason):
729 If a subclass overrides this method, it must call us; sends a
730 special CONNERR code to all incomplete commands.
732 TripeConnection.disconnected(me, reason)
733 for cmd in me.cmd.itervalues():
734 cmd.response('CONNERR', reason)
736 cmd.response('CONNERR', reason)
740 """Handle an incoming line, sending it to the right place."""
741 if _debug: print '<', line
742 code, rest = M.word(line, quotep = True)
743 func = me.handler.get(code)
748 func(code, *M.split(rest, quotep = True)[0])
753 Pull the oldest command off the queue and try to send it to the server.
755 if not me.queue or None in me.cmd: return
756 cmd = me.queue.shift()
757 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
758 me.send(' '.join([quotify(w) for w in cmd.words]))
763 Return an unused job tag.
765 May be of use when composing commands by hand.
767 tag = 'J%05d' % me.tagseq
771 ## --- Built-in handler functions for server responses ---
773 def _detach(me, _, tag):
775 Respond to a BGDETACH TAG message.
777 Move the current foreground command to the background.
779 assert tag not in me.cmd
780 me.cmd[tag] = me.cmd[None]
783 def _response(me, code, tag, *w):
785 Respond to an OK, INFO or FAIL message.
787 If this is a message for a background job, find the tag; then dispatch
788 the result to the command object.
790 if code.startswith('BG'):
795 cmd.response(code, *w)
797 def _fgresponse(me, code, *w):
798 """Process responses to the foreground command."""
799 me._response(code, None, *w)
801 ## --- Interface methods ---
803 def rawcommand(me, cmd):
805 Submit the TripeCommand CMD to the server, and look after it until it
808 if not me.connectedp():
809 raise TripeConnectionError('connection closed')
813 def command(me, *cmd, **kw):
814 """Convenience wrapper for creating a TripeCommandIterator object."""
815 return TripeCommandIterator(me, cmd, **kw)
817 ## --- Convenience methods for server commands ---
819 def add(me, peer, *addr, **kw):
820 return _simple(me.command(bg = True,
822 _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
826 return _oneline(me.command('ADDR', peer))
828 return _keyvals(me.command('ALGS'))
829 def checkchal(me, chal):
830 return _simple(me.command('CHECKCHAL', chal))
832 return _simple(me.command('DAEMON'))
833 def eping(me, peer, **kw):
834 return _oneline(me.command(bg = True,
836 _kwopts(kw, ['timeout']) +
838 def forcekx(me, peer):
839 return _simple(me.command('FORCEKX', peer))
841 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
842 def greet(me, peer, chal):
843 return _simple(me.command('GREET', peer, chal))
845 return list(me.command('HELP', filter = _tokenjoin))
846 def ifname(me, peer):
847 return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
849 return _simple(me.command('KILL', peer))
851 return list(me.command('LIST', filter = _tokenjoin))
852 def notify(me, *msg):
853 return _simple(me.command('NOTIFY', *msg))
854 def peerinfo(me, peer):
855 return _keyvals(me.command('PEERINFO', peer))
856 def ping(me, peer, **kw):
857 return _oneline(me.command(bg = True,
859 _kwopts(kw, ['timeout']) +
862 return _oneline(me.command('PORT', filter = _tokenjoin))
864 return _simple(me.command('QUIT'))
866 return _simple(me.command('RELOAD'))
868 return _keyvals(me.command('SERVINFO'))
869 def setifname(me, new):
870 return _simple(me.command('SETIFNAME', new))
871 def svcclaim(me, service, version):
872 return _simple(me.command('SVCCLAIM', service, version))
873 def svcensure(me, service, version = None):
874 return _simple(me.command('SVCENSURE', service,
875 *((version is not None and [version]) or [])))
876 def svcfail(me, job, *msg):
877 return _simple(me.command('SVCFAIL', job, *msg))
878 def svcinfo(me, job, *msg):
879 return _simple(me.command('SVCINFO', job, *msg))
881 return list(me.command('SVCLIST'))
883 return _simple(me.command('SVCOK', job))
884 def svcquery(me, service):
885 return _keyvals(me.command('SVCQUERY', service))
886 def svcrelease(me, service):
887 return _simple(me.command('SVCRELEASE', service))
888 def svcsubmit(me, service, *args, **kw):
889 return me.command(bg = True,
891 _kwopts(kw, ['version']) +
895 return _keyvals(me.command('STATS', peer))
896 def trace(me, *args):
897 return _tracelike(me.command('TRACE', *args))
899 return list(me.command('TUNNELS', filter = _tokenjoin))
901 return _oneline(me.command('VERSION', filter = _tokenjoin))
903 return _simple(me.command('WARN', *msg))
904 def watch(me, *args):
905 return _tracelike(me.command('WATCH', *args))
907 ###--------------------------------------------------------------------------
908 ### Asynchronous commands.
910 class TripeAsynchronousCommand (TripeCommand):
912 Asynchronous commands.
914 This is the complicated way of issuing commands. You must set up a queue,
915 and associate the command with the queue. Responses arriving for the
916 command will be put on the queue as a triple of the form (TAG, CODE, REST)
917 -- where TAG is an object of your choice, not interpreted by this class,
918 CODE is the server's response code (OK, INFO, FAIL), and REST is the list
919 of the rest of the server's tokens.
921 Using this, you can write coroutines which process many commands (and
922 possibly other events) simultaneously.
925 def __init__(me, queue, tag, words):
926 """Make an asynchronous command consisting of the given WORDS, which
927 sends responses to QUEUE, labelled with TAG."""
928 TripeCommand.__init__(me, words)
932 def response(me, code, *stuff):
933 """Handle a server response by writing it to the caller's queue."""
934 me.queue.put((me.tag, code, list(stuff)))
936 ###--------------------------------------------------------------------------
939 class TripeJobCancelled (Exception):
941 Exception sent to job handler if the client kills the job.
943 Not propagated further.
947 class TripeJobError (Exception):
949 Exception to cause failure report for running job.
951 Sends an SVCFAIL code back.
955 class TripeSyntaxError (Exception):
957 Exception to report a syntax error for a job.
959 Sends an SVCFAIL bad-svc-syntax message back.
963 class TripeServiceManager (TripeCommandDispatcher):
965 A command dispatcher with added handling for incoming service requests.
967 There is usually only one instance of this class, called svcmgr. Some of
968 the support functions in this module assume that this is the case.
970 To use, run mLib.select in a loop until the quitp method returns true;
971 then, in a non-root coroutine, register your services by calling `addsvc', and
972 then call `running' when you've finished setting up.
974 The instance handles server service messages SVCJOB, SVCCANCEL and
975 SVCCLAIM. It maintains a table of running services. Incoming jobs cause
976 the service's `job' method to be invoked; SVCCANCEL sends a
977 TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
978 the relevant service to be deregistered.
980 There is no base class for jobs, but a job must implement two methods:
982 start() Begin processing; might be a no-op.
984 cancel() Stop processing; the original client has killed the
987 The life of a service manager is divided into two parts: setup and running;
988 you tell the manager that you've finished setting up by calling the
989 `running' method. If, at any point after setup is finished, there are no
990 remaining services or jobs, `quitp' will return true, ending the process.
993 ## --- Attributes ---
995 ## svc Mapping name -> service object
997 ## job Mapping jobid -> job handler coroutine
999 ## runningp True when setup is finished
1001 ## _quitp True if explicit quit has been requested
1003 def __init__(me, socket):
1005 Initialize the service manager.
1007 SOCKET is the administration socket to connect to.
1009 TripeCommandDispatcher.__init__(me, socket)
1013 me.handler['SVCCANCEL'] = me._cancel
1014 me.handler['SVCJOB'] = me._job
1015 me.handler['SVCCLAIM'] = me._claim
1018 def addsvc(me, svc):
1019 """Register a new service; SVC is a TripeService instance."""
1020 assert svc.name not in me.svc
1021 me.svcclaim(svc.name, svc.version)
1022 me.svc[svc.name] = svc
1024 def _cancel(me, _, jid):
1026 Called when the server cancels a job; invokes the job's `cancel' method.
1032 def _claim(me, _, svc, __):
1033 """Called when another program claims our service at a higher version."""
1036 def _job(me, _, jid, svc, cmd, *args):
1038 Called when the server sends us a job to do.
1040 Calls the service to collect a job, and begins processing it.
1042 assert jid not in me.job
1043 svc = me.svc[svc.lower()]
1044 job = svc.job(jid, cmd, args)
1049 """Answer true if setup is finished."""
1052 def jobdone(me, jid):
1053 """Informs the service manager that the job with id JID has finished."""
1061 Return true if no services or jobs are active (and, therefore, if this
1062 process can quit without anyone caring).
1064 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1068 """Forces the quit flag (returned by quitp) on."""
1071 class TripeService (object):
1075 The NAME and VERSION are passed on to the server. The CMDTAB is a
1076 dictionary mapping command names (in lowercase) to command objects.
1078 If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults
1081 TripeService itself is mostly agnostic about the nature of command objects,
1082 but the TripeServiceJob class (below) has some requirements. The built-in
1083 HELP command requires command objects to have `usage' attributes.
1086 def __init__(me, name, version, cmdtab):
1088 Create and register a new service with the given NAME and VERSION.
1090 CMDTAB maps command names (in lower-case) to command objects.
1093 me.version = version
1096 me.cmd.setdefault('help',
1097 TripeServiceCommand('help', 0, 0, '', me._help))
1098 me.cmd.setdefault('quit',
1099 TripeServiceCommand('quit', 0, 0, '', me._quit))
1101 def job(me, jid, cmd, args):
1103 Called by the service manager: a job arrived with id JID.
1105 It asks for command CMD with argument list ARGS. Creates a new job,
1106 passing it the information needed.
1108 return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1110 ## Simple default command handlers, complying with the spec in
1111 ## tripe-service(7).
1114 """Send a help summary to the user."""
1115 cmds = me.cmd.items()
1117 for name, cmd in cmds:
1118 svcinfo(name, *cmd.usage)
1121 """Terminate the service manager."""
1122 svcmgr.notify('svc-quit', me.name, 'admin-request')
1125 class TripeServiceCommand (object):
1126 """A simple service command."""
1128 def __init__(me, name, min, max, usage, func):
1130 Creates a new command.
1132 NAME is the command's name (in lowercase).
1134 MIN and MAX are the minimum and maximum number of allowed arguments (used
1135 for checking); either may be None to indicate no minimum or maximum.
1137 USAGE is a usage string, used for generating help and error messages.
1139 FUNC is the function to invoke.
1144 me.usage = usage.split()
1149 Called when the command is invoked.
1151 Does minimal checking of the arguments and calls the supplied function.
1153 if (me.min is not None and len(args) < me.min) or \
1154 (me.max is not None and len(args) > me.max):
1155 raise TripeSyntaxError
1158 class TripeServiceJob (Coroutine):
1160 Job handler coroutine.
1162 A standard TripeService invokes a TripeServiceJob for each incoming job
1163 request, passing it the jobid, command and arguments, and a command
1164 object. The command object needs the following attributes.
1166 usage A usage list (excluding the command name) showing
1167 arguments and options.
1169 run(*ARGS) Function to react to the command with ARGS split into
1170 separate arguments. Invoked in a coroutine. The
1171 svcinfo function (not the TripeCommandDispatcher
1172 method) may be used to send INFO lines. The function
1173 may raise TripeJobError to send a FAIL response back,
1174 or TripeSyntaxError to send a generic usage error.
1175 TripeJobCancelled exceptions are trapped silently.
1176 Other exceptions are translated into a generic
1177 internal-error message.
1179 This class automatically takes care of sending some closing response to the
1180 job, and for informing the service manager that the job is completed.
1182 The `jid' attribute stores the job's id.
1185 def __init__(me, jid, svc, cmd, command, args):
1189 The job is created with id JID, for service SVC, processing command name
1190 CMD (which the service resolved into the command object COMMAND, or
1191 None), and with the arguments ARGS.
1193 Coroutine.__init__(me)
1197 me.command = command
1202 Main body of the coroutine.
1204 Does the tedious exception handling boilerplate and invokes the command's
1209 if me.command is None:
1210 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1212 me.command.run(*me.args)
1213 svcmgr.svcok(me.jid)
1214 except TripeJobError, exc:
1215 svcmgr.svcfail(me.jid, *exc.args)
1216 except TripeSyntaxError:
1217 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1218 me.svc.name, me.command.name,
1220 except TripeJobCancelled:
1222 except Exception, exc:
1223 svcmgr.svcfail(me.jid, 'svc-internal-error',
1224 exc.__class__.__name__, str(exc))
1226 svcmgr.jobdone(me.jid)
1229 """Invoked by the service manager to start running the coroutine."""
1233 """Invoked by the service manager to cancel the job."""
1234 me.throw(TripeJobCancelled())
1238 If invoked from a TripeServiceJob coroutine, sends an INFO line to the
1239 job's sender, automatically using the correct job id.
1241 svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1243 def _setupsvc(tab, func):
1245 Setup coroutine for setting up service programs.
1247 Register the given services.
1251 svcmgr.addsvc(service)
1257 svcmgr = TripeServiceManager(None)
1258 def runservices(socket, tab, init = None, setup = None, daemon = False):
1260 Function to start a service provider.
1262 SOCKET is the socket to connect to, usually tripesock.
1264 TAB is a list of entries. An entry may be either a tuple
1266 (NAME, VERSION, COMMANDS)
1268 or a service object (e.g., a TripeService instance).
1270 COMMANDS is a dictionary mapping command names to tuples
1272 (MIN, MAX, USAGE, FUNC)
1274 of arguments for a TripeServiceCommand object.
1276 If DAEMON is true, then the process is forked into the background before we
1277 start. If INIT is given, it is called in the main coroutine, immediately
1278 after forking. If SETUP is given, it is called in a coroutine, after
1279 calling INIT and setting up the services but before marking the service
1282 It is a really bad idea to do any initialization, particularly setting up
1283 coroutines, outside of the INIT or SETUP functions. In particular, if
1284 we're using rmcr for fake coroutines, the daemonizing fork will kill off
1285 the currently established coroutines in a most surprising way.
1287 The function runs a main select loop until the service manager decides to
1291 svcmgr.socket = socket
1295 if not isinstance(service, tuple):
1296 svcs.append(service)
1298 name, version, commands = service
1300 for cmd, stuff in commands.iteritems():
1301 cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1302 svcs.append(TripeService(name, version, cmdmap))
1305 if init is not None:
1307 spawn(_setupsvc, svcs, setup)
1310 ###--------------------------------------------------------------------------
1311 ### Utilities for services.
1313 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1315 """Parse the timespec SPEC, returning a number of seconds."""
1317 if len(spec) > 1 and spec[-1] in _timeunits:
1318 mul = _timeunits[spec[-1]]
1323 raise TripeJobError('bad-time-spec', spec)
1325 raise TripeJobError('bad-time-spec', spec)
1326 return mul * int(spec)
1328 class OptParse (object):
1330 Parse options from a command list in the conventional fashion.
1332 ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
1333 options. The returned values are the option tags. During parsing, the
1334 `arg' method may be used to retrieve the argument for the most recent
1335 option. Afterwards, `rest' may be used to retrieve the remaining
1336 non-option arguments, and do a simple check on how many there are.
1338 The parser correctly handles `--' option terminators.
1341 def __init__(me, args, allowed):
1343 Create a new option parser.
1345 The parser will scan the ARGS for options given in the sequence ALLOWED
1346 (which are expected to include the `-' prefix).
1350 me.allowed[a] = True
1351 me.args = list(args)
1354 """Iterator protocol: I am my own iterator."""
1359 Iterator protocol: return the next option.
1361 If we've run out, raise StopIteration.
1363 if len(me.args) == 0 or \
1364 len(me.args[0]) < 2 or \
1365 not me.args[0].startswith('-'):
1367 opt = me.args.pop(0)
1370 if opt not in me.allowed:
1371 raise TripeSyntaxError
1376 Return the argument for the most recent option.
1378 If none is available, raise TripeSyntaxError.
1380 if len(me.args) == 0:
1381 raise TripeSyntaxError
1382 return me.args.pop(0)
1384 def rest(me, min = None, max = None):
1386 After option parsing is done, return the remaining arguments.
1388 Check that there are at least MIN and at most MAX arguments remaining --
1389 either may be None to suppress the check.
1391 if (min is not None and len(me.args) < min) or \
1392 (max is not None and len(me.args) > max):
1393 raise TripeSyntaxError
1396 ###----- That's all, folks --------------------------------------------------