#!/usr/bin/python3

# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2018-2024 Oracle. All rights reserved.
#
# Author: Darrick J. Wong

# Run online scrubbers in parallel, but avoid thrashing.

import subprocess
import json
import threading
import time
import sys
import os
import argparse
from io import TextIOWrapper

# Accumulated (OR-ed) exit status of every scrub; updated by worker threads.
retcode = 0
# Set once the user interrupts us; workers check it before starting new work.
terminate = False


def DEVNULL():
    '''Return /dev/null in subprocess writable format.'''
    try:
        from subprocess import DEVNULL
        return DEVNULL
    except ImportError:
        return open(os.devnull, 'wb')


def find_mounts():
    '''Map mountpoints to physical disks.

    Runs lsblk and returns a dict that maps the mountpoint of each mounted
    XFS filesystem to the set of kernel names of the top-level block
    devices it was found under.  Returns an empty dict if lsblk exits
    with a nonzero status.
    '''

    def find_xfs_mounts(bdev, fs, lastdisk):
        '''Attach lastdisk to each fs found under bdev.'''
        if bdev['fstype'] == 'xfs' and bdev['mountpoint'] is not None:
            fs.setdefault(bdev['mountpoint'], set()).add(lastdisk)
        for child in bdev.get('children', ()):
            find_xfs_mounts(child, fs, lastdisk)

    fs = {}
    cmd = ['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J']
    result = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # communicate() drains the pipe while waiting; calling wait() first can
    # deadlock once the child fills the pipe buffer.
    output, _ = result.communicate()
    if result.returncode != 0:
        return fs
    bdevdata = json.loads(output.decode(sys.stdout.encoding or 'utf-8'))

    # The lsblk output had better be in disks-then-partitions order
    for bdev in bdevdata['blockdevices']:
        find_xfs_mounts(bdev, fs, bdev['kname'])

    return fs


def backtick(cmd):
    '''Generator function that yields lines of a program's stdout.

    Each line is yielded with surrounding whitespace stripped.  The child
    is reaped and its pipe closed when the generator finishes or is closed.
    '''
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
        for line in TextIOWrapper(p.stdout, encoding="utf-8"):
            yield line.strip()


def remove_killfunc(killfuncs, fn):
    '''Ensure fn is not in killfuncs.'''
    try:
        killfuncs.remove(fn)
    except (KeyError, ValueError):
        # Already gone (e.g. the main thread popped it to kill us).
        pass


def run_killable(cmd, stdout, killfuncs):
    '''Run a killable program.

    While the program runs, its terminate method is registered in
    killfuncs.  Returns program retcode or -1 if we can't start it.
    '''
    try:
        proc = subprocess.Popen(cmd, stdout=stdout)
    except Exception:
        return -1

    killfuncs.add(proc.terminate)
    try:
        proc.wait()
        return proc.returncode
    except Exception:
        return -1
    finally:
        remove_killfunc(killfuncs, proc.terminate)


# systemd doesn't like unit instance names with slashes in them, so it
# replaces them with dashes when it invokes the service.  Filesystem paths
# need a special --path argument so that dashes do not get mangled.
def path_to_serviceunit(path):
    '''Convert a pathname into a systemd service unit name.

    Returns the first line printed by systemd-escape, or None if the
    program could not be run or printed nothing.
    '''
    cmd = ['systemd-escape', '--template', '@scrub_svcname@',
           '--path', path]
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        # Drain stdout while waiting so a full pipe cannot stall the child.
        output, _ = proc.communicate()
        if not output:
            return None
        first_line = output.split(b'\n', 1)[0]
        return first_line.decode(sys.stdout.encoding or 'utf-8').strip()
    except Exception:
        return None


def systemctl_stop(unitname):
    '''Stop a systemd unit.'''
    subprocess.call(['systemctl', 'stop', unitname])


def systemctl_start(unitname, killfuncs):
    '''Start a systemd unit and wait for it to complete.

    While waiting, a function that stops the unit is registered in
    killfuncs.  Returns the systemctl exit status, 0 or 1 after polling
    the unit state (see below), or -1 if something went wrong.
    '''

    def stop_fn():
        '''Kill function: ask systemd to stop the unit.'''
        systemctl_stop(unitname)

    cmd = ['systemctl', 'start', unitname]
    try:
        proc = subprocess.Popen(cmd, stdout=DEVNULL())
        killfuncs.add(stop_fn)
        proc.wait()
        ret = proc.returncode
    except Exception:
        remove_killfunc(killfuncs, stop_fn)
        return -1

    if ret != 1:
        remove_killfunc(killfuncs, stop_fn)
        return ret

    # If systemctl-start returns 1, it's possible that the service failed
    # or that dbus/systemd restarted and the client program lost its
    # connection -- according to the systemctl man page, 1 means "unit not
    # failed".
    #
    # Either way, we switch to polling the service status to try to wait
    # for the service to end.  As of systemd 249, the is-active command
    # returns any of the following states: active, reloading, inactive,
    # failed, activating, deactivating, or maintenance.  Apparently these
    # strings are not localized.
    try:
        while True:
            for state in backtick(['systemctl', 'is-active', unitname]):
                if state == 'failed':
                    return 1
                if state == 'inactive':
                    return 0
            time.sleep(1)
    except Exception:
        return -1
    finally:
        remove_killfunc(killfuncs, stop_fn)


