chiark / gitweb /
server/admin.c: Remove spurious `ping' in usage message.
[tripe] / py / tripe.py.in
CommitLineData
2fa80010
MW
1### -*-python-*-
2###
3### Administration connection with tripe server
4###
5### (c) 2006 Straylight/Edgeware
6###
7
8###----- Licensing notice ---------------------------------------------------
9###
10### This file is part of Trivial IP Encryption (TrIPE).
11###
11ad66c2
MW
12### TrIPE is free software: you can redistribute it and/or modify it under
13### the terms of the GNU General Public License as published by the Free
14### Software Foundation; either version 3 of the License, or (at your
15### option) any later version.
2fa80010 16###
11ad66c2
MW
17### TrIPE is distributed in the hope that it will be useful, but WITHOUT
18### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20### for more details.
2fa80010
MW
21###
22### You should have received a copy of the GNU General Public License
11ad66c2 23### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
2fa80010
MW
24
25"""
26This module provides classes and functions for connecting to a running tripe
27server, sending it commands, receiving and processing replies, and
28implementing services.
29
30Rather than end up lost in a storm of little event-driven classes, or a
31morass of concurrent threads, the module uses coroutines to present a fairly
32simple function call/return interface to potentially long-running commands
a62f8e8a 33which must run without blocking the main process. It assumes a coroutine
2fa80010
MW
34module presenting a subset of the `greenlet' interface: if actual greenlets
35are available, they are used; otherwise there's an implementation in terms of
36threads (with lots of locking) which will do instead.
37
38The simple rule governing the coroutines used here is this:
39
40 * The root coroutine never cares what values are passed to it when it
41 resumes: it just discards them.
42
43 * Other, non-root, coroutines are presumed to be waiting for some specific
44 thing.
45
46Configuration variables:
47 configdir
48 socketdir
49 PACKAGE
50 VERSION
51 tripesock
52 peerdb
53
54Other useful variables:
55 rootcr
56 svcmgr
57
58Other tweakables:
59 _debug
60
61Exceptions:
62 Exception
63 StandardError
64 TripeConnectionError
65 TripeError
66 TripeInternalError
67 TripeJobCancelled
68 TripeJobError
69 TripeSyntaxError
70
71Classes:
72 _Coroutine
73 Coroutine
74 TripeServiceJob
75 OptParse
76 Queue
690a6ec1 77 SelIOWatcher
2fa80010
MW
78 TripeCommand
79 TripeSynchronousCommand
80 TripeAsynchronousCommand
81 TripeCommandIterator
82 TripeConnection
83 TripeCommandDispatcher
690a6ec1 84 TripeServiceManager
2fa80010
MW
85 TripeService
86 TripeServiceCommand
87
88Utility functions:
89 quotify
90 runservices
91 spawn
92 svcinfo
93 timespec
94"""
95
96__pychecker__ = 'self=me no-constCond no-argsused'
97
98_debug = False
99
100###--------------------------------------------------------------------------
101### External dependencies.
102
103import socket as S
104import errno as E
105import mLib as M
106import re as RX
107import sys as SYS
108import os as OS
109
110try:
111 if OS.getenv('TRIPE_FORCE_RMCR') is not None:
171206b5 112 raise ImportError()
2fa80010
MW
113 from py.magic import greenlet as _Coroutine
114except ImportError:
115 from rmcr import Coroutine as _Coroutine
116
117###--------------------------------------------------------------------------
118### Coroutine hacking.
119
## The coroutine which is running while this module is being loaded.
rootcr = _Coroutine.getcurrent()

class Coroutine (_Coroutine):
  """
  A coroutine which may only be resumed from the root coroutine.

  The root coroutine is not created through this class, so it can never be
  an instance of it.
  """

  def switch(me, *args, **kw):
    """
    Transfer control to this coroutine, handing it ARGS and KW.

    Must be called from the root coroutine (this is asserted).  Whatever
    comes back when control returns to the root is discarded.  If `_debug'
    is set, a trace line is printed on the way in and on the way back.
    """
    assert _Coroutine.getcurrent() is rootcr
    if _debug: print('* %s' % (me,))
    _Coroutine.switch(me, *args, **kw)
    if _debug: print('* %s' % (rootcr,))
2fa80010
MW
133
134###--------------------------------------------------------------------------
135### Default places for things.
136
## Build-time settings.  The `@...@' markers are kept verbatim; presumably
## they are substituted when this `.in' template is processed -- confirm
## against the build system.
PACKAGE = "@PACKAGE@"
VERSION = "@VERSION@"
socketdir = "@socketdir@"
configdir = OS.getenv('TRIPEDIR', "@configdir@")

## Locations which may be overridden from the environment.
tripesock = OS.getenv('TRIPESOCK', OS.path.join(socketdir, 'tripesock'))
peerdb = OS.getenv('TRIPEPEERDB', 'peers.cdb')
2fa80010
MW
144
145###--------------------------------------------------------------------------
146### Connection to the server.
147
def readnonblockingly(sock, len):
  """
  Nonblocking read from SOCK.

  Try to return up to LEN bytes.  If nothing could be read without
  blocking, return `None'.  EOF is returned as an empty string.  Any other
  socket error is propagated to the caller.

  As a side effect, SOCK is switched into nonblocking mode.
  """
  try:
    sock.setblocking(0)
    return sock.recv(len)
  except S.error as exc:
    ## POSIX allows `EAGAIN' and `EWOULDBLOCK' to be distinct values, and
    ## either may be used to say `nothing to read just now'.
    if exc.args and exc.args[0] in (E.EAGAIN, E.EWOULDBLOCK):
      return None
    raise
162
class TripeConnectionError (StandardError):
  """
  Raised when something has gone wrong with the connection to the server.
  """
class TripeInternalError (StandardError):
  """
  Raised when this program finds itself in a state it cannot make sense of.
  """
