chiark / gitweb /
svc/connect.in: Add a backstop exception handler to the pinger loop.
[tripe] / svc / connect.in
CommitLineData
a62f8e8a
MW
1#! @PYTHON@
2### -*-python-*-
3###
d64ce4ae 4### Connect to remote peers, and keep track of them
a62f8e8a 5###
d64ce4ae 6### (c) 2007 Straylight/Edgeware
a62f8e8a
MW
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
11ad66c2
MW
13### TrIPE is free software: you can redistribute it and/or modify it under
14### the terms of the GNU General Public License as published by the Free
15### Software Foundation; either version 3 of the License, or (at your
16### option) any later version.
a62f8e8a 17###
11ad66c2
MW
18### TrIPE is distributed in the hope that it will be useful, but WITHOUT
19### ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
20### FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21### for more details.
a62f8e8a
MW
22###
23### You should have received a copy of the GNU General Public License
11ad66c2 24### along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
a62f8e8a
MW
25
26VERSION = '@VERSION@'
27
28###--------------------------------------------------------------------------
29### External dependencies.
30
31from optparse import OptionParser
32import tripe as T
33import os as OS
d64ce4ae
MW
34import signal as SIG
35import errno as E
b9dedfa6 36from math import sqrt
a62f8e8a
MW
37import cdb as CDB
38import mLib as M
d64ce4ae 39import re as RX
ebe48771 40import sys as SYS
a62f8e8a 41from time import time
d64ce4ae 42import subprocess as PROC
a62f8e8a
MW
43
44S = T.svcmgr
45
46###--------------------------------------------------------------------------
d64ce4ae
MW
47### Running auxiliary commands.
48
class SelLineQueue (M.SelLineBuffer):
  """
  Glues the select-line-buffer into the coroutine queue system.

  Each line read from the underlying file is posted to a queue as a
  (TAG, KIND, LINE) triple; end-of-file is posted as (TAG, KIND, None).
  """

  def __new__(cls, file, queue, tag, kind):
    """See __init__ for documentation."""
    ## The base class is given only the file descriptor; the remaining
    ## arguments are consumed by __init__.
    return M.SelLineBuffer.__new__(cls, file.fileno())

  def __init__(me, file, queue, tag, kind):
    """
    Initialize a new line-reading adaptor.

    The adaptor reads lines from FILE.  Each line is inserted as a message of
    the stated KIND, bearing the TAG, into the QUEUE.  End-of-file is
    represented as None.
    """
    me._q = queue
    ## NOTE(review): presumably held so that FILE isn't closed by garbage
    ## collection while its descriptor is still being read -- confirm.
    me._file = file
    me._tag = tag
    me._kind = kind
    me.enable()

  @T._callback
  def line(me, line):
    """Post a LINE read from the file to the queue."""
    me._q.put((me._tag, me._kind, line))

  @T._callback
  def eof(me):
    """Stop reading, and post None to the queue to mark end-of-file."""
    me.disable()
    me._q.put((me._tag, me._kind, None))
