#!/usr/bin/python3

# SPDX-License-Identifier: GPL-2.0+
# Copyright (C) 2018 Oracle.  All rights reserved.
#
# Author: Darrick J. Wong

# Run online scrubbers in parallel, but avoid thrashing.

import subprocess
import json
import threading
import time
import sys
import os
import argparse

retcode = 0
terminate = False

def DEVNULL():
	'''Return /dev/null in subprocess writable format.'''
	try:
		from subprocess import DEVNULL
		return DEVNULL
	except ImportError:
		return open(os.devnull, 'wb')

def find_mounts():
	'''Map mountpoints to physical disks.

	Returns a dict mapping each mounted XFS filesystem's mountpoint to the
	set of kernel names (lsblk KNAME) of the top-level block devices it was
	found under.  Returns an empty dict if lsblk exits nonzero.'''

	def find_xfs_mounts(bdev, fs, lastdisk):
		'''Attach lastdisk to each fs found under bdev.'''
		if bdev.get('fstype') == 'xfs' and bdev.get('mountpoint') is not None:
			fs.setdefault(bdev['mountpoint'], set()).add(lastdisk)
		for child in bdev.get('children', []):
			find_xfs_mounts(child, fs, lastdisk)

	fs = {}
	cmd = ['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J']
	result = subprocess.Popen(cmd, stdout=subprocess.PIPE)
	# communicate() drains the pipe while waiting; calling wait() before
	# reading can deadlock once lsblk's JSON output fills the pipe buffer.
	output, _ = result.communicate()
	if result.returncode != 0:
		return fs

	bdevdata = json.loads(output.decode(sys.stdout.encoding))

	# The lsblk output had better be in disks-then-partitions order
	for bdev in bdevdata['blockdevices']:
		lastdisk = bdev['kname']
		find_xfs_mounts(bdev, fs, lastdisk)

	return fs

def kill_systemd(unit, proc):
	'''Kill systemd unit.

	Terminate the local "systemctl start" process proc, then ask systemd to
	stop the unit itself and wait for that request to complete.'''
	proc.terminate()
	cmd = ['systemctl', 'stop', unit]
	x = subprocess.Popen(cmd)
	x.wait()

def run_killable(cmd, stdout, killfuncs, kill_fn):
	'''Run a killable program.

	While cmd runs, a zero-argument callable that invokes kill_fn(proc) is
	registered in killfuncs so another thread can abort it; it is removed
	again once the program exits.  Returns the program's return code
	(negative if killed by a signal) or -1 if we can't start it.'''
	try:
		proc = subprocess.Popen(cmd, stdout=stdout)
	except (OSError, ValueError):
		return -1

	real_kill_fn = lambda: kill_fn(proc)
	killfuncs.add(real_kill_fn)
	try:
		proc.wait()
	finally:
		# The terminator may already have popped us; discard() tolerates that.
		killfuncs.discard(real_kill_fn)
	return proc.returncode

# systemd doesn't like unit instance names with slashes in them, so it
# replaces them with dashes when it invokes the service.  However, it's not
# smart enough to convert the dashes to something else, so when it unescapes
# the instance name to feed to xfs_scrub, it turns all dashes into slashes.
# "/moo-cow" becomes "-moo-cow" becomes "/moo/cow", which is wrong.  systemd
# actually /can/ escape the dashes correctly if it is told that this is a path
# (and not a unit name), but it didn't do this prior to January 2017, so fix
# this for them.
#
# systemd path escaping also drops the initial slash so we add that back in so
# that log messages from the service units preserve the full path and users can
# look up log messages using full paths.  However, for "/" the escaping rules
# do /not/ drop the initial slash, so we have to special-case that here.
def systemd_escape(path):
	'''Escape a path so that systemd does not mangle it.

	Returns the escaped instance name, or path unchanged if systemd-escape
	cannot be run or produces no output.'''
	if path == '/':
		return '-'

	cmd = ['systemd-escape', '--path', path]
	try:
		proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
		output, _ = proc.communicate()
		lines = output.decode(sys.stdout.encoding).splitlines()
	except (OSError, ValueError):
		return path

	# Without this check an empty stdout would make us return None.
	if proc.returncode != 0 or not lines:
		return path
	return '-' + lines[0].strip()

