chiark / gitweb /
Use the new `mLib' annotations on varargs functions.
[tripe] / svc / watch.in
... / ...
CommitLineData
1#! @PYTHON@
2### -*-python-*-
3###
4### Watch arrival and departure of peers
5###
6### (c) 2007 Straylight/Edgeware
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
13### TrIPE is free software; you can redistribute it and/or modify
14### it under the terms of the GNU General Public License as published by
15### the Free Software Foundation; either version 2 of the License, or
16### (at your option) any later version.
17###
18### TrIPE is distributed in the hope that it will be useful,
19### but WITHOUT ANY WARRANTY; without even the implied warranty of
20### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21### GNU General Public License for more details.
22###
23### You should have received a copy of the GNU General Public License
24### along with TrIPE; if not, write to the Free Software Foundation,
25### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
26
## Program version.  `@VERSION@' is a placeholder, presumably substituted when
## this `.in' template is processed by the build.
VERSION = '@VERSION@'
28
29###--------------------------------------------------------------------------
30### External dependencies.
31
32from optparse import OptionParser
33import tripe as T
34import os as OS
35import signal as SIG
36import errno as E
37import cdb as CDB
38import mLib as M
39import re as RX
40from time import time
41import subprocess as PROC
42
## Short name for tripe's service-manager object, used throughout to talk to
## the server (warn, notify, list, kill, ...).
S = T.svcmgr
44
45###--------------------------------------------------------------------------
46### Running auxiliary commands.
47
class SelLineQueue (M.SelLineBuffer):
  """
  Adaptor which feeds lines collected by an mLib select-line-buffer into a
  coroutine queue.
  """

  def __new__(cls, file, queue, tag, kind):
    """Allocate the underlying line buffer on FILE's descriptor."""
    fd = file.fileno()
    return M.SelLineBuffer.__new__(cls, fd)

  def __init__(me, file, queue, tag, kind):
    """
    Set up an adaptor reading lines from FILE.

    Each complete line is posted to QUEUE as the triple (TAG, KIND, LINE);
    end-of-file is posted as (TAG, KIND, None).  Reading starts at once.
    """
    ## Keep a reference to FILE, presumably so that it (and its descriptor)
    ## stays open for as long as the buffer is reading from it.
    me._file = file
    me._q, me._tag, me._kind = queue, tag, kind
    me.enable()

  @T._callback
  def line(me, line):
    """Callback: a complete LINE has been read; pass it on."""
    item = (me._tag, me._kind, line)
    me._q.put(item)

  @T._callback
  def eof(me):
    """Callback: end of file.  Stop reading and post the None marker."""
    me.disable()
    me._q.put((me._tag, me._kind, None))