78
class ErrorWatch (T.Coroutine):
  """
  Collects lines written to the stderr streams of child processes and
  reissues each one as a warning of the form

        WARN connect INFO stderr LINE

  where INFO is the list of tokens supplied when the stream was registered.

  There is normally exactly one of these, named errorwatch.
  """

  def __init__(me):
    """Set up an empty watcher; takes no arguments."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._map = {}
    me._seq = 1

  def watch(me, file, info):
    """
    Start watching FILE, labelling its output with the token list INFO.

    The return value is a sequence number which identifies the registration
    and may later be handed to unwatch.
    """
    tag = me._seq
    me._seq = tag + 1
    reader = SelLineQueue(file, me._q, tag, 'stderr')
    me._map[tag] = (info, reader)
    return tag

  def unwatch(me, seq):
    """Forget about the file registered under sequence number SEQ."""
    del me._map[seq]
    return me

  def run(me):
    """
    Coroutine body: drain the queue forever, issuing a warning for each line.

    A None line marks end-of-file, at which point the file is unregistered.
    """
    while True:
      tag, _, text = me._q.get()
      if text is not None:
        info = me._map[tag][0]
        S.warn(*(['connect'] + info + ['stderr', text]))
        continue
      me.unwatch(tag)
128
def dbwatch():
  """
  Coroutine function: wake up every minute and notice changes to the
  database.  When a change happens, tell the Pinger (q.v.) to rescan its
  peers, and issue a `connect peerdb-update' notification.

  This function never returns.
  """
  cr = T.Coroutine.getcurrent()
  main = cr.parent
  fw = M.FWatch(opts.cdb)
  while True:
    ## Arrange to be resumed in 60 seconds, then hand control back to the
    ## parent coroutine in the meantime.  NOTE(review): the timer is bound
    ## to a local presumably to keep it alive until it fires -- confirm.
    timer = M.SelTimer(time() + 60, lambda: cr.switch())
    main.switch()
    if fw.update():
      pinger.rescan(False)
      S.notify('connect', 'peerdb-update')
144
145class ChildWatch (M.SelSignal):
146 """
147 An object which watches for specified processes exiting and reports
148 terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.
149
150 There is usually only one ChildWatch object, called childwatch.
151 """
152
153 def __new__(cls):
154 """Initialize the child-watcher."""
155 return M.SelSignal.__new__(cls, SIG.SIGCHLD)
156
157 def __init__(me):
158 """Initialize the child-watcher."""
159 me._pid = {}
160 me.enable()
161
162 def watch(me, pid, queue, tag):
163 """
164 Register PID as a child to watch. If it exits, write (TAG, 'exit', CODE)
165 to the QUEUE, where CODE is one of
166
167 * None (successful termination)
168 * ['exit-nonzero', CODE] (CODE is a string!)
169 * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
170 * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
171 """
172 me._pid[pid] = queue, tag
173 return me
174
175 def unwatch(me, pid):
176 """Unregister PID as a child to watch."""
177 del me._pid[pid]
178 return me
179
180 @T._callback
181 def signalled(me):
182 """
183 Called when child processes exit: collect exit statuses and report
184 failures.
185 """
186 while True:
187 try:
188 pid, status = OS.waitpid(-1, OS.WNOHANG)
189 except OSError, exc:
190 if exc.errno == E.ECHILD:
191 break
192 if pid == 0:
193 break
194 if pid not in me._pid:
195 continue
196 queue, tag = me._pid[pid]
197 if OS.WIFEXITED(status):
198 exit = OS.WEXITSTATUS(status)
199 if exit == 0:
200 code = None
201 else:
202 code = ['exit-nonzero', str(exit)]
203 elif OS.WIFSIGNALED(status):
204 code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
205 else:
206 code = ['exit-unknown', hex(status)]
207 queue.put((tag, 'exit', code))
208
class Command (object):
  """
  Represents a running command.

  This class is the main interface to the machinery provided by the
  ChildWatch and ErrorWatch objects.  See also potwatch.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Start a new child process.

    The ARGS are a list of arguments to be given to the child process.  The
    ENV is either None or a dictionary of environment variable assignments to
    override the extant environment.  INFO is a list of tokens to be included
    in warnings about the child's stderr output.  If the child writes a line
    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
    child exits, write (TAG, 'exit', CODE) to the QUEUE.
    """
    me._info = info
    me._q = queue
    me._tag = tag
    ## Work on a copy of our environment, so that the overrides affect only
    ## the child.
    myenv = OS.environ.copy()
    if env: myenv.update(env)
    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    ## Standard output goes to the caller's queue; standard error is turned
    ## into warnings by the global errorwatch; and the global childwatch
    ## reports the exit status to the caller's queue.
    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(me._proc.stderr, info)
    childwatch.watch(me._proc.pid, queue, tag)

  def __del__(me):
    """
    If I've been forgotten then stop watching for termination.
    """
    childwatch.unwatch(me._proc.pid)
244
def potwatch(what, name, q):
  """
  Follow the progress of a Command by reading its event queue Q.

  Each line the process writes to stdout is passed on as a notification

        NOTE WHAT NAME stdout LINE

  and an abnormal exit is passed on as a warning

        WARN WHAT NAME CODE

  with CODE being the list which the ChildWatch put on the queue.  The
  function returns once it has seen both end-of-file on stdout and the
  process's exit.
  """
  seen_eof = False
  seen_exit = False
  while not (seen_eof and seen_exit):
    _, kind, more = q.get()
    if kind == 'exit':
      ## A null code means a clean exit, which isn't worth mentioning.
      if more: S.warn('connect', what, name, *more)
      seen_exit = True
    elif kind == 'stdout':
      if more is not None:
        S.notify('connect', what, name, 'stdout', more)
      else:
        seen_eof = True
270
271###--------------------------------------------------------------------------
272### Peer database utilities.
a62f8e8a
MW
273
_magic = ['_magic']                     # An object distinct from all others

class Peer (object):
  """
  Representation of a peer in the database.

  The decoded fields of the peer's database record are stored directly as
  attributes of the object, alongside `name', which holds the peer's name.
  """

  def __init__(me, peer, cdb = None):
    """
    Create a new peer, named PEER.

    Information about the peer is read from the database CDB, or the default
    one given on the command-line.  Raises KeyError if the database has no
    record for the peer.
    """
    me.name = peer
    record = (cdb or CDB.init(opts.cdb))['P' + peer]
    me.__dict__.update(M.URLDecode(record, semip = True))

  def get(me, key, default = _magic, filter = None):
    """
    Get the information stashed under KEY from the peer's database record.

    If DEFAULT is given, then use it if the database doesn't contain the
    necessary information.  If no DEFAULT is given, then report an error by
    raising TripeJobError.  If a FILTER function is given then apply it to
    the information from the database before returning it.

    The FILTER is not applied to the DEFAULT: filters are written to parse
    database strings, and needn't cope with an already-converted default.
    """
    try:
      attr = me.__dict__[key]
    except KeyError:
      if default is _magic:
        raise T.TripeJobError('malformed-peer', me.name, 'missing-key', key)
      return default
    if filter is not None:
      attr = filter(attr)
    return attr

  def has(me, key):
    """
    Return whether the peer's database record has the KEY.
    """
    return key in me.__dict__

  def list(me):
    """
    Iterate over the available keys in the peer's database record.
    """
    return iter(me.__dict__)
317
d64ce4ae
MW
def boolean(value):
  """
  Parse VALUE as a boolean.

  The result is true exactly when VALUE is one of the recognized affirmative
  words; anything else counts as false.
  """
  affirmative = ('t', 'true', 'y', 'yes', 'on')
  return value in affirmative
321
322###--------------------------------------------------------------------------
323### Waking up and watching peers.
324
def run_connect(peer, cmd):
  """
  Start the job of connecting to the passive PEER.

  The CMD string is a shell command which will connect to the peer (via some
  back-channel, say ssh and userv), issue a command

        SVCSUBMIT connect passive [OPTIONS] USER

  and write the resulting challenge as its first line of standard output.
  (Anything the command writes to standard error is reported as warnings.)
  The challenge is used to greet the peer; any further output, and the
  command's exit status, are handled by potwatch.
  """
  q = T.Queue()
  ## Rebinding CMD keeps the Command object alive until we return.
  cmd = Command(['connect', peer.name], q, 'connect',
                ['/bin/sh', '-c', cmd], None)
  ## Only peek at the first event: if it isn't a line of output then it must
  ## be left on the queue for potwatch to deal with.
  _, kind, more = q.peek()
  if kind == 'stdout':
    if more is None:
      S.warn('connect', 'connect', peer.name, 'unexpected-eof')
    else:
      chal = more
      S.greet(peer.name, chal)
      ## The challenge has been dealt with, so take it off the queue.
      q.get()
  potwatch('connect', peer.name, q)
348
def run_disconnect(peer, cmd):
  """
  Run the shell command CMD to disconnect from the passive PEER.

  The command is started through `/bin/sh -c' and then watched by potwatch
  until it has finished.
  """
  events = T.Queue()
  argv = ['/bin/sh', '-c', cmd]
  ## Hold on to the Command until potwatch is done with it.
  child = Command(['disconnect', peer.name], events, 'disconnect', argv, None)
  potwatch('disconnect', peer.name, events)
359
360_pingseq = 0
361class PingPeer (object):
362 """
363 Object representing a peer which we are pinging to ensure that it is still
364 present.
365
366 PingPeer objects are held by the Pinger (q.v.). The Pinger maintains an
367 event queue -- which saves us from having an enormous swarm of coroutines
368 -- but most of the actual work is done here.
369
370 In order to avoid confusion between different PingPeer instances for the
371 same actual peer, each PingPeer has a sequence number (its `seq'
372 attribute). Events for the PingPeer are identified by a (PEER, SEQ) pair.
373 (Using the PingPeer instance itself will prevent garbage collection of
374 otherwise defunct instances.)
375 """
376
377 def __init__(me, pinger, queue, peer, pingnow):
378 """
379 Create a new PingPeer.
380
381 The PINGER is the Pinger object we should send the results to. This is
382 used when we remove ourselves, if the peer has been explicitly removed.
383
384 The QUEUE is the event queue on which timer and ping-command events
385 should be written.
386
387 The PEER is a `Peer' object describing the peer.
388
389 If PINGNOW is true, then immediately start pinging the peer. Otherwise
390 wait until the usual retry interval.
391 """
392 global _pingseq
393 me._pinger = pinger
394 me._q = queue
395 me._peer = peer.name
396 me.update(peer)
397 me.seq = _pingseq
398 _pingseq += 1
399 me._failures = 0
965eb987 400 me._sabotage = False
b9dedfa6
MW
401 me._last = '-'
402 me._nping = 0
403 me._nlost = 0
404 me._sigma_t = 0
405 me._sigma_t2 = 0
406 me._min = me._max = '-'
d64ce4ae
MW
407 if pingnow:
408 me._timer = None
409 me._ping()
410 else:
411 me._timer = M.SelTimer(time() + me._every, me._time)
412
413 def update(me, peer):
414 """
415 Refreshes the timer parameters for this peer. We don't, however,
416 immediately reschedule anything: that will happen next time anything
417 interesting happens.
418 """
419 if peer is None: peer = Peer(me._peer)
420 assert peer.name == me._peer
421 me._every = peer.get('every', filter = T.timespec, default = 120)
422 me._timeout = peer.get('timeout', filter = T.timespec, default = 10)
423 me._retries = peer.get('retries', filter = int, default = 5)
424 me._connectp = peer.has('connect')
425 return me
426
427 def _ping(me):
428 """
429 Send a ping to the peer; the result is sent to the Pinger's event queue.
430 """
431 S.rawcommand(T.TripeAsynchronousCommand(
432 me._q, (me._peer, me.seq),
433 ['EPING',
434 '-background', S.bgtag(),
435 '-timeout', str(me._timeout),
436 '--',
437 me._peer]))
438
439 def _reconnect(me):
8ad21c29
MW
440 try:
441 peer = Peer(me._peer)
442 if me._connectp:
443 S.warn('connect', 'reconnecting', me._peer)
444 S.forcekx(me._peer)
445 T.spawn(run_connect, peer, peer.get('connect'))
446 me._timer = M.SelTimer(time() + me._every, me._time)
447 me._sabotage = False
448 else:
449 S.kill(me._peer)
450 except TripeError, e:
451 if e.args[0] == 'unknown-peer': me._pinger.kill(me._peer)
d64ce4ae
MW
452
453 def event(me, code, stuff):
454 """
455 Respond to an event which happened to this peer.
456
457 Timer events indicate that we should start a new ping. (The server has
458 its own timeout which detects lost packets.)
459
460 We trap unknown-peer responses and detach from the Pinger.
461
462 If the ping fails and we run out of retries, we attempt to restart the
463 connection.
464 """
465 if code == 'TIMER':
466 me._failures = 0
467 me._ping()
468 elif code == 'FAIL':
469 S.notify('connect', 'ping-failed', me._peer, *stuff)
470 if not stuff:
471 pass
472 elif stuff[0] == 'unknown-peer':
473 me._pinger.kill(me._peer)
474 elif stuff[0] == 'ping-send-failed':
475 me._reconnect()
476 elif code == 'INFO':
965eb987
MW
477 outcome = stuff[0]
478 if outcome == 'ping-ok' and me._sabotage:
479 outcome = 'ping-timeout'
480 if outcome == 'ping-ok':
d64ce4ae
MW
481 if me._failures > 0:
482 S.warn('connect', 'ping-ok', me._peer)
b9dedfa6
MW
483 t = float(stuff[1])
484 me._last = '%.1fms' % t
485 me._sigma_t += t
486 me._sigma_t2 += t*t
487 me._nping += 1
488 if me._min == '-' or t < me._min: me._min = t
489 if me._max == '-' or t > me._max: me._max = t
d64ce4ae 490 me._timer = M.SelTimer(time() + me._every, me._time)
965eb987 491 elif outcome == 'ping-timeout':
d64ce4ae 492 me._failures += 1
b9dedfa6 493 me._nlost += 1
d64ce4ae
MW
494 S.warn('connect', 'ping-timeout', me._peer,
495 'attempt', str(me._failures), 'of', str(me._retries))
496 if me._failures < me._retries:
497 me._ping()
b9dedfa6 498 me._last = 'timeout'
d64ce4ae
MW
499 else:
500 me._reconnect()
965eb987
MW
501 me._last = 'reconnect'
502 elif outcome == 'ping-peer-died':
d64ce4ae
MW
503 me._pinger.kill(me._peer)
504
965eb987
MW
505 def sabotage(me):
506 """Sabotage the peer, for testing purposes."""
507 me._sabotage = True
508 if me._timer: me._timer.kill()
509 T.defer(me._time)
510
b9dedfa6
MW
511 def info(me):
512 if not me._nping:
513 mean = sd = '-'
514 else:
515 mean = me._sigma_t/me._nping
516 sd = sqrt(me._sigma_t2/me._nping - mean*mean)
517 n = me._nping + me._nlost
518 if not n: pclost = '-'
519 else: pclost = '%d' % ((100*me._nlost + n//2)//n)
520 return { 'last-ping': me._last,
521 'mean-ping': '%.1fms' % mean,
522 'sd-ping': '%.1fms' % sd,
523 'n-ping': '%d' % me._nping,
524 'n-lost': '%d' % me._nlost,
525 'percent-lost': pclost,
526 'min-ping': '%.1fms' % me._min,
527 'max-ping': '%.1fms' % me._max,
528 'state': me._timer and 'idle' or 'check',
529 'failures': me._failures }
530
d64ce4ae
MW
531 @T._callback
532 def _time(me):
533 """
534 Handle timer callbacks by posting a timeout event on the queue.
535 """
536 me._timer = None
537 me._q.put(((me._peer, me.seq), 'TIMER', None))
538
539 def __str__(me):
540 return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
541 def __repr__(me):
542 return str(me)
543
544class Pinger (T.Coroutine):
545 """
546 The Pinger keeps track of the peers which we expect to be connected and
547 takes action if they seem to stop responding.
548
549 There is usually only one Pinger, called pinger.
550
551 The Pinger maintains a collection of PingPeer objects, and an event queue.
552 The PingPeers direct the results of their pings, and timer events, to the
553 event queue. The Pinger's coroutine picks items off the queue and
554 dispatches them back to the PingPeers as appropriate.
555 """
556
557 def __init__(me):
558 """Initialize the Pinger."""
559 T.Coroutine.__init__(me)
560 me._peers = {}
561 me._q = T.Queue()
562
563 def run(me):
564 """
565 Coroutine function: reads the pinger queue and sends events to the
566 PingPeer objects they correspond to.
567 """
568 while True:
569 (peer, seq), code, stuff = me._q.get()
570 if peer in me._peers and seq == me._peers[peer].seq:
ebe48771
MW
571 try: me._peers[peer].event(code, stuff)
572 except Exception, e:
573 SYS.excepthook(*SYS.exc_info())
d64ce4ae
MW
574
575 def add(me, peer, pingnow):
576 """
577 Add PEER to the collection of peers under the Pinger's watchful eye.
578 The arguments are as for PingPeer: see above.
579 """
580 me._peers[peer.name] = PingPeer(me, me._q, peer, pingnow)
581 return me
582
583 def kill(me, peername):
584 """Remove PEER from the peers being watched by the Pinger."""
47912108
MW
585 try: del me._peers[peername]
586 except KeyError: pass
d64ce4ae
MW
587 return me
588
589 def rescan(me, startup):
590 """
591 General resynchronization method.
592
593 We scan the list of peers (with connect scripts) known at the server.
594 Any which are known to the Pinger but aren't known to the server are
595 removed from our list; newly arrived peers are added. (Note that a peer
596 can change state here either due to the server sneakily changing its list
597 without issuing notifications or, more likely, the database changing its
598 idea of whether a peer is interesting.) Finally, PingPeers which are
599 still present are prodded to update their timing parameters.
600
601 This method is called once at startup to pick up the peers already
602 installed, and again by the dbwatcher coroutine when it detects a change
603 to the database.
604 """
605 if T._debug: print '# rescan peers'
606 correct = {}
607 start = {}
608 for name in S.list():
609 try: peer = Peer(name)
610 except KeyError: continue
611 if peer.get('watch', filter = boolean, default = False):
612 if T._debug: print '# interesting peer %s' % peer
613 correct[peer.name] = start[peer.name] = peer
614 elif startup:
615 if T._debug: print '# peer %s ready for adoption' % peer
616 start[peer.name] = peer
617 for name, obj in me._peers.items():
618 try:
619 peer = correct[name]
620 except KeyError:
621 if T._debug: print '# peer %s vanished' % name
622 del me._peers[name]
623 else:
624 obj.update(peer)
625 for name, peer in start.iteritems():
626 if name in me._peers: continue
627 if startup:
628 if T._debug: print '# setting up peer %s' % name
629 ifname = S.ifname(name)
630 addr = S.addr(name)
631 T.defer(adoptpeer, peer, ifname, *addr)
632 else:
633 if T._debug: print '# adopting new peer %s' % name
634 me.add(peer, True)
635 return me
636
637 def adopted(me):
638 """
639 Returns the list of peers being watched by the Pinger.
640 """
641 return me._peers.keys()
642
fb52c291
MW
643 def find(me, name):
644 """Return the PingPeer with the given name."""
645 return me._peers[name]
646
d64ce4ae
MW
647###--------------------------------------------------------------------------
648### New connections.
649
def encode_envvars(env, prefix, vars):
  """
  Write the variables in the dictionary VARS into the dictionary ENV, with
  their names made fit for a program environment: the name is converted to
  uppercase, each run of characters other than letters and digits becomes a
  single underscore, and the PREFIX is attached to the front.
  """
  for name, value in vars.iteritems():
    tidy = r_bad.sub('_', name.upper())
    env[prefix + tidy] = value

## Matches a run of characters which don't belong in a variable name.
r_bad = RX.compile(r'[\W_]+')
def envvars(peer):
  """
  Build and return a dictionary of environment variables describing PEER.

  Every key in the peer's database record becomes a variable with a P_
  prefix, and the peer's crypto algorithm information, as reported by the
  server, becomes variables with an A_ prefix.  The names are tidied up by
  encode_envvars.
  """
  record = dict([(key, peer.get(key)) for key in peer.list()])
  env = {}
  encode_envvars(env, 'P_', record)
  encode_envvars(env, 'A_', S.algs(peer.name))
  return env
671
def run_ifupdown(what, peer, *args):
  """
  Run a peer's interface up/down script.

  WHAT is either 'ifup' or 'ifdown', and selects which database entry
  holds the command to run.  PEER is the `Peer' object concerned.  The
  script is given the peer's name as an argument, followed by ARGS; its
  environment is augmented with the variables made by envvars.

  The running command is watched by potwatch.
  """
  q = T.Queue()
  script = M.split(peer.get(what), quotep = True)[0]
  argv = script + [peer.name] + list(args)
  ## Hold on to the Command until potwatch is done with it.
  c = Command([what, peer.name], q, what, argv, envvars(peer))
  potwatch(what, peer.name, q)
687
def adoptpeer(peer, ifname, *addr):
  """
  Take charge of a newly added peer.

  PEER is its `Peer' object, IFNAME is the name of its tunnel interface, and
  ADDR is the list of tokens which make up its address.

  If the peer has an `ifup' script then run it; if it has a `connect'
  command then run that too, so as to wake up a passive peer; and if it is
  marked to be watched then hand it to the Pinger.
  """
  if peer.has('ifup'):
    ifup = T.Coroutine(run_ifupdown, name = 'ifup %s' % peer.name)
    ifup.switch('ifup', peer, ifname, *addr)
  cmd = peer.get('connect', default = None)
  if cmd is not None:
    connect = T.Coroutine(run_connect, name = 'connect %s' % peer.name)
    connect.switch(peer, cmd)
  watchp = peer.get('watch', filter = boolean, default = False)
  if watchp:
    pinger.add(peer, False)
707
def disownpeer(peer):
  """
  Drop the PEER from the Pinger and put its interface to bed.

  PEER is the `Peer' object.  If the peer has a `disconnect' command then
  run it, and if it has an `ifdown' script then run that too.
  """
  ## The Pinger indexes its peers by name, so it must be given the name
  ## rather than the `Peer' object itself.
  try: pinger.kill(peer.name)
  except KeyError: pass
  cmd = peer.get('disconnect', default = None)
  if cmd is not None:
    T.Coroutine(run_disconnect, name = 'disconnect %s' % peer.name) \
        .switch(peer, cmd)
  if peer.has('ifdown'):
    T.Coroutine(run_ifupdown, name = 'ifdown %s' % peer.name) \
        .switch('ifdown', peer)
