]> git.ipfire.org Git - thirdparty/systemd.git/blame - test/sysv-generator-test.py
network: make all failures in route configuration fatal
[thirdparty/systemd.git] / test / sysv-generator-test.py
CommitLineData
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1+
#
# systemd-sysv-generator integration test
#
# © 2015 Canonical Ltd.
# Author: Martin Pitt <martin.pitt@ubuntu.com>
e28aa588 8
278391c2 9import collections
e28aa588 10import os
278391c2 11import shutil
e28aa588 12import subprocess
278391c2 13import sys
e28aa588 14import tempfile
278391c2
BOT
15import unittest
16
a4b57b32 17from configparser import RawConfigParser
278391c2 18from glob import glob
e28aa588 19
# Command used to launch the generator under test; the './' prefix means it is
# looked up relative to the current working directory of the test run.
sysv_generator = './systemd-sysv-generator'
e28aa588 21
c584ffc0
LN
class MultiDict(collections.OrderedDict):
    '''Ordered dict that merges repeated list assignments.

    Assigning a list to a key that is already present extends the stored
    list instead of replacing it; any other assignment behaves like a plain
    OrderedDict.
    '''

    def __setitem__(self, key, value):
        if key in self and isinstance(value, list):
            # repeated key with a list value: accumulate instead of overwrite
            self[key].extend(value)
            return
        super().__setitem__(key, value)
e28aa588
MP
28
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests that run systemd-sysv-generator on a scratch tree.

    Each test gets a fresh temporary directory containing an init.d/
    directory, a directory for native units and an output directory. The
    generator is pointed at them through SYSTEMD_* environment variables.
    '''

    def setUp(self):
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # rcN.d/ directories are created directly below the work directory
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        Return (stderr, filename -> ConfigParser) pair with output to stderr and
        parsed generated units.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        (out, err) = gen.communicate()
        if not expect_error:
            self.assertNotIn('Fail', err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(self.out_dir + '/*.service'):
            # symlinks are aliases of other generated units, not units themselves
            if os.path.islink(service):
                continue
            # strict=False is needed to parse multiple lines with the same
            # key; MultiDict then merges their values
            cp = RawConfigParser(dict_type=MultiDict, strict=False)
            cp.optionxform = str  # don't lower-case option names
            with open(service) as f:
                # read_file() replaces readfp(), which was removed in Python 3.12
                cp.read_file(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        The passed-in "keys" dict is not modified. A key whose value is None
        is left out of the header.

        Return path of generated script.
        '''
        # work on a copy so that filling in defaults does not leak to the caller
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
        keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#{:>20} {}\n'.format(k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel))
                os.makedirs(d, exist_ok=True)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S{:02d}'.format(prio), rl)
            for rl in keys['Default-Stop'].split():
                make_link('K{:02d}'.format(99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
            if target in targets:
                # should be enabled; check for the link first so that a
                # missing one is reported as a failure, not as an OSError
                self.assertTrue(os.path.islink(link),
                                '{} is not a symlink'.format(link))
                unit_file = os.readlink(link)
                # os.path.exists() will fail on a dangling symlink
                self.assertTrue(os.path.exists(link))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                self.assertFalse(os.path.exists(link),
                                 '{} unexpectedly exists'.format(link))

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            os.makedirs(d, exist_ok=True)
            os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '{} start'.format(init_script))
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '{} stop'.format(init_script))

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')
        shutil.copy(script, script + '.tmp')
        shutil.copy(script, script + '.new')

        err, results = self.run_generator()
        # show the generator output only when the expectation is not met
        self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'], err)

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
408
e28aa588
MP
409
if __name__ == '__main__':
    # report to stdout (not the default stderr) with one line per test
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)