import atexit
import fcntl
+import filecmp
import os
import platform
import shlex
return shlex.split(RSYNC) + list(args)
+def forced_protocol():
+ """The protocol version pinned via --protocol=N in the RSYNC command, or
+ None when the run isn't pinning one (so the binary negotiates its newest).
+ Protocol-sensitive tests use this to gate sub-cases -- e.g. the split
+ between --append and --append-verify only exists at protocol >= 30; at
+ protocol 29 plain --append behaves like the old verifying append."""
+ import re
+ m = re.search(r'--protocol[ =](\d+)', RSYNC)
+ return int(m.group(1)) if m else None
+
+
def run_rsync(*args: str, check: bool = True,
capture_output: bool = False) -> subprocess.CompletedProcess:
"""Run rsync with the given arguments.
print(f"permissions: {perms} on {path}")
print(f"should be: {expected}")
test_fail(f"check_perms failed for {path}")
+
+
+# --- depth / cross-dir coverage helpers ------------------------------------
+# Added for the option-coverage expansion (see testsuite/COVERAGE.md).
+# The path-handling restructure changes how parent components resolve, so its
+# bugs surface only at DEPTH and across directory boundaries -- these helpers
+# build trees with an entry at every level and assert the concrete property an
+# option controls (not just dest == src).
+
+def make_tree(root, depth: int = 3, *, data: bool = False,
+ content_lines: int = 20, data_size: int = 4096,
+ dirname: str = 'd', leaf: str = 'f'):
+ """Create a layered directory tree with one regular file at every level.
+
+ For depth=3 under `root`:
+ root/f0
+ root/d1/f1
+ root/d1/d2/f2
+ root/d1/d2/d3/f3
+ so an option's effect can be checked at the tree root AND >=3 levels deep
+ (the parent-component resolution the path restructure rewrites).
+
+ Returns (dirs, files): `dirs` the created subdirectories outermost-first,
+ `files` the regular files shallow-first. Content is deterministic
+ (make_text_file) unless data=True (make_data_file, delta-friendly).
+ """
+ root = Path(root)
+ root.mkdir(parents=True, exist_ok=True)
+ dirs = []
+ files = []
+ cur = root
+ for level in range(depth + 1):
+ f = cur / f'{leaf}{level}'
+ if data:
+ make_data_file(f, data_size)
+ else:
+ make_text_file(f, content_lines)
+ files.append(f)
+ if level < depth:
+ cur = cur / f'{dirname}{level + 1}'
+ cur.mkdir(exist_ok=True)
+ dirs.append(cur)
+ return dirs, files
+
+
+def walk_files(root) -> list:
+ """Every regular (non-symlink) file under `root`, sorted, recursively.
+ For asserting a per-entry property holds at every depth."""
+ root = Path(root)
+ return sorted(p for p in root.rglob('*')
+ if p.is_file() and not p.is_symlink())
+
+
+def walk_dirs(root) -> list:
+ """Every subdirectory under `root`, sorted, recursively."""
+ root = Path(root)
+ return sorted(p for p in root.rglob('*')
+ if p.is_dir() and not p.is_symlink())
+
+
+def _tag(label: str) -> str:
+ return f"{label}: " if label else ""
+
+
+def assert_same(a, b, label: str = '') -> 'None':
+ """Fail unless files `a` and `b` have byte-identical content."""
+ if not filecmp.cmp(str(a), str(b), shallow=False):
+ test_fail(f"{_tag(label)}content differs between {a} and {b}")
+
+
+def assert_mode(path, expected_octal: int, label: str = '') -> 'None':
+ """Fail unless the permission bits (low 12) of `path` equal expected_octal
+ (pass an int like 0o644). Does not follow symlinks."""
+ mode = stat.S_IMODE(os.stat(path, follow_symlinks=False).st_mode)
+ if mode != expected_octal:
+ test_fail(f"{_tag(label)}mode {mode:04o} != expected "
+ f"{expected_octal:04o} on {path}")
+
+
+def assert_mtime_close(a, b, tol: float = 1.0, label: str = '') -> 'None':
+ """Fail unless the mtimes of `a` and `b` are within `tol` seconds.
+ `b` may be a number (an explicit epoch mtime) instead of a path."""
+ ma = os.stat(a, follow_symlinks=False).st_mtime
+ mb = b if isinstance(b, (int, float)) else os.stat(
+ b, follow_symlinks=False).st_mtime
+ if abs(ma - mb) > tol:
+ test_fail(f"{_tag(label)}mtime {ma} vs {mb} differ by > {tol}s "
+ f"(checking {a})")
+
+
+def assert_is_symlink(path, target: str = None, label: str = '') -> 'None':
+ """Fail unless `path` is a symlink (optionally pointing exactly at
+ `target`)."""
+ if not os.path.islink(path):
+ test_fail(f"{_tag(label)}{path} is not a symlink")
+ if target is not None:
+ actual = os.readlink(path)
+ if actual != target:
+ test_fail(f"{_tag(label)}{path} -> {actual!r}, "
+ f"expected {target!r}")
+
+
+def assert_hardlinked(a, b, label: str = '') -> 'None':
+ """Fail unless `a` and `b` are the same inode (a hard link / --link-dest
+ result)."""
+ sa = os.stat(a, follow_symlinks=False)
+ sb = os.stat(b, follow_symlinks=False)
+ if (sa.st_dev, sa.st_ino) != (sb.st_dev, sb.st_ino):
+ test_fail(f"{_tag(label)}{a} and {b} are not hard-linked "
+ f"(ino {sa.st_ino} vs {sb.st_ino})")
+
+
+def assert_not_hardlinked(a, b, label: str = '') -> 'None':
+ """Fail if `a` and `b` share an inode (e.g. --copy-dest must copy, not
+ link)."""
+ sa = os.stat(a, follow_symlinks=False)
+ sb = os.stat(b, follow_symlinks=False)
+ if (sa.st_dev, sa.st_ino) == (sb.st_dev, sb.st_ino):
+ test_fail(f"{_tag(label)}{a} and {b} unexpectedly share "
+ f"inode {sa.st_ino}")
+
+
+def assert_exists(path, label: str = '') -> 'None':
+ """Fail unless `path` exists (a symlink counts even if dangling)."""
+ if not os.path.lexists(path):
+ test_fail(f"{_tag(label)}{path} does not exist")
+
+
+def assert_not_exists(path, label: str = '') -> 'None':
+ """Fail if `path` exists (a dangling symlink counts as existing)."""
+ if os.path.lexists(path):
+ test_fail(f"{_tag(label)}{path} exists but should not")
+
+
+def write_daemon_conf(modules, globals=None, *,
+ name: str = 'test-rsyncd.conf') -> 'Path':
+ """Write a custom rsyncd.conf for daemon-parameter tests.
+
+ `modules` is a list of (module_name, {param: value}) pairs; `globals` an
+ optional dict of global parameters that override the minimal defaults
+ (pid file / use chroot=no / hosts allow / log file / max verbosity).
+ Mirrors build_rsyncd_conf()'s root-aware uid/gid handling (only emitted
+ when running as root) and writes the same `ignore23` wrapper, but lets a
+ test set arbitrary parameters/modules beyond the fixed four. Returns the
+ config path; pair with start_test_daemon().
+ """
+ conf = SCRATCHDIR / name
+ pidfile = SCRATCHDIR / 'rsyncd.pid'
+ logfile = SCRATCHDIR / 'rsyncd.log'
+
+ g = {
+ 'pid file': str(pidfile),
+ 'use chroot': 'no',
+ 'hosts allow': 'localhost 127.0.0.0/8',
+ 'log file': str(logfile),
+ 'max verbosity': '4',
+ }
+ if globals:
+ g.update(globals)
+ if get_testuid() == get_rootuid():
+ g.setdefault('uid', str(get_rootuid()))
+ g.setdefault('gid', str(get_rootgid()))
+ else:
+ # Non-root cannot set uid/gid in rsyncd.conf.
+ g.pop('uid', None)
+ g.pop('gid', None)
+
+ lines = ['# autogenerated by rsyncfns.write_daemon_conf', '']
+ lines += [f'{k} = {v}' for k, v in g.items()]
+ lines.append('')
+ for mod_name, params in modules:
+ lines.append(f'[{mod_name}]')
+ lines += [f'\t{k} = {v}' for k, v in params.items()]
+ lines.append('')
+ conf.write_text('\n'.join(lines) + '\n')
+
+ ignore23 = SCRATCHDIR / 'ignore23'
+ if not ignore23.exists():
+ ignore23.write_text(
+ '#!/bin/sh\n'
+ 'if "${@}"; then exit; fi\n'
+ 'ret=$?\n'
+ 'if test $ret = 23; then exit; fi\n'
+ 'exit $ret\n'
+ )
+ ignore23.chmod(0o755)
+
+ return conf