chiark / gitweb /
mon/tripemon.in: Show per-peer crypto details in peer info sheet.
[tripe] / svc / connect.in
CommitLineData
a62f8e8a
MW
1#! @PYTHON@
2### -*-python-*-
3###
d64ce4ae 4### Connect to remote peers, and keep track of them
a62f8e8a 5###
d64ce4ae 6### (c) 2007 Straylight/Edgeware
a62f8e8a
MW
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
13### TrIPE is free software; you can redistribute it and/or modify
14### it under the terms of the GNU General Public License as published by
15### the Free Software Foundation; either version 2 of the License, or
16### (at your option) any later version.
17###
18### TrIPE is distributed in the hope that it will be useful,
19### but WITHOUT ANY WARRANTY; without even the implied warranty of
20### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21### GNU General Public License for more details.
22###
23### You should have received a copy of the GNU General Public License
24### along with TrIPE; if not, write to the Free Software Foundation,
25### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
26
## Version string reported by `--version'.  The `@VERSION@' token is
## presumably substituted when this script is generated from its `.in'
## template -- TODO confirm against the build system.
VERSION = '@VERSION@'
28
29###--------------------------------------------------------------------------
30### External dependencies.
31
32from optparse import OptionParser
33import tripe as T
34import os as OS
d64ce4ae
MW
35import signal as SIG
36import errno as E
a62f8e8a
MW
37import cdb as CDB
38import mLib as M
d64ce4ae 39import re as RX
a62f8e8a 40from time import time
d64ce4ae 41import subprocess as PROC
a62f8e8a
MW
42
## Shorthand for the `tripe' module's service-manager object, through which
## this service issues commands (`S.add', `S.kill', ...) and emits warnings and
## notifications (`S.warn', `S.notify').
S = T.svcmgr
44
45###--------------------------------------------------------------------------
d64ce4ae
MW
46### Running auxiliary commands.
47
class SelLineQueue (M.SelLineBuffer):
  """
  Adaptor feeding lines read from a file into a coroutine queue.

  Every line read is posted to the queue as a (TAG, KIND, LINE) triple;
  end-of-file is posted as (TAG, KIND, None).
  """

  def __new__(cls, file, queue, tag, kind):
    """
    Allocate the underlying line buffer on FILE's descriptor.

    The remaining arguments are consumed by __init__, q.v.
    """
    fd = file.fileno()
    return M.SelLineBuffer.__new__(cls, fd)

  def __init__(me, file, queue, tag, kind):
    """
    Set up the adaptor and start reading.

    FILE is the file object to read lines from; QUEUE is the queue the
    messages are put on; TAG and KIND are copied into every message so that
    the consumer can tell where it came from and what it is.
    """
    ## Keep hold of the file object as well as its descriptor (presumably so
    ## that it isn't collected, and closed, while we're still reading).
    me._file = file
    me._q, me._tag, me._kind = queue, tag, kind
    me.enable()

  @T._callback
  def line(me, line):
    """Called with each LINE read: pass it on to the queue."""
    msg = me._tag, me._kind, line
    me._q.put(msg)

  @T._callback
  def eof(me):
    """Called at end-of-file: stop reading and tell the queue."""
    me.disable()
    msg = me._tag, me._kind, None
    me._q.put(msg)
77
class ErrorWatch (T.Coroutine):
  """
  Coroutine which turns lines appearing on watched stderr streams into
  warnings of the form

        WARN connect INFO stderr LINE

  where INFO is the list of tokens supplied when the stream was registered.

  Usually there is a single ErrorWatch object, called errorwatch.
  """

  def __init__(me):
    """Initialization: there are no arguments."""
    T.Coroutine.__init__(me)
    me._seq = 1                         # next sequence number to hand out
    me._map = {}                        # seq -> (info, line reader)
    me._q = T.Queue()                   # where the line readers report

  def watch(me, file, info):
    """
    Start watching FILE, labelling warnings about its output with INFO.

    INFO must be a list of tokens.  Returns a sequence number which can be
    passed to `unwatch' to stop watching the file again.
    """
    seq, me._seq = me._seq, me._seq + 1
    reader = SelLineQueue(file, me._q, seq, 'stderr')
    me._map[seq] = info, reader
    return seq

  def unwatch(me, seq):
    """Forget about the file registered under sequence number SEQ."""
    del me._map[seq]
    return me

  def run(me):
    """
    Coroutine function: collect lines from the queue and report them.

    A file is unregistered automatically when it reaches end-of-file.
    """
    while True:
      seq, _, line = me._q.get()
      if line is not None:
        info = me._map[seq][0]
        S.warn(*(['connect'] + info + ['stderr', line]))
      else:
        me.unwatch(seq)
