4 ### Watch arrival and departure of peers
6 ### (c) 2007 Straylight/Edgeware
9 ###----- Licensing notice ---------------------------------------------------
11 ### This file is part of Trivial IP Encryption (TrIPE).
13 ### TrIPE is free software; you can redistribute it and/or modify
14 ### it under the terms of the GNU General Public License as published by
15 ### the Free Software Foundation; either version 2 of the License, or
16 ### (at your option) any later version.
18 ### TrIPE is distributed in the hope that it will be useful,
19 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
20 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 ### GNU General Public License for more details.
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE; if not, write to the Free Software Foundation,
25 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
29 ###--------------------------------------------------------------------------
30 ### External dependencies.
32 from optparse import OptionParser
41 import subprocess as PROC
45 ###--------------------------------------------------------------------------
46 ### Running auxiliary commands.
48 class SelLineQueue (M.SelLineBuffer):
49 """Glues the select-line-buffer into the coroutine queue system."""
51 def __new__(cls, file, queue, tag, kind):
52 """See __init__ for documentation."""
53 return M.SelLineBuffer.__new__(cls, file.fileno())
55 def __init__(me, file, queue, tag, kind):
57 Initialize a new line-reading adaptor.
59 The adaptor reads lines from FILE. Each line is inserted as a message of
60 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
71 me._q.put((me._tag, me._kind, line))
76 me._q.put((me._tag, me._kind, None))
78 class ErrorWatch (T.Coroutine):
80 An object which watches stderr streams for errors and converts them into
83 WARN watch INFO stderr LINE
85 The INFO is a list of tokens associated with the file when it was
88 Usually there is a single ErrorWatch object, called errorwatch.
92 """Initialization: there are no arguments."""
93 T.Coroutine.__init__(me)
98 def watch(me, file, info):
100 Adds FILE to the collection of files to watch.
102 INFO will be written in the warning messages from this FILE. Returns a
103 sequence number which can be used to unregister the file again.
107 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
110 def unwatch(me, seq):
111 """Stop watching the file with sequence number SEQ."""
117 Coroutine function: read items from the queue and report them.
119 Unregisters files automatically when they reach EOF.
122 seq, _, line = me._q.get()
126 S.warn(*['watch'] + me._map[seq][0] + ['stderr', line])
130 Coroutine function: wake up every second and notice changes to the
131 database. When a change happens, tell the Pinger (q.v.) to rescan its
134 cr = T.Coroutine.getcurrent()
136 fw = M.FWatch(opts.cdb)
138 timer = M.SelTimer(time() + 1, lambda: cr.switch())
142 S.notify('watch', 'peerdb-update')
144 class ChildWatch (M.SelSignal):
146 An object which watches for specified processes exiting and reports
147 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149 There is usually only one ChildWatch object, called childwatch.
153 """Initialize the child-watcher."""
154 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
157 """Initialize the child-watcher."""
161 def watch(me, pid, queue, tag):
163 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
164 to the QUEUE, where CODE is one of
166 * None (successful termination)
167 * ['exit-nonzero', CODE] (CODE is a string!)
168 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
169 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171 me._pid[pid] = queue, tag
174 def unwatch(me, pid):
175 """Unregister PID as a child to watch."""
182 Called when child processes exit: collect exit statuses and report
187 pid, status = OS.waitpid(-1, OS.WNOHANG)
189 if exc.errno == E.ECHILD:
193 if pid not in me._pid:
195 queue, tag = me._pid[pid]
196 if OS.WIFEXITED(status):
197 exit = OS.WEXITSTATUS(status)
201 code = ['exit-nonzero', str(exit)]
202 elif OS.WIFSIGNALED(status):
203 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205 code = ['exit-unknown', hex(status)]
206 queue.put((tag, 'exit', code))
208 class Command (object):
210 Represents a running command.
212 This class is the main interface to the machinery provided by the ChildWatch
213 and ErrorWatch objects. See also potwatch.
216 def __init__(me, info, queue, tag, args, env):
218 Start a new child process.
220 The ARGS are a list of arguments to be given to the child process. The
221 ENV is either None or a dictionary of environment variable assignments to
222 override the extant environment. INFO is a list of tokens to be included
223 in warnings about the child's stderr output. If the child writes a line
224 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
225 child exits, write (TAG, 'exit', CODE) to the QUEUE.
230 myenv = OS.environ.copy()
231 if env: myenv.update(env)
232 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
233 stdout = PROC.PIPE, stderr = PROC.PIPE)
234 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
235 errorwatch.watch(me._proc.stderr, info)
236 childwatch.watch(me._proc.pid, queue, tag)
240 If I've been forgotten then stop watching for termination.
242 childwatch.unwatch(me._proc.pid)
def potwatch(what, name, q):
  """
  Watch the queue Q for activity as reported by a Command object.

  Items taken from Q are triples (TAG, KIND, MORE); the TAG is ignored.

    * KIND = 'stdout', MORE a line: the line is reported with the
      notification

        NOTE watch WHAT NAME stdout LINE

    * KIND = 'stdout', MORE = None: the command's standard output has
      reached end-of-file.

    * KIND = 'exit': the command has terminated.  If MORE is nonempty then
      it is the list of tokens describing an abnormal termination, and is
      reported with the warning

        WARN watch WHAT NAME MORE...

  The function returns only once it has seen both the end-of-file marker
  and the exit report, in whichever order they arrive.
  """
  eofp = deadp = False
  while not deadp or not eofp:
    _, kind, more = q.get()
    if kind == 'stdout':
      if more is None:
        eofp = True
      else:
        S.notify('watch', what, name, 'stdout', more)
    elif kind == 'exit':
      ## A false MORE (None) means a clean exit: nothing to complain about.
      if more: S.warn('watch', what, name, *more)
      deadp = True