77
class ErrorWatch (T.Coroutine):
  """
  Watches standard-error streams and turns every line into a warning

    WARN watch INFO stderr LINE

  where INFO is the list of tokens supplied when the stream was registered.

  Normally there is exactly one of these, called errorwatch.
  """

  def __init__(me):
    """Set up an empty watcher.  There are no arguments."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._map = {}
    me._seq = 1

  def watch(me, file, info):
    """
    Start watching FILE, labelling its warnings with the INFO token list.

    Returns a sequence number which may later be passed to `unwatch'.
    """
    seq = me._seq
    me._seq = seq + 1
    reader = SelLineQueue(file, me._q, seq, 'stderr')
    me._map[seq] = (info, reader)
    return seq

  def unwatch(me, seq):
    """Forget the file registered under sequence number SEQ."""
    del me._map[seq]
    return me

  def run(me):
    """
    Coroutine body: take lines off the queue and issue warnings for them.

    A file is forgotten automatically once it reaches end-of-file.
    """
    while True:
      seq, _, line = me._q.get()
      if line is None:
        me.unwatch(seq)
        continue
      info = me._map[seq][0]
      S.warn(*(['watch'] + info + ['stderr', line]))
127
def dbwatch():
  """
  Coroutine function: wake up every second and notice changes to the
  database.  When a change happens, tell the Pinger (q.v.) to rescan its
  peers and issue a `watch peerdb-update' notification.
  """
  ## This coroutine, and the parent we hand control back to while waiting.
  cr = T.Coroutine.getcurrent()
  main = cr.parent
  ## Watch the peer database file named by the --peerdb option.
  fw = M.FWatch(opts.cdb)
  while True:
    ## Arrange for the timer callback to resume this coroutine in about a
    ## second.  The timer is bound to a local so that it stays referenced
    ## while we are suspended (presumably required to keep it alive --
    ## confirm against mLib's SelTimer).
    timer = M.SelTimer(time() + 1, lambda: cr.switch())
    main.switch()
    ## Presumably true when the file has changed since the last check.
    if fw.update():
      pinger.rescan(False)
      S.notify('watch', 'peerdb-update')
143
class ChildWatch (M.SelSignal):
  """
  An object which watches for specified processes exiting and reports
  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.

  There is usually only one ChildWatch object, called childwatch.
  """

  def __new__(cls):
    """Allocate the underlying SIGCHLD watcher."""
    return M.SelSignal.__new__(cls, SIG.SIGCHLD)

  def __init__(me):
    """Initialize the child-watcher: initially no children are watched."""
    me._pid = {}
    me.enable()

  def watch(me, pid, queue, tag):
    """
    Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
    to the QUEUE, where CODE is one of

      * None (successful termination)
      * ['exit-nonzero', CODE] (CODE is a string!)
      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)
    """
    me._pid[pid] = queue, tag
    return me

  def unwatch(me, pid):
    """Unregister PID as a child to watch.  Raises KeyError if not watched."""
    del me._pid[pid]
    return me

  @T._callback
  def signalled(me):
    """
    Called when child processes exit: reap every exited child and report
    the ones we are watching.

    An interrupted wait is retried.  Running out of children (ECHILD) ends
    the scan.  Any other error from `waitpid' is propagated, rather than
    carrying on with an unset or stale PID.
    """
    while True:
      try:
        pid, status = OS.waitpid(-1, OS.WNOHANG)
      except OSError as exc:
        if exc.errno == E.EINTR:
          continue
        if exc.errno == E.ECHILD:
          break
        raise
      ## With WNOHANG, a zero PID means no further children have exited yet.
      if pid == 0:
        break
      if pid not in me._pid:
        continue
      queue, tag = me._pid[pid]
      if OS.WIFEXITED(status):
        exit = OS.WEXITSTATUS(status)
        if exit == 0:
          code = None
        else:
          code = ['exit-nonzero', str(exit)]
      elif OS.WIFSIGNALED(status):
        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
      else:
        code = ['exit-unknown', hex(status)]
      queue.put((tag, 'exit', code))
207
class Command (object):
  """
  Represents a running command.

  This class is the main interface to the machinery provided by the
  ChildWatch and ErrorWatch objects.  See also potwatch.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Start a new child process.

    The ARGS are a list of arguments to be given to the child process.  The
    ENV is either None or a dictionary of environment variable assignments to
    override the extant environment.  INFO is a list of tokens to be included
    in warnings about the child's stderr output.  If the child writes a line
    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
    child exits, write (TAG, 'exit', CODE) to the QUEUE.
    """
    me._info = info
    me._q = queue
    me._tag = tag
    myenv = OS.environ.copy()
    if env: myenv.update(env)
    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(me._proc.stderr, info)
    childwatch.watch(me._proc.pid, queue, tag)

  def __del__(me):
    """
    If I've been forgotten then stop watching for termination.

    Copes with a partially constructed object: if the child was never
    started there is nothing to unwatch, and if construction failed before
    the child was registered with the ChildWatch, there is nothing to remove.
    """
    proc = getattr(me, '_proc', None)
    if proc is None:
      return
    try:
      childwatch.unwatch(proc.pid)
    except KeyError:
      pass
243
def potwatch(what, name, q):
  """
  Follow the queue Q, fed by a Command object, until its process has both
  closed its standard output and exited.

  Each line the process writes to stdout is reported as

    NOTE watch WHAT NAME stdout LINE

  and an abnormal exit is reported as

    WARN watch WHAT NAME CODE...

  where CODE is the status list written by the ChildWatch.
  """
  sawexit = sawend = False
  while not (sawexit and sawend):
    _, kind, payload = q.get()
    if kind == 'exit':
      if payload:
        S.warn('watch', what, name, *payload)
      sawexit = True
    elif kind == 'stdout' and payload is None:
      sawend = True
    elif kind == 'stdout':
      S.notify('watch', what, name, 'stdout', payload)
269
270###--------------------------------------------------------------------------
271### Peer database utilities.
272
def timespec(info, key, default):
  """
  Return INFO[KEY] interpreted as a tripe time specification.

  DEFAULT is returned instead if KEY is absent or the value can't be parsed.
  """
  try:
    value = T.timespec(info[key])
  except (T.TripeJobError, KeyError):
    value = default
  return value
279
def integer(info, key, default):
  """
  Return INFO[KEY] converted to an integer.

  DEFAULT is returned instead if KEY is absent or the value isn't a valid
  integer.
  """
  try:
    raw = info[key]
  except KeyError:
    return default
  try:
    return int(raw)
  except ValueError:
    return default
286
def boolean(info, key, default):
  """
  Return whether INFO[KEY] spells `true'.

  The accepted spellings are t, true, y, yes and on, matched exactly; any
  other value is false.  DEFAULT is returned if KEY is absent.
  """
  if key not in info:
    return default
  return info[key] in ('t', 'true', 'y', 'yes', 'on')
293
def peerinfo(peer):
  """
  Look PEER up in the peer database and return its record as a dictionary.

  Raises KeyError if the database has no record for PEER.
  """
  db = CDB.init(opts.cdb)
  record = db['P' + peer]
  return dict(M.URLDecode(record, semip = True))
299
300###--------------------------------------------------------------------------
301### Waking up and watching peers.
302
def connect(peer, conn = None):
  """
  Start the job of connecting to the passive PEER.

  The CONN string is a shell command which will connect to the peer (via some
  back-channel, say ssh and userv), issue a command

    SVCSUBMIT connect passive [OPTIONS] USER

  and write the resulting challenge to standard output.  If CONN is None,
  the peer's `connect' entry from the database is used; if there is no such
  entry, nothing happens.

  The first line the command writes is handed to the server as the
  challenge (via GREET).  Any further output, and an abnormal exit, are
  reported by potwatch.
  """
  if conn is None:
    try:
      conn = peerinfo(peer)['connect']
    except KeyError:
      return
  q = T.Queue()
  ## Keep the Command bound until we're done: its destructor unregisters the
  ## child from the ChildWatch, and potwatch waits for the exit event.
  cmd = Command(['connect', peer], q, 'connect',
                ['/bin/sh', '-c', conn], None)
  ## Inspect the first event without consuming it.
  _, kind, more = q.peek()
  if kind == 'stdout':
    if more is None:
      ## Leave the EOF on the queue so that potwatch sees it.
      S.warn('watch', 'connect', peer, 'unexpected-eof')
    else:
      chal = more
      S.greet(peer, chal)
      ## Consume the challenge line so that potwatch doesn't report it.
      q.get()
  potwatch('connect', peer, q)
331
def disconnect(peer, disconn = None):
  """
  Start the job of disconnecting from a passive PEER.

  The DISCONN string is a shell command which will disconnect from the peer.
  If DISCONN is None, the peer's `disconnect' entry from the database is
  used; if there is no such entry, nothing happens.
  """
  if disconn is None:
    try:
      ## Store the looked-up command in DISCONN, which is what is run below.
      disconn = peerinfo(peer)['disconnect']
    except KeyError:
      return
  q = T.Queue()
  ## Keep the Command bound until potwatch has seen the child exit: its
  ## destructor unregisters the child from the ChildWatch.
  cmd = Command(['disconnect', peer], q, 'disconnect',
                ['/bin/sh', '-c', disconn], None)
  potwatch('disconnect', peer, q)
347
## Source of PingPeer sequence numbers; bumped for each new PingPeer.
_pingseq = 0
class PingPeer (object):
  """
  A peer which we ping periodically to check that it is still there.

  PingPeers belong to the Pinger (q.v.).  The Pinger owns a single event
  queue, which saves running a swarm of coroutines, but the real work is done
  here.

  Several PingPeer instances may exist over time for the same peer.  To tell
  them apart, each carries a sequence number in its `seq' attribute, and its
  events are tagged with the pair (PEER, SEQ).  Tagging with the instance
  itself would keep otherwise-dead instances from being collected.
  """

  def __init__(me, pinger, queue, peer, info, pingnow):
    """
    Create a new PingPeer.

    PINGER is the Pinger which owns us; we use it to remove ourselves if the
    peer is explicitly removed.

    QUEUE is the event queue on which timer and ping-command events are
    posted.

    PEER is the peer's name, as a string.

    INFO is the peer's database record as a dictionary, or None to have it
    looked up here.  Passing it in just saves a redundant database probe.

    If PINGNOW is true, ping the peer straight away.  Otherwise wait for the
    usual interval first.
    """
    global _pingseq
    me._pinger = pinger
    me._q = queue
    me._peer = peer
    me.update(info)
    me.seq, _pingseq = _pingseq, _pingseq + 1
    me._failures = 0
    if not pingnow:
      me._timer = M.SelTimer(time() + me._every, me._time)
    else:
      me._timer = None
      me._ping()

  def update(me, info):
    """
    Reload this peer's timing parameters from INFO, or from the database if
    INFO is None.

    Nothing is rescheduled here; new values take effect the next time a
    timer or ping is set up.
    """
    if info is None: info = peerinfo(me._peer)
    me._every = timespec(info, 'every', 120)
    me._timeout = timespec(info, 'timeout', 10)
    me._retries = integer(info, 'retries', 5)
    me._connectp = 'connect' in info
    return me

  def _ping(me):
    """Send an EPING to the peer; the outcome lands on the event queue."""
    request = ['EPING',
               '-background', S.bgtag(),
               '-timeout', str(me._timeout),
               '--',
               me._peer]
    S.rawcommand(T.TripeAsynchronousCommand(me._q, (me._peer, me.seq),
                                            request))

  def _reconnect(me):
    """
    Deal with a peer which has stopped answering.

    If the peer has a connect command: warn, force a key exchange, rerun the
    connect command and schedule the next ping.  Otherwise kill the peer.
    """
    info = peerinfo(me._peer)
    if 'connect' not in info:
      S.kill(me._peer)
      return
    S.warn('watch', 'reconnecting', me._peer)
    S.forcekx(me._peer)
    T.spawn(connect, me._peer)
    me._timer = M.SelTimer(time() + me._every, me._time)

  def event(me, code, stuff):
    """
    React to an event for this peer.

    TIMER events start a fresh round of pinging; the server has its own
    timeout which detects lost packets.

    An unknown-peer failure, or a report that the peer died, detaches us
    from the Pinger.  A send failure, or running out of retries after
    timeouts, triggers a reconnection attempt.
    """
    if code == 'TIMER':
      me._failures = 0
      me._ping()
    elif code == 'FAIL':
      S.notify('watch', 'ping-failed', me._peer, *stuff)
      if stuff:
        why = stuff[0]
        if why == 'unknown-peer':
          me._pinger.kill(me._peer)
        elif why == 'ping-send-failed':
          me._reconnect()
    elif code == 'INFO':
      result = stuff[0]
      if result == 'ping-ok':
        if me._failures > 0:
          S.warn('watch', 'ping-ok', me._peer)
        me._timer = M.SelTimer(time() + me._every, me._time)
      elif result == 'ping-timeout':
        me._failures += 1
        S.warn('watch', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures >= me._retries:
          me._reconnect()
        else:
          me._ping()
      elif result == 'ping-peer-died':
        me._pinger.kill(me._peer)

  @T._callback
  def _time(me):
    """Timer callback: post a TIMER event for this PingPeer."""
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
  __repr__ = __str__
485
class Pinger (T.Coroutine):
  """
  Tracks the peers we expect to stay connected and acts when they seem to
  stop responding.

  Normally there is exactly one Pinger, called pinger.

  The Pinger holds a PingPeer for each watched peer, plus one event queue.
  The PingPeers send their ping results and timer events to that queue.  The
  Pinger's coroutine takes events off it and hands each one back to the
  PingPeer it belongs to.
  """

  def __init__(me):
    """Set up an empty Pinger."""
    T.Coroutine.__init__(me)
    me._peers = {}
    me._q = T.Queue()

  def run(me):
    """
    Coroutine body: dispatch queued events to their PingPeers.

    Events for peers we no longer know, or for a superseded PingPeer (stale
    sequence number), are dropped.
    """
    while True:
      (peer, seq), code, stuff = me._q.get()
      pp = me._peers.get(peer)
      if pp is not None and pp.seq == seq:
        pp.event(code, stuff)

  def add(me, peer, info, pingnow):
    """
    Start watching PEER.  The arguments are as for PingPeer.
    """
    me._peers[peer] = PingPeer(me, me._q, peer, info, pingnow)
    return me

  def kill(me, peer):
    """Stop watching PEER.  Raises KeyError if it isn't being watched."""
    del me._peers[peer]
    return me

  def rescan(me, startup):
    """
    Bring the Pinger back into line with the server and the database.

    The peers known to the server are checked against the database.  Ones
    marked for watching which we don't yet hold are added.  Ones we hold
    which are no longer marked for watching (or have gone) are dropped.
    Surviving PingPeers reload their timing parameters.  A peer can change
    state here either because the server changed its list without telling
    us or, more likely, because the database changed.

    When STARTUP is true (the first scan when started from the server's
    startup), every known peer is set up as if it had just been added, via a
    deferred call to addpeer.  Otherwise new peers to watch are adopted and
    pinged at once.

    This runs once at startup and again whenever dbwatch notices the
    database change.
    """
    if T._debug: print('# rescan peers')
    correct = {}
    start = {}
    for peer in S.list():
      try:
        info = peerinfo(peer)
      except KeyError:
        continue
      if boolean(info, 'watch', False):
        if T._debug: print('# interesting peer %s' % peer)
        correct[peer] = start[peer] = info
      elif startup:
        if T._debug: print('# peer %s ready for adoption' % peer)
        start[peer] = info
    ## Iterate over a snapshot, since we delete entries as we go.
    for peer, obj in list(me._peers.items()):
      if peer not in correct:
        if T._debug: print('# peer %s vanished' % peer)
        del me._peers[peer]
      else:
        obj.update(correct[peer])
    for peer, info in start.items():
      if peer in me._peers:
        continue
      if not startup:
        if T._debug: print('# adopting new peer %s' % peer)
        me.add(peer, info, True)
      else:
        if T._debug: print('# setting up peer %s' % peer)
        ifname = S.ifname(peer)
        addr = S.addr(peer)
        T.defer(addpeer, info, peer, ifname, *addr)
    return me

  def adopted(me):
    """
    Return a list of the names of the peers currently being watched.
    """
    return list(me._peers)
581
582###--------------------------------------------------------------------------
583### New connections.
584
def encode_envvars(env, prefix, vars):
  """
  Copy the entries of VARS into ENV as environment-variable assignments.

  Each name is upper-cased and every run of characters other than letters
  and digits is collapsed into a single underscore.  PREFIX is then prepended.
  Values are stored unchanged.
  """
  for name, value in vars.items():
    env[prefix + r_bad.sub('_', name.upper())] = value

## Runs of characters which aren't letters or digits.  Underscores are
## included so that existing ones merge into the replacement.
r_bad = RX.compile(r'[\W_]+')
def envvars(peer, info):
  """
  Build the environment for a PEER's scripts.

  The database record INFO is exported under the P_ prefix, and the crypto
  algorithm details reported by the server for PEER under the A_ prefix.
  Returns the new dictionary.
  """
  env = {}
  for prefix, source in (('P_', info), ('A_', S.algs(peer))):
    encode_envvars(env, prefix, source)
  return env
606
def ifupdown(what, peer, info, *args):
  """
  Run a peer's interface up/down script and wait for it to finish.

  WHAT is 'ifup' or 'ifdown' and names the database entry holding the
  command line.  PEER names the peer and INFO is its database record.  The
  script gets PEER and then ARGS as extra arguments, with the peer's details
  in its environment.  Progress is reported by potwatch.
  """
  argv = M.split(info[what], quotep = True)[0]
  argv = argv + [peer] + list(args)
  q = T.Queue()
  ## Keep the Command bound until potwatch has seen the script exit.
  cmd = Command([what, peer], q, what, argv, envvars(peer, info))
  potwatch(what, peer, q)
623
def addpeer(info, peer, ifname, *addr):
  """
  Deal with a newly added peer.

  INFO is the peer's database record, or None to look it up here.  If the
  peer isn't in the database, nothing is done.  PEER names the peer, IFNAME
  is its tunnel interface and ADDR is the list of tokens giving its address.

  Any `ifup' script is started, a `connect' command is started for passive
  peers, and the peer is handed to the Pinger if it's marked for watching.
  """
  if info is None:
    try: info = peerinfo(peer)
    except KeyError: return
  if 'ifup' in info:
    T.Coroutine(ifupdown, name = 'ifup %s' % peer).switch(
      'ifup', peer, info, ifname, *addr)
  if 'connect' in info:
    T.Coroutine(connect, name = 'connect %s' % peer).switch(
      peer, info['connect'])
  if boolean(info, 'watch', False):
    pinger.add(peer, info, False)
649
def delpeer(peer):
  """
  Deal with a departed PEER.

  The Pinger stops watching it (if it was).  Then the peer's `disconnect'
  command and `ifdown' script are started, if the database provides them.
  Peers absent from the database are ignored.
  """
  try: info = peerinfo(peer)
  except KeyError: return
  try: pinger.kill(peer)
  except KeyError: pass
  if 'disconnect' in info:
    T.Coroutine(disconnect, name = 'disconnect %s' % peer).switch(
      peer, info['disconnect'])
  if 'ifdown' in info:
    T.Coroutine(ifupdown, name = 'ifdown %s' % peer).switch(
      'ifdown', peer, info)
666
def notify(_, code, *rest):
  """
  Handler for server notifications.

  ADD notifications go to addpeer and KILL notifications to delpeer.
  Everything else is ignored.
  """
  handler = {
    'ADD': lambda: addpeer(None, *rest),
    'KILL': lambda: delpeer(*rest),
  }.get(code)
  if handler is not None:
    handler()
678
679###--------------------------------------------------------------------------
680### Command stubs.
681
def cmd_stub(*args):
  ## Placeholder command handler: accepts any arguments and always fails with
  ## `not-implemented'.  (It is not referenced by the service table below.)
  raise T.TripeJobError('not-implemented')
