]> git.ipfire.org Git - thirdparty/systemd.git/blame - test/sysv-generator-test.py
test-execute: Add tests for new PassEnvironment= directive
[thirdparty/systemd.git] / test / sysv-generator-test.py
CommitLineData
e28aa588
MP
1# systemd-sysv-generator integration test
2#
3# (C) 2015 Canonical Ltd.
4# Author: Martin Pitt <martin.pitt@ubuntu.com>
5#
6# systemd is free software; you can redistribute it and/or modify it
7# under the terms of the GNU Lesser General Public License as published by
8# the Free Software Foundation; either version 2.1 of the License, or
9# (at your option) any later version.
10
11# systemd is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14# Lesser General Public License for more details.
15#
16# You should have received a copy of the GNU Lesser General Public License
17# along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19import unittest
20import sys
21import os
22import subprocess
23import tempfile
24import shutil
25from glob import glob
26
27try:
28 from configparser import RawConfigParser
29except ImportError:
30 # python 2
31 from ConfigParser import RawConfigParser
32
# Location of the generator binary under test: looked up in $builddir,
# falling back to the current directory when that variable is not set.
sysv_generator = os.path.join(
    os.environ.get('builddir', '.'),
    'systemd-sysv-generator',
)
34
35
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Every test gets a private scratch directory containing a fake init.d
    directory, the rcN.d symlink directories, a native unit directory and
    the generator output directory. The generator binary is pointed at them
    through the SYSTEMD_SYSVINIT_PATH, SYSTEMD_SYSVRCND_PATH and
    SYSTEMD_UNIT_PATH environment variables.
    '''

    def setUp(self):
        '''Create the scratch directory tree for one test.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # rcN.d directories are created directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the scratch directory tree.'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        The generator must exit with status 0 in either case.

        Return (stderr, filename -> ConfigParser) pair with output to stderr
        and parsed generated units. Symlinks in the output directory (alias
        names) are not parsed and do not appear in the result.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_LOG_TARGET'] = 'console'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        # stdout is captured only to keep it out of the test output
        (_out, err) = gen.communicate()
        if not expect_error:
            self.assertFalse('Fail' in err, err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(self.out_dir + '/*.service'):
            if os.path.islink(service):
                continue
            cp = RawConfigParser()
            cp.optionxform = lambda o: o  # don't lower-case option names
            # readfp() was deprecated in Python 3.2 and removed in 3.12;
            # Python 2's ConfigParser only offers readfp().
            read_file = getattr(cp, 'read_file', None)
            if read_file is None:
                read_file = cp.readfp
            with open(service) as f:
                read_file(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields; a key whose value
        is None is left out of the header. The passed dict is not modified.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        Return path of generated script.
        '''
        # work on a copy so that the defaults do not leak into the caller's dict
        keys = dict(keys)
        name_without_sh = fname[:-3] if fname.endswith('.sh') else fname
        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test %s service' %
                        name_without_sh)
        keys.setdefault('Description', 'long description for test %s service' %
                        name_without_sh)
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#%20s %s\n' % (k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
                if not os.path.isdir(d):
                    os.mkdir(d)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S%02i' % prio, rl)
            for rl in keys['Default-Stop'].split():
                make_link('K%02i' % (99 - prio), rl)

        return script

    def assert_enabled(self, unit, targets):
        '''assert that a unit is enabled in precisely the given targets

        "targets" lists target names without the ".target" suffix; only
        multi-user and graphical are checked.
        '''
        all_targets = ['multi-user', 'graphical']

        for target in all_targets:
            link = os.path.join(self.out_dir, '%s.target.wants' % target, unit)
            if target in targets:
                # should be enabled
                unit_file = os.readlink(link)
                # Check through the link itself: testing the raw readlink()
                # result would resolve a relative target against the
                # current working directory instead of the link's directory.
                self.assertTrue(os.path.exists(link),
                                '%s is a dangling symlink' % link)
                self.assertEqual(os.path.basename(unit_file), unit)
            else:
                # should not be enabled
                self.assertFalse(os.path.exists(link),
                                 '%s unexpectedly exists' % link)

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assertNotIn('Overwriting', err)

    def test_simple_escaped(self):
        '''simple service without dependencies, that requires escaping the name'''

        self.add_sysv('foo+', {})
        self.add_sysv('foo-admin', {})
        err, results = self.run_generator()
        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
        self.assertNotIn('Overwriting', err)

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', ['multi-user'])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After', 'Wants'})
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         {'Documentation', 'SourcePath', 'Description', 'After'})
        self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            if not os.path.isdir(d):
                os.mkdir(d)
            os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         {'Documentation', 'SourcePath', 'Description'})
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_provides_escaped(self):
        '''a script that Provides: a name that requires escaping'''

        self.add_sysv('foo', {'Provides': 'foo foo+'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
                         'foo.service')
        self.assertNotIn('Overwriting', err)

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # we do expect an overwrite here, bar.service should overwrite the
        # alias link from foo.service
        self.assertIn('Overwriting', err)

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')

        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.bak.service', 'foo.old.service', 'foo.service'])

        # ensure we don't try to create a symlink to itself
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
408
e28aa588
MP
409
if __name__ == '__main__':
    # Send the verbose per-test report to stdout rather than the runner's
    # default stream.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)