chiark / gitweb /
svc/connect.in: Add a new `sabotage' command to test ping-failure actions.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 from time import time
41 import subprocess as PROC
42
43 S = T.svcmgr
44
45 ###--------------------------------------------------------------------------
46 ### Running auxiliary commands.
47
48 class SelLineQueue (M.SelLineBuffer):
49   """Glues the select-line-buffer into the coroutine queue system."""
50
51   def __new__(cls, file, queue, tag, kind):
52     """See __init__ for documentation."""
53     return M.SelLineBuffer.__new__(cls, file.fileno())
54
55   def __init__(me, file, queue, tag, kind):
56     """
57     Initialize a new line-reading adaptor.
58
59     The adaptor reads lines from FILE.  Each line is inserted as a message of
60     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
61     represented as None.
62     """
63     me._q = queue
64     me._file = file
65     me._tag = tag
66     me._kind = kind
67     me.enable()
68
69   @T._callback
70   def line(me, line):
71     me._q.put((me._tag, me._kind, line))
72
73   @T._callback
74   def eof(me):
75     me.disable()
76     me._q.put((me._tag, me._kind, None))
77
78 class ErrorWatch (T.Coroutine):
79   """
80   An object which watches stderr streams for errors and converts them into
81   warnings of the form
82
83     WARN connect INFO stderr LINE
84
85   The INFO is a list of tokens associated with the file when it was
86   registered.
87
88   Usually there is a single ErrorWatch object, called errorwatch.
89   """
90
91   def __init__(me):
92     """Initialization: there are no arguments."""
93     T.Coroutine.__init__(me)
94     me._q = T.Queue()
95     me._map = {}
96     me._seq = 1
97
98   def watch(me, file, info):
99     """
100     Adds FILE to the collection of files to watch.
101
102     INFO will be written in the warning messages from this FILE.  Returns a
103     sequence number which can be used to unregister the file again.
104     """
105     seq = me._seq
106     me._seq += 1
107     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
108     return seq
109
110   def unwatch(me, seq):
111     """Stop watching the file with sequence number SEQ."""
112     del me._map[seq]
113     return me
114
115   def run(me):
116     """
117     Coroutine function: read items from the queue and report them.
118
119     Unregisters files automatically when they reach EOF.
120     """
121     while True:
122       seq, _, line = me._q.get()
123       if line is None:
124         me.unwatch(seq)
125       else:
126         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
127
128 def dbwatch():
129   """
130   Coroutine function: wake up every minute and notice changes to the
131   database.  When a change happens, tell the Pinger (q.v.) to rescan its
132   peers.
133   """
134   cr = T.Coroutine.getcurrent()
135   main = cr.parent
136   fw = M.FWatch(opts.cdb)
137   while True:
138     timer = M.SelTimer(time() + 60, lambda: cr.switch())
139     main.switch()
140     if fw.update():
141       pinger.rescan(False)
142       S.notify('connect', 'peerdb-update')
143
144 class ChildWatch (M.SelSignal):
145   """
146   An object which watches for specified processes exiting and reports
147   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
148
149   There is usually only one ChildWatch object, called childwatch.
150   """
151
152   def __new__(cls):
153     """Initialize the child-watcher."""
154     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
155
156   def __init__(me):
157     """Initialize the child-watcher."""
158     me._pid = {}
159     me.enable()
160
161   def watch(me, pid, queue, tag):
162     """
163     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
164     to the QUEUE, where CODE is one of
165
166       * None (successful termination)
167       * ['exit-nonzero', CODE] (CODE is a string!)
168       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
169       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
170     """
171     me._pid[pid] = queue, tag
172     return me
173
174   def unwatch(me, pid):
175     """Unregister PID as a child to watch."""
176     del me._pid[pid]
177     return me
178
179   @T._callback
180   def signalled(me):
181     """
182     Called when child processes exit: collect exit statuses and report
183     failures.
184     """
185     while True:
186       try:
187         pid, status = OS.waitpid(-1, OS.WNOHANG)
188       except OSError, exc:
189         if exc.errno == E.ECHILD:
190           break
191       if pid == 0:
192         break
193       if pid not in me._pid:
194         continue
195       queue, tag = me._pid[pid]
196       if OS.WIFEXITED(status):
197         exit = OS.WEXITSTATUS(status)
198         if exit == 0:
199           code = None
200         else:
201           code = ['exit-nonzero', str(exit)]
202       elif OS.WIFSIGNALED(status):
203         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
204       else:
205         code = ['exit-unknown', hex(status)]
206       queue.put((tag, 'exit', code))
207
208 class Command (object):
209   """
210   Represents a running command.
211
212   This class is the main interface to the machery provided by the ChildWatch
213   and ErrorWatch objects.  See also potwatch.
214   """
215
216   def __init__(me, info, queue, tag, args, env):
217     """
218     Start a new child process.
219
220     The ARGS are a list of arguments to be given to the child process.  The
221     ENV is either None or a dictionary of environment variable assignments to
222     override the extant environment.  INFO is a list of tokens to be included
223     in warnings about the child's stderr output.  If the child writes a line
224     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
225     child exits, write (TAG, 'exit', CODE) to the QUEUE.
226     """
227     me._info = info
228     me._q = queue
229     me._tag = tag
230     myenv = OS.environ.copy()
231     if env: myenv.update(env)
232     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
233                           stdout = PROC.PIPE, stderr = PROC.PIPE)
234     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
235     errorwatch.watch(me._proc.stderr, info)
236     childwatch.watch(me._proc.pid, queue, tag)
237
238   def __del__(me):
239     """
240     If I've been forgotten then stop watching for termination.
241     """
242     childwatch.unwatch(me._proc.pid)
243
244 def potwatch(what, name, q):
245   """
246   Watch the queue Q for activity as reported by a Command object.
247
248   Information from the process's stdout is reported as
249
250     NOTE WHAT NAME stdout LINE
251
252   abnormal termination is reported as
253
254     WARN WHAT NAME CODE
255
256   where CODE is what the ChildWatch wrote.
257   """
258   eofp = deadp = False
259   while not deadp or not eofp:
260     _, kind, more = q.get()
261     if kind == 'stdout':
262       if more is None:
263         eofp = True
264       else:
265         S.notify('connect', what, name, 'stdout', more)
266     elif kind == 'exit':
267       if more: S.warn('connect', what, name, *more)
268       deadp = True
269
270 ###--------------------------------------------------------------------------
271 ### Peer database utilities.
272
273 _magic = ['_magic']                     # An object distinct from all others
274
275 class Peer (object):
276   """Representation of a peer in the database."""
277
278   def __init__(me, peer, cdb = None):
279     """
280     Create a new peer, named PEER.
281
282     Information about the peer is read from the database CDB, or the default
283     one given on the command-line.
284     """
285     me.name = peer
286     record = (cdb or CDB.init(opts.cdb))['P' + peer]
287     me.__dict__.update(M.URLDecode(record, semip = True))
288
289   def get(me, key, default = _magic, filter = None):
290     """
291     Get the information stashed under KEY from the peer's database record.
292
293     If DEFAULT is given, then use it if the database doesn't contain the
294     necessary information.  If no DEFAULT is given, then report an error.  If
295     a FILTER function is given then apply it to the information from the
296     database before returning it.
297     """
298     attr = me.__dict__.get(key, default)
299     if attr is _magic:
300       raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
301     elif filter is not None:
302       attr = filter(attr)
303     return attr
304
305   def has(me, key):
306     """
307     Return whether the peer's database record has the KEY.
308     """
309     return key in me.__dict__
310
311   def list(me):
312     """
313     Iterate over the available keys in the peer's database record.
314     """
315     return me.__dict__.iterkeys()
316
317 def boolean(value):
318   """Parse VALUE as a boolean."""
319   return value in ['t', 'true', 'y', 'yes', 'on']
320
321 ###--------------------------------------------------------------------------
322 ### Waking up and watching peers.
323
324 def run_connect(peer, cmd):
325   """
326   Start the job of connecting to the passive PEER.
327
328   The CMD string is a shell command which will connect to the peer (via some
329   back-channel, say ssh and userv), issue a command
330
331     SVCSUBMIT connect passive [OPTIONS] USER
332
333   and write the resulting challenge to standard error.
334   """
335   q = T.Queue()
336   cmd = Command(['connect', peer.name], q, 'connect',
337                 ['/bin/sh', '-c', cmd], None)
338   _, kind, more = q.peek()
339   if kind == 'stdout':
340     if more is None:
341       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
342     else:
343       chal = more
344       S.greet(peer.name, chal)
345       q.get()
346   potwatch('connect', peer.name, q)
347
348 def run_disconnect(peer, cmd):
349   """
350   Start the job of disconnecting from a passive PEER.
351
352   The CMD string is a shell command which will disconnect from the peer.
353   """
354   q = T.Queue()
355   cmd = Command(['disconnect', peer.name], q, 'disconnect',
356                 ['/bin/sh', '-c', cmd], None)
357   potwatch('disconnect', peer.name, q)
358
359 _pingseq = 0
360 class PingPeer (object):
361   """
362   Object representing a peer which we are pinging to ensure that it is still
363   present.
364
365   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
366   event queue -- which saves us from having an enormous swarm of coroutines
367   -- but most of the actual work is done here.
368
369   In order to avoid confusion between different PingPeer instances for the
370   same actual peer, each PingPeer has a sequence number (its `seq'
371   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
372   (Using the PingPeer instance itself will prevent garbage collection of
373   otherwise defunct instances.)
374   """
375
376   def __init__(me, pinger, queue, peer, pingnow):
377     """
378     Create a new PingPeer.
379
380     The PINGER is the Pinger object we should send the results to.  This is
381     used when we remove ourselves, if the peer has been explicitly removed.
382
383     The QUEUE is the event queue on which timer and ping-command events
384     should be written.
385
386     The PEER is a `Peer' object describing the peer.
387
388     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
389     wait until the usual retry interval.
390     """
391     global _pingseq
392     me._pinger = pinger
393     me._q = queue
394     me._peer = peer.name
395     me.update(peer)
396     me.seq = _pingseq
397     _pingseq += 1
398     me._failures = 0
399     me._sabotage = False
400     me._last = '-'
401     me._nping = 0
402     me._nlost = 0
403     me._sigma_t = 0
404     me._sigma_t2 = 0
405     me._min = me._max = '-'
406     if pingnow:
407       me._timer = None
408       me._ping()
409     else:
410       me._timer = M.SelTimer(time() + me._every, me._time)
411
412   def update(me, peer):
413     """
414     Refreshes the timer parameters for this peer.  We don't, however,
415     immediately reschedule anything: that will happen next time anything
416     interesting happens.
417     """
418     if peer is None: peer = Peer(me._peer)
419     assert peer.name == me._peer
420     me._every = peer.get('every', filter = T.timespec, default = 120)
421     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
422     me._retries = peer.get('retries', filter = int, default = 5)
423     me._connectp = peer.has('connect')
424     return me
425
426   def _ping(me):
427     """
428     Send a ping to the peer; the result is sent to the Pinger's event queue.
429     """
430     S.rawcommand(T.TripeAsynchronousCommand(
431       me._q, (me._peer, me.seq),
432       ['EPING',
433        '-background', S.bgtag(),
434        '-timeout', str(me._timeout),
435        '--',
436        me._peer]))
437
438   def _reconnect(me):
439     peer = Peer(me._peer)
440     if me._connectp:
441       S.warn('connect', 'reconnecting', me._peer)
442       S.forcekx(me._peer)
443       T.spawn(run_connect, peer, peer.get('connect'))
444       me._timer = M.SelTimer(time() + me._every, me._time)
445       me._sabotage = False
446     else:
447       S.kill(me._peer)
448
449   def event(me, code, stuff):
450     """
451     Respond to an event which happened to this peer.
452
453     Timer events indicate that we should start a new ping.  (The server has
454     its own timeout which detects lost packets.)
455
456     We trap unknown-peer responses and detach from the Pinger.
457
458     If the ping fails and we run out of retries, we attempt to restart the
459     connection.
460     """
461     if code == 'TIMER':
462       me._failures = 0
463       me._ping()
464     elif code == 'FAIL':
465       S.notify('connect', 'ping-failed', me._peer, *stuff)
466       if not stuff:
467         pass
468       elif stuff[0] == 'unknown-peer':
469         me._pinger.kill(me._peer)
470       elif stuff[0] == 'ping-send-failed':
471         me._reconnect()
472     elif code == 'INFO':
473       outcome = stuff[0]
474       if outcome == 'ping-ok' and me._sabotage:
475         outcome = 'ping-timeout'
476       if outcome == 'ping-ok':
477         if me._failures > 0:
478           S.warn('connect', 'ping-ok', me._peer)
479         t = float(stuff[1])
480         me._last = '%.1fms' % t
481         me._sigma_t += t
482         me._sigma_t2 += t*t
483         me._nping += 1
484         if me._min == '-' or t < me._min: me._min = t
485         if me._max == '-' or t > me._max: me._max = t
486         me._timer = M.SelTimer(time() + me._every, me._time)
487       elif outcome == 'ping-timeout':
488         me._failures += 1
489         me._nlost += 1
490         S.warn('connect', 'ping-timeout', me._peer,
491                'attempt', str(me._failures), 'of', str(me._retries))
492         if me._failures < me._retries:
493           me._ping()
494           me._last = 'timeout'
495         else:
496           me._reconnect()
497           me._last = 'reconnect'
498       elif outcome == 'ping-peer-died':
499         me._pinger.kill(me._peer)
500
501   def sabotage(me):
502     """Sabotage the peer, for testing purposes."""
503     me._sabotage = True
504     if me._timer: me._timer.kill()
505     T.defer(me._time)
506
507   def info(me):
508     if not me._nping:
509       mean = sd = '-'
510     else:
511       mean = me._sigma_t/me._nping
512       sd = sqrt(me._sigma_t2/me._nping - mean*mean)
513     n = me._nping + me._nlost
514     if not n: pclost = '-'
515     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
516     return { 'last-ping': me._last,
517              'mean-ping': '%.1fms' % mean,
518              'sd-ping': '%.1fms' % sd,
519              'n-ping': '%d' % me._nping,
520              'n-lost': '%d' % me._nlost,
521              'percent-lost': pclost,
522              'min-ping': '%.1fms' % me._min,
523              'max-ping': '%.1fms' % me._max,
524              'state': me._timer and 'idle' or 'check',
525              'failures': me._failures }
526
527   @T._callback
528   def _time(me):
529     """
530     Handle timer callbacks by posting a timeout event on the queue.
531     """
532     me._timer = None
533     me._q.put(((me._peer, me.seq), 'TIMER', None))
534
535   def __str__(me):
536     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
537   def __repr__(me):
538     return str(me)
539
540 class Pinger (T.Coroutine):
541   """
542   The Pinger keeps track of the peers which we expect to be connected and
543   takes action if they seem to stop responding.
544
545   There is usually only one Pinger, called pinger.
546
547   The Pinger maintains a collection of PingPeer objects, and an event queue.
548   The PingPeers direct the results of their pings, and timer events, to the
549   event queue.  The Pinger's coroutine picks items off the queue and
550   dispatches them back to the PingPeers as appropriate.
551   """
552
553   def __init__(me):
554     """Initialize the Pinger."""
555     T.Coroutine.__init__(me)
556     me._peers = {}
557     me._q = T.Queue()
558
559   def run(me):
560     """
561     Coroutine function: reads the pinger queue and sends events to the
562     PingPeer objects they correspond to.
563     """
564     while True:
565       (peer, seq), code, stuff = me._q.get()
566       if peer in me._peers and seq == me._peers[peer].seq:
567         me._peers[peer].event(code, stuff)
568
569   def add(me, peer, pingnow):
570     """
571     Add PEER to the collection of peers under the Pinger's watchful eye.
572     The arguments are as for PingPeer: see above.
573     """
574     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
575     return me
576
577   def kill(me, peername):
578     """Remove PEER from the peers being watched by the Pinger."""
579     try: del me._peers[peername]
580     except KeyError: pass
581     return me
582
583   def rescan(me, startup):
584     """
585     General resynchronization method.
586
587     We scan the list of peers (with connect scripts) known at the server.
588     Any which are known to the Pinger but aren't known to the server are
589     removed from our list; newly arrived peers are added.  (Note that a peer
590     can change state here either due to the server sneakily changing its list
591     without issuing notifications or, more likely, the database changing its
592     idea of whether a peer is interesting.)  Finally, PingPeers which are
593     still present are prodded to update their timing parameters.
594
595     This method is called once at startup to pick up the peers already
596     installed, and again by the dbwatcher coroutine when it detects a change
597     to the database.
598     """
599     if T._debug: print '# rescan peers'
600     correct = {}
601     start = {}
602     for name in S.list():
603       try: peer = Peer(name)
604       except KeyError: continue
605       if peer.get('watch', filter = boolean, default = False):
606         if T._debug: print '# interesting peer %s' % peer
607         correct[peer.name] = start[peer.name] = peer
608       elif startup:
609         if T._debug: print '# peer %s ready for adoption' % peer
610         start[peer.name] = peer
611     for name, obj in me._peers.items():
612       try:
613         peer = correct[name]
614       except KeyError:
615         if T._debug: print '# peer %s vanished' % name
616         del me._peers[name]
617       else:
618         obj.update(peer)
619     for name, peer in start.iteritems():
620       if name in me._peers: continue
621       if startup:
622         if T._debug: print '# setting up peer %s' % name
623         ifname = S.ifname(name)
624         addr = S.addr(name)
625         T.defer(adoptpeer, peer, ifname, *addr)
626       else:
627         if T._debug: print '# adopting new peer %s' % name
628         me.add(peer, True)
629     return me
630
631   def adopted(me):
632     """
633     Returns the list of peers being watched by the Pinger.
634     """
635     return me._peers.keys()
636
637   def find(me, name):
638     """Return the PingPeer with the given name."""
639     return me._peers[name]
640
641 ###--------------------------------------------------------------------------
642 ### New connections.
643
644 def encode_envvars(env, prefix, vars):
645   """
646   Encode the variables in VARS suitably for including in a program
647   environment.  Lowercase letters in variable names are forced to uppercase;
648   runs of non-alphanumeric characters are replaced by single underscores; and
649   the PREFIX is prepended.  The resulting variables are written to ENV.
650   """
651   for k, v in vars.iteritems():
652     env[prefix + r_bad.sub('_', k.upper())] = v
653
654 r_bad = RX.compile(r'[\W_]+')
655 def envvars(peer):
656   """
657   Translate the database information for a PEER into a dictionary of
658   environment variables with plausible upper-case names and a P_ prefix.
659   Also collect the crypto information into A_ variables.
660   """
661   env = {}
662   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
663   encode_envvars(env, 'A_', S.algs(peer.name))
664   return env
665
666 def run_ifupdown(what, peer, *args):
667   """
668   Run the interface up/down script for a peer.
669
670   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
671   list of arguments to pass to the script, in addition to the peer name.
672
673   The command is run and watched in the background by potwatch.
674   """
675   q = T.Queue()
676   c = Command([what, peer.name], q, what,
677               M.split(peer.get(what), quotep = True)[0] +
678               [peer.name] + list(args),
679               envvars(peer))
680   potwatch(what, peer.name, q)
681
682 def adoptpeer(peer, ifname, *addr):
683   """
684   Add a new peer to our collection.
685
686   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
687   ADDR is the list of tokens representing its address.
688
689   We try to bring up the interface and provoke a connection to the peer if
690   it's passive.
691   """
692   if peer.has('ifup'):
693     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
694         .switch('ifup', peer, ifname, *addr)
695   cmd = peer.get('connect', default = None)
696   if cmd is not None:
697     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
698         .switch(peer, cmd)
699   if peer.get('watch', filter = boolean, default = False):
700     pinger.add(peer, False)
701
702 def disownpeer(peer):
703   """Drop the PEER from the Pinger and put its interface to bed."""
704   try: pinger.kill(peer)
705   except KeyError: pass
706   cmd = peer.get('disconnect', default = None)
707   if cmd is not None:
708     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
709         .switch(peer, cmd)
710   if peer.has('ifdown'):
711     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
712         .switch('ifdown', peer)
713
714 def addpeer(peer, addr):
715   """
716   Process a connect request from a new peer PEER on address ADDR.
717
718   Any existing peer with this name is disconnected from the server.
719   """
720   if peer.name in S.list():
721     S.kill(peer.name)
722   try:
723     booltrue = ['t', 'true', 'y', 'yes', 'on']
724     S.add(peer.name,
725           tunnel = peer.get('tunnel', None),
726           keepalive = peer.get('keepalive', None),
727           key = peer.get('key', None),
728           priv = peer.get('priv', None),
729           mobile = peer.get('mobile', 'nil') in booltrue,
730           cork = peer.get('cork', 'nil') in booltrue,
731           *addr)
732   except T.TripeError, exc:
733     raise T.TripeJobError(*exc.args)
734
735 ## Dictionary mapping challenges to waiting passive-connection coroutines.
736 chalmap = {}
737
738 def notify(_, code, *rest):
739   """
740   Watch for notifications.
741
742   We trap ADD and KILL notifications, and send them straight to adoptpeer and
743   disownpeer respectively; and dispatch GREET notifications to the
744   corresponding waiting coroutine.
745   """
746   if code == 'ADD':
747     try: p = Peer(rest[0])
748     except KeyError: return
749     adoptpeer(p, *rest[1:])
750   elif code == 'KILL':
751     try: p = Peer(rest[0])
752     except KeyError: return
753     disownpeer(p, *rest[1:])
754   elif code == 'GREET':
755     chal = rest[0]
756     try: cr = chalmap[chal]
757     except KeyError: pass
758     else: cr.switch(rest[1:])
759
760 ###--------------------------------------------------------------------------
761 ### Command implementation.
762
763 def cmd_kick(name):
764   """
765   kick NAME: Force a new connection attempt for the NAMEd peer.
766   """
767   try: pp = pinger.find(name)
768   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
769   try: peer = Peer(name)
770   except KeyError: raise T.TripeJobError('unknown-peer', name)
771   conn = peer.get('connect', None)
772   if conn: T.spawn(run_connect, peer, peer.get('connect'))
773   else: T.spawn(lambda p: S.forcekx(p.name), peer)
774
775 def cmd_adopted():
776   """
777   adopted: Report a list of adopted peers.
778   """
779   for name in pinger.adopted():
780     T.svcinfo(name)
781
782 def cmd_active(name):
783   """
784   active NAME: Handle an active connection request for the peer called NAME.
785
786   The appropriate address is read from the database automatically.
787   """
788   try: peer = Peer(name)
789   except KeyError: raise T.TripeJobError('unknown-peer', name)
790   addr = peer.get('peer')
791   if addr == 'PASSIVE':
792     raise T.TripeJobError('passive-peer', name)
793   addpeer(peer, M.split(addr, quotep = True)[0])
794
795 def cmd_listactive():
796   """
797   list: Report a list of the available active peers.
798   """
799   cdb = CDB.init(opts.cdb)
800   for key in cdb.keys():
801     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
802       T.svcinfo(key[1:])
803
804 def cmd_info(name):
805   """
806   info NAME: Report the database entries for the named peer.
807   """
808   try: peer = Peer(name)
809   except KeyError: raise T.TripeJobError('unknown-peer', name)
810   d = {}
811   try: pp = pinger.find(name)
812   except KeyError: pass
813   else: d.update(pp.info())
814   items = list(peer.list()) + d.keys()
815   items.sort()
816   for i in items:
817     try: v = d[i]
818     except KeyError: v = peer.get(i)
819     T.svcinfo('%s=%s' % (i, v))
820
821 def cmd_userpeer(user):
822   """
823   userpeer USER: Report the peer name for the named user.
824   """
825   try: name = CDB.init(opts.cdb)['U' + user]
826   except KeyError: raise T.TripeJobError('unknown-user', user)
827   T.svcinfo(name)
828
829 def cmd_passive(*args):
830   """
831   passive [OPTIONS] USER: Await the arrival of the named USER.
832
833   Report a challenge; when (and if!) the server receives a greeting quoting
834   this challenge, add the corresponding peer to the server.
835   """
836   timeout = 30
837   op = T.OptParse(args, ['-timeout'])
838   for opt in op:
839     if opt == '-timeout':
840       timeout = T.timespec(op.arg())
841   user, = op.rest(1, 1)
842   try: name = CDB.init(opts.cdb)['U' + user]
843   except KeyError: raise T.TripeJobError('unknown-user', user)
844   try: peer = Peer(name)
845   except KeyError: raise T.TripeJobError('unknown-peer', name)
846   chal = S.getchal()
847   cr = T.Coroutine.getcurrent()
848   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
849   try:
850     T.svcinfo(chal)
851     chalmap[chal] = cr
852     addr = cr.parent.switch()
853     if addr is None:
854       raise T.TripeJobError('connect-timeout')
855     addpeer(peer, addr)
856   finally:
857     del chalmap[chal]
858
859 def cmd_sabotage(name):
860   """
861   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
862   """
863   try: pp = pinger.find(name)
864   except KeyError: raise T.TripeJobError('unknown-peer', name)
865   pp.sabotage()
866
867 ###--------------------------------------------------------------------------
868 ### Start up.
869
870 def setup():
871   """
872   Service setup.
873
874   Register the notification watcher, rescan the peers, and add automatic
875   active peers.
876   """
877   S.handler['NOTE'] = notify
878   S.watch('+n')
879
880   pinger.rescan(opts.startup)
881
882   if opts.startup:
883     cdb = CDB.init(opts.cdb)
884     try:
885       autos = cdb['%AUTO']
886     except KeyError:
887       autos = ''
888     for name in M.split(autos)[0]:
889       try:
890         peer = Peer(name, cdb)
891         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
892       except T.TripeJobError, err:
893         S.warn('connect', 'auto-add-failed', name, *err.args)
894
895 def init():
896   """
897   Initialization to be done before service startup.
898   """
899   global errorwatch, childwatch, pinger
900   errorwatch = ErrorWatch()
901   childwatch = ChildWatch()
902   pinger = Pinger()
903   T.Coroutine(dbwatch, name = 'dbwatch').switch()
904   errorwatch.switch()
905   pinger.switch()
906
907 def parse_options():
908   """
909   Parse the command-line options.
910
911   Automatically changes directory to the requested configdir, and turns on
912   debugging.  Returns the options object.
913   """
914   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
915                     version = '%%prog %s' % VERSION)
916
917   op.add_option('-a', '--admin-socket',
918                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
919                 help = 'Select socket to connect to [default %default]')
920   op.add_option('-d', '--directory',
921                 metavar = 'DIR', dest = 'dir', default = T.configdir,
922                 help = 'Select current diretory [default %default]')
923   op.add_option('-p', '--peerdb',
924                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
925                 help = 'Select peers database [default %default]')
926   op.add_option('--daemon', dest = 'daemon',
927                 default = False, action = 'store_true',
928                 help = 'Become a daemon after successful initialization')
929   op.add_option('--debug', dest = 'debug',
930                 default = False, action = 'store_true',
931                 help = 'Emit debugging trace information')
932   op.add_option('--startup', dest = 'startup',
933                 default = False, action = 'store_true',
934                 help = 'Being called as part of the server startup')
935
936   opts, args = op.parse_args()
937   if args: op.error('no arguments permitted')
938   OS.chdir(opts.dir)
939   T._debug = opts.debug
940   return opts
941
942 ## Service table, for running manually.
943 service_info = [('connect', T.VERSION, {
944   'adopted': (0, 0, '', cmd_adopted),
945   'kick': (1, 1, 'PEER', cmd_kick),
946   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
947   'active': (1, 1, 'PEER', cmd_active),
948   'info': (1, 1, 'PEER', cmd_info),
949   'list-active': (0, 0, '', cmd_listactive),
950   'userpeer': (1, 1, 'USER', cmd_userpeer),
951   'sabotage': (1, 1, 'PEER', cmd_sabotage)
952 })]
953
954 if __name__ == '__main__':
955   opts = parse_options()
956   T.runservices(opts.tripesock, service_info,
957                 init = init, setup = setup,
958                 daemon = opts.daemon)
959
960 ###----- That's all, folks --------------------------------------------------