chiark / gitweb /
svc/connect.in: Change the idiom for handling peer nonexistence.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50   """Glues the select-line-buffer into the coroutine queue system."""
51
52   def __new__(cls, file, queue, tag, kind):
53     """See __init__ for documentation."""
54     return M.SelLineBuffer.__new__(cls, file.fileno())
55
56   def __init__(me, file, queue, tag, kind):
57     """
58     Initialize a new line-reading adaptor.
59
60     The adaptor reads lines from FILE.  Each line is inserted as a message of
61     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
62     represented as None.
63     """
64     me._q = queue
65     me._file = file
66     me._tag = tag
67     me._kind = kind
68     me.enable()
69
70   @T._callback
71   def line(me, line):
72     me._q.put((me._tag, me._kind, line))
73
74   @T._callback
75   def eof(me):
76     me.disable()
77     me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80   """
81   An object which watches stderr streams for errors and converts them into
82   warnings of the form
83
84     WARN connect INFO stderr LINE
85
86   The INFO is a list of tokens associated with the file when it was
87   registered.
88
89   Usually there is a single ErrorWatch object, called errorwatch.
90   """
91
92   def __init__(me):
93     """Initialization: there are no arguments."""
94     T.Coroutine.__init__(me)
95     me._q = T.Queue()
96     me._map = {}
97     me._seq = 1
98
99   def watch(me, file, info):
100     """
101     Adds FILE to the collection of files to watch.
102
103     INFO will be written in the warning messages from this FILE.  Returns a
104     sequence number which can be used to unregister the file again.
105     """
106     seq = me._seq
107     me._seq += 1
108     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109     return seq
110
111   def unwatch(me, seq):
112     """Stop watching the file with sequence number SEQ."""
113     del me._map[seq]
114     return me
115
116   def run(me):
117     """
118     Coroutine function: read items from the queue and report them.
119
120     Unregisters files automatically when they reach EOF.
121     """
122     while True:
123       seq, _, line = me._q.get()
124       if line is None:
125         me.unwatch(seq)
126       else:
127         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130   """
131   Coroutine function: wake up every minute and notice changes to the
132   database.  When a change happens, tell the Pinger (q.v.) to rescan its
133   peers.
134   """
135   cr = T.Coroutine.getcurrent()
136   main = cr.parent
137   fw = M.FWatch(opts.cdb)
138   while True:
139     timer = M.SelTimer(time() + 60, lambda: cr.switch())
140     main.switch()
141     if fw.update():
142       pinger.rescan(False)
143       S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146   """
147   An object which watches for specified processes exiting and reports
148   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150   There is usually only one ChildWatch object, called childwatch.
151   """
152
153   def __new__(cls):
154     """Initialize the child-watcher."""
155     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157   def __init__(me):
158     """Initialize the child-watcher."""
159     me._pid = {}
160     me.enable()
161
162   def watch(me, pid, queue, tag):
163     """
164     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
165     to the QUEUE, where CODE is one of
166
167       * None (successful termination)
168       * ['exit-nonzero', CODE] (CODE is a string!)
169       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171     """
172     me._pid[pid] = queue, tag
173     return me
174
175   def unwatch(me, pid):
176     """Unregister PID as a child to watch."""
177     del me._pid[pid]
178     return me
179
180   @T._callback
181   def signalled(me):
182     """
183     Called when child processes exit: collect exit statuses and report
184     failures.
185     """
186     while True:
187       try:
188         pid, status = OS.waitpid(-1, OS.WNOHANG)
189       except OSError, exc:
190         if exc.errno == E.ECHILD:
191           break
192       if pid == 0:
193         break
194       if pid not in me._pid:
195         continue
196       queue, tag = me._pid[pid]
197       if OS.WIFEXITED(status):
198         exit = OS.WEXITSTATUS(status)
199         if exit == 0:
200           code = None
201         else:
202           code = ['exit-nonzero', str(exit)]
203       elif OS.WIFSIGNALED(status):
204         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205       else:
206         code = ['exit-unknown', hex(status)]
207       queue.put((tag, 'exit', code))
208
209 class Command (object):
210   """
211   Represents a running command.
212
213   This class is the main interface to the machery provided by the ChildWatch
214   and ErrorWatch objects.  See also potwatch.
215   """
216
217   def __init__(me, info, queue, tag, args, env):
218     """
219     Start a new child process.
220
221     The ARGS are a list of arguments to be given to the child process.  The
222     ENV is either None or a dictionary of environment variable assignments to
223     override the extant environment.  INFO is a list of tokens to be included
224     in warnings about the child's stderr output.  If the child writes a line
225     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
226     child exits, write (TAG, 'exit', CODE) to the QUEUE.
227     """
228     me._info = info
229     me._q = queue
230     me._tag = tag
231     myenv = OS.environ.copy()
232     if env: myenv.update(env)
233     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234                           stdout = PROC.PIPE, stderr = PROC.PIPE)
235     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236     errorwatch.watch(me._proc.stderr, info)
237     childwatch.watch(me._proc.pid, queue, tag)
238
239   def __del__(me):
240     """
241     If I've been forgotten then stop watching for termination.
242     """
243     childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246   """
247   Watch the queue Q for activity as reported by a Command object.
248
249   Information from the process's stdout is reported as
250
251     NOTE WHAT NAME stdout LINE
252
253   abnormal termination is reported as
254
255     WARN WHAT NAME CODE
256
257   where CODE is what the ChildWatch wrote.
258   """
259   eofp = deadp = False
260   while not deadp or not eofp:
261     _, kind, more = q.get()
262     if kind == 'stdout':
263       if more is None:
264         eofp = True
265       else:
266         S.notify('connect', what, name, 'stdout', more)
267     elif kind == 'exit':
268       if more: S.warn('connect', what, name, *more)
269       deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic']                     # An object distinct from all others
275
276 class Peer (object):
277   """Representation of a peer in the database."""
278
279   def __init__(me, peer, cdb = None):
280     """
281     Create a new peer, named PEER.
282
283     Information about the peer is read from the database CDB, or the default
284     one given on the command-line.
285     """
286     me.name = peer
287     record = (cdb or CDB.init(opts.cdb))['P' + peer]
288     me.__dict__.update(M.URLDecode(record, semip = True))
289
290   def get(me, key, default = _magic, filter = None):
291     """
292     Get the information stashed under KEY from the peer's database record.
293
294     If DEFAULT is given, then use it if the database doesn't contain the
295     necessary information.  If no DEFAULT is given, then report an error.  If
296     a FILTER function is given then apply it to the information from the
297     database before returning it.
298     """
299     try:
300       attr = me.__dict__[key]
301     except KeyError:
302       if default is _magic:
303         raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
304       return default
305     else:
306       if filter is not None: attr = filter(attr)
307       return attr
308
309   def has(me, key):
310     """
311     Return whether the peer's database record has the KEY.
312     """
313     return key in me.__dict__
314
315   def list(me):
316     """
317     Iterate over the available keys in the peer's database record.
318     """
319     return me.__dict__.iterkeys()
320
321 def boolean(value):
322   """Parse VALUE as a boolean."""
323   return value in ['t', 'true', 'y', 'yes', 'on']
324
325 ###--------------------------------------------------------------------------
326 ### Waking up and watching peers.
327
328 def run_connect(peer, cmd):
329   """
330   Start the job of connecting to the passive PEER.
331
332   The CMD string is a shell command which will connect to the peer (via some
333   back-channel, say ssh and userv), issue a command
334
335     SVCSUBMIT connect passive [OPTIONS] USER
336
337   and write the resulting challenge to standard error.
338   """
339   q = T.Queue()
340   cmd = Command(['connect', peer.name], q, 'connect',
341                 ['/bin/sh', '-c', cmd], None)
342   _, kind, more = q.peek()
343   if kind == 'stdout':
344     if more is None:
345       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
346     else:
347       chal = more
348       S.greet(peer.name, chal)
349       q.get()
350   potwatch('connect', peer.name, q)
351
352 def run_disconnect(peer, cmd):
353   """
354   Start the job of disconnecting from a passive PEER.
355
356   The CMD string is a shell command which will disconnect from the peer.
357   """
358   q = T.Queue()
359   cmd = Command(['disconnect', peer.name], q, 'disconnect',
360                 ['/bin/sh', '-c', cmd], None)
361   potwatch('disconnect', peer.name, q)
362
363 _pingseq = 0
364 class PingPeer (object):
365   """
366   Object representing a peer which we are pinging to ensure that it is still
367   present.
368
369   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
370   event queue -- which saves us from having an enormous swarm of coroutines
371   -- but most of the actual work is done here.
372
373   In order to avoid confusion between different PingPeer instances for the
374   same actual peer, each PingPeer has a sequence number (its `seq'
375   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
376   (Using the PingPeer instance itself will prevent garbage collection of
377   otherwise defunct instances.)
378   """
379
380   def __init__(me, pinger, queue, peer, pingnow):
381     """
382     Create a new PingPeer.
383
384     The PINGER is the Pinger object we should send the results to.  This is
385     used when we remove ourselves, if the peer has been explicitly removed.
386
387     The QUEUE is the event queue on which timer and ping-command events
388     should be written.
389
390     The PEER is a `Peer' object describing the peer.
391
392     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
393     wait until the usual retry interval.
394     """
395     global _pingseq
396     me._pinger = pinger
397     me._q = queue
398     me._peer = peer.name
399     me.update(peer)
400     me.seq = _pingseq
401     _pingseq += 1
402     me._failures = 0
403     me._sabotage = False
404     me._last = '-'
405     me._nping = 0
406     me._nlost = 0
407     me._sigma_t = 0
408     me._sigma_t2 = 0
409     me._min = me._max = '-'
410     if pingnow:
411       me._timer = None
412       me._ping()
413     else:
414       me._timer = M.SelTimer(time() + me._every, me._time)
415
416   def update(me, peer):
417     """
418     Refreshes the timer parameters for this peer.  We don't, however,
419     immediately reschedule anything: that will happen next time anything
420     interesting happens.
421     """
422     if peer is None: peer = Peer(me._peer)
423     assert peer.name == me._peer
424     me._every = peer.get('every', filter = T.timespec, default = 120)
425     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
426     me._retries = peer.get('retries', filter = int, default = 5)
427     me._connectp = peer.has('connect')
428     me._knockp = peer.has('knock')
429     return me
430
431   def _ping(me):
432     """
433     Send a ping to the peer; the result is sent to the Pinger's event queue.
434     """
435     S.rawcommand(T.TripeAsynchronousCommand(
436       me._q, (me._peer, me.seq),
437       ['EPING',
438        '-background', S.bgtag(),
439        '-timeout', str(me._timeout),
440        '--',
441        me._peer]))
442
443   def _reconnect(me):
444     try:
445       peer = Peer(me._peer)
446       if me._connectp or me._knockp:
447         S.warn('connect', 'reconnecting', me._peer)
448         S.forcekx(me._peer, quiet = not me._knockp)
449         if me._connectp: T.spawn(run_connect, peer, peer.get('connect'))
450         me._timer = M.SelTimer(time() + me._every, me._time)
451         me._sabotage = False
452       else:
453         S.kill(me._peer)
454     except T.TripeError, e:
455       if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
456
457   def event(me, code, stuff):
458     """
459     Respond to an event which happened to this peer.
460
461     Timer events indicate that we should start a new ping.  (The server has
462     its own timeout which detects lost packets.)
463
464     We trap unknown-peer responses and detach from the Pinger.
465
466     If the ping fails and we run out of retries, we attempt to restart the
467     connection.
468     """
469     if code == 'TIMER':
470       me._failures = 0
471       me._ping()
472     elif code == 'FAIL':
473       S.notify('connect', 'ping-failed', me._peer, *stuff)
474       if not stuff: pass
475       elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
476       elif stuff[0] == 'ping-send-failed': me._reconnect()
477     elif code == 'INFO':
478       outcome = stuff[0]
479       if outcome == 'ping-ok' and me._sabotage:
480         outcome = 'ping-timeout'
481       if outcome == 'ping-ok':
482         if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
483         t = float(stuff[1])
484         me._last = '%.1fms' % t
485         me._sigma_t += t
486         me._sigma_t2 += t*t
487         me._nping += 1
488         if me._min == '-' or t < me._min: me._min = t
489         if me._max == '-' or t > me._max: me._max = t
490         me._timer = M.SelTimer(time() + me._every, me._time)
491       elif outcome == 'ping-timeout':
492         me._failures += 1
493         me._nlost += 1
494         S.warn('connect', 'ping-timeout', me._peer,
495                'attempt', str(me._failures), 'of', str(me._retries))
496         if me._failures < me._retries:
497           me._ping()
498           me._last = 'timeout'
499         else:
500           me._reconnect()
501           me._last = 'reconnect'
502       elif outcome == 'ping-peer-died':
503         me._pinger.kill(me._peer)
504
505   def sabotage(me):
506     """Sabotage the peer, for testing purposes."""
507     me._sabotage = True
508     if me._timer: me._timer.kill()
509     T.defer(me._time)
510
511   def info(me):
512     if not me._nping:
513       mean = sd = min = max = '-'
514     else:
515       meanval = me._sigma_t/me._nping
516       mean = '%.1fms' % meanval
517       sd = '%.1fms' % sqrt(me._sigma_t2/me._nping - meanval*meanval)
518       min = '%.1fms' % me._min
519       max = '%.1fms' % me._max
520     n = me._nping + me._nlost
521     if not n: pclost = '-'
522     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
523     return { 'last-ping': me._last,
524              'mean-ping': mean,
525              'sd-ping': sd,
526              'n-ping': '%d' % me._nping,
527              'n-lost': '%d' % me._nlost,
528              'percent-lost': pclost,
529              'min-ping': min,
530              'max-ping': max,
531              'state': me._timer and 'idle' or 'check',
532              'failures': str(me._failures) }
533
534   @T._callback
535   def _time(me):
536     """
537     Handle timer callbacks by posting a timeout event on the queue.
538     """
539     me._timer = None
540     me._q.put(((me._peer, me.seq), 'TIMER', None))
541
542   def __str__(me):
543     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
544   def __repr__(me):
545     return str(me)
546
547 class Pinger (T.Coroutine):
548   """
549   The Pinger keeps track of the peers which we expect to be connected and
550   takes action if they seem to stop responding.
551
552   There is usually only one Pinger, called pinger.
553
554   The Pinger maintains a collection of PingPeer objects, and an event queue.
555   The PingPeers direct the results of their pings, and timer events, to the
556   event queue.  The Pinger's coroutine picks items off the queue and
557   dispatches them back to the PingPeers as appropriate.
558   """
559
560   def __init__(me):
561     """Initialize the Pinger."""
562     T.Coroutine.__init__(me)
563     me._peers = {}
564     me._q = T.Queue()
565
566   def run(me):
567     """
568     Coroutine function: reads the pinger queue and sends events to the
569     PingPeer objects they correspond to.
570     """
571     while True:
572       (peer, seq), code, stuff = me._q.get()
573       if peer in me._peers and seq == me._peers[peer].seq:
574         try: me._peers[peer].event(code, stuff)
575         except Exception, e:
576           SYS.excepthook(*SYS.exc_info())
577
578   def add(me, peer, pingnow):
579     """
580     Add PEER to the collection of peers under the Pinger's watchful eye.
581     The arguments are as for PingPeer: see above.
582     """
583     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
584     return me
585
586   def kill(me, peername):
587     """Remove PEER from the peers being watched by the Pinger."""
588     try: del me._peers[peername]
589     except KeyError: pass
590     return me
591
592   def rescan(me, startup):
593     """
594     General resynchronization method.
595
596     We scan the list of peers (with connect scripts) known at the server.
597     Any which are known to the Pinger but aren't known to the server are
598     removed from our list; newly arrived peers are added.  (Note that a peer
599     can change state here either due to the server sneakily changing its list
600     without issuing notifications or, more likely, the database changing its
601     idea of whether a peer is interesting.)  Finally, PingPeers which are
602     still present are prodded to update their timing parameters.
603
604     This method is called once at startup to pick up the peers already
605     installed, and again by the dbwatcher coroutine when it detects a change
606     to the database.
607     """
608     if T._debug: print '# rescan peers'
609     correct = {}
610     start = {}
611     for name in S.list():
612       try: peer = Peer(name)
613       except KeyError: continue
614       if peer.get('watch', filter = boolean, default = False):
615         if T._debug: print '# interesting peer %s' % peer
616         correct[peer.name] = start[peer.name] = peer
617       elif startup:
618         if T._debug: print '# peer %s ready for adoption' % peer
619         start[peer.name] = peer
620     for name, obj in me._peers.items():
621       try:
622         peer = correct[name]
623       except KeyError:
624         if T._debug: print '# peer %s vanished' % name
625         del me._peers[name]
626       else:
627         obj.update(peer)
628     for name, peer in start.iteritems():
629       if name in me._peers: continue
630       if startup:
631         if T._debug: print '# setting up peer %s' % name
632         ifname = S.ifname(name)
633         addr = S.addr(name)
634         T.defer(adoptpeer, peer, ifname, *addr)
635       else:
636         if T._debug: print '# adopting new peer %s' % name
637         me.add(peer, True)
638     return me
639
640   def adopted(me):
641     """
642     Returns the list of peers being watched by the Pinger.
643     """
644     return me._peers.keys()
645
646   def find(me, name):
647     """Return the PingPeer with the given name."""
648     return me._peers[name]
649
650 ###--------------------------------------------------------------------------
651 ### New connections.
652
653 def encode_envvars(env, prefix, vars):
654   """
655   Encode the variables in VARS suitably for including in a program
656   environment.  Lowercase letters in variable names are forced to uppercase;
657   runs of non-alphanumeric characters are replaced by single underscores; and
658   the PREFIX is prepended.  The resulting variables are written to ENV.
659   """
660   for k, v in vars.iteritems():
661     env[prefix + r_bad.sub('_', k.upper())] = v
662
663 r_bad = RX.compile(r'[\W_]+')
664 def envvars(peer):
665   """
666   Translate the database information for a PEER into a dictionary of
667   environment variables with plausible upper-case names and a P_ prefix.
668   Also collect the crypto information into A_ variables.
669   """
670   env = {}
671   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
672   encode_envvars(env, 'A_', S.algs(peer.name))
673   return env
674
675 def run_ifupdown(what, peer, *args):
676   """
677   Run the interface up/down script for a peer.
678
679   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
680   list of arguments to pass to the script, in addition to the peer name.
681
682   The command is run and watched in the background by potwatch.
683   """
684   q = T.Queue()
685   c = Command([what, peer.name], q, what,
686               M.split(peer.get(what), quotep = True)[0] +
687               [peer.name] + list(args),
688               envvars(peer))
689   potwatch(what, peer.name, q)
690
691 def adoptpeer(peer, ifname, *addr):
692   """
693   Add a new peer to our collection.
694
695   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
696   ADDR is the list of tokens representing its address.
697
698   We try to bring up the interface and provoke a connection to the peer if
699   it's passive.
700   """
701   if peer.has('ifup'):
702     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
703         .switch('ifup', peer, ifname, *addr)
704   cmd = peer.get('connect', default = None)
705   if cmd is not None:
706     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
707         .switch(peer, cmd)
708   if peer.get('watch', filter = boolean, default = False):
709     pinger.add(peer, False)
710
711 def disownpeer(peer):
712   """Drop the PEER from the Pinger and put its interface to bed."""
713   try: pinger.kill(peer)
714   except KeyError: pass
715   cmd = peer.get('disconnect', default = None)
716   if cmd is not None:
717     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
718         .switch(peer, cmd)
719   if peer.has('ifdown'):
720     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
721         .switch('ifdown', peer)
722
723 def addpeer(peer, addr, ephemp):
724   """
725   Process a connect request from a new peer PEER on address ADDR.
726
727   Any existing peer with this name is disconnected from the server.  EPHEMP
728   is the default ephemeral-ness state for the new peer.
729   """
730   if peer.name in S.list():
731     S.kill(peer.name)
732   try:
733     S.add(peer.name,
734           tunnel = peer.get('tunnel', default = None),
735           keepalive = peer.get('keepalive', default = None),
736           key = peer.get('key', default = None),
737           priv = peer.get('priv', default = None),
738           mobile = peer.get('mobile', filter = boolean, default = False),
739           knock = peer.get('knock', default = None),
740           cork = peer.get('cork', filter = boolean, default = False),
741           ephemeral = peer.get('ephemeral', filter = boolean,
742                                default = ephemp),
743           *addr)
744   except T.TripeError, exc:
745     raise T.TripeJobError(*exc.args)
746
747 ## Dictionary mapping challenges to waiting passive-connection coroutines.
748 chalmap = {}
749
750 def notify(_, code, *rest):
751   """
752   Watch for notifications.
753
754   We trap ADD and KILL notifications, and send them straight to adoptpeer and
755   disownpeer respectively; and dispatch GREET notifications to the
756   corresponding waiting coroutine.
757   """
758   if code == 'ADD':
759     try: p = Peer(rest[0])
760     except KeyError: pass
761     else: adoptpeer(p, *rest[1:])
762   elif code == 'KILL':
763     try: p = Peer(rest[0])
764     except KeyError: pass
765     else: disownpeer(p, *rest[1:])
766   elif code == 'GREET':
767     chal = rest[0]
768     try: cr = chalmap[chal]
769     except KeyError: pass
770     else: cr.switch(rest[1:])
771   elif code == 'KNOCK':
772     try: p = Peer(rest[0])
773     except KeyError:
774       S.warn(['connect', 'knock-unknown-peer', rest[0]])
775       return
776     if p.get('peer') != 'PASSIVE':
777       S.warn(['connect', 'knock-active-peer', p.name])
778       return
779     dot = p.name.find('.')
780     if dot >= 0: kname = p.name[dot + 1:]
781     else: kname = p.name
782     ktag = p.get('key', p.name)
783     if kname != ktag:
784       S.warn(['connect', 'knock-tag-mismatch',
785               'peer', pname, 'public-key-tag', ktag])
786       return
787     T.spawn(addpeer, p, rest[1:], True)
788
789 ###--------------------------------------------------------------------------
790 ### Command implementation.
791
792 def cmd_kick(name):
793   """
794   kick NAME: Force a new connection attempt for the NAMEd peer.
795   """
796   try: pp = pinger.find(name)
797   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
798   try: peer = Peer(name)
799   except KeyError: raise T.TripeJobError('unknown-peer', name)
800   conn = peer.get('connect', None)
801   if conn: T.spawn(run_connect, peer, peer.get('connect'))
802   else: T.spawn(lambda p: S.forcekx(p.name), peer)
803
804 def cmd_adopted():
805   """
806   adopted: Report a list of adopted peers.
807   """
808   for name in pinger.adopted():
809     T.svcinfo(name)
810
811 def cmd_active(name):
812   """
813   active NAME: Handle an active connection request for the peer called NAME.
814
815   The appropriate address is read from the database automatically.
816   """
817   try: peer = Peer(name)
818   except KeyError: raise T.TripeJobError('unknown-peer', name)
819   addr = peer.get('peer')
820   if addr == 'PASSIVE':
821     raise T.TripeJobError('passive-peer', name)
822   addpeer(peer, M.split(addr, quotep = True)[0], True)
823
824 def cmd_listactive():
825   """
826   list: Report a list of the available active peers.
827   """
828   cdb = CDB.init(opts.cdb)
829   for key in cdb.keys():
830     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
831       T.svcinfo(key[1:])
832
833 def cmd_info(name):
834   """
835   info NAME: Report the database entries for the named peer.
836   """
837   try: peer = Peer(name)
838   except KeyError: raise T.TripeJobError('unknown-peer', name)
839   d = {}
840   try: pp = pinger.find(name)
841   except KeyError: pass
842   else: d.update(pp.info())
843   items = list(peer.list()) + d.keys()
844   items.sort()
845   for i in items:
846     try: v = d[i]
847     except KeyError: v = peer.get(i)
848     T.svcinfo('%s=%s' % (i, v.replace('\n', ' ')))
849
850 def cmd_userpeer(user):
851   """
852   userpeer USER: Report the peer name for the named user.
853   """
854   try: name = CDB.init(opts.cdb)['U' + user]
855   except KeyError: raise T.TripeJobError('unknown-user', user)
856   T.svcinfo(name)
857
858 def cmd_passive(*args):
859   """
860   passive [OPTIONS] USER: Await the arrival of the named USER.
861
862   Report a challenge; when (and if!) the server receives a greeting quoting
863   this challenge, add the corresponding peer to the server.
864   """
865   timeout = 30
866   op = T.OptParse(args, ['-timeout'])
867   for opt in op:
868     if opt == '-timeout':
869       timeout = T.timespec(op.arg())
870   user, = op.rest(1, 1)
871   try: name = CDB.init(opts.cdb)['U' + user]
872   except KeyError: raise T.TripeJobError('unknown-user', user)
873   try: peer = Peer(name)
874   except KeyError: raise T.TripeJobError('unknown-peer', name)
875   chal = S.getchal()
876   cr = T.Coroutine.getcurrent()
877   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
878   try:
879     T.svcinfo(chal)
880     chalmap[chal] = cr
881     addr = cr.parent.switch()
882     if addr is None:
883       raise T.TripeJobError('connect-timeout')
884     addpeer(peer, addr, True)
885   finally:
886     del chalmap[chal]
887
888 def cmd_sabotage(name):
889   """
890   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
891   """
892   try: pp = pinger.find(name)
893   except KeyError: raise T.TripeJobError('unknown-peer', name)
894   pp.sabotage()
895
896 ###--------------------------------------------------------------------------
897 ### Start up.
898
899 def setup():
900   """
901   Service setup.
902
903   Register the notification watcher, rescan the peers, and add automatic
904   active peers.
905   """
906   S.handler['NOTE'] = notify
907   S.watch('+n')
908
909   pinger.rescan(opts.startup)
910
911   if opts.startup:
912     cdb = CDB.init(opts.cdb)
913     try:
914       autos = cdb['%AUTO']
915     except KeyError:
916       autos = ''
917     for name in M.split(autos)[0]:
918       try:
919         peer = Peer(name, cdb)
920         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0], False)
921       except T.TripeJobError, err:
922         S.warn('connect', 'auto-add-failed', name, *err.args)
923
924 def init():
925   """
926   Initialization to be done before service startup.
927   """
928   global errorwatch, childwatch, pinger
929   errorwatch = ErrorWatch()
930   childwatch = ChildWatch()
931   pinger = Pinger()
932   T.Coroutine(dbwatch, name = 'dbwatch').switch()
933   errorwatch.switch()
934   pinger.switch()
935
936 def parse_options():
937   """
938   Parse the command-line options.
939
940   Automatically changes directory to the requested configdir, and turns on
941   debugging.  Returns the options object.
942   """
943   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
944                     version = '%%prog %s' % VERSION)
945
946   op.add_option('-a', '--admin-socket',
947                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
948                 help = 'Select socket to connect to [default %default]')
949   op.add_option('-d', '--directory',
950                 metavar = 'DIR', dest = 'dir', default = T.configdir,
951                 help = 'Select current diretory [default %default]')
952   op.add_option('-p', '--peerdb',
953                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
954                 help = 'Select peers database [default %default]')
955   op.add_option('--daemon', dest = 'daemon',
956                 default = False, action = 'store_true',
957                 help = 'Become a daemon after successful initialization')
958   op.add_option('--debug', dest = 'debug',
959                 default = False, action = 'store_true',
960                 help = 'Emit debugging trace information')
961   op.add_option('--startup', dest = 'startup',
962                 default = False, action = 'store_true',
963                 help = 'Being called as part of the server startup')
964
965   opts, args = op.parse_args()
966   if args: op.error('no arguments permitted')
967   OS.chdir(opts.dir)
968   T._debug = opts.debug
969   return opts
970
971 ## Service table, for running manually.
972 service_info = [('connect', VERSION, {
973   'adopted': (0, 0, '', cmd_adopted),
974   'kick': (1, 1, 'PEER', cmd_kick),
975   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
976   'active': (1, 1, 'PEER', cmd_active),
977   'info': (1, 1, 'PEER', cmd_info),
978   'list-active': (0, 0, '', cmd_listactive),
979   'userpeer': (1, 1, 'USER', cmd_userpeer),
980   'sabotage': (1, 1, 'PEER', cmd_sabotage)
981 })]
982
983 if __name__ == '__main__':
984   opts = parse_options()
985   OS.environ['TRIPESOCK'] = opts.tripesock
986   T.runservices(opts.tripesock, service_info,
987                 init = init, setup = setup,
988                 daemon = opts.daemon)
989
990 ###----- That's all, folks --------------------------------------------------