chiark / gitweb /
py/rmcr.py: More useful diagnostics for uncaught exceptions.
[tripe] / py / tripe.py.in
1 ### -*-python-*-
2 ###
3 ### Administration connection with tripe server
4 ###
5 ### (c) 2006 Straylight/Edgeware
6 ###
7
8 ###----- Licensing notice ---------------------------------------------------
9 ###
10 ### This file is part of Trivial IP Encryption (TrIPE).
11 ###
12 ### TrIPE is free software; you can redistribute it and/or modify
13 ### it under the terms of the GNU General Public License as published by
14 ### the Free Software Foundation; either version 2 of the License, or
15 ### (at your option) any later version.
16 ###
17 ### TrIPE is distributed in the hope that it will be useful,
18 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
19 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 ### GNU General Public License for more details.
21 ###
22 ### You should have received a copy of the GNU General Public License
23 ### along with TrIPE; if not, write to the Free Software Foundation,
24 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26 """
27 This module provides classes and functions for connecting to a running tripe
28 server, sending it commands, receiving and processing replies, and
29 implementing services.
30
31 Rather than end up in lost in a storm of little event-driven classes, or a
32 morass of concurrent threads, the module uses coroutines to present a fairly
33 simple function call/return interface to potentially long-running commands
34 which must run without blocking the main process.  It assumes a coroutine
35 module presenting a subset of the `greenlet' interface: if actual greenlets
36 are available, they are used; otherwise there's an implementation in terms of
37 threads (with lots of locking) which will do instead.
38
39 The simple rule governing the coroutines used here is this:
40
41   * The root coroutine never cares what values are passed to it when it
42     resumes: it just discards them.
43
44   * Other, non-root, coroutines are presumed to be waiting for some specific
45     thing.
46
47 Configuration variables:
48         configdir
49         socketdir
50         PACKAGE
51         VERSION
52         tripesock
53         peerdb
54
55 Other useful variables:
56         rootcr
57         svcmgr
58
59 Other tweakables:
60         _debug
61
62 Exceptions:
63         Exception
64           StandardError
65             TripeConnectionError
66             TripeError
67             TripeInternalError
68           TripeJobCancelled
69           TripeJobError
70           TripeSyntaxError
71
72 Classes:
73         _Coroutine
74           Coroutine
75             TripeServiceJob
76         OptParse
77         Queue
78         TripeCommand
79           TripeSynchronousCommand
80           TripeAsynchronousCommand
81         TripeCommandIterator
82         TripeConnection
83           TripeCommandDispatcher
84             SelCommandDispatcher
85               TripeServiceManager
86         TripeService
87         TripeServiceCommand
88
89 Utility functions:
90         quotify
91         runservices
92         spawn
93         svcinfo
94         timespec
95 """
96
97 __pychecker__ = 'self=me no-constCond no-argsused'
98
99 _debug = False
100
101 ###--------------------------------------------------------------------------
102 ### External dependencies.
103
104 import socket as S
105 import errno as E
106 import mLib as M
107 import re as RX
108 import sys as SYS
109 import os as OS
110
111 try:
112   if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113     raise ImportError
114   from py.magic import greenlet as _Coroutine
115 except ImportError:
116   from rmcr import Coroutine as _Coroutine
117
118 ###--------------------------------------------------------------------------
119 ### Coroutine hacking.
120
121 rootcr = _Coroutine.getcurrent()
122
123 class Coroutine (_Coroutine):
124   """
125   A coroutine class which can only be invoked by the root coroutine.
126
127   The root, by construction, cannot be an instance of this class.
128   """
129   def switch(me, *args, **kw):
130     assert _Coroutine.getcurrent() is rootcr
131     _Coroutine.switch(me, *args, **kw)
132
133 ###--------------------------------------------------------------------------
134 ### Default places for things.
135
136 configdir = OS.environ.get('TRIPEDIR', "@configdir@")
137 socketdir = "@socketdir@"
138 PACKAGE = "@PACKAGE@"
139 VERSION = "@VERSION@"
140
141 tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
142 peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
143
144 ###--------------------------------------------------------------------------
145 ### Connection to the server.
146
147 def readnonblockingly(sock, len):
148   """
149   Nonblocking read from SOCK.
150
151   Try to return LEN bytes.  If couldn't read anything, return None.  EOF is
152   returned as an empty string.
153   """
154   try:
155     sock.setblocking(0)
156     return sock.recv(len)
157   except S.error, exc:
158     if exc[0] == E.EWOULDBLOCK:
159       return None
160     raise
161
162 class TripeConnectionError (StandardError):
163   """Something happened to the connection with the server."""
164   pass
165 class TripeInternalError (StandardError):
166   """This program is very confused."""
167   pass
168
169 class TripeConnection (object):
170   """
171   A logical connection to the tripe administration socket.
172
173   There may or may not be a physical connection.  (This is needed for the
174   monitor, for example.)
175
176   This class isn't very useful on its own, but it has useful subclasses.  At
177   this level, the class is agnostic about I/O multiplexing schemes; that gets
178   added later.
179   """
180
181   def __init__(me, socket):
182     """
183     Make a connection to the named SOCKET.
184
185     No physical connection is made initially.
186     """
187     me.socket = socket
188     me.sock = None
189     me.lbuf = None
190
191   def connect(me):
192     """
193     Ensure that there's a physical connection.
194
195     Do nothing if we're already connected.  Invoke the `connected' method if
196     successful.
197     """
198     if me.sock: return
199     sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
200     sock.connect(me.socket)
201     me.sock = sock
202     me.lbuf = M.LineBuffer(me.line, me._eof)
203     me.lbuf.size = 1024
204     me.connected()
205     return me
206
207   def disconnect(me, reason):
208     """
209     Disconnect the physical connection.
210
211     Invoke the `disconnected' method, giving the provided REASON, which
212     should be either None or an exception.
213     """
214     if not me.sock: return
215     me.disconnected(reason)
216     me.sock.close()
217     me.sock = None
218     me.lbuf.disable()
219     me.lbuf = None
220     return me
221
222   def connectedp(me):
223     """
224     Return true if there's a current, believed-good physical connection.
225     """
226     return me.sock is not None
227
228   __nonzero__ = connectedp
229
230   def send(me, line):
231     """
232     Send the LINE to the connection's socket.
233
234     All output is done through this method; it can be overridden to provide
235     proper nonblocking writing, though this seems generally unnecessary.
236     """
237     try:
238       me.sock.setblocking(1)
239       me.sock.send(line + '\n')
240     except Exception, exc:
241       me.disconnect(exc)
242       raise
243     return me
244
245   def receive(me):
246     """
247     Receive whatever's ready from the connection's socket.
248
249     Call `line' on each complete line, and `eof' if the connection closed.
250     Subclasses which attach this class to an I/O-event system should call
251     this method when the socket (CONN.sock) is ready for reading.
252     """
253     while me.sock is not None:
254       try:
255         buf = readnonblockingly(me.sock, 16384)
256       except Exception, exc:
257         me.disconnect(exc)
258         raise
259       if buf is None:
260         return me
261       if buf == '':
262         me._eof()
263         return me
264       me.lbuf.flush(buf)
265     return me
266
267   def _eof(me):
268     """Internal end-of-file handler."""
269     me.disconnect(TripeConnectionError('connection lost'))
270     me.eof()
271
272   def connected(me):
273     """
274     To be overridden by subclasses to react to a connection being
275     established.
276     """
277     pass
278
279   def disconnected(me, reason):
280     """
281     To be overridden by subclasses to react to a connection being severed.
282     """
283     pass
284
285   def eof(me):
286     """To be overridden by subclasses to handle end-of-file."""
287     pass
288
289   def line(me, line):
290     """To be overridden by subclasses to handle incoming lines."""
291     pass
292
293 ###--------------------------------------------------------------------------
294 ### Dispatching coroutine.
295
296 ## Match a string if it can stand on its own as a bareword: i.e., it doesn't
297 ## contain backslashes, quotes or whitespace.
298 rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')
299
300 ## Match characters which need to be escaped, even in quoted text.
301 rx_weird = RX.compile(r'([\\\'])')
302
303 def quotify(s):
304   """Quote S according to the tripe-admin(5) rules."""
305   m = rx_ordinary.match(s)
306   if m and m.end() == len(s):
307     return s
308   else:
309     return "'" + rx_weird.sub(r'\\\1', s) + "'"
310
311 def _callback(func):
312   """
313   Return a wrapper for FUNC which reports exceptions thrown by it.
314
315   Useful in the case of callbacks invoked by C functions which ignore
316   exceptions.
317   """
318   def _(*a, **kw):
319     try:
320       return func(*a, **kw)
321     except:
322       SYS.excepthook(*SYS.exc_info())
323       raise
324   return _
325
326 class TripeCommand (object):
327   """
328   This abstract class represents a command in progress.
329
330   The `words' attribute contains the list of tokens which make up the
331   command.
332
333   Subclasses must implement a method to handle server responses:
334
335     * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
336       'FAIL'; ARGS are the remaining tokens from the server's response.
337   """
338
339   def __init__(me, words):
340     """Make a new command consisting of the given list of WORDS."""
341     me.words = words
342
343 class TripeSynchronousCommand (TripeCommand):
344   """
345   A simple command, processed apparently synchronously.
346
347   Must be invoked from a coroutine other than the root (or whichever one is
348   running the dispatcher); in reality, other coroutines carry on running
349   while we wait for a response from the server.
350
351   Each server response causes the calling coroutine to be resumed with the
352   pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
353   or `FAIL') and REST is a list of the server's other response tokens.  The
354   calling coroutine must continue switching back to the dispatcher until a
355   terminating response (`OK' or `FAIL') is received or become very
356   confused.
357
358   Mostly it's better to use the TripeCommandIterator to do this
359   automatically.
360   """
361
362   def __init__(me, words):
363     """Initialize the command, specifying the WORDS to send to the server."""
364     TripeCommand.__init__(me, words)
365     me.owner = Coroutine.getcurrent()
366
367   def response(me, code, *rest):
368     """Handle a server response by forwarding it to the calling coroutine."""
369     me.owner.switch((code, rest))
370
371 class TripeError (StandardError):
372   """
373   A tripe command failed with an error (a FAIL code).  The args attribute
374   contains a list of the server's message tokens.
375   """
376   pass
377
378 class TripeCommandIterator (object):
379   """
380   Iterator interface to a tripe command.
381
382   The values returned by the iterator are lists of tokens from the server's
383   INFO lines, as processed by the given filter function, if any.  The
384   iterator completes normally (by raising StopIteration) if the server
385   reported OK, and raises an exception if the command failed for some reason.
386
387   A TripeError is raised if the server issues a FAIL code.  If the connection
388   failed, some other exception is raised.
389   """
390
391   def __init__(me, dispatcher, words, bg = False, filter = None):
392     """
393     Create a new command iterator.
394
395     The command is submitted to the DISPATCHER; it consists of the given
396     WORDS.  If BG is true, then an option is inserted to request that the
397     server run the command in the background.  The FILTER is applied to the
398     token lists which the server responds, and the filter's output are the
399     items returned by the iterator.
400     """
401     me.dcr = Coroutine.getcurrent().parent
402     if me.dcr is None:
403       raise ValueError, 'must invoke from coroutine'
404     me.filter = filter or (lambda x: x)
405     if bg:
406       words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
407     dispatcher.rawcommand(TripeSynchronousCommand(words))
408
409   def __iter__(me):
410     """Iterator protocol: I am my own iterator."""
411     return me
412
413   def next(me):
414     """
415     Iterator protocol: return the next piece of information from the server.
416
417     INFO responses are filtered and returned as the values of the iteration.
418     FAIL and CONNERR responses are turned into exceptions and raised.
419     Finally, OK is turned into StopIteration, which should cause a normal end
420     to the iteration process.
421     """
422     thing = me.dcr.switch()
423     code, rest = thing
424     if code == 'INFO':
425       return me.filter(rest)
426     elif code == 'OK':
427       raise StopIteration
428     elif code == 'CONNERR':
429       if rest is None:
430         raise TripeConnectionError, 'connection terminated by user'
431       else:
432         raise rest
433     elif code == 'FAIL':
434       raise TripeError(*rest)
435     else:
436       raise TripeInternalError \
437             ('unexpected tripe response %r' % ([code] + rest))
438
439 ### Simple utility functions for the TripeCommandIterator convenience
440 ### methods.
441
442 def _tokenjoin(words):
443   """Filter function: simply join the given tokens with spaces between."""
444   return ' '.join(words)
445
446 def _keyvals(iter):
447   """Return a dictionary formed from the KEY=VALUE pairs returned by the
448   iterator ITER."""
449   kv = {}
450   for ww in iter:
451     for w in ww:
452       q = w.index('=')
453       kv[w[:q]] = w[q + 1:]
454   return kv
455
456 def _simple(iter):
457   """Raise an error if ITER contains any item."""
458   stuff = list(iter)
459   if len(stuff) != 0:
460     raise TripeInternalError('expected no response')
461   return None
462
463 def _oneline(iter):
464   """If ITER contains a single item, return it; otherwise raise an error."""
465   stuff = list(iter)
466   if len(stuff) != 1:
467     raise TripeInternalError('expected only one line of response')
468   return stuff[0]
469
470 def _tracelike(iter):
471   """Handle a TRACE-like command.  The result is a list of tuples (CHAR,
472   STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
473   disabled, `+' if enabled, maybe something else later), and DESC is the
474   human-readable description."""
475   stuff = []
476   for ww in iter:
477     ch = ww[0][0]
478     st = ww[0][1:]
479     desc = ' '.join(ww[1:])
480     stuff.append((ch, st, desc))
481   return stuff
482
483 def _kwopts(kw, allowed):
484   """Parse keyword arguments into options.  ALLOWED is a list of allowable
485   keywords; raise errors if other keywords are present.  KEY = VALUE becomes
486   an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
487   VALUE is a true non-string, or nothing if VALUE is false..  Insert a `--'
488   at the end to stop the parser getting confused."""
489   opts = []
490   amap = {}
491   for a in allowed: amap[a] = True
492   for k, v in kw.iteritems():
493     if k not in amap:
494       raise ValueError('option %s not allowed here' % k)
495     if isinstance(v, str):
496       opts += ['-' + k, v]
497     elif v:
498       opts += ['-' + k]
499   opts.append('--')
500   return opts
501
502 class TripeCommandDispatcher (TripeConnection):
503   """
504   Command dispatcher.
505
506   The command dispatcher is a connection which knows how to handle commands.
507   This is probably the most important class in this module to understand.
508
509   Lines from the server are parsed into tokens.  The first token is a code
510   (OK or NOTE or something) explaining what kind of line this is.  The
511   `handler' attribute is a dictionary mapping server line codes to handler
512   functions, which are applied to the words of the line as individual
513   arguments.  *Exception*: the content of TRACE lines is not tokenized.
514
515   There are default handlers for server codes which respond to commands.
516   Commands arrive as TripeCommand instances through the `rawcommand'
517   interface.  The dispatcher keeps track of which command objects represent
518   which jobs, and sends responses on to the appropriate command objects by
519   invoking their `response' methods.  Command objects don't see the
520   BG... codes, because the dispatcher has already transformed them into
521   regular codes when it was looking up job code.
522
523   The dispatcher also has a special response code of its own: CONNERR
524   indicates that the connection failed and the command has therefore been
525   lost; the
526   """
527
528   ## --- Infrastructure ---
529   ##
530   ## We will get confused if we pipeline commands.  Send them one at a time.
531   ## Only send a command when the previous one detaches or completes.
532   ##
533   ## The following attributes are interesting:
534   ##
535   ## tagseq             Sequence number for next background job (for bgtag)
536   ##
537   ## queue              Commands awaiting submission.
538   ##
539   ## cmd                Mapping from job tags to commands: cmd[None] is the
540   ##                    foreground command.
541   ##
542   ## handler            Mapping from server codes to handler functions.
543
544   def __init__(me, socket):
545     """
546     Initialize the dispatcher.
547
548     The SOCKET is the filename of the administration socket to connect to,
549     for TripeConnection.__init__.
550     """
551     TripeConnection.__init__(me, socket)
552     me.tagseq = 0
553     me.handler = {}
554     me.handler['BGDETACH'] = me._detach
555     for i in 'BGOK', 'BGINFO', 'BGFAIL':
556       me.handler[i] = me._response
557     for i in 'OK', 'INFO', 'FAIL':
558       me.handler[i] = me._fgresponse
559
560   def connected(me):
561     """
562     Connection hook.
563
564     If a subclass overrides this method, it must call us; clears out the
565     command queue and job map.
566     """
567     me.queue = M.Array()
568     me.cmd = {}
569
570   def disconnected(me, reason):
571     """
572     Disconnection hook.
573
574     If a subclass hooks overrides this method, it must call us; sends a
575     special CONNERR code to all incomplete commands.
576     """
577     for cmd in me.cmd.itervalues():
578       cmd.response('CONNERR', reason)
579     for cmd in me.queue:
580       cmd.response('CONNERR', reason)
581
582   @_callback
583   def line(me, line):
584     """Handle an incoming line, sending it to the right place."""
585     if _debug: print '<', line
586     code, rest = M.word(line, quotep = True)
587     func = me.handler.get(code)
588     if func is not None:
589       if code == 'TRACE':
590         func(code, rest)
591       else:
592         func(code, *M.split(rest, quotep = True)[0])
593       me.dequeue()
594
595   def dequeue(me):
596     """
597     Pull the oldest command off the queue and try to send it to the server.
598     """
599     if not me.queue or None in me.cmd: return
600     cmd = me.queue.shift()
601     if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
602     me.send(' '.join([quotify(w) for w in cmd.words]))
603     me.cmd[None] = cmd
604
605   def bgtag(me):
606     """
607     Return an unused job tag.
608
609     May be of use when composing commands by hand.
610     """
611     tag = 'J%05d' % me.tagseq
612     me.tagseq += 1
613     return tag
614
615   ## --- Built-in handler functions for server responses ---
616
617   def _detach(me, _, tag):
618     """
619     Respond to a BGDETACH TAG message.
620
621     Move the current foreground command to the background.
622     """
623     assert tag not in me.cmd
624     me.cmd[tag] = me.cmd[None]
625     del me.cmd[None]
626
627   def _response(me, code, tag, *w):
628     """
629     Respond to an OK, INFO or FAIL message.
630
631     If this is a message for a background job, find the tag; then dispatch
632     the result to the command object.
633     """
634     if code.startswith('BG'):
635       code = code[2:]
636     cmd = me.cmd[tag]
637     if code != 'INFO':
638       del me.cmd[tag]
639     cmd.response(code, *w)
640
641   def _fgresponse(me, code, *w):
642     """Process responses to the foreground command."""
643     me._response(code, None, *w)
644
645   ## --- Interface methods ---
646
647   def rawcommand(me, cmd):
648     """
649     Submit the TripeCommand CMD to the server, and look after it until it
650     completes.
651     """
652     if not me.connectedp():
653       raise TripeConnectionError('connection closed')
654     me.queue.push(cmd)
655     me.dequeue()
656
657   def command(me, *cmd, **kw):
658     """Convenience wrapper for creating a TripeCommandIterator object."""
659     return TripeCommandIterator(me, cmd, **kw)
660
661   ## --- Convenience methods for server commands ---
662
663   def add(me, peer, *addr, **kw):
664     return _simple(me.command(bg = True,
665                               *['ADD'] +
666                               _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
667                               [peer] +
668                               list(addr)))
669   def addr(me, peer):
670     return _oneline(me.command('ADDR', peer))
671   def algs(me):
672     return _keyvals(me.command('ALGS'))
673   def checkchal(me, chal):
674     return _simple(me.command('CHECKCHAL', chal))
675   def daemon(me):
676     return _simple(me.command('DAEMON'))
677   def eping(me, peer, **kw):
678     return _oneline(me.command(bg = True,
679                                *['PING'] +
680                                _kwopts(kw, ['timeout']) +
681                                [peer]))
682   def forcekx(me, peer):
683     return _simple(me.command('FORCEKX', peer))
684   def getchal(me):
685     return _oneline(me.command('GETCHAL', filter = _tokenjoin))
686   def greet(me, peer, chal):
687     return _simple(me.command('GREET', peer, chal))
688   def help(me):
689     return list(me.command('HELP', filter = _tokenjoin))
690   def ifname(me, peer):
691     return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
692   def kill(me, peer):
693     return _simple(me.command('KILL', peer))
694   def list(me):
695     return list(me.command('LIST', filter = _tokenjoin))
696   def notify(me, *msg):
697     return _simple(me.command('NOTIFY', *msg))
698   def peerinfo(me, peer):
699     return _keyvals(me.command('PEERINFO', peer))
700   def ping(me, peer, **kw):
701     return _oneline(me.command(bg = True,
702                                *['PING'] +
703                                _kwopts(kw, ['timeout']) +
704                                [peer]))
705   def port(me):
706     return _oneline(me.command('PORT', filter = _tokenjoin))
707   def quit(me):
708     return _simple(me.command('QUIT'))
709   def reload(me):
710     return _simple(me.command('RELOAD'))
711   def servinfo(me):
712     return _keyvals(me.command('SERVINFO'))
713   def setifname(me, new):
714     return _simple(me.command('SETIFNAME', new))
715   def svcclaim(me, service, version):
716     return _simple(me.command('SVCCLAIM', service, version))
717   def svcensure(me, service, version = None):
718     return _simple(me.command('SVCENSURE', service,
719                               *((version is not None and [version]) or [])))
720   def svcfail(me, job, *msg):
721     return _simple(me.command('SVCFAIL', job, *msg))
722   def svcinfo(me, job, *msg):
723     return _simple(me.command('SVCINFO', job, *msg))
724   def svclist(me):
725     return list(me.command('SVCLIST'))
726   def svcok(me, job):
727     return _simple(me.command('SVCOK', job))
728   def svcquery(me, service):
729     return _keyvals(me.command('SVCQUERY', service))
730   def svcrelease(me, service):
731     return _simple(me.command('SVCRELEASE', service))
732   def svcsubmit(me, service, *args, **kw):
733     return me.command(bg = True,
734                       *['SVCSUBMIT'] +
735                       _kwopts(kw, ['version']) +
736                       [service] +
737                       list(args))
738   def stats(me, peer):
739     return _keyvals(me.command('STATS', peer))
740   def trace(me, *args):
741     return _tracelike(me.command('TRACE', *args))
742   def tunnels(me):
743     return list(me.command('TUNNELS', filter = _tokenjoin))
744   def version(me):
745     return _oneline(me.command('VERSION', filter = _tokenjoin))
746   def warn(me, *msg):
747     return _simple(me.command('WARN', *msg))
748   def watch(me, *args):
749     return _tracelike(me.command('WATCH', *args))
750
751 ###--------------------------------------------------------------------------
752 ### Asynchronous commands.
753
754 class Queue (object):
755   """
756   A queue of things arriving asynchronously.
757
758   This is a very simple single-reader multiple-writer queue.  It's useful for
759   more complex coroutines which need to cope with a variety of possible
760   incoming events.
761   """
762
763   def __init__(me):
764     """Create a new empty queue."""
765     me.contents = M.Array()
766     me.waiter = None
767
768   def _wait(me):
769     """
770     Internal: wait for an item to arrive in the queue.
771
772     Complain if someone is already waiting, because this is just a
773     single-reader queue.
774     """
775     if me.waiter:
776       raise ValueError('queue already being waited on')
777     try:
778       me.waiter = Coroutine.getcurrent()
779       while not me.contents:
780         me.waiter.parent.switch()
781     finally:
782       me.waiter = None
783
784   def get(me):
785     """
786     Remove and return the item at the head of the queue.
787
788     If the queue is empty, wait until an item arrives.
789     """
790     me._wait()
791     return me.contents.shift()
792
793   def peek(me):
794     """
795     Return the item at the head of the queue without removing it.
796
797     If the queue is empty, wait until an item arrives.
798     """
799     me._wait()
800     return me.contents[0]
801
802   def put(me, thing):
803     """
804     Write THING to the queue.
805
806     If someone is waiting on the queue, wake him up immediately; otherwise
807     just leave the item there for later.
808     """
809     me.contents.push(thing)
810     if me.waiter:
811       me.waiter.switch()
812
813 class TripeAsynchronousCommand (TripeCommand):
814   """
815   Asynchronous commands.
816
817   This is the complicated way of issuing commands.  You must set up a queue,
818   and associate the command with the queue.  Responses arriving for the
819   command will be put on the queue as an triple of the form (TAG, CODE, REST)
820   -- where TAG is an object of your choice, not interpreted by this class,
821   CODE is the server's response code (OK, INFO, FAIL), and REST is the list
822   of the rest of the server's tokens.
823
824   Using this, you can write coroutines which process many commands (and
825   possibly other events) simultaneously.
826   """
827
828   def __init__(me, queue, tag, words):
829     """Make an asynchronous command consisting of the given WORDS, which
830     sends responses to QUEUE, labelled with TAG."""
831     TripeCommand.__init__(me, words)
832     me.queue = queue
833     me.tag = tag
834
835   def response(me, code, *stuff):
836     """Handle a server response by writing it to the caller's queue."""
837     me.queue.put((me.tag, code, list(stuff)))
838
839 ###--------------------------------------------------------------------------
840 ### Selecting command dispatcher.
841
842 class SelCommandDispatcher (TripeCommandDispatcher):
843   """
844   A command dispatcher which integrates with mLib's I/O-event system.
845
846   To use, simply create an instance and run mLib.select in a loop in your
847   main coroutine.
848   """
849
850   def __init__(me, socket):
851     """
852     Create an instance; SOCKET is the admin socket to connect to.
853
854     Note that no connection is made initially.
855     """
856     TripeCommandDispatcher.__init__(me, socket)
857     me.selfile = None
858
859   def connected(me):
860     """Connection hook: wires itself into the mLib select machinery."""
861     TripeCommandDispatcher.connected(me)
862     me.selfile = M.SelFile(me.sock.fileno(), M.SEL_READ, me.receive)
863     me.selfile.enable()
864
865   def disconnected(me, reason):
866     """Disconnection hook: removes itself from the mLib select machinery."""
867     TripeCommandDispatcher.disconnected(me, reason)
868     me.selfile = None
869
870 ###--------------------------------------------------------------------------
871 ### Services.
872
873 class TripeJobCancelled (Exception):
874   """
875   Exception sent to job handler if the client kills the job.
876
877   Not propagated further.
878   """
879   pass
880
881 class TripeJobError (Exception):
882   """
883   Exception to cause failure report for running job.
884
885   Sends an SVCFAIL code back.
886   """
887   pass
888
889 class TripeSyntaxError (Exception):
890   """
891   Exception to report a syntax error for a job.
892
893   Sends an SVCFAIL bad-svc-syntax message back.
894   """
895   pass
896
897 class TripeServiceManager (SelCommandDispatcher):
898   """
899   A command dispatcher with added handling for incoming service requests.
900
901   There is usually only one instance of this class, called svcmgr.  Some of
902   the support functions in this module assume that this is the case.
903
904   To use, run mLib.select in a loop until the quitp method returns true;
905   then, in a non-root coroutine, register your services by calling `add', and
906   then call `running' when you've finished setting up.
907
908   The instance handles server service messages SVCJOB, SVCCANCEL and
909   SVCCLAIM.  It maintains a table of running services.  Incoming jobs cause
910   the service's `job' method to be invoked; SVCCANCEL sends a
911   TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
912   the relevant service to be deregistered.
913
914   There is no base class for jobs, but a job must implement two methods:
915
916   start()               Begin processing; might be a no-op.
917
918   cancel()              Stop processing; the original client has killed the
919                         job.
920
921   The life of a service manager is divided into two parts: setup and running;
922   you tell the manager that you've finished setting up by calling the
923   `running' method.  If, at any point after setup is finished, there are no
924   remaining services or jobs, `quitp' will return true, ending the process.
925   """
926
927   ## --- Attributes ---
928   ##
929   ## svc                Mapping name -> service object
930   ##
931   ## job                Mapping jobid -> job handler coroutine
932   ##
933   ## runningp           True when setup is finished
934   ##
935   ## _quitp             True if explicit quit has been requested
936
937   def __init__(me, socket):
938     """
939     Initialize the service manager.
940
941     SOCKET is the administration socket to connect to.
942     """
943     SelCommandDispatcher.__init__(me, socket)
944     me.svc = {}
945     me.job = {}
946     me.runningp = False
947     me.handler['SVCCANCEL'] = me._cancel
948     me.handler['SVCJOB'] = me._job
949     me.handler['SVCCLAIM'] = me._claim
950     me._quitp = 0
951
952   def addsvc(me, svc):
953     """Register a new service; SVC is a TripeService instance."""
954     assert svc.name not in me.svc
955     me.svcclaim(svc.name, svc.version)
956     me.svc[svc.name] = svc
957
958   def _cancel(me, _, jid):
959     """
960     Called when the server cancels a job; invokes the job's `cancel' method.
961     """
962     job = me.job[jid]
963     del me.job[jid]
964     job.cancel()
965
966   def _claim(me, _, svc, __):
967     """Called when another program claims our service at a higher version."""
968     del me.svc[svc]
969
970   def _job(me, _, jid, svc, cmd, *args):
971     """
972     Called when the server sends us a job to do.
973
974     Calls the service to collect a job, and begins processing it.
975     """
976     assert jid not in me.job
977     svc = me.svc[svc.lower()]
978     job = svc.job(jid, cmd, args)
979     me.job[jid] = job
980     job.start()
981
982   def running(me):
983     """Answer true if setup is finished."""
984     me.runningp = True
985
986   def jobdone(me, jid):
987     """Informs the service manager that the job with id JID has finished."""
988     try:
989       del me.job[jid]
990     except KeyError:
991       pass
992
993   def quitp(me):
994     """
995     Return true if no services or jobs are active (and, therefore, if this
996     process can quit without anyone caring).
997     """
998     return me._quitp or (me.runningp and ((not me.svc and not me.job) or
999                                           not me.selfile))
1000
1001   def quit(me):
1002     """Forces the quit flag (returned by quitp) on."""
1003     me._quitp = True
1004
1005 class TripeService (object):
1006   """
1007   A standard service.
1008
1009   The NAME and VERSION are passed on to the server.  The CMDTAB is a
1010   dictionary mapping command names (in lowercase) to command objects.
1011
1012   If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults
1013   are provided.
1014
1015   TripeService itself is mostly agnostic about the nature of command objects,
1016   but the TripeServiceJob class (below) has some requirements.  The built-in
1017   HELP command requires command objects to have `usage' attributes.
1018   """
1019
1020   def __init__(me, name, version, cmdtab):
1021     """
1022     Create and register a new service with the given NAME and VERSION.
1023
1024     CMDTAB maps command names (in lower-case) to command objects.
1025     """
1026     me.name = name
1027     me.version = version
1028     me.cmd = cmdtab
1029     me.activep = True
1030     me.cmd.setdefault('help',
1031                       TripeServiceCommand('help', 0, 0, '', me._help))
1032     me.cmd.setdefault('quit',
1033                       TripeServiceCommand('quit', 0, 0, '', me._quit))
1034
1035   def job(me, jid, cmd, args):
1036     """
1037     Called by the service manager: a job arrived with id JID.
1038
1039     It asks for comamnd CMD with argument list ARGS.  Creates a new job,
1040     passing it the information needed.
1041     """
1042     return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)
1043
1044   ## Simple default command handlers, complying with the spec in
1045   ## tripe-service(7).
1046
1047   def _help(me):
1048     """Send a help summary to the user."""
1049     cmds = me.cmd.items()
1050     cmds.sort()
1051     for name, cmd in cmds:
1052       svcinfo(name, *cmd.usage)
1053
1054   def _quit(me):
1055     """Terminate the service manager."""
1056     svcmgr.notify('svc-quit', me.name, 'admin-request')
1057     svcmgr.quit()
1058
1059 class TripeServiceCommand (object):
1060   """A simple service command."""
1061
1062   def __init__(me, name, min, max, usage, func):
1063     """
1064     Creates a new command.
1065
1066     NAME is the command's name (in lowercase).
1067
1068     MIN and MAX are the minimum and maximum number of allowed arguments (used
1069     for checking); either may be None to indicate no minimum or maximum.
1070
1071     USAGE is a usage string, used for generating help and error messages.
1072
1073     FUNC is the function to invoke.
1074     """
1075     me.name = name
1076     me.min = min
1077     me.max = max
1078     me.usage = usage.split()
1079     me.func = func
1080
1081   def run(me, *args):
1082     """
1083     Called when the command is invoked.
1084
1085     Does minimal checking of the arguments and calls the supplied function.
1086     """
1087     if (me.min is not None and len(args) < me.min) or \
1088        (me.max is not None and len(args) > me.max):
1089       raise TripeSyntaxError
1090     me.func(*args)
1091
1092 class TripeServiceJob (Coroutine):
1093   """
1094   Job handler coroutine.
1095
1096   A standard TripeService invokes a TripeServiceJob for each incoming job
1097   request, passing it the jobid, command and arguments, and a command
1098   object.  The command object needs the following attributes.
1099
1100   usage                 A usage list (excluding the command name) showing
1101                         arguments and options.
1102
1103   run(*ARGS)            Function to react to the command with ARGS split into
1104                         separate arguments.  Invoked in a coroutine.  The
1105                         svcinfo function (not the TripeCommandDispatcher
1106                         method) may be used to send INFO lines.  The function
1107                         may raise TripeJobError to send a FAIL response back,
1108                         or TripeSyntaxError to send a generic usage error.
1109                         TripeJobCancelled exceptions are trapped silently.
1110                         Other exceptions are translated into a generic
1111                         internal-error message.
1112
1113   This class automatically takes care of sending some closing response to the
1114   job, and for informing the service manager that the job is completed.
1115
1116   The `jid' attribute stores the job's id.
1117   """
1118
1119   def __init__(me, jid, svc, cmd, command, args):
1120     """
1121     Start a new job.
1122
1123     The job is created with id JID, for service SVC, processing command name
1124     CMD (which the service resolved into the command object COMMAND, or
1125     None), and with the arguments ARGS.
1126     """
1127     Coroutine.__init__(me)
1128     me.jid = jid
1129     me.svc = svc
1130     me.cmd = cmd
1131     me.command = command
1132     me.args = args
1133
1134   def run(me):
1135     """
1136     Main body of the coroutine.
1137
1138     Does the tedious exception handling boilerplate and invokes the command's
1139     run method.
1140     """
1141     try:
1142       try:
1143         if me.command is None:
1144           svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1145         else:
1146           me.command.run(*me.args)
1147           svcmgr.svcok(me.jid)
1148       except TripeJobError, exc:
1149         svcmgr.svcfail(me.jid, *exc.args)
1150       except TripeSyntaxError:
1151         svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1152                        me.svc.name, me.command.name,
1153                        *me.command.usage)
1154       except TripeJobCancelled:
1155         pass
1156       except Exception, exc:
1157         svcmgr.svcfail(me.jid, 'svc-internal-error',
1158                        exc.__class__.__name__, str(exc))
1159     finally:
1160       svcmgr.jobdone(me.jid)
1161
1162   def start(me):
1163     """Invoked by the service manager to start running the coroutine."""
1164     me.switch()
1165
1166   def cancel(me):
1167     """Invoked by the service manager to cancel the job."""
1168     me.throw(TripeJobCancelled())
1169
1170 def svcinfo(*args):
1171   """
1172   If invoked from a TripeServiceJob coroutine, sends an INFO line to the
1173   job's sender, automatically using the correct job id.
1174   """
1175   svcmgr.svcinfo(Coroutine.getcurrent().jid, *args)
1176
1177 def _setupsvc(tab, func):
1178   """
1179   Setup coroutine for setting up service programs.
1180
1181   Register the given services.
1182   """
1183   try:
1184     for service in tab:
1185       svcmgr.addsvc(service)
1186     if func:
1187       func()
1188   finally:
1189     svcmgr.running()
1190
1191 svcmgr = TripeServiceManager(None)
1192 _spawnq = []
1193 def runservices(socket, tab, init = None, setup = None, daemon = False):
1194   """
1195   Function to start a service provider.
1196
1197   SOCKET is the socket to connect to, usually tripesock.
1198
1199   TAB is a list of entries.  An entry may be either a tuple
1200
1201     (NAME, VERSION, COMMANDS)
1202
1203   or a service object (e.g., a TripeService instance).
1204
1205   COMMANDS is a dictionary mapping command names to tuples
1206
1207     (MIN, MAX, USAGE, FUNC)
1208
1209   of arguments for a TripeServiceCommand object.
1210
1211   If DAEMON is true, then the process is forked into the background before we
1212   start.  If INIT is given, it is called in the main coroutine, immediately
1213   after forking.  If SETUP is given, it is called in a coroutine, after
1214   calling INIT and setting up the services but before marking the service
1215   manager as running.
1216
1217   It is a really bad idea to do any initialization, particularly setting up
1218   coroutines, outside of the INIT or SETUP functions.  In particular, if
1219   we're using rmcr for fake coroutines, the daemonizing fork will kill off
1220   the currently established coroutines in a most surprising way.
1221
1222   The function runs a main select loop until the service manager decides to
1223   quit.
1224   """
1225
1226   global _spawnq
1227   svcmgr.socket = socket
1228   svcmgr.connect()
1229   svcs = []
1230   for service in tab:
1231     if not isinstance(service, tuple):
1232       svcs.append(service)
1233     else:
1234       name, version, commands = service
1235       cmdmap = {}
1236       for cmd, stuff in commands.iteritems():
1237         cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
1238       svcs.append(TripeService(name, version, cmdmap))
1239   if daemon:
1240     M.daemonize()
1241   if init is not None:
1242     init()
1243   Coroutine(_setupsvc).switch(svcs, setup)
1244   while not svcmgr.quitp():
1245     for cr, args, kw in _spawnq:
1246       cr.switch(*args, **kw)
1247     _spawnq = []
1248     M.select()
1249
1250 def spawn(cr, *args, **kw):
1251   """
1252   Utility for spawning coroutines.
1253
1254   The coroutine CR is made to be a direct child of the root coroutine, and
1255   invoked by it with the given arguments.
1256   """
1257   cr.parent = rootcr
1258   _spawnq.append((cr, args, kw))
1259
1260 ###--------------------------------------------------------------------------
1261 ### Utilities for services.
1262
1263 _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
1264 def timespec(spec):
1265   """Parse the timespec SPEC, returning a number of seconds."""
1266   mul = 1
1267   if len(spec) > 1 and spec[-1] in _timeunits:
1268     mul = _timeunits[spec[-1]]
1269     spec = spec[:-1]
1270   try:
1271     t = int(spec)
1272   except:
1273     raise TripeJobError('bad-time-spec', spec)
1274   if t < 0:
1275     raise TripeJobError('bad-time-spec', spec)
1276   return mul * int(spec)
1277
1278 class OptParse (object):
1279   """
1280   Parse options from a command list in the conventional fashion.
1281
1282   ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
1283   options.  The returned values are the option tags.  During parsing, the
1284   `arg' method may be used to retrieve the argument for the most recent
1285   option.  Afterwards, `rest' may be used to retrieve the remaining
1286   non-option arguments, and do a simple check on how many there are.
1287
1288   The parser correctly handles `--' option terminators.
1289   """
1290
1291   def __init__(me, args, allowed):
1292     """
1293     Create a new option parser.
1294
1295     The parser will scan the ARGS for options given in the sequence ALLOWED
1296     (which are expected to include the `-' prefix).
1297     """
1298     me.allowed = {}
1299     for a in allowed:
1300       me.allowed[a] = True
1301     me.args = list(args)
1302
1303   def __iter__(me):
1304     """Iterator protocol: I am my own iterator."""
1305     return me
1306
1307   def next(me):
1308     """
1309     Iterator protocol: return the next option.
1310
1311     If we've run out, raise StopIteration.
1312     """
1313     if len(me.args) == 0 or \
1314        len(me.args[0]) < 2 or \
1315        not me.args[0].startswith('-'):
1316       raise StopIteration
1317     opt = me.args.pop(0)
1318     if opt == '--':
1319       raise StopIteration
1320     if opt not in me.allowed:
1321       raise TripeSyntaxError
1322     return opt
1323
1324   def arg(me):
1325     """
1326     Return the argument for the most recent option.
1327
1328     If none is available, raise TripeSyntaxError.
1329     """
1330     if len(me.args) == 0:
1331       raise TripeSyntaxError
1332     return me.args.pop(0)
1333
1334   def rest(me, min = None, max = None):
1335     """
1336     After option parsing is done, return the remaining arguments.
1337
1338     Check that there are at least MIN and at most MAX arguments remaining --
1339     either may be None to suppress the check.
1340     """
1341     if (min is not None and len(me.args) < min) or \
1342        (max is not None and len(me.args) > max):
1343       raise TripeSyntaxError
1344     return me.args
1345
1346 ###----- That's all, folks --------------------------------------------------