chiark / gitweb /
svc/conntrack: Make the kickpeers coroutine more robust.
[tripe] / svc / conntrack.in
1 #! @PYTHON@
2 ### -*-python-*-
3 ###
4 ### Service for automatically tracking network connection status
5 ###
6 ### (c) 2010 Straylight/Edgeware
7 ###
8
9 ###----- Licensing notice ---------------------------------------------------
10 ###
11 ### This file is part of Trivial IP Encryption (TrIPE).
12 ###
13 ### TrIPE is free software; you can redistribute it and/or modify
14 ### it under the terms of the GNU General Public License as published by
15 ### the Free Software Foundation; either version 2 of the License, or
16 ### (at your option) any later version.
17 ###
18 ### TrIPE is distributed in the hope that it will be useful,
19 ### but WITHOUT ANY WARRANTY; without even the implied warranty of
20 ### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21 ### GNU General Public License for more details.
22 ###
23 ### You should have received a copy of the GNU General Public License
24 ### along with TrIPE; if not, write to the Free Software Foundation,
25 ### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
26
27 VERSION = '@VERSION@'
28
29 ###--------------------------------------------------------------------------
30 ### External dependencies.
31
32 from ConfigParser import RawConfigParser
33 from optparse import OptionParser
34 import os as OS
35 import sys as SYS
36 import socket as S
37 import mLib as M
38 import tripe as T
39 import dbus as D
40 for i in ['mainloop', 'mainloop.glib']:
41   __import__('dbus.%s' % i)
42 import gobject as G
43 from struct import pack, unpack
44
45 SM = T.svcmgr
46 ##__import__('rmcr').__debug = True
47
48 ###--------------------------------------------------------------------------
49 ### Utilities.
50
51 class struct (object):
52   """A simple container object."""
53   def __init__(me, **kw):
54     me.__dict__.update(kw)
55
56 def toposort(cmp, things):
57   """
58   Generate the THINGS in an order consistent with a given partial order.
59
60   The function CMP(X, Y) should return true if X must precede Y, and false if
61   it doesn't care.  If X and Y are equal then it should return false.
62
63   The THINGS may be any finite iterable; it is converted to a list
64   internally.
65   """
66
67   ## Make sure we can index the THINGS, and prepare an ordering table.
68   ## What's going on?  The THINGS might not have a helpful equality
69   ## predicate, so it's easier to work with indices.  The ordering table will
70   ## remember which THINGS (by index) are considered greater than other
71   ## things.
72   things = list(things)
73   n = len(things)
74   order = [{} for i in xrange(n)]
75   rorder = [{} for i in xrange(n)]
76   for i in xrange(n):
77     for j in xrange(n):
78       if i != j and cmp(things[i], things[j]):
79         order[j][i] = True
80         rorder[i][j] = True
81
82   ## Now we can do the sort.
83   out = []
84   while True:
85     done = True
86     for i in xrange(n):
87       if order[i] is not None:
88         done = False
89         if len(order[i]) == 0:
90           for j in rorder[i]:
91             del order[j][i]
92           yield things[i]
93           order[i] = None
94     if done:
95       break
96
97 ###--------------------------------------------------------------------------
98 ### Parse the configuration file.
99
100 ## Hmm.  Should I try to integrate this with the peers database?  It's not a
101 ## good fit; it'd need special hacks in tripe-newpeers.  And the use case for
102 ## this service are largely going to be satellite notes, I don't think
103 ## scalability's going to be a problem.
104
105 class Config (object):
106   """
107   Represents a configuration file.
108
109   The most interesting thing is probably the `groups' slot, which stores a
110   list of pairs (NAME, PATTERNS); the NAME is a string, and the PATTERNS a
111   list of (TAG, PEER, ADDR, MASK) triples.  The implication is that there
112   should be precisely one peer with a name matching NAME-*, and that it
113   should be NAME-TAG, where (TAG, PEER, ADDR, MASK) is the first triple such
114   that the host's primary IP address (if PEER is None -- or the IP address it
115   would use for communicating with PEER) is within the network defined by
116   ADDR/MASK.
117   """
118
119   def __init__(me, file):
120     """
121     Construct a new Config object, reading the given FILE.
122     """
123     me._file = file
124     me._fwatch = M.FWatch(file)
125     me._update()
126
127   def check(me):
128     """
129     See whether the configuration file has been updated.
130     """
131     if me._fwatch.update():
132       me._update()
133
134   def _update(me):
135     """
136     Internal function to update the configuration from the underlying file.
137     """
138
139     ## Read the configuration.  We have no need of the fancy substitutions,
140     ## so turn them all off.
141     cp = RawConfigParser()
142     cp.read(me._file)
143
144     ## Save the test address.  Make sure it's vaguely sensible.  The default
145     ## is probably good for most cases, in fact, since that address isn't
146     ## actually in use.  Note that we never send packets to the test address;
147     ## we just use it to discover routing information.
148     if cp.has_option('DEFAULT', 'test-addr'):
149       testaddr = cp.get('DEFAULT', 'test-addr')
150       S.inet_aton(testaddr)
151     else:
152       testaddr = '1.2.3.4'
153
154     ## Scan the configuration file and build the groups structure.
155     groups = []
156     for sec in cp.sections():
157       pats = []
158       for tag in cp.options(sec):
159         spec = cp.get(sec, tag).split()
160
161         ## Parse the entry into peer and network.
162         if len(spec) == 1:
163           peer = None
164           net = spec[0]
165         else:
166           peer, net = spec
167
168         ## Syntax of a net is ADDRESS/MASK, where ADDRESS is a dotted-quad,
169         ## and MASK is either a dotted-quad or a single integer N indicating
170         ## a mask with N leading ones followed by trailing zeroes.
171         slash = net.index('/')
172         addr, = unpack('>L', S.inet_aton(net[:slash]))
173         if net.find('.', slash + 1) >= 0:
174           mask, = unpack('>L', S.inet_aton(net[:slash]))
175         else:
176           n = int(net[slash + 1:], 10)
177           mask = (1 << 32) - (1 << 32 - n)
178         pats.append((tag, peer, addr & mask, mask))
179
180       ## Annoyingly, RawConfigParser doesn't preserve the order of options.
181       ## In order to make things vaguely sane, we topologically sort the
182       ## patterns so that more specific patterns are checked first.
183       pats = list(toposort(lambda (t, p, a, m), (tt, pp, aa, mm): \
184                              (p and not pp) or \
185                              (p == pp and m == (m | mm) and aa == (a & mm)),
186                            pats))
187       groups.append((sec, pats))
188
189     ## Done.
190     me.testaddr = testaddr
191     me.groups = groups
192
193 ### This will be a configuration file.
194 CF = None
195
196 ###--------------------------------------------------------------------------
197 ### Responding to a network up/down event.
198
199 def localaddr(peer):
200   """
201   Return the local IP address used for talking to PEER.
202   """
203   sk = S.socket(S.AF_INET, S.SOCK_DGRAM)
204   try:
205     try:
206       sk.connect((peer, 1))
207       addr, _ = sk.getsockname()
208       addr, = unpack('>L', S.inet_aton(addr))
209       return addr
210     except S.error:
211       return None
212   finally:
213     sk.close()
214
215 _kick = T.Queue()
216 def kickpeers():
217   lastip = {}
218   while True:
219     upness, reason = _kick.get()
220
221     ## Make sure the configuration file is up-to-date.  Don't worry if we
222     ## can't do anything useful.
223     try:
224       CF.check()
225     except Exception, exc:
226       SM.warn('conntrack', 'config-file-error',
227               exc.__class__.__name__, str(exc))
228
229     ## Find the current list of peers.
230     peers = SM.list()
231
232     ## Work out the primary IP address.
233     if upness:
234       addr = localaddr(CF.testaddr)
235       if addr is None:
236         upness = False
237     else:
238       addr = None
239
240     ## Now decide what to do.
241     changes = []
242     for g, pp in CF.groups:
243
244       ## Find out which peer in the group ought to be active.
245       ip = None
246       map = {}
247       want = None
248       for t, p, a, m in pp:
249         if p is None or not upness:
250           ipq = addr
251         else:
252           ipq = localaddr(p)
253         if upness and ip is None and \
254               ipq is not None and (ipq & m) == a:
255           map[t] = 'up'
256           want = t
257           ip = ipq
258         else:
259           map[t] = 'down'
260
261       ## Shut down the wrong ones.
262       found = False
263       for p in peers:
264         what = map.get(p, 'leave')
265         if what == 'up':
266           found = True
267         elif what == 'down':
268           def _(p = p):
269             try:
270               SM.kill(p)
271             except T.TripeError, exc:
272               if exc.args[0] == 'unknown-peer':
273                 ## Inherently racy; don't worry about this.
274                 pass
275               else:
276                 raise
277           changes.append(_)
278
279       ## Start the right one if necessary.
280       if want is not None and (not found or ip != lastip.get(g, None)):
281         def _(want = want):
282           try:
283             SM.svcsubmit('connect', 'active', want)
284           except T.TripeError, exc:
285             SM.warn('conntrack', 'connect-failed', want, *exc.args)
286         changes.append(_)
287       lastip[g] = ip
288
289     ## Commit the changes.
290     if changes:
291       SM.notify('conntrack', upness and 'up' or 'down', *reason)
292       for c in changes: c()
293
294 def netupdown(upness, reason):
295   """
296   Add or kill peers according to whether the network is up or down.
297
298   UPNESS is true if the network is up, or false if it's down.
299   """
300
301   _kick.put((upness, reason))
302
303 ###--------------------------------------------------------------------------
304 ### NetworkManager monitor.
305
306 NM_NAME = 'org.freedesktop.NetworkManager'
307 NM_PATH = '/org/freedesktop/NetworkManager'
308 NM_IFACE = NM_NAME
309 NMCA_IFACE = NM_NAME + '.Connection.Active'
310
311 NM_STATE_CONNECTED = 3
312
313 class NetworkManagerMonitor (object):
314   """
315   Watch NetworkManager signals for changes in network state.
316   """
317
318   ## Strategy.  There are two kinds of interesting state transitions for us.
319   ## The first one is the global are-we-connected state, which we'll use to
320   ## toggle network upness on a global level.  The second is which connection
321   ## has the default route, which we'll use to tweak which peer in the peer
322   ## group is active.  The former is most easily tracked using the signal
323   ## org.freedesktop.NetworkManager.StateChanged; for the latter, we track
324   ## org.freedesktop.NetworkManager.Connection.Active.PropertiesChanged and
325   ## look for when a new connection gains the default route.
326
327   def attach(me, bus):
328     try:
329       nm = bus.get_object(NM_NAME, NM_PATH)
330       state = nm.Get(NM_IFACE, 'State')
331       if state == NM_STATE_CONNECTED:
332         netupdown(True, ['nm', 'initially-connected'])
333       else:
334         netupdown(False, ['nm', 'initially-disconnected'])
335     except D.DBusException:
336       pass
337     bus.add_signal_receiver(me._nm_state, 'StateChanged', NM_IFACE,
338                             NM_NAME, NM_PATH)
339     bus.add_signal_receiver(me._nm_connchange,
340                             'PropertiesChanged', NMCA_IFACE,
341                             NM_NAME, None)
342
343   def _nm_state(me, state):
344     if state == NM_STATE_CONNECTED:
345       netupdown(True, ['nm', 'connected'])
346     else:
347       netupdown(False, ['nm', 'disconnected'])
348
349   def _nm_connchange(me, props):
350     if props.get('Default', False):
351       netupdown(True, ['nm', 'default-connection-change'])
352
353 ###--------------------------------------------------------------------------
354 ### Maemo monitor.
355
356 ICD_NAME = 'com.nokia.icd'
357 ICD_PATH = '/com/nokia/icd'
358 ICD_IFACE = ICD_NAME
359
360 class MaemoICdMonitor (object):
361   """
362   Watch ICd signals for changes in network state.
363   """
364
365   ## Strategy.  ICd only handles one connection at a time in steady state,
366   ## though when switching between connections, it tries to bring the new one
367   ## up before shutting down the old one.  This makes life a bit easier than
368   ## it is with NetworkManager.  On the other hand, the notifications are
369   ## relative to particular connections only, and the indicator that the old
370   ## connection is down (`IDLE') comes /after/ the new one comes up
371   ## (`CONNECTED'), so we have to remember which one is active.
372
373   def attach(me, bus):
374     try:
375       icd = bus.get_object(ICD_NAME, ICD_PATH)
376       try:
377         iap = icd.get_ipinfo(dbus_interface = ICD_IFACE)[0]
378         me._iap = iap
379         netupdown(True, ['icd', 'initially-connected', iap])
380       except D.DBusException:
381         me._iap = None
382         netupdown(False, ['icd', 'initially-disconnected'])
383     except D.DBusException:
384       me._iap = None
385     bus.add_signal_receiver(me._icd_state, 'status_changed', ICD_IFACE,
386                             ICD_NAME, ICD_PATH)
387
388   def _icd_state(me, iap, ty, state, hunoz):
389     if state == 'CONNECTED':
390       me._iap = iap
391       netupdown(True, ['icd', 'connected', iap])
392     elif state == 'IDLE' and iap == me._iap:
393       me._iap = None
394       netupdown(False, ['icd', 'idle'])
395
396 ###--------------------------------------------------------------------------
397 ### D-Bus connection tracking.
398
399 class DBusMonitor (object):
400   """
401   Maintains a connection to the system D-Bus, and watches for signals.
402
403   If the connection is initially down, or drops for some reason, we retry
404   periodically (every five seconds at the moment).  If the connection
405   resurfaces, we reattach the monitors.
406   """
407
408   def __init__(me):
409     """
410     Initialise the object and try to establish a connection to the bus.
411     """
412     me._mons = []
413     me._loop = D.mainloop.glib.DBusGMainLoop()
414     me._state = 'startup'
415     me._reconnect()
416
417   def addmon(me, mon):
418     """
419     Add a monitor object to watch for signals.
420
421     MON.attach(BUS) is called, with BUS being the connection to the system
422     bus.  MON should query its service's current status and watch for
423     relevant signals.
424     """
425     me._mons.append(mon)
426     if me._bus is not None:
427       mon.attach(me._bus)
428
429   def _reconnect(me, hunoz = None):
430     """
431     Start connecting to the bus.
432
433     If we fail the first time, retry periodically.
434     """
435     if me._state == 'startup':
436       T.aside(SM.notify, 'conntrack', 'dbus-connection', 'startup')
437     elif me._state == 'connected':
438       T.aside(SM.notify, 'conntrack', 'dbus-connection', 'lost')
439     else:
440       T.aside(SM.notify, 'conntrack', 'dbus-connection',
441               'state=%s' % me._state)
442     me._state == 'reconnecting'
443     me._bus = None
444     if me._try_connect():
445       G.timeout_add_seconds(5, me._try_connect)
446
447   def _try_connect(me):
448     """
449     Actually make a connection attempt.
450
451     If we succeed, attach the monitors.
452     """
453     try:
454       addr = OS.getenv('TRIPE_CONNTRACK_BUS')
455       if addr == 'SESSION':
456         bus = D.SessionBus(mainloop = me._loop, private = True)
457       elif addr is not None:
458         bus = D.bus.BusConnection(addr, mainloop = me._loop)
459       else:
460         bus = D.SystemBus(mainloop = me._loop, private = True)
461       for m in me._mons:
462         m.attach(bus)
463     except D.DBusException, e:
464       return True
465     me._bus = bus
466     me._state = 'connected'
467     bus.call_on_disconnection(me._reconnect)
468     T.aside(SM.notify, 'conntrack', 'dbus-connection', 'connected')
469     return False
470
471 ###--------------------------------------------------------------------------
472 ### TrIPE service.
473
474 class GIOWatcher (object):
475   """
476   Monitor I/O events using glib.
477   """
478   def __init__(me, conn, mc = G.main_context_default()):
479     me._conn = conn
480     me._watch = None
481     me._mc = mc
482   def connected(me, sock):
483     me._watch = G.io_add_watch(sock, G.IO_IN,
484                                lambda *hunoz: me._conn.receive())
485   def disconnected(me):
486     G.source_remove(me._watch)
487     me._watch = None
488   def iterate(me):
489     me._mc.iteration(True)
490
491 SM.iowatch = GIOWatcher(SM)
492
493 def init():
494   """
495   Service initialization.
496
497   Add the D-Bus monitor here, because we might send commands off immediately,
498   and we want to make sure the server connection is up.
499   """
500   global DBM
501   T.Coroutine(kickpeers, name = 'kickpeers').switch()
502   DBM = DBusMonitor()
503   DBM.addmon(NetworkManagerMonitor())
504   DBM.addmon(MaemoICdMonitor())
505   G.timeout_add_seconds(30, lambda: (netupdown(True, ['interval-timer'])
506                                      or True))
507
508 def parse_options():
509   """
510   Parse the command-line options.
511
512   Automatically changes directory to the requested configdir, and turns on
513   debugging.  Returns the options object.
514   """
515   op = OptionParser(usage = '%prog [-a FILE] [-d DIR]',
516                     version = '%%prog %s' % VERSION)
517
518   op.add_option('-a', '--admin-socket',
519                 metavar = 'FILE', dest = 'tripesock', default = T.tripesock,
520                 help = 'Select socket to connect to [default %default]')
521   op.add_option('-d', '--directory',
522                 metavar = 'DIR', dest = 'dir', default = T.configdir,
523                 help = 'Select current diretory [default %default]')
524   op.add_option('-c', '--config',
525                 metavar = 'FILE', dest = 'conf', default = 'conntrack.conf',
526                 help = 'Select configuration [default %default]')
527   op.add_option('--daemon', dest = 'daemon',
528                 default = False, action = 'store_true',
529                 help = 'Become a daemon after successful initialization')
530   op.add_option('--debug', dest = 'debug',
531                 default = False, action = 'store_true',
532                 help = 'Emit debugging trace information')
533   op.add_option('--startup', dest = 'startup',
534                 default = False, action = 'store_true',
535                 help = 'Being called as part of the server startup')
536
537   opts, args = op.parse_args()
538   if args: op.error('no arguments permitted')
539   OS.chdir(opts.dir)
540   T._debug = opts.debug
541   return opts
542
543 ## Service table, for running manually.
544 def cmd_updown(upness):
545   return lambda *args: T.defer(netupdown, upness, ['manual'] + list(args))
546 service_info = [('conntrack', VERSION, {
547   'up': (0, None, '', cmd_updown(True)),
548   'down': (0, None, '', cmd_updown(False))
549 })]
550
551 if __name__ == '__main__':
552   opts = parse_options()
553   CF = Config(opts.conf)
554   T.runservices(opts.tripesock, service_info,
555                 init = init, daemon = opts.daemon)
556
557 ###----- That's all, folks --------------------------------------------------