270 ###--------------------------------------------------------------------------
271 ### Peer database utilities.
def timespec(info, key, default):
  """
  Parse INFO[KEY] as a timespec, or return DEFAULT.

  INFO is a mapping; the value stored under KEY is handed to T.timespec and
  its result is returned.  DEFAULT is returned unchanged if KEY is absent
  from INFO (KeyError) or if T.timespec rejects the value by raising
  T.TripeJobError.
  """
  try:
    return T.timespec(info[key])
  except (KeyError, T.TripeJobError):
    return default
def integer(info, key, default):
  """
  Parse INFO[KEY] as an integer, or return DEFAULT.

  INFO is a mapping; the value stored under KEY is converted with int.
  DEFAULT is returned unchanged if KEY is absent from INFO (KeyError) or if
  the value is not a valid integer literal (ValueError).
  """
  try:
    return int(info[key])
  except (KeyError, ValueError):
    return default
def boolean(info, key, default):
  """
  Parse INFO[KEY] as a boolean, or return DEFAULT.

  INFO is a mapping.  If KEY is present, the result is True exactly when the
  value is one of the (lower-case) strings `t', `true', `y', `yes' or `on',
  and False for any other value.  DEFAULT is returned unchanged only if KEY
  is absent.
  """
  try:
    return info[key] in ['t', 'true', 'y', 'yes', 'on']
  except (KeyError, ValueError):
    return default
def peerinfo(peer):
  """
  Return a dictionary containing information about PEER from the database.

  The record is fetched from the CDB file named by the global `opts.cdb',
  under the key formed by prefixing the peer's name with `P', and decoded
  into (KEY, VALUE) pairs by M.URLDecode with SEMIP set.  The database is
  opened afresh on every call.
  """
  ## NOTE(review): a missing record presumably surfaces as KeyError from the
  ## CDB subscript -- confirm against the cdb binding.
  return dict(M.URLDecode(CDB.init(opts.cdb)['P' + peer], semip = True))
