def boolean(value):
  """
  Parse VALUE as a boolean.

  Return True if VALUE is one of the strings `t', `true', `y', `yes', `on' or
  `1', compared case-insensitively and ignoring surrounding whitespace.
  Return False for any other string, and for anything which isn't a string
  at all.
  """
  try:
    ## Normalize, so that `Yes', `TRUE' or ` on ' in a hand-edited database
    ## mean the same as their plain lowercase forms.
    value = value.strip().lower()
  except AttributeError:
    ## Not a string: the plain membership test could never succeed for
    ## these either, so the answer is still False.
    return False
  return value in ('t', 'true', 'y', 'yes', 'on', '1')
+
+###--------------------------------------------------------------------------
+### Waking up and watching peers.
+
def run_connect(peer, cmd):
  """
  Begin connecting to the passive PEER.

  CMD is a shell command which reaches the peer over some back-channel (ssh
  and userv, for example), issues

    SVCSUBMIT connect passive [OPTIONS] USER

  there, and writes the resulting challenge to its standard output.  If the
  command's first event is a line on standard output, that line is passed
  to S.greet for the peer and removed from the queue.  Everything else the
  command produces is left for potwatch.
  """
  events = T.Queue()
  argv = ['/bin/sh', '-c', cmd]
  ## Keep a reference to the Command while its output is being watched.
  job = Command(['connect', peer.name], events, 'connect', argv, None)

  ## Look at the first event without taking it off the queue.
  _, kind, more = events.peek()
  if kind == 'stdout' and more is None:
    ## End-of-file before any challenge: complain, but leave the event on
    ## the queue for potwatch to see.
    S.warn('connect', 'connect', peer.name, 'unexpected-eof')
  elif kind == 'stdout':
    S.greet(peer.name, more)
    events.get()                        # we've dealt with the challenge

  potwatch('connect', peer.name, events)
+
def run_disconnect(peer, cmd):
  """
  Begin disconnecting from the passive PEER.

  CMD is a shell command which tears down the connection to the peer.  It
  is run with /bin/sh, and its output is watched by potwatch.
  """
  events = T.Queue()
  argv = ['/bin/sh', '-c', cmd]
  job = Command(['disconnect', peer.name], events, 'disconnect', argv, None)
  potwatch('disconnect', peer.name, events)
+
## Sequence number for the next PingPeer to be created; each instance takes
## the current value as its `seq' attribute.
_pingseq = 0

class PingPeer (object):
  """
  A peer which we ping periodically to make sure that it's still there.

  The Pinger (q.v.) owns the PingPeer objects.  Rather than having a
  coroutine per peer, the Pinger runs a single event queue and hands each
  event on to the appropriate PingPeer, which does most of the actual work.

  Several PingPeer instances may exist over time for the same peer, so each
  one carries a distinct sequence number in its `seq' attribute.  Events are
  tagged with a (PEER, SEQ) pair rather than with the instance itself, since
  holding the instance would prevent otherwise defunct PingPeers from being
  garbage collected.
  """

  def __init__(me, pinger, queue, peer, pingnow):
    """
    Make a new PingPeer.

    PINGER is the Pinger which owns us.  We ask it to forget about us if the
    peer has been removed.

    QUEUE is the event queue to which our timer and ping-command events are
    posted.

    PEER is the `Peer' object describing the peer.

    If PINGNOW is true, send the first ping straight away.  Otherwise wait
    for the usual interval before pinging.
    """
    global _pingseq
    me._pinger, me._q, me._peer = pinger, queue, peer.name
    me.update(peer)
    me.seq, _pingseq = _pingseq, _pingseq + 1
    me._failures = 0
    if not pingnow:
      me._schedule()
    else:
      me._timer = None
      me._ping()

  def update(me, peer):
    """
    Reread our timing parameters from PEER, or from a fresh `Peer' for our
    peer's name if PEER is None, and return ourselves.

    Nothing is rescheduled here; the new values take effect the next time
    something interesting happens.
    """
    if peer is None:
      peer = Peer(me._peer)
    assert peer.name == me._peer
    get = peer.get
    me._every = get('every', filter = T.timespec, default = 120)
    me._timeout = get('timeout', filter = T.timespec, default = 10)
    me._retries = get('retries', filter = int, default = 5)
    me._connectp = peer.has('connect')
    return me

  def _schedule(me):
    """Arrange for a timer callback once the usual interval has passed."""
    me._timer = M.SelTimer(time() + me._every, me._time)

  def _ping(me):
    """
    Ask the server to ping the peer in the background.  The outcome arrives
    on our event queue, tagged with our (PEER, SEQ) pair.
    """
    args = ['EPING', '-background', S.bgtag(),
            '-timeout', str(me._timeout),
            '--', me._peer]
    S.rawcommand(T.TripeAsynchronousCommand(me._q, (me._peer, me.seq), args))

  def _reconnect(me):
    """
    Deal with a peer which has stopped responding.

    If the peer has a `connect' command: warn, call S.forcekx, spawn
    run_connect with that command, and wait for the usual interval.
    Otherwise ask the server to kill the peer.
    """
    peer = Peer(me._peer)
    if not me._connectp:
      S.kill(me._peer)
      return
    S.warn('connect', 'reconnecting', me._peer)
    S.forcekx(me._peer)
    T.spawn(run_connect, peer, peer.get('connect'))
    me._schedule()

  def event(me, code, stuff):
    """
    Handle the event CODE, with argument list STUFF, for this peer.

    A TIMER event starts a fresh round of pinging with the failure count
    reset.  (The server applies its own timeout to each ping, which catches
    lost packets.)

    A FAIL event is reported.  An `unknown-peer' failure detaches us from
    the Pinger; a `ping-send-failed' failure makes us reconnect.

    An INFO event carries the outcome of a ping:
      * `ping-ok': wait for the next interval;
      * `ping-timeout': try again, reconnecting once the retries are used up;
      * `ping-peer-died': detach from the Pinger.
    """
    if code == 'TIMER':
      me._failures = 0
      me._ping()

    elif code == 'FAIL':
      S.notify('connect', 'ping-failed', me._peer, *stuff)
      reason = stuff[0] if stuff else None
      if reason == 'unknown-peer':
        me._pinger.kill(me._peer)
      elif reason == 'ping-send-failed':
        me._reconnect()

    elif code == 'INFO':
      outcome = stuff[0]
      if outcome == 'ping-ok':
        if me._failures > 0:
          S.warn('connect', 'ping-ok', me._peer)
        me._schedule()
      elif outcome == 'ping-timeout':
        me._failures += 1
        S.warn('connect', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures >= me._retries:
          me._reconnect()
        else:
          me._ping()
      elif outcome == 'ping-peer-died':
        me._pinger.kill(me._peer)

  @T._callback
  def _time(me):
    """
    Timer callback: forget the expired timer and post a TIMER event, tagged
    with our (PEER, SEQ) pair, to the event queue.
    """
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)

  def __repr__(me):
    return str(me)