169
class TripeConnection (object):
  """
  A logical connection to the tripe administration socket.

  There may or may not be a physical connection.  (This is needed for the
  monitor, for example.)

  This class isn't very useful on its own, but it has useful subclasses.
  At this level, the class is agnostic about I/O multiplexing schemes: that
  is delegated to the object held in the `iowatch' attribute.

  Attributes:

  socket        Name of the administration socket to connect to.

  sock          The connected socket object, or `None' while disconnected.

  lbuf          Line buffer splitting incoming data into lines, or `None'.

  iowatch       I/O watcher told about connections and disconnections.
  """

  def __init__(me, socket):
    """
    Make a connection to the named SOCKET.

    No physical connection is made initially.
    """
    me.socket = socket
    me.sock = None
    me.lbuf = None
    me.iowatch = SelIOWatcher(me)

  def connect(me):
    """
    Ensure that there's a physical connection.

    Do nothing if we're already connected.  Invoke the `connected' method if
    successful.  Returns the connection object itself in either case.
    """
    if me.sock: return me
    sock = S.socket(S.AF_UNIX, S.SOCK_STREAM)
    try:
      sock.connect(me.socket)
    except Exception:
      ## Don't leave a half-made socket lying around if we couldn't connect.
      sock.close()
      raise
    me.sock = sock
    me.lbuf = M.LineBuffer(me.line, me._eof)
    me.lbuf.size = 1024
    me.connected()
    return me

  def disconnect(me, reason):
    """
    Disconnect the physical connection.

    Invoke the `disconnected' method, giving the provided REASON, which
    should be either `None' or an exception.  Do nothing if there is no
    physical connection.  Returns the connection object itself in either
    case.
    """
    if not me.sock: return me
    me.disconnected(reason)
    me.sock.close()
    me.sock = None
    me.lbuf.disable()
    me.lbuf = None
    return me

  def connectedp(me):
    """
    Return true if there's a current, believed-good physical connection.
    """
    return me.sock is not None

  ## Truth value of a connection: `__nonzero__' is the Python 2 spelling and
  ## `__bool__' the Python 3 one.
  __nonzero__ = connectedp
  __bool__ = connectedp

  def send(me, line):
    """
    Send the LINE to the connection's socket.

    All output is done through this method; it can be overridden to provide
    proper nonblocking writing, though this seems generally unnecessary.

    If anything goes wrong, the connection is disconnected (with the
    exception as the reason) and the exception is propagated.
    """
    try:
      me.sock.setblocking(1)
      ## `send' is allowed to write only part of its argument; `sendall'
      ## keeps going until the whole line has been written.
      me.sock.sendall(line + '\n')
    except Exception as exc:
      me.disconnect(exc)
      raise
    return me

  def receive(me):
    """
    Receive whatever's ready from the connection's socket.

    Call `line' on each complete line, and `eof' if the connection closed.
    Subclasses which attach this class to an I/O-event system should call
    this method when the socket (the `sock' attribute) is ready for reading.
    """
    ## The line handlers may disconnect us, so check afresh each time round.
    while me.sock is not None:
      try:
        buf = readnonblockingly(me.sock, 16384)
      except Exception as exc:
        me.disconnect(exc)
        raise
      if buf is None:
        return me
      if buf == '':
        me._eof()
        return me
      me.lbuf.flush(buf)
    return me

  def _eof(me):
    """Internal end-of-file handler: disconnect, and then call `eof'."""
    me.disconnect(TripeConnectionError('connection lost'))
    me.eof()

  def connected(me):
    """
    To be overridden by subclasses to react to a connection being
    established.  This version tells the I/O watcher about the new socket.
    """
    me.iowatch.connected(me.sock)

  def disconnected(me, reason):
    """
    To be overridden by subclasses to react to a connection being severed.
    This version tells the I/O watcher that the socket is going away.
    """
    me.iowatch.disconnected()

  def eof(me):
    """To be overridden by subclasses to handle end-of-file."""
    pass

  def line(me, line):
    """To be overridden by subclasses to handle incoming lines."""
    pass
294
690a6ec1
MW
295###--------------------------------------------------------------------------
296### I/O loop integration.
297
class SelIOWatcher (object):
  """
  Integration with mLib's I/O event system.

  A watcher for some other event system (e.g., glib's main loop) can be
  substituted by storing a different object in `CONN.iowatch' while the
  CONN is disconnected.
  """

  def __init__(me, conn):
    """Make a watcher for the connection CONN; nothing is watched yet."""
    me._selfile = None
    me._conn = conn

  def connected(me, sock):
    """
    Called when a connection is made.

    SOCK is the socket.  The watcher must arrange to call `CONN.receive'
    when data is available.
    """
    selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive)
    me._selfile = selfile
    selfile.enable()

  def disconnected(me):
    """
    Called when the connection is lost: forget the file selector.
    """
    me._selfile = None

  def iterate(me):
    """
    Wait for something interesting to happen, and issue events.

    That is, basically, do one iteration of a main select loop, processing
    all of the events, and then return.  This is used in the method
    `TripeCommandDispatcher.mainloop', but that's mostly for the benefit of
    `runservices'; if your I/O watcher has a different main loop, you can
    drive it yourself.
    """
    M.select()
338
339###--------------------------------------------------------------------------
340### Inter-coroutine communication.
341
class Queue (object):
  """
  A queue of things arriving asynchronously.

  This is a very simple queue with many writers but only one reader.  It
  suits the more involved coroutines which have to cope with several kinds
  of incoming event.
  """

  def __init__(me):
    """Create a new empty queue."""
    me.waiter = None
    me.contents = M.Array()

  def _wait(me):
    """
    Internal: block the calling coroutine until the queue is nonempty.

    Only one reader is allowed: raise `ValueError' if some coroutine is
    already waiting.
    """
    if me.waiter:
      raise ValueError('queue already being waited on')
    try:
      cr = me.waiter = Coroutine.getcurrent()
      ## Keep handing control back to our parent until something turns up.
      while not me.contents:
        cr.parent.switch()
    finally:
      me.waiter = None

  def get(me):
    """
    Remove and return the item at the head of the queue.

    If the queue is empty, wait until an item arrives.
    """
    me._wait()
    item = me.contents.shift()
    return item

  def peek(me):
    """
    Return the item at the head of the queue, leaving it in place.

    If the queue is empty, wait until an item arrives.
    """
    me._wait()
    head = me.contents[0]
    return head

  def put(me, thing):
    """
    Write THING to the queue.

    If a coroutine is waiting on the queue, resume it at once; otherwise
    the item simply stays queued until somebody asks for it.
    """
    me.contents.push(thing)
    reader = me.waiter
    if reader:
      reader.switch()
