git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - scrub/xfs_scrub_all.in
xfs_scrub_all: fix termination signal handling
[thirdparty/xfsprogs-dev.git] / scrub / xfs_scrub_all.in
1 #!/usr/bin/python3
2
3 # SPDX-License-Identifier: GPL-2.0-or-later
4 # Copyright (C) 2018-2024 Oracle. All rights reserved.
5 #
6 # Author: Darrick J. Wong <djwong@kernel.org>
7
8 # Run online scrubbers in parallel, but avoid thrashing.
9
10 import subprocess
11 import json
12 import threading
13 import time
14 import sys
15 import os
16 import argparse
17 import signal
18 from io import TextIOWrapper
19
# Overall exit status; each worker ORs its scrub result into this.
retcode = 0
# Set by the signal handlers (or a KeyboardInterrupt) to request shutdown.
terminate = False
# Extra tracing of signal handling and waits.  signal_scrubs() and
# wait_for_termination() read this global, so it must always be defined.
debug = False
22
def DEVNULL():
	'''Hand back a /dev/null sink usable as a subprocess stdout.'''
	null_sink = getattr(subprocess, 'DEVNULL', None)
	if null_sink is None:
		# Interpreters without subprocess.DEVNULL: open the device.
		null_sink = open(os.devnull, 'wb')
	return null_sink
30
def find_mounts():
	'''Map mountpoints to physical disks.

	Runs lsblk and returns a dict mapping each mounted XFS mountpoint to
	the set of top-level block device knames it lives on.  Returns an
	empty dict if lsblk exits with a nonzero status.
	'''
	def find_xfs_mounts(bdev, fs, lastdisk):
		'''Attach lastdisk to each fs found under bdev.'''
		mnt = bdev.get('mountpoint')
		if bdev.get('fstype') == 'xfs' and mnt is not None:
			fs.setdefault(mnt, set()).add(lastdisk)
		for child in bdev.get('children', ()):
			find_xfs_mounts(child, fs, lastdisk)

	fs = {}
	cmd = ['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J']
	# communicate() drains stdout while waiting for the child.  Calling
	# wait() first can deadlock once the JSON output fills the pipe.
	with subprocess.Popen(cmd, stdout = subprocess.PIPE) as proc:
		output, _ = proc.communicate()
	if proc.returncode != 0:
		return fs
	# Mountpoints are paths, so decode the way Python decodes file
	# names, not with whatever encoding our own stdout happens to use.
	bdevdata = json.loads(os.fsdecode(output))

	# The lsblk output had better be in disks-then-partitions order
	for bdev in bdevdata['blockdevices']:
		find_xfs_mounts(bdev, fs, bdev['kname'])

	return fs
62
def backtick(cmd):
	'''Generator function that yields lines of a program's stdout.

	Each line is stripped of surrounding whitespace.  Exhausting or
	closing the generator closes the child's stdout and reaps the child,
	even when the caller stops iterating early.  Without this, abandoned
	iterations leave zombies and open pipes behind.
	'''
	with subprocess.Popen(cmd, stdout = subprocess.PIPE) as proc:
		for line in TextIOWrapper(proc.stdout, encoding="utf-8"):
			yield line.strip()
68
def remove_killfunc(killfuncs, fn):
	'''Ensure fn is not in killfuncs.

	It is not an error for fn to be absent already: the main thread pops
	kill functions off the set when it terminates the scrubs.
	'''
	killfuncs.discard(fn)
75
def run_killable(cmd, stdout, killfuncs):
	'''Run a killable program. Returns program retcode or -1 if we can't
	start it.

	While the program runs, its terminate method is registered in
	killfuncs so a termination request can stop it.  It is unregistered
	again however the wait ends.
	'''
	try:
		proc = subprocess.Popen(cmd, stdout = stdout)
	except (OSError, subprocess.SubprocessError):
		# Program missing, not executable, failed to exec, ...
		return -1

	kill_fn = proc.terminate
	killfuncs.add(kill_fn)
	try:
		proc.wait()
	finally:
		# The main thread may already have popped it while terminating.
		killfuncs.discard(kill_fn)
	return proc.returncode
