chiark / gitweb /
Merge branch 'mdw/backoff'
[tripe] / svc / watch.in
CommitLineData
a62f8e8a
MW
1#! @PYTHON@
2### -*-python-*-
3###
4### Watch arrival and departure of peers
5###
6### (c) 2007 Straylight/Edgeware
7###
8
9###----- Licensing notice ---------------------------------------------------
10###
11### This file is part of Trivial IP Encryption (TrIPE).
12###
13### TrIPE is free software; you can redistribute it and/or modify
14### it under the terms of the GNU General Public License as published by
15### the Free Software Foundation; either version 2 of the License, or
16### (at your option) any later version.
17###
18### TrIPE is distributed in the hope that it will be useful,
19### but WITHOUT ANY WARRANTY; without even the implied warranty of
20### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21### GNU General Public License for more details.
22###
23### You should have received a copy of the GNU General Public License
24### along with TrIPE; if not, write to the Free Software Foundation,
25### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
26
27VERSION = '@VERSION@'
28
29###--------------------------------------------------------------------------
30### External dependencies.
31
32from optparse import OptionParser
33import tripe as T
34import os as OS
35import signal as SIG
36import errno as E
37import cdb as CDB
38import mLib as M
39import re as RX
40from time import time
41import subprocess as PROC
42
43S = T.svcmgr
44
45###--------------------------------------------------------------------------
46### Running auxiliary commands.
47
class SelLineQueue (M.SelLineBuffer):
  """
  Adaptor which feeds lines read by a select-line-buffer into a queue.

  Each line read is posted to the queue as a (TAG, KIND, LINE) triple;
  end-of-file is posted as (TAG, KIND, None).
  """

  def __new__(cls, file, queue, tag, kind):
    """Allocate the line buffer on FILE's descriptor; see __init__."""
    fd = file.fileno()
    return M.SelLineBuffer.__new__(cls, fd)

  def __init__(me, file, queue, tag, kind):
    """
    Set up the adaptor and enable it.

    FILE is the file object to read from; QUEUE is where messages are put;
    TAG and KIND are copied into every message posted.
    """
    me._file, me._q = file, queue
    me._tag, me._kind = tag, kind
    me.enable()

  @T._callback
  def line(me, line):
    """Post a received LINE to the queue."""
    msg = (me._tag, me._kind, line)
    me._q.put(msg)

  @T._callback
  def eof(me):
    """Stop reading, then post the end-of-file marker (None)."""
    me.disable()
    msg = (me._tag, me._kind, None)
    me._q.put(msg)
77
class ErrorWatch (T.Coroutine):
  """
  Coroutine which turns lines read from stderr streams into warnings

        WARN watch INFO stderr LINE

  where INFO is the list of tokens supplied when the stream was registered.

  Usually there is a single ErrorWatch object, called errorwatch.
  """

  def __init__(me):
    """Initialization: there are no arguments."""
    T.Coroutine.__init__(me)
    me._seq = 1                         # next sequence number to hand out
    me._map = {}                        # seq -> (info, line reader)
    me._q = T.Queue()

  def watch(me, file, info):
    """
    Start watching FILE, labelling its warnings with the token list INFO.

    Returns the sequence number under which the file is registered; pass it
    to unwatch to unregister the file again.
    """
    seq = me._seq
    me._seq = seq + 1
    reader = SelLineQueue(file, me._q, seq, 'stderr')
    me._map[seq] = (info, reader)
    return seq

  def unwatch(me, seq):
    """Forget the file registered under SEQ; returns the ErrorWatch."""
    del me._map[seq]
    return me

  def run(me):
    """
    Coroutine function: report each queued line as a warning.

    A None line marks end-of-file, at which point the file is unregistered.
    """
    while True:
      seq, _, line = me._q.get()
      if line is not None:
        info = me._map[seq][0]
        S.warn(*(['watch'] + info + ['stderr', line]))
        continue
      me.unwatch(seq)
