]>
Commit | Line | Data |
---|---|---|
d9daae55 | 1 | #!/usr/bin/env python3 |
db9ecf05 | 2 | # SPDX-License-Identifier: LGPL-2.1-or-later |
d9daae55 | 3 | # |
d8a0bcfd YW |
4 | # systemd is free software; you can redistribute it and/or modify it |
5 | # under the terms of the GNU Lesser General Public License as published by | |
6 | # the Free Software Foundation; either version 2.1 of the License, or | |
7 | # (at your option) any later version. | |
d9daae55 | 8 | |
5a8575ef | 9 | import os |
d9daae55 | 10 | import sys |
03025f46 | 11 | import socket |
d9daae55 | 12 | import subprocess |
03025f46 ZJS |
13 | import tempfile |
14 | import pwd | |
b75f0c69 | 15 | import grp |
f582e61b | 16 | from pathlib import Path |
03025f46 ZJS |
17 | |
18 | try: | |
19 | from systemd import id128 | |
20 | except ImportError: | |
21 | id128 = None | |
d9daae55 ZJS |
22 | |
23 | EX_DATAERR = 65 # from sysexits.h | |
7488492a ZJS |
24 | EXIT_TEST_SKIP = 77 |
25 | ||
26 | try: | |
27 | subprocess.run | |
28 | except AttributeError: | |
29 | sys.exit(EXIT_TEST_SKIP) | |
d9daae55 | 30 | |
59ca366c | 31 | exe_with_args = sys.argv[1:] |
458e8d6d | 32 | temp_dir = tempfile.TemporaryDirectory(prefix='test-systemd-tmpfiles.') |
d9daae55 | 33 | |
8473ece9 DB |
34 | # If /tmp isn't owned by either 'root' or the current user |
35 | # systemd-tmpfiles will exit with "Detected unsafe path transition" | |
36 | # breaking this test | |
37 | tmpowner = os.stat("/tmp").st_uid | |
38 | if tmpowner != 0 and tmpowner != os.getuid(): | |
39 | print("Skip: /tmp is not owned by 'root' or current user") | |
40 | sys.exit(EXIT_TEST_SKIP) | |
41 | ||
03025f46 | 42 | def test_line(line, *, user, returncode=EX_DATAERR, extra={}): |
5a8575ef | 43 | args = ['--user'] if user else [] |
59ca366c EV |
44 | print('Running {} on {!r}'.format(' '.join(exe_with_args + args), line)) |
45 | c = subprocess.run(exe_with_args + ['--create', '-'] + args, | |
7488492a ZJS |
46 | input=line, stdout=subprocess.PIPE, universal_newlines=True, |
47 | **extra) | |
5a8575ef ZJS |
48 | assert c.returncode == returncode, c |
49 | ||
50 | def test_invalids(*, user): | |
51 | test_line('asdfa', user=user) | |
52 | test_line('f "open quote', user=user) | |
53 | test_line('f closed quote""', user=user) | |
54 | test_line('Y /unknown/letter', user=user) | |
55 | test_line('w non/absolute/path', user=user) | |
56 | test_line('s', user=user) # s is for short | |
57 | test_line('f!! /too/many/bangs', user=user) | |
58 | test_line('f++ /too/many/plusses', user=user) | |
59 | test_line('f+!+ /too/many/plusses', user=user) | |
60 | test_line('f!+! /too/many/bangs', user=user) | |
c46c3233 | 61 | test_line('f== /too/many/equals', user=user) |
5a8575ef ZJS |
62 | test_line('w /unresolved/argument - - - - "%Y"', user=user) |
63 | test_line('w /unresolved/argument/sandwich - - - - "%v%Y%v"', user=user) | |
64 | test_line('w /unresolved/filename/%Y - - - - "whatever"', user=user) | |
65 | test_line('w /unresolved/filename/sandwich/%v%Y%v - - - - "whatever"', user=user) | |
5238e957 BB |
66 | test_line('w - - - - - "no file specified"', user=user) |
67 | test_line('C - - - - - "no file specified"', user=user) | |
5a8575ef ZJS |
68 | test_line('C non/absolute/path - - - - -', user=user) |
69 | test_line('b - - - - - -', user=user) | |
70 | test_line('b 1234 - - - - -', user=user) | |
71 | test_line('c - - - - - -', user=user) | |
72 | test_line('c 1234 - - - - -', user=user) | |
73 | test_line('t - - -', user=user) | |
74 | test_line('T - - -', user=user) | |
75 | test_line('a - - -', user=user) | |
76 | test_line('A - - -', user=user) | |
77 | test_line('h - - -', user=user) | |
78 | test_line('H - - -', user=user) | |
79 | ||
a6dffbb7 | 80 | def test_uninitialized_t(): |
5a8575ef ZJS |
81 | if os.getuid() == 0: |
82 | return | |
83 | ||
03025f46 | 84 | test_line('w /foo - - - - "specifier for --user %t"', |
60f42f7e | 85 | user=True, returncode=0, extra={'env':{'HOME': os.getenv('HOME')}}) |
03025f46 | 86 | |
c46c3233 | 87 | def test_content(line, expected, *, user, extra={}, subpath='/arg', path_cb=None): |
458e8d6d | 88 | d = tempfile.TemporaryDirectory(prefix='test-content.', dir=temp_dir.name) |
c46c3233 AW |
89 | if path_cb is not None: |
90 | path_cb(d.name, subpath) | |
91 | arg = d.name + subpath | |
03025f46 ZJS |
92 | spec = line.format(arg) |
93 | test_line(spec, user=user, returncode=0, extra=extra) | |
94 | content = open(arg).read() | |
95 | print('expect: {!r}\nactual: {!r}'.format(expected, content)) | |
96 | assert content == expected | |
97 | ||
98 | def test_valid_specifiers(*, user): | |
99 | test_content('f {} - - - - two words', 'two words', user=user) | |
b2d896f0 | 100 | if id128 and os.path.isfile('/etc/machine-id'): |
df1172fe ZJS |
101 | try: |
102 | test_content('f {} - - - - %m', '{}'.format(id128.get_machine().hex), user=user) | |
103 | except AssertionError as e: | |
104 | print(e) | |
105 | print('/etc/machine-id: {!r}'.format(open('/etc/machine-id').read())) | |
106 | print('/proc/cmdline: {!r}'.format(open('/proc/cmdline').read())) | |
107 | print('skipping') | |
03025f46 ZJS |
108 | test_content('f {} - - - - %b', '{}'.format(id128.get_boot().hex), user=user) |
109 | test_content('f {} - - - - %H', '{}'.format(socket.gethostname()), user=user) | |
110 | test_content('f {} - - - - %v', '{}'.format(os.uname().release), user=user) | |
172e9cc3 ZJS |
111 | test_content('f {} - - - - %U', '{}'.format(os.getuid() if user else 0), user=user) |
112 | test_content('f {} - - - - %G', '{}'.format(os.getgid() if user else 0), user=user) | |
03025f46 | 113 | |
60f42f7e DDM |
114 | try: |
115 | puser = pwd.getpwuid(os.getuid() if user else 0) | |
116 | except KeyError: | |
117 | puser = None | |
b75f0c69 | 118 | |
60f42f7e DDM |
119 | if puser: |
120 | test_content('f {} - - - - %u', '{}'.format(puser.pw_name), user=user) | |
121 | ||
122 | try: | |
123 | pgroup = grp.getgrgid(os.getgid() if user else 0) | |
124 | except KeyError: | |
125 | pgroup = None | |
126 | ||
127 | if pgroup: | |
128 | test_content('f {} - - - - %g', '{}'.format(pgroup.gr_name), user=user) | |
2f813b8a ZJS |
129 | |
130 | # Note that %h is the only specifier in which we look the environment, | |
131 | # because we check $HOME. Should we even be doing that? | |
132 | home = os.path.expanduser("~") | |
133 | test_content('f {} - - - - %h', '{}'.format(home), user=user) | |
03025f46 ZJS |
134 | |
135 | xdg_runtime_dir = os.getenv('XDG_RUNTIME_DIR') | |
136 | if xdg_runtime_dir is not None or not user: | |
137 | test_content('f {} - - - - %t', | |
138 | xdg_runtime_dir if user else '/run', | |
139 | user=user) | |
140 | ||
b0efbe9b YW |
141 | xdg_state_home = os.getenv('XDG_STATE_HOME') |
142 | if xdg_state_home is None and user: | |
143 | xdg_state_home = os.path.join(home, ".local/state") | |
144 | test_content('f {} - - - - %S', | |
145 | xdg_state_home if user else '/var/lib', | |
146 | user=user) | |
03025f46 ZJS |
147 | |
148 | xdg_cache_home = os.getenv('XDG_CACHE_HOME') | |
b0efbe9b YW |
149 | if xdg_cache_home is None and user: |
150 | xdg_cache_home = os.path.join(home, ".cache") | |
151 | test_content('f {} - - - - %C', | |
152 | xdg_cache_home if user else '/var/cache', | |
153 | user=user) | |
154 | ||
155 | test_content('f {} - - - - %L', | |
156 | os.path.join(xdg_state_home, 'log') if user else '/var/log', | |
157 | user=user) | |
03025f46 ZJS |
158 | |
159 | test_content('f {} - - - - %%', '%', user=user) | |
d9daae55 | 160 | |
c46c3233 AW |
161 | def mkfifo(parent, subpath): |
162 | os.makedirs(parent, mode=0o755, exist_ok=True) | |
163 | first_component = subpath.split('/')[1] | |
164 | path = parent + '/' + first_component | |
165 | print('path: {}'.format(path)) | |
166 | os.mkfifo(path) | |
167 | ||
168 | def mkdir(parent, subpath): | |
169 | first_component = subpath.split('/')[1] | |
170 | path = parent + '/' + first_component | |
171 | os.makedirs(path, mode=0o755, exist_ok=True) | |
172 | os.symlink(path, path + '/self', target_is_directory=True) | |
173 | ||
174 | def symlink(parent, subpath): | |
175 | link_path = parent + '/link-target' | |
176 | os.makedirs(parent, mode=0o755, exist_ok=True) | |
177 | with open(link_path, 'wb') as f: | |
178 | f.write(b'target') | |
179 | first_component = subpath.split('/')[1] | |
180 | path = parent + '/' + first_component | |
181 | os.symlink(link_path, path, target_is_directory=True) | |
182 | ||
183 | def file(parent, subpath): | |
184 | content = 'file-' + subpath.split('/')[1] | |
185 | path = parent + subpath | |
186 | os.makedirs(os.path.dirname(path), mode=0o755, exist_ok=True) | |
187 | with open(path, 'wb') as f: | |
188 | f.write(content.encode()) | |
189 | ||
190 | def valid_symlink(parent, subpath): | |
191 | target = 'link-target' | |
192 | link_path = parent + target | |
193 | os.makedirs(link_path, mode=0o755, exist_ok=True) | |
194 | first_component = subpath.split('/')[1] | |
195 | path = parent + '/' + first_component | |
196 | os.symlink(target, path, target_is_directory=True) | |
197 | ||
198 | def test_hard_cleanup(*, user): | |
199 | type_cbs = [None, file, mkdir, symlink] | |
200 | if 'mkfifo' in dir(os): | |
201 | type_cbs.append(mkfifo) | |
202 | ||
203 | for type_cb in type_cbs: | |
204 | for subpath in ['/shallow', '/deep/1/2']: | |
205 | label = '{}-{}'.format('None' if type_cb is None else type_cb.__name__, subpath.split('/')[1]) | |
206 | test_content('f= {} - - - - ' + label, label, user=user, subpath=subpath, path_cb=type_cb) | |
207 | ||
208 | # Test the case that a valid symlink is in the path. | |
209 | label = 'valid_symlink-deep' | |
210 | test_content('f= {} - - - - ' + label, label, user=user, subpath='/deep/1/2', path_cb=valid_symlink) | |
211 | ||
708daf42 | 212 | def test_base64(): |
015ddd4b | 213 | test_content('f~ {} - - - - UGlmZgpQYWZmClB1ZmYgCg==', "Piff\nPaff\nPuff \n", user=False) |
708daf42 | 214 | |
f582e61b MY |
215 | def test_conditionalized_execute_bit(): |
216 | c = subprocess.run(exe_with_args + ['--version', '|', 'grep', '-F', '+ACL'], shell=True, stdout=subprocess.DEVNULL) | |
217 | if c.returncode != 0: | |
218 | return 0 | |
219 | ||
220 | d = tempfile.TemporaryDirectory(prefix='test-acl.', dir=temp_dir.name) | |
221 | temp = Path(d.name) / "cond_exec" | |
222 | temp.touch() | |
223 | temp.chmod(0o644) | |
224 | ||
225 | test_line(f"a {temp} - - - - u:root:Xwr", user=False, returncode=0) | |
226 | c = subprocess.run(["getfacl", "-Ec", temp], | |
227 | stdout=subprocess.PIPE, check=True, text=True) | |
228 | assert "user:root:rw-" in c.stdout | |
229 | ||
230 | temp.chmod(0o755) | |
231 | test_line(f"a+ {temp} - - - - u:root:Xwr,g:root:rX", user=False, returncode=0) | |
232 | c = subprocess.run(["getfacl", "-Ec", temp], | |
233 | stdout=subprocess.PIPE, check=True, text=True) | |
234 | assert "user:root:rwx" in c.stdout and "group:root:r-x" in c.stdout | |
235 | ||
d9daae55 | 236 | if __name__ == '__main__': |
5a8575ef ZJS |
237 | test_invalids(user=False) |
238 | test_invalids(user=True) | |
a6dffbb7 | 239 | test_uninitialized_t() |
03025f46 ZJS |
240 | |
241 | test_valid_specifiers(user=False) | |
242 | test_valid_specifiers(user=True) | |
c46c3233 AW |
243 | |
244 | test_hard_cleanup(user=False) | |
245 | test_hard_cleanup(user=True) | |
708daf42 LP |
246 | |
247 | test_base64() | |
f582e61b MY |
248 | |
249 | test_conditionalized_execute_bit() |