chiark / gitweb /
svc/conntrack.in (straddr): Handle `None' as a packed address.
[tripe] / py / tripe.py.in
CommitLineData
2fa80010
MW
1### -*-python-*-
2###
3### Administration connection with tripe server
4###
5### (c) 2006 Straylight/Edgeware
6###
7
8###----- Licensing notice ---------------------------------------------------
9###
10### This file is part of Trivial IP Encryption (TrIPE).
11###
12### TrIPE is free software; you can redistribute it and/or modify
13### it under the terms of the GNU General Public License as published by
14### the Free Software Foundation; either version 2 of the License, or
15### (at your option) any later version.
16###
17### TrIPE is distributed in the hope that it will be useful,
18### but WITHOUT ANY WARRANTY; without even the implied warranty of
19### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20### GNU General Public License for more details.
21###
22### You should have received a copy of the GNU General Public License
23### along with TrIPE; if not, write to the Free Software Foundation,
24### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25
26"""
27This module provides classes and functions for connecting to a running tripe
28server, sending it commands, receiving and processing replies, and
29implementing services.
30
31Rather than end up lost in a storm of little event-driven classes, or a
32morass of concurrent threads, the module uses coroutines to present a fairly
33simple function call/return interface to potentially long-running commands
a62f8e8a 34which must run without blocking the main process. It assumes a coroutine
2fa80010
MW
35module presenting a subset of the `greenlet' interface: if actual greenlets
36are available, they are used; otherwise there's an implementation in terms of
37threads (with lots of locking) which will do instead.
38
39The simple rule governing the coroutines used here is this:
40
41 * The root coroutine never cares what values are passed to it when it
42 resumes: it just discards them.
43
44 * Other, non-root, coroutines are presumed to be waiting for some specific
45 thing.
46
47Configuration variables:
48 configdir
49 socketdir
50 PACKAGE
51 VERSION
52 tripesock
53 peerdb
54
55Other useful variables:
56 rootcr
57 svcmgr
58
59Other tweakables:
60 _debug
61
62Exceptions:
63 Exception
64 StandardError
65 TripeConnectionError
66 TripeError
67 TripeInternalError
68 TripeJobCancelled
69 TripeJobError
70 TripeSyntaxError
71
72Classes:
73 _Coroutine
74 Coroutine
75 TripeServiceJob
76 OptParse
77 Queue
690a6ec1 78 SelIOWatcher
2fa80010
MW
79 TripeCommand
80 TripeSynchronousCommand
81 TripeAsynchronousCommand
82 TripeCommandIterator
83 TripeConnection
84 TripeCommandDispatcher
690a6ec1 85 TripeServiceManager
2fa80010
MW
86 TripeService
87 TripeServiceCommand
88
89Utility functions:
90 quotify
91 runservices
92 spawn
93 svcinfo
94 timespec
95"""
96
97__pychecker__ = 'self=me no-constCond no-argsused'
98
99_debug = False
100
101###--------------------------------------------------------------------------
102### External dependencies.
103
104import socket as S
105import errno as E
106import mLib as M
107import re as RX
108import sys as SYS
109import os as OS
110
111try:
112 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
113 raise ImportError
114 from py.magic import greenlet as _Coroutine
115except ImportError:
116 from rmcr import Coroutine as _Coroutine
117
118###--------------------------------------------------------------------------
119### Coroutine hacking.
120
## The coroutine which was current when this module was loaded; the rest of
## the module treats it as the root.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
  """
  A coroutine which may only be resumed from the root coroutine.

  The root is captured above as a plain `_Coroutine', so it can never be an
  instance of this class.
  """

  def switch(me, *args, **kw):
    """
    Transfer control to this coroutine, passing it ARGS and KW.

    The caller must be the root coroutine.  Whatever is handed back when
    control returns to the root is discarded: this method returns nothing.
    If `_debug' is set, trace the transfer in each direction on stdout.
    """
    assert _Coroutine.getcurrent() is rootcr
    if _debug:
      print('* %s' % me)
    _Coroutine.switch(me, *args, **kw)
    if _debug:
      print('* %s' % rootcr)
2fa80010
MW
134
135###--------------------------------------------------------------------------
136### Default places for things.
137
## Built-in defaults.  The `@...@' markers are presumably substituted when
## this template is turned into the installed module -- TODO confirm against
## the build system.
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"
socketdir = "@socketdir@"

## Each of these may be overridden from the environment.
configdir = OS.environ.get('TRIPEDIR', "@configdir@")
tripesock = OS.environ.get('TRIPESOCK',
                           OS.path.join(socketdir, 'tripesock'))
peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb')
2fa80010
MW
145
146###--------------------------------------------------------------------------
147### Connection to the server.
148
def readnonblockingly(sock, len):
  """
  Nonblocking read from SOCK.

  Try to return up to LEN bytes.  If nothing could be read right now, return
  `None'.  EOF is returned as an empty string.  Any other socket error is
  propagated to the caller.

  As a side effect, SOCK is switched into nonblocking mode.
  """
  try:
    sock.setblocking(0)
    return sock.recv(len)
  except S.error:
    ## Fetch the exception this way so that the code doesn't depend on
    ## either spelling of the `except ... as/,' syntax.
    exc = SYS.exc_info()[1]
    ## `EAGAIN' and `EWOULDBLOCK' are allowed to be distinct codes, and
    ## either may be used to report that there's nothing to read yet.
    if exc.args and exc.args[0] in (E.EAGAIN, E.EWOULDBLOCK):
      return None
    raise
