]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
sd-bus: rename bloom-tag to arg0-has
[thirdparty/systemd.git] / test / sysv-generator-test.py
1 # systemd-sysv-generator integration test
2 #
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
5 #
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
10
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19 import unittest
20 import sys
21 import os
22 import subprocess
23 import tempfile
24 import shutil
25 from glob import glob
26
27 try:
28 from configparser import RawConfigParser
29 except ImportError:
30 # python 2
31 from ConfigParser import RawConfigParser
32
33 sysv_generator = os.path.join(os.environ.get('builddir', '.'), 'systemd-sysv-generator')
34
35
36 class SysvGeneratorTest(unittest.TestCase):
37 def setUp(self):
38 self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
39 self.init_d_dir = os.path.join(self.workdir, 'init.d')
40 os.mkdir(self.init_d_dir)
41 self.rcnd_dir = self.workdir
42 self.unit_dir = os.path.join(self.workdir, 'systemd')
43 os.mkdir(self.unit_dir)
44 self.out_dir = os.path.join(self.workdir, 'output')
45 os.mkdir(self.out_dir)
46
47 def tearDown(self):
48 shutil.rmtree(self.workdir)
49
50 #
51 # Helper methods
52 #
53
54 def run_generator(self, expect_error=False):
55 '''Run sysv-generator.
56
57 Fail if stderr contains any "Fail", unless expect_error is True.
58 Return (stderr, filename -> ConfigParser) pair with ouput to stderr and
59 parsed generated units.
60 '''
61 env = os.environ.copy()
62 env['SYSTEMD_LOG_LEVEL'] = 'debug'
63 env['SYSTEMD_LOG_TARGET'] = 'console'
64 env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
65 env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
66 env['SYSTEMD_UNIT_PATH'] = self.unit_dir
67 gen = subprocess.Popen(
68 [sysv_generator, 'ignored', 'ignored', self.out_dir],
69 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
70 universal_newlines=True, env=env)
71 (out, err) = gen.communicate()
72 if not expect_error:
73 self.assertFalse('Fail' in err, err)
74 self.assertEqual(gen.returncode, 0, err)
75
76 results = {}
77 for service in glob(self.out_dir + '/*.service'):
78 if os.path.islink(service):
79 continue
80 cp = RawConfigParser()
81 cp.optionxform = lambda o: o # don't lower-case option names
82 with open(service) as f:
83 cp.readfp(f)
84 results[os.path.basename(service)] = cp
85
86 return (err, results)
87
88 def add_sysv(self, fname, keys, enable=False, prio=1):
89 '''Create a SysV init script with the given keys in the LSB header
90
91 There are sensible default values for all fields.
92 If enable is True, links will be created in the rcN.d dirs. In that
93 case, the priority can be given with "prio" (default to 1).
94
95 Return path of generated script.
96 '''
97 name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
98 keys.setdefault('Provides', name_without_sh)
99 keys.setdefault('Required-Start', '$local_fs')
100 keys.setdefault('Required-Stop', keys['Required-Start'])
101 keys.setdefault('Default-Start', '2 3 4 5')
102 keys.setdefault('Default-Stop', '0 1 6')
103 keys.setdefault('Short-Description', 'test %s service' %
104 name_without_sh)
105 keys.setdefault('Description', 'long description for test %s service' %
106 name_without_sh)
107 script = os.path.join(self.init_d_dir, fname)
108 with open(script, 'w') as f:
109 f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
110 for k, v in keys.items():
111 if v is not None:
112 f.write('#%20s %s\n' % (k + ':', v))
113 f.write('### END INIT INFO\ncode --goes here\n')
114 os.chmod(script, 0o755)
115
116 if enable:
117 def make_link(prefix, runlevel):
118 d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
119 if not os.path.isdir(d):
120 os.mkdir(d)
121 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
122
123 for rl in keys['Default-Start'].split():
124 make_link('S%02i' % prio, rl)
125 for rl in keys['Default-Stop'].split():
126 make_link('K%02i' % (99 - prio), rl)
127
128 return script
129
130 def assert_enabled(self, unit, targets):
131 '''assert that a unit is enabled in precisely the given targets'''
132
133 all_targets = ['multi-user', 'graphical']
134
135 # should be enabled
136 for target in all_targets:
137 link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
138 if target in targets:
139 unit_file = os.readlink(link)
140 self.assertTrue(os.path.exists(unit_file))
141 self.assertEqual(os.path.basename(unit_file), unit)
142 else:
143 self.assertFalse(os.path.exists(link),
144 '%s unexpectedly exists' % link)
145
146 #
147 # test cases
148 #
149
150 def test_nothing(self):
151 '''no input files'''
152
153 results = self.run_generator()[1]
154 self.assertEqual(results, {})
155 self.assertEqual(os.listdir(self.out_dir), [])
156
157 def test_simple_disabled(self):
158 '''simple service without dependencies, disabled'''
159
160 self.add_sysv('foo', {}, enable=False)
161 err, results = self.run_generator()
162 self.assertEqual(len(results), 1)
163
164 # no enablement links or other stuff
165 self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
166
167 s = results['foo.service']
168 self.assertEqual(s.sections(), ['Unit', 'Service'])
169 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
170 # $local_fs does not need translation, don't expect any dependency
171 # fields here
172 self.assertEqual(set(s.options('Unit')),
173 set(['Documentation', 'SourcePath', 'Description']))
174
175 self.assertEqual(s.get('Service', 'Type'), 'forking')
176 init_script = os.path.join(self.init_d_dir, 'foo')
177 self.assertEqual(s.get('Service', 'ExecStart'),
178 '%s start' % init_script)
179 self.assertEqual(s.get('Service', 'ExecStop'),
180 '%s stop' % init_script)
181
182 self.assertNotIn('Overwriting', err)
183
184 def test_simple_enabled_all(self):
185 '''simple service without dependencies, enabled in all runlevels'''
186
187 self.add_sysv('foo', {}, enable=True)
188 err, results = self.run_generator()
189 self.assertEqual(list(results), ['foo.service'])
190 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
191 self.assertNotIn('Overwriting', err)
192
193 def test_simple_escaped(self):
194 '''simple service without dependencies, that requires escaping the name'''
195
196 self.add_sysv('foo+', {})
197 self.add_sysv('foo-admin', {})
198 err, results = self.run_generator()
199 self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
200 self.assertNotIn('Overwriting', err)
201
202 def test_simple_enabled_some(self):
203 '''simple service without dependencies, enabled in some runlevels'''
204
205 self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
206 err, results = self.run_generator()
207 self.assertEqual(list(results), ['foo.service'])
208 self.assert_enabled('foo.service', ['multi-user'])
209
210 def test_lsb_macro_dep_single(self):
211 '''single LSB macro dependency: $network'''
212
213 self.add_sysv('foo', {'Required-Start': '$network'})
214 s = self.run_generator()[1]['foo.service']
215 self.assertEqual(set(s.options('Unit')),
216 set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
217 self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
218 self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
219
220 def test_lsb_macro_dep_multi(self):
221 '''multiple LSB macro dependencies'''
222
223 self.add_sysv('foo', {'Required-Start': '$named $portmap'})
224 s = self.run_generator()[1]['foo.service']
225 self.assertEqual(set(s.options('Unit')),
226 set(['Documentation', 'SourcePath', 'Description', 'After']))
227 self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')
228
229 def test_lsb_deps(self):
230 '''LSB header dependencies to other services'''
231
232 # also give symlink priorities here; they should be ignored
233 self.add_sysv('foo', {'Required-Start': 'must1 must2',
234 'Should-Start': 'may1 ne_may2'},
235 enable=True, prio=40)
236 self.add_sysv('must1', {}, enable=True, prio=10)
237 self.add_sysv('must2', {}, enable=True, prio=15)
238 self.add_sysv('may1', {}, enable=True, prio=20)
239 # do not create ne_may2
240 err, results = self.run_generator()
241 self.assertEqual(sorted(results),
242 ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
243
244 # foo should depend on all of them
245 self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
246 ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
247
248 # other services should not depend on each other
249 self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
250 self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
251 self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
252
253 def test_symlink_prio_deps(self):
254 '''script without LSB headers use rcN.d priority'''
255
256 # create two init.d scripts without LSB header and enable them with
257 # startup priorities
258 for prio, name in [(10, 'provider'), (15, 'consumer')]:
259 with open(os.path.join(self.init_d_dir, name), 'w') as f:
260 f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
261 os.fchmod(f.fileno(), 0o755)
262
263 d = os.path.join(self.rcnd_dir, 'rc2.d')
264 if not os.path.isdir(d):
265 os.mkdir(d)
266 os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))
267
268 err, results = self.run_generator()
269 self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
270 self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
271 self.assertEqual(results['consumer.service'].get('Unit', 'After'),
272 'provider.service')
273
274 def test_multiple_provides(self):
275 '''multiple Provides: names'''
276
277 self.add_sysv('foo', {'Provides': 'foo bar baz'})
278 err, results = self.run_generator()
279 self.assertEqual(list(results), ['foo.service'])
280 self.assertEqual(set(results['foo.service'].options('Unit')),
281 set(['Documentation', 'SourcePath', 'Description']))
282 # should create symlinks for the alternative names
283 for f in ['bar.service', 'baz.service']:
284 self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
285 'foo.service')
286 self.assertNotIn('Overwriting', err)
287
288 def test_provides_escaped(self):
289 '''a script that Provides: a name that requires escaping'''
290
291 self.add_sysv('foo', {'Provides': 'foo foo+'})
292 err, results = self.run_generator()
293 self.assertEqual(list(results), ['foo.service'])
294 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
295 'foo.service')
296 self.assertNotIn('Overwriting', err)
297
298 def test_same_provides_in_multiple_scripts(self):
299 '''multiple init.d scripts provide the same name'''
300
301 self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
302 self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
303 err, results = self.run_generator()
304 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
305 # should create symlink for the alternative name for either unit
306 self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
307 ['foo.service', 'bar.service'])
308
309 def test_provide_other_script(self):
310 '''init.d scripts provides the name of another init.d script'''
311
312 self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
313 self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
314 err, results = self.run_generator()
315 self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
316 # we do expect an overwrite here, bar.service should overwrite the
317 # alias link from foo.service
318 self.assertIn('Overwriting', err)
319
320 def test_nonexecutable_script(self):
321 '''ignores non-executable init.d script'''
322
323 os.chmod(self.add_sysv('foo', {}), 0o644)
324 err, results = self.run_generator()
325 self.assertEqual(results, {})
326
327 def test_sh_suffix(self):
328 '''init.d script with .sh suffix'''
329
330 self.add_sysv('foo.sh', {}, enable=True)
331 err, results = self.run_generator()
332 s = results['foo.service']
333
334 self.assertEqual(s.sections(), ['Unit', 'Service'])
335 # should not have a .sh
336 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
337
338 # calls correct script with .sh
339 init_script = os.path.join(self.init_d_dir, 'foo.sh')
340 self.assertEqual(s.get('Service', 'ExecStart'),
341 '%s start' % init_script)
342 self.assertEqual(s.get('Service', 'ExecStop'),
343 '%s stop' % init_script)
344
345 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
346
347 def test_sh_suffix_with_provides(self):
348 '''init.d script with .sh suffix and Provides:'''
349
350 self.add_sysv('foo.sh', {'Provides': 'foo bar'})
351 err, results = self.run_generator()
352 # ensure we don't try to create a symlink to itself
353 self.assertNotIn('itself', err)
354 self.assertEqual(list(results), ['foo.service'])
355 self.assertEqual(results['foo.service'].get('Unit', 'Description'),
356 'LSB: test foo service')
357
358 # should create symlink for the alternative name
359 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
360 'foo.service')
361
362 def test_hidden_files(self):
363 '''init.d script with hidden file suffix'''
364
365 script = self.add_sysv('foo', {}, enable=True)
366 # backup files (not enabled in rcN.d/)
367 shutil.copy(script, script + '.dpkg-new')
368 shutil.copy(script, script + '.dpkg-dist')
369 shutil.copy(script, script + '.swp')
370 shutil.copy(script, script + '.rpmsave')
371
372 err, results = self.run_generator()
373 self.assertEqual(list(results), ['foo.service'])
374
375 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
376
377 def test_backup_file(self):
378 '''init.d script with backup file'''
379
380 script = self.add_sysv('foo', {}, enable=True)
381 # backup files (not enabled in rcN.d/)
382 shutil.copy(script, script + '.bak')
383 shutil.copy(script, script + '.old')
384
385 err, results = self.run_generator()
386 print(err)
387 self.assertEqual(sorted(results),
388 ['foo.bak.service', 'foo.old.service', 'foo.service'])
389
390 # ensure we don't try to create a symlink to itself
391 self.assertNotIn('itself', err)
392
393 self.assert_enabled('foo.service', ['multi-user', 'graphical'])
394 self.assert_enabled('foo.bak.service', [])
395 self.assert_enabled('foo.old.service', [])
396
397 def test_existing_native_unit(self):
398 '''existing native unit'''
399
400 with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
401 f.write('[Unit]\n')
402
403 self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
404 err, results = self.run_generator()
405 self.assertEqual(list(results), [])
406 # no enablement or alias links, as native unit is disabled
407 self.assertEqual(os.listdir(self.out_dir), [])
408
409
410 if __name__ == '__main__':
411 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))