]> git.ipfire.org Git - thirdparty/systemd.git/blame - test/sysv-generator-test.py
Merge pull request #6355 from vcaputo/journal_avoid_mmap_cache_get_calls
[thirdparty/systemd.git] / test / sysv-generator-test.py
CommitLineData
3e67e5c9 1#!/usr/bin/env python3
1f7be300 2#
e28aa588
MP
3# systemd-sysv-generator integration test
4#
5# (C) 2015 Canonical Ltd.
6# Author: Martin Pitt <martin.pitt@ubuntu.com>
7#
8# systemd is free software; you can redistribute it and/or modify it
9# under the terms of the GNU Lesser General Public License as published by
10# the Free Software Foundation; either version 2.1 of the License, or
11# (at your option) any later version.
12
13# systemd is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16# Lesser General Public License for more details.
17#
18# You should have received a copy of the GNU Lesser General Public License
19# along with systemd; If not, see <http://www.gnu.org/licenses/>.
20
21import unittest
22import sys
23import os
24import subprocess
25import tempfile
26import shutil
27from glob import glob
c584ffc0 28import collections
e28aa588
MP
29
30try:
31 from configparser import RawConfigParser
32except ImportError:
33 # python 2
34 from ConfigParser import RawConfigParser
35
36sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
37
c584ffc0
LN
38class MultiDict(collections.OrderedDict):
39 def __setitem__(self, key, value):
40 if isinstance(value, list) and key in self:
41 self[key].extend(value)
42 else:
43 super(MultiDict, self).__setitem__(key, value)
e28aa588
MP
44
45class SysvGeneratorTest(unittest.TestCase):
46 def setUp(self):
47 self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
48 self.init_d_dir = os.path.join(self.workdir, 'init.d')
49 os.mkdir(self.init_d_dir)
50 self.rcnd_dir = self.workdir
51 self.unit_dir = os.path.join(self.workdir, 'systemd')
52 os.mkdir(self.unit_dir)
53 self.out_dir = os.path.join(self.workdir, 'output')
54 os.mkdir(self.out_dir)
55
56 def tearDown(self):
57 shutil.rmtree(self.workdir)
58
59 #
60 # Helper methods
61 #
62
63 def run_generator(self, expect_error=False):
64 '''Run sysv-generator.
65
66 Fail if stderr contains any "Fail", unless expect_error is True.
67 Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
68 parsed generated units.
69 '''
70 env = os.environ.copy()
71 env['SYSTEMD_LOG_LEVEL'] = 'debug'
6b7d32ad 72 env['SYSTEMD_LOG_TARGET'] = 'console'
e28aa588
MP
73 env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
74 env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
75 env['SYSTEMD_UNIT_PATH'] = self.unit_dir
76 gen = subprocess.Popen(
77 [sysv_generator, 'ignored', 'ignored', self.out_dir],
78 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
79 universal_newlines=True, env=env)
80 (out, err) = gen.communicate()
81 if not expect_error:
82 self.assertFalse('Fail' in err, err)
83 self.assertEqual(gen.returncode, 0, err)
84
85 results = {}
86 for service in glob(self.out_dir + '/*.service'):
56401ac5
MP
87 if os.path.islink(service):
88 continue
c584ffc0
LN
89 try:
90 # for python3 we need here strict=False to parse multiple
91 # lines with the same key
92 cp = RawConfigParser(dict_type=MultiDict, strict=False)
93 except TypeError:
94 # RawConfigParser in python2 does not have the strict option
95 # but it allows multiple lines with the same key by default
96 cp = RawConfigParser(dict_type=MultiDict)
e28aa588
MP
97 cp.optionxform = lambda o: o # don't lower-case option names
98 with open(service) as f:
99 cp.readfp(f)
100 results[os.path.basename(service)] = cp
101
102 return (err, results)
103
104 def add_sysv(self, fname, keys, enable=False, prio=1):
105 '''Create a SysV init script with the given keys in the LSB header
106
107 There are sensible default values for all fields.
108 If enable is True, links will be created in the rcN.d dirs. In that
109 case, the priority can be given with "prio" (default to 1).
110
111 Return path of generated script.
112 '''
113 name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
114 keys.setdefault('Provides', name_without_sh)
115 keys.setdefault('Required-Start', '$local_fs')
116 keys.setdefault('Required-Stop', keys['Required-Start'])
117 keys.setdefault('Default-Start', '2 3 4 5')
118 keys.setdefault('Default-Stop', '0 1 6')
119 keys.setdefault('Short-Description', 'test %s service' %
120 name_without_sh)
121 keys.setdefault('Description', 'long description for test %s service' %
122 name_without_sh)
123 script = os.path.join(self.init_d_dir, fname)
124 with open(script, 'w') as f:
125 f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
126 for k, v in keys.items():
127 if v is not None:
128 f.write('#%20s %s\n' % (k + ':', v))
129 f.write('### END INIT INFO\ncode --goes here\n')
130 os.chmod(script, 0o755)
131
132 if enable:
133 def make_link(prefix, runlevel):
134 d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
135 if not os.path.isdir(d):
136 os.mkdir(d)
137 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
138
139 for rl in keys['Default-Start'].split():
140 make_link('S%02i' % prio, rl)
141 for rl in keys['Default-Stop'].split():
142 make_link('K%02i' % (99 - prio), rl)
143
144 return script
145
0377e373
MP
146 def assert_enabled(self, unit, targets):
147 '''assert that a unit is enabled in precisely the given targets'''
e28aa588 148
0377e373 149 all_targets = ['multi-user', 'graphical']
e28aa588
MP
150
151 # should be enabled
0377e373
MP
152 for target in all_targets:
153 link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
154 if target in targets:
155 unit_file = os.readlink(link)
7f0cc637
ZJS
156 # os.path.exists() will fail on a dangling symlink
157 self.assertTrue(os.path.exists(link))
0377e373 158 self.assertEqual(os.path.basename(unit_file), unit)
e28aa588
MP
159 else:
160 self.assertFalse(os.path.exists(link),
161 '%s unexpectedly exists' % link)
162
163 #
164 # test cases
165 #
166
167 def test_nothing(self):
168 '''no input files'''
169
170 results = self.run_generator()[1]
171 self.assertEqual(results, {})
172 self.assertEqual(os.listdir(self.out_dir), [])
173
174 def test_simple_disabled(self):
175 '''simple service without dependencies, disabled'''
176
177 self.add_sysv('foo', {}, enable=False)
178 err, results = self.run_generator()
179 self.assertEqual(len(results), 1)
180
181 # no enablement links or other stuff
182 self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
183
184 s = results['foo.service']
185 self.assertEqual(s.sections(), ['Unit', 'Service'])
186 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
187 # $local_fs does not need translation, don't expect any dependency
188 # fields here
189 self.assertEqual(set(s.options('Unit')),
190 set(['Documentation', 'SourcePath', 'Description']))
191
192 self.assertEqual(s.get('Service', 'Type'), 'forking')
193 init_script = os.path.join(self.init_d_dir, 'foo')
194 self.assertEqual(s.get('Service', 'ExecStart'),
195 '%s start' % init_script)
196 self.assertEqual(s.get('Service', 'ExecStop'),
197 '%s stop' % init_script)
198
4e558983
MP
199 self.assertNotIn('Overwriting', err)
200
e28aa588
MP
201 def test_simple_enabled_all(self):
202 '''simple service without dependencies, enabled in all runlevels'''
203
204 self.add_sysv('foo', {}, enable=True)
205 err, results = self.run_generator()
206 self.assertEqual(list(results), ['foo.service'])
0377e373 207 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
4e558983 208 self.assertNotIn('Overwriting', err)
e28aa588 209
264581a2
FS
210 def test_simple_escaped(self):
211 '''simple service without dependencies, that requires escaping the name'''
212
213 self.add_sysv('foo+', {})
214 self.add_sysv('foo-admin', {})
215 err, results = self.run_generator()
52a321d8 216 self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
264581a2
FS
217 self.assertNotIn('Overwriting', err)
218
e28aa588
MP
219 def test_simple_enabled_some(self):
220 '''simple service without dependencies, enabled in some runlevels'''
221
222 self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
223 err, results = self.run_generator()
224 self.assertEqual(list(results), ['foo.service'])
0377e373 225 self.assert_enabled('foo.service', ['multi-user'])
e28aa588
MP
226
227 def test_lsb_macro_dep_single(self):
228 '''single LSB macro dependency: $network'''
229
230 self.add_sysv('foo', {'Required-Start': '$network'})
231 s = self.run_generator()[1]['foo.service']
232 self.assertEqual(set(s.options('Unit')),
233 set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
234 self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
235 self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
236
237 def test_lsb_macro_dep_multi(self):
238 '''multiple LSB macro dependencies'''
239
240 self.add_sysv('foo', {'Required-Start': '$named $portmap'})
241 s = self.run_generator()[1]['foo.service']
242 self.assertEqual(set(s.options('Unit')),
243 set(['Documentation', 'SourcePath', 'Description', 'After']))
c584ffc0 244 self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])
e28aa588
MP
245
246 def test_lsb_deps(self):
247 '''LSB header dependencies to other services'''
248
249 # also give symlink priorities here; they should be ignored
250 self.add_sysv('foo', {'Required-Start': 'must1 must2',
251 'Should-Start': 'may1 ne_may2'},
252 enable=True, prio=40)
253 self.add_sysv('must1', {}, enable=True, prio=10)
254 self.add_sysv('must2', {}, enable=True, prio=15)
255 self.add_sysv('may1', {}, enable=True, prio=20)
256 # do not create ne_may2
257 err, results = self.run_generator()
258 self.assertEqual(sorted(results),
259 ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
260
261 # foo should depend on all of them
262 self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
263 ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
264
265 # other services should not depend on each other
266 self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
267 self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
268 self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
269
270 def test_symlink_prio_deps(self):
271 '''script without LSB headers use rcN.d priority'''
272
273 # create two init.d scripts without LSB header and enable them with
274 # startup priorities
275 for prio, name in [(10, 'provider'), (15, 'consumer')]:
276 with open(os.path.join(self.init_d_dir, name), 'w') as f:
277 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
278 os.fchmod(f.fileno(), 0o755)
279
280 d = os.path.join(self.rcnd_dir, 'rc2.d')
281 if not os.path.isdir(d):
282 os.mkdir(d)
283 os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
284
285 err, results = self.run_generator()
286 self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
287 self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
288 self.assertEqual(results['consumer.service'].get('Unit', 'After'),
289 'provider.service')
290
291 def test_multiple_provides(self):
292 '''multiple Provides: names'''
293
294 self.add_sysv('foo', {'Provides': 'foo bar baz'})
56401ac5
MP
295 err, results = self.run_generator()
296 self.assertEqual(list(results), ['foo.service'])
297 self.assertEqual(set(results['foo.service'].options('Unit')),
e28aa588
MP
298 set(['Documentation', 'SourcePath', 'Description']))
299 # should create symlinks for the alternative names
300 for f in ['bar.service', 'baz.service']:
301 self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
302 'foo.service')
4e558983 303 self.assertNotIn('Overwriting', err)
e28aa588 304
264581a2
FS
305 def test_provides_escaped(self):
306 '''a script that Provides: a name that requires escaping'''
307
308 self.add_sysv('foo', {'Provides': 'foo foo+'})
309 err, results = self.run_generator()
310 self.assertEqual(list(results), ['foo.service'])
311 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
00d5eaaf 312 'foo.service')
264581a2
FS
313 self.assertNotIn('Overwriting', err)
314
77354c7e
MP
315 def test_same_provides_in_multiple_scripts(self):
316 '''multiple init.d scripts provide the same name'''
317
318 self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
319 self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
320 err, results = self.run_generator()
321 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
322 # should create symlink for the alternative name for either unit
323 self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
324 ['foo.service', 'bar.service'])
325
326 def test_provide_other_script(self):
327 '''init.d scripts provides the name of another init.d script'''
328
329 self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
330 self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
331 err, results = self.run_generator()
332 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
4e558983
MP
333 # we do expect an overwrite here, bar.service should overwrite the
334 # alias link from foo.service
335 self.assertIn('Overwriting', err)
77354c7e 336
e28aa588
MP
337 def test_nonexecutable_script(self):
338 '''ignores non-executable init.d script'''
339
340 os.chmod(self.add_sysv('foo', {}), 0o644)
341 err, results = self.run_generator()
342 self.assertEqual(results, {})
343
29e0e6d8
MP
344 def test_sh_suffix(self):
345 '''init.d script with .sh suffix'''
346
347 self.add_sysv('foo.sh', {}, enable=True)
348 err, results = self.run_generator()
349 s = results['foo.service']
350
351 self.assertEqual(s.sections(), ['Unit', 'Service'])
352 # should not have a .sh
353 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
354
355 # calls correct script with .sh
356 init_script = os.path.join(self.init_d_dir, 'foo.sh')
357 self.assertEqual(s.get('Service', 'ExecStart'),
358 '%s start' % init_script)
359 self.assertEqual(s.get('Service', 'ExecStop'),
360 '%s stop' % init_script)
361
0377e373 362 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
29e0e6d8
MP
363
364 def test_sh_suffix_with_provides(self):
365 '''init.d script with .sh suffix and Provides:'''
366
367 self.add_sysv('foo.sh', {'Provides': 'foo bar'})
368 err, results = self.run_generator()
369 # ensure we don't try to create a symlink to itself
230f0485 370 self.assertNotIn('itself', err)
29e0e6d8
MP
371 self.assertEqual(list(results), ['foo.service'])
372 self.assertEqual(results['foo.service'].get('Unit', 'Description'),
373 'LSB: test foo service')
374
375 # should create symlink for the alternative name
376 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
377 'foo.service')
378
d816e2b7
MP
379 def test_hidden_files(self):
380 '''init.d script with hidden file suffix'''
381
382 script = self.add_sysv('foo', {}, enable=True)
383 # backup files (not enabled in rcN.d/)
384 shutil.copy(script, script + '.dpkg-new')
385 shutil.copy(script, script + '.dpkg-dist')
386 shutil.copy(script, script + '.swp')
387 shutil.copy(script, script + '.rpmsave')
388
389 err, results = self.run_generator()
390 self.assertEqual(list(results), ['foo.service'])
391
0377e373 392 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
29e0e6d8 393
77354c7e
MP
394 def test_backup_file(self):
395 '''init.d script with backup file'''
396
397 script = self.add_sysv('foo', {}, enable=True)
398 # backup files (not enabled in rcN.d/)
399 shutil.copy(script, script + '.bak')
400 shutil.copy(script, script + '.old')
94a0ef6e
ZJS
401 shutil.copy(script, script + '.tmp')
402 shutil.copy(script, script + '.new')
77354c7e
MP
403
404 err, results = self.run_generator()
405 print(err)
94a0ef6e 406 self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])
77354c7e
MP
407
408 # ensure we don't try to create a symlink to itself
230f0485 409 self.assertNotIn('itself', err)
77354c7e 410
0377e373 411 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
77354c7e
MP
412 self.assert_enabled('foo.bak.service', [])
413 self.assert_enabled('foo.old.service', [])
414
f4f01ec1
MP
415 def test_existing_native_unit(self):
416 '''existing native unit'''
417
418 with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
419 f.write('[Unit]\n')
420
421 self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
422 err, results = self.run_generator()
423 self.assertEqual(list(results), [])
424 # no enablement or alias links, as native unit is disabled
425 self.assertEqual(os.listdir(self.out_dir), [])
426
e28aa588
MP
427
428if __name__ == '__main__':
429 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))