chiark / gitweb /
svc/connect.in: Add a method for finding a named PingPeer.
[tripe] / svc / connect.in
CommitLineData
a62f8e8a
MW
1#! @PYTHON@
2### -*-python-*-
3###
d64ce4ae 4### Connect to remote peers, and keep track of them
a62f8e8a 5###
d64ce4ae 6### (c) 2007 Straylight/Edgeware
a62f8e8a
MW
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
11ad66c2
MW
13### TrIPE is free software: you can redistribute it and/or modify it under
14### the terms of the GNU General Public License as published by the Free
15### Software Foundation; either version 3 of the License, or (at your
16### option) any later version.
a62f8e8a 17###
11ad66c2
MW
18### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21### for more details.
a62f8e8a
MW
22###
23### You should have received a copy of the GNU General Public License
11ad66c2 24### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
a62f8e8a
MW
25
26VERSION = '@VERSION@'
27
28###--------------------------------------------------------------------------
29### External dependencies.
30
31from optparse import OptionParser
32import tripe as T
33import os as OS
d64ce4ae
MW
34import signal as SIG
35import errno as E
a62f8e8a
MW
36import cdb as CDB
37import mLib as M
d64ce4ae 38import re as RX
a62f8e8a 39from time import time
d64ce4ae 40import subprocess as PROC
a62f8e8a
MW
41
42S = T.svcmgr
43
44###--------------------------------------------------------------------------
d64ce4ae
MW
45### Running auxiliary commands.
46
47class SelLineQueue (M.SelLineBuffer):
48 """Glues the select-line-buffer into the coroutine queue system."""
49
50 def __new__(cls, file, queue, tag, kind):
51 """See __init__ for documentation."""
52 return M.SelLineBuffer.__new__(cls, file.fileno())
53
54 def __init__(me, file, queue, tag, kind):
55 """
56 Initialize a new line-reading adaptor.
57
58 The adaptor reads lines from FILE. Each line is inserted as a message of
59 the stated KIND, bearing the TAG, into the QUEUE. End-of-file is
60 represented as None.
61 """
62 me._q = queue
63 me._file = file
64 me._tag = tag
65 me._kind = kind
66 me.enable()
67
68 @T._callback
69 def line(me, line):
70 me._q.put((me._tag, me._kind, line))
71
72 @T._callback
73 def eof(me):
74 me.disable()
75 me._q.put((me._tag, me._kind, None))
76
77class ErrorWatch (T.Coroutine):
78 """
79 An object which watches stderr streams for errors and converts them into
80 warnings of the form
81
82 WARN connect INFO stderr LINE
83
84 The INFO is a list of tokens associated with the file when it was
85 registered.
86
87 Usually there is a single ErrorWatch object, called errorwatch.
88 """
89
90 def __init__(me):
91 """Initialization: there are no arguments."""
92 T.Coroutine.__init__(me)
93 me._q = T.Queue()
94 me._map = {}
95 me._seq = 1
96
97 def watch(me, file, info):
98 """
99 Adds FILE to the collection of files to watch.
100
101 INFO will be written in the warning messages from this FILE. Returns a
102 sequence number which can be used to unregister the file again.
103 """
104 seq = me._seq
105 me._seq += 1
106 me._map[seq] = info, SelLineQueue(file, me._q, seq, 'stderr')
107 return seq
108
109 def unwatch(me, seq):
110 """Stop watching the file with sequence number SEQ."""
111 del me._map[seq]
112 return me
113
114 def run(me):
115 """
116 Coroutine function: read items from the queue and report them.
117
118 Unregisters files automatically when they reach EOF.
119 """
120 while True:
121 seq, _, line = me._q.get()
122 if line is None:
123 me.unwatch(seq)
124 else:
125 S.warn(*['connect'] + me._map[seq][0] + ['stderr', line])
126
127def dbwatch():
128 """
14b77b60 129 Coroutine function: wake up every minute and notice changes to the
d64ce4ae
MW
130 database. When a change happens, tell the Pinger (q.v.) to rescan its
131 peers.
132 """
133 cr = T.Coroutine.getcurrent()
134 main = cr.parent
135 fw = M.FWatch(opts.cdb)
136 while True:
14b77b60 137 timer = M.SelTimer(time() + 60, lambda: cr.switch())
d64ce4ae
MW
138 main.switch()
139 if fw.update():
140 pinger.rescan(False)
141 S.notify('connect', 'peerdb-update')
142
143class ChildWatch (M.SelSignal):
144 """
145 An object which watches for specified processes exiting and reports
146 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
147
148 There is usually only one ChildWatch object, called childwatch.
149 """
150
151 def __new__(cls):
152 """Initialize the child-watcher."""
153 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
154
155 def __init__(me):
156 """Initialize the child-watcher."""
157 me._pid = {}
158 me.enable()
159
160 def watch(me, pid, queue, tag):
161 """
162 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
163 to the QUEUE, where CODE is one of
164
165 * None (successful termination)
166 * ['exit-nonzero', CODE] (CODE is a string!)
167 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
168 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
169 """
170 me._pid[pid] = queue, tag
171 return me
172
173 def unwatch(me, pid):
174 """Unregister PID as a child to watch."""
175 del me._pid[pid]
176 return me
177
178 @T._callback
179 def signalled(me):
180 """
181 Called when child processes exit: collect exit statuses and report
182 failures.
183 """
184 while True:
185 try:
186 pid, status = OS.waitpid(-1, OS.WNOHANG)
187 except OSError, exc:
188 if exc.errno == E.ECHILD:
189 break
190 if pid == 0:
191 break
192 if pid not in me._pid:
193 continue
194 queue, tag = me._pid[pid]
195 if OS.WIFEXITED(status):
196 exit = OS.WEXITSTATUS(status)
197 if exit == 0:
198 code = None
199 else:
200 code = ['exit-nonzero', str(exit)]
201 elif OS.WIFSIGNALED(status):
202 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
203 else:
204 code = ['exit-unknown', hex(status)]
205 queue.put((tag, 'exit', code))
206
207class Command (object):
208 """
209 Represents a running command.
210
211 This class is the main interface to the machery provided by the ChildWatch
212 and ErrorWatch objects. See also potwatch.
213 """
214
215 def __init__(me, info, queue, tag, args, env):
216 """
217 Start a new child process.
218
219 The ARGS are a list of arguments to be given to the child process. The
220 ENV is either None or a dictionary of environment variable assignments to
221 override the extant environment. INFO is a list of tokens to be included
222 in warnings about the child's stderr output. If the child writes a line
223 to standard output, put (TAG, 'stdout', LINE) to the QUEUE. When the
224 child exits, write (TAG, 'exit', CODE) to the QUEUE.
225 """
226 me._info = info
227 me._q = queue
228 me._tag = tag
229 myenv = OS.environ.copy()
230 if env: myenv.update(env)
231 me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
232 stdout = PROC.PIPE, stderr = PROC.PIPE)
233 me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
234 errorwatch.watch(me._proc.stderr, info)
235 childwatch.watch(me._proc.pid, queue, tag)
236
237 def __del__(me):
238 """
239 If I've been forgotten then stop watching for termination.
240 """
241 childwatch.unwatch(me._proc.pid)
242
243def potwatch(what, name, q):
244 """
245 Watch the queue Q for activity as reported by a Command object.
246
247 Information from the process's stdout is reported as
248
249 NOTE WHAT NAME stdout LINE
250
251 abnormal termination is reported as
252
253 WARN WHAT NAME CODE
254
255 where CODE is what the ChildWatch wrote.
256 """
257 eofp = deadp = False
258 while not deadp or not eofp:
259 _, kind, more = q.get()
260 if kind == 'stdout':
261 if more is None:
262 eofp = True
263 else:
264 S.notify('connect', what, name, 'stdout', more)
265 elif kind == 'exit':
266 if more: S.warn('connect', what, name, *more)
267 deadp = True
268
269###--------------------------------------------------------------------------
270### Peer database utilities.
a62f8e8a
MW
271
272_magic = ['_magic'] # An object distinct from all others
273
274class Peer (object):
275 """Representation of a peer in the database."""
276
277 def __init__(me, peer, cdb = None):
278 """
279 Create a new peer, named PEER.
280
281 Information about the peer is read from the database CDB, or the default
282 one given on the command-line.
283 """
284 me.name = peer
d64ce4ae 285 record = (cdb or CDB.init(opts.cdb))['P' + peer]
a62f8e8a
MW
286 me.__dict__.update(M.URLDecode(record, semip = True))
287
d64ce4ae 288 def get(me, key, default = _magic, filter = None):
a62f8e8a
MW
289 """
290 Get the information stashed under KEY from the peer's database record.
291
292 If DEFAULT is given, then use it if the database doesn't contain the
d64ce4ae
MW
293 necessary information. If no DEFAULT is given, then report an error. If
294 a FILTER function is given then apply it to the information from the
295 database before returning it.
a62f8e8a
MW
296 """
297 attr = me.__dict__.get(key, default)
298 if attr is _magic:
299 raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
d64ce4ae
MW
300 elif filter is not None:
301 attr = filter(attr)
a62f8e8a
MW
302 return attr
303
d64ce4ae
MW
304 def has(me, key):
305 """
306 Return whether the peer's database record has the KEY.
307 """
308 return key in me.__dict__
309
a62f8e8a
MW
310 def list(me):
311 """
312 Iterate over the available keys in the peer's database record.
313 """
314 return me.__dict__.iterkeys()
315
d64ce4ae
MW
316def boolean(value):
317 """Parse VALUE as a boolean."""
318 return value in ['t', 'true', 'y', 'yes', 'on']
319
320###--------------------------------------------------------------------------
321### Waking up and watching peers.
322
323def run_connect(peer, cmd):
324 """
325 Start the job of connecting to the passive PEER.
326
327 The CMD string is a shell command which will connect to the peer (via some
328 back-channel, say ssh and userv), issue a command
329
330 SVCSUBMIT connect passive [OPTIONS] USER
331
332 and write the resulting challenge to standard error.
333 """
334 q = T.Queue()
335 cmd = Command(['connect', peer.name], q, 'connect',
336 ['/bin/sh', '-c', cmd], None)
337 _, kind, more = q.peek()
338 if kind == 'stdout':
339 if more is None:
340 S.warn('connect', 'connect', peer.name, 'unexpected-eof')
341 else:
342 chal = more
343 S.greet(peer.name, chal)
344 q.get()
345 potwatch('connect', peer.name, q)
346
347def run_disconnect(peer, cmd):
348 """
349 Start the job of disconnecting from a passive PEER.
350
351 The CMD string is a shell command which will disconnect from the peer.
352 """
353 q = T.Queue()
354 cmd = Command(['disconnect', peer.name], q, 'disconnect',
355 ['/bin/sh', '-c', cmd], None)
356 potwatch('disconnect', peer.name, q)
357
358_pingseq = 0
359class PingPeer (object):
360 """
361 Object representing a peer which we are pinging to ensure that it is still
362 present.
363
364 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
365 event queue -- which saves us from having an enormous swarm of coroutines
366 -- but most of the actual work is done here.
367
368 In order to avoid confusion between different PingPeer instances for the
369 same actual peer, each PingPeer has a sequence number (its `seq'
370 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
371 (Using the PingPeer instance itself will prevent garbage collection of
372 otherwise defunct instances.)
373 """
374
375 def __init__(me, pinger, queue, peer, pingnow):
376 """
377 Create a new PingPeer.
378
379 The PINGER is the Pinger object we should send the results to. This is
380 used when we remove ourselves, if the peer has been explicitly removed.
381
382 The QUEUE is the event queue on which timer and ping-command events
383 should be written.
384
385 The PEER is a `Peer' object describing the peer.
386
387 If PINGNOW is true, then immediately start pinging the peer. Otherwise
388 wait until the usual retry interval.
389 """
390 global _pingseq
391 me._pinger = pinger
392 me._q = queue
393 me._peer = peer.name
394 me.update(peer)
395 me.seq = _pingseq
396 _pingseq += 1
397 me._failures = 0
398 if pingnow:
399 me._timer = None
400 me._ping()
401 else:
402 me._timer = M.SelTimer(time() + me._every, me._time)
403
404 def update(me, peer):
405 """
406 Refreshes the timer parameters for this peer. We don't, however,
407 immediately reschedule anything: that will happen next time anything
408 interesting happens.
409 """
410 if peer is None: peer = Peer(me._peer)
411 assert peer.name == me._peer
412 me._every = peer.get('every', filter = T.timespec, default = 120)
413 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
414 me._retries = peer.get('retries', filter = int, default = 5)
415 me._connectp = peer.has('connect')
416 return me
417
418 def _ping(me):
419 """
420 Send a ping to the peer; the result is sent to the Pinger's event queue.
421 """
422 S.rawcommand(T.TripeAsynchronousCommand(
423 me._q, (me._peer, me.seq),
424 ['EPING',
425 '-background', S.bgtag(),
426 '-timeout', str(me._timeout),
427 '--',
428 me._peer]))
429
430 def _reconnect(me):
431 peer = Peer(me._peer)
432 if me._connectp:
433 S.warn('connect', 'reconnecting', me._peer)
434 S.forcekx(me._peer)
435 T.spawn(run_connect, peer, peer.get('connect'))
436 me._timer = M.SelTimer(time() + me._every, me._time)
437 else:
438 S.kill(me._peer)
439
440 def event(me, code, stuff):
441 """
442 Respond to an event which happened to this peer.
443
444 Timer events indicate that we should start a new ping. (The server has
445 its own timeout which detects lost packets.)
446
447 We trap unknown-peer responses and detach from the Pinger.
448
449 If the ping fails and we run out of retries, we attempt to restart the
450 connection.
451 """
452 if code == 'TIMER':
453 me._failures = 0
454 me._ping()
455 elif code == 'FAIL':
456 S.notify('connect', 'ping-failed', me._peer, *stuff)
457 if not stuff:
458 pass
459 elif stuff[0] == 'unknown-peer':
460 me._pinger.kill(me._peer)
461 elif stuff[0] == 'ping-send-failed':
462 me._reconnect()
463 elif code == 'INFO':
464 if stuff[0] == 'ping-ok':
465 if me._failures > 0:
466 S.warn('connect', 'ping-ok', me._peer)
467 me._timer = M.SelTimer(time() + me._every, me._time)
468 elif stuff[0] == 'ping-timeout':
469 me._failures += 1
470 S.warn('connect', 'ping-timeout', me._peer,
471 'attempt', str(me._failures), 'of', str(me._retries))
472 if me._failures < me._retries:
473 me._ping()
474 else:
475 me._reconnect()
476 elif stuff[0] == 'ping-peer-died':
477 me._pinger.kill(me._peer)
478
479 @T._callback
480 def _time(me):
481 """
482 Handle timer callbacks by posting a timeout event on the queue.
483 """
484 me._timer = None
485 me._q.put(((me._peer, me.seq), 'TIMER', None))
486
487 def __str__(me):
488 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
489 def __repr__(me):
490 return str(me)
491
492class Pinger (T.Coroutine):
493 """
494 The Pinger keeps track of the peers which we expect to be connected and
495 takes action if they seem to stop responding.
496
497 There is usually only one Pinger, called pinger.
498
499 The Pinger maintains a collection of PingPeer objects, and an event queue.
500 The PingPeers direct the results of their pings, and timer events, to the
501 event queue. The Pinger's coroutine picks items off the queue and
502 dispatches them back to the PingPeers as appropriate.
503 """
504
505 def __init__(me):
506 """Initialize the Pinger."""
507 T.Coroutine.__init__(me)
508 me._peers = {}
509 me._q = T.Queue()
510
511 def run(me):
512 """
513 Coroutine function: reads the pinger queue and sends events to the
514 PingPeer objects they correspond to.
515 """
516 while True:
517 (peer, seq), code, stuff = me._q.get()
518 if peer in me._peers and seq == me._peers[peer].seq:
519 me._peers[peer].event(code, stuff)
520
521 def add(me, peer, pingnow):
522 """
523 Add PEER to the collection of peers under the Pinger's watchful eye.
524 The arguments are as for PingPeer: see above.
525 """
526 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
527 return me
528
529 def kill(me, peername):
530 """Remove PEER from the peers being watched by the Pinger."""
47912108
MW
531 try: del me._peers[peername]
532 except KeyError: pass
d64ce4ae
MW
533 return me
534
535 def rescan(me, startup):
536 """
537 General resynchronization method.
538
539 We scan the list of peers (with connect scripts) known at the server.
540 Any which are known to the Pinger but aren't known to the server are
541 removed from our list; newly arrived peers are added. (Note that a peer
542 can change state here either due to the server sneakily changing its list
543 without issuing notifications or, more likely, the database changing its
544 idea of whether a peer is interesting.) Finally, PingPeers which are
545 still present are prodded to update their timing parameters.
546
547 This method is called once at startup to pick up the peers already
548 installed, and again by the dbwatcher coroutine when it detects a change
549 to the database.
550 """
551 if T._debug: print '# rescan peers'
552 correct = {}
553 start = {}
554 for name in S.list():
555 try: peer = Peer(name)
556 except KeyError: continue
557 if peer.get('watch', filter = boolean, default = False):
558 if T._debug: print '# interesting peer %s' % peer
559 correct[peer.name] = start[peer.name] = peer
560 elif startup:
561 if T._debug: print '# peer %s ready for adoption' % peer
562 start[peer.name] = peer
563 for name, obj in me._peers.items():
564 try:
565 peer = correct[name]
566 except KeyError:
567 if T._debug: print '# peer %s vanished' % name
568 del me._peers[name]
569 else:
570 obj.update(peer)
571 for name, peer in start.iteritems():
572 if name in me._peers: continue
573 if startup:
574 if T._debug: print '# setting up peer %s' % name
575 ifname = S.ifname(name)
576 addr = S.addr(name)
577 T.defer(adoptpeer, peer, ifname, *addr)
578 else:
579 if T._debug: print '# adopting new peer %s' % name
580 me.add(peer, True)
581 return me
582
583 def adopted(me):
584 """
585 Returns the list of peers being watched by the Pinger.
586 """
587 return me._peers.keys()
588
fb52c291
MW
589 def find(me, name):
590 """Return the PingPeer with the given name."""
591 return me._peers[name]
592
d64ce4ae
MW
593###--------------------------------------------------------------------------
594### New connections.
595
596def encode_envvars(env, prefix, vars):
597 """
598 Encode the variables in VARS suitably for including in a program
599 environment. Lowercase letters in variable names are forced to uppercase;
600 runs of non-alphanumeric characters are replaced by single underscores; and
601 the PREFIX is prepended. The resulting variables are written to ENV.
602 """
603 for k, v in vars.iteritems():
604 env[prefix + r_bad.sub('_', k.upper())] = v
605
606r_bad = RX.compile(r'[\W_]+')
607def envvars(peer):
608 """
609 Translate the database information for a PEER into a dictionary of
610 environment variables with plausible upper-case names and a P_ prefix.
611 Also collect the crypto information into A_ variables.
612 """
613 env = {}
614 encode_envvars(env, 'P_', dict([(k, peer.get(k)) for k in peer.list()]))
615 encode_envvars(env, 'A_', S.algs(peer.name))
616 return env
617
618def run_ifupdown(what, peer, *args):
619 """
620 Run the interface up/down script for a peer.
621
622 WHAT is 'ifup' or 'ifdown'. PEER names the peer in question. ARGS is a
623 list of arguments to pass to the script, in addition to the peer name.
624
625 The command is run and watched in the background by potwatch.
626 """
627 q = T.Queue()
628 c = Command([what, peer.name], q, what,
629 M.split(peer.get(what), quotep = True)[0] +
630 [peer.name] + list(args),
631 envvars(peer))
632 potwatch(what, peer.name, q)
633
634def adoptpeer(peer, ifname, *addr):
635 """
636 Add a new peer to our collection.
637
638 PEER is the `Peer' object; IFNAME is the interface name for its tunnel; and
639 ADDR is the list of tokens representing its address.
640
641 We try to bring up the interface and provoke a connection to the peer if
642 it's passive.
643 """
644 if peer.has('ifup'):
645 T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name) \
646 .switch('ifup', peer, ifname, *addr)
647 cmd = peer.get('connect', default = None)
648 if cmd is not None:
649 T.Coroutine(run_connect, name = 'connect %s' % peer.name) \
650 .switch(peer, cmd)
651 if peer.get('watch', filter = boolean, default = False):
652 pinger.add(peer, False)
653
654def disownpeer(peer):
655 """Drop the PEER from the Pinger and put its interface to bed."""
656 try: pinger.kill(peer)
657 except KeyError: pass
658 cmd = peer.get('disconnect', default = None)
659 if cmd is not None:
660 T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
661 .switch(peer, cmd)
662 if peer.has('ifdown'):
663 T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
664 .switch('ifdown', peer)
665
a62f8e8a
MW
666def addpeer(peer, addr):
667 """
668 Process a connect request from a new peer PEER on address ADDR.
669
670 Any existing peer with this name is disconnected from the server.
671 """
672 if peer.name in S.list():
673 S.kill(peer.name)
674 try:
6411163d 675 booltrue = ['t', 'true', 'y', 'yes', 'on']
a62f8e8a
MW
676 S.add(peer.name,
677 tunnel = peer.get('tunnel', None),
678 keepalive = peer.get('keepalive', None),
48b84569 679 key = peer.get('key', None),
fe2a5dcf 680 priv = peer.get('priv', None),
6411163d
MW
681 mobile = peer.get('mobile', 'nil') in booltrue,
682 cork = peer.get('cork', 'nil') in booltrue,
a62f8e8a
MW
683 *addr)
684 except T.TripeError, exc:
685 raise T.TripeJobError(*exc.args)
686
d64ce4ae
MW
687## Dictionary mapping challenges to waiting passive-connection coroutines.
688chalmap = {}
689
690def notify(_, code, *rest):
691 """
692 Watch for notifications.
693
694 We trap ADD and KILL notifications, and send them straight to adoptpeer and
695 disownpeer respectively; and dispatch GREET notifications to the
696 corresponding waiting coroutine.
697 """
698 if code == 'ADD':
699 try: p = Peer(rest[0])
700 except KeyError: return
701 adoptpeer(p, *rest[1:])
702 elif code == 'KILL':
703 try: p = Peer(rest[0])
704 except KeyError: return
705 disownpeer(p, *rest[1:])
706 elif code == 'GREET':
707 chal = rest[0]
708 try: cr = chalmap[chal]
709 except KeyError: pass
710 else: cr.switch(rest[1:])
711
712###--------------------------------------------------------------------------
713### Command implementation.
714
715def cmd_kick(name):
716 """
717 kick NAME: Force a new connection attempt for the NAMEd peer.
718 """
fb52c291
MW
719 try: pp = pinger.find(name)
720 except KeyError: raise T.TripeJobError('peer-not-adopted', name)
d64ce4ae
MW
721 try: peer = Peer(name)
722 except KeyError: raise T.TripeJobError('unknown-peer', name)
bbaa0bc1 723 T.spawn(run_connect, peer, peer.get('connect'))
d64ce4ae
MW
724
725def cmd_adopted():
726 """
727 adopted: Report a list of adopted peers.
728 """
729 for name in pinger.adopted():
730 T.svcinfo(name)
731
a62f8e8a
MW
732def cmd_active(name):
733 """
734 active NAME: Handle an active connection request for the peer called NAME.
735
736 The appropriate address is read from the database automatically.
737 """
d64ce4ae
MW
738 try: peer = Peer(name)
739 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
740 addr = peer.get('peer')
741 if addr == 'PASSIVE':
742 raise T.TripeJobError('passive-peer', name)
743 addpeer(peer, M.split(addr, quotep = True)[0])
744
d64ce4ae 745def cmd_listactive():
a62f8e8a
MW
746 """
747 list: Report a list of the available active peers.
748 """
749 cdb = CDB.init(opts.cdb)
750 for key in cdb.keys():
751 if key.startswith('P') and Peer(key[1:]).get('peer', '') != 'PASSIVE':
752 T.svcinfo(key[1:])
753
754def cmd_info(name):
755 """
756 info NAME: Report the database entries for the named peer.
757 """
d64ce4ae
MW
758 try: peer = Peer(name)
759 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
760 items = list(peer.list())
761 items.sort()
762 for i in items:
763 T.svcinfo('%s=%s' % (i, peer.get(i)))
764
d3731285
MW
765def cmd_userpeer(user):
766 """
767 userpeer USER: Report the peer name for the named user.
768 """
d64ce4ae
MW
769 try: name = CDB.init(opts.cdb)['U' + user]
770 except KeyError: raise T.TripeJobError('unknown-user', user)
771 T.svcinfo(name)
a62f8e8a
MW
772
773def cmd_passive(*args):
774 """
775 passive [OPTIONS] USER: Await the arrival of the named USER.
776
777 Report a challenge; when (and if!) the server receives a greeting quoting
778 this challenge, add the corresponding peer to the server.
779 """
780 timeout = 30
781 op = T.OptParse(args, ['-timeout'])
782 for opt in op:
783 if opt == '-timeout':
784 timeout = T.timespec(op.arg())
785 user, = op.rest(1, 1)
d64ce4ae
MW
786 try: name = CDB.init(opts.cdb)['U' + user]
787 except KeyError: raise T.TripeJobError('unknown-user', user)
788 try: peer = Peer(name)
789 except KeyError: raise T.TripeJobError('unknown-peer', name)
a62f8e8a
MW
790 chal = S.getchal()
791 cr = T.Coroutine.getcurrent()
792 timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
793 try:
794 T.svcinfo(chal)
795 chalmap[chal] = cr
796 addr = cr.parent.switch()
797 if addr is None:
798 raise T.TripeJobError('connect-timeout')
d64ce4ae 799 addpeer(peer, addr)
a62f8e8a
MW
800 finally:
801 del chalmap[chal]
802
a62f8e8a
MW
803###--------------------------------------------------------------------------
804### Start up.
805
806def setup():
807 """
808 Service setup.
809
d64ce4ae
MW
810 Register the notification watcher, rescan the peers, and add automatic
811 active peers.
a62f8e8a
MW
812 """
813 S.handler['NOTE'] = notify
814 S.watch('+n')
d64ce4ae
MW
815
816 pinger.rescan(opts.startup)
817
a62f8e8a
MW
818 if opts.startup:
819 cdb = CDB.init(opts.cdb)
820 try:
821 autos = cdb['%AUTO']
822 except KeyError:
823 autos = ''
824 for name in M.split(autos)[0]:
825 try:
826 peer = Peer(name, cdb)
827 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
828 except T.TripeJobError, err:
829 S.warn('connect', 'auto-add-failed', name, *err.args)
830
d64ce4ae
MW
831def init():
832 """
833 Initialization to be done before service startup.
834 """
835 global errorwatch, childwatch, pinger
836 errorwatch = ErrorWatch()
837 childwatch = ChildWatch()
838 pinger = Pinger()
839 T.Coroutine(dbwatch, name = 'dbwatch').switch()
840 errorwatch.switch()
841 pinger.switch()
842
a62f8e8a
MW
843def parse_options():
844 """
845 Parse the command-line options.
846
847 Automatically changes directory to the requested configdir, and turns on
848 debugging. Returns the options object.
849 """
850 op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
851 version = '%%prog %s' % VERSION)
852
853 op.add_option('-a', '--admin-socket',
854 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
855 help = 'Select socket to connect to [default %default]')
856 op.add_option('-d', '--directory',
857 metavar = 'DIR', dest = 'dir', default = T.configdir,
858 help = 'Select current diretory [default %default]')
859 op.add_option('-p', '--peerdb',
860 metavar = 'FILE', dest = 'cdb', default = T.peerdb,
861 help = 'Select peers database [default %default]')
862 op.add_option('--daemon', dest = 'daemon',
863 default = False, action = 'store_true',
864 help = 'Become a daemon after successful initialization')
865 op.add_option('--debug', dest = 'debug',
866 default = False, action = 'store_true',
867 help = 'Emit debugging trace information')
868 op.add_option('--startup', dest = 'startup',
869 default = False, action = 'store_true',
870 help = 'Being called as part of the server startup')
871
872 opts, args = op.parse_args()
873 if args: op.error('no arguments permitted')
874 OS.chdir(opts.dir)
875 T._debug = opts.debug
876 return opts
877
878## Service table, for running manually.
d64ce4ae
MW
879service_info = [('connect', T.VERSION, {
880 'adopted': (0, 0, '', cmd_adopted),
881 'kick': (1, 1, 'PEER', cmd_kick),
a62f8e8a
MW
882 'passive': (1, None, '[OPTIONS] USER', cmd_passive),
883 'active': (1, 1, 'PEER', cmd_active),
884 'info': (1, 1, 'PEER', cmd_info),
d64ce4ae 885 'list-active': (0, 0, '', cmd_listactive),
d3731285 886 'userpeer': (1, 1, 'USER', cmd_userpeer)
a62f8e8a
MW
887})]
888
889if __name__ == '__main__':
890 opts = parse_options()
891 T.runservices(opts.tripesock, service_info,
d64ce4ae 892 init = init, setup = setup,
a62f8e8a
MW
893 daemon = opts.daemon)
894
895###----- That's all, folks --------------------------------------------------