127
def dbwatch():
  """
  Coroutine function: once a second, check the peer database file for
  changes.  When it has changed, ask the Pinger (q.v.) to rescan its peers
  and issue a `watch peerdb-update' notification.
  """
  cr = T.Coroutine.getcurrent()
  parent = cr.parent
  watcher = M.FWatch(opts.cdb)
  while True:
    ## Arm a one-second timer which resumes this coroutine, then yield to
    ## the parent.  The timer stays bound to a local, as in the original;
    ## presumably it must remain referenced in order to fire.
    timer = M.SelTimer(time() + 1, lambda: cr.switch())
    parent.switch()
    if not watcher.update():
      continue
    pinger.rescan(False)
    S.notify('watch', 'peerdb-update')
143
class ChildWatch (M.SelSignal):
  """
  An object which watches for specified processes exiting and reports
  terminations by writing items of the form (TAG, 'exit', RESULT) to a queue.

  There is usually only one ChildWatch object, called childwatch.
  """

  def __new__(cls):
    """Allocate the watcher as a handler for SIGCHLD."""
    return M.SelSignal.__new__(cls, SIG.SIGCHLD)

  def __init__(me):
    """Initialize the child-watcher: no children registered yet."""
    me._pid = {}                        # pid -> (queue, tag)
    me.enable()

  def watch(me, pid, queue, tag):
    """
    Register PID as a child to watch.  If it exits, write (TAG, 'exit', CODE)
    to the QUEUE, where CODE is one of

      * None (successful termination)
      * ['exit-nonzero', CODE] (CODE is a string!)
      * ['exit-signal', 'S' + CODE] (CODE is the signal number as a string)
      * ['exit-unknown', STATUS] (STATUS is the entire exit status, in hex)

    Returns the ChildWatch.
    """
    me._pid[pid] = queue, tag
    return me

  def unwatch(me, pid):
    """
    Unregister PID as a child to watch.  Raises KeyError if PID isn't
    registered.  Returns the ChildWatch.
    """
    del me._pid[pid]
    return me

  @T._callback
  def signalled(me):
    """
    Called when child processes exit: collect exit statuses and report
    them to the queues registered for the processes concerned.

    Children which were never registered are reaped silently.
    """
    while True:
      try:
        pid, status = OS.waitpid(-1, OS.WNOHANG)
      except OSError as exc:
        ## No children left at all: we're done.
        if exc.errno == E.ECHILD:
          break
        ## Interrupted: just try again.
        if exc.errno == E.EINTR:
          continue
        ## Anything else is unexpected.  Don't fall through and act on a
        ## PID and STATUS which are unset or left over from the previous
        ## iteration.
        raise

      ## Children exist, but none has anything to report right now.
      if pid == 0:
        break
      if pid not in me._pid:
        continue

      queue, tag = me._pid[pid]
      if OS.WIFEXITED(status):
        exit = OS.WEXITSTATUS(status)
        if exit == 0:
          code = None
        else:
          code = ['exit-nonzero', str(exit)]
      elif OS.WIFSIGNALED(status):
        code = ['exit-signal', 'S' + str(OS.WTERMSIG(status))]
      else:
        code = ['exit-unknown', hex(status)]
      queue.put((tag, 'exit', code))
207
class Command (object):
  """
  Represents a running command.

  This class is the main interface to the machinery provided by the
  ChildWatch and ErrorWatch objects.  See also potwatch.
  """

  def __init__(me, info, queue, tag, args, env):
    """
    Start a new child process.

    The ARGS are a list of arguments to be given to the child process.  The
    ENV is either None or a dictionary of environment variable assignments to
    override the extant environment.  INFO is a list of tokens to be included
    in warnings about the child's stderr output.  If the child writes a line
    to standard output, put (TAG, 'stdout', LINE) to the QUEUE.  When the
    child exits, write (TAG, 'exit', CODE) to the QUEUE.

    Any exception raised while starting the process (e.g., OSError from
    Popen) is propagated to the caller.
    """
    me._info = info
    me._q = queue
    me._tag = tag
    myenv = OS.environ.copy()
    if env: myenv.update(env)
    me._proc = PROC.Popen(args = args, env = myenv, bufsize = 1,
                          stdout = PROC.PIPE, stderr = PROC.PIPE)
    me._lq = SelLineQueue(me._proc.stdout, queue, tag, 'stdout')
    errorwatch.watch(me._proc.stderr, info)
    childwatch.watch(me._proc.pid, queue, tag)

  def __del__(me):
    """
    If I've been forgotten then stop watching for termination.

    This also runs when __init__ failed part-way, so cope with the process
    never having been started or never having been registered.
    """
    proc = getattr(me, '_proc', None)
    if proc is None:
      return                            # Popen failed: nothing to undo
    try:
      childwatch.unwatch(proc.pid)
    except KeyError:
      pass                              # never got as far as registering
