chiark / gitweb /
py: New Python module for writing services and suchlike
[tripe] / py / tripe.py.in
CommitLineData
2fa80010
MW
1### -*-python-*-
2###
3### Administration connection with tripe server
4###
5### (c) 2006 Straylight/Edgeware
6###
7
8###----- Licensing notice ---------------------------------------------------
9###
10### This file is part of Trivial IP Encryption (TrIPE).
11###
12### TrIPE is free software; you can redistribute it and/or modify
13### it under the terms of the GNU General Public License as published by
14### the Free Software Foundation; either version 2 of the License, or
15### (at your option) any later version.
16###
17### TrIPE is distributed in the hope that it will be useful,
18### but WITHOUT ANY WARRANTY; without even the implied warranty of
19### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20### GNU General Public License for more details.
21###
22### You should have received a copy of the GNU General Public License
23### along with TrIPE; if not, write to the Free Software Foundation,
24### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26"""
27This module provides classes and functions for connecting to a running tripe
28server, sending it commands, receiving and processing replies, and
29implementing services.
30
Rather than end up lost in a storm of little event-driven classes, or a
morass of concurrent threads, the module uses coroutines to present a fairly
simple function call/return interface to potentially long-running commands
which must run without blocking the main process.  It assumes a coroutine
module presenting a subset of the `greenlet' interface: if actual greenlets
are available, they are used; otherwise there's an implementation in terms of
threads (with lots of locking) which will do instead.
38
39The simple rule governing the coroutines used here is this:
40
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
43
44 * Other, non-root, coroutines are presumed to be waiting for some specific
45 thing.
46
47Configuration variables:
48 configdir
49 socketdir
50 PACKAGE
51 VERSION
52 tripesock
53 peerdb
54
55Other useful variables:
56 rootcr
57 svcmgr
58
59Other tweakables:
60 _debug
61
62Exceptions:
63 Exception
64 StandardError
65 TripeConnectionError
66 TripeError
67 TripeInternalError
68 TripeJobCancelled
69 TripeJobError
70 TripeSyntaxError
71
72Classes:
73 _Coroutine
74 Coroutine
75 TripeServiceJob
76 OptParse
77 Queue
78 TripeCommand
79 TripeSynchronousCommand
80 TripeAsynchronousCommand
81 TripeCommandIterator
82 TripeConnection
83 TripeCommandDispatcher
84 SelCommandDispatcher
85 TripeServiceManager
86 TripeService
87 TripeServiceCommand
88
89Utility functions:
90 quotify
91 runservices
92 spawn
93 svcinfo
94 timespec
95"""
96
97__pychecker__ = 'self=me no-constCond no-argsused'
98
99_debug = False
100
101###--------------------------------------------------------------------------
102### External dependencies.
103
104import socket as S
105import errno as E
106import mLib as M
107import re as RX
108import sys as SYS
109import os as OS
110
111try:
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113 raise ImportError
114 from py.magic import greenlet as _Coroutine
115except ImportError:
116 from rmcr import Coroutine as _Coroutine
117
118###--------------------------------------------------------------------------
119### Coroutine hacking.
120
## Whichever coroutine is running when this module is loaded is treated as
## the root: it is the only one allowed to resume `Coroutine' instances.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
    """
    A coroutine which may only be resumed from the root coroutine.

    The root itself is never an instance of this class: it is simply
    whatever `_Coroutine.getcurrent' reported when the module was loaded.
    """

    def switch(me, *args, **kw):
        """
        Transfer control to this coroutine, passing along ARGS and KW.

        Asserts that the caller is the root coroutine.  Anything handed back
        when control eventually returns here is discarded.
        """
        current = _Coroutine.getcurrent()
        assert current is rootcr
        _Coroutine.switch(me, *args, **kw)
132
133###--------------------------------------------------------------------------
134### Default places for things.
135
## The `@...@' markers are placeholders (presumably substituted when the
## package is configured -- TODO confirm).  TRIPEDIR and TRIPESOCK in the
## environment override the configuration directory and socket name.
configdir = OS.getenv('TRIPEDIR', "@configdir@")
socketdir = "@socketdir@"
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"

