chiark / gitweb /
systemd-sysv-generator test: test scripts with hidden suffixes
[elogind.git] / test / sysv-generator-test.py
1 # systemd-sysv-generator integration test
2 #
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
5 #
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
10
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19 import unittest
20 import sys
21 import os
22 import subprocess
23 import tempfile
24 import shutil
25 from glob import glob
26
27 try:
28     from configparser import RawConfigParser
29 except ImportError:
30     # python 2
31     from ConfigParser import RawConfigParser
32
33 sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
34
35
36 class SysvGeneratorTest(unittest.TestCase):
37     def setUp(self):
38         self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
39         self.init_d_dir = os.path.join(self.workdir, 'init.d')
40         os.mkdir(self.init_d_dir)
41         self.rcnd_dir = self.workdir
42         self.unit_dir = os.path.join(self.workdir, 'systemd')
43         os.mkdir(self.unit_dir)
44         self.out_dir = os.path.join(self.workdir, 'output')
45         os.mkdir(self.out_dir)
46
47     def tearDown(self):
48         shutil.rmtree(self.workdir)
49
50     #
51     # Helper methods
52     #
53
54     def run_generator(self, expect_error=False):
55         '''Run sysv-generator.
56
57         Fail if stderr contains any "Fail", unless expect_error is True.
58         Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
59         parsed generated units.
60         '''
61         env = os.environ.copy()
62         env['SYSTEMD_LOG_LEVEL'] = 'debug'
63         env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
64         env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
65         env['SYSTEMD_UNIT_PATH'] = self.unit_dir
66         gen = subprocess.Popen(
67             [sysv_generator, 'ignored', 'ignored', self.out_dir],
68             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
69             universal_newlines=True, env=env)
70         (out, err) = gen.communicate()
71         if not expect_error:
72             self.assertFalse('Fail' in err, err)
73         self.assertEqual(gen.returncode, 0, err)
74
75         results = {}
76         for service in glob(self.out_dir + '/*.service'):
77             if os.path.islink(service):
78                 continue
79             cp = RawConfigParser()
80             cp.optionxform = lambda o: o  # don't lower-case option names
81             with open(service) as f:
82                 cp.readfp(f)
83             results[os.path.basename(service)] = cp
84
85         return (err, results)
86
87     def add_sysv(self, fname, keys, enable=False, prio=1):
88         '''Create a SysV init script with the given keys in the LSB header
89
90         There are sensible default values for all fields.
91         If enable is True, links will be created in the rcN.d dirs. In that
92         case, the priority can be given with "prio" (default to 1).
93
94         Return path of generated script.
95         '''
96         name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
97         keys.setdefault('Provides', name_without_sh)
98         keys.setdefault('Required-Start', '$local_fs')
99         keys.setdefault('Required-Stop', keys['Required-Start'])
100         keys.setdefault('Default-Start', '2 3 4 5')
101         keys.setdefault('Default-Stop', '0 1 6')
102         keys.setdefault('Short-Description', 'test %s service' %
103                         name_without_sh)
104         keys.setdefault('Description', 'long description for test %s service' %
105                         name_without_sh)
106         script = os.path.join(self.init_d_dir, fname)
107         with open(script, 'w') as f:
108             f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
109             for k, v in keys.items():
110                 if v is not None:
111                     f.write('#%20s %s\n' % (k + ':', v))
112             f.write('### END INIT INFO\ncode --goes here\n')
113         os.chmod(script, 0o755)
114
115         if enable:
116             def make_link(prefix, runlevel):
117                 d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
118                 if not os.path.isdir(d):
119                     os.mkdir(d)
120                 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
121
122             for rl in keys['Default-Start'].split():
123                 make_link('S%02i' % prio, rl)
124             for rl in keys['Default-Stop'].split():
125                 make_link('K%02i' % (99 - prio), rl)
126
127         return script
128
129     def assert_enabled(self, unit, runlevels):
130         '''assert that a unit is enabled in precisely the given runlevels'''
131
132         all_runlevels = [2, 3, 4, 5]
133
134         # should be enabled
135         for runlevel in all_runlevels:
136             link = os.path.join(self.out_dir, 'runlevel%i.target.wants' % runlevel, unit)
137             if runlevel in runlevels:
138                 target = os.readlink(link)
139                 self.assertTrue(os.path.exists(target))
140                 self.assertEqual(os.path.basename(target), unit)
141             else:
142                 self.assertFalse(os.path.exists(link),
143                                  '%s unexpectedly exists' % link)
144
145     #
146     # test cases
147     #
148
149     def test_nothing(self):
150         '''no input files'''
151
152         results = self.run_generator()[1]
153         self.assertEqual(results, {})
154         self.assertEqual(os.listdir(self.out_dir), [])
155
156     def test_simple_disabled(self):
157         '''simple service without dependencies, disabled'''
158
159         self.add_sysv('foo', {}, enable=False)
160         err, results = self.run_generator()
161         self.assertEqual(len(results), 1)
162
163         # no enablement links or other stuff
164         self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
165
166         s = results['foo.service']
167         self.assertEqual(s.sections(), ['Unit', 'Service'])
168         self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
169         # $local_fs does not need translation, don't expect any dependency
170         # fields here
171         self.assertEqual(set(s.options('Unit')),
172                          set(['Documentation', 'SourcePath', 'Description']))
173
174         self.assertEqual(s.get('Service', 'Type'), 'forking')
175         init_script = os.path.join(self.init_d_dir, 'foo')
176         self.assertEqual(s.get('Service', 'ExecStart'),
177                          '%s start' % init_script)
178         self.assertEqual(s.get('Service', 'ExecStop'),
179                          '%s stop' % init_script)
180
181     def test_simple_enabled_all(self):
182         '''simple service without dependencies, enabled in all runlevels'''
183
184         self.add_sysv('foo', {}, enable=True)
185         err, results = self.run_generator()
186         self.assertEqual(list(results), ['foo.service'])
187         self.assert_enabled('foo.service', [2, 3, 4, 5])
188
189     def test_simple_enabled_some(self):
190         '''simple service without dependencies, enabled in some runlevels'''
191
192         self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
193         err, results = self.run_generator()
194         self.assertEqual(list(results), ['foo.service'])
195         self.assert_enabled('foo.service', [2, 4])
196
197     def test_lsb_macro_dep_single(self):
198         '''single LSB macro dependency: $network'''
199
200         self.add_sysv('foo', {'Required-Start': '$network'})
201         s = self.run_generator()[1]['foo.service']
202         self.assertEqual(set(s.options('Unit')),
203                          set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
204         self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
205         self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
206
207     def test_lsb_macro_dep_multi(self):
208         '''multiple LSB macro dependencies'''
209
210         self.add_sysv('foo', {'Required-Start': '$named $portmap'})
211         s = self.run_generator()[1]['foo.service']
212         self.assertEqual(set(s.options('Unit')),
213                          set(['Documentation', 'SourcePath', 'Description', 'After']))
214         self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')
215
216     def test_lsb_deps(self):
217         '''LSB header dependencies to other services'''
218
219         # also give symlink priorities here; they should be ignored
220         self.add_sysv('foo', {'Required-Start': 'must1 must2',
221                               'Should-Start': 'may1 ne_may2'},
222                       enable=True, prio=40)
223         self.add_sysv('must1', {}, enable=True, prio=10)
224         self.add_sysv('must2', {}, enable=True, prio=15)
225         self.add_sysv('may1', {}, enable=True, prio=20)
226         # do not create ne_may2
227         err, results = self.run_generator()
228         self.assertEqual(sorted(results),
229                          ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
230
231         # foo should depend on all of them
232         self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
233                          ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
234
235         # other services should not depend on each other
236         self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
237         self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
238         self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
239
240     def test_symlink_prio_deps(self):
241         '''script without LSB headers use rcN.d priority'''
242
243         # create two init.d scripts without LSB header and enable them with
244         # startup priorities
245         for prio, name in [(10, 'provider'), (15, 'consumer')]:
246             with open(os.path.join(self.init_d_dir, name), 'w') as f:
247                 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
248                 os.fchmod(f.fileno(), 0o755)
249
250             d = os.path.join(self.rcnd_dir, 'rc2.d')
251             if not os.path.isdir(d):
252                 os.mkdir(d)
253             os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
254
255         err, results = self.run_generator()
256         self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
257         self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
258         self.assertEqual(results['consumer.service'].get('Unit', 'After'),
259                          'provider.service')
260
261     def test_multiple_provides(self):
262         '''multiple Provides: names'''
263
264         self.add_sysv('foo', {'Provides': 'foo bar baz'})
265         err, results = self.run_generator()
266         self.assertEqual(list(results), ['foo.service'])
267         self.assertEqual(set(results['foo.service'].options('Unit')),
268                          set(['Documentation', 'SourcePath', 'Description']))
269         # should create symlinks for the alternative names
270         for f in ['bar.service', 'baz.service']:
271             self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
272                              'foo.service')
273
274     def test_nonexecutable_script(self):
275         '''ignores non-executable init.d script'''
276
277         os.chmod(self.add_sysv('foo', {}), 0o644)
278         err, results = self.run_generator()
279         self.assertEqual(results, {})
280
281     def test_sh_suffix(self):
282         '''init.d script with .sh suffix'''
283
284         self.add_sysv('foo.sh', {}, enable=True)
285         err, results = self.run_generator()
286         s = results['foo.service']
287
288         self.assertEqual(s.sections(), ['Unit', 'Service'])
289         # should not have a .sh
290         self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
291
292         # calls correct script with .sh
293         init_script = os.path.join(self.init_d_dir, 'foo.sh')
294         self.assertEqual(s.get('Service', 'ExecStart'),
295                          '%s start' % init_script)
296         self.assertEqual(s.get('Service', 'ExecStop'),
297                          '%s stop' % init_script)
298
299         self.assert_enabled('foo.service', [2, 3, 4, 5])
300
301     def test_sh_suffix_with_provides(self):
302         '''init.d script with .sh suffix and Provides:'''
303
304         self.add_sysv('foo.sh', {'Provides': 'foo bar'})
305         err, results = self.run_generator()
306         # ensure we don't try to create a symlink to itself
307         self.assertNotIn(err, 'itself')
308         self.assertEqual(list(results), ['foo.service'])
309         self.assertEqual(results['foo.service'].get('Unit', 'Description'),
310                          'LSB: test foo service')
311
312         # should create symlink for the alternative name
313         self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
314                          'foo.service')
315
316     def test_hidden_files(self):
317         '''init.d script with hidden file suffix'''
318
319         script = self.add_sysv('foo', {}, enable=True)
320         # backup files (not enabled in rcN.d/)
321         shutil.copy(script, script + '.dpkg-new')
322         shutil.copy(script, script + '.dpkg-dist')
323         shutil.copy(script, script + '.swp')
324         shutil.copy(script, script + '.rpmsave')
325
326         err, results = self.run_generator()
327         self.assertEqual(list(results), ['foo.service'])
328
329         self.assert_enabled('foo.service', [2, 3, 4, 5])
330
331
332 if __name__ == '__main__':
333     unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))