chiark / gitweb /
server/peer.c, etc.: Introduce who-goes-there protocol.
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software: you can redistribute it and/or modify it under
13 ### the terms of the GNU General Public License as published by the Free
14 ### Software Foundation; either version 3 of the License, or (at your
15 ### option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
18 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
20 ### for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
24
25 """
26 This module provides classes and functions for connecting to a running tripe
27 server, sending it commands, receiving and processing replies, and
28 implementing services.
29
30 Rather than end up in lost in a storm of little event-driven classes, or a
31 morass of concurrent threads, the module uses coroutines to present a fairly
32 simple function call/return interface to potentially long-running commands
33 which must run without blocking the main process.  It assumes a coroutine
34 module presenting a subset of the `greenlet' interface: if actual greenlets
35 are available, they are used; otherwise there's an implementation in terms of
36 threads (with lots of locking) which will do instead.
37
38 The simple rule governing the coroutines used here is this:
39
40   * The root coroutine never cares what values are passed to it when it
41     resumes: it just discards them.
42
43   * Other, non-root, coroutines are presumed to be waiting for some specific
44     thing.
45
46 Configuration variables:
47         configdir
48         socketdir
49         PACKAGE
50         VERSION
51         tripesock
52         peerdb
53
54 Other useful variables:
55         rootcr
56         svcmgr
57
58 Other tweakables:
59         _debug
60
61 Exceptions:
62         Exception
63           StandardError
64             TripeConnectionError
65             TripeError
66             TripeInternalError
67           TripeJobCancelled
68           TripeJobError
69           TripeSyntaxError
70
71 Classes:
72         _Coroutine
73           Coroutine
74             TripeServiceJob
75         OptParse
76         Queue
77         SelIOWatcher
78         TripeCommand
79           TripeSynchronousCommand
80           TripeAsynchronousCommand
81         TripeCommandIterator
82         TripeConnection
83           TripeCommandDispatcher
84             TripeServiceManager
85         TripeService
86         TripeServiceCommand
87
88 Utility functions:
89         quotify
90         runservices
91         spawn
92         svcinfo
93         timespec
94 """
95
96 __pychecker__ = 'self=me no-constCond no-argsused'
97
98 _debug = False
99
100 ###--------------------------------------------------------------------------
101 ### External dependencies.
102
103 import socket as S
104 import errno as E
105 import mLib as M
106 import re as RX
107 import sys as SYS
108 import os as OS
109
110 try:
111   if OS.getenv('TRIPE_FORCE_RMCR') is not None:
112     raise ImportError()
113   from py.magic import greenlet as _Coroutine
114 except ImportError:
115   from rmcr import Coroutine as _Coroutine
116
117 ###--------------------------------------------------------------------------
118 ### Coroutine hacking.
119
120 rootcr = _Coroutine.getcurrent()
121
122 class Coroutine (_Coroutine):
123   """
124   A coroutine class which can only be invoked by the root coroutine.
125
126   The root, by construction, cannot be an instance of this class.
127   """
128   def switch(me, *args, **kw):
129     assert _Coroutine.getcurrent() is rootcr
130     if _debug: print '* %s' % me
131     _Coroutine.switch(me, *args, **kw)
132     if _debug: print '* %s' % rootcr
133
134 ###--------------------------------------------------------------------------
135 ### Default places for things.
136
137 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
138 socketdir = "@socketdir@"
139 PACKAGE = "@PACKAGE@"
140 VERSION = "@VERSION@"
141
142 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
143 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
144
145 ###--------------------------------------------------------------------------
146 ### Connection to the server.
147
148 def readnonblockingly(sock, len):
149   """
150   Nonblocking read from SOCK.
151
152   Try to return LEN bytes.  If couldn't read anything, return `None'.  EOF is
153   returned as an empty string.
154   """
155   try:
156     sock.setblocking(0)
157     return sock.recv(len)
158   except S.error, exc:
159     if exc[0] == E.EWOULDBLOCK:
160       return None
161     raise
162
163 class TripeConnectionError (StandardError):
164   """Something happened to the connection with the server."""
165   pass
166 class TripeInternalError (StandardError):
167   """This program is very confused."""
168   pass
169
170 class TripeConnection (object):
171   """
172   A logical connection to the tripe administration socket.
173
174   There may or may not be a physical connection.  (This is needed for the
175   monitor, for example.)
176
177   This class isn't very useful on its own, but it has useful subclasses.  At
178   this level, the class is agnostic about I/O multiplexing schemes; that gets
179   added later.
180   """
181
182   def __init__(me, socket):
183     """
184     Make a connection to the named SOCKET.
185
186     No physical connection is made initially.
187     """
188     me.socket = socket
189     me.sock = None
190     me.lbuf = None
191     me.iowatch = SelIOWatcher(me)
192
193   def connect(me):
194     """
195     Ensure that there's a physical connection.
196
197     Do nothing if we're already connected.  Invoke the `connected' method if
198     successful.
199     """
200     if me.sock: return
201     sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
202     sock.connect(me.socket)
203     me.sock = sock
204     me.lbuf = M.LineBuffer(me.line, me._eof)
205     me.lbuf.size = 1024
206     me.connected()
207     return me
208
209   def disconnect(me, reason):
210     """
211     Disconnect the physical connection.
212
213     Invoke the `disconnected' method, giving the provided REASON, which
214     should be either `None' or an exception.
215     """
216     if not me.sock: return
217     me.disconnected(reason)
218     me.sock.close()
219     me.sock = None
220     me.lbuf.disable()
221     me.lbuf = None
222     return me
223
224   def connectedp(me):
225     """
226     Return true if there's a current, believed-good physical connection.
227     """
228     return me.sock is not None
229
230   __nonzero__ = connectedp
231
232   def send(me, line):
233     """
234     Send the LINE to the connection's socket.
235
236     All output is done through this method; it can be overridden to provide
237     proper nonblocking writing, though this seems generally unnecessary.
238     """
239     try:
240       me.sock.setblocking(1)
241       me.sock.send(line + '\n')
242     except Exception, exc:
243       me.disconnect(exc)
244       raise
245     return me
246
247   def receive(me):
248     """
249     Receive whatever's ready from the connection's socket.
250
251     Call `line' on each complete line, and `eof' if the connection closed.
252     Subclasses which attach this class to an I/O-event system should call
253     this method when the socket (the `sock' attribute) is ready for reading.
254     """
255     while me.sock is not None:
256       try:
257         buf = readnonblockingly(me.sock, 16384)
258       except Exception, exc:
259         me.disconnect(exc)
260         raise
261       if buf is None:
262         return me
263       if buf == '':
264         me._eof()
265         return me
266       me.lbuf.flush(buf)
267     return me
268
269   def _eof(me):
270     """Internal end-of-file handler."""
271     me.disconnect(TripeConnectionError('connection lost'))
272     me.eof()
273
274   def connected(me):
275     """
276     To be overridden by subclasses to react to a connection being
277     established.
278     """
279     me.iowatch.connected(me.sock)
280
281   def disconnected(me, reason):
282     """
283     To be overridden by subclasses to react to a connection being severed.
284     """
285     me.iowatch.disconnected()
286
287   def eof(me):
288     """To be overridden by subclasses to handle end-of-file."""
289     pass
290
291   def line(me, line):
292     """To be overridden by subclasses to handle incoming lines."""
293     pass
294
295 ###--------------------------------------------------------------------------
296 ### I/O loop integration.
297
298 class SelIOWatcher (object):
299   """
300   Integration with mLib's I/O event system.
301
302   You can replace this object with a different one for integration with,
303   e.g., glib's main loop, by setting `CONN.iowatcher' to a different object
304   while the CONN is disconnected.
305   """
306
307   def __init__(me, conn):
308     me._conn = conn
309     me._selfile = None
310
311   def connected(me, sock):
312     """
313     Called when a connection is made.
314
315     SOCK is the socket.  The watcher must arrange to call `CONN.receive' when
316     data is available.
317     """
318     me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
319     me._selfile.enable()
320
321   def disconnected(me):
322     """
323     Called when the connection is lost.
324     """
325     me._selfile = None
326
327   def iterate(me):
328     """
329     Wait for something interesting to happen, and issue events.
330
331     That is, basically, do one iteration of a main select loop, processing
332     all of the events, and then return.  This is used in the method
333     `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
334     `runservices'; if your I/O watcher has a different main loop, you can
335     drive it yourself.
336     """
337     M.select()
338
339 ###--------------------------------------------------------------------------
340 ### Inter-coroutine communication.
341
342 class Queue (object):
343   """
344   A queue of things arriving asynchronously.
345
346   This is a very simple single-reader multiple-writer queue.  It's useful for
347   more complex coroutines which need to cope with a variety of possible
348   incoming events.
349   """
350
351   def __init__(me):
352     """Create a new empty queue."""
353     me.contents = M.Array()
354     me.waiter = None
355
356   def _wait(me):
357     """
358     Internal: wait for an item to arrive in the queue.
359
360     Complain if someone is already waiting, because this is just a
361     single-reader queue.
362     """
363     if me.waiter:
364       raise ValueError('queue already being waited on')
365     try:
366       me.waiter = Coroutine.getcurrent()
367       while not me.contents:
368         me.waiter.parent.switch()
369     finally:
370       me.waiter = None
371
372   def get(me):
373     """
374     Remove and return the item at the head of the queue.
375
376     If the queue is empty, wait until an item arrives.
377     """
378     me._wait()
379     return me.contents.shift()
380
381   def peek(me):
382     """
383     Return the item at the head of the queue without removing it.
384
385     If the queue is empty, wait until an item arrives.
386     """
387     me._wait()
388     return me.contents[0]
389
390   def put(me, thing):
391     """
392     Write THING to the queue.
393
394     If someone is waiting on the queue, wake him up immediately; otherwise
395     just leave the item there for later.
396     """
397     me.contents.push(thing)
398     if me.waiter:
399       me.waiter.switch()
400
401 ###--------------------------------------------------------------------------
402 ### Dispatching coroutine.
403
404 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
405 ## contain backslashes, quotes or whitespace.
406 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
407
408 ## Match characters which need to be escaped, even in quoted text.
409 rx_weird = RX.compile(r'([\\\'])')
410
411 def quotify(s):
412   """Quote S according to the tripe-admin(5) rules."""
413   m = rx_ordinary.match(s)
414   if m and m.end() == len(s):
415     return s
416   else:
417     return "'" + rx_weird.sub(r'\\\1', s) + "'"
418
419 def _callback(func):
420   """
421   Return a wrapper for FUNC which reports exceptions thrown by it.
422
423   Useful in the case of callbacks invoked by C functions which ignore
424   exceptions.
425   """
426   def _(*a, **kw):
427     try:
428       return func(*a, **kw)
429     except:
430       SYS.excepthook(*SYS.exc_info())
431       raise
432   return _
433
434 class TripeCommand (object):
435   """
436   This abstract class represents a command in progress.
437
438   The `words' attribute contains the list of tokens which make up the
439   command.
440
441   Subclasses must implement a method to handle server responses:
442
443     * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
444       `FAIL'; ARGS are the remaining tokens from the server's response.
445   """
446
447   def __init__(me, words):
448     """Make a new command consisting of the given list of WORDS."""
449     for word in words:
450       if '\n' in word:
451         raise TripeInternalError("command word contains newline")
452     me.words = words
453
454 class TripeSynchronousCommand (TripeCommand):
455   """
456   A simple command, processed apparently synchronously.
457
458   Must be invoked from a coroutine other than the root (or whichever one is
459   running the dispatcher); in reality, other coroutines carry on running
460   while we wait for a response from the server.
461
462   Each server response causes the calling coroutine to be resumed with the
463   pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
464   or `FAIL') and REST is a list of the server's other response tokens.  The
465   calling coroutine must continue switching back to the dispatcher until a
466   terminating response (`OK' or `FAIL') is received or become very
467   confused.
468
469   Mostly it's better to use the `TripeCommandIterator' to do this
470   automatically.
471   """
472
473   def __init__(me, words):
474     """Initialize the command, specifying the WORDS to send to the server."""
475     TripeCommand.__init__(me, words)
476     me.owner = Coroutine.getcurrent()
477
478   def response(me, code, *rest):
479     """Handle a server response by forwarding it to the calling coroutine."""
480     me.owner.switch((code, rest))
481
482 class TripeError (StandardError):
483   """
484   A tripe command failed with an error (a `FAIL' code).  The args attribute
485   contains a list of the server's message tokens.
486   """
487   pass
488
489 class TripeCommandIterator (object):
490   """
491   Iterator interface to a tripe command.
492
493   The values returned by the iterator are lists of tokens from the server's
494   `INFO' lines, as processed by the given filter function, if any.  The
495   iterator completes normally (by raising `StopIteration') if the server
496   reported `OK', and raises an exception if the command failed for some reason.
497
498   A `TripeError' is raised if the server issues a `FAIL' code.  If the
499   connection failed, some other exception is raised.
500   """
501
502   def __init__(me, dispatcher, words, bg = False, filter = None):
503     """
504     Create a new command iterator.
505
506     The command is submitted to the DISPATCHER; it consists of the given
507     WORDS.  If BG is true, then an option is inserted to request that the
508     server run the command in the background.  The FILTER is applied to the
509     token lists which the server responds, and the filter's output are the
510     items returned by the iterator.
511     """
512     me.dcr = Coroutine.getcurrent().parent
513     if me.dcr is None:
514       raise ValueError('must invoke from coroutine')
515     me.filter = filter or (lambda x: x)
516     if bg:
517       words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
518     dispatcher.rawcommand(TripeSynchronousCommand(words))
519
520   def __iter__(me):
521     """Iterator protocol: I am my own iterator."""
522     return me
523
524   def next(me):
525     """
526     Iterator protocol: return the next piece of information from the server.
527
528     `INFO' responses are filtered and returned as the values of the
529     iteration.  `FAIL' and `CONNERR' responses are turned into exceptions and
530     raised.  Finally, `OK' is turned into `StopIteration', which should cause
531     a normal end to the iteration process.
532     """
533     thing = me.dcr.switch()
534     code, rest = thing
535     if code == 'INFO':
536       return me.filter(rest)
537     elif code == 'OK':
538       raise StopIteration()
539     elif code == 'CONNERR':
540       if rest is None:
541         raise TripeConnectionError('connection terminated by user')
542       else:
543         raise rest
544     elif code == 'FAIL':
545       raise TripeError(*rest)
546     else:
547       raise TripeInternalError('unexpected tripe response %r' %
548                                ([code] + rest))
549
550 ### Simple utility functions for the TripeCommandIterator convenience
551 ### methods.
552
553 def _tokenjoin(words):
554   """Filter function: simply join the given tokens with spaces between."""
555   return ' '.join(words)
556
557 def _keyvals(iter):
558   """Return a dictionary formed from the `KEY=VALUE' pairs returned by the
559   iterator ITER."""
560   kv = {}
561   for ww in iter:
562     for w in ww:
563       q = w.index('=')
564       kv[w[:q]] = w[q + 1:]
565   return kv
566
567 def _simple(iter):
568   """Raise an error if ITER contains any item."""
569   stuff = list(iter)
570   if len(stuff) != 0:
571     raise TripeInternalError('expected no response')
572   return None
573
574 def _oneline(iter):
575   """If ITER contains a single item, return it; otherwise raise an error."""
576   stuff = list(iter)
577   if len(stuff) != 1:
578     raise TripeInternalError('expected only one line of response')
579   return stuff[0]
580
581 def _tracelike(iter):
582   """Handle a TRACE-like command.  The result is a list of tuples (CHAR,
583   STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
584   disabled, `+' if enabled, maybe something else later), and DESC is the
585   human-readable description."""
586   stuff = []
587   for ww in iter:
588     ch = ww[0][0]
589     st = ww[0][1:]
590     desc = ' '.join(ww[1:])
591     stuff.append((ch, st, desc))
592   return stuff
593
594 def _kwopts(kw, allowed):
595   """Parse keyword arguments into options.  ALLOWED is a list of allowable
596   keywords; raise errors if other keywords are present.  `KEY = VALUE'
597   becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
598   `-KEY' if VALUE is a true non-string, or nothing if VALUE is false.  Insert
599   a `--' at the end to stop the parser getting confused."""
600   opts = []
601   amap = {}
602   for a in allowed: amap[a] = True
603   for k, v in kw.iteritems():
604     if k not in amap:
605       raise ValueError('option %s not allowed here' % k)
606     if isinstance(v, str):
607       opts += ['-' + k, v]
608     elif v:
609       opts += ['-' + k]
610   opts.append('--')
611   return opts
612
613 ## Deferral.
614 _deferq = []
615 def defer(func, *args, **kw):
616   """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
617   _deferq.append((func, args, kw))
618
619 def funargstr(func, args, kw):
620   items = [repr(a) for a in args]
621   for k, v in kw.iteritems():
622     items.append('%s = %r' % (k, v))
623   return '%s(%s)' % (func.__name__, ', '.join(items))
624
625 def spawn(func, *args, **kw):
626   """Call FUNC, passing ARGS and KW, in a fresh coroutine."""
627   defer(lambda: (Coroutine(func, name = funargstr(func, args, kw))
628                  .switch(*args, **kw)))
629
630 ## Asides.
631 _asideq = Queue()
632 def _runasides():
633   """
634   Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).
635   """
636   while True:
637     func, args, kw = _asideq.get()
638     try:
639       func(*args, **kw)
640     except:
641       SYS.excepthook(*SYS.exc_info())
642
643 def aside(func, *args, **kw):
644   """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
645   defer(_asideq.put, (func, args, kw))
646
647 class TripeCommandDispatcher (TripeConnection):
648   """
649   Command dispatcher.
650
651   The command dispatcher is a connection which knows how to handle commands.
652   This is probably the most important class in this module to understand.
653
654   Lines from the server are parsed into tokens.  The first token is a code
655   (`OK' or `NOTE' or something) explaining what kind of line this is.  The
656   `handler' attribute is a dictionary mapping server line codes to handler
657   functions, which are applied to the words of the line as individual
658   arguments.  *Exception*: the content of `TRACE' lines is not tokenized.
659
660   There are default handlers for server codes which respond to commands.
661   Commands arrive as `TripeCommand' instances through the `rawcommand'
662   interface.  The dispatcher keeps track of which command objects represent
663   which jobs, and sends responses on to the appropriate command objects by
664   invoking their `response' methods.  Command objects don't see the `BG...'
665   codes, because the dispatcher has already transformed them into regular
666   codes when it was looking up the job tag.
667
668   The dispatcher also has a special response code of its own: `CONNERR'
669   indicates that the connection failed and the command has therefore been
670   lost.  This is sent to all outstanding commands when a connection error is
671   encountered: rather than a token list, it is accompanied by an exception
672   object which is the cause of the disconnection, which may be `None' if the
673   disconnection is expected (e.g., the direct result of a user request).
674   """
675
676   ## --- Infrastructure ---
677   ##
678   ## We will get confused if we pipeline commands.  Send them one at a time.
679   ## Only send a command when the previous one detaches or completes.
680   ##
681   ## The following attributes are interesting:
682   ##
683   ## tagseq             Sequence number for next background job (for bgtag)
684   ##
685   ## queue              Commands awaiting submission.
686   ##
687   ## cmd                Mapping from job tags to commands: cmd[None] is the
688   ##                    foreground command.
689   ##
690   ## handler            Mapping from server codes to handler functions.
691
692   def __init__(me, socket):
693     """
694     Initialize the dispatcher.
695
696     The SOCKET is the filename of the administration socket to connect to,
697     for TripeConnection.__init__.
698     """
699     TripeConnection.__init__(me, socket)
700     me.tagseq = 0
701     me.handler = {}
702     me.handler['BGDETACH'] = me._detach
703     for i in 'BGOK', 'BGINFO', 'BGFAIL':
704       me.handler[i] = me._response
705     for i in 'OK', 'INFO', 'FAIL':
706       me.handler[i] = me._fgresponse
707
708   def quitp(me):
709     """Should we quit the main loop?  Subclasses should override."""
710     return False
711
712   def mainloop(me, quitp = None):
713     """
714     Iterate the I/O watcher until QUITP returns true.
715
716     Arranges for asides and deferred calls to be made at the right times.
717     """
718
719     global _deferq
720     assert _Coroutine.getcurrent() is rootcr
721     Coroutine(_runasides, name = '_runasides').switch()
722     if quitp is None:
723       quitp = me.quitp
724     while not quitp():
725       while _deferq:
726         q = _deferq
727         _deferq = []
728         for func, args, kw in q:
729           func(*args, **kw)
730       me.iowatch.iterate()
731
732   def connected(me):
733     """
734     Connection hook.
735
736     If a subclass overrides this method, it must call us; clears out the
737     command queue and job map.
738     """
739     me.queue = M.Array()
740     me.cmd = {}
741     TripeConnection.connected(me)
742
743   def disconnected(me, reason):
744     """
745     Disconnection hook.
746
747     If a subclass hooks overrides this method, it must call us; sends a
748     special `CONNERR' code to all incomplete commands.
749     """
750     TripeConnection.disconnected(me, reason)
751     for cmd in me.cmd.itervalues():
752       cmd.response('CONNERR', reason)
753     for cmd in me.queue:
754       cmd.response('CONNERR', reason)
755
756   @_callback
757   def line(me, line):
758     """Handle an incoming line, sending it to the right place."""
759     if _debug: print '<', line
760     code, rest = M.word(line, quotep = True)
761     func = me.handler.get(code)
762     if func is not None:
763       if code == 'TRACE':
764         func(code, rest)
765       else:
766         func(code, *M.split(rest, quotep = True)[0])
767       me.dequeue()
768
769   def dequeue(me):
770     """
771     Pull the oldest command off the queue and try to send it to the server.
772     """
773     if not me.queue or None in me.cmd: return
774     cmd = me.queue.shift()
775     if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
776     me.send(' '.join([quotify(w) for w in cmd.words]))
777     me.cmd[None] = cmd
778
779   def bgtag(me):
780     """
781     Return an unused job tag.
782
783     May be of use when composing commands by hand.
784     """
785     tag = 'J%05d' % me.tagseq
786     me.tagseq += 1
787     return tag
788
789   ## --- Built-in handler functions for server responses ---
790
791   def _detach(me, _, tag):
792     """
793     Respond to a `BGDETACH' TAG message.
794
795     Move the current foreground command to the background.
796     """
797     assert tag not in me.cmd
798     me.cmd[tag] = me.cmd[None]
799     del me.cmd[None]
800
801   def _response(me, code, tag, *w):
802     """
803     Respond to an `OK', `INFO' or `FAIL' message.
804
805     If this is a message for a background job, find the tag; then dispatch
806     the result to the command object.  This is also called by `_fgresponse'
807     (wth TAG set to `None') to handle responses for foreground commands, and
808     is therefore a useful method to extend or override in subclasses.
809     """
810     if code.startswith('BG'):
811       code = code[2:]
812     cmd = me.cmd[tag]
813     if code != 'INFO':
814       del me.cmd[tag]
815     cmd.response(code, *w)
816
817   def _fgresponse(me, code, *w):
818     """Process responses to the foreground command."""
819     me._response(code, None, *w)
820
821   ## --- Interface methods ---
822
823   def rawcommand(me, cmd):
824     """
825     Submit the `TripeCommand' CMD to the server, and look after it until it
826     completes.
827     """
828     if not me.connectedp():
829       raise TripeConnectionError('connection closed')
830     me.queue.push(cmd)
831     me.dequeue()
832
833   def command(me, *cmd, **kw):
834     """Convenience wrapper for creating a TripeCommandIterator object."""
835     return TripeCommandIterator(me, cmd, **kw)
836
837   ## --- Convenience methods for server commands ---
838
839   def add(me, peer, *addr, **kw):
840     return _simple(me.command(bg = True,
841                               *['ADD'] +
842                               _kwopts(kw, ['tunnel', 'keepalive',
843                                            'key', 'priv', 'cork',
844                                            'mobile', 'knock',
845                                            'ephemeral']) +
846                               [peer] +
847                               list(addr)))
848   def addr(me, peer):
849     return _oneline(me.command('ADDR', peer))
850   def algs(me, peer = None):
851     return _keyvals(me.command('ALGS',
852                                *((peer is not None and [peer]) or [])))
853   def checkchal(me, chal):
854     return _simple(me.command('CHECKCHAL', chal))
855   def daemon(me):
856     return _simple(me.command('DAEMON'))
857   def eping(me, peer, **kw):
858     return _oneline(me.command(bg = True,
859                                *['EPING'] +
860                                _kwopts(kw, ['timeout']) +
861                                [peer]))
862   def forcekx(me, peer, **kw):
863     return _simple(me.command(*['FORCEKX'] +
864                               _kwopts(kw, ["quiet"]) +
865                               [peer]))
866   def getchal(me):
867     return _oneline(me.command('GETCHAL', filter = _tokenjoin))
868   def greet(me, peer, chal):
869     return _simple(me.command('GREET', peer, chal))
870   def help(me):
871     return list(me.command('HELP', filter = _tokenjoin))
872   def ifname(me, peer):
873     return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
874   def kill(me, peer, **kw):
875     return _simple(me.command(*['KILL'] +
876                               _kwopts(kw, ["quiet"]) +
877                               [peer]))
878   def list(me):
879     return list(me.command('LIST', filter = _tokenjoin))
880   def notify(me, *msg):
881     return _simple(me.command('NOTIFY', *msg))
882   def peerinfo(me, peer):
883     return _keyvals(me.command('PEERINFO', peer))
884   def ping(me, peer, **kw):
885     return _oneline(me.command(bg = True,
886                                *['PING'] +
887                                _kwopts(kw, ['timeout']) +
888                                [peer]))
889   def port(me, af = None):
890     return _oneline(me.command('PORT',
891                                *((af is not None) and [af] or []),
892                                filter = _tokenjoin))
893   def quit(me):
894     return _simple(me.command('QUIT'))
895   def reload(me):
896     return _simple(me.command('RELOAD'))
897   def servinfo(me):
898     return _keyvals(me.command('SERVINFO'))
899   def setifname(me, new):
900     return _simple(me.command('SETIFNAME', new))
901   def svcclaim(me, service, version):
902     return _simple(me.command('SVCCLAIM', service, version))
903   def svcensure(me, service, version = None):
904     return _simple(me.command('SVCENSURE', service,
905                               *((version is not None and [version]) or [])))
906   def svcfail(me, job, *msg):
907     return _simple(me.command('SVCFAIL', job, *msg))
908   def svcinfo(me, job, *msg):
909     return _simple(me.command('SVCINFO', job, *msg))
910   def svclist(me):
911     return list(me.command('SVCLIST'))
912   def svcok(me, job):
913     return _simple(me.command('SVCOK', job))
914   def svcquery(me, service):
915     return _keyvals(me.command('SVCQUERY', service))
916   def svcrelease(me, service):
917     return _simple(me.command('SVCRELEASE', service))
918   def svcsubmit(me, service, *args, **kw):
919     return me.command(bg = True,
920                       *['SVCSUBMIT'] +
921                       _kwopts(kw, ['version']) +
922                       [service] +
923                       list(args))
924   def stats(me, peer):
925     return _keyvals(me.command('STATS', peer))
926   def trace(me, *args):
927     return _tracelike(me.command('TRACE', *args))
928   def tunnels(me):
929     return list(me.command('TUNNELS', filter = _tokenjoin))
930   def version(me):
931     return _oneline(me.command('VERSION', filter = _tokenjoin))
932   def warn(me, *msg):
933     return _simple(me.command('WARN', *msg))
934   def watch(me, *args):
935     return _tracelike(me.command('WATCH', *args))
936
937 ###--------------------------------------------------------------------------
938 ### Asynchronous commands.
939
940 class TripeAsynchronousCommand (TripeCommand):
941   """
942   Asynchronous commands.
943
944   This is the complicated way of issuing commands.  You must set up a queue,
945   and associate the command with the queue.  Responses arriving for the
946   command will be put on the queue as an triple of the form (TAG, CODE, REST)
947   -- where TAG is an object of your choice, not interpreted by this class,
948   CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
949   and REST is the list of the rest of the server's tokens.
950
951   Using this, you can write coroutines which process many commands (and
952   possibly other events) simultaneously.
953   """
954
955   def __init__(me, queue, tag, words):
956     """Make an asynchronous command consisting of the given WORDS, which
957     sends responses to QUEUE, labelled with TAG."""
958     TripeCommand.__init__(me, words)
959     me.queue = queue
960     me.tag = tag
961
962   def response(me, code, *stuff):
963     """Handle a server response by writing it to the caller's queue."""
964     me.queue.put((me.tag, code, list(stuff)))
965
966 ###--------------------------------------------------------------------------
967 ### Services.
968
969 class TripeJobCancelled (Exception):
970   """
971   Exception sent to job handler if the client kills the job.
972
973   Not propagated further.
974   """
975   pass
976
977 class TripeJobError (Exception):
978   """
979   Exception to cause failure report for running job.
980
981   Sends an SVCFAIL code back.
982   """
983   pass
984
985 class TripeSyntaxError (Exception):
986   """
987   Exception to report a syntax error for a job.
988
989   Sends an SVCFAIL bad-svc-syntax message back.
990   """
991   pass
992
993 class TripeServiceManager (TripeCommandDispatcher):
994   """
995   A command dispatcher with added handling for incoming service requests.
996
997   There is usually only one instance of this class, called svcmgr.  Some of
998   the support functions in this module assume that this is the case.
999
1000   To use, run `mLib.select' in a loop until the quitp method returns true;
1001   then, in a non-root coroutine, register your services by calling `add', and
1002   then call `running' when you've finished setting up.
1003
1004   The instance handles server service messages `SVCJOB', `SVCCANCEL' and
1005   `SVCCLAIM'.  It maintains a table of running services.  Incoming jobs cause
1006   the service's `job' method to be invoked; `SVCCANCEL' sends a
1007   `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
1008   causes the relevant service to be deregistered.
1009
1010   There is no base class for jobs, but a job must implement two methods:
1011
1012   start()               Begin processing; might be a no-op.
1013
1014   cancel()              Stop processing; the original client has killed the
1015                         job.
1016
1017   The life of a service manager is divided into two parts: setup and running;
1018   you tell the manager that you've finished setting up by calling the
1019   `running' method.  If, at any point after setup is finished, there are no
1020   remaining services or jobs, `quitp' will return true, ending the process.
1021   """
1022
1023   ## --- Attributes ---
1024   ##
1025   ## svc                Mapping name -> service object
1026   ##
1027   ## job                Mapping jobid -> job handler coroutine
1028   ##
1029   ## runningp           True when setup is finished
1030   ##
1031   ## _quitp             True if explicit quit has been requested
1032
1033   def __init__(me, socket):
1034     """
1035     Initialize the service manager.
1036
1037     SOCKET is the administration socket to connect to.
1038     """
1039     TripeCommandDispatcher.__init__(me, socket)
1040     me.svc = {}
1041     me.job = {}
1042     me.runningp = False
1043     me.handler['SVCCANCEL'] = me._cancel
1044     me.handler['SVCJOB'] = me._job
1045     me.handler['SVCCLAIM'] = me._claim
1046     me._quitp = 0
1047
1048   def addsvc(me, svc):
1049     """Register a new service; SVC is a `TripeService' instance."""
1050     assert svc.name not in me.svc
1051     me.svcclaim(svc.name, svc.version)
1052     me.svc[svc.name] = svc
1053
1054   def _cancel(me, _, jid):
1055     """
1056     Called when the server cancels a job; invokes the job's `cancel' method.
1057     """
1058     job = me.job[jid]
1059     del me.job[jid]
1060     job.cancel()
1061
1062   def _claim(me, _, svc, __):
1063     """Called when another program claims our service at a higher version."""
1064     del me.svc[svc]
1065
1066   def _job(me, _, jid, svc, cmd, *args):
1067     """
1068     Called when the server sends us a job to do.
1069
1070     Calls the service to collect a job, and begins processing it.
1071     """
1072     assert jid not in me.job
1073     svc = me.svc[svc.lower()]
1074     job = svc.job(jid, cmd, args)
1075     me.job[jid] = job
1076     job.start()
1077
1078   def running(me):
1079     """Answer true if setup is finished."""
1080     me.runningp = True
1081
1082   def jobdone(me, jid):
1083     """Informs the service manager that the job with id JID has finished."""
1084     try:
1085       del me.job[jid]
1086     except KeyError:
1087       pass
1088
1089   def quitp(me):
1090     """
1091     Return true if no services or jobs are active (and, therefore, if this
1092     process can quit without anyone caring).
1093     """
1094     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
1095                                           not me.sock))
1096
1097   def quit(me):
1098     """Forces the quit flag (returned by quitp) on."""
1099     me._quitp = True
1100
1101 class TripeService (object):
1102   """
1103   A standard service.
1104
1105   The NAME and VERSION are passed on to the server.  The CMDTAB is a
1106   dictionary mapping command names (in lowercase) to command objects.
1107
1108   If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
1109   defaults are provided.
1110
1111   TripeService itself is mostly agnostic about the nature of command objects,
1112   but the TripeServiceJob class (below) has some requirements.  The built-in
1113   HELP command requires command objects to have `usage' attributes.
1114   """
1115
1116   def __init__(me, name, version, cmdtab):
1117     """
1118     Create and register a new service with the given NAME and VERSION.
1119
1120     CMDTAB maps command names (in lower-case) to command objects.
1121     """
1122     me.name = name
1123     me.version = version
1124     me.cmd = cmdtab
1125     me.activep = True
1126     me.cmd.setdefault('help',
1127                       TripeServiceCommand('help', 0, 0, '', me._help))
1128     me.cmd.setdefault('quit',
1129                       TripeServiceCommand('quit', 0, 0, '', me._quit))
1130
1131   def job(me, jid, cmd, args):
1132     """
1133     Called by the service manager: a job arrived with id JID.
1134
1135     It asks for comamnd CMD with argument list ARGS.  Creates a new job,
1136     passing it the information needed.
1137     """
1138     return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1139
1140   ## Simple default command handlers, complying with the spec in
1141   ## tripe-service(7).
1142
1143   def _help(me):
1144     """Send a help summary to the user."""
1145     cmds = me.cmd.items()
1146     cmds.sort()
1147     for name, cmd in cmds:
1148       svcinfo(name, *cmd.usage)
1149
1150   def _quit(me):
1151     """Terminate the service manager."""
1152     svcmgr.notify('svc-quit', me.name, 'admin-request')
1153     svcmgr.quit()
1154
1155 class TripeServiceCommand (object):
1156   """A simple service command."""
1157
1158   def __init__(me, name, min, max, usage, func):
1159     """
1160     Creates a new command.
1161
1162     NAME is the command's name (in lowercase).
1163
1164     MIN and MAX are the minimum and maximum number of allowed arguments (used
1165     for checking); either may be None to indicate no minimum or maximum.
1166
1167     USAGE is a usage string, used for generating help and error messages.
1168
1169     FUNC is the function to invoke.
1170     """
1171     me.name = name
1172     me.min = min
1173     me.max = max
1174     me.usage = usage.split()
1175     me.func = func
1176
1177   def run(me, *args):
1178     """
1179     Called when the command is invoked.
1180
1181     Does minimal checking of the arguments and calls the supplied function.
1182     """
1183     if (me.min is not None and len(args) < me.min) or \
1184        (me.max is not None and len(args) > me.max):
1185       raise TripeSyntaxError()
1186     me.func(*args)
1187
1188 class TripeServiceJob (Coroutine):
1189   """
1190   Job handler coroutine.
1191
1192   A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
1193   request, passing it the jobid, command and arguments, and a command object.
1194   The command object needs the following attributes.
1195
1196   usage                 A usage list (excluding the command name) showing
1197                         arguments and options.
1198
1199   run(*ARGS)            Function to react to the command with ARGS split into
1200                         separate arguments.  Invoked in a coroutine.  The
1201                         `svcinfo function (not the `TripeCommandDispatcher'
1202                         method) may be used to send `INFO' lines.  The
1203                         function may raise `TripeJobError' to send a `FAIL'
1204                         response back, or `TripeSyntaxError' to send a
1205                         generic usage error.  `TripeJobCancelled' exceptions
1206                         are trapped silently.  Other exceptions are
1207                         translated into a generic internal-error message.
1208
1209   This class automatically takes care of sending some closing response to the
1210   job, and for informing the service manager that the job is completed.
1211
1212   The `jid' attribute stores the job's id.
1213   """
1214
1215   def __init__(me, jid, svc, cmd, command, args):
1216     """
1217     Start a new job.
1218
1219     The job is created with id JID, for service SVC, processing command name
1220     CMD (which the service resolved into the command object COMMAND, or
1221     `None'), and with the arguments ARGS.
1222     """
1223     Coroutine.__init__(me)
1224     me.jid = jid
1225     me.svc = svc
1226     me.cmd = cmd
1227     me.command = command
1228     me.args = args
1229
1230   def run(me):
1231     """
1232     Main body of the coroutine.
1233
1234     Does the tedious exception handling boilerplate and invokes the command's
1235     run method.
1236     """
1237     try:
1238       try:
1239         if me.command is None:
1240           svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1241         else:
1242           me.command.run(*me.args)
1243           svcmgr.svcok(me.jid)
1244       except TripeJobError, exc:
1245         svcmgr.svcfail(me.jid, *exc.args)
1246       except TripeSyntaxError:
1247         svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1248                        me.svc.name, me.command.name,
1249                        *me.command.usage)
1250       except TripeJobCancelled:
1251         pass
1252       except Exception, exc:
1253         svcmgr.svcfail(me.jid, 'svc-internal-error',
1254                        exc.__class__.__name__, str(exc))
1255     finally:
1256       svcmgr.jobdone(me.jid)
1257
1258   def start(me):
1259     """Invoked by the service manager to start running the coroutine."""
1260     me.switch()
1261
1262   def cancel(me):
1263     """Invoked by the service manager to cancel the job."""
1264     me.throw(TripeJobCancelled())
1265
1266 def svcinfo(*args):
1267   """
1268   If invoked from a TripeServiceJob coroutine, sends an `INFO' line to the
1269   job's sender, automatically using the correct job id.
1270   """
1271   svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1272
1273 def _setupsvc(tab, func):
1274   """
1275   Setup coroutine for setting up service programs.
1276
1277   Register the given services.
1278   """
1279   try:
1280     for service in tab:
1281       svcmgr.addsvc(service)
1282     if func:
1283       func()
1284   finally:
1285     svcmgr.running()
1286
1287 svcmgr = TripeServiceManager(None)
1288 def runservices(socket, tab, init = None, setup = None, daemon = False):
1289   """
1290   Function to start a service provider.
1291
1292   SOCKET is the socket to connect to, usually tripesock.
1293
1294   TAB is a list of entries.  An entry may be either a tuple
1295
1296     (NAME, VERSION, COMMANDS)
1297
1298   or a service object (e.g., a `TripeService' instance).
1299
1300   COMMANDS is a dictionary mapping command names to tuples
1301
1302     (MIN, MAX, USAGE, FUNC)
1303
1304   of arguments for a `TripeServiceCommand' object.
1305
1306   If DAEMON is true, then the process is forked into the background before we
1307   start.  If INIT is given, it is called in the main coroutine, immediately
1308   after forking.  If SETUP is given, it is called in a coroutine, after
1309   calling INIT and setting up the services but before marking the service
1310   manager as running.
1311
1312   It is a really bad idea to do any initialization, particularly setting up
1313   coroutines, outside of the INIT or SETUP functions.  In particular, if
1314   we're using rmcr for fake coroutines, the daemonizing fork will kill off
1315   the currently established coroutines in a most surprising way.
1316
1317   The function runs a main select loop until the service manager decides to
1318   quit.
1319   """
1320
1321   svcmgr.socket = socket
1322   svcmgr.connect()
1323   svcs = []
1324   for service in tab:
1325     if not isinstance(service, tuple):
1326       svcs.append(service)
1327     else:
1328       name, version, commands = service
1329       cmdmap = {}
1330       for cmd, stuff in commands.iteritems():
1331         cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1332       svcs.append(TripeService(name, version, cmdmap))
1333   if daemon:
1334     M.daemonize()
1335   if init is not None:
1336     init()
1337   spawn(_setupsvc, svcs, setup)
1338   svcmgr.mainloop()
1339
1340 ###--------------------------------------------------------------------------
1341 ### Utilities for services.
1342
1343 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1344 def timespec(spec):
1345   """Parse the timespec SPEC, returning a number of seconds."""
1346   mul = 1
1347   if len(spec) > 1 and spec[-1] in _timeunits:
1348     mul = _timeunits[spec[-1]]
1349     spec = spec[:-1]
1350   try:
1351     t = int(spec)
1352   except:
1353     raise TripeJobError('bad-time-spec', spec)
1354   if t < 0:
1355     raise TripeJobError('bad-time-spec', spec)
1356   return mul * int(spec)
1357
1358 class OptParse (object):
1359   """
1360   Parse options from a command list in the conventional fashion.
1361
1362   ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
1363   options.  The returned values are the option tags.  During parsing, the
1364   `arg' method may be used to retrieve the argument for the most recent
1365   option.  Afterwards, `rest' may be used to retrieve the remaining
1366   non-option arguments, and do a simple check on how many there are.
1367
1368   The parser correctly handles `--' option terminators.
1369   """
1370
1371   def __init__(me, args, allowed):
1372     """
1373     Create a new option parser.
1374
1375     The parser will scan the ARGS for options given in the sequence ALLOWED
1376     (which are expected to include the `-' prefix).
1377     """
1378     me.allowed = {}
1379     for a in allowed:
1380       me.allowed[a] = True
1381     me.args = list(args)
1382
1383   def __iter__(me):
1384     """Iterator protocol: I am my own iterator."""
1385     return me
1386
1387   def next(me):
1388     """
1389     Iterator protocol: return the next option.
1390
1391     If we've run out, raise `StopIteration'.
1392     """
1393     if len(me.args) == 0 or \
1394        len(me.args[0]) < 2 or \
1395        not me.args[0].startswith('-'):
1396       raise StopIteration()
1397     opt = me.args.pop(0)
1398     if opt == '--':
1399       raise StopIteration()
1400     if opt not in me.allowed:
1401       raise TripeSyntaxError()
1402     return opt
1403
1404   def arg(me):
1405     """
1406     Return the argument for the most recent option.
1407
1408     If none is available, raise `TripeSyntaxError'.
1409     """
1410     if len(me.args) == 0:
1411       raise TripeSyntaxError()
1412     return me.args.pop(0)
1413
1414   def rest(me, min = None, max = None):
1415     """
1416     After option parsing is done, return the remaining arguments.
1417
1418     Check that there are at least MIN and at most MAX arguments remaining --
1419     either may be None to suppress the check.
1420     """
1421     if (min is not None and len(me.args) < min) or \
1422        (max is not None and len(me.args) > max):
1423       raise TripeSyntaxError()
1424     return me.args
1425
1426 ###----- That's all, folks --------------------------------------------------