243
def potwatch(what, name, q):
  """
  Drain the queue Q of activity reported by a Command object.

  Each line of the process's stdout is reported as

        NOTE WHAT NAME stdout LINE

  and abnormal termination is reported as

        WARN WHAT NAME CODE

  where CODE is what the ChildWatch wrote.  Returns once both the stdout
  end-of-file marker and the exit report have been seen.
  """
  exited = drained = False
  while not (exited and drained):
    _, kind, more = q.get()
    if kind == 'exit':
      ## CODE is None for a clean exit, so only failures are reported.
      if more: S.warn('watch', what, name, *more)
      exited = True
    elif kind == 'stdout':
      if more is not None:
        S.notify('watch', what, name, 'stdout', more)
      else:
        drained = True
269
270###--------------------------------------------------------------------------
271### Peer database utilities.
272
def timespec(info, key, default):
  """
  Return INFO[KEY] converted by T.timespec.

  DEFAULT is returned instead if KEY is missing or the conversion raises
  T.TripeJobError.
  """
  try:
    spec = T.timespec(info[key])
  except (KeyError, T.TripeJobError):
    return default
  else:
    return spec
279
def integer(info, key, default):
  """
  Return INFO[KEY] converted to an integer.

  DEFAULT is returned instead if KEY is missing or its value isn't a valid
  integer.
  """
  try:
    n = int(info[key])
  except (KeyError, ValueError):
    return default
  else:
    return n
286
def boolean(info, key, default):
  """
  Interpret INFO[KEY] as a boolean, or return DEFAULT if KEY is missing.

  The value is true exactly when it is one of `t', `true', `y', `yes' or
  `on'; the comparison is case-sensitive, and anything else is false.
  """
  try:
    flag = info[key] in ('t', 'true', 'y', 'yes', 'on')
  except (KeyError, ValueError):
    return default
  else:
    return flag
293
def peerinfo(peer):
  """
  Look PEER up in the peer database and return its record as a dictionary.

  The record is stored under the key `P' + PEER and is URL-decoded (with
  semip set) into key/value pairs.  A missing record surfaces as whatever
  the database lookup raises; callers in this file catch KeyError.
  """
  db = CDB.init(opts.cdb)
  record = db['P' + peer]
  return dict(M.URLDecode(record, semip = True))
299
300###--------------------------------------------------------------------------
301### Waking up and watching peers.
302
def connect(peer, conn = None):
  """
  Start the job of connecting to the passive PEER.

  The CONN string is a shell command which will connect to the peer (via some
  back-channel, say ssh and userv), issue a command

        SVCSUBMIT connect passive [OPTIONS] USER

  and write the resulting challenge as its first line of standard output.
  If CONN is None, the command is taken from the `connect' entry of the
  peer's database record; if there isn't one, nothing happens.

  The first line the command prints is passed to the server as a greeting
  for PEER; the rest of the command's output and its exit status are then
  handled by potwatch.
  """
  if conn is None:
    try:
      conn = peerinfo(peer)['connect']
    except KeyError:
      return
  q = T.Queue()

  ## Hold on to the Command for the rest of the function: when it is
  ## finalized it stops watching for the child's exit.
  cmd = Command(['connect', peer], q, 'connect',
                ['/bin/sh', '-c', conn], None)

  ## Only peek at the first event, so that anything which isn't the
  ## challenge line is left on the queue for potwatch to deal with.
  _, kind, more = q.peek()
  if kind == 'stdout' and more is None:
    S.warn('watch', 'connect', peer, 'unexpected-eof')
  elif kind == 'stdout':
    S.greet(peer, more)
    q.get()
  potwatch('connect', peer, q)