163
164class TripeConnectionError (StandardError):
165 """Something happened to the connection with the server."""
166 pass
167class TripeInternalError (StandardError):
168 """This program is very confused."""
169 pass
170
class TripeConnection (object):
  """
  A logical connection to the tripe administration socket.

  There may or may not be a physical connection.  (This is needed for the
  monitor, for example.)

  This class isn't very useful on its own, but it has useful subclasses.  At
  this level, the class is agnostic about I/O multiplexing schemes; that gets
  added later.

  Attributes:

  socket        The name of the administration socket to connect to.

  sock          The connected socket object, or `None' if disconnected.

  lbuf          The line buffer which splits incoming data into lines, or
                `None' if disconnected.

  iowatch       The I/O watcher which arranges for `receive' to be called.
  """

  def __init__(me, socket):
    """
    Make a connection to the named SOCKET.

    No physical connection is made initially.
    """
    me.socket = socket
    me.sock = None
    me.lbuf = None
    me.iowatch = SelIOWatcher(me)

  def connect(me):
    """
    Ensure that there's a physical connection.

    Do nothing (and return `None') if we're already connected.  Otherwise
    connect, invoke the `connected' method, and return the connection.
    """
    if me.sock: return
    sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
    try:
      sock.connect(me.socket)
    except:
      ## Don't leave a half-made socket lying about if we couldn't connect.
      sock.close()
      raise
    me.sock = sock
    me.lbuf = M.LineBuffer(me.line, me._eof)
    me.lbuf.size = 1024
    me.connected()
    return me

  def disconnect(me, reason):
    """
    Disconnect the physical connection.

    Invoke the `disconnected' method, giving the provided REASON, which
    should be either `None' or an exception.  Do nothing (and return `None')
    if there's no physical connection; otherwise return the connection.

    The socket is closed and the connection state cleared even if the
    `disconnected' hook raises an exception, which is then propagated.
    """
    if not me.sock: return
    try:
      me.disconnected(reason)
    finally:
      ## Clear our state first, so that we're consistently `disconnected'
      ## whatever happens while tearing things down.
      sock, lbuf = me.sock, me.lbuf
      me.sock = me.lbuf = None
      if sock is not None: sock.close()
      if lbuf is not None: lbuf.disable()
    return me

  def connectedp(me):
    """
    Return true if there's a current, believed-good physical connection.
    """
    return me.sock is not None

  ## A connection is true exactly when it's connected.  Python 2 consults
  ## `__nonzero__'; `__bool__' is the same thing under its newer name.
  __nonzero__ = connectedp
  __bool__ = connectedp

  def send(me, line):
    """
    Send the LINE to the connection's socket.

    All output is done through this method; it can be overridden to provide
    proper nonblocking writing, though this seems generally unnecessary.

    If anything goes wrong, disconnect (giving the exception as the reason)
    and propagate the exception.
    """
    try:
      me.sock.setblocking(1)
      ## Use `sendall': plain `send' is allowed to write only part of the
      ## data, which would leave the server with a truncated command.
      me.sock.sendall(line + '\n')
    except Exception:
      me.disconnect(SYS.exc_info()[1])
      raise
    return me

  def receive(me):
    """
    Receive whatever's ready from the connection's socket.

    Call `line' on each complete line, and `eof' if the connection closed.
    Subclasses which attach this class to an I/O-event system should call
    this method when the socket (the `sock' attribute) is ready for reading.

    If reading fails, disconnect (giving the exception as the reason) and
    propagate the exception.
    """
    ## A line handler may disconnect us, so check on every iteration.
    while me.sock is not None:
      try:
        buf = readnonblockingly(me.sock, 16384)
      except Exception:
        me.disconnect(SYS.exc_info()[1])
        raise
      if buf is None:
        ## Nothing more to read just now.
        return me
      if not buf:
        ## An empty read means end-of-file.
        me._eof()
        return me
      me.lbuf.flush(buf)
    return me

  def _eof(me):
    """Internal end-of-file handler: disconnect, and then call `eof'."""
    me.disconnect(TripeConnectionError('connection lost'))
    me.eof()

  def connected(me):
    """
    To be overridden by subclasses to react to a connection being
    established.

    Overriding methods should call this one, which tells the I/O watcher
    about the new socket.
    """
    me.iowatch.connected(me.sock)

  def disconnected(me, reason):
    """
    To be overridden by subclasses to react to a connection being severed.

    Overriding methods should call this one, which tells the I/O watcher
    that the socket is going away.
    """
    me.iowatch.disconnected()

  def eof(me):
    """To be overridden by subclasses to handle end-of-file."""
    pass

  def line(me, line):
    """To be overridden by subclasses to handle incoming lines."""
    pass
295
690a6ec1
MW
296###--------------------------------------------------------------------------
297### I/O loop integration.
298
class SelIOWatcher (object):
  """
  Glue between a connection and mLib's I/O event system.

  To integrate with some other event loop (e.g., glib's main loop), replace
  the connection's `iowatch' attribute with a different object providing the
  same methods, while the connection is disconnected.
  """

  def __init__(me, conn):
    """Prepare to watch for input on behalf of the connection CONN."""
    me._selfile = None
    me._conn = conn

  def connected(me, sock):
    """
    Called when a connection is made.

    SOCK is the socket.  The watcher must arrange to call `CONN.receive' when
    data is available; here, that's done with an mLib `SelFile' watching the
    socket's file descriptor for reading.
    """
    me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
    me._selfile.enable()

  def disconnected(me):
    """
    Called when the connection is lost: forget about the file selector.
    """
    me._selfile = None

  def iterate(me):
    """
    Wait for something interesting to happen, and issue events.

    That is, basically, do one iteration of a main select loop, processing
    all of the events, and then return.  This is used in the method
    `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
    `runservices'; if your I/O watcher has a different main loop, you can
    drive it yourself.
    """
    M.select()
339
340###--------------------------------------------------------------------------
341### Inter-coroutine communication.
342
class Queue (object):
  """
  A queue of things arriving asynchronously.

  This is a very simple single-reader multiple-writer queue.  It's useful for
  more complex coroutines which need to cope with a variety of possible
  incoming events.

  Attributes:

  contents      The items waiting to be collected, oldest first.

  waiter        The coroutine currently blocked reading the queue, or `None'.
  """

  def __init__(me):
    """Create a new empty queue."""
    me.waiter = None
    me.contents = M.Array()

  def _wait(me):
    """
    Internal: wait for an item to arrive in the queue.

    Complain (with `ValueError') if someone is already waiting, because this
    is just a single-reader queue.  Otherwise keep handing control to our
    parent coroutine until the queue is nonempty.
    """
    if me.waiter:
      raise ValueError('queue already being waited on')
    try:
      current = Coroutine.getcurrent()
      me.waiter = current
      while not me.contents:
        current.parent.switch()
    finally:
      ## Whether we succeeded or were interrupted, we're not waiting now.
      me.waiter = None

  def get(me):
    """
    Remove and return the item at the head of the queue.

    If the queue is empty, wait until an item arrives.
    """
    me._wait()
    return me.contents.shift()

  def peek(me):
    """
    Return the item at the head of the queue without removing it.

    If the queue is empty, wait until an item arrives.
    """
    me._wait()
    return me.contents[0]

  def put(me, thing):
    """
    Write THING to the queue.

    If someone is waiting on the queue, wake him up immediately; otherwise
    just leave the item there for later.
    """
    me.contents.push(thing)
    reader = me.waiter
    if reader:
      reader.switch()
