| 1 | ### -*-python-*- |
| 2 | ### |
| 3 | ### Administration connection with tripe server |
| 4 | ### |
| 5 | ### (c) 2006 Straylight/Edgeware |
| 6 | ### |
| 7 | |
| 8 | ###----- Licensing notice --------------------------------------------------- |
| 9 | ### |
| 10 | ### This file is part of Trivial IP Encryption (TrIPE). |
| 11 | ### |
| 12 | ### TrIPE is free software; you can redistribute it and/or modify |
| 13 | ### it under the terms of the GNU General Public License as published by |
| 14 | ### the Free Software Foundation; either version 2 of the License, or |
| 15 | ### (at your option) any later version. |
| 16 | ### |
| 17 | ### TrIPE is distributed in the hope that it will be useful, |
| 18 | ### but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 19 | ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 20 | ### GNU General Public License for more details. |
| 21 | ### |
| 22 | ### You should have received a copy of the GNU General Public License |
| 23 | ### along with TrIPE; if not, write to the Free Software Foundation, |
| 24 | ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
| 25 | |
| 26 | """ |
| 27 | This module provides classes and functions for connecting to a running tripe |
| 28 | server, sending it commands, receiving and processing replies, and |
| 29 | implementing services. |
| 30 | |
| 31 | Rather than end up in lost in a storm of little event-driven classes, or a |
| 32 | morass of concurrent threads, the module uses coroutines to present a fairly |
| 33 | simple function call/return interface to potentially long-running commands |
| 34 | which must run without blocking the main process. It assumes a coroutine |
| 35 | module presenting a subset of the `greenlet' interface: if actual greenlets |
| 36 | are available, they are used; otherwise there's an implementation in terms of |
| 37 | threads (with lots of locking) which will do instead. |
| 38 | |
| 39 | The simple rule governing the coroutines used here is this: |
| 40 | |
| 41 | * The root coroutine never cares what values are passed to it when it |
| 42 | resumes: it just discards them. |
| 43 | |
| 44 | * Other, non-root, coroutines are presumed to be waiting for some specific |
| 45 | thing. |
| 46 | |
| 47 | Configuration variables: |
| 48 | configdir |
| 49 | socketdir |
| 50 | PACKAGE |
| 51 | VERSION |
| 52 | tripesock |
| 53 | peerdb |
| 54 | |
| 55 | Other useful variables: |
| 56 | rootcr |
| 57 | svcmgr |
| 58 | |
| 59 | Other tweakables: |
| 60 | _debug |
| 61 | |
| 62 | Exceptions: |
| 63 | Exception |
| 64 | StandardError |
| 65 | TripeConnectionError |
| 66 | TripeError |
| 67 | TripeInternalError |
| 68 | TripeJobCancelled |
| 69 | TripeJobError |
| 70 | TripeSyntaxError |
| 71 | |
| 72 | Classes: |
| 73 | _Coroutine |
| 74 | Coroutine |
| 75 | TripeServiceJob |
| 76 | OptParse |
| 77 | Queue |
| 78 | SelIOWatcher |
| 79 | TripeCommand |
| 80 | TripeSynchronousCommand |
| 81 | TripeAsynchronousCommand |
| 82 | TripeCommandIterator |
| 83 | TripeConnection |
| 84 | TripeCommandDispatcher |
| 85 | TripeServiceManager |
| 86 | TripeService |
| 87 | TripeServiceCommand |
| 88 | |
| 89 | Utility functions: |
| 90 | quotify |
| 91 | runservices |
| 92 | spawn |
| 93 | svcinfo |
| 94 | timespec |
| 95 | """ |
| 96 | |
| 97 | __pychecker__ = 'self=me no-constCond no-argsused' |
| 98 | |
| 99 | _debug = False |
| 100 | |
| 101 | ###-------------------------------------------------------------------------- |
| 102 | ### External dependencies. |
| 103 | |
| 104 | import socket as S |
| 105 | import errno as E |
| 106 | import mLib as M |
| 107 | import re as RX |
| 108 | import sys as SYS |
| 109 | import os as OS |
| 110 | |
| 111 | try: |
| 112 | if OS.getenv('TRIPE_FORCE_RMCR') is not None: |
| 113 | raise ImportError |
| 114 | from py.magic import greenlet as _Coroutine |
| 115 | except ImportError: |
| 116 | from rmcr import Coroutine as _Coroutine |
| 117 | |
| 118 | ###-------------------------------------------------------------------------- |
| 119 | ### Coroutine hacking. |
| 120 | |
| 121 | rootcr = _Coroutine.getcurrent() |
| 122 | |
| 123 | class Coroutine (_Coroutine): |
| 124 | """ |
| 125 | A coroutine class which can only be invoked by the root coroutine. |
| 126 | |
| 127 | The root, by construction, cannot be an instance of this class. |
| 128 | """ |
| 129 | def switch(me, *args, **kw): |
| 130 | assert _Coroutine.getcurrent() is rootcr |
| 131 | if _debug: print '* %s' % me |
| 132 | _Coroutine.switch(me, *args, **kw) |
| 133 | if _debug: print '* %s' % rootcr |
| 134 | |
| 135 | ###-------------------------------------------------------------------------- |
| 136 | ### Default places for things. |
| 137 | |
| 138 | configdir = OS.environ.get('TRIPEDIR', "@configdir@") |
| 139 | socketdir = "@socketdir@" |
| 140 | PACKAGE = "@PACKAGE@" |
| 141 | VERSION = "@VERSION@" |
| 142 | |
| 143 | tripesock = OS.environ.get('TRIPESOCK', OS.path.join(socketdir, 'tripesock')) |
| 144 | peerdb = OS.environ.get('TRIPEPEERDB', 'peers.cdb') |
| 145 | |
| 146 | ###-------------------------------------------------------------------------- |
| 147 | ### Connection to the server. |
| 148 | |
| 149 | def readnonblockingly(sock, len): |
| 150 | """ |
| 151 | Nonblocking read from SOCK. |
| 152 | |
| 153 | Try to return LEN bytes. If couldn't read anything, return None. EOF is |
| 154 | returned as an empty string. |
| 155 | """ |
| 156 | try: |
| 157 | sock.setblocking(0) |
| 158 | return sock.recv(len) |
| 159 | except S.error, exc: |
| 160 | if exc[0] == E.EWOULDBLOCK: |
| 161 | return None |
| 162 | raise |
| 163 | |
| 164 | class TripeConnectionError (StandardError): |
| 165 | """Something happened to the connection with the server.""" |
| 166 | pass |
| 167 | class TripeInternalError (StandardError): |
| 168 | """This program is very confused.""" |
| 169 | pass |
| 170 | |
| 171 | class TripeConnection (object): |
| 172 | """ |
| 173 | A logical connection to the tripe administration socket. |
| 174 | |
| 175 | There may or may not be a physical connection. (This is needed for the |
| 176 | monitor, for example.) |
| 177 | |
| 178 | This class isn't very useful on its own, but it has useful subclasses. At |
| 179 | this level, the class is agnostic about I/O multiplexing schemes; that gets |
| 180 | added later. |
| 181 | """ |
| 182 | |
| 183 | def __init__(me, socket): |
| 184 | """ |
| 185 | Make a connection to the named SOCKET. |
| 186 | |
| 187 | No physical connection is made initially. |
| 188 | """ |
| 189 | me.socket = socket |
| 190 | me.sock = None |
| 191 | me.lbuf = None |
| 192 | me.iowatch = SelIOWatcher(me) |
| 193 | |
| 194 | def connect(me): |
| 195 | """ |
| 196 | Ensure that there's a physical connection. |
| 197 | |
| 198 | Do nothing if we're already connected. Invoke the `connected' method if |
| 199 | successful. |
| 200 | """ |
| 201 | if me.sock: return |
| 202 | sock = S.socket(S.AF_UNIX, S.SOCK_STREAM) |
| 203 | sock.connect(me.socket) |
| 204 | me.sock = sock |
| 205 | me.lbuf = M.LineBuffer(me.line, me._eof) |
| 206 | me.lbuf.size = 1024 |
| 207 | me.connected() |
| 208 | return me |
| 209 | |
| 210 | def disconnect(me, reason): |
| 211 | """ |
| 212 | Disconnect the physical connection. |
| 213 | |
| 214 | Invoke the `disconnected' method, giving the provided REASON, which |
| 215 | should be either None or an exception. |
| 216 | """ |
| 217 | if not me.sock: return |
| 218 | me.disconnected(reason) |
| 219 | me.sock.close() |
| 220 | me.sock = None |
| 221 | me.lbuf.disable() |
| 222 | me.lbuf = None |
| 223 | return me |
| 224 | |
| 225 | def connectedp(me): |
| 226 | """ |
| 227 | Return true if there's a current, believed-good physical connection. |
| 228 | """ |
| 229 | return me.sock is not None |
| 230 | |
| 231 | __nonzero__ = connectedp |
| 232 | |
| 233 | def send(me, line): |
| 234 | """ |
| 235 | Send the LINE to the connection's socket. |
| 236 | |
| 237 | All output is done through this method; it can be overridden to provide |
| 238 | proper nonblocking writing, though this seems generally unnecessary. |
| 239 | """ |
| 240 | try: |
| 241 | me.sock.setblocking(1) |
| 242 | me.sock.send(line + '\n') |
| 243 | except Exception, exc: |
| 244 | me.disconnect(exc) |
| 245 | raise |
| 246 | return me |
| 247 | |
| 248 | def receive(me): |
| 249 | """ |
| 250 | Receive whatever's ready from the connection's socket. |
| 251 | |
| 252 | Call `line' on each complete line, and `eof' if the connection closed. |
| 253 | Subclasses which attach this class to an I/O-event system should call |
| 254 | this method when the socket (CONN.sock) is ready for reading. |
| 255 | """ |
| 256 | while me.sock is not None: |
| 257 | try: |
| 258 | buf = readnonblockingly(me.sock, 16384) |
| 259 | except Exception, exc: |
| 260 | me.disconnect(exc) |
| 261 | raise |
| 262 | if buf is None: |
| 263 | return me |
| 264 | if buf == '': |
| 265 | me._eof() |
| 266 | return me |
| 267 | me.lbuf.flush(buf) |
| 268 | return me |
| 269 | |
| 270 | def _eof(me): |
| 271 | """Internal end-of-file handler.""" |
| 272 | me.disconnect(TripeConnectionError('connection lost')) |
| 273 | me.eof() |
| 274 | |
| 275 | def connected(me): |
| 276 | """ |
| 277 | To be overridden by subclasses to react to a connection being |
| 278 | established. |
| 279 | """ |
| 280 | me.iowatch.connected(me.sock) |
| 281 | |
| 282 | def disconnected(me, reason): |
| 283 | """ |
| 284 | To be overridden by subclasses to react to a connection being severed. |
| 285 | """ |
| 286 | me.iowatch.disconnected() |
| 287 | |
| 288 | def eof(me): |
| 289 | """To be overridden by subclasses to handle end-of-file.""" |
| 290 | pass |
| 291 | |
| 292 | def line(me, line): |
| 293 | """To be overridden by subclasses to handle incoming lines.""" |
| 294 | pass |
| 295 | |
| 296 | ###-------------------------------------------------------------------------- |
| 297 | ### I/O loop integration. |
| 298 | |
| 299 | class SelIOWatcher (object): |
| 300 | """ |
| 301 | Integration with mLib's I/O event system. |
| 302 | |
| 303 | You can replace this object with a different one for integration with, |
| 304 | e.g., glib's main loop, by setting CONN.iowatcher to a different object |
| 305 | while the CONN is disconnected. |
| 306 | """ |
| 307 | |
| 308 | def __init__(me, conn): |
| 309 | me._conn = conn |
| 310 | me._selfile = None |
| 311 | |
| 312 | def connected(me, sock): |
| 313 | """ |
| 314 | Called when a connection is made. |
| 315 | |
| 316 | SOCK is the socket. The watcher must arrange to call CONN.receive when |
| 317 | data is available. |
| 318 | """ |
| 319 | me._selfile = M.SelFile(sock.fileno(), M.SEL_READ, me._conn.receive) |
| 320 | me._selfile.enable() |
| 321 | |
| 322 | def disconnected(me): |
| 323 | """ |
| 324 | Called when the connection is lost. |
| 325 | """ |
| 326 | me._selfile = None |
| 327 | |
| 328 | def iterate(me): |
| 329 | """ |
| 330 | Wait for something interesting to happen, and issue events. |
| 331 | |
| 332 | That is, basically, do one iteration of a main select loop, processing |
| 333 | all of the events, and then return. This isn't needed for |
| 334 | TripeCommandDispatcher, but runservices wants it. |
| 335 | """ |
| 336 | M.select() |
| 337 | |
| 338 | ###-------------------------------------------------------------------------- |
| 339 | ### Inter-coroutine communication. |
| 340 | |
| 341 | class Queue (object): |
| 342 | """ |
| 343 | A queue of things arriving asynchronously. |
| 344 | |
| 345 | This is a very simple single-reader multiple-writer queue. It's useful for |
| 346 | more complex coroutines which need to cope with a variety of possible |
| 347 | incoming events. |
| 348 | """ |
| 349 | |
| 350 | def __init__(me): |
| 351 | """Create a new empty queue.""" |
| 352 | me.contents = M.Array() |
| 353 | me.waiter = None |
| 354 | |
| 355 | def _wait(me): |
| 356 | """ |
| 357 | Internal: wait for an item to arrive in the queue. |
| 358 | |
| 359 | Complain if someone is already waiting, because this is just a |
| 360 | single-reader queue. |
| 361 | """ |
| 362 | if me.waiter: |
| 363 | raise ValueError('queue already being waited on') |
| 364 | try: |
| 365 | me.waiter = Coroutine.getcurrent() |
| 366 | while not me.contents: |
| 367 | me.waiter.parent.switch() |
| 368 | finally: |
| 369 | me.waiter = None |
| 370 | |
| 371 | def get(me): |
| 372 | """ |
| 373 | Remove and return the item at the head of the queue. |
| 374 | |
| 375 | If the queue is empty, wait until an item arrives. |
| 376 | """ |
| 377 | me._wait() |
| 378 | return me.contents.shift() |
| 379 | |
| 380 | def peek(me): |
| 381 | """ |
| 382 | Return the item at the head of the queue without removing it. |
| 383 | |
| 384 | If the queue is empty, wait until an item arrives. |
| 385 | """ |
| 386 | me._wait() |
| 387 | return me.contents[0] |
| 388 | |
| 389 | def put(me, thing): |
| 390 | """ |
| 391 | Write THING to the queue. |
| 392 | |
| 393 | If someone is waiting on the queue, wake him up immediately; otherwise |
| 394 | just leave the item there for later. |
| 395 | """ |
| 396 | me.contents.push(thing) |
| 397 | if me.waiter: |
| 398 | me.waiter.switch() |
| 399 | |
| 400 | ###-------------------------------------------------------------------------- |
| 401 | ### Dispatching coroutine. |
| 402 | |
| 403 | ## Match a string if it can stand on its own as a bareword: i.e., it doesn't |
| 404 | ## contain backslashes, quotes or whitespace. |
| 405 | rx_ordinary = RX.compile(r'^[^\\\'\"\s]+$') |
| 406 | |
| 407 | ## Match characters which need to be escaped, even in quoted text. |
| 408 | rx_weird = RX.compile(r'([\\\'])') |
| 409 | |
| 410 | def quotify(s): |
| 411 | """Quote S according to the tripe-admin(5) rules.""" |
| 412 | m = rx_ordinary.match(s) |
| 413 | if m and m.end() == len(s): |
| 414 | return s |
| 415 | else: |
| 416 | return "'" + rx_weird.sub(r'\\\1', s) + "'" |
| 417 | |
| 418 | def _callback(func): |
| 419 | """ |
| 420 | Return a wrapper for FUNC which reports exceptions thrown by it. |
| 421 | |
| 422 | Useful in the case of callbacks invoked by C functions which ignore |
| 423 | exceptions. |
| 424 | """ |
| 425 | def _(*a, **kw): |
| 426 | try: |
| 427 | return func(*a, **kw) |
| 428 | except: |
| 429 | SYS.excepthook(*SYS.exc_info()) |
| 430 | raise |
| 431 | return _ |
| 432 | |
| 433 | class TripeCommand (object): |
| 434 | """ |
| 435 | This abstract class represents a command in progress. |
| 436 | |
| 437 | The `words' attribute contains the list of tokens which make up the |
| 438 | command. |
| 439 | |
| 440 | Subclasses must implement a method to handle server responses: |
| 441 | |
| 442 | * response(CODE, *ARGS): CODE is one of the strings 'OK', 'INFO' or |
| 443 | 'FAIL'; ARGS are the remaining tokens from the server's response. |
| 444 | """ |
| 445 | |
| 446 | def __init__(me, words): |
| 447 | """Make a new command consisting of the given list of WORDS.""" |
| 448 | me.words = words |
| 449 | |
| 450 | class TripeSynchronousCommand (TripeCommand): |
| 451 | """ |
| 452 | A simple command, processed apparently synchronously. |
| 453 | |
| 454 | Must be invoked from a coroutine other than the root (or whichever one is |
| 455 | running the dispatcher); in reality, other coroutines carry on running |
| 456 | while we wait for a response from the server. |
| 457 | |
| 458 | Each server response causes the calling coroutine to be resumed with the |
| 459 | pair (CODE, REST) -- where CODE is the server's response code (`OK', `INFO' |
| 460 | or `FAIL') and REST is a list of the server's other response tokens. The |
| 461 | calling coroutine must continue switching back to the dispatcher until a |
| 462 | terminating response (`OK' or `FAIL') is received or become very |
| 463 | confused. |
| 464 | |
| 465 | Mostly it's better to use the TripeCommandIterator to do this |
| 466 | automatically. |
| 467 | """ |
| 468 | |
| 469 | def __init__(me, words): |
| 470 | """Initialize the command, specifying the WORDS to send to the server.""" |
| 471 | TripeCommand.__init__(me, words) |
| 472 | me.owner = Coroutine.getcurrent() |
| 473 | |
| 474 | def response(me, code, *rest): |
| 475 | """Handle a server response by forwarding it to the calling coroutine.""" |
| 476 | me.owner.switch((code, rest)) |
| 477 | |
| 478 | class TripeError (StandardError): |
| 479 | """ |
| 480 | A tripe command failed with an error (a FAIL code). The args attribute |
| 481 | contains a list of the server's message tokens. |
| 482 | """ |
| 483 | pass |
| 484 | |
| 485 | class TripeCommandIterator (object): |
| 486 | """ |
| 487 | Iterator interface to a tripe command. |
| 488 | |
| 489 | The values returned by the iterator are lists of tokens from the server's |
| 490 | INFO lines, as processed by the given filter function, if any. The |
| 491 | iterator completes normally (by raising StopIteration) if the server |
| 492 | reported OK, and raises an exception if the command failed for some reason. |
| 493 | |
| 494 | A TripeError is raised if the server issues a FAIL code. If the connection |
| 495 | failed, some other exception is raised. |
| 496 | """ |
| 497 | |
| 498 | def __init__(me, dispatcher, words, bg = False, filter = None): |
| 499 | """ |
| 500 | Create a new command iterator. |
| 501 | |
| 502 | The command is submitted to the DISPATCHER; it consists of the given |
| 503 | WORDS. If BG is true, then an option is inserted to request that the |
| 504 | server run the command in the background. The FILTER is applied to the |
| 505 | token lists which the server responds, and the filter's output are the |
| 506 | items returned by the iterator. |
| 507 | """ |
| 508 | me.dcr = Coroutine.getcurrent().parent |
| 509 | if me.dcr is None: |
| 510 | raise ValueError, 'must invoke from coroutine' |
| 511 | me.filter = filter or (lambda x: x) |
| 512 | if bg: |
| 513 | words = [words[0], '-background', dispatcher.bgtag()] + list(words[1:]) |
| 514 | dispatcher.rawcommand(TripeSynchronousCommand(words)) |
| 515 | |
| 516 | def __iter__(me): |
| 517 | """Iterator protocol: I am my own iterator.""" |
| 518 | return me |
| 519 | |
| 520 | def next(me): |
| 521 | """ |
| 522 | Iterator protocol: return the next piece of information from the server. |
| 523 | |
| 524 | INFO responses are filtered and returned as the values of the iteration. |
| 525 | FAIL and CONNERR responses are turned into exceptions and raised. |
| 526 | Finally, OK is turned into StopIteration, which should cause a normal end |
| 527 | to the iteration process. |
| 528 | """ |
| 529 | thing = me.dcr.switch() |
| 530 | code, rest = thing |
| 531 | if code == 'INFO': |
| 532 | return me.filter(rest) |
| 533 | elif code == 'OK': |
| 534 | raise StopIteration |
| 535 | elif code == 'CONNERR': |
| 536 | if rest is None: |
| 537 | raise TripeConnectionError, 'connection terminated by user' |
| 538 | else: |
| 539 | raise rest |
| 540 | elif code == 'FAIL': |
| 541 | raise TripeError(*rest) |
| 542 | else: |
| 543 | raise TripeInternalError \ |
| 544 | ('unexpected tripe response %r' % ([code] + rest)) |
| 545 | |
| 546 | ### Simple utility functions for the TripeCommandIterator convenience |
| 547 | ### methods. |
| 548 | |
| 549 | def _tokenjoin(words): |
| 550 | """Filter function: simply join the given tokens with spaces between.""" |
| 551 | return ' '.join(words) |
| 552 | |
| 553 | def _keyvals(iter): |
| 554 | """Return a dictionary formed from the KEY=VALUE pairs returned by the |
| 555 | iterator ITER.""" |
| 556 | kv = {} |
| 557 | for ww in iter: |
| 558 | for w in ww: |
| 559 | q = w.index('=') |
| 560 | kv[w[:q]] = w[q + 1:] |
| 561 | return kv |
| 562 | |
| 563 | def _simple(iter): |
| 564 | """Raise an error if ITER contains any item.""" |
| 565 | stuff = list(iter) |
| 566 | if len(stuff) != 0: |
| 567 | raise TripeInternalError('expected no response') |
| 568 | return None |
| 569 | |
| 570 | def _oneline(iter): |
| 571 | """If ITER contains a single item, return it; otherwise raise an error.""" |
| 572 | stuff = list(iter) |
| 573 | if len(stuff) != 1: |
| 574 | raise TripeInternalError('expected only one line of response') |
| 575 | return stuff[0] |
| 576 | |
| 577 | def _tracelike(iter): |
| 578 | """Handle a TRACE-like command. The result is a list of tuples (CHAR, |
| 579 | STATUS, DESC): CHAR is a selector character, STATUS is the status (empty if |
| 580 | disabled, `+' if enabled, maybe something else later), and DESC is the |
| 581 | human-readable description.""" |
| 582 | stuff = [] |
| 583 | for ww in iter: |
| 584 | ch = ww[0][0] |
| 585 | st = ww[0][1:] |
| 586 | desc = ' '.join(ww[1:]) |
| 587 | stuff.append((ch, st, desc)) |
| 588 | return stuff |
| 589 | |
| 590 | def _kwopts(kw, allowed): |
| 591 | """Parse keyword arguments into options. ALLOWED is a list of allowable |
| 592 | keywords; raise errors if other keywords are present. KEY = VALUE becomes |
| 593 | an option pair -KEY VALUE if VALUE is a string, just the option -KEY if |
| 594 | VALUE is a true non-string, or nothing if VALUE is false.. Insert a `--' |
| 595 | at the end to stop the parser getting confused.""" |
| 596 | opts = [] |
| 597 | amap = {} |
| 598 | for a in allowed: amap[a] = True |
| 599 | for k, v in kw.iteritems(): |
| 600 | if k not in amap: |
| 601 | raise ValueError('option %s not allowed here' % k) |
| 602 | if isinstance(v, str): |
| 603 | opts += ['-' + k, v] |
| 604 | elif v: |
| 605 | opts += ['-' + k] |
| 606 | opts.append('--') |
| 607 | return opts |
| 608 | |
| 609 | ## Deferral. |
| 610 | _deferq = [] |
| 611 | def defer(func, *args, **kw): |
| 612 | """Call FUNC(*ARGS, **KW) later, in the root coroutine.""" |
| 613 | _deferq.append((func, args, kw)) |
| 614 | |
| 615 | def funargstr(func, args, kw): |
| 616 | items = [repr(a) for a in args] |
| 617 | for k, v in kw.iteritems(): |
| 618 | items.append('%s = %r' % (k, v)) |
| 619 | return '%s(%s)' % (func.__name__, ', '.join(items)) |
| 620 | |
| 621 | def spawn(func, *args, **kw): |
| 622 | """Call FUNC, passing ARGS and KW, in a fresh coroutine.""" |
| 623 | defer(lambda: (Coroutine(func, name = funargstr(func, args, kw)) |
| 624 | .switch(*args, **kw))) |
| 625 | |
| 626 | ## Asides. |
| 627 | _asideq = Queue() |
| 628 | def _runasides(): |
| 629 | """ |
| 630 | Read (FUNC, ARGS, KW) triples from queue and invoke FUNC(*ARGS, **KW). |
| 631 | """ |
| 632 | while True: |
| 633 | func, args, kw = _asideq.get() |
| 634 | try: |
| 635 | func(*args, **kw) |
| 636 | except: |
| 637 | SYS.excepthook(*SYS.exc_info()) |
| 638 | |
| 639 | def aside(func, *args, **kw): |
| 640 | """Call FUNC(*ARGS, **KW) later, in a non-root coroutine.""" |
| 641 | defer(_asideq.put, (func, args, kw)) |
| 642 | |
| 643 | class TripeCommandDispatcher (TripeConnection): |
| 644 | """ |
| 645 | Command dispatcher. |
| 646 | |
| 647 | The command dispatcher is a connection which knows how to handle commands. |
| 648 | This is probably the most important class in this module to understand. |
| 649 | |
| 650 | Lines from the server are parsed into tokens. The first token is a code |
| 651 | (OK or NOTE or something) explaining what kind of line this is. The |
| 652 | `handler' attribute is a dictionary mapping server line codes to handler |
| 653 | functions, which are applied to the words of the line as individual |
| 654 | arguments. *Exception*: the content of TRACE lines is not tokenized. |
| 655 | |
| 656 | There are default handlers for server codes which respond to commands. |
| 657 | Commands arrive as TripeCommand instances through the `rawcommand' |
| 658 | interface. The dispatcher keeps track of which command objects represent |
| 659 | which jobs, and sends responses on to the appropriate command objects by |
| 660 | invoking their `response' methods. Command objects don't see the |
| 661 | BG... codes, because the dispatcher has already transformed them into |
| 662 | regular codes when it was looking up job code. |
| 663 | |
| 664 | The dispatcher also has a special response code of its own: CONNERR |
| 665 | indicates that the connection failed and the command has therefore been |
| 666 | lost; the |
| 667 | """ |
| 668 | |
| 669 | ## --- Infrastructure --- |
| 670 | ## |
| 671 | ## We will get confused if we pipeline commands. Send them one at a time. |
| 672 | ## Only send a command when the previous one detaches or completes. |
| 673 | ## |
| 674 | ## The following attributes are interesting: |
| 675 | ## |
| 676 | ## tagseq Sequence number for next background job (for bgtag) |
| 677 | ## |
| 678 | ## queue Commands awaiting submission. |
| 679 | ## |
| 680 | ## cmd Mapping from job tags to commands: cmd[None] is the |
| 681 | ## foreground command. |
| 682 | ## |
| 683 | ## handler Mapping from server codes to handler functions. |
| 684 | |
| 685 | def __init__(me, socket): |
| 686 | """ |
| 687 | Initialize the dispatcher. |
| 688 | |
| 689 | The SOCKET is the filename of the administration socket to connect to, |
| 690 | for TripeConnection.__init__. |
| 691 | """ |
| 692 | TripeConnection.__init__(me, socket) |
| 693 | me.tagseq = 0 |
| 694 | me.handler = {} |
| 695 | me.handler['BGDETACH'] = me._detach |
| 696 | for i in 'BGOK', 'BGINFO', 'BGFAIL': |
| 697 | me.handler[i] = me._response |
| 698 | for i in 'OK', 'INFO', 'FAIL': |
| 699 | me.handler[i] = me._fgresponse |
| 700 | |
| 701 | def quitp(me): |
| 702 | """Should we quit the main loop? Subclasses should override.""" |
| 703 | return False |
| 704 | |
| 705 | def mainloop(me, quitp = None): |
| 706 | """ |
| 707 | Iterate the I/O watcher until QUITP returns true. |
| 708 | |
| 709 | Arranges for asides and deferred calls to be made at the right times. |
| 710 | """ |
| 711 | |
| 712 | global _deferq |
| 713 | assert _Coroutine.getcurrent() is rootcr |
| 714 | Coroutine(_runasides, name = '_runasides').switch() |
| 715 | if quitp is None: |
| 716 | quitp = me.quitp |
| 717 | while not quitp(): |
| 718 | while _deferq: |
| 719 | q = _deferq |
| 720 | _deferq = [] |
| 721 | for func, args, kw in q: |
| 722 | func(*args, **kw) |
| 723 | me.iowatch.iterate() |
| 724 | |
| 725 | def connected(me): |
| 726 | """ |
| 727 | Connection hook. |
| 728 | |
| 729 | If a subclass overrides this method, it must call us; clears out the |
| 730 | command queue and job map. |
| 731 | """ |
| 732 | me.queue = M.Array() |
| 733 | me.cmd = {} |
| 734 | TripeConnection.connected(me) |
| 735 | |
| 736 | def disconnected(me, reason): |
| 737 | """ |
| 738 | Disconnection hook. |
| 739 | |
| 740 | If a subclass hooks overrides this method, it must call us; sends a |
| 741 | special CONNERR code to all incomplete commands. |
| 742 | """ |
| 743 | TripeConnection.disconnected(me, reason) |
| 744 | for cmd in me.cmd.itervalues(): |
| 745 | cmd.response('CONNERR', reason) |
| 746 | for cmd in me.queue: |
| 747 | cmd.response('CONNERR', reason) |
| 748 | |
| 749 | @_callback |
| 750 | def line(me, line): |
| 751 | """Handle an incoming line, sending it to the right place.""" |
| 752 | if _debug: print '<', line |
| 753 | code, rest = M.word(line, quotep = True) |
| 754 | func = me.handler.get(code) |
| 755 | if func is not None: |
| 756 | if code == 'TRACE': |
| 757 | func(code, rest) |
| 758 | else: |
| 759 | func(code, *M.split(rest, quotep = True)[0]) |
| 760 | me.dequeue() |
| 761 | |
| 762 | def dequeue(me): |
| 763 | """ |
| 764 | Pull the oldest command off the queue and try to send it to the server. |
| 765 | """ |
| 766 | if not me.queue or None in me.cmd: return |
| 767 | cmd = me.queue.shift() |
| 768 | if _debug: print '>', ' '.join([quotify(w) for w in cmd.words]) |
| 769 | me.send(' '.join([quotify(w) for w in cmd.words])) |
| 770 | me.cmd[None] = cmd |
| 771 | |
| 772 | def bgtag(me): |
| 773 | """ |
| 774 | Return an unused job tag. |
| 775 | |
| 776 | May be of use when composing commands by hand. |
| 777 | """ |
| 778 | tag = 'J%05d' % me.tagseq |
| 779 | me.tagseq += 1 |
| 780 | return tag |
| 781 | |
| 782 | ## --- Built-in handler functions for server responses --- |
| 783 | |
| 784 | def _detach(me, _, tag): |
| 785 | """ |
| 786 | Respond to a BGDETACH TAG message. |
| 787 | |
| 788 | Move the current foreground command to the background. |
| 789 | """ |
| 790 | assert tag not in me.cmd |
| 791 | me.cmd[tag] = me.cmd[None] |
| 792 | del me.cmd[None] |
| 793 | |
| 794 | def _response(me, code, tag, *w): |
| 795 | """ |
| 796 | Respond to an OK, INFO or FAIL message. |
| 797 | |
| 798 | If this is a message for a background job, find the tag; then dispatch |
| 799 | the result to the command object. |
| 800 | """ |
| 801 | if code.startswith('BG'): |
| 802 | code = code[2:] |
| 803 | cmd = me.cmd[tag] |
| 804 | if code != 'INFO': |
| 805 | del me.cmd[tag] |
| 806 | cmd.response(code, *w) |
| 807 | |
| 808 | def _fgresponse(me, code, *w): |
| 809 | """Process responses to the foreground command.""" |
| 810 | me._response(code, None, *w) |
| 811 | |
| 812 | ## --- Interface methods --- |
| 813 | |
| 814 | def rawcommand(me, cmd): |
| 815 | """ |
| 816 | Submit the TripeCommand CMD to the server, and look after it until it |
| 817 | completes. |
| 818 | """ |
| 819 | if not me.connectedp(): |
| 820 | raise TripeConnectionError('connection closed') |
| 821 | me.queue.push(cmd) |
| 822 | me.dequeue() |
| 823 | |
| 824 | def command(me, *cmd, **kw): |
| 825 | """Convenience wrapper for creating a TripeCommandIterator object.""" |
| 826 | return TripeCommandIterator(me, cmd, **kw) |
| 827 | |
| 828 | ## --- Convenience methods for server commands --- |
| 829 | |
| 830 | def add(me, peer, *addr, **kw): |
| 831 | return _simple(me.command(bg = True, |
| 832 | *['ADD'] + |
| 833 | _kwopts(kw, ['tunnel', 'keepalive', |
| 834 | 'key', 'cork', 'mobile']) + |
| 835 | [peer] + |
| 836 | list(addr))) |
| 837 | def addr(me, peer): |
| 838 | return _oneline(me.command('ADDR', peer)) |
| 839 | def algs(me): |
| 840 | return _keyvals(me.command('ALGS')) |
| 841 | def checkchal(me, chal): |
| 842 | return _simple(me.command('CHECKCHAL', chal)) |
| 843 | def daemon(me): |
| 844 | return _simple(me.command('DAEMON')) |
| 845 | def eping(me, peer, **kw): |
| 846 | return _oneline(me.command(bg = True, |
| 847 | *['PING'] + |
| 848 | _kwopts(kw, ['timeout']) + |
| 849 | [peer])) |
| 850 | def forcekx(me, peer): |
| 851 | return _simple(me.command('FORCEKX', peer)) |
| 852 | def getchal(me): |
| 853 | return _oneline(me.command('GETCHAL', filter = _tokenjoin)) |
| 854 | def greet(me, peer, chal): |
| 855 | return _simple(me.command('GREET', peer, chal)) |
| 856 | def help(me): |
| 857 | return list(me.command('HELP', filter = _tokenjoin)) |
| 858 | def ifname(me, peer): |
| 859 | return _oneline(me.command('IFNAME', peer, filter = _tokenjoin)) |
| 860 | def kill(me, peer): |
| 861 | return _simple(me.command('KILL', peer)) |
| 862 | def list(me): |
| 863 | return list(me.command('LIST', filter = _tokenjoin)) |
| 864 | def notify(me, *msg): |
| 865 | return _simple(me.command('NOTIFY', *msg)) |
| 866 | def peerinfo(me, peer): |
| 867 | return _keyvals(me.command('PEERINFO', peer)) |
| 868 | def ping(me, peer, **kw): |
| 869 | return _oneline(me.command(bg = True, |
| 870 | *['PING'] + |
| 871 | _kwopts(kw, ['timeout']) + |
| 872 | [peer])) |
| 873 | def port(me): |
| 874 | return _oneline(me.command('PORT', filter = _tokenjoin)) |
| 875 | def quit(me): |
| 876 | return _simple(me.command('QUIT')) |
| 877 | def reload(me): |
| 878 | return _simple(me.command('RELOAD')) |
| 879 | def servinfo(me): |
| 880 | return _keyvals(me.command('SERVINFO')) |
| 881 | def setifname(me, new): |
| 882 | return _simple(me.command('SETIFNAME', new)) |
| 883 | def svcclaim(me, service, version): |
| 884 | return _simple(me.command('SVCCLAIM', service, version)) |
| 885 | def svcensure(me, service, version = None): |
| 886 | return _simple(me.command('SVCENSURE', service, |
| 887 | *((version is not None and [version]) or []))) |
| 888 | def svcfail(me, job, *msg): |
| 889 | return _simple(me.command('SVCFAIL', job, *msg)) |
| 890 | def svcinfo(me, job, *msg): |
| 891 | return _simple(me.command('SVCINFO', job, *msg)) |
| 892 | def svclist(me): |
| 893 | return list(me.command('SVCLIST')) |
| 894 | def svcok(me, job): |
| 895 | return _simple(me.command('SVCOK', job)) |
| 896 | def svcquery(me, service): |
| 897 | return _keyvals(me.command('SVCQUERY', service)) |
| 898 | def svcrelease(me, service): |
| 899 | return _simple(me.command('SVCRELEASE', service)) |
| 900 | def svcsubmit(me, service, *args, **kw): |
| 901 | return me.command(bg = True, |
| 902 | *['SVCSUBMIT'] + |
| 903 | _kwopts(kw, ['version']) + |
| 904 | [service] + |
| 905 | list(args)) |
| 906 | def stats(me, peer): |
| 907 | return _keyvals(me.command('STATS', peer)) |
| 908 | def trace(me, *args): |
| 909 | return _tracelike(me.command('TRACE', *args)) |
| 910 | def tunnels(me): |
| 911 | return list(me.command('TUNNELS', filter = _tokenjoin)) |
| 912 | def version(me): |
| 913 | return _oneline(me.command('VERSION', filter = _tokenjoin)) |
| 914 | def warn(me, *msg): |
| 915 | return _simple(me.command('WARN', *msg)) |
| 916 | def watch(me, *args): |
| 917 | return _tracelike(me.command('WATCH', *args)) |
| 918 | |
| 919 | ###-------------------------------------------------------------------------- |
| 920 | ### Asynchronous commands. |
| 921 | |
| 922 | class TripeAsynchronousCommand (TripeCommand): |
| 923 | """ |
| 924 | Asynchronous commands. |
| 925 | |
| 926 | This is the complicated way of issuing commands. You must set up a queue, |
| 927 | and associate the command with the queue. Responses arriving for the |
| 928 | command will be put on the queue as an triple of the form (TAG, CODE, REST) |
| 929 | -- where TAG is an object of your choice, not interpreted by this class, |
| 930 | CODE is the server's response code (OK, INFO, FAIL), and REST is the list |
| 931 | of the rest of the server's tokens. |
| 932 | |
| 933 | Using this, you can write coroutines which process many commands (and |
| 934 | possibly other events) simultaneously. |
| 935 | """ |
| 936 | |
| 937 | def __init__(me, queue, tag, words): |
| 938 | """Make an asynchronous command consisting of the given WORDS, which |
| 939 | sends responses to QUEUE, labelled with TAG.""" |
| 940 | TripeCommand.__init__(me, words) |
| 941 | me.queue = queue |
| 942 | me.tag = tag |
| 943 | |
| 944 | def response(me, code, *stuff): |
| 945 | """Handle a server response by writing it to the caller's queue.""" |
| 946 | me.queue.put((me.tag, code, list(stuff))) |
| 947 | |
| 948 | ###-------------------------------------------------------------------------- |
| 949 | ### Services. |
| 950 | |
| 951 | class TripeJobCancelled (Exception): |
| 952 | """ |
| 953 | Exception sent to job handler if the client kills the job. |
| 954 | |
| 955 | Not propagated further. |
| 956 | """ |
| 957 | pass |
| 958 | |
| 959 | class TripeJobError (Exception): |
| 960 | """ |
| 961 | Exception to cause failure report for running job. |
| 962 | |
| 963 | Sends an SVCFAIL code back. |
| 964 | """ |
| 965 | pass |
| 966 | |
| 967 | class TripeSyntaxError (Exception): |
| 968 | """ |
| 969 | Exception to report a syntax error for a job. |
| 970 | |
| 971 | Sends an SVCFAIL bad-svc-syntax message back. |
| 972 | """ |
| 973 | pass |
| 974 | |
| 975 | class TripeServiceManager (TripeCommandDispatcher): |
| 976 | """ |
| 977 | A command dispatcher with added handling for incoming service requests. |
| 978 | |
| 979 | There is usually only one instance of this class, called svcmgr. Some of |
| 980 | the support functions in this module assume that this is the case. |
| 981 | |
| 982 | To use, run mLib.select in a loop until the quitp method returns true; |
| 983 | then, in a non-root coroutine, register your services by calling `add', and |
| 984 | then call `running' when you've finished setting up. |
| 985 | |
| 986 | The instance handles server service messages SVCJOB, SVCCANCEL and |
| 987 | SVCCLAIM. It maintains a table of running services. Incoming jobs cause |
| 988 | the service's `job' method to be invoked; SVCCANCEL sends a |
| 989 | TripeJobCancelled exception to the handler coroutine, and SVCCLAIM causes |
| 990 | the relevant service to be deregistered. |
| 991 | |
| 992 | There is no base class for jobs, but a job must implement two methods: |
| 993 | |
| 994 | start() Begin processing; might be a no-op. |
| 995 | |
| 996 | cancel() Stop processing; the original client has killed the |
| 997 | job. |
| 998 | |
| 999 | The life of a service manager is divided into two parts: setup and running; |
| 1000 | you tell the manager that you've finished setting up by calling the |
| 1001 | `running' method. If, at any point after setup is finished, there are no |
| 1002 | remaining services or jobs, `quitp' will return true, ending the process. |
| 1003 | """ |
| 1004 | |
| 1005 | ## --- Attributes --- |
| 1006 | ## |
| 1007 | ## svc Mapping name -> service object |
| 1008 | ## |
| 1009 | ## job Mapping jobid -> job handler coroutine |
| 1010 | ## |
| 1011 | ## runningp True when setup is finished |
| 1012 | ## |
| 1013 | ## _quitp True if explicit quit has been requested |
| 1014 | |
| 1015 | def __init__(me, socket): |
| 1016 | """ |
| 1017 | Initialize the service manager. |
| 1018 | |
| 1019 | SOCKET is the administration socket to connect to. |
| 1020 | """ |
| 1021 | TripeCommandDispatcher.__init__(me, socket) |
| 1022 | me.svc = {} |
| 1023 | me.job = {} |
| 1024 | me.runningp = False |
| 1025 | me.handler['SVCCANCEL'] = me._cancel |
| 1026 | me.handler['SVCJOB'] = me._job |
| 1027 | me.handler['SVCCLAIM'] = me._claim |
| 1028 | me._quitp = 0 |
| 1029 | |
| 1030 | def addsvc(me, svc): |
| 1031 | """Register a new service; SVC is a TripeService instance.""" |
| 1032 | assert svc.name not in me.svc |
| 1033 | me.svcclaim(svc.name, svc.version) |
| 1034 | me.svc[svc.name] = svc |
| 1035 | |
| 1036 | def _cancel(me, _, jid): |
| 1037 | """ |
| 1038 | Called when the server cancels a job; invokes the job's `cancel' method. |
| 1039 | """ |
| 1040 | job = me.job[jid] |
| 1041 | del me.job[jid] |
| 1042 | job.cancel() |
| 1043 | |
| 1044 | def _claim(me, _, svc, __): |
| 1045 | """Called when another program claims our service at a higher version.""" |
| 1046 | del me.svc[svc] |
| 1047 | |
| 1048 | def _job(me, _, jid, svc, cmd, *args): |
| 1049 | """ |
| 1050 | Called when the server sends us a job to do. |
| 1051 | |
| 1052 | Calls the service to collect a job, and begins processing it. |
| 1053 | """ |
| 1054 | assert jid not in me.job |
| 1055 | svc = me.svc[svc.lower()] |
| 1056 | job = svc.job(jid, cmd, args) |
| 1057 | me.job[jid] = job |
| 1058 | job.start() |
| 1059 | |
| 1060 | def running(me): |
| 1061 | """Answer true if setup is finished.""" |
| 1062 | me.runningp = True |
| 1063 | |
| 1064 | def jobdone(me, jid): |
| 1065 | """Informs the service manager that the job with id JID has finished.""" |
| 1066 | try: |
| 1067 | del me.job[jid] |
| 1068 | except KeyError: |
| 1069 | pass |
| 1070 | |
| 1071 | def quitp(me): |
| 1072 | """ |
| 1073 | Return true if no services or jobs are active (and, therefore, if this |
| 1074 | process can quit without anyone caring). |
| 1075 | """ |
| 1076 | return me._quitp or (me.runningp and ((not me.svc and not me.job) or |
| 1077 | not me.sock)) |
| 1078 | |
| 1079 | def quit(me): |
| 1080 | """Forces the quit flag (returned by quitp) on.""" |
| 1081 | me._quitp = True |
| 1082 | |
| 1083 | class TripeService (object): |
| 1084 | """ |
| 1085 | A standard service. |
| 1086 | |
| 1087 | The NAME and VERSION are passed on to the server. The CMDTAB is a |
| 1088 | dictionary mapping command names (in lowercase) to command objects. |
| 1089 | |
| 1090 | If the CMDTAB doesn't have entries for commands HELP and QUIT then defaults |
| 1091 | are provided. |
| 1092 | |
| 1093 | TripeService itself is mostly agnostic about the nature of command objects, |
| 1094 | but the TripeServiceJob class (below) has some requirements. The built-in |
| 1095 | HELP command requires command objects to have `usage' attributes. |
| 1096 | """ |
| 1097 | |
| 1098 | def __init__(me, name, version, cmdtab): |
| 1099 | """ |
| 1100 | Create and register a new service with the given NAME and VERSION. |
| 1101 | |
| 1102 | CMDTAB maps command names (in lower-case) to command objects. |
| 1103 | """ |
| 1104 | me.name = name |
| 1105 | me.version = version |
| 1106 | me.cmd = cmdtab |
| 1107 | me.activep = True |
| 1108 | me.cmd.setdefault('help', |
| 1109 | TripeServiceCommand('help', 0, 0, '', me._help)) |
| 1110 | me.cmd.setdefault('quit', |
| 1111 | TripeServiceCommand('quit', 0, 0, '', me._quit)) |
| 1112 | |
| 1113 | def job(me, jid, cmd, args): |
| 1114 | """ |
| 1115 | Called by the service manager: a job arrived with id JID. |
| 1116 | |
| 1117 | It asks for comamnd CMD with argument list ARGS. Creates a new job, |
| 1118 | passing it the information needed. |
| 1119 | """ |
| 1120 | return TripeServiceJob(jid, me, cmd, me.cmd.get(cmd.lower()), args) |
| 1121 | |
| 1122 | ## Simple default command handlers, complying with the spec in |
| 1123 | ## tripe-service(7). |
| 1124 | |
| 1125 | def _help(me): |
| 1126 | """Send a help summary to the user.""" |
| 1127 | cmds = me.cmd.items() |
| 1128 | cmds.sort() |
| 1129 | for name, cmd in cmds: |
| 1130 | svcinfo(name, *cmd.usage) |
| 1131 | |
| 1132 | def _quit(me): |
| 1133 | """Terminate the service manager.""" |
| 1134 | svcmgr.notify('svc-quit', me.name, 'admin-request') |
| 1135 | svcmgr.quit() |
| 1136 | |
| 1137 | class TripeServiceCommand (object): |
| 1138 | """A simple service command.""" |
| 1139 | |
| 1140 | def __init__(me, name, min, max, usage, func): |
| 1141 | """ |
| 1142 | Creates a new command. |
| 1143 | |
| 1144 | NAME is the command's name (in lowercase). |
| 1145 | |
| 1146 | MIN and MAX are the minimum and maximum number of allowed arguments (used |
| 1147 | for checking); either may be None to indicate no minimum or maximum. |
| 1148 | |
| 1149 | USAGE is a usage string, used for generating help and error messages. |
| 1150 | |
| 1151 | FUNC is the function to invoke. |
| 1152 | """ |
| 1153 | me.name = name |
| 1154 | me.min = min |
| 1155 | me.max = max |
| 1156 | me.usage = usage.split() |
| 1157 | me.func = func |
| 1158 | |
| 1159 | def run(me, *args): |
| 1160 | """ |
| 1161 | Called when the command is invoked. |
| 1162 | |
| 1163 | Does minimal checking of the arguments and calls the supplied function. |
| 1164 | """ |
| 1165 | if (me.min is not None and len(args) < me.min) or \ |
| 1166 | (me.max is not None and len(args) > me.max): |
| 1167 | raise TripeSyntaxError |
| 1168 | me.func(*args) |
| 1169 | |
| 1170 | class TripeServiceJob (Coroutine): |
| 1171 | """ |
| 1172 | Job handler coroutine. |
| 1173 | |
| 1174 | A standard TripeService invokes a TripeServiceJob for each incoming job |
| 1175 | request, passing it the jobid, command and arguments, and a command |
| 1176 | object. The command object needs the following attributes. |
| 1177 | |
| 1178 | usage A usage list (excluding the command name) showing |
| 1179 | arguments and options. |
| 1180 | |
| 1181 | run(*ARGS) Function to react to the command with ARGS split into |
| 1182 | separate arguments. Invoked in a coroutine. The |
| 1183 | svcinfo function (not the TripeCommandDispatcher |
| 1184 | method) may be used to send INFO lines. The function |
| 1185 | may raise TripeJobError to send a FAIL response back, |
| 1186 | or TripeSyntaxError to send a generic usage error. |
| 1187 | TripeJobCancelled exceptions are trapped silently. |
| 1188 | Other exceptions are translated into a generic |
| 1189 | internal-error message. |
| 1190 | |
| 1191 | This class automatically takes care of sending some closing response to the |
| 1192 | job, and for informing the service manager that the job is completed. |
| 1193 | |
| 1194 | The `jid' attribute stores the job's id. |
| 1195 | """ |
| 1196 | |
| 1197 | def __init__(me, jid, svc, cmd, command, args): |
| 1198 | """ |
| 1199 | Start a new job. |
| 1200 | |
| 1201 | The job is created with id JID, for service SVC, processing command name |
| 1202 | CMD (which the service resolved into the command object COMMAND, or |
| 1203 | None), and with the arguments ARGS. |
| 1204 | """ |
| 1205 | Coroutine.__init__(me) |
| 1206 | me.jid = jid |
| 1207 | me.svc = svc |
| 1208 | me.cmd = cmd |
| 1209 | me.command = command |
| 1210 | me.args = args |
| 1211 | |
| 1212 | def run(me): |
| 1213 | """ |
| 1214 | Main body of the coroutine. |
| 1215 | |
| 1216 | Does the tedious exception handling boilerplate and invokes the command's |
| 1217 | run method. |
| 1218 | """ |
| 1219 | try: |
| 1220 | try: |
| 1221 | if me.command is None: |
| 1222 | svcmgr.svcfail(me.jid, 'unknown-svc-command', me.cmd) |
| 1223 | else: |
| 1224 | me.command.run(*me.args) |
| 1225 | svcmgr.svcok(me.jid) |
| 1226 | except TripeJobError, exc: |
| 1227 | svcmgr.svcfail(me.jid, *exc.args) |
| 1228 | except TripeSyntaxError: |
| 1229 | svcmgr.svcfail(me.jid, 'bad-svc-syntax', |
| 1230 | me.svc.name, me.command.name, |
| 1231 | *me.command.usage) |
| 1232 | except TripeJobCancelled: |
| 1233 | pass |
| 1234 | except Exception, exc: |
| 1235 | svcmgr.svcfail(me.jid, 'svc-internal-error', |
| 1236 | exc.__class__.__name__, str(exc)) |
| 1237 | finally: |
| 1238 | svcmgr.jobdone(me.jid) |
| 1239 | |
| 1240 | def start(me): |
| 1241 | """Invoked by the service manager to start running the coroutine.""" |
| 1242 | me.switch() |
| 1243 | |
| 1244 | def cancel(me): |
| 1245 | """Invoked by the service manager to cancel the job.""" |
| 1246 | me.throw(TripeJobCancelled()) |
| 1247 | |
| 1248 | def svcinfo(*args): |
| 1249 | """ |
| 1250 | If invoked from a TripeServiceJob coroutine, sends an INFO line to the |
| 1251 | job's sender, automatically using the correct job id. |
| 1252 | """ |
| 1253 | svcmgr.svcinfo(Coroutine.getcurrent().jid, *args) |
| 1254 | |
| 1255 | def _setupsvc(tab, func): |
| 1256 | """ |
| 1257 | Setup coroutine for setting up service programs. |
| 1258 | |
| 1259 | Register the given services. |
| 1260 | """ |
| 1261 | try: |
| 1262 | for service in tab: |
| 1263 | svcmgr.addsvc(service) |
| 1264 | if func: |
| 1265 | func() |
| 1266 | finally: |
| 1267 | svcmgr.running() |
| 1268 | |
| 1269 | svcmgr = TripeServiceManager(None) |
| 1270 | def runservices(socket, tab, init = None, setup = None, daemon = False): |
| 1271 | """ |
| 1272 | Function to start a service provider. |
| 1273 | |
| 1274 | SOCKET is the socket to connect to, usually tripesock. |
| 1275 | |
| 1276 | TAB is a list of entries. An entry may be either a tuple |
| 1277 | |
| 1278 | (NAME, VERSION, COMMANDS) |
| 1279 | |
| 1280 | or a service object (e.g., a TripeService instance). |
| 1281 | |
| 1282 | COMMANDS is a dictionary mapping command names to tuples |
| 1283 | |
| 1284 | (MIN, MAX, USAGE, FUNC) |
| 1285 | |
| 1286 | of arguments for a TripeServiceCommand object. |
| 1287 | |
| 1288 | If DAEMON is true, then the process is forked into the background before we |
| 1289 | start. If INIT is given, it is called in the main coroutine, immediately |
| 1290 | after forking. If SETUP is given, it is called in a coroutine, after |
| 1291 | calling INIT and setting up the services but before marking the service |
| 1292 | manager as running. |
| 1293 | |
| 1294 | It is a really bad idea to do any initialization, particularly setting up |
| 1295 | coroutines, outside of the INIT or SETUP functions. In particular, if |
| 1296 | we're using rmcr for fake coroutines, the daemonizing fork will kill off |
| 1297 | the currently established coroutines in a most surprising way. |
| 1298 | |
| 1299 | The function runs a main select loop until the service manager decides to |
| 1300 | quit. |
| 1301 | """ |
| 1302 | |
| 1303 | svcmgr.socket = socket |
| 1304 | svcmgr.connect() |
| 1305 | svcs = [] |
| 1306 | for service in tab: |
| 1307 | if not isinstance(service, tuple): |
| 1308 | svcs.append(service) |
| 1309 | else: |
| 1310 | name, version, commands = service |
| 1311 | cmdmap = {} |
| 1312 | for cmd, stuff in commands.iteritems(): |
| 1313 | cmdmap[cmd] = TripeServiceCommand(cmd, *stuff) |
| 1314 | svcs.append(TripeService(name, version, cmdmap)) |
| 1315 | if daemon: |
| 1316 | M.daemonize() |
| 1317 | if init is not None: |
| 1318 | init() |
| 1319 | spawn(_setupsvc, svcs, setup) |
| 1320 | svcmgr.mainloop() |
| 1321 | |
| 1322 | ###-------------------------------------------------------------------------- |
| 1323 | ### Utilities for services. |
| 1324 | |
| 1325 | _timeunits = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400} |
| 1326 | def timespec(spec): |
| 1327 | """Parse the timespec SPEC, returning a number of seconds.""" |
| 1328 | mul = 1 |
| 1329 | if len(spec) > 1 and spec[-1] in _timeunits: |
| 1330 | mul = _timeunits[spec[-1]] |
| 1331 | spec = spec[:-1] |
| 1332 | try: |
| 1333 | t = int(spec) |
| 1334 | except: |
| 1335 | raise TripeJobError('bad-time-spec', spec) |
| 1336 | if t < 0: |
| 1337 | raise TripeJobError('bad-time-spec', spec) |
| 1338 | return mul * int(spec) |
| 1339 | |
| 1340 | class OptParse (object): |
| 1341 | """ |
| 1342 | Parse options from a command list in the conventional fashion. |
| 1343 | |
| 1344 | ARGS is a list of arguments to a command. ALLOWED is a sequence of allowed |
| 1345 | options. The returned values are the option tags. During parsing, the |
| 1346 | `arg' method may be used to retrieve the argument for the most recent |
| 1347 | option. Afterwards, `rest' may be used to retrieve the remaining |
| 1348 | non-option arguments, and do a simple check on how many there are. |
| 1349 | |
| 1350 | The parser correctly handles `--' option terminators. |
| 1351 | """ |
| 1352 | |
| 1353 | def __init__(me, args, allowed): |
| 1354 | """ |
| 1355 | Create a new option parser. |
| 1356 | |
| 1357 | The parser will scan the ARGS for options given in the sequence ALLOWED |
| 1358 | (which are expected to include the `-' prefix). |
| 1359 | """ |
| 1360 | me.allowed = {} |
| 1361 | for a in allowed: |
| 1362 | me.allowed[a] = True |
| 1363 | me.args = list(args) |
| 1364 | |
| 1365 | def __iter__(me): |
| 1366 | """Iterator protocol: I am my own iterator.""" |
| 1367 | return me |
| 1368 | |
| 1369 | def next(me): |
| 1370 | """ |
| 1371 | Iterator protocol: return the next option. |
| 1372 | |
| 1373 | If we've run out, raise StopIteration. |
| 1374 | """ |
| 1375 | if len(me.args) == 0 or \ |
| 1376 | len(me.args[0]) < 2 or \ |
| 1377 | not me.args[0].startswith('-'): |
| 1378 | raise StopIteration |
| 1379 | opt = me.args.pop(0) |
| 1380 | if opt == '--': |
| 1381 | raise StopIteration |
| 1382 | if opt not in me.allowed: |
| 1383 | raise TripeSyntaxError |
| 1384 | return opt |
| 1385 | |
| 1386 | def arg(me): |
| 1387 | """ |
| 1388 | Return the argument for the most recent option. |
| 1389 | |
| 1390 | If none is available, raise TripeSyntaxError. |
| 1391 | """ |
| 1392 | if len(me.args) == 0: |
| 1393 | raise TripeSyntaxError |
| 1394 | return me.args.pop(0) |
| 1395 | |
| 1396 | def rest(me, min = None, max = None): |
| 1397 | """ |
| 1398 | After option parsing is done, return the remaining arguments. |
| 1399 | |
| 1400 | Check that there are at least MIN and at most MAX arguments remaining -- |
| 1401 | either may be None to suppress the check. |
| 1402 | """ |
| 1403 | if (min is not None and len(me.args) < min) or \ |
| 1404 | (max is not None and len(me.args) > max): |
| 1405 | raise TripeSyntaxError |
| 1406 | return me.args |
| 1407 | |
| 1408 | ###----- That's all, folks -------------------------------------------------- |