]>
git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
9774378ae6aa105b0ed2c960ee5a00a112990809
3 # systemd-sysv-generator integration test
5 # (C) 2015 Canonical Ltd.
6 # Author: Martin Pitt <martin.pitt@ubuntu.com>
8 # systemd is free software; you can redistribute it and/or modify it
9 # under the terms of the GNU Lesser General Public License as published by
10 # the Free Software Foundation; either version 2.1 of the License, or
11 # (at your option) any later version.
13 # systemd is distributed in the hope that it will be useful, but
14 # WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # Lesser General Public License for more details.
18 # You should have received a copy of the GNU Lesser General Public License
19 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
29 from configparser
import RawConfigParser
31 sysv_generator
= os
.path
.join(os
.environ
.get('builddir', '.'), 'systemd-sysv-generator')
33 class MultiDict(collections
.OrderedDict
):
34 def __setitem__(self
, key
, value
):
35 if isinstance(value
, list) and key
in self
:
36 self
[key
].extend(value
)
38 super(MultiDict
, self
).__setitem
__(key
, value
)
40 class SysvGeneratorTest(unittest
.TestCase
):
42 self
.workdir
= tempfile
.mkdtemp(prefix
='sysv-gen-test.')
43 self
.init_d_dir
= os
.path
.join(self
.workdir
, 'init.d')
44 os
.mkdir(self
.init_d_dir
)
45 self
.rcnd_dir
= self
.workdir
46 self
.unit_dir
= os
.path
.join(self
.workdir
, 'systemd')
47 os
.mkdir(self
.unit_dir
)
48 self
.out_dir
= os
.path
.join(self
.workdir
, 'output')
49 os
.mkdir(self
.out_dir
)
52 shutil
.rmtree(self
.workdir
)
58 def run_generator(self
, expect_error
=False):
59 '''Run sysv-generator.
61 Fail if stderr contains any "Fail", unless expect_error is True.
62 Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
63 parsed generated units.
65 env
= os
.environ
.copy()
66 env
['SYSTEMD_LOG_LEVEL'] = 'debug'
67 env
['SYSTEMD_LOG_TARGET'] = 'console'
68 env
['SYSTEMD_SYSVINIT_PATH'] = self
.init_d_dir
69 env
['SYSTEMD_SYSVRCND_PATH'] = self
.rcnd_dir
70 env
['SYSTEMD_UNIT_PATH'] = self
.unit_dir
71 gen
= subprocess
.Popen(
72 [sysv_generator
, 'ignored', 'ignored', self
.out_dir
],
73 stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
,
74 universal_newlines
=True, env
=env
)
75 (out
, err
) = gen
.communicate()
77 self
.assertFalse('Fail' in err
, err
)
78 self
.assertEqual(gen
.returncode
, 0, err
)
81 for service
in glob(self
.out_dir
+ '/*.service'):
82 if os
.path
.islink(service
):
85 # for python3 we need here strict=False to parse multiple
86 # lines with the same key
87 cp
= RawConfigParser(dict_type
=MultiDict
, strict
=False)
89 # RawConfigParser in python2 does not have the strict option
90 # but it allows multiple lines with the same key by default
91 cp
= RawConfigParser(dict_type
=MultiDict
)
92 cp
.optionxform
= lambda o
: o
# don't lower-case option names
93 with
open(service
) as f
:
95 results
[os
.path
.basename(service
)] = cp
99 def add_sysv(self
, fname
, keys
, enable
=False, prio
=1):
100 '''Create a SysV init script with the given keys in the LSB header
102 There are sensible default values for all fields.
103 If enable is True, links will be created in the rcN.d dirs. In that
104 case, the priority can be given with "prio" (default to 1).
106 Return path of generated script.
108 name_without_sh
= fname
.endswith('.sh') and fname
[:-3] or fname
109 keys
.setdefault('Provides', name_without_sh
)
110 keys
.setdefault('Required-Start', '$local_fs')
111 keys
.setdefault('Required-Stop', keys
['Required-Start'])
112 keys
.setdefault('Default-Start', '2 3 4 5')
113 keys
.setdefault('Default-Stop', '0 1 6')
114 keys
.setdefault('Short-Description', 'test %s service' %
116 keys
.setdefault('Description', 'long description for test %s service' %
118 script
= os
.path
.join(self
.init_d_dir
, fname
)
119 with
open(script
, 'w') as f
:
120 f
.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
121 for k
, v
in keys
.items():
123 f
.write('#%20s %s\n' % (k
+ ':', v
))
124 f
.write('### END INIT INFO\ncode --goes here\n')
125 os
.chmod(script
, 0o755)
128 def make_link(prefix
, runlevel
):
129 d
= os
.path
.join(self
.rcnd_dir
, 'rc%s.d' % runlevel
)
130 if not os
.path
.isdir(d
):
132 os
.symlink('../init.d/' + fname
, os
.path
.join(d
, prefix
+ fname
))
134 for rl
in keys
['Default-Start'].split():
135 make_link('S%02i' % prio
, rl
)
136 for rl
in keys
['Default-Stop'].split():
137 make_link('K%02i' % (99 - prio
), rl
)
141 def assert_enabled(self
, unit
, targets
):
142 '''assert that a unit is enabled in precisely the given targets'''
144 all_targets
= ['multi-user', 'graphical']
147 for target
in all_targets
:
148 link
= os
.path
.join(self
.out_dir
, '%s.target.wants' % target
, unit
)
149 if target
in targets
:
150 unit_file
= os
.readlink(link
)
151 # os.path.exists() will fail on a dangling symlink
152 self
.assertTrue(os
.path
.exists(link
))
153 self
.assertEqual(os
.path
.basename(unit_file
), unit
)
155 self
.assertFalse(os
.path
.exists(link
),
156 '%s unexpectedly exists' % link
)
162 def test_nothing(self
):
165 results
= self
.run_generator()[1]
166 self
.assertEqual(results
, {})
167 self
.assertEqual(os
.listdir(self
.out_dir
), [])
169 def test_simple_disabled(self
):
170 '''simple service without dependencies, disabled'''
172 self
.add_sysv('foo', {}, enable
=False)
173 err
, results
= self
.run_generator()
174 self
.assertEqual(len(results
), 1)
176 # no enablement links or other stuff
177 self
.assertEqual(os
.listdir(self
.out_dir
), ['foo.service'])
179 s
= results
['foo.service']
180 self
.assertEqual(s
.sections(), ['Unit', 'Service'])
181 self
.assertEqual(s
.get('Unit', 'Description'), 'LSB: test foo service')
182 # $local_fs does not need translation, don't expect any dependency
184 self
.assertEqual(set(s
.options('Unit')),
185 set(['Documentation', 'SourcePath', 'Description']))
187 self
.assertEqual(s
.get('Service', 'Type'), 'forking')
188 init_script
= os
.path
.join(self
.init_d_dir
, 'foo')
189 self
.assertEqual(s
.get('Service', 'ExecStart'),
190 '%s start' % init_script
)
191 self
.assertEqual(s
.get('Service', 'ExecStop'),
192 '%s stop' % init_script
)
194 self
.assertNotIn('Overwriting', err
)
196 def test_simple_enabled_all(self
):
197 '''simple service without dependencies, enabled in all runlevels'''
199 self
.add_sysv('foo', {}, enable
=True)
200 err
, results
= self
.run_generator()
201 self
.assertEqual(list(results
), ['foo.service'])
202 self
.assert_enabled('foo.service', ['multi-user', 'graphical'])
203 self
.assertNotIn('Overwriting', err
)
205 def test_simple_escaped(self
):
206 '''simple service without dependencies, that requires escaping the name'''
208 self
.add_sysv('foo+', {})
209 self
.add_sysv('foo-admin', {})
210 err
, results
= self
.run_generator()
211 self
.assertEqual(set(results
), {'foo-admin.service', 'foo\\x2b.service'})
212 self
.assertNotIn('Overwriting', err
)
214 def test_simple_enabled_some(self
):
215 '''simple service without dependencies, enabled in some runlevels'''
217 self
.add_sysv('foo', {'Default-Start': '2 4'}, enable
=True)
218 err
, results
= self
.run_generator()
219 self
.assertEqual(list(results
), ['foo.service'])
220 self
.assert_enabled('foo.service', ['multi-user'])
222 def test_lsb_macro_dep_single(self
):
223 '''single LSB macro dependency: $network'''
225 self
.add_sysv('foo', {'Required-Start': '$network'})
226 s
= self
.run_generator()[1]['foo.service']
227 self
.assertEqual(set(s
.options('Unit')),
228 set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
229 self
.assertEqual(s
.get('Unit', 'After'), 'network-online.target')
230 self
.assertEqual(s
.get('Unit', 'Wants'), 'network-online.target')
232 def test_lsb_macro_dep_multi(self
):
233 '''multiple LSB macro dependencies'''
235 self
.add_sysv('foo', {'Required-Start': '$named $portmap'})
236 s
= self
.run_generator()[1]['foo.service']
237 self
.assertEqual(set(s
.options('Unit')),
238 set(['Documentation', 'SourcePath', 'Description', 'After']))
239 self
.assertEqual(s
.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])
241 def test_lsb_deps(self
):
242 '''LSB header dependencies to other services'''
244 # also give symlink priorities here; they should be ignored
245 self
.add_sysv('foo', {'Required-Start': 'must1 must2',
246 'Should-Start': 'may1 ne_may2'},
247 enable
=True, prio
=40)
248 self
.add_sysv('must1', {}, enable
=True, prio
=10)
249 self
.add_sysv('must2', {}, enable
=True, prio
=15)
250 self
.add_sysv('may1', {}, enable
=True, prio
=20)
251 # do not create ne_may2
252 err
, results
= self
.run_generator()
253 self
.assertEqual(sorted(results
),
254 ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
256 # foo should depend on all of them
257 self
.assertEqual(sorted(results
['foo.service'].get('Unit', 'After').split()),
258 ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
260 # other services should not depend on each other
261 self
.assertFalse(results
['must1.service'].has_option('Unit', 'After'))
262 self
.assertFalse(results
['must2.service'].has_option('Unit', 'After'))
263 self
.assertFalse(results
['may1.service'].has_option('Unit', 'After'))
265 def test_symlink_prio_deps(self
):
266 '''script without LSB headers use rcN.d priority'''
268 # create two init.d scripts without LSB header and enable them with
270 for prio
, name
in [(10, 'provider'), (15, 'consumer')]:
271 with
open(os
.path
.join(self
.init_d_dir
, name
), 'w') as f
:
272 f
.write('#!/bin/init-d-interpreter\ncode --goes here\n')
273 os
.fchmod(f
.fileno(), 0o755)
275 d
= os
.path
.join(self
.rcnd_dir
, 'rc2.d')
276 if not os
.path
.isdir(d
):
278 os
.symlink('../init.d/' + name
, os
.path
.join(d
, 'S%02i%s' % (prio
, name
)))
280 err
, results
= self
.run_generator()
281 self
.assertEqual(sorted(results
), ['consumer.service', 'provider.service'])
282 self
.assertFalse(results
['provider.service'].has_option('Unit', 'After'))
283 self
.assertEqual(results
['consumer.service'].get('Unit', 'After'),
286 def test_multiple_provides(self
):
287 '''multiple Provides: names'''
289 self
.add_sysv('foo', {'Provides': 'foo bar baz'})
290 err
, results
= self
.run_generator()
291 self
.assertEqual(list(results
), ['foo.service'])
292 self
.assertEqual(set(results
['foo.service'].options('Unit')),
293 set(['Documentation', 'SourcePath', 'Description']))
294 # should create symlinks for the alternative names
295 for f
in ['bar.service', 'baz.service']:
296 self
.assertEqual(os
.readlink(os
.path
.join(self
.out_dir
, f
)),
298 self
.assertNotIn('Overwriting', err
)
300 def test_provides_escaped(self
):
301 '''a script that Provides: a name that requires escaping'''
303 self
.add_sysv('foo', {'Provides': 'foo foo+'})
304 err
, results
= self
.run_generator()
305 self
.assertEqual(list(results
), ['foo.service'])
306 self
.assertEqual(os
.readlink(os
.path
.join(self
.out_dir
, 'foo\\x2b.service')),
308 self
.assertNotIn('Overwriting', err
)
310 def test_same_provides_in_multiple_scripts(self
):
311 '''multiple init.d scripts provide the same name'''
313 self
.add_sysv('foo', {'Provides': 'foo common'}, enable
=True, prio
=1)
314 self
.add_sysv('bar', {'Provides': 'bar common'}, enable
=True, prio
=2)
315 err
, results
= self
.run_generator()
316 self
.assertEqual(sorted(results
), ['bar.service', 'foo.service'])
317 # should create symlink for the alternative name for either unit
318 self
.assertIn(os
.readlink(os
.path
.join(self
.out_dir
, 'common.service')),
319 ['foo.service', 'bar.service'])
321 def test_provide_other_script(self
):
322 '''init.d scripts provides the name of another init.d script'''
324 self
.add_sysv('foo', {'Provides': 'foo bar'}, enable
=True)
325 self
.add_sysv('bar', {'Provides': 'bar'}, enable
=True)
326 err
, results
= self
.run_generator()
327 self
.assertEqual(sorted(results
), ['bar.service', 'foo.service'])
328 # we do expect an overwrite here, bar.service should overwrite the
329 # alias link from foo.service
330 self
.assertIn('Overwriting', err
)
332 def test_nonexecutable_script(self
):
333 '''ignores non-executable init.d script'''
335 os
.chmod(self
.add_sysv('foo', {}), 0o644)
336 err
, results
= self
.run_generator()
337 self
.assertEqual(results
, {})
339 def test_sh_suffix(self
):
340 '''init.d script with .sh suffix'''
342 self
.add_sysv('foo.sh', {}, enable
=True)
343 err
, results
= self
.run_generator()
344 s
= results
['foo.service']
346 self
.assertEqual(s
.sections(), ['Unit', 'Service'])
347 # should not have a .sh
348 self
.assertEqual(s
.get('Unit', 'Description'), 'LSB: test foo service')
350 # calls correct script with .sh
351 init_script
= os
.path
.join(self
.init_d_dir
, 'foo.sh')
352 self
.assertEqual(s
.get('Service', 'ExecStart'),
353 '%s start' % init_script
)
354 self
.assertEqual(s
.get('Service', 'ExecStop'),
355 '%s stop' % init_script
)
357 self
.assert_enabled('foo.service', ['multi-user', 'graphical'])
359 def test_sh_suffix_with_provides(self
):
360 '''init.d script with .sh suffix and Provides:'''
362 self
.add_sysv('foo.sh', {'Provides': 'foo bar'})
363 err
, results
= self
.run_generator()
364 # ensure we don't try to create a symlink to itself
365 self
.assertNotIn('itself', err
)
366 self
.assertEqual(list(results
), ['foo.service'])
367 self
.assertEqual(results
['foo.service'].get('Unit', 'Description'),
368 'LSB: test foo service')
370 # should create symlink for the alternative name
371 self
.assertEqual(os
.readlink(os
.path
.join(self
.out_dir
, 'bar.service')),
374 def test_hidden_files(self
):
375 '''init.d script with hidden file suffix'''
377 script
= self
.add_sysv('foo', {}, enable
=True)
378 # backup files (not enabled in rcN.d/)
379 shutil
.copy(script
, script
+ '.dpkg-new')
380 shutil
.copy(script
, script
+ '.dpkg-dist')
381 shutil
.copy(script
, script
+ '.swp')
382 shutil
.copy(script
, script
+ '.rpmsave')
384 err
, results
= self
.run_generator()
385 self
.assertEqual(list(results
), ['foo.service'])
387 self
.assert_enabled('foo.service', ['multi-user', 'graphical'])
389 def test_backup_file(self
):
390 '''init.d script with backup file'''
392 script
= self
.add_sysv('foo', {}, enable
=True)
393 # backup files (not enabled in rcN.d/)
394 shutil
.copy(script
, script
+ '.bak')
395 shutil
.copy(script
, script
+ '.old')
396 shutil
.copy(script
, script
+ '.tmp')
397 shutil
.copy(script
, script
+ '.new')
399 err
, results
= self
.run_generator()
401 self
.assertEqual(sorted(results
), ['foo.service', 'foo.tmp.service'])
403 # ensure we don't try to create a symlink to itself
404 self
.assertNotIn('itself', err
)
406 self
.assert_enabled('foo.service', ['multi-user', 'graphical'])
407 self
.assert_enabled('foo.bak.service', [])
408 self
.assert_enabled('foo.old.service', [])
410 def test_existing_native_unit(self
):
411 '''existing native unit'''
413 with
open(os
.path
.join(self
.unit_dir
, 'foo.service'), 'w') as f
:
416 self
.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable
=True)
417 err
, results
= self
.run_generator()
418 self
.assertEqual(list(results
), [])
419 # no enablement or alias links, as native unit is disabled
420 self
.assertEqual(os
.listdir(self
.out_dir
), [])
423 if __name__
== '__main__':
424 unittest
.main(testRunner
=unittest
.TextTestRunner(stream
=sys
.stdout
, verbosity
=2))