def _scrub_mount(mnt, killfuncs):
	'''Scrub one mountpoint, preferring the systemd service over a direct run.

	Returns the status to merge into the global retcode: the scrubber's exit
	code when one ran to completion, otherwise 0.'''
	if terminate:
		return 0

	# Try it the systemd way.  Use the same escaped unit name for starting
	# and for stopping, or the stop request targets a nonexistent unit.
	unit = 'xfs_scrub@%s' % systemd_escape(mnt)
	cmd = ['systemctl', 'start', unit]
	ret = run_killable(cmd, DEVNULL(), killfuncs,
			lambda proc: kill_systemd(unit, proc))
	if ret == 0 or ret == 1:
		print("Scrubbing %s done, (err=%d)" % (mnt, ret))
		sys.stdout.flush()
		return ret

	if terminate:
		return 0

	# Invoke xfs_scrub manually
	cmd = ['@sbindir@/xfs_scrub', '@scrub_args@', mnt]
	ret = run_killable(cmd, None, killfuncs,
			lambda proc: proc.terminate())
	if ret >= 0:
		print("Scrubbing %s done, (err=%d)" % (mnt, ret))
		sys.stdout.flush()
		return ret

	if terminate:
		return 0

	print("Unable to start scrub tool.")
	sys.stdout.flush()
	return 0

def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs):
	'''Run a scrub process.

	Worker-thread body.  Whatever happens, it then (while holding cond)
	merges the result into retcode, removes mntdevs from running_devs, and
	notifies the scheduler waiting on cond.'''
	global retcode

	print("Scrubbing %s..." % mnt)
	sys.stdout.flush()

	ret = 0
	try:
		ret = _scrub_mount(mnt, killfuncs)
	finally:
		# Update shared state only under cond.  The scheduler holds cond
		# whenever it is not waiting, so this notify cannot be lost.
		with cond:
			retcode |= ret
			running_devs -= mntdevs
			cond.notify()

def main():
	'''Find mounts, schedule scrub runs.'''
	def start_scrub(mnt, devs):
		'''Spawn a worker thread that scrubs mnt, which lives on devs.'''
		a = (mnt, cond, running_devs, devs, killfuncs)
		worker = threading.Thread(target=run_scrub, args=a)
		worker.start()

	global retcode, terminate

	parser = argparse.ArgumentParser(
			description="Scrub all mounted XFS filesystems.")
	parser.add_argument("-V", help="Report version and exit.",
			action="store_true")
	args = parser.parse_args()

	if args.V:
		print("xfs_scrub_all version @pkg_version@")
		sys.exit(0)

	fs = find_mounts()

	# Tail the journal if we ourselves aren't a service...
	journalthread = None
	if 'SERVICE_MODE' not in os.environ:
		try:
			cmd = ['journalctl', '--no-pager', '-q', '-S', 'now',
					'-f', '-u', 'xfs_scrub@*', '-o', 'cat']
			journalthread = subprocess.Popen(cmd)
		except OSError:
			pass

	# Schedule scrub jobs...
	running_devs = set()
	killfuncs = set()
	cond = threading.Condition()

	# Hold cond for every scheduling decision.  cond.wait() releases it, and
	# workers must take it to report completion.  So a worker cannot notify
	# before we are waiting, and running_devs never changes under us.
	with cond:
		# Keep going until every mount has been dispatched AND every
		# dispatched scrubber has finished, so retcode is complete.
		while fs or running_devs:
			if fs and not running_devs:
				mnt, devs = fs.popitem()
				running_devs.update(devs)
				start_scrub(mnt, devs)

			# Start every remaining mount that shares no disk with a
			# running scrub.
			poppers = set()
			for mnt, devs in fs.items():
				if running_devs.isdisjoint(devs):
					running_devs.update(devs)
					poppers.add(mnt)
					start_scrub(mnt, devs)
			for p in poppers:
				fs.pop(p)

			try:
				cond.wait()
			except KeyboardInterrupt:
				terminate = True
				print("Terminating...")
				sys.stdout.flush()
				while killfuncs:
					try:
						fn = killfuncs.pop()
					except KeyError:
						# A worker discarded its entry concurrently.
						break
					fn()
				# Dispatch nothing else; keep waiting for the killed
				# workers to wind down.
				fs = {}

	if journalthread is not None:
		journalthread.terminate()

	# See the service mode comments in xfs_scrub.c for why we do this.
	if 'SERVICE_MODE' in os.environ:
		time.sleep(2)
		if retcode != 0:
			retcode = 1

	sys.exit(retcode)

if __name__ == '__main__':
	main()