chiark / gitweb /
Allow different peer associations to use different private keys.
[tripe] / py / tripe.py.in
CommitLineData
2fa80010
MW
1### -*-python-*-
2###
3### Administration connection with tripe server
4###
5### (c) 2006 Straylight/Edgeware
6###
7
8###----- Licensing notice ---------------------------------------------------
9###
10### This file is part of Trivial IP Encryption (TrIPE).
11###
12### TrIPE is free software; you can redistribute it and/or modify
13### it under the terms of the GNU General Public License as published by
14### the Free Software Foundation; either version 2 of the License, or
15### (at your option) any later version.
16###
17### TrIPE is distributed in the hope that it will be useful,
18### but WITHOUT ANY WARRANTY; without even the implied warranty of
19### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20### GNU General Public License for more details.
21###
22### You should have received a copy of the GNU General Public License
23### along with TrIPE; if not, write to the Free Software Foundation,
24### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26"""
27This module provides classes and functions for connecting to a running tripe
28server, sending it commands, receiving and processing replies, and
29implementing services.
30
31Rather than end up lost in a storm of little event-driven classes, or a
32morass of concurrent threads, the module uses coroutines to present a fairly
33simple function call/return interface to potentially long-running commands
a62f8e8a 34which must run without blocking the main process. It assumes a coroutine
2fa80010
MW
35module presenting a subset of the `greenlet' interface: if actual greenlets
36are available, they are used; otherwise there's an implementation in terms of
37threads (with lots of locking) which will do instead.
38
39The simple rule governing the coroutines used here is this:
40
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
43
44 * Other, non-root, coroutines are presumed to be waiting for some specific
45 thing.
46
47Configuration variables:
48 configdir
49 socketdir
50 PACKAGE
51 VERSION
52 tripesock
53 peerdb
54
55Other useful variables:
56 rootcr
57 svcmgr
58
59Other tweakables:
60 _debug
61
62Exceptions:
63 Exception
64 StandardError
65 TripeConnectionError
66 TripeError
67 TripeInternalError
68 TripeJobCancelled
69 TripeJobError
70 TripeSyntaxError
71
72Classes:
73 _Coroutine
74 Coroutine
75 TripeServiceJob
76 OptParse
77 Queue
690a6ec1 78 SelIOWatcher
2fa80010
MW
79 TripeCommand
80 TripeSynchronousCommand
81 TripeAsynchronousCommand
82 TripeCommandIterator
83 TripeConnection
84 TripeCommandDispatcher
690a6ec1 85 TripeServiceManager
2fa80010
MW
86 TripeService
87 TripeServiceCommand
88
89Utility functions:
90 quotify
91 runservices
92 spawn
93 svcinfo
94 timespec
95"""
96
## Settings read by the `pychecker' tool.
__pychecker__ = 'self=me no-constCond no-argsused'

## When true, print a trace of coroutine switches and of the lines exchanged
## with the server.
_debug = False
100
101###--------------------------------------------------------------------------
102### External dependencies.
103
104import socket as S
105import errno as E
106import mLib as M
107import re as RX
108import sys as SYS
109import os as OS
110
111try:
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113 raise ImportError
114 from py.magic import greenlet as _Coroutine
115except ImportError:
116 from rmcr import Coroutine as _Coroutine
117
118###--------------------------------------------------------------------------
119### Coroutine hacking.
120
## Whichever coroutine is running when this module is loaded is the root.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
  """
  A coroutine which may only be switched to from the root coroutine.

  The root coroutine is captured before this class is defined, so it is
  never an instance of it.
  """

  def switch(me, *args, **kw):
    """
    Transfer control to this coroutine, passing it ARGS and KW.

    Must be called from the root coroutine.  Whatever is handed back when
    control returns to the root is discarded.
    """
    current = _Coroutine.getcurrent()
    assert current is rootcr
    if _debug: print('* %s' % me)
    _Coroutine.switch(me, *args, **kw)
    if _debug: print('* %s' % rootcr)
2fa80010
MW
134
135###--------------------------------------------------------------------------
136### Default places for things.
137
## Package details and default directories.  The `@...@' markers are
## placeholders, presumably filled in when the module is built.
configdir = OS.getenv('TRIPEDIR', "@configdir@")
socketdir = "@socketdir@"
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"