87
# systemd doesn't like unit instance names with slashes in them, so it
# replaces them with dashes when it invokes the service. Filesystem paths
# need a special --path argument so that dashes do not get mangled.
def path_to_serviceunit(path):
	'''Convert a pathname into a systemd service unit name.

	Returns the first line printed by systemd-escape (stripped), or None
	if systemd-escape cannot be run, exits with an error, or prints
	nothing at all.
	'''
	cmd = ['systemd-escape', '--template', '@scrub_svcname@',
	       '--path', path]
	try:
		# communicate() collects the output and reaps the child, and the
		# with block closes the pipe, so nothing is leaked.
		with subprocess.Popen(cmd, stdout = subprocess.PIPE) as proc:
			output, _ = proc.communicate()
		if proc.returncode != 0 or not output:
			return None
		first = output.split(b'\n', 1)[0]
		return first.decode(sys.stdout.encoding or 'utf-8').strip()
	except (OSError, ValueError, subprocess.SubprocessError):
		return None
103
def systemctl_stop(unitname):
	'''Ask systemd to stop unitname; block until systemctl has exited.'''
	subprocess.Popen(['systemctl', 'stop', unitname]).wait()
109
def systemctl_start(unitname, killfuncs):
	'''Start a systemd unit and wait for it to complete.

	While we wait, a function that stops the unit is registered in
	killfuncs so a termination request can cancel it.  That function is
	unregistered on every return path.

	Returns 0 if the unit ended inactive, 1 if it failed, -1 if
	systemctl could not be run or queried, or the exit code of
	"systemctl start" when it is neither 0 nor 1.
	'''
	stop_fn = None
	cmd = ['systemctl', 'start', unitname]
	try:
		try:
			proc = subprocess.Popen(cmd, stdout = DEVNULL())
			stop_fn = lambda: systemctl_stop(unitname)
			killfuncs.add(stop_fn)
			proc.wait()
			ret = proc.returncode
		except Exception:
			# Best effort: the caller falls back to running
			# xfs_scrub directly.
			return -1

		if ret != 1:
			return ret

		# If systemctl-start returns 1, it's possible that the service failed
		# or that dbus/systemd restarted and the client program lost its
		# connection -- according to the systemctl man page, 1 means "unit not
		# failed".
		#
		# Either way, we switch to polling the service status to try to wait
		# for the service to end. As of systemd 249, the is-active command
		# returns any of the following states: active, reloading, inactive,
		# failed, activating, deactivating, or maintenance. Apparently these
		# strings are not localized.
		while True:
			try:
				for l in backtick(['systemctl', 'is-active', unitname]):
					if l == 'failed':
						return 1
					if l == 'inactive':
						return 0
			except Exception:
				return -1

			time.sleep(1)
	finally:
		# However we got here, we no longer need to stop this unit.
		if stop_fn is not None:
			remove_killfunc(killfuncs, stop_fn)
153
def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs):
	'''Run a scrub process.

	Scrub the filesystem mounted at mnt.  The systemd service unit is
	tried first; if that is unusable, xfs_scrub is run directly.  The
	scrub's exit code is ORed into the global retcode.  However this
	returns, mntdevs are removed from running_devs and cond is notified
	so the scheduler in main() can start more scrubs.  No new scrub is
	started once the global terminate flag has been set.
	'''
	global retcode, terminate

	print("Scrubbing %s..." % mnt)
	sys.stdout.flush()

	try:
		if terminate:
			return

		# Try it the systemd way.  The unit name is derived from the
		# mountpoint being scrubbed.
		unitname = path_to_serviceunit(mnt)
		if unitname is not None:
			ret = systemctl_start(unitname, killfuncs)
			# Only 0 and 1 count as scrub results; any other value
			# falls through to running xfs_scrub directly.
			if ret in (0, 1):
				print("Scrubbing %s done, (err=%d)" % (mnt, ret))
				sys.stdout.flush()
				retcode |= ret
				return

		if terminate:
			return

		# Invoke xfs_scrub manually
		cmd = ['@sbindir@/xfs_scrub']
		cmd += '@scrub_args@'.split()
		cmd += [mnt]
		ret = run_killable(cmd, None, killfuncs)
		if ret >= 0:
			print("Scrubbing %s done, (err=%d)" % (mnt, ret))
			sys.stdout.flush()
			retcode |= ret
			return

		if terminate:
			return

		print("Unable to start scrub tool.")
		sys.stdout.flush()
	finally:
		# Release this filesystem's disks and wake the scheduler, doing
		# both while holding the condition's lock.
		with cond:
			running_devs -= mntdevs
			cond.notify()
199
def signal_scrubs(signum, cond):
	'''Termination-signal handler body.

	Raises the global terminate flag and notifies cond.  The main thread
	waits on cond in wait_for_termination(), which runs the registered
	kill functions once it sees the flag.
	'''
	global debug, terminate

	if debug:
		print('Signal handler called with signal', signum)
		sys.stdout.flush()

	terminate = True
	with cond:
		cond.notify()