400
2fa80010
MW
401###--------------------------------------------------------------------------
402### Dispatching coroutine.
403
## A bareword is a nonempty string with no backslashes, quotes or
## whitespace in it: such a string can be sent just as it is.
rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$')

## The characters which must be escaped even inside quoted text.
rx_weird = RX.compile(r'([\\\'])')

def quotify(s):
  """
  Quote S according to the tripe-admin(5) rules.

  A bareword is returned unchanged.  Anything else is wrapped in single
  quotes, with a backslash put before each backslash and single quote.
  """
  bare = rx_ordinary.match(s)
  ## `$' is also happy just before a trailing newline, so make sure that the
  ## match really did swallow the whole string.
  if bare is not None and bare.end() == len(s):
    return s
  return "'%s'" % rx_weird.sub(r'\\\1', s)
418
def _callback(func):
  """
  Return a wrapper for FUNC which reports exceptions thrown by it.

  Useful in the case of callbacks invoked by C functions which ignore
  exceptions.  The wrapper passes its arguments to FUNC and returns FUNC's
  result; if FUNC raises, the exception is handed to `sys.excepthook' and
  then raised again.  Where possible the wrapper takes on FUNC's name and
  docstring.
  """
  import functools

  def _(*a, **kw):
    try:
      return func(*a, **kw)
    except:
      ## Deliberately catches everything: the point is to make sure that the
      ## problem is seen, and the exception is propagated anyway.
      SYS.excepthook(*SYS.exc_info())
      raise

  try:
    _ = functools.wraps(func)(_)
  except AttributeError:
    ## Some callables lack the attributes which are copied; carry on with
    ## the anonymous wrapper.
    pass
  return _
433
class TripeCommand (object):
  """
  Abstract base class for a command in progress.

  The tokens which make up the command are kept, as a list, in the `words'
  attribute.

  Each subclass has to provide a method to deal with server responses:

    * response(CODE, *ARGS): CODE is one of the strings `OK', `INFO' or
      `FAIL'; ARGS are the remaining tokens from the server's response.
  """

  def __init__(me, words):
    """Make a new command consisting of the given list of WORDS."""
    me.words = words
450
class TripeSynchronousCommand (TripeCommand):
  """
  A simple command, processed apparently synchronously.

  It must be created from a coroutine other than the root (or whichever
  one is running the dispatcher); other coroutines really do carry on
  running while the response is awaited.

  Every server response resumes the creating coroutine with the pair (CODE,
  REST) -- CODE being the server's response code (`OK', `INFO' or `FAIL')
  and REST the remaining response tokens.  That coroutine must keep on
  switching back to the dispatcher until it has seen a terminating
  response (`OK' or `FAIL'), or it will become very confused.

  Mostly it's better to let `TripeCommandIterator' do all of this for you.
  """

  def __init__(me, words):
    """Initialize the command, specifying the WORDS to send to the server."""
    TripeCommand.__init__(me, words)
    ## Remember who asked, so that the responses can be sent back there.
    me.owner = Coroutine.getcurrent()

  def response(me, code, *rest):
    """Handle a server response by forwarding it to the calling coroutine."""
    reply = (code, rest)
    me.owner.switch(reply)
478
class TripeError (StandardError):
  """
  A tripe command failed with an error (a `FAIL' code).

  The server's message tokens are available in the `args' attribute.
  """
485
class TripeCommandIterator (object):
  """
  Iterator interface to a tripe command.

  The values returned by the iterator are lists of tokens from the server's
  `INFO' lines, as processed by the given filter function, if any.  The
  iterator completes normally (by raising `StopIteration') if the server
  reported `OK', and raises an exception if the command failed for some
  reason.

  A `TripeError' is raised if the server issues a `FAIL' code.  If the
  connection failed, some other exception is raised.
  """

  def __init__(me, dispatcher, words, bg = False, filter = None):
    """
    Create a new command iterator.

    The command is submitted to the DISPATCHER; it consists of the given
    WORDS.  If BG is true, then an option is inserted to request that the
    server run the command in the background.  The FILTER is applied to the
    token lists with which the server responds, and the filter's outputs
    are the items returned by the iterator.

    Raises `ValueError' if the calling coroutine has no parent to hand
    control back to.
    """
    me.dcr = Coroutine.getcurrent().parent
    if me.dcr is None:
      raise ValueError('must invoke from coroutine')
    me.filter = filter or (lambda x: x)
    if bg:
      words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:])
    dispatcher.rawcommand(TripeSynchronousCommand(words))

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next piece of information from the server.

    `INFO' responses are filtered and returned as the values of the
    iteration.  `FAIL' and `CONNERR' responses are turned into exceptions
    and raised.  Finally, `OK' is turned into `StopIteration', which should
    cause a normal end to the iteration process.  Any other code raises
    `TripeInternalError'.
    """
    code, rest = me.dcr.switch()
    if code == 'INFO':
      return me.filter(rest)
    elif code == 'OK':
      raise StopIteration()
    elif code == 'CONNERR':
      ## `TripeSynchronousCommand.response' collects its trailing arguments
      ## into a tuple, so the disconnection reason normally arrives wrapped
      ## as `(REASON,)'.  Unwrap it, but cope with a bare reason too.
      reason = rest
      if isinstance(rest, (tuple, list)):
        if rest: reason = rest[0]
        else: reason = None
      if reason is None:
        raise TripeConnectionError('connection terminated by user')
      else:
        raise reason
    elif code == 'FAIL':
      raise TripeError(*rest)
    else:
      ## REST is usually a tuple, which can't be added to a list directly.
      raise TripeInternalError('unexpected tripe response %r' %
                               ([code] + list(rest)))

  ## The Python 3 spelling of the iterator protocol.
  __next__ = next
2fa80010
MW
546
547### Simple utility functions for the TripeCommandIterator convenience
548### methods.
549
def _tokenjoin(words):
  """Filter function: glue the tokens WORDS together, separated by spaces."""
  joined = ' '.join(words)
  return joined
553
def _keyvals(iter):
  """
  Return a dictionary formed from the `KEY=VALUE' pairs returned by the
  iterator ITER.

  Each item of ITER is a sequence of `KEY=VALUE' tokens; a token is split
  at its first `='.  A token with no `=' in it raises `ValueError'.
  """
  table = {}
  for tokens in iter:
    for token in tokens:
      key, value = token.split('=', 1)
      table[key] = value
  return table