def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs):
    '''Run a scrub process.

    Scrubs the filesystem mounted at mnt, first through its systemd
    service unit and, failing that, by invoking xfs_scrub directly.  The
    result is OR-ed into the global retcode.  On the way out, mntdevs is
    removed from running_devs and cond is notified, both under cond's lock.
    '''
    global retcode, terminate

    print("Scrubbing %s..." % mnt)
    sys.stdout.flush()

    try:
        if terminate:
            return

        # Try it the systemd way
        unitname = path_to_serviceunit(mnt)
        if unitname is not None:
            ret = systemctl_start(unitname, killfuncs)
            if ret == 0 or ret == 1:
                print("Scrubbing %s done, (err=%d)" % (mnt, ret))
                sys.stdout.flush()
                retcode |= ret
                return

        if terminate:
            return

        # Invoke xfs_scrub manually
        cmd = ['@sbindir@/xfs_scrub']
        cmd += '@scrub_args@'.split()
        cmd += [mnt]
        ret = run_killable(cmd, None, killfuncs)
        if ret >= 0:
            print("Scrubbing %s done, (err=%d)" % (mnt, ret))
            sys.stdout.flush()
            retcode |= ret
            return

        if terminate:
            return

        print("Unable to start scrub tool.")
        sys.stdout.flush()
    finally:
        # Release our devices and wake the scheduler atomically so that it
        # never observes a stale device set after being notified.
        with cond:
            running_devs.difference_update(mntdevs)
            cond.notify()


def main():
    '''Find mounts, schedule scrub runs.

    Filesystems that share no block device are scrubbed in parallel.  The
    process exits with status 1 if any scrub reported a nonzero status and
    0 otherwise.
    '''
    global retcode, terminate

    parser = argparse.ArgumentParser(
        description="Scrub all mounted XFS filesystems.")
    parser.add_argument("-V", help="Report version and exit.",
                        action="store_true")
    args = parser.parse_args()

    if args.V:
        print("xfs_scrub_all version @pkg_version@")
        sys.exit(0)

    fs = find_mounts()

    # Tail the journal if we ourselves aren't a service...
    journalthread = None
    if 'SERVICE_MODE' not in os.environ:
        try:
            cmd = ['journalctl', '--no-pager', '-q', '-S', 'now',
                   '-f', '-u', 'xfs_scrub@*', '-o',
                   'cat']
            journalthread = subprocess.Popen(cmd)
        except OSError:
            # Tailing the journal is best-effort; carry on without it.
            pass

    running_devs = set()
    killfuncs = set()
    cond = threading.Condition()
    workers = []

    def thr(mnt, devs):
        '''Start a scrub worker thread for mnt and remember it.'''
        a = (mnt, cond, running_devs, devs, killfuncs)
        worker = threading.Thread(target=run_scrub, args=a)
        workers.append(worker)
        worker.start()

    def interrupt():
        '''Flag termination and invoke every registered kill function.'''
        global terminate
        terminate = True
        print("Terminating...")
        sys.stdout.flush()
        while True:
            try:
                fn = killfuncs.pop()
            except KeyError:
                # Empty (possibly because a worker just deregistered).
                break
            fn()

    # Schedule scrub jobs...
    #
    # The condition lock is held for the whole scheduling pass and only
    # released inside cond.wait().  Workers need the lock to notify, so a
    # worker that finishes early cannot signal before we are waiting.
    with cond:
        while len(fs) > 0:
            if len(running_devs) == 0:
                mnt, devs = fs.popitem()
                running_devs.update(devs)
                thr(mnt, devs)

            # Launch every filesystem that shares no device with a
            # scrub that is already running.
            started = []
            for mnt, devs in fs.items():
                if running_devs.isdisjoint(devs):
                    running_devs.update(devs)
                    started.append(mnt)
                    thr(mnt, devs)
            for mnt in started:
                fs.pop(mnt)

            try:
                cond.wait()
            except KeyboardInterrupt:
                interrupt()
                fs = []

    # Wait for every worker to finish so that retcode reflects all scrubs
    # and the journal tail stays up until the last one is done.
    while True:
        try:
            for worker in workers:
                worker.join()
            break
        except KeyboardInterrupt:
            interrupt()

    if journalthread is not None:
        journalthread.terminate()

    # See the service mode comments in xfs_scrub.c for why we do this.
    if 'SERVICE_MODE' in os.environ:
        time.sleep(2)

    if retcode != 0:
        retcode = 1

    sys.exit(retcode)


if __name__ == '__main__':
    main()