chiark / gitweb /
py/rmcr.py: More useful diagnostics for uncaught exceptions.
[tripe] / py / tripe.py.in
CommitLineData
2fa80010
MW
1### -*-python-*-
2###
3### Administration connection with tripe server
4###
5### (c) 2006 Straylight/Edgeware
6###
7
8###----- Licensing notice ---------------------------------------------------
9###
10### This file is part of Trivial IP Encryption (TrIPE).
11###
12### TrIPE is free software; you can redistribute it and/or modify
13### it under the terms of the GNU General Public License as published by
14### the Free Software Foundation; either version 2 of the License, or
15### (at your option) any later version.
16###
17### TrIPE is distributed in the hope that it will be useful,
18### but WITHOUT ANY WARRANTY; without even the implied warranty of
19### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20### GNU General Public License for more details.
21###
22### You should have received a copy of the GNU General Public License
23### along with TrIPE; if not, write to the Free Software Foundation,
24### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26"""
27This module provides classes and functions for connecting to a running tripe
28server, sending it commands, receiving and processing replies, and
29implementing services.
30
31Rather than end up lost in a storm of little event-driven classes, or a
32morass of concurrent threads, the module uses coroutines to present a fairly
33simple function call/return interface to potentially long-running commands
34which must run without blocking the main process. It assumes a coroutine
35module presenting a subset of the `greenlet' interface: if actual greenlets
36are available, they are used; otherwise there's an implementation in terms of
37threads (with lots of locking) which will do instead.
38
39The simple rule governing the coroutines used here is this:
40
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
43
44 * Other, non-root, coroutines are presumed to be waiting for some specific
45 thing.
46
47Configuration variables:
48 configdir
49 socketdir
50 PACKAGE
51 VERSION
52 tripesock
53 peerdb
54
55Other useful variables:
56 rootcr
57 svcmgr
58
59Other tweakables:
60 _debug
61
62Exceptions:
63 Exception
64 StandardError
65 TripeConnectionError
66 TripeError
67 TripeInternalError
68 TripeJobCancelled
69 TripeJobError
70 TripeSyntaxError
71
72Classes:
73 _Coroutine
74 Coroutine
75 TripeServiceJob
76 OptParse
77 Queue
78 TripeCommand
79 TripeSynchronousCommand
80 TripeAsynchronousCommand
81 TripeCommandIterator
82 TripeConnection
83 TripeCommandDispatcher
84 SelCommandDispatcher
85 TripeServiceManager
86 TripeService
87 TripeServiceCommand
88
89Utility functions:
90 quotify
91 runservices
92 spawn
93 svcinfo
94 timespec
95"""
96
## Options for the pychecker lint tool.  NOTE(review): presumably `self=me'
## tells it that this module spells the instance argument `me' -- confirm
## against the pychecker documentation.
__pychecker__ = 'self=me no-constCond no-argsused'

## Set true to print each line received from the server (prefixed `<') and
## each command sent to it (prefixed `>').
_debug = False
100
101###--------------------------------------------------------------------------
102### External dependencies.
103
104import socket as S
105import errno as E
106import mLib as M
107import re as RX
108import sys as SYS
109import os as OS
110
111try:
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113 raise ImportError
114 from py.magic import greenlet as _Coroutine
115except ImportError:
116 from rmcr import Coroutine as _Coroutine
117
118###--------------------------------------------------------------------------
119### Coroutine hacking.
120
## The coroutine which was running when this module was loaded.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
  """
  A coroutine which may only be resumed by the root coroutine.

  Since the root is whatever was already running when the module was loaded,
  it can't itself be an instance of this class.
  """

  def switch(me, *args, **kw):
    """
    Resume this coroutine, handing ARGS and KW to the underlying `switch'.

    The caller must be the root coroutine.  Whatever is passed back when
    control returns is thrown away.
    """
    caller = _Coroutine.getcurrent()
    assert caller is rootcr
    _Coroutine.switch(me, *args, **kw)
132
133###--------------------------------------------------------------------------
134### Default places for things.
135
## Where configuration lives: `TRIPEDIR' in the environment wins over the
## value substituted in when the package was built.
configdir = OS.getenv('TRIPEDIR', "@configdir@")

## Build-time substitutions.
socketdir = "@socketdir@"
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"