401
2fa80010
MW
402###--------------------------------------------------------------------------
403### Dispatching coroutine.
404
## A string matching this can stand on its own as a bareword: it's nonempty
## and contains no backslashes, quotes or whitespace.
rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')

## The characters which must be backslash-escaped, even within quotes.
rx_weird = RX.compile(r'([\\\'])')

def quotify(s):
  """
  Quote S according to the tripe-admin(5) rules.

  A bareword is returned unchanged.  Anything else is wrapped in single
  quotes, with embedded backslashes and single quotes escaped.
  """
  bare = rx_ordinary.match(s)
  ## `$' is also satisfied just before a trailing newline, so check that the
  ## match really did consume the whole string.
  if bare is not None and bare.end() == len(s):
    return s
  return "'%s'" % rx_weird.sub(r'\\\1', s)
419
def _callback(func):
  """
  Return a wrapper for FUNC which reports exceptions thrown by it.

  The wrapper passes its arguments through to FUNC and returns FUNC's result.
  If FUNC raises anything at all, the exception is first handed to
  `sys.excepthook' and then re-raised.

  Useful in the case of callbacks invoked by C functions which ignore
  exceptions.
  """
  def wrapper(*args, **kw):
    try:
      result = func(*args, **kw)
    except:
      ## Deliberately catch everything: the caller would otherwise lose it.
      ty, val, tb = SYS.exc_info()
      SYS.excepthook(ty, val, tb)
      raise
    return result
  return wrapper
434
class TripeCommand (object):
  """
  Abstract base class for a command in progress.

  The `words' attribute holds the list of tokens making up the command.

  Subclasses must implement a method to handle server responses:

    * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
      `FAIL'; ARGS are the remaining tokens from the server's response.
  """

  def __init__(me, words):
    """Make a new command consisting of the given list of WORDS."""
    me.words = words
451
class TripeSynchronousCommand (TripeCommand):
  """
  A simple command, processed apparently synchronously.

  Must be invoked from a coroutine other than the root (or whichever one is
  running the dispatcher); in reality, other coroutines carry on running
  while we wait for a response from the server.

  Each server response causes the calling coroutine to be resumed with the
  pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO'
  or `FAIL') and REST is a tuple of the server's other response tokens.  The
  calling coroutine must continue switching back to the dispatcher until a
  terminating response (`OK' or `FAIL') is received or become very
  confused.

  Mostly it's better to use the `TripeCommandIterator' to do this
  automatically.
  """

  def __init__(me, words):
    """
    Initialize the command, specifying the WORDS to send to the server.

    The coroutine which creates the command is remembered as its owner, and
    is the one which will be resumed with the responses.
    """
    me.owner = Coroutine.getcurrent()
    TripeCommand.__init__(me, words)

  def response(me, code, *rest):
    """Handle a server response by forwarding it to the calling coroutine."""
    reply = (code, rest)
    me.owner.switch(reply)
479
480class TripeError (StandardError):
481 """
53e9ae9e 482 A tripe command failed with an error (a `FAIL' code). The args attribute
2fa80010
MW
483 contains a list of the server's message tokens.
484 """
485 pass
486
487class TripeCommandIterator (object):
488 """
489 Iterator interface to a tripe command.
490
491 The values returned by the iterator are lists of tokens from the server's
53e9ae9e
MW
492 `INFO' lines, as processed by the given filter function, if any. The
493 iterator completes normally (by raising `StopIteration') if the server
494 reported `OK', and raises an exception if the command failed for some reason.
2fa80010 495
53e9ae9e
MW
496 A `TripeError' is raised if the server issues a `FAIL' code. If the
497 connection failed, some other exception is raised.
2fa80010
MW
498 """
499
500 def __init__(me, dispatcher, words, bg = False, filter = None):
501 """
502 Create a new command iterator.
503
504 The command is submitted to the DISPATCHER; it consists of the given
505 WORDS. If BG is true, then an option is inserted to request that the
506 server run the command in the background. The FILTER is applied to the
507 token lists which the server responds, and the filter's output are the
508 items returned by the iterator.
509 """
510 me.dcr = Coroutine.getcurrent().parent
511 if me.dcr is None:
512 raise ValueError, 'must invoke from coroutine'
513 me.filter = filter or (lambda x: x)
514 if bg:
515 words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
516 dispatcher.rawcommand(TripeSynchronousCommand(words))
517
518 def __iter__(me):
519 """Iterator protocol: I am my own iterator."""
520 return me
521
522 def next(me):
523 """
524 Iterator protocol: return the next piece of information from the server.
525
53e9ae9e
MW
526 `INFO' responses are filtered and returned as the values of the
527 iteration. `FAIL' and `CONNERR' responses are turned into exceptions and
528 raised. Finally, `OK' is turned into `StopIteration', which should cause
529 a normal end to the iteration process.
2fa80010
MW
530 """
531 thing = me.dcr.switch()
532 code, rest = thing
533 if code == 'INFO':
534 return me.filter(rest)
535 elif code == 'OK':
536 raise StopIteration
537 elif code == 'CONNERR':
538 if rest is None:
539 raise TripeConnectionError, 'connection terminated by user'
540 else:
541 raise rest
542 elif code == 'FAIL':
543 raise TripeError(*rest)
544 else:
545 raise TripeInternalError \
546 ('unexpected tripe response %r' % ([code] + rest))
547
548### Simple utility functions for the TripeCommandIterator convenience
549### methods.
550
def _tokenjoin(words):
  """Filter function: glue the tokens WORDS together, separated by spaces."""
  separator = ' '
  return separator.join(words)
554
def _keyvals(iter):
  """
  Return a dictionary formed from the `KEY=VALUE' pairs returned by the
  iterator ITER.

  Each item of ITER is a sequence of `KEY=VALUE' tokens.  A token is split at
  its first `='; a token with no `=' at all provokes a `ValueError'.
  """
  kv = {}
  for token in (w for ww in iter for w in ww):
    eq = token.index('=')
    key, value = token[:eq], token[eq + 1:]
    kv[key] = value
  return kv