563
def _simple(iter):
  """
  Run ITER to completion, insisting that it produces no items at all.

  Returns `None'; raises `TripeInternalError' if there was any item.
  """
  leftovers = list(iter)
  if leftovers:
    raise TripeInternalError('expected no response')
  return None
570
def _oneline(iter):
  """
  Run ITER to completion and return its one and only item.

  Raises `TripeInternalError' unless there was exactly one item.
  """
  lines = list(iter)
  if len(lines) != 1:
    raise TripeInternalError('expected only one line of response')
  (only,) = lines
  return only
577
def _tracelike(iter):
  """
  Handle a TRACE-like command.

  The result is a list of tuples (CHAR, STATUS, DESC): CHAR is a selector
  character, STATUS is the status (empty if disabled, `+' if enabled, maybe
  something else later), and DESC is the human-readable description.

  CHAR and STATUS come from the first token of each item of ITER, and DESC
  is the remaining tokens joined with spaces.
  """
  return [(ww[0][0], ww[0][1:], ' '.join(ww[1:])) for ww in iter]
590
def _kwopts(kw, allowed):
  """
  Parse keyword arguments into options.

  ALLOWED is a list of allowable keywords; `ValueError' is raised if KW
  contains any other keyword.  `KEY = VALUE' becomes an option pair `-KEY
  VALUE' if VALUE is a string, just the option `-KEY' if VALUE is a true
  non-string, or nothing if VALUE is false.  A `--' is added at the end to
  stop the parser getting confused.
  """
  permitted = frozenset(allowed)
  opts = []
  for key, value in kw.items():
    if key not in permitted:
      raise ValueError('option %s not allowed here' % key)
    flag = '-' + key
    if isinstance(value, str):
      opts.extend([flag, value])
    elif value:
      opts.append(flag)
  opts.append('--')
  return opts
609
690a6ec1
MW
## Deferral: calls waiting to be made, as (FUNC, ARGS, KW) triples.
_deferq = []

def defer(func, *args, **kw):
  """Call FUNC(*ARGS, **KW) later, in the root coroutine."""
  pending = (func, args, kw)
  _deferq.append(pending)
615
22b47552
MW
def funargstr(func, args, kw):
  """
  Return a string describing the call FUNC(*ARGS, **KW).

  The result looks like `NAME(ARG, ..., KEY = VALUE, ...)', with the
  arguments shown using `repr'.  It is used to give spawned coroutines a
  name.
  """
  ## Not every callable has a `__name__' (`functools.partial' objects and
  ## instances with a `__call__' method don't), so fall back to `repr'.
  name = getattr(func, '__name__', None)
  if name is None: name = repr(func)
  items = [repr(a) for a in args]
  items += ['%s = %r' % (k, v) for k, v in kw.items()]
  return '%s(%s)' % (name, ', '.join(items))
621
690a6ec1
MW
def spawn(func, *args, **kw):
  """
  Call FUNC, passing ARGS and KW, in a fresh coroutine.

  The coroutine is not started right away: creating it and switching to it
  are deferred (see `defer').
  """
  def launch():
    cr = Coroutine(func, name = funargstr(func, args, kw))
    return cr.switch(*args, **kw)
  defer(launch)
690a6ec1
MW
626
## Asides: calls to be made from a coroutine other than the root.
_asideq = Queue()

def _runasides():
  """
  Read (FUNC, ARGS, KW) triples from the aside queue, for ever, and invoke
  FUNC(*ARGS, **KW) for each.

  Whatever a call raises is handed to `sys.excepthook'; the loop then
  carries on with the next triple.
  """
  while True:
    job = _asideq.get()
    func, args, kw = job
    try:
      func(*args, **kw)
    except:
      SYS.excepthook(*SYS.exc_info())

def aside(func, *args, **kw):
  """Call FUNC(*ARGS, **KW) later, in a non-root coroutine."""
  job = (func, args, kw)
  defer(_asideq.put, job)