## The administration socket and the peer database; `TRIPESOCK' and
## `TRIPEPEERDB' in the environment override the defaults.
tripesock = OS.getenv('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
peerdb = OS.getenv('TRIPEPEERDB', 'peers.cdb')
2fa80010
MW
143
144###--------------------------------------------------------------------------
145### Connection to the server.
146
def readnonblockingly(sock, len):
  """
  Nonblocking read from SOCK.

  Try to return LEN bytes.  If nothing could be read without blocking,
  return None.  EOF is returned as an empty string.  Any other socket error
  is passed on to the caller.

  As a side effect, SOCK is switched into nonblocking mode.
  """
  try:
    sock.setblocking(0)
    return sock.recv(len)
  except S.error as exc:
    ## An empty socket may be reported as either EAGAIN or EWOULDBLOCK, and
    ## the two needn't be the same number on every platform.
    if exc.args and exc.args[0] in (E.EAGAIN, E.EWOULDBLOCK):
      return None
    raise
161
162class TripeConnectionError (StandardError):
163 """Something happened to the connection with the server."""
164 pass
165class TripeInternalError (StandardError):
166 """This program is very confused."""
167 pass
168
class TripeConnection (object):
  """
  A logical connection to the tripe administration socket.

  There may or may not be a physical connection.  (This is needed for the
  monitor, for example.)

  This class isn't very useful on its own, but it has useful subclasses.  At
  this level, the class is agnostic about I/O multiplexing schemes; that gets
  added later.

  Attributes:

  socket        The name of the administration socket, as passed to the
                constructor.

  sock          The connected socket object, or None if there is no
                physical connection.

  lbuf          The mLib line buffer splitting incoming data into lines, or
                None if there is no physical connection.
  """

  def __init__(me, socket):
    """
    Make a connection to the named SOCKET.

    No physical connection is made initially.
    """
    me.socket = socket
    me.sock = None
    me.lbuf = None

  def connect(me):
    """
    Ensure that there's a physical connection.

    Do nothing (and return None) if we're already connected.  Otherwise
    connect, invoke the `connected' method, and return the connection
    object.  If connecting fails, the exception is propagated and no socket
    is left open.
    """
    if me.sock: return
    sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
    try:
      sock.connect(me.socket)
    except:
      ## Don't leave a half-made socket lying about.
      sock.close()
      raise
    me.sock = sock
    me.lbuf = M.LineBuffer(me.line, me._eof)
    me.lbuf.size = 1024
    me.connected()
    return me

  def disconnect(me, reason):
    """
    Disconnect the physical connection.

    Invoke the `disconnected' method, giving the provided REASON, which
    should be either None or an exception.  The hook runs before the socket
    is closed.  Do nothing (and return None) if there's no physical
    connection; otherwise return the connection object.
    """
    if not me.sock: return
    me.disconnected(reason)
    me.sock.close()
    me.sock = None
    me.lbuf.disable()
    me.lbuf = None
    return me

  def connectedp(me):
    """
    Return true if there's a current, believed-good physical connection.
    """
    return me.sock is not None

  ## A connection object is true exactly when it is connected.
  __nonzero__ = connectedp

  def send(me, line):
    """
    Send the LINE to the connection's socket.

    All output is done through this method; it can be overridden to provide
    proper nonblocking writing, though this seems generally unnecessary.

    The socket is put into blocking mode and the whole line, with a newline
    appended, is written.  If anything goes wrong, the connection is
    disconnected with the exception as the reason, and the exception is
    propagated.  Returns the connection object.
    """
    try:
      me.sock.setblocking(1)
      ## Plain `send' is allowed to write only part of the data; `sendall'
      ## keeps going until the whole line has gone.
      me.sock.sendall(line + '\n')
    except Exception as exc:
      me.disconnect(exc)
      raise
    return me

  def receive(me):
    """
    Receive whatever's ready from the connection's socket.

    Call `line' on each complete line, and `eof' if the connection closed.
    Subclasses which attach this class to an I/O-event system should call
    this method when the socket (CONN.sock) is ready for reading.

    A read error disconnects the connection, giving the exception as the
    reason, and is then propagated.  Returns the connection object.
    """
    ## A line handler may disconnect us part-way through, so check on every
    ## trip round the loop.
    while me.sock is not None:
      try:
        buf = readnonblockingly(me.sock, 16384)
      except Exception as exc:
        me.disconnect(exc)
        raise
      if buf is None:
        return me
      if buf == '':
        me._eof()
        return me
      me.lbuf.flush(buf)
    return me

  def _eof(me):
    """Internal end-of-file handler: disconnect, and then call `eof'."""
    me.disconnect(TripeConnectionError('connection lost'))
    me.eof()

  def connected(me):
    """
    To be overridden by subclasses to react to a connection being
    established.
    """
    pass

  def disconnected(me, reason):
    """
    To be overridden by subclasses to react to a connection being severed.
    """
    pass

  def eof(me):
    """To be overridden by subclasses to handle end-of-file."""
    pass

  def line(me, line):
    """To be overridden by subclasses to handle incoming lines."""
    pass
292
293###--------------------------------------------------------------------------
294### Dispatching coroutine.
295
## A string which needs no quoting at all: it has at least one character, and
## none of them is a backslash, a quote or whitespace.
rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')

## The characters which must be backslash-escaped even within quotes.
rx_weird = RX.compile(r'([\\\'])')

def quotify(s):
  """
  Return S quoted according to the tripe-admin(5) rules.

  A string which can stand as a bareword is returned unchanged; anything
  else (the empty string included) is wrapped in single quotes, with
  backslashes and single quotes escaped.
  """
  bare = rx_ordinary.match(s)
  ## The `$' anchor also matches just before a trailing newline, so make
  ## sure that the match really did swallow the whole string.
  if bare is None or bare.end() != len(s):
    return "'" + rx_weird.sub(r'\\\1', s) + "'"
  return s
310
def _callback(func):
  """
  Wrap FUNC so that exceptions escaping from it get reported.

  The wrapper passes its arguments to FUNC and returns FUNC's result.  If
  FUNC raises, the exception is handed to `sys.excepthook' and then
  re-raised.  This is for callbacks invoked from C code which would
  otherwise ignore the exception silently.
  """
  def _(*a, **kw):
    try:
      result = func(*a, **kw)
    except:
      ## Catch everything: the point is that nothing goes unreported.
      SYS.excepthook(*SYS.exc_info())
      raise
    return result
  return _
325
class TripeCommand (object):
  """
  Abstract base class for a command in progress.

  The tokens making up the command are kept, as a list, in the `words'
  attribute.

  A subclass has to provide the method through which server responses are
  delivered:

    * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
      'FAIL'; ARGS are the remaining tokens from the server's response.
  """

  def __init__(me, words):
    """Set up a command made from the list of tokens WORDS."""
    me.words = words
342
class TripeSynchronousCommand (TripeCommand):
  """
  A simple command which looks synchronous to its caller.

  It must be created from a coroutine other than the root (or whichever one
  runs the dispatcher); other coroutines really do carry on running while
  this one waits to hear from the server.

  Every server response resumes the creating coroutine with a pair (CODE,
  REST): CODE is the server's response code (`OK', `INFO' or `FAIL') and
  REST holds the remaining response tokens.  The creating coroutine has to
  keep switching back to the dispatcher until a final response (`OK' or
  `FAIL') turns up, or things will get very confused.

  The TripeCommandIterator does all of this automatically, and is usually
  the better choice.
  """

  def __init__(me, words):
    """Set up the command; WORDS are the tokens to send to the server."""
    TripeCommand.__init__(me, words)
    ## Remember who to wake up when a response comes in.
    me.owner = Coroutine.getcurrent()

  def response(me, code, *rest):
    """Pass a server response on to the coroutine which made the command."""
    reply = (code, rest)
    me.owner.switch(reply)
370
371class TripeError (StandardError):
372 """
373 A tripe command failed with an error (a FAIL code). The args attribute
374 contains a list of the server's message tokens.
375 """
376 pass
377
378class TripeCommandIterator (object):
379 """
380 Iterator interface to a tripe command.
381
382 The values returned by the iterator are lists of tokens from the server's
383 INFO lines, as processed by the given filter function, if any. The
384 iterator completes normally (by raising StopIteration) if the server
385 reported OK, and raises an exception if the command failed for some reason.
386
387 A TripeError is raised if the server issues a FAIL code. If the connection
388 failed, some other exception is raised.
389 """
390
391 def __init__(me, dispatcher, words, bg = False, filter = None):
392 """
393 Create a new command iterator.
394
395 The command is submitted to the DISPATCHER; it consists of the given
396 WORDS. If BG is true, then an option is inserted to request that the
397 server run the command in the background. The FILTER is applied to the
398 token lists which the server responds, and the filter's output are the
399 items returned by the iterator.
400 """
401 me.dcr = Coroutine.getcurrent().parent
402 if me.dcr is None:
403 raise ValueError, 'must invoke from coroutine'
404 me.filter = filter or (lambda x: x)
405 if bg:
406 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
407 dispatcher.rawcommand(TripeSynchronousCommand(words))
408
409 def __iter__(me):
410 """Iterator protocol: I am my own iterator."""
411 return me
412
413 def next(me):
414 """
415 Iterator protocol: return the next piece of information from the server.
416
417 INFO responses are filtered and returned as the values of the iteration.
418 FAIL and CONNERR responses are turned into exceptions and raised.
419 Finally, OK is turned into StopIteration, which should cause a normal end
420 to the iteration process.
421 """
422 thing = me.dcr.switch()
423 code, rest = thing
424 if code == 'INFO':
425 return me.filter(rest)
426 elif code == 'OK':
427 raise StopIteration
428 elif code == 'CONNERR':
429 if rest is None:
430 raise TripeConnectionError, 'connection terminated by user'
431 else:
432 raise rest
433 elif code == 'FAIL':
434 raise TripeError(*rest)
435 else:
436 raise TripeInternalError \
437 ('unexpected tripe response %r' % ([code] + rest))
438
439### Simple utility functions for the TripeCommandIterator convenience
440### methods.
441
def _tokenjoin(words):
  """Filter function: glue the tokens WORDS together, separated by spaces."""
  sep = ' '
  return sep.join(words)
445
def _keyvals(iter):
  """
  Build a dictionary from the KEY=VALUE tokens produced by the iterator ITER.

  Each item from ITER is a sequence of tokens; each token is split at its
  first `='.  A token without an `=' causes a ValueError.
  """
  table = {}
  for tokens in iter:
    for token in tokens:
      key, value = token.split('=', 1)
      table[key] = value
  return table
455
def _simple(iter):
  """
  Run the iterator ITER to completion, expecting it to produce nothing.

  Returns None; raises TripeInternalError if ITER produced any item.
  """
  leftovers = list(iter)
  if leftovers:
    raise TripeInternalError('expected no response')
  return None
462
def _oneline(iter):
  """
  Run the iterator ITER to completion and return its one and only item.

  Raises TripeInternalError if ITER produced no items, or more than one.
  """
  items = list(iter)
  if len(items) == 1:
    return items[0]
  raise TripeInternalError('expected only one line of response')
469
def _tracelike(iter):
  """
  Digest the output of a TRACE-like command.

  Each item from ITER is a sequence of tokens.  The result is a list of
  tuples (CHAR, STATUS, DESC): CHAR is the selector character, i.e., the
  first character of the first token; STATUS is the rest of the first token
  (empty if disabled, `+' if enabled, maybe something else later); and DESC
  is the human-readable description, made by joining the remaining tokens
  with spaces.
  """
  return [(ww[0][0], ww[0][1:], ' '.join(ww[1:])) for ww in iter]
482
def _kwopts(kw, allowed):
  """
  Turn the keyword arguments KW into a list of command options.

  ALLOWED lists the acceptable keywords; any other keyword in KW causes a
  ValueError.  KEY = VALUE becomes the pair of tokens -KEY VALUE if VALUE is
  a string, the single token -KEY if VALUE is a true non-string, and nothing
  at all if VALUE is false.  A final `--' is appended to stop the parser
  getting confused.
  """
  permitted = frozenset(allowed)
  opts = []
  for key, value in kw.items():
    if key not in permitted:
      raise ValueError('option %s not allowed here' % key)
    flag = '-' + key
    if isinstance(value, str):
      opts.extend([flag, value])
    elif value:
      opts.append(flag)
  opts.append('--')
  return opts
501
class TripeCommandDispatcher (TripeConnection):
  """
  Command dispatcher.

  The command dispatcher is a connection which knows how to handle commands.
  This is probably the most important class in this module to understand.

  Lines from the server are parsed into tokens.  The first token is a code
  (OK or NOTE or something) explaining what kind of line this is.  The
  `handler' attribute is a dictionary mapping server line codes to handler
  functions, which are applied to the words of the line as individual
  arguments.  *Exception*: the content of TRACE lines is not tokenized.

  There are default handlers for server codes which respond to commands.
  Commands arrive as TripeCommand instances through the `rawcommand'
  interface.  The dispatcher keeps track of which command objects represent
  which jobs, and sends responses on to the appropriate command objects by
  invoking their `response' methods.  Command objects don't see the
  BG... codes, because the dispatcher has already transformed them into
  regular codes when it was looking up job code.

  The dispatcher also has a special response code of its own: CONNERR
  indicates that the connection failed and the command has therefore been
  lost; its single argument is the reason given for the disconnection,
  which is either None or an exception.
  """

  ## --- Infrastructure ---
  ##
  ## We will get confused if we pipeline commands.  Send them one at a time.
  ## Only send a command when the previous one detaches or completes.
  ##
  ## The following attributes are interesting:
  ##
  ## tagseq     Sequence number for next background job (for bgtag)
  ##
  ## queue      Commands awaiting submission.
  ##
  ## cmd        Mapping from job tags to commands: cmd[None] is the
  ##            foreground command.
  ##
  ## handler    Mapping from server codes to handler functions.

  def __init__(me, socket):
    """
    Initialize the dispatcher.

    The SOCKET is the filename of the administration socket to connect to,
    for TripeConnection.__init__.
    """
    TripeConnection.__init__(me, socket)
    me.tagseq = 0
    me.handler = {}
    me.handler['BGDETACH'] = me._detach
    for i in 'BGOK', 'BGINFO', 'BGFAIL':
      me.handler[i] = me._response
    for i in 'OK', 'INFO', 'FAIL':
      me.handler[i] = me._fgresponse

  def connected(me):
    """
    Connection hook.

    If a subclass overrides this method, it must call us; clears out the
    command queue and job map.
    """
    me.queue = M.Array()
    me.cmd = {}

  def disconnected(me, reason):
    """
    Disconnection hook.

    If a subclass overrides this method, it must call us; sends a special
    CONNERR code, with the REASON as its argument, to all incomplete
    commands: first the ones already sent to the server, and then the ones
    still waiting in the queue.
    """
    ## Decide who to tell before telling anybody: a response handler might
    ## submit or finish commands, and the job map mustn't change under our
    ## feet while we're walking over it.
    pending = list(me.cmd.values()) + list(me.queue)
    for cmd in pending:
      cmd.response('CONNERR', reason)

  @_callback
  def line(me, line):
    """
    Handle an incoming line, sending it to the right place.

    Lines whose code has no handler are ignored.  Afterwards, try to send
    the next queued command.
    """
    if _debug: print('< %s' % (line,))
    code, rest = M.word(line, quotep = True)
    func = me.handler.get(code)
    if func is not None:
      if code == 'TRACE':
        func(code, rest)
      else:
        func(code, *M.split(rest, quotep = True)[0])
    me.dequeue()

  def dequeue(me):
    """
    Pull the oldest command off the queue and try to send it to the server.

    Do nothing if the queue is empty, or if a foreground command is still
    in progress.
    """
    if not me.queue or None in me.cmd: return
    cmd = me.queue.shift()
    text = ' '.join([quotify(w) for w in cmd.words])
    if _debug: print('> %s' % (text,))
    me.send(text)
    me.cmd[None] = cmd

  def bgtag(me):
    """
    Return an unused job tag.

    May be of use when composing commands by hand.
    """
    tag = 'J%05d' % me.tagseq
    me.tagseq += 1
    return tag

  ## --- Built-in handler functions for server responses ---

  def _detach(me, _, tag):
    """
    Respond to a BGDETACH TAG message.

    Move the current foreground command to the background.
    """
    assert tag not in me.cmd
    me.cmd[tag] = me.cmd[None]
    del me.cmd[None]

  def _response(me, code, tag, *w):
    """
    Respond to an OK, INFO or FAIL message.

    If this is a message for a background job, find the tag; then dispatch
    the result to the command object.  The command is forgotten once a code
    other than INFO has arrived for it.
    """
    if code.startswith('BG'):
      code = code[2:]
    cmd = me.cmd[tag]
    if code != 'INFO':
      del me.cmd[tag]
    cmd.response(code, *w)

  def _fgresponse(me, code, *w):
    """Process responses to the foreground command."""
    me._response(code, None, *w)

  ## --- Interface methods ---

  def rawcommand(me, cmd):
    """
    Submit the TripeCommand CMD to the server, and look after it until it
    completes.

    Raises TripeConnectionError if there is no connection.
    """
    if not me.connectedp():
      raise TripeConnectionError('connection closed')
    me.queue.push(cmd)
    me.dequeue()

  def command(me, *cmd, **kw):
    """Convenience wrapper for creating a TripeCommandIterator object."""
    return TripeCommandIterator(me, cmd, **kw)

  ## --- Convenience methods for server commands ---
  ##
  ## Each of these issues the server command of the same name.

  def add(me, peer, *addr, **kw):
    return _simple(me.command(bg = True,
                              *['ADD'] +
                              _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
                              [peer] +
                              list(addr)))
  def addr(me, peer):
    return _oneline(me.command('ADDR', peer))
  def algs(me):
    return _keyvals(me.command('ALGS'))
  def checkchal(me, chal):
    return _simple(me.command('CHECKCHAL', chal))
  def daemon(me):
    return _simple(me.command('DAEMON'))
  def eping(me, peer, **kw):
    ## This used to send a plain PING, making it a duplicate of `ping'.
    return _oneline(me.command(bg = True,
                               *['EPING'] +
                               _kwopts(kw, ['timeout']) +
                               [peer]))
  def forcekx(me, peer):
    return _simple(me.command('FORCEKX', peer))
  def getchal(me):
    return _oneline(me.command('GETCHAL', filter = _tokenjoin))
  def greet(me, peer, chal):
    return _simple(me.command('GREET', peer, chal))
  def help(me):
    return list(me.command('HELP', filter = _tokenjoin))
  def ifname(me, peer):
    return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
  def kill(me, peer):
    return _simple(me.command('KILL', peer))
  def list(me):
    return list(me.command('LIST', filter = _tokenjoin))
  def notify(me, *msg):
    return _simple(me.command('NOTIFY', *msg))
  def peerinfo(me, peer):
    return _keyvals(me.command('PEERINFO', peer))
  def ping(me, peer, **kw):
    return _oneline(me.command(bg = True,
                               *['PING'] +
                               _kwopts(kw, ['timeout']) +
                               [peer]))
  def port(me):
    return _oneline(me.command('PORT', filter = _tokenjoin))
  def quit(me):
    return _simple(me.command('QUIT'))
  def reload(me):
    return _simple(me.command('RELOAD'))
  def servinfo(me):
    return _keyvals(me.command('SERVINFO'))
  def setifname(me, new):
    return _simple(me.command('SETIFNAME', new))
  def svcclaim(me, service, version):
    return _simple(me.command('SVCCLAIM', service, version))
  def svcensure(me, service, version = None):
    return _simple(me.command('SVCENSURE', service,
                              *((version is not None and [version]) or [])))
  def svcfail(me, job, *msg):
    return _simple(me.command('SVCFAIL', job, *msg))
  def svcinfo(me, job, *msg):
    return _simple(me.command('SVCINFO', job, *msg))
  def svclist(me):
    return list(me.command('SVCLIST'))
  def svcok(me, job):
    return _simple(me.command('SVCOK', job))
  def svcquery(me, service):
    return _keyvals(me.command('SVCQUERY', service))
  def svcrelease(me, service):
    return _simple(me.command('SVCRELEASE', service))
  def svcsubmit(me, service, *args, **kw):
    return me.command(bg = True,
                      *['SVCSUBMIT'] +
                      _kwopts(kw, ['version']) +
                      [service] +
                      list(args))
  def stats(me, peer):
    return _keyvals(me.command('STATS', peer))
  def trace(me, *args):
    return _tracelike(me.command('TRACE', *args))
  def tunnels(me):
    return list(me.command('TUNNELS', filter = _tokenjoin))
  def version(me):
    return _oneline(me.command('VERSION', filter = _tokenjoin))
  def warn(me, *msg):
    return _simple(me.command('WARN', *msg))
  def watch(me, *args):
    return _tracelike(me.command('WATCH', *args))
750
751###--------------------------------------------------------------------------
752### Asynchronous commands.
753
class Queue (object):
  """
  A queue of things which arrive asynchronously.

  This is a very simple queue with many writers but only one reader.  It
  suits the more complicated kind of coroutine which has to deal with
  several different sorts of incoming event.
  """

  def __init__(me):
    """Make a new queue with nothing in it and nobody waiting."""
    me.waiter = None
    me.contents = M.Array()

  def _wait(me):
    """
    Internal: block the calling coroutine until the queue has something in
    it.

    There can be only one reader: raise ValueError if somebody else is
    already waiting.
    """
    if me.waiter:
      raise ValueError('queue already being waited on')
    try:
      cr = Coroutine.getcurrent()
      me.waiter = cr
      ## Keep handing control back to our parent until an item turns up.
      while not me.contents:
        cr.parent.switch()
    finally:
      me.waiter = None

  def get(me):
    """
    Take the item at the head of the queue and return it.

    Waits for an item to arrive if the queue is empty.
    """
    me._wait()
    item = me.contents.shift()
    return item

  def peek(me):
    """
    Return the item at the head of the queue, leaving it where it is.

    Waits for an item to arrive if the queue is empty.
    """
    me._wait()
    item = me.contents[0]
    return item

  def put(me, thing):
    """
    Add THING to the queue.

    A coroutine waiting on the queue is resumed immediately; otherwise the
    item just stays in the queue until somebody asks for it.
    """
    me.contents.push(thing)
    reader = me.waiter
    if reader:
      reader.switch()
802 def put(me, thing):
803 """
804 Write THING to the queue.
805
806 If someone is waiting on the queue, wake him up immediately; otherwise
807 just leave the item there for later.
808 """
809 me.contents.push(thing)
810 if me.waiter:
811 me.waiter.switch()
812
class TripeAsynchronousCommand (TripeCommand):
  """
  Asynchronous commands.

  This is the complicated way to issue commands.  Set up a queue, and
  associate the command with it.  Each response to the command is put on
  the queue as a triple (TAG, CODE, REST): TAG is an object of your choice,
  which this class doesn't interpret; CODE is the server's response code
  (OK, INFO, FAIL); and REST is a list of the server's remaining tokens.

  This lets a single coroutine process many commands (and possibly other
  events) at the same time.
  """

  def __init__(me, queue, tag, words):
    """
    Make an asynchronous command from the given WORDS, whose responses are
    written to QUEUE, labelled with TAG.
    """
    TripeCommand.__init__(me, words)
    me.tag = tag
    me.queue = queue

  def response(me, code, *stuff):
    """Handle a server response by putting it on the caller's queue."""
    item = (me.tag, code, list(stuff))
    me.queue.put(item)
838
839###--------------------------------------------------------------------------
840### Selecting command dispatcher.
841
class SelCommandDispatcher (TripeCommandDispatcher):
  """
  A command dispatcher hooked into mLib's I/O-event system.

  To use it, just make an instance and call mLib.select in a loop from your
  main coroutine.
  """

  def __init__(me, socket):
    """
    Make an instance which will talk to the admin socket named SOCKET.

    Nothing is actually connected yet.
    """
    TripeCommandDispatcher.__init__(me, socket)
    ## The mLib selector watching our socket, while there is one.
    me.selfile = None

  def connected(me):
    """Connection hook: have mLib call `receive' when data arrives."""
    TripeCommandDispatcher.connected(me)
    fd = me.sock.fileno()
    me.selfile = M.SelFile(fd, M.SEL_READ, me.receive)
    me.selfile.enable()

  def disconnected(me, reason):
    """Disconnection hook: let go of our mLib selector."""
    TripeCommandDispatcher.disconnected(me, reason)
    me.selfile = None
869
870###--------------------------------------------------------------------------
871### Services.
872
class TripeJobCancelled (Exception):
  """
  Raised in a job handler when the client kills the job.

  It goes no further than the handler.
  """
880
class TripeJobError (Exception):
  """
  Raise this to report that a running job has failed.

  An SVCFAIL code is sent back.
  """
888
class TripeSyntaxError (Exception):
  """
  Raise this to report a syntax error in a job's command.

  An SVCFAIL bad-svc-syntax message is sent back.
  """
896
class TripeServiceManager (SelCommandDispatcher):
  """
  A command dispatcher with added handling for incoming service requests.

  There is usually only one instance of this class, called svcmgr.  Some of
  the support functions in this module assume that this is the case.

  To use, run mLib.select in a loop until the quitp method returns true;
  then, in a non-root coroutine, register your services by calling `addsvc',
  and then call `running' when you've finished setting up.

  The instance handles server service messages SVCJOB, SVCCANCEL and
  SVCCLAIM.  It maintains a table of running services.  Incoming jobs cause
  the service's `job' method to be invoked; SVCCANCEL invokes the job's
  `cancel' method, and SVCCLAIM causes the relevant service to be
  deregistered.

  There is no base class for jobs, but a job must implement two methods:

  start()       Begin processing; might be a no-op.

  cancel()      Stop processing; the original client has killed the
                job.

  The life of a service manager is divided into two parts: setup and running;
  you tell the manager that you've finished setting up by calling the
  `running' method.  If, at any point after setup is finished, there are no
  remaining services or jobs, `quitp' will return true, ending the process.
  """

  ## --- Attributes ---
  ##
  ## svc        Mapping name -> service object
  ##
  ## job        Mapping jobid -> job object
  ##
  ## runningp   True when setup is finished
  ##
  ## _quitp     True if explicit quit has been requested

  def __init__(me, socket):
    """
    Initialize the service manager.

    SOCKET is the administration socket to connect to.
    """
    SelCommandDispatcher.__init__(me, socket)
    me.svc = {}
    me.job = {}
    me.runningp = False
    me.handler['SVCCANCEL'] = me._cancel
    me.handler['SVCJOB'] = me._job
    me.handler['SVCCLAIM'] = me._claim
    me._quitp = False

  def addsvc(me, svc):
    """
    Register a new service; SVC is a TripeService instance.

    The service is claimed from the server before being added to the table.
    """
    assert svc.name not in me.svc
    me.svcclaim(svc.name, svc.version)
    me.svc[svc.name] = svc

  def _cancel(me, _, jid):
    """
    Called when the server cancels a job; invokes the job's `cancel' method.

    The job is forgotten first.  A cancellation for a job we no longer know
    about (for example, because it has already finished) is ignored, just
    as `jobdone' ignores unknown jobs.
    """
    job = me.job.pop(jid, None)
    if job is not None:
      job.cancel()

  def _claim(me, _, svc, __):
    """Called when another program claims our service at a higher version."""
    del me.svc[svc]

  def _job(me, _, jid, svc, cmd, *args):
    """
    Called when the server sends us a job to do.

    Calls the service to collect a job, and begins processing it.
    """
    assert jid not in me.job
    svc = me.svc[svc.lower()]
    job = svc.job(jid, cmd, args)
    me.job[jid] = job
    job.start()

  def running(me):
    """Declare that setup is finished: `quitp' may now return true."""
    me.runningp = True

  def jobdone(me, jid):
    """
    Informs the service manager that the job with id JID has finished.

    It isn't an error if the job is unknown.
    """
    try:
      del me.job[jid]
    except KeyError:
      pass

  def quitp(me):
    """
    Return true if this process can quit without anyone caring.

    That's so if a quit has been requested explicitly; or if setup is
    finished and either no services or jobs remain, or we're no longer
    attached to the I/O-event system.
    """
    return me._quitp or (me.runningp and ((not me.svc and not me.job) or
                                          not me.selfile))

  def quit(me):
    """
    Forces the quit flag (returned by quitp) on.

    Note that this hides the inherited `quit' method, which sends a QUIT
    command to the server.
    """
    me._quitp = True
1004
class TripeService (object):
  """
  A standard service.

  The server is told the service's NAME and VERSION.  The CMDTAB is a
  dictionary which maps command names (in lowercase) to command objects.

  Default entries are added to the CMDTAB for the commands HELP and QUIT if
  it doesn't already have them.

  TripeService itself cares little about what command objects are like, but
  the TripeServiceJob class (below) has some requirements, and the built-in
  HELP command needs command objects to have `usage' attributes.
  """

  def __init__(me, name, version, cmdtab):
    """
    Create a new service with the given NAME and VERSION.

    CMDTAB maps command names (in lower-case) to command objects.  Note
    that it's used as it stands, not copied, so the default commands are
    added to the caller's dictionary.
    """
    me.name, me.version = name, version
    me.cmd = cmdtab
    me.activep = True
    for cmdname, handler in (('help', me._help), ('quit', me._quit)):
      me.cmd.setdefault(cmdname,
                        TripeServiceCommand(cmdname, 0, 0, '', handler))

  def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    It asks for command CMD with argument list ARGS.  Creates a new job,
    passing it the information needed; the command object is looked up
    under the lowercase form of CMD, and is None if there's no such
    command.
    """
    handler = me.cmd.get(cmd.lower())
    return TripeServiceJob(jid, me, cmd, handler, args)

  ## Simple default command handlers, complying with the spec in
  ## tripe-service(7).

  def _help(me):
    """Send the user a summary of the commands, in order of name."""
    for name, cmd in sorted(me.cmd.items()):
      svcinfo(name, *cmd.usage)

  def _quit(me):
    """Announce that we're going, and tell the service manager to quit."""
    svcmgr.notify('svc-quit', me.name, 'admin-request')
    svcmgr.quit()
1058
class TripeServiceCommand (object):
    """A simple service command: a function with argument-count limits."""

    def __init__(me, name, min, max, usage, func):
        """
        Create a new command.

        NAME is the command's name (in lowercase).

        MIN and MAX are the smallest and largest numbers of arguments
        allowed; either may be None, meaning that there is no such limit.

        USAGE is a usage string, used for generating help and error
        messages; it is stored split into a list of words.

        FUNC is the function to invoke.
        """
        me.name, me.min, me.max = name, min, max
        me.usage = usage.split()
        me.func = func

    def run(me, *args):
        """
        Called when the command is invoked.

        Checks that the number of ARGS is within the command's limits,
        raising TripeSyntaxError if not, and then calls the function with
        them.  The function's result is discarded.
        """
        nargs = len(args)
        if me.min is not None and nargs < me.min:
            raise TripeSyntaxError
        if me.max is not None and nargs > me.max:
            raise TripeSyntaxError
        me.func(*args)
1091
class TripeServiceJob (Coroutine):
    """
    Job handler coroutine.

    A standard TripeService invokes a TripeServiceJob for each incoming job
    request, passing it the jobid, command and arguments, and a command
    object.  The command object needs the following attributes.

      usage
            A usage list (excluding the command name) showing arguments and
            options.

      name
            The command's name, used when reporting a syntax error.

      run(*ARGS)
            Function to react to the command with ARGS split into separate
            arguments.  Invoked in a coroutine.  The svcinfo function (not
            the TripeCommandDispatcher method) may be used to send INFO
            lines.  The function may raise TripeJobError to send a FAIL
            response back, or TripeSyntaxError to send a generic usage
            error.  TripeJobCancelled exceptions are trapped silently.
            Other exceptions are translated into a generic internal-error
            message.

    This class automatically takes care of sending some closing response to
    the job, and of informing the service manager that the job is completed.

    The `jid' attribute stores the job's id.
    """

    def __init__(me, jid, svc, cmd, command, args):
        """
        Set up a new job.

        The job is created with id JID, for service SVC, processing command
        name CMD (which the service resolved into the command object
        COMMAND, or None), and with the arguments ARGS.
        """
        Coroutine.__init__(me)
        me.jid = jid
        me.svc = svc
        me.cmd = cmd
        me.command = command
        me.args = args

    def run(me):
        """
        Main body of the coroutine.

        Invokes the command's run method and translates the outcome into a
        closing response for the job.  Whatever happens, the service manager
        is told that the job is done.
        """
        try:
            if me.command is None:
                # The service didn't recognize the command name.
                svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
            else:
                me.command.run(*me.args)
                svcmgr.svcok(me.jid)
        except TripeJobError as exc:
            svcmgr.svcfail(me.jid, *exc.args)
        except TripeSyntaxError:
            svcmgr.svcfail(me.jid, 'bad-svc-syntax',
                           me.svc.name, me.command.name,
                           *me.command.usage)
        except TripeJobCancelled:
            # Cancelled jobs get no closing response from here.
            pass
        except Exception as exc:
            svcmgr.svcfail(me.jid, 'svc-internal-error',
                           exc.__class__.__name__, str(exc))
        finally:
            svcmgr.jobdone(me.jid)

    def start(me):
        """Invoked by the service manager to start running the coroutine."""
        me.switch()

    def cancel(me):
        """Invoked by the service manager to cancel the job."""
        me.throw(TripeJobCancelled())
1169
def svcinfo(*args):
    """
    Send an INFO line made from ARGS to the current job's sender.

    This must be invoked from a TripeServiceJob coroutine: the job id is
    taken from the `jid' attribute of the current coroutine.
    """
    current = Coroutine.getcurrent()
    svcmgr.svcinfo(current.jid, *args)
1176
def _setupsvc(tab, func):
    """
    Body of the coroutine which sets up a service program.

    Registers each of the services in TAB with the service manager and then,
    if FUNC is given, calls it with no arguments.  The service manager is
    marked as running afterwards, even if something went wrong on the way.
    """
    try:
        for svc in tab:
            svcmgr.addsvc(svc)
        if not func:
            return
        func()
    finally:
        svcmgr.running()
1190
## The service manager, and the queue of coroutines waiting for the main
## loop in runservices to start them (filled in by spawn).
svcmgr = TripeServiceManager(None)
_spawnq = []
def runservices(socket, tab, init = None, setup = None, daemon = False):
    """
    Function to start a service provider.

    SOCKET is the socket to connect to, usually tripesock.

    TAB is a list of entries.  An entry may be either a tuple

      (NAME, VERSION, COMMANDS)

    or a service object (e.g., a TripeService instance).

    COMMANDS is a dictionary mapping command names to tuples

      (MIN, MAX, USAGE, FUNC)

    of arguments for a TripeServiceCommand object.

    If DAEMON is true, then the process is forked into the background before
    we start.  If INIT is given, it is called in the main coroutine,
    immediately after forking.  If SETUP is given, it is called in a
    coroutine, after calling INIT and setting up the services but before
    marking the service manager as running.

    It is a really bad idea to do any initialization, particularly setting
    up coroutines, outside of the INIT or SETUP functions.  In particular,
    if we're using rmcr for fake coroutines, the daemonizing fork will kill
    off the currently established coroutines in a most surprising way.

    The function runs a main select loop until the service manager decides
    to quit.
    """

    global _spawnq
    svcmgr.socket = socket
    svcmgr.connect()

    ## Turn tuple entries into service objects; take anything else as is.
    svcs = []
    for entry in tab:
        if isinstance(entry, tuple):
            name, version, commands = entry
            cmdmap = dict((cname, TripeServiceCommand(cname, *spec))
                          for cname, spec in commands.items())
            entry = TripeService(name, version, cmdmap)
        svcs.append(entry)

    if daemon:
        M.daemonize()
    if init is not None:
        init()
    Coroutine(_setupsvc).switch(svcs, setup)

    while not svcmgr.quitp():
        ## Walk the live list: anything which the coroutines started here
        ## append to it is picked up in the same pass.
        for cr, args, kw in _spawnq:
            cr.switch(*args, **kw)
        _spawnq = []
        M.select()
1249
def spawn(cr, *args, **kw):
    """
    Utility for spawning coroutines.

    The coroutine CR is made a direct child of the root coroutine, and is
    queued together with ARGS and KW; the main loop in runservices switches
    to each queued coroutine with its arguments.
    """
    cr.parent = rootcr
    pending = (cr, args, kw)
    _spawnq.append(pending)
1259
1260###--------------------------------------------------------------------------
1261### Utilities for services.
1262
## Number of seconds in each of the recognized time units.
_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
def timespec(spec):
    """
    Parse the timespec SPEC, returning a number of seconds.

    SPEC is an integer (as understood by `int'), optionally followed by a
    single unit letter: `s' for seconds, `m' for minutes, `h' for hours or
    `d' for days.  A number with no unit counts seconds.

    Raises TripeJobError (`bad-time-spec') if the number can't be parsed or
    is negative.
    """
    mul = 1
    if len(spec) > 1 and spec[-1] in _timeunits:
        mul = _timeunits[spec[-1]]
        spec = spec[:-1]
    ## From here on SPEC has lost its unit letter, and that is what the
    ## error reports carry.
    try:
        t = int(spec)
    except (TypeError, ValueError):
        ## Only conversion failures mean a bad timespec: don't swallow
        ## KeyboardInterrupt, SystemExit and the like.
        raise TripeJobError('bad-time-spec', spec)
    if t < 0:
        raise TripeJobError('bad-time-spec', spec)
    return mul * t
1277
class OptParse (object):
    """
    Parse options from a command list in the conventional fashion.

    ARGS is a list of arguments to a command.  ALLOWED is a sequence of
    allowed options.  Iterating over the parser yields the option tags.
    During parsing, the `arg' method may be used to retrieve the argument
    for the most recent option.  Afterwards, `rest' may be used to retrieve
    the remaining non-option arguments, and do a simple check on how many
    there are.

    The parser correctly handles `--' option terminators.
    """

    def __init__(me, args, allowed):
        """
        Create a new option parser.

        The parser will scan the ARGS for options given in the sequence
        ALLOWED (which are expected to include the `-' prefix).  The ARGS
        are copied, so the caller's list is left alone.
        """
        me.allowed = dict.fromkeys(allowed, True)
        me.args = list(args)

    def __iter__(me):
        """Iterator protocol: I am my own iterator."""
        return me

    def next(me):
        """
        Iterator protocol: return the next option.

        Options stop at the end of the arguments, at the first argument
        which is shorter than two characters or doesn't begin with `-', or
        at a `--' terminator (which is consumed).  When that happens, raise
        StopIteration.  Raise TripeSyntaxError if the option isn't one of
        the allowed ones.
        """
        if not me.args:
            raise StopIteration
        head = me.args[0]
        if len(head) < 2 or not head.startswith('-'):
            raise StopIteration
        opt = me.args.pop(0)
        if opt == '--':
            raise StopIteration
        if opt not in me.allowed:
            raise TripeSyntaxError
        return opt

    ## Python 3 spells the iterator method `__next__'; answer to both names
    ## so that `for opt in parser' works whichever interpreter runs us.
    __next__ = next

    def arg(me):
        """
        Return the argument for the most recent option.

        If none is available, raise TripeSyntaxError.
        """
        if not me.args:
            raise TripeSyntaxError
        return me.args.pop(0)

    def rest(me, min = None, max = None):
        """
        After option parsing is done, return the remaining arguments.

        Check that there are at least MIN and at most MAX arguments
        remaining -- either may be None to suppress the check -- and raise
        TripeSyntaxError if not.  The list returned is the parser's own, not
        a copy.
        """
        nleft = len(me.args)
        if min is not None and nleft < min:
            raise TripeSyntaxError
        if max is not None and nleft > max:
            raise TripeSyntaxError
        return me.args
1345
1346###----- That's all, folks --------------------------------------------------