331
_pingseq = 0
class PingPeer (object):
  """
  Object representing a peer which we are pinging to ensure that it is still
  present.

  PingPeer objects are held by the Pinger (q.v.).  The Pinger maintains an
  event queue -- which saves us from having an enormous swarm of coroutines
  -- but most of the actual work is done here.

  In order to avoid confusion between different PingPeer instances for the
  same actual peer, each PingPeer has a sequence number (its `seq'
  attribute).  Events for the PingPeer are identified by a (PEER, SEQ) pair.
  (Using the PingPeer instance itself will prevent garbage collection of
  otherwise defunct instances.)
  """

  def __init__(me, pinger, queue, peer, info, pingnow):
    """
    Create a new PingPeer.

    PINGER is the Pinger which owns us; we ask it to drop us when the
    server reports that the peer is unknown or has died.

    QUEUE is the event queue to which timer and ping-command events are
    written.

    PEER is the peer's name, as a string.

    INFO is the peer's database record, as a dictionary, or None, in which
    case the record is looked up here.

    If PINGNOW is true, a ping is sent immediately; otherwise the first
    ping waits for the usual interval.
    """
    global _pingseq
    me._pinger = pinger
    me._q = queue
    me._peer = peer
    me.update(info)
    me.seq = _pingseq
    _pingseq += 1
    me._failures = 0
    if not pingnow:
      me._schedule()
    else:
      me._timer = None
      me._ping()

  def update(me, info):
    """
    Refresh the timing parameters for this peer from the record INFO (or
    from the database, if INFO is None).  Nothing is rescheduled here: the
    new values are picked up the next time a timer is set or a ping sent.
    Returns the PingPeer.
    """
    if info is None:
      info = peerinfo(me._peer)
    me._every = timespec(info, 'every', 120)
    me._timeout = timespec(info, 'timeout', 10)
    me._retries = integer(info, 'retries', 5)
    me._connectp = 'connect' in info
    return me

  def _schedule(me):
    """Arm the timer which will trigger the next round of pinging."""
    me._timer = M.SelTimer(time() + me._every, me._time)

  def _ping(me):
    """
    Send a ping to the peer; the result is sent to the Pinger's event queue,
    tagged with our (PEER, SEQ) pair.
    """
    command = ['EPING',
               '-background', S.bgtag(),
               '-timeout', str(me._timeout),
               '--',
               me._peer]
    S.rawcommand(T.TripeAsynchronousCommand(
      me._q, (me._peer, me.seq), command))

  def _reconnect(me):
    """
    Try to revive the connection to the peer.

    If the peer's database record has a `connect' entry, force a new key
    exchange, spawn the connect job and schedule the next ping; otherwise
    ask the server to kill the peer.
    """
    info = peerinfo(me._peer)
    if 'connect' not in info:
      S.kill(me._peer)
      return
    S.warn('watch', 'reconnecting', me._peer)
    S.forcekx(me._peer)
    T.spawn(connect, me._peer)
    me._schedule()

  def event(me, code, stuff):
    """
    Respond to an event which happened to this peer.

    Timer events indicate that we should start a new ping.  (The server has
    its own timeout which detects lost packets.)

    We trap unknown-peer responses and detach from the Pinger.

    If the ping can't be sent, or it fails and we run out of retries, we
    attempt to restart the connection.
    """
    if code == 'TIMER':
      me._failures = 0
      me._ping()

    elif code == 'FAIL':
      S.notify('watch', 'ping-failed', me._peer, *stuff)
      if stuff:
        reason = stuff[0]
        if reason == 'unknown-peer':
          me._pinger.kill(me._peer)
        elif reason == 'ping-send-failed':
          me._reconnect()

    elif code == 'INFO':
      outcome = stuff[0]
      if outcome == 'ping-ok':
        ## Only mention success if we'd previously reported trouble.
        if me._failures > 0:
          S.warn('watch', 'ping-ok', me._peer)
        me._schedule()
      elif outcome == 'ping-timeout':
        me._failures += 1
        S.warn('watch', 'ping-timeout', me._peer,
               'attempt', str(me._failures), 'of', str(me._retries))
        if me._failures >= me._retries:
          me._reconnect()
        else:
          me._ping()
      elif outcome == 'ping-peer-died':
        me._pinger.kill(me._peer)

  @T._callback
  def _time(me):
    """
    Handle timer callbacks by posting a TIMER event on the queue.
    """
    me._timer = None
    me._q.put(((me._peer, me.seq), 'TIMER', None))

  def __str__(me):
    """Describe the PingPeer: peer name, sequence number, failure count."""
    return 'PingPeer(%s, %d, f = %d)' % (me._peer, me.seq, me._failures)
  def __repr__(me):
    """Same as the string form."""
    return str(me)
