chiark / gitweb /
ae88ba3d4f2a3a785f778939a077f69a9a8686bf
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 ### GNU General Public License for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26 """
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
30
31 Rather than end up in lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process.  It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
38
39 The simple rule governing the coroutines used here is this:
40
41   * The root coroutine never cares what values are passed to it when it
42     resumes: it just discards them.
43
44   * Other, non-root, coroutines are presumed to be waiting for some specific
45     thing.
46
47 Configuration variables:
48         configdir
49         socketdir
50         PACKAGE
51         VERSION
52         tripesock
53         peerdb
54
55 Other useful variables:
56         rootcr
57         svcmgr
58
59 Other tweakables:
60         _debug
61
62 Exceptions:
63         Exception
64           StandardError
65             TripeConnectionError
66             TripeError
67             TripeInternalError
68           TripeJobCancelled
69           TripeJobError
70           TripeSyntaxError
71
72 Classes:
73         _Coroutine
74           Coroutine
75             TripeServiceJob
76         OptParse
77         Queue
78         SelIOWatcher
79         TripeCommand
80           TripeSynchronousCommand
81           TripeAsynchronousCommand
82         TripeCommandIterator
83         TripeConnection
84           TripeCommandDispatcher
85             TripeServiceManager
86         TripeService
87         TripeServiceCommand
88
89 Utility functions:
90         quotify
91         runservices
92         spawn
93         svcinfo
94         timespec
95 """
96
97 __pychecker__ = 'self=me no-constCond no-argsused'
98
99 _debug = False
100
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
103
104 import socket as S
105 import errno as E
106 import mLib as M
107 import re as RX
108 import sys as SYS
109 import os as OS
110
111 try:
112   if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113     raise ImportError
114   from py.magic import greenlet as _Coroutine
115 except ImportError:
116   from rmcr import Coroutine as _Coroutine
117
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
120
121 rootcr = _Coroutine.getcurrent()
122
123 class Coroutine (_Coroutine):
124   """
125   A coroutine class which can only be invoked by the root coroutine.
126
127   The root, by construction, cannot be an instance of this class.
128   """
129   def switch(me, *args, **kw):
130     assert _Coroutine.getcurrent() is rootcr
131     if _debug: print '* %s' % me
132     _Coroutine.switch(me, *args, **kw)
133     if _debug: print '* %s' % rootcr
134
135 ###--------------------------------------------------------------------------
136 ### Default places for things.
137
138 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
139 socketdir = "@socketdir@"
140 PACKAGE = "@PACKAGE@"
141 VERSION = "@VERSION@"
142
143 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
144 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
145
146 ###--------------------------------------------------------------------------
147 ### Connection to the server.
148
149 def readnonblockingly(sock, len):
150   """
151   Nonblocking read from SOCK.
152
153   Try to return LEN bytes.  If couldn't read anything, return `None'.  EOF is
154   returned as an empty string.
155   """
156   try:
157     sock.setblocking(0)
158     return sock.recv(len)
159   except S.error, exc:
160     if exc[0] == E.EWOULDBLOCK:
161       return None
162     raise
163
164 class TripeConnectionError (StandardError):
165   """Something happened to the connection with the server."""
166   pass
167 class TripeInternalError (StandardError):
168   """This program is very confused."""
169   pass
170
171 class TripeConnection (object):
172   """
173   A logical connection to the tripe administration socket.
174
175   There may or may not be a physical connection.  (This is needed for the
176   monitor, for example.)
177
178   This class isn't very useful on its own, but it has useful subclasses.  At
179   this level, the class is agnostic about I/O multiplexing schemes; that gets
180   added later.
181   """
182
183   def __init__(me, socket):
184     """
185     Make a connection to the named SOCKET.
186
187     No physical connection is made initially.
188     """
189     me.socket = socket
190     me.sock = None
191     me.lbuf = None
192     me.iowatch = SelIOWatcher(me)
193
194   def connect(me):
195     """
196     Ensure that there's a physical connection.
197
198     Do nothing if we're already connected.  Invoke the `connected' method if
199     successful.
200     """
201     if me.sock: return
202     sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
203     sock.connect(me.socket)
204     me.sock = sock
205     me.lbuf = M.LineBuffer(me.line, me._eof)
206     me.lbuf.size = 1024
207     me.connected()
208     return me
209
210   def disconnect(me, reason):
211     """
212     Disconnect the physical connection.
213
214     Invoke the `disconnected' method, giving the provided REASON, which
215     should be either `None' or an exception.
216     """
217     if not me.sock: return
218     me.disconnected(reason)
219     me.sock.close()
220     me.sock = None
221     me.lbuf.disable()
222     me.lbuf = None
223     return me
224
225   def connectedp(me):
226     """
227     Return true if there's a current, believed-good physical connection.
228     """
229     return me.sock is not None
230
231   __nonzero__ = connectedp
232
233   def send(me, line):
234     """
235     Send the LINE to the connection's socket.
236
237     All output is done through this method; it can be overridden to provide
238     proper nonblocking writing, though this seems generally unnecessary.
239     """
240     try:
241       me.sock.setblocking(1)
242       me.sock.send(line + '\n')
243     except Exception, exc:
244       me.disconnect(exc)
245       raise
246     return me
247
248   def receive(me):
249     """
250     Receive whatever's ready from the connection's socket.
251
252     Call `line' on each complete line, and `eof' if the connection closed.
253     Subclasses which attach this class to an I/O-event system should call
254     this method when the socket (the `sock' attribute) is ready for reading.
255     """
256     while me.sock is not None:
257       try:
258         buf = readnonblockingly(me.sock, 16384)
259       except Exception, exc:
260         me.disconnect(exc)
261         raise
262       if buf is None:
263         return me
264       if buf == '':
265         me._eof()
266         return me
267       me.lbuf.flush(buf)
268     return me
269
270   def _eof(me):
271     """Internal end-of-file handler."""
272     me.disconnect(TripeConnectionError('connection lost'))
273     me.eof()
274
275   def connected(me):
276     """
277     To be overridden by subclasses to react to a connection being
278     established.
279     """
280     me.iowatch.connected(me.sock)
281
282   def disconnected(me, reason):
283     """
284     To be overridden by subclasses to react to a connection being severed.
285     """
286     me.iowatch.disconnected()
287
288   def eof(me):
289     """To be overridden by subclasses to handle end-of-file."""
290     pass
291
292   def line(me, line):
293     """To be overridden by subclasses to handle incoming lines."""
294     pass
295
296 ###--------------------------------------------------------------------------
297 ### I/O loop integration.
298
299 class SelIOWatcher (object):
300   """
301   Integration with mLib's I/O event system.
302
303   You can replace this object with a different one for integration with,
304   e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
305   while the CONN is disconnected.
306   """
307
308   def __init__(me, conn):
309     me._conn = conn
310     me._selfile = None
311
312   def connected(me, sock):
313     """
314     Called when a connection is made.
315
316     SOCK is the socket.  The watcher must arrange to call `CONN.receive' when
317     data is available.
318     """
319     me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
320     me._selfile.enable()
321
322   def disconnected(me):
323     """
324     Called when the connection is lost.
325     """
326     me._selfile = None
327
328   def iterate(me):
329     """
330     Wait for something interesting to happen, and issue events.
331
332     That is, basically, do one iteration of a main select loop, processing
333     all of the events, and then return.  This is used in the method
334     `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
335     `runservices'; if your I/O watcher has a different main loop, you can
336     drive it yourself.
337     """
338     M.select()
339
340 ###--------------------------------------------------------------------------
341 ### Inter-coroutine communication.
342
343 class Queue (object):
344   """
345   A queue of things arriving asynchronously.
346
347   This is a very simple single-reader multiple-writer queue.  It's useful for
348   more complex coroutines which need to cope with a variety of possible
349   incoming events.
350   """
351
352   def __init__(me):
353     """Create a new empty queue."""
354     me.contents = M.Array()
355     me.waiter = None
356
357   def _wait(me):
358     """
359     Internal: wait for an item to arrive in the queue.
360
361     Complain if someone is already waiting, because this is just a
362     single-reader queue.
363     """
364     if me.waiter:
365       raise ValueError('queue already being waited on')
366     try:
367       me.waiter = Coroutine.getcurrent()
368       while not me.contents:
369         me.waiter.parent.switch()
370     finally:
371       me.waiter = None
372
373   def get(me):
374     """
375     Remove and return the item at the head of the queue.
376
377     If the queue is empty, wait until an item arrives.
378     """
379     me._wait()
380     return me.contents.shift()
381
382   def peek(me):
383     """
384     Return the item at the head of the queue without removing it.
385
386     If the queue is empty, wait until an item arrives.
387     """
388     me._wait()
389     return me.contents[0]
390
391   def put(me, thing):
392     """
393     Write THING to the queue.
394
395     If someone is waiting on the queue, wake him up immediately; otherwise
396     just leave the item there for later.
397     """
398     me.contents.push(thing)
399     if me.waiter:
400       me.waiter.switch()
401
402 ###--------------------------------------------------------------------------
403 ### Dispatching coroutine.
404
405 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
406 ## contain backslashes, quotes or whitespace.
407 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
408
409 ## Match characters which need to be escaped, even in quoted text.
410 rx_weird = RX.compile(r'([\\\'])')
411
412 def quotify(s):
413   """Quote S according to the tripe-admin(5) rules."""
414   m = rx_ordinary.match(s)
415   if m and m.end() == len(s):
416     return s
417   else:
418     return "'" + rx_weird.sub(r'\\\1', s) + "'"
419
420 def _callback(func):
421   """
422   Return a wrapper for FUNC which reports exceptions thrown by it.
423
424   Useful in the case of callbacks invoked by C functions which ignore
425   exceptions.
426   """
427   def _(*a, **kw):
428     try:
429       return func(*a, **kw)
430     except:
431       SYS.excepthook(*SYS.exc_info())
432       raise
433   return _
434
435 class TripeCommand (object):
436   """
437   This abstract class represents a command in progress.
438
439   The `words' attribute contains the list of tokens which make up the
440   command.
441
442   Subclasses must implement a method to handle server responses:
443
444     * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
445       `FAIL'; ARGS are the remaining tokens from the server's response.
446   """
447
448   def __init__(me, words):
449     """Make a new command consisting of the given list of WORDS."""
450     me.words = words
451
452 class TripeSynchronousCommand (TripeCommand):
453   """
454   A simple command, processed apparently synchronously.
455
456   Must be invoked from a coroutine other than the root (or whichever one is
457   running the dispatcher); in reality, other coroutines carry on running
458   while we wait for a response from the server.
459
460   Each server response causes the calling coroutine to be resumed with the
461   pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
462   or `FAIL') and REST is a list of the server's other response tokens.  The
463   calling coroutine must continue switching back to the dispatcher until a
464   terminating response (`OK' or `FAIL') is received or become very
465   confused.
466
467   Mostly it's better to use the `TripeCommandIterator' to do this
468   automatically.
469   """
470
471   def __init__(me, words):
472     """Initialize the command, specifying the WORDS to send to the server."""
473     TripeCommand.__init__(me, words)
474     me.owner = Coroutine.getcurrent()
475
476   def response(me, code, *rest):
477     """Handle a server response by forwarding it to the calling coroutine."""
478     me.owner.switch((code, rest))
479
480 class TripeError (StandardError):
481   """
482   A tripe command failed with an error (a `FAIL' code).  The args attribute
483   contains a list of the server's message tokens.
484   """
485   pass
486
487 class TripeCommandIterator (object):
488   """
489   Iterator interface to a tripe command.
490
491   The values returned by the iterator are lists of tokens from the server's
492   `INFO' lines, as processed by the given filter function, if any.  The
493   iterator completes normally (by raising `StopIteration') if the server
494   reported `OK', and raises an exception if the command failed for some reason.
495
496   A `TripeError' is raised if the server issues a `FAIL' code.  If the
497   connection failed, some other exception is raised.
498   """
499
500   def __init__(me, dispatcher, words, bg = False, filter = None):
501     """
502     Create a new command iterator.
503
504     The command is submitted to the DISPATCHER; it consists of the given
505     WORDS.  If BG is true, then an option is inserted to request that the
506     server run the command in the background.  The FILTER is applied to the
507     token lists which the server responds, and the filter's output are the
508     items returned by the iterator.
509     """
510     me.dcr = Coroutine.getcurrent().parent
511     if me.dcr is None:
512       raise ValueError, 'must invoke from coroutine'
513     me.filter = filter or (lambda x: x)
514     if bg:
515       words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
516     dispatcher.rawcommand(TripeSynchronousCommand(words))
517
518   def __iter__(me):
519     """Iterator protocol: I am my own iterator."""
520     return me
521
522   def next(me):
523     """
524     Iterator protocol: return the next piece of information from the server.
525
526     `INFO' responses are filtered and returned as the values of the
527     iteration.  `FAIL' and `CONNERR' responses are turned into exceptions and
528     raised.  Finally, `OK' is turned into `StopIteration', which should cause
529     a normal end to the iteration process.
530     """
531     thing = me.dcr.switch()
532     code, rest = thing
533     if code == 'INFO':
534       return me.filter(rest)
535     elif code == 'OK':
536       raise StopIteration
537     elif code == 'CONNERR':
538       if rest is None:
539         raise TripeConnectionError, 'connection terminated by user'
540       else:
541         raise rest
542     elif code == 'FAIL':
543       raise TripeError(*rest)
544     else:
545       raise TripeInternalError \
546             ('unexpected tripe response %r' % ([code] + rest))
547
548 ### Simple utility functions for the TripeCommandIterator convenience
549 ### methods.
550
551 def _tokenjoin(words):
552   """Filter function: simply join the given tokens with spaces between."""
553   return ' '.join(words)
554
555 def _keyvals(iter):
556   """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
557   iterator ITER."""
558   kv = {}
559   for ww in iter:
560     for w in ww:
561       q = w.index('=')
562       kv[w[:q]] = w[q + 1:]
563   return kv
564
565 def _simple(iter):
566   """Raise an error if ITER contains any item."""
567   stuff = list(iter)
568   if len(stuff) != 0:
569     raise TripeInternalError('expected no response')
570   return None
571
572 def _oneline(iter):
573   """If ITER contains a single item, return it; otherwise raise an error."""
574   stuff = list(iter)
575   if len(stuff) != 1:
576     raise TripeInternalError('expected only one line of response')
577   return stuff[0]
578
579 def _tracelike(iter):
580   """Handle a TRACE-like command.  The result is a list of tuples (CHAR,
581   STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
582   disabled, `+' if enabled, maybe something else later), and DESC is the
583   human-readable description."""
584   stuff = []
585   for ww in iter:
586     ch = ww[0][0]
587     st = ww[0][1:]
588     desc = ' '.join(ww[1:])
589     stuff.append((ch, st, desc))
590   return stuff
591
592 def _kwopts(kw, allowed):
593   """Parse keyword arguments into options.  ALLOWED is a list of allowable
594   keywords; raise errors if other keywords are present.  `KEY = VALUE'
595   becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
596   `-KEY' if VALUE is a true non-string, or nothing if VALUE is false.  Insert
597   a `--' at the end to stop the parser getting confused."""
598   opts = []
599   amap = {}
600   for a in allowed: amap[a] = True
601   for k, v in kw.iteritems():
602     if k not in amap:
603       raise ValueError('option %s not allowed here' % k)
604     if isinstance(v, str):
605       opts += ['-' + k, v]
606     elif v:
607       opts += ['-' + k]
608   opts.append('--')
609   return opts
610
611 ## Deferral.
612 _deferq = []
613 def defer(func, *args, **kw):
614   """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
615   _deferq.append((func, args, kw))
616
617 def funargstr(func, args, kw):
618   items = [repr(a) for a in args]
619   for k, v in kw.iteritems():
620     items.append('%s = %r' % (k, v))
621   return '%s(%s)' % (func.__name__, ', '.join(items))
622
623 def spawn(func, *args, **kw):
624   """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
625   defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
626                  .switch(*args, **kw)))
627
628 ## Asides.
629 _asideq = Queue()
630 def _runasides():
631   """
632   Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
633   """
634   while True:
635     func, args, kw = _asideq.get()
636     try:
637       func(*args, **kw)
638     except:
639       SYS.excepthook(*SYS.exc_info())
640
641 def aside(func, *args, **kw):
642   """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
643   defer(_asideq.put, (func, args, kw))
644
645 class TripeCommandDispatcher (TripeConnection):
646   """
647   Command dispatcher.
648
649   The command dispatcher is a connection which knows how to handle commands.
650   This is probably the most important class in this module to understand.
651
652   Lines from the server are parsed into tokens.  The first token is a code
653   (`OK' or `NOTE' or something) explaining what kind of line this is.  The
654   `handler' attribute is a dictionary mapping server line codes to handler
655   functions, which are applied to the words of the line as individual
656   arguments.  *Exception*: the content of `TRACE' lines is not tokenized.
657
658   There are default handlers for server codes which respond to commands.
659   Commands arrive as `TripeCommand' instances through the `rawcommand'
660   interface.  The dispatcher keeps track of which command objects represent
661   which jobs, and sends responses on to the appropriate command objects by
662   invoking their `response' methods.  Command objects don't see the `BG...'
663   codes, because the dispatcher has already transformed them into regular
664   codes when it was looking up the job tag.
665
666   The dispatcher also has a special response code of its own: `CONNERR'
667   indicates that the connection failed and the command has therefore been
668   lost.  This is sent to all outstanding commands when a connection error is
669   encountered: rather than a token list, it is accompanied by an exception
670   object which is the cause of the disconnection, which may be `None' if the
671   disconnection is expected (e.g., the direct result of a user request).
672   """
673
674   ## --- Infrastructure ---
675   ##
676   ## We will get confused if we pipeline commands.  Send them one at a time.
677   ## Only send a command when the previous one detaches or completes.
678   ##
679   ## The following attributes are interesting:
680   ##
681   ## tagseq             Sequence number for next background job (for bgtag)
682   ##
683   ## queue              Commands awaiting submission.
684   ##
685   ## cmd                Mapping from job tags to commands: cmd[None] is the
686   ##                    foreground command.
687   ##
688   ## handler            Mapping from server codes to handler functions.
689
690   def __init__(me, socket):
691     """
692     Initialize the dispatcher.
693
694     The SOCKET is the filename of the administration socket to connect to,
695     for TripeConnection.__init__.
696     """
697     TripeConnection.__init__(me, socket)
698     me.tagseq = 0
699     me.handler = {}
700     me.handler['BGDETACH'] = me._detach
701     for i in 'BGOK', 'BGINFO', 'BGFAIL':
702       me.handler[i] = me._response
703     for i in 'OK', 'INFO', 'FAIL':
704       me.handler[i] = me._fgresponse
705
706   def quitp(me):
707     """Should we quit the main loop?  Subclasses should override."""
708     return False
709
710   def mainloop(me, quitp = None):
711     """
712     Iterate the I/O watcher until QUITP returns true.
713
714     Arranges for asides and deferred calls to be made at the right times.
715     """
716
717     global _deferq
718     assert _Coroutine.getcurrent() is rootcr
719     Coroutine(_runasides, name = '_runasides').switch()
720     if quitp is None:
721       quitp = me.quitp
722     while not quitp():
723       while _deferq:
724         q = _deferq
725         _deferq = []
726         for func, args, kw in q:
727           func(*args, **kw)
728       me.iowatch.iterate()
729
730   def connected(me):
731     """
732     Connection hook.
733
734     If a subclass overrides this method, it must call us; clears out the
735     command queue and job map.
736     """
737     me.queue = M.Array()
738     me.cmd = {}
739     TripeConnection.connected(me)
740
741   def disconnected(me, reason):
742     """
743     Disconnection hook.
744
745     If a subclass hooks overrides this method, it must call us; sends a
746     special `CONNERR' code to all incomplete commands.
747     """
748     TripeConnection.disconnected(me, reason)
749     for cmd in me.cmd.itervalues():
750       cmd.response('CONNERR', reason)
751     for cmd in me.queue:
752       cmd.response('CONNERR', reason)
753
754   @_callback
755   def line(me, line):
756     """Handle an incoming line, sending it to the right place."""
757     if _debug: print '<', line
758     code, rest = M.word(line, quotep = True)
759     func = me.handler.get(code)
760     if func is not None:
761       if code == 'TRACE':
762         func(code, rest)
763       else:
764         func(code, *M.split(rest, quotep = True)[0])
765       me.dequeue()
766
767   def dequeue(me):
768     """
769     Pull the oldest command off the queue and try to send it to the server.
770     """
771     if not me.queue or None in me.cmd: return
772     cmd = me.queue.shift()
773     if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
774     me.send(' '.join([quotify(w) for w in cmd.words]))
775     me.cmd[None] = cmd
776
777   def bgtag(me):
778     """
779     Return an unused job tag.
780
781     May be of use when composing commands by hand.
782     """
783     tag = 'J%05d' % me.tagseq
784     me.tagseq += 1
785     return tag
786
787   ## --- Built-in handler functions for server responses ---
788
789   def _detach(me, _, tag):
790     """
791     Respond to a `BGDETACH' TAG message.
792
793     Move the current foreground command to the background.
794     """
795     assert tag not in me.cmd
796     me.cmd[tag] = me.cmd[None]
797     del me.cmd[None]
798
799   def _response(me, code, tag, *w):
800     """
801     Respond to an `OK', `INFO' or `FAIL' message.
802
803     If this is a message for a background job, find the tag; then dispatch
804     the result to the command object.  This is also called by `_fgresponse'
805     (wth TAG set to `None') to handle responses for foreground commands, and
806     is therefore a useful method to extend or override in subclasses.
807     """
808     if code.startswith('BG'):
809       code = code[2:]
810     cmd = me.cmd[tag]
811     if code != 'INFO':
812       del me.cmd[tag]
813     cmd.response(code, *w)
814
815   def _fgresponse(me, code, *w):
816     """Process responses to the foreground command."""
817     me._response(code, None, *w)
818
819   ## --- Interface methods ---
820
821   def rawcommand(me, cmd):
822     """
823     Submit the `TripeCommand' CMD to the server, and look after it until it
824     completes.
825     """
826     if not me.connectedp():
827       raise TripeConnectionError('connection closed')
828     me.queue.push(cmd)
829     me.dequeue()
830
831   def command(me, *cmd, **kw):
832     """Convenience wrapper for creating a TripeCommandIterator object."""
833     return TripeCommandIterator(me, cmd, **kw)
834
835   ## --- Convenience methods for server commands ---
836
837   def add(me, peer, *addr, **kw):
838     return _simple(me.command(bg = True,
839                               *['ADD'] +
840                               _kwopts(kw, ['tunnel', 'keepalive',
841                                            'key', 'priv', 'cork',
842                                            'mobile']) +
843                               [peer] +
844                               list(addr)))
845   def addr(me, peer):
846     return _oneline(me.command('ADDR', peer))
847   def algs(me, peer = None):
848     return _keyvals(me.command('ALGS',
849                                *((peer is not None and [peer]) or [])))
850   def checkchal(me, chal):
851     return _simple(me.command('CHECKCHAL', chal))
852   def daemon(me):
853     return _simple(me.command('DAEMON'))
854   def eping(me, peer, **kw):
855     return _oneline(me.command(bg = True,
856                                *['PING'] +
857                                _kwopts(kw, ['timeout']) +
858                                [peer]))
859   def forcekx(me, peer):
860     return _simple(me.command('FORCEKX', peer))
861   def getchal(me):
862     return _oneline(me.command('GETCHAL', filter = _tokenjoin))
863   def greet(me, peer, chal):
864     return _simple(me.command('GREET', peer, chal))
865   def help(me):
866     return list(me.command('HELP', filter = _tokenjoin))
867   def ifname(me, peer):
868     return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
869   def kill(me, peer):
870     return _simple(me.command('KILL', peer))
871   def list(me):
872     return list(me.command('LIST', filter = _tokenjoin))
873   def notify(me, *msg):
874     return _simple(me.command('NOTIFY', *msg))
875   def peerinfo(me, peer):
876     return _keyvals(me.command('PEERINFO', peer))
877   def ping(me, peer, **kw):
878     return _oneline(me.command(bg = True,
879                                *['PING'] +
880                                _kwopts(kw, ['timeout']) +
881                                [peer]))
882   def port(me):
883     return _oneline(me.command('PORT', filter = _tokenjoin))
884   def quit(me):
885     return _simple(me.command('QUIT'))
886   def reload(me):
887     return _simple(me.command('RELOAD'))
888   def servinfo(me):
889     return _keyvals(me.command('SERVINFO'))
890   def setifname(me, new):
891     return _simple(me.command('SETIFNAME', new))
892   def svcclaim(me, service, version):
893     return _simple(me.command('SVCCLAIM', service, version))
894   def svcensure(me, service, version = None):
895     return _simple(me.command('SVCENSURE', service,
896                               *((version is not None and [version]) or [])))
897   def svcfail(me, job, *msg):
898     return _simple(me.command('SVCFAIL', job, *msg))
899   def svcinfo(me, job, *msg):
900     return _simple(me.command('SVCINFO', job, *msg))
901   def svclist(me):
902     return list(me.command('SVCLIST'))
903   def svcok(me, job):
904     return _simple(me.command('SVCOK', job))
905   def svcquery(me, service):
906     return _keyvals(me.command('SVCQUERY', service))
907   def svcrelease(me, service):
908     return _simple(me.command('SVCRELEASE', service))
909   def svcsubmit(me, service, *args, **kw):
910     return me.command(bg = True,
911                       *['SVCSUBMIT'] +
912                       _kwopts(kw, ['version']) +
913                       [service] +
914                       list(args))
915   def stats(me, peer):
916     return _keyvals(me.command('STATS', peer))
917   def trace(me, *args):
918     return _tracelike(me.command('TRACE', *args))
919   def tunnels(me):
920     return list(me.command('TUNNELS', filter = _tokenjoin))
921   def version(me):
922     return _oneline(me.command('VERSION', filter = _tokenjoin))
923   def warn(me, *msg):
924     return _simple(me.command('WARN', *msg))
925   def watch(me, *args):
926     return _tracelike(me.command('WATCH', *args))
927
928 ###--------------------------------------------------------------------------
929 ### Asynchronous commands.
930
931 class TripeAsynchronousCommand (TripeCommand):
932   """
933   Asynchronous commands.
934
935   This is the complicated way of issuing commands.  You must set up a queue,
936   and associate the command with the queue.  Responses arriving for the
937   command will be put on the queue as an triple of the form (TAG, CODE, REST)
938   -- where TAG is an object of your choice, not interpreted by this class,
939   CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
940   and REST is the list of the rest of the server's tokens.
941
942   Using this, you can write coroutines which process many commands (and
943   possibly other events) simultaneously.
944   """
945
946   def __init__(me, queue, tag, words):
947     """Make an asynchronous command consisting of the given WORDS, which
948     sends responses to QUEUE, labelled with TAG."""
949     TripeCommand.__init__(me, words)
950     me.queue = queue
951     me.tag = tag
952
953   def response(me, code, *stuff):
954     """Handle a server response by writing it to the caller's queue."""
955     me.queue.put((me.tag, code, list(stuff)))
956
957 ###--------------------------------------------------------------------------
958 ### Services.
959
960 class TripeJobCancelled (Exception):
961   """
962   Exception sent to job handler if the client kills the job.
963
964   Not propagated further.
965   """
966   pass
967
968 class TripeJobError (Exception):
969   """
970   Exception to cause failure report for running job.
971
972   Sends an SVCFAIL code back.
973   """
974   pass
975
976 class TripeSyntaxError (Exception):
977   """
978   Exception to report a syntax error for a job.
979
980   Sends an SVCFAIL bad-svc-syntax message back.
981   """
982   pass
983
984 class TripeServiceManager (TripeCommandDispatcher):
985   """
986   A command dispatcher with added handling for incoming service requests.
987
988   There is usually only one instance of this class, called svcmgr.  Some of
989   the support functions in this module assume that this is the case.
990
991   To use, run `mLib.select' in a loop until the quitp method returns true;
992   then, in a non-root coroutine, register your services by calling `add', and
993   then call `running' when you've finished setting up.
994
995   The instance handles server service messages `SVCJOB', `SVCCANCEL' and
996   `SVCCLAIM'.  It maintains a table of running services.  Incoming jobs cause
997   the service's `job' method to be invoked; `SVCCANCEL' sends a
998   `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
999   causes the relevant service to be deregistered.
1000
1001   There is no base class for jobs, but a job must implement two methods:
1002
1003   start()               Begin processing; might be a no-op.
1004
1005   cancel()              Stop processing; the original client has killed the
1006                         job.
1007
1008   The life of a service manager is divided into two parts: setup and running;
1009   you tell the manager that you've finished setting up by calling the
1010   `running' method.  If, at any point after setup is finished, there are no
1011   remaining services or jobs, `quitp' will return true, ending the process.
1012   """
1013
1014   ## --- Attributes ---
1015   ##
1016   ## svc                Mapping name -> service object
1017   ##
1018   ## job                Mapping jobid -> job handler coroutine
1019   ##
1020   ## runningp           True when setup is finished
1021   ##
1022   ## _quitp             True if explicit quit has been requested
1023
1024   def __init__(me, socket):
1025     """
1026     Initialize the service manager.
1027
1028     SOCKET is the administration socket to connect to.
1029     """
1030     TripeCommandDispatcher.__init__(me, socket)
1031     me.svc = {}
1032     me.job = {}
1033     me.runningp = False
1034     me.handler['SVCCANCEL'] = me._cancel
1035     me.handler['SVCJOB'] = me._job
1036     me.handler['SVCCLAIM'] = me._claim
1037     me._quitp = 0
1038
1039   def addsvc(me, svc):
1040     """Register a new service; SVC is a `TripeService' instance."""
1041     assert svc.name not in me.svc
1042     me.svcclaim(svc.name, svc.version)
1043     me.svc[svc.name] = svc
1044
1045   def _cancel(me, _, jid):
1046     """
1047     Called when the server cancels a job; invokes the job's `cancel' method.
1048     """
1049     job = me.job[jid]
1050     del me.job[jid]
1051     job.cancel()
1052
1053   def _claim(me, _, svc, __):
1054     """Called when another program claims our service at a higher version."""
1055     del me.svc[svc]
1056
1057   def _job(me, _, jid, svc, cmd, *args):
1058     """
1059     Called when the server sends us a job to do.
1060
1061     Calls the service to collect a job, and begins processing it.
1062     """
1063     assert jid not in me.job
1064     svc = me.svc[svc.lower()]
1065     job = svc.job(jid, cmd, args)
1066     me.job[jid] = job
1067     job.start()
1068
1069   def running(me):
1070     """Answer true if setup is finished."""
1071     me.runningp = True
1072
1073   def jobdone(me, jid):
1074     """Informs the service manager that the job with id JID has finished."""
1075     try:
1076       del me.job[jid]
1077     except KeyError:
1078       pass
1079
1080   def quitp(me):
1081     """
1082     Return true if no services or jobs are active (and, therefore, if this
1083     process can quit without anyone caring).
1084     """
1085     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1086                                           not me.sock))
1087
1088   def quit(me):
1089     """Forces the quit flag (returned by quitp) on."""
1090     me._quitp = True
1091
1092 class TripeService (object):
1093   """
1094   A standard service.
1095
1096   The NAME and VERSION are passed on to the server.  The CMDTAB is a
1097   dictionary mapping command names (in lowercase) to command objects.
1098
1099   If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
1100   defaults are provided.
1101
1102   TripeService itself is mostly agnostic about the nature of command objects,
1103   but the TripeServiceJob class (below) has some requirements.  The built-in
1104   HELP command requires command objects to have `usage' attributes.
1105   """
1106
1107   def __init__(me, name, version, cmdtab):
1108     """
1109     Create and register a new service with the given NAME and VERSION.
1110
1111     CMDTAB maps command names (in lower-case) to command objects.
1112     """
1113     me.name = name
1114     me.version = version
1115     me.cmd = cmdtab
1116     me.activep = True
1117     me.cmd.setdefault('help',
1118                       TripeServiceCommand('help', 0, 0, '', me._help))
1119     me.cmd.setdefault('quit',
1120                       TripeServiceCommand('quit', 0, 0, '', me._quit))
1121
1122   def job(me, jid, cmd, args):
1123     """
1124     Called by the service manager: a job arrived with id JID.
1125
1126     It asks for comamnd CMD with argument list ARGS.  Creates a new job,
1127     passing it the information needed.
1128     """
1129     return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1130
1131   ## Simple default command handlers, complying with the spec in
1132   ## tripe-service(7).
1133
1134   def _help(me):
1135     """Send a help summary to the user."""
1136     cmds = me.cmd.items()
1137     cmds.sort()
1138     for name, cmd in cmds:
1139       svcinfo(name, *cmd.usage)
1140
1141   def _quit(me):
1142     """Terminate the service manager."""
1143     svcmgr.notify('svc-quit', me.name, 'admin-request')
1144     svcmgr.quit()
1145
1146 class TripeServiceCommand (object):
1147   """A simple service command."""
1148
1149   def __init__(me, name, min, max, usage, func):
1150     """
1151     Creates a new command.
1152
1153     NAME is the command's name (in lowercase).
1154
1155     MIN and MAX are the minimum and maximum number of allowed arguments (used
1156     for checking); either may be None to indicate no minimum or maximum.
1157
1158     USAGE is a usage string, used for generating help and error messages.
1159
1160     FUNC is the function to invoke.
1161     """
1162     me.name = name
1163     me.min = min
1164     me.max = max
1165     me.usage = usage.split()
1166     me.func = func
1167
1168   def run(me, *args):
1169     """
1170     Called when the command is invoked.
1171
1172     Does minimal checking of the arguments and calls the supplied function.
1173     """
1174     if (me.min is not None and len(args) < me.min) or \
1175        (me.max is not None and len(args) > me.max):
1176       raise TripeSyntaxError
1177     me.func(*args)
1178
1179 class TripeServiceJob (Coroutine):
1180   """
1181   Job handler coroutine.
1182
1183   A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
1184   request, passing it the jobid, command and arguments, and a command object.
1185   The command object needs the following attributes.
1186
1187   usage                 A usage list (excluding the command name) showing
1188                         arguments and options.
1189
1190   run(*ARGS)            Function to react to the command with ARGS split into
1191                         separate arguments.  Invoked in a coroutine.  The
1192                         `svcinfo function (not the `TripeCommandDispatcher'
1193                         method) may be used to send `INFO' lines.  The
1194                         function may raise `TripeJobError' to send a `FAIL'
1195                         response back, or `TripeSyntaxError' to send a
1196                         generic usage error.  `TripeJobCancelled' exceptions
1197                         are trapped silently.  Other exceptions are
1198                         translated into a generic internal-error message.
1199
1200   This class automatically takes care of sending some closing response to the
1201   job, and for informing the service manager that the job is completed.
1202
1203   The `jid' attribute stores the job's id.
1204   """
1205
1206   def __init__(me, jid, svc, cmd, command, args):
1207     """
1208     Start a new job.
1209
1210     The job is created with id JID, for service SVC, processing command name
1211     CMD (which the service resolved into the command object COMMAND, or
1212     `None'), and with the arguments ARGS.
1213     """
1214     Coroutine.__init__(me)
1215     me.jid = jid
1216     me.svc = svc
1217     me.cmd = cmd
1218     me.command = command
1219     me.args = args
1220
1221   def run(me):
1222     """
1223     Main body of the coroutine.
1224
1225     Does the tedious exception handling boilerplate and invokes the command's
1226     run method.
1227     """
1228     try:
1229       try:
1230         if me.command is None:
1231           svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1232         else:
1233           me.command.run(*me.args)
1234           svcmgr.svcok(me.jid)
1235       except TripeJobError, exc:
1236         svcmgr.svcfail(me.jid, *exc.args)
1237       except TripeSyntaxError:
1238         svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1239                        me.svc.name, me.command.name,
1240                        *me.command.usage)
1241       except TripeJobCancelled:
1242         pass
1243       except Exception, exc:
1244         svcmgr.svcfail(me.jid, 'svc-internal-error',
1245                        exc.__class__.__name__, str(exc))
1246     finally:
1247       svcmgr.jobdone(me.jid)
1248
1249   def start(me):
1250     """Invoked by the service manager to start running the coroutine."""
1251     me.switch()
1252
1253   def cancel(me):
1254     """Invoked by the service manager to cancel the job."""
1255     me.throw(TripeJobCancelled())
1256
1257 def svcinfo(*args):
1258   """
1259   If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the
1260   job's sender, automatically using the correct job id.
1261   """
1262   svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1263
1264 def _setupsvc(tab, func):
1265   """
1266   Setup coroutine for setting up service programs.
1267
1268   Register the given services.
1269   """
1270   try:
1271     for service in tab:
1272       svcmgr.addsvc(service)
1273     if func:
1274       func()
1275   finally:
1276     svcmgr.running()
1277
1278 svcmgr = TripeServiceManager(None)
1279 def runservices(socket, tab, init = None, setup = None, daemon = False):
1280   """
1281   Function to start a service provider.
1282
1283   SOCKET is the socket to connect to, usually tripesock.
1284
1285   TAB is a list of entries.  An entry may be either a tuple
1286
1287     (NAME, VERSION, COMMANDS)
1288
1289   or a service object (e.g., a `TripeService' instance).
1290
1291   COMMANDS is a dictionary mapping command names to tuples
1292
1293     (MIN, MAX, USAGE, FUNC)
1294
1295   of arguments for a `TripeServiceCommand' object.
1296
1297   If DAEMON is true, then the process is forked into the background before we
1298   start.  If INIT is given, it is called in the main coroutine, immediately
1299   after forking.  If SETUP is given, it is called in a coroutine, after
1300   calling INIT and setting up the services but before marking the service
1301   manager as running.
1302
1303   It is a really bad idea to do any initialization, particularly setting up
1304   coroutines, outside of the INIT or SETUP functions.  In particular, if
1305   we're using rmcr for fake coroutines, the daemonizing fork will kill off
1306   the currently established coroutines in a most surprising way.
1307
1308   The function runs a main select loop until the service manager decides to
1309   quit.
1310   """
1311
1312   svcmgr.socket = socket
1313   svcmgr.connect()
1314   svcs = []
1315   for service in tab:
1316     if not isinstance(service, tuple):
1317       svcs.append(service)
1318     else:
1319       name, version, commands = service
1320       cmdmap = {}
1321       for cmd, stuff in commands.iteritems():
1322         cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1323       svcs.append(TripeService(name, version, cmdmap))
1324   if daemon:
1325     M.daemonize()
1326   if init is not None:
1327     init()
1328   spawn(_setupsvc, svcs, setup)
1329   svcmgr.mainloop()
1330
1331 ###--------------------------------------------------------------------------
1332 ### Utilities for services.
1333
1334 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1335 def timespec(spec):
1336   """Parse the timespec SPEC, returning a number of seconds."""
1337   mul = 1
1338   if len(spec) > 1 and spec[-1] in _timeunits:
1339     mul = _timeunits[spec[-1]]
1340     spec = spec[:-1]
1341   try:
1342     t = int(spec)
1343   except:
1344     raise TripeJobError('bad-time-spec', spec)
1345   if t < 0:
1346     raise TripeJobError('bad-time-spec', spec)
1347   return mul * int(spec)
1348
1349 class OptParse (object):
1350   """
1351   Parse options from a command list in the conventional fashion.
1352
1353   ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
1354   options.  The returned values are the option tags.  During parsing, the
1355   `arg' method may be used to retrieve the argument for the most recent
1356   option.  Afterwards, `rest' may be used to retrieve the remaining
1357   non-option arguments, and do a simple check on how many there are.
1358
1359   The parser correctly handles `--' option terminators.
1360   """
1361
1362   def __init__(me, args, allowed):
1363     """
1364     Create a new option parser.
1365
1366     The parser will scan the ARGS for options given in the sequence ALLOWED
1367     (which are expected to include the `-' prefix).
1368     """
1369     me.allowed = {}
1370     for a in allowed:
1371       me.allowed[a] = True
1372     me.args = list(args)
1373
1374   def __iter__(me):
1375     """Iterator protocol: I am my own iterator."""
1376     return me
1377
1378   def next(me):
1379     """
1380     Iterator protocol: return the next option.
1381
1382     If we've run out, raise `StopIteration'.
1383     """
1384     if len(me.args) == 0 or \
1385        len(me.args[0]) < 2 or \
1386        not me.args[0].startswith('-'):
1387       raise StopIteration
1388     opt = me.args.pop(0)
1389     if opt == '--':
1390       raise StopIteration
1391     if opt not in me.allowed:
1392       raise TripeSyntaxError
1393     return opt
1394
1395   def arg(me):
1396     """
1397     Return the argument for the most recent option.
1398
1399     If none is available, raise `TripeSyntaxError'.
1400     """
1401     if len(me.args) == 0:
1402       raise TripeSyntaxError
1403     return me.args.pop(0)
1404
1405   def rest(me, min = None, max = None):
1406     """
1407     After option parsing is done, return the remaining arguments.
1408
1409     Check that there are at least MIN and at most MAX arguments remaining --
1410     either may be None to suppress the check.
1411     """
1412     if (min is not None and len(me.args) < min) or \
1413        (max is not None and len(me.args) > max):
1414       raise TripeSyntaxError
1415     return me.args
1416
1417 ###----- That's all, folks --------------------------------------------------