127
def dbwatch():
  """
  Coroutine function: poll the peer database once a second.

  Whenever the database file is seen to have changed, ask the Pinger (q.v.)
  to rescan its peers, and issue a `peerdb-update' notification.
  """
  this = T.Coroutine.getcurrent()
  parent = this.parent
  watcher = M.FWatch(opts.cdb)
  while True:
    ## Arrange to be resumed in a second's time, then give control back to
    ## the parent coroutine.  The timer stays bound to a local name until we
    ## are resumed.
    timer = M.SelTimer(time() + 1, lambda: this.switch())
    parent.switch()
    if not watcher.update(): continue
    pinger.rescan(False)
    S.notify('connect', 'peerdb-update')
143
144class ChildWatch (M.SelSignal):
145 """
146 An object which watches for specified processes exiting and reports
147 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
148
149 There is usually only one ChildWatch object, called childwatch.
150 """
151
152 def __new__(cls):
153 """Initialize the child-watcher."""
154 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
155
156 def __init__(me):
157 """Initialize the child-watcher."""
158 me._pid = {}
159 me.enable()
160
161 def watch(me, pid, queue, tag):
162 """
163 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
164 to the QUEUE, where CODE is one of
165
166 * None (successful termination)
167 * ['exit-nonzero', CODE] (CODE is a string!)
168 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
169 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
170 """
171 me._pid[pid] = queue, tag
172 return me
173
174 def unwatch(me, pid):
175 """Unregister PID as a child to watch."""
176 del me._pid[pid]
177 return me
178
179 @T._callback
180 def signalled(me):
181 """
182 Called when child processes exit: collect exit statuses and report
183 failures.
184 """
185 while True:
186 try:
187 pid, status = OS.waitpid(-1, OS.WNOHANG)
188 except OSError, exc:
189 if exc.errno == E.ECHILD:
190 break
191 if pid == 0:
192 break
193 if pid not in me._pid:
194 continue
195 queue, tag = me._pid[pid]
196 if OS.WIFEXITED(status):
197 exit = OS.WEXITSTATUS(status)
198 if exit == 0:
199 code = None
200 else:
201 code = ['exit-nonzero', str(exit)]
202 elif OS.WIFSIGNALED(status):
203 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
204 else:
205 code = ['exit-unknown', hex(status)]
206 queue.put((tag, 'exit', code))
207
class Command (object):
  """
  Represents a running command.

  This class is the main interface to the machinery provided by the
  ChildWatch and ErrorWatch objects.  See also potwatch.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Start a new child process.

    The ARGS are a list of arguments to be given to the child process.  The
    ENV is either None or a dictionary of environment variable assignments to
    override the extant environment.  INFO is a list of tokens to be included
    in warnings about the child's stderr output.  If the child writes a line
    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
    child exits, write (TAG, 'exit', CODE) to the QUEUE.

    Whatever exception `subprocess.Popen' raises on failing to start the
    child is propagated to the caller.
    """
    ## Make sure that __del__ has something to look at even if we fail
    ## before the child is started.
    me._proc = None
    me._info = info
    me._q = queue
    me._tag = tag
    myenv = OS.environ.copy()
    if env: myenv.update(env)
    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(me._proc.stderr, info)
    childwatch.watch(me._proc.pid, queue, tag)

  def __del__(me):
    """
    If I've been forgotten then stop watching for termination.
    """
    proc = getattr(me, '_proc', None)
    ## The child never started, so it was never registered.
    if proc is None: return
    ## Construction may have failed between starting the child and
    ## registering it; there's nothing to undo in that case.
    try: childwatch.unwatch(proc.pid)
    except KeyError: pass
