chiark / gitweb /
test: add integration test for systemd-sysv-generator
[elogind.git] / test / sysv-generator-test.py
1 # systemd-sysv-generator integration test
2 #
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
5 #
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
10
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19 import unittest
20 import sys
21 import os
22 import subprocess
23 import tempfile
24 import shutil
25 from glob import glob
26
27 try:
28     from configparser import RawConfigParser
29 except ImportError:
30     # python 2
31     from ConfigParser import RawConfigParser
32
33 sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
34
35
36 class SysvGeneratorTest(unittest.TestCase):
37     def setUp(self):
38         self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
39         self.init_d_dir = os.path.join(self.workdir, 'init.d')
40         os.mkdir(self.init_d_dir)
41         self.rcnd_dir = self.workdir
42         self.unit_dir = os.path.join(self.workdir, 'systemd')
43         os.mkdir(self.unit_dir)
44         self.out_dir = os.path.join(self.workdir, 'output')
45         os.mkdir(self.out_dir)
46
47     def tearDown(self):
48         shutil.rmtree(self.workdir)
49
50     #
51     # Helper methods
52     #
53
54     def run_generator(self, expect_error=False):
55         '''Run sysv-generator.
56
57         Fail if stderr contains any "Fail", unless expect_error is True.
58         Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
59         parsed generated units.
60         '''
61         env = os.environ.copy()
62         env['SYSTEMD_LOG_LEVEL'] = 'debug'
63         env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
64         env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
65         env['SYSTEMD_UNIT_PATH'] = self.unit_dir
66         gen = subprocess.Popen(
67             [sysv_generator, 'ignored', 'ignored', self.out_dir],
68             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
69             universal_newlines=True, env=env)
70         (out, err) = gen.communicate()
71         if not expect_error:
72             self.assertFalse('Fail' in err, err)
73         self.assertEqual(gen.returncode, 0, err)
74
75         results = {}
76         for service in glob(self.out_dir + '/*.service'):
77             cp = RawConfigParser()
78             cp.optionxform = lambda o: o  # don't lower-case option names
79             with open(service) as f:
80                 cp.readfp(f)
81             results[os.path.basename(service)] = cp
82
83         return (err, results)
84
85     def add_sysv(self, fname, keys, enable=False, prio=1):
86         '''Create a SysV init script with the given keys in the LSB header
87
88         There are sensible default values for all fields.
89         If enable is True, links will be created in the rcN.d dirs. In that
90         case, the priority can be given with "prio" (default to 1).
91
92         Return path of generated script.
93         '''
94         name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
95         keys.setdefault('Provides', name_without_sh)
96         keys.setdefault('Required-Start', '$local_fs')
97         keys.setdefault('Required-Stop', keys['Required-Start'])
98         keys.setdefault('Default-Start', '2 3 4 5')
99         keys.setdefault('Default-Stop', '0 1 6')
100         keys.setdefault('Short-Description', 'test %s service' %
101                         name_without_sh)
102         keys.setdefault('Description', 'long description for test %s service' %
103                         name_without_sh)
104         script = os.path.join(self.init_d_dir, fname)
105         with open(script, 'w') as f:
106             f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
107             for k, v in keys.items():
108                 if v is not None:
109                     f.write('#%20s %s\n' % (k + ':', v))
110             f.write('### END INIT INFO\ncode --goes here\n')
111         os.chmod(script, 0o755)
112
113         if enable:
114             def make_link(prefix, runlevel):
115                 d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
116                 if not os.path.isdir(d):
117                     os.mkdir(d)
118                 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
119
120             for rl in keys['Default-Start'].split():
121                 make_link('S%02i' % prio, rl)
122             for rl in keys['Default-Stop'].split():
123                 make_link('K%02i' % (99 - prio), rl)
124
125         return script
126
127     def assert_enabled(self, unit, runlevels):
128         '''assert that a unit is enabled in precisely the given runlevels'''
129
130         all_runlevels = [2, 3, 4, 5]
131
132         # should be enabled
133         for runlevel in all_runlevels:
134             link = os.path.join(self.out_dir, 'runlevel%i.target.wants' % runlevel, 'foo.service')
135             if runlevel in runlevels:
136                 target = os.readlink(link)
137                 self.assertTrue(os.path.exists(target))
138                 self.assertEqual(os.path.basename(target), 'foo.service')
139             else:
140                 self.assertFalse(os.path.exists(link),
141                                  '%s unexpectedly exists' % link)
142
143     #
144     # test cases
145     #
146
147     def test_nothing(self):
148         '''no input files'''
149
150         results = self.run_generator()[1]
151         self.assertEqual(results, {})
152         self.assertEqual(os.listdir(self.out_dir), [])
153
154     def test_simple_disabled(self):
155         '''simple service without dependencies, disabled'''
156
157         self.add_sysv('foo', {}, enable=False)
158         err, results = self.run_generator()
159         self.assertEqual(len(results), 1)
160
161         # no enablement links or other stuff
162         self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
163
164         s = results['foo.service']
165         self.assertEqual(s.sections(), ['Unit', 'Service'])
166         self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
167         # $local_fs does not need translation, don't expect any dependency
168         # fields here
169         self.assertEqual(set(s.options('Unit')),
170                          set(['Documentation', 'SourcePath', 'Description']))
171
172         self.assertEqual(s.get('Service', 'Type'), 'forking')
173         init_script = os.path.join(self.init_d_dir, 'foo')
174         self.assertEqual(s.get('Service', 'ExecStart'),
175                          '%s start' % init_script)
176         self.assertEqual(s.get('Service', 'ExecStop'),
177                          '%s stop' % init_script)
178
179     def test_simple_enabled_all(self):
180         '''simple service without dependencies, enabled in all runlevels'''
181
182         self.add_sysv('foo', {}, enable=True)
183         err, results = self.run_generator()
184         self.assertEqual(list(results), ['foo.service'])
185         self.assert_enabled('foo.service', [2, 3, 4, 5])
186
187     def test_simple_enabled_some(self):
188         '''simple service without dependencies, enabled in some runlevels'''
189
190         self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
191         err, results = self.run_generator()
192         self.assertEqual(list(results), ['foo.service'])
193         self.assert_enabled('foo.service', [2, 4])
194
195     def test_lsb_macro_dep_single(self):
196         '''single LSB macro dependency: $network'''
197
198         self.add_sysv('foo', {'Required-Start': '$network'})
199         s = self.run_generator()[1]['foo.service']
200         self.assertEqual(set(s.options('Unit')),
201                          set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
202         self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
203         self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
204
205     def test_lsb_macro_dep_multi(self):
206         '''multiple LSB macro dependencies'''
207
208         self.add_sysv('foo', {'Required-Start': '$named $portmap'})
209         s = self.run_generator()[1]['foo.service']
210         self.assertEqual(set(s.options('Unit')),
211                          set(['Documentation', 'SourcePath', 'Description', 'After']))
212         self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')
213
214     def test_lsb_deps(self):
215         '''LSB header dependencies to other services'''
216
217         # also give symlink priorities here; they should be ignored
218         self.add_sysv('foo', {'Required-Start': 'must1 must2',
219                               'Should-Start': 'may1 ne_may2'},
220                       enable=True, prio=40)
221         self.add_sysv('must1', {}, enable=True, prio=10)
222         self.add_sysv('must2', {}, enable=True, prio=15)
223         self.add_sysv('may1', {}, enable=True, prio=20)
224         # do not create ne_may2
225         err, results = self.run_generator()
226         self.assertEqual(sorted(results),
227                          ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
228
229         # foo should depend on all of them
230         self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
231                          ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
232
233         # other services should not depend on each other
234         self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
235         self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
236         self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
237
238     def test_symlink_prio_deps(self):
239         '''script without LSB headers use rcN.d priority'''
240
241         # create two init.d scripts without LSB header and enable them with
242         # startup priorities
243         for prio, name in [(10, 'provider'), (15, 'consumer')]:
244             with open(os.path.join(self.init_d_dir, name), 'w') as f:
245                 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
246                 os.fchmod(f.fileno(), 0o755)
247
248             d = os.path.join(self.rcnd_dir, 'rc2.d')
249             if not os.path.isdir(d):
250                 os.mkdir(d)
251             os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
252
253         err, results = self.run_generator()
254         self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
255         self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
256         self.assertEqual(results['consumer.service'].get('Unit', 'After'),
257                          'provider.service')
258
259     def test_multiple_provides(self):
260         '''multiple Provides: names'''
261
262         self.add_sysv('foo', {'Provides': 'foo bar baz'})
263         s = self.run_generator()[1]['foo.service']
264         self.assertEqual(set(s.options('Unit')),
265                          set(['Documentation', 'SourcePath', 'Description']))
266         # should create symlinks for the alternative names
267         for f in ['bar.service', 'baz.service']:
268             self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
269                              'foo.service')
270
271     def test_nonexecutable_script(self):
272         '''ignores non-executable init.d script'''
273
274         os.chmod(self.add_sysv('foo', {}), 0o644)
275         err, results = self.run_generator()
276         self.assertEqual(results, {})
277
278
279 if __name__ == '__main__':
280     unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))