]> git.ipfire.org Git - thirdparty/systemd.git/blame - test/sysv-generator-test.py
sysv-generator: do not join dependencies on one line, split them
[thirdparty/systemd.git] / test / sysv-generator-test.py
CommitLineData
e28aa588
MP
1# systemd-sysv-generator integration test
2#
3# (C) 2015 Canonical Ltd.
4# Author: Martin Pitt <martin.pitt@ubuntu.com>
5#
6# systemd is free software; you can redistribute it and/or modify it
7# under the terms of the GNU Lesser General Public License as published by
8# the Free Software Foundation; either version 2.1 of the License, or
9# (at your option) any later version.
10
11# systemd is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public License
17# along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19import unittest
20import sys
21import os
22import subprocess
23import tempfile
24import shutil
25from glob import glob
c584ffc0 26import collections
e28aa588
MP
27
28try:
29 from configparser import RawConfigParser
30except ImportError:
31 # python 2
32 from ConfigParser import RawConfigParser
33
# Location of the generator binary under test: inside $builddir when that
# environment variable is set, otherwise relative to the current directory.
sysv_generator = os.path.join(
    os.environ.get('builddir', '.'),
    'systemd-sysv-generator')
35
c584ffc0
LN
class MultiDict(collections.OrderedDict):
    '''Ordered dict that accumulates list values instead of replacing them.

    Assigning a list to a key that is already present extends the stored
    value with the new items; every other assignment behaves like a plain
    OrderedDict assignment.  It is handed to RawConfigParser as dict_type so
    that repeated keys in a unit file are collected rather than overwritten.
    '''

    def __setitem__(self, key, value):
        merge = isinstance(value, list) and key in self
        if not merge:
            super(MultiDict, self).__setitem__(key, value)
            return
        # key already present and a list was assigned: append the new items
        self[key].extend(value)
e28aa588
MP
42
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Every test builds a scratch tree (init.d scripts, rcN.d links, native
    units) in a temporary directory, runs the generator binary against it and
    inspects the units and symlinks written to the output directory.
    '''

    def setUp(self):
        '''Create the temporary directory layout used by each test.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the temporary directory tree.'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def _parse_unit(self, path):
        '''Parse the unit file at path and return a RawConfigParser.

        Option names keep their case, and repeated keys are collected through
        MultiDict instead of being rejected or overwritten.
        '''
        try:
            # for python3 we need here strict=False to parse multiple
            # lines with the same key
            cp = RawConfigParser(dict_type=MultiDict, strict=False)
        except TypeError:
            # RawConfigParser in python2 does not have the strict option
            # but it allows multiple lines with the same key by default
            cp = RawConfigParser(dict_type=MultiDict)
        cp.optionxform = lambda o: o  # don't lower-case option names
        # readfp() was removed in python 3.12; python 2 only has readfp()
        read_file = getattr(cp, 'read_file', None) or cp.readfp
        with open(path) as f:
            read_file(f)
        return cp

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        The generator must exit with status 0 in either case.

        Return (stderr, filename -> ConfigParser) pair with output to stderr
        and parsed generated units. Symlinked *.service entries (aliases) are
        not parsed and do not appear in the mapping.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        err = gen.communicate()[1]
        if not expect_error:
            self.assertNotIn('Fail', err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(os.path.join(self.out_dir, '*.service')):
            if os.path.islink(service):
                continue
            results[os.path.basename(service)] = self._parse_unit(service)

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields; a key whose value
        is None is left out of the header. The passed dict is not modified.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        # work on a copy so the defaults don't leak into the caller's dict
        keys = keys.copy()
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test %s service' %
                        name_without_sh)
        keys.setdefault('Description', 'long description for test %s service' %
                        name_without_sh)
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#%20s %s\n' % (k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
                if not os.path.isdir(d):
                    os.mkdir(d)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S%02i' % prio, rl)
            for rl in keys['Default-Stop'].split():
                make_link('K%02i' % (99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
            if target in targets:
                # should be enabled: link exists and points to the unit
                unit_file = os.readlink(link)
                # a relative link target is relative to the link's directory,
                # not to the current working directory; an absolute target is
                # returned unchanged by os.path.join()
                self.assertTrue(os.path.exists(
                    os.path.join(os.path.dirname(link), unit_file)))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                self.assertFalse(os.path.exists(link),
                                 '%s unexpectedly exists' % link)

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            if not os.path.isdir(d):
                os.mkdir(d)
            os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')

        err, results = self.run_generator()
        # show the generator's stderr in the test output
        print(err)
        self.assertEqual(sorted(results),
                         ['foo.bak.service', 'foo.old.service', 'foo.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
422
e28aa588
MP
423
if __name__ == '__main__':
    # run all tests with per-test (verbosity 2) reporting written to stdout
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)