chiark / gitweb /
svc/conntrack.in: New service to track connection status.
[tripe] / py / tripe.py.in
CommitLineData
2fa80010
MW
1### -*-python-*-
2###
3### Administration connection with tripe server
4###
5### (c) 2006 Straylight/Edgeware
6###
7
8###----- Licensing notice ---------------------------------------------------
9###
10### This file is part of Trivial IP Encryption (TrIPE).
11###
12### TrIPE is free software; you can redistribute it and/or modify
13### it under the terms of the GNU General Public License as published by
14### the Free Software Foundation; either version 2 of the License, or
15### (at your option) any later version.
16###
17### TrIPE is distributed in the hope that it will be useful,
18### but WITHOUT ANY WARRANTY; without even the implied warranty of
19### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20### GNU General Public License for more details.
21###
22### You should have received a copy of the GNU General Public License
23### along with TrIPE; if not, write to the Free Software Foundation,
24### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26"""
27This module provides classes and functions for connecting to a running tripe
28server, sending it commands, receiving and processing replies, and
29implementing services.
30
31Rather than end up lost in a storm of little event-driven classes, or a
32morass of concurrent threads, the module uses coroutines to present a fairly
33simple function call/return interface to potentially long-running commands
a62f8e8a 34which must run without blocking the main process. It assumes a coroutine
2fa80010
MW
35module presenting a subset of the `greenlet' interface: if actual greenlets
36are available, they are used; otherwise there's an implementation in terms of
37threads (with lots of locking) which will do instead.
38
39The simple rule governing the coroutines used here is this:
40
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
43
44 * Other, non-root, coroutines are presumed to be waiting for some specific
45 thing.
46
47Configuration variables:
48 configdir
49 socketdir
50 PACKAGE
51 VERSION
52 tripesock
53 peerdb
54
55Other useful variables:
56 rootcr
57 svcmgr
58
59Other tweakables:
60 _debug
61
62Exceptions:
63 Exception
64 StandardError
65 TripeConnectionError
66 TripeError
67 TripeInternalError
68 TripeJobCancelled
69 TripeJobError
70 TripeSyntaxError
71
72Classes:
73 _Coroutine
74 Coroutine
75 TripeServiceJob
76 OptParse
77 Queue
690a6ec1 78 SelIOWatcher
2fa80010
MW
79 TripeCommand
80 TripeSynchronousCommand
81 TripeAsynchronousCommand
82 TripeCommandIterator
83 TripeConnection
84 TripeCommandDispatcher
690a6ec1 85 TripeServiceManager
2fa80010
MW
86 TripeService
87 TripeServiceCommand
88
89Utility functions:
90 quotify
91 runservices
92 spawn
93 svcinfo
94 timespec
95"""
96
97__pychecker__ = 'self=me no-constCond no-argsused'
98
99_debug = False
100
101###--------------------------------------------------------------------------
102### External dependencies.
103
104import socket as S
105import errno as E
106import mLib as M
107import re as RX
108import sys as SYS
109import os as OS
110
111try:
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113 raise ImportError
114 from py.magic import greenlet as _Coroutine
115except ImportError:
116 from rmcr import Coroutine as _Coroutine
117
118###--------------------------------------------------------------------------
119### Coroutine hacking.
120
## Whichever coroutine is running while this module is loaded is the root.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
  """
  A coroutine class which can only be invoked by the root coroutine.

  The root, by construction, cannot be an instance of this class.
  """

  def switch(me, *args, **kw):
    """
    Transfer control to this coroutine, passing it ARGS and KW.

    Only the root coroutine may do this.  Nothing is returned: the root
    doesn't care what values are passed back to it.
    """
    caller = _Coroutine.getcurrent()
    assert caller is rootcr
    _Coroutine.switch(me, *args, **kw)
132
133###--------------------------------------------------------------------------
134### Default places for things.
135
## Filled in at build time.
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"
socketdir = "@socketdir@"

## Defaults which may be overridden from the environment.
configdir = OS.getenv('TRIPEDIR', "@configdir@")
tripesock = OS.getenv('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
peerdb = OS.getenv('TRIPEPEERDB', 'peers.cdb')
2fa80010
MW
143
144###--------------------------------------------------------------------------
145### Connection to the server.
146
def readnonblockingly(sock, len):
  """
  Nonblocking read from SOCK.

  Try to return LEN bytes.  If couldn't read anything, return None.  EOF is
  returned as an empty string.  Socket errors other than `nothing to read
  just now' are propagated to the caller.

  As a side effect, SOCK is left in nonblocking mode.
  """
  try:
    sock.setblocking(0)
    return sock.recv(len)
  except S.error:
    ## Collect the exception object this way so as not to depend on the
    ## interpreter's syntax for binding it in the `except' clause.
    exc = SYS.exc_info()[1]
    ## EAGAIN and EWOULDBLOCK are allowed to be distinct values, and either
    ## can mean that there's simply nothing to read yet.
    if exc.args and exc.args[0] in (E.EWOULDBLOCK, E.EAGAIN):
      return None
    raise
161
162class TripeConnectionError (StandardError):
163 """Something happened to the connection with the server."""
164 pass
165class TripeInternalError (StandardError):
166 """This program is very confused."""
167 pass
168
169class TripeConnection (object):
170 """
171 A logical connection to the tripe administration socket.
172
173 There may or may not be a physical connection. (This is needed for the
174 monitor, for example.)
175
176 This class isn't very useful on its own, but it has useful subclasses. At
177 this level, the class is agnostic about I/O multiplexing schemes; that gets
178 added later.
179 """
180
181 def __init__(me, socket):
182 """
183 Make a connection to the named SOCKET.
184
185 No physical connection is made initially.
186 """
187 me.socket = socket
188 me.sock = None
189 me.lbuf = None
690a6ec1 190 me.iowatch = SelIOWatcher(me)
2fa80010
MW
191
192 def connect(me):
193 """
194 Ensure that there's a physical connection.
195
196 Do nothing if we're already connected. Invoke the `connected' method if
197 successful.
198 """
199 if me.sock: return
200 sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
201 sock.connect(me.socket)
202 me.sock = sock
203 me.lbuf = M.LineBuffer(me.line, me._eof)
204 me.lbuf.size = 1024
205 me.connected()
206 return me
207
208 def disconnect(me, reason):
209 """
210 Disconnect the physical connection.
211
212 Invoke the `disconnected' method, giving the provided REASON, which
213 should be either None or an exception.
214 """
215 if not me.sock: return
216 me.disconnected(reason)
217 me.sock.close()
218 me.sock = None
219 me.lbuf.disable()
220 me.lbuf = None
221 return me
222
223 def connectedp(me):
224 """
225 Return true if there's a current, believed-good physical connection.
226 """
227 return me.sock is not None
228
229 __nonzero__ = connectedp
230
231 def send(me, line):
232 """
233 Send the LINE to the connection's socket.
234
235 All output is done through this method; it can be overridden to provide
236 proper nonblocking writing, though this seems generally unnecessary.
237 """
238 try:
239 me.sock.setblocking(1)
240 me.sock.send(line + '\n')
241 except Exception, exc:
242 me.disconnect(exc)
243 raise
244 return me
245
246 def receive(me):
247 """
248 Receive whatever's ready from the connection's socket.
249
250 Call `line' on each complete line, and `eof' if the connection closed.
251 Subclasses which attach this class to an I/O-event system should call
252 this method when the socket (CONN.sock) is ready for reading.
253 """
254 while me.sock is not None:
255 try:
256 buf = readnonblockingly(me.sock, 16384)
257 except Exception, exc:
258 me.disconnect(exc)
259 raise
260 if buf is None:
261 return me
262 if buf == '':
263 me._eof()
264 return me
265 me.lbuf.flush(buf)
266 return me
267
268 def _eof(me):
269 """Internal end-of-file handler."""
270 me.disconnect(TripeConnectionError('connection lost'))
271 me.eof()
272
273 def connected(me):
274 """
275 To be overridden by subclasses to react to a connection being
276 established.
277 """
690a6ec1 278 me.iowatch.connected(me.sock)
2fa80010
MW
279
280 def disconnected(me, reason):
281 """
282 To be overridden by subclasses to react to a connection being severed.
283 """
690a6ec1 284 me.iowatch.disconnected()
2fa80010
MW
285
286 def eof(me):
287 """To be overridden by subclasses to handle end-of-file."""
288 pass
289
290 def line(me, line):
291 """To be overridden by subclasses to handle incoming lines."""
292 pass
293
690a6ec1
MW
294###--------------------------------------------------------------------------
295### I/O loop integration.
296
class SelIOWatcher (object):
  """
  Integration with mLib's I/O event system.

  An object with the same methods can be used instead for integration with,
  e.g., glib's main loop: set CONN.iowatch to the replacement while the CONN
  is disconnected.
  """

  def __init__(me, conn):
    """Make a watcher for the connection CONN; nothing is watched yet."""
    me._selfile = None
    me._conn = conn

  def connected(me, sock):
    """
    Called when a connection is made.

    SOCK is the socket.  The watcher must arrange to call CONN.receive when
    data is available.
    """
    fd = sock.fileno()
    me._selfile = M.SelFile(fd, M.SEL_READ, me._conn.receive)
    me._selfile.enable()

  def disconnected(me):
    """
    Called when the connection is lost.

    Forgets about the selector object for the old socket.
    """
    me._selfile = None

  def iterate(me):
    """
    Wait for something interesting to happen, and issue events.

    That is, basically, do one iteration of a main select loop, processing
    all of the events, and then return.  This isn't needed for
    TripeCommandDispatcher, but runservices wants it.
    """
    M.select()
335
336###--------------------------------------------------------------------------
337### Inter-coroutine communication.
338
class Queue (object):
  """
  A queue of things arriving asynchronously.

  This is a very simple single-reader multiple-writer queue.  It's useful for
  more complex coroutines which need to cope with a variety of possible
  incoming events.

  Attributes:

    contents    The items waiting to be collected, oldest first.

    waiter      The coroutine currently blocked waiting for an item, or
                None.
  """

  def __init__(me):
    """Create a new empty queue."""
    me.waiter = None
    me.contents = M.Array()

  def _wait(me):
    """
    Internal: wait for an item to arrive in the queue.

    Complain if someone is already waiting, because this is just a
    single-reader queue.
    """
    if me.waiter:
      raise ValueError('queue already being waited on')
    me.waiter = Coroutine.getcurrent()
    try:
      ## Hand control back to our parent until a writer has left us
      ## something to collect.
      while not me.contents:
        me.waiter.parent.switch()
    finally:
      me.waiter = None

  def get(me):
    """
    Remove and return the item at the head of the queue.

    If the queue is empty, wait until an item arrives.
    """
    me._wait()
    item = me.contents.shift()
    return item

  def peek(me):
    """
    Return the item at the head of the queue without removing it.

    If the queue is empty, wait until an item arrives.
    """
    me._wait()
    head = me.contents[0]
    return head

  def put(me, thing):
    """
    Write THING to the queue.

    If someone is waiting on the queue, wake him up immediately; otherwise
    just leave the item there for later.
    """
    me.contents.push(thing)
    reader = me.waiter
    if reader:
      reader.switch()
397
2fa80010
MW
398###--------------------------------------------------------------------------
399### Dispatching coroutine.
400
## A bareword can stand on its own: it's nonempty, and contains no
## backslashes, quotes or whitespace.
rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')

## Characters which must be escaped, even within quoted text.
rx_weird = RX.compile(r'([\\\'])')

def quotify(s):
  """
  Quote S according to the tripe-admin(5) rules.

  A bareword is returned unchanged.  Anything else is wrapped in single
  quotes, with a backslash put before each backslash and single quote.
  """
  bare = rx_ordinary.match(s)
  ## `$' is also happy just before a trailing newline, so make sure that the
  ## match really did cover the entire string.
  if bare is not None and bare.end() == len(s):
    return s
  escaped = rx_weird.sub(r'\\\1', s)
  return "'" + escaped + "'"
415
416def _callback(func):
417 """
418 Return a wrapper for FUNC which reports exceptions thrown by it.
419
420 Useful in the case of callbacks invoked by C functions which ignore
421 exceptions.
422 """
423 def _(*a, **kw):
424 try:
425 return func(*a, **kw)
426 except:
427 SYS.excepthook(*SYS.exc_info())
428 raise
429 return _
430
class TripeCommand (object):
  """
  Abstract base class for a command in progress.

  The tokens making up the command are kept, as a list, in the `words'
  attribute.

  Subclasses must implement a method to handle server responses:

    * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or
      'FAIL'; ARGS are the remaining tokens from the server's response.
  """

  def __init__(me, words):
    """Make a new command consisting of the given list of WORDS."""
    me.words = words
447
class TripeSynchronousCommand (TripeCommand):
  """
  A simple command, processed apparently synchronously.

  Must be invoked from a coroutine other than the root (or whichever one is
  running the dispatcher); in reality, other coroutines carry on running
  while we wait for a response from the server.

  Each server response causes the calling coroutine to be resumed with the
  pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
  or `FAIL') and REST is a tuple of the server's other response tokens.  The
  calling coroutine must continue switching back to the dispatcher until a
  terminating response (`OK' or `FAIL') is received or become very
  confused.

  Mostly it's better to use the TripeCommandIterator to do this
  automatically.
  """

  def __init__(me, words):
    """
    Initialize the command, specifying the WORDS to send to the server.

    The coroutine which creates the command is the one which will be resumed
    with the responses.
    """
    super(TripeSynchronousCommand, me).__init__(words)
    me.owner = Coroutine.getcurrent()

  def response(me, code, *rest):
    """Handle a server response by forwarding it to the calling coroutine."""
    reply = (code, rest)
    me.owner.switch(reply)
475
476class TripeError (StandardError):
477 """
478 A tripe command failed with an error (a FAIL code). The args attribute
479 contains a list of the server's message tokens.
480 """
481 pass
482
483class TripeCommandIterator (object):
484 """
485 Iterator interface to a tripe command.
486
487 The values returned by the iterator are lists of tokens from the server's
488 INFO lines, as processed by the given filter function, if any. The
489 iterator completes normally (by raising StopIteration) if the server
490 reported OK, and raises an exception if the command failed for some reason.
491
492 A TripeError is raised if the server issues a FAIL code. If the connection
493 failed, some other exception is raised.
494 """
495
496 def __init__(me, dispatcher, words, bg = False, filter = None):
497 """
498 Create a new command iterator.
499
500 The command is submitted to the DISPATCHER; it consists of the given
501 WORDS. If BG is true, then an option is inserted to request that the
502 server run the command in the background. The FILTER is applied to the
503 token lists which the server responds, and the filter's output are the
504 items returned by the iterator.
505 """
506 me.dcr = Coroutine.getcurrent().parent
507 if me.dcr is None:
508 raise ValueError, 'must invoke from coroutine'
509 me.filter = filter or (lambda x: x)
510 if bg:
511 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
512 dispatcher.rawcommand(TripeSynchronousCommand(words))
513
514 def __iter__(me):
515 """Iterator protocol: I am my own iterator."""
516 return me
517
518 def next(me):
519 """
520 Iterator protocol: return the next piece of information from the server.
521
522 INFO responses are filtered and returned as the values of the iteration.
523 FAIL and CONNERR responses are turned into exceptions and raised.
524 Finally, OK is turned into StopIteration, which should cause a normal end
525 to the iteration process.
526 """
527 thing = me.dcr.switch()
528 code, rest = thing
529 if code == 'INFO':
530 return me.filter(rest)
531 elif code == 'OK':
532 raise StopIteration
533 elif code == 'CONNERR':
534 if rest is None:
535 raise TripeConnectionError, 'connection terminated by user'
536 else:
537 raise rest
538 elif code == 'FAIL':
539 raise TripeError(*rest)
540 else:
541 raise TripeInternalError \
542 ('unexpected tripe response %r' % ([code] + rest))
543
544### Simple utility functions for the TripeCommandIterator convenience
545### methods.
546
547def _tokenjoin(words):
548 """Filter function: simply join the given tokens with spaces between."""
549 return ' '.join(words)
550
551def _keyvals(iter):
552 """Return a dictionary formed from the KEY=VALUE pairs returned by the
553 iterator ITER."""
554 kv = {}
555 for ww in iter:
556 for w in ww:
557 q = w.index('=')
558 kv[w[:q]] = w[q + 1:]
559 return kv
560
561def _simple(iter):
562 """Raise an error if ITER contains any item."""
563 stuff = list(iter)
564 if len(stuff) != 0:
565 raise TripeInternalError('expected no response')
566 return None
567
568def _oneline(iter):
569 """If ITER contains a single item, return it; otherwise raise an error."""
570 stuff = list(iter)
571 if len(stuff) != 1:
572 raise TripeInternalError('expected only one line of response')
573 return stuff[0]
574
575def _tracelike(iter):
576 """Handle a TRACE-like command. The result is a list of tuples (CHAR,
577 STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if
578 disabled, `+' if enabled, maybe something else later), and DESC is the
579 human-readable description."""
580 stuff = []
581 for ww in iter:
582 ch = ww[0][0]
583 st = ww[0][1:]
584 desc = ' '.join(ww[1:])
585 stuff.append((ch, st, desc))
586 return stuff
587
588def _kwopts(kw, allowed):
589 """Parse keyword arguments into options. ALLOWED is a list of allowable
590 keywords; raise errors if other keywords are present. KEY = VALUE becomes
591 an option pair -KEY VALUE if VALUE is a string, just the option -KEY if
592 VALUE is a true non-string, or nothing if VALUE is false.. Insert a `--'
593 at the end to stop the parser getting confused."""
594 opts = []
595 amap = {}
596 for a in allowed: amap[a] = True
597 for k, v in kw.iteritems():
598 if k not in amap:
599 raise ValueError('option %s not allowed here' % k)
600 if isinstance(v, str):
601 opts += ['-' + k, v]
602 elif v:
603 opts += ['-' + k]
604 opts.append('--')
605 return opts
606
690a6ec1
MW
## Deferral.  The queue holds (FUNC, ARGS, KW) triples waiting to be called.
_deferq = []

def defer(func, *args, **kw):
  """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
  entry = (func, args, kw)
  _deferq.append(entry)
612
def spawn(func, *args, **kw):
  """
  Call FUNC, passing ARGS and KW, in a fresh coroutine.

  The coroutine isn't started straight away: starting it is deferred (see
  `defer').
  """
  def start():
    return Coroutine(func).switch(*args, **kw)
  defer(start)
616
## Asides.  The queue holds (FUNC, ARGS, KW) triples for `_runasides'.
_asideq = Queue()

def _runasides():
  """
  Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).

  This never returns.  An exception from FUNC is reported through
  `sys.excepthook' and otherwise ignored, so that one failing aside doesn't
  stop the rest.
  """
  while True:
    job = _asideq.get()
    func, args, kw = job
    try:
      func(*args, **kw)
    except:
      SYS.excepthook(*SYS.exc_info())
629
def aside(func, *args, **kw):
  """
  Call FUNC(*ARGS, **KW) later, in a non-root coroutine.

  The request is put on the asides queue by a deferred call.
  """
  job = (func, args, kw)
  defer(_asideq.put, job)
633
2fa80010
MW
634class TripeCommandDispatcher (TripeConnection):
635 """
636 Command dispatcher.
637
638 The command dispatcher is a connection which knows how to handle commands.
639 This is probably the most important class in this module to understand.
640
641 Lines from the server are parsed into tokens. The first token is a code
642 (OK or NOTE or something) explaining what kind of line this is. The
643 `handler' attribute is a dictionary mapping server line codes to handler
644 functions, which are applied to the words of the line as individual
645 arguments. *Exception*: the content of TRACE lines is not tokenized.
646
647 There are default handlers for server codes which respond to commands.
648 Commands arrive as TripeCommand instances through the `rawcommand'
649 interface. The dispatcher keeps track of which command objects represent
650 which jobs, and sends responses on to the appropriate command objects by
651 invoking their `response' methods. Command objects don't see the
652 BG... codes, because the dispatcher has already transformed them into
653 regular codes when it was looking up job code.
654
655 The dispatcher also has a special response code of its own: CONNERR
656 indicates that the connection failed and the command has therefore been
657 lost; the
658 """
659
660 ## --- Infrastructure ---
661 ##
662 ## We will get confused if we pipeline commands. Send them one at a time.
663 ## Only send a command when the previous one detaches or completes.
664 ##
665 ## The following attributes are interesting:
666 ##
667 ## tagseq Sequence number for next background job (for bgtag)
668 ##
669 ## queue Commands awaiting submission.
670 ##
671 ## cmd Mapping from job tags to commands: cmd[None] is the
672 ## foreground command.
673 ##
674 ## handler Mapping from server codes to handler functions.
675
676 def __init__(me, socket):
677 """
678 Initialize the dispatcher.
679
680 The SOCKET is the filename of the administration socket to connect to,
681 for TripeConnection.__init__.
682 """
683 TripeConnection.__init__(me, socket)
684 me.tagseq = 0
685 me.handler = {}
686 me.handler['BGDETACH'] = me._detach
687 for i in 'BGOK', 'BGINFO', 'BGFAIL':
688 me.handler[i] = me._response
689 for i in 'OK', 'INFO', 'FAIL':
690 me.handler[i] = me._fgresponse
691
690a6ec1
MW
692 def quitp(me):
693 """Should we quit the main loop? Subclasses should override."""
694 return False
695
696 def mainloop(me, quitp = None):
697 """
698 Iterate the I/O watcher until QUITP returns true.
699
700 Arranges for asides and deferred calls to be made at the right times.
701 """
702
703 global _deferq
704 assert _Coroutine.getcurrent() is rootcr
705 Coroutine(_runasides).switch()
706 while not quitp():
707 while _deferq:
708 q = _deferq
709 _deferq = []
710 for func, args, kw in q:
711 func(*args, **kw)
712 me.iowatch.iterate()
713
2fa80010
MW
714 def connected(me):
715 """
716 Connection hook.
717
718 If a subclass overrides this method, it must call us; clears out the
719 command queue and job map.
720 """
721 me.queue = M.Array()
722 me.cmd = {}
690a6ec1 723 TripeConnection.connected(me)
2fa80010
MW
724
725 def disconnected(me, reason):
726 """
727 Disconnection hook.
728
729 If a subclass hooks overrides this method, it must call us; sends a
730 special CONNERR code to all incomplete commands.
731 """
690a6ec1 732 TripeConnection.disconnected(me, reason)
2fa80010
MW
733 for cmd in me.cmd.itervalues():
734 cmd.response('CONNERR', reason)
735 for cmd in me.queue:
736 cmd.response('CONNERR', reason)
737
738 @_callback
739 def line(me, line):
740 """Handle an incoming line, sending it to the right place."""
741 if _debug: print '<', line
742 code, rest = M.word(line, quotep = True)
743 func = me.handler.get(code)
744 if func is not None:
745 if code == 'TRACE':
746 func(code, rest)
747 else:
748 func(code, *M.split(rest, quotep = True)[0])
749 me.dequeue()
750
751 def dequeue(me):
752 """
753 Pull the oldest command off the queue and try to send it to the server.
754 """
755 if not me.queue or None in me.cmd: return
756 cmd = me.queue.shift()
757 if _debug: print '>', ' '.join([quotify(w) for w in cmd.words])
758 me.send(' '.join([quotify(w) for w in cmd.words]))
759 me.cmd[None] = cmd
760
761 def bgtag(me):
762 """
763 Return an unused job tag.
764
765 May be of use when composing commands by hand.
766 """
767 tag = 'J%05d' % me.tagseq
768 me.tagseq += 1
769 return tag
770
771 ## --- Built-in handler functions for server responses ---
772
773 def _detach(me, _, tag):
774 """
775 Respond to a BGDETACH TAG message.
776
777 Move the current foreground command to the background.
778 """
779 assert tag not in me.cmd
780 me.cmd[tag] = me.cmd[None]
781 del me.cmd[None]
782
783 def _response(me, code, tag, *w):
784 """
785 Respond to an OK, INFO or FAIL message.
786
787 If this is a message for a background job, find the tag; then dispatch
788 the result to the command object.
789 """
790 if code.startswith('BG'):
791 code = code[2:]
792 cmd = me.cmd[tag]
793 if code != 'INFO':
794 del me.cmd[tag]
795 cmd.response(code, *w)
796
797 def _fgresponse(me, code, *w):
798 """Process responses to the foreground command."""
799 me._response(code, None, *w)
800
801 ## --- Interface methods ---
802
803 def rawcommand(me, cmd):
804 """
805 Submit the TripeCommand CMD to the server, and look after it until it
806 completes.
807 """
808 if not me.connectedp():
809 raise TripeConnectionError('connection closed')
810 me.queue.push(cmd)
811 me.dequeue()
812
813 def command(me, *cmd, **kw):
814 """Convenience wrapper for creating a TripeCommandIterator object."""
815 return TripeCommandIterator(me, cmd, **kw)
816
817 ## --- Convenience methods for server commands ---
818
819 def add(me, peer, *addr, **kw):
820 return _simple(me.command(bg = True,
821 *['ADD'] +
822 _kwopts(kw, ['tunnel', 'keepalive', 'cork']) +
823 [peer] +
824 list(addr)))
825 def addr(me, peer):
826 return _oneline(me.command('ADDR', peer))
827 def algs(me):
828 return _keyvals(me.command('ALGS'))
829 def checkchal(me, chal):
830 return _simple(me.command('CHECKCHAL', chal))
831 def daemon(me):
832 return _simple(me.command('DAEMON'))
833 def eping(me, peer, **kw):
834 return _oneline(me.command(bg = True,
835 *['PING'] +
836 _kwopts(kw, ['timeout']) +
837 [peer]))
838 def forcekx(me, peer):
839 return _simple(me.command('FORCEKX', peer))
840 def getchal(me):
841 return _oneline(me.command('GETCHAL', filter = _tokenjoin))
842 def greet(me, peer, chal):
843 return _simple(me.command('GREET', peer, chal))
844 def help(me):
845 return list(me.command('HELP', filter = _tokenjoin))
846 def ifname(me, peer):
847 return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
848 def kill(me, peer):
849 return _simple(me.command('KILL', peer))
850 def list(me):
851 return list(me.command('LIST', filter = _tokenjoin))
852 def notify(me, *msg):
853 return _simple(me.command('NOTIFY', *msg))
854 def peerinfo(me, peer):
855 return _keyvals(me.command('PEERINFO', peer))
856 def ping(me, peer, **kw):
857 return _oneline(me.command(bg = True,
858 *['PING'] +
859 _kwopts(kw, ['timeout']) +
860 [peer]))
861 def port(me):
862 return _oneline(me.command('PORT', filter = _tokenjoin))
863 def quit(me):
864 return _simple(me.command('QUIT'))
865 def reload(me):
866 return _simple(me.command('RELOAD'))
867 def servinfo(me):
868 return _keyvals(me.command('SERVINFO'))
869 def setifname(me, new):
870 return _simple(me.command('SETIFNAME', new))
871 def svcclaim(me, service, version):
872 return _simple(me.command('SVCCLAIM', service, version))
873 def svcensure(me, service, version = None):
874 return _simple(me.command('SVCENSURE', service,
875 *((version is not None and [version]) or [])))
876 def svcfail(me, job, *msg):
877 return _simple(me.command('SVCFAIL', job, *msg))
878 def svcinfo(me, job, *msg):
879 return _simple(me.command('SVCINFO', job, *msg))
880 def svclist(me):
881 return list(me.command('SVCLIST'))
882 def svcok(me, job):
883 return _simple(me.command('SVCOK', job))
884 def svcquery(me, service):
885 return _keyvals(me.command('SVCQUERY', service))
886 def svcrelease(me, service):
887 return _simple(me.command('SVCRELEASE', service))
888 def svcsubmit(me, service, *args, **kw):
889 return me.command(bg = True,
890 *['SVCSUBMIT'] +
891 _kwopts(kw, ['version']) +
892 [service] +
893 list(args))
894 def stats(me, peer):
895 return _keyvals(me.command('STATS', peer))
896 def trace(me, *args):
897 return _tracelike(me.command('TRACE', *args))
898 def tunnels(me):
899 return list(me.command('TUNNELS', filter = _tokenjoin))
900 def version(me):
901 return _oneline(me.command('VERSION', filter = _tokenjoin))
902 def warn(me, *msg):
903 return _simple(me.command('WARN', *msg))
904 def watch(me, *args):
905 return _tracelike(me.command('WATCH', *args))
906
907###--------------------------------------------------------------------------
908### Asynchronous commands.
909
2fa80010
MW
class TripeAsynchronousCommand (TripeCommand):
  """
  Asynchronous commands.

  This is the complicated way of issuing commands.  You must set up a queue,
  and associate the command with the queue.  Responses arriving for the
  command will be put on the queue as a triple of the form (TAG, CODE, REST)
  -- where TAG is an object of your choice, not interpreted by this class,
  CODE is the server's response code (OK, INFO, FAIL), and REST is the list
  of the rest of the server's tokens.

  Using this, you can write coroutines which process many commands (and
  possibly other events) simultaneously.
  """

  def __init__(me, queue, tag, words):
    """
    Make an asynchronous command consisting of the given WORDS, which sends
    responses to QUEUE, labelled with TAG.
    """
    super(TripeAsynchronousCommand, me).__init__(words)
    me.tag = tag
    me.queue = queue

  def response(me, code, *stuff):
    """Handle a server response by writing it to the caller's queue."""
    item = (me.tag, code, list(stuff))
    me.queue.put(item)
935
2fa80010
MW
936###--------------------------------------------------------------------------
937### Services.
938
class TripeJobCancelled (Exception):
  """
  Exception sent to job handler if the client kills the job.

  Not propagated further.
  """
946
class TripeJobError (Exception):
  """
  Exception to cause failure report for running job.

  Sends an SVCFAIL code back.
  """
954
class TripeSyntaxError (Exception):
  """
  Exception to report a syntax error for a job.

  Sends an SVCFAIL bad-svc-syntax message back.
  """
962
690a6ec1 963class TripeServiceManager (TripeCommandDispatcher):
2fa80010
MW
964 """
965 A command dispatcher with added handling for incoming service requests.
966
967 There is usually only one instance of this class, called svcmgr. Some of
968 the support functions in this module assume that this is the case.
969
970 To use, run mLib.select in a loop until the quitp method returns true;
971 then, in a non-root coroutine, register your services by calling `add', and
972 then call `running' when you've finished setting up.
973
974 The instance handles server service messages SVCJOB, SVCCANCEL and
975 SVCCLAIM. It maintains a table of running services. Incoming jobs cause
976 the service's `job' method to be invoked; SVCCANCEL sends a
977 TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes
978 the relevant service to be deregistered.
979
980 There is no base class for jobs, but a job must implement two methods:
981
982 start() Begin processing; might be a no-op.
983
984 cancel() Stop processing; the original client has killed the
985 job.
986
987 The life of a service manager is divided into two parts: setup and running;
988 you tell the manager that you've finished setting up by calling the
989 `running' method. If, at any point after setup is finished, there are no
990 remaining services or jobs, `quitp' will return true, ending the process.
991 """
992
993 ## --- Attributes ---
994 ##
995 ## svc Mapping name -> service object
996 ##
997 ## job Mapping jobid -> job handler coroutine
998 ##
999 ## runningp True when setup is finished
1000 ##
1001 ## _quitp True if explicit quit has been requested
1002
1003 def __init__(me, socket):
1004 """
1005 Initialize the service manager.
1006
1007 SOCKET is the administration socket to connect to.
1008 """
690a6ec1 1009 TripeCommandDispatcher.__init__(me, socket)
2fa80010
MW
1010 me.svc = {}
1011 me.job = {}
1012 me.runningp = False
1013 me.handler['SVCCANCEL'] = me._cancel
1014 me.handler['SVCJOB'] = me._job
1015 me.handler['SVCCLAIM'] = me._claim
1016 me._quitp = 0
1017
1018 def addsvc(me, svc):
1019 """Register a new service; SVC is a TripeService instance."""
1020 assert svc.name not in me.svc
1021 me.svcclaim(svc.name, svc.version)
1022 me.svc[svc.name] = svc
1023
1024 def _cancel(me, _, jid):
1025 """
1026 Called when the server cancels a job; invokes the job's `cancel' method.
1027 """
1028 job = me.job[jid]
1029 del me.job[jid]
1030 job.cancel()
1031
1032 def _claim(me, _, svc, __):
1033 """Called when another program claims our service at a higher version."""
1034 del me.svc[svc]
1035
1036 def _job(me, _, jid, svc, cmd, *args):
1037 """
1038 Called when the server sends us a job to do.
1039
1040 Calls the service to collect a job, and begins processing it.
1041 """
1042 assert jid not in me.job
1043 svc = me.svc[svc.lower()]
1044 job = svc.job(jid, cmd, args)
1045 me.job[jid] = job
1046 job.start()
1047
  def running(me):
    """
    Declare that setup is finished, by setting the `runningp' flag.

    Until this has been called, `quitp' answers true only in response to an
    explicit `quit' request.
    """
    me.runningp = True
1051
1052 def jobdone(me, jid):
1053 """Informs the service manager that the job with id JID has finished."""
1054 try:
1055 del me.job[jid]
1056 except KeyError:
1057 pass
1058
1059 def quitp(me):
1060 """
1061 Return true if no services or jobs are active (and, therefore, if this
1062 process can quit without anyone caring).
1063 """
1064 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
690a6ec1 1065 not me.sock))
2fa80010
MW
1066
  def quit(me):
    """
    Force the quit flag on.

    Afterwards `quitp' answers true, whatever services or jobs remain.
    """
    me._quitp = True