564
def _simple(iter):
  """
  Run ITER to completion, insisting that it produces nothing.

  Return `None'; raise `TripeInternalError' if ITER yields any item.
  """
  leftovers = list(iter)
  if leftovers:
    raise TripeInternalError('expected no response')
  return None
571
def _oneline(iter):
  """
  Return the single item which ITER produces.

  Raise `TripeInternalError' if ITER yields no items, or more than one.
  """
  lines = list(iter)
  if len(lines) == 1:
    return lines[0]
  raise TripeInternalError('expected only one line of response')
578
def _tracelike(iter):
  """
  Handle a TRACE-like command.

  Each item of ITER is a token sequence: the first token is a selector
  character followed by its status, and the remaining tokens make up a
  description.  The result is a list of tuples (CHAR, STATUS, DESC): CHAR is
  the selector character, STATUS is the status (empty if disabled, `+' if
  enabled, maybe something else later), and DESC is the human-readable
  description.
  """
  return [(ww[0][0], ww[0][1:], ' '.join(ww[1:])) for ww in iter]
591
def _kwopts(kw, allowed):
  """
  Parse keyword arguments into options.

  KW is a dictionary of keyword arguments.  ALLOWED is a list of allowable
  keywords; raise `ValueError' if other keywords are present.  `KEY = VALUE'
  becomes an option pair `-KEY VALUE' if VALUE is a string, just the option
  `-KEY' if VALUE is a true non-string, or nothing if VALUE is false.  Insert
  a `--' at the end to stop the parser getting confused.
  """
  permitted = dict.fromkeys(allowed, True)
  opts = []
  for key, value in kw.items():
    if key not in permitted:
      raise ValueError('option %s not allowed here' % key)
    flag = '-' + key
    if isinstance(value, str):
      opts.extend([flag, value])
    elif value:
      opts.append(flag)
  opts.append('--')
  return opts
610
690a6ec1
MW
## Deferral.  The queue holds (FUNC, ARGS, KW) triples until the dispatcher's
## main loop gets around to running them.
_deferq = []

