chiark / gitweb /
Merge branch '1.0.0pre19.x'
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50   """Glues the select-line-buffer into the coroutine queue system."""
51
52   def __new__(cls, file, queue, tag, kind):
53     """See __init__ for documentation."""
54     return M.SelLineBuffer.__new__(cls, file.fileno())
55
56   def __init__(me, file, queue, tag, kind):
57     """
58     Initialize a new line-reading adaptor.
59
60     The adaptor reads lines from FILE.  Each line is inserted as a message of
61     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
62     represented as None.
63     """
64     me._q = queue
65     me._file = file
66     me._tag = tag
67     me._kind = kind
68     me.enable()
69
70   @T._callback
71   def line(me, line):
72     me._q.put((me._tag, me._kind, line))
73
74   @T._callback
75   def eof(me):
76     me.disable()
77     me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80   """
81   An object which watches stderr streams for errors and converts them into
82   warnings of the form
83
84     WARN connect INFO stderr LINE
85
86   The INFO is a list of tokens associated with the file when it was
87   registered.
88
89   Usually there is a single ErrorWatch object, called errorwatch.
90   """
91
92   def __init__(me):
93     """Initialization: there are no arguments."""
94     T.Coroutine.__init__(me)
95     me._q = T.Queue()
96     me._map = {}
97     me._seq = 1
98
99   def watch(me, file, info):
100     """
101     Adds FILE to the collection of files to watch.
102
103     INFO will be written in the warning messages from this FILE.  Returns a
104     sequence number which can be used to unregister the file again.
105     """
106     seq = me._seq
107     me._seq += 1
108     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109     return seq
110
111   def unwatch(me, seq):
112     """Stop watching the file with sequence number SEQ."""
113     del me._map[seq]
114     return me
115
116   def run(me):
117     """
118     Coroutine function: read items from the queue and report them.
119
120     Unregisters files automatically when they reach EOF.
121     """
122     while True:
123       seq, _, line = me._q.get()
124       if line is None:
125         me.unwatch(seq)
126       else:
127         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130   """
131   Coroutine function: wake up every minute and notice changes to the
132   database.  When a change happens, tell the Pinger (q.v.) to rescan its
133   peers.
134   """
135   cr = T.Coroutine.getcurrent()
136   main = cr.parent
137   fw = M.FWatch(opts.cdb)
138   while True:
139     timer = M.SelTimer(time() + 60, lambda: cr.switch())
140     main.switch()
141     if fw.update():
142       pinger.rescan(False)
143       S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146   """
147   An object which watches for specified processes exiting and reports
148   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150   There is usually only one ChildWatch object, called childwatch.
151   """
152
153   def __new__(cls):
154     """Initialize the child-watcher."""
155     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157   def __init__(me):
158     """Initialize the child-watcher."""
159     me._pid = {}
160     me.enable()
161
162   def watch(me, pid, queue, tag):
163     """
164     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
165     to the QUEUE, where CODE is one of
166
167       * None (successful termination)
168       * ['exit-nonzero', CODE] (CODE is a string!)
169       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171     """
172     me._pid[pid] = queue, tag
173     return me
174
175   def unwatch(me, pid):
176     """Unregister PID as a child to watch."""
177     del me._pid[pid]
178     return me
179
180   @T._callback
181   def signalled(me):
182     """
183     Called when child processes exit: collect exit statuses and report
184     failures.
185     """
186     while True:
187       try:
188         pid, status = OS.waitpid(-1, OS.WNOHANG)
189       except OSError, exc:
190         if exc.errno == E.ECHILD:
191           break
192       if pid == 0:
193         break
194       if pid not in me._pid:
195         continue
196       queue, tag = me._pid[pid]
197       if OS.WIFEXITED(status):
198         exit = OS.WEXITSTATUS(status)
199         if exit == 0:
200           code = None
201         else:
202           code = ['exit-nonzero', str(exit)]
203       elif OS.WIFSIGNALED(status):
204         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205       else:
206         code = ['exit-unknown', hex(status)]
207       queue.put((tag, 'exit', code))
208
209 class Command (object):
210   """
211   Represents a running command.
212
213   This class is the main interface to the machery provided by the ChildWatch
214   and ErrorWatch objects.  See also potwatch.
215   """
216
217   def __init__(me, info, queue, tag, args, env):
218     """
219     Start a new child process.
220
221     The ARGS are a list of arguments to be given to the child process.  The
222     ENV is either None or a dictionary of environment variable assignments to
223     override the extant environment.  INFO is a list of tokens to be included
224     in warnings about the child's stderr output.  If the child writes a line
225     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
226     child exits, write (TAG, 'exit', CODE) to the QUEUE.
227     """
228     me._info = info
229     me._q = queue
230     me._tag = tag
231     myenv = OS.environ.copy()
232     if env: myenv.update(env)
233     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234                           stdout = PROC.PIPE, stderr = PROC.PIPE)
235     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236     errorwatch.watch(me._proc.stderr, info)
237     childwatch.watch(me._proc.pid, queue, tag)
238
239   def __del__(me):
240     """
241     If I've been forgotten then stop watching for termination.
242     """
243     childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246   """
247   Watch the queue Q for activity as reported by a Command object.
248
249   Information from the process's stdout is reported as
250
251     NOTE WHAT NAME stdout LINE
252
253   abnormal termination is reported as
254
255     WARN WHAT NAME CODE
256
257   where CODE is what the ChildWatch wrote.
258   """
259   eofp = deadp = False
260   while not deadp or not eofp:
261     _, kind, more = q.get()
262     if kind == 'stdout':
263       if more is None:
264         eofp = True
265       else:
266         S.notify('connect', what, name, 'stdout', more)
267     elif kind == 'exit':
268       if more: S.warn('connect', what, name, *more)
269       deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic']                     # An object distinct from all others
275
276 class Peer (object):
277   """Representation of a peer in the database."""
278
279   def __init__(me, peer, cdb = None):
280     """
281     Create a new peer, named PEER.
282
283     Information about the peer is read from the database CDB, or the default
284     one given on the command-line.
285     """
286     me.name = peer
287     record = (cdb or CDB.init(opts.cdb))['P' + peer]
288     me.__dict__.update(M.URLDecode(record, semip = True))
289
290   def get(me, key, default = _magic, filter = None):
291     """
292     Get the information stashed under KEY from the peer's database record.
293
294     If DEFAULT is given, then use it if the database doesn't contain the
295     necessary information.  If no DEFAULT is given, then report an error.  If
296     a FILTER function is given then apply it to the information from the
297     database before returning it.
298     """
299     try:
300       attr = me.__dict__[key]
301     except KeyError:
302       if default is _magic:
303         raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
304       return default
305     else:
306       if filter is not None: attr = filter(attr)
307       return attr
308
309   def has(me, key):
310     """
311     Return whether the peer's database record has the KEY.
312     """
313     return key in me.__dict__
314
315   def list(me):
316     """
317     Iterate over the available keys in the peer's database record.
318     """
319     return me.__dict__.iterkeys()
320
321 def boolean(value):
322   """Parse VALUE as a boolean."""
323   return value in ['t', 'true', 'y', 'yes', 'on']
324
325 ###--------------------------------------------------------------------------
326 ### Waking up and watching peers.
327
328 def run_connect(peer, cmd):
329   """
330   Start the job of connecting to the passive PEER.
331
332   The CMD string is a shell command which will connect to the peer (via some
333   back-channel, say ssh and userv), issue a command
334
335     SVCSUBMIT connect passive [OPTIONS] USER
336
337   and write the resulting challenge to standard error.
338   """
339   q = T.Queue()
340   cmd = Command(['connect', peer.name], q, 'connect',
341                 ['/bin/sh', '-c', cmd], None)
342   _, kind, more = q.peek()
343   if kind == 'stdout':
344     if more is None:
345       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
346     else:
347       chal = more
348       S.greet(peer.name, chal)
349       q.get()
350   potwatch('connect', peer.name, q)
351
352 def run_disconnect(peer, cmd):
353   """
354   Start the job of disconnecting from a passive PEER.
355
356   The CMD string is a shell command which will disconnect from the peer.
357   """
358   q = T.Queue()
359   cmd = Command(['disconnect', peer.name], q, 'disconnect',
360                 ['/bin/sh', '-c', cmd], None)
361   potwatch('disconnect', peer.name, q)
362
363 _pingseq = 0
364 class PingPeer (object):
365   """
366   Object representing a peer which we are pinging to ensure that it is still
367   present.
368
369   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
370   event queue -- which saves us from having an enormous swarm of coroutines
371   -- but most of the actual work is done here.
372
373   In order to avoid confusion between different PingPeer instances for the
374   same actual peer, each PingPeer has a sequence number (its `seq'
375   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
376   (Using the PingPeer instance itself will prevent garbage collection of
377   otherwise defunct instances.)
378   """
379
380   def __init__(me, pinger, queue, peer, pingnow):
381     """
382     Create a new PingPeer.
383
384     The PINGER is the Pinger object we should send the results to.  This is
385     used when we remove ourselves, if the peer has been explicitly removed.
386
387     The QUEUE is the event queue on which timer and ping-command events
388     should be written.
389
390     The PEER is a `Peer' object describing the peer.
391
392     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
393     wait until the usual retry interval.
394     """
395     global _pingseq
396     me._pinger = pinger
397     me._q = queue
398     me._peer = peer.name
399     me.update(peer)
400     me.seq = _pingseq
401     _pingseq += 1
402     me._failures = 0
403     me._sabotage = False
404     me._last = '-'
405     me._nping = 0
406     me._nlost = 0
407     me._sigma_t = 0
408     me._sigma_t2 = 0
409     me._min = me._max = '-'
410     if pingnow:
411       me._timer = None
412       me._ping()
413     else:
414       me._timer = M.SelTimer(time() + me._every, me._time)
415
416   def update(me, peer):
417     """
418     Refreshes the timer parameters for this peer.  We don't, however,
419     immediately reschedule anything: that will happen next time anything
420     interesting happens.
421     """
422     if peer is None: peer = Peer(me._peer)
423     assert peer.name == me._peer
424     me._every = peer.get('every', filter = T.timespec, default = 120)
425     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
426     me._retries = peer.get('retries', filter = int, default = 5)
427     me._connectp = peer.has('connect')
428     me._knockp = peer.has('knock')
429     return me
430
431   def _ping(me):
432     """
433     Send a ping to the peer; the result is sent to the Pinger's event queue.
434     """
435     S.rawcommand(T.TripeAsynchronousCommand(
436       me._q, (me._peer, me.seq),
437       ['EPING',
438        '-background', S.bgtag(),
439        '-timeout', str(me._timeout),
440        '--',
441        me._peer]))
442
443   def _reconnect(me):
444     try:
445       peer = Peer(me._peer)
446       if me._connectp or me._knockp:
447         S.warn('connect', 'reconnecting', me._peer)
448         S.forcekx(me._peer)
449         if me._connectp: T.spawn(run_connect, peer, peer.get('connect'))
450         me._timer = M.SelTimer(time() + me._every, me._time)
451         me._sabotage = False
452       else:
453         S.kill(me._peer)
454     except TripeError, e:
455       if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
456
457   def event(me, code, stuff):
458     """
459     Respond to an event which happened to this peer.
460
461     Timer events indicate that we should start a new ping.  (The server has
462     its own timeout which detects lost packets.)
463
464     We trap unknown-peer responses and detach from the Pinger.
465
466     If the ping fails and we run out of retries, we attempt to restart the
467     connection.
468     """
469     if code == 'TIMER':
470       me._failures = 0
471       me._ping()
472     elif code == 'FAIL':
473       S.notify('connect', 'ping-failed', me._peer, *stuff)
474       if not stuff: pass
475       elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
476       elif stuff[0] == 'ping-send-failed': me._reconnect()
477     elif code == 'INFO':
478       outcome = stuff[0]
479       if outcome == 'ping-ok' and me._sabotage:
480         outcome = 'ping-timeout'
481       if outcome == 'ping-ok':
482         if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
483         t = float(stuff[1])
484         me._last = '%.1fms' % t
485         me._sigma_t += t
486         me._sigma_t2 += t*t
487         me._nping += 1
488         if me._min == '-' or t < me._min: me._min = t
489         if me._max == '-' or t > me._max: me._max = t
490         me._timer = M.SelTimer(time() + me._every, me._time)
491       elif outcome == 'ping-timeout':
492         me._failures += 1
493         me._nlost += 1
494         S.warn('connect', 'ping-timeout', me._peer,
495                'attempt', str(me._failures), 'of', str(me._retries))
496         if me._failures < me._retries:
497           me._ping()
498           me._last = 'timeout'
499         else:
500           me._reconnect()
501           me._last = 'reconnect'
502       elif outcome == 'ping-peer-died':
503         me._pinger.kill(me._peer)
504
505   def sabotage(me):
506     """Sabotage the peer, for testing purposes."""
507     me._sabotage = True
508     if me._timer: me._timer.kill()
509     T.defer(me._time)
510
511   def info(me):
512     if not me._nping:
513       mean = sd = '-'
514     else:
515       mean = me._sigma_t/me._nping
516       sd = sqrt(me._sigma_t2/me._nping - mean*mean)
517     n = me._nping + me._nlost
518     if not n: pclost = '-'
519     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
520     return { 'last-ping': me._last,
521              'mean-ping': '%.1fms' % mean,
522              'sd-ping': '%.1fms' % sd,
523              'n-ping': '%d' % me._nping,
524              'n-lost': '%d' % me._nlost,
525              'percent-lost': pclost,
526              'min-ping': '%.1fms' % me._min,
527              'max-ping': '%.1fms' % me._max,
528              'state': me._timer and 'idle' or 'check',
529              'failures': me._failures }
530
531   @T._callback
532   def _time(me):
533     """
534     Handle timer callbacks by posting a timeout event on the queue.
535     """
536     me._timer = None
537     me._q.put(((me._peer, me.seq), 'TIMER', None))
538
539   def __str__(me):
540     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
541   def __repr__(me):
542     return str(me)
543
544 class Pinger (T.Coroutine):
545   """
546   The Pinger keeps track of the peers which we expect to be connected and
547   takes action if they seem to stop responding.
548
549   There is usually only one Pinger, called pinger.
550
551   The Pinger maintains a collection of PingPeer objects, and an event queue.
552   The PingPeers direct the results of their pings, and timer events, to the
553   event queue.  The Pinger's coroutine picks items off the queue and
554   dispatches them back to the PingPeers as appropriate.
555   """
556
557   def __init__(me):
558     """Initialize the Pinger."""
559     T.Coroutine.__init__(me)
560     me._peers = {}
561     me._q = T.Queue()
562
563   def run(me):
564     """
565     Coroutine function: reads the pinger queue and sends events to the
566     PingPeer objects they correspond to.
567     """
568     while True:
569       (peer, seq), code, stuff = me._q.get()
570       if peer in me._peers and seq == me._peers[peer].seq:
571         try: me._peers[peer].event(code, stuff)
572         except Exception, e:
573           SYS.excepthook(*SYS.exc_info())
574
575   def add(me, peer, pingnow):
576     """
577     Add PEER to the collection of peers under the Pinger's watchful eye.
578     The arguments are as for PingPeer: see above.
579     """
580     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
581     return me
582
583   def kill(me, peername):
584     """Remove PEER from the peers being watched by the Pinger."""
585     try: del me._peers[peername]
586     except KeyError: pass
587     return me
588
589   def rescan(me, startup):
590     """
591     General resynchronization method.
592
593     We scan the list of peers (with connect scripts) known at the server.
594     Any which are known to the Pinger but aren't known to the server are
595     removed from our list; newly arrived peers are added.  (Note that a peer
596     can change state here either due to the server sneakily changing its list
597     without issuing notifications or, more likely, the database changing its
598     idea of whether a peer is interesting.)  Finally, PingPeers which are
599     still present are prodded to update their timing parameters.
600
601     This method is called once at startup to pick up the peers already
602     installed, and again by the dbwatcher coroutine when it detects a change
603     to the database.
604     """
605     if T._debug: print '# rescan peers'
606     correct = {}
607     start = {}
608     for name in S.list():
609       try: peer = Peer(name)
610       except KeyError: continue
611       if peer.get('watch', filter = boolean, default = False):
612         if T._debug: print '# interesting peer %s' % peer
613         correct[peer.name] = start[peer.name] = peer
614       elif startup:
615         if T._debug: print '# peer %s ready for adoption' % peer
616         start[peer.name] = peer
617     for name, obj in me._peers.items():
618       try:
619         peer = correct[name]
620       except KeyError:
621         if T._debug: print '# peer %s vanished' % name
622         del me._peers[name]
623       else:
624         obj.update(peer)
625     for name, peer in start.iteritems():
626       if name in me._peers: continue
627       if startup:
628         if T._debug: print '# setting up peer %s' % name
629         ifname = S.ifname(name)
630         addr = S.addr(name)
631         T.defer(adoptpeer, peer, ifname, *addr)
632       else:
633         if T._debug: print '# adopting new peer %s' % name
634         me.add(peer, True)
635     return me
636
637   def adopted(me):
638     """
639     Returns the list of peers being watched by the Pinger.
640     """
641     return me._peers.keys()
642
643   def find(me, name):
644     """Return the PingPeer with the given name."""
645     return me._peers[name]
646
647 ###--------------------------------------------------------------------------
648 ### New connections.
649
650 def encode_envvars(env, prefix, vars):
651   """
652   Encode the variables in VARS suitably for including in a program
653   environment.  Lowercase letters in variable names are forced to uppercase;
654   runs of non-alphanumeric characters are replaced by single underscores; and
655   the PREFIX is prepended.  The resulting variables are written to ENV.
656   """
657   for k, v in vars.iteritems():
658     env[prefix + r_bad.sub('_', k.upper())] = v
659
660 r_bad = RX.compile(r'[\W_]+')
661 def envvars(peer):
662   """
663   Translate the database information for a PEER into a dictionary of
664   environment variables with plausible upper-case names and a P_ prefix.
665   Also collect the crypto information into A_ variables.
666   """
667   env = {}
668   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
669   encode_envvars(env, 'A_', S.algs(peer.name))
670   return env
671
672 def run_ifupdown(what, peer, *args):
673   """
674   Run the interface up/down script for a peer.
675
676   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
677   list of arguments to pass to the script, in addition to the peer name.
678
679   The command is run and watched in the background by potwatch.
680   """
681   q = T.Queue()
682   c = Command([what, peer.name], q, what,
683               M.split(peer.get(what), quotep = True)[0] +
684               [peer.name] + list(args),
685               envvars(peer))
686   potwatch(what, peer.name, q)
687
688 def adoptpeer(peer, ifname, *addr):
689   """
690   Add a new peer to our collection.
691
692   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
693   ADDR is the list of tokens representing its address.
694
695   We try to bring up the interface and provoke a connection to the peer if
696   it's passive.
697   """
698   if peer.has('ifup'):
699     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
700         .switch('ifup', peer, ifname, *addr)
701   cmd = peer.get('connect', default = None)
702   if cmd is not None:
703     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
704         .switch(peer, cmd)
705   if peer.get('watch', filter = boolean, default = False):
706     pinger.add(peer, False)
707
708 def disownpeer(peer):
709   """Drop the PEER from the Pinger and put its interface to bed."""
710   try: pinger.kill(peer)
711   except KeyError: pass
712   cmd = peer.get('disconnect', default = None)
713   if cmd is not None:
714     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
715         .switch(peer, cmd)
716   if peer.has('ifdown'):
717     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
718         .switch('ifdown', peer)
719
720 def addpeer(peer, addr, ephemp):
721   """
722   Process a connect request from a new peer PEER on address ADDR.
723
724   Any existing peer with this name is disconnected from the server.  EPHEMP
725   is the default ephemeral-ness state for the new peer.
726   """
727   if peer.name in S.list():
728     S.kill(peer.name)
729   try:
730     S.add(peer.name,
731           tunnel = peer.get('tunnel', default = None),
732           keepalive = peer.get('keepalive', default = None),
733           key = peer.get('key', default = None),
734           priv = peer.get('priv', default = None),
735           mobile = peer.get('mobile', filter = boolean, default = False),
736           knock = peer.get('knock', default = None),
737           cork = peer.get('cork', filter = boolean, default = False),
738           ephemeral = peer.get('ephemeral', filter = boolean,
739                                default = ephemp),
740           *addr)
741   except T.TripeError, exc:
742     raise T.TripeJobError(*exc.args)
743
744 ## Dictionary mapping challenges to waiting passive-connection coroutines.
745 chalmap = {}
746
747 def notify(_, code, *rest):
748   """
749   Watch for notifications.
750
751   We trap ADD and KILL notifications, and send them straight to adoptpeer and
752   disownpeer respectively; and dispatch GREET notifications to the
753   corresponding waiting coroutine.
754   """
755   if code == 'ADD':
756     try: p = Peer(rest[0])
757     except KeyError: return
758     adoptpeer(p, *rest[1:])
759   elif code == 'KILL':
760     try: p = Peer(rest[0])
761     except KeyError: return
762     disownpeer(p, *rest[1:])
763   elif code == 'GREET':
764     chal = rest[0]
765     try: cr = chalmap[chal]
766     except KeyError: pass
767     else: cr.switch(rest[1:])
768   elif code == 'KNOCK':
769     try: p = Peer(rest[0])
770     except KeyError:
771       S.warn(['connect', 'knock-unknown-peer', rest[0]])
772       return
773     if p.get('peer') != 'PASSIVE':
774       S.warn(['connect', 'knock-active-peer', p.name])
775       return
776     dot = p.name.find('.')
777     if dot >= 0: kname = p.name[dot + 1:]
778     else: kname = p.name
779     ktag = p.get('key', p.name)
780     if kname != ktag:
781       S.warn(['connect', 'knock-tag-mismatch',
782               'peer', pname, 'public-key-tag', ktag])
783       return
784     T.spawn(addpeer, p, rest[1:], True)
785
786 ###--------------------------------------------------------------------------
787 ### Command implementation.
788
789 def cmd_kick(name):
790   """
791   kick NAME: Force a new connection attempt for the NAMEd peer.
792   """
793   try: pp = pinger.find(name)
794   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
795   try: peer = Peer(name)
796   except KeyError: raise T.TripeJobError('unknown-peer', name)
797   conn = peer.get('connect', None)
798   if conn: T.spawn(run_connect, peer, peer.get('connect'))
799   else: T.spawn(lambda p: S.forcekx(p.name), peer)
800
801 def cmd_adopted():
802   """
803   adopted: Report a list of adopted peers.
804   """
805   for name in pinger.adopted():
806     T.svcinfo(name)
807
808 def cmd_active(name):
809   """
810   active NAME: Handle an active connection request for the peer called NAME.
811
812   The appropriate address is read from the database automatically.
813   """
814   try: peer = Peer(name)
815   except KeyError: raise T.TripeJobError('unknown-peer', name)
816   addr = peer.get('peer')
817   if addr == 'PASSIVE':
818     raise T.TripeJobError('passive-peer', name)
819   addpeer(peer, M.split(addr, quotep = True)[0], True)
820
821 def cmd_listactive():
822   """
823   list: Report a list of the available active peers.
824   """
825   cdb = CDB.init(opts.cdb)
826   for key in cdb.keys():
827     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
828       T.svcinfo(key[1:])
829
830 def cmd_info(name):
831   """
832   info NAME: Report the database entries for the named peer.
833   """
834   try: peer = Peer(name)
835   except KeyError: raise T.TripeJobError('unknown-peer', name)
836   d = {}
837   try: pp = pinger.find(name)
838   except KeyError: pass
839   else: d.update(pp.info())
840   items = list(peer.list()) + d.keys()
841   items.sort()
842   for i in items:
843     try: v = d[i]
844     except KeyError: v = peer.get(i)
845     T.svcinfo('%s=%s' % (i, v.replace('\n', ' ')))
846
847 def cmd_userpeer(user):
848   """
849   userpeer USER: Report the peer name for the named user.
850   """
851   try: name = CDB.init(opts.cdb)['U' + user]
852   except KeyError: raise T.TripeJobError('unknown-user', user)
853   T.svcinfo(name)
854
855 def cmd_passive(*args):
856   """
857   passive [OPTIONS] USER: Await the arrival of the named USER.
858
859   Report a challenge; when (and if!) the server receives a greeting quoting
860   this challenge, add the corresponding peer to the server.
861   """
862   timeout = 30
863   op = T.OptParse(args, ['-timeout'])
864   for opt in op:
865     if opt == '-timeout':
866       timeout = T.timespec(op.arg())
867   user, = op.rest(1, 1)
868   try: name = CDB.init(opts.cdb)['U' + user]
869   except KeyError: raise T.TripeJobError('unknown-user', user)
870   try: peer = Peer(name)
871   except KeyError: raise T.TripeJobError('unknown-peer', name)
872   chal = S.getchal()
873   cr = T.Coroutine.getcurrent()
874   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
875   try:
876     T.svcinfo(chal)
877     chalmap[chal] = cr
878     addr = cr.parent.switch()
879     if addr is None:
880       raise T.TripeJobError('connect-timeout')
881     addpeer(peer, addr, True)
882   finally:
883     del chalmap[chal]
884
885 def cmd_sabotage(name):
886   """
887   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
888   """
889   try: pp = pinger.find(name)
890   except KeyError: raise T.TripeJobError('unknown-peer', name)
891   pp.sabotage()
892
893 ###--------------------------------------------------------------------------
894 ### Start up.
895
896 def setup():
897   """
898   Service setup.
899
900   Register the notification watcher, rescan the peers, and add automatic
901   active peers.
902   """
903   S.handler['NOTE'] = notify
904   S.watch('+n')
905
906   pinger.rescan(opts.startup)
907
908   if opts.startup:
909     cdb = CDB.init(opts.cdb)
910     try:
911       autos = cdb['%AUTO']
912     except KeyError:
913       autos = ''
914     for name in M.split(autos)[0]:
915       try:
916         peer = Peer(name, cdb)
917         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False)
918       except T.TripeJobError, err:
919         S.warn('connect', 'auto-add-failed', name, *err.args)
920
921 def init():
922   """
923   Initialization to be done before service startup.
924   """
925   global errorwatch, childwatch, pinger
926   errorwatch = ErrorWatch()
927   childwatch = ChildWatch()
928   pinger = Pinger()
929   T.Coroutine(dbwatch, name = 'dbwatch').switch()
930   errorwatch.switch()
931   pinger.switch()
932
933 def parse_options():
934   """
935   Parse the command-line options.
936
937   Automatically changes directory to the requested configdir, and turns on
938   debugging.  Returns the options object.
939   """
940   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
941                     version = '%%prog %s' % VERSION)
942
943   op.add_option('-a', '--admin-socket',
944                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
945                 help = 'Select socket to connect to [default %default]')
946   op.add_option('-d', '--directory',
947                 metavar = 'DIR', dest = 'dir', default = T.configdir,
948                 help = 'Select current diretory [default %default]')
949   op.add_option('-p', '--peerdb',
950                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
951                 help = 'Select peers database [default %default]')
952   op.add_option('--daemon', dest = 'daemon',
953                 default = False, action = 'store_true',
954                 help = 'Become a daemon after successful initialization')
955   op.add_option('--debug', dest = 'debug',
956                 default = False, action = 'store_true',
957                 help = 'Emit debugging trace information')
958   op.add_option('--startup', dest = 'startup',
959                 default = False, action = 'store_true',
960                 help = 'Being called as part of the server startup')
961
962   opts, args = op.parse_args()
963   if args: op.error('no arguments permitted')
964   OS.chdir(opts.dir)
965   T._debug = opts.debug
966   return opts
967
968 ## Service table, for running manually.
969 service_info = [('connect', T.VERSION, {
970   'adopted': (0, 0, '', cmd_adopted),
971   'kick': (1, 1, 'PEER', cmd_kick),
972   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
973   'active': (1, 1, 'PEER', cmd_active),
974   'info': (1, 1, 'PEER', cmd_info),
975   'list-active': (0, 0, '', cmd_listactive),
976   'userpeer': (1, 1, 'USER', cmd_userpeer),
977   'sabotage': (1, 1, 'PEER', cmd_sabotage)
978 })]
979
980 if __name__ == '__main__':
981   opts = parse_options()
982   OS.environ['TRIPESOCK'] = opts.tripesock
983   T.runservices(opts.tripesock, service_info,
984                 init = init, setup = setup,
985                 daemon = opts.daemon)
986
987 ###----- That's all, folks --------------------------------------------------