3 ### Administration connection with tripe server
5 ### (c) 2006 Straylight/Edgeware
8 ###----- Licensing notice ---------------------------------------------------
10 ### This file is part of Trivial IP Encryption (TrIPE).
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 ### GNU General Public License for more details.
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
Rather than end up lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process. It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
39 The simple rule governing the coroutines used here is this:
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
44 * Other, non-root, coroutines are presumed to be waiting for some specific
47 Configuration variables:
55 Other useful variables:
80 TripeSynchronousCommand
81 TripeAsynchronousCommand
84 TripeCommandDispatcher
97 __pychecker__ = 'self=me no-constCond no-argsused'
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
114 from py.magic import greenlet as _Coroutine
116 from rmcr import Coroutine as _Coroutine
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
121 rootcr = _Coroutine.getcurrent()
class Coroutine (_Coroutine):
    """
    A coroutine which may only be resumed by the root coroutine.

    The root coroutine is never an instance of this class, so the check in
    `switch' guarantees that control always passes root -> non-root.
    """

    def switch(me, *args, **kw):
        """
        Resume this coroutine, passing it ARGS and KW.

        Must be called from the root coroutine (checked by assertion).  The
        value handed back when control returns is discarded: this method
        always returns None.  If `_debug' is set, a trace line is printed on
        the way in and again on the way out.
        """
        assert _Coroutine.getcurrent() is rootcr
        if _debug:
            print('* %s' % me)
        _Coroutine.switch(me, *args, **kw)
        if _debug:
            print('* %s' % rootcr)
135 ###--------------------------------------------------------------------------
136 ### Default places for things.
138 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
139 socketdir = "@socketdir@"
140 PACKAGE = "@PACKAGE@"
141 VERSION = "@VERSION@"
143 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
144 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
146 ###--------------------------------------------------------------------------
147 ### Connection to the server.
149 def readnonblockingly(sock, len):
151 Nonblocking read from SOCK.
153 Try to return LEN bytes. If couldn't read anything, return None. EOF is
154 returned as an empty string.
158 return sock.recv(len)
160 if exc[0] == E.EWOULDBLOCK:
class TripeConnectionError (StandardError):
    """
    Something happened to the connection with the server.

    Raised, for example, when a command is submitted while disconnected, and
    used as the REASON for a disconnection when the connection is lost.
    """
class TripeInternalError (StandardError):
    """
    This program is very confused.

    Raised when the server's replies do not have the shape this module
    expected (e.g., an unexpected response code or surplus response lines).
    """
171 class TripeConnection (object):
173 A logical connection to the tripe administration socket.
175 There may or may not be a physical connection. (This is needed for the
176 monitor, for example.)
178 This class isn't very useful on its own, but it has useful subclasses. At
179 this level, the class is agnostic about I/O multiplexing schemes; that gets
183 def __init__(me, socket):
185 Make a connection to the named SOCKET.
187 No physical connection is made initially.
192 me.iowatch = SelIOWatcher(me)
196 Ensure that there's a physical connection.
198 Do nothing if we're already connected. Invoke the `connected' method if
202 sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
203 sock.connect(me.socket)
205 me.lbuf = M.LineBuffer(me.line, me._eof)
210 def disconnect(me, reason):
212 Disconnect the physical connection.
214 Invoke the `disconnected' method, giving the provided REASON, which
215 should be either None or an exception.
217 if not me.sock: return
218 me.disconnected(reason)
227 Return true if there's a current, believed-good physical connection.
229 return me.sock is not None
231 __nonzero__ = connectedp
235 Send the LINE to the connection's socket.
237 All output is done through this method; it can be overridden to provide
238 proper nonblocking writing, though this seems generally unnecessary.
241 me.sock.setblocking(1)
242 me.sock.send(line + '\n')
243 except Exception, exc:
250 Receive whatever's ready from the connection's socket.
252 Call `line' on each complete line, and `eof' if the connection closed.
253 Subclasses which attach this class to an I/O-event system should call
254 this method when the socket (CONN.sock) is ready for reading.
256 while me.sock is not None:
258 buf = readnonblockingly(me.sock, 16384)
259 except Exception, exc:
271 """Internal end-of-file handler."""
272 me.disconnect(TripeConnectionError('connection lost'))
277 To be overridden by subclasses to react to a connection being
280 me.iowatch.connected(me.sock)
def disconnected(me, reason):
    """
    Hook: the physical connection has been severed.

    REASON is whatever was given to `disconnect' (None or an exception); this
    base implementation ignores it and just tells the I/O watcher that the
    socket has gone away.  Intended to be overridden by subclasses.
    """
    watcher = me.iowatch
    watcher.disconnected()
