]> git.ipfire.org Git - thirdparty/systemd.git/blob - test/sysv-generator-test.py
sysv-generator: Skip init scripts for existing native services
[thirdparty/systemd.git] / test / sysv-generator-test.py
1 # systemd-sysv-generator integration test
2 #
3 # (C) 2015 Canonical Ltd.
4 # Author: Martin Pitt <martin.pitt@ubuntu.com>
5 #
6 # systemd is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU Lesser General Public License as published by
8 # the Free Software Foundation; either version 2.1 of the License, or
9 # (at your option) any later version.
10
11 # systemd is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public License
17 # along with systemd; If not, see <http://www.gnu.org/licenses/>.
18
19 import unittest
20 import sys
21 import os
22 import subprocess
23 import tempfile
24 import shutil
25 from glob import glob
26
27 try:
28 from configparser import RawConfigParser
29 except ImportError:
30 # python 2
31 from ConfigParser import RawConfigParser
32
# Path of the generator binary under test; looked up in $builddir when the
# build system exports it, otherwise in the current directory.
sysv_generator = os.path.join(
    os.getenv('builddir', '.'),
    'systemd-sysv-generator')
34
35
class SysvGeneratorTest(unittest.TestCase):
    '''Integration tests for systemd-sysv-generator.

    Every test gets a private scratch directory containing an init.d/
    directory, rcN.d/ directories (created on demand), a directory for native
    units and an output directory. The generator is pointed at these through
    SYSTEMD_* environment variables, so the host system is never touched.
    '''

    def setUp(self):
        '''Create the scratch directory layout used by the generator.'''
        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
        self.init_d_dir = os.path.join(self.workdir, 'init.d')
        os.mkdir(self.init_d_dir)
        # rcN.d/ directories live directly below the work dir
        self.rcnd_dir = self.workdir
        self.unit_dir = os.path.join(self.workdir, 'systemd')
        os.mkdir(self.unit_dir)
        self.out_dir = os.path.join(self.workdir, 'output')
        os.mkdir(self.out_dir)

    def tearDown(self):
        '''Remove the scratch directory and everything below it.'''
        shutil.rmtree(self.workdir)

    #
    # Helper methods
    #

    def run_generator(self, expect_error=False):
        '''Run sysv-generator.

        Fail if stderr contains any "Fail", unless expect_error is True.
        Fail if the generator exits with a non-zero status.
        Return (stderr, filename -> ConfigParser) pair with output to stderr
        and parsed generated units. Symlinked units (aliases) are not parsed.
        '''
        env = os.environ.copy()
        env['SYSTEMD_LOG_LEVEL'] = 'debug'
        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
        gen = subprocess.Popen(
            [sysv_generator, 'ignored', 'ignored', self.out_dir],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, env=env)
        (_out, err) = gen.communicate()
        if not expect_error:
            self.assertFalse('Fail' in err, err)
        self.assertEqual(gen.returncode, 0, err)

        results = {}
        for service in glob(self.out_dir + '/*.service'):
            if os.path.islink(service):
                continue
            cp = RawConfigParser()
            cp.optionxform = lambda o: o  # don't lower-case option names
            with open(service) as f:
                # readfp() is gone in Python 3.12, read_file() does not exist
                # in Python 2; use whichever this interpreter provides
                if hasattr(cp, 'read_file'):
                    cp.read_file(f)
                else:
                    cp.readfp(f)
            results[os.path.basename(service)] = cp

        return (err, results)

    def add_sysv(self, fname, keys, enable=False, prio=1):
        '''Create a SysV init script with the given keys in the LSB header

        There are sensible default values for all fields; a key whose value
        is None is left out of the header entirely.
        If enable is True, links will be created in the rcN.d dirs. In that
        case, the priority can be given with "prio" (default to 1).

        The passed "keys" dict is not modified.

        Return path of generated script.
        '''
        # work on a copy so that the defaults do not leak into the caller's
        # dict
        keys = dict(keys)

        # strip a ".sh" suffix, but never reduce the name to an empty string
        if fname.endswith('.sh') and len(fname) > 3:
            name_without_sh = fname[:-3]
        else:
            name_without_sh = fname

        keys.setdefault('Provides', name_without_sh)
        keys.setdefault('Required-Start', '$local_fs')
        keys.setdefault('Required-Stop', keys['Required-Start'])
        keys.setdefault('Default-Start', '2 3 4 5')
        keys.setdefault('Default-Stop', '0 1 6')
        keys.setdefault('Short-Description', 'test %s service' %
                        name_without_sh)
        keys.setdefault('Description', 'long description for test %s service' %
                        name_without_sh)
        script = os.path.join(self.init_d_dir, fname)
        with open(script, 'w') as f:
            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
            for k, v in keys.items():
                if v is not None:
                    f.write('#%20s %s\n' % (k + ':', v))
            f.write('### END INIT INFO\ncode --goes here\n')
        os.chmod(script, 0o755)

        if enable:
            def make_link(prefix, runlevel):
                d = os.path.join(self.rcnd_dir, 'rc%s.d' % runlevel)
                if not os.path.isdir(d):
                    os.mkdir(d)
                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))

            for rl in keys['Default-Start'].split():
                make_link('S%02i' % prio, rl)
            for rl in keys['Default-Stop'].split():
                make_link('K%02i' % (99 - prio), rl)

        return script

    def assert_enabled(self, unit, runlevels):
        '''assert that a unit is enabled in precisely the given runlevels'''

        all_runlevels = [2, 3, 4, 5]

        for runlevel in all_runlevels:
            link = os.path.join(self.out_dir, 'runlevel%i.target.wants' % runlevel, unit)
            if runlevel in runlevels:
                # should be enabled
                target = os.readlink(link)
                # a relative link target is relative to the link's directory,
                # not to our cwd; absolute targets pass through unchanged
                resolved = os.path.join(os.path.dirname(link), target)
                self.assertTrue(os.path.exists(resolved),
                                '%s points to nonexisting %s' % (link, target))
                self.assertEqual(os.path.basename(target), unit)
            else:
                # should not be there at all, not even as a dangling link
                self.assertFalse(os.path.lexists(link),
                                 '%s unexpectedly exists' % link)

    #
    # test cases
    #

    def test_nothing(self):
        '''no input files'''

        results = self.run_generator()[1]
        self.assertEqual(results, {})
        self.assertEqual(os.listdir(self.out_dir), [])

    def test_simple_disabled(self):
        '''simple service without dependencies, disabled'''

        self.add_sysv('foo', {}, enable=False)
        err, results = self.run_generator()
        self.assertEqual(len(results), 1)

        # no enablement links or other stuff
        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])

        s = results['foo.service']
        self.assertEqual(s.sections(), ['Unit', 'Service'])
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
        # $local_fs does not need translation, don't expect any dependency
        # fields here
        self.assertEqual(set(s.options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description']))

        self.assertEqual(s.get('Service', 'Type'), 'forking')
        init_script = os.path.join(self.init_d_dir, 'foo')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

    def test_simple_enabled_all(self):
        '''simple service without dependencies, enabled in all runlevels'''

        self.add_sysv('foo', {}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', [2, 3, 4, 5])

    def test_simple_enabled_some(self):
        '''simple service without dependencies, enabled in some runlevels'''

        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assert_enabled('foo.service', [2, 4])

    def test_lsb_macro_dep_single(self):
        '''single LSB macro dependency: $network'''

        self.add_sysv('foo', {'Required-Start': '$network'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')

    def test_lsb_macro_dep_multi(self):
        '''multiple LSB macro dependencies'''

        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
        s = self.run_generator()[1]['foo.service']
        self.assertEqual(set(s.options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description', 'After']))
        self.assertEqual(s.get('Unit', 'After'), 'nss-lookup.target rpcbind.target')

    def test_lsb_deps(self):
        '''LSB header dependencies to other services'''

        # also give symlink priorities here; they should be ignored
        self.add_sysv('foo', {'Required-Start': 'must1 must2',
                              'Should-Start': 'may1 ne_may2'},
                      enable=True, prio=40)
        self.add_sysv('must1', {}, enable=True, prio=10)
        self.add_sysv('must2', {}, enable=True, prio=15)
        self.add_sysv('may1', {}, enable=True, prio=20)
        # do not create ne_may2
        err, results = self.run_generator()
        self.assertEqual(sorted(results),
                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])

        # foo should depend on all of them
        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])

        # other services should not depend on each other
        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))

    def test_symlink_prio_deps(self):
        '''script without LSB headers use rcN.d priority'''

        # create two init.d scripts without LSB header and enable them with
        # startup priorities
        for prio, name in [(10, 'provider'), (15, 'consumer')]:
            with open(os.path.join(self.init_d_dir, name), 'w') as f:
                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
                os.fchmod(f.fileno(), 0o755)

            d = os.path.join(self.rcnd_dir, 'rc2.d')
            if not os.path.isdir(d):
                os.mkdir(d)
            os.symlink('../init.d/' + name, os.path.join(d, 'S%02i%s' % (prio, name)))

        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
                         'provider.service')

    def test_multiple_provides(self):
        '''multiple Provides: names'''

        self.add_sysv('foo', {'Provides': 'foo bar baz'})
        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(set(results['foo.service'].options('Unit')),
                         set(['Documentation', 'SourcePath', 'Description']))
        # should create symlinks for the alternative names
        for f in ['bar.service', 'baz.service']:
            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
                             'foo.service')

    def test_same_provides_in_multiple_scripts(self):
        '''multiple init.d scripts provide the same name'''

        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
        # should create symlink for the alternative name for either unit
        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
                      ['foo.service', 'bar.service'])

    def test_provide_other_script(self):
        '''init.d scripts provides the name of another init.d script'''

        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])

    def test_nonexecutable_script(self):
        '''ignores non-executable init.d script'''

        os.chmod(self.add_sysv('foo', {}), 0o644)
        err, results = self.run_generator()
        self.assertEqual(results, {})

    def test_sh_suffix(self):
        '''init.d script with .sh suffix'''

        self.add_sysv('foo.sh', {}, enable=True)
        err, results = self.run_generator()
        s = results['foo.service']

        self.assertEqual(s.sections(), ['Unit', 'Service'])
        # should not have a .sh
        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')

        # calls correct script with .sh
        init_script = os.path.join(self.init_d_dir, 'foo.sh')
        self.assertEqual(s.get('Service', 'ExecStart'),
                         '%s start' % init_script)
        self.assertEqual(s.get('Service', 'ExecStop'),
                         '%s stop' % init_script)

        self.assert_enabled('foo.service', [2, 3, 4, 5])

    def test_sh_suffix_with_provides(self):
        '''init.d script with .sh suffix and Provides:'''

        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
        err, results = self.run_generator()
        # ensure we don't try to create a symlink to itself
        # (assertNotIn takes the needle first, then the haystack)
        self.assertNotIn('itself', err)
        self.assertEqual(list(results), ['foo.service'])
        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
                         'LSB: test foo service')

        # should create symlink for the alternative name
        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
                         'foo.service')

    def test_hidden_files(self):
        '''init.d script with hidden file suffix'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.dpkg-new')
        shutil.copy(script, script + '.dpkg-dist')
        shutil.copy(script, script + '.swp')
        shutil.copy(script, script + '.rpmsave')

        err, results = self.run_generator()
        self.assertEqual(list(results), ['foo.service'])

        self.assert_enabled('foo.service', [2, 3, 4, 5])

    def test_backup_file(self):
        '''init.d script with backup file'''

        script = self.add_sysv('foo', {}, enable=True)
        # backup files (not enabled in rcN.d/)
        shutil.copy(script, script + '.bak')
        shutil.copy(script, script + '.old')

        err, results = self.run_generator()
        # show the generator log only if the expectation is not met
        self.assertEqual(sorted(results),
                         ['foo.bak.service', 'foo.old.service', 'foo.service'],
                         err)

        # ensure we don't try to create a symlink to itself
        # (assertNotIn takes the needle first, then the haystack)
        self.assertNotIn('itself', err)

        self.assert_enabled('foo.service', [2, 3, 4, 5])
        self.assert_enabled('foo.bak.service', [])
        self.assert_enabled('foo.old.service', [])

    def test_existing_native_unit(self):
        '''existing native unit'''

        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
            f.write('[Unit]\n')

        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
        err, results = self.run_generator()
        self.assertEqual(list(results), [])
        # no enablement or alias links, as native unit is disabled
        self.assertEqual(os.listdir(self.out_dir), [])
381
382
# Script entry point: run all tests verbosely, reporting to stdout instead of
# unittest's default stderr.
if __name__ == '__main__':
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)