chiark / gitweb /
Upgrade licence to GPLv3+.
[tripe] / svc / connect.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Connect to remote peers, and keep track of them
5 ###
6 ### (c) 2007 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software: you can redistribute it and/or modify it under
14 ### the terms of the GNU General Public License as published by the Free
15 ### Software Foundation; either version 3 of the License, or (at your
16 ### option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19 ### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20 ### FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
21 ### for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
25
26 VERSION = '@VERSION@'
27
28 ###--------------------------------------------------------------------------
29 ### External dependencies.
30
31 from optparse import OptionParser
32 import tripe as T
33 import os as OS
34 import signal as SIG
35 import errno as E
36 import cdb as CDB
37 import mLib as M
38 import re as RX
39 from time import time
40 import subprocess as PROC
41
42 S = T.svcmgr
43
44 ###--------------------------------------------------------------------------
45 ### Running auxiliary commands.
46
47 class SelLineQueue (M.SelLineBuffer):
48   """Glues the select-line-buffer into the coroutine queue system."""
49
50   def __new__(cls, file, queue, tag, kind):
51     """See __init__ for documentation."""
52     return M.SelLineBuffer.__new__(cls, file.fileno())
53
54   def __init__(me, file, queue, tag, kind):
55     """
56     Initialize a new line-reading adaptor.
57
58     The adaptor reads lines from FILE.  Each line is inserted as a message of
59     the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
60     represented as None.
61     """
62     me._q = queue
63     me._file = file
64     me._tag = tag
65     me._kind = kind
66     me.enable()
67
68   @T._callback
69   def line(me, line):
70     me._q.put((me._tag, me._kind, line))
71
72   @T._callback
73   def eof(me):
74     me.disable()
75     me._q.put((me._tag, me._kind, None))
76
77 class ErrorWatch (T.Coroutine):
78   """
79   An object which watches stderr streams for errors and converts them into
80   warnings of the form
81
82     WARN connect INFO stderr LINE
83
84   The INFO is a list of tokens associated with the file when it was
85   registered.
86
87   Usually there is a single ErrorWatch object, called errorwatch.
88   """
89
90   def __init__(me):
91     """Initialization: there are no arguments."""
92     T.Coroutine.__init__(me)
93     me._q = T.Queue()
94     me._map = {}
95     me._seq = 1
96
97   def watch(me, file, info):
98     """
99     Adds FILE to the collection of files to watch.
100
101     INFO will be written in the warning messages from this FILE.  Returns a
102     sequence number which can be used to unregister the file again.
103     """
104     seq = me._seq
105     me._seq += 1
106     me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
107     return seq
108
109   def unwatch(me, seq):
110     """Stop watching the file with sequence number SEQ."""
111     del me._map[seq]
112     return me
113
114   def run(me):
115     """
116     Coroutine function: read items from the queue and report them.
117
118     Unregisters files automatically when they reach EOF.
119     """
120     while True:
121       seq, _, line = me._q.get()
122       if line is None:
123         me.unwatch(seq)
124       else:
125         S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
126
127 def dbwatch():
128   """
129   Coroutine function: wake up every minute and notice changes to the
130   database.  When a change happens, tell the Pinger (q.v.) to rescan its
131   peers.
132   """
133   cr = T.Coroutine.getcurrent()
134   main = cr.parent
135   fw = M.FWatch(opts.cdb)
136   while True:
137     timer = M.SelTimer(time() + 60, lambda: cr.switch())
138     main.switch()
139     if fw.update():
140       pinger.rescan(False)
141       S.notify('connect', 'peerdb-update')
142
143 class ChildWatch (M.SelSignal):
144   """
145   An object which watches for specified processes exiting and reports
146   terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
147
148   There is usually only one ChildWatch object, called childwatch.
149   """
150
151   def __new__(cls):
152     """Initialize the child-watcher."""
153     return M.SelSignal.__new__(cls, SIG.SIGCHLD)
154
155   def __init__(me):
156     """Initialize the child-watcher."""
157     me._pid = {}
158     me.enable()
159
160   def watch(me, pid, queue, tag):
161     """
162     Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
163     to the QUEUE, where CODE is one of
164
165       * None (successful termination)
166       * ['exit-nonzero', CODE] (CODE is a string!)
167       * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
168       * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
169     """
170     me._pid[pid] = queue, tag
171     return me
172
173   def unwatch(me, pid):
174     """Unregister PID as a child to watch."""
175     del me._pid[pid]
176     return me
177
178   @T._callback
179   def signalled(me):
180     """
181     Called when child processes exit: collect exit statuses and report
182     failures.
183     """
184     while True:
185       try:
186         pid, status = OS.waitpid(-1, OS.WNOHANG)
187       except OSError, exc:
188         if exc.errno == E.ECHILD:
189           break
190       if pid == 0:
191         break
192       if pid not in me._pid:
193         continue
194       queue, tag = me._pid[pid]
195       if OS.WIFEXITED(status):
196         exit = OS.WEXITSTATUS(status)
197         if exit == 0:
198           code = None
199         else:
200           code = ['exit-nonzero', str(exit)]
201       elif OS.WIFSIGNALED(status):
202         code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
203       else:
204         code = ['exit-unknown', hex(status)]
205       queue.put((tag, 'exit', code))
206
207 class Command (object):
208   """
209   Represents a running command.
210
211   This class is the main interface to the machery provided by the ChildWatch
212   and ErrorWatch objects.  See also potwatch.
213   """
214
215   def __init__(me, info, queue, tag, args, env):
216     """
217     Start a new child process.
218
219     The ARGS are a list of arguments to be given to the child process.  The
220     ENV is either None or a dictionary of environment variable assignments to
221     override the extant environment.  INFO is a list of tokens to be included
222     in warnings about the child's stderr output.  If the child writes a line
223     to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
224     child exits, write (TAG, 'exit', CODE) to the QUEUE.
225     """
226     me._info = info
227     me._q = queue
228     me._tag = tag
229     myenv = OS.environ.copy()
230     if env: myenv.update(env)
231     me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
232                           stdout = PROC.PIPE, stderr = PROC.PIPE)
233     me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
234     errorwatch.watch(me._proc.stderr, info)
235     childwatch.watch(me._proc.pid, queue, tag)
236
237   def __del__(me):
238     """
239     If I've been forgotten then stop watching for termination.
240     """
241     childwatch.unwatch(me._proc.pid)
242
243 def potwatch(what, name, q):
244   """
245   Watch the queue Q for activity as reported by a Command object.
246
247   Information from the process's stdout is reported as
248
249     NOTE WHAT NAME stdout LINE
250
251   abnormal termination is reported as
252
253     WARN WHAT NAME CODE
254
255   where CODE is what the ChildWatch wrote.
256   """
257   eofp = deadp = False
258   while not deadp or not eofp:
259     _, kind, more = q.get()
260     if kind == 'stdout':
261       if more is None:
262         eofp = True
263       else:
264         S.notify('connect', what, name, 'stdout', more)
265     elif kind == 'exit':
266       if more: S.warn('connect', what, name, *more)
267       deadp = True
268
269 ###--------------------------------------------------------------------------
270 ### Peer database utilities.
271
272 _magic = ['_magic']                     # An object distinct from all others
273
274 class Peer (object):
275   """Representation of a peer in the database."""
276
277   def __init__(me, peer, cdb = None):
278     """
279     Create a new peer, named PEER.
280
281     Information about the peer is read from the database CDB, or the default
282     one given on the command-line.
283     """
284     me.name = peer
285     record = (cdb or CDB.init(opts.cdb))['P' + peer]
286     me.__dict__.update(M.URLDecode(record, semip = True))
287
288   def get(me, key, default = _magic, filter = None):
289     """
290     Get the information stashed under KEY from the peer's database record.
291
292     If DEFAULT is given, then use it if the database doesn't contain the
293     necessary information.  If no DEFAULT is given, then report an error.  If
294     a FILTER function is given then apply it to the information from the
295     database before returning it.
296     """
297     attr = me.__dict__.get(key, default)
298     if attr is _magic:
299       raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
300     elif filter is not None:
301       attr = filter(attr)
302     return attr
303
304   def has(me, key):
305     """
306     Return whether the peer's database record has the KEY.
307     """
308     return key in me.__dict__
309
310   def list(me):
311     """
312     Iterate over the available keys in the peer's database record.
313     """
314     return me.__dict__.iterkeys()
315
316 def boolean(value):
317   """Parse VALUE as a boolean."""
318   return value in ['t', 'true', 'y', 'yes', 'on']
319
320 ###--------------------------------------------------------------------------
321 ### Waking up and watching peers.
322
323 def run_connect(peer, cmd):
324   """
325   Start the job of connecting to the passive PEER.
326
327   The CMD string is a shell command which will connect to the peer (via some
328   back-channel, say ssh and userv), issue a command
329
330     SVCSUBMIT connect passive [OPTIONS] USER
331
332   and write the resulting challenge to standard error.
333   """
334   q = T.Queue()
335   cmd = Command(['connect', peer.name], q, 'connect',
336                 ['/bin/sh', '-c', cmd], None)
337   _, kind, more = q.peek()
338   if kind == 'stdout':
339     if more is None:
340       S.warn('connect', 'connect', peer.name, 'unexpected-eof')
341     else:
342       chal = more
343       S.greet(peer.name, chal)
344       q.get()
345   potwatch('connect', peer.name, q)
346
347 def run_disconnect(peer, cmd):
348   """
349   Start the job of disconnecting from a passive PEER.
350
351   The CMD string is a shell command which will disconnect from the peer.
352   """
353   q = T.Queue()
354   cmd = Command(['disconnect', peer.name], q, 'disconnect',
355                 ['/bin/sh', '-c', cmd], None)
356   potwatch('disconnect', peer.name, q)
357
358 _pingseq = 0
359 class PingPeer (object):
360   """
361   Object representing a peer which we are pinging to ensure that it is still
362   present.
363
364   PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
365   event queue -- which saves us from having an enormous swarm of coroutines
366   -- but most of the actual work is done here.
367
368   In order to avoid confusion between different PingPeer instances for the
369   same actual peer, each PingPeer has a sequence number (its `seq'
370   attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
371   (Using the PingPeer instance itself will prevent garbage collection of
372   otherwise defunct instances.)
373   """
374
375   def __init__(me, pinger, queue, peer, pingnow):
376     """
377     Create a new PingPeer.
378
379     The PINGER is the Pinger object we should send the results to.  This is
380     used when we remove ourselves, if the peer has been explicitly removed.
381
382     The QUEUE is the event queue on which timer and ping-command events
383     should be written.
384
385     The PEER is a `Peer' object describing the peer.
386
387     If PINGNOW is true, then immediately start pinging the peer.  Otherwise
388     wait until the usual retry interval.
389     """
390     global _pingseq
391     me._pinger = pinger
392     me._q = queue
393     me._peer = peer.name
394     me.update(peer)
395     me.seq = _pingseq
396     _pingseq += 1
397     me._failures = 0
398     if pingnow:
399       me._timer = None
400       me._ping()
401     else:
402       me._timer = M.SelTimer(time() + me._every, me._time)
403
404   def update(me, peer):
405     """
406     Refreshes the timer parameters for this peer.  We don't, however,
407     immediately reschedule anything: that will happen next time anything
408     interesting happens.
409     """
410     if peer is None: peer = Peer(me._peer)
411     assert peer.name == me._peer
412     me._every = peer.get('every', filter = T.timespec, default = 120)
413     me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
414     me._retries = peer.get('retries', filter = int, default = 5)
415     me._connectp = peer.has('connect')
416     return me
417
418   def _ping(me):
419     """
420     Send a ping to the peer; the result is sent to the Pinger's event queue.
421     """
422     S.rawcommand(T.TripeAsynchronousCommand(
423       me._q, (me._peer, me.seq),
424       ['EPING',
425        '-background', S.bgtag(),
426        '-timeout', str(me._timeout),
427        '--',
428        me._peer]))
429
430   def _reconnect(me):
431     peer = Peer(me._peer)
432     if me._connectp:
433       S.warn('connect', 'reconnecting', me._peer)
434       S.forcekx(me._peer)
435       T.spawn(run_connect, peer, peer.get('connect'))
436       me._timer = M.SelTimer(time() + me._every, me._time)
437     else:
438       S.kill(me._peer)
439
440   def event(me, code, stuff):
441     """
442     Respond to an event which happened to this peer.
443
444     Timer events indicate that we should start a new ping.  (The server has
445     its own timeout which detects lost packets.)
446
447     We trap unknown-peer responses and detach from the Pinger.
448
449     If the ping fails and we run out of retries, we attempt to restart the
450     connection.
451     """
452     if code == 'TIMER':
453       me._failures = 0
454       me._ping()
455     elif code == 'FAIL':
456       S.notify('connect', 'ping-failed', me._peer, *stuff)
457       if not stuff:
458         pass
459       elif stuff[0] == 'unknown-peer':
460         me._pinger.kill(me._peer)
461       elif stuff[0] == 'ping-send-failed':
462         me._reconnect()
463     elif code == 'INFO':
464       if stuff[0] == 'ping-ok':
465         if me._failures > 0:
466           S.warn('connect', 'ping-ok', me._peer)
467         me._timer = M.SelTimer(time() + me._every, me._time)
468       elif stuff[0] == 'ping-timeout':
469         me._failures += 1
470         S.warn('connect', 'ping-timeout', me._peer,
471                'attempt', str(me._failures), 'of', str(me._retries))
472         if me._failures < me._retries:
473           me._ping()
474         else:
475           me._reconnect()
476       elif stuff[0] == 'ping-peer-died':
477         me._pinger.kill(me._peer)
478
479   @T._callback
480   def _time(me):
481     """
482     Handle timer callbacks by posting a timeout event on the queue.
483     """
484     me._timer = None
485     me._q.put(((me._peer, me.seq), 'TIMER', None))
486
487   def __str__(me):
488     return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
489   def __repr__(me):
490     return str(me)
491
492 class Pinger (T.Coroutine):
493   """
494   The Pinger keeps track of the peers which we expect to be connected and
495   takes action if they seem to stop responding.
496
497   There is usually only one Pinger, called pinger.
498
499   The Pinger maintains a collection of PingPeer objects, and an event queue.
500   The PingPeers direct the results of their pings, and timer events, to the
501   event queue.  The Pinger's coroutine picks items off the queue and
502   dispatches them back to the PingPeers as appropriate.
503   """
504
505   def __init__(me):
506     """Initialize the Pinger."""
507     T.Coroutine.__init__(me)
508     me._peers = {}
509     me._q = T.Queue()
510
511   def run(me):
512     """
513     Coroutine function: reads the pinger queue and sends events to the
514     PingPeer objects they correspond to.
515     """
516     while True:
517       (peer, seq), code, stuff = me._q.get()
518       if peer in me._peers and seq == me._peers[peer].seq:
519         me._peers[peer].event(code, stuff)
520
521   def add(me, peer, pingnow):
522     """
523     Add PEER to the collection of peers under the Pinger's watchful eye.
524     The arguments are as for PingPeer: see above.
525     """
526     me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
527     return me
528
529   def kill(me, peername):
530     """Remove PEER from the peers being watched by the Pinger."""
531     del me._peers[peername]
532     return me
533
534   def rescan(me, startup):
535     """
536     General resynchronization method.
537
538     We scan the list of peers (with connect scripts) known at the server.
539     Any which are known to the Pinger but aren't known to the server are
540     removed from our list; newly arrived peers are added.  (Note that a peer
541     can change state here either due to the server sneakily changing its list
542     without issuing notifications or, more likely, the database changing its
543     idea of whether a peer is interesting.)  Finally, PingPeers which are
544     still present are prodded to update their timing parameters.
545
546     This method is called once at startup to pick up the peers already
547     installed, and again by the dbwatcher coroutine when it detects a change
548     to the database.
549     """
550     if T._debug: print '# rescan peers'
551     correct = {}
552     start = {}
553     for name in S.list():
554       try: peer = Peer(name)
555       except KeyError: continue
556       if peer.get('watch', filter = boolean, default = False):
557         if T._debug: print '# interesting peer %s' % peer
558         correct[peer.name] = start[peer.name] = peer
559       elif startup:
560         if T._debug: print '# peer %s ready for adoption' % peer
561         start[peer.name] = peer
562     for name, obj in me._peers.items():
563       try:
564         peer = correct[name]
565       except KeyError:
566         if T._debug: print '# peer %s vanished' % name
567         del me._peers[name]
568       else:
569         obj.update(peer)
570     for name, peer in start.iteritems():
571       if name in me._peers: continue
572       if startup:
573         if T._debug: print '# setting up peer %s' % name
574         ifname = S.ifname(name)
575         addr = S.addr(name)
576         T.defer(adoptpeer, peer, ifname, *addr)
577       else:
578         if T._debug: print '# adopting new peer %s' % name
579         me.add(peer, True)
580     return me
581
582   def adopted(me):
583     """
584     Returns the list of peers being watched by the Pinger.
585     """
586     return me._peers.keys()
587
588 ###--------------------------------------------------------------------------
589 ### New connections.
590
591 def encode_envvars(env, prefix, vars):
592   """
593   Encode the variables in VARS suitably for including in a program
594   environment.  Lowercase letters in variable names are forced to uppercase;
595   runs of non-alphanumeric characters are replaced by single underscores; and
596   the PREFIX is prepended.  The resulting variables are written to ENV.
597   """
598   for k, v in vars.iteritems():
599     env[prefix + r_bad.sub('_', k.upper())] = v
600
601 r_bad = RX.compile(r'[\W_]+')
602 def envvars(peer):
603   """
604   Translate the database information for a PEER into a dictionary of
605   environment variables with plausible upper-case names and a P_ prefix.
606   Also collect the crypto information into A_ variables.
607   """
608   env = {}
609   encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
610   encode_envvars(env, 'A_', S.algs(peer.name))
611   return env
612
613 def run_ifupdown(what, peer, *args):
614   """
615   Run the interface up/down script for a peer.
616
617   WHAT is 'ifup' or 'ifdown'.  PEER names the peer in question.  ARGS is a
618   list of arguments to pass to the script, in addition to the peer name.
619
620   The command is run and watched in the background by potwatch.
621   """
622   q = T.Queue()
623   c = Command([what, peer.name], q, what,
624               M.split(peer.get(what), quotep = True)[0] +
625               [peer.name] + list(args),
626               envvars(peer))
627   potwatch(what, peer.name, q)
628
629 def adoptpeer(peer, ifname, *addr):
630   """
631   Add a new peer to our collection.
632
633   PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
634   ADDR is the list of tokens representing its address.
635
636   We try to bring up the interface and provoke a connection to the peer if
637   it's passive.
638   """
639   if peer.has('ifup'):
640     T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
641         .switch('ifup', peer, ifname, *addr)
642   cmd = peer.get('connect', default = None)
643   if cmd is not None:
644     T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
645         .switch(peer, cmd)
646   if peer.get('watch', filter = boolean, default = False):
647     pinger.add(peer, False)
648
649 def disownpeer(peer):
650   """Drop the PEER from the Pinger and put its interface to bed."""
651   try: pinger.kill(peer)
652   except KeyError: pass
653   cmd = peer.get('disconnect', default = None)
654   if cmd is not None:
655     T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
656         .switch(peer, cmd)
657   if peer.has('ifdown'):
658     T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
659         .switch('ifdown', peer)
660
661 def addpeer(peer, addr):
662   """
663   Process a connect request from a new peer PEER on address ADDR.
664
665   Any existing peer with this name is disconnected from the server.
666   """
667   if peer.name in S.list():
668     S.kill(peer.name)
669   try:
670     booltrue = ['t', 'true', 'y', 'yes', 'on']
671     S.add(peer.name,
672           tunnel = peer.get('tunnel', None),
673           keepalive = peer.get('keepalive', None),
674           key = peer.get('key', None),
675           priv = peer.get('priv', None),
676           mobile = peer.get('mobile', 'nil') in booltrue,
677           cork = peer.get('cork', 'nil') in booltrue,
678           *addr)
679   except T.TripeError, exc:
680     raise T.TripeJobError(*exc.args)
681
682 ## Dictionary mapping challenges to waiting passive-connection coroutines.
683 chalmap = {}
684
685 def notify(_, code, *rest):
686   """
687   Watch for notifications.
688
689   We trap ADD and KILL notifications, and send them straight to adoptpeer and
690   disownpeer respectively; and dispatch GREET notifications to the
691   corresponding waiting coroutine.
692   """
693   if code == 'ADD':
694     try: p = Peer(rest[0])
695     except KeyError: return
696     adoptpeer(p, *rest[1:])
697   elif code == 'KILL':
698     try: p = Peer(rest[0])
699     except KeyError: return
700     disownpeer(p, *rest[1:])
701   elif code == 'GREET':
702     chal = rest[0]
703     try: cr = chalmap[chal]
704     except KeyError: pass
705     else: cr.switch(rest[1:])
706
707 ###--------------------------------------------------------------------------
708 ### Command implementation.
709
710 def cmd_kick(name):
711   """
712   kick NAME: Force a new connection attempt for the NAMEd peer.
713   """
714   if name not in pinger.adopted():
715     raise T.TripeJobError('peer-not-adopted', name)
716   try: peer = Peer(name)
717   except KeyError: raise T.TripeJobError('unknown-peer', name)
718   T.spawn(run_connect, peer, peer.get('connect'))
719
720 def cmd_adopted():
721   """
722   adopted: Report a list of adopted peers.
723   """
724   for name in pinger.adopted():
725     T.svcinfo(name)
726
727 def cmd_active(name):
728   """
729   active NAME: Handle an active connection request for the peer called NAME.
730
731   The appropriate address is read from the database automatically.
732   """
733   try: peer = Peer(name)
734   except KeyError: raise T.TripeJobError('unknown-peer', name)
735   addr = peer.get('peer')
736   if addr == 'PASSIVE':
737     raise T.TripeJobError('passive-peer', name)
738   addpeer(peer, M.split(addr, quotep = True)[0])
739
740 def cmd_listactive():
741   """
742   list: Report a list of the available active peers.
743   """
744   cdb = CDB.init(opts.cdb)
745   for key in cdb.keys():
746     if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
747       T.svcinfo(key[1:])
748
749 def cmd_info(name):
750   """
751   info NAME: Report the database entries for the named peer.
752   """
753   try: peer = Peer(name)
754   except KeyError: raise T.TripeJobError('unknown-peer', name)
755   items = list(peer.list())
756   items.sort()
757   for i in items:
758     T.svcinfo('%s=%s' % (i, peer.get(i)))
759
760 def cmd_userpeer(user):
761   """
762   userpeer USER: Report the peer name for the named user.
763   """
764   try: name = CDB.init(opts.cdb)['U' + user]
765   except KeyError: raise T.TripeJobError('unknown-user', user)
766   T.svcinfo(name)
767
768 def cmd_passive(*args):
769   """
770   passive [OPTIONS] USER: Await the arrival of the named USER.
771
772   Report a challenge; when (and if!) the server receives a greeting quoting
773   this challenge, add the corresponding peer to the server.
774   """
775   timeout = 30
776   op = T.OptParse(args, ['-timeout'])
777   for opt in op:
778     if opt == '-timeout':
779       timeout = T.timespec(op.arg())
780   user, = op.rest(1, 1)
781   try: name = CDB.init(opts.cdb)['U' + user]
782   except KeyError: raise T.TripeJobError('unknown-user', user)
783   try: peer = Peer(name)
784   except KeyError: raise T.TripeJobError('unknown-peer', name)
785   chal = S.getchal()
786   cr = T.Coroutine.getcurrent()
787   timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
788   try:
789     T.svcinfo(chal)
790     chalmap[chal] = cr
791     addr = cr.parent.switch()
792     if addr is None:
793       raise T.TripeJobError('connect-timeout')
794     addpeer(peer, addr)
795   finally:
796     del chalmap[chal]
797
798 ###--------------------------------------------------------------------------
799 ### Start up.
800
801 def setup():
802   """
803   Service setup.
804
805   Register the notification watcher, rescan the peers, and add automatic
806   active peers.
807   """
808   S.handler['NOTE'] = notify
809   S.watch('+n')
810
811   pinger.rescan(opts.startup)
812
813   if opts.startup:
814     cdb = CDB.init(opts.cdb)
815     try:
816       autos = cdb['%AUTO']
817     except KeyError:
818       autos = ''
819     for name in M.split(autos)[0]:
820       try:
821         peer = Peer(name, cdb)
822         addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
823       except T.TripeJobError, err:
824         S.warn('connect', 'auto-add-failed', name, *err.args)
825
826 def init():
827   """
828   Initialization to be done before service startup.
829   """
830   global errorwatch, childwatch, pinger
831   errorwatch = ErrorWatch()
832   childwatch = ChildWatch()
833   pinger = Pinger()
834   T.Coroutine(dbwatch, name = 'dbwatch').switch()
835   errorwatch.switch()
836   pinger.switch()
837
838 def parse_options():
839   """
840   Parse the command-line options.
841
842   Automatically changes directory to the requested configdir, and turns on
843   debugging.  Returns the options object.
844   """
845   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
846                     version = '%%prog %s' % VERSION)
847
848   op.add_option('-a', '--admin-socket',
849                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
850                 help = 'Select socket to connect to [default %default]')
851   op.add_option('-d', '--directory',
852                 metavar = 'DIR', dest = 'dir', default = T.configdir,
853                 help = 'Select current diretory [default %default]')
854   op.add_option('-p', '--peerdb',
855                 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
856                 help = 'Select peers database [default %default]')
857   op.add_option('--daemon', dest = 'daemon',
858                 default = False, action = 'store_true',
859                 help = 'Become a daemon after successful initialization')
860   op.add_option('--debug', dest = 'debug',
861                 default = False, action = 'store_true',
862                 help = 'Emit debugging trace information')
863   op.add_option('--startup', dest = 'startup',
864                 default = False, action = 'store_true',
865                 help = 'Being called as part of the server startup')
866
867   opts, args = op.parse_args()
868   if args: op.error('no arguments permitted')
869   OS.chdir(opts.dir)
870   T._debug = opts.debug
871   return opts
872
873 ## Service table, for running manually.
874 service_info = [('connect', T.VERSION, {
875   'adopted': (0, 0, '', cmd_adopted),
876   'kick': (1, 1, 'PEER', cmd_kick),
877   'passive': (1, None, '[OPTIONS] USER', cmd_passive),
878   'active': (1, 1, 'PEER', cmd_active),
879   'info': (1, 1, 'PEER', cmd_info),
880   'list-active': (0, 0, '', cmd_listactive),
881   'userpeer': (1, 1, 'USER', cmd_userpeer)
882 })]
883
884 if __name__ == '__main__':
885   opts = parse_options()
886   T.runservices(opts.tripesock, service_info,
887                 init = init, setup = setup,
888                 daemon = opts.daemon)
889
890 ###----- That's all, folks --------------------------------------------------