3 ### Administration connection with tripe server
5 ### (c) 2006 Straylight/Edgeware
8 ###----- Licensing notice ---------------------------------------------------
10 ### This file is part of Trivial IP Encryption (TrIPE).
12 ### TrIPE is free software: you can redistribute it and/or modify it under
13 ### the terms of the GNU General Public License as published by the Free
14 ### Software Foundation; either version 3 of the License, or (at your
15 ### option) any later version.
17 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
18 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19 ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
26 This module provides classes and functions for connecting to a running tripe
27 server, sending it commands, receiving and processing replies, and
28 implementing services.
30 Rather than end up in lost in a storm of little event-driven classes, or a
31 morass of concurrent threads, the module uses coroutines to present a fairly
32 simple function call/return interface to potentially long-running commands
33 which must run without blocking the main process. It assumes a coroutine
34 module presenting a subset of the `greenlet' interface: if actual greenlets
35 are available, they are used; otherwise there's an implementation in terms of
36 threads (with lots of locking) which will do instead.
38 The simple rule governing the coroutines used here is this:
40 * The root coroutine never cares what values are passed to it when it
41 resumes: it just discards them.
43 * Other, non-root, coroutines are presumed to be waiting for some specific
46 Configuration variables:
54 Other useful variables:
79 TripeSynchronousCommand
80 TripeAsynchronousCommand
83 TripeCommandDispatcher
96 __pychecker__ = 'self=me no-constCond no-argsused'
100 ###--------------------------------------------------------------------------
101 ### External dependencies.
111 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113 from py.magic import greenlet as _Coroutine
115 from rmcr import Coroutine as _Coroutine
117 ###--------------------------------------------------------------------------
118 ### Coroutine hacking.
120 rootcr = _Coroutine.getcurrent()
122 class Coroutine (_Coroutine):
124 A coroutine class which can only be invoked by the root coroutine.
126 The root, by construction, cannot be an instance of this class.
128 def switch(me, *args, **kw):
129 assert _Coroutine.getcurrent() is rootcr
130 if _debug: print '* %s' % me
131 _Coroutine.switch(me, *args, **kw)
132 if _debug: print '* %s' % rootcr
134 ###--------------------------------------------------------------------------
135 ### Default places for things.
137 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
138 socketdir = "@socketdir@"
139 PACKAGE = "@PACKAGE@"
140 VERSION = "@VERSION@"
142 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
143 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
145 ###--------------------------------------------------------------------------
146 ### Connection to the server.
148 def readnonblockingly(sock, len):
150 Nonblocking read from SOCK.
152 Try to return LEN bytes. If couldn't read anything, return `None'. EOF is
153 returned as an empty string.
157 return sock.recv(len)
159 if exc[0] == E.EWOULDBLOCK:
163 class TripeConnectionError (StandardError):
164 """Something happened to the connection with the server."""
166 class TripeInternalError (StandardError):
167 """This program is very confused."""
170 class TripeConnection (object):
172 A logical connection to the tripe administration socket.
174 There may or may not be a physical connection. (This is needed for the
175 monitor, for example.)
177 This class isn't very useful on its own, but it has useful subclasses. At
178 this level, the class is agnostic about I/O multiplexing schemes; that gets
182 def __init__(me, socket):
184 Make a connection to the named SOCKET.
186 No physical connection is made initially.
191 me.iowatch = SelIOWatcher(me)
195 Ensure that there's a physical connection.
197 Do nothing if we're already connected. Invoke the `connected' method if
201 sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
202 sock.connect(me.socket)
204 me.lbuf = M.LineBuffer(me.line, me._eof)
209 def disconnect(me, reason):
211 Disconnect the physical connection.
213 Invoke the `disconnected' method, giving the provided REASON, which
214 should be either `None' or an exception.
216 if not me.sock: return
217 me.disconnected(reason)
226 Return true if there's a current, believed-good physical connection.
228 return me.sock is not None
230 __nonzero__ = connectedp
234 Send the LINE to the connection's socket.
236 All output is done through this method; it can be overridden to provide
237 proper nonblocking writing, though this seems generally unnecessary.
240 me.sock.setblocking(1)
241 me.sock.send(line + '\n')
242 except Exception, exc:
249 Receive whatever's ready from the connection's socket.
251 Call `line' on each complete line, and `eof' if the connection closed.
252 Subclasses which attach this class to an I/O-event system should call
253 this method when the socket (the `sock' attribute) is ready for reading.
255 while me.sock is not None:
257 buf = readnonblockingly(me.sock, 16384)
258 except Exception, exc:
270 """Internal end-of-file handler."""
271 me.disconnect(TripeConnectionError('connection lost'))
276 To be overridden by subclasses to react to a connection being
279 me.iowatch.connected(me.sock)
281 def disconnected(me, reason):
283 To be overridden by subclasses to react to a connection being severed.
285 me.iowatch.disconnected()
288 """To be overridden by subclasses to handle end-of-file."""
292 """To be overridden by subclasses to handle incoming lines."""
295 ###--------------------------------------------------------------------------
296 ### I/O loop integration.
298 class SelIOWatcher (object):
300 Integration with mLib's I/O event system.
302 You can replace this object with a different one for integration with,
303 e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
304 while the CONN is disconnected.
307 def __init__(me, conn):
311 def connected(me, sock):
313 Called when a connection is made.
315 SOCK is the socket. The watcher must arrange to call `CONN.receive' when
318 me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
321 def disconnected(me):
323 Called when the connection is lost.
329 Wait for something interesting to happen, and issue events.
331 That is, basically, do one iteration of a main select loop, processing
332 all of the events, and then return. This is used in the method
333 `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
334 `runservices'; if your I/O watcher has a different main loop, you can
339 ###--------------------------------------------------------------------------
340 ### Inter-coroutine communication.
342 class Queue (object):
344 A queue of things arriving asynchronously.
346 This is a very simple single-reader multiple-writer queue. It's useful for
347 more complex coroutines which need to cope with a variety of possible
352 """Create a new empty queue."""
353 me.contents = M.Array()
358 Internal: wait for an item to arrive in the queue.
360 Complain if someone is already waiting, because this is just a
364 raise ValueError('queue already being waited on')
366 me.waiter = Coroutine.getcurrent()
367 while not me.contents:
368 me.waiter.parent.switch()
374 Remove and return the item at the head of the queue.
376 If the queue is empty, wait until an item arrives.
379 return me.contents.shift()
383 Return the item at the head of the queue without removing it.
385 If the queue is empty, wait until an item arrives.
388 return me.contents[0]
392 Write THING to the queue.
394 If someone is waiting on the queue, wake him up immediately; otherwise
395 just leave the item there for later.
397 me.contents.push(thing)
401 ###--------------------------------------------------------------------------
402 ### Dispatching coroutine.
404 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
405 ## contain backslashes, quotes or whitespace.
406 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
408 ## Match characters which need to be escaped, even in quoted text.
409 rx_weird = RX.compile(r'([\\\'])')
412 """Quote S according to the tripe-admin(5) rules."""
413 m = rx_ordinary.match(s)
414 if m and m.end() == len(s):
417 return "'" + rx_weird.sub(r'\\\1', s) + "'"
421 Return a wrapper for FUNC which reports exceptions thrown by it.
423 Useful in the case of callbacks invoked by C functions which ignore
428 return func(*a, **kw)
430 SYS.excepthook(*SYS.exc_info())
434 class TripeCommand (object):
436 This abstract class represents a command in progress.
438 The `words' attribute contains the list of tokens which make up the
441 Subclasses must implement a method to handle server responses:
443 * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
444 `FAIL'; ARGS are the remaining tokens from the server's response.
447 def __init__(me, words):
448 """Make a new command consisting of the given list of WORDS."""
451 class TripeSynchronousCommand (TripeCommand):
453 A simple command, processed apparently synchronously.
455 Must be invoked from a coroutine other than the root (or whichever one is
456 running the dispatcher); in reality, other coroutines carry on running
457 while we wait for a response from the server.
459 Each server response causes the calling coroutine to be resumed with the
460 pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
461 or `FAIL') and REST is a list of the server's other response tokens. The
462 calling coroutine must continue switching back to the dispatcher until a
463 terminating response (`OK' or `FAIL') is received or become very
466 Mostly it's better to use the `TripeCommandIterator' to do this
470 def __init__(me, words):
471 """Initialize the command, specifying the WORDS to send to the server."""
472 TripeCommand.__init__(me, words)
473 me.owner = Coroutine.getcurrent()
475 def response(me, code, *rest):
476 """Handle a server response by forwarding it to the calling coroutine."""
477 me.owner.switch((code, rest))
479 class TripeError (StandardError):
481 A tripe command failed with an error (a `FAIL' code). The args attribute
482 contains a list of the server's message tokens.
486 class TripeCommandIterator (object):
488 Iterator interface to a tripe command.
490 The values returned by the iterator are lists of tokens from the server's
491 `INFO' lines, as processed by the given filter function, if any. The
492 iterator completes normally (by raising `StopIteration') if the server
493 reported `OK', and raises an exception if the command failed for some reason.
495 A `TripeError' is raised if the server issues a `FAIL' code. If the
496 connection failed, some other exception is raised.
499 def __init__(me, dispatcher, words, bg = False, filter = None):
501 Create a new command iterator.
503 The command is submitted to the DISPATCHER; it consists of the given
504 WORDS. If BG is true, then an option is inserted to request that the
505 server run the command in the background. The FILTER is applied to the
506 token lists which the server responds, and the filter's output are the
507 items returned by the iterator.
509 me.dcr = Coroutine.getcurrent().parent
511 raise ValueError('must invoke from coroutine')
512 me.filter = filter or (lambda x: x)
514 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
515 dispatcher.rawcommand(TripeSynchronousCommand(words))
518 """Iterator protocol: I am my own iterator."""
523 Iterator protocol: return the next piece of information from the server.
525 `INFO' responses are filtered and returned as the values of the
526 iteration. `FAIL' and `CONNERR' responses are turned into exceptions and
527 raised. Finally, `OK' is turned into `StopIteration', which should cause
528 a normal end to the iteration process.
530 thing = me.dcr.switch()
533 return me.filter(rest)
535 raise StopIteration()
536 elif code == 'CONNERR':
538 raise TripeConnectionError('connection terminated by user')
542 raise TripeError(*rest)
544 raise TripeInternalError('unexpected tripe response %r' %
547 ### Simple utility functions for the TripeCommandIterator convenience
550 def _tokenjoin(words):
551 """Filter function: simply join the given tokens with spaces between."""
552 return ' '.join(words)
555 """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
561 kv[w[:q]] = w[q + 1:]
565 """Raise an error if ITER contains any item."""
568 raise TripeInternalError('expected no response')
572 """If ITER contains a single item, return it; otherwise raise an error."""
575 raise TripeInternalError('expected only one line of response')
578 def _tracelike(iter):
579 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
580 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
581 disabled, `+' if enabled, maybe something else later), and DESC is the
582 human-readable description."""
587 desc = ' '.join(ww[1:])
588 stuff.append((ch, st, desc))
591 def _kwopts(kw, allowed):
592 """Parse keyword arguments into options. ALLOWED is a list of allowable
593 keywords; raise errors if other keywords are present. `KEY = VALUE'
594 becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
595 `-KEY' if VALUE is a true non-string, or nothing if VALUE is false. Insert
596 a `--' at the end to stop the parser getting confused."""
599 for a in allowed: amap[a] = True
600 for k, v in kw.iteritems():
602 raise ValueError('option %s not allowed here' % k)
603 if isinstance(v, str):
612 def defer(func, *args, **kw):
613 """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
614 _deferq.append((func, args, kw))
616 def funargstr(func, args, kw):
617 items = [repr(a) for a in args]
618 for k, v in kw.iteritems():
619 items.append('%s = %r' % (k, v))
620 return '%s(%s)' % (func.__name__, ', '.join(items))
622 def spawn(func, *args, **kw):
623 """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
624 defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
625 .switch(*args, **kw)))
631 Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
634 func, args, kw = _asideq.get()
638 SYS.excepthook(*SYS.exc_info())
640 def aside(func, *args, **kw):
641 """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
642 defer(_asideq.put, (func, args, kw))
644 class TripeCommandDispatcher (TripeConnection):
648 The command dispatcher is a connection which knows how to handle commands.
649 This is probably the most important class in this module to understand.
651 Lines from the server are parsed into tokens. The first token is a code
652 (`OK' or `NOTE' or something) explaining what kind of line this is. The
653 `handler' attribute is a dictionary mapping server line codes to handler
654 functions, which are applied to the words of the line as individual
655 arguments. *Exception*: the content of `TRACE' lines is not tokenized.
657 There are default handlers for server codes which respond to commands.
658 Commands arrive as `TripeCommand' instances through the `rawcommand'
659 interface. The dispatcher keeps track of which command objects represent
660 which jobs, and sends responses on to the appropriate command objects by
661 invoking their `response' methods. Command objects don't see the `BG...'
662 codes, because the dispatcher has already transformed them into regular
663 codes when it was looking up the job tag.
665 The dispatcher also has a special response code of its own: `CONNERR'
666 indicates that the connection failed and the command has therefore been
667 lost. This is sent to all outstanding commands when a connection error is
668 encountered: rather than a token list, it is accompanied by an exception
669 object which is the cause of the disconnection, which may be `None' if the
670 disconnection is expected (e.g., the direct result of a user request).
673 ## --- Infrastructure ---
675 ## We will get confused if we pipeline commands. Send them one at a time.
676 ## Only send a command when the previous one detaches or completes.
678 ## The following attributes are interesting:
680 ## tagseq Sequence number for next background job (for bgtag)
682 ## queue Commands awaiting submission.
684 ## cmd Mapping from job tags to commands: cmd[None] is the
685 ## foreground command.
687 ## handler Mapping from server codes to handler functions.
689 def __init__(me, socket):
691 Initialize the dispatcher.
693 The SOCKET is the filename of the administration socket to connect to,
694 for TripeConnection.__init__.
696 TripeConnection.__init__(me, socket)
699 me.handler['BGDETACH'] = me._detach
700 for i in 'BGOK', 'BGINFO', 'BGFAIL':
701 me.handler[i] = me._response
702 for i in 'OK', 'INFO', 'FAIL':
703 me.handler[i] = me._fgresponse
706 """Should we quit the main loop? Subclasses should override."""
709 def mainloop(me, quitp = None):
711 Iterate the I/O watcher until QUITP returns true.
713 Arranges for asides and deferred calls to be made at the right times.
717 assert _Coroutine.getcurrent() is rootcr
718 Coroutine(_runasides, name = '_runasides').switch()
725 for func, args, kw in q:
733 If a subclass overrides this method, it must call us; clears out the
734 command queue and job map.
738 TripeConnection.connected(me)
740 def disconnected(me, reason):
744 If a subclass hooks overrides this method, it must call us; sends a
745 special `CONNERR' code to all incomplete commands.
747 TripeConnection.disconnected(me, reason)
748 for cmd in me.cmd.itervalues():
749 cmd.response('CONNERR', reason)
751 cmd.response('CONNERR', reason)
755 """Handle an incoming line, sending it to the right place."""
756 if _debug: print '<', line
757 code, rest = M.word(line, quotep = True)
758 func = me.handler.get(code)
763 func(code, *M.split(rest, quotep = True)[0])
768 Pull the oldest command off the queue and try to send it to the server.
770 if not me.queue or None in me.cmd: return
771 cmd = me.queue.shift()
772 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
773 me.send(' '.join([quotify(w) for w in cmd.words]))
778 Return an unused job tag.
780 May be of use when composing commands by hand.
782 tag = 'J%05d' % me.tagseq
786 ## --- Built-in handler functions for server responses ---
788 def _detach(me, _, tag):
790 Respond to a `BGDETACH' TAG message.
792 Move the current foreground command to the background.
794 assert tag not in me.cmd
795 me.cmd[tag] = me.cmd[None]
798 def _response(me, code, tag, *w):
800 Respond to an `OK', `INFO' or `FAIL' message.
802 If this is a message for a background job, find the tag; then dispatch
803 the result to the command object. This is also called by `_fgresponse'
804 (wth TAG set to `None') to handle responses for foreground commands, and
805 is therefore a useful method to extend or override in subclasses.
807 if code.startswith('BG'):
812 cmd.response(code, *w)
814 def _fgresponse(me, code, *w):
815 """Process responses to the foreground command."""
816 me._response(code, None, *w)
818 ## --- Interface methods ---
820 def rawcommand(me, cmd):
822 Submit the `TripeCommand' CMD to the server, and look after it until it
825 if not me.connectedp():
826 raise TripeConnectionError('connection closed')
830 def command(me, *cmd, **kw):
831 """Convenience wrapper for creating a TripeCommandIterator object."""
832 return TripeCommandIterator(me, cmd, **kw)
834 ## --- Convenience methods for server commands ---
836 def add(me, peer, *addr, **kw):
837 return _simple(me.command(bg = True,
839 _kwopts(kw, ['tunnel', 'keepalive',
840 'key', 'priv', 'cork',
845 return _oneline(me.command('ADDR', peer))
846 def algs(me, peer = None):
847 return _keyvals(me.command('ALGS',
848 *((peer is not None and [peer]) or [])))
849 def checkchal(me, chal):
850 return _simple(me.command('CHECKCHAL', chal))
852 return _simple(me.command('DAEMON'))
853 def eping(me, peer, **kw):
854 return _oneline(me.command(bg = True,
856 _kwopts(kw, ['timeout']) +
858 def forcekx(me, peer):
859 return _simple(me.command('FORCEKX', peer))
861 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
862 def greet(me, peer, chal):
863 return _simple(me.command('GREET', peer, chal))
865 return list(me.command('HELP', filter = _tokenjoin))
866 def ifname(me, peer):
867 return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
869 return _simple(me.command('KILL', peer))
871 return list(me.command('LIST', filter = _tokenjoin))
872 def notify(me, *msg):
873 return _simple(me.command('NOTIFY', *msg))
874 def peerinfo(me, peer):
875 return _keyvals(me.command('PEERINFO', peer))
876 def ping(me, peer, **kw):
877 return _oneline(me.command(bg = True,
879 _kwopts(kw, ['timeout']) +
881 def port(me, af = None):
882 return _oneline(me.command('PORT',
883 *((af is not None) and [af] or []),
884 filter = _tokenjoin))
886 return _simple(me.command('QUIT'))
888 return _simple(me.command('RELOAD'))
890 return _keyvals(me.command('SERVINFO'))
891 def setifname(me, new):
892 return _simple(me.command('SETIFNAME', new))
893 def svcclaim(me, service, version):
894 return _simple(me.command('SVCCLAIM', service, version))
895 def svcensure(me, service, version = None):
896 return _simple(me.command('SVCENSURE', service,
897 *((version is not None and [version]) or [])))
898 def svcfail(me, job, *msg):
899 return _simple(me.command('SVCFAIL', job, *msg))
900 def svcinfo(me, job, *msg):
901 return _simple(me.command('SVCINFO', job, *msg))
903 return list(me.command('SVCLIST'))
905 return _simple(me.command('SVCOK', job))
906 def svcquery(me, service):
907 return _keyvals(me.command('SVCQUERY', service))
908 def svcrelease(me, service):
909 return _simple(me.command('SVCRELEASE', service))
910 def svcsubmit(me, service, *args, **kw):
911 return me.command(bg = True,
913 _kwopts(kw, ['version']) +
917 return _keyvals(me.command('STATS', peer))
918 def trace(me, *args):
919 return _tracelike(me.command('TRACE', *args))
921 return list(me.command('TUNNELS', filter = _tokenjoin))
923 return _oneline(me.command('VERSION', filter = _tokenjoin))
925 return _simple(me.command('WARN', *msg))
926 def watch(me, *args):
927 return _tracelike(me.command('WATCH', *args))
929 ###--------------------------------------------------------------------------
930 ### Asynchronous commands.
932 class TripeAsynchronousCommand (TripeCommand):
934 Asynchronous commands.
936 This is the complicated way of issuing commands. You must set up a queue,
937 and associate the command with the queue. Responses arriving for the
938 command will be put on the queue as an triple of the form (TAG, CODE, REST)
939 -- where TAG is an object of your choice, not interpreted by this class,
940 CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
941 and REST is the list of the rest of the server's tokens.
943 Using this, you can write coroutines which process many commands (and
944 possibly other events) simultaneously.
947 def __init__(me, queue, tag, words):
948 """Make an asynchronous command consisting of the given WORDS, which
949 sends responses to QUEUE, labelled with TAG."""
950 TripeCommand.__init__(me, words)
954 def response(me, code, *stuff):
955 """Handle a server response by writing it to the caller's queue."""
956 me.queue.put((me.tag, code, list(stuff)))
958 ###--------------------------------------------------------------------------
961 class TripeJobCancelled (Exception):
963 Exception sent to job handler if the client kills the job.
965 Not propagated further.
969 class TripeJobError (Exception):
971 Exception to cause failure report for running job.
973 Sends an SVCFAIL code back.
977 class TripeSyntaxError (Exception):
979 Exception to report a syntax error for a job.
981 Sends an SVCFAIL bad-svc-syntax message back.
985 class TripeServiceManager (TripeCommandDispatcher):
987 A command dispatcher with added handling for incoming service requests.
989 There is usually only one instance of this class, called svcmgr. Some of
990 the support functions in this module assume that this is the case.
992 To use, run `mLib.select' in a loop until the quitp method returns true;
993 then, in a non-root coroutine, register your services by calling `add', and
994 then call `running' when you've finished setting up.
996 The instance handles server service messages `SVCJOB', `SVCCANCEL' and
997 `SVCCLAIM'. It maintains a table of running services. Incoming jobs cause
998 the service's `job' method to be invoked; `SVCCANCEL' sends a
999 `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
1000 causes the relevant service to be deregistered.
1002 There is no base class for jobs, but a job must implement two methods:
1004 start() Begin processing; might be a no-op.
1006 cancel() Stop processing; the original client has killed the
1009 The life of a service manager is divided into two parts: setup and running;
1010 you tell the manager that you've finished setting up by calling the
1011 `running' method. If, at any point after setup is finished, there are no
1012 remaining services or jobs, `quitp' will return true, ending the process.
1015 ## --- Attributes ---
1017 ## svc Mapping name -> service object
1019 ## job Mapping jobid -> job handler coroutine
1021 ## runningp True when setup is finished
1023 ## _quitp True if explicit quit has been requested
1025 def __init__(me, socket):
1027 Initialize the service manager.
1029 SOCKET is the administration socket to connect to.
1031 TripeCommandDispatcher.__init__(me, socket)
1035 me.handler['SVCCANCEL'] = me._cancel
1036 me.handler['SVCJOB'] = me._job
1037 me.handler['SVCCLAIM'] = me._claim
1040 def addsvc(me, svc):
1041 """Register a new service; SVC is a `TripeService' instance."""
1042 assert svc.name not in me.svc
1043 me.svcclaim(svc.name, svc.version)
1044 me.svc[svc.name] = svc
1046 def _cancel(me, _, jid):
1048 Called when the server cancels a job; invokes the job's `cancel' method.
1054 def _claim(me, _, svc, __):
1055 """Called when another program claims our service at a higher version."""
1058 def _job(me, _, jid, svc, cmd, *args):
1060 Called when the server sends us a job to do.
1062 Calls the service to collect a job, and begins processing it.
1064 assert jid not in me.job
1065 svc = me.svc[svc.lower()]
1066 job = svc.job(jid, cmd, args)
1071 """Answer true if setup is finished."""
1074 def jobdone(me, jid):
1075 """Informs the service manager that the job with id JID has finished."""
1083 Return true if no services or jobs are active (and, therefore, if this
1084 process can quit without anyone caring).
1086 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1090 """Forces the quit flag (returned by quitp) on."""
1093 class TripeService (object):
1097 The NAME and VERSION are passed on to the server. The CMDTAB is a
1098 dictionary mapping command names (in lowercase) to command objects.
1100 If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
1101 defaults are provided.
1103 TripeService itself is mostly agnostic about the nature of command objects,
1104 but the TripeServiceJob class (below) has some requirements. The built-in
1105 HELP command requires command objects to have `usage' attributes.
1108 def __init__(me, name, version, cmdtab):
1110 Create and register a new service with the given NAME and VERSION.
1112 CMDTAB maps command names (in lower-case) to command objects.
1115 me.version = version
1118 me.cmd.setdefault('help',
1119 TripeServiceCommand('help', 0, 0, '', me._help))
1120 me.cmd.setdefault('quit',
1121 TripeServiceCommand('quit', 0, 0, '', me._quit))
1123 def job(me, jid, cmd, args):
1125 Called by the service manager: a job arrived with id JID.
1127 It asks for comamnd CMD with argument list ARGS. Creates a new job,
1128 passing it the information needed.
1130 return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1132 ## Simple default command handlers, complying with the spec in
1133 ## tripe-service(7).
1136 """Send a help summary to the user."""
1137 cmds = me.cmd.items()
1139 for name, cmd in cmds:
1140 svcinfo(name, *cmd.usage)
1143 """Terminate the service manager."""
1144 svcmgr.notify('svc-quit', me.name, 'admin-request')
1147 class TripeServiceCommand (object):
1148 """A simple service command."""
1150 def __init__(me, name, min, max, usage, func):
1152 Creates a new command.
1154 NAME is the command's name (in lowercase).
1156 MIN and MAX are the minimum and maximum number of allowed arguments (used
1157 for checking); either may be None to indicate no minimum or maximum.
1159 USAGE is a usage string, used for generating help and error messages.
1161 FUNC is the function to invoke.
1166 me.usage = usage.split()
1171 Called when the command is invoked.
1173 Does minimal checking of the arguments and calls the supplied function.
1175 if (me.min is not None and len(args) < me.min) or \
1176 (me.max is not None and len(args) > me.max):
1177 raise TripeSyntaxError()
1180 class TripeServiceJob (Coroutine):
1182 Job handler coroutine.
1184 A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
1185 request, passing it the jobid, command and arguments, and a command object.
1186 The command object needs the following attributes.
1188 usage A usage list (excluding the command name) showing
1189 arguments and options.
1191 run(*ARGS) Function to react to the command with ARGS split into
1192 separate arguments. Invoked in a coroutine. The
1193 `svcinfo function (not the `TripeCommandDispatcher'
1194 method) may be used to send `INFO' lines. The
1195 function may raise `TripeJobError' to send a `FAIL'
1196 response back, or `TripeSyntaxError' to send a
1197 generic usage error. `TripeJobCancelled' exceptions
1198 are trapped silently. Other exceptions are
1199 translated into a generic internal-error message.
1201 This class automatically takes care of sending some closing response to the
1202 job, and for informing the service manager that the job is completed.
1204 The `jid' attribute stores the job's id.
1207 def __init__(me, jid, svc, cmd, command, args):
1211 The job is created with id JID, for service SVC, processing command name
1212 CMD (which the service resolved into the command object COMMAND, or
1213 `None'), and with the arguments ARGS.
1215 Coroutine.__init__(me)
1219 me.command = command
1224 Main body of the coroutine.
1226 Does the tedious exception handling boilerplate and invokes the command's
1231 if me.command is None:
1232 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1234 me.command.run(*me.args)
1235 svcmgr.svcok(me.jid)
1236 except TripeJobError, exc:
1237 svcmgr.svcfail(me.jid, *exc.args)
1238 except TripeSyntaxError:
1239 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1240 me.svc.name, me.command.name,
1242 except TripeJobCancelled:
1244 except Exception, exc:
1245 svcmgr.svcfail(me.jid, 'svc-internal-error',
1246 exc.__class__.__name__, str(exc))
1248 svcmgr.jobdone(me.jid)
1251 """Invoked by the service manager to start running the coroutine."""
1255 """Invoked by the service manager to cancel the job."""
1256 me.throw(TripeJobCancelled())
1260 If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the
1261 job's sender, automatically using the correct job id.
1263 svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1265 def _setupsvc(tab, func):
1267 Setup coroutine for setting up service programs.
1269 Register the given services.
1273 svcmgr.addsvc(service)
1279 svcmgr = TripeServiceManager(None)
1280 def runservices(socket, tab, init = None, setup = None, daemon = False):
1282 Function to start a service provider.
1284 SOCKET is the socket to connect to, usually tripesock.
1286 TAB is a list of entries. An entry may be either a tuple
1288 (NAME, VERSION, COMMANDS)
1290 or a service object (e.g., a `TripeService' instance).
1292 COMMANDS is a dictionary mapping command names to tuples
1294 (MIN, MAX, USAGE, FUNC)
1296 of arguments for a `TripeServiceCommand' object.
1298 If DAEMON is true, then the process is forked into the background before we
1299 start. If INIT is given, it is called in the main coroutine, immediately
1300 after forking. If SETUP is given, it is called in a coroutine, after
1301 calling INIT and setting up the services but before marking the service
1304 It is a really bad idea to do any initialization, particularly setting up
1305 coroutines, outside of the INIT or SETUP functions. In particular, if
1306 we're using rmcr for fake coroutines, the daemonizing fork will kill off
1307 the currently established coroutines in a most surprising way.
1309 The function runs a main select loop until the service manager decides to
1313 svcmgr.socket = socket
1317 if not isinstance(service, tuple):
1318 svcs.append(service)
1320 name, version, commands = service
1322 for cmd, stuff in commands.iteritems():
1323 cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1324 svcs.append(TripeService(name, version, cmdmap))
1327 if init is not None:
1329 spawn(_setupsvc, svcs, setup)
1332 ###--------------------------------------------------------------------------
1333 ### Utilities for services.
1335 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1337 """Parse the timespec SPEC, returning a number of seconds."""
1339 if len(spec) > 1 and spec[-1] in _timeunits:
1340 mul = _timeunits[spec[-1]]
1345 raise TripeJobError('bad-time-spec', spec)
1347 raise TripeJobError('bad-time-spec', spec)
1348 return mul * int(spec)
1350 class OptParse (object):
1352 Parse options from a command list in the conventional fashion.
1354 ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
1355 options. The returned values are the option tags. During parsing, the
1356 `arg' method may be used to retrieve the argument for the most recent
1357 option. Afterwards, `rest' may be used to retrieve the remaining
1358 non-option arguments, and do a simple check on how many there are.
1360 The parser correctly handles `--' option terminators.
1363 def __init__(me, args, allowed):
1365 Create a new option parser.
1367 The parser will scan the ARGS for options given in the sequence ALLOWED
1368 (which are expected to include the `-' prefix).
1372 me.allowed[a] = True
1373 me.args = list(args)
1376 """Iterator protocol: I am my own iterator."""
1381 Iterator protocol: return the next option.
1383 If we've run out, raise `StopIteration'.
1385 if len(me.args) == 0 or \
1386 len(me.args[0]) < 2 or \
1387 not me.args[0].startswith('-'):
1388 raise StopIteration()
1389 opt = me.args.pop(0)
1391 raise StopIteration()
1392 if opt not in me.allowed:
1393 raise TripeSyntaxError()
1398 Return the argument for the most recent option.
1400 If none is available, raise `TripeSyntaxError'.
1402 if len(me.args) == 0:
1403 raise TripeSyntaxError()
1404 return me.args.pop(0)
1406 def rest(me, min = None, max = None):
1408 After option parsing is done, return the remaining arguments.
1410 Check that there are at least MIN and at most MAX arguments remaining --
1411 either may be None to suppress the check.
1413 if (min is not None and len(me.args) < min) or \
1414 (max is not None and len(me.args) > max):
1415 raise TripeSyntaxError()
1418 ###----- That's all, folks --------------------------------------------------