chiark / gitweb /
svc/connect.in: Compress the code a bit.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 from math import sqrt
37 import cdb as CDB
38 import mLib as M
39 import re as RX
40 import sys as SYS
41 from time import time
42 import subprocess as PROC
43
44 S = T.svcmgr
45
46 ###--------------------------------------------------------------------------
47 ### Running auxiliary commands.
48
49 class SelLineQueue (M.SelLineBuffer):
50   """Glues the select-line-buffer into the coroutine queue system."""
51
52   def __new__(cls, file, queue, tag, kind):
53     """See __init__ for documentation."""
54     return M.SelLineBuffer.__new__(cls, file.fileno())
55
56   def __init__(me, file, queue, tag, kind):
57     """
58     Initialize a new line-reading adaptor.
59
60     The adaptor reads lines from FILE.  Each line is inserted as a message of
61     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
62     represented as None.
63     """
64     me._q = queue
65     me._file = file
66     me._tag = tag
67     me._kind = kind
68     me.enable()
69
70   @T._callback
71   def line(me, line):
72     me._q.put((me._tag, me._kind, line))
73
74   @T._callback
75   def eof(me):
76     me.disable()
77     me._q.put((me._tag, me._kind, None))
78
79 class ErrorWatch (T.Coroutine):
80   """
81   An object which watches stderr streams for errors and converts them into
82   warnings of the form
83
84     WARN connect INFO stderr LINE
85
86   The INFO is a list of tokens associated with the file when it was
87   registered.
88
89   Usually there is a single ErrorWatch object, called errorwatch.
90   """
91
92   def __init__(me):
93     """Initialization: there are no arguments."""
94     T.Coroutine.__init__(me)
95     me._q = T.Queue()
96     me._map = {}
97     me._seq = 1
98
99   def watch(me, file, info):
100     """
101     Adds FILE to the collection of files to watch.
102
103     INFO will be written in the warning messages from this FILE.  Returns a
104     sequence number which can be used to unregister the file again.
105     """
106     seq = me._seq
107     me._seq += 1
108     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
109     return seq
110
111   def unwatch(me, seq):
112     """Stop watching the file with sequence number SEQ."""
113     del me._map[seq]
114     return me
115
116   def run(me):
117     """
118     Coroutine function: read items from the queue and report them.
119
120     Unregisters files automatically when they reach EOF.
121     """
122     while True:
123       seq, _, line = me._q.get()
124       if line is None:
125         me.unwatch(seq)
126       else:
127         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
128
129 def dbwatch():
130   """
131   Coroutine function: wake up every minute and notice changes to the
132   database.  When a change happens, tell the Pinger (q.v.) to rescan its
133   peers.
134   """
135   cr = T.Coroutine.getcurrent()
136   main = cr.parent
137   fw = M.FWatch(opts.cdb)
138   while True:
139     timer = M.SelTimer(time() + 60, lambda: cr.switch())
140     main.switch()
141     if fw.update():
142       pinger.rescan(False)
143       S.notify('connect', 'peerdb-update')
144
145 class ChildWatch (M.SelSignal):
146   """
147   An object which watches for specified processes exiting and reports
148   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150   There is usually only one ChildWatch object, called childwatch.
151   """
152
153   def __new__(cls):
154     """Initialize the child-watcher."""
155     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157   def __init__(me):
158     """Initialize the child-watcher."""
159     me._pid = {}
160     me.enable()
161
162   def watch(me, pid, queue, tag):
163     """
164     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
165     to the QUEUE, where CODE is one of
166
167       * None (successful termination)
168       * ['exit-nonzero', CODE] (CODE is a string!)
169       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171     """
172     me._pid[pid] = queue, tag
173     return me
174
175   def unwatch(me, pid):
176     """Unregister PID as a child to watch."""
177     del me._pid[pid]
178     return me
179
180   @T._callback
181   def signalled(me):
182     """
183     Called when child processes exit: collect exit statuses and report
184     failures.
185     """
186     while True:
187       try:
188         pid, status = OS.waitpid(-1, OS.WNOHANG)
189       except OSError, exc:
190         if exc.errno == E.ECHILD:
191           break
192       if pid == 0:
193         break
194       if pid not in me._pid:
195         continue
196       queue, tag = me._pid[pid]
197       if OS.WIFEXITED(status):
198         exit = OS.WEXITSTATUS(status)
199         if exit == 0:
200           code = None
201         else:
202           code = ['exit-nonzero', str(exit)]
203       elif OS.WIFSIGNALED(status):
204         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205       else:
206         code = ['exit-unknown', hex(status)]
207       queue.put((tag, 'exit', code))
208
209 class Command (object):
210   """
211   Represents a running command.
212
213   This class is the main interface to the machery provided by the ChildWatch
214   and ErrorWatch objects.  See also potwatch.
215   """
216
217   def __init__(me, info, queue, tag, args, env):
218     """
219     Start a new child process.
220
221     The ARGS are a list of arguments to be given to the child process.  The
222     ENV is either None or a dictionary of environment variable assignments to
223     override the extant environment.  INFO is a list of tokens to be included
224     in warnings about the child's stderr output.  If the child writes a line
225     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
226     child exits, write (TAG, 'exit', CODE) to the QUEUE.
227     """
228     me._info = info
229     me._q = queue
230     me._tag = tag
231     myenv = OS.environ.copy()
232     if env: myenv.update(env)
233     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
234                           stdout = PROC.PIPE, stderr = PROC.PIPE)
235     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
236     errorwatch.watch(me._proc.stderr, info)
237     childwatch.watch(me._proc.pid, queue, tag)
238
239   def __del__(me):
240     """
241     If I've been forgotten then stop watching for termination.
242     """
243     childwatch.unwatch(me._proc.pid)
244
245 def potwatch(what, name, q):
246   """
247   Watch the queue Q for activity as reported by a Command object.
248
249   Information from the process's stdout is reported as
250
251     NOTE WHAT NAME stdout LINE
252
253   abnormal termination is reported as
254
255     WARN WHAT NAME CODE
256
257   where CODE is what the ChildWatch wrote.
258   """
259   eofp = deadp = False
260   while not deadp or not eofp:
261     _, kind, more = q.get()
262     if kind == 'stdout':
263       if more is None:
264         eofp = True
265       else:
266         S.notify('connect', what, name, 'stdout', more)
267     elif kind == 'exit':
268       if more: S.warn('connect', what, name, *more)
269       deadp = True
270
271 ###--------------------------------------------------------------------------
272 ### Peer database utilities.
273
274 _magic = ['_magic']                     # An object distinct from all others
275
276 class Peer (object):
277   """Representation of a peer in the database."""
278
279   def __init__(me, peer, cdb = None):
280     """
281     Create a new peer, named PEER.
282
283     Information about the peer is read from the database CDB, or the default
284     one given on the command-line.
285     """
286     me.name = peer
287     record = (cdb or CDB.init(opts.cdb))['P' + peer]
288     me.__dict__.update(M.URLDecode(record, semip = True))
289
290   def get(me, key, default = _magic, filter = None):
291     """
292     Get the information stashed under KEY from the peer's database record.
293
294     If DEFAULT is given, then use it if the database doesn't contain the
295     necessary information.  If no DEFAULT is given, then report an error.  If
296     a FILTER function is given then apply it to the information from the
297     database before returning it.
298     """
299     attr = me.__dict__.get(key, default)
300     if attr is _magic:
301       raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
302     elif filter is not None:
303       attr = filter(attr)
304     return attr
305
306   def has(me, key):
307     """
308     Return whether the peer's database record has the KEY.
309     """
310     return key in me.__dict__
311
312   def list(me):
313     """
314     Iterate over the available keys in the peer's database record.
315     """
316     return me.__dict__.iterkeys()
317
318 def boolean(value):
319   """Parse VALUE as a boolean."""
320   return value in ['t', 'true', 'y', 'yes', 'on']
321
322 ###--------------------------------------------------------------------------
323 ### Waking up and watching peers.
324
325 def run_connect(peer, cmd):
326   """
327   Start the job of connecting to the passive PEER.
328
329   The CMD string is a shell command which will connect to the peer (via some
330   back-channel, say ssh and userv), issue a command
331
332     SVCSUBMIT connect passive [OPTIONS] USER
333
334   and write the resulting challenge to standard error.
335   """
336   q = T.Queue()
337   cmd = Command(['connect', peer.name], q, 'connect',
338                 ['/bin/sh', '-c', cmd], None)
339   _, kind, more = q.peek()
340   if kind == 'stdout':
341     if more is None:
342       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
343     else:
344       chal = more
345       S.greet(peer.name, chal)
346       q.get()
347   potwatch('connect', peer.name, q)
348
349 def run_disconnect(peer, cmd):
350   """
351   Start the job of disconnecting from a passive PEER.
352
353   The CMD string is a shell command which will disconnect from the peer.
354   """
355   q = T.Queue()
356   cmd = Command(['disconnect', peer.name], q, 'disconnect',
357                 ['/bin/sh', '-c', cmd], None)
358   potwatch('disconnect', peer.name, q)
359
360 _pingseq = 0
361 class PingPeer (object):
362   """
363   Object representing a peer which we are pinging to ensure that it is still
364   present.
365
366   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
367   event queue -- which saves us from having an enormous swarm of coroutines
368   -- but most of the actual work is done here.
369
370   In order to avoid confusion between different PingPeer instances for the
371   same actual peer, each PingPeer has a sequence number (its `seq'
372   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
373   (Using the PingPeer instance itself will prevent garbage collection of
374   otherwise defunct instances.)
375   """
376
377   def __init__(me, pinger, queue, peer, pingnow):
378     """
379     Create a new PingPeer.
380
381     The PINGER is the Pinger object we should send the results to.  This is
382     used when we remove ourselves, if the peer has been explicitly removed.
383
384     The QUEUE is the event queue on which timer and ping-command events
385     should be written.
386
387     The PEER is a `Peer' object describing the peer.
388
389     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
390     wait until the usual retry interval.
391     """
392     global _pingseq
393     me._pinger = pinger
394     me._q = queue
395     me._peer = peer.name
396     me.update(peer)
397     me.seq = _pingseq
398     _pingseq += 1
399     me._failures = 0
400     me._sabotage = False
401     me._last = '-'
402     me._nping = 0
403     me._nlost = 0
404     me._sigma_t = 0
405     me._sigma_t2 = 0
406     me._min = me._max = '-'
407     if pingnow:
408       me._timer = None
409       me._ping()
410     else:
411       me._timer = M.SelTimer(time() + me._every, me._time)
412
413   def update(me, peer):
414     """
415     Refreshes the timer parameters for this peer.  We don't, however,
416     immediately reschedule anything: that will happen next time anything
417     interesting happens.
418     """
419     if peer is None: peer = Peer(me._peer)
420     assert peer.name == me._peer
421     me._every = peer.get('every', filter = T.timespec, default = 120)
422     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
423     me._retries = peer.get('retries', filter = int, default = 5)
424     me._connectp = peer.has('connect')
425     return me
426
427   def _ping(me):
428     """
429     Send a ping to the peer; the result is sent to the Pinger's event queue.
430     """
431     S.rawcommand(T.TripeAsynchronousCommand(
432       me._q, (me._peer, me.seq),
433       ['EPING',
434        '-background', S.bgtag(),
435        '-timeout', str(me._timeout),
436        '--',
437        me._peer]))
438
439   def _reconnect(me):
440     try:
441       peer = Peer(me._peer)
442       if me._connectp:
443         S.warn('connect', 'reconnecting', me._peer)
444         S.forcekx(me._peer)
445         T.spawn(run_connect, peer, peer.get('connect'))
446         me._timer = M.SelTimer(time() + me._every, me._time)
447         me._sabotage = False
448       else:
449         S.kill(me._peer)
450     except TripeError, e:
451       if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
452
453   def event(me, code, stuff):
454     """
455     Respond to an event which happened to this peer.
456
457     Timer events indicate that we should start a new ping.  (The server has
458     its own timeout which detects lost packets.)
459
460     We trap unknown-peer responses and detach from the Pinger.
461
462     If the ping fails and we run out of retries, we attempt to restart the
463     connection.
464     """
465     if code == 'TIMER':
466       me._failures = 0
467       me._ping()
468     elif code == 'FAIL':
469       S.notify('connect', 'ping-failed', me._peer, *stuff)
470       if not stuff: pass
471       elif stuff[0] == 'unknown-peer': me._pinger.kill(me._peer)
472       elif stuff[0] == 'ping-send-failed': me._reconnect()
473     elif code == 'INFO':
474       outcome = stuff[0]
475       if outcome == 'ping-ok' and me._sabotage:
476         outcome = 'ping-timeout'
477       if outcome == 'ping-ok':
478         if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
479         t = float(stuff[1])
480         me._last = '%.1fms' % t
481         me._sigma_t += t
482         me._sigma_t2 += t*t
483         me._nping += 1
484         if me._min == '-' or t < me._min: me._min = t
485         if me._max == '-' or t > me._max: me._max = t
486         me._timer = M.SelTimer(time() + me._every, me._time)
487       elif outcome == 'ping-timeout':
488         me._failures += 1
489         me._nlost += 1
490         S.warn('connect', 'ping-timeout', me._peer,
491                'attempt', str(me._failures), 'of', str(me._retries))
492         if me._failures < me._retries:
493           me._ping()
494           me._last = 'timeout'
495         else:
496           me._reconnect()
497           me._last = 'reconnect'
498       elif outcome == 'ping-peer-died':
499         me._pinger.kill(me._peer)
500
501   def sabotage(me):
502     """Sabotage the peer, for testing purposes."""
503     me._sabotage = True
504     if me._timer: me._timer.kill()
505     T.defer(me._time)
506
507   def info(me):
508     if not me._nping:
509       mean = sd = '-'
510     else:
511       mean = me._sigma_t/me._nping
512       sd = sqrt(me._sigma_t2/me._nping - mean*mean)
513     n = me._nping + me._nlost
514     if not n: pclost = '-'
515     else: pclost = '%d' % ((100*me._nlost + n//2)//n)
516     return { 'last-ping': me._last,
517              'mean-ping': '%.1fms' % mean,
518              'sd-ping': '%.1fms' % sd,
519              'n-ping': '%d' % me._nping,
520              'n-lost': '%d' % me._nlost,
521              'percent-lost': pclost,
522              'min-ping': '%.1fms' % me._min,
523              'max-ping': '%.1fms' % me._max,
524              'state': me._timer and 'idle' or 'check',
525              'failures': me._failures }
526
527   @T._callback
528   def _time(me):
529     """
530     Handle timer callbacks by posting a timeout event on the queue.
531     """
532     me._timer = None
533     me._q.put(((me._peer, me.seq), 'TIMER', None))
534
535   def __str__(me):
536     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
537   def __repr__(me):
538     return str(me)
539
540 class Pinger (T.Coroutine):
541   """
542   The Pinger keeps track of the peers which we expect to be connected and
543   takes action if they seem to stop responding.
544
545   There is usually only one Pinger, called pinger.
546
547   The Pinger maintains a collection of PingPeer objects, and an event queue.
548   The PingPeers direct the results of their pings, and timer events, to the
549   event queue.  The Pinger's coroutine picks items off the queue and
550   dispatches them back to the PingPeers as appropriate.
551   """
552
553   def __init__(me):
554     """Initialize the Pinger."""
555     T.Coroutine.__init__(me)
556     me._peers = {}
557     me._q = T.Queue()
558
559   def run(me):
560     """
561     Coroutine function: reads the pinger queue and sends events to the
562     PingPeer objects they correspond to.
563     """
564     while True:
565       (peer, seq), code, stuff = me._q.get()
566       if peer in me._peers and seq == me._peers[peer].seq:
567         try: me._peers[peer].event(code, stuff)
568         except Exception, e:
569           SYS.excepthook(*SYS.exc_info())
570
571   def add(me, peer, pingnow):
572     """
573     Add PEER to the collection of peers under the Pinger's watchful eye.
574     The arguments are as for PingPeer: see above.
575     """
576     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
577     return me
578
579   def kill(me, peername):
580     """Remove PEER from the peers being watched by the Pinger."""
581     try: del me._peers[peername]
582     except KeyError: pass
583     return me
584
585   def rescan(me, startup):
586     """
587     General resynchronization method.
588
589     We scan the list of peers (with connect scripts) known at the server.
590     Any which are known to the Pinger but aren't known to the server are
591     removed from our list; newly arrived peers are added.  (Note that a peer
592     can change state here either due to the server sneakily changing its list
593     without issuing notifications or, more likely, the database changing its
594     idea of whether a peer is interesting.)  Finally, PingPeers which are
595     still present are prodded to update their timing parameters.
596
597     This method is called once at startup to pick up the peers already
598     installed, and again by the dbwatcher coroutine when it detects a change
599     to the database.
600     """
601     if T._debug: print '# rescan peers'
602     correct = {}
603     start = {}
604     for name in S.list():
605       try: peer = Peer(name)
606       except KeyError: continue
607       if peer.get('watch', filter = boolean, default = False):
608         if T._debug: print '# interesting peer %s' % peer
609         correct[peer.name] = start[peer.name] = peer
610       elif startup:
611         if T._debug: print '# peer %s ready for adoption' % peer
612         start[peer.name] = peer
613     for name, obj in me._peers.items():
614       try:
615         peer = correct[name]
616       except KeyError:
617         if T._debug: print '# peer %s vanished' % name
618         del me._peers[name]
619       else:
620         obj.update(peer)
621     for name, peer in start.iteritems():
622       if name in me._peers: continue
623       if startup:
624         if T._debug: print '# setting up peer %s' % name
625         ifname = S.ifname(name)
626         addr = S.addr(name)
627         T.defer(adoptpeer, peer, ifname, *addr)
628       else:
629         if T._debug: print '# adopting new peer %s' % name
630         me.add(peer, True)
631     return me
632
633   def adopted(me):
634     """
635     Returns the list of peers being watched by the Pinger.
636     """
637     return me._peers.keys()
638
639   def find(me, name):
640     """Return the PingPeer with the given name."""
641     return me._peers[name]
642
643 ###--------------------------------------------------------------------------
644 ### New connections.
645
646 def encode_envvars(env, prefix, vars):
647   """
648   Encode the variables in VARS suitably for including in a program
649   environment.  Lowercase letters in variable names are forced to uppercase;
650   runs of non-alphanumeric characters are replaced by single underscores; and
651   the PREFIX is prepended.  The resulting variables are written to ENV.
652   """
653   for k, v in vars.iteritems():
654     env[prefix + r_bad.sub('_', k.upper())] = v
655
656 r_bad = RX.compile(r'[\W_]+')
657 def envvars(peer):
658   """
659   Translate the database information for a PEER into a dictionary of
660   environment variables with plausible upper-case names and a P_ prefix.
661   Also collect the crypto information into A_ variables.
662   """
663   env = {}
664   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
665   encode_envvars(env, 'A_', S.algs(peer.name))
666   return env
667
668 def run_ifupdown(what, peer, *args):
669   """
670   Run the interface up/down script for a peer.
671
672   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
673   list of arguments to pass to the script, in addition to the peer name.
674
675   The command is run and watched in the background by potwatch.
676   """
677   q = T.Queue()
678   c = Command([what, peer.name], q, what,
679               M.split(peer.get(what), quotep = True)[0] +
680               [peer.name] + list(args),
681               envvars(peer))
682   potwatch(what, peer.name, q)
683
684 def adoptpeer(peer, ifname, *addr):
685   """
686   Add a new peer to our collection.
687
688   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
689   ADDR is the list of tokens representing its address.
690
691   We try to bring up the interface and provoke a connection to the peer if
692   it's passive.
693   """
694   if peer.has('ifup'):
695     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
696         .switch('ifup', peer, ifname, *addr)
697   cmd = peer.get('connect', default = None)
698   if cmd is not None:
699     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
700         .switch(peer, cmd)
701   if peer.get('watch', filter = boolean, default = False):
702     pinger.add(peer, False)
703
704 def disownpeer(peer):
705   """Drop the PEER from the Pinger and put its interface to bed."""
706   try: pinger.kill(peer)
707   except KeyError: pass
708   cmd = peer.get('disconnect', default = None)
709   if cmd is not None:
710     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
711         .switch(peer, cmd)
712   if peer.has('ifdown'):
713     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
714         .switch('ifdown', peer)
715
716 def addpeer(peer, addr):
717   """
718   Process a connect request from a new peer PEER on address ADDR.
719
720   Any existing peer with this name is disconnected from the server.
721   """
722   if peer.name in S.list():
723     S.kill(peer.name)
724   try:
725     booltrue = ['t', 'true', 'y', 'yes', 'on']
726     S.add(peer.name,
727           tunnel = peer.get('tunnel', None),
728           keepalive = peer.get('keepalive', None),
729           key = peer.get('key', None),
730           priv = peer.get('priv', None),
731           mobile = peer.get('mobile', 'nil') in booltrue,
732           cork = peer.get('cork', 'nil') in booltrue,
733           *addr)
734   except T.TripeError, exc:
735     raise T.TripeJobError(*exc.args)
736
737 ## Dictionary mapping challenges to waiting passive-connection coroutines.
738 chalmap = {}
739
740 def notify(_, code, *rest):
741   """
742   Watch for notifications.
743
744   We trap ADD and KILL notifications, and send them straight to adoptpeer and
745   disownpeer respectively; and dispatch GREET notifications to the
746   corresponding waiting coroutine.
747   """
748   if code == 'ADD':
749     try: p = Peer(rest[0])
750     except KeyError: return
751     adoptpeer(p, *rest[1:])
752   elif code == 'KILL':
753     try: p = Peer(rest[0])
754     except KeyError: return
755     disownpeer(p, *rest[1:])
756   elif code == 'GREET':
757     chal = rest[0]
758     try: cr = chalmap[chal]
759     except KeyError: pass
760     else: cr.switch(rest[1:])
761
762 ###--------------------------------------------------------------------------
763 ### Command implementation.
764
765 def cmd_kick(name):
766   """
767   kick NAME: Force a new connection attempt for the NAMEd peer.
768   """
769   try: pp = pinger.find(name)
770   except KeyError: raise T.TripeJobError('peer-not-adopted', name)
771   try: peer = Peer(name)
772   except KeyError: raise T.TripeJobError('unknown-peer', name)
773   conn = peer.get('connect', None)
774   if conn: T.spawn(run_connect, peer, peer.get('connect'))
775   else: T.spawn(lambda p: S.forcekx(p.name), peer)
776
777 def cmd_adopted():
778   """
779   adopted: Report a list of adopted peers.
780   """
781   for name in pinger.adopted():
782     T.svcinfo(name)
783
784 def cmd_active(name):
785   """
786   active NAME: Handle an active connection request for the peer called NAME.
787
788   The appropriate address is read from the database automatically.
789   """
790   try: peer = Peer(name)
791   except KeyError: raise T.TripeJobError('unknown-peer', name)
792   addr = peer.get('peer')
793   if addr == 'PASSIVE':
794     raise T.TripeJobError('passive-peer', name)
795   addpeer(peer, M.split(addr, quotep = True)[0])
796
797 def cmd_listactive():
798   """
799   list: Report a list of the available active peers.
800   """
801   cdb = CDB.init(opts.cdb)
802   for key in cdb.keys():
803     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
804       T.svcinfo(key[1:])
805
806 def cmd_info(name):
807   """
808   info NAME: Report the database entries for the named peer.
809   """
810   try: peer = Peer(name)
811   except KeyError: raise T.TripeJobError('unknown-peer', name)
812   d = {}
813   try: pp = pinger.find(name)
814   except KeyError: pass
815   else: d.update(pp.info())
816   items = list(peer.list()) + d.keys()
817   items.sort()
818   for i in items:
819     try: v = d[i]
820     except KeyError: v = peer.get(i)
821     T.svcinfo('%s=%s' % (i, v))
822
823 def cmd_userpeer(user):
824   """
825   userpeer USER: Report the peer name for the named user.
826   """
827   try: name = CDB.init(opts.cdb)['U' + user]
828   except KeyError: raise T.TripeJobError('unknown-user', user)
829   T.svcinfo(name)
830
831 def cmd_passive(*args):
832   """
833   passive [OPTIONS] USER: Await the arrival of the named USER.
834
835   Report a challenge; when (and if!) the server receives a greeting quoting
836   this challenge, add the corresponding peer to the server.
837   """
838   timeout = 30
839   op = T.OptParse(args, ['-timeout'])
840   for opt in op:
841     if opt == '-timeout':
842       timeout = T.timespec(op.arg())
843   user, = op.rest(1, 1)
844   try: name = CDB.init(opts.cdb)['U' + user]
845   except KeyError: raise T.TripeJobError('unknown-user', user)
846   try: peer = Peer(name)
847   except KeyError: raise T.TripeJobError('unknown-peer', name)
848   chal = S.getchal()
849   cr = T.Coroutine.getcurrent()
850   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
851   try:
852     T.svcinfo(chal)
853     chalmap[chal] = cr
854     addr = cr.parent.switch()
855     if addr is None:
856       raise T.TripeJobError('connect-timeout')
857     addpeer(peer, addr)
858   finally:
859     del chalmap[chal]
860
861 def cmd_sabotage(name):
862   """
863   sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.
864   """
865   try: pp = pinger.find(name)
866   except KeyError: raise T.TripeJobError('unknown-peer', name)
867   pp.sabotage()
868
869 ###--------------------------------------------------------------------------
870 ### Start up.
871
872 def setup():
873   """
874   Service setup.
875
876   Register the notification watcher, rescan the peers, and add automatic
877   active peers.
878   """
879   S.handler['NOTE'] = notify
880   S.watch('+n')
881
882   pinger.rescan(opts.startup)
883
884   if opts.startup:
885     cdb = CDB.init(opts.cdb)
886     try:
887       autos = cdb['%AUTO']
888     except KeyError:
889       autos = ''
890     for name in M.split(autos)[0]:
891       try:
892         peer = Peer(name, cdb)
893         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
894       except T.TripeJobError, err:
895         S.warn('connect', 'auto-add-failed', name, *err.args)
896
897 def init():
898   """
899   Initialization to be done before service startup.
900   """
901   global errorwatch, childwatch, pinger
902   errorwatch = ErrorWatch()
903   childwatch = ChildWatch()
904   pinger = Pinger()
905   T.Coroutine(dbwatch, name = 'dbwatch').switch()
906   errorwatch.switch()
907   pinger.switch()
908
909 def parse_options():
910   """
911   Parse the command-line options.
912
913   Automatically changes directory to the requested configdir, and turns on
914   debugging.  Returns the options object.
915   """
916   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
917                     version = '%%prog %s' % VERSION)
918
919   op.add_option('-a', '--admin-socket',
920                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
921                 help = 'Select socket to connect to [default %default]')
922   op.add_option('-d', '--directory',
923                 metavar = 'DIR', dest = 'dir', default = T.configdir,
924                 help = 'Select current diretory [default %default]')
925   op.add_option('-p', '--peerdb',
926                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
927                 help = 'Select peers database [default %default]')
928   op.add_option('--daemon', dest = 'daemon',
929                 default = False, action = 'store_true',
930                 help = 'Become a daemon after successful initialization')
931   op.add_option('--debug', dest = 'debug',
932                 default = False, action = 'store_true',
933                 help = 'Emit debugging trace information')
934   op.add_option('--startup', dest = 'startup',
935                 default = False, action = 'store_true',
936                 help = 'Being called as part of the server startup')
937
938   opts, args = op.parse_args()
939   if args: op.error('no arguments permitted')
940   OS.chdir(opts.dir)
941   T._debug = opts.debug
942   return opts
943
944 ## Service table, for running manually.
945 service_info = [('connect', T.VERSION, {
946   'adopted': (0, 0, '', cmd_adopted),
947   'kick': (1, 1, 'PEER', cmd_kick),
948   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
949   'active': (1, 1, 'PEER', cmd_active),
950   'info': (1, 1, 'PEER', cmd_info),
951   'list-active': (0, 0, '', cmd_listactive),
952   'userpeer': (1, 1, 'USER', cmd_userpeer),
953   'sabotage': (1, 1, 'PEER', cmd_sabotage)
954 })]
955
956 if __name__ == '__main__':
957   opts = parse_options()
958   OS.environ['TRIPESOCK'] = opts.tripesock
959   T.runservices(opts.tripesock, service_info,
960                 init = init, setup = setup,
961                 daemon = opts.daemon)
962
963 ###----- That's all, folks --------------------------------------------------