]>
Commit | Line | Data |
---|---|---|
3e67e5c9 | 1 | #!/usr/bin/env python3 |
1f7be300 | 2 | # |
e28aa588 MP |
3 | # systemd-sysv-generator integration test |
4 | # | |
5 | # (C) 2015 Canonical Ltd. | |
6 | # Author: Martin Pitt <martin.pitt@ubuntu.com> | |
7 | # | |
8 | # systemd is free software; you can redistribute it and/or modify it | |
9 | # under the terms of the GNU Lesser General Public License as published by | |
10 | # the Free Software Foundation; either version 2.1 of the License, or | |
11 | # (at your option) any later version. | |
12 | ||
13 | # systemd is distributed in the hope that it will be useful, but | |
14 | # WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | # Lesser General Public License for more details. | |
17 | # | |
18 | # You should have received a copy of the GNU Lesser General Public License | |
19 | # along with systemd; If not, see <http://www.gnu.org/licenses/>. | |
20 | ||
21 | import unittest | |
22 | import sys | |
23 | import os | |
24 | import subprocess | |
25 | import tempfile | |
26 | import shutil | |
27 | from glob import glob | |
c584ffc0 | 28 | import collections |
e28aa588 MP |
29 | |
30 | try: | |
31 | from configparser import RawConfigParser | |
32 | except ImportError: | |
33 | # python 2 | |
34 | from ConfigParser import RawConfigParser | |
35 | ||
36 | sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator') | |
37 | ||
c584ffc0 LN |
38 | class MultiDict(collections.OrderedDict): |
39 | def __setitem__(self, key, value): | |
40 | if isinstance(value, list) and key in self: | |
41 | self[key].extend(value) | |
42 | else: | |
43 | super(MultiDict, self).__setitem__(key, value) | |
e28aa588 MP |
44 | |
45 | class SysvGeneratorTest(unittest.TestCase): | |
46 | def setUp(self): | |
47 | self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.') | |
48 | self.init_d_dir = os.path.join(self.workdir, 'init.d') | |
49 | os.mkdir(self.init_d_dir) | |
50 | self.rcnd_dir = self.workdir | |
51 | self.unit_dir = os.path.join(self.workdir, 'systemd') | |
52 | os.mkdir(self.unit_dir) | |
53 | self.out_dir = os.path.join(self.workdir, 'output') | |
54 | os.mkdir(self.out_dir) | |
55 | ||
56 | def tearDown(self): | |
57 | shutil.rmtree(self.workdir) | |
58 | ||
59 | # | |
60 | # Helper methods | |
61 | # | |
62 | ||
63 | def run_generator(self, expect_error=False): | |
64 | '''Run sysv-generator. | |
65 | ||
66 | Fail if stderr contains any "Fail", unless expect_error is True. | |
67 | Return (stderr, filename -> ConfigParser) pair with ouput to stderr and | |
68 | parsed generated units. | |
69 | ''' | |
70 | env = os.environ.copy() | |
71 | env['SYSTEMD_LOG_LEVEL'] = 'debug' | |
6b7d32ad | 72 | env['SYSTEMD_LOG_TARGET'] = 'console' |
e28aa588 MP |
73 | env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir |
74 | env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir | |
75 | env['SYSTEMD_UNIT_PATH'] = self.unit_dir | |
76 | gen = subprocess.Popen( | |
77 | [sysv_generator, 'ignored', 'ignored', self.out_dir], | |
78 | stdout=subprocess.PIPE, stderr=subprocess.PIPE, | |
79 | universal_newlines=True, env=env) | |
80 | (out, err) = gen.communicate() | |
81 | if not expect_error: | |
82 | self.assertFalse('Fail' in err, err) | |
83 | self.assertEqual(gen.returncode, 0, err) | |
84 | ||
85 | results = {} | |
86 | for service in glob(self.out_dir + '/*.service'): | |
56401ac5 MP |
87 | if os.path.islink(service): |
88 | continue | |
c584ffc0 LN |
89 | try: |
90 | # for python3 we need here strict=False to parse multiple | |
91 | # lines with the same key | |
92 | cp = RawConfigParser(dict_type=MultiDict, strict=False) | |
93 | except TypeError: | |
94 | # RawConfigParser in python2 does not have the strict option | |
95 | # but it allows multiple lines with the same key by default | |
96 | cp = RawConfigParser(dict_type=MultiDict) | |
e28aa588 MP |
97 | cp.optionxform = lambda o: o # don't lower-case option names |
98 | with open(service) as f: | |
99 | cp.readfp(f) | |
100 | results[os.path.basename(service)] = cp | |
101 | ||
102 | return (err, results) | |
103 | ||
104 | def add_sysv(self, fname, keys, enable=False, prio=1): | |
105 | '''Create a SysV init script with the given keys in the LSB header | |
106 | ||
107 | There are sensible default values for all fields. | |
108 | If enable is True, links will be created in the rcN.d dirs. In that | |
109 | case, the priority can be given with "prio" (default to 1). | |
110 | ||
111 | Return path of generated script. | |
112 | ''' | |
113 | name_without_sh = fname.endswith('.sh') and fname[:-3] or fname | |
114 | keys.setdefault('Provides', name_without_sh) | |
115 | keys.setdefault('Required-Start', '$local_fs') | |
116 | keys.setdefault('Required-Stop', keys['Required-Start']) | |
117 | keys.setdefault('Default-Start', '2 3 4 5') | |
118 | keys.setdefault('Default-Stop', '0 1 6') | |
119 | keys.setdefault('Short-Description', 'test %s service' % | |
120 | name_without_sh) | |
121 | keys.setdefault('Description', 'long description for test %s service' % | |
122 | name_without_sh) | |
123 | script = os.path.join(self.init_d_dir, fname) | |
124 | with open(script, 'w') as f: | |
125 | f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n') | |
126 | for k, v in keys.items(): | |
127 | if v is not None: | |
128 | f.write('#%20s %s\n' % (k + ':', v)) | |
129 | f.write('### END INIT INFO\ncode --goes here\n') | |
130 | os.chmod(script, 0o755) | |
131 | ||
132 | if enable: | |
133 | def make_link(prefix, runlevel): | |
134 | d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel) | |
135 | if not os.path.isdir(d): | |
136 | os.mkdir(d) | |
137 | os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname)) | |
138 | ||
139 | for rl in keys['Default-Start'].split(): | |
140 | make_link('S%02i' % prio, rl) | |
141 | for rl in keys['Default-Stop'].split(): | |
142 | make_link('K%02i' % (99 - prio), rl) | |
143 | ||
144 | return script | |
145 | ||
0377e373 MP |
146 | def assert_enabled(self, unit, targets): |
147 | '''assert that a unit is enabled in precisely the given targets''' | |
e28aa588 | 148 | |
0377e373 | 149 | all_targets = ['multi-user', 'graphical'] |
e28aa588 MP |
150 | |
151 | # should be enabled | |
0377e373 MP |
152 | for target in all_targets: |
153 | link = os.path.join(self.out_dir, '%s.target.wants' % target, unit) | |
154 | if target in targets: | |
155 | unit_file = os.readlink(link) | |
7f0cc637 ZJS |
156 | # os.path.exists() will fail on a dangling symlink |
157 | self.assertTrue(os.path.exists(link)) | |
0377e373 | 158 | self.assertEqual(os.path.basename(unit_file), unit) |
e28aa588 MP |
159 | else: |
160 | self.assertFalse(os.path.exists(link), | |
161 | '%s unexpectedly exists' % link) | |
162 | ||
163 | # | |
164 | # test cases | |
165 | # | |
166 | ||
167 | def test_nothing(self): | |
168 | '''no input files''' | |
169 | ||
170 | results = self.run_generator()[1] | |
171 | self.assertEqual(results, {}) | |
172 | self.assertEqual(os.listdir(self.out_dir), []) | |
173 | ||
174 | def test_simple_disabled(self): | |
175 | '''simple service without dependencies, disabled''' | |
176 | ||
177 | self.add_sysv('foo', {}, enable=False) | |
178 | err, results = self.run_generator() | |
179 | self.assertEqual(len(results), 1) | |
180 | ||
181 | # no enablement links or other stuff | |
182 | self.assertEqual(os.listdir(self.out_dir), ['foo.service']) | |
183 | ||
184 | s = results['foo.service'] | |
185 | self.assertEqual(s.sections(), ['Unit', 'Service']) | |
186 | self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') | |
187 | # $local_fs does not need translation, don't expect any dependency | |
188 | # fields here | |
189 | self.assertEqual(set(s.options('Unit')), | |
190 | set(['Documentation', 'SourcePath', 'Description'])) | |
191 | ||
192 | self.assertEqual(s.get('Service', 'Type'), 'forking') | |
193 | init_script = os.path.join(self.init_d_dir, 'foo') | |
194 | self.assertEqual(s.get('Service', 'ExecStart'), | |
195 | '%s start' % init_script) | |
196 | self.assertEqual(s.get('Service', 'ExecStop'), | |
197 | '%s stop' % init_script) | |
198 | ||
4e558983 MP |
199 | self.assertNotIn('Overwriting', err) |
200 | ||
e28aa588 MP |
201 | def test_simple_enabled_all(self): |
202 | '''simple service without dependencies, enabled in all runlevels''' | |
203 | ||
204 | self.add_sysv('foo', {}, enable=True) | |
205 | err, results = self.run_generator() | |
206 | self.assertEqual(list(results), ['foo.service']) | |
0377e373 | 207 | self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
4e558983 | 208 | self.assertNotIn('Overwriting', err) |
e28aa588 | 209 | |
264581a2 FS |
210 | def test_simple_escaped(self): |
211 | '''simple service without dependencies, that requires escaping the name''' | |
212 | ||
213 | self.add_sysv('foo+', {}) | |
214 | self.add_sysv('foo-admin', {}) | |
215 | err, results = self.run_generator() | |
52a321d8 | 216 | self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'}) |
264581a2 FS |
217 | self.assertNotIn('Overwriting', err) |
218 | ||
e28aa588 MP |
219 | def test_simple_enabled_some(self): |
220 | '''simple service without dependencies, enabled in some runlevels''' | |
221 | ||
222 | self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True) | |
223 | err, results = self.run_generator() | |
224 | self.assertEqual(list(results), ['foo.service']) | |
0377e373 | 225 | self.assert_enabled('foo.service', ['multi-user']) |
e28aa588 MP |
226 | |
227 | def test_lsb_macro_dep_single(self): | |
228 | '''single LSB macro dependency: $network''' | |
229 | ||
230 | self.add_sysv('foo', {'Required-Start': '$network'}) | |
231 | s = self.run_generator()[1]['foo.service'] | |
232 | self.assertEqual(set(s.options('Unit')), | |
233 | set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants'])) | |
234 | self.assertEqual(s.get('Unit', 'After'), 'network-online.target') | |
235 | self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target') | |
236 | ||
237 | def test_lsb_macro_dep_multi(self): | |
238 | '''multiple LSB macro dependencies''' | |
239 | ||
240 | self.add_sysv('foo', {'Required-Start': '$named $portmap'}) | |
241 | s = self.run_generator()[1]['foo.service'] | |
242 | self.assertEqual(set(s.options('Unit')), | |
243 | set(['Documentation', 'SourcePath', 'Description', 'After'])) | |
c584ffc0 | 244 | self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target']) |
e28aa588 MP |
245 | |
246 | def test_lsb_deps(self): | |
247 | '''LSB header dependencies to other services''' | |
248 | ||
249 | # also give symlink priorities here; they should be ignored | |
250 | self.add_sysv('foo', {'Required-Start': 'must1 must2', | |
251 | 'Should-Start': 'may1 ne_may2'}, | |
252 | enable=True, prio=40) | |
253 | self.add_sysv('must1', {}, enable=True, prio=10) | |
254 | self.add_sysv('must2', {}, enable=True, prio=15) | |
255 | self.add_sysv('may1', {}, enable=True, prio=20) | |
256 | # do not create ne_may2 | |
257 | err, results = self.run_generator() | |
258 | self.assertEqual(sorted(results), | |
259 | ['foo.service', 'may1.service', 'must1.service', 'must2.service']) | |
260 | ||
261 | # foo should depend on all of them | |
262 | self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()), | |
263 | ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service']) | |
264 | ||
265 | # other services should not depend on each other | |
266 | self.assertFalse(results['must1.service'].has_option('Unit', 'After')) | |
267 | self.assertFalse(results['must2.service'].has_option('Unit', 'After')) | |
268 | self.assertFalse(results['may1.service'].has_option('Unit', 'After')) | |
269 | ||
270 | def test_symlink_prio_deps(self): | |
271 | '''script without LSB headers use rcN.d priority''' | |
272 | ||
273 | # create two init.d scripts without LSB header and enable them with | |
274 | # startup priorities | |
275 | for prio, name in [(10, 'provider'), (15, 'consumer')]: | |
276 | with open(os.path.join(self.init_d_dir, name), 'w') as f: | |
277 | f.write('#!/bin/init-d-interpreter\ncode --goes here\n') | |
278 | os.fchmod(f.fileno(), 0o755) | |
279 | ||
280 | d = os.path.join(self.rcnd_dir, 'rc2.d') | |
281 | if not os.path.isdir(d): | |
282 | os.mkdir(d) | |
283 | os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name))) | |
284 | ||
285 | err, results = self.run_generator() | |
286 | self.assertEqual(sorted(results), ['consumer.service', 'provider.service']) | |
287 | self.assertFalse(results['provider.service'].has_option('Unit', 'After')) | |
288 | self.assertEqual(results['consumer.service'].get('Unit', 'After'), | |
289 | 'provider.service') | |
290 | ||
291 | def test_multiple_provides(self): | |
292 | '''multiple Provides: names''' | |
293 | ||
294 | self.add_sysv('foo', {'Provides': 'foo bar baz'}) | |
56401ac5 MP |
295 | err, results = self.run_generator() |
296 | self.assertEqual(list(results), ['foo.service']) | |
297 | self.assertEqual(set(results['foo.service'].options('Unit')), | |
e28aa588 MP |
298 | set(['Documentation', 'SourcePath', 'Description'])) |
299 | # should create symlinks for the alternative names | |
300 | for f in ['bar.service', 'baz.service']: | |
301 | self.assertEqual(os.readlink(os.path.join(self.out_dir, f)), | |
302 | 'foo.service') | |
4e558983 | 303 | self.assertNotIn('Overwriting', err) |
e28aa588 | 304 | |
264581a2 FS |
305 | def test_provides_escaped(self): |
306 | '''a script that Provides: a name that requires escaping''' | |
307 | ||
308 | self.add_sysv('foo', {'Provides': 'foo foo+'}) | |
309 | err, results = self.run_generator() | |
310 | self.assertEqual(list(results), ['foo.service']) | |
311 | self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')), | |
00d5eaaf | 312 | 'foo.service') |
264581a2 FS |
313 | self.assertNotIn('Overwriting', err) |
314 | ||
77354c7e MP |
315 | def test_same_provides_in_multiple_scripts(self): |
316 | '''multiple init.d scripts provide the same name''' | |
317 | ||
318 | self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1) | |
319 | self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2) | |
320 | err, results = self.run_generator() | |
321 | self.assertEqual(sorted(results), ['bar.service', 'foo.service']) | |
322 | # should create symlink for the alternative name for either unit | |
323 | self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')), | |
324 | ['foo.service', 'bar.service']) | |
325 | ||
326 | def test_provide_other_script(self): | |
327 | '''init.d scripts provides the name of another init.d script''' | |
328 | ||
329 | self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True) | |
330 | self.add_sysv('bar', {'Provides': 'bar'}, enable=True) | |
331 | err, results = self.run_generator() | |
332 | self.assertEqual(sorted(results), ['bar.service', 'foo.service']) | |
4e558983 MP |
333 | # we do expect an overwrite here, bar.service should overwrite the |
334 | # alias link from foo.service | |
335 | self.assertIn('Overwriting', err) | |
77354c7e | 336 | |
e28aa588 MP |
337 | def test_nonexecutable_script(self): |
338 | '''ignores non-executable init.d script''' | |
339 | ||
340 | os.chmod(self.add_sysv('foo', {}), 0o644) | |
341 | err, results = self.run_generator() | |
342 | self.assertEqual(results, {}) | |
343 | ||
29e0e6d8 MP |
344 | def test_sh_suffix(self): |
345 | '''init.d script with .sh suffix''' | |
346 | ||
347 | self.add_sysv('foo.sh', {}, enable=True) | |
348 | err, results = self.run_generator() | |
349 | s = results['foo.service'] | |
350 | ||
351 | self.assertEqual(s.sections(), ['Unit', 'Service']) | |
352 | # should not have a .sh | |
353 | self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') | |
354 | ||
355 | # calls correct script with .sh | |
356 | init_script = os.path.join(self.init_d_dir, 'foo.sh') | |
357 | self.assertEqual(s.get('Service', 'ExecStart'), | |
358 | '%s start' % init_script) | |
359 | self.assertEqual(s.get('Service', 'ExecStop'), | |
360 | '%s stop' % init_script) | |
361 | ||
0377e373 | 362 | self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
29e0e6d8 MP |
363 | |
364 | def test_sh_suffix_with_provides(self): | |
365 | '''init.d script with .sh suffix and Provides:''' | |
366 | ||
367 | self.add_sysv('foo.sh', {'Provides': 'foo bar'}) | |
368 | err, results = self.run_generator() | |
369 | # ensure we don't try to create a symlink to itself | |
230f0485 | 370 | self.assertNotIn('itself', err) |
29e0e6d8 MP |
371 | self.assertEqual(list(results), ['foo.service']) |
372 | self.assertEqual(results['foo.service'].get('Unit', 'Description'), | |
373 | 'LSB: test foo service') | |
374 | ||
375 | # should create symlink for the alternative name | |
376 | self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')), | |
377 | 'foo.service') | |
378 | ||
d816e2b7 MP |
379 | def test_hidden_files(self): |
380 | '''init.d script with hidden file suffix''' | |
381 | ||
382 | script = self.add_sysv('foo', {}, enable=True) | |
383 | # backup files (not enabled in rcN.d/) | |
384 | shutil.copy(script, script + '.dpkg-new') | |
385 | shutil.copy(script, script + '.dpkg-dist') | |
386 | shutil.copy(script, script + '.swp') | |
387 | shutil.copy(script, script + '.rpmsave') | |
388 | ||
389 | err, results = self.run_generator() | |
390 | self.assertEqual(list(results), ['foo.service']) | |
391 | ||
0377e373 | 392 | self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
29e0e6d8 | 393 | |
77354c7e MP |
394 | def test_backup_file(self): |
395 | '''init.d script with backup file''' | |
396 | ||
397 | script = self.add_sysv('foo', {}, enable=True) | |
398 | # backup files (not enabled in rcN.d/) | |
399 | shutil.copy(script, script + '.bak') | |
400 | shutil.copy(script, script + '.old') | |
94a0ef6e ZJS |
401 | shutil.copy(script, script + '.tmp') |
402 | shutil.copy(script, script + '.new') | |
77354c7e MP |
403 | |
404 | err, results = self.run_generator() | |
405 | print(err) | |
94a0ef6e | 406 | self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service']) |
77354c7e MP |
407 | |
408 | # ensure we don't try to create a symlink to itself | |
230f0485 | 409 | self.assertNotIn('itself', err) |
77354c7e | 410 | |
0377e373 | 411 | self.assert_enabled('foo.service', ['multi-user', 'graphical']) |
77354c7e MP |
412 | self.assert_enabled('foo.bak.service', []) |
413 | self.assert_enabled('foo.old.service', []) | |
414 | ||
f4f01ec1 MP |
415 | def test_existing_native_unit(self): |
416 | '''existing native unit''' | |
417 | ||
418 | with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f: | |
419 | f.write('[Unit]\n') | |
420 | ||
421 | self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True) | |
422 | err, results = self.run_generator() | |
423 | self.assertEqual(list(results), []) | |
424 | # no enablement or alias links, as native unit is disabled | |
425 | self.assertEqual(os.listdir(self.out_dir), []) | |
426 | ||
e28aa588 MP |
427 | |
428 | if __name__ == '__main__': | |
429 | unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2)) |