643
2fa80010
MW
class TripeCommandDispatcher (TripeConnection):
  """
  Command dispatcher.

  The command dispatcher is a connection which knows how to handle
  commands.  This is probably the most important class in this module to
  understand.

  Each line from the server is parsed into tokens.  The first token is a
  code (`OK' or `NOTE' or something) saying what kind of line this is.  The
  `handler' attribute is a dictionary mapping such codes to handler
  functions, which are applied to the code followed by the remaining words
  of the line as individual arguments.  *Exception*: the content of `TRACE'
  lines is not tokenized.

  Default handlers are installed for the codes which answer commands.
  Commands arrive as `TripeCommand' instances through the `rawcommand'
  interface.  The dispatcher keeps track of which command object stands for
  which job, and passes responses on by invoking the object's `response'
  method.  Command objects don't see the `BG...' codes, because the
  dispatcher has already turned them into regular codes while it was
  looking up the job tag.

  The dispatcher also has a special response code of its own: `CONNERR'
  indicates that the connection failed and the command has therefore been
  lost.  It is sent to all outstanding commands when the connection is
  severed: rather than a token list, it is accompanied by the exception
  which caused the disconnection, or by `None' if the disconnection was
  expected (e.g., the direct result of a user request).
  """

  ## --- Infrastructure ---
  ##
  ## We will get confused if we pipeline commands.  Send them one at a time.
  ## Only send a command when the previous one detaches or completes.
  ##
  ## The following attributes are interesting:
  ##
  ## tagseq     Sequence number for next background job (for bgtag)
  ##
  ## queue      Commands awaiting submission.
  ##
  ## cmd        Mapping from job tags to commands: cmd[None] is the
  ##            foreground command.
  ##
  ## handler    Mapping from server codes to handler functions.

  def __init__(me, socket):
    """
    Initialize the dispatcher.

    The SOCKET is the filename of the administration socket to connect to,
    for TripeConnection.__init__.
    """
    TripeConnection.__init__(me, socket)
    me.tagseq = 0
    me.handler = {}
    me.handler['BGDETACH'] = me._detach
    for code in 'BGOK', 'BGINFO', 'BGFAIL':
      me.handler[code] = me._response
    for code in 'OK', 'INFO', 'FAIL':
      me.handler[code] = me._fgresponse

  def quitp(me):
    """Should we quit the main loop?  Subclasses should override."""
    return False

  def mainloop(me, quitp = None):
    """
    Iterate the I/O watcher until QUITP returns true.

    QUITP defaults to the `quitp' method.  Arranges for asides and deferred
    calls to be made at the right times.  Must be called from the root
    coroutine.
    """
    global _deferq
    assert _Coroutine.getcurrent() is rootcr
    Coroutine(_runasides, name = '_runasides').switch()
    if quitp is None:
      quitp = me.quitp
    while not quitp():
      ## Deferred calls may defer more calls, so keep going until the queue
      ## stays empty; each batch is detached from the queue before it runs.
      while _deferq:
        batch, _deferq = _deferq, []
        for func, args, kw in batch:
          func(*args, **kw)
      me.iowatch.iterate()

  def connected(me):
    """
    Connection hook.

    If a subclass overrides this method, it must call us; clears out the
    command queue and job map.
    """
    me.queue = M.Array()
    me.cmd = {}
    TripeConnection.connected(me)

  def disconnected(me, reason):
    """
    Disconnection hook.

    If a subclass overrides this method, it must call us; sends a special
    `CONNERR' code to all incomplete commands.
    """
    TripeConnection.disconnected(me, reason)
    for cmd in me.cmd.itervalues():
      cmd.response('CONNERR', reason)
    for cmd in me.queue:
      cmd.response('CONNERR', reason)

  @_callback
  def line(me, line):
    """Handle an incoming line, sending it to the right place."""
    if _debug: print('< %s' % (line,))
    code, rest = M.word(line, quotep = True)
    func = me.handler.get(code)
    if func is not None:
      if code == 'TRACE':
        ## Trace output is passed on as a single untokenized string.
        func(code, rest)
      else:
        func(code, *M.split(rest, quotep = True)[0])
    me.dequeue()

  def dequeue(me):
    """
    Pull the oldest command off the queue and try to send it to the server.

    Does nothing if the queue is empty or a foreground command is still in
    progress.
    """
    if not me.queue or None in me.cmd: return
    cmd = me.queue.shift()
    text = ' '.join([quotify(w) for w in cmd.words])
    if _debug: print('> %s' % (text,))
    me.send(text)
    me.cmd[None] = cmd

  def bgtag(me):
    """
    Return an unused job tag.

    May be of use when composing commands by hand.
    """
    seq = me.tagseq
    me.tagseq += 1
    return 'J%05d' % seq

  ## --- Built-in handler functions for server responses ---

  def _detach(me, _, tag):
    """
    Respond to a `BGDETACH' TAG message.

    Move the current foreground command to the background.
    """
    assert tag not in me.cmd
    me.cmd[tag] = me.cmd[None]
    del me.cmd[None]

  def _response(me, code, tag, *w):
    """
    Respond to an `OK', `INFO' or `FAIL' message.

    If this is a message for a background job, find the tag; then dispatch
    the result to the command object.  This is also called by `_fgresponse'
    (with TAG set to `None') to handle responses for foreground commands,
    and is therefore a useful method to extend or override in subclasses.
    """
    if code.startswith('BG'):
      code = code[2:]
    cmd = me.cmd[tag]
    ## Anything other than `INFO' finishes the job, so forget about it.
    if code != 'INFO':
      del me.cmd[tag]
    cmd.response(code, *w)

  def _fgresponse(me, code, *w):
    """Process responses to the foreground command."""
    me._response(code, None, *w)

  ## --- Interface methods ---

  def rawcommand(me, cmd):
    """
    Submit the `TripeCommand' CMD to the server, and look after it until it
    completes.

    Raises `TripeConnectionError' if there is no connection.
    """
    if not me.connectedp():
      raise TripeConnectionError('connection closed')
    me.queue.push(cmd)
    me.dequeue()

  def command(me, *cmd, **kw):
    """Convenience wrapper for creating a TripeCommandIterator object."""
    return TripeCommandIterator(me, cmd, **kw)

  ## --- Convenience methods for server commands ---
  ##
  ## Each of these submits the server command of the same name and digests
  ## the response using one of the helper functions defined above.

  def add(me, peer, *addr, **kw):
    opts = _kwopts(kw, ['tunnel', 'keepalive',
                        'key', 'priv', 'cork',
                        'mobile', 'knock',
                        'ephemeral'])
    words = ['ADD'] + opts + [peer] + list(addr)
    return _simple(me.command(bg = True, *words))
  def addr(me, peer):
    return _oneline(me.command('ADDR', peer))
  def algs(me, peer = None):
    extra = []
    if peer is not None: extra = [peer]
    return _keyvals(me.command('ALGS', *extra))
  def checkchal(me, chal):
    return _simple(me.command('CHECKCHAL', chal))
  def daemon(me):
    return _simple(me.command('DAEMON'))
  def eping(me, peer, **kw):
    words = ['EPING'] + _kwopts(kw, ['timeout']) + [peer]
    return _oneline(me.command(bg = True, *words))
  def forcekx(me, peer):
    return _simple(me.command('FORCEKX', peer))
  def getchal(me):
    return _oneline(me.command('GETCHAL', filter = _tokenjoin))
  def greet(me, peer, chal):
    return _simple(me.command('GREET', peer, chal))
  def help(me):
    return list(me.command('HELP', filter = _tokenjoin))
  def ifname(me, peer):
    return _oneline(me.command('IFNAME', peer, filter = _tokenjoin))
  def kill(me, peer):
    return _simple(me.command('KILL', peer))
  def list(me):
    return list(me.command('LIST', filter = _tokenjoin))
  def notify(me, *msg):
    return _simple(me.command('NOTIFY', *msg))
  def peerinfo(me, peer):
    return _keyvals(me.command('PEERINFO', peer))
  def ping(me, peer, **kw):
    words = ['PING'] + _kwopts(kw, ['timeout']) + [peer]
    return _oneline(me.command(bg = True, *words))
  def port(me, af = None):
    extra = []
    if af is not None: extra = [af]
    return _oneline(me.command('PORT', filter = _tokenjoin, *extra))
  def quit(me):
    return _simple(me.command('QUIT'))
  def reload(me):
    return _simple(me.command('RELOAD'))
  def servinfo(me):
    return _keyvals(me.command('SERVINFO'))
  def setifname(me, new):
    return _simple(me.command('SETIFNAME', new))
  def svcclaim(me, service, version):
    return _simple(me.command('SVCCLAIM', service, version))
  def svcensure(me, service, version = None):
    extra = []
    if version is not None: extra = [version]
    return _simple(me.command('SVCENSURE', service, *extra))
  def svcfail(me, job, *msg):
    return _simple(me.command('SVCFAIL', job, *msg))
  def svcinfo(me, job, *msg):
    return _simple(me.command('SVCINFO', job, *msg))
  def svclist(me):
    return list(me.command('SVCLIST'))
  def svcok(me, job):
    return _simple(me.command('SVCOK', job))
  def svcquery(me, service):
    return _keyvals(me.command('SVCQUERY', service))
  def svcrelease(me, service):
    return _simple(me.command('SVCRELEASE', service))
  def svcsubmit(me, service, *args, **kw):
    ## Unlike the others, this hands back the command iterator itself.
    words = ['SVCSUBMIT'] + _kwopts(kw, ['version']) + [service] + list(args)
    return me.command(bg = True, *words)
  def stats(me, peer):
    return _keyvals(me.command('STATS', peer))
  def trace(me, *args):
    return _tracelike(me.command('TRACE', *args))
  def tunnels(me):
    return list(me.command('TUNNELS', filter = _tokenjoin))
  def version(me):
    return _oneline(me.command('VERSION', filter = _tokenjoin))
  def warn(me, *msg):
    return _simple(me.command('WARN', *msg))
  def watch(me, *args):
    return _tracelike(me.command('WATCH', *args))
