###--------------------------------------------------------------------------
### Peer database utilities.
-def timespec(info, key, default):
- """Parse INFO[KEY] as a timespec, or return DEFAULT."""
- try:
- return T.timespec(info[key])
- except (KeyError, T.TripeJobError):
- return default
+_magic = ['_magic'] # An object distinct from all others
-def integer(info, key, default):
- """Parse INFO[KEY] as an integer, or return DEFAULT."""
- try:
- return int(info[key])
- except (KeyError, ValueError):
- return default
+class Peer (object):
+ """Representation of a peer in the database."""
-def boolean(info, key, default):
- """Parse INFO[KEY] as a boolean, or return DEFAULT."""
- try:
- return info[key] in ['t', 'true', 'y', 'yes', 'on']
- except (KeyError, ValueError):
- return default
+ def __init__(me, peer, cdb = None):
+ """
+ Create a new peer, named PEER.
-def peerinfo(peer):
- """
- Return a dictionary containing information about PEER from the database.
- """
- return dict(M.URLDecode(CDB.init(opts.cdb)['P' + peer], semip = True))
+ Information about the peer is read from the database CDB, or the default
+ one given on the command-line.
+ """
+ me.name = peer
+ record = (cdb or CDB.init(opts.cdb))['P' + peer]
+ me.__dict__.update(M.URLDecode(record, semip = True))
+
+ def get(me, key, default = _magic, filter = None):
+ """
+ Get the information stashed under KEY from the peer's database record.
+
+ If DEFAULT is given, then use it if the database doesn't contain the
+ necessary information. If no DEFAULT is given, then report an error. If
+ a FILTER function is given then apply it to the information from the
+ database before returning it.
+ """
+ attr = me.__dict__.get(key, default)
+ if attr is _magic:
+ raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
+ elif filter is not None:
+ attr = filter(attr)
+ return attr
+
+ def has(me, key):
+ """
+ Return whether the peer's database record has the KEY.
+ """
+ return key in me.__dict__
+
+ def list(me):
+ """
+ Iterate over the available keys in the peer's database record.
+ """
+ return me.__dict__.iterkeys()
+
+def boolean(value):
+ """Parse VALUE as a boolean."""
+ return value in ['t', 'true', 'y', 'yes', 'on']
###--------------------------------------------------------------------------
### Waking up and watching peers.
-def connect(peer, conn = None):
+def connect(peer, cmd):
"""
Start the job of connecting to the passive PEER.
- The CONN string is a shell command which will connect to the peer (via some
+ The CMD string is a shell command which will connect to the peer (via some
back-channel, say ssh and userv), issue a command
SVCSUBMIT connect passive [OPTIONS] USER
and write the resulting challenge to standard error.
"""
- if conn is None:
- try:
- conn = peerinfo(peer)['connect']
- except KeyError:
- return
q = T.Queue()
- cmd = Command(['connect', peer], q, 'connect',
- ['/bin/sh', '-c', conn], None)
+ cmd = Command(['connect', peer.name], q, 'connect',
+ ['/bin/sh', '-c', cmd], None)
_, kind, more = q.peek()
if kind == 'stdout':
if more is None:
- S.warn('watch', 'connect', peer, 'unexpected-eof')
+ S.warn('watch', 'connect', peer.name, 'unexpected-eof')
else:
chal = more
- S.greet(peer, chal)
+ S.greet(peer.name, chal)
q.get()
- potwatch('connect', peer, q)
+ potwatch('connect', peer.name, q)
+
+def disconnect(peer, cmd):
+ """
+ Start the job of disconnecting from a passive PEER.
+
+ The CMD string is a shell command which will disconnect from the peer.
+ """
+ q = T.Queue()
+ cmd = Command(['disconnect', peer.name], q, 'disconnect',
+ ['/bin/sh', '-c', cmd], None)
+ potwatch('disconnect', peer.name, q)
_pingseq = 0
class PingPeer (object):
otherwise defunct instances.)
"""
- def __init__(me, pinger, queue, peer, info, pingnow):
+ def __init__(me, pinger, queue, peer, pingnow):
"""
Create a new PingPeer.
The QUEUE is the event queue on which timer and ping-command events
should be written.
- The PEER is just the peer's name, as a string.
-
- The INFO is the database record for the peer, as a dictionary, or None if
- it's not readily available. (This is just a tweak to save multiple
- probes if we don't really need them.)
+ The PEER is a `Peer' object describing the peer.
If PINGNOW is true, then immediately start pinging the peer. Otherwise
wait until the usual retry interval.
global _pingseq
me._pinger = pinger
me._q = queue
- me._peer = peer
- me.update(info)
+ me._peer = peer.name
+ me.update(peer)
me.seq = _pingseq
_pingseq += 1
me._failures = 0
else:
me._timer = M.SelTimer(time() + me._every, me._time)
- def update(me, info):
+ def update(me, peer):
"""
Refreshes the timer parameters for this peer. We don't, however,
immediately reschedule anything: that will happen next time anything
interesting happens.
"""
- if info is None:
- info = peerinfo(me._peer)
- me._every = timespec(info, 'every', 120)
- me._timeout = timespec(info, 'timeout', 10)
- me._retries = integer(info, 'retries', 5)
- me._connectp = 'connect' in info
+ if peer is None: peer = Peer(me._peer)
+ assert peer.name == me._peer
+ me._every = peer.get('every', filter = T.timespec, default = 120)
+ me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
+ me._retries = peer.get('retries', filter = int, default = 5)
+ me._connectp = peer.has('connect')
return me
def _ping(me):
me._peer]))
def _reconnect(me):
- info = peerinfo(me._peer)
- if 'connect' in info:
+ peer = Peer(me._peer)
+ if peer.has('connect'):
S.warn('watch', 'reconnecting', me._peer)
S.forcekx(me._peer)
- T.spawn(connect, me._peer)
+ T.spawn(connect, peer, peer.get('connect'))
me._timer = M.SelTimer(time() + me._every, me._time)
else:
S.kill(me._peer)
if peer in me._peers and seq == me._peers[peer].seq:
me._peers[peer].event(code, stuff)
- def add(me, peer, info, pingnow):
+ def add(me, peer, pingnow):
"""
Add PEER to the collection of peers under the Pinger's watchful eye.
The arguments are as for PingPeer: see above.
"""
- me._peers[peer] = PingPeer(me, me._q, peer, info, pingnow)
+ me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
return me
- def kill(me, peer):
+ def kill(me, peername):
"""Remove PEER from the peers being watched by the Pinger."""
- del me._peers[peer]
+ del me._peers[peername]
return me
def rescan(me, startup):
installed, and again by the dbwatcher coroutine when it detects a change
to the database.
"""
+ if T._debug: print '# rescan peers'
correct = {}
- for peer in S.list():
+ start = {}
+ for name in S.list():
+ try: peer = Peer(name)
+ except KeyError: continue
+ if peer.get('watch', filter = boolean, default = False):
+ if T._debug: print '# interesting peer %s' % peer
+ correct[peer.name] = start[peer.name] = peer
+ elif startup:
+ if T._debug: print '# peer %s ready for adoption' % peer
+ start[peer.name] = peer
+ for name, obj in me._peers.items():
try:
- info = peerinfo(peer)
+ peer = correct[name]
except KeyError:
- continue
- if boolean(info, 'watch', False):
- correct[peer] = info
- for peer, obj in me._peers.items():
- if peer in correct:
- obj.update(correct[peer])
+ if T._debug: print '# peer %s vanished' % name
+ del me._peers[name]
else:
- del me._peers[peer]
- for peer, info in correct.iteritems():
- if peer not in me._peers:
- if startup:
- ifname = S.ifname(peer)
- addr = S.addr(peer)
- addpeer(info, peer, ifname, *addr)
- else:
- me.add(peer, info, True)
+ obj.update(peer)
+ for name, peer in start.iteritems():
+ if name in me._peers: continue
+ if startup:
+ if T._debug: print '# setting up peer %s' % name
+ ifname = S.ifname(name)
+ addr = S.addr(name)
+ T.defer(addpeer, peer, ifname, *addr)
+ else:
+ if T._debug: print '# adopting new peer %s' % name
+ me.add(peer, True)
return me
def adopted(me):
env[prefix + r_bad.sub('_', k.upper())] = v
r_bad = RX.compile(r'[\W_]+')
-def envvars(info):
+def envvars(peer):
"""
- Translate the database INFO dictionary for a peer into a dictionary of
+ Translate the database information for a PEER into a dictionary of
environment variables with plausible upper-case names and a P_ prefix.
Also collect the crypto information into A_ variables.
"""
env = {}
- encode_envvars(env, 'P_', info)
- encode_envvars(env, 'A_', S.algs())
+ encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
+ encode_envvars(env, 'A_', S.algs(peer.name))
return env
-def ifupdown(what, peer, info, *args):
+def ifupdown(what, peer, *args):
"""
Run the interface up/down script for a peer.
- WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. INFO is the
- database record dictionary. ARGS is a list of arguments to pass to the
- script, in addition to the peer name.
+ WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
+ list of arguments to pass to the script, in addition to the peer name.
The command is run and watched in the background by potwatch.
"""
q = T.Queue()
- c = Command([what, peer], q, what,
- M.split(info[what], quotep = True)[0] +
- [peer] + list(args),
- envvars(info))
- potwatch(what, peer, q)
+ c = Command([what, peer.name], q, what,
+ M.split(peer.get(what), quotep = True)[0] +
+ [peer.name] + list(args),
+ envvars(peer))
+ potwatch(what, peer.name, q)
-def addpeer(info, peer, ifname, *addr):
+def addpeer(peer, ifname, *addr):
"""
Add a new peer to our collection.
- INFO is the peer information dictionary, or None if we don't have one yet.
-
- PEER names the peer; IFNAME is the interface name for its tunnel; and ADDR
- is the list of tokens representing its address.
+ PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
+ ADDR is the list of tokens representing its address.
We try to bring up the interface and provoke a connection to the peer if
it's passive.
"""
- if info is None:
- try:
- info = peerinfo(peer)
- except KeyError:
- return
- if 'ifup' in info:
- T.Coroutine(ifupdown).switch('ifup', peer, info, ifname, *addr)
- if 'connect' in info:
- T.Coroutine(connect).switch(peer, info['connect'])
- if boolean(info, 'watch', False):
- pinger.add(peer, info, False)
+ if peer.has('ifup'):
+ T.Coroutine(ifupdown, name = 'ifup %s' % peer.name) \
+ .switch('ifup', peer, ifname, *addr)
+ cmd = peer.get('connect', default = None)
+ if cmd is not None:
+ T.Coroutine(connect, name = 'connect %s' % peer.name) \
+ .switch(peer, cmd)
+ if peer.get('watch', filter = boolean, default = False):
+ pinger.add(peer, False)
def delpeer(peer):
"""Drop the PEER from the Pinger and put its interface to bed."""
- try:
- info = peerinfo(peer)
- except KeyError:
- return
- try:
- pinger.kill(peer)
- except KeyError:
- pass
- if 'ifdown' in info:
- T.Coroutine(ifupdown).switch('ifdown', peer, info)
+ try: pinger.kill(peer)
+ except KeyError: pass
+ cmd = peer.get('disconnect', default = None)
+ if cmd is not None:
+ T.Coroutine(disconnect, name = 'disconnect %s' % peer.name) \
+ .switch(peer, cmd)
+ if peer.has('ifdown'):
+ T.Coroutine(ifupdown, name = 'ifdown %s' % peer.name) \
+ .switch('ifdown', peer)
def notify(_, code, *rest):
"""
delpeer respectively.
"""
if code == 'ADD':
- addpeer(None, *rest)
+ try: p = Peer(rest[0])
+ except KeyError: return
+ addpeer(p, *rest[1:])
elif code == 'KILL':
- delpeer(*rest)
+ try: p = Peer(rest[0])
+ except KeyError: return
+ delpeer(p, *rest[1:])
###--------------------------------------------------------------------------
### Command stubs.
def cmd_stub(*args):
raise T.TripeJobError('not-implemented')
-def cmd_kick(peer):
+def cmd_kick(name):
"""
- kick PEER: Force a new connection attempt for PEER
+ kick NAME: Force a new connection attempt for the NAMEd peer.
"""
- if peer not in pinger.adopted():
- raise T.TripeJobError('peer-not-adopted', peer)
+ if name not in pinger.adopted():
+ raise T.TripeJobError('peer-not-adopted', name)
+ try: peer = Peer(name)
+ except KeyError: raise T.TripeJobError('unknown-peer', name)
T.spawn(connect, peer)
def cmd_adopted():
"""
adopted: Report a list of adopted peers.
"""
- for peer in pinger.adopted():
- T.svcinfo(peer)
+ for name in pinger.adopted():
+ T.svcinfo(name)
###--------------------------------------------------------------------------
### Start up.
errorwatch = ErrorWatch()
childwatch = ChildWatch()
pinger = Pinger()
- T.Coroutine(dbwatch).switch()
+ T.Coroutine(dbwatch, name = 'dbwatch').switch()
errorwatch.switch()
pinger.switch()