chiark / gitweb /
cf7d4673685107369c8895a8c569d5a89879563f
[elogind.git] / test / sysv-generator-test.py
1 # systemd-sysv-generator integration test
2 #
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
5 #
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
10
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19 import unittest
20 import sys
21 import os
22 import subprocess
23 import tempfile
24 import shutil
25 from glob import glob
26
27 try:
28     from configparser import RawConfigParser
29 except ImportError:
30     # python 2
31     from ConfigParser import RawConfigParser
32
33 sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
34
35
36 class SysvGeneratorTest(unittest.TestCase):
37     def setUp(self):
38         self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
39         self.init_d_dir = os.path.join(self.workdir, 'init.d')
40         os.mkdir(self.init_d_dir)
41         self.rcnd_dir = self.workdir
42         self.unit_dir = os.path.join(self.workdir, 'systemd')
43         os.mkdir(self.unit_dir)
44         self.out_dir = os.path.join(self.workdir, 'output')
45         os.mkdir(self.out_dir)
46
47     def tearDown(self):
48         shutil.rmtree(self.workdir)
49
50     #
51     # Helper methods
52     #
53
54     def run_generator(self, expect_error=False):
55         '''Run sysv-generator.
56
57         Fail if stderr contains any "Fail", unless expect_error is True.
58         Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
59         parsed generated units.
60         '''
61         env = os.environ.copy()
62         env['SYSTEMD_LOG_LEVEL'] = 'debug'
63         env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
64         env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
65         env['SYSTEMD_UNIT_PATH'] = self.unit_dir
66         gen = subprocess.Popen(
67             [sysv_generator, 'ignored', 'ignored', self.out_dir],
68             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
69             universal_newlines=True, env=env)
70         (out, err) = gen.communicate()
71         if not expect_error:
72             self.assertFalse('Fail' in err, err)
73         self.assertEqual(gen.returncode, 0, err)
74
75         results = {}
76         for service in glob(self.out_dir + '/*.service'):
77             if os.path.islink(service):
78                 continue
79             cp = RawConfigParser()
80             cp.optionxform = lambda o: o  # don't lower-case option names
81             with open(service) as f:
82                 cp.readfp(f)
83             results[os.path.basename(service)] = cp
84
85         return (err, results)
86
87     def add_sysv(self, fname, keys, enable=False, prio=1):
88         '''Create a SysV init script with the given keys in the LSB header
89
90         There are sensible default values for all fields.
91         If enable is True, links will be created in the rcN.d dirs. In that
92         case, the priority can be given with "prio" (default to 1).
93
94         Return path of generated script.
95         '''
96         name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
97         keys.setdefault('Provides', name_without_sh)
98         keys.setdefault('Required-Start', '$local_fs')
99         keys.setdefault('Required-Stop', keys['Required-Start'])
100         keys.setdefault('Default-Start', '2 3 4 5')
101         keys.setdefault('Default-Stop', '0 1 6')
102         keys.setdefault('Short-Description', 'test %s service' %
103                         name_without_sh)
104         keys.setdefault('Description', 'long description for test %s service' %
105                         name_without_sh)
106         script = os.path.join(self.init_d_dir, fname)
107         with open(script, 'w') as f:
108             f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
109             for k, v in keys.items():
110                 if v is not None:
111                     f.write('#%20s %s\n' % (k + ':', v))
112             f.write('### END INIT INFO\ncode --goes here\n')
113         os.chmod(script, 0o755)
114
115         if enable:
116             def make_link(prefix, runlevel):
117                 d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
118                 if not os.path.isdir(d):
119                     os.mkdir(d)
120                 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
121
122             for rl in keys['Default-Start'].split():
123                 make_link('S%02i' % prio, rl)
124             for rl in keys['Default-Stop'].split():
125                 make_link('K%02i' % (99 - prio), rl)
126
127         return script
128
129     def assert_enabled(self, unit, targets):
130         '''assert that a unit is enabled in precisely the given targets'''
131
132         all_targets = ['multi-user', 'graphical']
133
134         # should be enabled
135         for target in all_targets:
136             link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
137             if target in targets:
138                 unit_file = os.readlink(link)
139                 self.assertTrue(os.path.exists(unit_file))
140                 self.assertEqual(os.path.basename(unit_file), unit)
141             else:
142                 self.assertFalse(os.path.exists(link),
143                                  '%s unexpectedly exists' % link)
144
145     #
146     # test cases
147     #
148
149     def test_nothing(self):
150         '''no input files'''
151
152         results = self.run_generator()[1]
153         self.assertEqual(results, {})
154         self.assertEqual(os.listdir(self.out_dir), [])
155
156     def test_simple_disabled(self):
157         '''simple service without dependencies, disabled'''
158
159         self.add_sysv('foo', {}, enable=False)
160         err, results = self.run_generator()
161         self.assertEqual(len(results), 1)
162
163         # no enablement links or other stuff
164         self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
165
166         s = results['foo.service']
167         self.assertEqual(s.sections(), ['Unit', 'Service'])
168         self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
169         # $local_fs does not need translation, don't expect any dependency
170         # fields here
171         self.assertEqual(set(s.options('Unit')),
172                          set(['Documentation', 'SourcePath', 'Description']))
173
174         self.assertEqual(s.get('Service', 'Type'), 'forking')
175         init_script = os.path.join(self.init_d_dir, 'foo')
176         self.assertEqual(s.get('Service', 'ExecStart'),
177                          '%s start' % init_script)
178         self.assertEqual(s.get('Service', 'ExecStop'),
179                          '%s stop' % init_script)
180
181         self.assertNotIn('Overwriting', err)
182
183     def test_simple_enabled_all(self):
184         '''simple service without dependencies, enabled in all runlevels'''
185
186         self.add_sysv('foo', {}, enable=True)
187         err, results = self.run_generator()
188         self.assertEqual(list(results), ['foo.service'])
189         self.assert_enabled('foo.service', ['multi-user', 'graphical'])
190         self.assertNotIn('Overwriting', err)
191
192     def test_simple_enabled_some(self):
193         '''simple service without dependencies, enabled in some runlevels'''
194
195         self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
196         err, results = self.run_generator()
197         self.assertEqual(list(results), ['foo.service'])
198         self.assert_enabled('foo.service', ['multi-user'])
199
200     def test_lsb_macro_dep_single(self):
201         '''single LSB macro dependency: $network'''
202
203         self.add_sysv('foo', {'Required-Start': '$network'})
204         s = self.run_generator()[1]['foo.service']
205         self.assertEqual(set(s.options('Unit')),
206                          set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
207         self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
208         self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
209
210     def test_lsb_macro_dep_multi(self):
211         '''multiple LSB macro dependencies'''
212
213         self.add_sysv('foo', {'Required-Start': '$named $portmap'})
214         s = self.run_generator()[1]['foo.service']
215         self.assertEqual(set(s.options('Unit')),
216                          set(['Documentation', 'SourcePath', 'Description', 'After']))
217         self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')
218
219     def test_lsb_deps(self):
220         '''LSB header dependencies to other services'''
221
222         # also give symlink priorities here; they should be ignored
223         self.add_sysv('foo', {'Required-Start': 'must1 must2',
224                               'Should-Start': 'may1 ne_may2'},
225                       enable=True, prio=40)
226         self.add_sysv('must1', {}, enable=True, prio=10)
227         self.add_sysv('must2', {}, enable=True, prio=15)
228         self.add_sysv('may1', {}, enable=True, prio=20)
229         # do not create ne_may2
230         err, results = self.run_generator()
231         self.assertEqual(sorted(results),
232                          ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
233
234         # foo should depend on all of them
235         self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
236                          ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
237
238         # other services should not depend on each other
239         self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
240         self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
241         self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
242
243     def test_symlink_prio_deps(self):
244         '''script without LSB headers use rcN.d priority'''
245
246         # create two init.d scripts without LSB header and enable them with
247         # startup priorities
248         for prio, name in [(10, 'provider'), (15, 'consumer')]:
249             with open(os.path.join(self.init_d_dir, name), 'w') as f:
250                 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
251                 os.fchmod(f.fileno(), 0o755)
252
253             d = os.path.join(self.rcnd_dir, 'rc2.d')
254             if not os.path.isdir(d):
255                 os.mkdir(d)
256             os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
257
258         err, results = self.run_generator()
259         self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
260         self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
261         self.assertEqual(results['consumer.service'].get('Unit', 'After'),
262                          'provider.service')
263
264     def test_multiple_provides(self):
265         '''multiple Provides: names'''
266
267         self.add_sysv('foo', {'Provides': 'foo bar baz'})
268         err, results = self.run_generator()
269         self.assertEqual(list(results), ['foo.service'])
270         self.assertEqual(set(results['foo.service'].options('Unit')),
271                          set(['Documentation', 'SourcePath', 'Description']))
272         # should create symlinks for the alternative names
273         for f in ['bar.service', 'baz.service']:
274             self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
275                              'foo.service')
276         self.assertNotIn('Overwriting', err)
277
278     def test_same_provides_in_multiple_scripts(self):
279         '''multiple init.d scripts provide the same name'''
280
281         self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
282         self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
283         err, results = self.run_generator()
284         self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
285         # should create symlink for the alternative name for either unit
286         self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
287                       ['foo.service', 'bar.service'])
288
289     def test_provide_other_script(self):
290         '''init.d scripts provides the name of another init.d script'''
291
292         self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
293         self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
294         err, results = self.run_generator()
295         self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
296         # we do expect an overwrite here, bar.service should overwrite the
297         # alias link from foo.service
298         self.assertIn('Overwriting', err)
299
300     def test_nonexecutable_script(self):
301         '''ignores non-executable init.d script'''
302
303         os.chmod(self.add_sysv('foo', {}), 0o644)
304         err, results = self.run_generator()
305         self.assertEqual(results, {})
306
307     def test_sh_suffix(self):
308         '''init.d script with .sh suffix'''
309
310         self.add_sysv('foo.sh', {}, enable=True)
311         err, results = self.run_generator()
312         s = results['foo.service']
313
314         self.assertEqual(s.sections(), ['Unit', 'Service'])
315         # should not have a .sh
316         self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
317
318         # calls correct script with .sh
319         init_script = os.path.join(self.init_d_dir, 'foo.sh')
320         self.assertEqual(s.get('Service', 'ExecStart'),
321                          '%s start' % init_script)
322         self.assertEqual(s.get('Service', 'ExecStop'),
323                          '%s stop' % init_script)
324
325         self.assert_enabled('foo.service', ['multi-user', 'graphical'])
326
327     def test_sh_suffix_with_provides(self):
328         '''init.d script with .sh suffix and Provides:'''
329
330         self.add_sysv('foo.sh', {'Provides': 'foo bar'})
331         err, results = self.run_generator()
332         # ensure we don't try to create a symlink to itself
333         self.assertNotIn(err, 'itself')
334         self.assertEqual(list(results), ['foo.service'])
335         self.assertEqual(results['foo.service'].get('Unit', 'Description'),
336                          'LSB: test foo service')
337
338         # should create symlink for the alternative name
339         self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
340                          'foo.service')
341
342     def test_hidden_files(self):
343         '''init.d script with hidden file suffix'''
344
345         script = self.add_sysv('foo', {}, enable=True)
346         # backup files (not enabled in rcN.d/)
347         shutil.copy(script, script + '.dpkg-new')
348         shutil.copy(script, script + '.dpkg-dist')
349         shutil.copy(script, script + '.swp')
350         shutil.copy(script, script + '.rpmsave')
351
352         err, results = self.run_generator()
353         self.assertEqual(list(results), ['foo.service'])
354
355         self.assert_enabled('foo.service', ['multi-user', 'graphical'])
356
357     def test_backup_file(self):
358         '''init.d script with backup file'''
359
360         script = self.add_sysv('foo', {}, enable=True)
361         # backup files (not enabled in rcN.d/)
362         shutil.copy(script, script + '.bak')
363         shutil.copy(script, script + '.old')
364
365         err, results = self.run_generator()
366         print(err)
367         self.assertEqual(sorted(results),
368                          ['foo.bak.service', 'foo.old.service', 'foo.service'])
369
370         # ensure we don't try to create a symlink to itself
371         self.assertNotIn(err, 'itself')
372
373         self.assert_enabled('foo.service', ['multi-user', 'graphical'])
374         self.assert_enabled('foo.bak.service', [])
375         self.assert_enabled('foo.old.service', [])
376
377     def test_existing_native_unit(self):
378         '''existing native unit'''
379
380         with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
381             f.write('[Unit]\n')
382
383         self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
384         err, results = self.run_generator()
385         self.assertEqual(list(results), [])
386         # no enablement or alias links, as native unit is disabled
387         self.assertEqual(os.listdir(self.out_dir), [])
388
389
390 if __name__ == '__main__':
391     unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))