# Origin: git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
# systemd-sysv-generator integration test
#
# (C) 2015 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>
#
# systemd is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# systemd is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with systemd; If not, see <http://www.gnu.org/licenses/>.
import collections
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
from glob import glob

try:
    from configparser import RawConfigParser
except ImportError:
    # python 2
    from ConfigParser import RawConfigParser
# Path of the generator binary under test: looked up in $builddir, falling
# back to the current directory when that variable is not set.
sysv_generator = os.path.join(os.environ.get('builddir', '.'),
                              'systemd-sysv-generator')
36 class MultiDict(collections
.OrderedDict
):
37 def __setitem__(self
, key
, value
):
38 if isinstance(value
, list) and key
in self
:
39 self
[key
].extend(value
)
41 super(MultiDict
, self
).__setitem
__(key
, value
)
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Each test builds a private directory tree (init.d scripts, rcN.d links,
    native units, generator output dir), runs the generator binary against
    it and inspects the generated unit files and symlinks.
    '''

    def setUp(self):
        '''Create the scratch directory layout used by one test'''

        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # rcN.d directories get created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the scratch directory tree'''

        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        Return (stderr, filename -> ConfigParser) pair with output to stderr
        and parsed generated units.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        (out, err) = gen.communicate()
        if not expect_error:
            self.assertNotIn('Fail', err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(self.out_dir + '/*.service'):
            # alias/enablement symlinks are checked separately by the tests
            if os.path.islink(service):
                continue
            try:
                # for python3 we need here strict=False to parse multiple
                # lines with the same key
                cp = RawConfigParser(dict_type=MultiDict, strict=False)
            except TypeError:
                # RawConfigParser in python2 does not have the strict option
                # but it allows multiple lines with the same key by default
                cp = RawConfigParser(dict_type=MultiDict)
            cp.optionxform = lambda o: o  # don't lower-case option names
            with open(service) as f:
                # python2 only has readfp(); python >= 3.12 only has
                # read_file()
                if hasattr(cp, 'read_file'):
                    cp.read_file(f)
                else:
                    cp.readfp(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def _link_in_runlevel(self, runlevel, link_name, script_name):
        '''Symlink init.d/<script_name> as rc<runlevel>.d/<link_name>

        The rcN.d directory is created on first use.
        '''
        d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
        if not os.path.isdir(d):
            os.mkdir(d)
        os.symlink('../init.d/' + script_name, os.path.join(d, link_name))

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields; a field whose
        value is None is left out of the header.  The passed dict is not
        modified.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        # work on a copy so that the defaults do not leak into the caller's
        # dict
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test %s service' %
                        name_without_sh)
        keys.setdefault('Description', 'long description for test %s service' %
                        name_without_sh)
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#%20s %s\n' % (k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            # S<prio> links in the start runlevels, K<99-prio> in the stop
            # runlevels
            for rl in keys['Default-Start'].split():
                self._link_in_runlevel(rl, 'S%02i' % prio + fname, fname)
            for rl in keys['Default-Stop'].split():
                self._link_in_runlevel(rl, 'K%02i' % (99 - prio) + fname,
                                       fname)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
            if target in targets:
                # should be enabled
                unit_file = os.readlink(link)
                self.assertTrue(os.path.exists(unit_file))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                # should not be enabled
                self.assertFalse(os.path.exists(link),
                                 '%s unexpectedly exists' % link)

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description',
                          'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description',
                          'After'})
        self.assertEqual(s.get('Unit', 'After').split(),
                         ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service',
                          'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service',
                          'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            self._link_in_runlevel(2, 'S%02i%s' % (prio, name), name)

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        for suffix in ('.dpkg-new', '.dpkg-dist', '.swp', '.rpmsave'):
            shutil.copy(script, script + suffix)

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')

        err, results = self.run_generator()
        print(err)
        self.assertEqual(sorted(results),
                         ['foo.bak.service', 'foo.old.service', 'foo.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
if __name__ == '__main__':
    # Send the verbose test report to stdout instead of unittest's default
    # stream.
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
                                                     verbosity=2))