1 # systemd-sysv-generator integration test
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
# RawConfigParser lives in "configparser" on Python 3 and in "ConfigParser"
# on Python 2. Importing both unconditionally fails on every interpreter, so
# fall back to the old module name only when the new one is unavailable.
try:
    from configparser import RawConfigParser
except ImportError:
    # python 2
    from ConfigParser import RawConfigParser

# Path of the generator binary under test: taken from $builddir when set,
# otherwise looked up in the current directory.
sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Every test builds a throw-away directory tree (init.d scripts, rcN.d
    symlinks, native unit dir, generator output dir), points the generator at
    it through SYSTEMD_* environment variables, runs it, and inspects the
    generated units and symlinks.
    '''

    def setUp(self):
        '''Create the temporary directory layout used by a single test.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # the rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the temporary directory tree again.'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        Return (stderr, filename -> ConfigParser) pair with output to stderr
        and parsed generated units.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        # stdout is captured only to keep it out of the test log
        _, err = gen.communicate()
        if not expect_error:
            self.assertNotIn('Fail', err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(os.path.join(self.out_dir, '*.service')):
            # symlinks are aliases of other units, not generated unit files
            if os.path.islink(service):
                continue
            cp = RawConfigParser()
            cp.optionxform = lambda o: o  # don't lower-case option names
            with open(service) as f:
                # readfp() was removed in Python 3.12; it is only needed as
                # a fallback for Python 2's ConfigParser.
                if hasattr(cp, 'read_file'):
                    cp.read_file(f)
                else:
                    cp.readfp(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields; a key whose value
        is None is left out of the header. The passed dict is not modified.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        # work on a copy so that filling in defaults does not leak into the
        # caller's dict
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test %s service' %
                        name_without_sh)
        keys.setdefault('Description', 'long description for test %s service' %
                        name_without_sh)
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#%20s %s\n' % (k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
                if not os.path.isdir(d):
                    os.mkdir(d)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S%02i' % prio, rl)
            # stop links use the mirrored priority
            for rl in keys['Default-Stop'].split():
                make_link('K%02i' % (99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
            if target in targets:
                # should be enabled: link exists and points to the unit
                unit_file = os.readlink(link)
                self.assertTrue(os.path.exists(unit_file))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                self.assertFalse(os.path.exists(link),
                                 '%s unexpectedly exists' % link)

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields for it
        self.assertEqual(set(s.options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description']))

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description', 'After']))
        self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            if not os.path.isdir(d):
                os.mkdir(d)
            os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description']))
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        # (member first, container second: look for 'itself' in stderr)
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')

        err, results = self.run_generator()

        self.assertEqual(sorted(results),
                         ['foo.bak.service', 'foo.old.service', 'foo.service'])

        # ensure we don't try to create a symlink to itself
        # (member first, container second: look for 'itself' in stderr)
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        # a minimal native unit of the same name must shadow the init script
        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
if __name__ == '__main__':
    # Report results on stdout (instead of unittest's default stderr) and
    # list every test case individually.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)