chiark / gitweb /
svc/watch.in: Bug fix: addpeer on correct coroutine.
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 ### GNU General Public License for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26 """
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
30
31 Rather than end up in lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process.  It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
38
39 The simple rule governing the coroutines used here is this:
40
41   * The root coroutine never cares what values are passed to it when it
42     resumes: it just discards them.
43
44   * Other, non-root, coroutines are presumed to be waiting for some specific
45     thing.
46
47 Configuration variables:
48         configdir
49         socketdir
50         PACKAGE
51         VERSION
52         tripesock
53         peerdb
54
55 Other useful variables:
56         rootcr
57         svcmgr
58
59 Other tweakables:
60         _debug
61
62 Exceptions:
63         Exception
64           StandardError
65             TripeConnectionError
66             TripeError
67             TripeInternalError
68           TripeJobCancelled
69           TripeJobError
70           TripeSyntaxError
71
72 Classes:
73         _Coroutine
74           Coroutine
75             TripeServiceJob
76         OptParse
77         Queue
78         SelIOWatcher
79         TripeCommand
80           TripeSynchronousCommand
81           TripeAsynchronousCommand
82         TripeCommandIterator
83         TripeConnection
84           TripeCommandDispatcher
85             TripeServiceManager
86         TripeService
87         TripeServiceCommand
88
89 Utility functions:
90         quotify
91         runservices
92         spawn
93         svcinfo
94         timespec
95 """
96
97 __pychecker__ = 'self=me no-constCond no-argsused'
98
99 _debug = False
100
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
103
104 import socket as S
105 import errno as E
106 import mLib as M
107 import re as RX
108 import sys as SYS
109 import os as OS
110
111 try:
112   if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113     raise ImportError
114   from py.magic import greenlet as _Coroutine
115 except ImportError:
116   from rmcr import Coroutine as _Coroutine
117
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
120
121 rootcr = _Coroutine.getcurrent()
122
123 class Coroutine (_Coroutine):
124   """
125   A coroutine class which can only be invoked by the root coroutine.
126
127   The root, by construction, cannot be an instance of this class.
128   """
129   def switch(me, *args, **kw):
130     assert _Coroutine.getcurrent() is rootcr
131     _Coroutine.switch(me, *args, **kw)
132
133 ###--------------------------------------------------------------------------
134 ### Default places for things.
135
136 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
137 socketdir = "@socketdir@"
138 PACKAGE = "@PACKAGE@"
139 VERSION = "@VERSION@"
140
141 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
142 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
143
144 ###--------------------------------------------------------------------------
145 ### Connection to the server.
146
147 def readnonblockingly(sock, len):
148   """
149   Nonblocking read from SOCK.
150
151   Try to return LEN bytes.  If couldn't read anything, return None.  EOF is
152   returned as an empty string.
153   """
154   try:
155     sock.setblocking(0)
156     return sock.recv(len)
157   except S.error, exc:
158     if exc[0] == E.EWOULDBLOCK:
159       return None
160     raise
161
162 class TripeConnectionError (StandardError):
163   """Something happened to the connection with the server."""
164   pass
165 class TripeInternalError (StandardError):
166   """This program is very confused."""
167   pass
168
169 class TripeConnection (object):
170   """
171   A logical connection to the tripe administration socket.
172
173   There may or may not be a physical connection.  (This is needed for the
174   monitor, for example.)
175
176   This class isn't very useful on its own, but it has useful subclasses.  At
177   this level, the class is agnostic about I/O multiplexing schemes; that gets
178   added later.
179   """
180
181   def __init__(me, socket):
182     """
183     Make a connection to the named SOCKET.
184
185     No physical connection is made initially.
186     """
187     me.socket = socket
188     me.sock = None
189     me.lbuf = None
190     me.iowatch = SelIOWatcher(me)
191
192   def connect(me):
193     """
194     Ensure that there's a physical connection.
195
196     Do nothing if we're already connected.  Invoke the `connected' method if
197     successful.
198     """
199     if me.sock: return
200     sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
201     sock.connect(me.socket)
202     me.sock = sock
203     me.lbuf = M.LineBuffer(me.line, me._eof)
204     me.lbuf.size = 1024
205     me.connected()
206     return me
207
208   def disconnect(me, reason):
209     """
210     Disconnect the physical connection.
211
212     Invoke the `disconnected' method, giving the provided REASON, which
213     should be either None or an exception.
214     """
215     if not me.sock: return
216     me.disconnected(reason)
217     me.sock.close()
218     me.sock = None
219     me.lbuf.disable()
220     me.lbuf = None
221     return me
222
223   def connectedp(me):
224     """
225     Return true if there's a current, believed-good physical connection.
226     """
227     return me.sock is not None
228
229   __nonzero__ = connectedp
230
231   def send(me, line):
232     """
233     Send the LINE to the connection's socket.
234
235     All output is done through this method; it can be overridden to provide
236     proper nonblocking writing, though this seems generally unnecessary.
237     """
238     try:
239       me.sock.setblocking(1)
240       me.sock.send(line + '\n')
241     except Exception, exc:
242       me.disconnect(exc)
243       raise
244     return me
245
246   def receive(me):
247     """
248     Receive whatever's ready from the connection's socket.
249
250     Call `line' on each complete line, and `eof' if the connection closed.
251     Subclasses which attach this class to an I/O-event system should call
252     this method when the socket (CONN.sock) is ready for reading.
253     """
254     while me.sock is not None:
255       try:
256         buf = readnonblockingly(me.sock, 16384)
257       except Exception, exc:
258         me.disconnect(exc)
259         raise
260       if buf is None:
261         return me
262       if buf == '':
263         me._eof()
264         return me
265       me.lbuf.flush(buf)
266     return me
267
268   def _eof(me):
269     """Internal end-of-file handler."""
270     me.disconnect(TripeConnectionError('connection lost'))
271     me.eof()
272
273   def connected(me):
274     """
275     To be overridden by subclasses to react to a connection being
276     established.
277     """
278     me.iowatch.connected(me.sock)
279
280   def disconnected(me, reason):
281     """
282     To be overridden by subclasses to react to a connection being severed.
283     """
284     me.iowatch.disconnected()
285
286   def eof(me):
287     """To be overridden by subclasses to handle end-of-file."""
288     pass
289
290   def line(me, line):
291     """To be overridden by subclasses to handle incoming lines."""
292     pass
293
294 ###--------------------------------------------------------------------------
295 ### I/O loop integration.
296
297 class SelIOWatcher (object):
298   """
299   Integration with mLib's I/O event system.
300
301   You can replace this object with a different one for integration with,
302   e.g., glib's main loop, by setting CONN.iowatcher to a different object
303   while the CONN is disconnected.
304   """
305
306   def __init__(me, conn):
307     me._conn = conn
308     me._selfile = None
309
310   def connected(me, sock):
311     """
312     Called when a connection is made.
313
314     SOCK is the socket.  The watcher must arrange to call CONN.receive when
315     data is available.
316     """
317     me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
318     me._selfile.enable()
319
320   def disconnected(me):
321     """
322     Called when the connection is lost.
323     """
324     me._selfile = None
325
326   def iterate(me):
327     """
328     Wait for something interesting to happen, and issue events.
329
330     That is, basically, do one iteration of a main select loop, processing
331     all of the events, and then return.  This isn't needed for
332     TripeCommandDispatcher, but runservices wants it.
333     """
334     M.select()
335
336 ###--------------------------------------------------------------------------
337 ### Inter-coroutine communication.
338
339 class Queue (object):
340   """
341   A queue of things arriving asynchronously.
342
343   This is a very simple single-reader multiple-writer queue.  It's useful for
344   more complex coroutines which need to cope with a variety of possible
345   incoming events.
346   """
347
348   def __init__(me):
349     """Create a new empty queue."""
350     me.contents = M.Array()
351     me.waiter = None
352
353   def _wait(me):
354     """
355     Internal: wait for an item to arrive in the queue.
356
357     Complain if someone is already waiting, because this is just a
358     single-reader queue.
359     """
360     if me.waiter:
361       raise ValueError('queue already being waited on')
362     try:
363       me.waiter = Coroutine.getcurrent()
364       while not me.contents:
365         me.waiter.parent.switch()
366     finally:
367       me.waiter = None
368
369   def get(me):
370     """
371     Remove and return the item at the head of the queue.
372
373     If the queue is empty, wait until an item arrives.
374     """
375     me._wait()
376     return me.contents.shift()
377
378   def peek(me):
379     """
380     Return the item at the head of the queue without removing it.
381
382     If the queue is empty, wait until an item arrives.
383     """
384     me._wait()
385     return me.contents[0]
386
387   def put(me, thing):
388     """
389     Write THING to the queue.
390
391     If someone is waiting on the queue, wake him up immediately; otherwise
392     just leave the item there for later.
393     """
394     me.contents.push(thing)
395     if me.waiter:
396       me.waiter.switch()
397
398 ###--------------------------------------------------------------------------
399 ### Dispatching coroutine.
400
401 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
402 ## contain backslashes, quotes or whitespace.
403 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
404
405 ## Match characters which need to be escaped, even in quoted text.
406 rx_weird = RX.compile(r'([\\\'])')
407
408 def quotify(s):
409   """Quote S according to the tripe-admin(5) rules."""
410   m = rx_ordinary.match(s)
411   if m and m.end() == len(s):
412     return s
413   else:
414     return "'" + rx_weird.sub(r'\\\1', s) + "'"
415
416 def _callback(func):
417   """
418   Return a wrapper for FUNC which reports exceptions thrown by it.
419
420   Useful in the case of callbacks invoked by C functions which ignore
421   exceptions.
422   """
423   def _(*a, **kw):
424     try:
425       return func(*a, **kw)
426     except:
427       SYS.excepthook(*SYS.exc_info())
428       raise
429   return _
430
431 class TripeCommand (object):
432   """
433   This abstract class represents a command in progress.
434
435   The `words' attribute contains the list of tokens which make up the
436   command.
437
438   Subclasses must implement a method to handle server responses:
439
440     * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
441       'FAIL'; ARGS are the remaining tokens from the server's response.
442   """
443
444   def __init__(me, words):
445     """Make a new command consisting of the given list of WORDS."""
446     me.words = words
447
448 class TripeSynchronousCommand (TripeCommand):
449   """
450   A simple command, processed apparently synchronously.
451
452   Must be invoked from a coroutine other than the root (or whichever one is
453   running the dispatcher); in reality, other coroutines carry on running
454   while we wait for a response from the server.
455
456   Each server response causes the calling coroutine to be resumed with the
457   pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
458   or `FAIL') and REST is a list of the server's other response tokens.  The
459   calling coroutine must continue switching back to the dispatcher until a
460   terminating response (`OK' or `FAIL') is received or become very
461   confused.
462
463   Mostly it's better to use the TripeCommandIterator to do this
464   automatically.
465   """
466
467   def __init__(me, words):
468     """Initialize the command, specifying the WORDS to send to the server."""
469     TripeCommand.__init__(me, words)
470     me.owner = Coroutine.getcurrent()
471
472   def response(me, code, *rest):
473     """Handle a server response by forwarding it to the calling coroutine."""
474     me.owner.switch((code, rest))
475
476 class TripeError (StandardError):
477   """
478   A tripe command failed with an error (a FAIL code).  The args attribute
479   contains a list of the server's message tokens.
480   """
481   pass
482
483 class TripeCommandIterator (object):
484   """
485   Iterator interface to a tripe command.
486
487   The values returned by the iterator are lists of tokens from the server's
488   INFO lines, as processed by the given filter function, if any.  The
489   iterator completes normally (by raising StopIteration) if the server
490   reported OK, and raises an exception if the command failed for some reason.
491
492   A TripeError is raised if the server issues a FAIL code.  If the connection
493   failed, some other exception is raised.
494   """
495
496   def __init__(me, dispatcher, words, bg = False, filter = None):
497     """
498     Create a new command iterator.
499
500     The command is submitted to the DISPATCHER; it consists of the given
501     WORDS.  If BG is true, then an option is inserted to request that the
502     server run the command in the background.  The FILTER is applied to the
503     token lists which the server responds, and the filter's output are the
504     items returned by the iterator.
505     """
506     me.dcr = Coroutine.getcurrent().parent
507     if me.dcr is None:
508       raise ValueError, 'must invoke from coroutine'
509     me.filter = filter or (lambda x: x)
510     if bg:
511       words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
512     dispatcher.rawcommand(TripeSynchronousCommand(words))
513
514   def __iter__(me):
515     """Iterator protocol: I am my own iterator."""
516     return me
517
518   def next(me):
519     """
520     Iterator protocol: return the next piece of information from the server.
521
522     INFO responses are filtered and returned as the values of the iteration.
523     FAIL and CONNERR responses are turned into exceptions and raised.
524     Finally, OK is turned into StopIteration, which should cause a normal end
525     to the iteration process.
526     """
527     thing = me.dcr.switch()
528     code, rest = thing
529     if code == 'INFO':
530       return me.filter(rest)
531     elif code == 'OK':
532       raise StopIteration
533     elif code == 'CONNERR':
534       if rest is None:
535         raise TripeConnectionError, 'connection terminated by user'
536       else:
537         raise rest
538     elif code == 'FAIL':
539       raise TripeError(*rest)
540     else:
541       raise TripeInternalError \
542             ('unexpected tripe response %r' % ([code] + rest))
543
544 ### Simple utility functions for the TripeCommandIterator convenience
545 ### methods.
546
547 def _tokenjoin(words):
548   """Filter function: simply join the given tokens with spaces between."""
549   return ' '.join(words)
550
551 def _keyvals(iter):
552   """Return a dictionary formed from the KEY=VALUE pairs returned by the
553   iterator ITER."""
554   kv = {}
555   for ww in iter:
556     for w in ww:
557       q = w.index('=')
558       kv[w[:q]] = w[q + 1:]
559   return kv
560
561 def _simple(iter):
562   """Raise an error if ITER contains any item."""
563   stuff = list(iter)
564   if len(stuff) != 0:
565     raise TripeInternalError('expected no response')
566   return None
567
568 def _oneline(iter):
569   """If ITER contains a single item, return it; otherwise raise an error."""
570   stuff = list(iter)
571   if len(stuff) != 1:
572     raise TripeInternalError('expected only one line of response')
573   return stuff[0]
574
575 def _tracelike(iter):
576   """Handle a TRACE-like command.  The result is a list of tuples (CHAR,
577   STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
578   disabled, `+' if enabled, maybe something else later), and DESC is the
579   human-readable description."""
580   stuff = []
581   for ww in iter:
582     ch = ww[0][0]
583     st = ww[0][1:]
584     desc = ' '.join(ww[1:])
585     stuff.append((ch, st, desc))
586   return stuff
587
588 def _kwopts(kw, allowed):
589   """Parse keyword arguments into options.  ALLOWED is a list of allowable
590   keywords; raise errors if other keywords are present.  KEY = VALUE becomes
591   an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
592   VALUE is a true non-string, or nothing if VALUE is false..  Insert a `--'
593   at the end to stop the parser getting confused."""
594   opts = []
595   amap = {}
596   for a in allowed: amap[a] = True
597   for k, v in kw.iteritems():
598     if k not in amap:
599       raise ValueError('option %s not allowed here' % k)
600     if isinstance(v, str):
601       opts += ['-' + k, v]
602     elif v:
603       opts += ['-' + k]
604   opts.append('--')
605   return opts
606
607 ## Deferral.
608 _deferq = []
609 def defer(func, *args, **kw):
610   """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
611   _deferq.append((func, args, kw))
612
613 def spawn(func, *args, **kw):
614   """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
615   defer(lambda: Coroutine(func).switch(*args, **kw))
616
617 ## Asides.
618 _asideq = Queue()
619 def _runasides():
620   """
621   Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
622   """
623   while True:
624     func, args, kw = _asideq.get()
625     try:
626       func(*args, **kw)
627     except:
628       SYS.excepthook(*SYS.exc_info())
629
630 def aside(func, *args, **kw):
631   """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
632   defer(_asideq.put, (func, args, kw))
633
634 class TripeCommandDispatcher (TripeConnection):
635   """
636   Command dispatcher.
637
638   The command dispatcher is a connection which knows how to handle commands.
639   This is probably the most important class in this module to understand.
640
641   Lines from the server are parsed into tokens.  The first token is a code
642   (OK or NOTE or something) explaining what kind of line this is.  The
643   `handler' attribute is a dictionary mapping server line codes to handler
644   functions, which are applied to the words of the line as individual
645   arguments.  *Exception*: the content of TRACE lines is not tokenized.
646
647   There are default handlers for server codes which respond to commands.
648   Commands arrive as TripeCommand instances through the `rawcommand'
649   interface.  The dispatcher keeps track of which command objects represent
650   which jobs, and sends responses on to the appropriate command objects by
651   invoking their `response' methods.  Command objects don't see the
652   BG... codes, because the dispatcher has already transformed them into
653   regular codes when it was looking up job code.
654
655   The dispatcher also has a special response code of its own: CONNERR
656   indicates that the connection failed and the command has therefore been
657   lost; the
658   """
659
660   ## --- Infrastructure ---
661   ##
662   ## We will get confused if we pipeline commands.  Send them one at a time.
663   ## Only send a command when the previous one detaches or completes.
664   ##
665   ## The following attributes are interesting:
666   ##
667   ## tagseq             Sequence number for next background job (for bgtag)
668   ##
669   ## queue              Commands awaiting submission.
670   ##
671   ## cmd                Mapping from job tags to commands: cmd[None] is the
672   ##                    foreground command.
673   ##
674   ## handler            Mapping from server codes to handler functions.
675
676   def __init__(me, socket):
677     """
678     Initialize the dispatcher.
679
680     The SOCKET is the filename of the administration socket to connect to,
681     for TripeConnection.__init__.
682     """
683     TripeConnection.__init__(me, socket)
684     me.tagseq = 0
685     me.handler = {}
686     me.handler['BGDETACH'] = me._detach
687     for i in 'BGOK', 'BGINFO', 'BGFAIL':
688       me.handler[i] = me._response
689     for i in 'OK', 'INFO', 'FAIL':
690       me.handler[i] = me._fgresponse
691
692   def quitp(me):
693     """Should we quit the main loop?  Subclasses should override."""
694     return False
695
696   def mainloop(me, quitp = None):
697     """
698     Iterate the I/O watcher until QUITP returns true.
699
700     Arranges for asides and deferred calls to be made at the right times.
701     """
702
703     global _deferq
704     assert _Coroutine.getcurrent() is rootcr
705     Coroutine(_runasides).switch()
706     while not quitp():
707       while _deferq:
708         q = _deferq
709         _deferq = []
710         for func, args, kw in q:
711           func(*args, **kw)
712       me.iowatch.iterate()
713
714   def connected(me):
715     """
716     Connection hook.
717
718     If a subclass overrides this method, it must call us; clears out the
719     command queue and job map.
720     """
721     me.queue = M.Array()
722     me.cmd = {}
723     TripeConnection.connected(me)
724
725   def disconnected(me, reason):
726     """
727     Disconnection hook.
728
729     If a subclass hooks overrides this method, it must call us; sends a
730     special CONNERR code to all incomplete commands.
731     """
732     TripeConnection.disconnected(me, reason)
733     for cmd in me.cmd.itervalues():
734       cmd.response('CONNERR', reason)
735     for cmd in me.queue:
736       cmd.response('CONNERR', reason)
737
738   @_callback
739   def line(me, line):
740     """Handle an incoming line, sending it to the right place."""
741     if _debug: print '<', line
742     code, rest = M.word(line, quotep = True)
743     func = me.handler.get(code)
744     if func is not None:
745       if code == 'TRACE':
746         func(code, rest)
747       else:
748         func(code, *M.split(rest, quotep = True)[0])
749       me.dequeue()
750
751   def dequeue(me):
752     """
753     Pull the oldest command off the queue and try to send it to the server.
754     """
755     if not me.queue or None in me.cmd: return
756     cmd = me.queue.shift()
757     if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
758     me.send(' '.join([quotify(w) for w in cmd.words]))
759     me.cmd[None] = cmd
760
761   def bgtag(me):
762     """
763     Return an unused job tag.
764
765     May be of use when composing commands by hand.
766     """
767     tag = 'J%05d' % me.tagseq
768     me.tagseq += 1
769     return tag
770
771   ## --- Built-in handler functions for server responses ---
772
773   def _detach(me, _, tag):
774     """
775     Respond to a BGDETACH TAG message.
776
777     Move the current foreground command to the background.
778     """
779     assert tag not in me.cmd
780     me.cmd[tag] = me.cmd[None]
781     del me.cmd[None]
782
783   def _response(me, code, tag, *w):
784     """
785     Respond to an OK, INFO or FAIL message.
786
787     If this is a message for a background job, find the tag; then dispatch
788     the result to the command object.
789     """
790     if code.startswith('BG'):
791       code = code[2:]
792     cmd = me.cmd[tag]
793     if code != 'INFO':
794       del me.cmd[tag]
795     cmd.response(code, *w)
796
797   def _fgresponse(me, code, *w):
798     """Process responses to the foreground command."""
799     me._response(code, None, *w)
800
801   ## --- Interface methods ---
802
803   def rawcommand(me, cmd):
804     """
805     Submit the TripeCommand CMD to the server, and look after it until it
806     completes.
807     """
808     if not me.connectedp():
809       raise TripeConnectionError('connection closed')
810     me.queue.push(cmd)
811     me.dequeue()
812
813   def command(me, *cmd, **kw):
814     """Convenience wrapper for creating a TripeCommandIterator object."""
815     return TripeCommandIterator(me, cmd, **kw)
816
817   ## --- Convenience methods for server commands ---
818
819   def add(me, peer, *addr, **kw):
820     return _simple(me.command(bg = True,
821                               *['ADD'] +
822                               _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
823                               [peer] +
824                               list(addr)))
825   def addr(me, peer):
826     return _oneline(me.command('ADDR', peer))
827   def algs(me):
828     return _keyvals(me.command('ALGS'))
829   def checkchal(me, chal):
830     return _simple(me.command('CHECKCHAL', chal))
831   def daemon(me):
832     return _simple(me.command('DAEMON'))
833   def eping(me, peer, **kw):
834     return _oneline(me.command(bg = True,
835                                *['PING'] +
836                                _kwopts(kw, ['timeout']) +
837                                [peer]))
838   def forcekx(me, peer):
839     return _simple(me.command('FORCEKX', peer))
840   def getchal(me):
841     return _oneline(me.command('GETCHAL', filter = _tokenjoin))
842   def greet(me, peer, chal):
843     return _simple(me.command('GREET', peer, chal))
844   def help(me):
845     return list(me.command('HELP', filter = _tokenjoin))
846   def ifname(me, peer):
847     return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
848   def kill(me, peer):
849     return _simple(me.command('KILL', peer))
850   def list(me):
851     return list(me.command('LIST', filter = _tokenjoin))
852   def notify(me, *msg):
853     return _simple(me.command('NOTIFY', *msg))
854   def peerinfo(me, peer):
855     return _keyvals(me.command('PEERINFO', peer))
856   def ping(me, peer, **kw):
857     return _oneline(me.command(bg = True,
858                                *['PING'] +
859                                _kwopts(kw, ['timeout']) +
860                                [peer]))
861   def port(me):
862     return _oneline(me.command('PORT', filter = _tokenjoin))
863   def quit(me):
864     return _simple(me.command('QUIT'))
865   def reload(me):
866     return _simple(me.command('RELOAD'))
867   def servinfo(me):
868     return _keyvals(me.command('SERVINFO'))
869   def setifname(me, new):
870     return _simple(me.command('SETIFNAME', new))
871   def svcclaim(me, service, version):
872     return _simple(me.command('SVCCLAIM', service, version))
873   def svcensure(me, service, version = None):
874     return _simple(me.command('SVCENSURE', service,
875                               *((version is not None and [version]) or [])))
876   def svcfail(me, job, *msg):
877     return _simple(me.command('SVCFAIL', job, *msg))
878   def svcinfo(me, job, *msg):
879     return _simple(me.command('SVCINFO', job, *msg))
880   def svclist(me):
881     return list(me.command('SVCLIST'))
882   def svcok(me, job):
883     return _simple(me.command('SVCOK', job))
884   def svcquery(me, service):
885     return _keyvals(me.command('SVCQUERY', service))
886   def svcrelease(me, service):
887     return _simple(me.command('SVCRELEASE', service))
888   def svcsubmit(me, service, *args, **kw):
889     return me.command(bg = True,
890                       *['SVCSUBMIT'] +
891                       _kwopts(kw, ['version']) +
892                       [service] +
893                       list(args))
894   def stats(me, peer):
895     return _keyvals(me.command('STATS', peer))
896   def trace(me, *args):
897     return _tracelike(me.command('TRACE', *args))
898   def tunnels(me):
899     return list(me.command('TUNNELS', filter = _tokenjoin))
900   def version(me):
901     return _oneline(me.command('VERSION', filter = _tokenjoin))
902   def warn(me, *msg):
903     return _simple(me.command('WARN', *msg))
904   def watch(me, *args):
905     return _tracelike(me.command('WATCH', *args))
906
907 ###--------------------------------------------------------------------------
908 ### Asynchronous commands.
909
910 class TripeAsynchronousCommand (TripeCommand):
911   """
912   Asynchronous commands.
913
914   This is the complicated way of issuing commands.  You must set up a queue,
915   and associate the command with the queue.  Responses arriving for the
916   command will be put on the queue as an triple of the form (TAG, CODE, REST)
917   -- where TAG is an object of your choice, not interpreted by this class,
918   CODE is the server's response code (OK, INFO, FAIL), and REST is the list
919   of the rest of the server's tokens.
920
921   Using this, you can write coroutines which process many commands (and
922   possibly other events) simultaneously.
923   """
924
925   def __init__(me, queue, tag, words):
926     """Make an asynchronous command consisting of the given WORDS, which
927     sends responses to QUEUE, labelled with TAG."""
928     TripeCommand.__init__(me, words)
929     me.queue = queue
930     me.tag = tag
931
932   def response(me, code, *stuff):
933     """Handle a server response by writing it to the caller's queue."""
934     me.queue.put((me.tag, code, list(stuff)))
935
936 ###--------------------------------------------------------------------------
937 ### Services.
938
939 class TripeJobCancelled (Exception):
940   """
941   Exception sent to job handler if the client kills the job.
942
943   Not propagated further.
944   """
945   pass
946
947 class TripeJobError (Exception):
948   """
949   Exception to cause failure report for running job.
950
951   Sends an SVCFAIL code back.
952   """
953   pass
954
955 class TripeSyntaxError (Exception):
956   """
957   Exception to report a syntax error for a job.
958
959   Sends an SVCFAIL bad-svc-syntax message back.
960   """
961   pass
962
963 class TripeServiceManager (TripeCommandDispatcher):
964   """
965   A command dispatcher with added handling for incoming service requests.
966
967   There is usually only one instance of this class, called svcmgr.  Some of
968   the support functions in this module assume that this is the case.
969
970   To use, run mLib.select in a loop until the quitp method returns true;
971   then, in a non-root coroutine, register your services by calling `add', and
972   then call `running' when you've finished setting up.
973
974   The instance handles server service messages SVCJOB, SVCCANCEL and
975   SVCCLAIM.  It maintains a table of running services.  Incoming jobs cause
976   the service's `job' method to be invoked; SVCCANCEL sends a
977   TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
978   the relevant service to be deregistered.
979
980   There is no base class for jobs, but a job must implement two methods:
981
982   start()               Begin processing; might be a no-op.
983
984   cancel()              Stop processing; the original client has killed the
985                         job.
986
987   The life of a service manager is divided into two parts: setup and running;
988   you tell the manager that you've finished setting up by calling the
989   `running' method.  If, at any point after setup is finished, there are no
990   remaining services or jobs, `quitp' will return true, ending the process.
991   """
992
993   ## --- Attributes ---
994   ##
995   ## svc                Mapping name -> service object
996   ##
997   ## job                Mapping jobid -> job handler coroutine
998   ##
999   ## runningp           True when setup is finished
1000   ##
1001   ## _quitp             True if explicit quit has been requested
1002
1003   def __init__(me, socket):
1004     """
1005     Initialize the service manager.
1006
1007     SOCKET is the administration socket to connect to.
1008     """
1009     TripeCommandDispatcher.__init__(me, socket)
1010     me.svc = {}
1011     me.job = {}
1012     me.runningp = False
1013     me.handler['SVCCANCEL'] = me._cancel
1014     me.handler['SVCJOB'] = me._job
1015     me.handler['SVCCLAIM'] = me._claim
1016     me._quitp = 0
1017
1018   def addsvc(me, svc):
1019     """Register a new service; SVC is a TripeService instance."""
1020     assert svc.name not in me.svc
1021     me.svcclaim(svc.name, svc.version)
1022     me.svc[svc.name] = svc
1023
1024   def _cancel(me, _, jid):
1025     """
1026     Called when the server cancels a job; invokes the job's `cancel' method.
1027     """
1028     job = me.job[jid]
1029     del me.job[jid]
1030     job.cancel()
1031
1032   def _claim(me, _, svc, __):
1033     """Called when another program claims our service at a higher version."""
1034     del me.svc[svc]
1035
1036   def _job(me, _, jid, svc, cmd, *args):
1037     """
1038     Called when the server sends us a job to do.
1039
1040     Calls the service to collect a job, and begins processing it.
1041     """
1042     assert jid not in me.job
1043     svc = me.svc[svc.lower()]
1044     job = svc.job(jid, cmd, args)
1045     me.job[jid] = job
1046     job.start()
1047
1048   def running(me):
1049     """Answer true if setup is finished."""
1050     me.runningp = True
1051
1052   def jobdone(me, jid):
1053     """Informs the service manager that the job with id JID has finished."""
1054     try:
1055       del me.job[jid]
1056     except KeyError:
1057       pass
1058
1059   def quitp(me):
1060     """
1061     Return true if no services or jobs are active (and, therefore, if this
1062     process can quit without anyone caring).
1063     """
1064     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1065                                           not me.sock))
1066
1067   def quit(me):
1068     """Forces the quit flag (returned by quitp) on."""
1069     me._quitp = True
1070
1071 class TripeService (object):
1072   """
1073   A standard service.
1074
1075   The NAME and VERSION are passed on to the server.  The CMDTAB is a
1076   dictionary mapping command names (in lowercase) to command objects.
1077
1078   If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults
1079   are provided.
1080
1081   TripeService itself is mostly agnostic about the nature of command objects,
1082   but the TripeServiceJob class (below) has some requirements.  The built-in
1083   HELP command requires command objects to have `usage' attributes.
1084   """
1085
1086   def __init__(me, name, version, cmdtab):
1087     """
1088     Create and register a new service with the given NAME and VERSION.
1089
1090     CMDTAB maps command names (in lower-case) to command objects.
1091     """
1092     me.name = name
1093     me.version = version
1094     me.cmd = cmdtab
1095     me.activep = True
1096     me.cmd.setdefault('help',
1097                       TripeServiceCommand('help', 0, 0, '', me._help))
1098     me.cmd.setdefault('quit',
1099                       TripeServiceCommand('quit', 0, 0, '', me._quit))
1100
1101   def job(me, jid, cmd, args):
1102     """
1103     Called by the service manager: a job arrived with id JID.
1104
1105     It asks for comamnd CMD with argument list ARGS.  Creates a new job,
1106     passing it the information needed.
1107     """
1108     return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1109
1110   ## Simple default command handlers, complying with the spec in
1111   ## tripe-service(7).
1112
1113   def _help(me):
1114     """Send a help summary to the user."""
1115     cmds = me.cmd.items()
1116     cmds.sort()
1117     for name, cmd in cmds:
1118       svcinfo(name, *cmd.usage)
1119
1120   def _quit(me):
1121     """Terminate the service manager."""
1122     svcmgr.notify('svc-quit', me.name, 'admin-request')
1123     svcmgr.quit()
1124
1125 class TripeServiceCommand (object):
1126   """A simple service command."""
1127
1128   def __init__(me, name, min, max, usage, func):
1129     """
1130     Creates a new command.
1131
1132     NAME is the command's name (in lowercase).
1133
1134     MIN and MAX are the minimum and maximum number of allowed arguments (used
1135     for checking); either may be None to indicate no minimum or maximum.
1136
1137     USAGE is a usage string, used for generating help and error messages.
1138
1139     FUNC is the function to invoke.
1140     """
1141     me.name = name
1142     me.min = min
1143     me.max = max
1144     me.usage = usage.split()
1145     me.func = func
1146
1147   def run(me, *args):
1148     """
1149     Called when the command is invoked.
1150
1151     Does minimal checking of the arguments and calls the supplied function.
1152     """
1153     if (me.min is not None and len(args) < me.min) or \
1154        (me.max is not None and len(args) > me.max):
1155       raise TripeSyntaxError
1156     me.func(*args)
1157
1158 class TripeServiceJob (Coroutine):
1159   """
1160   Job handler coroutine.
1161
1162   A standard TripeService invokes a TripeServiceJob for each incoming job
1163   request, passing it the jobid, command and arguments, and a command
1164   object.  The command object needs the following attributes.
1165
1166   usage                 A usage list (excluding the command name) showing
1167                         arguments and options.
1168
1169   run(*ARGS)            Function to react to the command with ARGS split into
1170                         separate arguments.  Invoked in a coroutine.  The
1171                         svcinfo function (not the TripeCommandDispatcher
1172                         method) may be used to send INFO lines.  The function
1173                         may raise TripeJobError to send a FAIL response back,
1174                         or TripeSyntaxError to send a generic usage error.
1175                         TripeJobCancelled exceptions are trapped silently.
1176                         Other exceptions are translated into a generic
1177                         internal-error message.
1178
1179   This class automatically takes care of sending some closing response to the
1180   job, and for informing the service manager that the job is completed.
1181
1182   The `jid' attribute stores the job's id.
1183   """
1184
1185   def __init__(me, jid, svc, cmd, command, args):
1186     """
1187     Start a new job.
1188
1189     The job is created with id JID, for service SVC, processing command name
1190     CMD (which the service resolved into the command object COMMAND, or
1191     None), and with the arguments ARGS.
1192     """
1193     Coroutine.__init__(me)
1194     me.jid = jid
1195     me.svc = svc
1196     me.cmd = cmd
1197     me.command = command
1198     me.args = args
1199
1200   def run(me):
1201     """
1202     Main body of the coroutine.
1203
1204     Does the tedious exception handling boilerplate and invokes the command's
1205     run method.
1206     """
1207     try:
1208       try:
1209         if me.command is None:
1210           svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1211         else:
1212           me.command.run(*me.args)
1213           svcmgr.svcok(me.jid)
1214       except TripeJobError, exc:
1215         svcmgr.svcfail(me.jid, *exc.args)
1216       except TripeSyntaxError:
1217         svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1218                        me.svc.name, me.command.name,
1219                        *me.command.usage)
1220       except TripeJobCancelled:
1221         pass
1222       except Exception, exc:
1223         svcmgr.svcfail(me.jid, 'svc-internal-error',
1224                        exc.__class__.__name__, str(exc))
1225     finally:
1226       svcmgr.jobdone(me.jid)
1227
1228   def start(me):
1229     """Invoked by the service manager to start running the coroutine."""
1230     me.switch()
1231
1232   def cancel(me):
1233     """Invoked by the service manager to cancel the job."""
1234     me.throw(TripeJobCancelled())
1235
1236 def svcinfo(*args):
1237   """
1238   If invoked from a TripeServiceJob coroutine, sends an INFO line to the
1239   job's sender, automatically using the correct job id.
1240   """
1241   svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1242
1243 def _setupsvc(tab, func):
1244   """
1245   Setup coroutine for setting up service programs.
1246
1247   Register the given services.
1248   """
1249   try:
1250     for service in tab:
1251       svcmgr.addsvc(service)
1252     if func:
1253       func()
1254   finally:
1255     svcmgr.running()
1256
1257 svcmgr = TripeServiceManager(None)
1258 def runservices(socket, tab, init = None, setup = None, daemon = False):
1259   """
1260   Function to start a service provider.
1261
1262   SOCKET is the socket to connect to, usually tripesock.
1263
1264   TAB is a list of entries.  An entry may be either a tuple
1265
1266     (NAME, VERSION, COMMANDS)
1267
1268   or a service object (e.g., a TripeService instance).
1269
1270   COMMANDS is a dictionary mapping command names to tuples
1271
1272     (MIN, MAX, USAGE, FUNC)
1273
1274   of arguments for a TripeServiceCommand object.
1275
1276   If DAEMON is true, then the process is forked into the background before we
1277   start.  If INIT is given, it is called in the main coroutine, immediately
1278   after forking.  If SETUP is given, it is called in a coroutine, after
1279   calling INIT and setting up the services but before marking the service
1280   manager as running.
1281
1282   It is a really bad idea to do any initialization, particularly setting up
1283   coroutines, outside of the INIT or SETUP functions.  In particular, if
1284   we're using rmcr for fake coroutines, the daemonizing fork will kill off
1285   the currently established coroutines in a most surprising way.
1286
1287   The function runs a main select loop until the service manager decides to
1288   quit.
1289   """
1290
1291   svcmgr.socket = socket
1292   svcmgr.connect()
1293   svcs = []
1294   for service in tab:
1295     if not isinstance(service, tuple):
1296       svcs.append(service)
1297     else:
1298       name, version, commands = service
1299       cmdmap = {}
1300       for cmd, stuff in commands.iteritems():
1301         cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1302       svcs.append(TripeService(name, version, cmdmap))
1303   if daemon:
1304     M.daemonize()
1305   if init is not None:
1306     init()
1307   spawn(_setupsvc, svcs, setup)
1308   svcmgr.mainloop()
1309
1310 ###--------------------------------------------------------------------------
1311 ### Utilities for services.
1312
1313 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1314 def timespec(spec):
1315   """Parse the timespec SPEC, returning a number of seconds."""
1316   mul = 1
1317   if len(spec) > 1 and spec[-1] in _timeunits:
1318     mul = _timeunits[spec[-1]]
1319     spec = spec[:-1]
1320   try:
1321     t = int(spec)
1322   except:
1323     raise TripeJobError('bad-time-spec', spec)
1324   if t < 0:
1325     raise TripeJobError('bad-time-spec', spec)
1326   return mul * int(spec)
1327
1328 class OptParse (object):
1329   """
1330   Parse options from a command list in the conventional fashion.
1331
1332   ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
1333   options.  The returned values are the option tags.  During parsing, the
1334   `arg' method may be used to retrieve the argument for the most recent
1335   option.  Afterwards, `rest' may be used to retrieve the remaining
1336   non-option arguments, and do a simple check on how many there are.
1337
1338   The parser correctly handles `--' option terminators.
1339   """
1340
1341   def __init__(me, args, allowed):
1342     """
1343     Create a new option parser.
1344
1345     The parser will scan the ARGS for options given in the sequence ALLOWED
1346     (which are expected to include the `-' prefix).
1347     """
1348     me.allowed = {}
1349     for a in allowed:
1350       me.allowed[a] = True
1351     me.args = list(args)
1352
1353   def __iter__(me):
1354     """Iterator protocol: I am my own iterator."""
1355     return me
1356
1357   def next(me):
1358     """
1359     Iterator protocol: return the next option.
1360
1361     If we've run out, raise StopIteration.
1362     """
1363     if len(me.args) == 0 or \
1364        len(me.args[0]) < 2 or \
1365        not me.args[0].startswith('-'):
1366       raise StopIteration
1367     opt = me.args.pop(0)
1368     if opt == '--':
1369       raise StopIteration
1370     if opt not in me.allowed:
1371       raise TripeSyntaxError
1372     return opt
1373
1374   def arg(me):
1375     """
1376     Return the argument for the most recent option.
1377
1378     If none is available, raise TripeSyntaxError.
1379     """
1380     if len(me.args) == 0:
1381       raise TripeSyntaxError
1382     return me.args.pop(0)
1383
1384   def rest(me, min = None, max = None):
1385     """
1386     After option parsing is done, return the remaining arguments.
1387
1388     Check that there are at least MIN and at most MAX arguments remaining --
1389     either may be None to suppress the check.
1390     """
1391     if (min is not None and len(me.args) < min) or \
1392        (max is not None and len(me.args) > max):
1393       raise TripeSyntaxError
1394     return me.args
1395
1396 ###----- That's all, folks --------------------------------------------------