243
def potwatch(what, name, q):
  """
  Drain the queue Q of events reported by a Command object.

  Each line from the process's stdout is reported as

        NOTE connect WHAT NAME stdout LINE

  and abnormal termination is reported as

        WARN connect WHAT NAME CODE

  where CODE is what the ChildWatch wrote.  Returns once both the exit
  report and the end of the process's stdout have been seen.
  """
  seen_eof = exited = False
  while not (seen_eof and exited):
    _, kind, payload = q.get()
    if kind == 'exit':
      ## A payload of None means success: there's nothing to complain about.
      if payload: S.warn('connect', what, name, *payload)
      exited = True
    elif kind == 'stdout':
      if payload is None: seen_eof = True
      else: S.notify('connect', what, name, 'stdout', payload)
269
270###--------------------------------------------------------------------------
271### Peer database utilities.
a62f8e8a
MW
272
_magic = ['_magic']                     # An object distinct from all others

class Peer (object):
  """
  Representation of a peer in the database.

  The fields of the peer's database record are stored directly as attributes
  of the object, alongside `name'.
  """

  def __init__(me, peer, cdb = None):
    """
    Create a new peer, named PEER.

    Information about the peer is read from the database CDB, or the default
    one given on the command-line.  Raises KeyError if the database has no
    record for the peer.
    """
    me.name = peer
    record = (cdb or CDB.init(opts.cdb))['P' + peer]
    me.__dict__.update(M.URLDecode(record, semip = True))

  def get(me, key, default = _magic, filter = None):
    """
    Get the information stashed under KEY from the peer's database record.

    If DEFAULT is given, then use it if the database doesn't contain the
    necessary information.  If no DEFAULT is given, then report an error.  If
    a FILTER function is given then apply it to the information from the
    database before returning it.

    The DEFAULT is returned exactly as given: it is expected to be in its
    final form already, so the FILTER is not applied to it.
    """
    try:
      attr = me.__dict__[key]
    except KeyError:
      if default is _magic:
        raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
      return default
    if filter is not None: attr = filter(attr)
    return attr

  def has(me, key):
    """
    Return whether the peer's database record has the KEY.
    """
    return key in me.__dict__

  def list(me):
    """
    Iterate over the available keys in the peer's database record.
    """
    return iter(me.__dict__)
316
d64ce4ae
MW
def boolean(value):
  """
  Parse VALUE as a boolean.

  Exactly the lowercase strings `t', `true', `y', `yes' and `on' count as
  true; anything else at all is false.
  """
  truthy = ('t', 'true', 'y', 'yes', 'on')
  return value in truthy
320
321###--------------------------------------------------------------------------
322### Waking up and watching peers.
323
def run_connect(peer, cmd):
  """
  Carry out the job of connecting to the passive PEER.

  The CMD string is a shell command which is expected to connect to the peer
  (via some back-channel, say ssh and userv), issue a command

        SVCSUBMIT connect passive [OPTIONS] USER

  and print the resulting challenge.  The first line of the command's
  standard output is taken to be the challenge and is used to greet the
  peer; everything after that is handled by potwatch.
  """
  q = T.Queue()
  ## Hold on to the Command until we're done: it stops watching for the
  ## child's exit when it's collected (see Command.__del__).
  job = Command(['connect', peer.name], q, 'connect',
                ['/bin/sh', '-c', cmd], None)
  ## Only peek at the first event: unless it's the challenge line, it's left
  ## on the queue for potwatch to deal with.
  _, kind, line = q.peek()
  if kind == 'stdout' and line is None:
    S.warn('connect', 'connect', peer.name, 'unexpected-eof')
  elif kind == 'stdout':
    S.greet(peer.name, line)
    q.get()
  potwatch('connect', peer.name, q)
