chiark / gitweb /
py: New Python module for writing services and suchlike
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 ### GNU General Public License for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26 """
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
30
31 Rather than end up in lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process.  It sassumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
38
39 The simple rule governing the coroutines used here is this:
40
41   * The root coroutine never cares what values are passed to it when it
42     resumes: it just discards them.
43
44   * Other, non-root, coroutines are presumed to be waiting for some specific
45     thing.
46
47 Configuration variables:
48         configdir
49         socketdir
50         PACKAGE
51         VERSION
52         tripesock
53         peerdb
54
55 Other useful variables:
56         rootcr
57         svcmgr
58
59 Other tweakables:
60         _debug
61
62 Exceptions:
63         Exception
64           StandardError
65             TripeConnectionError
66             TripeError
67             TripeInternalError
68           TripeJobCancelled
69           TripeJobError
70           TripeSyntaxError
71
72 Classes:
73         _Coroutine
74           Coroutine
75             TripeServiceJob
76         OptParse
77         Queue
78         TripeCommand
79           TripeSynchronousCommand
80           TripeAsynchronousCommand
81         TripeCommandIterator
82         TripeConnection
83           TripeCommandDispatcher
84             SelCommandDispatcher
85               TripeServiceManager
86         TripeService
87         TripeServiceCommand
88
89 Utility functions:
90         quotify
91         runservices
92         spawn
93         svcinfo
94         timespec
95 """
96
97 __pychecker__ = 'self=me no-constCond no-argsused'
98
99 _debug = False
100
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
103
104 import socket as S
105 import errno as E
106 import mLib as M
107 import re as RX
108 import sys as SYS
109 import os as OS
110
111 try:
112   if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113     raise ImportError
114   from py.magic import greenlet as _Coroutine
115 except ImportError:
116   from rmcr import Coroutine as _Coroutine
117
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
120
121 rootcr = _Coroutine.getcurrent()
122
123 class Coroutine (_Coroutine):
124   """
125   A coroutine class which can only be invoked by the root coroutine.
126
127   The root, by construction, cannot be an instance of this class.
128   """
129   def switch(me, *args, **kw):
130     assert _Coroutine.getcurrent() is rootcr
131     _Coroutine.switch(me, *args, **kw)
132
133 ###--------------------------------------------------------------------------
134 ### Default places for things.
135
136 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
137 socketdir = "@socketdir@"
138 PACKAGE = "@PACKAGE@"
139 VERSION = "@VERSION@"
140
141 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
142
143 ###--------------------------------------------------------------------------
144 ### Connection to the server.
145
146 def readnonblockingly(sock, len):
147   """
148   Nonblocking read from SOCK.
149
150   Try to return LEN bytes.  If couldn't read anything, return None.  EOF is
151   returned as an empty string.
152   """
153   try:
154     sock.setblocking(0)
155     return sock.recv(len)
156   except S.error, exc:
157     if exc[0] == E.EWOULDBLOCK:
158       return None
159     raise
160
161 class TripeConnectionError (StandardError):
162   """Something happened to the connection with the server."""
163   pass
164 class TripeInternalError (StandardError):
165   """This program is very confused."""
166   pass
167
168 class TripeConnection (object):
169   """
170   A logical connection to the tripe administration socket.
171
172   There may or may not be a physical connection.  (This is needed for the
173   monitor, for example.)
174
175   This class isn't very useful on its own, but it has useful subclasses.  At
176   this level, the class is agnostic about I/O multiplexing schemes; that gets
177   added later.
178   """
179
180   def __init__(me, socket):
181     """
182     Make a connection to the named SOCKET.
183
184     No physical connection is made initially.
185     """
186     me.socket = socket
187     me.sock = None
188     me.lbuf = None
189
190   def connect(me):
191     """
192     Ensure that there's a physical connection.
193
194     Do nothing if we're already connected.  Invoke the `connected' method if
195     successful.
196     """
197     if me.sock: return
198     sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
199     sock.connect(me.socket)
200     me.sock = sock
201     me.lbuf = M.LineBuffer(me.line, me._eof)
202     me.lbuf.size = 1024
203     me.connected()
204     return me
205
206   def disconnect(me, reason):
207     """
208     Disconnect the physical connection.
209
210     Invoke the `disconnected' method, giving the provided REASON, which
211     should be either None or an exception.
212     """
213     if not me.sock: return
214     me.disconnected(reason)
215     me.sock.close()
216     me.sock = None
217     me.lbuf.disable()
218     me.lbuf = None
219     return me
220
221   def connectedp(me):
222     """
223     Return true if there's a current, believed-good physical connection.
224     """
225     return me.sock is not None
226
227   __nonzero__ = connectedp
228
229   def send(me, line):
230     """
231     Send the LINE to the connection's socket.
232
233     All output is done through this method; it can be overridden to provide
234     proper nonblocking writing, though this seems generally unnecessary.
235     """
236     try:
237       me.sock.setblocking(1)
238       me.sock.send(line + '\n')
239     except Exception, exc:
240       me.disconnect(exc)
241       raise
242     return me
243
244   def receive(me):
245     """
246     Receive whatever's ready from the connection's socket.
247
248     Call `line' on each complete line, and `eof' if the connection closed.
249     Subclasses which attach this class to an I/O-event system should call
250     this method when the socket (CONN.sock) is ready for reading.
251     """
252     while me.sock is not None:
253       try:
254         buf = readnonblockingly(me.sock, 16384)
255       except Exception, exc:
256         me.disconnect(exc)
257         raise
258       if buf is None:
259         return me
260       if buf == '':
261         me._eof()
262         return me
263       me.lbuf.flush(buf)
264     return me
265
266   def _eof(me):
267     """Internal end-of-file handler."""
268     me.disconnect(TripeConnectionError('connection lost'))
269     me.eof()
270
271   def connected(me):
272     """
273     To be overridden by subclasses to react to a connection being
274     established.
275     """
276     pass
277
278   def disconnected(me, reason):
279     """
280     To be overridden by subclasses to react to a connection being severed.
281     """
282     pass
283
284   def eof(me):
285     """To be overridden by subclasses to handle end-of-file."""
286     pass
287
288   def line(me, line):
289     """To be overridden by subclasses to handle incoming lines."""
290     pass
291
292 ###--------------------------------------------------------------------------
293 ### Dispatching coroutine.
294
295 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
296 ## contain backslashes, quotes or whitespace.
297 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
298
299 ## Match characters which need to be escaped, even in quoted text.
300 rx_weird = RX.compile(r'([\\\'])')
301
302 def quotify(s):
303   """Quote S according to the tripe-admin(5) rules."""
304   m = rx_ordinary.match(s)
305   if m and m.end() == len(s):
306     return s
307   else:
308     return "'" + rx_weird.sub(r'\\\1', s) + "'"
309
310 def _callback(func):
311   """
312   Return a wrapper for FUNC which reports exceptions thrown by it.
313
314   Useful in the case of callbacks invoked by C functions which ignore
315   exceptions.
316   """
317   def _(*a, **kw):
318     try:
319       return func(*a, **kw)
320     except:
321       SYS.excepthook(*SYS.exc_info())
322       raise
323   return _
324
325 class TripeCommand (object):
326   """
327   This abstract class represents a command in progress.
328
329   The `words' attribute contains the list of tokens which make up the
330   command.
331
332   Subclasses must implement a method to handle server responses:
333
334     * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
335       'FAIL'; ARGS are the remaining tokens from the server's response.
336   """
337
338   def __init__(me, words):
339     """Make a new command consisting of the given list of WORDS."""
340     me.words = words
341
342 class TripeSynchronousCommand (TripeCommand):
343   """
344   A simple command, processed apparently synchronously.
345
346   Must be invoked from a coroutine other than the root (or whichever one is
347   running the dispatcher); in reality, other coroutines carry on running
348   while we wait for a response from the server.
349
350   Each server response causes the calling coroutine to be resumed with the
351   pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
352   or `FAIL') and REST is a list of the server's other response tokens.  The
353   calling coroutine must continue switching back to the dispatcher until a
354   terminating response (`OK' or `FAIL') is received or become very
355   confused.
356
357   Mostly it's better to use the TripeCommandIterator to do this
358   automatically.
359   """
360
361   def __init__(me, words):
362     """Initialize the command, specifying the WORDS to send to the server."""
363     TripeCommand.__init__(me, words)
364     me.owner = Coroutine.getcurrent()
365
366   def response(me, code, *rest):
367     """Handle a server response by forwarding it to the calling coroutine."""
368     me.owner.switch((code, rest))
369
370 class TripeError (StandardError):
371   """
372   A tripe command failed with an error (a FAIL code).  The args attribute
373   contains a list of the server's message tokens.
374   """
375   pass
376
377 class TripeCommandIterator (object):
378   """
379   Iterator interface to a tripe command.
380
381   The values returned by the iterator are lists of tokens from the server's
382   INFO lines, as processed by the given filter function, if any.  The
383   iterator completes normally (by raising StopIteration) if the server
384   reported OK, and raises an exception if the command failed for some reason.
385
386   A TripeError is raised if the server issues a FAIL code.  If the connection
387   failed, some other exception is raised.
388   """
389
390   def __init__(me, dispatcher, words, bg = False, filter = None):
391     """
392     Create a new command iterator.
393
394     The command is submitted to the DISPATCHER; it consists of the given
395     WORDS.  If BG is true, then an option is inserted to request that the
396     server run the command in the background.  The FILTER is applied to the
397     token lists which the server responds, and the filter's output are the
398     items returned by the iterator.
399     """
400     me.dcr = Coroutine.getcurrent().parent
401     if me.dcr is None:
402       raise ValueError, 'must invoke from coroutine'
403     me.filter = filter or (lambda x: x)
404     if bg:
405       words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
406     dispatcher.rawcommand(TripeSynchronousCommand(words))
407
408   def __iter__(me):
409     """Iterator protocol: I am my own iterator."""
410     return me
411
412   def next(me):
413     """
414     Iterator protocol: return the next piece of information from the server.
415
416     INFO responses are filtered and returned as the values of the iteration.
417     FAIL and CONNERR responses are turned into exceptions and raised.
418     Finally, OK is turned into StopIteration, which should cause a normal end
419     to the iteration process.
420     """
421     thing = me.dcr.switch()
422     code, rest = thing
423     if code == 'INFO':
424       return me.filter(rest)
425     elif code == 'OK':
426       raise StopIteration
427     elif code == 'CONNERR':
428       if rest is None:
429         raise TripeConnectionError, 'connection terminated by user'
430       else:
431         raise rest
432     elif code == 'FAIL':
433       raise TripeError(*rest)
434     else:
435       raise TripeInternalError \
436             ('unexpected tripe response %r' % ([code] + rest))
437
438 ### Simple utility functions for the TripeCommandIterator convenience
439 ### methods.
440
441 def _tokenjoin(words):
442   """Filter function: simply join the given tokens with spaces between."""
443   return ' '.join(words)
444
445 def _keyvals(iter):
446   """Return a dictionary formed from the KEY=VALUE pairs returned by the
447   iterator ITER."""
448   kv = {}
449   for ww in iter:
450     for w in ww:
451       q = w.index('=')
452       kv[w[:q]] = w[q + 1:]
453   return kv
454
455 def _simple(iter):
456   """Raise an error if ITER contains any item."""
457   stuff = list(iter)
458   if len(stuff) != 0:
459     raise TripeInternalError('expected no response')
460   return None
461
462 def _oneline(iter):
463   """If ITER contains a single item, return it; otherwise raise an error."""
464   stuff = list(iter)
465   if len(stuff) != 1:
466     raise TripeInternalError('expected only one line of response')
467   return stuff[0]
468
469 def _tracelike(iter):
470   """Handle a TRACE-like command.  The result is a list of tuples (CHAR,
471   STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
472   disabled, `+' if enabled, maybe something else later), and DESC is the
473   human-readable description."""
474   stuff = []
475   for ww in iter:
476     ch = ww[0][0]
477     st = ww[0][1:]
478     desc = ' '.join(ww[1:])
479     stuff.append((ch, st, desc))
480   return stuff
481
482 def _kwopts(kw, allowed):
483   """Parse keyword arguments into options.  ALLOWED is a list of allowable
484   keywords; raise errors if other keywords are present.  KEY = VALUE becomes
485   an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
486   VALUE is a true non-string, or nothing if VALUE is false..  Insert a `--'
487   at the end to stop the parser getting confused."""
488   opts = []
489   amap = {}
490   for a in allowed: amap[a] = True
491   for k, v in kw.iteritems():
492     if k not in amap:
493       raise ValueError('option %s not allowed here' % k)
494     if isinstance(v, str):
495       opts += ['-' + k, v]
496     elif v:
497       opts += ['-' + k]
498   opts.append('--')
499   return opts
500
501 class TripeCommandDispatcher (TripeConnection):
502   """
503   Command dispatcher.
504
505   The command dispatcher is a connection which knows how to handle commands.
506   This is probably the most important class in this module to understand.
507
508   Lines from the server are parsed into tokens.  The first token is a code
509   (OK or NOTE or something) explaining what kind of line this is.  The
510   `handler' attribute is a dictionary mapping server line codes to handler
511   functions, which are applied to the words of the line as individual
512   arguments.  *Exception*: the content of TRACE lines is not tokenized.
513
514   There are default handlers for server codes which respond to commands.
515   Commands arrive as TripeCommand instances through the `rawcommand'
516   interface.  The dispatcher keeps track of which command objects represent
517   which jobs, and sends responses on to the appropriate command objects by
518   invoking their `response' methods.  Command objects don't see the
519   BG... codes, because the dispatcher has already transformed them into
520   regular codes when it was looking up job code.
521
522   The dispatcher also has a special response code of its own: CONNERR
523   indicates that the connection failed and the command has therefore been
524   lost; the
525   """
526
527   ## --- Infrastructure ---
528   ##
529   ## We will get confused if we pipeline commands.  Send them one at a time.
530   ## Only send a command when the previous one detaches or completes.
531   ##
532   ## The following attributes are interesting:
533   ##
534   ## tagseq             Sequence number for next background job (for bgtag)
535   ##
536   ## queue              Commands awaiting submission.
537   ##
538   ## cmd                Mapping from job tags to commands: cmd[None] is the
539   ##                    foreground command.
540   ##
541   ## handler            Mapping from server codes to handler functions.
542
543   def __init__(me, socket):
544     """
545     Initialize the dispatcher.
546
547     The SOCKET is the filename of the administration socket to connect to,
548     for TripeConnection.__init__.
549     """
550     TripeConnection.__init__(me, socket)
551     me.tagseq = 0
552     me.handler = {}
553     me.handler['BGDETACH'] = me._detach
554     for i in 'BGOK', 'BGINFO', 'BGFAIL':
555       me.handler[i] = me._response
556     for i in 'OK', 'INFO', 'FAIL':
557       me.handler[i] = me._fgresponse
558
559   def connected(me):
560     """
561     Connection hook.
562
563     If a subclass overrides this method, it must call us; clears out the
564     command queue and job map.
565     """
566     me.queue = M.Array()
567     me.cmd = {}
568
569   def disconnected(me, reason):
570     """
571     Disconnection hook.
572
573     If a subclass hooks overrides this method, it must call us; sends a
574     special CONNERR code to all incomplete commands.
575     """
576     for cmd in me.cmd.itervalues():
577       cmd.response('CONNERR', reason)
578     for cmd in me.queue:
579       cmd.response('CONNERR', reason)
580
581   @_callback
582   def line(me, line):
583     """Handle an incoming line, sending it to the right place."""
584     if _debug: print '<', line
585     code, rest = M.word(line, quotep = True)
586     func = me.handler.get(code)
587     if func is not None:
588       if code == 'TRACE':
589         func(code, rest)
590       else:
591         func(code, *M.split(rest, quotep = True)[0])
592       me.dequeue()
593
594   def dequeue(me):
595     """
596     Pull the oldest command off the queue and try to send it to the server.
597     """
598     if not me.queue or None in me.cmd: return
599     cmd = me.queue.shift()
600     if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
601     me.send(' '.join([quotify(w) for w in cmd.words]))
602     me.cmd[None] = cmd
603
604   def bgtag(me):
605     """
606     Return an unused job tag.
607
608     May be of use when composing commands by hand.
609     """
610     tag = 'J%05d' % me.tagseq
611     me.tagseq += 1
612     return tag
613
614   ## --- Built-in handler functions for server responses ---
615
616   def _detach(me, _, tag):
617     """
618     Respond to a BGDETACH TAG message.
619
620     Move the current foreground command to the background.
621     """
622     assert tag not in me.cmd
623     me.cmd[tag] = me.cmd[None]
624     del me.cmd[None]
625
626   def _response(me, code, tag, *w):
627     """
628     Respond to an OK, INFO or FAIL message.
629
630     If this is a message for a background job, find the tag; then dispatch
631     the result to the command object.
632     """
633     if code.startswith('BG'):
634       code = code[2:]
635     cmd = me.cmd[tag]
636     if code != 'INFO':
637       del me.cmd[tag]
638     cmd.response(code, *w)
639
640   def _fgresponse(me, code, *w):
641     """Process responses to the foreground command."""
642     me._response(code, None, *w)
643
644   ## --- Interface methods ---
645
646   def rawcommand(me, cmd):
647     """
648     Submit the TripeCommand CMD to the server, and look after it until it
649     completes.
650     """
651     if not me.connectedp():
652       raise TripeConnectionError('connection closed')
653     me.queue.push(cmd)
654     me.dequeue()
655
656   def command(me, *cmd, **kw):
657     """Convenience wrapper for creating a TripeCommandIterator object."""
658     return TripeCommandIterator(me, cmd, **kw)
659
660   ## --- Convenience methods for server commands ---
661
662   def add(me, peer, *addr, **kw):
663     return _simple(me.command(bg = True,
664                               *['ADD'] +
665                               _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
666                               [peer] +
667                               list(addr)))
668   def addr(me, peer):
669     return _oneline(me.command('ADDR', peer))
670   def algs(me):
671     return _keyvals(me.command('ALGS'))
672   def checkchal(me, chal):
673     return _simple(me.command('CHECKCHAL', chal))
674   def daemon(me):
675     return _simple(me.command('DAEMON'))
676   def eping(me, peer, **kw):
677     return _oneline(me.command(bg = True,
678                                *['PING'] +
679                                _kwopts(kw, ['timeout']) +
680                                [peer]))
681   def forcekx(me, peer):
682     return _simple(me.command('FORCEKX', peer))
683   def getchal(me):
684     return _oneline(me.command('GETCHAL', filter = _tokenjoin))
685   def greet(me, peer, chal):
686     return _simple(me.command('GREET', peer, chal))
687   def help(me):
688     return list(me.command('HELP', filter = _tokenjoin))
689   def ifname(me, peer):
690     return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
691   def kill(me, peer):
692     return _simple(me.command('KILL', peer))
693   def list(me):
694     return list(me.command('LIST', filter = _tokenjoin))
695   def notify(me, *msg):
696     return _simple(me.command('NOTIFY', *msg))
697   def peerinfo(me, peer):
698     return _keyvals(me.command('PEERINFO', peer))
699   def ping(me, peer, **kw):
700     return _oneline(me.command(bg = True,
701                                *['PING'] +
702                                _kwopts(kw, ['timeout']) +
703                                [peer]))
704   def port(me):
705     return _oneline(me.command('PORT', filter = _tokenjoin))
706   def quit(me):
707     return _simple(me.command('QUIT'))
708   def reload(me):
709     return _simple(me.command('RELOAD'))
710   def servinfo(me):
711     return _keyvals(me.command('SERVINFO'))
712   def setifname(me, new):
713     return _simple(me.command('SETIFNAME', new))
714   def svcclaim(me, service, version):
715     return _simple(me.command('SVCCLAIM', service, version))
716   def svcensure(me, service, version = None):
717     return _simple(me.command('SVCENSURE', service,
718                               *((version is not None and [version]) or [])))
719   def svcfail(me, job, *msg):
720     return _simple(me.command('SVCFAIL', job, *msg))
721   def svcinfo(me, job, *msg):
722     return _simple(me.command('SVCINFO', job, *msg))
723   def svclist(me):
724     return list(me.command('SVCLIST'))
725   def svcok(me, job):
726     return _simple(me.command('SVCOK', job))
727   def svcquery(me, service):
728     return _keyvals(me.command('SVCQUERY', service))
729   def svcrelease(me, service):
730     return _simple(me.command('SVCRELEASE', service))
731   def svcsubmit(me, service, *args, **kw):
732     return me.command(bg = True,
733                       *['SVCSUBMIT'] +
734                       _kwopts(kw, ['version']) +
735                       [service] +
736                       list(args))
737   def stats(me, peer):
738     return _keyvals(me.command('STATS', peer))
739   def trace(me, *args):
740     return _tracelike(me.command('TRACE', *args))
741   def tunnels(me):
742     return list(me.command('TUNNELS', filter = _tokenjoin))
743   def version(me):
744     return _oneline(me.command('VERSION', filter = _tokenjoin))
745   def warn(me, *msg):
746     return _simple(me.command('WARN', *msg))
747   def watch(me, *args):
748     return _tracelike(me.command('WATCH', *args))
749
750 ###--------------------------------------------------------------------------
751 ### Asynchronous commands.
752
753 class Queue (object):
754   """
755   A queue of things arriving asynchronously.
756
757   This is a very simple single-reader multiple-writer queue.  It's useful for
758   more complex coroutines which need to cope with a variety of possible
759   incoming events.
760   """
761
762   def __init__(me):
763     """Create a new empty queue."""
764     me.contents = M.Array()
765     me.waiter = None
766
767   def _wait(me):
768     """
769     Internal: wait for an item to arrive in the queue.
770
771     Complain if someone is already waiting, because this is just a
772     single-reader queue.
773     """
774     if me.waiter:
775       raise ValueError('queue already being waited on')
776     try:
777       me.waiter = Coroutine.getcurrent()
778       while not me.contents:
779         me.waiter.parent.switch()
780     finally:
781       me.waiter = None
782
783   def get(me):
784     """
785     Remove and return the item at the head of the queue.
786
787     If the queue is empty, wait until an item arrives.
788     """
789     me._wait()
790     return me.contents.shift()
791
792   def peek(me):
793     """
794     Return the item at the head of the queue without removing it.
795
796     If the queue is empty, wait until an item arrives.
797     """
798     me._wait()
799     return me.contents[0]
800
801   def put(me, thing):
802     """
803     Write THING to the queue.
804
805     If someone is waiting on the queue, wake him up immediately; otherwise
806     just leave the item there for later.
807     """
808     me.contents.push(thing)
809     if me.waiter:
810       me.waiter.switch()
811
812 class TripeAsynchronousCommand (TripeCommand):
813   """
814   Asynchronous commands.
815
816   This is the complicated way of issuing commands.  You must set up a queue,
817   and associate the command with the queue.  Responses arriving for the
818   command will be put on the queue as an triple of the form (TAG, CODE, REST)
819   -- where TAG is an object of your choice, not interpreted by this class,
820   CODE is the server's response code (OK, INFO, FAIL), and REST is the list
821   of the rest of the server's tokens.
822
823   Using this, you can write coroutines which process many commands (and
824   possibly other events) simultaneously.
825   """
826
827   def __init__(me, queue, tag, words):
828     """Make an asynchronous command consisting of the given WORDS, which
829     sends responses to QUEUE, labelled with TAG."""
830     TripeCommand.__init__(me, words)
831     me.queue = queue
832     me.tag = tag
833
834   def response(me, code, *stuff):
835     """Handle a server response by writing it to the caller's queue."""
836     me.queue.put((me.tag, code, list(stuff)))
837
838 ###--------------------------------------------------------------------------
839 ### Selecting command dispatcher.
840
841 class SelCommandDispatcher (TripeCommandDispatcher):
842   """
843   A command dispatcher which integrates with mLib's I/O-event system.
844
845   To use, simply create an instance and run mLib.select in a loop in your
846   main coroutine.
847   """
848
849   def __init__(me, socket):
850     """
851     Create an instance; SOCKET is the admin socket to connect to.
852
853     Note that no connection is made initially.
854     """
855     TripeCommandDispatcher.__init__(me, socket)
856     me.selfile = None
857
858   def connected(me):
859     """Connection hook: wires itself into the mLib select machinery."""
860     TripeCommandDispatcher.connected(me)
861     me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive)
862     me.selfile.enable()
863
864   def disconnected(me, reason):
865     """Disconnection hook: removes itself from the mLib select machinery."""
866     TripeCommandDispatcher.disconnected(me, reason)
867     me.selfile = None
868
869 ###--------------------------------------------------------------------------
870 ### Services.
871
872 class TripeJobCancelled (Exception):
873   """
874   Exception sent to job handler if the client kills the job.
875
876   Not propagated further.
877   """
878   pass
879
880 class TripeJobError (Exception):
881   """
882   Exception to cause failure report for running job.
883
884   Sends an SVCFAIL code back.
885   """
886   pass
887
888 class TripeSyntaxError (Exception):
889   """
890   Exception to report a syntax error for a job.
891
892   Sends an SVCFAIL bad-svc-syntax message back.
893   """
894   pass
895
896 class TripeServiceManager (SelCommandDispatcher):
897   """
898   A command dispatcher with added handling for incoming service requests.
899
900   There is usually only one instance of this class, called svcmgr.  Some of
901   the support functions in this module assume that this is the case.
902
903   To use, run mLib.select in a loop until the quitp method returns true;
904   then, in a non-root coroutine, register your services by calling `add', and
905   then call `running' when you've finished setting up.
906
907   The instance handles server service messages SVCJOB, SVCCANCEL and
908   SVCCLAIM.  It maintains a table of running services.  Incoming jobs cause
909   the service's `job' method to be invoked; SVCCANCEL sends a
910   TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
911   the relevant service to be deregistered.
912
913   There is no base class for jobs, but a job must implement two methods:
914
915   start()               Begin processing; might be a no-op.
916
917   cancel()              Stop processing; the original client has killed the
918                         job.
919
920   The life of a service manager is divided into two parts: setup and running;
921   you tell the manager that you've finished setting up by calling the
922   `running' method.  If, at any point after setup is finished, there are no
923   remaining services or jobs, `quitp' will return true, ending the process.
924   """
925
926   ## --- Attributes ---
927   ##
928   ## svc                Mapping name -> service object
929   ##
930   ## job                Mapping jobid -> job handler coroutine
931   ##
932   ## runningp           True when setup is finished
933   ##
934   ## _quitp             True if explicit quit has been requested
935
936   def __init__(me, socket):
937     """
938     Initialize the service manager.
939
940     SOCKET is the administration socket to connect to.
941     """
942     SelCommandDispatcher.__init__(me, socket)
943     me.svc = {}
944     me.job = {}
945     me.runningp = False
946     me.handler['SVCCANCEL'] = me._cancel
947     me.handler['SVCJOB'] = me._job
948     me.handler['SVCCLAIM'] = me._claim
949     me._quitp = 0
950
951   def addsvc(me, svc):
952     """Register a new service; SVC is a TripeService instance."""
953     assert svc.name not in me.svc
954     me.svcclaim(svc.name, svc.version)
955     me.svc[svc.name] = svc
956
957   def _cancel(me, _, jid):
958     """
959     Called when the server cancels a job; invokes the job's `cancel' method.
960     """
961     job = me.job[jid]
962     del me.job[jid]
963     job.cancel()
964
965   def _claim(me, _, svc, __):
966     """Called when another program claims our service at a higher version."""
967     del me.svc[svc]
968
969   def _job(me, _, jid, svc, cmd, *args):
970     """
971     Called when the server sends us a job to do.
972
973     Calls the service to collect a job, and begins processing it.
974     """
975     assert jid not in me.job
976     svc = me.svc[svc.lower()]
977     job = svc.job(jid, cmd, args)
978     me.job[jid] = job
979     job.start()
980
981   def running(me):
982     """Answer true if setup is finished."""
983     me.runningp = True
984
985   def jobdone(me, jid):
986     """Informs the service manager that the job with id JID has finished."""
987     try:
988       del me.job[jid]
989     except KeyError:
990       pass
991
992   def quitp(me):
993     """
994     Return true if no services or jobs are active (and, therefore, if this
995     process can quit without anyone caring).
996     """
997     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
998                                           not me.selfile))
999
1000   def quit(me):
1001     """Forces the quit flag (returned by quitp) on."""
1002     me._quitp = True
1003
1004 class TripeService (object):
1005   """
1006   A standard service.
1007
1008   The NAME and VERSION are passed on to the server.  The CMDTAB is a
1009   dictionary mapping command names (in lowercase) to command objects.
1010
1011   If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults
1012   are provided.
1013
1014   TripeService itself is mostly agnostic about the nature of command objects,
1015   but the TripeServiceJob class (below) has some requirements.  The built-in
1016   HELP command requires command objects to have `usage' attributes.
1017   """
1018
1019   def __init__(me, name, version, cmdtab):
1020     """
1021     Create and register a new service with the given NAME and VERSION.
1022
1023     CMDTAB maps command names (in lower-case) to command objects.
1024     """
1025     me.name = name
1026     me.version = version
1027     me.cmd = cmdtab
1028     me.activep = True
1029     me.cmd.setdefault('help',
1030                       TripeServiceCommand('help', 0, 0, '', me._help))
1031     me.cmd.setdefault('quit',
1032                       TripeServiceCommand('quit', 0, 0, '', me._quit))
1033
1034   def job(me, jid, cmd, args):
1035     """
1036     Called by the service manager: a job arrived with id JID.
1037
1038     It asks for comamnd CMD with argument list ARGS.  Creates a new job,
1039     passing it the information needed.
1040     """
1041     return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1042
1043   ## Simple default command handlers, complying with the spec in
1044   ## tripe-service(7).
1045
1046   def _help(me):
1047     """Send a help summary to the user."""
1048     cmds = me.cmd.items()
1049     cmds.sort()
1050     for name, cmd in cmds:
1051       svcinfo(name, *cmd.usage)
1052
1053   def _quit(me):
1054     """Terminate the service manager."""
1055     svcmgr.notify('svc-quit', me.name, 'admin-request')
1056     svcmgr.quit()
1057
1058 class TripeServiceCommand (object):
1059   """A simple service command."""
1060
1061   def __init__(me, name, min, max, usage, func):
1062     """
1063     Creates a new command.
1064
1065     NAME is the command's name (in lowercase).
1066
1067     MIN and MAX are the minimum and maximum number of allowed arguments (used
1068     for checking); either may be None to indicate no minimum or maximum.
1069
1070     USAGE is a usage string, used for generating help and error messages.
1071
1072     FUNC is the function to invoke.
1073     """
1074     me.name = name
1075     me.min = min
1076     me.max = max
1077     me.usage = usage.split()
1078     me.func = func
1079
1080   def run(me, *args):
1081     """
1082     Called when the command is invoked.
1083
1084     Does minimal checking of the arguments and calls the supplied function.
1085     """
1086     if (me.min is not None and len(args) < me.min) or \
1087        (me.max is not None and len(args) > me.max):
1088       raise TripeSyntaxError
1089     me.func(*args)
1090
1091 class TripeServiceJob (Coroutine):
1092   """
1093   Job handler coroutine.
1094
1095   A standard TripeService invokes a TripeServiceJob for each incoming job
1096   request, passing it the jobid, command and arguments, and a command
1097   object.  The command object needs the following attributes.
1098
1099   usage                 A usage list (excluding the command name) showing
1100                         arguments and options.
1101
1102   run(*ARGS)            Function to react to the command with ARGS split into
1103                         separate arguments.  Invoked in a coroutine.  The
1104                         svcinfo function (not the TripeCommandDispatcher
1105                         method) may be used to send INFO lines.  The function
1106                         may raise TripeJobError to send a FAIL response back,
1107                         or TripeSyntaxError to send a generic usage error.
1108                         TripeJobCancelled exceptions are trapped silently.
1109                         Other exceptions are translated into a generic
1110                         internal-error message.
1111
1112   This class automatically takes care of sending some closing response to the
1113   job, and for informing the service manager that the job is completed.
1114
1115   The `jid' attribute stores the job's id.
1116   """
1117
1118   def __init__(me, jid, svc, cmd, command, args):
1119     """
1120     Start a new job.
1121
1122     The job is created with id JID, for service SVC, processing command name
1123     CMD (which the service resolved into the command object COMMAND, or
1124     None), and with the arguments ARGS.
1125     """
1126     Coroutine.__init__(me)
1127     me.jid = jid
1128     me.svc = svc
1129     me.cmd = cmd
1130     me.command = command
1131     me.args = args
1132
1133   def run(me):
1134     """
1135     Main body of the coroutine.
1136
1137     Does the tedious exception handling boilerplate and invokes the command's
1138     run method.
1139     """
1140     try:
1141       try:
1142         if me.command is None:
1143           svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1144         else:
1145           me.command.run(*me.args)
1146           svcmgr.svcok(me.jid)
1147       except TripeJobError, exc:
1148         svcmgr.svcfail(me.jid, *exc.args)
1149       except TripeSyntaxError:
1150         svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1151                        me.svc.name, me.command.name,
1152                        *me.command.usage)
1153       except TripeJobCancelled:
1154         pass
1155       except Exception, exc:
1156         svcmgr.svcfail(me.jid, 'svc-internal-error',
1157                        exc.__class__.__name__, str(exc))
1158     finally:
1159       svcmgr.jobdone(me.jid)
1160
1161   def start(me):
1162     """Invoked by the service manager to start running the coroutine."""
1163     me.switch()
1164
1165   def cancel(me):
1166     """Invoked by the service manager to cancel the job."""
1167     me.throw(TripeJobCancelled())
1168
1169 def svcinfo(*args):
1170   """
1171   If invoked from a TripeServiceJob coroutine, sends an INFO line to the
1172   job's sender, automatically using the correct job id.
1173   """
1174   svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1175
1176 def _setupsvc(tab, func):
1177   """
1178   Setup coroutine for setting up service programs.
1179
1180   Register the given services.
1181   """
1182   try:
1183     for service in tab:
1184       svcmgr.addsvc(service)
1185     if func:
1186       func()
1187   finally:
1188     svcmgr.running()
1189
1190 svcmgr = TripeServiceManager(None)
1191 _spawnq = []
1192 def runservices(socket, tab, init = None, setup = None, daemon = False):
1193   """
1194   Function to start a service provider.
1195
1196   SOCKET is the socket to connect to, usually tripesock.
1197
1198   TAB is a list of entries.  An entry may be either a tuple
1199
1200     (NAME, VERSION, COMMANDS)
1201
1202   or a service object (e.g., a TripeService instance).
1203
1204   COMMANDS is a dictionary mapping command names to tuples
1205
1206     (MIN, MAX, USAGE, FUNC)
1207
1208   of arguments for a TripeServiceCommand object.
1209
1210   If DAEMON is true, then the process is forked into the background before we
1211   start.  If INIT is given, it is called in the main coroutine, immediately
1212   after forking.  If SETUP is given, it is called in a coroutine, after
1213   calling INIT and setting up the services but before marking the service
1214   manager as running.
1215
1216   It is a really bad idea to do any initialization, particularly setting up
1217   coroutines, outside of the INIT or SETUP functions.  In particular, if
1218   we're using rmcr for fake coroutines, the daemonizing fork will kill off
1219   the currently established coroutines in a most surprising way.
1220
1221   The function runs a main select loop until the service manager decides to
1222   quit.
1223   """
1224
1225   global _spawnq
1226   svcmgr.socket = socket
1227   svcmgr.connect()
1228   svcs = []
1229   for service in tab:
1230     if not isinstance(service, tuple):
1231       svcs.append(service)
1232     else:
1233       name, version, commands = service
1234       cmdmap = {}
1235       for cmd, stuff in commands.iteritems():
1236         cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1237       svcs.append(TripeService(name, version, cmdmap))
1238   if daemon:
1239     M.daemonize()
1240   if init is not None:
1241     init()
1242   Coroutine(_setupsvc).switch(svcs, setup)
1243   while not svcmgr.quitp():
1244     for cr, args, kw in _spawnq:
1245       cr.switch(*args, **kw)
1246     _spawnq = []
1247     M.select()
1248
1249 def spawn(cr, *args, **kw):
1250   """
1251   Utility for spawning coroutines.
1252
1253   The coroutine CR is made to be a direct child of the root coroutine, and
1254   invoked by it with the given arguments.
1255   """
1256   cr.parent = rootcr
1257   _spawnq.append((cr, args, kw))
1258
1259 ###--------------------------------------------------------------------------
1260 ### Utilities for services.
1261
1262 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1263 def timespec(spec):
1264   """Parse the timespec SPEC, returning a number of seconds."""
1265   mul = 1
1266   if len(spec) > 1 and spec[-1] in _timeunits:
1267     mul = _timeunits[spec[-1]]
1268     spec = spec[:-1]
1269   try:
1270     t = int(spec)
1271   except:
1272     raise TripeJobError('bad-time-spec', spec)
1273   if t < 0:
1274     raise TripeJobError('bad-time-spec', spec)
1275   return mul * int(spec)
1276
1277 class OptParse (object):
1278   """
1279   Parse options from a command list in the conventional fashion.
1280
1281   ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
1282   options.  The returned values are the option tags.  During parsing, the
1283   `arg' method may be used to retrieve the argument for the most recent
1284   option.  Afterwards, `rest' may be used to retrieve the remaining
1285   non-option arguments, and do a simple check on how many there are.
1286
1287   The parser correctly handles `--' option terminators.
1288   """
1289
1290   def __init__(me, args, allowed):
1291     """
1292     Create a new option parser.
1293
1294     The parser will scan the ARGS for options given in the sequence ALLOWED
1295     (which are expected to include the `-' prefix).
1296     """
1297     me.allowed = {}
1298     for a in allowed:
1299       me.allowed[a] = True
1300     me.args = list(args)
1301
1302   def __iter__(me):
1303     """Iterator protocol: I am my own iterator."""
1304     return me
1305
1306   def next(me):
1307     """
1308     Iterator protocol: return the next option.
1309
1310     If we've run out, raise StopIteration.
1311     """
1312     if len(me.args) == 0 or \
1313        len(me.args[0]) < 2 or \
1314        not me.args[0].startswith('-'):
1315       raise StopIteration
1316     opt = me.args.pop(0)
1317     if opt == '--':
1318       raise StopIteration
1319     if opt not in me.allowed:
1320       raise TripeSyntaxError
1321     return opt
1322
1323   def arg(me):
1324     """
1325     Return the argument for the most recent option.
1326
1327     If none is available, raise TripeSyntaxError.
1328     """
1329     if len(me.args) == 0:
1330       raise TripeSyntaxError
1331     return me.args.pop(0)
1332
1333   def rest(me, min = None, max = None):
1334     """
1335     After option parsing is done, return the remaining arguments.
1336
1337     Check that there are at least MIN and at most MAX arguments remaining --
1338     either may be None to suppress the check.
1339     """
1340     if (min is not None and len(me.args) < min) or \
1341        (max is not None and len(me.args) > max):
1342       raise TripeSyntaxError
1343     return me.args
1344
1345 ###----- That's all, folks --------------------------------------------------