]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
Merge pull request #2080 from chaloulo/split-mode-host-remove-port-from-journal-filename
[thirdparty/systemd.git] / test / sysv-generator-test.py
1 # systemd-sysv-generator integration test
2 #
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
5 #
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
10
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19 import unittest
20 import sys
21 import os
22 import subprocess
23 import tempfile
24 import shutil
25 from glob import glob
26 import collections
27
28 try:
29 from configparser import RawConfigParser
30 except ImportError:
31 # python 2
32 from ConfigParser import RawConfigParser
33
34 sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
35
36 class MultiDict(collections.OrderedDict):
37 def __setitem__(self, key, value):
38 if isinstance(value, list) and key in self:
39 self[key].extend(value)
40 else:
41 super(MultiDict, self).__setitem__(key, value)
42
43 class SysvGeneratorTest(unittest.TestCase):
44 def setUp(self):
45 self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
46 self.init_d_dir = os.path.join(self.workdir, 'init.d')
47 os.mkdir(self.init_d_dir)
48 self.rcnd_dir = self.workdir
49 self.unit_dir = os.path.join(self.workdir, 'systemd')
50 os.mkdir(self.unit_dir)
51 self.out_dir = os.path.join(self.workdir, 'output')
52 os.mkdir(self.out_dir)
53
54 def tearDown(self):
55 shutil.rmtree(self.workdir)
56
57 #
58 # Helper methods
59 #
60
61 def run_generator(self, expect_error=False):
62 '''Run sysv-generator.
63
64 Fail if stderr contains any "Fail", unless expect_error is True.
65 Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
66 parsed generated units.
67 '''
68 env = os.environ.copy()
69 env['SYSTEMD_LOG_LEVEL'] = 'debug'
70 env['SYSTEMD_LOG_TARGET'] = 'console'
71 env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
72 env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
73 env['SYSTEMD_UNIT_PATH'] = self.unit_dir
74 gen = subprocess.Popen(
75 [sysv_generator, 'ignored', 'ignored', self.out_dir],
76 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
77 universal_newlines=True, env=env)
78 (out, err) = gen.communicate()
79 if not expect_error:
80 self.assertFalse('Fail' in err, err)
81 self.assertEqual(gen.returncode, 0, err)
82
83 results = {}
84 for service in glob(self.out_dir + '/*.service'):
85 if os.path.islink(service):
86 continue
87 try:
88 # for python3 we need here strict=False to parse multiple
89 # lines with the same key
90 cp = RawConfigParser(dict_type=MultiDict, strict=False)
91 except TypeError:
92 # RawConfigParser in python2 does not have the strict option
93 # but it allows multiple lines with the same key by default
94 cp = RawConfigParser(dict_type=MultiDict)
95 cp.optionxform = lambda o: o # don't lower-case option names
96 with open(service) as f:
97 cp.readfp(f)
98 results[os.path.basename(service)] = cp
99
100 return (err, results)
101
102 def add_sysv(self, fname, keys, enable=False, prio=1):
103 '''Create a SysV init script with the given keys in the LSB header
104
105 There are sensible default values for all fields.
106 If enable is True, links will be created in the rcN.d dirs. In that
107 case, the priority can be given with "prio" (default to 1).
108
109 Return path of generated script.
110 '''
111 name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
112 keys.setdefault('Provides', name_without_sh)
113 keys.setdefault('Required-Start', '$local_fs')
114 keys.setdefault('Required-Stop', keys['Required-Start'])
115 keys.setdefault('Default-Start', '2 3 4 5')
116 keys.setdefault('Default-Stop', '0 1 6')
117 keys.setdefault('Short-Description', 'test %s service' %
118 name_without_sh)
119 keys.setdefault('Description', 'long description for test %s service' %
120 name_without_sh)
121 script = os.path.join(self.init_d_dir, fname)
122 with open(script, 'w') as f:
123 f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
124 for k, v in keys.items():
125 if v is not None:
126 f.write('#%20s %s\n' % (k + ':', v))
127 f.write('### END INIT INFO\ncode --goes here\n')
128 os.chmod(script, 0o755)
129
130 if enable:
131 def make_link(prefix, runlevel):
132 d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
133 if not os.path.isdir(d):
134 os.mkdir(d)
135 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
136
137 for rl in keys['Default-Start'].split():
138 make_link('S%02i' % prio, rl)
139 for rl in keys['Default-Stop'].split():
140 make_link('K%02i' % (99 - prio), rl)
141
142 return script
143
144 def assert_enabled(self, unit, targets):
145 '''assert that a unit is enabled in precisely the given targets'''
146
147 all_targets = ['multi-user', 'graphical']
148
149 # should be enabled
150 for target in all_targets:
151 link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
152 if target in targets:
153 unit_file = os.readlink(link)
154 self.assertTrue(os.path.exists(unit_file))
155 self.assertEqual(os.path.basename(unit_file), unit)
156 else:
157 self.assertFalse(os.path.exists(link),
158 '%s unexpectedly exists' % link)
159
160 #
161 # test cases
162 #
163
164 def test_nothing(self):
165 '''no input files'''
166
167 results = self.run_generator()[1]
168 self.assertEqual(results, {})
169 self.assertEqual(os.listdir(self.out_dir), [])
170
171 def test_simple_disabled(self):
172 '''simple service without dependencies, disabled'''
173
174 self.add_sysv('foo', {}, enable=False)
175 err, results = self.run_generator()
176 self.assertEqual(len(results), 1)
177
178 # no enablement links or other stuff
179 self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
180
181 s = results['foo.service']
182 self.assertEqual(s.sections(), ['Unit', 'Service'])
183 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
184 # $local_fs does not need translation, don't expect any dependency
185 # fields here
186 self.assertEqual(set(s.options('Unit')),
187 set(['Documentation', 'SourcePath', 'Description']))
188
189 self.assertEqual(s.get('Service', 'Type'), 'forking')
190 init_script = os.path.join(self.init_d_dir, 'foo')
191 self.assertEqual(s.get('Service', 'ExecStart'),
192 '%s start' % init_script)
193 self.assertEqual(s.get('Service', 'ExecStop'),
194 '%s stop' % init_script)
195
196 self.assertNotIn('Overwriting', err)
197
198 def test_simple_enabled_all(self):
199 '''simple service without dependencies, enabled in all runlevels'''
200
201 self.add_sysv('foo', {}, enable=True)
202 err, results = self.run_generator()
203 self.assertEqual(list(results), ['foo.service'])
204 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
205 self.assertNotIn('Overwriting', err)
206
207 def test_simple_escaped(self):
208 '''simple service without dependencies, that requires escaping the name'''
209
210 self.add_sysv('foo+', {})
211 self.add_sysv('foo-admin', {})
212 err, results = self.run_generator()
213 self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
214 self.assertNotIn('Overwriting', err)
215
216 def test_simple_enabled_some(self):
217 '''simple service without dependencies, enabled in some runlevels'''
218
219 self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
220 err, results = self.run_generator()
221 self.assertEqual(list(results), ['foo.service'])
222 self.assert_enabled('foo.service', ['multi-user'])
223
224 def test_lsb_macro_dep_single(self):
225 '''single LSB macro dependency: $network'''
226
227 self.add_sysv('foo', {'Required-Start': '$network'})
228 s = self.run_generator()[1]['foo.service']
229 self.assertEqual(set(s.options('Unit')),
230 set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
231 self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
232 self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
233
234 def test_lsb_macro_dep_multi(self):
235 '''multiple LSB macro dependencies'''
236
237 self.add_sysv('foo', {'Required-Start': '$named $portmap'})
238 s = self.run_generator()[1]['foo.service']
239 self.assertEqual(set(s.options('Unit')),
240 set(['Documentation', 'SourcePath', 'Description', 'After']))
241 self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])
242
243 def test_lsb_deps(self):
244 '''LSB header dependencies to other services'''
245
246 # also give symlink priorities here; they should be ignored
247 self.add_sysv('foo', {'Required-Start': 'must1 must2',
248 'Should-Start': 'may1 ne_may2'},
249 enable=True, prio=40)
250 self.add_sysv('must1', {}, enable=True, prio=10)
251 self.add_sysv('must2', {}, enable=True, prio=15)
252 self.add_sysv('may1', {}, enable=True, prio=20)
253 # do not create ne_may2
254 err, results = self.run_generator()
255 self.assertEqual(sorted(results),
256 ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
257
258 # foo should depend on all of them
259 self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
260 ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
261
262 # other services should not depend on each other
263 self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
264 self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
265 self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
266
267 def test_symlink_prio_deps(self):
268 '''script without LSB headers use rcN.d priority'''
269
270 # create two init.d scripts without LSB header and enable them with
271 # startup priorities
272 for prio, name in [(10, 'provider'), (15, 'consumer')]:
273 with open(os.path.join(self.init_d_dir, name), 'w') as f:
274 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
275 os.fchmod(f.fileno(), 0o755)
276
277 d = os.path.join(self.rcnd_dir, 'rc2.d')
278 if not os.path.isdir(d):
279 os.mkdir(d)
280 os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
281
282 err, results = self.run_generator()
283 self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
284 self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
285 self.assertEqual(results['consumer.service'].get('Unit', 'After'),
286 'provider.service')
287
288 def test_multiple_provides(self):
289 '''multiple Provides: names'''
290
291 self.add_sysv('foo', {'Provides': 'foo bar baz'})
292 err, results = self.run_generator()
293 self.assertEqual(list(results), ['foo.service'])
294 self.assertEqual(set(results['foo.service'].options('Unit')),
295 set(['Documentation', 'SourcePath', 'Description']))
296 # should create symlinks for the alternative names
297 for f in ['bar.service', 'baz.service']:
298 self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
299 'foo.service')
300 self.assertNotIn('Overwriting', err)
301
302 def test_provides_escaped(self):
303 '''a script that Provides: a name that requires escaping'''
304
305 self.add_sysv('foo', {'Provides': 'foo foo+'})
306 err, results = self.run_generator()
307 self.assertEqual(list(results), ['foo.service'])
308 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
309 'foo.service')
310 self.assertNotIn('Overwriting', err)
311
312 def test_same_provides_in_multiple_scripts(self):
313 '''multiple init.d scripts provide the same name'''
314
315 self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
316 self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
317 err, results = self.run_generator()
318 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
319 # should create symlink for the alternative name for either unit
320 self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
321 ['foo.service', 'bar.service'])
322
323 def test_provide_other_script(self):
324 '''init.d scripts provides the name of another init.d script'''
325
326 self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
327 self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
328 err, results = self.run_generator()
329 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
330 # we do expect an overwrite here, bar.service should overwrite the
331 # alias link from foo.service
332 self.assertIn('Overwriting', err)
333
334 def test_nonexecutable_script(self):
335 '''ignores non-executable init.d script'''
336
337 os.chmod(self.add_sysv('foo', {}), 0o644)
338 err, results = self.run_generator()
339 self.assertEqual(results, {})
340
341 def test_sh_suffix(self):
342 '''init.d script with .sh suffix'''
343
344 self.add_sysv('foo.sh', {}, enable=True)
345 err, results = self.run_generator()
346 s = results['foo.service']
347
348 self.assertEqual(s.sections(), ['Unit', 'Service'])
349 # should not have a .sh
350 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
351
352 # calls correct script with .sh
353 init_script = os.path.join(self.init_d_dir, 'foo.sh')
354 self.assertEqual(s.get('Service', 'ExecStart'),
355 '%s start' % init_script)
356 self.assertEqual(s.get('Service', 'ExecStop'),
357 '%s stop' % init_script)
358
359 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
360
361 def test_sh_suffix_with_provides(self):
362 '''init.d script with .sh suffix and Provides:'''
363
364 self.add_sysv('foo.sh', {'Provides': 'foo bar'})
365 err, results = self.run_generator()
366 # ensure we don't try to create a symlink to itself
367 self.assertNotIn('itself', err)
368 self.assertEqual(list(results), ['foo.service'])
369 self.assertEqual(results['foo.service'].get('Unit', 'Description'),
370 'LSB: test foo service')
371
372 # should create symlink for the alternative name
373 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
374 'foo.service')
375
376 def test_hidden_files(self):
377 '''init.d script with hidden file suffix'''
378
379 script = self.add_sysv('foo', {}, enable=True)
380 # backup files (not enabled in rcN.d/)
381 shutil.copy(script, script + '.dpkg-new')
382 shutil.copy(script, script + '.dpkg-dist')
383 shutil.copy(script, script + '.swp')
384 shutil.copy(script, script + '.rpmsave')
385
386 err, results = self.run_generator()
387 self.assertEqual(list(results), ['foo.service'])
388
389 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
390
391 def test_backup_file(self):
392 '''init.d script with backup file'''
393
394 script = self.add_sysv('foo', {}, enable=True)
395 # backup files (not enabled in rcN.d/)
396 shutil.copy(script, script + '.bak')
397 shutil.copy(script, script + '.old')
398
399 err, results = self.run_generator()
400 print(err)
401 self.assertEqual(sorted(results),
402 ['foo.bak.service', 'foo.old.service', 'foo.service'])
403
404 # ensure we don't try to create a symlink to itself
405 self.assertNotIn('itself', err)
406
407 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
408 self.assert_enabled('foo.bak.service', [])
409 self.assert_enabled('foo.old.service', [])
410
411 def test_existing_native_unit(self):
412 '''existing native unit'''
413
414 with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
415 f.write('[Unit]\n')
416
417 self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
418 err, results = self.run_generator()
419 self.assertEqual(list(results), [])
420 # no enablement or alias links, as native unit is disabled
421 self.assertEqual(os.listdir(self.out_dir), [])
422
423
424 if __name__ == '__main__':
425 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))