469
class Pinger (T.Coroutine):
  """
  The Pinger keeps track of the peers which we expect to be connected and
  takes action if they seem to stop responding.

  There is usually only one Pinger, called pinger.

  The Pinger maintains a collection of PingPeer objects, and an event queue.
  The PingPeers direct the results of their pings, and timer events, to the
  event queue.  The Pinger's coroutine picks items off the queue and
  dispatches them back to the PingPeers as appropriate.
  """

  def __init__(me):
    """Initialize the Pinger, with no peers and an empty event queue."""
    T.Coroutine.__init__(me)
    me._q = T.Queue()
    me._peers = {}                      # peer name -> PingPeer

  def run(me):
    """
    Coroutine function: reads the pinger queue and sends events to the
    PingPeer objects they correspond to.

    Events whose (PEER, SEQ) tag doesn't match the current PingPeer for
    that peer are silently dropped.
    """
    while True:
      (peer, seq), code, stuff = me._q.get()
      if peer not in me._peers:
        continue
      target = me._peers[peer]
      if target.seq == seq:
        target.event(code, stuff)

  def add(me, peer, info, pingnow):
    """
    Put PEER under the Pinger's watchful eye, replacing any existing
    PingPeer of the same name.  The arguments are as for PingPeer: see
    above.  Returns the Pinger.
    """
    me._peers[peer] = PingPeer(me, me._q, peer, info, pingnow)
    return me

  def kill(me, peer):
    """
    Remove PEER from the peers being watched by the Pinger.  Raises KeyError
    if PEER isn't being watched.  Returns the Pinger.
    """
    del me._peers[peer]
    return me

  def rescan(me, startup):
    """
    General resynchronization method.

    We scan the list of peers known at the server, skipping any without a
    database record.  Peers whose record has `watch' set are `interesting';
    if STARTUP is true, the remaining peers are also candidates for setup.

    PingPeers for peers which are no longer interesting are dropped, and
    those which remain are prodded to update their timing parameters.
    Candidates we aren't yet watching are then either set up from scratch
    via addpeer (if STARTUP) or simply adopted and pinged immediately.

    This method is called once at startup to pick up the peers already
    installed, and again by the dbwatcher coroutine when it detects a change
    to the database.  Returns the Pinger.
    """
    if T._debug: print('# rescan peers')
    watched = {}                        # peers which ought to be pinged
    candidates = {}                     # peers we might need to start on

    ## Classify the server's peers.
    for peer in S.list():
      try:
        info = peerinfo(peer)
      except KeyError:
        continue
      if boolean(info, 'watch', False):
        if T._debug: print('# interesting peer %s' % peer)
        watched[peer] = candidates[peer] = info
      elif startup:
        if T._debug: print('# peer %s ready for adoption' % peer)
        candidates[peer] = info

    ## Refresh or discard the PingPeers we already have.  Iterate over a
    ## snapshot, because entries are deleted along the way.
    for peer, obj in list(me._peers.items()):
      if peer not in watched:
        if T._debug: print('# peer %s vanished' % peer)
        del me._peers[peer]
      else:
        obj.update(watched[peer])

    ## Pick up anything new.
    for peer, info in candidates.items():
      if peer in me._peers:
        continue
      if not startup:
        if T._debug: print('# adopting new peer %s' % peer)
        me.add(peer, info, True)
      else:
        if T._debug: print('# setting up peer %s' % peer)
        ifname = S.ifname(peer)
        addr = S.addr(peer)
        T.defer(addpeer, info, peer, ifname, *addr)
    return me

  def adopted(me):
    """
    Returns the list of peers being watched by the Pinger.
    """
    return list(me._peers)