289 """To be overridden by subclasses to handle end-of-file."""
293 """To be overridden by subclasses to handle incoming lines."""
296 ###--------------------------------------------------------------------------
297 ### I/O loop integration.
299 class SelIOWatcher (object):
301 Integration with mLib's I/O event system.
303 You can replace this object with a different one for integration with,
304 e.g., glib's main loop, by setting CONN.iowatcher to a different object
305 while the CONN is disconnected.
308 def __init__(me, conn):
312 def connected(me, sock):
314 Called when a connection is made.
316 SOCK is the socket. The watcher must arrange to call CONN.receive when
319 me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
322 def disconnected(me):
324 Called when the connection is lost.
330 Wait for something interesting to happen, and issue events.
332 That is, basically, do one iteration of a main select loop, processing
333 all of the events, and then return. This isn't needed for
334 TripeCommandDispatcher, but runservices wants it.
338 ###--------------------------------------------------------------------------
339 ### Inter-coroutine communication.
341 class Queue (object):
343 A queue of things arriving asynchronously.
345 This is a very simple single-reader multiple-writer queue. It's useful for
346 more complex coroutines which need to cope with a variety of possible
351 """Create a new empty queue."""
352 me.contents = M.Array()
357 Internal: wait for an item to arrive in the queue.
359 Complain if someone is already waiting, because this is just a
363 raise ValueError('queue already being waited on')
365 me.waiter = Coroutine.getcurrent()
366 while not me.contents:
367 me.waiter.parent.switch()
373 Remove and return the item at the head of the queue.
375 If the queue is empty, wait until an item arrives.
378 return me.contents.shift()
382 Return the item at the head of the queue without removing it.
384 If the queue is empty, wait until an item arrives.
387 return me.contents[0]
391 Write THING to the queue.
393 If someone is waiting on the queue, wake him up immediately; otherwise
394 just leave the item there for later.
396 me.contents.push(thing)
400 ###--------------------------------------------------------------------------
401 ### Dispatching coroutine.
403 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
404 ## contain backslashes, quotes or whitespace.
405 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
407 ## Match characters which need to be escaped, even in quoted text.
408 rx_weird = RX.compile(r'([\\\'])')
411 """Quote S according to the tripe-admin(5) rules."""
412 m = rx_ordinary.match(s)
413 if m and m.end() == len(s):
416 return "'" + rx_weird.sub(r'\\\1', s) + "'"
420 Return a wrapper for FUNC which reports exceptions thrown by it.
422 Useful in the case of callbacks invoked by C functions which ignore
427 return func(*a, **kw)
429 SYS.excepthook(*SYS.exc_info())
433 class TripeCommand (object):
435 This abstract class represents a command in progress.
437 The `words' attribute contains the list of tokens which make up the
440 Subclasses must implement a method to handle server responses:
442 * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
443 'FAIL'; ARGS are the remaining tokens from the server's response.
446 def __init__(me, words):
447 """Make a new command consisting of the given list of WORDS."""
450 class TripeSynchronousCommand (TripeCommand):
452 A simple command, processed apparently synchronously.
454 Must be invoked from a coroutine other than the root (or whichever one is
455 running the dispatcher); in reality, other coroutines carry on running
456 while we wait for a response from the server.
458 Each server response causes the calling coroutine to be resumed with the
459 pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
460 or `FAIL') and REST is a list of the server's other response tokens. The
461 calling coroutine must continue switching back to the dispatcher until a
462 terminating response (`OK' or `FAIL') is received or become very
465 Mostly it's better to use the TripeCommandIterator to do this
def __init__(me, words):
    """
    Set up a synchronous command made of the token list WORDS.

    The coroutine which is current at construction time is remembered as the
    command's owner; server responses are later forwarded to it.
    """
    caller = Coroutine.getcurrent()
    TripeCommand.__init__(me, words)
    me.owner = caller
def response(me, code, *rest):
    """
    Forward a server response to the coroutine which owns this command.

    The owner is resumed with the pair (CODE, REST), where REST is the tuple
    of remaining response tokens.  Nothing is returned.
    """
    reply = (code, rest)
    me.owner.switch(reply)
class TripeError (StandardError):
    """
    A tripe command failed with an error (a FAIL code).  The args attribute
    contains a list of the server's message tokens.
    """
485 class TripeCommandIterator (object):
487 Iterator interface to a tripe command.
489 The values returned by the iterator are lists of tokens from the server's
490 INFO lines, as processed by the given filter function, if any. The
491 iterator completes normally (by raising StopIteration) if the server
492 reported OK, and raises an exception if the command failed for some reason.
494 A TripeError is raised if the server issues a FAIL code. If the connection
495 failed, some other exception is raised.
498 def __init__(me, dispatcher, words, bg = False, filter = None):
500 Create a new command iterator.
502 The command is submitted to the DISPATCHER; it consists of the given
503 WORDS. If BG is true, then an option is inserted to request that the
504 server run the command in the background. The FILTER is applied to the
token lists with which the server responds, and the filter's outputs are the
506 items returned by the iterator.
508 me.dcr = Coroutine.getcurrent().parent
510 raise ValueError, 'must invoke from coroutine'
511 me.filter = filter or (lambda x: x)
513 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
514 dispatcher.rawcommand(TripeSynchronousCommand(words))
517 """Iterator protocol: I am my own iterator."""
522 Iterator protocol: return the next piece of information from the server.
524 INFO responses are filtered and returned as the values of the iteration.
525 FAIL and CONNERR responses are turned into exceptions and raised.
526 Finally, OK is turned into StopIteration, which should cause a normal end
527 to the iteration process.
529 thing = me.dcr.switch()
532 return me.filter(rest)
535 elif code == 'CONNERR':
537 raise TripeConnectionError, 'connection terminated by user'
541 raise TripeError(*rest)
543 raise TripeInternalError \
544 ('unexpected tripe response %r' % ([code] + rest))
546 ### Simple utility functions for the TripeCommandIterator convenience
549 def _tokenjoin(words):
550 """Filter function: simply join the given tokens with spaces between."""
551 return ' '.join(words)
554 """Return a dictionary formed from the KEY=VALUE pairs returned by the
560 kv[w[:q]] = w[q + 1:]
564 """Raise an error if ITER contains any item."""
567 raise TripeInternalError('expected no response')
571 """If ITER contains a single item, return it; otherwise raise an error."""
574 raise TripeInternalError('expected only one line of response')
577 def _tracelike(iter):
578 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
579 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
580 disabled, `+' if enabled, maybe something else later), and DESC is the
581 human-readable description."""
586 desc = ' '.join(ww[1:])
587 stuff.append((ch, st, desc))
590 def _kwopts(kw, allowed):
591 """Parse keyword arguments into options. ALLOWED is a list of allowable
592 keywords; raise errors if other keywords are present. KEY = VALUE becomes
593 an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
VALUE is a true non-string, or nothing if VALUE is false.  Insert a `--'
595 at the end to stop the parser getting confused."""
598 for a in allowed: amap[a] = True
599 for k, v in kw.iteritems():
601 raise ValueError('option %s not allowed here' % k)
602 if isinstance(v, str):
def defer(func, *args, **kw):
    """
    Arrange for FUNC(*ARGS, **KW) to be called later, in the root coroutine.

    The call is merely recorded on the deferred-call queue here; nothing is
    invoked immediately.
    """
    pending = (func, args, kw)
    _deferq.append(pending)
def funargstr(func, args, kw):
    """
    Return a printable rendering of the call FUNC(*ARGS, **KW).

    ARGS is a sequence of positional arguments and KW a dictionary of keyword
    arguments; each value is shown using its repr.  The result looks like
    `name(1, 'x', key = 2)'.  If FUNC has no `__name__' attribute (e.g., a
    functools.partial object), its repr is used in place of the name rather
    than failing.
    """
    items = [repr(a) for a in args]
    ## `items' rather than `iteritems': identical here, and not tied to one
    ## major Python version.
    items.extend('%s = %r' % (k, v) for k, v in kw.items())
    name = getattr(func, '__name__', None)
    if name is None:
        name = repr(func)
    return '%s(%s)' % (name, ', '.join(items))
def spawn(func, *args, **kw):
    """
    Run FUNC(*ARGS, **KW) in a fresh coroutine.

    The coroutine is not created here: a deferred call is queued which, when
    it runs, builds the coroutine (named after the call, for debugging) and
    switches into it.
    """
    def launch():
        label = funargstr(func, args, kw)
        return Coroutine(func, name = label).switch(*args, **kw)
    defer(launch)
630 Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
633 func, args, kw = _asideq.get()
637 SYS.excepthook(*SYS.exc_info())
def aside(func, *args, **kw):
    """
    Arrange for FUNC(*ARGS, **KW) to be called later, in a non-root coroutine.

    The request is posted to the asides queue by way of a deferred call.
    """
    request = (func, args, kw)
    defer(_asideq.put, request)
643 class TripeCommandDispatcher (TripeConnection):
647 The command dispatcher is a connection which knows how to handle commands.
648 This is probably the most important class in this module to understand.
650 Lines from the server are parsed into tokens. The first token is a code
651 (OK or NOTE or something) explaining what kind of line this is. The
652 `handler' attribute is a dictionary mapping server line codes to handler
653 functions, which are applied to the words of the line as individual
654 arguments. *Exception*: the content of TRACE lines is not tokenized.
656 There are default handlers for server codes which respond to commands.
657 Commands arrive as TripeCommand instances through the `rawcommand'
658 interface. The dispatcher keeps track of which command objects represent
659 which jobs, and sends responses on to the appropriate command objects by
660 invoking their `response' methods. Command objects don't see the
661 BG... codes, because the dispatcher has already transformed them into
662 regular codes when it was looking up job code.
664 The dispatcher also has a special response code of its own: CONNERR
665 indicates that the connection failed and the command has therefore been
666 lost. This is sent to all outstanding commands when a connection error is
667 encountered: rather than a token list, it is accompanied by an exception
668 object which is the cause of the disconnection, which may be `None' if the
669 disconnection is expected (e.g., the direct result of a user request).
672 ## --- Infrastructure ---
674 ## We will get confused if we pipeline commands. Send them one at a time.
675 ## Only send a command when the previous one detaches or completes.
677 ## The following attributes are interesting:
679 ## tagseq Sequence number for next background job (for bgtag)
681 ## queue Commands awaiting submission.
683 ## cmd Mapping from job tags to commands: cmd[None] is the
684 ## foreground command.
686 ## handler Mapping from server codes to handler functions.
688 def __init__(me, socket):
690 Initialize the dispatcher.
692 The SOCKET is the filename of the administration socket to connect to,
693 for TripeConnection.__init__.
695 TripeConnection.__init__(me, socket)
698 me.handler['BGDETACH'] = me._detach
699 for i in 'BGOK', 'BGINFO', 'BGFAIL':
700 me.handler[i] = me._response
701 for i in 'OK', 'INFO', 'FAIL':
702 me.handler[i] = me._fgresponse
705 """Should we quit the main loop? Subclasses should override."""
708 def mainloop(me, quitp = None):
710 Iterate the I/O watcher until QUITP returns true.
712 Arranges for asides and deferred calls to be made at the right times.
716 assert _Coroutine.getcurrent() is rootcr
717 Coroutine(_runasides, name = '_runasides').switch()
724 for func, args, kw in q:
732 If a subclass overrides this method, it must call us; clears out the
733 command queue and job map.
737 TripeConnection.connected(me)
739 def disconnected(me, reason):
If a subclass overrides this method, it must call us; sends a
744 special CONNERR code to all incomplete commands.
746 TripeConnection.disconnected(me, reason)
747 for cmd in me.cmd.itervalues():
748 cmd.response('CONNERR', reason)
750 cmd.response('CONNERR', reason)
754 """Handle an incoming line, sending it to the right place."""
755 if _debug: print '<', line
756 code, rest = M.word(line, quotep = True)
757 func = me.handler.get(code)
762 func(code, *M.split(rest, quotep = True)[0])
767 Pull the oldest command off the queue and try to send it to the server.
769 if not me.queue or None in me.cmd: return
770 cmd = me.queue.shift()
771 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
772 me.send(' '.join([quotify(w) for w in cmd.words]))
777 Return an unused job tag.
779 May be of use when composing commands by hand.
781 tag = 'J%05d' % me.tagseq
785 ## --- Built-in handler functions for server responses ---
787 def _detach(me, _, tag):
789 Respond to a BGDETACH TAG message.
791 Move the current foreground command to the background.
793 assert tag not in me.cmd
794 me.cmd[tag] = me.cmd[None]
797 def _response(me, code, tag, *w):
799 Respond to an OK, INFO or FAIL message.
801 If this is a message for a background job, find the tag; then dispatch
802 the result to the command object.
804 if code.startswith('BG'):
809 cmd.response(code, *w)
811 def _fgresponse(me, code, *w):
812 """Process responses to the foreground command."""
813 me._response(code, None, *w)
815 ## --- Interface methods ---
817 def rawcommand(me, cmd):
819 Submit the TripeCommand CMD to the server, and look after it until it
822 if not me.connectedp():
823 raise TripeConnectionError('connection closed')
def command(me, *cmd, **kw):
    """
    Convenience wrapper: submit the command whose tokens are CMD.

    Returns a TripeCommandIterator attached to this dispatcher; the keyword
    arguments KW are passed through to the iterator's constructor.
    """
    iterator = TripeCommandIterator(me, cmd, **kw)
    return iterator
831 ## --- Convenience methods for server commands ---
833 def add(me, peer, *addr, **kw):
834 return _simple(me.command(bg = True,
836 _kwopts(kw, ['tunnel', 'keepalive',
837 'key', 'priv', 'cork',
842 return _oneline(me.command('ADDR', peer))
def algs(me, peer = None):
    """
    Issue ALGS and return the reply as a dictionary of KEY=VALUE pairs.

    PEER, if not None, is passed as the command's single argument.
    """
    args = [peer] if peer is not None else []
    return _keyvals(me.command('ALGS', *args))
def checkchal(me, chal):
    """Issue CHECKCHAL for CHAL; the server is expected to send no items."""
    reply = me.command('CHECKCHAL', chal)
    return _simple(reply)
849 return _simple(me.command('DAEMON'))
850 def eping(me, peer, **kw):
851 return _oneline(me.command(bg = True,
853 _kwopts(kw, ['timeout']) +
def forcekx(me, peer):
    """Issue FORCEKX for PEER; the server is expected to send no items."""
    reply = me.command('FORCEKX', peer)
    return _simple(reply)
858 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
def greet(me, peer, chal):
    """Issue GREET for PEER with CHAL; the server is expected to send no items."""
    reply = me.command('GREET', peer, chal)
    return _simple(reply)
862 return list(me.command('HELP', filter = _tokenjoin))
def ifname(me, peer):
    """
    Issue IFNAME for PEER and return the single line of the reply.

    The reply's tokens are joined with spaces into one string.
    """
    reply = me.command('IFNAME', peer, filter = _tokenjoin)
    return _oneline(reply)
866 return _simple(me.command('KILL', peer))
868 return list(me.command('LIST', filter = _tokenjoin))
def notify(me, *msg):
    """Issue NOTIFY with the tokens MSG; the server is expected to send no items."""
    reply = me.command('NOTIFY', *msg)
    return _simple(reply)
def peerinfo(me, peer):
    """Issue PEERINFO for PEER and return the reply as a KEY=VALUE dictionary."""
    reply = me.command('PEERINFO', peer)
    return _keyvals(reply)
873 def ping(me, peer, **kw):
874 return _oneline(me.command(bg = True,
876 _kwopts(kw, ['timeout']) +
879 return _oneline(me.command('PORT', filter = _tokenjoin))
881 return _simple(me.command('QUIT'))
883 return _simple(me.command('RELOAD'))
885 return _keyvals(me.command('SERVINFO'))
def setifname(me, new):
    """Issue SETIFNAME with argument NEW; the server is expected to send no items."""
    reply = me.command('SETIFNAME', new)
    return _simple(reply)
def svcclaim(me, service, version):
    """
    Issue SVCCLAIM for SERVICE at VERSION; the server is expected to send no
    items.
    """
    reply = me.command('SVCCLAIM', service, version)
    return _simple(reply)
def svcensure(me, service, version = None):
    """
    Issue SVCENSURE for SERVICE; the server is expected to send no items.

    VERSION, if not None, is passed as an additional argument.
    """
    extra = [version] if version is not None else []
    return _simple(me.command('SVCENSURE', service, *extra))
def svcfail(me, job, *msg):
    """
    Issue SVCFAIL for the job id JOB, with the message tokens MSG; the server
    is expected to send no items.
    """
    reply = me.command('SVCFAIL', job, *msg)
    return _simple(reply)
def svcinfo(me, job, *msg):
    """
    Issue SVCINFO for the job id JOB, with the message tokens MSG; the server
    is expected to send no items.
    """
    reply = me.command('SVCINFO', job, *msg)
    return _simple(reply)
898 return list(me.command('SVCLIST'))
900 return _simple(me.command('SVCOK', job))
def svcquery(me, service):
    """Issue SVCQUERY for SERVICE and return the reply as a KEY=VALUE dictionary."""
    reply = me.command('SVCQUERY', service)
    return _keyvals(reply)
def svcrelease(me, service):
    """Issue SVCRELEASE for SERVICE; the server is expected to send no items."""
    reply = me.command('SVCRELEASE', service)
    return _simple(reply)
905 def svcsubmit(me, service, *args, **kw):
906 return me.command(bg = True,
908 _kwopts(kw, ['version']) +
912 return _keyvals(me.command('STATS', peer))
def trace(me, *args):
    """
    Issue TRACE with the arguments ARGS.

    Returns the reply as a list of (CHAR, STATUS, DESC) tuples, as produced
    by `_tracelike'.
    """
    reply = me.command('TRACE', *args)
    return _tracelike(reply)
916 return list(me.command('TUNNELS', filter = _tokenjoin))
918 return _oneline(me.command('VERSION', filter = _tokenjoin))
920 return _simple(me.command('WARN', *msg))
def watch(me, *args):
    """
    Issue WATCH with the arguments ARGS.

    Returns the reply as a list of (CHAR, STATUS, DESC) tuples, as produced
    by `_tracelike'.
    """
    reply = me.command('WATCH', *args)
    return _tracelike(reply)
924 ###--------------------------------------------------------------------------
925 ### Asynchronous commands.
927 class TripeAsynchronousCommand (TripeCommand):
929 Asynchronous commands.
931 This is the complicated way of issuing commands. You must set up a queue,
932 and associate the command with the queue. Responses arriving for the
command will be put on the queue as a triple of the form (TAG, CODE, REST)
934 -- where TAG is an object of your choice, not interpreted by this class,
935 CODE is the server's response code (OK, INFO, FAIL), and REST is the list
936 of the rest of the server's tokens.
938 Using this, you can write coroutines which process many commands (and
939 possibly other events) simultaneously.
942 def __init__(me, queue, tag, words):
943 """Make an asynchronous command consisting of the given WORDS, which
944 sends responses to QUEUE, labelled with TAG."""
945 TripeCommand.__init__(me, words)
def response(me, code, *stuff):
    """
    Handle a server response by posting it to the caller's queue.

    The item written is the triple (TAG, CODE, REST), where TAG is this
    command's tag and REST is a fresh list of the remaining tokens.
    """
    tokens = list(stuff)
    me.queue.put((me.tag, code, tokens))
953 ###--------------------------------------------------------------------------
class TripeJobCancelled (Exception):
    """
    Exception sent to job handler if the client kills the job.

    Not propagated further: the job coroutine traps it silently.
    """
class TripeJobError (Exception):
    """
    Exception to cause failure report for running job.

    Sends an SVCFAIL code back; the exception's arguments are used as the
    tokens of the failure message.
    """
class TripeSyntaxError (Exception):
    """
    Exception to report a syntax error for a job.

    Sends an SVCFAIL bad-svc-syntax message back.
    """
980 class TripeServiceManager (TripeCommandDispatcher):
982 A command dispatcher with added handling for incoming service requests.
984 There is usually only one instance of this class, called svcmgr. Some of
985 the support functions in this module assume that this is the case.
987 To use, run mLib.select in a loop until the quitp method returns true;
988 then, in a non-root coroutine, register your services by calling `add', and
989 then call `running' when you've finished setting up.
991 The instance handles server service messages SVCJOB, SVCCANCEL and
992 SVCCLAIM. It maintains a table of running services. Incoming jobs cause
993 the service's `job' method to be invoked; SVCCANCEL sends a
994 TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
995 the relevant service to be deregistered.
997 There is no base class for jobs, but a job must implement two methods:
999 start() Begin processing; might be a no-op.
1001 cancel() Stop processing; the original client has killed the
1004 The life of a service manager is divided into two parts: setup and running;
1005 you tell the manager that you've finished setting up by calling the
1006 `running' method. If, at any point after setup is finished, there are no
1007 remaining services or jobs, `quitp' will return true, ending the process.
1010 ## --- Attributes ---
1012 ## svc Mapping name -> service object
1014 ## job Mapping jobid -> job handler coroutine
1016 ## runningp True when setup is finished
1018 ## _quitp True if explicit quit has been requested
1020 def __init__(me, socket):
1022 Initialize the service manager.
1024 SOCKET is the administration socket to connect to.
1026 TripeCommandDispatcher.__init__(me, socket)
1030 me.handler['SVCCANCEL'] = me._cancel
1031 me.handler['SVCJOB'] = me._job
1032 me.handler['SVCCLAIM'] = me._claim
def addsvc(me, svc):
    """
    Register a new service; SVC is a TripeService instance.

    The service's name must not already be registered (checked by
    assertion).  The name is claimed from the server first, and only then is
    SVC recorded in the service table, so a failed claim leaves the table
    untouched.
    """
    table = me.svc
    assert svc.name not in table
    me.svcclaim(svc.name, svc.version)
    table[svc.name] = svc
1041 def _cancel(me, _, jid):
1043 Called when the server cancels a job; invokes the job's `cancel' method.
1049 def _claim(me, _, svc, __):
1050 """Called when another program claims our service at a higher version."""
1053 def _job(me, _, jid, svc, cmd, *args):
1055 Called when the server sends us a job to do.
1057 Calls the service to collect a job, and begins processing it.
1059 assert jid not in me.job
1060 svc = me.svc[svc.lower()]
1061 job = svc.job(jid, cmd, args)
1066 """Answer true if setup is finished."""
1069 def jobdone(me, jid):
1070 """Informs the service manager that the job with id JID has finished."""
1078 Return true if no services or jobs are active (and, therefore, if this
1079 process can quit without anyone caring).
1081 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1085 """Forces the quit flag (returned by quitp) on."""
1088 class TripeService (object):
1092 The NAME and VERSION are passed on to the server. The CMDTAB is a
1093 dictionary mapping command names (in lowercase) to command objects.
1095 If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults
1098 TripeService itself is mostly agnostic about the nature of command objects,
1099 but the TripeServiceJob class (below) has some requirements. The built-in
1100 HELP command requires command objects to have `usage' attributes.
1103 def __init__(me, name, version, cmdtab):
1105 Create and register a new service with the given NAME and VERSION.
1107 CMDTAB maps command names (in lower-case) to command objects.
1110 me.version = version
1113 me.cmd.setdefault('help',
1114 TripeServiceCommand('help', 0, 0, '', me._help))
1115 me.cmd.setdefault('quit',
1116 TripeServiceCommand('quit', 0, 0, '', me._quit))
def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    It asks for command CMD with argument list ARGS.  The command name is
    looked up case-insensitively (by lowercasing it) in the command table; if
    there is no such command, None is passed in place of the command object.
    Returns the newly created TripeServiceJob.
    """
    handler = me.cmd.get(cmd.lower())
    return TripeServiceJob(jid, me, cmd, handler, args)
1127 ## Simple default command handlers, complying with the spec in
1128 ## tripe-service(7).
1131 """Send a help summary to the user."""
1132 cmds = me.cmd.items()
1134 for name, cmd in cmds:
1135 svcinfo(name, *cmd.usage)
1138 """Terminate the service manager."""
1139 svcmgr.notify('svc-quit', me.name, 'admin-request')
1142 class TripeServiceCommand (object):
1143 """A simple service command."""
1145 def __init__(me, name, min, max, usage, func):
1147 Creates a new command.
1149 NAME is the command's name (in lowercase).
1151 MIN and MAX are the minimum and maximum number of allowed arguments (used
1152 for checking); either may be None to indicate no minimum or maximum.
1154 USAGE is a usage string, used for generating help and error messages.
1156 FUNC is the function to invoke.
1161 me.usage = usage.split()
1166 Called when the command is invoked.
1168 Does minimal checking of the arguments and calls the supplied function.
1170 if (me.min is not None and len(args) < me.min) or \
1171 (me.max is not None and len(args) > me.max):
1172 raise TripeSyntaxError
1175 class TripeServiceJob (Coroutine):
1177 Job handler coroutine.
1179 A standard TripeService invokes a TripeServiceJob for each incoming job
1180 request, passing it the jobid, command and arguments, and a command
1181 object. The command object needs the following attributes.
1183 usage A usage list (excluding the command name) showing
1184 arguments and options.
1186 run(*ARGS) Function to react to the command with ARGS split into
1187 separate arguments. Invoked in a coroutine. The
1188 svcinfo function (not the TripeCommandDispatcher
1189 method) may be used to send INFO lines. The function
1190 may raise TripeJobError to send a FAIL response back,
1191 or TripeSyntaxError to send a generic usage error.
1192 TripeJobCancelled exceptions are trapped silently.
1193 Other exceptions are translated into a generic
1194 internal-error message.
1196 This class automatically takes care of sending some closing response to the
1197 job, and for informing the service manager that the job is completed.
1199 The `jid' attribute stores the job's id.
1202 def __init__(me, jid, svc, cmd, command, args):
1206 The job is created with id JID, for service SVC, processing command name
1207 CMD (which the service resolved into the command object COMMAND, or
1208 None), and with the arguments ARGS.
1210 Coroutine.__init__(me)
1214 me.command = command
1219 Main body of the coroutine.
1221 Does the tedious exception handling boilerplate and invokes the command's
1226 if me.command is None:
1227 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1229 me.command.run(*me.args)
1230 svcmgr.svcok(me.jid)
1231 except TripeJobError, exc:
1232 svcmgr.svcfail(me.jid, *exc.args)
1233 except TripeSyntaxError:
1234 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1235 me.svc.name, me.command.name,
1237 except TripeJobCancelled:
1239 except Exception, exc:
1240 svcmgr.svcfail(me.jid, 'svc-internal-error',
1241 exc.__class__.__name__, str(exc))
1243 svcmgr.jobdone(me.jid)
1246 """Invoked by the service manager to start running the coroutine."""
1250 """Invoked by the service manager to cancel the job."""
1251 me.throw(TripeJobCancelled())
1255 If invoked from a TripeServiceJob coroutine, sends an INFO line to the
1256 job's sender, automatically using the correct job id.
1258 svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1260 def _setupsvc(tab, func):
1262 Setup coroutine for setting up service programs.
1264 Register the given services.
1268 svcmgr.addsvc(service)
1274 svcmgr = TripeServiceManager(None)
1275 def runservices(socket, tab, init = None, setup = None, daemon = False):
1277 Function to start a service provider.
1279 SOCKET is the socket to connect to, usually tripesock.
1281 TAB is a list of entries. An entry may be either a tuple
1283 (NAME, VERSION, COMMANDS)
1285 or a service object (e.g., a TripeService instance).
1287 COMMANDS is a dictionary mapping command names to tuples
1289 (MIN, MAX, USAGE, FUNC)
1291 of arguments for a TripeServiceCommand object.
1293 If DAEMON is true, then the process is forked into the background before we
1294 start. If INIT is given, it is called in the main coroutine, immediately
1295 after forking. If SETUP is given, it is called in a coroutine, after
1296 calling INIT and setting up the services but before marking the service
1299 It is a really bad idea to do any initialization, particularly setting up
1300 coroutines, outside of the INIT or SETUP functions. In particular, if
1301 we're using rmcr for fake coroutines, the daemonizing fork will kill off
1302 the currently established coroutines in a most surprising way.
1304 The function runs a main select loop until the service manager decides to
1308 svcmgr.socket = socket
1312 if not isinstance(service, tuple):
1313 svcs.append(service)
1315 name, version, commands = service
1317 for cmd, stuff in commands.iteritems():
1318 cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1319 svcs.append(TripeService(name, version, cmdmap))
1322 if init is not None:
1324 spawn(_setupsvc, svcs, setup)
1327 ###--------------------------------------------------------------------------
1328 ### Utilities for services.
1330 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1332 """Parse the timespec SPEC, returning a number of seconds."""
1334 if len(spec) > 1 and spec[-1] in _timeunits:
1335 mul = _timeunits[spec[-1]]
1340 raise TripeJobError('bad-time-spec', spec)
1342 raise TripeJobError('bad-time-spec', spec)
1343 return mul * int(spec)
1345 class OptParse (object):
1347 Parse options from a command list in the conventional fashion.
1349 ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
1350 options. The returned values are the option tags. During parsing, the
1351 `arg' method may be used to retrieve the argument for the most recent
1352 option. Afterwards, `rest' may be used to retrieve the remaining
1353 non-option arguments, and do a simple check on how many there are.
1355 The parser correctly handles `--' option terminators.
1358 def __init__(me, args, allowed):
1360 Create a new option parser.
1362 The parser will scan the ARGS for options given in the sequence ALLOWED
1363 (which are expected to include the `-' prefix).
1367 me.allowed[a] = True
1368 me.args = list(args)
1371 """Iterator protocol: I am my own iterator."""
1376 Iterator protocol: return the next option.
1378 If we've run out, raise StopIteration.
1380 if len(me.args) == 0 or \
1381 len(me.args[0]) < 2 or \
1382 not me.args[0].startswith('-'):
1384 opt = me.args.pop(0)
1387 if opt not in me.allowed:
1388 raise TripeSyntaxError
1393 Return the argument for the most recent option.
1395 If none is available, raise TripeSyntaxError.
1397 if len(me.args) == 0:
1398 raise TripeSyntaxError
1399 return me.args.pop(0)
1401 def rest(me, min = None, max = None):
1403 After option parsing is done, return the remaining arguments.
1405 Check that there are at least MIN and at most MAX arguments remaining --
1406 either may be None to suppress the check.
1408 if (min is not None and len(me.args) < min) or \
1409 (max is not None and len(me.args) > max):
1410 raise TripeSyntaxError
1413 ###----- That's all, folks --------------------------------------------------