chiark / gitweb /
svc/connect.in: Add a backstop exception handler to the pinger loop.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50   """Glues the select-line-buffer into the coroutine queue system."""
51
52   def __new__(cls, file, queue, tag, kind):
53     """See __init__ for documentation."""
54     return M.SelLineBuffer.__new__(cls, file.fileno())
55
56   def __init__(me, file, queue, tag, kind):
57     """
58     Initialize a new line-reading adaptor.
59
60     The adaptor reads lines from FILE.  Each line is inserted as a message of
61     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
62     represented as None.
63     """
64     me._q = queue
65     me._file = file
66     me._tag = tag
67     me._kind = kind
68     me.enable()
69
70   @T._callback
71   def line(me, line):
72     me._q.put((me._tag, me._kind, line))
73
74   @T._callback
75   def eof(me):
76     me.disable()
77     me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80   """
81   An object which watches stderr streams for errors and converts them into
82   warnings of the form
83
84     WARN connect INFO stderr LINE
85
86   The INFO is a list of tokens associated with the file when it was
87   registered.
88
89   Usually there is a single ErrorWatch object, called errorwatch.
90   """
91
92   def __init__(me):
93     """Initialization: there are no arguments."""
94     T.Coroutine.__init__(me)
95     me._q = T.Queue()
96     me._map = {}
97     me._seq = 1
98
99   def watch(me, file, info):
100     """
101     Adds FILE to the collection of files to watch.
102
103     INFO will be written in the warning messages from this FILE.  Returns a
104     sequence number which can be used to unregister the file again.
105     """
106     seq = me._seq
107     me._seq += 1
108     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109     return seq
110
111   def unwatch(me, seq):
112     """Stop watching the file with sequence number SEQ."""
113     del me._map[seq]
114     return me
115
116   def run(me):
117     """
118     Coroutine function: read items from the queue and report them.
119
120     Unregisters files automatically when they reach EOF.
121     """
122     while True:
123       seq, _, line = me._q.get()
124       if line is None:
125         me.unwatch(seq)
126       else:
127         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130   """
131   Coroutine function: wake up every minute and notice changes to the
132   database.  When a change happens, tell the Pinger (q.v.) to rescan its
133   peers.
134   """
135   cr = T.Coroutine.getcurrent()
136   main = cr.parent
137   fw = M.FWatch(opts.cdb)
138   while True:
139     timer = M.SelTimer(time() + 60, lambda: cr.switch())
140     main.switch()
141     if fw.update():
142       pinger.rescan(False)
143       S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146   """
147   An object which watches for specified processes exiting and reports
148   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150   There is usually only one ChildWatch object, called childwatch.
151   """
152
153   def __new__(cls):
154     """Initialize the child-watcher."""
155     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157   def __init__(me):
158     """Initialize the child-watcher."""
159     me._pid = {}
160     me.enable()
161
162   def watch(me, pid, queue, tag):
163     """
164     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
165     to the QUEUE, where CODE is one of
166
167       * None (successful termination)
168       * ['exit-nonzero', CODE] (CODE is a string!)
169       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171     """
172     me._pid[pid] = queue, tag
173     return me
174
175   def unwatch(me, pid):
176     """Unregister PID as a child to watch."""
177     del me._pid[pid]
178     return me
179
180   @T._callback
181   def signalled(me):
182     """
183     Called when child processes exit: collect exit statuses and report
184     failures.
185     """
186     while True:
187       try:
188         pid, status = OS.waitpid(-1, OS.WNOHANG)
189       except OSError, exc:
190         if exc.errno == E.ECHILD:
191           break
192       if pid == 0:
193         break
194       if pid not in me._pid:
195         continue
196       queue, tag = me._pid[pid]
197       if OS.WIFEXITED(status):
198         exit = OS.WEXITSTATUS(status)
199         if exit == 0:
200           code = None
201         else:
202           code = ['exit-nonzero', str(exit)]
203       elif OS.WIFSIGNALED(status):
204         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205       else:
206         code = ['exit-unknown', hex(status)]
207       queue.put((tag, 'exit', code))
208
209 class Command (object):
210   """
211   Represents a running command.
212
213   This class is the main interface to the machery provided by the ChildWatch
214   and ErrorWatch objects.  See also potwatch.
215   """
216
217   def __init__(me, info, queue, tag, args, env):
218     """
219     Start a new child process.
220
221     The ARGS are a list of arguments to be given to the child process.  The
222     ENV is either None or a dictionary of environment variable assignments to
223     override the extant environment.  INFO is a list of tokens to be included
224     in warnings about the child's stderr output.  If the child writes a line
225     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
226     child exits, write (TAG, 'exit', CODE) to the QUEUE.
227     """
228     me._info = info
229     me._q = queue
230     me._tag = tag
231     myenv = OS.environ.copy()
232     if env: myenv.update(env)
233     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234                           stdout = PROC.PIPE, stderr = PROC.PIPE)
235     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236     errorwatch.watch(me._proc.stderr, info)
237     childwatch.watch(me._proc.pid, queue, tag)
238
239   def __del__(me):
240     """
241     If I've been forgotten then stop watching for termination.
242     """
243     childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246   """
247   Watch the queue Q for activity as reported by a Command object.
248
249   Information from the process's stdout is reported as
250
251     NOTE WHAT NAME stdout LINE
252
253   abnormal termination is reported as
254
255     WARN WHAT NAME CODE
256
257   where CODE is what the ChildWatch wrote.
258   """
259   eofp = deadp = False
260   while not deadp or not eofp:
261     _, kind, more = q.get()
262     if kind == 'stdout':
263       if more is None:
264         eofp = True
265       else:
266         S.notify('connect', what, name, 'stdout', more)
267     elif kind == 'exit':
268       if more: S.warn('connect', what, name, *more)
269       deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic']                     # An object distinct from all others
275
276 class Peer (object):
277   """Representation of a peer in the database."""
278
279   def __init__(me, peer, cdb = None):
280     """
281     Create a new peer, named PEER.
282
283     Information about the peer is read from the database CDB, or the default
284     one given on the command-line.
285     """
286     me.name = peer
287     record = (cdb or CDB.init(opts.cdb))['P' + peer]
288     me.__dict__.update(M.URLDecode(record, semip = True))
289
290   def get(me, key, default = _magic, filter = None):
291     """
292     Get the information stashed under KEY from the peer's database record.
293
294     If DEFAULT is given, then use it if the database doesn't contain the
295     necessary information.  If no DEFAULT is given, then report an error.  If
296     a FILTER function is given then apply it to the information from the
297     database before returning it.
298     """
299     attr = me.__dict__.get(key, default)
300     if attr is _magic:
301       raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
302     elif filter is not None:
303       attr = filter(attr)
304     return attr
305
306   def has(me, key):
307     """
308     Return whether the peer's database record has the KEY.
309     """
310     return key in me.__dict__
311
312   def list(me):
313     """
314     Iterate over the available keys in the peer's database record.
315     """
316     return me.__dict__.iterkeys()
317
318 def boolean(value):
319   """Parse VALUE as a boolean."""
320   return value in ['t', 'true', 'y', 'yes', 'on']
321
322 ###--------------------------------------------------------------------------
323 ### Waking up and watching peers.
324
325 def run_connect(peer, cmd):
326   """
327   Start the job of connecting to the passive PEER.
328
329   The CMD string is a shell command which will connect to the peer (via some
330   back-channel, say ssh and userv), issue a command
331
332     SVCSUBMIT connect passive [OPTIONS] USER
333
334   and write the resulting challenge to standard error.
335   """
336   q = T.Queue()
337   cmd = Command(['connect', peer.name], q, 'connect',
338                 ['/bin/sh', '-c', cmd], None)
339   _, kind, more = q.peek()
340   if kind == 'stdout':
341     if more is None:
342       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
343     else:
344       chal = more
345       S.greet(peer.name, chal)
346       q.get()
347   potwatch('connect', peer.name, q)
348
349 def run_disconnect(peer, cmd):
350   """
351   Start the job of disconnecting from a passive PEER.
352
353   The CMD string is a shell command which will disconnect from the peer.
354   """
355   q = T.Queue()
356   cmd = Command(['disconnect', peer.name], q, 'disconnect',
357                 ['/bin/sh', '-c', cmd], None)
358   potwatch('disconnect', peer.name, q)
359
360 _pingseq = 0
361 class PingPeer (object):
362   """
363   Object representing a peer which we are pinging to ensure that it is still
364   present.
365
366   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
367   event queue -- which saves us from having an enormous swarm of coroutines
368   -- but most of the actual work is done here.
369
370   In order to avoid confusion between different PingPeer instances for the
371   same actual peer, each PingPeer has a sequence number (its `seq'
372   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
373   (Using the PingPeer instance itself will prevent garbage collection of
374   otherwise defunct instances.)
375   """
376
377   def __init__(me, pinger, queue, peer, pingnow):
378     """
379     Create a new PingPeer.
380
381     The PINGER is the Pinger object we should send the results to.  This is
382     used when we remove ourselves, if the peer has been explicitly removed.
383
384     The QUEUE is the event queue on which timer and ping-command events
385     should be written.
386
387     The PEER is a `Peer' object describing the peer.
388
389     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
390     wait until the usual retry interval.
391     """
392     global _pingseq
393     me._pinger = pinger
394     me._q = queue
395     me._peer = peer.name
396     me.update(peer)
397     me.seq = _pingseq
398     _pingseq += 1
399     me._failures = 0
400     me._sabotage = False
401     me._last = '-'
402     me._nping = 0
403     me._nlost = 0
404     me._sigma_t = 0
405     me._sigma_t2 = 0
406     me._min = me._max = '-'
407     if pingnow:
408       me._timer = None
409       me._ping()
410     else:
411       me._timer = M.SelTimer(time() + me._every, me._time)
412
413   def update(me, peer):
414     """
415     Refreshes the timer parameters for this peer.  We don't, however,
416     immediately reschedule anything: that will happen next time anything
417     interesting happens.
418     """
419     if peer is None: peer = Peer(me._peer)
420     assert peer.name == me._peer
421     me._every = peer.get('every', filter = T.timespec, default = 120)
422     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
423     me._retries = peer.get('retries', filter = int, default = 5)
424     me._connectp = peer.has('connect')
425     return me
426
427   def _ping(me):
428     """
429     Send a ping to the peer; the result is sent to the Pinger's event queue.
430     """
431     S.rawcommand(T.TripeAsynchronousCommand(
432       me._q, (me._peer, me.seq),
433       ['EPING',
434        '-background', S.bgtag(),
435        '-timeout', str(me._timeout),
436        '--',
437        me._peer]))
438
439   def _reconnect(me):
440     try:
441       peer = Peer(me._peer)
442       if me._connectp:
443         S.warn('connect', 'reconnecting', me._peer)
444         S.forcekx(me._peer)
445         T.spawn(run_connect, peer, peer.get('connect'))
446         me._timer = M.SelTimer(time() + me._every, me._time)
447         me._sabotage = False
448       else:
449         S.kill(me._peer)
450     except TripeError, e:
451       if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
452
453   def event(me, code, stuff):
454     """
455     Respond to an event which happened to this peer.
456
457     Timer events indicate that we should start a new ping.  (The server has
458     its own timeout which detects lost packets.)
459
460     We trap unknown-peer responses and detach from the Pinger.
461
462     If the ping fails and we run out of retries, we attempt to restart the
463     connection.
464     """
465     if code == 'TIMER':
466       me._failures = 0
467       me._ping()
468     elif code == 'FAIL':
469       S.notify('connect', 'ping-failed', me._peer, *stuff)
470       if not stuff:
471         pass
472       elif stuff[0] == 'unknown-peer':
473         me._pinger.kill(me._peer)
474       elif stuff[0] == 'ping-send-failed':
475         me._reconnect()
476     elif code == 'INFO':
477       outcome = stuff[0]
478       if outcome == 'ping-ok' and me._sabotage:
479         outcome = 'ping-timeout'
480       if outcome == 'ping-ok':
481         if me._failures > 0:
482           S.warn('connect', 'ping-ok', me._peer)
483         t = float(stuff[1])
484         me._last = '%.1fms' % t
485         me._sigma_t += t
486         me._sigma_t2 += t*t
487         me._nping += 1
488         if me._min == '-' or t < me._min: me._min = t
489         if me._max == '-' or t > me._max: me._max = t
490         me._timer = M.SelTimer(time() + me._every, me._time)
491       elif outcome == 'ping-timeout':
492         me._failures += 1
493         me._nlost += 1
494         S.warn('connect', 'ping-timeout', me._peer,
495                'attempt', str(me._failures), 'of', str(me._retries))
496         if me._failures < me._retries:
497           me._ping()
498           me._last = 'timeout'
499         else:
500           me._reconnect()
501           me._last = 'reconnect'
502       elif outcome == 'ping-peer-died':
503         me._pinger.kill(me._peer)
504
505   def sabotage(me):
506     """Sabotage the peer, for testing purposes."""
507     me._sabotage = True
508     if me._timer: me._timer.kill()
509     T.defer(me._time)
510
511   def info(me):
512     if not me._nping:
513       mean = sd = '-'
514     else:
515       mean = me._sigma_t/me._nping
516       sd = sqrt(me._sigma_t2/me._nping - mean*mean)
517     n = me._nping + me._nlost
518     if not n: pclost = '-'
519     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
520     return { 'last-ping': me._last,
521              'mean-ping': '%.1fms' % mean,
522              'sd-ping': '%.1fms' % sd,
523              'n-ping': '%d' % me._nping,
524              'n-lost': '%d' % me._nlost,
525              'percent-lost': pclost,
526              'min-ping': '%.1fms' % me._min,
527              'max-ping': '%.1fms' % me._max,
528              'state': me._timer and 'idle' or 'check',
529              'failures': me._failures }
530
531   @T._callback
532   def _time(me):
533     """
534     Handle timer callbacks by posting a timeout event on the queue.
535     """
536     me._timer = None
537     me._q.put(((me._peer, me.seq), 'TIMER', None))
538
539   def __str__(me):
540     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
541   def __repr__(me):
542     return str(me)
543
544 class Pinger (T.Coroutine):
545   """
546   The Pinger keeps track of the peers which we expect to be connected and
547   takes action if they seem to stop responding.
548
549   There is usually only one Pinger, called pinger.
550
551   The Pinger maintains a collection of PingPeer objects, and an event queue.
552   The PingPeers direct the results of their pings, and timer events, to the
553   event queue.  The Pinger's coroutine picks items off the queue and
554   dispatches them back to the PingPeers as appropriate.
555   """
556
557   def __init__(me):
558     """Initialize the Pinger."""
559     T.Coroutine.__init__(me)
560     me._peers = {}
561     me._q = T.Queue()
562
563   def run(me):
564     """
565     Coroutine function: reads the pinger queue and sends events to the
566     PingPeer objects they correspond to.
567     """
568     while True:
569       (peer, seq), code, stuff = me._q.get()
570       if peer in me._peers and seq == me._peers[peer].seq:
571         try: me._peers[peer].event(code, stuff)
572         except Exception, e:
573           SYS.excepthook(*SYS.exc_info())
574
575   def add(me, peer, pingnow):
576     """
577     Add PEER to the collection of peers under the Pinger's watchful eye.
578     The arguments are as for PingPeer: see above.
579     """
580     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
581     return me
582
583   def kill(me, peername):
584     """Remove PEER from the peers being watched by the Pinger."""
585     try: del me._peers[peername]
586     except KeyError: pass
587     return me
588
589   def rescan(me, startup):
590     """
591     General resynchronization method.
592
593     We scan the list of peers (with connect scripts) known at the server.
594     Any which are known to the Pinger but aren't known to the server are
595     removed from our list; newly arrived peers are added.  (Note that a peer
596     can change state here either due to the server sneakily changing its list
597     without issuing notifications or, more likely, the database changing its
598     idea of whether a peer is interesting.)  Finally, PingPeers which are
599     still present are prodded to update their timing parameters.
600
601     This method is called once at startup to pick up the peers already
602     installed, and again by the dbwatcher coroutine when it detects a change
603     to the database.
604     """
605     if T._debug: print '# rescan peers'
606     correct = {}
607     start = {}
608     for name in S.list():
609       try: peer = Peer(name)
610       except KeyError: continue
611       if peer.get('watch', filter = boolean, default = False):
612         if T._debug: print '# interesting peer %s' % peer
613         correct[peer.name] = start[peer.name] = peer
614       elif startup:
615         if T._debug: print '# peer %s ready for adoption' % peer
616         start[peer.name] = peer
617     for name, obj in me._peers.items():
618       try:
619         peer = correct[name]
620       except KeyError:
621         if T._debug: print '# peer %s vanished' % name
622         del me._peers[name]
623       else:
624         obj.update(peer)
625     for name, peer in start.iteritems():
626       if name in me._peers: continue
627       if startup:
628         if T._debug: print '# setting up peer %s' % name
629         ifname = S.ifname(name)
630         addr = S.addr(name)
631         T.defer(adoptpeer, peer, ifname, *addr)
632       else:
633         if T._debug: print '# adopting new peer %s' % name
634         me.add(peer, True)
635     return me
636
637   def adopted(me):
638     """
639     Returns the list of peers being watched by the Pinger.
640     """
641     return me._peers.keys()
642
643   def find(me, name):
644     """Return the PingPeer with the given name."""
645     return me._peers[name]
646
647 ###--------------------------------------------------------------------------
648 ### New connections.
649
650 def encode_envvars(env, prefix, vars):
651   """
652   Encode the variables in VARS suitably for including in a program
653   environment.  Lowercase letters in variable names are forced to uppercase;
654   runs of non-alphanumeric characters are replaced by single underscores; and
655   the PREFIX is prepended.  The resulting variables are written to ENV.
656   """
657   for k, v in vars.iteritems():
658     env[prefix + r_bad.sub('_', k.upper())] = v
659
660 r_bad = RX.compile(r'[\W_]+')
661 def envvars(peer):
662   """
663   Translate the database information for a PEER into a dictionary of
664   environment variables with plausible upper-case names and a P_ prefix.
665   Also collect the crypto information into A_ variables.
666   """
667   env = {}
668   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
669   encode_envvars(env, 'A_', S.algs(peer.name))
670   return env
671
672 def run_ifupdown(what, peer, *args):
673   """
674   Run the interface up/down script for a peer.
675
676   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
677   list of arguments to pass to the script, in addition to the peer name.
678
679   The command is run and watched in the background by potwatch.
680   """
681   q = T.Queue()
682   c = Command([what, peer.name], q, what,
683               M.split(peer.get(what), quotep = True)[0] +
684               [peer.name] + list(args),
685               envvars(peer))
686   potwatch(what, peer.name, q)
687
688 def adoptpeer(peer, ifname, *addr):
689   """
690   Add a new peer to our collection.
691
692   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
693   ADDR is the list of tokens representing its address.
694
695   We try to bring up the interface and provoke a connection to the peer if
696   it's passive.
697   """
698   if peer.has('ifup'):
699     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
700         .switch('ifup', peer, ifname, *addr)
701   cmd = peer.get('connect', default = None)
702   if cmd is not None:
703     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
704         .switch(peer, cmd)
705   if peer.get('watch', filter = boolean, default = False):
706     pinger.add(peer, False)
707
708 def disownpeer(peer):
709   """Drop the PEER from the Pinger and put its interface to bed."""
710   try: pinger.kill(peer)
711   except KeyError: pass
712   cmd = peer.get('disconnect', default = None)
713   if cmd is not None:
714     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
715         .switch(peer, cmd)
716   if peer.has('ifdown'):
717     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
718         .switch('ifdown', peer)
719
720 def addpeer(peer, addr):
721   """
722   Process a connect request from a new peer PEER on address ADDR.
723
724   Any existing peer with this name is disconnected from the server.
725   """
726   if peer.name in S.list():
727     S.kill(peer.name)
728   try:
729     booltrue = ['t', 'true', 'y', 'yes', 'on']
730     S.add(peer.name,
731           tunnel = peer.get('tunnel', None),
732           keepalive = peer.get('keepalive', None),
733           key = peer.get('key', None),
734           priv = peer.get('priv', None),
735           mobile = peer.get('mobile', 'nil') in booltrue,
736           cork = peer.get('cork', 'nil') in booltrue,
737           *addr)
738   except T.TripeError, exc:
739     raise T.TripeJobError(*exc.args)
740
741 ## Dictionary mapping challenges to waiting passive-connection coroutines.
742 chalmap = {}
743
744 def notify(_, code, *rest):
745   """
746   Watch for notifications.
747
748   We trap ADD and KILL notifications, and send them straight to adoptpeer and
749   disownpeer respectively; and dispatch GREET notifications to the
750   corresponding waiting coroutine.
751   """
752   if code == 'ADD':
753     try: p = Peer(rest[0])
754     except KeyError: return
755     adoptpeer(p, *rest[1:])
756   elif code == 'KILL':
757     try: p = Peer(rest[0])
758     except KeyError: return
759     disownpeer(p, *rest[1:])
760   elif code == 'GREET':
761     chal = rest[0]
762     try: cr = chalmap[chal]
763     except KeyError: pass
764     else: cr.switch(rest[1:])
765
766 ###--------------------------------------------------------------------------
767 ### Command implementation.
768
769 def cmd_kick(name):
770   """
771   kick NAME: Force a new connection attempt for the NAMEd peer.
772   """
773   try: pp = pinger.find(name)
774   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
775   try: peer = Peer(name)
776   except KeyError: raise T.TripeJobError('unknown-peer', name)
777   conn = peer.get('connect', None)
778   if conn: T.spawn(run_connect, peer, peer.get('connect'))
779   else: T.spawn(lambda p: S.forcekx(p.name), peer)
780
781 def cmd_adopted():
782   """
783   adopted: Report a list of adopted peers.
784   """
785   for name in pinger.adopted():
786     T.svcinfo(name)
787
788 def cmd_active(name):
789   """
790   active NAME: Handle an active connection request for the peer called NAME.
791
792   The appropriate address is read from the database automatically.
793   """
794   try: peer = Peer(name)
795   except KeyError: raise T.TripeJobError('unknown-peer', name)
796   addr = peer.get('peer')
797   if addr == 'PASSIVE':
798     raise T.TripeJobError('passive-peer', name)
799   addpeer(peer, M.split(addr, quotep = True)[0])
800
801 def cmd_listactive():
802   """
803   list: Report a list of the available active peers.
804   """
805   cdb = CDB.init(opts.cdb)
806   for key in cdb.keys():
807     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
808       T.svcinfo(key[1:])
809
810 def cmd_info(name):
811   """
812   info NAME: Report the database entries for the named peer.
813   """
814   try: peer = Peer(name)
815   except KeyError: raise T.TripeJobError('unknown-peer', name)
816   d = {}
817   try: pp = pinger.find(name)
818   except KeyError: pass
819   else: d.update(pp.info())
820   items = list(peer.list()) + d.keys()
821   items.sort()
822   for i in items:
823     try: v = d[i]
824     except KeyError: v = peer.get(i)
825     T.svcinfo('%s=%s' % (i, v))
826
827 def cmd_userpeer(user):
828   """
829   userpeer USER: Report the peer name for the named user.
830   """
831   try: name = CDB.init(opts.cdb)['U' + user]
832   except KeyError: raise T.TripeJobError('unknown-user', user)
833   T.svcinfo(name)
834
835 def cmd_passive(*args):
836   """
837   passive [OPTIONS] USER: Await the arrival of the named USER.
838
839   Report a challenge; when (and if!) the server receives a greeting quoting
840   this challenge, add the corresponding peer to the server.
841   """
842   timeout = 30
843   op = T.OptParse(args, ['-timeout'])
844   for opt in op:
845     if opt == '-timeout':
846       timeout = T.timespec(op.arg())
847   user, = op.rest(1, 1)
848   try: name = CDB.init(opts.cdb)['U' + user]
849   except KeyError: raise T.TripeJobError('unknown-user', user)
850   try: peer = Peer(name)
851   except KeyError: raise T.TripeJobError('unknown-peer', name)
852   chal = S.getchal()
853   cr = T.Coroutine.getcurrent()
854   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
855   try:
856     T.svcinfo(chal)
857     chalmap[chal] = cr
858     addr = cr.parent.switch()
859     if addr is None:
860       raise T.TripeJobError('connect-timeout')
861     addpeer(peer, addr)
862   finally:
863     del chalmap[chal]
864
865 def cmd_sabotage(name):
866   """
867   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
868   """
869   try: pp = pinger.find(name)
870   except KeyError: raise T.TripeJobError('unknown-peer', name)
871   pp.sabotage()
872
873 ###--------------------------------------------------------------------------
874 ### Start up.
875
876 def setup():
877   """
878   Service setup.
879
880   Register the notification watcher, rescan the peers, and add automatic
881   active peers.
882   """
883   S.handler['NOTE'] = notify
884   S.watch('+n')
885
886   pinger.rescan(opts.startup)
887
888   if opts.startup:
889     cdb = CDB.init(opts.cdb)
890     try:
891       autos = cdb['%AUTO']
892     except KeyError:
893       autos = ''
894     for name in M.split(autos)[0]:
895       try:
896         peer = Peer(name, cdb)
897         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
898       except T.TripeJobError, err:
899         S.warn('connect', 'auto-add-failed', name, *err.args)
900
901 def init():
902   """
903   Initialization to be done before service startup.
904   """
905   global errorwatch, childwatch, pinger
906   errorwatch = ErrorWatch()
907   childwatch = ChildWatch()
908   pinger = Pinger()
909   T.Coroutine(dbwatch, name = 'dbwatch').switch()
910   errorwatch.switch()
911   pinger.switch()
912
913 def parse_options():
914   """
915   Parse the command-line options.
916
917   Automatically changes directory to the requested configdir, and turns on
918   debugging.  Returns the options object.
919   """
920   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
921                     version = '%%prog %s' % VERSION)
922
923   op.add_option('-a', '--admin-socket',
924                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
925                 help = 'Select socket to connect to [default %default]')
926   op.add_option('-d', '--directory',
927                 metavar = 'DIR', dest = 'dir', default = T.configdir,
928                 help = 'Select current diretory [default %default]')
929   op.add_option('-p', '--peerdb',
930                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
931                 help = 'Select peers database [default %default]')
932   op.add_option('--daemon', dest = 'daemon',
933                 default = False, action = 'store_true',
934                 help = 'Become a daemon after successful initialization')
935   op.add_option('--debug', dest = 'debug',
936                 default = False, action = 'store_true',
937                 help = 'Emit debugging trace information')
938   op.add_option('--startup', dest = 'startup',
939                 default = False, action = 'store_true',
940                 help = 'Being called as part of the server startup')
941
942   opts, args = op.parse_args()
943   if args: op.error('no arguments permitted')
944   OS.chdir(opts.dir)
945   T._debug = opts.debug
946   return opts
947
948 ## Service table, for running manually.
949 service_info = [('connect', T.VERSION, {
950   'adopted': (0, 0, '', cmd_adopted),
951   'kick': (1, 1, 'PEER', cmd_kick),
952   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
953   'active': (1, 1, 'PEER', cmd_active),
954   'info': (1, 1, 'PEER', cmd_info),
955   'list-active': (0, 0, '', cmd_listactive),
956   'userpeer': (1, 1, 'USER', cmd_userpeer),
957   'sabotage': (1, 1, 'PEER', cmd_sabotage)
958 })]
959
960 if __name__ == '__main__':
961   opts = parse_options()
962   OS.environ['TRIPESOCK'] = opts.tripesock
963   T.runservices(opts.tripesock, service_info,
964                 init = init, setup = setup,
965                 daemon = opts.daemon)
966
967 ###----- That's all, folks --------------------------------------------------