719
a62f8e8a
MW
720def addpeer(peer, addr):
721 """
722 Process a connect request from a new peer PEER on address ADDR.
723
724 Any existing peer with this name is disconnected from the server.
725 """
726 if peer.name in S.list():
727 S.kill(peer.name)
728 try:
6411163d 729 booltrue = ['t', 'true', 'y', 'yes', 'on']
a62f8e8a
MW
730 S.add(peer.name,
731 tunnel = peer.get('tunnel', None),
732 keepalive = peer.get('keepalive', None),
48b84569 733 key = peer.get('key', None),
fe2a5dcf 734 priv = peer.get('priv', None),
6411163d
MW
735 mobile = peer.get('mobile', 'nil') in booltrue,
736 cork = peer.get('cork', 'nil') in booltrue,
a62f8e8a
MW
737 *addr)
738 except T.TripeError, exc:
739 raise T.TripeJobError(*exc.args)
740
d64ce4ae
MW
741## Dictionary mapping challenges to waiting passive-connection coroutines.
742chalmap = {}
743
def notify(_, code, *rest):
  """
  Handle a notification from the server.

  ADD and KILL notifications are passed to adoptpeer and disownpeer
  respectively, provided that the peer concerned is in the database; a GREET
  notification wakes up the coroutine, if any, which is waiting for its
  challenge.  Anything else is ignored.
  """
  if code in ('ADD', 'KILL'):
    try:
      p = Peer(rest[0])
    except KeyError:
      return
    if code == 'ADD': adoptpeer(p, *rest[1:])
    else: disownpeer(p, *rest[1:])
  elif code == 'GREET':
    chal = rest[0]
    if chal in chalmap:
      chalmap[chal].switch(rest[1:])
