chiark / gitweb /
py/tripe.py.in: More-or-less consistent quoting in docstrings.
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 ### GNU General Public License for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26 """
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
30
31 Rather than end up in lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process.  It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
38
39 The simple rule governing the coroutines used here is this:
40
41   * The root coroutine never cares what values are passed to it when it
42     resumes: it just discards them.
43
44   * Other, non-root, coroutines are presumed to be waiting for some specific
45     thing.
46
47 Configuration variables:
48         configdir
49         socketdir
50         PACKAGE
51         VERSION
52         tripesock
53         peerdb
54
55 Other useful variables:
56         rootcr
57         svcmgr
58
59 Other tweakables:
60         _debug
61
62 Exceptions:
63         Exception
64           StandardError
65             TripeConnectionError
66             TripeError
67             TripeInternalError
68           TripeJobCancelled
69           TripeJobError
70           TripeSyntaxError
71
72 Classes:
73         _Coroutine
74           Coroutine
75             TripeServiceJob
76         OptParse
77         Queue
78         SelIOWatcher
79         TripeCommand
80           TripeSynchronousCommand
81           TripeAsynchronousCommand
82         TripeCommandIterator
83         TripeConnection
84           TripeCommandDispatcher
85             TripeServiceManager
86         TripeService
87         TripeServiceCommand
88
89 Utility functions:
90         quotify
91         runservices
92         spawn
93         svcinfo
94         timespec
95 """
96
97 __pychecker__ = 'self=me no-constCond no-argsused'
98
99 _debug = False
100
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
103
104 import socket as S
105 import errno as E
106 import mLib as M
107 import re as RX
108 import sys as SYS
109 import os as OS
110
111 try:
112   if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113     raise ImportError
114   from py.magic import greenlet as _Coroutine
115 except ImportError:
116   from rmcr import Coroutine as _Coroutine
117
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
120
121 rootcr = _Coroutine.getcurrent()
122
123 class Coroutine (_Coroutine):
124   """
125   A coroutine class which can only be invoked by the root coroutine.
126
127   The root, by construction, cannot be an instance of this class.
128   """
129   def switch(me, *args, **kw):
130     assert _Coroutine.getcurrent() is rootcr
131     if _debug: print '* %s' % me
132     _Coroutine.switch(me, *args, **kw)
133     if _debug: print '* %s' % rootcr
134
135 ###--------------------------------------------------------------------------
136 ### Default places for things.
137
138 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
139 socketdir = "@socketdir@"
140 PACKAGE = "@PACKAGE@"
141 VERSION = "@VERSION@"
142
143 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
144 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
145
146 ###--------------------------------------------------------------------------
147 ### Connection to the server.
148
149 def readnonblockingly(sock, len):
150   """
151   Nonblocking read from SOCK.
152
153   Try to return LEN bytes.  If couldn't read anything, return `None'.  EOF is
154   returned as an empty string.
155   """
156   try:
157     sock.setblocking(0)
158     return sock.recv(len)
159   except S.error, exc:
160     if exc[0] == E.EWOULDBLOCK:
161       return None
162     raise
163
164 class TripeConnectionError (StandardError):
165   """Something happened to the connection with the server."""
166   pass
167 class TripeInternalError (StandardError):
168   """This program is very confused."""
169   pass
170
171 class TripeConnection (object):
172   """
173   A logical connection to the tripe administration socket.
174
175   There may or may not be a physical connection.  (This is needed for the
176   monitor, for example.)
177
178   This class isn't very useful on its own, but it has useful subclasses.  At
179   this level, the class is agnostic about I/O multiplexing schemes; that gets
180   added later.
181   """
182
183   def __init__(me, socket):
184     """
185     Make a connection to the named SOCKET.
186
187     No physical connection is made initially.
188     """
189     me.socket = socket
190     me.sock = None
191     me.lbuf = None
192     me.iowatch = SelIOWatcher(me)
193
194   def connect(me):
195     """
196     Ensure that there's a physical connection.
197
198     Do nothing if we're already connected.  Invoke the `connected' method if
199     successful.
200     """
201     if me.sock: return
202     sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
203     sock.connect(me.socket)
204     me.sock = sock
205     me.lbuf = M.LineBuffer(me.line, me._eof)
206     me.lbuf.size = 1024
207     me.connected()
208     return me
209
210   def disconnect(me, reason):
211     """
212     Disconnect the physical connection.
213
214     Invoke the `disconnected' method, giving the provided REASON, which
215     should be either `None' or an exception.
216     """
217     if not me.sock: return
218     me.disconnected(reason)
219     me.sock.close()
220     me.sock = None
221     me.lbuf.disable()
222     me.lbuf = None
223     return me
224
225   def connectedp(me):
226     """
227     Return true if there's a current, believed-good physical connection.
228     """
229     return me.sock is not None
230
231   __nonzero__ = connectedp
232
233   def send(me, line):
234     """
235     Send the LINE to the connection's socket.
236
237     All output is done through this method; it can be overridden to provide
238     proper nonblocking writing, though this seems generally unnecessary.
239     """
240     try:
241       me.sock.setblocking(1)
242       me.sock.send(line + '\n')
243     except Exception, exc:
244       me.disconnect(exc)
245       raise
246     return me
247
248   def receive(me):
249     """
250     Receive whatever's ready from the connection's socket.
251
252     Call `line' on each complete line, and `eof' if the connection closed.
253     Subclasses which attach this class to an I/O-event system should call
254     this method when the socket (the `sock' attribute) is ready for reading.
255     """
256     while me.sock is not None:
257       try:
258         buf = readnonblockingly(me.sock, 16384)
259       except Exception, exc:
260         me.disconnect(exc)
261         raise
262       if buf is None:
263         return me
264       if buf == '':
265         me._eof()
266         return me
267       me.lbuf.flush(buf)
268     return me
269
270   def _eof(me):
271     """Internal end-of-file handler."""
272     me.disconnect(TripeConnectionError('connection lost'))
273     me.eof()
274
275   def connected(me):
276     """
277     To be overridden by subclasses to react to a connection being
278     established.
279     """
280     me.iowatch.connected(me.sock)
281
282   def disconnected(me, reason):
283     """
284     To be overridden by subclasses to react to a connection being severed.
285     """
286     me.iowatch.disconnected()
287
288   def eof(me):
289     """To be overridden by subclasses to handle end-of-file."""
290     pass
291
292   def line(me, line):
293     """To be overridden by subclasses to handle incoming lines."""
294     pass
295
296 ###--------------------------------------------------------------------------
297 ### I/O loop integration.
298
299 class SelIOWatcher (object):
300   """
301   Integration with mLib's I/O event system.
302
303   You can replace this object with a different one for integration with,
304   e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
305   while the CONN is disconnected.
306   """
307
308   def __init__(me, conn):
309     me._conn = conn
310     me._selfile = None
311
312   def connected(me, sock):
313     """
314     Called when a connection is made.
315
316     SOCK is the socket.  The watcher must arrange to call `CONN.receive' when
317     data is available.
318     """
319     me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
320     me._selfile.enable()
321
322   def disconnected(me):
323     """
324     Called when the connection is lost.
325     """
326     me._selfile = None
327
328   def iterate(me):
329     """
330     Wait for something interesting to happen, and issue events.
331
332     That is, basically, do one iteration of a main select loop, processing
333     all of the events, and then return.  This isn't needed for
334     `TripeCommandDispatcher', but `runservices' wants it.
335     """
336     M.select()
337
338 ###--------------------------------------------------------------------------
339 ### Inter-coroutine communication.
340
341 class Queue (object):
342   """
343   A queue of things arriving asynchronously.
344
345   This is a very simple single-reader multiple-writer queue.  It's useful for
346   more complex coroutines which need to cope with a variety of possible
347   incoming events.
348   """
349
350   def __init__(me):
351     """Create a new empty queue."""
352     me.contents = M.Array()
353     me.waiter = None
354
355   def _wait(me):
356     """
357     Internal: wait for an item to arrive in the queue.
358
359     Complain if someone is already waiting, because this is just a
360     single-reader queue.
361     """
362     if me.waiter:
363       raise ValueError('queue already being waited on')
364     try:
365       me.waiter = Coroutine.getcurrent()
366       while not me.contents:
367         me.waiter.parent.switch()
368     finally:
369       me.waiter = None
370
371   def get(me):
372     """
373     Remove and return the item at the head of the queue.
374
375     If the queue is empty, wait until an item arrives.
376     """
377     me._wait()
378     return me.contents.shift()
379
380   def peek(me):
381     """
382     Return the item at the head of the queue without removing it.
383
384     If the queue is empty, wait until an item arrives.
385     """
386     me._wait()
387     return me.contents[0]
388
389   def put(me, thing):
390     """
391     Write THING to the queue.
392
393     If someone is waiting on the queue, wake him up immediately; otherwise
394     just leave the item there for later.
395     """
396     me.contents.push(thing)
397     if me.waiter:
398       me.waiter.switch()
399
400 ###--------------------------------------------------------------------------
401 ### Dispatching coroutine.
402
403 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
404 ## contain backslashes, quotes or whitespace.
405 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
406
407 ## Match characters which need to be escaped, even in quoted text.
408 rx_weird = RX.compile(r'([\\\'])')
409
410 def quotify(s):
411   """Quote S according to the tripe-admin(5) rules."""
412   m = rx_ordinary.match(s)
413   if m and m.end() == len(s):
414     return s
415   else:
416     return "'" + rx_weird.sub(r'\\\1', s) + "'"
417
418 def _callback(func):
419   """
420   Return a wrapper for FUNC which reports exceptions thrown by it.
421
422   Useful in the case of callbacks invoked by C functions which ignore
423   exceptions.
424   """
425   def _(*a, **kw):
426     try:
427       return func(*a, **kw)
428     except:
429       SYS.excepthook(*SYS.exc_info())
430       raise
431   return _
432
433 class TripeCommand (object):
434   """
435   This abstract class represents a command in progress.
436
437   The `words' attribute contains the list of tokens which make up the
438   command.
439
440   Subclasses must implement a method to handle server responses:
441
442     * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
443       `FAIL'; ARGS are the remaining tokens from the server's response.
444   """
445
446   def __init__(me, words):
447     """Make a new command consisting of the given list of WORDS."""
448     me.words = words
449
450 class TripeSynchronousCommand (TripeCommand):
451   """
452   A simple command, processed apparently synchronously.
453
454   Must be invoked from a coroutine other than the root (or whichever one is
455   running the dispatcher); in reality, other coroutines carry on running
456   while we wait for a response from the server.
457
458   Each server response causes the calling coroutine to be resumed with the
459   pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
460   or `FAIL') and REST is a list of the server's other response tokens.  The
461   calling coroutine must continue switching back to the dispatcher until a
462   terminating response (`OK' or `FAIL') is received or become very
463   confused.
464
465   Mostly it's better to use the `TripeCommandIterator' to do this
466   automatically.
467   """
468
469   def __init__(me, words):
470     """Initialize the command, specifying the WORDS to send to the server."""
471     TripeCommand.__init__(me, words)
472     me.owner = Coroutine.getcurrent()
473
474   def response(me, code, *rest):
475     """Handle a server response by forwarding it to the calling coroutine."""
476     me.owner.switch((code, rest))
477
478 class TripeError (StandardError):
479   """
480   A tripe command failed with an error (a `FAIL' code).  The args attribute
481   contains a list of the server's message tokens.
482   """
483   pass
484
485 class TripeCommandIterator (object):
486   """
487   Iterator interface to a tripe command.
488
489   The values returned by the iterator are lists of tokens from the server's
490   `INFO' lines, as processed by the given filter function, if any.  The
491   iterator completes normally (by raising `StopIteration') if the server
492   reported `OK', and raises an exception if the command failed for some reason.
493
494   A `TripeError' is raised if the server issues a `FAIL' code.  If the
495   connection failed, some other exception is raised.
496   """
497
498   def __init__(me, dispatcher, words, bg = False, filter = None):
499     """
500     Create a new command iterator.
501
502     The command is submitted to the DISPATCHER; it consists of the given
503     WORDS.  If BG is true, then an option is inserted to request that the
504     server run the command in the background.  The FILTER is applied to the
505     token lists which the server responds, and the filter's output are the
506     items returned by the iterator.
507     """
508     me.dcr = Coroutine.getcurrent().parent
509     if me.dcr is None:
510       raise ValueError, 'must invoke from coroutine'
511     me.filter = filter or (lambda x: x)
512     if bg:
513       words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
514     dispatcher.rawcommand(TripeSynchronousCommand(words))
515
516   def __iter__(me):
517     """Iterator protocol: I am my own iterator."""
518     return me
519
520   def next(me):
521     """
522     Iterator protocol: return the next piece of information from the server.
523
524     `INFO' responses are filtered and returned as the values of the
525     iteration.  `FAIL' and `CONNERR' responses are turned into exceptions and
526     raised.  Finally, `OK' is turned into `StopIteration', which should cause
527     a normal end to the iteration process.
528     """
529     thing = me.dcr.switch()
530     code, rest = thing
531     if code == 'INFO':
532       return me.filter(rest)
533     elif code == 'OK':
534       raise StopIteration
535     elif code == 'CONNERR':
536       if rest is None:
537         raise TripeConnectionError, 'connection terminated by user'
538       else:
539         raise rest
540     elif code == 'FAIL':
541       raise TripeError(*rest)
542     else:
543       raise TripeInternalError \
544             ('unexpected tripe response %r' % ([code] + rest))
545
546 ### Simple utility functions for the TripeCommandIterator convenience
547 ### methods.
548
549 def _tokenjoin(words):
550   """Filter function: simply join the given tokens with spaces between."""
551   return ' '.join(words)
552
553 def _keyvals(iter):
554   """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
555   iterator ITER."""
556   kv = {}
557   for ww in iter:
558     for w in ww:
559       q = w.index('=')
560       kv[w[:q]] = w[q + 1:]
561   return kv
562
563 def _simple(iter):
564   """Raise an error if ITER contains any item."""
565   stuff = list(iter)
566   if len(stuff) != 0:
567     raise TripeInternalError('expected no response')
568   return None
569
570 def _oneline(iter):
571   """If ITER contains a single item, return it; otherwise raise an error."""
572   stuff = list(iter)
573   if len(stuff) != 1:
574     raise TripeInternalError('expected only one line of response')
575   return stuff[0]
576
577 def _tracelike(iter):
578   """Handle a TRACE-like command.  The result is a list of tuples (CHAR,
579   STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
580   disabled, `+' if enabled, maybe something else later), and DESC is the
581   human-readable description."""
582   stuff = []
583   for ww in iter:
584     ch = ww[0][0]
585     st = ww[0][1:]
586     desc = ' '.join(ww[1:])
587     stuff.append((ch, st, desc))
588   return stuff
589
590 def _kwopts(kw, allowed):
591   """Parse keyword arguments into options.  ALLOWED is a list of allowable
592   keywords; raise errors if other keywords are present.  `KEY = VALUE'
593   becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
594   `-KEY' if VALUE is a true non-string, or nothing if VALUE is false.  Insert
595   a `--' at the end to stop the parser getting confused."""
596   opts = []
597   amap = {}
598   for a in allowed: amap[a] = True
599   for k, v in kw.iteritems():
600     if k not in amap:
601       raise ValueError('option %s not allowed here' % k)
602     if isinstance(v, str):
603       opts += ['-' + k, v]
604     elif v:
605       opts += ['-' + k]
606   opts.append('--')
607   return opts
608
609 ## Deferral.
610 _deferq = []
611 def defer(func, *args, **kw):
612   """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
613   _deferq.append((func, args, kw))
614
615 def funargstr(func, args, kw):
616   items = [repr(a) for a in args]
617   for k, v in kw.iteritems():
618     items.append('%s = %r' % (k, v))
619   return '%s(%s)' % (func.__name__, ', '.join(items))
620
621 def spawn(func, *args, **kw):
622   """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
623   defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
624                  .switch(*args, **kw)))
625
626 ## Asides.
627 _asideq = Queue()
628 def _runasides():
629   """
630   Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
631   """
632   while True:
633     func, args, kw = _asideq.get()
634     try:
635       func(*args, **kw)
636     except:
637       SYS.excepthook(*SYS.exc_info())
638
639 def aside(func, *args, **kw):
640   """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
641   defer(_asideq.put, (func, args, kw))
642
643 class TripeCommandDispatcher (TripeConnection):
644   """
645   Command dispatcher.
646
647   The command dispatcher is a connection which knows how to handle commands.
648   This is probably the most important class in this module to understand.
649
650   Lines from the server are parsed into tokens.  The first token is a code
651   (`OK' or `NOTE' or something) explaining what kind of line this is.  The
652   `handler' attribute is a dictionary mapping server line codes to handler
653   functions, which are applied to the words of the line as individual
654   arguments.  *Exception*: the content of `TRACE' lines is not tokenized.
655
656   There are default handlers for server codes which respond to commands.
657   Commands arrive as `TripeCommand' instances through the `rawcommand'
658   interface.  The dispatcher keeps track of which command objects represent
659   which jobs, and sends responses on to the appropriate command objects by
660   invoking their `response' methods.  Command objects don't see the `BG...'
661   codes, because the dispatcher has already transformed them into regular
662   codes when it was looking up the job tag.
663
664   The dispatcher also has a special response code of its own: `CONNERR'
665   indicates that the connection failed and the command has therefore been
666   lost.  This is sent to all outstanding commands when a connection error is
667   encountered: rather than a token list, it is accompanied by an exception
668   object which is the cause of the disconnection, which may be `None' if the
669   disconnection is expected (e.g., the direct result of a user request).
670   """
671
672   ## --- Infrastructure ---
673   ##
674   ## We will get confused if we pipeline commands.  Send them one at a time.
675   ## Only send a command when the previous one detaches or completes.
676   ##
677   ## The following attributes are interesting:
678   ##
679   ## tagseq             Sequence number for next background job (for bgtag)
680   ##
681   ## queue              Commands awaiting submission.
682   ##
683   ## cmd                Mapping from job tags to commands: cmd[None] is the
684   ##                    foreground command.
685   ##
686   ## handler            Mapping from server codes to handler functions.
687
688   def __init__(me, socket):
689     """
690     Initialize the dispatcher.
691
692     The SOCKET is the filename of the administration socket to connect to,
693     for TripeConnection.__init__.
694     """
695     TripeConnection.__init__(me, socket)
696     me.tagseq = 0
697     me.handler = {}
698     me.handler['BGDETACH'] = me._detach
699     for i in 'BGOK', 'BGINFO', 'BGFAIL':
700       me.handler[i] = me._response
701     for i in 'OK', 'INFO', 'FAIL':
702       me.handler[i] = me._fgresponse
703
704   def quitp(me):
705     """Should we quit the main loop?  Subclasses should override."""
706     return False
707
708   def mainloop(me, quitp = None):
709     """
710     Iterate the I/O watcher until QUITP returns true.
711
712     Arranges for asides and deferred calls to be made at the right times.
713     """
714
715     global _deferq
716     assert _Coroutine.getcurrent() is rootcr
717     Coroutine(_runasides, name = '_runasides').switch()
718     if quitp is None:
719       quitp = me.quitp
720     while not quitp():
721       while _deferq:
722         q = _deferq
723         _deferq = []
724         for func, args, kw in q:
725           func(*args, **kw)
726       me.iowatch.iterate()
727
728   def connected(me):
729     """
730     Connection hook.
731
732     If a subclass overrides this method, it must call us; clears out the
733     command queue and job map.
734     """
735     me.queue = M.Array()
736     me.cmd = {}
737     TripeConnection.connected(me)
738
739   def disconnected(me, reason):
740     """
741     Disconnection hook.
742
743     If a subclass hooks overrides this method, it must call us; sends a
744     special `CONNERR' code to all incomplete commands.
745     """
746     TripeConnection.disconnected(me, reason)
747     for cmd in me.cmd.itervalues():
748       cmd.response('CONNERR', reason)
749     for cmd in me.queue:
750       cmd.response('CONNERR', reason)
751
752   @_callback
753   def line(me, line):
754     """Handle an incoming line, sending it to the right place."""
755     if _debug: print '<', line
756     code, rest = M.word(line, quotep = True)
757     func = me.handler.get(code)
758     if func is not None:
759       if code == 'TRACE':
760         func(code, rest)
761       else:
762         func(code, *M.split(rest, quotep = True)[0])
763       me.dequeue()
764
765   def dequeue(me):
766     """
767     Pull the oldest command off the queue and try to send it to the server.
768     """
769     if not me.queue or None in me.cmd: return
770     cmd = me.queue.shift()
771     if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
772     me.send(' '.join([quotify(w) for w in cmd.words]))
773     me.cmd[None] = cmd
774
775   def bgtag(me):
776     """
777     Return an unused job tag.
778
779     May be of use when composing commands by hand.
780     """
781     tag = 'J%05d' % me.tagseq
782     me.tagseq += 1
783     return tag
784
785   ## --- Built-in handler functions for server responses ---
786
787   def _detach(me, _, tag):
788     """
789     Respond to a `BGDETACH' TAG message.
790
791     Move the current foreground command to the background.
792     """
793     assert tag not in me.cmd
794     me.cmd[tag] = me.cmd[None]
795     del me.cmd[None]
796
797   def _response(me, code, tag, *w):
798     """
799     Respond to an `OK', `INFO' or `FAIL' message.
800
801     If this is a message for a background job, find the tag; then dispatch
802     the result to the command object.  This is also called by `_fgresponse'
803     (wth TAG set to `None') to handle responses for foreground commands, and
804     is therefore a useful method to extend or override in subclasses.
805     """
806     if code.startswith('BG'):
807       code = code[2:]
808     cmd = me.cmd[tag]
809     if code != 'INFO':
810       del me.cmd[tag]
811     cmd.response(code, *w)
812
813   def _fgresponse(me, code, *w):
814     """Process responses to the foreground command."""
815     me._response(code, None, *w)
816
817   ## --- Interface methods ---
818
819   def rawcommand(me, cmd):
820     """
821     Submit the `TripeCommand' CMD to the server, and look after it until it
822     completes.
823     """
824     if not me.connectedp():
825       raise TripeConnectionError('connection closed')
826     me.queue.push(cmd)
827     me.dequeue()
828
829   def command(me, *cmd, **kw):
830     """Convenience wrapper for creating a TripeCommandIterator object."""
831     return TripeCommandIterator(me, cmd, **kw)
832
833   ## --- Convenience methods for server commands ---
834
835   def add(me, peer, *addr, **kw):
836     return _simple(me.command(bg = True,
837                               *['ADD'] +
838                               _kwopts(kw, ['tunnel', 'keepalive',
839                                            'key', 'priv', 'cork',
840                                            'mobile']) +
841                               [peer] +
842                               list(addr)))
843   def addr(me, peer):
844     return _oneline(me.command('ADDR', peer))
845   def algs(me, peer = None):
846     return _keyvals(me.command('ALGS',
847                                *((peer is not None and [peer]) or [])))
848   def checkchal(me, chal):
849     return _simple(me.command('CHECKCHAL', chal))
850   def daemon(me):
851     return _simple(me.command('DAEMON'))
852   def eping(me, peer, **kw):
853     return _oneline(me.command(bg = True,
854                                *['PING'] +
855                                _kwopts(kw, ['timeout']) +
856                                [peer]))
857   def forcekx(me, peer):
858     return _simple(me.command('FORCEKX', peer))
859   def getchal(me):
860     return _oneline(me.command('GETCHAL', filter = _tokenjoin))
861   def greet(me, peer, chal):
862     return _simple(me.command('GREET', peer, chal))
863   def help(me):
864     return list(me.command('HELP', filter = _tokenjoin))
865   def ifname(me, peer):
866     return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
867   def kill(me, peer):
868     return _simple(me.command('KILL', peer))
869   def list(me):
870     return list(me.command('LIST', filter = _tokenjoin))
871   def notify(me, *msg):
872     return _simple(me.command('NOTIFY', *msg))
873   def peerinfo(me, peer):
874     return _keyvals(me.command('PEERINFO', peer))
875   def ping(me, peer, **kw):
876     return _oneline(me.command(bg = True,
877                                *['PING'] +
878                                _kwopts(kw, ['timeout']) +
879                                [peer]))
880   def port(me):
881     return _oneline(me.command('PORT', filter = _tokenjoin))
882   def quit(me):
883     return _simple(me.command('QUIT'))
884   def reload(me):
885     return _simple(me.command('RELOAD'))
886   def servinfo(me):
887     return _keyvals(me.command('SERVINFO'))
888   def setifname(me, new):
889     return _simple(me.command('SETIFNAME', new))
890   def svcclaim(me, service, version):
891     return _simple(me.command('SVCCLAIM', service, version))
892   def svcensure(me, service, version = None):
893     return _simple(me.command('SVCENSURE', service,
894                               *((version is not None and [version]) or [])))
895   def svcfail(me, job, *msg):
896     return _simple(me.command('SVCFAIL', job, *msg))
897   def svcinfo(me, job, *msg):
898     return _simple(me.command('SVCINFO', job, *msg))
899   def svclist(me):
900     return list(me.command('SVCLIST'))
901   def svcok(me, job):
902     return _simple(me.command('SVCOK', job))
903   def svcquery(me, service):
904     return _keyvals(me.command('SVCQUERY', service))
905   def svcrelease(me, service):
906     return _simple(me.command('SVCRELEASE', service))
907   def svcsubmit(me, service, *args, **kw):
908     return me.command(bg = True,
909                       *['SVCSUBMIT'] +
910                       _kwopts(kw, ['version']) +
911                       [service] +
912                       list(args))
913   def stats(me, peer):
914     return _keyvals(me.command('STATS', peer))
915   def trace(me, *args):
916     return _tracelike(me.command('TRACE', *args))
917   def tunnels(me):
918     return list(me.command('TUNNELS', filter = _tokenjoin))
919   def version(me):
920     return _oneline(me.command('VERSION', filter = _tokenjoin))
921   def warn(me, *msg):
922     return _simple(me.command('WARN', *msg))
923   def watch(me, *args):
924     return _tracelike(me.command('WATCH', *args))
925
926 ###--------------------------------------------------------------------------
927 ### Asynchronous commands.
928
929 class TripeAsynchronousCommand (TripeCommand):
930   """
931   Asynchronous commands.
932
933   This is the complicated way of issuing commands.  You must set up a queue,
934   and associate the command with the queue.  Responses arriving for the
935   command will be put on the queue as an triple of the form (TAG, CODE, REST)
936   -- where TAG is an object of your choice, not interpreted by this class,
937   CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
938   and REST is the list of the rest of the server's tokens.
939
940   Using this, you can write coroutines which process many commands (and
941   possibly other events) simultaneously.
942   """
943
944   def __init__(me, queue, tag, words):
945     """Make an asynchronous command consisting of the given WORDS, which
946     sends responses to QUEUE, labelled with TAG."""
947     TripeCommand.__init__(me, words)
948     me.queue = queue
949     me.tag = tag
950
951   def response(me, code, *stuff):
952     """Handle a server response by writing it to the caller's queue."""
953     me.queue.put((me.tag, code, list(stuff)))
954
955 ###--------------------------------------------------------------------------
956 ### Services.
957
958 class TripeJobCancelled (Exception):
959   """
960   Exception sent to job handler if the client kills the job.
961
962   Not propagated further.
963   """
964   pass
965
966 class TripeJobError (Exception):
967   """
968   Exception to cause failure report for running job.
969
970   Sends an SVCFAIL code back.
971   """
972   pass
973
974 class TripeSyntaxError (Exception):
975   """
976   Exception to report a syntax error for a job.
977
978   Sends an SVCFAIL bad-svc-syntax message back.
979   """
980   pass
981
982 class TripeServiceManager (TripeCommandDispatcher):
983   """
984   A command dispatcher with added handling for incoming service requests.
985
986   There is usually only one instance of this class, called svcmgr.  Some of
987   the support functions in this module assume that this is the case.
988
989   To use, run `mLib.select' in a loop until the quitp method returns true;
990   then, in a non-root coroutine, register your services by calling `add', and
991   then call `running' when you've finished setting up.
992
993   The instance handles server service messages `SVCJOB', `SVCCANCEL' and
994   `SVCCLAIM'.  It maintains a table of running services.  Incoming jobs cause
995   the service's `job' method to be invoked; `SVCCANCEL' sends a
996   `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
997   causes the relevant service to be deregistered.
998
999   There is no base class for jobs, but a job must implement two methods:
1000
1001   start()               Begin processing; might be a no-op.
1002
1003   cancel()              Stop processing; the original client has killed the
1004                         job.
1005
1006   The life of a service manager is divided into two parts: setup and running;
1007   you tell the manager that you've finished setting up by calling the
1008   `running' method.  If, at any point after setup is finished, there are no
1009   remaining services or jobs, `quitp' will return true, ending the process.
1010   """
1011
1012   ## --- Attributes ---
1013   ##
1014   ## svc                Mapping name -> service object
1015   ##
1016   ## job                Mapping jobid -> job handler coroutine
1017   ##
1018   ## runningp           True when setup is finished
1019   ##
1020   ## _quitp             True if explicit quit has been requested
1021
1022   def __init__(me, socket):
1023     """
1024     Initialize the service manager.
1025
1026     SOCKET is the administration socket to connect to.
1027     """
1028     TripeCommandDispatcher.__init__(me, socket)
1029     me.svc = {}
1030     me.job = {}
1031     me.runningp = False
1032     me.handler['SVCCANCEL'] = me._cancel
1033     me.handler['SVCJOB'] = me._job
1034     me.handler['SVCCLAIM'] = me._claim
1035     me._quitp = 0
1036
1037   def addsvc(me, svc):
1038     """Register a new service; SVC is a `TripeService' instance."""
1039     assert svc.name not in me.svc
1040     me.svcclaim(svc.name, svc.version)
1041     me.svc[svc.name] = svc
1042
1043   def _cancel(me, _, jid):
1044     """
1045     Called when the server cancels a job; invokes the job's `cancel' method.
1046     """
1047     job = me.job[jid]
1048     del me.job[jid]
1049     job.cancel()
1050
1051   def _claim(me, _, svc, __):
1052     """Called when another program claims our service at a higher version."""
1053     del me.svc[svc]
1054
1055   def _job(me, _, jid, svc, cmd, *args):
1056     """
1057     Called when the server sends us a job to do.
1058
1059     Calls the service to collect a job, and begins processing it.
1060     """
1061     assert jid not in me.job
1062     svc = me.svc[svc.lower()]
1063     job = svc.job(jid, cmd, args)
1064     me.job[jid] = job
1065     job.start()
1066
1067   def running(me):
1068     """Answer true if setup is finished."""
1069     me.runningp = True
1070
1071   def jobdone(me, jid):
1072     """Informs the service manager that the job with id JID has finished."""
1073     try:
1074       del me.job[jid]
1075     except KeyError:
1076       pass
1077
1078   def quitp(me):
1079     """
1080     Return true if no services or jobs are active (and, therefore, if this
1081     process can quit without anyone caring).
1082     """
1083     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1084                                           not me.sock))
1085
1086   def quit(me):
1087     """Forces the quit flag (returned by quitp) on."""
1088     me._quitp = True
1089
1090 class TripeService (object):
1091   """
1092   A standard service.
1093
1094   The NAME and VERSION are passed on to the server.  The CMDTAB is a
1095   dictionary mapping command names (in lowercase) to command objects.
1096
1097   If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
1098   defaults are provided.
1099
1100   TripeService itself is mostly agnostic about the nature of command objects,
1101   but the TripeServiceJob class (below) has some requirements.  The built-in
1102   HELP command requires command objects to have `usage' attributes.
1103   """
1104
1105   def __init__(me, name, version, cmdtab):
1106     """
1107     Create and register a new service with the given NAME and VERSION.
1108
1109     CMDTAB maps command names (in lower-case) to command objects.
1110     """
1111     me.name = name
1112     me.version = version
1113     me.cmd = cmdtab
1114     me.activep = True
1115     me.cmd.setdefault('help',
1116                       TripeServiceCommand('help', 0, 0, '', me._help))
1117     me.cmd.setdefault('quit',
1118                       TripeServiceCommand('quit', 0, 0, '', me._quit))
1119
1120   def job(me, jid, cmd, args):
1121     """
1122     Called by the service manager: a job arrived with id JID.
1123
1124     It asks for comamnd CMD with argument list ARGS.  Creates a new job,
1125     passing it the information needed.
1126     """
1127     return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1128
1129   ## Simple default command handlers, complying with the spec in
1130   ## tripe-service(7).
1131
1132   def _help(me):
1133     """Send a help summary to the user."""
1134     cmds = me.cmd.items()
1135     cmds.sort()
1136     for name, cmd in cmds:
1137       svcinfo(name, *cmd.usage)
1138
1139   def _quit(me):
1140     """Terminate the service manager."""
1141     svcmgr.notify('svc-quit', me.name, 'admin-request')
1142     svcmgr.quit()
1143
1144 class TripeServiceCommand (object):
1145   """A simple service command."""
1146
1147   def __init__(me, name, min, max, usage, func):
1148     """
1149     Creates a new command.
1150
1151     NAME is the command's name (in lowercase).
1152
1153     MIN and MAX are the minimum and maximum number of allowed arguments (used
1154     for checking); either may be None to indicate no minimum or maximum.
1155
1156     USAGE is a usage string, used for generating help and error messages.
1157
1158     FUNC is the function to invoke.
1159     """
1160     me.name = name
1161     me.min = min
1162     me.max = max
1163     me.usage = usage.split()
1164     me.func = func
1165
1166   def run(me, *args):
1167     """
1168     Called when the command is invoked.
1169
1170     Does minimal checking of the arguments and calls the supplied function.
1171     """
1172     if (me.min is not None and len(args) < me.min) or \
1173        (me.max is not None and len(args) > me.max):
1174       raise TripeSyntaxError
1175     me.func(*args)
1176
1177 class TripeServiceJob (Coroutine):
1178   """
1179   Job handler coroutine.
1180
1181   A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
1182   request, passing it the jobid, command and arguments, and a command object.
1183   The command object needs the following attributes.
1184
1185   usage                 A usage list (excluding the command name) showing
1186                         arguments and options.
1187
1188   run(*ARGS)            Function to react to the command with ARGS split into
1189                         separate arguments.  Invoked in a coroutine.  The
1190                         `svcinfo function (not the `TripeCommandDispatcher'
1191                         method) may be used to send `INFO' lines.  The
1192                         function may raise `TripeJobError' to send a `FAIL'
1193                         response back, or `TripeSyntaxError' to send a
1194                         generic usage error.  `TripeJobCancelled' exceptions
1195                         are trapped silently.  Other exceptions are
1196                         translated into a generic internal-error message.
1197
1198   This class automatically takes care of sending some closing response to the
1199   job, and for informing the service manager that the job is completed.
1200
1201   The `jid' attribute stores the job's id.
1202   """
1203
1204   def __init__(me, jid, svc, cmd, command, args):
1205     """
1206     Start a new job.
1207
1208     The job is created with id JID, for service SVC, processing command name
1209     CMD (which the service resolved into the command object COMMAND, or
1210     `None'), and with the arguments ARGS.
1211     """
1212     Coroutine.__init__(me)
1213     me.jid = jid
1214     me.svc = svc
1215     me.cmd = cmd
1216     me.command = command
1217     me.args = args
1218
1219   def run(me):
1220     """
1221     Main body of the coroutine.
1222
1223     Does the tedious exception handling boilerplate and invokes the command's
1224     run method.
1225     """
1226     try:
1227       try:
1228         if me.command is None:
1229           svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1230         else:
1231           me.command.run(*me.args)
1232           svcmgr.svcok(me.jid)
1233       except TripeJobError, exc:
1234         svcmgr.svcfail(me.jid, *exc.args)
1235       except TripeSyntaxError:
1236         svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1237                        me.svc.name, me.command.name,
1238                        *me.command.usage)
1239       except TripeJobCancelled:
1240         pass
1241       except Exception, exc:
1242         svcmgr.svcfail(me.jid, 'svc-internal-error',
1243                        exc.__class__.__name__, str(exc))
1244     finally:
1245       svcmgr.jobdone(me.jid)
1246
1247   def start(me):
1248     """Invoked by the service manager to start running the coroutine."""
1249     me.switch()
1250
1251   def cancel(me):
1252     """Invoked by the service manager to cancel the job."""
1253     me.throw(TripeJobCancelled())
1254
1255 def svcinfo(*args):
1256   """
1257   If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the
1258   job's sender, automatically using the correct job id.
1259   """
1260   svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1261
1262 def _setupsvc(tab, func):
1263   """
1264   Setup coroutine for setting up service programs.
1265
1266   Register the given services.
1267   """
1268   try:
1269     for service in tab:
1270       svcmgr.addsvc(service)
1271     if func:
1272       func()
1273   finally:
1274     svcmgr.running()
1275
1276 svcmgr = TripeServiceManager(None)
1277 def runservices(socket, tab, init = None, setup = None, daemon = False):
1278   """
1279   Function to start a service provider.
1280
1281   SOCKET is the socket to connect to, usually tripesock.
1282
1283   TAB is a list of entries.  An entry may be either a tuple
1284
1285     (NAME, VERSION, COMMANDS)
1286
1287   or a service object (e.g., a `TripeService' instance).
1288
1289   COMMANDS is a dictionary mapping command names to tuples
1290
1291     (MIN, MAX, USAGE, FUNC)
1292
1293   of arguments for a `TripeServiceCommand' object.
1294
1295   If DAEMON is true, then the process is forked into the background before we
1296   start.  If INIT is given, it is called in the main coroutine, immediately
1297   after forking.  If SETUP is given, it is called in a coroutine, after
1298   calling INIT and setting up the services but before marking the service
1299   manager as running.
1300
1301   It is a really bad idea to do any initialization, particularly setting up
1302   coroutines, outside of the INIT or SETUP functions.  In particular, if
1303   we're using rmcr for fake coroutines, the daemonizing fork will kill off
1304   the currently established coroutines in a most surprising way.
1305
1306   The function runs a main select loop until the service manager decides to
1307   quit.
1308   """
1309
1310   svcmgr.socket = socket
1311   svcmgr.connect()
1312   svcs = []
1313   for service in tab:
1314     if not isinstance(service, tuple):
1315       svcs.append(service)
1316     else:
1317       name, version, commands = service
1318       cmdmap = {}
1319       for cmd, stuff in commands.iteritems():
1320         cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1321       svcs.append(TripeService(name, version, cmdmap))
1322   if daemon:
1323     M.daemonize()
1324   if init is not None:
1325     init()
1326   spawn(_setupsvc, svcs, setup)
1327   svcmgr.mainloop()
1328
1329 ###--------------------------------------------------------------------------
1330 ### Utilities for services.
1331
1332 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1333 def timespec(spec):
1334   """Parse the timespec SPEC, returning a number of seconds."""
1335   mul = 1
1336   if len(spec) > 1 and spec[-1] in _timeunits:
1337     mul = _timeunits[spec[-1]]
1338     spec = spec[:-1]
1339   try:
1340     t = int(spec)
1341   except:
1342     raise TripeJobError('bad-time-spec', spec)
1343   if t < 0:
1344     raise TripeJobError('bad-time-spec', spec)
1345   return mul * int(spec)
1346
1347 class OptParse (object):
1348   """
1349   Parse options from a command list in the conventional fashion.
1350
1351   ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
1352   options.  The returned values are the option tags.  During parsing, the
1353   `arg' method may be used to retrieve the argument for the most recent
1354   option.  Afterwards, `rest' may be used to retrieve the remaining
1355   non-option arguments, and do a simple check on how many there are.
1356
1357   The parser correctly handles `--' option terminators.
1358   """
1359
1360   def __init__(me, args, allowed):
1361     """
1362     Create a new option parser.
1363
1364     The parser will scan the ARGS for options given in the sequence ALLOWED
1365     (which are expected to include the `-' prefix).
1366     """
1367     me.allowed = {}
1368     for a in allowed:
1369       me.allowed[a] = True
1370     me.args = list(args)
1371
1372   def __iter__(me):
1373     """Iterator protocol: I am my own iterator."""
1374     return me
1375
1376   def next(me):
1377     """
1378     Iterator protocol: return the next option.
1379
1380     If we've run out, raise `StopIteration'.
1381     """
1382     if len(me.args) == 0 or \
1383        len(me.args[0]) < 2 or \
1384        not me.args[0].startswith('-'):
1385       raise StopIteration
1386     opt = me.args.pop(0)
1387     if opt == '--':
1388       raise StopIteration
1389     if opt not in me.allowed:
1390       raise TripeSyntaxError
1391     return opt
1392
1393   def arg(me):
1394     """
1395     Return the argument for the most recent option.
1396
1397     If none is available, raise `TripeSyntaxError'.
1398     """
1399     if len(me.args) == 0:
1400       raise TripeSyntaxError
1401     return me.args.pop(0)
1402
1403   def rest(me, min = None, max = None):
1404     """
1405     After option parsing is done, return the remaining arguments.
1406
1407     Check that there are at least MIN and at most MAX arguments remaining --
1408     either may be None to suppress the check.
1409     """
1410     if (min is not None and len(me.args) < min) or \
1411        (max is not None and len(me.args) > max):
1412       raise TripeSyntaxError
1413     return me.args
1414
1415 ###----- That's all, folks --------------------------------------------------