212
def wait_for_termination(cond, killfuncs, timeout = 5.0):
	'''Wait for a child thread to terminate. Returns True if we should
	abort the program, False otherwise.

	Blocks until cond is notified (a scrub thread finished or a signal
	arrived) or until timeout seconds have passed.  The timeout keeps a
	notification sent just before we started waiting from stranding us
	forever.  On a timeout we return False and callers simply re-check
	their state.  Pass timeout=None to wait indefinitely.

	If the global terminate flag is set, every function in killfuncs is
	removed and called to stop the running scrubs, then True is returned.
	'''
	global debug, terminate

	if debug:
		print('waiting for threads to terminate')
		sys.stdout.flush()

	with cond:
		try:
			cond.wait(timeout)
		except KeyboardInterrupt:
			terminate = True

	if not terminate:
		return False

	print("Terminating...")
	sys.stdout.flush()
	while True:
		# Pop directly instead of checking the length first: worker
		# threads may be removing entries concurrently.
		try:
			fn = killfuncs.pop()
		except KeyError:
			break
		try:
			fn()
		except Exception as e:
			# Keep going so one failed kill doesn't leave the other
			# scrubs running.
			print("Failed to stop a scrub: %s" % e, file = sys.stderr)
			sys.stderr.flush()
	return True
238
def main():
	'''Find mounts, schedule scrub runs.

	Every mounted XFS filesystem is scrubbed in its own thread.  Two
	filesystems are only scrubbed at the same time if they share no
	disks.  Exits with the ORed scrub results, collapsed to 0 or 1 in
	service mode.
	'''
	global retcode, terminate

	threads = []

	def start_scrub(mnt, devs):
		'''Scrub mnt (which lives on devs) in a new worker thread.'''
		a = (mnt, cond, running_devs, devs, killfuncs)
		worker = threading.Thread(target = run_scrub, args = a)
		worker.start()
		threads.append(worker)

	parser = argparse.ArgumentParser(
			description = "Scrub all mounted XFS filesystems.")
	parser.add_argument("-V", help = "Report version and exit.",
			action = "store_true")
	args = parser.parse_args()

	if args.V:
		print("xfs_scrub_all version @pkg_version@")
		sys.exit(0)

	fs = find_mounts()

	# Tail the journal if we ourselves aren't a service...
	journal_proc = None
	if 'SERVICE_MODE' not in os.environ:
		try:
			cmd = ['journalctl', '--no-pager', '-q', '-S', 'now',
					'-f', '-u', 'xfs_scrub@*', '-o',
					'cat']
			journal_proc = subprocess.Popen(cmd)
		except OSError:
			# Best effort: without journalctl we just don't tail
			# the scrub units' logs.
			pass

	# Schedule scrub jobs...
	running_devs = set()
	killfuncs = set()
	cond = threading.Condition()

	signal.signal(signal.SIGINT, lambda s, f: signal_scrubs(s, cond))
	signal.signal(signal.SIGTERM, lambda s, f: signal_scrubs(s, cond))

	while fs:
		# Nothing running: start something so we always make progress.
		if not running_devs:
			mnt, devs = fs.popitem()
			running_devs.update(devs)
			start_scrub(mnt, devs)
		# Start every remaining filesystem whose disks are all idle.
		poppers = set()
		for mnt, devs in fs.items():
			if running_devs.isdisjoint(devs):
				running_devs.update(devs)
				poppers.add(mnt)
				start_scrub(mnt, devs)
		for p in poppers:
			fs.pop(p)

		# Wait for one thread to finish
		if wait_for_termination(cond, killfuncs):
			break

	# Wait for the rest of the threads to finish
	while killfuncs:
		wait_for_termination(cond, killfuncs)

	# The loop above only covers workers that currently have a kill
	# function registered.  A worker may still be starting up, or be
	# between its systemd and direct attempts.  Join them all so that
	# retcode is final before we report it.
	for worker in threads:
		worker.join()

	if journal_proc is not None:
		journal_proc.terminate()
		journal_proc.wait()

	# See the service mode comments in xfs_scrub.c for why we do this.
	if 'SERVICE_MODE' in os.environ:
		time.sleep(2)
		if retcode != 0:
			retcode = 1

	sys.exit(retcode)
316
if __name__ == "__main__":
	# Run as a script: find XFS mounts and scrub them.
	main()