chiark / gitweb /
Add new `knock' protocol.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50   """Glues the select-line-buffer into the coroutine queue system."""
51
52   def __new__(cls, file, queue, tag, kind):
53     """See __init__ for documentation."""
54     return M.SelLineBuffer.__new__(cls, file.fileno())
55
56   def __init__(me, file, queue, tag, kind):
57     """
58     Initialize a new line-reading adaptor.
59
60     The adaptor reads lines from FILE.  Each line is inserted as a message of
61     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
62     represented as None.
63     """
64     me._q = queue
65     me._file = file
66     me._tag = tag
67     me._kind = kind
68     me.enable()
69
70   @T._callback
71   def line(me, line):
72     me._q.put((me._tag, me._kind, line))
73
74   @T._callback
75   def eof(me):
76     me.disable()
77     me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80   """
81   An object which watches stderr streams for errors and converts them into
82   warnings of the form
83
84     WARN connect INFO stderr LINE
85
86   The INFO is a list of tokens associated with the file when it was
87   registered.
88
89   Usually there is a single ErrorWatch object, called errorwatch.
90   """
91
92   def __init__(me):
93     """Initialization: there are no arguments."""
94     T.Coroutine.__init__(me)
95     me._q = T.Queue()
96     me._map = {}
97     me._seq = 1
98
99   def watch(me, file, info):
100     """
101     Adds FILE to the collection of files to watch.
102
103     INFO will be written in the warning messages from this FILE.  Returns a
104     sequence number which can be used to unregister the file again.
105     """
106     seq = me._seq
107     me._seq += 1
108     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109     return seq
110
111   def unwatch(me, seq):
112     """Stop watching the file with sequence number SEQ."""
113     del me._map[seq]
114     return me
115
116   def run(me):
117     """
118     Coroutine function: read items from the queue and report them.
119
120     Unregisters files automatically when they reach EOF.
121     """
122     while True:
123       seq, _, line = me._q.get()
124       if line is None:
125         me.unwatch(seq)
126       else:
127         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130   """
131   Coroutine function: wake up every minute and notice changes to the
132   database.  When a change happens, tell the Pinger (q.v.) to rescan its
133   peers.
134   """
135   cr = T.Coroutine.getcurrent()
136   main = cr.parent
137   fw = M.FWatch(opts.cdb)
138   while True:
139     timer = M.SelTimer(time() + 60, lambda: cr.switch())
140     main.switch()
141     if fw.update():
142       pinger.rescan(False)
143       S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146   """
147   An object which watches for specified processes exiting and reports
148   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150   There is usually only one ChildWatch object, called childwatch.
151   """
152
153   def __new__(cls):
154     """Initialize the child-watcher."""
155     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157   def __init__(me):
158     """Initialize the child-watcher."""
159     me._pid = {}
160     me.enable()
161
162   def watch(me, pid, queue, tag):
163     """
164     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
165     to the QUEUE, where CODE is one of
166
167       * None (successful termination)
168       * ['exit-nonzero', CODE] (CODE is a string!)
169       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171     """
172     me._pid[pid] = queue, tag
173     return me
174
175   def unwatch(me, pid):
176     """Unregister PID as a child to watch."""
177     del me._pid[pid]
178     return me
179
180   @T._callback
181   def signalled(me):
182     """
183     Called when child processes exit: collect exit statuses and report
184     failures.
185     """
186     while True:
187       try:
188         pid, status = OS.waitpid(-1, OS.WNOHANG)
189       except OSError, exc:
190         if exc.errno == E.ECHILD:
191           break
192       if pid == 0:
193         break
194       if pid not in me._pid:
195         continue
196       queue, tag = me._pid[pid]
197       if OS.WIFEXITED(status):
198         exit = OS.WEXITSTATUS(status)
199         if exit == 0:
200           code = None
201         else:
202           code = ['exit-nonzero', str(exit)]
203       elif OS.WIFSIGNALED(status):
204         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205       else:
206         code = ['exit-unknown', hex(status)]
207       queue.put((tag, 'exit', code))
208
209 class Command (object):
210   """
211   Represents a running command.
212
213   This class is the main interface to the machery provided by the ChildWatch
214   and ErrorWatch objects.  See also potwatch.
215   """
216
217   def __init__(me, info, queue, tag, args, env):
218     """
219     Start a new child process.
220
221     The ARGS are a list of arguments to be given to the child process.  The
222     ENV is either None or a dictionary of environment variable assignments to
223     override the extant environment.  INFO is a list of tokens to be included
224     in warnings about the child's stderr output.  If the child writes a line
225     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
226     child exits, write (TAG, 'exit', CODE) to the QUEUE.
227     """
228     me._info = info
229     me._q = queue
230     me._tag = tag
231     myenv = OS.environ.copy()
232     if env: myenv.update(env)
233     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234                           stdout = PROC.PIPE, stderr = PROC.PIPE)
235     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236     errorwatch.watch(me._proc.stderr, info)
237     childwatch.watch(me._proc.pid, queue, tag)
238
239   def __del__(me):
240     """
241     If I've been forgotten then stop watching for termination.
242     """
243     childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246   """
247   Watch the queue Q for activity as reported by a Command object.
248
249   Information from the process's stdout is reported as
250
251     NOTE WHAT NAME stdout LINE
252
253   abnormal termination is reported as
254
255     WARN WHAT NAME CODE
256
257   where CODE is what the ChildWatch wrote.
258   """
259   eofp = deadp = False
260   while not deadp or not eofp:
261     _, kind, more = q.get()
262     if kind == 'stdout':
263       if more is None:
264         eofp = True
265       else:
266         S.notify('connect', what, name, 'stdout', more)
267     elif kind == 'exit':
268       if more: S.warn('connect', what, name, *more)
269       deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic']                     # An object distinct from all others
275
276 class Peer (object):
277   """Representation of a peer in the database."""
278
279   def __init__(me, peer, cdb = None):
280     """
281     Create a new peer, named PEER.
282
283     Information about the peer is read from the database CDB, or the default
284     one given on the command-line.
285     """
286     me.name = peer
287     record = (cdb or CDB.init(opts.cdb))['P' + peer]
288     me.__dict__.update(M.URLDecode(record, semip = True))
289
290   def get(me, key, default = _magic, filter = None):
291     """
292     Get the information stashed under KEY from the peer's database record.
293
294     If DEFAULT is given, then use it if the database doesn't contain the
295     necessary information.  If no DEFAULT is given, then report an error.  If
296     a FILTER function is given then apply it to the information from the
297     database before returning it.
298     """
299     try:
300       attr = me.__dict__[key]
301     except KeyError:
302       if default is _magic:
303         raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
304       return default
305     else:
306       if filter is not None: attr = filter(attr)
307       return attr
308
309   def has(me, key):
310     """
311     Return whether the peer's database record has the KEY.
312     """
313     return key in me.__dict__
314
315   def list(me):
316     """
317     Iterate over the available keys in the peer's database record.
318     """
319     return me.__dict__.iterkeys()
320
321 def boolean(value):
322   """Parse VALUE as a boolean."""
323   return value in ['t', 'true', 'y', 'yes', 'on']
324
325 ###--------------------------------------------------------------------------
326 ### Waking up and watching peers.
327
328 def run_connect(peer, cmd):
329   """
330   Start the job of connecting to the passive PEER.
331
332   The CMD string is a shell command which will connect to the peer (via some
333   back-channel, say ssh and userv), issue a command
334
335     SVCSUBMIT connect passive [OPTIONS] USER
336
337   and write the resulting challenge to standard error.
338   """
339   q = T.Queue()
340   cmd = Command(['connect', peer.name], q, 'connect',
341                 ['/bin/sh', '-c', cmd], None)
342   _, kind, more = q.peek()
343   if kind == 'stdout':
344     if more is None:
345       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
346     else:
347       chal = more
348       S.greet(peer.name, chal)
349       q.get()
350   potwatch('connect', peer.name, q)
351
352 def run_disconnect(peer, cmd):
353   """
354   Start the job of disconnecting from a passive PEER.
355
356   The CMD string is a shell command which will disconnect from the peer.
357   """
358   q = T.Queue()
359   cmd = Command(['disconnect', peer.name], q, 'disconnect',
360                 ['/bin/sh', '-c', cmd], None)
361   potwatch('disconnect', peer.name, q)
362
363 _pingseq = 0
364 class PingPeer (object):
365   """
366   Object representing a peer which we are pinging to ensure that it is still
367   present.
368
369   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
370   event queue -- which saves us from having an enormous swarm of coroutines
371   -- but most of the actual work is done here.
372
373   In order to avoid confusion between different PingPeer instances for the
374   same actual peer, each PingPeer has a sequence number (its `seq'
375   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
376   (Using the PingPeer instance itself will prevent garbage collection of
377   otherwise defunct instances.)
378   """
379
380   def __init__(me, pinger, queue, peer, pingnow):
381     """
382     Create a new PingPeer.
383
384     The PINGER is the Pinger object we should send the results to.  This is
385     used when we remove ourselves, if the peer has been explicitly removed.
386
387     The QUEUE is the event queue on which timer and ping-command events
388     should be written.
389
390     The PEER is a `Peer' object describing the peer.
391
392     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
393     wait until the usual retry interval.
394     """
395     global _pingseq
396     me._pinger = pinger
397     me._q = queue
398     me._peer = peer.name
399     me.update(peer)
400     me.seq = _pingseq
401     _pingseq += 1
402     me._failures = 0
403     me._sabotage = False
404     me._last = '-'
405     me._nping = 0
406     me._nlost = 0
407     me._sigma_t = 0
408     me._sigma_t2 = 0
409     me._min = me._max = '-'
410     if pingnow:
411       me._timer = None
412       me._ping()
413     else:
414       me._timer = M.SelTimer(time() + me._every, me._time)
415
416   def update(me, peer):
417     """
418     Refreshes the timer parameters for this peer.  We don't, however,
419     immediately reschedule anything: that will happen next time anything
420     interesting happens.
421     """
422     if peer is None: peer = Peer(me._peer)
423     assert peer.name == me._peer
424     me._every = peer.get('every', filter = T.timespec, default = 120)
425     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
426     me._retries = peer.get('retries', filter = int, default = 5)
427     me._connectp = peer.has('connect')
428     me._knockp = peer.has('knock')
429     return me
430
431   def _ping(me):
432     """
433     Send a ping to the peer; the result is sent to the Pinger's event queue.
434     """
435     S.rawcommand(T.TripeAsynchronousCommand(
436       me._q, (me._peer, me.seq),
437       ['EPING',
438        '-background', S.bgtag(),
439        '-timeout', str(me._timeout),
440        '--',
441        me._peer]))
442
443   def _reconnect(me):
444     try:
445       peer = Peer(me._peer)
446       if me._connectp or me._knockp:
447         S.warn('connect', 'reconnecting', me._peer)
448         S.forcekx(me._peer)
449         if me._connectp: T.spawn(run_connect, peer, peer.get('connect'))
450         me._timer = M.SelTimer(time() + me._every, me._time)
451         me._sabotage = False
452       else:
453         S.kill(me._peer)
454     except TripeError, e:
455       if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
456
457   def event(me, code, stuff):
458     """
459     Respond to an event which happened to this peer.
460
461     Timer events indicate that we should start a new ping.  (The server has
462     its own timeout which detects lost packets.)
463
464     We trap unknown-peer responses and detach from the Pinger.
465
466     If the ping fails and we run out of retries, we attempt to restart the
467     connection.
468     """
469     if code == 'TIMER':
470       me._failures = 0
471       me._ping()
472     elif code == 'FAIL':
473       S.notify('connect', 'ping-failed', me._peer, *stuff)
474       if not stuff: pass
475       elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
476       elif stuff[0] == 'ping-send-failed': me._reconnect()
477     elif code == 'INFO':
478       outcome = stuff[0]
479       if outcome == 'ping-ok' and me._sabotage:
480         outcome = 'ping-timeout'
481       if outcome == 'ping-ok':
482         if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
483         t = float(stuff[1])
484         me._last = '%.1fms' % t
485         me._sigma_t += t
486         me._sigma_t2 += t*t
487         me._nping += 1
488         if me._min == '-' or t < me._min: me._min = t
489         if me._max == '-' or t > me._max: me._max = t
490         me._timer = M.SelTimer(time() + me._every, me._time)
491       elif outcome == 'ping-timeout':
492         me._failures += 1
493         me._nlost += 1
494         S.warn('connect', 'ping-timeout', me._peer,
495                'attempt', str(me._failures), 'of', str(me._retries))
496         if me._failures < me._retries:
497           me._ping()
498           me._last = 'timeout'
499         else:
500           me._reconnect()
501           me._last = 'reconnect'
502       elif outcome == 'ping-peer-died':
503         me._pinger.kill(me._peer)
504
505   def sabotage(me):
506     """Sabotage the peer, for testing purposes."""
507     me._sabotage = True
508     if me._timer: me._timer.kill()
509     T.defer(me._time)
510
511   def info(me):
512     if not me._nping:
513       mean = sd = '-'
514     else:
515       mean = me._sigma_t/me._nping
516       sd = sqrt(me._sigma_t2/me._nping - mean*mean)
517     n = me._nping + me._nlost
518     if not n: pclost = '-'
519     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
520     return { 'last-ping': me._last,
521              'mean-ping': '%.1fms' % mean,
522              'sd-ping': '%.1fms' % sd,
523              'n-ping': '%d' % me._nping,
524              'n-lost': '%d' % me._nlost,
525              'percent-lost': pclost,
526              'min-ping': '%.1fms' % me._min,
527              'max-ping': '%.1fms' % me._max,
528              'state': me._timer and 'idle' or 'check',
529              'failures': me._failures }
530
531   @T._callback
532   def _time(me):
533     """
534     Handle timer callbacks by posting a timeout event on the queue.
535     """
536     me._timer = None
537     me._q.put(((me._peer, me.seq), 'TIMER', None))
538
539   def __str__(me):
540     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
541   def __repr__(me):
542     return str(me)
543
544 class Pinger (T.Coroutine):
545   """
546   The Pinger keeps track of the peers which we expect to be connected and
547   takes action if they seem to stop responding.
548
549   There is usually only one Pinger, called pinger.
550
551   The Pinger maintains a collection of PingPeer objects, and an event queue.
552   The PingPeers direct the results of their pings, and timer events, to the
553   event queue.  The Pinger's coroutine picks items off the queue and
554   dispatches them back to the PingPeers as appropriate.
555   """
556
557   def __init__(me):
558     """Initialize the Pinger."""
559     T.Coroutine.__init__(me)
560     me._peers = {}
561     me._q = T.Queue()
562
563   def run(me):
564     """
565     Coroutine function: reads the pinger queue and sends events to the
566     PingPeer objects they correspond to.
567     """
568     while True:
569       (peer, seq), code, stuff = me._q.get()
570       if peer in me._peers and seq == me._peers[peer].seq:
571         try: me._peers[peer].event(code, stuff)
572         except Exception, e:
573           SYS.excepthook(*SYS.exc_info())
574
575   def add(me, peer, pingnow):
576     """
577     Add PEER to the collection of peers under the Pinger's watchful eye.
578     The arguments are as for PingPeer: see above.
579     """
580     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
581     return me
582
583   def kill(me, peername):
584     """Remove PEER from the peers being watched by the Pinger."""
585     try: del me._peers[peername]
586     except KeyError: pass
587     return me
588
589   def rescan(me, startup):
590     """
591     General resynchronization method.
592
593     We scan the list of peers (with connect scripts) known at the server.
594     Any which are known to the Pinger but aren't known to the server are
595     removed from our list; newly arrived peers are added.  (Note that a peer
596     can change state here either due to the server sneakily changing its list
597     without issuing notifications or, more likely, the database changing its
598     idea of whether a peer is interesting.)  Finally, PingPeers which are
599     still present are prodded to update their timing parameters.
600
601     This method is called once at startup to pick up the peers already
602     installed, and again by the dbwatcher coroutine when it detects a change
603     to the database.
604     """
605     if T._debug: print '# rescan peers'
606     correct = {}
607     start = {}
608     for name in S.list():
609       try: peer = Peer(name)
610       except KeyError: continue
611       if peer.get('watch', filter = boolean, default = False):
612         if T._debug: print '# interesting peer %s' % peer
613         correct[peer.name] = start[peer.name] = peer
614       elif startup:
615         if T._debug: print '# peer %s ready for adoption' % peer
616         start[peer.name] = peer
617     for name, obj in me._peers.items():
618       try:
619         peer = correct[name]
620       except KeyError:
621         if T._debug: print '# peer %s vanished' % name
622         del me._peers[name]
623       else:
624         obj.update(peer)
625     for name, peer in start.iteritems():
626       if name in me._peers: continue
627       if startup:
628         if T._debug: print '# setting up peer %s' % name
629         ifname = S.ifname(name)
630         addr = S.addr(name)
631         T.defer(adoptpeer, peer, ifname, *addr)
632       else:
633         if T._debug: print '# adopting new peer %s' % name
634         me.add(peer, True)
635     return me
636
637   def adopted(me):
638     """
639     Returns the list of peers being watched by the Pinger.
640     """
641     return me._peers.keys()
642
643   def find(me, name):
644     """Return the PingPeer with the given name."""
645     return me._peers[name]
646
647 ###--------------------------------------------------------------------------
648 ### New connections.
649
650 def encode_envvars(env, prefix, vars):
651   """
652   Encode the variables in VARS suitably for including in a program
653   environment.  Lowercase letters in variable names are forced to uppercase;
654   runs of non-alphanumeric characters are replaced by single underscores; and
655   the PREFIX is prepended.  The resulting variables are written to ENV.
656   """
657   for k, v in vars.iteritems():
658     env[prefix + r_bad.sub('_', k.upper())] = v
659
660 r_bad = RX.compile(r'[\W_]+')
661 def envvars(peer):
662   """
663   Translate the database information for a PEER into a dictionary of
664   environment variables with plausible upper-case names and a P_ prefix.
665   Also collect the crypto information into A_ variables.
666   """
667   env = {}
668   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
669   encode_envvars(env, 'A_', S.algs(peer.name))
670   return env
671
672 def run_ifupdown(what, peer, *args):
673   """
674   Run the interface up/down script for a peer.
675
676   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
677   list of arguments to pass to the script, in addition to the peer name.
678
679   The command is run and watched in the background by potwatch.
680   """
681   q = T.Queue()
682   c = Command([what, peer.name], q, what,
683               M.split(peer.get(what), quotep = True)[0] +
684               [peer.name] + list(args),
685               envvars(peer))
686   potwatch(what, peer.name, q)
687
688 def adoptpeer(peer, ifname, *addr):
689   """
690   Add a new peer to our collection.
691
692   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
693   ADDR is the list of tokens representing its address.
694
695   We try to bring up the interface and provoke a connection to the peer if
696   it's passive.
697   """
698   if peer.has('ifup'):
699     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
700         .switch('ifup', peer, ifname, *addr)
701   cmd = peer.get('connect', default = None)
702   if cmd is not None:
703     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
704         .switch(peer, cmd)
705   if peer.get('watch', filter = boolean, default = False):
706     pinger.add(peer, False)
707
708 def disownpeer(peer):
709   """Drop the PEER from the Pinger and put its interface to bed."""
710   try: pinger.kill(peer)
711   except KeyError: pass
712   cmd = peer.get('disconnect', default = None)
713   if cmd is not None:
714     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
715         .switch(peer, cmd)
716   if peer.has('ifdown'):
717     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
718         .switch('ifdown', peer)
719
720 def addpeer(peer, addr):
721   """
722   Process a connect request from a new peer PEER on address ADDR.
723
724   Any existing peer with this name is disconnected from the server.
725   """
726   if peer.name in S.list():
727     S.kill(peer.name)
728   try:
729     S.add(peer.name,
730           tunnel = peer.get('tunnel', default = None),
731           keepalive = peer.get('keepalive', default = None),
732           key = peer.get('key', default = None),
733           priv = peer.get('priv', default = None),
734           mobile = peer.get('mobile', filter = boolean, default = False),
735           knock = peer.get('knock', default = None),
736           cork = peer.get('cork', filter = boolean, default = False),
737           *addr)
738   except T.TripeError, exc:
739     raise T.TripeJobError(*exc.args)
740
741 ## Dictionary mapping challenges to waiting passive-connection coroutines.
742 chalmap = {}
743
744 def notify(_, code, *rest):
745   """
746   Watch for notifications.
747
748   We trap ADD and KILL notifications, and send them straight to adoptpeer and
749   disownpeer respectively; and dispatch GREET notifications to the
750   corresponding waiting coroutine.
751   """
752   if code == 'ADD':
753     try: p = Peer(rest[0])
754     except KeyError: return
755     adoptpeer(p, *rest[1:])
756   elif code == 'KILL':
757     try: p = Peer(rest[0])
758     except KeyError: return
759     disownpeer(p, *rest[1:])
760   elif code == 'GREET':
761     chal = rest[0]
762     try: cr = chalmap[chal]
763     except KeyError: pass
764     else: cr.switch(rest[1:])
765   elif code == 'KNOCK':
766     try: p = Peer(rest[0])
767     except KeyError:
768       S.warn(['connect', 'knock-unknown-peer', rest[0]])
769       return
770     if p.get('peer') != 'PASSIVE':
771       S.warn(['connect', 'knock-active-peer', p.name])
772       return
773     dot = p.name.find('.')
774     if dot >= 0: kname = p.name[dot + 1:]
775     else: kname = p.name
776     ktag = p.get('key', p.name)
777     if kname != ktag:
778       S.warn(['connect', 'knock-tag-mismatch',
779               'peer', pname, 'public-key-tag', ktag])
780       return
781     T.spawn(addpeer, p, rest[1:])
782
783 ###--------------------------------------------------------------------------
784 ### Command implementation.
785
786 def cmd_kick(name):
787   """
788   kick NAME: Force a new connection attempt for the NAMEd peer.
789   """
790   try: pp = pinger.find(name)
791   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
792   try: peer = Peer(name)
793   except KeyError: raise T.TripeJobError('unknown-peer', name)
794   conn = peer.get('connect', None)
795   if conn: T.spawn(run_connect, peer, peer.get('connect'))
796   else: T.spawn(lambda p: S.forcekx(p.name), peer)
797
798 def cmd_adopted():
799   """
800   adopted: Report a list of adopted peers.
801   """
802   for name in pinger.adopted():
803     T.svcinfo(name)
804
805 def cmd_active(name):
806   """
807   active NAME: Handle an active connection request for the peer called NAME.
808
809   The appropriate address is read from the database automatically.
810   """
811   try: peer = Peer(name)
812   except KeyError: raise T.TripeJobError('unknown-peer', name)
813   addr = peer.get('peer')
814   if addr == 'PASSIVE':
815     raise T.TripeJobError('passive-peer', name)
816   addpeer(peer, M.split(addr, quotep = True)[0])
817
818 def cmd_listactive():
819   """
820   list: Report a list of the available active peers.
821   """
822   cdb = CDB.init(opts.cdb)
823   for key in cdb.keys():
824     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
825       T.svcinfo(key[1:])
826
827 def cmd_info(name):
828   """
829   info NAME: Report the database entries for the named peer.
830   """
831   try: peer = Peer(name)
832   except KeyError: raise T.TripeJobError('unknown-peer', name)
833   d = {}
834   try: pp = pinger.find(name)
835   except KeyError: pass
836   else: d.update(pp.info())
837   items = list(peer.list()) + d.keys()
838   items.sort()
839   for i in items:
840     try: v = d[i]
841     except KeyError: v = peer.get(i)
842     T.svcinfo('%s=%s' % (i, v))
843
844 def cmd_userpeer(user):
845   """
846   userpeer USER: Report the peer name for the named user.
847   """
848   try: name = CDB.init(opts.cdb)['U' + user]
849   except KeyError: raise T.TripeJobError('unknown-user', user)
850   T.svcinfo(name)
851
852 def cmd_passive(*args):
853   """
854   passive [OPTIONS] USER: Await the arrival of the named USER.
855
856   Report a challenge; when (and if!) the server receives a greeting quoting
857   this challenge, add the corresponding peer to the server.
858   """
859   timeout = 30
860   op = T.OptParse(args, ['-timeout'])
861   for opt in op:
862     if opt == '-timeout':
863       timeout = T.timespec(op.arg())
864   user, = op.rest(1, 1)
865   try: name = CDB.init(opts.cdb)['U' + user]
866   except KeyError: raise T.TripeJobError('unknown-user', user)
867   try: peer = Peer(name)
868   except KeyError: raise T.TripeJobError('unknown-peer', name)
869   chal = S.getchal()
870   cr = T.Coroutine.getcurrent()
871   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
872   try:
873     T.svcinfo(chal)
874     chalmap[chal] = cr
875     addr = cr.parent.switch()
876     if addr is None:
877       raise T.TripeJobError('connect-timeout')
878     addpeer(peer, addr)
879   finally:
880     del chalmap[chal]
881
882 def cmd_sabotage(name):
883   """
884   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
885   """
886   try: pp = pinger.find(name)
887   except KeyError: raise T.TripeJobError('unknown-peer', name)
888   pp.sabotage()
889
890 ###--------------------------------------------------------------------------
891 ### Start up.
892
893 def setup():
894   """
895   Service setup.
896
897   Register the notification watcher, rescan the peers, and add automatic
898   active peers.
899   """
900   S.handler['NOTE'] = notify
901   S.watch('+n')
902
903   pinger.rescan(opts.startup)
904
905   if opts.startup:
906     cdb = CDB.init(opts.cdb)
907     try:
908       autos = cdb['%AUTO']
909     except KeyError:
910       autos = ''
911     for name in M.split(autos)[0]:
912       try:
913         peer = Peer(name, cdb)
914         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
915       except T.TripeJobError, err:
916         S.warn('connect', 'auto-add-failed', name, *err.args)
917
918 def init():
919   """
920   Initialization to be done before service startup.
921   """
922   global errorwatch, childwatch, pinger
923   errorwatch = ErrorWatch()
924   childwatch = ChildWatch()
925   pinger = Pinger()
926   T.Coroutine(dbwatch, name = 'dbwatch').switch()
927   errorwatch.switch()
928   pinger.switch()
929
930 def parse_options():
931   """
932   Parse the command-line options.
933
934   Automatically changes directory to the requested configdir, and turns on
935   debugging.  Returns the options object.
936   """
937   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
938                     version = '%%prog %s' % VERSION)
939
940   op.add_option('-a', '--admin-socket',
941                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
942                 help = 'Select socket to connect to [default %default]')
943   op.add_option('-d', '--directory',
944                 metavar = 'DIR', dest = 'dir', default = T.configdir,
945                 help = 'Select current diretory [default %default]')
946   op.add_option('-p', '--peerdb',
947                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
948                 help = 'Select peers database [default %default]')
949   op.add_option('--daemon', dest = 'daemon',
950                 default = False, action = 'store_true',
951                 help = 'Become a daemon after successful initialization')
952   op.add_option('--debug', dest = 'debug',
953                 default = False, action = 'store_true',
954                 help = 'Emit debugging trace information')
955   op.add_option('--startup', dest = 'startup',
956                 default = False, action = 'store_true',
957                 help = 'Being called as part of the server startup')
958
959   opts, args = op.parse_args()
960   if args: op.error('no arguments permitted')
961   OS.chdir(opts.dir)
962   T._debug = opts.debug
963   return opts
964
965 ## Service table, for running manually.
966 service_info = [('connect', T.VERSION, {
967   'adopted': (0, 0, '', cmd_adopted),
968   'kick': (1, 1, 'PEER', cmd_kick),
969   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
970   'active': (1, 1, 'PEER', cmd_active),
971   'info': (1, 1, 'PEER', cmd_info),
972   'list-active': (0, 0, '', cmd_listactive),
973   'userpeer': (1, 1, 'USER', cmd_userpeer),
974   'sabotage': (1, 1, 'PEER', cmd_sabotage)
975 })]
976
977 if __name__ == '__main__':
978   opts = parse_options()
979   OS.environ['TRIPESOCK'] = opts.tripesock
980   T.runservices(opts.tripesock, service_info,
981                 init = init, setup = setup,
982                 daemon = opts.daemon)
983
984 ###----- That's all, folks --------------------------------------------------