def defer(func, *args, **kw):
  """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
  job = (func, args, kw)
  _deferq.append(job)
616
22b47552
MW
def funargstr(func, args, kw):
  """
  Return a string which looks like a call of FUNC with ARGS and KW.

  ARGS is a sequence of positional arguments and KW a dictionary of keyword
  arguments; each value is shown using its `repr'.  Used to give spawned
  coroutines a readable name.
  """
  positional = [repr(a) for a in args]
  keywords = ['%s = %r' % (k, v) for k, v in kw.items()]
  return '%s(%s)' % (func.__name__, ', '.join(positional + keywords))
622
690a6ec1
MW
def spawn(func, *args, **kw):
  """
  Call FUNC, passing ARGS and KW, in a fresh coroutine.

  The coroutine isn't created straight away: instead, its creation and
  first activation are deferred (see `defer') so that they happen in the
  root coroutine.
  """
  def start():
    label = funargstr(func, args, kw)
    return Coroutine(func, name = label).switch(*args, **kw)
  defer(start)
690a6ec1
MW
627
## Asides.  The queue holds (FUNC, ARGS, KW) triples waiting to be run by the
## `_runasides' coroutine.
_asideq = Queue()

def _runasides():
  """
  Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW).

  This never returns.  Exceptions raised by FUNC are reported through
  `sys.excepthook' and otherwise ignored, so one failing aside doesn't stop
  the rest from being processed.
  """
  while True:
    job = _asideq.get()
    func, args, kw = job
    try:
      func(*args, **kw)
    except:
      ty, val, tb = SYS.exc_info()
      SYS.excepthook(ty, val, tb)
640
def aside(func, *args, **kw):
  """
  Call FUNC(*ARGS, **KW) later, in a non-root coroutine.

  The request is put on the asides queue by a deferred call, i.e., from the
  root coroutine.
  """
  job = (func, args, kw)
  defer(_asideq.put, job)
644
2fa80010
MW
class TripeCommandDispatcher (TripeConnection):
  """
  Command dispatcher.

  The command dispatcher is a connection which knows how to handle commands.
  This is probably the most important class in this module to understand.

  Lines from the server are parsed into tokens.  The first token is a code
  (`OK' or `NOTE' or something) explaining what kind of line this is.  The
  `handler' attribute is a dictionary mapping server line codes to handler
  functions, which are applied to the words of the line as individual
  arguments.  *Exception*: the content of `TRACE' lines is not tokenized.

  There are default handlers for server codes which respond to commands.
  Commands arrive as `TripeCommand' instances through the `rawcommand'
  interface.  The dispatcher keeps track of which command objects represent
  which jobs, and sends responses on to the appropriate command objects by
  invoking their `response' methods.  Command objects don't see the `BG...'
  codes, because the dispatcher has already transformed them into regular
  codes when it was looking up the job tag.

  The dispatcher also has a special response code of its own: `CONNERR'
  indicates that the connection failed and the command has therefore been
  lost.  This is sent to all outstanding commands when a connection error is
  encountered: rather than a token list, it is accompanied by an exception
  object which is the cause of the disconnection, which may be `None' if the
  disconnection is expected (e.g., the direct result of a user request).
  """

  ## --- Infrastructure ---
  ##
  ## We will get confused if we pipeline commands.  Send them one at a time.
  ## Only send a command when the previous one detaches or completes.
  ##
  ## The following attributes are interesting:
  ##
  ## tagseq     Sequence number for next background job (for bgtag)
  ##
  ## queue      Commands awaiting submission.
  ##
  ## cmd        Mapping from job tags to commands: cmd[None] is the
  ##            foreground command.
  ##
  ## handler    Mapping from server codes to handler functions.

  def __init__(me, socket):
    """
    Initialize the dispatcher.

    The SOCKET is the filename of the administration socket to connect to,
    for TripeConnection.__init__.
    """
    TripeConnection.__init__(me, socket)
    me.tagseq = 0
    me.handler = {}
    me.handler['BGDETACH'] = me._detach
    for i in 'BGOK', 'BGINFO', 'BGFAIL':
      me.handler[i] = me._response
    for i in 'OK', 'INFO', 'FAIL':
      me.handler[i] = me._fgresponse

  def quitp(me):
    """Should we quit the main loop?  Subclasses should override."""
    return False

  def mainloop(me, quitp = None):
    """
    Iterate the I/O watcher until QUITP returns true.

    QUITP defaults to the dispatcher's own `quitp' method.  Must be called
    from the root coroutine.

    Arranges for asides and deferred calls to be made at the right times.
    """

    global _deferq
    assert _Coroutine.getcurrent() is rootcr

    ## Start the coroutine which processes asides.
    Coroutine(_runasides, name = '_runasides').switch()
    if quitp is None:
      quitp = me.quitp
    while not quitp():

      ## Run the deferred calls.  Each batch is detached from the queue
      ## before it's run, so that calls deferred in the meantime are picked
      ## up by the next trip round the loop.
      while _deferq:
        q = _deferq
        _deferq = []
        for func, args, kw in q:
          func(*args, **kw)
      me.iowatch.iterate()

  def connected(me):
    """
    Connection hook.

    If a subclass overrides this method, it must call us; clears out the
    command queue and job map.
    """
    me.queue = M.Array()
    me.cmd = {}
    TripeConnection.connected(me)

  def disconnected(me, reason):
    """
    Disconnection hook.

    If a subclass overrides this method, it must call us; sends a special
    `CONNERR' code to all incomplete commands.
    """
    TripeConnection.disconnected(me, reason)
    for cmd in me.cmd.itervalues():
      cmd.response('CONNERR', reason)
    for cmd in me.queue:
      cmd.response('CONNERR', reason)

  @_callback
  def line(me, line):
    """Handle an incoming line, sending it to the right place."""
    if _debug: print('< %s' % line)
    code, rest = M.word(line, quotep = True)
    func = me.handler.get(code)
    if func is not None:
      if code == 'TRACE':
        ## Trace output is passed along as a single uninterpreted string.
        func(code, rest)
      else:
        func(code, *M.split(rest, quotep = True)[0])
    ## The line may have finished or detached the foreground command, so
    ## see whether we can send the next one.
    me.dequeue()

  def dequeue(me):
    """
    Pull the oldest command off the queue and try to send it to the server.

    Do nothing if the queue is empty, or if there's still a foreground
    command in progress.
    """
    if not me.queue or None in me.cmd: return
    cmd = me.queue.shift()
    text = ' '.join([quotify(w) for w in cmd.words])
    if _debug: print('> %s' % text)
    me.send(text)
    me.cmd[None] = cmd

  def bgtag(me):
    """
    Return an unused job tag.

    May be of use when composing commands by hand.
    """
    tag = 'J%05d' % me.tagseq
    me.tagseq += 1
    return tag

  ## --- Built-in handler functions for server responses ---

  def _detach(me, _, tag):
    """
    Respond to a `BGDETACH' TAG message.

    Move the current foreground command to the background.
    """
    assert tag not in me.cmd
    me.cmd[tag] = me.cmd[None]
    del me.cmd[None]

  def _response(me, code, tag, *w):
    """
    Respond to an `OK', `INFO' or `FAIL' message.

    If this is a message for a background job, find the tag; then dispatch
    the result to the command object.  This is also called by `_fgresponse'
    (with TAG set to `None') to handle responses for foreground commands, and
    is therefore a useful method to extend or override in subclasses.
    """
    if code.startswith('BG'):
      code = code[2:]
    cmd = me.cmd[tag]
    ## Anything other than `INFO' finishes the command.
    if code != 'INFO':
      del me.cmd[tag]
    cmd.response(code, *w)

  def _fgresponse(me, code, *w):
    """Process responses to the foreground command."""
    me._response(code, None, *w)

  ## --- Interface methods ---

  def rawcommand(me, cmd):
    """
    Submit the `TripeCommand' CMD to the server, and look after it until it
    completes.

    Raises `TripeConnectionError' if there's no connection to the server.
    """
    if not me.connectedp():
      raise TripeConnectionError('connection closed')
    me.queue.push(cmd)
    me.dequeue()

  def command(me, *cmd, **kw):
    """Convenience wrapper for creating a TripeCommandIterator object."""
    return TripeCommandIterator(me, cmd, **kw)

  ## --- Convenience methods for server commands ---
  ##
  ## Each method issues the server command of (roughly) the same name and
  ## digests the reply: `_simple' insists on an empty reply, `_oneline' on
  ## exactly one line, `_keyvals' collects `KEY=VALUE' pairs into a
  ## dictionary, and `_tracelike' parses lists of selectable options.
  ## Methods passing `bg = True' ask the server to run the command in the
  ## background.  All of them must be called from a non-root coroutine.

  def add(me, peer, *addr, **kw):
    return _simple(me.command(bg = True,
                              *['ADD'] +
                              _kwopts(kw, ['tunnel', 'keepalive',
                                           'key', 'priv', 'cork',
                                           'mobile']) +
                              [peer] +
                              list(addr)))
  def addr(me, peer):
    return _oneline(me.command('ADDR', peer))
  def algs(me, peer = None):
    return _keyvals(me.command('ALGS',
                               *((peer is not None and [peer]) or [])))
  def checkchal(me, chal):
    return _simple(me.command('CHECKCHAL', chal))
  def daemon(me):
    return _simple(me.command('DAEMON'))
  def eping(me, peer, **kw):
    ## This must be `EPING': sending `PING' would make this method merely a
    ## second name for `ping'.
    return _oneline(me.command(bg = True,
                               *['EPING'] +
                               _kwopts(kw, ['timeout']) +
                               [peer]))
  def forcekx(me, peer):
    return _simple(me.command('FORCEKX', peer))
  def getchal(me):
    return _oneline(me.command('GETCHAL', filter = _tokenjoin))
  def greet(me, peer, chal):
    return _simple(me.command('GREET', peer, chal))
  def help(me):
    return list(me.command('HELP', filter = _tokenjoin))
  def ifname(me, peer):
    return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
  def kill(me, peer):
    return _simple(me.command('KILL', peer))
  def list(me):
    return list(me.command('LIST', filter = _tokenjoin))
  def notify(me, *msg):
    return _simple(me.command('NOTIFY', *msg))
  def peerinfo(me, peer):
    return _keyvals(me.command('PEERINFO', peer))
  def ping(me, peer, **kw):
    return _oneline(me.command(bg = True,
                               *['PING'] +
                               _kwopts(kw, ['timeout']) +
                               [peer]))
  def port(me):
    return _oneline(me.command('PORT', filter = _tokenjoin))
  def quit(me):
    return _simple(me.command('QUIT'))
  def reload(me):
    return _simple(me.command('RELOAD'))
  def servinfo(me):
    return _keyvals(me.command('SERVINFO'))
  def setifname(me, new):
    return _simple(me.command('SETIFNAME', new))
  def svcclaim(me, service, version):
    return _simple(me.command('SVCCLAIM', service, version))
  def svcensure(me, service, version = None):
    return _simple(me.command('SVCENSURE', service,
                              *((version is not None and [version]) or [])))
  def svcfail(me, job, *msg):
    return _simple(me.command('SVCFAIL', job, *msg))
  def svcinfo(me, job, *msg):
    return _simple(me.command('SVCINFO', job, *msg))
  def svclist(me):
    return list(me.command('SVCLIST'))
  def svcok(me, job):
    return _simple(me.command('SVCOK', job))
  def svcquery(me, service):
    return _keyvals(me.command('SVCQUERY', service))
  def svcrelease(me, service):
    return _simple(me.command('SVCRELEASE', service))
  def svcsubmit(me, service, *args, **kw):
    ## Unlike the others, this returns the command iterator itself, leaving
    ## the caller to collect the service's replies.
    return me.command(bg = True,
                      *['SVCSUBMIT'] +
                      _kwopts(kw, ['version']) +
                      [service] +
                      list(args))
  def stats(me, peer):
    return _keyvals(me.command('STATS', peer))
  def trace(me, *args):
    return _tracelike(me.command('TRACE', *args))
  def tunnels(me):
    return list(me.command('TUNNELS', filter = _tokenjoin))
  def version(me):
    return _oneline(me.command('VERSION', filter = _tokenjoin))
  def warn(me, *msg):
    return _simple(me.command('WARN', *msg))
  def watch(me, *args):
    return _tracelike(me.command('WATCH', *args))
927
928###--------------------------------------------------------------------------
929### Asynchronous commands.
930
2fa80010
MW
class TripeAsynchronousCommand (TripeCommand):
  """
  Asynchronous commands.

  This is the complicated way of issuing commands.  You must set up a queue,
  and associate the command with the queue.  Responses arriving for the
  command will be put on the queue as a triple of the form (TAG, CODE, REST)
  -- where TAG is an object of your choice, not interpreted by this class,
  CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
  and REST is the list of the rest of the server's tokens.

  Using this, you can write coroutines which process many commands (and
  possibly other events) simultaneously.
  """

  def __init__(me, queue, tag, words):
    """
    Make an asynchronous command consisting of the given WORDS, which sends
    responses to QUEUE, labelled with TAG.
    """
    me.tag = tag
    me.queue = queue
    TripeCommand.__init__(me, words)

  def response(me, code, *stuff):
    """Handle a server response by writing it to the caller's queue."""
    rest = list(stuff)
    me.queue.put((me.tag, code, rest))