tripesock = OS.getenv('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
142
143###--------------------------------------------------------------------------
144### Connection to the server.
145
def readnonblockingly(sock, len):
    """
    Nonblocking read from SOCK.

    Try to return up to LEN bytes.  If nothing could be read without
    blocking, return None.  EOF is returned as an empty string.  Any other
    socket error is propagated to the caller.

    As a side effect, SOCK is left in nonblocking mode.
    """
    try:
        sock.setblocking(0)
        return sock.recv(len)
    except S.error:
        ## Collect the exception this way so that the code doesn't depend on
        ## a particular spelling of the `except' clause.
        exc = SYS.exc_info()[1]
        ## EAGAIN and EWOULDBLOCK are permitted to be distinct values, and
        ## either of them means `nothing to read just now'.
        if exc.args and exc.args[0] in (E.EWOULDBLOCK, E.EAGAIN):
            return None
        raise
160
class TripeConnectionError (StandardError):
    """
    Something happened to the connection with the server.
    """
class TripeInternalError (StandardError):
    """
    This program is very confused.
    """
167
class TripeConnection (object):
    """
    A logical connection to the tripe administration socket.

    There may or may not be a physical connection.  (This is needed for the
    monitor, for example.)

    This class isn't very useful on its own, but it has useful subclasses.
    At this level, the class is agnostic about I/O multiplexing schemes;
    that gets added later.

    Attributes:

      socket      Name of the administration socket to connect to.
      sock        The connected socket object, or None if disconnected.
      lbuf        Line buffer which splits received data into lines, or
                  None if disconnected.
    """

    def __init__(me, socket):
        """
        Make a connection to the named SOCKET.

        No physical connection is made initially.
        """
        me.socket = socket
        me.sock = None
        me.lbuf = None

    def connect(me):
        """
        Ensure that there's a physical connection.

        Do nothing if we're already connected.  Invoke the `connected'
        method if successful.  Returns the connection object in either case;
        a failure to connect is propagated as an exception.
        """
        if me.sock: return me
        sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
        try:
            sock.connect(me.socket)
        except:
            ## Don't leak the descriptor if the attempt failed.
            sock.close()
            raise
        me.sock = sock
        me.lbuf = M.LineBuffer(me.line, me._eof)
        me.lbuf.size = 1024
        me.connected()
        return me

    def disconnect(me, reason):
        """
        Disconnect the physical connection.

        Invoke the `disconnected' method, giving the provided REASON, which
        should be either None or an exception.  Does nothing if there is no
        physical connection.  Returns the connection object.
        """
        if not me.sock: return me
        try:
            me.disconnected(reason)
        finally:
            ## Tear everything down even if the hook failed.  The hook may
            ## conceivably have re-entered us, so only release what's left.
            if me.sock is not None:
                me.sock.close()
                me.sock = None
            if me.lbuf is not None:
                me.lbuf.disable()
                me.lbuf = None
        return me

    def connectedp(me):
        """
        Return true if there's a current, believed-good physical connection.
        """
        return me.sock is not None

    ## A connection object is true exactly when it is connected.
    __nonzero__ = connectedp

    def send(me, line):
        """
        Send the LINE to the connection's socket.

        All output is done through this method; it can be overridden to
        provide proper nonblocking writing, though this seems generally
        unnecessary.

        If anything goes wrong, the connection is dropped (with the exception
        as the reason) and the exception is propagated.
        """
        try:
            me.sock.setblocking(1)
            ## `send' is allowed to write only part of its argument;
            ## `sendall' keeps going until the whole line has been written.
            me.sock.sendall(line + '\n')
        except Exception:
            me.disconnect(SYS.exc_info()[1])
            raise
        return me

    def receive(me):
        """
        Receive whatever's ready from the connection's socket.

        Call `line' on each complete line, and `eof' if the connection
        closed.  Subclasses which attach this class to an I/O-event system
        should call this method when the socket (CONN.sock) is ready for
        reading.
        """
        ## A line handler may disconnect us, so check the socket each time.
        while me.sock is not None:
            try:
                buf = readnonblockingly(me.sock, 16384)
            except Exception:
                me.disconnect(SYS.exc_info()[1])
                raise
            if buf is None:
                return me
            if buf == '':
                me._eof()
                return me
            me.lbuf.flush(buf)
        return me

    def _eof(me):
        """Internal end-of-file handler."""
        me.disconnect(TripeConnectionError('connection lost'))
        me.eof()

    def connected(me):
        """
        To be overridden by subclasses to react to a connection being
        established.
        """
        pass

    def disconnected(me, reason):
        """
        To be overridden by subclasses to react to a connection being
        severed.
        """
        pass

    def eof(me):
        """To be overridden by subclasses to handle end-of-file."""
        pass

    def line(me, line):
        """To be overridden by subclasses to handle incoming lines."""
        pass
291
292###--------------------------------------------------------------------------
293### Dispatching coroutine.
294
## A string can be sent as a bareword only if it is nonempty and contains no
## backslashes, quotes or whitespace.
rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')

## Backslashes and single quotes must be escaped even within quoted text.
rx_weird = RX.compile(r'([\\\'])')

def quotify(s):
    """
    Quote S according to the tripe-admin(5) rules.

    Barewords are returned unchanged.  Anything else is wrapped in single
    quotes, with embedded backslashes and single quotes backslash-escaped.
    """
    m = rx_ordinary.match(s)
    ## `$' is also satisfied just before a trailing newline, so insist that
    ## the match really consumed the entire string.
    if m is None or m.end() != len(s):
        return "'" + rx_weird.sub(r'\\\1', s) + "'"
    return s
309
310def _callback(func):
311 """
312 Return a wrapper for FUNC which reports exceptions thrown by it.
313
314 Useful in the case of callbacks invoked by C functions which ignore
315 exceptions.
316 """
317 def _(*a, **kw):
318 try:
319 return func(*a, **kw)
320 except:
321 SYS.excepthook(*SYS.exc_info())
322 raise
323 return _
324
class TripeCommand (object):
    """
    Abstract base class for a command in progress.

    The `words' attribute holds the list of tokens making up the command.

    Subclasses must provide a method for handling server responses:

      * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
        'FAIL'; ARGS are the remaining tokens from the server's response.
    """

    def __init__(me, words):
        """Create a command from the given list of WORDS."""
        me.words = words
341
class TripeSynchronousCommand (TripeCommand):
    """
    A simple command, processed apparently synchronously.

    Must be created from a coroutine other than the root (or whichever one
    is running the dispatcher); in reality, other coroutines carry on
    running while we wait for a response from the server.

    Each server response causes the creating coroutine to be resumed with
    the pair (CODE, REST) -- where CODE is the response code (`OK', `INFO'
    or `FAIL') and REST is a tuple of the other response tokens.  The
    calling coroutine must keep switching back to the dispatcher until a
    terminating response (`OK' or `FAIL') arrives, or become very confused.

    Mostly it's better to use the TripeCommandIterator to do this
    automatically.
    """

    def __init__(me, words):
        """Set up the command; WORDS are the tokens to send to the server."""
        TripeCommand.__init__(me, words)
        ## Remember who asked, so that responses can be delivered there.
        me.owner = Coroutine.getcurrent()

    def response(me, code, *rest):
        """Forward a server response to the coroutine which made us."""
        reply = (code, rest)
        me.owner.switch(reply)
369
class TripeError (StandardError):
    """
    A tripe command failed with an error (a FAIL code).

    The `args' attribute holds the server's message tokens.
    """
376
class TripeCommandIterator (object):
    """
    Iterator interface to a tripe command.

    The values returned by the iterator are the token sequences from the
    server's INFO lines, as processed by the given filter function, if any.
    The iterator completes normally (by raising StopIteration) if the server
    reported OK, and raises an exception if the command failed for some
    reason.

    A TripeError is raised if the server issues a FAIL code.  If the
    connection failed, some other exception is raised.
    """

    def __init__(me, dispatcher, words, bg = False, filter = None):
        """
        Create a new command iterator.

        The command is submitted to the DISPATCHER; it consists of the given
        WORDS.  If BG is true, then an option is inserted to request that
        the server run the command in the background.  The FILTER is applied
        to the token sequences with which the server responds, and the
        filter's outputs are the items returned by the iterator.

        Raises ValueError if the current coroutine has no parent to switch
        back to.
        """
        me.dcr = Coroutine.getcurrent().parent
        if me.dcr is None:
            raise ValueError('must invoke from coroutine')
        me.filter = filter or (lambda x: x)
        if bg:
            words = [words[0], '-background', dispatcher.bgtag()] + \
                    list(words[1:])
        dispatcher.rawcommand(TripeSynchronousCommand(words))

    def __iter__(me):
        """Iterator protocol: I am my own iterator."""
        return me

    def next(me):
        """
        Iterator protocol: return the next piece of information from the
        server.

        INFO responses are filtered and returned as the values of the
        iteration.  FAIL and CONNERR responses are turned into exceptions
        and raised.  Finally, OK is turned into StopIteration, which should
        cause a normal end to the iteration process.
        """
        code, rest = me.dcr.switch()
        if code == 'INFO':
            return me.filter(rest)
        elif code == 'OK':
            raise StopIteration
        elif code == 'CONNERR':
            ## The command object collects its extra arguments as a
            ## sequence, so the disconnection reason arrives wrapped up:
            ## unpack it.  A bare reason is accepted too, for safety.
            if isinstance(rest, (tuple, list)):
                reason = None
                if rest: reason = rest[0]
            else:
                reason = rest
            if reason is None:
                raise TripeConnectionError('connection terminated by user')
            raise reason
        elif code == 'FAIL':
            raise TripeError(*rest)
        else:
            raise TripeInternalError \
                ('unexpected tripe response %r' % ([code] + list(rest)))
437
438### Simple utility functions for the TripeCommandIterator convenience
439### methods.
440
441def _tokenjoin(words):
442 """Filter function: simply join the given tokens with spaces between."""
443 return ' '.join(words)
444
445def _keyvals(iter):
446 """Return a dictionary formed from the KEY=VALUE pairs returned by the
447 iterator ITER."""
448 kv = {}
449 for ww in iter:
450 for w in ww:
451 q = w.index('=')
452 kv[w[:q]] = w[q + 1:]
453 return kv
454
455def _simple(iter):
456 """Raise an error if ITER contains any item."""
457 stuff = list(iter)
458 if len(stuff) != 0:
459 raise TripeInternalError('expected no response')
460 return None
461
462def _oneline(iter):
463 """If ITER contains a single item, return it; otherwise raise an error."""
464 stuff = list(iter)
465 if len(stuff) != 1:
466 raise TripeInternalError('expected only one line of response')
467 return stuff[0]
468
469def _tracelike(iter):
470 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
471 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
472 disabled, `+' if enabled, maybe something else later), and DESC is the
473 human-readable description."""
474 stuff = []
475 for ww in iter:
476 ch = ww[0][0]
477 st = ww[0][1:]
478 desc = ' '.join(ww[1:])
479 stuff.append((ch, st, desc))
480 return stuff
481
482def _kwopts(kw, allowed):
483 """Parse keyword arguments into options. ALLOWED is a list of allowable
484 keywords; raise errors if other keywords are present. KEY = VALUE becomes
485 an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
486 VALUE is a true non-string, or nothing if VALUE is false.. Insert a `--'
487 at the end to stop the parser getting confused."""
488 opts = []
489 amap = {}
490 for a in allowed: amap[a] = True
491 for k, v in kw.iteritems():
492 if k not in amap:
493 raise ValueError('option %s not allowed here' % k)
494 if isinstance(v, str):
495 opts += ['-' + k, v]
496 elif v:
497 opts += ['-' + k]
498 opts.append('--')
499 return opts
500
class TripeCommandDispatcher (TripeConnection):
    """
    Command dispatcher.

    The command dispatcher is a connection which knows how to handle
    commands.  This is probably the most important class in this module to
    understand.

    Lines from the server are parsed into tokens.  The first token is a code
    (OK or NOTE or something) explaining what kind of line this is.  The
    `handler' attribute is a dictionary mapping server line codes to handler
    functions, which are applied to the words of the line as individual
    arguments.  *Exception*: the content of TRACE lines is not tokenized.

    There are default handlers for server codes which respond to commands.
    Commands arrive as TripeCommand instances through the `rawcommand'
    interface.  The dispatcher keeps track of which command objects
    represent which jobs, and sends responses on to the appropriate command
    objects by invoking their `response' methods.  Command objects don't see
    the BG... codes, because the dispatcher has already transformed them
    into regular codes when it was looking up the job tag.

    The dispatcher also has a special response code of its own: CONNERR
    indicates that the connection failed and the command has therefore been
    lost; the response's single argument is the reason which was given for
    the disconnection (an exception, or None).
    """

    ## --- Infrastructure ---
    ##
    ## We will get confused if we pipeline commands.  Send them one at a
    ## time.  Only send a command when the previous one detaches or
    ## completes.
    ##
    ## The following attributes are interesting:
    ##
    ## tagseq     Sequence number for next background job (for bgtag)
    ##
    ## queue      Commands awaiting submission.
    ##
    ## cmd        Mapping from job tags to commands: cmd[None] is the
    ##            foreground command.
    ##
    ## handler    Mapping from server codes to handler functions.

    def __init__(me, socket):
        """
        Initialize the dispatcher.

        The SOCKET is the filename of the administration socket to connect
        to, for TripeConnection.__init__.
        """
        TripeConnection.__init__(me, socket)
        me.tagseq = 0
        me.handler = {}
        me.handler['BGDETACH'] = me._detach
        for i in 'BGOK', 'BGINFO', 'BGFAIL':
            me.handler[i] = me._response
        for i in 'OK', 'INFO', 'FAIL':
            me.handler[i] = me._fgresponse

    def connected(me):
        """
        Connection hook.

        If a subclass overrides this method, it must call us; clears out the
        command queue and job map.
        """
        me.queue = M.Array()
        me.cmd = {}

    def disconnected(me, reason):
        """
        Disconnection hook.

        If a subclass overrides this method, it must call us; sends a
        special CONNERR code to all incomplete commands.
        """
        ## Work from a snapshot of the job map: response handlers run
        ## arbitrary code, which mustn't upset the iteration.
        for cmd in list(me.cmd.values()):
            cmd.response('CONNERR', reason)
        for cmd in me.queue:
            cmd.response('CONNERR', reason)

    @_callback
    def line(me, line):
        """Handle an incoming line, sending it to the right place."""
        if _debug: SYS.stdout.write('< %s\n' % line)
        code, rest = M.word(line, quotep = True)
        func = me.handler.get(code)
        if func is not None:
            if code == 'TRACE':
                ## TRACE content is passed along untokenized.
                func(code, rest)
            else:
                func(code, *M.split(rest, quotep = True)[0])
        ## The line may have finished or detached the foreground command,
        ## so see whether another one can be sent now.
        me.dequeue()

    def dequeue(me):
        """
        Pull the oldest command off the queue and try to send it to the
        server.

        Does nothing if the queue is empty or a foreground command is still
        in progress.
        """
        if not me.queue or None in me.cmd: return
        cmd = me.queue.shift()
        text = ' '.join([quotify(w) for w in cmd.words])
        if _debug: SYS.stdout.write('> %s\n' % text)
        me.send(text)
        me.cmd[None] = cmd

    def bgtag(me):
        """
        Return an unused job tag.

        May be of use when composing commands by hand.
        """
        tag = 'J%05d' % me.tagseq
        me.tagseq += 1
        return tag

    ## --- Built-in handler functions for server responses ---

    def _detach(me, _, tag):
        """
        Respond to a BGDETACH TAG message.

        Move the current foreground command to the background.
        """
        assert tag not in me.cmd
        me.cmd[tag] = me.cmd[None]
        del me.cmd[None]

    def _response(me, code, tag, *w):
        """
        Respond to an OK, INFO or FAIL message.

        If this is a message for a background job, strip the BG prefix from
        the code; then find the command for TAG and dispatch the result to
        it.  The command is forgotten unless the code is INFO.
        """
        if code.startswith('BG'):
            code = code[2:]
        cmd = me.cmd[tag]
        if code != 'INFO':
            del me.cmd[tag]
        cmd.response(code, *w)

    def _fgresponse(me, code, *w):
        """Process responses to the foreground command."""
        me._response(code, None, *w)

    ## --- Interface methods ---

    def rawcommand(me, cmd):
        """
        Submit the TripeCommand CMD to the server, and look after it until
        it completes.

        Raises TripeConnectionError if there is no connection.
        """
        if not me.connectedp():
            raise TripeConnectionError('connection closed')
        me.queue.push(cmd)
        me.dequeue()

    def command(me, *cmd, **kw):
        """Convenience wrapper for creating a TripeCommandIterator object."""
        return TripeCommandIterator(me, cmd, **kw)

    ## --- Convenience methods for server commands ---
    ##
    ## Each of these issues the server command of the same name.  Note that
    ## the keyword argument must precede the `*' argument in the calls
    ## below, to suit older Python versions.

    def add(me, peer, *addr, **kw):
        words = ['ADD'] + \
                _kwopts(kw, ['tunnel', 'keepalive', 'cork']) + \
                [peer] + list(addr)
        return _simple(me.command(bg = True, *words))
    def addr(me, peer):
        return _oneline(me.command('ADDR', peer))
    def algs(me):
        return _keyvals(me.command('ALGS'))
    def checkchal(me, chal):
        return _simple(me.command('CHECKCHAL', chal))
    def daemon(me):
        return _simple(me.command('DAEMON'))
    def eping(me, peer, **kw):
        words = ['EPING'] + _kwopts(kw, ['timeout']) + [peer]
        return _oneline(me.command(bg = True, *words))
    def forcekx(me, peer):
        return _simple(me.command('FORCEKX', peer))
    def getchal(me):
        return _oneline(me.command('GETCHAL', filter = _tokenjoin))
    def greet(me, peer, chal):
        return _simple(me.command('GREET', peer, chal))
    def help(me):
        return list(me.command('HELP', filter = _tokenjoin))
    def ifname(me, peer):
        return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
    def kill(me, peer):
        return _simple(me.command('KILL', peer))
    def list(me):
        return list(me.command('LIST', filter = _tokenjoin))
    def notify(me, *msg):
        return _simple(me.command('NOTIFY', *msg))
    def peerinfo(me, peer):
        return _keyvals(me.command('PEERINFO', peer))
    def ping(me, peer, **kw):
        words = ['PING'] + _kwopts(kw, ['timeout']) + [peer]
        return _oneline(me.command(bg = True, *words))
    def port(me):
        return _oneline(me.command('PORT', filter = _tokenjoin))
    def quit(me):
        return _simple(me.command('QUIT'))
    def reload(me):
        return _simple(me.command('RELOAD'))
    def servinfo(me):
        return _keyvals(me.command('SERVINFO'))
    def setifname(me, new):
        return _simple(me.command('SETIFNAME', new))
    def svcclaim(me, service, version):
        return _simple(me.command('SVCCLAIM', service, version))
    def svcensure(me, service, version = None):
        words = ['SVCENSURE', service]
        if version is not None: words.append(version)
        return _simple(me.command(*words))
    def svcfail(me, job, *msg):
        return _simple(me.command('SVCFAIL', job, *msg))
    def svcinfo(me, job, *msg):
        return _simple(me.command('SVCINFO', job, *msg))
    def svclist(me):
        return list(me.command('SVCLIST'))
    def svcok(me, job):
        return _simple(me.command('SVCOK', job))
    def svcquery(me, service):
        return _keyvals(me.command('SVCQUERY', service))
    def svcrelease(me, service):
        return _simple(me.command('SVCRELEASE', service))
    def svcsubmit(me, service, *args, **kw):
        ## Unlike the others, this returns the iterator itself.
        words = ['SVCSUBMIT'] + _kwopts(kw, ['version']) + \
                [service] + list(args)
        return me.command(bg = True, *words)
    def stats(me, peer):
        return _keyvals(me.command('STATS', peer))
    def trace(me, *args):
        return _tracelike(me.command('TRACE', *args))
    def tunnels(me):
        return list(me.command('TUNNELS', filter = _tokenjoin))
    def version(me):
        return _oneline(me.command('VERSION', filter = _tokenjoin))
    def warn(me, *msg):
        return _simple(me.command('WARN', *msg))
    def watch(me, *args):
        return _tracelike(me.command('WATCH', *args))
749
750###--------------------------------------------------------------------------
751### Asynchronous commands.
752
class Queue (object):
    """
    A queue of things arriving asynchronously.

    This is a very simple single-reader multiple-writer queue.  It's useful
    for more complex coroutines which need to cope with a variety of
    possible incoming events.

    Attributes:

      contents    The items currently waiting in the queue.
      waiter      The coroutine waiting for an item, or None.
    """

    def __init__(me):
        """Create a new empty queue."""
        me.contents = M.Array()
        me.waiter = None

    def _wait(me):
        """
        Internal: wait for an item to arrive in the queue.

        Complain (with ValueError) if someone is already waiting, because
        this is just a single-reader queue.
        """
        if me.waiter:
            raise ValueError('queue already being waited on')
        try:
            me.waiter = Coroutine.getcurrent()
            ## Keep handing control to our parent until something turns up.
            while not me.contents:
                me.waiter.parent.switch()
        finally:
            me.waiter = None

    def get(me):
        """
        Remove and return the item at the head of the queue.

        If the queue is empty, wait until an item arrives.
        """
        me._wait()
        head = me.contents.shift()
        return head

    def peek(me):
        """
        Return the item at the head of the queue without removing it.

        If the queue is empty, wait until an item arrives.
        """
        me._wait()
        head = me.contents[0]
        return head

    def put(me, thing):
        """
        Write THING to the queue.

        If someone is waiting on the queue, wake him up immediately;
        otherwise just leave the item there for later.
        """
        me.contents.push(thing)
        waiter = me.waiter
        if waiter:
            waiter.switch()
811
class TripeAsynchronousCommand (TripeCommand):
    """
    Asynchronous commands.

    This is the complicated way of issuing commands.  You must set up a
    queue, and associate the command with the queue.  Responses arriving for
    the command will be put on the queue as a triple of the form (TAG,
    CODE, REST) -- where TAG is an object of your choice, not interpreted by
    this class, CODE is the server's response code (OK, INFO, FAIL), and
    REST is the list of the rest of the server's tokens.

    Using this, you can write coroutines which process many commands (and
    possibly other events) simultaneously.
    """

    def __init__(me, queue, tag, words):
        """
        Make an asynchronous command consisting of the given WORDS, which
        sends responses to QUEUE, labelled with TAG.
        """
        TripeCommand.__init__(me, words)
        me.tag = tag
        me.queue = queue

    def response(me, code, *stuff):
        """Handle a server response by writing it to the caller's queue."""
        item = (me.tag, code, list(stuff))
        me.queue.put(item)
837
838###--------------------------------------------------------------------------
839### Selecting command dispatcher.
840
class SelCommandDispatcher (TripeCommandDispatcher):
    """
    A command dispatcher which integrates with mLib's I/O-event system.

    To use, simply create an instance and run mLib.select in a loop in your
    main coroutine.

    The `selfile' attribute holds the mLib select-file object watching the
    socket, or None while there is no connection.
    """

    def __init__(me, socket):
        """
        Create an instance; SOCKET is the admin socket to connect to.

        Note that no connection is made initially.
        """
        TripeCommandDispatcher.__init__(me, socket)
        me.selfile = None

    def connected(me):
        """Connection hook: wires itself into the mLib select machinery."""
        TripeCommandDispatcher.connected(me)
        ## Have `receive' called whenever the socket becomes readable.
        fd = me.sock.fileno()
        me.selfile = M.SelFile(fd, M.SEL_READ, me.receive)
        me.selfile.enable()

    def disconnected(me, reason):
        """
        Disconnection hook: removes itself from the mLib select machinery.
        """
        TripeCommandDispatcher.disconnected(me, reason)
        me.selfile = None
868
869###--------------------------------------------------------------------------
870### Services.
871
class TripeJobCancelled (Exception):
    """
    Exception sent to a job handler if the client kills the job.

    Not propagated further.
    """
879
class TripeJobError (Exception):
    """
    Exception to cause a failure report for a running job.

    Sends an SVCFAIL code back.
    """
887
class TripeSyntaxError (Exception):
    """
    Exception to report a syntax error for a job.

    Sends an SVCFAIL bad-svc-syntax message back.
    """
895
class TripeServiceManager (SelCommandDispatcher):
    """
    A command dispatcher with added handling for incoming service requests.

    There is usually only one instance of this class, called svcmgr.  Some
    of the support functions in this module assume that this is the case.

    To use, run mLib.select in a loop until the quitp method returns true;
    then, in a non-root coroutine, register your services by calling
    `addsvc', and then call `running' when you've finished setting up.

    The instance handles server service messages SVCJOB, SVCCANCEL and
    SVCCLAIM.  It maintains a table of running services.  Incoming jobs
    cause the service's `job' method to be invoked; SVCCANCEL causes the
    job's `cancel' method to be invoked, and SVCCLAIM causes the relevant
    service to be deregistered.

    There is no base class for jobs, but a job must implement two methods:

      start()     Begin processing; might be a no-op.

      cancel()    Stop processing; the original client has killed the job.

    The life of a service manager is divided into two parts: setup and
    running; you tell the manager that you've finished setting up by calling
    the `running' method.  If, at any point after setup is finished, there
    are no remaining services or jobs, `quitp' will return true, ending the
    process.
    """

    ## --- Attributes ---
    ##
    ## svc        Mapping name -> service object
    ##
    ## job        Mapping jobid -> job object
    ##
    ## runningp   True when setup is finished
    ##
    ## _quitp     True if explicit quit has been requested

    def __init__(me, socket):
        """
        Initialize the service manager.

        SOCKET is the administration socket to connect to.
        """
        SelCommandDispatcher.__init__(me, socket)
        me.svc = {}
        me.job = {}
        me.runningp = False
        me.handler['SVCCANCEL'] = me._cancel
        me.handler['SVCJOB'] = me._job
        me.handler['SVCCLAIM'] = me._claim
        me._quitp = False

    def addsvc(me, svc):
        """
        Register a new service; SVC is a TripeService instance.

        The service is claimed from the server before being recorded.
        """
        assert svc.name not in me.svc
        me.svcclaim(svc.name, svc.version)
        me.svc[svc.name] = svc

    def _cancel(me, _, jid):
        """
        Called when the server cancels a job; invokes the job's `cancel'
        method.

        A cancellation for a job we no longer know about (because it has
        already been reported done, say) is quietly ignored.
        """
        job = me.job.pop(jid, None)
        if job is not None:
            job.cancel()

    def _claim(me, _, svc, __):
        """
        Called when another program claims our service at a higher version.
        """
        del me.svc[svc]

    def _job(me, _, jid, svc, cmd, *args):
        """
        Called when the server sends us a job to do.

        Calls the service to collect a job, and begins processing it.
        """
        assert jid not in me.job
        svc = me.svc[svc.lower()]
        job = svc.job(jid, cmd, args)
        me.job[jid] = job
        job.start()

    def running(me):
        """Declare that setup is finished, so that `quitp' may answer true."""
        me.runningp = True

    def jobdone(me, jid):
        """
        Informs the service manager that the job with id JID has finished.

        It is not an error if the job has already been forgotten.
        """
        me.job.pop(jid, None)

    def quitp(me):
        """
        Return true if no services or jobs are active (and, therefore, if
        this process can quit without anyone caring).

        Also true if a quit has been explicitly requested, or if setup has
        finished and the connection is no longer being watched.
        """
        return me._quitp or (me.runningp and
                             ((not me.svc and not me.job) or
                              not me.selfile))

    def quit(me):
        """Forces the quit flag (returned by quitp) on."""
        me._quitp = True
1003
class TripeService (object):
    """
    A standard service.

    The NAME and VERSION are passed on to the server.  The CMDTAB is a
    dictionary mapping command names (in lowercase) to command objects.

    If the CMDTAB doesn't have entries for commands HELP and QUIT then
    defaults are provided.

    TripeService itself is mostly agnostic about the nature of command
    objects, but the TripeServiceJob class (below) has some requirements.
    The built-in HELP command requires command objects to have `usage'
    attributes.
    """

    def __init__(me, name, version, cmdtab):
        """
        Create a new service with the given NAME and VERSION.

        CMDTAB maps command names (in lower-case) to command objects; note
        that default `help' and `quit' entries are added to it in place if
        they are missing.
        """
        me.name = name
        me.version = version
        me.cmd = cmdtab
        me.activep = True
        default_help = TripeServiceCommand('help', 0, 0, '', me._help)
        me.cmd.setdefault('help', default_help)
        default_quit = TripeServiceCommand('quit', 0, 0, '', me._quit)
        me.cmd.setdefault('quit', default_quit)

    def job(me, jid, cmd, args):
        """
        Called by the service manager: a job arrived with id JID.

        It asks for command CMD with argument list ARGS.  Creates a new job,
        passing it the information needed.  The command object is looked up
        by the lowercase form of CMD, and is None if there is no such
        command.
        """
        handler = me.cmd.get(cmd.lower())
        return TripeServiceJob(jid, me, cmd, handler, args)

    ## Simple default command handlers, complying with the spec in
    ## tripe-service(7).

    def _help(me):
        """Send a help summary to the user, in order of command name."""
        for name, cmd in sorted(me.cmd.items()):
            svcinfo(name, *cmd.usage)

    def _quit(me):
        """Terminate the service manager."""
        svcmgr.notify('svc-quit', me.name, 'admin-request')
        svcmgr.quit()
1057
class TripeServiceCommand (object):
  """A simple service command: a function plus argument-count limits."""

  def __init__(me, name, min, max, usage, func):
    """
    Set up a new command.

    NAME is the command's name (in lowercase).

    MIN and MAX bound the number of arguments the command will accept;
    passing None for either one disables that bound.

    USAGE is a usage string for help and error messages; it is stored split
    into whitespace-separated words.

    FUNC is the function which actually does the work.
    """
    me.name, me.min, me.max = name, min, max
    me.usage = usage.split()
    me.func = func

  def run(me, *args):
    """
    Invoke the command with the given arguments.

    The number of arguments is checked against the configured limits, and
    TripeSyntaxError is raised if it is out of range; otherwise the supplied
    function is called with the arguments.
    """
    nargs = len(args)
    if me.min is not None and nargs < me.min:
      raise TripeSyntaxError
    if me.max is not None and nargs > me.max:
      raise TripeSyntaxError
    me.func(*args)
1090
class TripeServiceJob (Coroutine):
  """
  Job handler coroutine.

  A standard TripeService invokes a TripeServiceJob for each incoming job
  request, passing it the jobid, command and arguments, and a command
  object.  The command object needs the following attributes.

  usage                 A usage list (excluding the command name) showing
                        arguments and options.

  name                  The command's name, used when reporting syntax
                        errors.

  run(*ARGS)            Function to react to the command with ARGS split into
                        separate arguments.  Invoked in a coroutine.  The
                        svcinfo function (not the TripeCommandDispatcher
                        method) may be used to send INFO lines.  The function
                        may raise TripeJobError to send a FAIL response back,
                        or TripeSyntaxError to send a generic usage error.
                        TripeJobCancelled exceptions are trapped silently.
                        Other exceptions are translated into a generic
                        internal-error message.

  This class automatically takes care of sending some closing response to the
  job, and for informing the service manager that the job is completed.

  The `jid' attribute stores the job's id.
  """

  def __init__(me, jid, svc, cmd, command, args):
    """
    Start a new job.

    The job is created with id JID, for service SVC, processing command name
    CMD (which the service resolved into the command object COMMAND, or
    None), and with the arguments ARGS.
    """
    Coroutine.__init__(me)
    me.jid = jid
    me.svc = svc
    me.cmd = cmd
    me.command = command
    me.args = args

  def run(me):
    """
    Main body of the coroutine.

    Does the tedious exception handling boilerplate and invokes the command's
    run method.  Whatever happens, the service manager is told that the job
    is done.
    """
    try:
      if me.command is None:
        svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
      else:
        me.command.run(*me.args)
        svcmgr.svcok(me.jid)
    except TripeJobError as exc:
      ## The command asked for a specific failure to be reported.
      svcmgr.svcfail(me.jid, *exc.args)
    except TripeSyntaxError:
      svcmgr.svcfail(me.jid, 'bad-svc-syntax',
                     me.svc.name, me.command.name,
                     *me.command.usage)
    except TripeJobCancelled:
      ## Cancelled jobs get no closing response.
      pass
    except Exception as exc:
      ## Anything else is a bug in the command; report it generically rather
      ## than letting it escape from the coroutine.
      svcmgr.svcfail(me.jid, 'svc-internal-error',
                     exc.__class__.__name__, str(exc))
    finally:
      svcmgr.jobdone(me.jid)

  def start(me):
    """Invoked by the service manager to start running the coroutine."""
    me.switch()

  def cancel(me):
    """Invoked by the service manager to cancel the job."""
    me.throw(TripeJobCancelled())
1168
def svcinfo(*args):
  """
  Send an INFO line carrying ARGS to the sender of the current job.

  This must be called from within a TripeServiceJob coroutine: the job id is
  taken from the `jid' attribute of the currently running coroutine.
  """
  jid = Coroutine.getcurrent().jid
  svcmgr.svcinfo(jid, *args)
1175
def _setupsvc(tab, func):
  """
  Body of the setup coroutine for service programs.

  Each service in TAB is registered with the service manager; then FUNC is
  called, if it is true.  The service manager is marked as running
  afterwards, even if registration or FUNC raised an exception.
  """
  try:
    for svc in tab:
      svcmgr.addsvc(svc)
    if func:
      func()
  finally:
    svcmgr.running()
1189
svcmgr = TripeServiceManager(None)
_spawnq = []
def runservices(socket, tab, init = None, setup = None, daemon = False):
  """
  Function to start a service provider.

  SOCKET is the socket to connect to, usually tripesock.

  TAB is a list of entries.  An entry may be either a tuple

        (NAME, VERSION, COMMANDS)

  or a service object (e.g., a TripeService instance).

  COMMANDS is a dictionary mapping command names to tuples

        (MIN, MAX, USAGE, FUNC)

  of arguments for a TripeServiceCommand object.

  If DAEMON is true, then the process is forked into the background before we
  start.  If INIT is given, it is called in the main coroutine, immediately
  after forking.  If SETUP is given, it is called in a coroutine, after
  calling INIT and setting up the services but before marking the service
  manager as running.

  It is a really bad idea to do any initialization, particularly setting up
  coroutines, outside of the INIT or SETUP functions.  In particular, if
  we're using rmcr for fake coroutines, the daemonizing fork will kill off
  the currently established coroutines in a most surprising way.

  The function runs a main select loop until the service manager decides to
  quit.
  """

  global _spawnq
  svcmgr.socket = socket
  svcmgr.connect()

  ## Turn the table entries into service objects.
  svcs = []
  for service in tab:
    if not isinstance(service, tuple):
      svcs.append(service)
    else:
      name, version, commands = service
      cmdmap = {}
      for cmd, stuff in commands.items():
        cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
      svcs.append(TripeService(name, version, cmdmap))

  if daemon:
    M.daemonize()
  if init is not None:
    init()
  Coroutine(_setupsvc).switch(svcs, setup)

  while not svcmgr.quitp():
    ## Start anything which was queued by `spawn'.  If a coroutine spawns
    ## another while we're in here, the new entry lands on the end of the
    ## same list and is picked up by this loop before the queue is reset.
    for cr, args, kw in _spawnq:
      cr.switch(*args, **kw)
    _spawnq = []
    M.select()
1248
def spawn(cr, *args, **kw):
  """
  Queue the coroutine CR to be started by the root coroutine.

  CR is reparented so that it is a direct child of the root coroutine, and
  an entry is added to the spawn queue so that it will be switched to with
  the given positional and keyword arguments.
  """
  cr.parent = rootcr
  entry = (cr, args, kw)
  _spawnq.append(entry)
1258
1259###--------------------------------------------------------------------------
1260### Utilities for services.
1261
_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
def timespec(spec):
  """
  Parse the timespec SPEC, returning a number of seconds.

  SPEC is a nonnegative decimal integer, optionally followed by a single
  unit character: `s' (seconds), `m' (minutes), `h' (hours) or `d' (days).
  With no unit, the number is taken to be in seconds.

  Raises TripeJobError (`bad-time-spec', with the offending SPEC) if the
  number can't be parsed or is negative.
  """
  mul, digits = 1, spec

  ## A lone unit character isn't a valid spec, so only strip a suffix if
  ## there's something left over to be the number.
  if len(spec) > 1 and spec[-1] in _timeunits:
    mul, digits = _timeunits[spec[-1]], spec[:-1]

  try:
    t = int(digits)
  except ValueError:
    raise TripeJobError('bad-time-spec', spec)
  if t < 0:
    raise TripeJobError('bad-time-spec', spec)
  return mul * t
1276
class OptParse (object):
  """
  Parse options from a command list in the conventional fashion.

  ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
  options.  The returned values are the option tags.  During parsing, the
  `arg' method may be used to retrieve the argument for the most recent
  option.  Afterwards, `rest' may be used to retrieve the remaining
  non-option arguments, and do a simple check on how many there are.

  The parser correctly handles `--' option terminators.
  """

  def __init__(me, args, allowed):
    """
    Create a new option parser.

    The parser will scan the ARGS for options given in the sequence ALLOWED
    (which are expected to include the `-' prefix).  ARGS is copied, so the
    caller's list is not modified by parsing.
    """
    me.allowed = dict.fromkeys(allowed, True)
    me.args = list(args)

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next option.

    If we've run out, raise StopIteration.  Options stop at the end of the
    arguments, at the first argument which doesn't look like an option (one
    shorter than two characters, or not starting with `-'), or at a `--'
    terminator, which is consumed.  An option which isn't in the allowed set
    causes TripeSyntaxError to be raised.
    """
    if not me.args or \
       len(me.args[0]) < 2 or \
       not me.args[0].startswith('-'):
      raise StopIteration
    opt = me.args.pop(0)
    if opt == '--':
      raise StopIteration
    if opt not in me.allowed:
      raise TripeSyntaxError
    return opt

  ## Python 3 spells the iterator method `__next__'; provide both names so
  ## that `for opt in parser' works whichever interpreter is in use.
  __next__ = next

  def arg(me):
    """
    Return the argument for the most recent option.

    If none is available, raise TripeSyntaxError.
    """
    if not me.args:
      raise TripeSyntaxError
    return me.args.pop(0)

  def rest(me, min = None, max = None):
    """
    After option parsing is done, return the remaining arguments.

    Check that there are at least MIN and at most MAX arguments remaining --
    either may be None to suppress the check.  Raises TripeSyntaxError if
    the check fails.
    """
    nargs = len(me.args)
    if (min is not None and nargs < min) or \
       (max is not None and nargs > max):
      raise TripeSyntaxError
    return me.args
1344
1345###----- That's all, folks --------------------------------------------------