chiark / gitweb /
svc/connect.in (addpeer): Use `boolean' filter rather than a local hack.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50   """Glues the select-line-buffer into the coroutine queue system."""
51
52   def __new__(cls, file, queue, tag, kind):
53     """See __init__ for documentation."""
54     return M.SelLineBuffer.__new__(cls, file.fileno())
55
56   def __init__(me, file, queue, tag, kind):
57     """
58     Initialize a new line-reading adaptor.
59
60     The adaptor reads lines from FILE.  Each line is inserted as a message of
61     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
62     represented as None.
63     """
64     me._q = queue
65     me._file = file
66     me._tag = tag
67     me._kind = kind
68     me.enable()
69
70   @T._callback
71   def line(me, line):
72     me._q.put((me._tag, me._kind, line))
73
74   @T._callback
75   def eof(me):
76     me.disable()
77     me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80   """
81   An object which watches stderr streams for errors and converts them into
82   warnings of the form
83
84     WARN connect INFO stderr LINE
85
86   The INFO is a list of tokens associated with the file when it was
87   registered.
88
89   Usually there is a single ErrorWatch object, called errorwatch.
90   """
91
92   def __init__(me):
93     """Initialization: there are no arguments."""
94     T.Coroutine.__init__(me)
95     me._q = T.Queue()
96     me._map = {}
97     me._seq = 1
98
99   def watch(me, file, info):
100     """
101     Adds FILE to the collection of files to watch.
102
103     INFO will be written in the warning messages from this FILE.  Returns a
104     sequence number which can be used to unregister the file again.
105     """
106     seq = me._seq
107     me._seq += 1
108     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109     return seq
110
111   def unwatch(me, seq):
112     """Stop watching the file with sequence number SEQ."""
113     del me._map[seq]
114     return me
115
116   def run(me):
117     """
118     Coroutine function: read items from the queue and report them.
119
120     Unregisters files automatically when they reach EOF.
121     """
122     while True:
123       seq, _, line = me._q.get()
124       if line is None:
125         me.unwatch(seq)
126       else:
127         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130   """
131   Coroutine function: wake up every minute and notice changes to the
132   database.  When a change happens, tell the Pinger (q.v.) to rescan its
133   peers.
134   """
135   cr = T.Coroutine.getcurrent()
136   main = cr.parent
137   fw = M.FWatch(opts.cdb)
138   while True:
139     timer = M.SelTimer(time() + 60, lambda: cr.switch())
140     main.switch()
141     if fw.update():
142       pinger.rescan(False)
143       S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146   """
147   An object which watches for specified processes exiting and reports
148   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150   There is usually only one ChildWatch object, called childwatch.
151   """
152
153   def __new__(cls):
154     """Initialize the child-watcher."""
155     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157   def __init__(me):
158     """Initialize the child-watcher."""
159     me._pid = {}
160     me.enable()
161
162   def watch(me, pid, queue, tag):
163     """
164     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
165     to the QUEUE, where CODE is one of
166
167       * None (successful termination)
168       * ['exit-nonzero', CODE] (CODE is a string!)
169       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171     """
172     me._pid[pid] = queue, tag
173     return me
174
175   def unwatch(me, pid):
176     """Unregister PID as a child to watch."""
177     del me._pid[pid]
178     return me
179
180   @T._callback
181   def signalled(me):
182     """
183     Called when child processes exit: collect exit statuses and report
184     failures.
185     """
186     while True:
187       try:
188         pid, status = OS.waitpid(-1, OS.WNOHANG)
189       except OSError, exc:
190         if exc.errno == E.ECHILD:
191           break
192       if pid == 0:
193         break
194       if pid not in me._pid:
195         continue
196       queue, tag = me._pid[pid]
197       if OS.WIFEXITED(status):
198         exit = OS.WEXITSTATUS(status)
199         if exit == 0:
200           code = None
201         else:
202           code = ['exit-nonzero', str(exit)]
203       elif OS.WIFSIGNALED(status):
204         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205       else:
206         code = ['exit-unknown', hex(status)]
207       queue.put((tag, 'exit', code))
208
209 class Command (object):
210   """
211   Represents a running command.
212
213   This class is the main interface to the machery provided by the ChildWatch
214   and ErrorWatch objects.  See also potwatch.
215   """
216
217   def __init__(me, info, queue, tag, args, env):
218     """
219     Start a new child process.
220
221     The ARGS are a list of arguments to be given to the child process.  The
222     ENV is either None or a dictionary of environment variable assignments to
223     override the extant environment.  INFO is a list of tokens to be included
224     in warnings about the child's stderr output.  If the child writes a line
225     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
226     child exits, write (TAG, 'exit', CODE) to the QUEUE.
227     """
228     me._info = info
229     me._q = queue
230     me._tag = tag
231     myenv = OS.environ.copy()
232     if env: myenv.update(env)
233     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234                           stdout = PROC.PIPE, stderr = PROC.PIPE)
235     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236     errorwatch.watch(me._proc.stderr, info)
237     childwatch.watch(me._proc.pid, queue, tag)
238
239   def __del__(me):
240     """
241     If I've been forgotten then stop watching for termination.
242     """
243     childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246   """
247   Watch the queue Q for activity as reported by a Command object.
248
249   Information from the process's stdout is reported as
250
251     NOTE WHAT NAME stdout LINE
252
253   abnormal termination is reported as
254
255     WARN WHAT NAME CODE
256
257   where CODE is what the ChildWatch wrote.
258   """
259   eofp = deadp = False
260   while not deadp or not eofp:
261     _, kind, more = q.get()
262     if kind == 'stdout':
263       if more is None:
264         eofp = True
265       else:
266         S.notify('connect', what, name, 'stdout', more)
267     elif kind == 'exit':
268       if more: S.warn('connect', what, name, *more)
269       deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic']                     # An object distinct from all others
275
276 class Peer (object):
277   """Representation of a peer in the database."""
278
279   def __init__(me, peer, cdb = None):
280     """
281     Create a new peer, named PEER.
282
283     Information about the peer is read from the database CDB, or the default
284     one given on the command-line.
285     """
286     me.name = peer
287     record = (cdb or CDB.init(opts.cdb))['P' + peer]
288     me.__dict__.update(M.URLDecode(record, semip = True))
289
290   def get(me, key, default = _magic, filter = None):
291     """
292     Get the information stashed under KEY from the peer's database record.
293
294     If DEFAULT is given, then use it if the database doesn't contain the
295     necessary information.  If no DEFAULT is given, then report an error.  If
296     a FILTER function is given then apply it to the information from the
297     database before returning it.
298     """
299     try:
300       attr = me.__dict__[key]
301     except KeyError:
302       if default is _magic:
303         raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
304       return default
305     else:
306       if filter is not None: attr = filter(attr)
307       return attr
308
309   def has(me, key):
310     """
311     Return whether the peer's database record has the KEY.
312     """
313     return key in me.__dict__
314
315   def list(me):
316     """
317     Iterate over the available keys in the peer's database record.
318     """
319     return me.__dict__.iterkeys()
320
321 def boolean(value):
322   """Parse VALUE as a boolean."""
323   return value in ['t', 'true', 'y', 'yes', 'on']
324
325 ###--------------------------------------------------------------------------
326 ### Waking up and watching peers.
327
328 def run_connect(peer, cmd):
329   """
330   Start the job of connecting to the passive PEER.
331
332   The CMD string is a shell command which will connect to the peer (via some
333   back-channel, say ssh and userv), issue a command
334
335     SVCSUBMIT connect passive [OPTIONS] USER
336
337   and write the resulting challenge to standard error.
338   """
339   q = T.Queue()
340   cmd = Command(['connect', peer.name], q, 'connect',
341                 ['/bin/sh', '-c', cmd], None)
342   _, kind, more = q.peek()
343   if kind == 'stdout':
344     if more is None:
345       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
346     else:
347       chal = more
348       S.greet(peer.name, chal)
349       q.get()
350   potwatch('connect', peer.name, q)
351
352 def run_disconnect(peer, cmd):
353   """
354   Start the job of disconnecting from a passive PEER.
355
356   The CMD string is a shell command which will disconnect from the peer.
357   """
358   q = T.Queue()
359   cmd = Command(['disconnect', peer.name], q, 'disconnect',
360                 ['/bin/sh', '-c', cmd], None)
361   potwatch('disconnect', peer.name, q)
362
363 _pingseq = 0
364 class PingPeer (object):
365   """
366   Object representing a peer which we are pinging to ensure that it is still
367   present.
368
369   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
370   event queue -- which saves us from having an enormous swarm of coroutines
371   -- but most of the actual work is done here.
372
373   In order to avoid confusion between different PingPeer instances for the
374   same actual peer, each PingPeer has a sequence number (its `seq'
375   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
376   (Using the PingPeer instance itself will prevent garbage collection of
377   otherwise defunct instances.)
378   """
379
380   def __init__(me, pinger, queue, peer, pingnow):
381     """
382     Create a new PingPeer.
383
384     The PINGER is the Pinger object we should send the results to.  This is
385     used when we remove ourselves, if the peer has been explicitly removed.
386
387     The QUEUE is the event queue on which timer and ping-command events
388     should be written.
389
390     The PEER is a `Peer' object describing the peer.
391
392     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
393     wait until the usual retry interval.
394     """
395     global _pingseq
396     me._pinger = pinger
397     me._q = queue
398     me._peer = peer.name
399     me.update(peer)
400     me.seq = _pingseq
401     _pingseq += 1
402     me._failures = 0
403     me._sabotage = False
404     me._last = '-'
405     me._nping = 0
406     me._nlost = 0
407     me._sigma_t = 0
408     me._sigma_t2 = 0
409     me._min = me._max = '-'
410     if pingnow:
411       me._timer = None
412       me._ping()
413     else:
414       me._timer = M.SelTimer(time() + me._every, me._time)
415
416   def update(me, peer):
417     """
418     Refreshes the timer parameters for this peer.  We don't, however,
419     immediately reschedule anything: that will happen next time anything
420     interesting happens.
421     """
422     if peer is None: peer = Peer(me._peer)
423     assert peer.name == me._peer
424     me._every = peer.get('every', filter = T.timespec, default = 120)
425     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
426     me._retries = peer.get('retries', filter = int, default = 5)
427     me._connectp = peer.has('connect')
428     return me
429
430   def _ping(me):
431     """
432     Send a ping to the peer; the result is sent to the Pinger's event queue.
433     """
434     S.rawcommand(T.TripeAsynchronousCommand(
435       me._q, (me._peer, me.seq),
436       ['EPING',
437        '-background', S.bgtag(),
438        '-timeout', str(me._timeout),
439        '--',
440        me._peer]))
441
442   def _reconnect(me):
443     try:
444       peer = Peer(me._peer)
445       if me._connectp:
446         S.warn('connect', 'reconnecting', me._peer)
447         S.forcekx(me._peer)
448         T.spawn(run_connect, peer, peer.get('connect'))
449         me._timer = M.SelTimer(time() + me._every, me._time)
450         me._sabotage = False
451       else:
452         S.kill(me._peer)
453     except TripeError, e:
454       if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
455
456   def event(me, code, stuff):
457     """
458     Respond to an event which happened to this peer.
459
460     Timer events indicate that we should start a new ping.  (The server has
461     its own timeout which detects lost packets.)
462
463     We trap unknown-peer responses and detach from the Pinger.
464
465     If the ping fails and we run out of retries, we attempt to restart the
466     connection.
467     """
468     if code == 'TIMER':
469       me._failures = 0
470       me._ping()
471     elif code == 'FAIL':
472       S.notify('connect', 'ping-failed', me._peer, *stuff)
473       if not stuff: pass
474       elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
475       elif stuff[0] == 'ping-send-failed': me._reconnect()
476     elif code == 'INFO':
477       outcome = stuff[0]
478       if outcome == 'ping-ok' and me._sabotage:
479         outcome = 'ping-timeout'
480       if outcome == 'ping-ok':
481         if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
482         t = float(stuff[1])
483         me._last = '%.1fms' % t
484         me._sigma_t += t
485         me._sigma_t2 += t*t
486         me._nping += 1
487         if me._min == '-' or t < me._min: me._min = t
488         if me._max == '-' or t > me._max: me._max = t
489         me._timer = M.SelTimer(time() + me._every, me._time)
490       elif outcome == 'ping-timeout':
491         me._failures += 1
492         me._nlost += 1
493         S.warn('connect', 'ping-timeout', me._peer,
494                'attempt', str(me._failures), 'of', str(me._retries))
495         if me._failures < me._retries:
496           me._ping()
497           me._last = 'timeout'
498         else:
499           me._reconnect()
500           me._last = 'reconnect'
501       elif outcome == 'ping-peer-died':
502         me._pinger.kill(me._peer)
503
504   def sabotage(me):
505     """Sabotage the peer, for testing purposes."""
506     me._sabotage = True
507     if me._timer: me._timer.kill()
508     T.defer(me._time)
509
510   def info(me):
511     if not me._nping:
512       mean = sd = '-'
513     else:
514       mean = me._sigma_t/me._nping
515       sd = sqrt(me._sigma_t2/me._nping - mean*mean)
516     n = me._nping + me._nlost
517     if not n: pclost = '-'
518     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
519     return { 'last-ping': me._last,
520              'mean-ping': '%.1fms' % mean,
521              'sd-ping': '%.1fms' % sd,
522              'n-ping': '%d' % me._nping,
523              'n-lost': '%d' % me._nlost,
524              'percent-lost': pclost,
525              'min-ping': '%.1fms' % me._min,
526              'max-ping': '%.1fms' % me._max,
527              'state': me._timer and 'idle' or 'check',
528              'failures': me._failures }
529
530   @T._callback
531   def _time(me):
532     """
533     Handle timer callbacks by posting a timeout event on the queue.
534     """
535     me._timer = None
536     me._q.put(((me._peer, me.seq), 'TIMER', None))
537
538   def __str__(me):
539     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
540   def __repr__(me):
541     return str(me)
542
543 class Pinger (T.Coroutine):
544   """
545   The Pinger keeps track of the peers which we expect to be connected and
546   takes action if they seem to stop responding.
547
548   There is usually only one Pinger, called pinger.
549
550   The Pinger maintains a collection of PingPeer objects, and an event queue.
551   The PingPeers direct the results of their pings, and timer events, to the
552   event queue.  The Pinger's coroutine picks items off the queue and
553   dispatches them back to the PingPeers as appropriate.
554   """
555
556   def __init__(me):
557     """Initialize the Pinger."""
558     T.Coroutine.__init__(me)
559     me._peers = {}
560     me._q = T.Queue()
561
562   def run(me):
563     """
564     Coroutine function: reads the pinger queue and sends events to the
565     PingPeer objects they correspond to.
566     """
567     while True:
568       (peer, seq), code, stuff = me._q.get()
569       if peer in me._peers and seq == me._peers[peer].seq:
570         try: me._peers[peer].event(code, stuff)
571         except Exception, e:
572           SYS.excepthook(*SYS.exc_info())
573
574   def add(me, peer, pingnow):
575     """
576     Add PEER to the collection of peers under the Pinger's watchful eye.
577     The arguments are as for PingPeer: see above.
578     """
579     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
580     return me
581
582   def kill(me, peername):
583     """Remove PEER from the peers being watched by the Pinger."""
584     try: del me._peers[peername]
585     except KeyError: pass
586     return me
587
588   def rescan(me, startup):
589     """
590     General resynchronization method.
591
592     We scan the list of peers (with connect scripts) known at the server.
593     Any which are known to the Pinger but aren't known to the server are
594     removed from our list; newly arrived peers are added.  (Note that a peer
595     can change state here either due to the server sneakily changing its list
596     without issuing notifications or, more likely, the database changing its
597     idea of whether a peer is interesting.)  Finally, PingPeers which are
598     still present are prodded to update their timing parameters.
599
600     This method is called once at startup to pick up the peers already
601     installed, and again by the dbwatcher coroutine when it detects a change
602     to the database.
603     """
604     if T._debug: print '# rescan peers'
605     correct = {}
606     start = {}
607     for name in S.list():
608       try: peer = Peer(name)
609       except KeyError: continue
610       if peer.get('watch', filter = boolean, default = False):
611         if T._debug: print '# interesting peer %s' % peer
612         correct[peer.name] = start[peer.name] = peer
613       elif startup:
614         if T._debug: print '# peer %s ready for adoption' % peer
615         start[peer.name] = peer
616     for name, obj in me._peers.items():
617       try:
618         peer = correct[name]
619       except KeyError:
620         if T._debug: print '# peer %s vanished' % name
621         del me._peers[name]
622       else:
623         obj.update(peer)
624     for name, peer in start.iteritems():
625       if name in me._peers: continue
626       if startup:
627         if T._debug: print '# setting up peer %s' % name
628         ifname = S.ifname(name)
629         addr = S.addr(name)
630         T.defer(adoptpeer, peer, ifname, *addr)
631       else:
632         if T._debug: print '# adopting new peer %s' % name
633         me.add(peer, True)
634     return me
635
636   def adopted(me):
637     """
638     Returns the list of peers being watched by the Pinger.
639     """
640     return me._peers.keys()
641
642   def find(me, name):
643     """Return the PingPeer with the given name."""
644     return me._peers[name]
645
646 ###--------------------------------------------------------------------------
647 ### New connections.
648
649 def encode_envvars(env, prefix, vars):
650   """
651   Encode the variables in VARS suitably for including in a program
652   environment.  Lowercase letters in variable names are forced to uppercase;
653   runs of non-alphanumeric characters are replaced by single underscores; and
654   the PREFIX is prepended.  The resulting variables are written to ENV.
655   """
656   for k, v in vars.iteritems():
657     env[prefix + r_bad.sub('_', k.upper())] = v
658
659 r_bad = RX.compile(r'[\W_]+')
660 def envvars(peer):
661   """
662   Translate the database information for a PEER into a dictionary of
663   environment variables with plausible upper-case names and a P_ prefix.
664   Also collect the crypto information into A_ variables.
665   """
666   env = {}
667   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
668   encode_envvars(env, 'A_', S.algs(peer.name))
669   return env
670
671 def run_ifupdown(what, peer, *args):
672   """
673   Run the interface up/down script for a peer.
674
675   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
676   list of arguments to pass to the script, in addition to the peer name.
677
678   The command is run and watched in the background by potwatch.
679   """
680   q = T.Queue()
681   c = Command([what, peer.name], q, what,
682               M.split(peer.get(what), quotep = True)[0] +
683               [peer.name] + list(args),
684               envvars(peer))
685   potwatch(what, peer.name, q)
686
687 def adoptpeer(peer, ifname, *addr):
688   """
689   Add a new peer to our collection.
690
691   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
692   ADDR is the list of tokens representing its address.
693
694   We try to bring up the interface and provoke a connection to the peer if
695   it's passive.
696   """
697   if peer.has('ifup'):
698     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
699         .switch('ifup', peer, ifname, *addr)
700   cmd = peer.get('connect', default = None)
701   if cmd is not None:
702     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
703         .switch(peer, cmd)
704   if peer.get('watch', filter = boolean, default = False):
705     pinger.add(peer, False)
706
707 def disownpeer(peer):
708   """Drop the PEER from the Pinger and put its interface to bed."""
709   try: pinger.kill(peer)
710   except KeyError: pass
711   cmd = peer.get('disconnect', default = None)
712   if cmd is not None:
713     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
714         .switch(peer, cmd)
715   if peer.has('ifdown'):
716     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
717         .switch('ifdown', peer)
718
719 def addpeer(peer, addr):
720   """
721   Process a connect request from a new peer PEER on address ADDR.
722
723   Any existing peer with this name is disconnected from the server.
724   """
725   if peer.name in S.list():
726     S.kill(peer.name)
727   try:
728     S.add(peer.name,
729           tunnel = peer.get('tunnel', default = None),
730           keepalive = peer.get('keepalive', default = None),
731           key = peer.get('key', default = None),
732           priv = peer.get('priv', default = None),
733           mobile = peer.get('mobile', filter = boolean, default = False),
734           cork = peer.get('cork', filter = boolean, default = False),
735           *addr)
736   except T.TripeError, exc:
737     raise T.TripeJobError(*exc.args)
738
739 ## Dictionary mapping challenges to waiting passive-connection coroutines.
740 chalmap = {}
741
742 def notify(_, code, *rest):
743   """
744   Watch for notifications.
745
746   We trap ADD and KILL notifications, and send them straight to adoptpeer and
747   disownpeer respectively; and dispatch GREET notifications to the
748   corresponding waiting coroutine.
749   """
750   if code == 'ADD':
751     try: p = Peer(rest[0])
752     except KeyError: return
753     adoptpeer(p, *rest[1:])
754   elif code == 'KILL':
755     try: p = Peer(rest[0])
756     except KeyError: return
757     disownpeer(p, *rest[1:])
758   elif code == 'GREET':
759     chal = rest[0]
760     try: cr = chalmap[chal]
761     except KeyError: pass
762     else: cr.switch(rest[1:])
763
764 ###--------------------------------------------------------------------------
765 ### Command implementation.
766
767 def cmd_kick(name):
768   """
769   kick NAME: Force a new connection attempt for the NAMEd peer.
770   """
771   try: pp = pinger.find(name)
772   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
773   try: peer = Peer(name)
774   except KeyError: raise T.TripeJobError('unknown-peer', name)
775   conn = peer.get('connect', None)
776   if conn: T.spawn(run_connect, peer, peer.get('connect'))
777   else: T.spawn(lambda p: S.forcekx(p.name), peer)
778
779 def cmd_adopted():
780   """
781   adopted: Report a list of adopted peers.
782   """
783   for name in pinger.adopted():
784     T.svcinfo(name)
785
786 def cmd_active(name):
787   """
788   active NAME: Handle an active connection request for the peer called NAME.
789
790   The appropriate address is read from the database automatically.
791   """
792   try: peer = Peer(name)
793   except KeyError: raise T.TripeJobError('unknown-peer', name)
794   addr = peer.get('peer')
795   if addr == 'PASSIVE':
796     raise T.TripeJobError('passive-peer', name)
797   addpeer(peer, M.split(addr, quotep = True)[0])
798
799 def cmd_listactive():
800   """
801   list: Report a list of the available active peers.
802   """
803   cdb = CDB.init(opts.cdb)
804   for key in cdb.keys():
805     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
806       T.svcinfo(key[1:])
807
808 def cmd_info(name):
809   """
810   info NAME: Report the database entries for the named peer.
811   """
812   try: peer = Peer(name)
813   except KeyError: raise T.TripeJobError('unknown-peer', name)
814   d = {}
815   try: pp = pinger.find(name)
816   except KeyError: pass
817   else: d.update(pp.info())
818   items = list(peer.list()) + d.keys()
819   items.sort()
820   for i in items:
821     try: v = d[i]
822     except KeyError: v = peer.get(i)
823     T.svcinfo('%s=%s' % (i, v))
824
825 def cmd_userpeer(user):
826   """
827   userpeer USER: Report the peer name for the named user.
828   """
829   try: name = CDB.init(opts.cdb)['U' + user]
830   except KeyError: raise T.TripeJobError('unknown-user', user)
831   T.svcinfo(name)
832
833 def cmd_passive(*args):
834   """
835   passive [OPTIONS] USER: Await the arrival of the named USER.
836
837   Report a challenge; when (and if!) the server receives a greeting quoting
838   this challenge, add the corresponding peer to the server.
839   """
840   timeout = 30
841   op = T.OptParse(args, ['-timeout'])
842   for opt in op:
843     if opt == '-timeout':
844       timeout = T.timespec(op.arg())
845   user, = op.rest(1, 1)
846   try: name = CDB.init(opts.cdb)['U' + user]
847   except KeyError: raise T.TripeJobError('unknown-user', user)
848   try: peer = Peer(name)
849   except KeyError: raise T.TripeJobError('unknown-peer', name)
850   chal = S.getchal()
851   cr = T.Coroutine.getcurrent()
852   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
853   try:
854     T.svcinfo(chal)
855     chalmap[chal] = cr
856     addr = cr.parent.switch()
857     if addr is None:
858       raise T.TripeJobError('connect-timeout')
859     addpeer(peer, addr)
860   finally:
861     del chalmap[chal]
862
863 def cmd_sabotage(name):
864   """
865   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
866   """
867   try: pp = pinger.find(name)
868   except KeyError: raise T.TripeJobError('unknown-peer', name)
869   pp.sabotage()
870
871 ###--------------------------------------------------------------------------
872 ### Start up.
873
874 def setup():
875   """
876   Service setup.
877
878   Register the notification watcher, rescan the peers, and add automatic
879   active peers.
880   """
881   S.handler['NOTE'] = notify
882   S.watch('+n')
883
884   pinger.rescan(opts.startup)
885
886   if opts.startup:
887     cdb = CDB.init(opts.cdb)
888     try:
889       autos = cdb['%AUTO']
890     except KeyError:
891       autos = ''
892     for name in M.split(autos)[0]:
893       try:
894         peer = Peer(name, cdb)
895         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
896       except T.TripeJobError, err:
897         S.warn('connect', 'auto-add-failed', name, *err.args)
898
899 def init():
900   """
901   Initialization to be done before service startup.
902   """
903   global errorwatch, childwatch, pinger
904   errorwatch = ErrorWatch()
905   childwatch = ChildWatch()
906   pinger = Pinger()
907   T.Coroutine(dbwatch, name = 'dbwatch').switch()
908   errorwatch.switch()
909   pinger.switch()
910
911 def parse_options():
912   """
913   Parse the command-line options.
914
915   Automatically changes directory to the requested configdir, and turns on
916   debugging.  Returns the options object.
917   """
918   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
919                     version = '%%prog %s' % VERSION)
920
921   op.add_option('-a', '--admin-socket',
922                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
923                 help = 'Select socket to connect to [default %default]')
924   op.add_option('-d', '--directory',
925                 metavar = 'DIR', dest = 'dir', default = T.configdir,
926                 help = 'Select current diretory [default %default]')
927   op.add_option('-p', '--peerdb',
928                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
929                 help = 'Select peers database [default %default]')
930   op.add_option('--daemon', dest = 'daemon',
931                 default = False, action = 'store_true',
932                 help = 'Become a daemon after successful initialization')
933   op.add_option('--debug', dest = 'debug',
934                 default = False, action = 'store_true',
935                 help = 'Emit debugging trace information')
936   op.add_option('--startup', dest = 'startup',
937                 default = False, action = 'store_true',
938                 help = 'Being called as part of the server startup')
939
940   opts, args = op.parse_args()
941   if args: op.error('no arguments permitted')
942   OS.chdir(opts.dir)
943   T._debug = opts.debug
944   return opts
945
946 ## Service table, for running manually.
947 service_info = [('connect', T.VERSION, {
948   'adopted': (0, 0, '', cmd_adopted),
949   'kick': (1, 1, 'PEER', cmd_kick),
950   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
951   'active': (1, 1, 'PEER', cmd_active),
952   'info': (1, 1, 'PEER', cmd_info),
953   'list-active': (0, 0, '', cmd_listactive),
954   'userpeer': (1, 1, 'USER', cmd_userpeer),
955   'sabotage': (1, 1, 'PEER', cmd_sabotage)
956 })]
957
958 if __name__ == '__main__':
959   opts = parse_options()
960   OS.environ['TRIPESOCK'] = opts.tripesock
961   T.runservices(opts.tripesock, service_info,
962                 init = init, setup = setup,
963                 daemon = opts.daemon)
964
965 ###----- That's all, folks --------------------------------------------------