765
766###--------------------------------------------------------------------------
767### Command implementation.
768
def cmd_kick(name):
  """
  kick NAME: Force a new connection attempt for the NAMEd peer.

  Fails with `peer-not-adopted' if the Pinger isn't watching the peer, and
  with `unknown-peer' if it's not in the database.  If the peer has a
  `connect' command then it is run; otherwise a new key exchange is forced.
  """
  try:
    pinger.find(name)
  except KeyError:
    raise T.TripeJobError('peer-not-adopted', name)
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  if not peer.get('connect', None):
    T.spawn(lambda p: S.forcekx(p.name), peer)
  else:
    T.spawn(run_connect, peer, peer.get('connect'))
d64ce4ae
MW
780
def cmd_adopted():
  """
  adopted: Report the names of the peers which the Pinger is watching, one
  per line of output.
  """
  names = pinger.adopted()
  for peername in names:
    T.svcinfo(peername)
787
a62f8e8a
MW
def cmd_active(name):
  """
  active NAME: Make an active connection to the peer called NAME.

  The peer's address is taken from the `peer' entry of its database record.
  Fails with `unknown-peer' if there is no such record, and with
  `passive-peer' if the record says that the peer is passive.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  where = peer.get('peer')
  if where == 'PASSIVE':
    raise T.TripeJobError('passive-peer', name)
  tokens = M.split(where, quotep = True)[0]
  addpeer(peer, tokens)
800
def cmd_listactive():
  """
  list-active: Report a list of the available active peers.

  A peer is active unless the `peer' entry in its database record is
  `PASSIVE'.
  """
  cdb = CDB.init(opts.cdb)
  for key in cdb.keys():
    ## Peer records have keys beginning with `P'.  Look each one up in the
    ## database we already have open, rather than opening it afresh for
    ## every peer.
    if key.startswith('P') and \
       Peer(key[1:], cdb).get('peer', '') != 'PASSIVE':
      T.svcinfo(key[1:])
809
def cmd_info(name):
  """
  info NAME: Report what is known about the named peer.

  The output consists of KEY=VALUE lines, sorted by key, covering the
  entries of the peer's database record and, if the Pinger is watching the
  peer, its ping statistics.  Where both provide the same key, the ping
  statistic is the value reported.
  """
  try:
    peer = Peer(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  stats = {}
  try:
    pp = pinger.find(name)
  except KeyError:
    pass
  else:
    stats.update(pp.info())
  keys = list(peer.list()) + stats.keys()
  keys.sort()
  for key in keys:
    if key in stats: value = stats[key]
    else: value = peer.get(key)
    T.svcinfo('%s=%s' % (key, value))
a62f8e8a 826
d3731285
MW
def cmd_userpeer(user):
  """
  userpeer USER: Report the name of the peer belonging to the named USER.

  The name is found under the key `U' + USER in the database; fails with
  `unknown-user' if there is no such key.
  """
  db = CDB.init(opts.cdb)
  try:
    name = db['U' + user]
  except KeyError:
    raise T.TripeJobError('unknown-user', user)
  T.svcinfo(name)
a62f8e8a
MW
834
def cmd_passive(*args):
  """
  passive [OPTIONS] USER: Await the arrival of the named USER.

  Report a challenge; when (and if!) the server receives a greeting quoting
  this challenge, add the corresponding peer to the server.

  The only option is `-timeout TIME', which sets how long to wait for the
  greeting; the default is 30 seconds.  Fails with `unknown-user' or
  `unknown-peer' if the database lacks the necessary records, and with
  `connect-timeout' if no greeting arrives in time.
  """
  timeout = 30
  op = T.OptParse(args, ['-timeout'])
  for opt in op:
    if opt == '-timeout':
      timeout = T.timespec(op.arg())
  user, = op.rest(1, 1)
  try: name = CDB.init(opts.cdb)['U' + user]
  except KeyError: raise T.TripeJobError('unknown-user', user)
  try: peer = Peer(name)
  except KeyError: raise T.TripeJobError('unknown-peer', name)
  chal = S.getchal()
  cr = T.Coroutine.getcurrent()
  ## If the timer goes off first then we're resumed with None rather than
  ## the peer's address.
  timer = M.SelTimer(time() + timeout, lambda: cr.switch(None))
  try:
    T.svcinfo(chal)
    chalmap[chal] = cr
    addr = cr.parent.switch()
    if addr is None:
      raise T.TripeJobError('connect-timeout')
    addpeer(peer, addr)
  finally:
    ## The challenge might not have been registered if we failed early;
    ## don't let a KeyError here hide the real problem.
    chalmap.pop(chal, None)
864
965eb987
MW
def cmd_sabotage(name):
  """
  sabotage NAME: Sabotage the NAMEd peer so that we think it can't be pinged.

  Fails with `unknown-peer' if the Pinger isn't watching the peer.
  """
  try:
    victim = pinger.find(name)
  except KeyError:
    raise T.TripeJobError('unknown-peer', name)
  else:
    victim.sabotage()
872
a62f8e8a
MW
873###--------------------------------------------------------------------------
874### Start up.
875
876def setup():
877 """
878 Service setup.
879
d64ce4ae
MW
880 Register the notification watcher, rescan the peers, and add automatic
881 active peers.
a62f8e8a
MW
882 """
883 S.handler['NOTE'] = notify
884 S.watch('+n')
d64ce4ae
MW
885
886 pinger.rescan(opts.startup)
887
a62f8e8a
MW
888 if opts.startup:
889 cdb = CDB.init(opts.cdb)
890 try:
891 autos = cdb['%AUTO']
892 except KeyError:
893 autos = ''
894 for name in M.split(autos)[0]:
895 try:
896 peer = Peer(name, cdb)
897 addpeer(peer, M.split(peer.get('peer'), quotep = True)[0])
898 except T.TripeJobError, err:
899 S.warn('connect', 'auto-add-failed', name, *err.args)
900
d64ce4ae
MW
901def init():
902 """
903 Initialization to be done before service startup.
904 """
905 global errorwatch, childwatch, pinger
906 errorwatch = ErrorWatch()
907 childwatch = ChildWatch()
908 pinger = Pinger()
909 T.Coroutine(dbwatch, name = 'dbwatch').switch()
910 errorwatch.switch()
911 pinger.switch()
912
a62f8e8a
MW
def parse_options():
  """
  Parse the command-line options.

  Automatically changes directory to the requested configdir, and sets the
  debugging flag as requested.  Returns the options object.  Reports an
  error if any non-option arguments are given.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
947
948## Service table, for running manually.
d64ce4ae
MW
## Each command maps to a tuple ending with a usage string and the function
## which implements it.  NOTE(review): the two leading items look like the
## minimum and maximum argument counts, with None meaning no upper limit --
## confirm against the `tripe' module.
service_info = [('connect', T.VERSION, {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick),
  'passive': (1, None, '[OPTIONS] USER', cmd_passive),
  'active': (1, 1, 'PEER', cmd_active),
  'info': (1, 1, 'PEER', cmd_info),
  'list-active': (0, 0, '', cmd_listactive),
  'userpeer': (1, 1, 'USER', cmd_userpeer),
  'sabotage': (1, 1, 'PEER', cmd_sabotage)
})]
959
if __name__ == '__main__':
  opts = parse_options()
  ## Export the chosen socket name through the environment.
  ## NOTE(review): presumably for the benefit of the child processes we
  ## run -- confirm.
  OS.environ['TRIPESOCK'] = opts.tripesock
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup,
                daemon = opts.daemon)
966
967###----- That's all, folks --------------------------------------------------