347
def run_disconnect(peer, cmd):
  """
  Carry out the job of disconnecting from a passive PEER.

  The CMD string is a shell command which will disconnect from the peer.  Its
  output and exit status are reported by potwatch.
  """
  q = T.Queue()
  ## Hold on to the Command until we're done: it stops watching for the
  ## child's exit when it's collected (see Command.__del__).
  job = Command(['disconnect', peer.name], q, 'disconnect',
                ['/bin/sh', '-c', cmd], None)
  potwatch('disconnect', peer.name, q)
358
## Sequence number to give to the next PingPeer created.
_pingseq = 0

class PingPeer (object):
  """
  Object representing a peer which we are pinging to ensure that it is still
  present.

  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
  event queue -- which saves us from having an enormous swarm of coroutines
  -- but most of the actual work is done here.

  In order to avoid confusion between different PingPeer instances for the
  same actual peer, each PingPeer has a sequence number (its `seq'
  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
  (Using the PingPeer instance itself will prevent garbage collection of
  otherwise defunct instances.)
  """

  def __init__(me, pinger, queue, peer, pingnow):
    """
    Create a new PingPeer.

    The PINGER is the Pinger object which owns us: we ask it to forget about
    us if the server reports that the peer has gone away.

    The QUEUE is the event queue on which timer and ping-command events
    should be written.

    The PEER is a `Peer' object describing the peer.

    If PINGNOW is true, then immediately start pinging the peer.  Otherwise
    wait until the usual retry interval.
    """
    global _pingseq
    me._pinger = pinger
    me._q = queue
    me._peer = peer.name
    me.update(peer)
    me.seq, _pingseq = _pingseq, _pingseq + 1
    me._failures = 0
    if not pingnow:
      me._schedule()
    else:
      me._timer = None
      me._ping()

  def _schedule(me):
    """Set the timer to start the next round of pings after the interval."""
    me._timer = M.SelTimer(time() + me._every, me._time)

  def update(me, peer):
    """
    Refresh the timing parameters for this peer from PEER, or from a fresh
    database lookup if PEER is None.

    Nothing is rescheduled immediately: the new parameters take effect the
    next time anything interesting happens.
    """
    if peer is None: peer = Peer(me._peer)
    assert peer.name == me._peer
    me._every = peer.get('every', filter = T.timespec, default = 120)
    me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
    me._retries = peer.get('retries', filter = int, default = 5)
    me._connectp = peer.has('connect')
    return me

  def _ping(me):
    """
    Send a ping to the peer; the result is sent to the Pinger's event queue.
    """
    command = ['EPING',
               '-background', S.bgtag(),
               '-timeout', str(me._timeout),
               '--',
               me._peer]
    S.rawcommand(T.TripeAsynchronousCommand(me._q, (me._peer, me.seq),
                                            command))

  def _reconnect(me):
    """
    Respond to the peer having stopped answering.

    If the peer has a connect command then force a new key exchange, run the
    command again, and go back to pinging after the usual interval;
    otherwise kill the peer at the server.
    """
    peer = Peer(me._peer)
    if not me._connectp:
      S.kill(me._peer)
      return
    S.warn('connect', 'reconnecting', me._peer)
    S.forcekx(me._peer)
    T.spawn(run_connect, peer, peer.get('connect'))
    me._schedule()

  def event(me, code, stuff):
    """
    Respond to an event which happened to this peer.

    Timer events indicate that we should start a new ping.  (The server has
    its own timeout which detects lost packets.)

    We trap unknown-peer responses and detach from the Pinger.

    If the ping fails and we run out of retries, we attempt to restart the
    connection.
    """
    if code == 'TIMER':
      ## Start of a new round: forget about earlier failures.
      me._failures = 0
      me._ping()

    elif code == 'FAIL':
      S.notify('connect', 'ping-failed', me._peer, *stuff)
      if stuff:
        reason = stuff[0]
        if reason == 'unknown-peer': me._pinger.kill(me._peer)
        elif reason == 'ping-send-failed': me._reconnect()

    elif code == 'INFO':
      outcome = stuff[0]
      if outcome == 'ping-ok':
        if me._failures > 0: S.warn('connect', 'ping-ok', me._peer)
        me._schedule()
      elif outcome == 'ping-timeout':
        me._failures += 1
        S.warn('connect', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures >= me._retries: me._reconnect()
        else: me._ping()
      elif outcome == 'ping-peer-died':
        me._pinger.kill(me._peer)

  @T._callback
  def _time(me):
    """
    Handle timer callbacks by posting a timeout event on the queue.
    """
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    """Describe the PingPeer: peer name, sequence number, failure count."""
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)

  def __repr__(me):
    """Same as __str__."""
    return str(me)
