chiark / gitweb /
09f5c0176296310d27df3855cc2a44089e6b9e60
[elogind.git] / test / sysv-generator-test.py
1 # systemd-sysv-generator integration test
2 #
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
5 #
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
10
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19 import unittest
20 import sys
21 import os
22 import subprocess
23 import tempfile
24 import shutil
25 from glob import glob
26
27 try:
28     from configparser import RawConfigParser
29 except ImportError:
30     # python 2
31     from ConfigParser import RawConfigParser
32
33 sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
34
35
36 class SysvGeneratorTest(unittest.TestCase):
37     def setUp(self):
38         self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
39         self.init_d_dir = os.path.join(self.workdir, 'init.d')
40         os.mkdir(self.init_d_dir)
41         self.rcnd_dir = self.workdir
42         self.unit_dir = os.path.join(self.workdir, 'systemd')
43         os.mkdir(self.unit_dir)
44         self.out_dir = os.path.join(self.workdir, 'output')
45         os.mkdir(self.out_dir)
46
47     def tearDown(self):
48         shutil.rmtree(self.workdir)
49
50     #
51     # Helper methods
52     #
53
54     def run_generator(self, expect_error=False):
55         '''Run sysv-generator.
56
57         Fail if stderr contains any "Fail", unless expect_error is True.
58         Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
59         parsed generated units.
60         '''
61         env = os.environ.copy()
62         env['SYSTEMD_LOG_LEVEL'] = 'debug'
63         env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
64         env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
65         env['SYSTEMD_UNIT_PATH'] = self.unit_dir
66         gen = subprocess.Popen(
67             [sysv_generator, 'ignored', 'ignored', self.out_dir],
68             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
69             universal_newlines=True, env=env)
70         (out, err) = gen.communicate()
71         if not expect_error:
72             self.assertFalse('Fail' in err, err)
73         self.assertEqual(gen.returncode, 0, err)
74
75         results = {}
76         for service in glob(self.out_dir + '/*.service'):
77             if os.path.islink(service):
78                 continue
79             cp = RawConfigParser()
80             cp.optionxform = lambda o: o  # don't lower-case option names
81             with open(service) as f:
82                 cp.readfp(f)
83             results[os.path.basename(service)] = cp
84
85         return (err, results)
86
87     def add_sysv(self, fname, keys, enable=False, prio=1):
88         '''Create a SysV init script with the given keys in the LSB header
89
90         There are sensible default values for all fields.
91         If enable is True, links will be created in the rcN.d dirs. In that
92         case, the priority can be given with "prio" (default to 1).
93
94         Return path of generated script.
95         '''
96         name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
97         keys.setdefault('Provides', name_without_sh)
98         keys.setdefault('Required-Start', '$local_fs')
99         keys.setdefault('Required-Stop', keys['Required-Start'])
100         keys.setdefault('Default-Start', '2 3 4 5')
101         keys.setdefault('Default-Stop', '0 1 6')
102         keys.setdefault('Short-Description', 'test %s service' %
103                         name_without_sh)
104         keys.setdefault('Description', 'long description for test %s service' %
105                         name_without_sh)
106         script = os.path.join(self.init_d_dir, fname)
107         with open(script, 'w') as f:
108             f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
109             for k, v in keys.items():
110                 if v is not None:
111                     f.write('#%20s %s\n' % (k + ':', v))
112             f.write('### END INIT INFO\ncode --goes here\n')
113         os.chmod(script, 0o755)
114
115         if enable:
116             def make_link(prefix, runlevel):
117                 d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
118                 if not os.path.isdir(d):
119                     os.mkdir(d)
120                 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
121
122             for rl in keys['Default-Start'].split():
123                 make_link('S%02i' % prio, rl)
124             for rl in keys['Default-Stop'].split():
125                 make_link('K%02i' % (99 - prio), rl)
126
127         return script
128
129     def assert_enabled(self, unit, runlevels):
130         '''assert that a unit is enabled in precisely the given runlevels'''
131
132         all_runlevels = [2, 3, 4, 5]
133
134         # should be enabled
135         for runlevel in all_runlevels:
136             link = os.path.join(self.out_dir, 'runlevel%i.target.wants' % runlevel, unit)
137             if runlevel in runlevels:
138                 target = os.readlink(link)
139                 self.assertTrue(os.path.exists(target))
140                 self.assertEqual(os.path.basename(target), unit)
141             else:
142                 self.assertFalse(os.path.exists(link),
143                                  '%s unexpectedly exists' % link)
144
145     #
146     # test cases
147     #
148
149     def test_nothing(self):
150         '''no input files'''
151
152         results = self.run_generator()[1]
153         self.assertEqual(results, {})
154         self.assertEqual(os.listdir(self.out_dir), [])
155
156     def test_simple_disabled(self):
157         '''simple service without dependencies, disabled'''
158
159         self.add_sysv('foo', {}, enable=False)
160         err, results = self.run_generator()
161         self.assertEqual(len(results), 1)
162
163         # no enablement links or other stuff
164         self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
165
166         s = results['foo.service']
167         self.assertEqual(s.sections(), ['Unit', 'Service'])
168         self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
169         # $local_fs does not need translation, don't expect any dependency
170         # fields here
171         self.assertEqual(set(s.options('Unit')),
172                          set(['Documentation', 'SourcePath', 'Description']))
173
174         self.assertEqual(s.get('Service', 'Type'), 'forking')
175         init_script = os.path.join(self.init_d_dir, 'foo')
176         self.assertEqual(s.get('Service', 'ExecStart'),
177                          '%s start' % init_script)
178         self.assertEqual(s.get('Service', 'ExecStop'),
179                          '%s stop' % init_script)
180
181     def test_simple_enabled_all(self):
182         '''simple service without dependencies, enabled in all runlevels'''
183
184         self.add_sysv('foo', {}, enable=True)
185         err, results = self.run_generator()
186         self.assertEqual(list(results), ['foo.service'])
187         self.assert_enabled('foo.service', [2, 3, 4, 5])
188
189     def test_simple_enabled_some(self):
190         '''simple service without dependencies, enabled in some runlevels'''
191
192         self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
193         err, results = self.run_generator()
194         self.assertEqual(list(results), ['foo.service'])
195         self.assert_enabled('foo.service', [2, 4])
196
197     def test_lsb_macro_dep_single(self):
198         '''single LSB macro dependency: $network'''
199
200         self.add_sysv('foo', {'Required-Start': '$network'})
201         s = self.run_generator()[1]['foo.service']
202         self.assertEqual(set(s.options('Unit')),
203                          set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
204         self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
205         self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
206
207     def test_lsb_macro_dep_multi(self):
208         '''multiple LSB macro dependencies'''
209
210         self.add_sysv('foo', {'Required-Start': '$named $portmap'})
211         s = self.run_generator()[1]['foo.service']
212         self.assertEqual(set(s.options('Unit')),
213                          set(['Documentation', 'SourcePath', 'Description', 'After']))
214         self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')
215
216     def test_lsb_deps(self):
217         '''LSB header dependencies to other services'''
218
219         # also give symlink priorities here; they should be ignored
220         self.add_sysv('foo', {'Required-Start': 'must1 must2',
221                               'Should-Start': 'may1 ne_may2'},
222                       enable=True, prio=40)
223         self.add_sysv('must1', {}, enable=True, prio=10)
224         self.add_sysv('must2', {}, enable=True, prio=15)
225         self.add_sysv('may1', {}, enable=True, prio=20)
226         # do not create ne_may2
227         err, results = self.run_generator()
228         self.assertEqual(sorted(results),
229                          ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
230
231         # foo should depend on all of them
232         self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
233                          ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
234
235         # other services should not depend on each other
236         self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
237         self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
238         self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
239
240     def test_symlink_prio_deps(self):
241         '''script without LSB headers use rcN.d priority'''
242
243         # create two init.d scripts without LSB header and enable them with
244         # startup priorities
245         for prio, name in [(10, 'provider'), (15, 'consumer')]:
246             with open(os.path.join(self.init_d_dir, name), 'w') as f:
247                 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
248                 os.fchmod(f.fileno(), 0o755)
249
250             d = os.path.join(self.rcnd_dir, 'rc2.d')
251             if not os.path.isdir(d):
252                 os.mkdir(d)
253             os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
254
255         err, results = self.run_generator()
256         self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
257         self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
258         self.assertEqual(results['consumer.service'].get('Unit', 'After'),
259                          'provider.service')
260
261     def test_multiple_provides(self):
262         '''multiple Provides: names'''
263
264         self.add_sysv('foo', {'Provides': 'foo bar baz'})
265         err, results = self.run_generator()
266         self.assertEqual(list(results), ['foo.service'])
267         self.assertEqual(set(results['foo.service'].options('Unit')),
268                          set(['Documentation', 'SourcePath', 'Description']))
269         # should create symlinks for the alternative names
270         for f in ['bar.service', 'baz.service']:
271             self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
272                              'foo.service')
273
274     def test_same_provides_in_multiple_scripts(self):
275         '''multiple init.d scripts provide the same name'''
276
277         self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
278         self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
279         err, results = self.run_generator()
280         self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
281         # should create symlink for the alternative name for either unit
282         self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
283                       ['foo.service', 'bar.service'])
284
285     def test_provide_other_script(self):
286         '''init.d scripts provides the name of another init.d script'''
287
288         self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
289         self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
290         err, results = self.run_generator()
291         self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
292
293     def test_nonexecutable_script(self):
294         '''ignores non-executable init.d script'''
295
296         os.chmod(self.add_sysv('foo', {}), 0o644)
297         err, results = self.run_generator()
298         self.assertEqual(results, {})
299
300     def test_sh_suffix(self):
301         '''init.d script with .sh suffix'''
302
303         self.add_sysv('foo.sh', {}, enable=True)
304         err, results = self.run_generator()
305         s = results['foo.service']
306
307         self.assertEqual(s.sections(), ['Unit', 'Service'])
308         # should not have a .sh
309         self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
310
311         # calls correct script with .sh
312         init_script = os.path.join(self.init_d_dir, 'foo.sh')
313         self.assertEqual(s.get('Service', 'ExecStart'),
314                          '%s start' % init_script)
315         self.assertEqual(s.get('Service', 'ExecStop'),
316                          '%s stop' % init_script)
317
318         self.assert_enabled('foo.service', [2, 3, 4, 5])
319
320     def test_sh_suffix_with_provides(self):
321         '''init.d script with .sh suffix and Provides:'''
322
323         self.add_sysv('foo.sh', {'Provides': 'foo bar'})
324         err, results = self.run_generator()
325         # ensure we don't try to create a symlink to itself
326         self.assertNotIn(err, 'itself')
327         self.assertEqual(list(results), ['foo.service'])
328         self.assertEqual(results['foo.service'].get('Unit', 'Description'),
329                          'LSB: test foo service')
330
331         # should create symlink for the alternative name
332         self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
333                          'foo.service')
334
335     def test_hidden_files(self):
336         '''init.d script with hidden file suffix'''
337
338         script = self.add_sysv('foo', {}, enable=True)
339         # backup files (not enabled in rcN.d/)
340         shutil.copy(script, script + '.dpkg-new')
341         shutil.copy(script, script + '.dpkg-dist')
342         shutil.copy(script, script + '.swp')
343         shutil.copy(script, script + '.rpmsave')
344
345         err, results = self.run_generator()
346         self.assertEqual(list(results), ['foo.service'])
347
348         self.assert_enabled('foo.service', [2, 3, 4, 5])
349
350     def test_backup_file(self):
351         '''init.d script with backup file'''
352
353         script = self.add_sysv('foo', {}, enable=True)
354         # backup files (not enabled in rcN.d/)
355         shutil.copy(script, script + '.bak')
356         shutil.copy(script, script + '.old')
357
358         err, results = self.run_generator()
359         print(err)
360         self.assertEqual(sorted(results),
361                          ['foo.bak.service', 'foo.old.service', 'foo.service'])
362
363         # ensure we don't try to create a symlink to itself
364         self.assertNotIn(err, 'itself')
365
366         self.assert_enabled('foo.service', [2, 3, 4, 5])
367         self.assert_enabled('foo.bak.service', [])
368         self.assert_enabled('foo.old.service', [])
369
370     def test_existing_native_unit(self):
371         '''existing native unit'''
372
373         with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
374             f.write('[Unit]\n')
375
376         self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
377         err, results = self.run_generator()
378         self.assertEqual(list(results), [])
379         # no enablement or alias links, as native unit is disabled
380         self.assertEqual(os.listdir(self.out_dir), [])
381
382
383 if __name__ == '__main__':
384     unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))