4 ### Connect to remote peers, and keep track of them
6 ### (c) 2007 Straylight/Edgeware
9 ###----- Licensing notice ---------------------------------------------------
11 ### This file is part of Trivial IP Encryption (TrIPE).
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
31 from optparse import OptionParser
40 import subprocess as PROC
44 ###--------------------------------------------------------------------------
45 ### Running auxiliary commands.
47 class SelLineQueue (M.SelLineBuffer):
48 """Glues the select-line-buffer into the coroutine queue system."""
def __new__(cls, file, queue, tag, kind):
  """
  Allocate the underlying select-line-buffer for FILE.

  Only FILE is consulted here: its file descriptor is what the base
  M.SelLineBuffer allocator is given.  QUEUE, TAG and KIND are accepted so
  that the signature lines up with __init__, which documents them.
  """
  fd = file.fileno()
  return M.SelLineBuffer.__new__(cls, fd)
54 def __init__(me, file, queue, tag, kind):
56 Initialize a new line-reading adaptor.
58 The adaptor reads lines from FILE. Each line is inserted as a message of
59 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
70 me._q.put((me._tag, me._kind, line))
75 me._q.put((me._tag, me._kind, None))
77 class ErrorWatch (T.Coroutine):
79 An object which watches stderr streams for errors and converts them into
82 WARN connect INFO stderr LINE
84 The INFO is a list of tokens associated with the file when it was
87 Usually there is a single ErrorWatch object, called errorwatch.
91 """Initialization: there are no arguments."""
92 T.Coroutine.__init__(me)
97 def watch(me, file, info):
99 Adds FILE to the collection of files to watch.
101 INFO will be written in the warning messages from this FILE. Returns a
102 sequence number which can be used to unregister the file again.
106 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109 def unwatch(me, seq):
110 """Stop watching the file with sequence number SEQ."""
116 Coroutine function: read items from the queue and report them.
118 Unregisters files automatically when they reach EOF.
121 seq, _, line = me._q.get()
125 S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
129 Coroutine function: wake up every minute and notice changes to the
130 database. When a change happens, tell the Pinger (q.v.) to rescan its
133 cr = T.Coroutine.getcurrent()
135 fw = M.FWatch(opts.cdb)
137 timer = M.SelTimer(time() + 60, lambda: cr.switch())
141 S.notify('connect', 'peerdb-update')
143 class ChildWatch (M.SelSignal):
145 An object which watches for specified processes exiting and reports
146 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
148 There is usually only one ChildWatch object, called childwatch.
152 """Initialize the child-watcher."""
153 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156 """Initialize the child-watcher."""
160 def watch(me, pid, queue, tag):
162 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
163 to the QUEUE, where CODE is one of
165 * None (successful termination)
166 * ['exit-nonzero', CODE] (CODE is a string!)
167 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
168 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
170 me._pid[pid] = queue, tag
173 def unwatch(me, pid):
174 """Unregister PID as a child to watch."""
181 Called when child processes exit: collect exit statuses and report
186 pid, status = OS.waitpid(-1, OS.WNOHANG)
188 if exc.errno == E.ECHILD:
192 if pid not in me._pid:
194 queue, tag = me._pid[pid]
195 if OS.WIFEXITED(status):
196 exit = OS.WEXITSTATUS(status)
200 code = ['exit-nonzero', str(exit)]
201 elif OS.WIFSIGNALED(status):
202 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
204 code = ['exit-unknown', hex(status)]
205 queue.put((tag, 'exit', code))
207 class Command (object):
209 Represents a running command.
211 This class is the main interface to the machinery provided by the ChildWatch
212 and ErrorWatch objects. See also potwatch.
215 def __init__(me, info, queue, tag, args, env):
217 Start a new child process.
219 The ARGS are a list of arguments to be given to the child process. The
220 ENV is either None or a dictionary of environment variable assignments to
221 override the extant environment. INFO is a list of tokens to be included
222 in warnings about the child's stderr output. If the child writes a line
223 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
224 child exits, write (TAG, 'exit', CODE) to the QUEUE.
229 myenv = OS.environ.copy()
230 if env: myenv.update(env)
231 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
232 stdout = PROC.PIPE, stderr = PROC.PIPE)
233 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
234 errorwatch.watch(me._proc.stderr, info)
235 childwatch.watch(me._proc.pid, queue, tag)
239 If I've been forgotten then stop watching for termination.
241 childwatch.unwatch(me._proc.pid)
243 def potwatch(what, name, q):
245 Watch the queue Q for activity as reported by a Command object.
247 Information from the process's stdout is reported as
249 NOTE WHAT NAME stdout LINE
251 abnormal termination is reported as
255 where CODE is what the ChildWatch wrote.
258 while not deadp or not eofp:
259 _, kind, more = q.get()
264 S.notify('connect', what, name, 'stdout', more)
266 if more: S.warn('connect', what, name, *more)
269 ###--------------------------------------------------------------------------
270 ### Peer database utilities.
## Sentinel used as the default for the DEFAULT argument of Peer.get, so that
## None stays available to callers as a genuine default value.
272 _magic = ['_magic'] # An object distinct from all others
275 """Representation of a peer in the database."""
277 def __init__(me, peer, cdb = None):
279 Create a new peer, named PEER.
281 Information about the peer is read from the database CDB, or the default
282 one given on the command-line.
285 record = (cdb or CDB.init(opts.cdb))['P' + peer]
286 me.__dict__.update(M.URLDecode(record, semip = True))
288 def get(me, key, default = _magic, filter = None):
290 Get the information stashed under KEY from the peer's database record.
292 If DEFAULT is given, then use it if the database doesn't contain the
293 necessary information. If no DEFAULT is given, then report an error. If
294 a FILTER function is given then apply it to the information from the
295 database before returning it.
297 attr = me.__dict__.get(key, default)
299 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
300 elif filter is not None:
306 Return whether the peer's database record has the KEY.
308 return key in me.__dict__
312 Iterate over the available keys in the peer's database record.
314 return me.__dict__.iterkeys()
317 """Parse VALUE as a boolean."""
318 return value in ['t', 'true', 'y', 'yes', 'on']
320 ###--------------------------------------------------------------------------
321 ### Waking up and watching peers.
323 def run_connect(peer, cmd):
325 Start the job of connecting to the passive PEER.
327 The CMD string is a shell command which will connect to the peer (via some
328 back-channel, say ssh and userv), issue a command
330 SVCSUBMIT connect passive [OPTIONS] USER
332 and write the resulting challenge to standard output.
335 cmd = Command(['connect', peer.name], q, 'connect',
336 ['/bin/sh', '-c', cmd], None)
337 _, kind, more = q.peek()
340 S.warn('connect', 'connect', peer.name, 'unexpected-eof')
343 S.greet(peer.name, chal)
345 potwatch('connect', peer.name, q)
347 def run_disconnect(peer, cmd):
349 Start the job of disconnecting from a passive PEER.
351 The CMD string is a shell command which will disconnect from the peer.
354 cmd = Command(['disconnect', peer.name], q, 'disconnect',
355 ['/bin/sh', '-c', cmd], None)
356 potwatch('disconnect', peer.name, q)
359 class PingPeer (object):
361 Object representing a peer which we are pinging to ensure that it is still
364 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
365 event queue -- which saves us from having an enormous swarm of coroutines
366 -- but most of the actual work is done here.
368 In order to avoid confusion between different PingPeer instances for the
369 same actual peer, each PingPeer has a sequence number (its `seq'
370 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
371 (Using the PingPeer instance itself will prevent garbage collection of
372 otherwise defunct instances.)
375 def __init__(me, pinger, queue, peer, pingnow):
377 Create a new PingPeer.
379 The PINGER is the Pinger object we should send the results to. This is
380 used when we remove ourselves, if the peer has been explicitly removed.
382 The QUEUE is the event queue on which timer and ping-command events
385 The PEER is a `Peer' object describing the peer.
387 If PINGNOW is true, then immediately start pinging the peer. Otherwise
388 wait until the usual retry interval.
402 me._timer = M.SelTimer(time() + me._every, me._time)
404 def update(me, peer):
406 Refreshes the timer parameters for this peer. We don't, however,
407 immediately reschedule anything: that will happen next time anything
410 if peer is None: peer = Peer(me._peer)
411 assert peer.name == me._peer
412 me._every = peer.get('every', filter = T.timespec, default = 120)
413 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
414 me._retries = peer.get('retries', filter = int, default = 5)
415 me._connectp = peer.has('connect')
420 Send a ping to the peer; the result is sent to the Pinger's event queue.
422 S.rawcommand(T.TripeAsynchronousCommand(
423 me._q, (me._peer, me.seq),
425 '-background', S.bgtag(),
426 '-timeout', str(me._timeout),
431 peer = Peer(me._peer)
433 S.warn('connect', 'reconnecting', me._peer)
435 T.spawn(run_connect, peer, peer.get('connect'))
436 me._timer = M.SelTimer(time() + me._every, me._time)
440 def event(me, code, stuff):
442 Respond to an event which happened to this peer.
444 Timer events indicate that we should start a new ping. (The server has
445 its own timeout which detects lost packets.)
447 We trap unknown-peer responses and detach from the Pinger.
449 If the ping fails and we run out of retries, we attempt to restart the
456 S.notify('connect', 'ping-failed', me._peer, *stuff)
459 elif stuff[0] == 'unknown-peer':
460 me._pinger.kill(me._peer)
461 elif stuff[0] == 'ping-send-failed':
464 if stuff[0] == 'ping-ok':
466 S.warn('connect', 'ping-ok', me._peer)
467 me._timer = M.SelTimer(time() + me._every, me._time)
468 elif stuff[0] == 'ping-timeout':
470 S.warn('connect', 'ping-timeout', me._peer,
471 'attempt', str(me._failures), 'of', str(me._retries))
472 if me._failures < me._retries:
476 elif stuff[0] == 'ping-peer-died':
477 me._pinger.kill(me._peer)
482 Handle timer callbacks by posting a timeout event on the queue.
485 me._q.put(((me._peer, me.seq), 'TIMER', None))
488 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
492 class Pinger (T.Coroutine):
494 The Pinger keeps track of the peers which we expect to be connected and
495 takes action if they seem to stop responding.
497 There is usually only one Pinger, called pinger.
499 The Pinger maintains a collection of PingPeer objects, and an event queue.
500 The PingPeers direct the results of their pings, and timer events, to the
501 event queue. The Pinger's coroutine picks items off the queue and
502 dispatches them back to the PingPeers as appropriate.
506 """Initialize the Pinger."""
507 T.Coroutine.__init__(me)
513 Coroutine function: reads the pinger queue and sends events to the
514 PingPeer objects they correspond to.
517 (peer, seq), code, stuff = me._q.get()
518 if peer in me._peers and seq == me._peers[peer].seq:
519 me._peers[peer].event(code, stuff)
521 def add(me, peer, pingnow):
523 Add PEER to the collection of peers under the Pinger's watchful eye.
524 The arguments are as for PingPeer: see above.
526 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
529 def kill(me, peername):
530 """Remove PEER from the peers being watched by the Pinger."""
531 try: del me._peers[peername]
532 except KeyError: pass
535 def rescan(me, startup):
537 General resynchronization method.
539 We scan the list of peers (with connect scripts) known at the server.
540 Any which are known to the Pinger but aren't known to the server are
541 removed from our list; newly arrived peers are added. (Note that a peer
542 can change state here either due to the server sneakily changing its list
543 without issuing notifications or, more likely, the database changing its
544 idea of whether a peer is interesting.) Finally, PingPeers which are
545 still present are prodded to update their timing parameters.
547 This method is called once at startup to pick up the peers already
548 installed, and again by the dbwatcher coroutine when it detects a change
551 if T._debug: print '# rescan peers'
554 for name in S.list():
555 try: peer = Peer(name)
556 except KeyError: continue
557 if peer.get('watch', filter = boolean, default = False):
558 if T._debug: print '# interesting peer %s' % peer
559 correct[peer.name] = start[peer.name] = peer
561 if T._debug: print '# peer %s ready for adoption' % peer
562 start[peer.name] = peer
563 for name, obj in me._peers.items():
567 if T._debug: print '# peer %s vanished' % name
571 for name, peer in start.iteritems():
572 if name in me._peers: continue
574 if T._debug: print '# setting up peer %s' % name
575 ifname = S.ifname(name)
577 T.defer(adoptpeer, peer, ifname, *addr)
579 if T._debug: print '# adopting new peer %s' % name
585 Returns the list of peers being watched by the Pinger.
587 return me._peers.keys()
589 ###--------------------------------------------------------------------------
## Matches each run of characters which may not appear in the variable names
## built by encode_envvars: anything that isn't a letter or digit, underscores
## included, so that neighbouring underscores collapse too.
r_bad = RX.compile(r'[\W_]+')

def encode_envvars(env, prefix, vars):
  """
  Write the items of VARS into ENV as environment-variable assignments.

  Each key of VARS is converted to uppercase, has every run of
  non-alphanumeric characters squashed to a single underscore, and is given
  the PREFIX.  The value is stored in ENV under the resulting name,
  unchanged.  ENV is modified in place; nothing is returned.
  """
  for name, value in vars.iteritems():
    shouted = name.upper()
    env[prefix + r_bad.sub('_', shouted)] = value
605 Translate the database information for a PEER into a dictionary of
606 environment variables with plausible upper-case names and a P_ prefix.
607 Also collect the crypto information into A_ variables.
610 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
611 encode_envvars(env, 'A_', S.algs(peer.name))
614 def run_ifupdown(what, peer, *args):
616 Run the interface up/down script for a peer.
618 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
619 list of arguments to pass to the script, in addition to the peer name.
621 The command is run and watched in the background by potwatch.
624 c = Command([what, peer.name], q, what,
625 M.split(peer.get(what), quotep = True)[0] +
626 [peer.name] + list(args),
628 potwatch(what, peer.name, q)
630 def adoptpeer(peer, ifname, *addr):
632 Add a new peer to our collection.
634 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
635 ADDR is the list of tokens representing its address.
637 We try to bring up the interface and provoke a connection to the peer if
641 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
642 .switch('ifup', peer, ifname, *addr)
643 cmd = peer.get('connect', default = None)
645 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
647 if peer.get('watch', filter = boolean, default = False):
648 pinger.add(peer, False)
def disownpeer(peer):
  """
  Drop the PEER from the Pinger and put its interface to bed.

  PEER is a `Peer' object.  It is first removed from the Pinger's collection
  of watched peers; its `disconnect' command and `ifdown' script are then
  dealt with by the remainder of the function.
  """
  ## The Pinger files its PingPeers under the peer's name, so the name is
  ## what must be handed to kill.  Given the `Peer' object itself the lookup
  ## never matches, kill quietly ignores the miss, and the peer stays watched.
  try: pinger.kill(peer.name)
  except KeyError: pass
654 cmd = peer.get('disconnect', default = None)
656 T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
658 if peer.has('ifdown'):
659 T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
660 .switch('ifdown', peer)
662 def addpeer(peer, addr):
664 Process a connect request from a new peer PEER on address ADDR.
666 Any existing peer with this name is disconnected from the server.
668 if peer.name in S.list():
671 booltrue = ['t', 'true', 'y', 'yes', 'on']
673 tunnel = peer.get('tunnel', None),
674 keepalive = peer.get('keepalive', None),
675 key = peer.get('key', None),
676 priv = peer.get('priv', None),
677 mobile = peer.get('mobile', 'nil') in booltrue,
678 cork = peer.get('cork', 'nil') in booltrue,
680 except T.TripeError, exc:
681 raise T.TripeJobError(*exc.args)
683 ## Dictionary mapping challenges to waiting passive-connection coroutines.
686 def notify(_, code, *rest):
688 Watch for notifications.
690 We trap ADD and KILL notifications, and send them straight to adoptpeer and
691 disownpeer respectively; and dispatch GREET notifications to the
692 corresponding waiting coroutine.
695 try: p = Peer(rest[0])
696 except KeyError: return
697 adoptpeer(p, *rest[1:])
699 try: p = Peer(rest[0])
700 except KeyError: return
701 disownpeer(p, *rest[1:])
702 elif code == 'GREET':
704 try: cr = chalmap[chal]
705 except KeyError: pass
706 else: cr.switch(rest[1:])
708 ###--------------------------------------------------------------------------
709 ### Command implementation.
713 kick NAME: Force a new connection attempt for the NAMEd peer.
715 if name not in pinger.adopted():
716 raise T.TripeJobError('peer-not-adopted', name)
717 try: peer = Peer(name)
718 except KeyError: raise T.TripeJobError('unknown-peer', name)
719 T.spawn(run_connect, peer, peer.get('connect'))
723 adopted: Report a list of adopted peers.
725 for name in pinger.adopted():
def cmd_active(name):
  """
  active NAME: Handle an active connection request for the peer called NAME.

  The address is taken from the `peer' entry of the peer's database record,
  split into tokens, and passed on to addpeer.

  Raises T.TripeJobError with `unknown-peer' if NAME has no database record,
  and with `passive-peer' if the recorded address is the word PASSIVE.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)

  where = peer.get('peer')
  if where == 'PASSIVE':
    raise T.TripeJobError('passive-peer', name)

  ## M.split's result is indexed for the token list; only that is wanted.
  tokens = M.split(where, quotep = True)[0]
  addpeer(peer, tokens)
741 def cmd_listactive():
743 list-active: Report a list of the available active peers.
745 cdb = CDB.init(opts.cdb)
746 for key in cdb.keys():
747 if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
752 info NAME: Report the database entries for the named peer.
754 try: peer = Peer(name)
755 except KeyError: raise T.TripeJobError('unknown-peer', name)
756 items = list(peer.list())
759 T.svcinfo('%s=%s' % (i, peer.get(i)))
761 def cmd_userpeer(user):
763 userpeer USER: Report the peer name for the named user.
765 try: name = CDB.init(opts.cdb)['U' + user]
766 except KeyError: raise T.TripeJobError('unknown-user', user)
769 def cmd_passive(*args):
771 passive [OPTIONS] USER: Await the arrival of the named USER.
773 Report a challenge; when (and if!) the server receives a greeting quoting
774 this challenge, add the corresponding peer to the server.
777 op = T.OptParse(args, ['-timeout'])
779 if opt == '-timeout':
780 timeout = T.timespec(op.arg())
781 user, = op.rest(1, 1)
782 try: name = CDB.init(opts.cdb)['U' + user]
783 except KeyError: raise T.TripeJobError('unknown-user', user)
784 try: peer = Peer(name)
785 except KeyError: raise T.TripeJobError('unknown-peer', name)
787 cr = T.Coroutine.getcurrent()
788 timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
792 addr = cr.parent.switch()
794 raise T.TripeJobError('connect-timeout')
799 ###--------------------------------------------------------------------------
806 Register the notification watcher, rescan the peers, and add automatic
809 S.handler['NOTE'] = notify
812 pinger.rescan(opts.startup)
815 cdb = CDB.init(opts.cdb)
820 for name in M.split(autos)[0]:
822 peer = Peer(name, cdb)
823 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
824 except T.TripeJobError, err:
825 S.warn('connect', 'auto-add-failed', name, *err.args)
829 Initialization to be done before service startup.
831 global errorwatch, childwatch, pinger
832 errorwatch = ErrorWatch()
833 childwatch = ChildWatch()
835 T.Coroutine(dbwatch, name = 'dbwatch').switch()
841 Parse the command-line options.
843 Automatically changes directory to the requested configdir, and turns on
844 debugging. Returns the options object.
846 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
847 version = '%%prog %s' % VERSION)
849 op.add_option('-a', '--admin-socket',
850 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
851 help = 'Select socket to connect to [default %default]')
852 op.add_option('-d', '--directory',
853 metavar = 'DIR', dest = 'dir', default = T.configdir,
854 help = 'Select current diretory [default %default]')
855 op.add_option('-p', '--peerdb',
856 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
857 help = 'Select peers database [default %default]')
858 op.add_option('--daemon', dest = 'daemon',
859 default = False, action = 'store_true',
860 help = 'Become a daemon after successful initialization')
861 op.add_option('--debug', dest = 'debug',
862 default = False, action = 'store_true',
863 help = 'Emit debugging trace information')
864 op.add_option('--startup', dest = 'startup',
865 default = False, action = 'store_true',
866 help = 'Being called as part of the server startup')
868 opts, args = op.parse_args()
869 if args: op.error('no arguments permitted')
871 T._debug = opts.debug
874 ## Service table, for running manually.
875 service_info = [('connect', T.VERSION, {
876 'adopted': (0, 0, '', cmd_adopted),
877 'kick': (1, 1, 'PEER', cmd_kick),
878 'passive': (1, None, '[OPTIONS] USER', cmd_passive),
879 'active': (1, 1, 'PEER', cmd_active),
880 'info': (1, 1, 'PEER', cmd_info),
881 'list-active': (0, 0, '', cmd_listactive),
882 'userpeer': (1, 1, 'USER', cmd_userpeer)
if __name__ == '__main__':
  ## The parsed options are kept in the module-level `opts', which the rest
  ## of the service consults (for instance to find the peer database).
  opts = parse_options()

  ## Hand over to the service machinery: it is given our admin socket, the
  ## service table, and the two initialization hooks.
  T.runservices(opts.tripesock, service_info,
                daemon = opts.daemon,
                setup = setup, init = init)
891 ###----- That's all, folks --------------------------------------------------