492
class Pinger (T.Coroutine):
  """
  The Pinger keeps track of the peers which we expect to be connected and
  takes action if they seem to stop responding.

  There is usually only one Pinger, called pinger.

  The Pinger maintains a collection of PingPeer objects, and an event queue.
  The PingPeers direct the results of their pings, and timer events, to the
  event queue.  The Pinger's coroutine picks items off the queue and
  dispatches them back to the PingPeers as appropriate.
  """

  def __init__(me):
    """Initialize the Pinger: there are no arguments."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()                   # events from PingPeers
    me._peers = {}                      # peer name -> PingPeer

  def run(me):
    """
    Coroutine function: reads the pinger queue and sends events to the
    PingPeer objects they correspond to.

    Events whose peer is no longer known, or whose sequence number doesn't
    match the peer's current PingPeer, are quietly dropped.
    """
    while True:
      (name, seq), code, stuff = me._q.get()
      pp = me._peers.get(name)
      if pp is not None and pp.seq == seq: pp.event(code, stuff)

  def add(me, peer, pingnow):
    """
    Add PEER to the collection of peers under the Pinger's watchful eye.
    The arguments are as for PingPeer: see above.
    """
    me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
    return me

  def kill(me, peername):
    """
    Remove the peer called PEERNAME from the peers being watched by the
    Pinger.  Raises KeyError if it isn't being watched.
    """
    del me._peers[peername]
    return me

  def rescan(me, startup):
    """
    General resynchronization method.

    We scan the list of peers known at the server and look each one up in the
    database.  Any which are known to the Pinger but are no longer
    interesting are removed from our list; newly arrived peers are added.
    (Note that a peer can change state here either due to the server sneakily
    changing its list without issuing notifications or, more likely, the
    database changing its idea of whether a peer is interesting.)  Finally,
    PingPeers which are still present are prodded to update their timing
    parameters.

    If STARTUP is true, then every peer with a database record is handed to
    adoptpeer rather than just being added to the Pinger.

    This method is called once at startup to pick up the peers already
    installed, and again by the dbwatch coroutine when it detects a change
    to the database.
    """
    if T._debug: print('# rescan peers')

    ## Work out which peers we ought to be watching (WANTED), and which ones
    ## we might have to start doing something about (FRESH).
    wanted = {}
    fresh = {}
    for name in S.list():
      try: peer = Peer(name)
      except KeyError: continue
      if peer.get('watch', filter = boolean, default = False):
        if T._debug: print('# interesting peer %s' % peer)
        wanted[peer.name] = fresh[peer.name] = peer
      elif startup:
        if T._debug: print('# peer %s ready for adoption' % peer)
        fresh[peer.name] = peer

    ## Drop the peers we shouldn't be watching any more, and refresh the
    ## rest.  Work from a copy because we modify the table as we go.
    for name, pp in list(me._peers.items()):
      if name in wanted:
        pp.update(wanted[name])
      else:
        if T._debug: print('# peer %s vanished' % name)
        del me._peers[name]

    ## Pick up the new arrivals.
    for name, peer in fresh.items():
      if name in me._peers: continue
      if not startup:
        if T._debug: print('# adopting new peer %s' % name)
        me.add(peer, True)
      else:
        if T._debug: print('# setting up peer %s' % name)
        ifname = S.ifname(name)
        addr = S.addr(name)
        T.defer(adoptpeer, peer, ifname, *addr)
    return me

  def adopted(me):
    """
    Returns the list of names of the peers being watched by the Pinger.
    """
    return me._peers.keys()
588
589###--------------------------------------------------------------------------
590### New connections.
591
## Matches a run of characters which can't appear in a variable name (or an
## underscore, so that adjacent underscores are collapsed too).
r_bad = RX.compile(r'[\W_]+')

def encode_envvars(env, prefix, vars):
  """
  Write the variables in the dictionary VARS into the dictionary ENV under
  names fit for a program environment.

  Each name is forced to uppercase, each run of non-alphanumeric characters
  in it is replaced by a single underscore, and the PREFIX is prepended.
  """
  for name, value in vars.items():
    envname = prefix + r_bad.sub('_', name.upper())
    env[envname] = value
def envvars(peer):
  """
  Build the environment for a script run on behalf of PEER.

  The peer's database fields appear as variables with plausible upper-case
  names and a `P_' prefix; the crypto information which the server reports
  for the peer appears similarly with an `A_' prefix.  Returns the new
  dictionary.
  """
  env = {}
  fields = dict((key, peer.get(key)) for key in peer.list())
  encode_envvars(env, 'P_', fields)
  encode_envvars(env, 'A_', S.algs(peer.name))
  return env
613
def run_ifupdown(what, peer, *args):
  """
  Run the interface up/down script for a peer.

  WHAT is 'ifup' or 'ifdown', and names the database field holding the
  command to run.  PEER is the `Peer' object in question.  ARGS is a list of
  arguments to pass to the script, following the peer name.

  The command is run with the environment from envvars, and watched by
  potwatch until it finishes.
  """
  q = T.Queue()
  argv = M.split(peer.get(what), quotep = True)[0] + \
         [peer.name] + list(args)
  ## Hold on to the Command until we're done: it stops watching for the
  ## child's exit when it's collected (see Command.__del__).
  job = Command([what, peer.name], q, what, argv, envvars(peer))
  potwatch(what, peer.name, q)
629
def adoptpeer(peer, ifname, *addr):
  """
  Add a new peer to our collection.

  PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
  ADDR is the list of tokens representing its address.

  We run the peer's `ifup' script, if it has one; run its `connect' command,
  if it has one; and put it in the Pinger's care if the database says it's
  to be watched.
  """
  name = peer.name
  if peer.has('ifup'):
    cr = T.Coroutine(run_ifupdown, name = 'ifup %s' % name)
    cr.switch('ifup', peer, ifname, *addr)
  script = peer.get('connect', default = None)
  if script is not None:
    cr = T.Coroutine(run_connect, name = 'connect %s' % name)
    cr.switch(peer, script)
  if peer.get('watch', filter = boolean, default = False):
    pinger.add(peer, False)
649
def disownpeer(peer):
  """
  Drop the PEER from the Pinger and put its interface to bed.

  PEER is the `Peer' object.  It's not an error if the Pinger wasn't watching
  it.  The peer's `disconnect' command and `ifdown' script are run, if it has
  them.
  """
  ## The Pinger knows its peers by name, not by `Peer' object.
  try: pinger.kill(peer.name)
  except KeyError: pass
  cmd = peer.get('disconnect', default = None)
  if cmd is not None:
    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
        .switch(peer, cmd)
  if peer.has('ifdown'):
    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
        .switch('ifdown', peer)
661
a62f8e8a
MW
662def addpeer(peer, addr):
663 """
664 Process a connect request from a new peer PEER on address ADDR.
665
666 Any existing peer with this name is disconnected from the server.
667 """
668 if peer.name in S.list():
669 S.kill(peer.name)
670 try:
6411163d 671 booltrue = ['t', 'true', 'y', 'yes', 'on']
a62f8e8a
MW
672 S.add(peer.name,
673 tunnel = peer.get('tunnel', None),
674 keepalive = peer.get('keepalive', None),
48b84569 675 key = peer.get('key', None),
fe2a5dcf 676 priv = peer.get('priv', None),
6411163d
MW
677 mobile = peer.get('mobile', 'nil') in booltrue,
678 cork = peer.get('cork', 'nil') in booltrue,
a62f8e8a
MW
679 *addr)
680 except T.TripeError, exc:
681 raise T.TripeJobError(*exc.args)
682
d64ce4ae
MW
## Dictionary mapping challenges to waiting passive-connection coroutines.
chalmap = {}

def notify(_, code, *rest):
  """
  Watch for notifications.

  ADD and KILL notifications are passed on to adoptpeer and disownpeer
  respectively, unless the peer has no record in the database.  A GREET
  notification resumes the coroutine waiting on its challenge, if there is
  one, passing it the remaining tokens.  Everything else is ignored.
  """
  if code in ('ADD', 'KILL'):
    try: p = Peer(rest[0])
    except KeyError: return
    if code == 'ADD': adoptpeer(p, *rest[1:])
    else: disownpeer(p, *rest[1:])
  elif code == 'GREET':
    cr = chalmap.get(rest[0])
    if cr is not None: cr.switch(rest[1:])
707
708###--------------------------------------------------------------------------
709### Command implementation.
710
def cmd_kick(name):
  """
  kick NAME: Force a new connection attempt for the NAMEd peer.

  Fails with `peer-not-adopted' if the Pinger isn't watching the peer, with
  `unknown-peer' if it has no database record, and with `malformed-peer' if
  the record has no `connect' command.
  """
  if name not in pinger.adopted():
    raise T.TripeJobError('peer-not-adopted', name)
  try: peer = Peer(name)
  except KeyError: raise T.TripeJobError('unknown-peer', name)
  ## Run the peer's connect command in the background, just as the Pinger
  ## does when it decides that a peer needs reconnecting.
  T.spawn(run_connect, peer, peer.get('connect'))
720
def cmd_adopted():
  """
  adopted: Report the names of the adopted peers, one per info line.
  """
  names = pinger.adopted()
  for peername in names: T.svcinfo(peername)
727
a62f8e8a
MW
def cmd_active(name):
  """
  active NAME: Handle an active connection request for the peer called NAME.

  The address is taken from the `peer' field of the peer's database record.
  Fails with `unknown-peer' if there is no record, and with `passive-peer'
  if the record says that the peer is passive.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  where = peer.get('peer')
  if where == 'PASSIVE': raise T.TripeJobError('passive-peer', name)
  addpeer(peer, M.split(where, quotep = True)[0])