300 ###--------------------------------------------------------------------------
301 ### Waking up and watching peers.
303 def connect(peer, conn = None):
305 Start the job of connecting to the passive PEER.
307 The CONN string is a shell command which will connect to the peer (via some
308 back-channel, say ssh and userv), issue a command
310 SVCSUBMIT connect passive [OPTIONS] USER
312 and write the resulting challenge to standard error.
316 conn = peerinfo(peer)['connect']
320 cmd = Command(['connect', peer], q, 'connect',
321 ['/bin/sh', '-c', conn], None)
322 _, kind, more = q.peek()
325 S.warn('watch', 'connect', peer, 'unexpected-eof')
330 potwatch('connect', peer, q)
def disconnect(peer, disconn = None):
  """
  Start the job of disconnecting from a passive PEER.

  The DISCONN string is a shell command which will disconnect from the peer.
  If DISCONN is None, the command is taken from the `disconnect' field of
  the peer's database record instead.

  The command is run via `/bin/sh -c' as a Command tagged `disconnect', and
  its output and exit status are reported by potwatch, which does not return
  until the command has finished.
  """
  if disconn is None:
    try:
      ## This must assign to DISCONN, the name used to build the command
      ## below; storing the result under any other name leaves DISCONN as
      ## None and the shell is then handed None as its script.
      disconn = peerinfo(peer)['disconnect']
    except KeyError:
      ## NOTE(review): assumed behaviour -- with no disconnect command on
      ## record there is nothing to run, so give up quietly.
      return
  ## NOTE(review): T.Queue is assumed to be the coroutine queue class that
  ## Command and potwatch expect -- confirm.
  q = T.Queue()
  ## Keep a reference to the Command for as long as we're watching it: when
  ## a Command is forgotten it stops watching for the child's termination.
  cmd = Command(['disconnect', peer], q, 'disconnect',
                ['/bin/sh', '-c', disconn], None)
  potwatch('disconnect', peer, q)
