]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
Merge pull request #32249 from CodethinkLabs/vmspawn/predicatable_tap_names
[thirdparty/systemd.git] / test / sysv-generator-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1-or-later
3 #
4 # systemd-sysv-generator integration test
5 #
6 # © 2015 Canonical Ltd.
7 # Author: Martin Pitt <martin.pitt@ubuntu.com>
8
9 import collections
10 import os
11 import shutil
12 import subprocess
13 import sys
14 import tempfile
15 import unittest
16
17 from configparser import RawConfigParser
18 from glob import glob
19
# Path of the generator binary under test. It is relative, so the tests have
# to be started from the directory that contains the binary.
sysv_generator = './systemd-sysv-generator'
21
class MultiDict(collections.OrderedDict):
    '''Ordered dict that accumulates list values stored under an existing key.

    Assigning a list to a key that is already present extends the stored
    value in place; every other assignment is a plain dictionary store. This
    lets a config parser that stores option values as lists keep all lines
    of an option that is given more than once.
    '''

    def __setitem__(self, key, value):
        merge = isinstance(value, list) and key in self
        if not merge:
            super().__setitem__(key, value)
            return
        self[key].extend(value)
28
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Every test gets a private scratch directory that holds an init.d
    directory, the rcN.d directories, a native unit directory and an output
    directory. The generator is pointed at them through SYSTEMD_* environment
    variables, see run_generator().

    The docstrings of the test methods are shown by the verbose test runner
    and are therefore kept short.
    '''

    def setUp(self):
        '''Create the scratch directory tree used by a single test.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        # Register the removal right away instead of relying on tearDown():
        # tearDown() is not called when setUp() itself raises, which would
        # leak the directory if one of the mkdir() calls below fails.
        self.addCleanup(shutil.rmtree, self.workdir)
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # the rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail" or the exit status is not 0,
        unless expect_error is True.
        Return (stderr, filename -> ConfigParser) pair with output to stderr and
        parsed generated units. Symlinks in the output directory (aliases)
        are not parsed.
        '''
        env = os.environ.copy()
        # We might debug log about errors that aren't actually fatal so let's bump the log level to info to
        # prevent those logs from interfering with the test.
        env['SYSTEMD_LOG_LEVEL'] = 'info'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        # stdout is captured only to keep it out of the test report
        gen = subprocess.run(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        err = gen.stderr
        if not expect_error:
            self.assertNotIn('Fail', err)
            self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(os.path.join(self.out_dir, '*.service')):
            if os.path.islink(service):
                continue
            # strict=False is needed to parse multiple lines with the same
            # key; MultiDict then keeps all of their values
            cp = RawConfigParser(dict_type=MultiDict, strict=False)
            cp.optionxform = str  # don't lower-case option names
            with open(service) as f:
                cp.read_file(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields. A key whose value
        is None is left out of the header. The passed dict is not modified.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        # work on a copy so that the defaults do not leak into the caller's dict
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
        keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))

        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#{:>20} {}\n'.format(k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            # start links get the priority itself, stop links the mirrored one
            links = [('S%02i' % prio, keys['Default-Start']),
                     ('K%02i' % (99 - prio), keys['Default-Stop'])]
            for prefix, runlevels in links:
                for rl in runlevels.split():
                    d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(rl))
                    os.makedirs(d, exist_ok=True)
                    os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
            if target in targets:
                # should be enabled; check for the link first so that a
                # missing one is a test failure and not an OSError
                self.assertTrue(os.path.islink(link),
                                '{} is not a symlink'.format(link))
                unit_file = os.readlink(link)
                # os.path.exists() will fail on a dangling symlink
                self.assertTrue(os.path.exists(link))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                # lexists() also catches a dangling link
                self.assertFalse(os.path.lexists(link),
                                 '{} unexpectedly exists'.format(link))

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        results = self.run_generator()[1]
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        results = self.run_generator()[1]
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        for other in ['must1.service', 'must2.service', 'may1.service']:
            self.assertFalse(results[other].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        d = os.path.join(self.rcnd_dir, 'rc2.d')
        os.makedirs(d, exist_ok=True)

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            os.symlink('../init.d/' + name, os.path.join(d, 'S{:02d}{}'.format(prio, name)))

        results = self.run_generator()[1]
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        results = self.run_generator()[1]
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        results = self.run_generator()[1]
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        results = self.run_generator()[1]
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        for suffix in ['.dpkg-new', '.dpkg-dist', '.swp', '.rpmsave']:
            shutil.copy(script, script + suffix)

        results = self.run_generator()[1]
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        for suffix in ['.bak', '.old', '.tmp', '.new']:
            shutil.copy(script, script + suffix)

        err, results = self.run_generator()
        # of the copies, only the .tmp one is expected to yield a unit
        self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        results = self.run_generator()[1]
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
410
411
# Script entry point: run all tests of this module, writing the per-test
# report to stdout at verbosity 2.
if __name__ == '__main__':
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))