1070
class TripeService (object):
  """
  A standard service.

  The NAME and VERSION are passed on to the server.  The CMDTAB is a
  dictionary mapping command names (in lowercase) to command objects.

  If the CMDTAB lacks entries for the commands HELP and QUIT then default
  implementations are added to it.

  TripeService itself is mostly agnostic about the nature of command objects,
  but the TripeServiceJob class (below) has some requirements.  The built-in
  HELP command requires command objects to have `usage' attributes.
  """

  def __init__(me, name, version, cmdtab):
    """
    Create a new service with the given NAME and VERSION.

    CMDTAB maps command names (in lower-case) to command objects.  It is
    kept as the `cmd' attribute, and default `help' and `quit' entries are
    stored in it if it has none of its own.
    """
    me.name = name
    me.version = version
    me.cmd = cmdtab
    me.activep = True
    for cmdname, func in [('help', me._help), ('quit', me._quit)]:
      cmdtab.setdefault(cmdname,
                        TripeServiceCommand(cmdname, 0, 0, '', func))

  def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    It asks for command CMD with argument list ARGS.  Answers a new
    TripeServiceJob, given the command object found by looking CMD up in
    lowercase, or None if there is no such command.
    """
    command = me.cmd.get(cmd.lower())
    return TripeServiceJob(jid, me, cmd, command, args)

  ## Simple default command handlers, complying with the spec in
  ## tripe-service(7).

  def _help(me):
    """Send a help summary to the user: one line per command, by name."""
    for name, cmd in sorted(me.cmd.items()):
      svcinfo(name, *cmd.usage)

  def _quit(me):
    """Announce that we're quitting, and tell the service manager to quit."""
    svcmgr.notify('svc-quit', me.name, 'admin-request')
    svcmgr.quit()