929
930###--------------------------------------------------------------------------
931### Asynchronous commands.
932
2fa80010
MW
class TripeAsynchronousCommand (TripeCommand):
  """
  Asynchronous commands.

  This is the complicated way of issuing commands.  You must set up a
  queue, and associate the command with it.  Each response arriving for the
  command is put on the queue as a triple of the form (TAG, CODE, REST) --
  where TAG is an object of your choice, not interpreted by this class,
  CODE is the server's response code (`OK', `INFO', `FAIL', or `CONNERR'),
  and REST is the list of the rest of the server's tokens.

  Using this, you can write coroutines which process many commands (and
  possibly other events) simultaneously.
  """

  def __init__(me, queue, tag, words):
    """
    Make an asynchronous command consisting of the given WORDS, which sends
    responses to QUEUE, labelled with TAG.
    """
    TripeCommand.__init__(me, words)
    me.tag = tag
    me.queue = queue

  def response(me, code, *stuff):
    """Handle a server response by writing it to the caller's queue."""
    item = (me.tag, code, list(stuff))
    me.queue.put(item)
958
2fa80010
MW
959###--------------------------------------------------------------------------
960### Services.
961
class TripeJobCancelled (Exception):
  """
  Delivered to a job handler when the client kills the job.

  Not propagated further.
  """
969
class TripeJobError (Exception):
  """
  Raised to have a failure reported for a running job.

  Sends an SVCFAIL code back.
  """
977
class TripeSyntaxError (Exception):
  """
  Raised to report a syntax error in a job's arguments.

  Sends an SVCFAIL bad-svc-syntax message back.
  """
985
class TripeServiceManager (TripeCommandDispatcher):
  """
  A command dispatcher with added handling for incoming service requests.

  There is usually only one instance of this class, called `svcmgr'.  Some of
  the support functions in this module assume that this is the case.

  To use, run `mLib.select' in a loop until the `quitp' method returns true;
  then, in a non-root coroutine, register your services by calling `addsvc',
  and then call `running' when you've finished setting up.

  The instance handles server service messages `SVCJOB', `SVCCANCEL' and
  `SVCCLAIM'.  It maintains a table of running services.  Incoming jobs cause
  the service's `job' method to be invoked; `SVCCANCEL' invokes the job's
  `cancel' method (which, for a `TripeServiceJob', sends a
  `TripeJobCancelled' exception to the handler coroutine), and `SVCCLAIM'
  causes the relevant service to be deregistered.

  There is no base class for jobs, but a job must implement two methods:

  start()               Begin processing; might be a no-op.

  cancel()              Stop processing; the original client has killed the
                        job.

  The life of a service manager is divided into two parts: setup and running;
  you tell the manager that you've finished setting up by calling the
  `running' method.  If, at any point after setup is finished, there are no
  remaining services or jobs, `quitp' will return true, ending the process.
  """

  ## --- Attributes ---
  ##
  ## svc                Mapping name -> service object
  ##
  ## job                Mapping jobid -> job object (as returned by the
  ##                    service's `job' method)
  ##
  ## runningp           True when setup is finished
  ##
  ## _quitp             True if explicit quit has been requested

  def __init__(me, socket):
    """
    Initialize the service manager.

    SOCKET is the administration socket to connect to.
    """
    TripeCommandDispatcher.__init__(me, socket)
    me.svc = {}
    me.job = {}
    me.runningp = False
    me.handler['SVCCANCEL'] = me._cancel
    me.handler['SVCJOB'] = me._job
    me.handler['SVCCLAIM'] = me._claim
    me._quitp = False

  def addsvc(me, svc):
    """
    Register a new service; SVC is a `TripeService' instance.

    The service is claimed (by name and version) using `svcclaim' before it
    is entered in the table.  A service of the same name must not already be
    registered.
    """
    assert svc.name not in me.svc
    me.svcclaim(svc.name, svc.version)
    me.svc[svc.name] = svc

  def _cancel(me, _, jid):
    """
    Called when the server cancels a job; invokes the job's `cancel' method.

    The job is forgotten before it is cancelled.  Raises `KeyError' if JID
    doesn't name a job we know about.
    """
    job = me.job.pop(jid)
    job.cancel()

  def _claim(me, _, svc, __):
    """
    Called when another program claims our service at a higher version.

    The service named SVC is removed from the table; raises `KeyError' if we
    weren't providing it.
    """
    del me.svc[svc]

  def _job(me, _, jid, svc, cmd, *args):
    """
    Called when the server sends us a job to do.

    Calls the service to collect a job, and begins processing it.  The
    service name SVC is looked up in lowercase; the job is recorded under JID
    before it is started.
    """
    assert jid not in me.job
    service = me.svc[svc.lower()]
    job = service.job(jid, cmd, args)
    me.job[jid] = job
    job.start()

  def running(me):
    """
    Declare that setup is finished.

    Until this is called, `quitp' reports true only if `quit' has been
    called explicitly.
    """
    me.runningp = True

  def jobdone(me, jid):
    """
    Informs the service manager that the job with id JID has finished.

    It's not an error if the job has already been forgotten.
    """
    me.job.pop(jid, None)

  def quitp(me):
    """
    Return true if this process can quit without anyone caring.

    That's the case if `quit' has been called; or if setup is finished and
    either no services or jobs are active, or `me.sock' is false
    (presumably meaning that the connection has gone away -- the attribute
    is maintained by the base class).
    """
    return me._quitp or (me.runningp and ((not me.svc and not me.job) or
                                          not me.sock))

  def quit(me):
    """Forces the quit flag (returned by `quitp') on."""
    me._quitp = True
