chiark / gitweb /
server/tests.at: Abstract out the wait-for-knock machinery.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50   """Glues the select-line-buffer into the coroutine queue system."""
51
52   def __new__(cls, file, queue, tag, kind):
53     """See __init__ for documentation."""
54     return M.SelLineBuffer.__new__(cls, file.fileno())
55
56   def __init__(me, file, queue, tag, kind):
57     """
58     Initialize a new line-reading adaptor.
59
60     The adaptor reads lines from FILE.  Each line is inserted as a message of
61     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
62     represented as None.
63     """
64     me._q = queue
65     me._file = file
66     me._tag = tag
67     me._kind = kind
68     me.enable()
69
70   @T._callback
71   def line(me, line):
72     me._q.put((me._tag, me._kind, line))
73
74   @T._callback
75   def eof(me):
76     me.disable()
77     me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80   """
81   An object which watches stderr streams for errors and converts them into
82   warnings of the form
83
84     WARN connect INFO stderr LINE
85
86   The INFO is a list of tokens associated with the file when it was
87   registered.
88
89   Usually there is a single ErrorWatch object, called errorwatch.
90   """
91
92   def __init__(me):
93     """Initialization: there are no arguments."""
94     T.Coroutine.__init__(me)
95     me._q = T.Queue()
96     me._map = {}
97     me._seq = 1
98
99   def watch(me, file, info):
100     """
101     Adds FILE to the collection of files to watch.
102
103     INFO will be written in the warning messages from this FILE.  Returns a
104     sequence number which can be used to unregister the file again.
105     """
106     seq = me._seq
107     me._seq += 1
108     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109     return seq
110
111   def unwatch(me, seq):
112     """Stop watching the file with sequence number SEQ."""
113     del me._map[seq]
114     return me
115
116   def run(me):
117     """
118     Coroutine function: read items from the queue and report them.
119
120     Unregisters files automatically when they reach EOF.
121     """
122     while True:
123       seq, _, line = me._q.get()
124       if line is None:
125         me.unwatch(seq)
126       else:
127         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130   """
131   Coroutine function: wake up every minute and notice changes to the
132   database.  When a change happens, tell the Pinger (q.v.) to rescan its
133   peers.
134   """
135   cr = T.Coroutine.getcurrent()
136   main = cr.parent
137   fw = M.FWatch(opts.cdb)
138   while True:
139     timer = M.SelTimer(time() + 60, lambda: cr.switch())
140     main.switch()
141     if fw.update():
142       pinger.rescan(False)
143       S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146   """
147   An object which watches for specified processes exiting and reports
148   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150   There is usually only one ChildWatch object, called childwatch.
151   """
152
153   def __new__(cls):
154     """Initialize the child-watcher."""
155     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157   def __init__(me):
158     """Initialize the child-watcher."""
159     me._pid = {}
160     me.enable()
161
162   def watch(me, pid, queue, tag):
163     """
164     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
165     to the QUEUE, where CODE is one of
166
167       * None (successful termination)
168       * ['exit-nonzero', CODE] (CODE is a string!)
169       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171     """
172     me._pid[pid] = queue, tag
173     return me
174
175   def unwatch(me, pid):
176     """Unregister PID as a child to watch."""
177     del me._pid[pid]
178     return me
179
180   @T._callback
181   def signalled(me):
182     """
183     Called when child processes exit: collect exit statuses and report
184     failures.
185     """
186     while True:
187       try:
188         pid, status = OS.waitpid(-1, OS.WNOHANG)
189       except OSError, exc:
190         if exc.errno == E.ECHILD:
191           break
192       if pid == 0:
193         break
194       if pid not in me._pid:
195         continue
196       queue, tag = me._pid[pid]
197       if OS.WIFEXITED(status):
198         exit = OS.WEXITSTATUS(status)
199         if exit == 0:
200           code = None
201         else:
202           code = ['exit-nonzero', str(exit)]
203       elif OS.WIFSIGNALED(status):
204         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205       else:
206         code = ['exit-unknown', hex(status)]
207       queue.put((tag, 'exit', code))
208
209 class Command (object):
210   """
211   Represents a running command.
212
213   This class is the main interface to the machery provided by the ChildWatch
214   and ErrorWatch objects.  See also potwatch.
215   """
216
217   def __init__(me, info, queue, tag, args, env):
218     """
219     Start a new child process.
220
221     The ARGS are a list of arguments to be given to the child process.  The
222     ENV is either None or a dictionary of environment variable assignments to
223     override the extant environment.  INFO is a list of tokens to be included
224     in warnings about the child's stderr output.  If the child writes a line
225     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
226     child exits, write (TAG, 'exit', CODE) to the QUEUE.
227     """
228     me._info = info
229     me._q = queue
230     me._tag = tag
231     myenv = OS.environ.copy()
232     if env: myenv.update(env)
233     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234                           stdout = PROC.PIPE, stderr = PROC.PIPE)
235     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236     errorwatch.watch(me._proc.stderr, info)
237     childwatch.watch(me._proc.pid, queue, tag)
238
239   def __del__(me):
240     """
241     If I've been forgotten then stop watching for termination.
242     """
243     childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246   """
247   Watch the queue Q for activity as reported by a Command object.
248
249   Information from the process's stdout is reported as
250
251     NOTE WHAT NAME stdout LINE
252
253   abnormal termination is reported as
254
255     WARN WHAT NAME CODE
256
257   where CODE is what the ChildWatch wrote.
258   """
259   eofp = deadp = False
260   while not deadp or not eofp:
261     _, kind, more = q.get()
262     if kind == 'stdout':
263       if more is None:
264         eofp = True
265       else:
266         S.notify('connect', what, name, 'stdout', more)
267     elif kind == 'exit':
268       if more: S.warn('connect', what, name, *more)
269       deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic']                     # An object distinct from all others
275
276 class Peer (object):
277   """Representation of a peer in the database."""
278
279   def __init__(me, peer, cdb = None):
280     """
281     Create a new peer, named PEER.
282
283     Information about the peer is read from the database CDB, or the default
284     one given on the command-line.
285     """
286     me.name = peer
287     record = (cdb or CDB.init(opts.cdb))['P' + peer]
288     me.__dict__.update(M.URLDecode(record, semip = True))
289
290   def get(me, key, default = _magic, filter = None):
291     """
292     Get the information stashed under KEY from the peer's database record.
293
294     If DEFAULT is given, then use it if the database doesn't contain the
295     necessary information.  If no DEFAULT is given, then report an error.  If
296     a FILTER function is given then apply it to the information from the
297     database before returning it.
298     """
299     try:
300       attr = me.__dict__[key]
301     except KeyError:
302       if default is _magic:
303         raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
304       return default
305     else:
306       if filter is not None: attr = filter(attr)
307       return attr
308
309   def has(me, key):
310     """
311     Return whether the peer's database record has the KEY.
312     """
313     return key in me.__dict__
314
315   def list(me):
316     """
317     Iterate over the available keys in the peer's database record.
318     """
319     return me.__dict__.iterkeys()
320
321 def boolean(value):
322   """Parse VALUE as a boolean."""
323   return value in ['t', 'true', 'y', 'yes', 'on']
324
325 ###--------------------------------------------------------------------------
326 ### Waking up and watching peers.
327
328 def run_connect(peer, cmd):
329   """
330   Start the job of connecting to the passive PEER.
331
332   The CMD string is a shell command which will connect to the peer (via some
333   back-channel, say ssh and userv), issue a command
334
335     SVCSUBMIT connect passive [OPTIONS] USER
336
337   and write the resulting challenge to standard error.
338   """
339   q = T.Queue()
340   cmd = Command(['connect', peer.name], q, 'connect',
341                 ['/bin/sh', '-c', cmd], None)
342   _, kind, more = q.peek()
343   if kind == 'stdout':
344     if more is None:
345       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
346     else:
347       chal = more
348       S.greet(peer.name, chal)
349       q.get()
350   potwatch('connect', peer.name, q)
351
352 def run_disconnect(peer, cmd):
353   """
354   Start the job of disconnecting from a passive PEER.
355
356   The CMD string is a shell command which will disconnect from the peer.
357   """
358   q = T.Queue()
359   cmd = Command(['disconnect', peer.name], q, 'disconnect',
360                 ['/bin/sh', '-c', cmd], None)
361   potwatch('disconnect', peer.name, q)
362
363 _pingseq = 0
364 class PingPeer (object):
365   """
366   Object representing a peer which we are pinging to ensure that it is still
367   present.
368
369   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
370   event queue -- which saves us from having an enormous swarm of coroutines
371   -- but most of the actual work is done here.
372
373   In order to avoid confusion between different PingPeer instances for the
374   same actual peer, each PingPeer has a sequence number (its `seq'
375   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
376   (Using the PingPeer instance itself will prevent garbage collection of
377   otherwise defunct instances.)
378   """
379
380   def __init__(me, pinger, queue, peer, pingnow):
381     """
382     Create a new PingPeer.
383
384     The PINGER is the Pinger object we should send the results to.  This is
385     used when we remove ourselves, if the peer has been explicitly removed.
386
387     The QUEUE is the event queue on which timer and ping-command events
388     should be written.
389
390     The PEER is a `Peer' object describing the peer.
391
392     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
393     wait until the usual retry interval.
394     """
395     global _pingseq
396     me._pinger = pinger
397     me._q = queue
398     me._peer = peer.name
399     me.update(peer)
400     me.seq = _pingseq
401     _pingseq += 1
402     me._failures = 0
403     me._sabotage = False
404     me._last = '-'
405     me._nping = 0
406     me._nlost = 0
407     me._sigma_t = 0
408     me._sigma_t2 = 0
409     me._min = me._max = '-'
410     now = time()
411     if pingnow:
412       me._timer = None
413       me._ping()
414     else:
415       me._timer = M.SelTimer(now + me._every, me._time)
416     me._last_reconn = now
417
418   def update(me, peer):
419     """
420     Refreshes the timer parameters for this peer.  We don't, however,
421     immediately reschedule anything: that will happen next time anything
422     interesting happens.
423     """
424     if peer is None: peer = Peer(me._peer)
425     assert peer.name == me._peer
426     me._every = peer.get('every', filter = T.timespec, default = 120)
427     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
428     me._retries = peer.get('retries', filter = int, default = 5)
429     me._connectp = peer.has('connect')
430     me._knockp = peer.has('knock')
431     return me
432
433   def _ping(me):
434     """
435     Send a ping to the peer; the result is sent to the Pinger's event queue.
436     """
437     S.rawcommand(T.TripeAsynchronousCommand(
438       me._q, (me._peer, me.seq),
439       ['EPING',
440        '-background', S.bgtag(),
441        '-timeout', str(me._timeout),
442        '--',
443        me._peer]))
444
445   def _reconnect(me, now):
446     now = time()
447     try:
448       peer = Peer(me._peer)
449       if me._connectp or me._knockp:
450         S.warn('connect', 'reconnecting', me._peer)
451         S.forcekx(me._peer, quiet = not me._knockp)
452         if me._connectp: T.spawn(run_connect, peer, peer.get('connect'))
453         me._timer = M.SelTimer(now + me._every, me._time)
454         me._sabotage = False
455         me._last_reconn = now
456       else:
457         S.kill(me._peer)
458     except T.TripeError, e:
459       if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
460
461   def reconnect(me):
462     """
463     Attempt reconnection to the peer.
464
465     Applies rate-limiting so that we don't hammer a remote peer just because
466     we notice several problems in a short time interval.
467     """
468     now = time()
469     if now >= me._last_reconn + 5: me._reconnect(now)
470
471   def event(me, code, stuff):
472     """
473     Respond to an event which happened to this peer.
474
475     Timer events indicate that we should start a new ping.  (The server has
476     its own timeout which detects lost packets.)
477
478     We trap unknown-peer responses and detach from the Pinger.
479
480     If the ping fails and we run out of retries, we attempt to restart the
481     connection.
482     """
483     now = time()
484     if code == 'TIMER':
485       me._failures = 0
486       me._ping()
487     elif code == 'FAIL':
488       S.notify('connect', 'ping-failed', me._peer, *stuff)
489       if not stuff: pass
490       elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
491       elif stuff[0] == 'ping-send-failed': me._reconnect(now)
492     elif code == 'INFO':
493       outcome = stuff[0]
494       if outcome == 'ping-ok' and me._sabotage:
495         outcome = 'ping-timeout'
496       if outcome == 'ping-ok':
497         if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
498         t = float(stuff[1])
499         me._last = '%.1fms' % t
500         me._sigma_t += t
501         me._sigma_t2 += t*t
502         me._nping += 1
503         if me._min == '-' or t < me._min: me._min = t
504         if me._max == '-' or t > me._max: me._max = t
505         me._timer = M.SelTimer(now + me._every, me._time)
506       elif outcome == 'ping-timeout':
507         me._failures += 1
508         me._nlost += 1
509         S.warn('connect', 'ping-timeout', me._peer,
510                'attempt', str(me._failures), 'of', str(me._retries))
511         if me._failures < me._retries:
512           me._ping()
513           me._last = 'timeout'
514         else:
515           me._reconnect(now)
516           me._last = 'reconnect'
517       elif outcome == 'ping-peer-died':
518         me._pinger.kill(me._peer)
519
520   def sabotage(me):
521     """Sabotage the peer, for testing purposes."""
522     me._sabotage = True
523     if me._timer: me._timer.kill()
524     T.defer(me._time)
525
526   def info(me):
527     if not me._nping:
528       mean = sd = min = max = '-'
529     else:
530       meanval = me._sigma_t/me._nping
531       mean = '%.1fms' % meanval
532       sd = '%.1fms' % sqrt(me._sigma_t2/me._nping - meanval*meanval)
533       min = '%.1fms' % me._min
534       max = '%.1fms' % me._max
535     n = me._nping + me._nlost
536     if not n: pclost = '-'
537     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
538     return { 'last-ping': me._last,
539              'mean-ping': mean,
540              'sd-ping': sd,
541              'n-ping': '%d' % me._nping,
542              'n-lost': '%d' % me._nlost,
543              'percent-lost': pclost,
544              'min-ping': min,
545              'max-ping': max,
546              'state': me._timer and 'idle' or 'check',
547              'failures': str(me._failures) }
548
549   @T._callback
550   def _time(me):
551     """
552     Handle timer callbacks by posting a timeout event on the queue.
553     """
554     me._timer = None
555     me._q.put(((me._peer, me.seq), 'TIMER', None))
556
557   def __str__(me):
558     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
559   def __repr__(me):
560     return str(me)
561
562 class Pinger (T.Coroutine):
563   """
564   The Pinger keeps track of the peers which we expect to be connected and
565   takes action if they seem to stop responding.
566
567   There is usually only one Pinger, called pinger.
568
569   The Pinger maintains a collection of PingPeer objects, and an event queue.
570   The PingPeers direct the results of their pings, and timer events, to the
571   event queue.  The Pinger's coroutine picks items off the queue and
572   dispatches them back to the PingPeers as appropriate.
573   """
574
575   def __init__(me):
576     """Initialize the Pinger."""
577     T.Coroutine.__init__(me)
578     me._peers = {}
579     me._q = T.Queue()
580
581   def run(me):
582     """
583     Coroutine function: reads the pinger queue and sends events to the
584     PingPeer objects they correspond to.
585     """
586     while True:
587       (peer, seq), code, stuff = me._q.get()
588       if peer in me._peers and seq == me._peers[peer].seq:
589         try: me._peers[peer].event(code, stuff)
590         except Exception, e:
591           SYS.excepthook(*SYS.exc_info())
592
593   def add(me, peer, pingnow):
594     """
595     Add PEER to the collection of peers under the Pinger's watchful eye.
596     The arguments are as for PingPeer: see above.
597     """
598     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
599     return me
600
601   def kill(me, peername):
602     """Remove PEER from the peers being watched by the Pinger."""
603     try: del me._peers[peername]
604     except KeyError: pass
605     return me
606
607   def rescan(me, startup):
608     """
609     General resynchronization method.
610
611     We scan the list of peers (with connect scripts) known at the server.
612     Any which are known to the Pinger but aren't known to the server are
613     removed from our list; newly arrived peers are added.  (Note that a peer
614     can change state here either due to the server sneakily changing its list
615     without issuing notifications or, more likely, the database changing its
616     idea of whether a peer is interesting.)  Finally, PingPeers which are
617     still present are prodded to update their timing parameters.
618
619     This method is called once at startup to pick up the peers already
620     installed, and again by the dbwatcher coroutine when it detects a change
621     to the database.
622     """
623     if T._debug: print '# rescan peers'
624     correct = {}
625     start = {}
626     for name in S.list():
627       try: peer = Peer(name)
628       except KeyError: continue
629       if peer.get('watch', filter = boolean, default = False):
630         if T._debug: print '# interesting peer %s' % peer
631         correct[peer.name] = start[peer.name] = peer
632       elif startup:
633         if T._debug: print '# peer %s ready for adoption' % peer
634         start[peer.name] = peer
635     for name, obj in me._peers.items():
636       try:
637         peer = correct[name]
638       except KeyError:
639         if T._debug: print '# peer %s vanished' % name
640         del me._peers[name]
641       else:
642         obj.update(peer)
643     for name, peer in start.iteritems():
644       if name in me._peers: continue
645       if startup:
646         if T._debug: print '# setting up peer %s' % name
647         ifname = S.ifname(name)
648         addr = S.addr(name)
649         T.defer(adoptpeer, peer, ifname, *addr)
650       else:
651         if T._debug: print '# adopting new peer %s' % name
652         me.add(peer, True)
653     return me
654
655   def adopted(me):
656     """
657     Returns the list of peers being watched by the Pinger.
658     """
659     return me._peers.keys()
660
661   def find(me, name):
662     """Return the PingPeer with the given name."""
663     return me._peers[name]
664
665 ###--------------------------------------------------------------------------
666 ### New connections.
667
668 def encode_envvars(env, prefix, vars):
669   """
670   Encode the variables in VARS suitably for including in a program
671   environment.  Lowercase letters in variable names are forced to uppercase;
672   runs of non-alphanumeric characters are replaced by single underscores; and
673   the PREFIX is prepended.  The resulting variables are written to ENV.
674   """
675   for k, v in vars.iteritems():
676     env[prefix + r_bad.sub('_', k.upper())] = v
677
678 r_bad = RX.compile(r'[\W_]+')
679 def envvars(peer):
680   """
681   Translate the database information for a PEER into a dictionary of
682   environment variables with plausible upper-case names and a P_ prefix.
683   Also collect the crypto information into A_ variables.
684   """
685   env = {}
686   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
687   encode_envvars(env, 'A_', S.algs(peer.name))
688   return env
689
690 def run_ifupdown(what, peer, *args):
691   """
692   Run the interface up/down script for a peer.
693
694   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
695   list of arguments to pass to the script, in addition to the peer name.
696
697   The command is run and watched in the background by potwatch.
698   """
699   q = T.Queue()
700   c = Command([what, peer.name], q, what,
701               M.split(peer.get(what), quotep = True)[0] +
702               [peer.name] + list(args),
703               envvars(peer))
704   potwatch(what, peer.name, q)
705
706 def adoptpeer(peer, ifname, *addr):
707   """
708   Add a new peer to our collection.
709
710   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
711   ADDR is the list of tokens representing its address.
712
713   We try to bring up the interface and provoke a connection to the peer if
714   it's passive.
715   """
716   if peer.has('ifup'):
717     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
718         .switch('ifup', peer, ifname, *addr)
719   cmd = peer.get('connect', default = None)
720   if cmd is not None:
721     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
722         .switch(peer, cmd)
723   if peer.get('watch', filter = boolean, default = False):
724     pinger.add(peer, False)
725
726 def disownpeer(peer):
727   """Drop the PEER from the Pinger and put its interface to bed."""
728   try: pinger.kill(peer)
729   except KeyError: pass
730   cmd = peer.get('disconnect', default = None)
731   if cmd is not None:
732     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
733         .switch(peer, cmd)
734   if peer.has('ifdown'):
735     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
736         .switch('ifdown', peer)
737
738 def addpeer(peer, addr, ephemp):
739   """
740   Process a connect request from a new peer PEER on address ADDR.
741
742   Any existing peer with this name is disconnected from the server.  EPHEMP
743   is the default ephemeral-ness state for the new peer.
744   """
745   if peer.name in S.list():
746     S.kill(peer.name)
747   try:
748     S.add(peer.name,
749           tunnel = peer.get('tunnel', default = None),
750           keepalive = peer.get('keepalive', default = None),
751           key = peer.get('key', default = None),
752           priv = peer.get('priv', default = None),
753           mobile = peer.get('mobile', filter = boolean, default = False),
754           knock = peer.get('knock', default = None),
755           cork = peer.get('cork', filter = boolean, default = False),
756           ephemeral = peer.get('ephemeral', filter = boolean,
757                                default = ephemp),
758           *addr)
759   except T.TripeError, exc:
760     raise T.TripeJobError(*exc.args)
761
762 ## Dictionary mapping challenges to waiting passive-connection coroutines.
763 chalmap = {}
764
765 def notify(_, code, *rest):
766   """
767   Watch for notifications.
768
769   We trap ADD and KILL notifications, and send them straight to adoptpeer and
770   disownpeer respectively; and dispatch GREET notifications to the
771   corresponding waiting coroutine.
772   """
773   if code == 'ADD':
774     try: p = Peer(rest[0])
775     except KeyError: pass
776     else: adoptpeer(p, *rest[1:])
777   elif code == 'KILL':
778     try: p = Peer(rest[0])
779     except KeyError: pass
780     else: disownpeer(p, *rest[1:])
781   elif code == 'GREET':
782     chal = rest[0]
783     try: cr = chalmap[chal]
784     except KeyError: pass
785     else: cr.switch(rest[1:])
786   elif code == 'KNOCK':
787     try: p = Peer(rest[0])
788     except KeyError:
789       S.warn(['connect', 'knock-unknown-peer', rest[0]])
790       return
791     if p.get('peer') != 'PASSIVE':
792       S.warn(['connect', 'knock-active-peer', p.name])
793       return
794     dot = p.name.find('.')
795     if dot >= 0: kname = p.name[dot + 1:]
796     else: kname = p.name
797     ktag = p.get('key', p.name)
798     if kname != ktag:
799       S.warn(['connect', 'knock-tag-mismatch',
800               'peer', pname, 'public-key-tag', ktag])
801       return
802     T.spawn(addpeer, p, rest[1:], True)
803
804 ###--------------------------------------------------------------------------
805 ### Command implementation.
806
807 def cmd_kick(name):
808   """
809   kick NAME: Force a new connection attempt for the NAMEd peer.
810   """
811   try: pp = pinger.find(name)
812   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
813   try: peer = Peer(name)
814   except KeyError: raise T.TripeJobError('unknown-peer', name)
815   conn = peer.get('connect', None)
816   if conn: T.spawn(run_connect, peer, peer.get('connect'))
817   else: T.spawn(lambda p: S.forcekx(p.name), peer)
818
819 def cmd_adopted():
820   """
821   adopted: Report a list of adopted peers.
822   """
823   for name in pinger.adopted():
824     T.svcinfo(name)
825
826 def cmd_active(name):
827   """
828   active NAME: Handle an active connection request for the peer called NAME.
829
830   The appropriate address is read from the database automatically.
831   """
832   try: peer = Peer(name)
833   except KeyError: raise T.TripeJobError('unknown-peer', name)
834   addr = peer.get('peer')
835   if addr == 'PASSIVE':
836     raise T.TripeJobError('passive-peer', name)
837   addpeer(peer, M.split(addr, quotep = True)[0], True)
838
839 def cmd_listactive():
840   """
841   list: Report a list of the available active peers.
842   """
843   cdb = CDB.init(opts.cdb)
844   for key in cdb.keys():
845     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
846       T.svcinfo(key[1:])
847
848 def cmd_info(name):
849   """
850   info NAME: Report the database entries for the named peer.
851   """
852   try: peer = Peer(name)
853   except KeyError: raise T.TripeJobError('unknown-peer', name)
854   d = {}
855   try: pp = pinger.find(name)
856   except KeyError: pass
857   else: d.update(pp.info())
858   items = list(peer.list()) + d.keys()
859   items.sort()
860   for i in items:
861     try: v = d[i]
862     except KeyError: v = peer.get(i)
863     T.svcinfo('%s=%s' % (i, v.replace('\n', ' ')))
864
865 def cmd_userpeer(user):
866   """
867   userpeer USER: Report the peer name for the named user.
868   """
869   try: name = CDB.init(opts.cdb)['U' + user]
870   except KeyError: raise T.TripeJobError('unknown-user', user)
871   T.svcinfo(name)
872
873 def cmd_passive(*args):
874   """
875   passive [OPTIONS] USER: Await the arrival of the named USER.
876
877   Report a challenge; when (and if!) the server receives a greeting quoting
878   this challenge, add the corresponding peer to the server.
879   """
880   now = time()
881   timeout = 30
882   op = T.OptParse(args, ['-timeout'])
883   for opt in op:
884     if opt == '-timeout':
885       timeout = T.timespec(op.arg())
886   user, = op.rest(1, 1)
887   try: name = CDB.init(opts.cdb)['U' + user]
888   except KeyError: raise T.TripeJobError('unknown-user', user)
889   try: peer = Peer(name)
890   except KeyError: raise T.TripeJobError('unknown-peer', name)
891   chal = S.getchal()
892   cr = T.Coroutine.getcurrent()
893   timer = M.SelTimer(now + timeout, lambda: cr.switch(None))
894   try:
895     T.svcinfo(chal)
896     chalmap[chal] = cr
897     addr = cr.parent.switch()
898     if addr is None:
899       raise T.TripeJobError('connect-timeout')
900     addpeer(peer, addr, True)
901   finally:
902     del chalmap[chal]
903
904 def cmd_sabotage(name):
905   """
906   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
907   """
908   try: pp = pinger.find(name)
909   except KeyError: raise T.TripeJobError('unknown-peer', name)
910   pp.sabotage()
911
912 ###--------------------------------------------------------------------------
913 ### Start up.
914
915 def setup():
916   """
917   Service setup.
918
919   Register the notification watcher, rescan the peers, and add automatic
920   active peers.
921   """
922   S.handler['NOTE'] = notify
923   S.watch('+n')
924
925   pinger.rescan(opts.startup)
926
927   if opts.startup:
928     cdb = CDB.init(opts.cdb)
929     try:
930       autos = cdb['%AUTO']
931     except KeyError:
932       autos = ''
933     for name in M.split(autos)[0]:
934       try:
935         peer = Peer(name, cdb)
936         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False)
937       except T.TripeJobError, err:
938         S.warn('connect', 'auto-add-failed', name, *err.args)
939
940 def init():
941   """
942   Initialization to be done before service startup.
943   """
944   global errorwatch, childwatch, pinger
945   errorwatch = ErrorWatch()
946   childwatch = ChildWatch()
947   pinger = Pinger()
948   T.Coroutine(dbwatch, name = 'dbwatch').switch()
949   errorwatch.switch()
950   pinger.switch()
951
952 def parse_options():
953   """
954   Parse the command-line options.
955
956   Automatically changes directory to the requested configdir, and turns on
957   debugging.  Returns the options object.
958   """
959   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
960                     version = '%%prog %s' % VERSION)
961
962   op.add_option('-a', '--admin-socket',
963                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
964                 help = 'Select socket to connect to [default %default]')
965   op.add_option('-d', '--directory',
966                 metavar = 'DIR', dest = 'dir', default = T.configdir,
967                 help = 'Select current diretory [default %default]')
968   op.add_option('-p', '--peerdb',
969                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
970                 help = 'Select peers database [default %default]')
971   op.add_option('--daemon', dest = 'daemon',
972                 default = False, action = 'store_true',
973                 help = 'Become a daemon after successful initialization')
974   op.add_option('--debug', dest = 'debug',
975                 default = False, action = 'store_true',
976                 help = 'Emit debugging trace information')
977   op.add_option('--startup', dest = 'startup',
978                 default = False, action = 'store_true',
979                 help = 'Being called as part of the server startup')
980
981   opts, args = op.parse_args()
982   if args: op.error('no arguments permitted')
983   OS.chdir(opts.dir)
984   T._debug = opts.debug
985   return opts
986
987 ## Service table, for running manually.
988 service_info = [('connect', VERSION, {
989   'adopted': (0, 0, '', cmd_adopted),
990   'kick': (1, 1, 'PEER', cmd_kick),
991   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
992   'active': (1, 1, 'PEER', cmd_active),
993   'info': (1, 1, 'PEER', cmd_info),
994   'list-active': (0, 0, '', cmd_listactive),
995   'userpeer': (1, 1, 'USER', cmd_userpeer),
996   'sabotage': (1, 1, 'PEER', cmd_sabotage)
997 })]
998
999 if __name__ == '__main__':
1000   opts = parse_options()
1001   OS.environ['TRIPESOCK'] = opts.tripesock
1002   T.runservices(opts.tripesock, service_info,
1003                 init = init, setup = setup,
1004                 daemon = opts.daemon)
1005
1006 ###----- That's all, folks --------------------------------------------------