1124
class TripeServiceCommand (object):
  """A simple service command: a function with an argument-count check."""

  def __init__(me, name, min, max, usage, func):
    """
    Create a new command.

    NAME is the command's name (in lowercase).

    MIN and MAX are the smallest and largest acceptable numbers of
    arguments; either may be None, meaning that there is no such limit.

    USAGE is a usage string, used for generating help and error messages; it
    is stored split into a list of words.

    FUNC is the function to invoke.
    """
    me.name, me.func = name, func
    me.min, me.max = min, max
    me.usage = usage.split()

  def run(me, *args):
    """
    Called when the command is invoked.

    Checks the number of ARGS against the limits, raising TripeSyntaxError
    if there are too few or too many, and then calls the supplied function
    on them.  The function's result is discarded.
    """
    nargs = len(args)
    if me.min is not None and nargs < me.min:
      raise TripeSyntaxError
    if me.max is not None and nargs > me.max:
      raise TripeSyntaxError
    me.func(*args)
1157
1158class TripeServiceJob (Coroutine):
1159 """
1160 Job handler coroutine.
1161
1162 A standard TripeService invokes a TripeServiceJob for each incoming job
1163 request, passing it the jobid, command and arguments, and a command
1164 object. The command object needs the following attributes.
1165
1166 usage A usage list (excluding the command name) showing
1167 arguments and options.
1168
1169 run(*ARGS) Function to react to the command with ARGS split into
1170 separate arguments. Invoked in a coroutine. The
1171 svcinfo function (not the TripeCommandDispatcher
1172 method) may be used to send INFO lines. The function
1173 may raise TripeJobError to send a FAIL response back,
1174 or TripeSyntaxError to send a generic usage error.
1175 TripeJobCancelled exceptions are trapped silently.
1176 Other exceptions are translated into a generic
1177 internal-error message.
1178
1179 This class automatically takes care of sending some closing response to the
1180 job, and for informing the service manager that the job is completed.
1181
1182 The `jid' attribute stores the job's id.
1183 """
1184
1185 def __init__(me, jid, svc, cmd, command, args):
1186 """
1187 Start a new job.
1188
1189 The job is created with id JID, for service SVC, processing command name
1190 CMD (which the service resolved into the command object COMMAND, or
1191 None), and with the arguments ARGS.
1192 """
1193 Coroutine.__init__(me)
1194 me.jid = jid
1195 me.svc = svc
1196 me.cmd = cmd
1197 me.command = command
1198 me.args = args
1199
1200 def run(me):
1201 """
1202 Main body of the coroutine.
1203
1204 Does the tedious exception handling boilerplate and invokes the command's
1205 run method.
1206 """
1207 try:
1208 try:
1209 if me.command is None:
1210 svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
1211 else:
1212 me.command.run(*me.args)
1213 svcmgr.svcok(me.jid)
1214 except TripeJobError, exc:
1215 svcmgr.svcfail(me.jid, *exc.args)
1216 except TripeSyntaxError:
1217 svcmgr.svcfail(me.jid, 'bad-svc-syntax',
1218 me.svc.name, me.command.name,
1219 *me.command.usage)
1220 except TripeJobCancelled:
1221 pass
1222 except Exception, exc:
1223 svcmgr.svcfail(me.jid, 'svc-internal-error',
1224 exc.__class__.__name__, str(exc))
1225 finally:
1226 svcmgr.jobdone(me.jid)
1227
1228 def start(me):
1229 """Invoked by the service manager to start running the coroutine."""
1230 me.switch()
1231
1232 def cancel(me):
1233 """Invoked by the service manager to cancel the job."""
1234 me.throw(TripeJobCancelled())
1235
def svcinfo(*args):
  """
  Send an INFO line made from ARGS to the sender of the current job.

  This must be invoked from a TripeServiceJob coroutine: the job id is taken
  from the `jid' attribute of the current coroutine.
  """
  job = Coroutine.getcurrent()
  svcmgr.svcinfo(job.jid, *args)