740
def cmd_listactive():
  """
  list-active: Report a list of the available active peers.

  An active peer is one with a database record whose `peer' field is
  anything other than `PASSIVE' (or is missing).
  """
  cdb = CDB.init(opts.cdb)
  for key in cdb.keys():
    if not key.startswith('P'): continue
    name = key[1:]
    ## Look the peer up through the handle we already have, rather than
    ## opening the database afresh for each record.
    if Peer(name, cdb).get('peer', '') != 'PASSIVE': T.svcinfo(name)
749
def cmd_info(name):
  """
  info NAME: Report the database entries for the named peer.

  The entries are reported as `KEY=VALUE' info lines, sorted by key.  Fails
  with `unknown-peer' if the peer has no database record.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  for key in sorted(peer.list()):
    T.svcinfo('%s=%s' % (key, peer.get(key)))
760
d3731285
MW
def cmd_userpeer(user):
  """
  userpeer USER: Report the peer name for the named user.

  The name is found under the key `U' + USER in the peer database.  Fails
  with `unknown-user' if there is no such record.
  """
  db = CDB.init(opts.cdb)
  try:
    peername = db['U' + user]
  except KeyError:
    raise T.TripeJobError('unknown-user', user)
  T.svcinfo(peername)
a62f8e8a
MW
768
def cmd_passive(*args):
  """
  passive [OPTIONS] USER: Await the arrival of the named USER.

  Report a challenge; when (and if!) the server receives a greeting quoting
  this challenge, add the corresponding peer to the server.

  The only option is `-timeout TIME', which sets how long to wait for the
  greeting; the default is 30 seconds.  Fails with `unknown-user' or
  `unknown-peer' if the database lookups fail, and with `connect-timeout' if
  no greeting arrives in time.
  """
  timeout = 30
  op = T.OptParse(args, ['-timeout'])
  for opt in op:
    if opt == '-timeout':
      timeout = T.timespec(op.arg())
  user, = op.rest(1, 1)
  try: name = CDB.init(opts.cdb)['U' + user]
  except KeyError: raise T.TripeJobError('unknown-user', user)
  try: peer = Peer(name)
  except KeyError: raise T.TripeJobError('unknown-peer', name)
  chal = S.getchal()
  cr = T.Coroutine.getcurrent()
  ## The timer resumes us with None; a greeting (see notify) resumes us with
  ## the peer's address tokens.
  timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
  try:
    T.svcinfo(chal)
    chalmap[chal] = cr
    addr = cr.parent.switch()
    if addr is None:
      raise T.TripeJobError('connect-timeout')
    addpeer(peer, addr)
  finally:
    ## The challenge won't have been registered if reporting it failed:
    ## don't let the cleanup hide the real error in that case.
    chalmap.pop(chal, None)
798
a62f8e8a
MW
799###--------------------------------------------------------------------------
800### Start up.
801
802def setup():
803 """
804 Service setup.
805
d64ce4ae
MW
806 Register the notification watcher, rescan the peers, and add automatic
807 active peers.
a62f8e8a
MW
808 """
809 S.handler['NOTE'] = notify
810 S.watch('+n')
d64ce4ae
MW
811
812 pinger.rescan(opts.startup)
813
a62f8e8a
MW
814 if opts.startup:
815 cdb = CDB.init(opts.cdb)
816 try:
817 autos = cdb['%AUTO']
818 except KeyError:
819 autos = ''
820 for name in M.split(autos)[0]:
821 try:
822 peer = Peer(name, cdb)
823 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
824 except T.TripeJobError, err:
825 S.warn('connect', 'auto-add-failed', name, *err.args)
826
d64ce4ae
MW
def init():
  """
  Initialization to be done before service startup.

  Creates the global errorwatch, childwatch and pinger objects, and starts
  the dbwatch, errorwatch and pinger coroutines, in that order.
  """
  global errorwatch, childwatch, pinger
  errorwatch = ErrorWatch()
  childwatch = ChildWatch()
  pinger = Pinger()
  for cr in [T.Coroutine(dbwatch, name = 'dbwatch'), errorwatch, pinger]:
    cr.switch()
838
a62f8e8a
MW
def parse_options():
  """
  Parse the command-line options.

  Automatically changes directory to the requested configdir, and sets the
  `tripe' module's debugging flag from the `--debug' option.  Exits with a
  usage error if there are any non-option arguments.  Returns the options
  object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
873
## Service table, for running manually.  Each command maps to a tuple which
## is presumably (minimum argument count, maximum argument count or None for
## no limit, usage string, handler function) -- TODO confirm against
## `T.runservices'.
service_info = [('connect', T.VERSION, {
  'active': (1, 1, 'PEER', cmd_active),
  'adopted': (0, 0, '', cmd_adopted),
  'info': (1, 1, 'PEER', cmd_info),
  'kick': (1, 1, 'PEER', cmd_kick),
  'list-active': (0, 0, '', cmd_listactive),
  'passive': (1, None, '[OPTIONS] USER', cmd_passive),
  'userpeer': (1, 1, 'USER', cmd_userpeer),
})]
884
if __name__ == '__main__':
  ## The options are left in the module-global `opts', where the rest of the
  ## program (`Peer', `dbwatch', `setup' and the commands) looks for them.
  opts = parse_options()
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup, daemon = opts.daemon)
890
891###----- That's all, folks --------------------------------------------------