]> git.ipfire.org Git - thirdparty/systemd.git/blame - test/sysv-generator-test.py
systemctl: don't use get_process_comm() on non-local PIDs (#7518)
[thirdparty/systemd.git] / test / sysv-generator-test.py
CommitLineData
3e67e5c9 1#!/usr/bin/env python3
35df7443 2# SPDX-License-Identifier: LGPL-2.1+
1f7be300 3#
e28aa588
MP
4# systemd-sysv-generator integration test
5#
6# (C) 2015 Canonical Ltd.
7# Author: Martin Pitt <martin.pitt@ubuntu.com>
8#
9# systemd is free software; you can redistribute it and/or modify it
10# under the terms of the GNU Lesser General Public License as published by
11# the Free Software Foundation; either version 2.1 of the License, or
12# (at your option) any later version.
13
14# systemd is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17# Lesser General Public License for more details.
18#
19# You should have received a copy of the GNU Lesser General Public License
20# along with systemd; If not, see <http://www.gnu.org/licenses/>.
21
22import unittest
23import sys
24import os
25import subprocess
26import tempfile
27import shutil
28from glob import glob
c584ffc0 29import collections
a4b57b32 30from configparser import RawConfigParser
e28aa588 31
# Path of the generator binary under test. It is relative, so it is resolved
# against the current working directory of the test process.
sysv_generator = './systemd-sysv-generator'
e28aa588 33
c584ffc0
LN
class MultiDict(collections.OrderedDict):
    '''Ordered dict that accumulates list values assigned to the same key.

    RawConfigParser stores every parsed option value as a list of lines and
    assigns it through __setitem__. When a key is assigned a list while it is
    already present, the new items are appended to the stored list instead of
    replacing it, so repeated options in a unit file are all retained.
    '''

    def __setitem__(self, key, value):
        '''Store value under key, extending the existing list if possible.

        A list assigned to an already present key is merged into the stored
        value (which is expected to be a list as well). Every other
        assignment behaves like a plain OrderedDict assignment.
        '''
        if not isinstance(value, list) or key not in self:
            super().__setitem__(key, value)
            return
        self[key].extend(value)
e28aa588
MP
40
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Every test builds a throw-away directory tree with SysV init scripts,
    rcN.d symlinks and native units, runs the generator against it and
    inspects the unit files and symlinks written to the output directory.
    '''

    def setUp(self):
        '''Create the temporary input/output directory layout.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # the rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the temporary directory tree again.'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        The exit code is always required to be 0.
        Return (stderr, filename -> ConfigParser) pair with output to stderr
        and parsed generated units. Symlinks in the output directory are not
        parsed.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        err = gen.communicate()[1]
        if not expect_error:
            self.assertFalse('Fail' in err, err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(os.path.join(self.out_dir, '*.service')):
            # alias links point to units which get parsed on their own
            if os.path.islink(service):
                continue
            # strict=False is needed to parse multiple lines with the same
            # key; MultiDict collects the values of such lines
            cp = RawConfigParser(dict_type=MultiDict, strict=False)
            cp.optionxform = str  # don't lower-case option names
            with open(service) as f:
                # readfp() was removed in Python 3.12, read_file() is its
                # replacement
                cp.read_file(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields; a key with the
        value None is left out of the header. The passed dict is not
        modified.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        # work on a copy, so that the defaults do not leak into the caller's
        # dict
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test %s service' %
                        name_without_sh)
        keys.setdefault('Description', 'long description for test %s service' %
                        name_without_sh)
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#%20s %s\n' % (k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
                os.makedirs(d, exist_ok=True)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S%02i' % prio, rl)
            for rl in keys['Default-Stop'].split():
                make_link('K%02i' % (99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
            if target in targets:
                # should be enabled; check for the link first, so that a
                # missing one is reported as a failure, not as an OSError
                self.assertTrue(os.path.islink(link),
                                '%s is not a symlink' % link)
                unit_file = os.readlink(link)
                # os.path.exists() will fail on a dangling symlink
                self.assertTrue(os.path.exists(link))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                self.assertFalse(os.path.exists(link),
                                 '%s unexpectedly exists' % link)

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            os.makedirs(d, exist_ok=True)
            os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')
        shutil.copy(script, script + '.tmp')
        shutil.copy(script, script + '.new')

        err, results = self.run_generator()
        # show the generator log in the test output
        print(err)
        self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
422
e28aa588
MP
423
if __name__ == '__main__':
    # Run all tests with verbose per-test output, written to stdout rather
    # than to the TextTestRunner default of stderr.
    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))