1242
def _setupsvc(tab, func):
  """
  Setup coroutine for setting up service programs.

  Registers each of the services in TAB with the service manager and then,
  if FUNC is given, calls it.  Whether or not any of that succeeds, the
  service manager is finally marked as running.
  """
  try:
    for svc in tab:
      svcmgr.addsvc(svc)
    if not func:
      return
    func()
  finally:
    svcmgr.running()
1256
## The global service manager.  It is created without a socket:
## `runservices' stores the socket in it and connects it.
svcmgr = TripeServiceManager(None)
2fa80010
MW
def runservices(socket, tab, init = None, setup = None, daemon = False):
  """
  Function to start a service provider.

  SOCKET is the socket to connect to, usually tripesock.

  TAB is a list of entries.  An entry may be either a tuple

        (NAME, VERSION, COMMANDS)

  or a service object (e.g., a TripeService instance).

  COMMANDS is a dictionary mapping command names to tuples

        (MIN, MAX, USAGE, FUNC)

  of arguments for a TripeServiceCommand object.

  If DAEMON is true, then the process is forked into the background before we
  start.  If INIT is given, it is called in the main coroutine, immediately
  after forking.  If SETUP is given, it is called in a coroutine, after
  calling INIT and setting up the services but before marking the service
  manager as running.

  It is a really bad idea to do any initialization, particularly setting up
  coroutines, outside of the INIT or SETUP functions.  In particular, if
  we're using rmcr for fake coroutines, the daemonizing fork will kill off
  the currently established coroutines in a most surprising way.

  The function runs a main select loop until the service manager decides to
  quit.
  """

  ## Connect the global service manager to the server.
  svcmgr.socket = socket
  svcmgr.connect()

  ## Collect the services, building service objects for the tuple entries.
  svcs = []
  for entry in tab:
    if isinstance(entry, tuple):
      name, version, commands = entry
      cmdmap = dict([(cmd, TripeServiceCommand(cmd, *stuff))
                     for cmd, stuff in commands.iteritems()])
      entry = TripeService(name, version, cmdmap)
    svcs.append(entry)

  ## Go into the background if wanted, and let the caller initialize.
  if daemon:
    M.daemonize()
  if init is not None:
    init()

  ## Register the services in a coroutine, and run the main loop.
  spawn(_setupsvc, svcs, setup)
  svcmgr.mainloop()