1093
class TripeService (object):
  """
  A standard service.

  The NAME and VERSION are passed on to the server.  The CMDTAB is a
  dictionary mapping command names (in lowercase) to command objects.

  If the CMDTAB doesn't have entries for commands `HELP' and `QUIT' then
  defaults are provided.

  TripeService itself is mostly agnostic about the nature of command objects,
  but the TripeServiceJob class (below) has some requirements.  The built-in
  HELP command requires command objects to have `usage' attributes.
  """

  def __init__(me, name, version, cmdtab):
    """
    Create a new service with the given NAME and VERSION.

    CMDTAB maps command names (in lower-case) to command objects.  The
    dictionary is used as given, not copied: default `help' and `quit'
    entries are added to it if it doesn't already have them.
    """
    me.name = name
    me.version = version
    me.cmd = cmdtab
    me.activep = True
    me.cmd.setdefault('help',
                      TripeServiceCommand('help', 0, 0, '', me._help))
    me.cmd.setdefault('quit',
                      TripeServiceCommand('quit', 0, 0, '', me._quit))

  def job(me, jid, cmd, args):
    """
    Called by the service manager: a job arrived with id JID.

    It asks for command CMD with argument list ARGS.  Creates a new job,
    passing it the information needed.  The command object is looked up
    under the lowercase form of CMD; the job is given `None' in its place if
    there is no such command.
    """
    return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args)

  ## Simple default command handlers, complying with the spec in
  ## tripe-service(7).

  def _help(me):
    """
    Send a help summary to the user.

    One `INFO' line is sent for each command, in order of command name,
    giving the name followed by the command's usage tokens.
    """
    ## Use `sorted' rather than sorting the result of `items' in place: the
    ## latter only works if `items' returns a list.
    for name in sorted(me.cmd):
      svcinfo(name, *me.cmd[name].usage)

  def _quit(me):
    """Terminate the service manager."""
    svcmgr.notify('svc-quit', me.name, 'admin-request')
    svcmgr.quit()
1147
class TripeServiceCommand (object):
  """A simple service command: a function plus argument-count limits."""

  def __init__(me, name, min, max, usage, func):
    """
    Set up a command called NAME (which should be in lowercase).

    MIN and MAX bound the number of arguments the command accepts; pass None
    for either to leave that end unchecked.

    USAGE is the usage string, shown in help and error messages; it is
    stored split into a list of words.

    FUNC is the function which does the actual work.
    """
    me.name, me.func = name, func
    me.min, me.max = min, max
    me.usage = usage.split()

  def run(me, *args):
    """
    Invoke the command with the arguments ARGS.

    Raises `TripeSyntaxError' if the number of arguments is outside the
    permitted range; otherwise calls the command's function, discarding
    whatever it returns.
    """
    nargs = len(args)
    toofew = me.min is not None and nargs < me.min
    toomany = me.max is not None and nargs > me.max
    if toofew or toomany:
      raise TripeSyntaxError()
    me.func(*args)
1180
class TripeServiceJob (Coroutine):
  """
  Job handler coroutine.

  A standard `TripeService' invokes a `TripeServiceJob' for each incoming job
  request, passing it the jobid, command and arguments, and a command object.
  The command object needs the following attributes.

  name                  The command's name, used when reporting syntax
                        errors.

  usage                 A usage list (excluding the command name) showing
                        arguments and options.

  run(*ARGS)            Function to react to the command with ARGS split into
                        separate arguments.  Invoked in a coroutine.  The
                        `svcinfo' function (not the `TripeCommandDispatcher'
                        method) may be used to send `INFO' lines.  The
                        function may raise `TripeJobError' to send a `FAIL'
                        response back, or `TripeSyntaxError' to send a
                        generic usage error.  `TripeJobCancelled' exceptions
                        are trapped silently.  Other exceptions are
                        translated into a generic internal-error message.

  This class automatically takes care of sending some closing response to the
  job, and for informing the service manager that the job is completed.

  The `jid' attribute stores the job's id.
  """

  def __init__(me, jid, svc, cmd, command, args):
    """
    Start a new job.

    The job is created with id JID, for service SVC, processing command name
    CMD (which the service resolved into the command object COMMAND, or
    `None'), and with the arguments ARGS.
    """
    Coroutine.__init__(me)
    me.jid = jid
    me.svc = svc
    me.cmd = cmd
    me.command = command
    me.args = args

  def run(me):
    """
    Main body of the coroutine.

    Does the tedious exception handling boilerplate and invokes the command's
    run method.  Whatever happens, the service manager is told that the job
    has finished.
    """
    try:
      if me.command is None:
        svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd)
      else:
        me.command.run(*me.args)
        svcmgr.svcok(me.jid)
    except TripeJobError as exc:
      svcmgr.svcfail(me.jid, *exc.args)
    except TripeSyntaxError:
      svcmgr.svcfail(me.jid, 'bad-svc-syntax',
                     me.svc.name, me.command.name,
                     *me.command.usage)
    except TripeJobCancelled:
      ## The client has gone away, so there's nobody to send a reply to.
      pass
    except Exception as exc:
      svcmgr.svcfail(me.jid, 'svc-internal-error',
                     exc.__class__.__name__, str(exc))
    finally:
      svcmgr.jobdone(me.jid)

  def start(me):
    """Invoked by the service manager to start running the coroutine."""
    me.switch()

  def cancel(me):
    """
    Invoked by the service manager to cancel the job.

    Throws a `TripeJobCancelled' exception into the coroutine.
    """
    me.throw(TripeJobCancelled())