+
class Pinger (T.Coroutine):
  """
  Keep an eye on the peers which we expect to stay connected, and take
  action when they seem to stop responding.

  There's usually just one of these, called `pinger'.

  A Pinger holds one PingPeer for each watched peer, keyed by peer name,
  together with a single event queue.  The PingPeers send their ping results
  and timer events to the queue.  The Pinger's coroutine takes the events off
  again and delivers each one to the PingPeer it belongs to.
  """

  def __init__(me):
    """Set up a Pinger with no peers and an empty event queue."""
    T.Coroutine.__init__(me)
    me._peers = {}
    me._q = T.Queue()

  def run(me):
    """
    Coroutine body.  Repeatedly take an event from the queue and deliver it
    to the PingPeer it names.

    Events for peers we no longer watch, or for PingPeer instances which have
    since been replaced (a sequence-number mismatch), are dropped.
    """
    while True:
      (name, seq), code, stuff = me._q.get()
      pp = me._peers.get(name)
      if pp is not None and pp.seq == seq:
        pp.event(code, stuff)

  def add(me, peer, pingnow):
    """
    Start watching PEER, replacing any PingPeer we already had for the same
    name.  PEER and PINGNOW are as for the PingPeer constructor.  Return the
    Pinger.
    """
    pp = PingPeer(me, me._q, peer, pingnow)
    me._peers[peer.name] = pp
    return me

  def kill(me, peername):
    """
    Stop watching the peer called PEERNAME and return the Pinger.  Raise
    KeyError if we weren't watching it.
    """
    del me._peers[peername]
    return me

  def rescan(me, startup):
    """
    Bring our collection of PingPeers back into line with the server and
    the database.

    We go through the peers the server lists, skipping any which the
    database doesn't know.  Those whose `watch' entry is true form the
    `correct' set.  At STARTUP, the remaining known peers are also
    collected for adoption.

    PingPeers for peers outside the correct set are dropped, and the rest
    are told to reread their timing parameters.  Peers we aren't yet
    watching are then taken on:
      * at STARTUP, by deferring a call to adoptpeer with the interface name
        and address the server reports;
      * otherwise, by adding them to the Pinger with an immediate ping.

    A peer can change state here either because the server changed its
    list without notifying us or, more likely, because the database changed
    its mind about whether the peer is interesting.

    This is called once at startup to pick up peers which are already
    installed, and again by the dbwatcher coroutine when the database
    changes.
    """
    if T._debug: print('# rescan peers')
    correct = {}                        # name -> Peer, to be watched
    start = {}                          # name -> Peer, to be taken on
    for name in S.list():
      try:
        peer = Peer(name)
      except KeyError:
        continue
      if peer.get('watch', filter = boolean, default = False):
        if T._debug: print('# interesting peer %s' % peer)
        correct[peer.name] = start[peer.name] = peer
      elif startup:
        if T._debug: print('# peer %s ready for adoption' % peer)
        start[peer.name] = peer

    ## Drop PingPeers whose peers are no longer interesting, and refresh the
    ## rest.  Iterate over a snapshot, since we delete as we go.
    for name, pp in list(me._peers.items()):
      peer = correct.get(name)
      if peer is None:
        if T._debug: print('# peer %s vanished' % name)
        del me._peers[name]
      else:
        pp.update(peer)

    ## Take on the newcomers.
    for name, peer in start.items():
      if name in me._peers:
        continue
      if not startup:
        if T._debug: print('# adopting new peer %s' % name)
        me.add(peer, True)
        continue
      if T._debug: print('# setting up peer %s' % name)
      ifname = S.ifname(name)
      addr = S.addr(name)
      T.defer(adoptpeer, peer, ifname, *addr)

    return me

  def adopted(me):
    """Return a list of the names of the peers being watched by the Pinger."""
    return list(me._peers)