2fa80010
MW
1309
1310###--------------------------------------------------------------------------
1311### Utilities for services.
1312
_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
def timespec(spec):
  """
  Parse the timespec SPEC, returning a number of seconds.

  SPEC is a string holding a nonnegative integer (as understood by `int'),
  optionally followed by one unit letter: `s' for seconds (the default),
  `m' for minutes, `h' for hours, or `d' for days.

  Raises TripeJobError, with tokens `bad-time-spec' and SPEC as the caller
  gave it, if the number is missing, malformed or negative.
  """
  mul, count = 1, spec
  if len(spec) > 1 and spec[-1] in _timeunits:
    mul, count = _timeunits[spec[-1]], spec[:-1]
  ## Only trap the failures `int' itself reports, so that unrelated
  ## exceptions aren't mistaken for a bad time.
  try:
    t = int(count)
  except (TypeError, ValueError):
    raise TripeJobError('bad-time-spec', spec)
  if t < 0:
    raise TripeJobError('bad-time-spec', spec)
  return mul * t
1327
class OptParse (object):
  """
  Parse options from a command list in the conventional fashion.

  ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
  options.  Iterating over the parser yields the option tags.  During
  parsing, the `arg' method may be used to retrieve the argument for the most
  recent option.  Afterwards, `rest' may be used to retrieve the remaining
  non-option arguments, and do a simple check on how many there are.

  The parser correctly handles `--' option terminators.
  """

  def __init__(me, args, allowed):
    """
    Create a new option parser.

    The parser will scan the ARGS for options given in the sequence ALLOWED
    (which are expected to include the `-' prefix).  ARGS is copied, so the
    caller's sequence is left alone.
    """
    me.allowed = dict.fromkeys(allowed, True)
    me.args = list(args)

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next option.

    Options stop at the end of the arguments, or at the first argument which
    doesn't begin with `-' or is shorter than two characters; a `--'
    terminator is consumed and also ends the options.  When the options have
    run out, raise StopIteration.  Raise TripeSyntaxError on finding an
    option which isn't allowed.
    """
    args = me.args
    if not args or len(args[0]) < 2 or not args[0].startswith('-'):
      raise StopIteration
    opt = args.pop(0)
    if opt == '--':
      raise StopIteration
    if opt not in me.allowed:
      raise TripeSyntaxError
    return opt

  ## Newer Pythons spell the iterator method like this.
  __next__ = next

  def arg(me):
    """
    Return the argument for the most recent option.

    The argument is removed from the list.  If none is available, raise
    TripeSyntaxError.
    """
    if not me.args:
      raise TripeSyntaxError
    return me.args.pop(0)

  def rest(me, min = None, max = None):
    """
    After option parsing is done, return the remaining arguments.

    Check that there are at least MIN and at most MAX arguments remaining --
    either may be None to suppress the check -- and raise TripeSyntaxError
    if not.
    """
    nargs = len(me.args)
    if min is not None and nargs < min:
      raise TripeSyntaxError
    if max is not None and nargs > max:
      raise TripeSyntaxError
    return me.args
1395
1396###----- That's all, folks --------------------------------------------------