956
2fa80010
MW
957###--------------------------------------------------------------------------
958### Services.
959
class TripeJobCancelled (Exception):
  """
  Raised in a job's handler when the client kills the job.

  Not propagated further.
  """
967
class TripeJobError (Exception):
  """
  Raised to make a running job report failure.

  Sends an SVCFAIL code back.
  """
975
class TripeSyntaxError (Exception):
  """
  Raised to report a syntax error in a job's arguments.

  Sends an SVCFAIL bad-svc-syntax message back.
  """
983
690a6ec1 984class TripeServiceManager (TripeCommandDispatcher):
2fa80010
MW
985 """
986 A command dispatcher with added handling for incoming service requests.
987
988 There is usually only one instance of this class, called svcmgr. Some of
989 the support functions in this module assume that this is the case.
990
53e9ae9e 991 To use, run `mLib.select' in a loop until the quitp method returns true;
2fa80010
MW
992 then, in a non-root coroutine, register your services by calling `addsvc', and
993 then call `running' when you've finished setting up.
994
53e9ae9e
MW
995 The instance handles server service messages `SVCJOB', `SVCCANCEL' and
996 `SVCCLAIM'. It maintains a table of running services. Incoming jobs cause
997 the service's `job' method to be invoked; `SVCCANCEL' sends a
998 `TripeJobCancelled' exception to the handler coroutine, and `SVCCLAIM'
999 causes the relevant service to be deregistered.
2fa80010
MW
1000
1001 There is no base class for jobs, but a job must implement two methods:
1002
1003 start() Begin processing; might be a no-op.
1004
1005 cancel() Stop processing; the original client has killed the
1006 job.
1007
1008 The life of a service manager is divided into two parts: setup and running;
1009 you tell the manager that you've finished setting up by calling the
1010 `running' method. If, at any point after setup is finished, there are no
1011 remaining services or jobs, `quitp' will return true, ending the process.
1012 """
1013
1014 ## --- Attributes ---
1015 ##
1016 ## svc Mapping name -> service object
1017 ##
1018 ## job Mapping jobid -> job handler coroutine
1019 ##
1020 ## runningp True when setup is finished
1021 ##
1022 ## _quitp True if explicit quit has been requested
1023
1024 def __init__(me, socket):
1025 """
1026 Initialize the service manager.
1027
1028 SOCKET is the administration socket to connect to.
1029 """
690a6ec1 1030 TripeCommandDispatcher.__init__(me, socket)
2fa80010
MW
1031 me.svc = {}
1032 me.job = {}
1033 me.runningp = False
1034 me.handler['SVCCANCEL'] = me._cancel
1035 me.handler['SVCJOB'] = me._job
1036 me.handler['SVCCLAIM'] = me._claim
1037 me._quitp = 0
1038
1039 def addsvc(me, svc):
53e9ae9e 1040 """Register a new service; SVC is a `TripeService' instance."""
2fa80010
MW
1041 assert svc.name not in me.svc
1042 me.svcclaim(svc.name, svc.version)
1043 me.svc[svc.name] = svc
1044
1045 def _cancel(me, _, jid):
1046 """
1047 Called when the server cancels a job; invokes the job's `cancel' method.
1048 """
1049 job = me.job[jid]
1050 del me.job[jid]
1051 job.cancel()
1052
1053 def _claim(me, _, svc, __):
1054 """Called when another program claims our service at a higher version."""
1055 del me.svc[svc]
1056
1057 def _job(me, _, jid, svc, cmd, *args):
1058 """
1059 Called when the server sends us a job to do.
1060
1061 Calls the service to collect a job, and begins processing it.
1062 """
1063 assert jid not in me.job
1064 svc = me.svc[svc.lower()]
1065 job = svc.job(jid, cmd, args)
1066 me.job[jid] = job
1067 job.start()
1068
1069 def running(me):
1070 """Answer true if setup is finished."""
1071 me.runningp = True
1072
1073 def jobdone(me, jid):
1074 """Informs the service manager that the job with id JID has finished."""
1075 try:
1076 del me.job[jid]
1077 except KeyError:
1078 pass
1079
1080 def quitp(me):
1081 """
1082 Return true if no services or jobs are active (and, therefore, if this
1083 process can quit without anyone caring).
1084 """
1085 return me._quitp or (me.runningp and ((not me.svc and not me.job) or
690a6ec1 1086 not me.sock))
2fa80010
MW
1087
1088 def quit(me):
1089 """Forces the quit flag (returned by quitp) on."""
1090 me._quitp = True
1091
class TripeService (object):
  """
  A standard service.

  The NAME and VERSION are passed on to the server.  The CMDTAB is a
  dictionary mapping command names (in lowercase) to command objects.

  If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
  defaults are provided.

  TripeService itself is mostly agnostic about the nature of command objects,
  but the TripeServiceJob class (below) has some requirements.  The built-in
  HELP command requires command objects to have `usage' attributes.
  """

  def __init__(me, name, version, cmdtab):
    """
    Create a new service with the given NAME and VERSION.

    CMDTAB maps command names (in lower-case) to command objects.  It is
    stored as given, not copied: default `help' and `quit' entries are added
    to it in place if they are missing.
    """
    me.name = name
    me.version = version
    me.cmd = cmdtab
    me.activep = True
    me.cmd.setdefault('help',
                      TripeServiceCommand('help', 0, 0, '', me._help))
    me.cmd.setdefault('quit',
                      TripeServiceCommand('quit', 0, 0, '', me._quit))

  def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    It asks for command CMD with argument list ARGS.  Creates a new job,
    passing it the information needed.  CMD is looked up in lowercase; if
    there is no such command, the job is given `None' as its command object.
    """
    return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)

  ## Simple default command handlers, complying with the spec in
  ## tripe-service(7).

  def _help(me):
    """Send a help summary to the user, one command per line, sorted."""
    ## Use `sorted' rather than sorting the result of `items' in place: the
    ## latter only works where `items' returns a list.
    for name, cmd in sorted(me.cmd.items()):
      svcinfo(name, *cmd.usage)

  def _quit(me):
    """Terminate the service manager, after sending a notification."""
    svcmgr.notify('svc-quit', me.name, 'admin-request')
    svcmgr.quit()