+
+###--------------------------------------------------------------------------
+### New connections.
+
def encode_envvars(env, prefix, vars):
  """
  Copy the entries of the mapping VARS into the dictionary ENV, under names
  suitable for a program's environment.

  Each name is converted to uppercase, and every run of characters other
  than letters and digits (underscores included) becomes a single
  underscore.  PREFIX is then put on the front.  Values are stored
  unchanged, and any existing ENV entries with the same names are
  overwritten.
  """
  env.update((prefix + r_bad.sub('_', name.upper()), value)
             for name, value in vars.iteritems())

## Runs of characters other than letters and digits (underscores count as bad
## too); encode_envvars squashes each run into a single underscore.
r_bad = RX.compile(r'[\W_]+')
def envvars(peer):
  """
  Build the dictionary of environment variables for PEER's scripts.

  Every database entry for the peer becomes a `P_' variable, and the crypto
  information the server reports for it (via S.algs) becomes an `A_'
  variable.  Names are mangled by encode_envvars.
  """
  env = {}
  info = dict((key, peer.get(key)) for key in peer.list())
  encode_envvars(env, 'P_', info)
  encode_envvars(env, 'A_', S.algs(peer.name))
  return env
+
def run_ifupdown(what, peer, *args):
  """
  Run PEER's interface-up or interface-down script in the background.

  WHAT is `ifup' or `ifdown'.  It names the database entry holding the
  script's command line and also labels the job.  The command line is split
  with M.split (honouring quotes), and the peer's name followed by ARGS is
  appended to it.  The script runs in an environment built by envvars, and
  potwatch watches its progress.
  """
  events = T.Queue()
  argv = M.split(peer.get(what), quotep = True)[0]
  argv = argv + [peer.name] + list(args)
  job = Command([what, peer.name], events, what, argv, envvars(peer))
  potwatch(what, peer.name, events)
+
def adoptpeer(peer, ifname, *addr):
  """
  Take on a newly arrived peer.

  PEER is the peer's `Peer' object, IFNAME is the name of its tunnel
  interface, and ADDR is the list of tokens making up its address.

  If the peer has an `ifup' script, run it with IFNAME and ADDR.  If it has
  a `connect' command, run that, to provoke a connection to a passive peer.
  Finally, if the peer is marked to be watched, hand it to the Pinger, which
  waits for the usual interval before the first ping.
  """
  if peer.has('ifup'):
    T.Coroutine(run_ifupdown,
                name = 'ifup %s' % peer.name).switch('ifup', peer,
                                                     ifname, *addr)

  connect_cmd = peer.get('connect', default = None)
  if connect_cmd is not None:
    T.Coroutine(run_connect,
                name = 'connect %s' % peer.name).switch(peer, connect_cmd)

  if peer.get('watch', filter = boolean, default = False):
    pinger.add(peer, False)
+
def disownpeer(peer):
  """
  Stop looking after PEER.

  Remove the peer from the Pinger, if it was being watched.  Then run its
  `disconnect' command, if it has one, and its `ifdown' script, if it has
  one, each in its own coroutine.

  PEER is the `Peer' object for the departing peer.
  """
  ## The Pinger's collection is keyed by peer name (see Pinger.add), so pass
  ## the name rather than the `Peer' object.  Passing the object made the
  ## lookup fail, and the swallowed KeyError left the peer being watched.
  ## A KeyError here now just means the peer wasn't being watched.
  try: pinger.kill(peer.name)
  except KeyError: pass

  cmd = peer.get('disconnect', default = None)
  if cmd is not None:
    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
        .switch(peer, cmd)

  if peer.has('ifdown'):
    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
        .switch('ifdown', peer)
+