## Default socket and peer-database names; both can be overridden from the
## environment.
tripesock = OS.getenv('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
peerdb = OS.getenv('TRIPEPEERDB', 'peers.cdb')
2fa80010
MW
145
146###--------------------------------------------------------------------------
147### Connection to the server.
148
def readnonblockingly(sock, len):
  """
  Nonblocking read from SOCK.

  Switch SOCK to nonblocking mode and try to read up to LEN bytes.  Return
  the data read; end-of-file is returned as an empty string.  Return None if
  there is nothing to read just now.  Any other socket error is propagated
  to the caller.

  SOCK is left in nonblocking mode.
  """
  try:
    sock.setblocking(0)
    return sock.recv(len)
  except S.error:
    ## Collect the exception this way so that the code doesn't depend on
    ## either spelling of the `except' binding syntax.
    exc = SYS.exc_info()[1]
    ## `Nothing to read yet' may be reported using either of these codes,
    ## and they aren't guaranteed to be equal on every platform.
    if exc.args and exc.args[0] in (E.EWOULDBLOCK, E.EAGAIN):
      return None
    raise
163
class TripeConnectionError (StandardError):
  """The connection to the server was lost or is not available."""
class TripeInternalError (StandardError):
  """This program has seen something it doesn't know how to cope with."""
170
class TripeConnection (object):
  """
  A logical connection to the tripe administration socket.

  There may or may not be a physical connection.  (This is needed for the
  monitor, for example.)

  This class isn't very useful on its own, but it has useful subclasses.  At
  this level, the class is agnostic about I/O multiplexing schemes: that is
  delegated to the object in the `iowatch' attribute.

  Attributes:

    socket      Name of the administration socket to connect to.

    sock        The connected socket object, or None if disconnected.

    lbuf        Line buffer splitting incoming data into lines, or None.

    iowatch     I/O watcher told about connections and disconnections.
  """

  def __init__(me, socket):
    """
    Make a connection to the named SOCKET.

    No physical connection is made initially.
    """
    me.socket = socket
    me.sock = None
    me.lbuf = None
    me.iowatch = SelIOWatcher(me)

  def connect(me):
    """
    Ensure that there's a physical connection.

    Do nothing (and return None) if we're already connected.  Otherwise
    connect, invoke the `connected' method, and return the connection
    object.
    """
    if me.sock: return
    sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
    sock.connect(me.socket)
    me.sock = sock
    me.lbuf = M.LineBuffer(me.line, me._eof)
    me.lbuf.size = 1024
    me.connected()
    return me

  def disconnect(me, reason):
    """
    Disconnect the physical connection.

    Invoke the `disconnected' method, giving the provided REASON, which
    should be either None or an exception; then close the socket and discard
    the line buffer.  Do nothing (and return None) if not connected.
    """
    if not me.sock: return
    me.disconnected(reason)
    me.sock.close()
    me.sock = None
    me.lbuf.disable()
    me.lbuf = None
    return me

  def connectedp(me):
    """
    Return true if there's a current, believed-good physical connection.
    """
    return me.sock is not None

  __nonzero__ = connectedp

  def send(me, line):
    """
    Send the LINE, followed by a newline, to the connection's socket.

    All output is done through this method; it can be overridden to provide
    proper nonblocking writing, though this seems generally unnecessary.

    If anything goes wrong, the connection is disconnected (with the
    exception as the reason) and the exception is propagated.
    """
    try:
      me.sock.setblocking(1)
      ## Use `sendall': plain `send' is allowed to write only part of the
      ## data, which would leave half a command on the wire.
      me.sock.sendall(line + '\n')
    except Exception:
      me.disconnect(SYS.exc_info()[1])
      raise
    return me

  def receive(me):
    """
    Receive whatever's ready from the connection's socket.

    Call `line' on each complete line, and `eof' if the connection closed.
    Subclasses which attach this class to an I/O-event system should call
    this method when the socket (CONN.sock) is ready for reading.

    If reading fails, the connection is disconnected (with the exception as
    the reason) and the exception is propagated.
    """
    ## A line handler may disconnect us, so check the socket every time.
    while me.sock is not None:
      try:
        buf = readnonblockingly(me.sock, 16384)
      except Exception:
        me.disconnect(SYS.exc_info()[1])
        raise
      if buf is None:
        ## Nothing more to read for now.
        return me
      if buf == '':
        me._eof()
        return me
      me.lbuf.flush(buf)
    return me

  def _eof(me):
    """Internal end-of-file handler: disconnect, then call `eof'."""
    me.disconnect(TripeConnectionError('connection lost'))
    me.eof()

  def connected(me):
    """
    To be overridden by subclasses to react to a connection being
    established.

    This implementation tells the I/O watcher about the new socket.
    """
    me.iowatch.connected(me.sock)

  def disconnected(me, reason):
    """
    To be overridden by subclasses to react to a connection being severed.

    This implementation tells the I/O watcher that the socket has gone; the
    REASON is not used here.
    """
    me.iowatch.disconnected()

  def eof(me):
    """To be overridden by subclasses to handle end-of-file."""
    pass

  def line(me, line):
    """To be overridden by subclasses to handle incoming lines."""
    pass
295
690a6ec1
MW
296###--------------------------------------------------------------------------
297### I/O loop integration.
298
class SelIOWatcher (object):
  """
  Glue between a connection and mLib's I/O event system.

  A different kind of watcher (e.g., one built on glib's main loop) can be
  substituted by assigning it to CONN.iowatch while CONN is disconnected.
  """

  def __init__(me, conn):
    """Make a watcher for the connection CONN; nothing is watched yet."""
    me._conn = conn
    me._selfile = None

  def connected(me, sock):
    """
    Start watching SOCK, which has just been connected.

    Arranges that CONN.receive is called when SOCK has data to read.
    """
    watch = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
    me._selfile = watch
    watch.enable()

  def disconnected(me):
    """Drop our reference to the file watcher: the connection has gone."""
    me._selfile = None

  def iterate(me):
    """
    Do one iteration of the main select loop.

    Waits for something interesting to happen, processes all of the events,
    and returns.  TripeCommandDispatcher doesn't need this itself, but
    runservices wants it.
    """
    M.select()
337
338###--------------------------------------------------------------------------
339### Inter-coroutine communication.
340
class Queue (object):
  """
  A single-reader, multiple-writer queue of asynchronously arriving things.

  Handy for the more complicated coroutines which have to cope with several
  kinds of incoming event.

  Attributes:

    contents    The queued items, oldest first.

    waiter      The coroutine currently waiting for an item, or None.
  """

  def __init__(me):
    """Create a new empty queue, with nobody waiting on it."""
    me.waiter = None
    me.contents = M.Array()

  def _wait(me):
    """
    Internal: block the calling coroutine until the queue is nonempty.

    Only one reader is allowed: raise ValueError if some coroutine is
    already waiting.
    """
    if me.waiter:
      raise ValueError('queue already being waited on')
    try:
      me.waiter = Coroutine.getcurrent()
      ## Keep handing control back to our parent until something turns up.
      while not me.contents:
        me.waiter.parent.switch()
    finally:
      me.waiter = None

  def get(me):
    """
    Remove and return the oldest item, waiting for one if the queue is
    empty.
    """
    me._wait()
    return me.contents.shift()

  def peek(me):
    """
    Return the oldest item, leaving it in place; wait for one if the queue
    is empty.
    """
    me._wait()
    return me.contents[0]

  def put(me, thing):
    """
    Append THING to the queue.

    If a coroutine is waiting on the queue then switch to it immediately;
    otherwise the item simply stays queued until somebody asks for it.
    """
    me.contents.push(thing)
    reader = me.waiter
    if reader:
      reader.switch()
399
2fa80010
MW
400###--------------------------------------------------------------------------
401### Dispatching coroutine.
402
## A string matching this can be sent as a bareword: it is nonempty and has
## no backslashes, quotes or whitespace in it.
rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')

## Characters which need a backslash in front of them even inside quotes.
rx_weird = RX.compile(r'([\\\'])')

def quotify(s):
  """
  Quote S according to the tripe-admin(5) rules.

  If S can stand as a bareword then return it unchanged.  Otherwise return
  it wrapped in single quotes, with any backslashes and single quotes inside
  it escaped by a backslash.
  """
  bare = rx_ordinary.match(s)
  ## `$' is content to match just before a trailing newline, so check that
  ## the match really did swallow the whole string.
  if bare is None or bare.end() != len(s):
    return "'" + rx_weird.sub(r'\\\1', s) + "'"
  return s
417
def _callback(func):
  """
  Wrap FUNC so that any exception it raises is reported before propagating.

  The exception is handed to `sys.excepthook' and then re-raised.  This is
  for callbacks invoked from C functions, which otherwise ignore
  exceptions.
  """
  def report(*args, **kw):
    try:
      result = func(*args, **kw)
    except:
      SYS.excepthook(*SYS.exc_info())
      raise
    return result
  return report
432
class TripeCommand (object):
  """
  Abstract base class for a command in progress.

  The tokens making up the command are kept in the `words' attribute.

  Subclasses must provide a method to receive the server's responses:

    * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
      'FAIL'; ARGS are the remaining tokens from the server's response.
  """

  def __init__(me, words):
    """Make a new command out of the list of tokens WORDS."""
    me.words = words
449
class TripeSynchronousCommand (TripeCommand):
  """
  A simple command which appears, to its caller, to run synchronously.

  It must be created from a coroutine other than the root (or whichever one
  is running the dispatcher); while the caller waits for the server, other
  coroutines carry on running.

  Every server response resumes the creating coroutine with a pair
  (CODE, REST): CODE is the response code (`OK', `INFO' or `FAIL') and REST
  holds the other tokens of the response.  The caller must keep switching
  back to the dispatcher until a terminating response (`OK' or `FAIL')
  arrives, or it will become very confused.

  Usually it's better to let a TripeCommandIterator do this for you.
  """

  def __init__(me, words):
    """Set up the command WORDS, remembering the coroutine which made it."""
    me.owner = Coroutine.getcurrent()
    TripeCommand.__init__(me, words)

  def response(me, code, *rest):
    """Pass a server response on by resuming the owning coroutine."""
    reply = (code, rest)
    me.owner.switch(reply)
477
class TripeError (StandardError):
  """
  The server rejected a command with a FAIL response.

  The server's message tokens are available in the `args' attribute.
  """
484
class TripeCommandIterator (object):
  """
  Iterator interface to a tripe command.

  The values returned by the iterator are the token sequences from the
  server's INFO lines, as processed by the given filter function, if any.
  The iterator completes normally (by raising StopIteration) if the server
  reported OK, and raises an exception if the command failed for some
  reason.

  A TripeError is raised if the server issues a FAIL code.  If the
  connection failed, the exception describing the failure is raised; if no
  such exception was supplied, a TripeConnectionError is raised instead.
  """

  def __init__(me, dispatcher, words, bg = False, filter = None):
    """
    Create a new command iterator.

    The command is submitted to the DISPATCHER; it consists of the given
    WORDS.  If BG is true, then an option is inserted to request that the
    server run the command in the background.  The FILTER is applied to the
    token sequences with which the server responds, and the filter's outputs
    are the items returned by the iterator.

    Raises ValueError if the calling coroutine has no parent to switch back
    to.
    """
    me.dcr = Coroutine.getcurrent().parent
    if me.dcr is None:
      raise ValueError('must invoke from coroutine')
    me.filter = filter or (lambda x: x)
    if bg:
      words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
    dispatcher.rawcommand(TripeSynchronousCommand(words))

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next piece of information from the server.

    INFO responses are filtered and returned as the values of the iteration.
    FAIL and CONNERR responses are turned into exceptions and raised.
    Finally, OK is turned into StopIteration, which should cause a normal end
    to the iteration process.
    """
    code, rest = me.dcr.switch()
    if code == 'INFO':
      return me.filter(rest)
    elif code == 'OK':
      raise StopIteration
    elif code == 'CONNERR':
      ## REST is the sequence of trailing arguments which were given to the
      ## command's `response' method; for CONNERR, that's just the reason
      ## for the disconnection.  Cope with being handed the bare reason too.
      reason = rest
      if isinstance(rest, (tuple, list)):
        reason = None
        if rest: reason = rest[0]
      if reason is None:
        raise TripeConnectionError('connection terminated by user')
      raise reason
    elif code == 'FAIL':
      raise TripeError(*rest)
    else:
      ## REST is usually a tuple, so convert before gluing it to the code.
      raise TripeInternalError \
            ('unexpected tripe response %r' % ([code] + list(rest),))
545
546### Simple utility functions for the TripeCommandIterator convenience
547### methods.
548
def _tokenjoin(words):
  """Filter function: glue the tokens WORDS together, separated by spaces."""
  separator = ' '
  return separator.join(words)
552
def _keyvals(iter):
  """
  Build a dictionary from the KEY=VALUE tokens produced by ITER.

  Each item of ITER is a sequence of tokens; each token is split at its
  first `='.  A token with no `=' in it causes a ValueError.
  """
  table = {}
  for tokens in iter:
    for token in tokens:
      eq = token.index('=')
      key, value = token[:eq], token[eq + 1:]
      table[key] = value
  return table
562
def _simple(iter):
  """
  Run ITER to completion, expecting it to produce nothing at all.

  Return None; raise TripeInternalError if ITER yields any item.
  """
  if list(iter):
    raise TripeInternalError('expected no response')
569
def _oneline(iter):
  """
  Return the one and only item produced by ITER.

  Raise TripeInternalError if ITER yields no items, or more than one.
  """
  lines = list(iter)
  if len(lines) == 1:
    return lines[0]
  raise TripeInternalError('expected only one line of response')
576
def _tracelike(iter):
  """
  Digest the response to a TRACE-like command.

  Return a list of tuples (CHAR, STATUS, DESC), one for each item of ITER:
  CHAR is a selector character (the first character of the first token);
  STATUS is the status (the rest of the first token: empty if disabled, `+'
  if enabled, maybe something else later); and DESC is the human-readable
  description (the remaining tokens joined with spaces).
  """
  return [(ww[0][0], ww[0][1:], ' '.join(ww[1:])) for ww in iter]
589
def _kwopts(kw, allowed):
  """
  Turn the keyword arguments KW into a list of command options.

  ALLOWED lists the acceptable keywords; a ValueError is raised if KW has
  any other.  KEY = VALUE becomes the option pair -KEY VALUE if VALUE is a
  string, just -KEY if VALUE is a true non-string, and nothing at all if
  VALUE is false.  A `--' is added at the end to stop the parser getting
  confused.
  """
  permitted = set(allowed)
  opts = []
  for key, value in kw.items():
    if key not in permitted:
      raise ValueError('option %s not allowed here' % key)
    flag = '-' + key
    if isinstance(value, str):
      opts.extend([flag, value])
    elif value:
      opts.append(flag)
  opts.append('--')
  return opts
608
690a6ec1
MW
## Deferral.  The queue holds (FUNC, ARGS, KW) triples waiting to be called.
_deferq = []

def defer(func, *args, **kw):
  """Arrange to call FUNC(*ARGS, **KW) later, in the root coroutine."""
  pending = (func, args, kw)
  _deferq.append(pending)
614
22b47552
MW
def funargstr(func, args, kw):
  """
  Describe a call of FUNC with positional ARGS and keyword arguments KW.

  The result looks like `NAME(ARG, ..., KEY = VALUE, ...)', with each
  argument shown as its `repr'.
  """
  parts = [repr(a) for a in args]
  parts.extend(['%s = %r' % (k, v) for k, v in kw.items()])
  return '%s(%s)' % (func.__name__, ', '.join(parts))
620
690a6ec1
MW
def spawn(func, *args, **kw):
  """
  Call FUNC, passing ARGS and KW, in a fresh coroutine.

  The coroutine is created and started by a deferred call, so nothing
  happens until deferred calls are next processed.
  """
  def launch():
    label = funargstr(func, args, kw)
    return Coroutine(func, name = label).switch(*args, **kw)
  defer(launch)
690a6ec1
MW
625
## Asides.  The queue holds (FUNC, ARGS, KW) triples waiting to be called.
_asideq = Queue()

def _runasides():
  """
  Forever take (FUNC, ARGS, KW) triples off the asides queue and call
  FUNC(*ARGS, **KW).

  Exceptions raised by the calls are reported through `sys.excepthook' and
  otherwise ignored, so that one failure doesn't stop later asides.
  """
  while True:
    job = _asideq.get()
    func, args, kw = job
    try:
      func(*args, **kw)
    except:
      SYS.excepthook(*SYS.exc_info())
638
def aside(func, *args, **kw):
  """
  Arrange to call FUNC(*ARGS, **KW) later, in a non-root coroutine.

  The call is put on the asides queue by a deferred call.
  """
  job = (func, args, kw)
  defer(_asideq.put, job)
642
2fa80010
MW
class TripeCommandDispatcher (TripeConnection):
  """
  Command dispatcher.

  The command dispatcher is a connection which knows how to handle commands.
  This is probably the most important class in this module to understand.

  Lines from the server are parsed into tokens.  The first token is a code
  (OK or NOTE or something) explaining what kind of line this is.  The
  `handler' attribute is a dictionary mapping server line codes to handler
  functions, which are called with the code followed by the remaining words
  of the line as individual arguments.  *Exception*: the content of TRACE
  lines is not tokenized.

  There are default handlers for server codes which respond to commands.
  Commands arrive as TripeCommand instances through the `rawcommand'
  interface.  The dispatcher keeps track of which command objects represent
  which jobs, and sends responses on to the appropriate command objects by
  invoking their `response' methods.  Command objects don't see the
  BG... codes, because the dispatcher has already transformed them into
  regular codes when it was looking up the job tag.

  The dispatcher also has a special response code of its own: CONNERR
  indicates that the connection failed and the command has therefore been
  lost; the single argument is the reason for the disconnection, which is
  either None or an exception.
  """

  ## --- Infrastructure ---
  ##
  ## We will get confused if we pipeline commands.  Send them one at a time.
  ## Only send a command when the previous one detaches or completes.
  ##
  ## The following attributes are interesting:
  ##
  ## tagseq     Sequence number for next background job (for bgtag)
  ##
  ## queue      Commands awaiting submission.
  ##
  ## cmd        Mapping from job tags to commands: cmd[None] is the
  ##            foreground command.
  ##
  ## handler    Mapping from server codes to handler functions.

  def __init__(me, socket):
    """
    Initialize the dispatcher.

    The SOCKET is the filename of the administration socket to connect to,
    for TripeConnection.__init__.
    """
    TripeConnection.__init__(me, socket)
    me.tagseq = 0
    me.handler = {}
    me.handler['BGDETACH'] = me._detach
    for i in 'BGOK', 'BGINFO', 'BGFAIL':
      me.handler[i] = me._response
    for i in 'OK', 'INFO', 'FAIL':
      me.handler[i] = me._fgresponse

  def quitp(me):
    """Should we quit the main loop?  Subclasses should override."""
    return False

  def mainloop(me, quitp = None):
    """
    Iterate the I/O watcher until QUITP returns true.

    QUITP defaults to the dispatcher's own `quitp' method.  Must be called
    from the root coroutine.  Arranges for asides and deferred calls to be
    made at the right times.
    """

    global _deferq
    assert _Coroutine.getcurrent() is rootcr
    Coroutine(_runasides, name = '_runasides').switch()
    if quitp is None:
      quitp = me.quitp
    while not quitp():
      ## Deferred calls may themselves defer more work, so keep going until
      ## the queue stays empty.
      while _deferq:
        q = _deferq
        _deferq = []
        for func, args, kw in q:
          func(*args, **kw)
      me.iowatch.iterate()

  def connected(me):
    """
    Connection hook.

    If a subclass overrides this method, it must call us; clears out the
    command queue and job map.
    """
    me.queue = M.Array()
    me.cmd = {}
    TripeConnection.connected(me)

  def disconnected(me, reason):
    """
    Disconnection hook.

    If a subclass overrides this method, it must call us; sends a special
    CONNERR code, with the REASON, to all incomplete commands.
    """
    TripeConnection.disconnected(me, reason)
    ## Work from a snapshot: a command's response handler is at liberty to
    ## submit further commands, which would alter the job map under our
    ## feet.
    for cmd in list(me.cmd.values()):
      cmd.response('CONNERR', reason)
    for cmd in me.queue:
      cmd.response('CONNERR', reason)

  @_callback
  def line(me, line):
    """
    Handle an incoming line, sending it to the right place.

    Lines whose code has no handler are ignored.  Afterwards, try to submit
    the next queued command.
    """
    if _debug: print('< %s' % line)
    code, rest = M.word(line, quotep = True)
    func = me.handler.get(code)
    if func is not None:
      if code == 'TRACE':
        func(code, rest)
      else:
        func(code, *M.split(rest, quotep = True)[0])
    me.dequeue()

  def dequeue(me):
    """
    Pull the oldest command off the queue and try to send it to the server.

    Does nothing if the queue is empty or a foreground command is still in
    progress.
    """
    if not me.queue or None in me.cmd: return
    cmd = me.queue.shift()
    text = ' '.join([quotify(w) for w in cmd.words])
    if _debug: print('> %s' % text)
    me.send(text)
    me.cmd[None] = cmd

  def bgtag(me):
    """
    Return an unused job tag.

    May be of use when composing commands by hand.
    """
    tag = 'J%05d' % me.tagseq
    me.tagseq += 1
    return tag

  ## --- Built-in handler functions for server responses ---

  def _detach(me, _, tag):
    """
    Respond to a BGDETACH TAG message.

    Move the current foreground command to the background.
    """
    assert tag not in me.cmd
    me.cmd[tag] = me.cmd[None]
    del me.cmd[None]

  def _response(me, code, tag, *w):
    """
    Respond to an OK, INFO or FAIL message, or its BG... equivalent.

    TAG identifies the job (None for the foreground command).  Strip any
    `BG' prefix from the CODE, forget the job unless this is an INFO line,
    and dispatch the result to the command object.
    """
    if code.startswith('BG'):
      code = code[2:]
    cmd = me.cmd[tag]
    if code != 'INFO':
      del me.cmd[tag]
    cmd.response(code, *w)

  def _fgresponse(me, code, *w):
    """Process responses to the foreground command."""
    me._response(code, None, *w)

  ## --- Interface methods ---

  def rawcommand(me, cmd):
    """
    Submit the TripeCommand CMD to the server, and look after it until it
    completes.

    Raises TripeConnectionError if there's no connection.
    """
    if not me.connectedp():
      raise TripeConnectionError('connection closed')
    me.queue.push(cmd)
    me.dequeue()

  def command(me, *cmd, **kw):
    """Convenience wrapper for creating a TripeCommandIterator object."""
    return TripeCommandIterator(me, cmd, **kw)

  ## --- Convenience methods for server commands ---
  ##
  ## Each of these submits the server command named in its body and digests
  ## the reply: `_simple' expects no INFO lines and gives None; `_oneline'
  ## expects exactly one; `_keyvals' builds a dictionary from KEY=VALUE
  ## tokens; and `_tracelike' gives a list of (CHAR, STATUS, DESC) tuples.

  def add(me, peer, *addr, **kw):
    """Issue ADD for PEER at ADDR, in the background; KW gives options."""
    return _simple(me.command(bg = True,
                              *['ADD'] +
                              _kwopts(kw, ['tunnel', 'keepalive',
                                           'key', 'priv', 'cork',
                                           'mobile']) +
                              [peer] +
                              list(addr)))
  def addr(me, peer):
    """Issue ADDR for PEER; return the single line of response."""
    return _oneline(me.command('ADDR', peer))
  def algs(me, peer = None):
    """Issue ALGS, for PEER if given; return a dictionary."""
    return _keyvals(me.command('ALGS',
                               *((peer is not None and [peer]) or [])))
  def checkchal(me, chal):
    """Issue CHECKCHAL for CHAL."""
    return _simple(me.command('CHECKCHAL', chal))
  def daemon(me):
    """Issue DAEMON."""
    return _simple(me.command('DAEMON'))
  def eping(me, peer, **kw):
    """
    Issue EPING for PEER, in the background; return the single line of
    response.  KW may give a `timeout' option.
    """
    return _oneline(me.command(bg = True,
                               *['EPING'] +
                               _kwopts(kw, ['timeout']) +
                               [peer]))
  def forcekx(me, peer):
    """Issue FORCEKX for PEER."""
    return _simple(me.command('FORCEKX', peer))
  def getchal(me):
    """Issue GETCHAL; return the response line as a string."""
    return _oneline(me.command('GETCHAL', filter = _tokenjoin))
  def greet(me, peer, chal):
    """Issue GREET for PEER with CHAL."""
    return _simple(me.command('GREET', peer, chal))
  def help(me):
    """Issue HELP; return the response as a list of lines."""
    return list(me.command('HELP', filter = _tokenjoin))
  def ifname(me, peer):
    """Issue IFNAME for PEER; return the response line as a string."""
    return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
  def kill(me, peer):
    """Issue KILL for PEER."""
    return _simple(me.command('KILL', peer))
  def list(me):
    """Issue LIST; return the response as a list of lines."""
    return list(me.command('LIST', filter = _tokenjoin))
  def notify(me, *msg):
    """Issue NOTIFY with the tokens MSG."""
    return _simple(me.command('NOTIFY', *msg))
  def peerinfo(me, peer):
    """Issue PEERINFO for PEER; return a dictionary."""
    return _keyvals(me.command('PEERINFO', peer))
  def ping(me, peer, **kw):
    """
    Issue PING for PEER, in the background; return the single line of
    response.  KW may give a `timeout' option.
    """
    return _oneline(me.command(bg = True,
                               *['PING'] +
                               _kwopts(kw, ['timeout']) +
                               [peer]))
  def port(me):
    """Issue PORT; return the response line as a string."""
    return _oneline(me.command('PORT', filter = _tokenjoin))
  def quit(me):
    """Issue QUIT."""
    return _simple(me.command('QUIT'))
  def reload(me):
    """Issue RELOAD."""
    return _simple(me.command('RELOAD'))
  def servinfo(me):
    """Issue SERVINFO; return a dictionary."""
    return _keyvals(me.command('SERVINFO'))
  def setifname(me, new):
    """Issue SETIFNAME with the argument NEW."""
    return _simple(me.command('SETIFNAME', new))
  def svcclaim(me, service, version):
    """Issue SVCCLAIM for SERVICE at VERSION."""
    return _simple(me.command('SVCCLAIM', service, version))
  def svcensure(me, service, version = None):
    """Issue SVCENSURE for SERVICE, with VERSION if given."""
    return _simple(me.command('SVCENSURE', service,
                              *((version is not None and [version]) or [])))
  def svcfail(me, job, *msg):
    """Issue SVCFAIL for JOB with the tokens MSG."""
    return _simple(me.command('SVCFAIL', job, *msg))
  def svcinfo(me, job, *msg):
    """Issue SVCINFO for JOB with the tokens MSG."""
    return _simple(me.command('SVCINFO', job, *msg))
  def svclist(me):
    """Issue SVCLIST; return a list of the response token sequences."""
    return list(me.command('SVCLIST'))
  def svcok(me, job):
    """Issue SVCOK for JOB."""
    return _simple(me.command('SVCOK', job))
  def svcquery(me, service):
    """Issue SVCQUERY for SERVICE; return a dictionary."""
    return _keyvals(me.command('SVCQUERY', service))
  def svcrelease(me, service):
    """Issue SVCRELEASE for SERVICE."""
    return _simple(me.command('SVCRELEASE', service))
  def svcsubmit(me, service, *args, **kw):
    """
    Issue SVCSUBMIT for SERVICE with ARGS, in the background; return the
    command iterator itself.  KW may give a `version' option.
    """
    return me.command(bg = True,
                      *['SVCSUBMIT'] +
                      _kwopts(kw, ['version']) +
                      [service] +
                      list(args))
  def stats(me, peer):
    """Issue STATS for PEER; return a dictionary."""
    return _keyvals(me.command('STATS', peer))
  def trace(me, *args):
    """Issue TRACE with ARGS; return (CHAR, STATUS, DESC) tuples."""
    return _tracelike(me.command('TRACE', *args))
  def tunnels(me):
    """Issue TUNNELS; return the response as a list of lines."""
    return list(me.command('TUNNELS', filter = _tokenjoin))
  def version(me):
    """Issue VERSION; return the response line as a string."""
    return _oneline(me.command('VERSION', filter = _tokenjoin))
  def warn(me, *msg):
    """Issue WARN with the tokens MSG."""
    return _simple(me.command('WARN', *msg))
  def watch(me, *args):
    """Issue WATCH with ARGS; return (CHAR, STATUS, DESC) tuples."""
    return _tracelike(me.command('WATCH', *args))
920
921###--------------------------------------------------------------------------
922### Asynchronous commands.
923
2fa80010
MW
class TripeAsynchronousCommand (TripeCommand):
  """
  Asynchronous commands.

  This is the complicated way of issuing commands.  You set up a queue and
  associate the command with it.  Each response to the command is put on the
  queue as a triple (TAG, CODE, REST): TAG is an object of your choice,
  which this class doesn't interpret; CODE is the server's response code
  (OK, INFO, FAIL); and REST is a list of the server's remaining tokens.

  With this, a coroutine can process many commands (and possibly other
  events) simultaneously.
  """

  def __init__(me, queue, tag, words):
    """
    Make an asynchronous command consisting of the given WORDS, whose
    responses are sent to QUEUE labelled with TAG.
    """
    TripeCommand.__init__(me, words)
    me.tag = tag
    me.queue = queue

  def response(me, code, *stuff):
    """Handle a server response by putting it on the caller's queue."""
    item = (me.tag, code, list(stuff))
    me.queue.put(item)
949
2fa80010
MW
950###--------------------------------------------------------------------------
951### Services.
952
class TripeJobCancelled (Exception):
  """
  Sent to a job handler when the client kills the job.

  Not propagated further.
  """
960
class TripeJobError (Exception):
  """
  Raised to report the failure of a running job.

  Sends an SVCFAIL code back.
  """
968
class TripeSyntaxError (Exception):
  """
  Raised to report a syntax error in a job request.

  Sends an SVCFAIL bad-svc-syntax message back.
  """
976
class TripeServiceManager (TripeCommandDispatcher):
  """
  A command dispatcher with added handling for incoming service requests.

  There is usually only one instance of this class, called svcmgr.  Some of
  the support functions in this module assume that this is the case.

  To use, run the main loop until the `quitp' method returns true; and, in a
  non-root coroutine, register your services by calling `addsvc', and then
  call `running' when you've finished setting up.

  The instance handles the server's service messages SVCJOB, SVCCANCEL and
  SVCCLAIM, and maintains a table of the registered services.  An incoming
  job causes the service's `job' method to be invoked to make a job object;
  SVCCANCEL invokes the job's `cancel' method; and SVCCLAIM causes the
  relevant service to be deregistered.

  There is no base class for jobs, but a job must implement two methods:

    start()     Begin processing; might be a no-op.

    cancel()    Stop processing; the original client has killed the job.

  The life of a service manager is divided into two parts: setup and
  running; you tell the manager that you've finished setting up by calling
  the `running' method.  If, at any point after setup is finished, there are
  no remaining services or jobs, `quitp' will return true, ending the
  process.
  """

  ## --- Attributes ---
  ##
  ## svc        Mapping name -> service object
  ##
  ## job        Mapping jobid -> job object
  ##
  ## runningp   True when setup is finished
  ##
  ## _quitp     True if explicit quit has been requested

  def __init__(me, socket):
    """
    Initialize the service manager.

    SOCKET is the administration socket to connect to.
    """
    TripeCommandDispatcher.__init__(me, socket)
    me.svc = {}
    me.job = {}
    me.runningp = False
    for code, func in [('SVCCANCEL', me._cancel),
                       ('SVCJOB', me._job),
                       ('SVCCLAIM', me._claim)]:
      me.handler[code] = func
    me._quitp = 0

  def addsvc(me, svc):
    """
    Register a new service; SVC is a TripeService instance.

    The service is claimed from the server before being entered in the
    table.
    """
    name = svc.name
    assert name not in me.svc
    me.svcclaim(name, svc.version)
    me.svc[name] = svc

  def _cancel(me, _, jid):
    """
    Handle SVCCANCEL: forget the job JID and invoke its `cancel' method.
    """
    job = me.job.pop(jid)
    job.cancel()

  def _claim(me, _, svc, __):
    """
    Handle SVCCLAIM: some other program has claimed our service SVC at a
    higher version, so deregister it.
    """
    del me.svc[svc]

  def _job(me, _, jid, svc, cmd, *args):
    """
    Handle SVCJOB: the server has sent us a job to do.

    Ask the service named SVC to make a job for the command CMD with
    arguments ARGS, record it under JID, and start it.
    """
    assert jid not in me.job
    service = me.svc[svc.lower()]
    job = service.job(jid, cmd, args)
    me.job[jid] = job
    job.start()

  def running(me):
    """Note that setup is finished; `quitp' may now return true."""
    me.runningp = True

  def jobdone(me, jid):
    """
    Inform the service manager that the job with id JID has finished.

    It isn't an error if the job has already been forgotten.
    """
    me.job.pop(jid, None)

  def quitp(me):
    """
    Return true if this process can quit without anyone caring.

    That is: if an explicit quit has been requested; or if setup is
    finished and either no services or jobs are active, or the connection
    has gone.
    """
    if me._quitp:
      return me._quitp
    if not me.runningp:
      return me.runningp
    idle = not (me.svc or me.job)
    return idle or not me.sock

  def quit(me):
    """Force the quit flag (returned by `quitp') on."""
    me._quitp = True
1084
class TripeService (object):
  """
  A standard service.

  The NAME and VERSION are passed on to the server. The CMDTAB is a
  dictionary mapping command names (in lowercase) to command objects.

  If the CMDTAB lacks entries for the commands HELP and QUIT then default
  implementations are added to it.

  TripeService itself is mostly agnostic about the nature of command objects,
  but the TripeServiceJob class (below) has some requirements. The built-in
  HELP command requires command objects to have `usage' attributes.
  """

  def __init__(me, name, version, cmdtab):
    """
    Create a new service with the given NAME and VERSION.

    CMDTAB maps command names (in lower-case) to command objects. It is
    kept by reference and modified in place: `help' and `quit' entries are
    added if they are missing.
    """
    me.name = name
    me.version = version
    me.cmd = cmdtab
    me.activep = True
    for tag, func in [('help', me._help), ('quit', me._quit)]:
      me.cmd.setdefault(tag, TripeServiceCommand(tag, 0, 0, '', func))

  def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    It asks for command CMD with argument list ARGS. Creates and returns a
    new TripeServiceJob, passing it the command object found for CMD
    (looked up in lowercase), or None if there is no such command.
    """
    command = me.cmd.get(cmd.lower())
    return TripeServiceJob(jid, me, cmd, command, args)

  ## Simple default command handlers, complying with the spec in
  ## tripe-service(7).

  def _help(me):
    """Send a help summary to the user: one line per command, by name."""
    for name, cmd in sorted(me.cmd.items()):
      svcinfo(name, *cmd.usage)

  def _quit(me):
    """Announce that we're quitting, and terminate the service manager."""
    svcmgr.notify('svc-quit', me.name, 'admin-request')
    svcmgr.quit()
1138
class TripeServiceCommand (object):
  """A simple service command: a function plus argument-count limits."""

  def __init__(me, name, min, max, usage, func):
    """
    Creates a new command.

    NAME is the command's name (in lowercase).

    MIN and MAX are the minimum and maximum number of allowed arguments (used
    for checking); either may be None to indicate no minimum or maximum.

    USAGE is a usage string, used for generating help and error messages; it
    is stored split into a list of whitespace-separated words.

    FUNC is the function to invoke.
    """
    me.name, me.func = name, func
    me.min, me.max = min, max
    me.usage = usage.split()

  def run(me, *args):
    """
    Called when the command is invoked.

    Checks the number of arguments against the limits, raising
    TripeSyntaxError if it is out of range, and then calls the supplied
    function with the arguments. The function's result is discarded.
    """
    argc = len(args)
    if me.min is not None and argc < me.min:
      raise TripeSyntaxError
    if me.max is not None and argc > me.max:
      raise TripeSyntaxError
    me.func(*args)
1171
class TripeServiceJob (Coroutine):
  """
  Job handler coroutine.

  A standard TripeService invokes a TripeServiceJob for each incoming job
  request, passing it the jobid, command and arguments, and a command
  object. The command object needs the following attributes.

  name                  The command's name, used when reporting a syntax
                        error.

  usage                 A usage list (excluding the command name) showing
                        arguments and options.

  run(*ARGS)            Function to react to the command with ARGS split into
                        separate arguments. Invoked in a coroutine. The
                        svcinfo function (not the TripeCommandDispatcher
                        method) may be used to send INFO lines. The function
                        may raise TripeJobError to send a FAIL response back,
                        or TripeSyntaxError to send a generic usage error.
                        TripeJobCancelled exceptions are trapped silently.
                        Other exceptions are translated into a generic
                        internal-error message.

  This class automatically takes care of sending some closing response to the
  job, and of informing the service manager that the job is completed.

  The `jid' attribute stores the job's id.
  """

  def __init__(me, jid, svc, cmd, command, args):
    """
    Start a new job.

    The job is created with id JID, for service SVC, processing command name
    CMD (which the service resolved into the command object COMMAND, or
    None), and with the arguments ARGS.
    """
    Coroutine.__init__(me)
    me.jid = jid
    me.svc = svc
    me.cmd = cmd
    me.command = command
    me.args = args

  def run(me):
    """
    Main body of the coroutine.

    Does the tedious exception handling boilerplate and invokes the command's
    run method. Whatever happens, the service manager is told that the job
    is done.
    """
    try:
      if me.command is None:
        ## The service had no command object for this name.
        svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
      else:
        me.command.run(*me.args)
        svcmgr.svcok(me.jid)
    except TripeJobError as exc:
      ## The command asked for a specific failure to be reported.
      svcmgr.svcfail(me.jid, *exc.args)
    except TripeSyntaxError:
      svcmgr.svcfail(me.jid, 'bad-svc-syntax',
                     me.svc.name, me.command.name,
                     *me.command.usage)
    except TripeJobCancelled:
      ## The client gave up on the job: there is nobody left to answer.
      pass
    except Exception as exc:
      svcmgr.svcfail(me.jid, 'svc-internal-error',
                     exc.__class__.__name__, str(exc))
    finally:
      svcmgr.jobdone(me.jid)

  def start(me):
    """Invoked by the service manager to start running the coroutine."""
    me.switch()

  def cancel(me):
    """
    Invoked by the service manager to cancel the job.

    Throws a TripeJobCancelled exception into the coroutine.
    """
    me.throw(TripeJobCancelled())
1249
def svcinfo(*args):
  """
  Send an INFO line carrying ARGS to the sender of the current job.

  Must be invoked from a TripeServiceJob coroutine: the job id is taken from
  the `jid' attribute of the current coroutine.
  """
  job = Coroutine.getcurrent()
  svcmgr.svcinfo(job.jid, *args)
1256
def _setupsvc(tab, func):
  """
  Setup coroutine for setting up service programs.

  Register each of the services in TAB with the service manager and then,
  if FUNC is given, call it with no arguments. The service manager is
  marked as running afterwards, even if something went wrong on the way.
  """
  try:
    for svc in tab:
      svcmgr.addsvc(svc)
    if func:
      func()
  finally:
    svcmgr.running()
1270
## The shared service manager. It is created without a socket:
## `runservices' assigns `svcmgr.socket' and calls `svcmgr.connect' later.
svcmgr = TripeServiceManager(None)
def runservices(socket, tab, init = None, setup = None, daemon = False):
  """
  Function to start a service provider.

  SOCKET is the socket to connect to, usually tripesock.

  TAB is a list of entries. An entry may be either a tuple

        (NAME, VERSION, COMMANDS)

  or a service object (e.g., a TripeService instance).

  COMMANDS is a dictionary mapping command names to tuples

        (MIN, MAX, USAGE, FUNC)

  of arguments for a TripeServiceCommand object.

  If DAEMON is true, then the process is forked into the background before we
  start. If INIT is given, it is called in the main coroutine, immediately
  after forking. If SETUP is given, it is called in a coroutine, after
  calling INIT and setting up the services but before marking the service
  manager as running.

  It is a really bad idea to do any initialization, particularly setting up
  coroutines, outside of the INIT or SETUP functions. In particular, if
  we're using rmcr for fake coroutines, the daemonizing fork will kill off
  the currently established coroutines in a most surprising way.

  The function runs a main select loop until the service manager decides to
  quit.
  """

  ## Connect to the server.
  svcmgr.socket = socket
  svcmgr.connect()

  ## Turn tuple entries into proper service objects; anything else is
  ## assumed to be a service object already.
  svcs = []
  for entry in tab:
    if isinstance(entry, tuple):
      name, version, commands = entry
      cmdmap = dict((cmd, TripeServiceCommand(cmd, *stuff))
                    for cmd, stuff in commands.items())
      entry = TripeService(name, version, cmdmap)
    svcs.append(entry)

  ## Go into the background if requested, and let the caller initialize.
  if daemon:
    M.daemonize()
  if init is not None:
    init()

  ## Register the services in a coroutine, and run the main loop.
  spawn(_setupsvc, svcs, setup)
  svcmgr.mainloop()
1323
1324###--------------------------------------------------------------------------
1325### Utilities for services.
1326
_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
def timespec(spec):
  """
  Parse the timespec SPEC, returning a number of seconds.

  SPEC is a nonnegative integer (as understood by `int'), optionally
  followed by a one-letter unit: `s' for seconds, `m' for minutes, `h' for
  hours or `d' for days. With no unit, the number counts seconds.

  Raises TripeJobError with token `bad-time-spec' if the number is
  malformed or negative; the error carries the number part of SPEC, i.e.,
  with any unit letter removed.
  """
  mul = 1
  if len(spec) > 1 and spec[-1] in _timeunits:
    mul = _timeunits[spec[-1]]
    spec = spec[:-1]

  ## Only trap conversion failures: a bare `except' would also turn
  ## KeyboardInterrupt and SystemExit into job errors.
  try:
    t = int(spec)
  except (TypeError, ValueError):
    raise TripeJobError('bad-time-spec', spec)
  if t < 0:
    raise TripeJobError('bad-time-spec', spec)
  return mul * t
1341
class OptParse (object):
  """
  Parse options from a command list in the conventional fashion.

  ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed
  options. Iterating over the parser yields the option tags. During
  parsing, the `arg' method may be used to retrieve the argument for the most
  recent option. Afterwards, `rest' may be used to retrieve the remaining
  non-option arguments, and do a simple check on how many there are.

  The parser correctly handles `--' option terminators.
  """

  def __init__(me, args, allowed):
    """
    Create a new option parser.

    The parser will scan the ARGS for options given in the sequence ALLOWED
    (which are expected to include the `-' prefix). ARGS is copied, so the
    caller's list is left alone.
    """
    me.allowed = dict.fromkeys(allowed, True)
    me.args = list(args)

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next option.

    If we've run out, raise StopIteration. This happens when there are no
    arguments left, when the next argument doesn't look like an option (it
    is shorter than two characters or doesn't start with `-'), or when the
    next argument is the terminator `--', which is consumed.

    Raise TripeSyntaxError if the option is not one of the allowed ones.
    """
    if not me.args:
      raise StopIteration
    head = me.args[0]
    if len(head) < 2 or not head.startswith('-'):
      raise StopIteration
    opt = me.args.pop(0)
    if opt == '--':
      raise StopIteration
    if opt not in me.allowed:
      raise TripeSyntaxError
    return opt

  ## Python 3 spells the iterator method differently; provide both names so
  ## that `for opt in OptParse(...)' works either way.
  __next__ = next

  def arg(me):
    """
    Return the argument for the most recent option.

    The argument is removed from the list. If none is available, raise
    TripeSyntaxError.
    """
    if not me.args:
      raise TripeSyntaxError
    return me.args.pop(0)

  def rest(me, min = None, max = None):
    """
    After option parsing is done, return the remaining arguments.

    Check that there are at least MIN and at most MAX arguments remaining --
    either may be None to suppress the check. Raise TripeSyntaxError if the
    check fails.
    """
    count = len(me.args)
    if min is not None and count < min:
      raise TripeSyntaxError
    if max is not None and count > max:
      raise TripeSyntaxError
    return me.args
1409
1410###----- That's all, folks --------------------------------------------------