4 ### Watch arrival and departure of peers
6 ### (c) 2007 Straylight/Edgeware
9 ###----- Licensing notice ---------------------------------------------------
11 ### This file is part of Trivial IP Encryption (TrIPE).
13 ### TrIPE is free software; you can redistribute it and/or modify
14 ### it under the terms of the GNU General Public License as published by
15 ### the Free Software Foundation; either version 2 of the License, or
16 ### (at your option) any later version.
18 ### TrIPE is distributed in the hope that it will be useful,
19 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
20 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 ### GNU General Public License for more details.
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE; if not, write to the Free Software Foundation,
25 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
29 ###--------------------------------------------------------------------------
30 ### External dependencies.
32 from optparse import OptionParser
41 import subprocess as PROC
45 ###--------------------------------------------------------------------------
46 ### Running auxiliary commands.
48 class SelLineQueue (M.SelLineBuffer):
49 """Glues the select-line-buffer into the coroutine queue system."""
51 def __new__(cls, file, queue, tag, kind):
52 """See __init__ for documentation."""
53 return M.SelLineBuffer.__new__(cls, file.fileno())
55 def __init__(me, file, queue, tag, kind):
57 Initialize a new line-reading adaptor.
59 The adaptor reads lines from FILE. Each line is inserted as a message of
60 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
71 me._q.put((me._tag, me._kind, line))
76 me._q.put((me._tag, me._kind, None))
78 class ErrorWatch (T.Coroutine):
80 An object which watches stderr streams for errors and converts them into
83 WARN watch INFO stderr LINE
85 The INFO is a list of tokens associated with the file when it was
88 Usually there is a single ErrorWatch object, called errorwatch.
92 """Initialization: there are no arguments."""
93 T.Coroutine.__init__(me)
98 def watch(me, file, info):
100 Adds FILE to the collection of files to watch.
102 INFO will be written in the warning messages from this FILE. Returns a
103 sequence number which can be used to unregister the file again.
107 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
110 def unwatch(me, seq):
111 """Stop watching the file with sequence number SEQ."""
117 Coroutine function: read items from the queue and report them.
119 Unregisters files automatically when they reach EOF.
122 seq, _, line = me._q.get()
126 S.warn(*['watch'] + me._map[seq][0] + ['stderr', line])
130 Coroutine function: wake up every second and notice changes to the
131 database. When a change happens, tell the Pinger (q.v.) to rescan its
134 cr = T.Coroutine.getcurrent()
136 fw = M.FWatch(opts.cdb)
138 timer = M.SelTimer(time() + 1, lambda: cr.switch())
142 S.notify('watch', 'peerdb-update')
144 class ChildWatch (M.SelSignal):
146 An object which watches for specified processes exiting and reports
147 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149 There is usually only one ChildWatch object, called childwatch.
153 """Initialize the child-watcher."""
154 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
157 """Initialize the child-watcher."""
161 def watch(me, pid, queue, tag):
163 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
164 to the QUEUE, where CODE is one of
166 * None (successful termination)
167 * ['exit-nonzero', CODE] (CODE is a string!)
168 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
169 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171 me._pid[pid] = queue, tag
174 def unwatch(me, pid):
175 """Unregister PID as a child to watch."""
182 Called when child processes exit: collect exit statuses and report
187 pid, status = OS.waitpid(-1, OS.WNOHANG)
189 if exc.errno == E.ECHILD:
193 if pid not in me._pid:
195 queue, tag = me._pid[pid]
196 if OS.WIFEXITED(status):
197 exit = OS.WEXITSTATUS(status)
201 code = ['exit-nonzero', str(exit)]
202 elif OS.WIFSIGNALED(status):
203 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205 code = ['exit-unknown', hex(status)]
206 queue.put((tag, 'exit', code))
208 class Command (object):
210 Represents a running command.
212 This class is the main interface to the machinery provided by the ChildWatch
213 and ErrorWatch objects. See also potwatch.
216 def __init__(me, info, queue, tag, args, env):
218 Start a new child process.
220 The ARGS are a list of arguments to be given to the child process. The
221 ENV is either None or a dictionary of environment variable assignments to
222 override the extant environment. INFO is a list of tokens to be included
223 in warnings about the child's stderr output. If the child writes a line
224 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
225 child exits, write (TAG, 'exit', CODE) to the QUEUE.
230 myenv = OS.environ.copy()
231 if env: myenv.update(env)
232 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
233 stdout = PROC.PIPE, stderr = PROC.PIPE)
234 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
235 errorwatch.watch(me._proc.stderr, info)
236 childwatch.watch(me._proc.pid, queue, tag)
240 If I've been forgotten then stop watching for termination.
242 childwatch.unwatch(me._proc.pid)
244 def potwatch(what, name, q):
246 Watch the queue Q for activity as reported by a Command object.
248 Information from the process's stdout is reported as
250 NOTE WHAT NAME stdout LINE
252 abnormal termination is reported as
256 where CODE is what the ChildWatch wrote.
259 while not deadp or not eofp:
260 _, kind, more = q.get()
265 S.notify('watch', what, name, 'stdout', more)
267 if more: S.warn('watch', what, name, *more)
270 ###--------------------------------------------------------------------------
271 ### Peer database utilities.
273 _magic = ['_magic'] # An object distinct from all others
276 """Representation of a peer in the database."""
278 def __init__(me, peer, cdb = None):
280 Create a new peer, named PEER.
282 Information about the peer is read from the database CDB, or the default
283 one given on the command-line.
286 record = (cdb or CDB.init(opts.cdb))['P' + peer]
287 me.__dict__.update(M.URLDecode(record, semip = True))
289 def get(me, key, default = _magic, filter = None):
291 Get the information stashed under KEY from the peer's database record.
293 If DEFAULT is given, then use it if the database doesn't contain the
294 necessary information. If no DEFAULT is given, then report an error. If
295 a FILTER function is given then apply it to the information from the
296 database before returning it.
298 attr = me.__dict__.get(key, default)
300 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
301 elif filter is not None:
307 Return whether the peer's database record has the KEY.
309 return key in me.__dict__
313 Iterate over the available keys in the peer's database record.
315 return me.__dict__.iterkeys()
318 """Parse VALUE as a boolean."""
319 return value in ['t', 'true', 'y', 'yes', 'on']
321 ###--------------------------------------------------------------------------
322 ### Waking up and watching peers.
324 def run_connect(peer, cmd):
326 Start the job of connecting to the passive PEER.
328 The CMD string is a shell command which will connect to the peer (via some
329 back-channel, say ssh and userv), issue a command
331 SVCSUBMIT connect passive [OPTIONS] USER
333 and write the resulting challenge to standard output.
336 cmd = Command(['connect', peer.name], q, 'connect',
337 ['/bin/sh', '-c', cmd], None)
338 _, kind, more = q.peek()
341 S.warn('watch', 'connect', peer.name, 'unexpected-eof')
344 S.greet(peer.name, chal)
346 potwatch('connect', peer.name, q)
348 def run_disconnect(peer, cmd):
350 Start the job of disconnecting from a passive PEER.
352 The CMD string is a shell command which will disconnect from the peer.
355 cmd = Command(['disconnect', peer.name], q, 'disconnect',
356 ['/bin/sh', '-c', cmd], None)
357 potwatch('disconnect', peer.name, q)
360 class PingPeer (object):
362 Object representing a peer which we are pinging to ensure that it is still
365 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
366 event queue -- which saves us from having an enormous swarm of coroutines
367 -- but most of the actual work is done here.
369 In order to avoid confusion between different PingPeer instances for the
370 same actual peer, each PingPeer has a sequence number (its `seq'
371 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
372 (Using the PingPeer instance itself will prevent garbage collection of
373 otherwise defunct instances.)
376 def __init__(me, pinger, queue, peer, pingnow):
378 Create a new PingPeer.
380 The PINGER is the Pinger object we should send the results to. This is
381 used when we remove ourselves, if the peer has been explicitly removed.
383 The QUEUE is the event queue on which timer and ping-command events
386 The PEER is a `Peer' object describing the peer.
388 If PINGNOW is true, then immediately start pinging the peer. Otherwise
389 wait until the usual retry interval.
403 me._timer = M.SelTimer(time() + me._every, me._time)
405 def update(me, peer):
407 Refreshes the timer parameters for this peer. We don't, however,
408 immediately reschedule anything: that will happen next time anything
411 if peer is None: peer = Peer(me._peer)
412 assert peer.name == me._peer
413 me._every = peer.get('every', filter = T.timespec, default = 120)
414 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
415 me._retries = peer.get('retries', filter = int, default = 5)
416 me._connectp = peer.has('connect')
421 Send a ping to the peer; the result is sent to the Pinger's event queue.
423 S.rawcommand(T.TripeAsynchronousCommand(
424 me._q, (me._peer, me.seq),
426 '-background', S.bgtag(),
427 '-timeout', str(me._timeout),
432 peer = Peer(me._peer)
433 if peer.has('connect'):
434 S.warn('watch', 'reconnecting', me._peer)
436 T.spawn(run_connect, peer, peer.get('connect'))
437 me._timer = M.SelTimer(time() + me._every, me._time)
441 def event(me, code, stuff):
443 Respond to an event which happened to this peer.
445 Timer events indicate that we should start a new ping. (The server has
446 its own timeout which detects lost packets.)
448 We trap unknown-peer responses and detach from the Pinger.
450 If the ping fails and we run out of retries, we attempt to restart the
457 S.notify('watch', 'ping-failed', me._peer, *stuff)
460 elif stuff[0] == 'unknown-peer':
461 me._pinger.kill(me._peer)
462 elif stuff[0] == 'ping-send-failed':
465 if stuff[0] == 'ping-ok':
467 S.warn('watch', 'ping-ok', me._peer)
468 me._timer = M.SelTimer(time() + me._every, me._time)
469 elif stuff[0] == 'ping-timeout':
471 S.warn('watch', 'ping-timeout', me._peer,
472 'attempt', str(me._failures), 'of', str(me._retries))
473 if me._failures < me._retries:
477 elif stuff[0] == 'ping-peer-died':
478 me._pinger.kill(me._peer)
483 Handle timer callbacks by posting a timeout event on the queue.
486 me._q.put(((me._peer, me.seq), 'TIMER', None))
489 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
493 class Pinger (T.Coroutine):
495 The Pinger keeps track of the peers which we expect to be connected and
496 takes action if they seem to stop responding.
498 There is usually only one Pinger, called pinger.
500 The Pinger maintains a collection of PingPeer objects, and an event queue.
501 The PingPeers direct the results of their pings, and timer events, to the
502 event queue. The Pinger's coroutine picks items off the queue and
503 dispatches them back to the PingPeers as appropriate.
507 """Initialize the Pinger."""
508 T.Coroutine.__init__(me)
514 Coroutine function: reads the pinger queue and sends events to the
515 PingPeer objects they correspond to.
518 (peer, seq), code, stuff = me._q.get()
519 if peer in me._peers and seq == me._peers[peer].seq:
520 me._peers[peer].event(code, stuff)
522 def add(me, peer, pingnow):
524 Add PEER to the collection of peers under the Pinger's watchful eye.
525 The arguments are as for PingPeer: see above.
527 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
530 def kill(me, peername):
531 """Remove PEER from the peers being watched by the Pinger."""
532 del me._peers[peername]
535 def rescan(me, startup):
537 General resynchronization method.
539 We scan the list of peers (with connect scripts) known at the server.
540 Any which are known to the Pinger but aren't known to the server are
541 removed from our list; newly arrived peers are added. (Note that a peer
542 can change state here either due to the server sneakily changing its list
543 without issuing notifications or, more likely, the database changing its
544 idea of whether a peer is interesting.) Finally, PingPeers which are
545 still present are prodded to update their timing parameters.
547 This method is called once at startup to pick up the peers already
548 installed, and again by the dbwatcher coroutine when it detects a change
551 if T._debug: print '# rescan peers'
554 for name in S.list():
555 try: peer = Peer(name)
556 except KeyError: continue
557 if peer.get('watch', filter = boolean, default = False):
558 if T._debug: print '# interesting peer %s' % peer
559 correct[peer.name] = start[peer.name] = peer
561 if T._debug: print '# peer %s ready for adoption' % peer
562 start[peer.name] = peer
563 for name, obj in me._peers.items():
567 if T._debug: print '# peer %s vanished' % name
571 for name, peer in start.iteritems():
572 if name in me._peers: continue
574 if T._debug: print '# setting up peer %s' % name
575 ifname = S.ifname(name)
577 T.defer(adoptpeer, peer, ifname, *addr)
579 if T._debug: print '# adopting new peer %s' % name
585 Returns the list of peers being watched by the Pinger.
587 return me._peers.keys()
589 ###--------------------------------------------------------------------------
## Matches a run of characters which aren't allowed in the variable names we
## generate: anything that isn't a letter or digit, including underscores, so
## that a mixed run such as `-_-' collapses to a single underscore.
r_bad = RX.compile(r'[\W_]+')

def encode_envvars(env, prefix, vars):
    """
    Encode the variables in VARS suitably for including in a program
    environment.

    VARS is a mapping from variable names to values.  Lowercase letters in
    variable names are forced to uppercase; runs of non-alphanumeric
    characters are replaced by single underscores; and the PREFIX is
    prepended.  The resulting variables are written to the dictionary ENV,
    which is modified in place; the values are stored unchanged.  Nothing is
    returned.
    """
    ## Use `items' rather than `iteritems': it does the same job here and
    ## exists on every kind of mapping.
    for key, value in vars.items():
        env[prefix + r_bad.sub('_', key.upper())] = value
605 Translate the database information for a PEER into a dictionary of
606 environment variables with plausible upper-case names and a P_ prefix.
607 Also collect the crypto information into A_ variables.
610 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
611 encode_envvars(env, 'A_', S.algs(peer.name))
614 def run_ifupdown(what, peer, *args):
616 Run the interface up/down script for a peer.
618 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
619 list of arguments to pass to the script, in addition to the peer name.
621 The command is run and watched in the background by potwatch.
624 c = Command([what, peer.name], q, what,
625 M.split(peer.get(what), quotep = True)[0] +
626 [peer.name] + list(args),
628 potwatch(what, peer.name, q)
630 def adoptpeer(peer, ifname, *addr):
632 Add a new peer to our collection.
634 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
635 ADDR is the list of tokens representing its address.
637 We try to bring up the interface and provoke a connection to the peer if
641 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
642 .switch('ifup', peer, ifname, *addr)
643 cmd = peer.get('connect', default = None)
645 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
647 if peer.get('watch', filter = boolean, default = False):
648 pinger.add(peer, False)
def disownpeer(peer):
    """
    Drop the PEER from the Pinger and put its interface to bed.

    PEER is a `Peer' object.  The Pinger's table is keyed by peer name, so it
    is the name -- not the object -- which is handed to `pinger.kill'.  If the
    peer's record has a `disconnect' command then `run_disconnect' is started
    in a new coroutine; if it has an `ifdown' entry then `run_ifupdown' is
    started similarly.
    """

    ## The Pinger might not be watching this peer at all: that's not an
    ## error.
    try: pinger.kill(peer.name)
    except KeyError: pass

    ## NOTE(review): the guard and the `.switch' arguments below were not
    ## visible in the text under review; they are reconstructed from the
    ## `run_disconnect(peer, cmd)' signature and the `default = None' above
    ## -- confirm against the original.
    cmd = peer.get('disconnect', default = None)
    if cmd is not None:
        T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
            .switch(peer, cmd)

    if peer.has('ifdown'):
        T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
            .switch('ifdown', peer)
662 def notify(_, code, *rest):
664 Watch for notifications.
666 We trap ADD and KILL notifications, and send them straight to adoptpeer and
667 disownpeer respectively.
670 try: p = Peer(rest[0])
671 except KeyError: return
672 adoptpeer(p, *rest[1:])
674 try: p = Peer(rest[0])
675 except KeyError: return
676 disownpeer(p, *rest[1:])
678 ###--------------------------------------------------------------------------
682 raise T.TripeJobError('not-implemented')
686 kick NAME: Force a new connection attempt for the NAMEd peer.
688 if name not in pinger.adopted():
689 raise T.TripeJobError('peer-not-adopted', name)
690 try: peer = Peer(name)
691 except KeyError: raise T.TripeJobError('unknown-peer', name)
692 T.spawn(connect, peer)
696 adopted: Report a list of adopted peers.
698 for name in pinger.adopted():
701 ###--------------------------------------------------------------------------
708 Register the notification watcher, and rescan the peers.
710 S.handler['NOTE'] = notify
712 pinger.rescan(opts.startup)
716 Initialization to be done before service startup.
718 global errorwatch, childwatch, pinger
719 errorwatch = ErrorWatch()
720 childwatch = ChildWatch()
722 T.Coroutine(dbwatch, name = 'dbwatch').switch()
728 Parse the command-line options.
730 Automatically changes directory to the requested configdir, and turns on
731 debugging. Returns the options object.
733 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
734 version = '%%prog %s' % VERSION)
736 op.add_option('-a', '--admin-socket',
737 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
738 help = 'Select socket to connect to [default %default]')
739 op.add_option('-d', '--directory',
740 metavar = 'DIR', dest = 'dir', default = T.configdir,
741 help = 'Select current diretory [default %default]')
742 op.add_option('-p', '--peerdb',
743 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
744 help = 'Select peers database [default %default]')
745 op.add_option('--daemon', dest = 'daemon',
746 default = False, action = 'store_true',
747 help = 'Become a daemon after successful initialization')
748 op.add_option('--debug', dest = 'debug',
749 default = False, action = 'store_true',
750 help = 'Emit debugging trace information')
751 op.add_option('--startup', dest = 'startup',
752 default = False, action = 'store_true',
753 help = 'Being called as part of the server startup')
755 opts, args = op.parse_args()
756 if args: op.error('no arguments permitted')
758 T._debug = opts.debug
761 ## Service table, for running manually.
762 service_info = [('watch', T.VERSION, {
763 'adopted': (0, 0, '', cmd_adopted),
764 'kick': (1, 1, 'PEER', cmd_kick)
if __name__ == '__main__':
    ## Running as a program rather than being imported.  The options must
    ## land in the module-level name `opts', because other parts of the file
    ## read it directly (e.g., `opts.cdb').
    opts = parse_options()

    ## Hand control to the service machinery.
    T.runservices(opts.tripesock, service_info,
                  daemon = opts.daemon,
                  init = init,
                  setup = setup)
773 ###----- That's all, folks --------------------------------------------------