chiark / gitweb /
svc/connect.in (cmd_kick): Don't assume there's a `connect' command.
[tripe] / svc / connect.in
CommitLineData
a62f8e8a
MW
1#! @PYTHON@
2### -*-python-*-
3###
d64ce4ae 4### Connect to remote peers, and keep track of them
a62f8e8a 5###
d64ce4ae 6### (c) 2007 Straylight/Edgeware
a62f8e8a
MW
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
11ad66c2
MW
13### TrIPE is free software: you can redistribute it and/or modify it under
14### the terms of the GNU General Public License as published by the Free
15### Software Foundation; either version 3 of the License, or (at your
16### option) any later version.
a62f8e8a 17###
11ad66c2
MW
18### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21### for more details.
a62f8e8a
MW
22###
23### You should have received a copy of the GNU General Public License
11ad66c2 24### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
a62f8e8a
MW
25
26VERSION = '@VERSION@'
27
28###--------------------------------------------------------------------------
29### External dependencies.
30
31from optparse import OptionParser
32import tripe as T
33import os as OS
d64ce4ae
MW
34import signal as SIG
35import errno as E
a62f8e8a
MW
36import cdb as CDB
37import mLib as M
d64ce4ae 38import re as RX
a62f8e8a 39from time import time
d64ce4ae 40import subprocess as PROC
a62f8e8a
MW
41
42S = T.svcmgr
43
44###--------------------------------------------------------------------------
d64ce4ae
MW
45### Running auxiliary commands.
46
47class SelLineQueue (M.SelLineBuffer):
48 """Glues the select-line-buffer into the coroutine queue system."""
49
50 def __new__(cls, file, queue, tag, kind):
51 """See __init__ for documentation."""
52 return M.SelLineBuffer.__new__(cls, file.fileno())
53
54 def __init__(me, file, queue, tag, kind):
55 """
56 Initialize a new line-reading adaptor.
57
58 The adaptor reads lines from FILE. Each line is inserted as a message of
59 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
60 represented as None.
61 """
62 me._q = queue
63 me._file = file
64 me._tag = tag
65 me._kind = kind
66 me.enable()
67
68 @T._callback
69 def line(me, line):
70 me._q.put((me._tag, me._kind, line))
71
72 @T._callback
73 def eof(me):
74 me.disable()
75 me._q.put((me._tag, me._kind, None))
76
77class ErrorWatch (T.Coroutine):
78 """
79 An object which watches stderr streams for errors and converts them into
80 warnings of the form
81
82 WARN connect INFO stderr LINE
83
84 The INFO is a list of tokens associated with the file when it was
85 registered.
86
87 Usually there is a single ErrorWatch object, called errorwatch.
88 """
89
90 def __init__(me):
91 """Initialization: there are no arguments."""
92 T.Coroutine.__init__(me)
93 me._q = T.Queue()
94 me._map = {}
95 me._seq = 1
96
97 def watch(me, file, info):
98 """
99 Adds FILE to the collection of files to watch.
100
101 INFO will be written in the warning messages from this FILE. Returns a
102 sequence number which can be used to unregister the file again.
103 """
104 seq = me._seq
105 me._seq += 1
106 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
107 return seq
108
109 def unwatch(me, seq):
110 """Stop watching the file with sequence number SEQ."""
111 del me._map[seq]
112 return me
113
114 def run(me):
115 """
116 Coroutine function: read items from the queue and report them.
117
118 Unregisters files automatically when they reach EOF.
119 """
120 while True:
121 seq, _, line = me._q.get()
122 if line is None:
123 me.unwatch(seq)
124 else:
125 S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
126
127def dbwatch():
128 """
14b77b60 129 Coroutine function: wake up every minute and notice changes to the
d64ce4ae
MW
130 database. When a change happens, tell the Pinger (q.v.) to rescan its
131 peers.
132 """
133 cr = T.Coroutine.getcurrent()
134 main = cr.parent
135 fw = M.FWatch(opts.cdb)
136 while True:
14b77b60 137 timer = M.SelTimer(time() + 60, lambda: cr.switch())
d64ce4ae
MW
138 main.switch()
139 if fw.update():
140 pinger.rescan(False)
141 S.notify('connect', 'peerdb-update')
142
143class ChildWatch (M.SelSignal):
144 """
145 An object which watches for specified processes exiting and reports
146 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
147
148 There is usually only one ChildWatch object, called childwatch.
149 """
150
151 def __new__(cls):
152 """Initialize the child-watcher."""
153 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
154
155 def __init__(me):
156 """Initialize the child-watcher."""
157 me._pid = {}
158 me.enable()
159
160 def watch(me, pid, queue, tag):
161 """
162 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
163 to the QUEUE, where CODE is one of
164
165 * None (successful termination)
166 * ['exit-nonzero', CODE] (CODE is a string!)
167 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
168 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
169 """
170 me._pid[pid] = queue, tag
171 return me
172
173 def unwatch(me, pid):
174 """Unregister PID as a child to watch."""
175 del me._pid[pid]
176 return me
177
178 @T._callback
179 def signalled(me):
180 """
181 Called when child processes exit: collect exit statuses and report
182 failures.
183 """
184 while True:
185 try:
186 pid, status = OS.waitpid(-1, OS.WNOHANG)
187 except OSError, exc:
188 if exc.errno == E.ECHILD:
189 break
190 if pid == 0:
191 break
192 if pid not in me._pid:
193 continue
194 queue, tag = me._pid[pid]
195 if OS.WIFEXITED(status):
196 exit = OS.WEXITSTATUS(status)
197 if exit == 0:
198 code = None
199 else:
200 code = ['exit-nonzero', str(exit)]
201 elif OS.WIFSIGNALED(status):
202 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
203 else:
204 code = ['exit-unknown', hex(status)]
205 queue.put((tag, 'exit', code))
206
207class Command (object):
208 """
209 Represents a running command.
210
211 This class is the main interface to the machery provided by the ChildWatch
212 and ErrorWatch objects. See also potwatch.
213 """
214
215 def __init__(me, info, queue, tag, args, env):
216 """
217 Start a new child process.
218
219 The ARGS are a list of arguments to be given to the child process. The
220 ENV is either None or a dictionary of environment variable assignments to
221 override the extant environment. INFO is a list of tokens to be included
222 in warnings about the child's stderr output. If the child writes a line
223 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
224 child exits, write (TAG, 'exit', CODE) to the QUEUE.
225 """
226 me._info = info
227 me._q = queue
228 me._tag = tag
229 myenv = OS.environ.copy()
230 if env: myenv.update(env)
231 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
232 stdout = PROC.PIPE, stderr = PROC.PIPE)
233 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
234 errorwatch.watch(me._proc.stderr, info)
235 childwatch.watch(me._proc.pid, queue, tag)
236
237 def __del__(me):
238 """
239 If I've been forgotten then stop watching for termination.
240 """
241 childwatch.unwatch(me._proc.pid)
242
243def potwatch(what, name, q):
244 """
245 Watch the queue Q for activity as reported by a Command object.
246
247 Information from the process's stdout is reported as
248
249 NOTE WHAT NAME stdout LINE
250
251 abnormal termination is reported as
252
253 WARN WHAT NAME CODE
254
255 where CODE is what the ChildWatch wrote.
256 """
257 eofp = deadp = False
258 while not deadp or not eofp:
259 _, kind, more = q.get()
260 if kind == 'stdout':
261 if more is None:
262 eofp = True
263 else:
264 S.notify('connect', what, name, 'stdout', more)
265 elif kind == 'exit':
266 if more: S.warn('connect', what, name, *more)
267 deadp = True
268
269###--------------------------------------------------------------------------
270### Peer database utilities.
a62f8e8a
MW
271
272_magic = ['_magic'] # An object distinct from all others
273
274class Peer (object):
275 """Representation of a peer in the database."""
276
277 def __init__(me, peer, cdb = None):
278 """
279 Create a new peer, named PEER.
280
281 Information about the peer is read from the database CDB, or the default
282 one given on the command-line.
283 """
284 me.name = peer
d64ce4ae 285 record = (cdb or CDB.init(opts.cdb))['P' + peer]
a62f8e8a
MW
286 me.__dict__.update(M.URLDecode(record, semip = True))
287
d64ce4ae 288 def get(me, key, default = _magic, filter = None):
a62f8e8a
MW
289 """
290 Get the information stashed under KEY from the peer's database record.
291
292 If DEFAULT is given, then use it if the database doesn't contain the
d64ce4ae
MW
293 necessary information. If no DEFAULT is given, then report an error. If
294 a FILTER function is given then apply it to the information from the
295 database before returning it.
a62f8e8a
MW
296 """
297 attr = me.__dict__.get(key, default)
298 if attr is _magic:
299 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
d64ce4ae
MW
300 elif filter is not None:
301 attr = filter(attr)
a62f8e8a
MW
302 return attr
303
d64ce4ae
MW
304 def has(me, key):
305 """
306 Return whether the peer's database record has the KEY.
307 """
308 return key in me.__dict__
309
a62f8e8a
MW
310 def list(me):
311 """
312 Iterate over the available keys in the peer's database record.
313 """
314 return me.__dict__.iterkeys()
315
d64ce4ae
MW
316def boolean(value):
317 """Parse VALUE as a boolean."""
318 return value in ['t', 'true', 'y', 'yes', 'on']
319
320###--------------------------------------------------------------------------
321### Waking up and watching peers.
322
323def run_connect(peer, cmd):
324 """
325 Start the job of connecting to the passive PEER.
326
327 The CMD string is a shell command which will connect to the peer (via some
328 back-channel, say ssh and userv), issue a command
329
330 SVCSUBMIT connect passive [OPTIONS] USER
331
332 and write the resulting challenge to standard error.
333 """
334 q = T.Queue()
335 cmd = Command(['connect', peer.name], q, 'connect',
336 ['/bin/sh', '-c', cmd], None)
337 _, kind, more = q.peek()
338 if kind == 'stdout':
339 if more is None:
340 S.warn('connect', 'connect', peer.name, 'unexpected-eof')
341 else:
342 chal = more
343 S.greet(peer.name, chal)
344 q.get()
345 potwatch('connect', peer.name, q)
346
347def run_disconnect(peer, cmd):
348 """
349 Start the job of disconnecting from a passive PEER.
350
351 The CMD string is a shell command which will disconnect from the peer.
352 """
353 q = T.Queue()
354 cmd = Command(['disconnect', peer.name], q, 'disconnect',
355 ['/bin/sh', '-c', cmd], None)
356 potwatch('disconnect', peer.name, q)
357
358_pingseq = 0
359class PingPeer (object):
360 """
361 Object representing a peer which we are pinging to ensure that it is still
362 present.
363
364 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
365 event queue -- which saves us from having an enormous swarm of coroutines
366 -- but most of the actual work is done here.
367
368 In order to avoid confusion between different PingPeer instances for the
369 same actual peer, each PingPeer has a sequence number (its `seq'
370 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
371 (Using the PingPeer instance itself will prevent garbage collection of
372 otherwise defunct instances.)
373 """
374
375 def __init__(me, pinger, queue, peer, pingnow):
376 """
377 Create a new PingPeer.
378
379 The PINGER is the Pinger object we should send the results to. This is
380 used when we remove ourselves, if the peer has been explicitly removed.
381
382 The QUEUE is the event queue on which timer and ping-command events
383 should be written.
384
385 The PEER is a `Peer' object describing the peer.
386
387 If PINGNOW is true, then immediately start pinging the peer. Otherwise
388 wait until the usual retry interval.
389 """
390 global _pingseq
391 me._pinger = pinger
392 me._q = queue
393 me._peer = peer.name
394 me.update(peer)
395 me.seq = _pingseq
396 _pingseq += 1
397 me._failures = 0
398 if pingnow:
399 me._timer = None
400 me._ping()
401 else:
402 me._timer = M.SelTimer(time() + me._every, me._time)
403
404 def update(me, peer):
405 """
406 Refreshes the timer parameters for this peer. We don't, however,
407 immediately reschedule anything: that will happen next time anything
408 interesting happens.
409 """
410 if peer is None: peer = Peer(me._peer)
411 assert peer.name == me._peer
412 me._every = peer.get('every', filter = T.timespec, default = 120)
413 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
414 me._retries = peer.get('retries', filter = int, default = 5)
415 me._connectp = peer.has('connect')
416 return me
417
418 def _ping(me):
419 """
420 Send a ping to the peer; the result is sent to the Pinger's event queue.
421 """
422 S.rawcommand(T.TripeAsynchronousCommand(
423 me._q, (me._peer, me.seq),
424 ['EPING',
425 '-background', S.bgtag(),
426 '-timeout', str(me._timeout),
427 '--',
428 me._peer]))
429
430 def _reconnect(me):
431 peer = Peer(me._peer)
432 if me._connectp:
433 S.warn('connect', 'reconnecting', me._peer)
434 S.forcekx(me._peer)
435 T.spawn(run_connect, peer, peer.get('connect'))
436 me._timer = M.SelTimer(time() + me._every, me._time)
437 else:
438 S.kill(me._peer)
439
440 def event(me, code, stuff):
441 """
442 Respond to an event which happened to this peer.
443
444 Timer events indicate that we should start a new ping. (The server has
445 its own timeout which detects lost packets.)
446
447 We trap unknown-peer responses and detach from the Pinger.
448
449 If the ping fails and we run out of retries, we attempt to restart the
450 connection.
451 """
452 if code == 'TIMER':
453 me._failures = 0
454 me._ping()
455 elif code == 'FAIL':
456 S.notify('connect', 'ping-failed', me._peer, *stuff)
457 if not stuff:
458 pass
459 elif stuff[0] == 'unknown-peer':
460 me._pinger.kill(me._peer)
461 elif stuff[0] == 'ping-send-failed':
462 me._reconnect()
463 elif code == 'INFO':
464 if stuff[0] == 'ping-ok':
465 if me._failures > 0:
466 S.warn('connect', 'ping-ok', me._peer)
467 me._timer = M.SelTimer(time() + me._every, me._time)
468 elif stuff[0] == 'ping-timeout':
469 me._failures += 1
470 S.warn('connect', 'ping-timeout', me._peer,
471 'attempt', str(me._failures), 'of', str(me._retries))
472 if me._failures < me._retries:
473 me._ping()
474 else:
475 me._reconnect()
476 elif stuff[0] == 'ping-peer-died':
477 me._pinger.kill(me._peer)
478
479 @T._callback
480 def _time(me):
481 """
482 Handle timer callbacks by posting a timeout event on the queue.
483 """
484 me._timer = None
485 me._q.put(((me._peer, me.seq), 'TIMER', None))
486
487 def __str__(me):
488 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
489 def __repr__(me):
490 return str(me)
491
492class Pinger (T.Coroutine):
493 """
494 The Pinger keeps track of the peers which we expect to be connected and
495 takes action if they seem to stop responding.
496
497 There is usually only one Pinger, called pinger.
498
499 The Pinger maintains a collection of PingPeer objects, and an event queue.
500 The PingPeers direct the results of their pings, and timer events, to the
501 event queue. The Pinger's coroutine picks items off the queue and
502 dispatches them back to the PingPeers as appropriate.
503 """
504
505 def __init__(me):
506 """Initialize the Pinger."""
507 T.Coroutine.__init__(me)
508 me._peers = {}
509 me._q = T.Queue()
510
511 def run(me):
512 """
513 Coroutine function: reads the pinger queue and sends events to the
514 PingPeer objects they correspond to.
515 """
516 while True:
517 (peer, seq), code, stuff = me._q.get()
518 if peer in me._peers and seq == me._peers[peer].seq:
519 me._peers[peer].event(code, stuff)
520
521 def add(me, peer, pingnow):
522 """
523 Add PEER to the collection of peers under the Pinger's watchful eye.
524 The arguments are as for PingPeer: see above.
525 """
526 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
527 return me
528
529 def kill(me, peername):
530 """Remove PEER from the peers being watched by the Pinger."""
47912108
MW
531 try: del me._peers[peername]
532 except KeyError: pass
d64ce4ae
MW
533 return me
534
535 def rescan(me, startup):
536 """
537 General resynchronization method.
538
539 We scan the list of peers (with connect scripts) known at the server.
540 Any which are known to the Pinger but aren't known to the server are
541 removed from our list; newly arrived peers are added. (Note that a peer
542 can change state here either due to the server sneakily changing its list
543 without issuing notifications or, more likely, the database changing its
544 idea of whether a peer is interesting.) Finally, PingPeers which are
545 still present are prodded to update their timing parameters.
546
547 This method is called once at startup to pick up the peers already
548 installed, and again by the dbwatcher coroutine when it detects a change
549 to the database.
550 """
551 if T._debug: print '# rescan peers'
552 correct = {}
553 start = {}
554 for name in S.list():
555 try: peer = Peer(name)
556 except KeyError: continue
557 if peer.get('watch', filter = boolean, default = False):
558 if T._debug: print '# interesting peer %s' % peer
559 correct[peer.name] = start[peer.name] = peer
560 elif startup:
561 if T._debug: print '# peer %s ready for adoption' % peer
562 start[peer.name] = peer
563 for name, obj in me._peers.items():
564 try:
565 peer = correct[name]
566 except KeyError:
567 if T._debug: print '# peer %s vanished' % name
568 del me._peers[name]
569 else:
570 obj.update(peer)
571 for name, peer in start.iteritems():
572 if name in me._peers: continue
573 if startup:
574 if T._debug: print '# setting up peer %s' % name
575 ifname = S.ifname(name)
576 addr = S.addr(name)
577 T.defer(adoptpeer, peer, ifname, *addr)
578 else:
579 if T._debug: print '# adopting new peer %s' % name
580 me.add(peer, True)
581 return me
582
583 def adopted(me):
584 """
585 Returns the list of peers being watched by the Pinger.
586 """
587 return me._peers.keys()
588
fb52c291
MW
589 def find(me, name):
590 """Return the PingPeer with the given name."""
591 return me._peers[name]
592
d64ce4ae
MW
593###--------------------------------------------------------------------------
594### New connections.
595
596def encode_envvars(env, prefix, vars):
597 """
598 Encode the variables in VARS suitably for including in a program
599 environment. Lowercase letters in variable names are forced to uppercase;
600 runs of non-alphanumeric characters are replaced by single underscores; and
601 the PREFIX is prepended. The resulting variables are written to ENV.
602 """
603 for k, v in vars.iteritems():
604 env[prefix + r_bad.sub('_', k.upper())] = v
605
606r_bad = RX.compile(r'[\W_]+')
607def envvars(peer):
608 """
609 Translate the database information for a PEER into a dictionary of
610 environment variables with plausible upper-case names and a P_ prefix.
611 Also collect the crypto information into A_ variables.
612 """
613 env = {}
614 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
615 encode_envvars(env, 'A_', S.algs(peer.name))
616 return env
617
618def run_ifupdown(what, peer, *args):
619 """
620 Run the interface up/down script for a peer.
621
622 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
623 list of arguments to pass to the script, in addition to the peer name.
624
625 The command is run and watched in the background by potwatch.
626 """
627 q = T.Queue()
628 c = Command([what, peer.name], q, what,
629 M.split(peer.get(what), quotep = True)[0] +
630 [peer.name] + list(args),
631 envvars(peer))
632 potwatch(what, peer.name, q)
633
634def adoptpeer(peer, ifname, *addr):
635 """
636 Add a new peer to our collection.
637
638 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
639 ADDR is the list of tokens representing its address.
640
641 We try to bring up the interface and provoke a connection to the peer if
642 it's passive.
643 """
644 if peer.has('ifup'):
645 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
646 .switch('ifup', peer, ifname, *addr)
647 cmd = peer.get('connect', default = None)
648 if cmd is not None:
649 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
650 .switch(peer, cmd)
651 if peer.get('watch', filter = boolean, default = False):
652 pinger.add(peer, False)
653
654def disownpeer(peer):
655 """Drop the PEER from the Pinger and put its interface to bed."""
656 try: pinger.kill(peer)
657 except KeyError: pass
658 cmd = peer.get('disconnect', default = None)
659 if cmd is not None:
660 T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
661 .switch(peer, cmd)
662 if peer.has('ifdown'):
663 T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
664 .switch('ifdown', peer)
665
a62f8e8a
MW
666def addpeer(peer, addr):
667 """
668 Process a connect request from a new peer PEER on address ADDR.
669
670 Any existing peer with this name is disconnected from the server.
671 """
672 if peer.name in S.list():
673 S.kill(peer.name)
674 try:
6411163d 675 booltrue = ['t', 'true', 'y', 'yes', 'on']
a62f8e8a
MW
676 S.add(peer.name,
677 tunnel = peer.get('tunnel', None),
678 keepalive = peer.get('keepalive', None),
48b84569 679 key = peer.get('key', None),
fe2a5dcf 680 priv = peer.get('priv', None),
6411163d
MW
681 mobile = peer.get('mobile', 'nil') in booltrue,
682 cork = peer.get('cork', 'nil') in booltrue,
a62f8e8a
MW
683 *addr)
684 except T.TripeError, exc:
685 raise T.TripeJobError(*exc.args)
686
d64ce4ae
MW
687## Dictionary mapping challenges to waiting passive-connection coroutines.
688chalmap = {}
689
690def notify(_, code, *rest):
691 """
692 Watch for notifications.
693
694 We trap ADD and KILL notifications, and send them straight to adoptpeer and
695 disownpeer respectively; and dispatch GREET notifications to the
696 corresponding waiting coroutine.
697 """
698 if code == 'ADD':
699 try: p = Peer(rest[0])
700 except KeyError: return
701 adoptpeer(p, *rest[1:])
702 elif code == 'KILL':
703 try: p = Peer(rest[0])
704 except KeyError: return
705 disownpeer(p, *rest[1:])
706 elif code == 'GREET':
707 chal = rest[0]
708 try: cr = chalmap[chal]
709 except KeyError: pass
710 else: cr.switch(rest[1:])
711
712###--------------------------------------------------------------------------
713### Command implementation.
714
715def cmd_kick(name):
716 """
717 kick NAME: Force a new connection attempt for the NAMEd peer.
718 """
fb52c291
MW
719 try: pp = pinger.find(name)
720 except KeyError: raise T.TripeJobError('peer-not-adopted', name)
d64ce4ae
MW
721 try: peer = Peer(name)
722 except KeyError: raise T.TripeJobError('unknown-peer', name)
8bd108e4
MW
723 conn = peer.get('connect', None)
724 if conn: T.spawn(run_connect, peer, peer.get('connect'))
725 else: T.spawn(lambda p: S.forcekx(p.name), peer)
d64ce4ae
MW
726
727def cmd_adopted():
728 """
729 adopted: Report a list of adopted peers.
730 """
731 for name in pinger.adopted():
732 T.svcinfo(name)
733
a62f8e8a
MW
734def cmd_active(name):
735 """
736 active NAME: Handle an active connection request for the peer called NAME.
737
738 The appropriate address is read from the database automatically.
739 """
d64ce4ae
MW
740 try: peer = Peer(name)
741 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
742 addr = peer.get('peer')
743 if addr == 'PASSIVE':
744 raise T.TripeJobError('passive-peer', name)
745 addpeer(peer, M.split(addr, quotep = True)[0])
746
d64ce4ae 747def cmd_listactive():
a62f8e8a
MW
748 """
749 list: Report a list of the available active peers.
750 """
751 cdb = CDB.init(opts.cdb)
752 for key in cdb.keys():
753 if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
754 T.svcinfo(key[1:])
755
756def cmd_info(name):
757 """
758 info NAME: Report the database entries for the named peer.
759 """
d64ce4ae
MW
760 try: peer = Peer(name)
761 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
762 items = list(peer.list())
763 items.sort()
764 for i in items:
765 T.svcinfo('%s=%s' % (i, peer.get(i)))
766
d3731285
MW
767def cmd_userpeer(user):
768 """
769 userpeer USER: Report the peer name for the named user.
770 """
d64ce4ae
MW
771 try: name = CDB.init(opts.cdb)['U' + user]
772 except KeyError: raise T.TripeJobError('unknown-user', user)
773 T.svcinfo(name)
a62f8e8a
MW
774
775def cmd_passive(*args):
776 """
777 passive [OPTIONS] USER: Await the arrival of the named USER.
778
779 Report a challenge; when (and if!) the server receives a greeting quoting
780 this challenge, add the corresponding peer to the server.
781 """
782 timeout = 30
783 op = T.OptParse(args, ['-timeout'])
784 for opt in op:
785 if opt == '-timeout':
786 timeout = T.timespec(op.arg())
787 user, = op.rest(1, 1)
d64ce4ae
MW
788 try: name = CDB.init(opts.cdb)['U' + user]
789 except KeyError: raise T.TripeJobError('unknown-user', user)
790 try: peer = Peer(name)
791 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
792 chal = S.getchal()
793 cr = T.Coroutine.getcurrent()
794 timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
795 try:
796 T.svcinfo(chal)
797 chalmap[chal] = cr
798 addr = cr.parent.switch()
799 if addr is None:
800 raise T.TripeJobError('connect-timeout')
d64ce4ae 801 addpeer(peer, addr)
a62f8e8a
MW
802 finally:
803 del chalmap[chal]
804
a62f8e8a
MW
805###--------------------------------------------------------------------------
806### Start up.
807
808def setup():
809 """
810 Service setup.
811
d64ce4ae
MW
812 Register the notification watcher, rescan the peers, and add automatic
813 active peers.
a62f8e8a
MW
814 """
815 S.handler['NOTE'] = notify
816 S.watch('+n')
d64ce4ae
MW
817
818 pinger.rescan(opts.startup)
819
a62f8e8a
MW
820 if opts.startup:
821 cdb = CDB.init(opts.cdb)
822 try:
823 autos = cdb['%AUTO']
824 except KeyError:
825 autos = ''
826 for name in M.split(autos)[0]:
827 try:
828 peer = Peer(name, cdb)
829 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
830 except T.TripeJobError, err:
831 S.warn('connect', 'auto-add-failed', name, *err.args)
832
d64ce4ae
MW
833def init():
834 """
835 Initialization to be done before service startup.
836 """
837 global errorwatch, childwatch, pinger
838 errorwatch = ErrorWatch()
839 childwatch = ChildWatch()
840 pinger = Pinger()
841 T.Coroutine(dbwatch, name = 'dbwatch').switch()
842 errorwatch.switch()
843 pinger.switch()
844
a62f8e8a
MW
845def parse_options():
846 """
847 Parse the command-line options.
848
849 Automatically changes directory to the requested configdir, and turns on
850 debugging. Returns the options object.
851 """
852 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
853 version = '%%prog %s' % VERSION)
854
855 op.add_option('-a', '--admin-socket',
856 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
857 help = 'Select socket to connect to [default %default]')
858 op.add_option('-d', '--directory',
859 metavar = 'DIR', dest = 'dir', default = T.configdir,
860 help = 'Select current diretory [default %default]')
861 op.add_option('-p', '--peerdb',
862 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
863 help = 'Select peers database [default %default]')
864 op.add_option('--daemon', dest = 'daemon',
865 default = False, action = 'store_true',
866 help = 'Become a daemon after successful initialization')
867 op.add_option('--debug', dest = 'debug',
868 default = False, action = 'store_true',
869 help = 'Emit debugging trace information')
870 op.add_option('--startup', dest = 'startup',
871 default = False, action = 'store_true',
872 help = 'Being called as part of the server startup')
873
874 opts, args = op.parse_args()
875 if args: op.error('no arguments permitted')
876 OS.chdir(opts.dir)
877 T._debug = opts.debug
878 return opts
879
880## Service table, for running manually.
d64ce4ae
MW
881service_info = [('connect', T.VERSION, {
882 'adopted': (0, 0, '', cmd_adopted),
883 'kick': (1, 1, 'PEER', cmd_kick),
a62f8e8a
MW
884 'passive': (1, None, '[OPTIONS] USER', cmd_passive),
885 'active': (1, 1, 'PEER', cmd_active),
886 'info': (1, 1, 'PEER', cmd_info),
d64ce4ae 887 'list-active': (0, 0, '', cmd_listactive),
d3731285 888 'userpeer': (1, 1, 'USER', cmd_userpeer)
a62f8e8a
MW
889})]
890
891if __name__ == '__main__':
892 opts = parse_options()
893 T.runservices(opts.tripesock, service_info,
d64ce4ae 894 init = init, setup = setup,
a62f8e8a
MW
895 daemon = opts.daemon)
896
897###----- That's all, folks --------------------------------------------------