1258
def svcinfo(*args):
  """
  Send an `INFO' line, consisting of ARGS, to the sender of the current job.

  This must be called from within a `TripeServiceJob' coroutine: the job id
  is taken from the current coroutine's `jid' attribute.
  """
  job = Coroutine.getcurrent()
  svcmgr.svcinfo(job.jid, *args)
1265
def _setupsvc(tab, func):
  """
  Body of the coroutine which sets up a service program.

  Each service in TAB is registered with the service manager; then FUNC, if
  it's true, is called with no arguments.  The service manager is marked as
  running afterwards, even if something went wrong along the way.
  """
  try:
    for svc in tab:
      svcmgr.addsvc(svc)
    if func:
      func()
  finally:
    svcmgr.running()
1279
## The single, module-wide service manager.  It's created without a socket:
## `runservices' fills that in before connecting.
svcmgr = TripeServiceManager(None)
2fa80010
MW
def runservices(socket, tab, init = None, setup = None, daemon = False):
  """
  Function to start a service provider.

  SOCKET is the socket to connect to, usually tripesock.

  TAB is a list of entries.  An entry may be either a tuple

        (NAME, VERSION, COMMANDS)

  or a service object (e.g., a `TripeService' instance).

  COMMANDS is a dictionary mapping command names to tuples

        (MIN, MAX, USAGE, FUNC)

  of arguments for a `TripeServiceCommand' object.

  If DAEMON is true, then the process is forked into the background before we
  start.  If INIT is given, it is called in the main coroutine, immediately
  after forking.  If SETUP is given, it is called in a coroutine, after
  calling INIT and setting up the services but before marking the service
  manager as running.

  It is a really bad idea to do any initialization, particularly setting up
  coroutines, outside of the INIT or SETUP functions.  In particular, if
  we're using rmcr for fake coroutines, the daemonizing fork will kill off
  the currently established coroutines in a most surprising way.

  The function runs a main select loop until the service manager decides to
  quit.
  """

  svcmgr.socket = socket
  svcmgr.connect()

  ## Turn any tuple entries into proper service objects.  Anything else is
  ## assumed to be a service object already.
  svcs = []
  for service in tab:
    if isinstance(service, tuple):
      name, version, commands = service
      cmdmap = {}
      for cmd, stuff in commands.items():
        cmdmap[cmd] = TripeServiceCommand(cmd, *stuff)
      service = TripeService(name, version, cmdmap)
    svcs.append(service)

  if daemon:
    M.daemonize()
  if init is not None:
    init()
  spawn(_setupsvc, svcs, setup)
  svcmgr.mainloop()
2fa80010
MW
1332
1333###--------------------------------------------------------------------------
1334### Utilities for services.
1335
## Multipliers, in seconds, for the unit suffixes which `timespec' accepts.
_timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
def timespec(spec):
  """
  Parse the timespec SPEC, returning a number of seconds.

  SPEC is a nonnegative decimal integer, optionally followed by a unit
  letter: `s' for seconds (the default), `m' for minutes, `h' for hours, or
  `d' for days.

  Raises `TripeJobError', with tokens `bad-time-spec' and the SPEC as given,
  if SPEC can't be parsed or is negative.
  """
  mul, digits = 1, spec
  if len(spec) > 1 and spec[-1] in _timeunits:
    mul, digits = _timeunits[spec[-1]], spec[:-1]
  try:
    t = int(digits)
  except (TypeError, ValueError):
    ## Only trap conversion failures: anything else is somebody else's
    ## problem, and shouldn't be disguised as a bad timespec.
    raise TripeJobError('bad-time-spec', spec)
  if t < 0:
    raise TripeJobError('bad-time-spec', spec)
  return mul * t
1350
class OptParse (object):
  """
  Parse options from a command list in the conventional fashion.

  ARGS is a list of arguments to a command.  ALLOWED is a sequence of allowed
  options.  The returned values are the option tags.  During parsing, the
  `arg' method may be used to retrieve the argument for the most recent
  option.  Afterwards, `rest' may be used to retrieve the remaining
  non-option arguments, and do a simple check on how many there are.

  The parser correctly handles `--' option terminators.
  """

  def __init__(me, args, allowed):
    """
    Create a new option parser.

    The parser will scan the ARGS for options given in the sequence ALLOWED
    (which are expected to include the `-' prefix).  The parser works on its
    own copy of ARGS.
    """
    me.allowed = dict.fromkeys(allowed, True)
    me.args = list(args)

  def __iter__(me):
    """Iterator protocol: I am my own iterator."""
    return me

  def next(me):
    """
    Iterator protocol: return the next option.

    If we've run out, raise `StopIteration'.  That happens when there are no
    arguments left; when the next argument is shorter than two characters or
    doesn't begin with `-' (it is left in place); or when the next argument
    is `--' (which is consumed).

    Raises `TripeSyntaxError' if the option isn't one of the allowed ones.
    """
    if not me.args:
      raise StopIteration()
    head = me.args[0]
    if len(head) < 2 or not head.startswith('-'):
      raise StopIteration()
    opt = me.args.pop(0)
    if opt == '--':
      raise StopIteration()
    if opt not in me.allowed:
      raise TripeSyntaxError()
    return opt

  def __next__(me):
    """Iterator protocol, as spelt by Python 3: delegates to `next'."""
    return me.next()

  def arg(me):
    """
    Return the argument for the most recent option.

    If none is available, raise `TripeSyntaxError'.
    """
    if not me.args:
      raise TripeSyntaxError()
    return me.args.pop(0)

  def rest(me, min = None, max = None):
    """
    After option parsing is done, return the remaining arguments.

    Check that there are at least MIN and at most MAX arguments remaining --
    either may be None to suppress the check.  Raises `TripeSyntaxError' if
    the check fails.
    """
    nargs = len(me.args)
    if (min is not None and nargs < min) or \
       (max is not None and nargs > max):
      raise TripeSyntaxError()
    return me.args
1418
1419###----- That's all, folks --------------------------------------------------