]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
test: chmod +x sysv-generator-test
[thirdparty/systemd.git] / test / sysv-generator-test.py
1 #!/usr/bin/python
2 #
3 # systemd-sysv-generator integration test
4 #
5 # (C) 2015 Canonical Ltd.
6 # Author: Martin Pitt <martin.pitt@ubuntu.com>
7 #
8 # systemd is free software; you can redistribute it and/or modify it
9 # under the terms of the GNU Lesser General Public License as published by
10 # the Free Software Foundation; either version 2.1 of the License, or
11 # (at your option) any later version.
12
13 # systemd is distributed in the hope that it will be useful, but
14 # WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 # Lesser General Public License for more details.
17 #
18 # You should have received a copy of the GNU Lesser General Public License
19 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
20
21 import unittest
22 import sys
23 import os
24 import subprocess
25 import tempfile
26 import shutil
27 from glob import glob
28 import collections
29
30 try:
31 from configparser import RawConfigParser
32 except ImportError:
33 # python 2
34 from ConfigParser import RawConfigParser
35
# Path of the generator binary under test.  The directory comes from the
# "builddir" environment variable and defaults to the current directory.
_builddir = os.environ.get('builddir', '.')
sysv_generator = os.path.join(_builddir, 'systemd-sysv-generator')
37
class MultiDict(collections.OrderedDict):
    '''Ordered dict that accumulates list values instead of replacing them.

    Assigning a list to a key that is already present extends the stored
    value with the new items; every other assignment behaves like a plain
    OrderedDict.  It is passed as dict_type to RawConfigParser so that
    repeated option lines are collected rather than overwritten.
    '''

    def __setitem__(self, key, value):
        '''Store value under key, merging lists into an existing entry.'''
        merge = isinstance(value, list) and key in self
        if not merge:
            super(MultiDict, self).__setitem__(key, value)
            return
        self[key].extend(value)
44
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Every test builds a throw-away tree of SysV init scripts and rcN.d
    symlinks, runs the generator binary against it and then inspects the
    generated unit files and the text the generator wrote to stderr.
    '''

    def setUp(self):
        '''Create the temporary directory layout handed to the generator.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # the rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the temporary tree created by setUp().'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    @staticmethod
    def _parse_unit(path):
        '''Parse the unit file at path and return a RawConfigParser.'''
        try:
            # for python3 we need here strict=False to parse multiple
            # lines with the same key
            cp = RawConfigParser(dict_type=MultiDict, strict=False)
        except TypeError:
            # RawConfigParser in python2 does not have the strict option
            # but it allows multiple lines with the same key by default
            cp = RawConfigParser(dict_type=MultiDict)
        cp.optionxform = lambda o: o  # don't lower-case option names
        with open(path) as f:
            # readfp() was removed in Python 3.12; read_file() is its
            # replacement but does not exist in python2
            if hasattr(cp, 'read_file'):
                cp.read_file(f)
            else:
                cp.readfp(f)
        return cp

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        Always fail if the generator exits with a non-zero status.
        Return (stderr, filename -> ConfigParser) pair with output to stderr
        and parsed generated units. Symlinked *.service files (aliases) are
        not included in the parsed units.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        err = gen.communicate()[1]
        if not expect_error:
            self.assertNotIn('Fail', err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(self.out_dir + '/*.service'):
            if os.path.islink(service):
                continue
            results[os.path.basename(service)] = self._parse_unit(service)

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields; a key whose value
        is None is left out of the header. The passed-in dict is not
        modified.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        # work on a copy so the defaults do not leak into the caller's dict
        keys = keys.copy()
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test %s service' %
                        name_without_sh)
        keys.setdefault('Description', 'long description for test %s service' %
                        name_without_sh)
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#%20s %s\n' % (k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
                if not os.path.isdir(d):
                    os.mkdir(d)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S%02i' % prio, rl)
            for rl in keys['Default-Stop'].split():
                make_link('K%02i' % (99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets'''

        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
            if target in targets:
                # should be enabled: the wants link points at the unit
                unit_file = os.readlink(link)
                self.assertTrue(os.path.exists(unit_file))
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                # should not be enabled: no wants link at all
                self.assertFalse(os.path.exists(link),
                                 '%s unexpectedly exists' % link)

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        results = self.run_generator()[1]
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        results = self.run_generator()[1]
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            if not os.path.isdir(d):
                os.mkdir(d)
            os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))

        results = self.run_generator()[1]
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        results = self.run_generator()[1]
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        results = self.run_generator()[1]
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        results = self.run_generator()[1]
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        results = self.run_generator()[1]
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')

        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.bak.service', 'foo.old.service', 'foo.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        results = self.run_generator()[1]
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
424
425
if __name__ == '__main__':
    # Report verbosely on stdout rather than unittest's default stderr.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)