1145
class TripeServiceCommand (object):
  """A simple service command: an argument-count check around a function."""

  def __init__(me, name, min, max, usage, func):
    """
    Creates a new command.

    NAME is the command's name (in lowercase).

    MIN and MAX bound the number of arguments the command accepts; either
    may be None, meaning that there is no bound on that side.

    USAGE is a usage string, used for generating help and error messages; it
    is stored split into a list of words.

    FUNC is the function to invoke.
    """
    me.name, me.min, me.max = name, min, max
    me.usage = usage.split()
    me.func = func

  def run(me, *args):
    """
    Called when the command is invoked.

    Raises `TripeSyntaxError' if the number of ARGS is out of bounds;
    otherwise calls the supplied function on them.  The function's result is
    discarded.
    """
    nargs = len(args)
    if me.min is not None and nargs < me.min:
      raise TripeSyntaxError
    if me.max is not None and nargs > me.max:
      raise TripeSyntaxError
    me.func(*args)
1178
class TripeServiceJob (Coroutine):
  """
  Job handler coroutine.

  A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
  request, passing it the jobid, command and arguments, and a command object.
  The command object needs the following attributes.

  name                  The command's name, used in usage-error reports.

  usage                 A usage list (excluding the command name) showing
                        arguments and options.

  run(*ARGS)            Function to react to the command with ARGS split into
                        separate arguments.  Invoked in a coroutine.  The
                        `svcinfo' function (not the `TripeCommandDispatcher'
                        method) may be used to send `INFO' lines.  The
                        function may raise `TripeJobError' to send a `FAIL'
                        response back, or `TripeSyntaxError' to send a
                        generic usage error.  `TripeJobCancelled' exceptions
                        are trapped silently.  Other exceptions are
                        translated into a generic internal-error message.

  This class automatically takes care of sending some closing response to the
  job, and for informing the service manager that the job is completed.

  The `jid' attribute stores the job's id.
  """

  def __init__(me, jid, svc, cmd, command, args):
    """
    Start a new job.

    The job is created with id JID, for service SVC, processing command name
    CMD (which the service resolved into the command object COMMAND, or
    `None'), and with the arguments ARGS.
    """
    Coroutine.__init__(me)
    me.jid = jid
    me.svc = svc
    me.cmd = cmd
    me.command = command
    me.args = args

  def run(me):
    """
    Main body of the coroutine.

    Does the tedious exception handling boilerplate and invokes the command's
    run method.  Whatever happens, the service manager is told that the job
    is done.
    """
    try:
      if me.command is None:
        svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
      else:
        me.command.run(*me.args)
        svcmgr.svcok(me.jid)
    except TripeJobError as exc:
      svcmgr.svcfail(me.jid, *exc.args)
    except TripeSyntaxError:
      svcmgr.svcfail(me.jid, 'bad-svc-syntax',
                     me.svc.name, me.command.name,
                     *me.command.usage)
    except TripeJobCancelled:
      ## The client killed the job: there's nobody left to report to.
      pass
    except Exception as exc:
      svcmgr.svcfail(me.jid, 'svc-internal-error',
                     exc.__class__.__name__, str(exc))
    finally:
      svcmgr.jobdone(me.jid)

  def start(me):
    """Invoked by the service manager to start running the coroutine."""
    me.switch()

  def cancel(me):
    """
    Invoked by the service manager to cancel the job.

    Throws a `TripeJobCancelled' exception into the coroutine.
    """
    me.throw(TripeJobCancelled())