684
def cmd_kick(peer):
  """
  kick PEER: Force a new connection attempt for PEER
  """
  ## Only peers the Pinger has adopted can be kicked.
  adopted = pinger.adopted()
  if peer in adopted:
    T.spawn(connect, peer)
  else:
    raise T.TripeJobError('peer-not-adopted', peer)
692
def cmd_adopted():
  """
  adopted: Report a list of adopted peers.
  """
  ## One INFO line per peer being watched by the Pinger.
  for name in pinger.adopted():
    T.svcinfo(name)
699
700###--------------------------------------------------------------------------
701### Start up.
702
def setup():
  """
  Service setup.

  Register the notification watcher, and rescan the peers.
  """
  ## Route NOTE lines from the server to notify() before asking for them,
  ## so that no notification can arrive without a handler.  `+n' presumably
  ## enables notification messages -- confirm against the tripe WATCH docs.
  S.handler['NOTE'] = notify
  S.watch('+n')
  ## Pick up peers already present; with --startup, also set them up fully.
  pinger.rescan(opts.startup)
712
def init():
  """
  Initialization to be done before service startup.

  Creates the global ErrorWatch, ChildWatch and Pinger objects, then starts
  the dbwatch, ErrorWatch and Pinger coroutines.
  """
  global errorwatch, childwatch, pinger
  ## The rest of the program refers to these globals, so they must exist
  ## before any coroutine gets a chance to run.
  errorwatch = ErrorWatch()
  childwatch = ChildWatch()
  pinger = Pinger()
  ## Start each coroutine.  Each presumably runs until it first blocks (on
  ## its timer or queue) and then hands control back here.
  T.Coroutine(dbwatch, name = 'dbwatch').switch()
  errorwatch.switch()
  pinger.switch()
724
def parse_options():
  """
  Parse the command-line options.

  Changes directory to the requested configuration directory (--directory)
  and sets tripe's debugging flag according to --debug.  Exits with a usage
  error if any non-option arguments are given.  Returns the options object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR] [-p FILE]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  ## Relative paths (e.g. the peer database) are resolved from here on.
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
759
## Service table, for running manually.
## Each command entry is presumably (MIN-ARGS, MAX-ARGS, USAGE, FUNCTION),
## judging by the handlers' arities -- confirm against T.runservices.
service_info = [('watch', T.VERSION, {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick)
})]
765
if __name__ == '__main__':
  ## Parse options (this also changes directory and sets the debug flag).
  ## Then run the service over the admin socket; T.runservices presumably
  ## calls init() and setup() at the appropriate points.
  opts = parse_options()
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup,
                daemon = opts.daemon)
771
772###----- That's all, folks --------------------------------------------------