565
566###--------------------------------------------------------------------------
567### New connections.
568
## Runs of characters which can't appear in our environment variable names.
r_bad = RX.compile(r'[\W_]+')

def encode_envvars(env, prefix, vars):
  """
  Write the items of the dictionary VARS into the dictionary ENV as
  environment variable assignments.

  Each name is converted to uppercase, every run of non-alphanumeric
  characters (underscores included) is replaced by a single underscore, and
  the PREFIX is prepended.  Values are copied unchanged.
  """
  for name, value in vars.items():
    mangled = r_bad.sub('_', name.upper())
    env[prefix + mangled] = value

def envvars(info):
  """
  Build and return the environment variable dictionary for a peer.

  The entries of the database record INFO are encoded with a P_ prefix, and
  the items reported by S.algs() with an A_ prefix.
  """
  env = {}
  encode_envvars(env, 'P_', info)
  encode_envvars(env, 'A_', S.algs())
  return env
590
def ifupdown(what, peer, info, *args):
  """
  Run the interface up/down script for a peer.

  WHAT is 'ifup' or 'ifdown', and selects the entry of the database record
  INFO which holds the command.  The command is split into words (with
  quoting honoured), and PEER and the extra ARGS are appended as further
  arguments.  The script's environment is extended with the variables
  built by envvars.

  The command's output and exit status are handled by potwatch; this
  function returns when potwatch does.
  """
  q = T.Queue()
  words = M.split(info[what], quotep = True)[0]
  argv = words + [peer] + list(args)

  ## Hold on to the Command until potwatch has finished: when it is
  ## finalized it stops watching for the child's exit.
  c = Command([what, peer], q, what, argv, envvars(info))
  potwatch(what, peer, q)
607
def addpeer(info, peer, ifname, *addr):
  """
  Add a new peer to our collection.

  INFO is the peer information dictionary, or None if we don't have one yet,
  in which case it is fetched from the database; if the peer has no record
  then nothing is done.

  PEER names the peer; IFNAME is the interface name for its tunnel; and ADDR
  is the list of tokens representing its address.

  Depending on what the record contains, we start the `ifup' script, start
  the `connect' command, and hand the peer to the Pinger if it is marked
  `watch'.
  """
  if info is None:
    try:
      info = peerinfo(peer)
    except KeyError:
      return

  ## Each job runs in a coroutine of its own.
  if 'ifup' in info:
    (T.Coroutine(ifupdown, name = 'ifup %s' % peer)
      .switch('ifup', peer, info, ifname, *addr))
  if 'connect' in info:
    (T.Coroutine(connect, name = 'connect %s' % peer)
      .switch(peer, info['connect']))

  ## Not PINGNOW: the first ping waits for the usual interval.
  if boolean(info, 'watch', False):
    pinger.add(peer, info, False)
633
def delpeer(peer):
  """
  Drop the PEER from the Pinger and put its interface to bed.

  The Pinger is told to forget the peer whether or not the peer still has a
  database record; it's not an error if the Pinger wasn't watching it.  The
  `ifdown' script is only run if the record exists and names one.
  """
  ## Do this first: it doesn't need the database, and a peer whose record
  ## has been deleted mustn't be left behind in the Pinger.
  try:
    pinger.kill(peer)
  except KeyError:
    pass

  try:
    info = peerinfo(peer)
  except KeyError:
    return
  if 'ifdown' in info:
    T.Coroutine(ifupdown, name = 'ifdown %s' % peer) \
      .switch('ifdown', peer, info)
