# Origin: thirdparty/systemd.git, test/sysv-generator-test.py (git.ipfire.org gitweb view)
# SPDX-License-Identifier: LGPL-2.1+
#
# systemd-sysv-generator integration test
#
# © 2015 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>
import collections
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
from glob import glob

try:
    from configparser import RawConfigParser
except ImportError:
    # python 2
    from ConfigParser import RawConfigParser
# Path of the generator binary under test, relative to the current working
# directory of the test run.
sysv_generator = './systemd-sysv-generator'
class MultiDict(collections.OrderedDict):
    '''Ordered dict that accumulates repeated list values.

    Used as dict_type for RawConfigParser so that an option which appears
    several times in one section keeps every occurrence instead of only the
    last one.
    '''

    def __setitem__(self, key, value):
        '''Store value under key, merging lists for an already present key.

        If value is a list and key already exists, the existing value is
        extended in place with the new items.  In every other case the value
        is stored normally, replacing whatever was there before.
        '''
        if isinstance(value, list) and key in self:
            self[key].extend(value)
        else:
            # must only run when nothing was merged, otherwise the list that
            # was just extended would be overwritten again
            super(MultiDict, self).__setitem__(key, value)
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Each test builds a throw-away directory tree with SysV init scripts and
    rcN.d symlinks, runs the generator on it and inspects the generated
    units and enablement links.
    '''

    def setUp(self):
        '''Create a fresh work directory with init.d/, systemd/ and output/.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # the rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the work directory and everything below it.'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        Return (stderr, filename -> ConfigParser) pair with output to stderr and
        parsed generated units.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        # point the generator at the test tree instead of the real system
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        err = gen.communicate()[1]
        if not expect_error:
            self.assertFalse('Fail' in err, err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(self.out_dir + '/*.service'):
            # symlinks are aliases of other generated units, not units of
            # their own
            if os.path.islink(service):
                continue
            try:
                # for python3 we need here strict=False to parse multiple
                # lines with the same key
                cp = RawConfigParser(dict_type=MultiDict, strict=False)
            except TypeError:
                # RawConfigParser in python2 does not have the strict option
                # but it allows multiple lines with the same key by default
                cp = RawConfigParser(dict_type=MultiDict)
            cp.optionxform = lambda o: o  # don't lower-case option names
            with open(service) as f:
                # read_file() superseded readfp(), which Python 3.12 removed;
                # fall back for parsers that only provide readfp()
                reader = getattr(cp, 'read_file', None) or cp.readfp
                reader(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        A key whose value is None is left out of the header.  The dict
        passed as "keys" is not modified.

        Return path of generated script.
        '''
        # work on a copy so that filling in defaults does not leak into the
        # caller's dict
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
        keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#{:>20} {}\n'.format(k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel))
                if not os.path.isdir(d):
                    os.mkdir(d)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S%02i' % prio, rl)
            for rl in keys['Default-Stop'].split():
                make_link('K%02i' % (99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
            if target in targets:
                # report a missing link as a test failure instead of letting
                # os.readlink() raise
                self.assertTrue(os.path.islink(link),
                                '{} is not a symlink'.format(link))
                unit_file = os.readlink(link)
                # os.path.exists() will fail on a dangling symlink
                self.assertTrue(os.path.exists(link))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                self.assertFalse(os.path.exists(link),
                                 '{} unexpectedly exists'.format(link))

    #
    # Test cases
    #

    def test_nothing(self):
        '''no init scripts at all'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # different start priorities in runlevel 2
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            if not os.path.isdir(d):
                os.mkdir(d)
            os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        # the script with the later priority is ordered after the earlier one
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')
        shutil.copy(script, script + '.tmp')
        shutil.copy(script, script + '.new')

        err, results = self.run_generator()

        self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            # NOTE(review): the file body is not visible in the extracted
            # source; a bare section header is assumed - confirm
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
# When run as a script, report verbosely to stdout instead of unittest's
# default stderr stream.
if __name__ == '__main__':
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))