1256
def svcinfo(*args):
  """
  Send an `INFO' line carrying ARGS to the sender of the current job.

  Must be invoked from a `TripeServiceJob' coroutine: the job id is taken
  from the `jid' attribute of the current coroutine.
  """
  job = Coroutine.getcurrent()
  svcmgr.svcinfo(job.jid, *args)
1263
def _setupsvc(tab, func):
  """
  Body of the setup coroutine for service programs.

  Registers each of the services in TAB with the service manager and then,
  if FUNC is true, calls it with no arguments.  The service manager is
  marked as running afterwards, even if something went wrong on the way.
  """
  try:
    for svc in tab:
      svcmgr.addsvc(svc)
    if func:
      func()
  finally:
    ## Always leave setup, so that `quitp' can become true.
    svcmgr.running()
1277
## The module's shared service manager.  It is created without a socket:
## `runservices' assigns `svcmgr.socket' before connecting.
svcmgr = TripeServiceManager(None)
2fa80010
MW
def runservices(socket, tab, init = None, setup = None, daemon = False):
  """
  Function to start a service provider.

  SOCKET is the socket to connect to, usually tripesock.

  TAB is a list of entries.  An entry may be either a tuple

        (NAME, VERSION, COMMANDS)

  or a service object (e.g., a `TripeService' instance).

  COMMANDS is a dictionary mapping command names to tuples

        (MIN, MAX, USAGE, FUNC)

  of arguments for a `TripeServiceCommand' object.

  If DAEMON is true, then the process is forked into the background before we
  start.  If INIT is given, it is called in the main coroutine, immediately
  after forking.  If SETUP is given, it is called in a coroutine, after
  calling INIT and setting up the services but before marking the service
  manager as running.

  It is a really bad idea to do any initialization, particularly setting up
  coroutines, outside of the INIT or SETUP functions.  In particular, if
  we're using rmcr for fake coroutines, the daemonizing fork will kill off
  the currently established coroutines in a most surprising way.

  The function runs a main select loop until the service manager decides to
  quit.
  """

  svcmgr.socket = socket
  svcmgr.connect()

  ## Turn the table into a list of service objects, building `TripeService'
  ## instances for the entries given as tuples.
  svcs = []
  for service in tab:
    if not isinstance(service, tuple):
      svcs.append(service)
    else:
      name, version, commands = service
      cmdmap = {}
      ## Use `items' rather than `iteritems': the latter is not available
      ## everywhere.
      for cmd, stuff in commands.items():
        cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
      svcs.append(TripeService(name, version, cmdmap))

  if daemon:
    M.daemonize()
  if init is not None:
    init()

  ## Services are registered from a coroutine of their own, not from here.
  spawn(_setupsvc, svcs, setup)
  svcmgr.mainloop()
2fa80010
MW
1330
1331###--------------------------------------------------------------------------
1332### Utilities for services.
1333
## Multipliers, in seconds, for the unit suffixes understood by `timespec'.
_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
def timespec(spec):
  """
  Parse the timespec SPEC, returning a number of seconds.

  SPEC is a nonnegative integer, in any form accepted by `int', optionally
  followed by one of the unit letters `s', `m', `h' or `d'.  A SPEC
  consisting of a single character is never treated as having a unit.

  Raises `TripeJobError' with `bad-time-spec' if the number is malformed or
  negative.  The error carries SPEC with any unit letter already removed.
  """
  mul = 1
  if len(spec) > 1 and spec[-1] in _timeunits:
    mul = _timeunits[spec[-1]]
    spec = spec[:-1]
  try:
    t = int(spec)
  except (TypeError, ValueError):
    ## Only conversion failures are bad timespecs: let anything else (e.g.,
    ## a keyboard interrupt) propagate.
    raise TripeJobError('bad-time-spec', spec)
  if t < 0:
    raise TripeJobError('bad-time-spec', spec)
  return mul * t
1348
class OptParse (object):
  """
  Parse options from a command list in the conventional fashion.

  ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
  options.  The returned values are the option tags.  During parsing, the
  `arg' method may be used to retrieve the argument for the most recent
  option.  Afterwards, `rest' may be used to retrieve the remaining
  non-option arguments, and do a simple check on how many there are.

  The parser correctly handles `--' option terminators.
  """

  def __init__(me, args, allowed):
    """
    Create a new option parser.

    The parser will scan the ARGS for options given in the sequence ALLOWED
    (which are expected to include the `-' prefix).  ARGS is copied, so the
    caller's list is not modified by parsing.
    """
    me.allowed = dict.fromkeys(allowed, True)
    me.args = list(args)

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next option.

    If we've run out, raise `StopIteration'.  This happens when the
    arguments are exhausted, when the next argument is shorter than two
    characters or doesn't begin with `-' (it is left in place), or when the
    next argument is `--' (which is consumed).

    Raise `TripeSyntaxError' if the option is not one of those allowed.
    """
    if len(me.args) == 0 or \
          len(me.args[0]) < 2 or \
          not me.args[0].startswith('-'):
      raise StopIteration
    opt = me.args.pop(0)
    if opt == '--':
      raise StopIteration
    if opt not in me.allowed:
      raise TripeSyntaxError
    return opt

  ## The iterator method is called `next' in Python 2 and `__next__' in
  ## Python 3: answer to both names.
  __next__ = next

  def arg(me):
    """
    Return the argument for the most recent option.

    The argument is removed from the list.  If none is available, raise
    `TripeSyntaxError'.
    """
    if len(me.args) == 0:
      raise TripeSyntaxError
    return me.args.pop(0)

  def rest(me, min = None, max = None):
    """
    After option parsing is done, return the remaining arguments.

    Check that there are at least MIN and at most MAX arguments remaining --
    either may be None to suppress the check.  Raise `TripeSyntaxError' if
    the check fails.
    """
    if (min is not None and len(me.args) < min) or \
          (max is not None and len(me.args) > max):
      raise TripeSyntaxError
    return me.args
1416
1417###----- That's all, folks --------------------------------------------------