a62f8e8a
MW
647
def notify(_, code, *rest):
  """
  Handle a notification with the given CODE and arguments REST.

  ADD notifications are passed to addpeer (with no database record, so it
  looks one up); KILL notifications are passed to delpeer.  Everything else
  is ignored, as is the first argument.
  """
  if code == 'KILL':
    delpeer(*rest)
  elif code == 'ADD':
    addpeer(None, *rest)
659
660###--------------------------------------------------------------------------
661### Command stubs.
662
def cmd_stub(*args):
  """Placeholder command: always fails with `not-implemented'."""
  error = T.TripeJobError('not-implemented')
  raise error
665
def cmd_kick(peer):
  """
  kick PEER: Force a new connection attempt for PEER

  Fails with `peer-not-adopted' if the Pinger isn't watching PEER;
  otherwise the connect job is spawned in the background.
  """
  if peer in pinger.adopted():
    T.spawn(connect, peer)
  else:
    raise T.TripeJobError('peer-not-adopted', peer)
a62f8e8a
MW
673
def cmd_adopted():
  """
  adopted: Report a list of adopted peers.

  One info line is issued for each peer the Pinger is watching.
  """
  names = pinger.adopted()
  for name in names:
    T.svcinfo(name)
680
681###--------------------------------------------------------------------------
682### Start up.
683
def setup():
  """
  Service setup.

  Install notify as the handler for NOTE messages, issue the watch request
  `+n' to the server, and have the Pinger rescan the peers, telling it
  whether we're running as part of server startup.
  """
  S.handler['NOTE'] = notify
  S.watch('+n')
  startup = opts.startup
  pinger.rescan(startup)
693
def init():
  """
  Initialization to be done before service startup.

  Creates the global errorwatch, childwatch and pinger objects, and then
  starts the database-watcher, error-watcher and pinger coroutines, in
  that order.
  """
  global errorwatch, childwatch, pinger

  ## Create the singletons first: the coroutines refer to them.
  errorwatch = ErrorWatch()
  childwatch = ChildWatch()
  pinger = Pinger()

  ## Now set the coroutines going.
  T.Coroutine(dbwatch, name = 'dbwatch').switch()
  for cr in (errorwatch, pinger):
    cr.switch()
705
def parse_options():
  """
  Parse the command-line options.

  Positional arguments are rejected with a usage error.  As side effects,
  changes directory to the requested configuration directory, and sets the
  tripe module's debugging flag from the --debug option.  Returns the
  options object.
  """
  op = OptionParser(usage = '%prog [-a FILE] [-d DIR] [-p FILE]',
                    version = '%%prog %s' % VERSION)

  op.add_option('-a', '--admin-socket',
                metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
                help = 'Select socket to connect to [default %default]')
  op.add_option('-d', '--directory',
                metavar = 'DIR', dest = 'dir', default = T.configdir,
                help = 'Select current directory [default %default]')
  op.add_option('-p', '--peerdb',
                metavar = 'FILE', dest = 'cdb', default = T.peerdb,
                help = 'Select peers database [default %default]')
  op.add_option('--daemon', dest = 'daemon',
                default = False, action = 'store_true',
                help = 'Become a daemon after successful initialization')
  op.add_option('--debug', dest = 'debug',
                default = False, action = 'store_true',
                help = 'Emit debugging trace information')
  op.add_option('--startup', dest = 'startup',
                default = False, action = 'store_true',
                help = 'Being called as part of the server startup')

  opts, args = op.parse_args()
  if args: op.error('no arguments permitted')
  OS.chdir(opts.dir)
  T._debug = opts.debug
  return opts
740
## Service table, for running manually.  Each command entry has the form
## (MIN-ARGS, MAX-ARGS, USAGE, HANDLER) -- presumably as expected by
## T.runservices; check against the tripe module.
_watch_commands = {
  'adopted': (0, 0, '', cmd_adopted),
  'kick': (1, 1, 'PEER', cmd_kick)
}
service_info = [('watch', T.VERSION, _watch_commands)]

if __name__ == '__main__':
  opts = parse_options()
  T.runservices(opts.tripesock, service_info,
                init = init, setup = setup,
                daemon = opts.daemon)
752
753###----- That's all, folks --------------------------------------------------