chiark / gitweb /
Merge branches 'mdw/knock' and 'mdw/ipv6' into bleeding
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software: you can redistribute it and/or modify it under
13 ### the terms of the GNU General Public License as published by the Free
14 ### Software Foundation; either version 3 of the License, or (at your
15 ### option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
18 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
20 ### for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
24
25 """
26 This module provides classes and functions for connecting to a running tripe
27 server, sending it commands, receiving and processing replies, and
28 implementing services.
29
30 Rather than end up in lost in a storm of little event-driven classes, or a
31 morass of concurrent threads, the module uses coroutines to present a fairly
32 simple function call/return interface to potentially long-running commands
33 which must run without blocking the main process.  It assumes a coroutine
34 module presenting a subset of the `greenlet' interface: if actual greenlets
35 are available, they are used; otherwise there's an implementation in terms of
36 threads (with lots of locking) which will do instead.
37
38 The simple rule governing the coroutines used here is this:
39
40   * The root coroutine never cares what values are passed to it when it
41     resumes: it just discards them.
42
43   * Other, non-root, coroutines are presumed to be waiting for some specific
44     thing.
45
46 Configuration variables:
47         configdir
48         socketdir
49         PACKAGE
50         VERSION
51         tripesock
52         peerdb
53
54 Other useful variables:
55         rootcr
56         svcmgr
57
58 Other tweakables:
59         _debug
60
61 Exceptions:
62         Exception
63           StandardError
64             TripeConnectionError
65             TripeError
66             TripeInternalError
67           TripeJobCancelled
68           TripeJobError
69           TripeSyntaxError
70
71 Classes:
72         _Coroutine
73           Coroutine
74             TripeServiceJob
75         OptParse
76         Queue
77         SelIOWatcher
78         TripeCommand
79           TripeSynchronousCommand
80           TripeAsynchronousCommand
81         TripeCommandIterator
82         TripeConnection
83           TripeCommandDispatcher
84             TripeServiceManager
85         TripeService
86         TripeServiceCommand
87
88 Utility functions:
89         quotify
90         runservices
91         spawn
92         svcinfo
93         timespec
94 """
95
96 __pychecker__ = 'self=me no-constCond no-argsused'
97
98 _debug = False
99
100 ###--------------------------------------------------------------------------
101 ### External dependencies.
102
103 import socket as S
104 import errno as E
105 import mLib as M
106 import re as RX
107 import sys as SYS
108 import os as OS
109
110 try:
111   if OS.getenv('TRIPE_FORCE_RMCR') is not None:
112     raise ImportError()
113   from py.magic import greenlet as _Coroutine
114 except ImportError:
115   from rmcr import Coroutine as _Coroutine
116
117 ###--------------------------------------------------------------------------
118 ### Coroutine hacking.
119
120 rootcr = _Coroutine.getcurrent()
121
122 class Coroutine (_Coroutine):
123   """
124   A coroutine class which can only be invoked by the root coroutine.
125
126   The root, by construction, cannot be an instance of this class.
127   """
128   def switch(me, *args, **kw):
129     assert _Coroutine.getcurrent() is rootcr
130     if _debug: print '* %s' % me
131     _Coroutine.switch(me, *args, **kw)
132     if _debug: print '* %s' % rootcr
133
134 ###--------------------------------------------------------------------------
135 ### Default places for things.
136
137 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
138 socketdir = "@socketdir@"
139 PACKAGE = "@PACKAGE@"
140 VERSION = "@VERSION@"
141
142 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
143 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
144
145 ###--------------------------------------------------------------------------
146 ### Connection to the server.
147
148 def readnonblockingly(sock, len):
149   """
150   Nonblocking read from SOCK.
151
152   Try to return LEN bytes.  If couldn't read anything, return `None'.  EOF is
153   returned as an empty string.
154   """
155   try:
156     sock.setblocking(0)
157     return sock.recv(len)
158   except S.error, exc:
159     if exc[0] == E.EWOULDBLOCK:
160       return None
161     raise
162
163 class TripeConnectionError (StandardError):
164   """Something happened to the connection with the server."""
165   pass
166 class TripeInternalError (StandardError):
167   """This program is very confused."""
168   pass
169
170 class TripeConnection (object):
171   """
172   A logical connection to the tripe administration socket.
173
174   There may or may not be a physical connection.  (This is needed for the
175   monitor, for example.)
176
177   This class isn't very useful on its own, but it has useful subclasses.  At
178   this level, the class is agnostic about I/O multiplexing schemes; that gets
179   added later.
180   """
181
182   def __init__(me, socket):
183     """
184     Make a connection to the named SOCKET.
185
186     No physical connection is made initially.
187     """
188     me.socket = socket
189     me.sock = None
190     me.lbuf = None
191     me.iowatch = SelIOWatcher(me)
192
193   def connect(me):
194     """
195     Ensure that there's a physical connection.
196
197     Do nothing if we're already connected.  Invoke the `connected' method if
198     successful.
199     """
200     if me.sock: return
201     sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
202     sock.connect(me.socket)
203     me.sock = sock
204     me.lbuf = M.LineBuffer(me.line, me._eof)
205     me.lbuf.size = 1024
206     me.connected()
207     return me
208
209   def disconnect(me, reason):
210     """
211     Disconnect the physical connection.
212
213     Invoke the `disconnected' method, giving the provided REASON, which
214     should be either `None' or an exception.
215     """
216     if not me.sock: return
217     me.disconnected(reason)
218     me.sock.close()
219     me.sock = None
220     me.lbuf.disable()
221     me.lbuf = None
222     return me
223
224   def connectedp(me):
225     """
226     Return true if there's a current, believed-good physical connection.
227     """
228     return me.sock is not None
229
230   __nonzero__ = connectedp
231
232   def send(me, line):
233     """
234     Send the LINE to the connection's socket.
235
236     All output is done through this method; it can be overridden to provide
237     proper nonblocking writing, though this seems generally unnecessary.
238     """
239     try:
240       me.sock.setblocking(1)
241       me.sock.send(line + '\n')
242     except Exception, exc:
243       me.disconnect(exc)
244       raise
245     return me
246
247   def receive(me):
248     """
249     Receive whatever's ready from the connection's socket.
250
251     Call `line' on each complete line, and `eof' if the connection closed.
252     Subclasses which attach this class to an I/O-event system should call
253     this method when the socket (the `sock' attribute) is ready for reading.
254     """
255     while me.sock is not None:
256       try:
257         buf = readnonblockingly(me.sock, 16384)
258       except Exception, exc:
259         me.disconnect(exc)
260         raise
261       if buf is None:
262         return me
263       if buf == '':
264         me._eof()
265         return me
266       me.lbuf.flush(buf)
267     return me
268
269   def _eof(me):
270     """Internal end-of-file handler."""
271     me.disconnect(TripeConnectionError('connection lost'))
272     me.eof()
273
274   def connected(me):
275     """
276     To be overridden by subclasses to react to a connection being
277     established.
278     """
279     me.iowatch.connected(me.sock)
280
281   def disconnected(me, reason):
282     """
283     To be overridden by subclasses to react to a connection being severed.
284     """
285     me.iowatch.disconnected()
286
287   def eof(me):
288     """To be overridden by subclasses to handle end-of-file."""
289     pass
290
291   def line(me, line):
292     """To be overridden by subclasses to handle incoming lines."""
293     pass
294
295 ###--------------------------------------------------------------------------
296 ### I/O loop integration.
297
298 class SelIOWatcher (object):
299   """
300   Integration with mLib's I/O event system.
301
302   You can replace this object with a different one for integration with,
303   e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
304   while the CONN is disconnected.
305   """
306
307   def __init__(me, conn):
308     me._conn = conn
309     me._selfile = None
310
311   def connected(me, sock):
312     """
313     Called when a connection is made.
314
315     SOCK is the socket.  The watcher must arrange to call `CONN.receive' when
316     data is available.
317     """
318     me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
319     me._selfile.enable()
320
321   def disconnected(me):
322     """
323     Called when the connection is lost.
324     """
325     me._selfile = None
326
327   def iterate(me):
328     """
329     Wait for something interesting to happen, and issue events.
330
331     That is, basically, do one iteration of a main select loop, processing
332     all of the events, and then return.  This is used in the method
333     `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
334     `runservices'; if your I/O watcher has a different main loop, you can
335     drive it yourself.
336     """
337     M.select()
338
339 ###--------------------------------------------------------------------------
340 ### Inter-coroutine communication.
341
342 class Queue (object):
343   """
344   A queue of things arriving asynchronously.
345
346   This is a very simple single-reader multiple-writer queue.  It's useful for
347   more complex coroutines which need to cope with a variety of possible
348   incoming events.
349   """
350
351   def __init__(me):
352     """Create a new empty queue."""
353     me.contents = M.Array()
354     me.waiter = None
355
356   def _wait(me):
357     """
358     Internal: wait for an item to arrive in the queue.
359
360     Complain if someone is already waiting, because this is just a
361     single-reader queue.
362     """
363     if me.waiter:
364       raise ValueError('queue already being waited on')
365     try:
366       me.waiter = Coroutine.getcurrent()
367       while not me.contents:
368         me.waiter.parent.switch()
369     finally:
370       me.waiter = None
371
372   def get(me):
373     """
374     Remove and return the item at the head of the queue.
375
376     If the queue is empty, wait until an item arrives.
377     """
378     me._wait()
379     return me.contents.shift()
380
381   def peek(me):
382     """
383     Return the item at the head of the queue without removing it.
384
385     If the queue is empty, wait until an item arrives.
386     """
387     me._wait()
388     return me.contents[0]
389
390   def put(me, thing):
391     """
392     Write THING to the queue.
393
394     If someone is waiting on the queue, wake him up immediately; otherwise
395     just leave the item there for later.
396     """
397     me.contents.push(thing)
398     if me.waiter:
399       me.waiter.switch()
400
401 ###--------------------------------------------------------------------------
402 ### Dispatching coroutine.
403
404 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
405 ## contain backslashes, quotes or whitespace.
406 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
407
408 ## Match characters which need to be escaped, even in quoted text.
409 rx_weird = RX.compile(r'([\\\'])')
410
411 def quotify(s):
412   """Quote S according to the tripe-admin(5) rules."""
413   m = rx_ordinary.match(s)
414   if m and m.end() == len(s):
415     return s
416   else:
417     return "'" + rx_weird.sub(r'\\\1', s) + "'"
418
419 def _callback(func):
420   """
421   Return a wrapper for FUNC which reports exceptions thrown by it.
422
423   Useful in the case of callbacks invoked by C functions which ignore
424   exceptions.
425   """
426   def _(*a, **kw):
427     try:
428       return func(*a, **kw)
429     except:
430       SYS.excepthook(*SYS.exc_info())
431       raise
432   return _
433
434 class TripeCommand (object):
435   """
436   This abstract class represents a command in progress.
437
438   The `words' attribute contains the list of tokens which make up the
439   command.
440
441   Subclasses must implement a method to handle server responses:
442
443     * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
444       `FAIL'; ARGS are the remaining tokens from the server's response.
445   """
446
447   def __init__(me, words):
448     """Make a new command consisting of the given list of WORDS."""
449     me.words = words
450
451 class TripeSynchronousCommand (TripeCommand):
452   """
453   A simple command, processed apparently synchronously.
454
455   Must be invoked from a coroutine other than the root (or whichever one is
456   running the dispatcher); in reality, other coroutines carry on running
457   while we wait for a response from the server.
458
459   Each server response causes the calling coroutine to be resumed with the
460   pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
461   or `FAIL') and REST is a list of the server's other response tokens.  The
462   calling coroutine must continue switching back to the dispatcher until a
463   terminating response (`OK' or `FAIL') is received or become very
464   confused.
465
466   Mostly it's better to use the `TripeCommandIterator' to do this
467   automatically.
468   """
469
470   def __init__(me, words):
471     """Initialize the command, specifying the WORDS to send to the server."""
472     TripeCommand.__init__(me, words)
473     me.owner = Coroutine.getcurrent()
474
475   def response(me, code, *rest):
476     """Handle a server response by forwarding it to the calling coroutine."""
477     me.owner.switch((code, rest))
478
479 class TripeError (StandardError):
480   """
481   A tripe command failed with an error (a `FAIL' code).  The args attribute
482   contains a list of the server's message tokens.
483   """
484   pass
485
486 class TripeCommandIterator (object):
487   """
488   Iterator interface to a tripe command.
489
490   The values returned by the iterator are lists of tokens from the server's
491   `INFO' lines, as processed by the given filter function, if any.  The
492   iterator completes normally (by raising `StopIteration') if the server
493   reported `OK', and raises an exception if the command failed for some reason.
494
495   A `TripeError' is raised if the server issues a `FAIL' code.  If the
496   connection failed, some other exception is raised.
497   """
498
499   def __init__(me, dispatcher, words, bg = False, filter = None):
500     """
501     Create a new command iterator.
502
503     The command is submitted to the DISPATCHER; it consists of the given
504     WORDS.  If BG is true, then an option is inserted to request that the
505     server run the command in the background.  The FILTER is applied to the
506     token lists which the server responds, and the filter's output are the
507     items returned by the iterator.
508     """
509     me.dcr = Coroutine.getcurrent().parent
510     if me.dcr is None:
511       raise ValueError('must invoke from coroutine')
512     me.filter = filter or (lambda x: x)
513     if bg:
514       words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
515     dispatcher.rawcommand(TripeSynchronousCommand(words))
516
517   def __iter__(me):
518     """Iterator protocol: I am my own iterator."""
519     return me
520
521   def next(me):
522     """
523     Iterator protocol: return the next piece of information from the server.
524
525     `INFO' responses are filtered and returned as the values of the
526     iteration.  `FAIL' and `CONNERR' responses are turned into exceptions and
527     raised.  Finally, `OK' is turned into `StopIteration', which should cause
528     a normal end to the iteration process.
529     """
530     thing = me.dcr.switch()
531     code, rest = thing
532     if code == 'INFO':
533       return me.filter(rest)
534     elif code == 'OK':
535       raise StopIteration()
536     elif code == 'CONNERR':
537       if rest is None:
538         raise TripeConnectionError('connection terminated by user')
539       else:
540         raise rest
541     elif code == 'FAIL':
542       raise TripeError(*rest)
543     else:
544       raise TripeInternalError('unexpected tripe response %r' %
545                                ([code] + rest))
546
547 ### Simple utility functions for the TripeCommandIterator convenience
548 ### methods.
549
550 def _tokenjoin(words):
551   """Filter function: simply join the given tokens with spaces between."""
552   return ' '.join(words)
553
554 def _keyvals(iter):
555   """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
556   iterator ITER."""
557   kv = {}
558   for ww in iter:
559     for w in ww:
560       q = w.index('=')
561       kv[w[:q]] = w[q + 1:]
562   return kv
563
564 def _simple(iter):
565   """Raise an error if ITER contains any item."""
566   stuff = list(iter)
567   if len(stuff) != 0:
568     raise TripeInternalError('expected no response')
569   return None
570
571 def _oneline(iter):
572   """If ITER contains a single item, return it; otherwise raise an error."""
573   stuff = list(iter)
574   if len(stuff) != 1:
575     raise TripeInternalError('expected only one line of response')
576   return stuff[0]
577
578 def _tracelike(iter):
579   """Handle a TRACE-like command.  The result is a list of tuples (CHAR,
580   STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
581   disabled, `+' if enabled, maybe something else later), and DESC is the
582   human-readable description."""
583   stuff = []
584   for ww in iter:
585     ch = ww[0][0]
586     st = ww[0][1:]
587     desc = ' '.join(ww[1:])
588     stuff.append((ch, st, desc))
589   return stuff
590
591 def _kwopts(kw, allowed):
592   """Parse keyword arguments into options.  ALLOWED is a list of allowable
593   keywords; raise errors if other keywords are present.  `KEY = VALUE'
594   becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
595   `-KEY' if VALUE is a true non-string, or nothing if VALUE is false.  Insert
596   a `--' at the end to stop the parser getting confused."""
597   opts = []
598   amap = {}
599   for a in allowed: amap[a] = True
600   for k, v in kw.iteritems():
601     if k not in amap:
602       raise ValueError('option %s not allowed here' % k)
603     if isinstance(v, str):
604       opts += ['-' + k, v]
605     elif v:
606       opts += ['-' + k]
607   opts.append('--')
608   return opts
609
610 ## Deferral.
611 _deferq = []
612 def defer(func, *args, **kw):
613   """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
614   _deferq.append((func, args, kw))
615
616 def funargstr(func, args, kw):
617   items = [repr(a) for a in args]
618   for k, v in kw.iteritems():
619     items.append('%s = %r' % (k, v))
620   return '%s(%s)' % (func.__name__, ', '.join(items))
621
622 def spawn(func, *args, **kw):
623   """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
624   defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
625                  .switch(*args, **kw)))
626
627 ## Asides.
628 _asideq = Queue()
629 def _runasides():
630   """
631   Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
632   """
633   while True:
634     func, args, kw = _asideq.get()
635     try:
636       func(*args, **kw)
637     except:
638       SYS.excepthook(*SYS.exc_info())
639
640 def aside(func, *args, **kw):
641   """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
642   defer(_asideq.put, (func, args, kw))
643
644 class TripeCommandDispatcher (TripeConnection):
645   """
646   Command dispatcher.
647
648   The command dispatcher is a connection which knows how to handle commands.
649   This is probably the most important class in this module to understand.
650
651   Lines from the server are parsed into tokens.  The first token is a code
652   (`OK' or `NOTE' or something) explaining what kind of line this is.  The
653   `handler' attribute is a dictionary mapping server line codes to handler
654   functions, which are applied to the words of the line as individual
655   arguments.  *Exception*: the content of `TRACE' lines is not tokenized.
656
657   There are default handlers for server codes which respond to commands.
658   Commands arrive as `TripeCommand' instances through the `rawcommand'
659   interface.  The dispatcher keeps track of which command objects represent
660   which jobs, and sends responses on to the appropriate command objects by
661   invoking their `response' methods.  Command objects don't see the `BG...'
662   codes, because the dispatcher has already transformed them into regular
663   codes when it was looking up the job tag.
664
665   The dispatcher also has a special response code of its own: `CONNERR'
666   indicates that the connection failed and the command has therefore been
667   lost.  This is sent to all outstanding commands when a connection error is
668   encountered: rather than a token list, it is accompanied by an exception
669   object which is the cause of the disconnection, which may be `None' if the
670   disconnection is expected (e.g., the direct result of a user request).
671   """
672
673   ## --- Infrastructure ---
674   ##
675   ## We will get confused if we pipeline commands.  Send them one at a time.
676   ## Only send a command when the previous one detaches or completes.
677   ##
678   ## The following attributes are interesting:
679   ##
680   ## tagseq             Sequence number for next background job (for bgtag)
681   ##
682   ## queue              Commands awaiting submission.
683   ##
684   ## cmd                Mapping from job tags to commands: cmd[None] is the
685   ##                    foreground command.
686   ##
687   ## handler            Mapping from server codes to handler functions.
688
689   def __init__(me, socket):
690     """
691     Initialize the dispatcher.
692
693     The SOCKET is the filename of the administration socket to connect to,
694     for TripeConnection.__init__.
695     """
696     TripeConnection.__init__(me, socket)
697     me.tagseq = 0
698     me.handler = {}
699     me.handler['BGDETACH'] = me._detach
700     for i in 'BGOK', 'BGINFO', 'BGFAIL':
701       me.handler[i] = me._response
702     for i in 'OK', 'INFO', 'FAIL':
703       me.handler[i] = me._fgresponse
704
705   def quitp(me):
706     """Should we quit the main loop?  Subclasses should override."""
707     return False
708
709   def mainloop(me, quitp = None):
710     """
711     Iterate the I/O watcher until QUITP returns true.
712
713     Arranges for asides and deferred calls to be made at the right times.
714     """
715
716     global _deferq
717     assert _Coroutine.getcurrent() is rootcr
718     Coroutine(_runasides, name = '_runasides').switch()
719     if quitp is None:
720       quitp = me.quitp
721     while not quitp():
722       while _deferq:
723         q = _deferq
724         _deferq = []
725         for func, args, kw in q:
726           func(*args, **kw)
727       me.iowatch.iterate()
728
729   def connected(me):
730     """
731     Connection hook.
732
733     If a subclass overrides this method, it must call us; clears out the
734     command queue and job map.
735     """
736     me.queue = M.Array()
737     me.cmd = {}
738     TripeConnection.connected(me)
739
740   def disconnected(me, reason):
741     """
742     Disconnection hook.
743
744     If a subclass hooks overrides this method, it must call us; sends a
745     special `CONNERR' code to all incomplete commands.
746     """
747     TripeConnection.disconnected(me, reason)
748     for cmd in me.cmd.itervalues():
749       cmd.response('CONNERR', reason)
750     for cmd in me.queue:
751       cmd.response('CONNERR', reason)
752
753   @_callback
754   def line(me, line):
755     """Handle an incoming line, sending it to the right place."""
756     if _debug: print '<', line
757     code, rest = M.word(line, quotep = True)
758     func = me.handler.get(code)
759     if func is not None:
760       if code == 'TRACE':
761         func(code, rest)
762       else:
763         func(code, *M.split(rest, quotep = True)[0])
764       me.dequeue()
765
766   def dequeue(me):
767     """
768     Pull the oldest command off the queue and try to send it to the server.
769     """
770     if not me.queue or None in me.cmd: return
771     cmd = me.queue.shift()
772     if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
773     me.send(' '.join([quotify(w) for w in cmd.words]))
774     me.cmd[None] = cmd
775
776   def bgtag(me):
777     """
778     Return an unused job tag.
779
780     May be of use when composing commands by hand.
781     """
782     tag = 'J%05d' % me.tagseq
783     me.tagseq += 1
784     return tag
785
786   ## --- Built-in handler functions for server responses ---
787
788   def _detach(me, _, tag):
789     """
790     Respond to a `BGDETACH' TAG message.
791
792     Move the current foreground command to the background.
793     """
794     assert tag not in me.cmd
795     me.cmd[tag] = me.cmd[None]
796     del me.cmd[None]
797
798   def _response(me, code, tag, *w):
799     """
800     Respond to an `OK', `INFO' or `FAIL' message.
801
802     If this is a message for a background job, find the tag; then dispatch
803     the result to the command object.  This is also called by `_fgresponse'
804     (wth TAG set to `None') to handle responses for foreground commands, and
805     is therefore a useful method to extend or override in subclasses.
806     """
807     if code.startswith('BG'):
808       code = code[2:]
809     cmd = me.cmd[tag]
810     if code != 'INFO':
811       del me.cmd[tag]
812     cmd.response(code, *w)
813
814   def _fgresponse(me, code, *w):
815     """Process responses to the foreground command."""
816     me._response(code, None, *w)
817
818   ## --- Interface methods ---
819
820   def rawcommand(me, cmd):
821     """
822     Submit the `TripeCommand' CMD to the server, and look after it until it
823     completes.
824     """
825     if not me.connectedp():
826       raise TripeConnectionError('connection closed')
827     me.queue.push(cmd)
828     me.dequeue()
829
830   def command(me, *cmd, **kw):
831     """Convenience wrapper for creating a TripeCommandIterator object."""
832     return TripeCommandIterator(me, cmd, **kw)
833
834   ## --- Convenience methods for server commands ---
835
836   def add(me, peer, *addr, **kw):
837     return _simple(me.command(bg = True,
838                               *['ADD'] +
839                               _kwopts(kw, ['tunnel', 'keepalive',
840                                            'key', 'priv', 'cork',
841                                            'mobile', 'knock',
842                                            'ephemeral']) +
843                               [peer] +
844                               list(addr)))
845   def addr(me, peer):
846     return _oneline(me.command('ADDR', peer))
847   def algs(me, peer = None):
848     return _keyvals(me.command('ALGS',
849                                *((peer is not None and [peer]) or [])))
850   def checkchal(me, chal):
851     return _simple(me.command('CHECKCHAL', chal))
852   def daemon(me):
853     return _simple(me.command('DAEMON'))
854   def eping(me, peer, **kw):
855     return _oneline(me.command(bg = True,
856                                *['EPING'] +
857                                _kwopts(kw, ['timeout']) +
858                                [peer]))
859   def forcekx(me, peer):
860     return _simple(me.command('FORCEKX', peer))
861   def getchal(me):
862     return _oneline(me.command('GETCHAL', filter = _tokenjoin))
863   def greet(me, peer, chal):
864     return _simple(me.command('GREET', peer, chal))
865   def help(me):
866     return list(me.command('HELP', filter = _tokenjoin))
867   def ifname(me, peer):
868     return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
869   def kill(me, peer):
870     return _simple(me.command('KILL', peer))
871   def list(me):
872     return list(me.command('LIST', filter = _tokenjoin))
873   def notify(me, *msg):
874     return _simple(me.command('NOTIFY', *msg))
875   def peerinfo(me, peer):
876     return _keyvals(me.command('PEERINFO', peer))
877   def ping(me, peer, **kw):
878     return _oneline(me.command(bg = True,
879                                *['PING'] +
880                                _kwopts(kw, ['timeout']) +
881                                [peer]))
882   def port(me, af = None):
883     return _oneline(me.command('PORT',
884                                *((af is not None) and [af] or []),
885                                filter = _tokenjoin))
886   def quit(me):
887     return _simple(me.command('QUIT'))
888   def reload(me):
889     return _simple(me.command('RELOAD'))
890   def servinfo(me):
891     return _keyvals(me.command('SERVINFO'))
892   def setifname(me, new):
893     return _simple(me.command('SETIFNAME', new))
894   def svcclaim(me, service, version):
895     return _simple(me.command('SVCCLAIM', service, version))
896   def svcensure(me, service, version = None):
897     return _simple(me.command('SVCENSURE', service,
898                               *((version is not None and [version]) or [])))
899   def svcfail(me, job, *msg):
900     return _simple(me.command('SVCFAIL', job, *msg))
901   def svcinfo(me, job, *msg):
902     return _simple(me.command('SVCINFO', job, *msg))
903   def svclist(me):
904     return list(me.command('SVCLIST'))
905   def svcok(me, job):
906     return _simple(me.command('SVCOK', job))
907   def svcquery(me, service):
908     return _keyvals(me.command('SVCQUERY', service))
909   def svcrelease(me, service):
910     return _simple(me.command('SVCRELEASE', service))
911   def svcsubmit(me, service, *args, **kw):
912     return me.command(bg = True,
913                       *['SVCSUBMIT'] +
914                       _kwopts(kw, ['version']) +
915                       [service] +
916                       list(args))
917   def stats(me, peer):
918     return _keyvals(me.command('STATS', peer))
919   def trace(me, *args):
920     return _tracelike(me.command('TRACE', *args))
921   def tunnels(me):
922     return list(me.command('TUNNELS', filter = _tokenjoin))
923   def version(me):
924     return _oneline(me.command('VERSION', filter = _tokenjoin))
925   def warn(me, *msg):
926     return _simple(me.command('WARN', *msg))
927   def watch(me, *args):
928     return _tracelike(me.command('WATCH', *args))
929
930 ###--------------------------------------------------------------------------
931 ### Asynchronous commands.
932
933 class TripeAsynchronousCommand (TripeCommand):
934   """
935   Asynchronous commands.
936
937   This is the complicated way of issuing commands.  You must set up a queue,
938   and associate the command with the queue.  Responses arriving for the
939   command will be put on the queue as an triple of the form (TAG, CODE, REST)
940   -- where TAG is an object of your choice, not interpreted by this class,
941   CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
942   and REST is the list of the rest of the server's tokens.
943
944   Using this, you can write coroutines which process many commands (and
945   possibly other events) simultaneously.
946   """
947
948   def __init__(me, queue, tag, words):
949     """Make an asynchronous command consisting of the given WORDS, which
950     sends responses to QUEUE, labelled with TAG."""
951     TripeCommand.__init__(me, words)
952     me.queue = queue
953     me.tag = tag
954
955   def response(me, code, *stuff):
956     """Handle a server response by writing it to the caller's queue."""
957     me.queue.put((me.tag, code, list(stuff)))
958
959 ###--------------------------------------------------------------------------
960 ### Services.
961
962 class TripeJobCancelled (Exception):
963   """
964   Exception sent to job handler if the client kills the job.
965
966   Not propagated further.
967   """
968   pass
969
970 class TripeJobError (Exception):
971   """
972   Exception to cause failure report for running job.
973
974   Sends an SVCFAIL code back.
975   """
976   pass
977
978 class TripeSyntaxError (Exception):
979   """
980   Exception to report a syntax error for a job.
981
982   Sends an SVCFAIL bad-svc-syntax message back.
983   """
984   pass
985
986 class TripeServiceManager (TripeCommandDispatcher):
987   """
988   A command dispatcher with added handling for incoming service requests.
989
990   There is usually only one instance of this class, called svcmgr.  Some of
991   the support functions in this module assume that this is the case.
992
993   To use, run `mLib.select' in a loop until the quitp method returns true;
994   then, in a non-root coroutine, register your services by calling `add', and
995   then call `running' when you've finished setting up.
996
997   The instance handles server service messages `SVCJOB', `SVCCANCEL' and
998   `SVCCLAIM'.  It maintains a table of running services.  Incoming jobs cause
999   the service's `job' method to be invoked; `SVCCANCEL' sends a
1000   `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
1001   causes the relevant service to be deregistered.
1002
1003   There is no base class for jobs, but a job must implement two methods:
1004
1005   start()               Begin processing; might be a no-op.
1006
1007   cancel()              Stop processing; the original client has killed the
1008                         job.
1009
1010   The life of a service manager is divided into two parts: setup and running;
1011   you tell the manager that you've finished setting up by calling the
1012   `running' method.  If, at any point after setup is finished, there are no
1013   remaining services or jobs, `quitp' will return true, ending the process.
1014   """
1015
1016   ## --- Attributes ---
1017   ##
1018   ## svc                Mapping name -> service object
1019   ##
1020   ## job                Mapping jobid -> job handler coroutine
1021   ##
1022   ## runningp           True when setup is finished
1023   ##
1024   ## _quitp             True if explicit quit has been requested
1025
1026   def __init__(me, socket):
1027     """
1028     Initialize the service manager.
1029
1030     SOCKET is the administration socket to connect to.
1031     """
1032     TripeCommandDispatcher.__init__(me, socket)
1033     me.svc = {}
1034     me.job = {}
1035     me.runningp = False
1036     me.handler['SVCCANCEL'] = me._cancel
1037     me.handler['SVCJOB'] = me._job
1038     me.handler['SVCCLAIM'] = me._claim
1039     me._quitp = 0
1040
1041   def addsvc(me, svc):
1042     """Register a new service; SVC is a `TripeService' instance."""
1043     assert svc.name not in me.svc
1044     me.svcclaim(svc.name, svc.version)
1045     me.svc[svc.name] = svc
1046
1047   def _cancel(me, _, jid):
1048     """
1049     Called when the server cancels a job; invokes the job's `cancel' method.
1050     """
1051     job = me.job[jid]
1052     del me.job[jid]
1053     job.cancel()
1054
1055   def _claim(me, _, svc, __):
1056     """Called when another program claims our service at a higher version."""
1057     del me.svc[svc]
1058
1059   def _job(me, _, jid, svc, cmd, *args):
1060     """
1061     Called when the server sends us a job to do.
1062
1063     Calls the service to collect a job, and begins processing it.
1064     """
1065     assert jid not in me.job
1066     svc = me.svc[svc.lower()]
1067     job = svc.job(jid, cmd, args)
1068     me.job[jid] = job
1069     job.start()
1070
1071   def running(me):
1072     """Answer true if setup is finished."""
1073     me.runningp = True
1074
1075   def jobdone(me, jid):
1076     """Informs the service manager that the job with id JID has finished."""
1077     try:
1078       del me.job[jid]
1079     except KeyError:
1080       pass
1081
1082   def quitp(me):
1083     """
1084     Return true if no services or jobs are active (and, therefore, if this
1085     process can quit without anyone caring).
1086     """
1087     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1088                                           not me.sock))
1089
1090   def quit(me):
1091     """Forces the quit flag (returned by quitp) on."""
1092     me._quitp = True
1093
1094 class TripeService (object):
1095   """
1096   A standard service.
1097
1098   The NAME and VERSION are passed on to the server.  The CMDTAB is a
1099   dictionary mapping command names (in lowercase) to command objects.
1100
1101   If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
1102   defaults are provided.
1103
1104   TripeService itself is mostly agnostic about the nature of command objects,
1105   but the TripeServiceJob class (below) has some requirements.  The built-in
1106   HELP command requires command objects to have `usage' attributes.
1107   """
1108
1109   def __init__(me, name, version, cmdtab):
1110     """
1111     Create and register a new service with the given NAME and VERSION.
1112
1113     CMDTAB maps command names (in lower-case) to command objects.
1114     """
1115     me.name = name
1116     me.version = version
1117     me.cmd = cmdtab
1118     me.activep = True
1119     me.cmd.setdefault('help',
1120                       TripeServiceCommand('help', 0, 0, '', me._help))
1121     me.cmd.setdefault('quit',
1122                       TripeServiceCommand('quit', 0, 0, '', me._quit))
1123
1124   def job(me, jid, cmd, args):
1125     """
1126     Called by the service manager: a job arrived with id JID.
1127
1128     It asks for comamnd CMD with argument list ARGS.  Creates a new job,
1129     passing it the information needed.
1130     """
1131     return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1132
1133   ## Simple default command handlers, complying with the spec in
1134   ## tripe-service(7).
1135
1136   def _help(me):
1137     """Send a help summary to the user."""
1138     cmds = me.cmd.items()
1139     cmds.sort()
1140     for name, cmd in cmds:
1141       svcinfo(name, *cmd.usage)
1142
1143   def _quit(me):
1144     """Terminate the service manager."""
1145     svcmgr.notify('svc-quit', me.name, 'admin-request')
1146     svcmgr.quit()
1147
1148 class TripeServiceCommand (object):
1149   """A simple service command."""
1150
1151   def __init__(me, name, min, max, usage, func):
1152     """
1153     Creates a new command.
1154
1155     NAME is the command's name (in lowercase).
1156
1157     MIN and MAX are the minimum and maximum number of allowed arguments (used
1158     for checking); either may be None to indicate no minimum or maximum.
1159
1160     USAGE is a usage string, used for generating help and error messages.
1161
1162     FUNC is the function to invoke.
1163     """
1164     me.name = name
1165     me.min = min
1166     me.max = max
1167     me.usage = usage.split()
1168     me.func = func
1169
1170   def run(me, *args):
1171     """
1172     Called when the command is invoked.
1173
1174     Does minimal checking of the arguments and calls the supplied function.
1175     """
1176     if (me.min is not None and len(args) < me.min) or \
1177        (me.max is not None and len(args) > me.max):
1178       raise TripeSyntaxError()
1179     me.func(*args)
1180
1181 class TripeServiceJob (Coroutine):
1182   """
1183   Job handler coroutine.
1184
1185   A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
1186   request, passing it the jobid, command and arguments, and a command object.
1187   The command object needs the following attributes.
1188
1189   usage                 A usage list (excluding the command name) showing
1190                         arguments and options.
1191
1192   run(*ARGS)            Function to react to the command with ARGS split into
1193                         separate arguments.  Invoked in a coroutine.  The
1194                         `svcinfo function (not the `TripeCommandDispatcher'
1195                         method) may be used to send `INFO' lines.  The
1196                         function may raise `TripeJobError' to send a `FAIL'
1197                         response back, or `TripeSyntaxError' to send a
1198                         generic usage error.  `TripeJobCancelled' exceptions
1199                         are trapped silently.  Other exceptions are
1200                         translated into a generic internal-error message.
1201
1202   This class automatically takes care of sending some closing response to the
1203   job, and for informing the service manager that the job is completed.
1204
1205   The `jid' attribute stores the job's id.
1206   """
1207
1208   def __init__(me, jid, svc, cmd, command, args):
1209     """
1210     Start a new job.
1211
1212     The job is created with id JID, for service SVC, processing command name
1213     CMD (which the service resolved into the command object COMMAND, or
1214     `None'), and with the arguments ARGS.
1215     """
1216     Coroutine.__init__(me)
1217     me.jid = jid
1218     me.svc = svc
1219     me.cmd = cmd
1220     me.command = command
1221     me.args = args
1222
1223   def run(me):
1224     """
1225     Main body of the coroutine.
1226
1227     Does the tedious exception handling boilerplate and invokes the command's
1228     run method.
1229     """
1230     try:
1231       try:
1232         if me.command is None:
1233           svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1234         else:
1235           me.command.run(*me.args)
1236           svcmgr.svcok(me.jid)
1237       except TripeJobError, exc:
1238         svcmgr.svcfail(me.jid, *exc.args)
1239       except TripeSyntaxError:
1240         svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1241                        me.svc.name, me.command.name,
1242                        *me.command.usage)
1243       except TripeJobCancelled:
1244         pass
1245       except Exception, exc:
1246         svcmgr.svcfail(me.jid, 'svc-internal-error',
1247                        exc.__class__.__name__, str(exc))
1248     finally:
1249       svcmgr.jobdone(me.jid)
1250
1251   def start(me):
1252     """Invoked by the service manager to start running the coroutine."""
1253     me.switch()
1254
1255   def cancel(me):
1256     """Invoked by the service manager to cancel the job."""
1257     me.throw(TripeJobCancelled())
1258
1259 def svcinfo(*args):
1260   """
1261   If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the
1262   job's sender, automatically using the correct job id.
1263   """
1264   svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1265
1266 def _setupsvc(tab, func):
1267   """
1268   Setup coroutine for setting up service programs.
1269
1270   Register the given services.
1271   """
1272   try:
1273     for service in tab:
1274       svcmgr.addsvc(service)
1275     if func:
1276       func()
1277   finally:
1278     svcmgr.running()
1279
1280 svcmgr = TripeServiceManager(None)
1281 def runservices(socket, tab, init = None, setup = None, daemon = False):
1282   """
1283   Function to start a service provider.
1284
1285   SOCKET is the socket to connect to, usually tripesock.
1286
1287   TAB is a list of entries.  An entry may be either a tuple
1288
1289     (NAME, VERSION, COMMANDS)
1290
1291   or a service object (e.g., a `TripeService' instance).
1292
1293   COMMANDS is a dictionary mapping command names to tuples
1294
1295     (MIN, MAX, USAGE, FUNC)
1296
1297   of arguments for a `TripeServiceCommand' object.
1298
1299   If DAEMON is true, then the process is forked into the background before we
1300   start.  If INIT is given, it is called in the main coroutine, immediately
1301   after forking.  If SETUP is given, it is called in a coroutine, after
1302   calling INIT and setting up the services but before marking the service
1303   manager as running.
1304
1305   It is a really bad idea to do any initialization, particularly setting up
1306   coroutines, outside of the INIT or SETUP functions.  In particular, if
1307   we're using rmcr for fake coroutines, the daemonizing fork will kill off
1308   the currently established coroutines in a most surprising way.
1309
1310   The function runs a main select loop until the service manager decides to
1311   quit.
1312   """
1313
1314   svcmgr.socket = socket
1315   svcmgr.connect()
1316   svcs = []
1317   for service in tab:
1318     if not isinstance(service, tuple):
1319       svcs.append(service)
1320     else:
1321       name, version, commands = service
1322       cmdmap = {}
1323       for cmd, stuff in commands.iteritems():
1324         cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1325       svcs.append(TripeService(name, version, cmdmap))
1326   if daemon:
1327     M.daemonize()
1328   if init is not None:
1329     init()
1330   spawn(_setupsvc, svcs, setup)
1331   svcmgr.mainloop()
1332
1333 ###--------------------------------------------------------------------------
1334 ### Utilities for services.
1335
1336 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1337 def timespec(spec):
1338   """Parse the timespec SPEC, returning a number of seconds."""
1339   mul = 1
1340   if len(spec) > 1 and spec[-1] in _timeunits:
1341     mul = _timeunits[spec[-1]]
1342     spec = spec[:-1]
1343   try:
1344     t = int(spec)
1345   except:
1346     raise TripeJobError('bad-time-spec', spec)
1347   if t < 0:
1348     raise TripeJobError('bad-time-spec', spec)
1349   return mul * int(spec)
1350
1351 class OptParse (object):
1352   """
1353   Parse options from a command list in the conventional fashion.
1354
1355   ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
1356   options.  The returned values are the option tags.  During parsing, the
1357   `arg' method may be used to retrieve the argument for the most recent
1358   option.  Afterwards, `rest' may be used to retrieve the remaining
1359   non-option arguments, and do a simple check on how many there are.
1360
1361   The parser correctly handles `--' option terminators.
1362   """
1363
1364   def __init__(me, args, allowed):
1365     """
1366     Create a new option parser.
1367
1368     The parser will scan the ARGS for options given in the sequence ALLOWED
1369     (which are expected to include the `-' prefix).
1370     """
1371     me.allowed = {}
1372     for a in allowed:
1373       me.allowed[a] = True
1374     me.args = list(args)
1375
1376   def __iter__(me):
1377     """Iterator protocol: I am my own iterator."""
1378     return me
1379
1380   def next(me):
1381     """
1382     Iterator protocol: return the next option.
1383
1384     If we've run out, raise `StopIteration'.
1385     """
1386     if len(me.args) == 0 or \
1387        len(me.args[0]) < 2 or \
1388        not me.args[0].startswith('-'):
1389       raise StopIteration()
1390     opt = me.args.pop(0)
1391     if opt == '--':
1392       raise StopIteration()
1393     if opt not in me.allowed:
1394       raise TripeSyntaxError()
1395     return opt
1396
1397   def arg(me):
1398     """
1399     Return the argument for the most recent option.
1400
1401     If none is available, raise `TripeSyntaxError'.
1402     """
1403     if len(me.args) == 0:
1404       raise TripeSyntaxError()
1405     return me.args.pop(0)
1406
1407   def rest(me, min = None, max = None):
1408     """
1409     After option parsing is done, return the remaining arguments.
1410
1411     Check that there are at least MIN and at most MAX arguments remaining --
1412     either may be None to suppress the check.
1413     """
1414     if (min is not None and len(me.args) < min) or \
1415        (max is not None and len(me.args) > max):
1416       raise TripeSyntaxError()
1417     return me.args
1418
1419 ###----- That's all, folks --------------------------------------------------