]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
network: make all failures in route configuration fatal
[thirdparty/systemd.git] / test / sysv-generator-test.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: LGPL-2.1+
3 #
4 # systemd-sysv-generator integration test
5 #
6 # © 2015 Canonical Ltd.
7 # Author: Martin Pitt <martin.pitt@ubuntu.com>
8
9 import collections
10 import os
11 import shutil
12 import subprocess
13 import sys
14 import tempfile
15 import unittest
16
17 from configparser import RawConfigParser
18 from glob import glob
19
20 sysv_generator = './systemd-sysv-generator'
21
22 class MultiDict(collections.OrderedDict):
23 def __setitem__(self, key, value):
24 if isinstance(value, list) and key in self:
25 self[key].extend(value)
26 else:
27 super(MultiDict, self).__setitem__(key, value)
28
29 class SysvGeneratorTest(unittest.TestCase):
30 def setUp(self):
31 self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
32 self.init_d_dir = os.path.join(self.workdir, 'init.d')
33 os.mkdir(self.init_d_dir)
34 self.rcnd_dir = self.workdir
35 self.unit_dir = os.path.join(self.workdir, 'systemd')
36 os.mkdir(self.unit_dir)
37 self.out_dir = os.path.join(self.workdir, 'output')
38 os.mkdir(self.out_dir)
39
40 def tearDown(self):
41 shutil.rmtree(self.workdir)
42
43 #
44 # Helper methods
45 #
46
47 def run_generator(self, expect_error=False):
48 '''Run sysv-generator.
49
50 Fail if stderr contains any "Fail", unless expect_error is True.
51 Return (stderr, filename -> ConfigParser) pair with output to stderr and
52 parsed generated units.
53 '''
54 env = os.environ.copy()
55 env['SYSTEMD_LOG_LEVEL'] = 'debug'
56 env['SYSTEMD_LOG_TARGET'] = 'console'
57 env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
58 env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
59 env['SYSTEMD_UNIT_PATH'] = self.unit_dir
60 gen = subprocess.Popen(
61 [sysv_generator, 'ignored', 'ignored', self.out_dir],
62 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
63 universal_newlines=True, env=env)
64 (out, err) = gen.communicate()
65 if not expect_error:
66 self.assertFalse('Fail' in err, err)
67 self.assertEqual(gen.returncode, 0, err)
68
69 results = {}
70 for service in glob(self.out_dir + '/*.service'):
71 if os.path.islink(service):
72 continue
73 try:
74 # for python3 we need here strict=False to parse multiple
75 # lines with the same key
76 cp = RawConfigParser(dict_type=MultiDict, strict=False)
77 except TypeError:
78 # RawConfigParser in python2 does not have the strict option
79 # but it allows multiple lines with the same key by default
80 cp = RawConfigParser(dict_type=MultiDict)
81 cp.optionxform = lambda o: o # don't lower-case option names
82 with open(service) as f:
83 cp.readfp(f)
84 results[os.path.basename(service)] = cp
85
86 return (err, results)
87
88 def add_sysv(self, fname, keys, enable=False, prio=1):
89 '''Create a SysV init script with the given keys in the LSB header
90
91 There are sensible default values for all fields.
92 If enable is True, links will be created in the rcN.d dirs. In that
93 case, the priority can be given with "prio" (default to 1).
94
95 Return path of generated script.
96 '''
97 name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
98 keys.setdefault('Provides', name_without_sh)
99 keys.setdefault('Required-Start', '$local_fs')
100 keys.setdefault('Required-Stop', keys['Required-Start'])
101 keys.setdefault('Default-Start', '2 3 4 5')
102 keys.setdefault('Default-Stop', '0 1 6')
103 keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
104 keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))
105 script = os.path.join(self.init_d_dir, fname)
106 with open(script, 'w') as f:
107 f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
108 for k, v in keys.items():
109 if v is not None:
110 f.write('#{:>20} {}\n'.format(k + ':', v))
111 f.write('### END INIT INFO\ncode --goes here\n')
112 os.chmod(script, 0o755)
113
114 if enable:
115 def make_link(prefix, runlevel):
116 d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel))
117 if not os.path.isdir(d):
118 os.mkdir(d)
119 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
120
121 for rl in keys['Default-Start'].split():
122 make_link('S%02i' % prio, rl)
123 for rl in keys['Default-Stop'].split():
124 make_link('K%02i' % (99 - prio), rl)
125
126 return script
127
128 def assert_enabled(self, unit, targets):
129 '''assert that a unit is enabled in precisely the given targets'''
130
131 all_targets = ['multi-user', 'graphical']
132
133 # should be enabled
134 for target in all_targets:
135 link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
136 if target in targets:
137 unit_file = os.readlink(link)
138 # os.path.exists() will fail on a dangling symlink
139 self.assertTrue(os.path.exists(link))
140 self.assertEqual(os.path.basename(unit_file), unit)
141 else:
142 self.assertFalse(os.path.exists(link),
143 '{} unexpectedly exists'.format(link))
144
145 #
146 # test cases
147 #
148
149 def test_nothing(self):
150 '''no input files'''
151
152 results = self.run_generator()[1]
153 self.assertEqual(results, {})
154 self.assertEqual(os.listdir(self.out_dir), [])
155
156 def test_simple_disabled(self):
157 '''simple service without dependencies, disabled'''
158
159 self.add_sysv('foo', {}, enable=False)
160 err, results = self.run_generator()
161 self.assertEqual(len(results), 1)
162
163 # no enablement links or other stuff
164 self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
165
166 s = results['foo.service']
167 self.assertEqual(s.sections(), ['Unit', 'Service'])
168 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
169 # $local_fs does not need translation, don't expect any dependency
170 # fields here
171 self.assertEqual(set(s.options('Unit')),
172 set(['Documentation', 'SourcePath', 'Description']))
173
174 self.assertEqual(s.get('Service', 'Type'), 'forking')
175 init_script = os.path.join(self.init_d_dir, 'foo')
176 self.assertEqual(s.get('Service', 'ExecStart'),
177 '{} start'.format(init_script))
178 self.assertEqual(s.get('Service', 'ExecStop'),
179 '{} stop'.format(init_script))
180
181 self.assertNotIn('Overwriting', err)
182
183 def test_simple_enabled_all(self):
184 '''simple service without dependencies, enabled in all runlevels'''
185
186 self.add_sysv('foo', {}, enable=True)
187 err, results = self.run_generator()
188 self.assertEqual(list(results), ['foo.service'])
189 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
190 self.assertNotIn('Overwriting', err)
191
192 def test_simple_escaped(self):
193 '''simple service without dependencies, that requires escaping the name'''
194
195 self.add_sysv('foo+', {})
196 self.add_sysv('foo-admin', {})
197 err, results = self.run_generator()
198 self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
199 self.assertNotIn('Overwriting', err)
200
201 def test_simple_enabled_some(self):
202 '''simple service without dependencies, enabled in some runlevels'''
203
204 self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
205 err, results = self.run_generator()
206 self.assertEqual(list(results), ['foo.service'])
207 self.assert_enabled('foo.service', ['multi-user'])
208
209 def test_lsb_macro_dep_single(self):
210 '''single LSB macro dependency: $network'''
211
212 self.add_sysv('foo', {'Required-Start': '$network'})
213 s = self.run_generator()[1]['foo.service']
214 self.assertEqual(set(s.options('Unit')),
215 set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
216 self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
217 self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
218
219 def test_lsb_macro_dep_multi(self):
220 '''multiple LSB macro dependencies'''
221
222 self.add_sysv('foo', {'Required-Start': '$named $portmap'})
223 s = self.run_generator()[1]['foo.service']
224 self.assertEqual(set(s.options('Unit')),
225 set(['Documentation', 'SourcePath', 'Description', 'After']))
226 self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])
227
228 def test_lsb_deps(self):
229 '''LSB header dependencies to other services'''
230
231 # also give symlink priorities here; they should be ignored
232 self.add_sysv('foo', {'Required-Start': 'must1 must2',
233 'Should-Start': 'may1 ne_may2'},
234 enable=True, prio=40)
235 self.add_sysv('must1', {}, enable=True, prio=10)
236 self.add_sysv('must2', {}, enable=True, prio=15)
237 self.add_sysv('may1', {}, enable=True, prio=20)
238 # do not create ne_may2
239 err, results = self.run_generator()
240 self.assertEqual(sorted(results),
241 ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
242
243 # foo should depend on all of them
244 self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
245 ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
246
247 # other services should not depend on each other
248 self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
249 self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
250 self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
251
252 def test_symlink_prio_deps(self):
253 '''script without LSB headers use rcN.d priority'''
254
255 # create two init.d scripts without LSB header and enable them with
256 # startup priorities
257 for prio, name in [(10, 'provider'), (15, 'consumer')]:
258 with open(os.path.join(self.init_d_dir, name), 'w') as f:
259 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
260 os.fchmod(f.fileno(), 0o755)
261
262 d = os.path.join(self.rcnd_dir, 'rc2.d')
263 if not os.path.isdir(d):
264 os.mkdir(d)
265 os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name)))
266
267 err, results = self.run_generator()
268 self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
269 self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
270 self.assertEqual(results['consumer.service'].get('Unit', 'After'),
271 'provider.service')
272
273 def test_multiple_provides(self):
274 '''multiple Provides: names'''
275
276 self.add_sysv('foo', {'Provides': 'foo bar baz'})
277 err, results = self.run_generator()
278 self.assertEqual(list(results), ['foo.service'])
279 self.assertEqual(set(results['foo.service'].options('Unit')),
280 set(['Documentation', 'SourcePath', 'Description']))
281 # should create symlinks for the alternative names
282 for f in ['bar.service', 'baz.service']:
283 self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
284 'foo.service')
285 self.assertNotIn('Overwriting', err)
286
287 def test_provides_escaped(self):
288 '''a script that Provides: a name that requires escaping'''
289
290 self.add_sysv('foo', {'Provides': 'foo foo+'})
291 err, results = self.run_generator()
292 self.assertEqual(list(results), ['foo.service'])
293 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
294 'foo.service')
295 self.assertNotIn('Overwriting', err)
296
297 def test_same_provides_in_multiple_scripts(self):
298 '''multiple init.d scripts provide the same name'''
299
300 self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
301 self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
302 err, results = self.run_generator()
303 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
304 # should create symlink for the alternative name for either unit
305 self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
306 ['foo.service', 'bar.service'])
307
308 def test_provide_other_script(self):
309 '''init.d scripts provides the name of another init.d script'''
310
311 self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
312 self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
313 err, results = self.run_generator()
314 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
315 # we do expect an overwrite here, bar.service should overwrite the
316 # alias link from foo.service
317 self.assertIn('Overwriting', err)
318
319 def test_nonexecutable_script(self):
320 '''ignores non-executable init.d script'''
321
322 os.chmod(self.add_sysv('foo', {}), 0o644)
323 err, results = self.run_generator()
324 self.assertEqual(results, {})
325
326 def test_sh_suffix(self):
327 '''init.d script with .sh suffix'''
328
329 self.add_sysv('foo.sh', {}, enable=True)
330 err, results = self.run_generator()
331 s = results['foo.service']
332
333 self.assertEqual(s.sections(), ['Unit', 'Service'])
334 # should not have a .sh
335 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
336
337 # calls correct script with .sh
338 init_script = os.path.join(self.init_d_dir, 'foo.sh')
339 self.assertEqual(s.get('Service', 'ExecStart'),
340 '{} start'.format(init_script))
341 self.assertEqual(s.get('Service', 'ExecStop'),
342 '{} stop'.format(init_script))
343
344 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
345
346 def test_sh_suffix_with_provides(self):
347 '''init.d script with .sh suffix and Provides:'''
348
349 self.add_sysv('foo.sh', {'Provides': 'foo bar'})
350 err, results = self.run_generator()
351 # ensure we don't try to create a symlink to itself
352 self.assertNotIn('itself', err)
353 self.assertEqual(list(results), ['foo.service'])
354 self.assertEqual(results['foo.service'].get('Unit', 'Description'),
355 'LSB: test foo service')
356
357 # should create symlink for the alternative name
358 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
359 'foo.service')
360
361 def test_hidden_files(self):
362 '''init.d script with hidden file suffix'''
363
364 script = self.add_sysv('foo', {}, enable=True)
365 # backup files (not enabled in rcN.d/)
366 shutil.copy(script, script + '.dpkg-new')
367 shutil.copy(script, script + '.dpkg-dist')
368 shutil.copy(script, script + '.swp')
369 shutil.copy(script, script + '.rpmsave')
370
371 err, results = self.run_generator()
372 self.assertEqual(list(results), ['foo.service'])
373
374 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
375
376 def test_backup_file(self):
377 '''init.d script with backup file'''
378
379 script = self.add_sysv('foo', {}, enable=True)
380 # backup files (not enabled in rcN.d/)
381 shutil.copy(script, script + '.bak')
382 shutil.copy(script, script + '.old')
383 shutil.copy(script, script + '.tmp')
384 shutil.copy(script, script + '.new')
385
386 err, results = self.run_generator()
387 print(err)
388 self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])
389
390 # ensure we don't try to create a symlink to itself
391 self.assertNotIn('itself', err)
392
393 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
394 self.assert_enabled('foo.bak.service', [])
395 self.assert_enabled('foo.old.service', [])
396
397 def test_existing_native_unit(self):
398 '''existing native unit'''
399
400 with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
401 f.write('[Unit]\n')
402
403 self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
404 err, results = self.run_generator()
405 self.assertEqual(list(results), [])
406 # no enablement or alias links, as native unit is disabled
407 self.assertEqual(os.listdir(self.out_dir), [])
408
409
410 if __name__ == '__main__':
411 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))