349 class PingPeer (object):
351 Object representing a peer which we are pinging to ensure that it is still
354 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
355 event queue -- which saves us from having an enormous swarm of coroutines
356 -- but most of the actual work is done here.
358 In order to avoid confusion between different PingPeer instances for the
359 same actual peer, each PingPeer has a sequence number (its `seq'
360 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
361 (Using the PingPeer instance itself will prevent garbage collection of
362 otherwise defunct instances.)
365 def __init__(me, pinger, queue, peer, info, pingnow):
367 Create a new PingPeer.
369 The PINGER is the Pinger object we should send the results to. This is
370 used when we remove ourselves, if the peer has been explicitly removed.
372 The QUEUE is the event queue on which timer and ping-command events
375 The PEER is just the peer's name, as a string.
377 The INFO is the database record for the peer, as a dictionary, or None if
378 it's not readily available. (This is just a tweak to save multiple
379 probes if we don't really need them.)
381 If PINGNOW is true, then immediately start pinging the peer. Otherwise
382 wait until the usual retry interval.
396 me._timer = M.SelTimer(time() + me._every, me._time)
398 def update(me, info):
400 Refreshes the timer parameters for this peer. We don't, however,
401 immediately reschedule anything: that will happen next time anything
405 info = peerinfo(me._peer)
406 me._every = timespec(info, 'every', 120)
407 me._timeout = timespec(info, 'timeout', 10)
408 me._retries = integer(info, 'retries', 5)
409 me._connectp = 'connect' in info
414 Send a ping to the peer; the result is sent to the Pinger's event queue.
416 S.rawcommand(T.TripeAsynchronousCommand(
417 me._q, (me._peer, me.seq),
419 '-background', S.bgtag(),
420 '-timeout', str(me._timeout),
425 info = peerinfo(me._peer)
426 if 'connect' in info:
427 S.warn('watch', 'reconnecting', me._peer)
429 T.spawn(connect, me._peer)
430 me._timer = M.SelTimer(time() + me._every, me._time)
434 def event(me, code, stuff):
436 Respond to an event which happened to this peer.
438 Timer events indicate that we should start a new ping. (The server has
439 its own timeout which detects lost packets.)
441 We trap unknown-peer responses and detach from the Pinger.
443 If the ping fails and we run out of retries, we attempt to restart the
450 S.notify('watch', 'ping-failed', me._peer, *stuff)
453 elif stuff[0] == 'unknown-peer':
454 me._pinger.kill(me._peer)
455 elif stuff[0] == 'ping-send-failed':
458 if stuff[0] == 'ping-ok':
460 S.warn('watch', 'ping-ok', me._peer)
461 me._timer = M.SelTimer(time() + me._every, me._time)
462 elif stuff[0] == 'ping-timeout':
464 S.warn('watch', 'ping-timeout', me._peer,
465 'attempt', str(me._failures), 'of', str(me._retries))
466 if me._failures < me._retries:
470 elif stuff[0] == 'ping-peer-died':
471 me._pinger.kill(me._peer)
476 Handle timer callbacks by posting a timeout event on the queue.
479 me._q.put(((me._peer, me.seq), 'TIMER', None))
482 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
486 class Pinger (T.Coroutine):
488 The Pinger keeps track of the peers which we expect to be connected and
489 takes action if they seem to stop responding.
491 There is usually only one Pinger, called pinger.
493 The Pinger maintains a collection of PingPeer objects, and an event queue.
494 The PingPeers direct the results of their pings, and timer events, to the
495 event queue. The Pinger's coroutine picks items off the queue and
496 dispatches them back to the PingPeers as appropriate.
500 """Initialize the Pinger."""
501 T.Coroutine.__init__(me)
507 Coroutine function: reads the pinger queue and sends events to the
508 PingPeer objects they correspond to.
511 (peer, seq), code, stuff = me._q.get()
512 if peer in me._peers and seq == me._peers[peer].seq:
513 me._peers[peer].event(code, stuff)
515 def add(me, peer, info, pingnow):
517 Add PEER to the collection of peers under the Pinger's watchful eye.
518 The arguments are as for PingPeer: see above.
520 me._peers[peer] = PingPeer(me, me._q, peer, info, pingnow)
524 """Remove PEER from the peers being watched by the Pinger."""
528 def rescan(me, startup):
530 General resynchronization method.
532 We scan the list of peers (with connect scripts) known at the server.
533 Any which are known to the Pinger but aren't known to the server are
534 removed from our list; newly arrived peers are added. (Note that a peer
535 can change state here either due to the server sneakily changing its list
536 without issuing notifications or, more likely, the database changing its
537 idea of whether a peer is interesting.) Finally, PingPeers which are
538 still present are prodded to update their timing parameters.
540 This method is called once at startup to pick up the peers already
541 installed, and again by the dbwatcher coroutine when it detects a change
544 if T._debug: print '# rescan peers'
547 for peer in S.list():
549 info = peerinfo(peer)
552 if boolean(info, 'watch', False):
553 if T._debug: print '# interesting peer %s' % peer
554 correct[peer] = start[peer] = info
556 if T._debug: print '# peer %s ready for adoption' % peer
558 for peer, obj in me._peers.items():
560 obj.update(correct[peer])
562 if T._debug: print '# peer %s vanished' % peer
564 for peer, info in start.iteritems():
565 if peer not in me._peers:
567 if T._debug: print '# setting up peer %s' % peer
568 ifname = S.ifname(peer)
570 T.defer(addpeer, info, peer, ifname, *addr)
572 if T._debug: print '# adopting new peer %s' % peer
573 me.add(peer, info, True)
578 Returns the list of peers being watched by the Pinger.
580 return me._peers.keys()
582 ###--------------------------------------------------------------------------
## Matches a maximal run of characters which are not letters or digits
## (underscores included, so that adjacent runs collapse together).
r_bad = RX.compile(r'[\W_]+')

def encode_envvars(env, prefix, vars):
  """
  Encode the variables in VARS suitably for including in a program
  environment.  Lowercase letters in variable names are forced to uppercase;
  runs of non-alphanumeric characters are replaced by single underscores; and
  the PREFIX is prepended.  The resulting variables are written to ENV.

  VARS is a mapping from names to values; the values are stored unchanged.
  ENV is modified in place and nothing is returned.  If two names in VARS
  encode to the same variable, whichever is visited last wins.
  """
  ## Use `items' rather than `iteritems': it behaves the same here and is
  ## available on every Python version.
  for k, v in vars.items():
    env[prefix + r_bad.sub('_', k.upper())] = v
def envvars(peer, info):
  """
  Translate the database INFO dictionary for a PEER into a dictionary of
  environment variables with plausible upper-case names and a P_ prefix.
  Also collect the crypto information into A_ variables.

  The A_ variables are built from whatever S.algs(PEER) returns.  A fresh
  dictionary is returned; INFO itself is not modified.
  """
  env = {}
  encode_envvars(env, 'P_', info)
  encode_envvars(env, 'A_', S.algs(peer))
  return env
607 def ifupdown(what, peer, info, *args):
609 Run the interface up/down script for a peer.
611 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. INFO is the
612 database record dictionary. ARGS is a list of arguments to pass to the
613 script, in addition to the peer name.
615 The command is run and watched in the background by potwatch.
618 c = Command([what, peer], q, what,
619 M.split(info[what], quotep = True)[0] +
622 potwatch(what, peer, q)
624 def addpeer(info, peer, ifname, *addr):
626 Add a new peer to our collection.
628 INFO is the peer information dictionary, or None if we don't have one yet.
630 PEER names the peer; IFNAME is the interface name for its tunnel; and ADDR
631 is the list of tokens representing its address.
633 We try to bring up the interface and provoke a connection to the peer if
638 info = peerinfo(peer)
642 T.Coroutine(ifupdown, name = 'ifup %s' % peer) \
643 .switch('ifup', peer, info, ifname, *addr)
644 if 'connect' in info:
645 T.Coroutine(connect, name = 'connect %s' % peer) \
646 .switch(peer, info['connect'])
647 if boolean(info, 'watch', False):
648 pinger.add(peer, info, False)
651 """Drop the PEER from the Pinger and put its interface to bed."""
653 info = peerinfo(peer)
660 if 'disconnect' in info:
661 T.Coroutine(disconnect, name = 'disconnect %s' % peer) \
662 .switch(peer, info['disconnect'])
664 T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \
665 .switch('ifdown', peer, info)
667 def notify(_, code, *rest):
669 Watch for notifications.
671 We trap ADD and KILL notifications, and send them straight to addpeer and
672 delpeer respectively.
679 ###--------------------------------------------------------------------------
683 raise T.TripeJobError('not-implemented')
def cmd_kick(peer):
  """
  kick PEER: Force a new connection attempt for PEER

  Fails with the job error `peer-not-adopted PEER' unless PEER is currently
  one of the peers watched by the Pinger.  Otherwise the connection attempt
  is spawned in the background and the command completes immediately.
  """
  if peer not in pinger.adopted():
    raise T.TripeJobError('peer-not-adopted', peer)
  T.spawn(connect, peer)
695 adopted: Report a list of adopted peers.
697 for peer in pinger.adopted():
700 ###--------------------------------------------------------------------------
707 Register the notification watcher, and rescan the peers.
709 S.handler['NOTE'] = notify
711 pinger.rescan(opts.startup)
715 Initialization to be done before service startup.
717 global errorwatch, childwatch, pinger
718 errorwatch = ErrorWatch()
719 childwatch = ChildWatch()
721 T.Coroutine(dbwatch, name = 'dbwatch').switch()
def parse_options():
  """
  Parse the command-line options.

  Automatically changes directory to the requested configdir, and turns on
  debugging.  Returns the options object.

  No positional arguments are accepted: if any are given, the option parser
  reports an error.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  ## Relative filenames (e.g., the peer database) are resolved against the
  ## configuration directory, so go there before anything else happens.
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
## Service table, for running manually.
##
## Each command maps to a tuple (MIN-ARGS, MAX-ARGS, USAGE, HANDLER).
service_info = [('watch', T.VERSION, {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick)
})]

if __name__ == '__main__':
  ## The options must land in the global `opts' before the services start:
  ## other functions in this file read it directly.
  opts = parse_options()
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup,
                daemon = opts.daemon)
772 ###----- That's all, folks --------------------------------------------------