def run_attack(args):
- subprocess.run(
+ """Run the (expected-to-be-contained) attack transfer and return its exit
+ code; output is discarded since the security assertion is on the filesystem."""
+ return subprocess.run(
rsync_argv(*args),
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
- )
+ ).returncode
+
+
+def positive_control():
+ """Confirm the upload module accepts an ordinary write. Without this, every
+ scenario below would also pass if the daemon refused (or failed) before the
+ vulnerable code ran -- the sentinel-unchanged check alone can't tell a
+ securely-contained attack from one that never reached the receiver path."""
+ for d in (mod, src):
+ rmtree(d)
+ d.mkdir(parents=True)
+ (src / 'legit.txt').write_text("LEGIT_IN_MODULE\n")
+ rc = run_attack(['-t', f'{src}/legit.txt', f'{daemon_url}/upload/legit.txt'])
+ if rc != 0 or not (mod / 'legit.txt').is_file() \
+ or (mod / 'legit.txt').read_text() != "LEGIT_IN_MODULE\n":
+ test_fail(f"positive control: upload module did not accept a normal "
+ f"write (rc={rc}); attack scenarios would be vacuous")
+
+
+positive_control()
# Scenario 3b: --inplace --backup --backup-dir=cd
(src / 'target.txt').write_text("NEW_DATA_FROM_SENDER\n")
os.chmod(src / 'target.txt', 0o644)
-run_attack([
+rc = run_attack([
'--inplace', '--backup', '--backup-dir=cd',
f'{src}/target.txt', f'{daemon_url}/upload/target.txt',
])
+if rc >= 128:
+ test_fail(f"3b inplace+backup-dir=cd: rsync died from a signal (rc={rc})")
verify_outside_unchanged("3b inplace+backup-dir=cd")
(src / 'cd').mkdir()
os.symlink('/etc/passwd', src / 'cd' / 'sym')
-run_attack(['-rl', f'{src}/', f'{daemon_url}/upload_fake/'])
+rc = run_attack(['-rl', f'{src}/', f'{daemon_url}/upload_fake/'])
+if rc >= 128:
+ test_fail(f"3c-symlink: rsync died from a signal (rc={rc})")
verify_outside_unchanged_or_absent("3c-symlink fake-super symlink push", "sym")
if not stat.S_ISFIFO((src / 'cd' / 'fifo').stat().st_mode):
test_skipped("mkfifo unavailable; cannot exercise 3c-mknod")
-run_attack(['-rD', f'{src}/', f'{daemon_url}/upload_fake/'])
+rc = run_attack(['-rD', f'{src}/', f'{daemon_url}/upload_fake/'])
+if rc >= 128:
+ test_fail(f"3c-mknod: rsync died from a signal (rc={rc})")
verify_outside_unchanged_or_absent("3c-mknod fake-super FIFO push", "fifo")
def run_attack(label: str, *args) -> None:
reset_outside()
- subprocess.run(
+ rc = subprocess.run(
rsync_argv(*args),
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
- )
+ ).returncode
+ if rc >= 128:
+ test_fail(f"{label}: rsync died from a signal (rc={rc})")
verify_unchanged(label)
+def positive_control() -> None:
+ """Confirm the receiver writes into an ordinary in-module subdirectory, so
+ the symlink-escape scenarios below genuinely exercise the chdir path rather
+ than passing because the daemon refused (or failed) before reaching it."""
+ real = mod / 'realdir'
+ rmtree(real)
+ real.mkdir()
+ # When this test runs as root the daemon serves as 'nobody' (the module
+ # sets no uid), so make the control target world-writable; push a single
+ # file with no attribute preservation so the write never needs to own/chmod
+ # the dir -- it should land purely on the receiver's normal write path.
+ os.chmod(real, 0o777)
+ rc = subprocess.run(
+ rsync_argv(f'{src}/subdir/target.txt', f'{url}upload/realdir/'),
+ stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
+ ).returncode
+ landed = real / 'target.txt'
+ if rc != 0 or not landed.is_file() \
+ or not filecmp.cmp(landed, src / 'subdir' / 'target.txt', shallow=False):
+ test_fail(f"positive control: receiver did not write into an ordinary "
+ f"in-module subdir (rc={rc}); attack scenarios would be vacuous")
+
+
+positive_control()
+
+
# 1. Single file with --size-only -- receiver normally skips basis open and
# goes straight to chmod; only the chdir-escape blocks it.
run_attack("single-file --size-only",
#
# Ensure clean_fname() does not read-before-buffer when collapsing "..".
# Exercises the --server path where a crafted merge filename hits
-# clean_fname(); a non-zero exit is expected (the input is bogus), but
-# the test fails if rsync dies from a signal (status >= 128).
+# clean_fname(); the test fails if rsync dies from a signal (status >= 128)
+# AND if rsync exits 0 -- the bogus input must be rejected (clean_fname
+# collapses "a/../test" to "test", whose merge file does not exist, so rsync
+# errors out non-zero). Accepting it would mean the name was mis-collapsed.
import os
import shlex
if proc.returncode >= 128:
test_fail(f"rsync exited due to a signal (status={proc.returncode})")
+if proc.returncode == 0:
+ test_fail("rsync accepted the bogus 'a/../test' merge filter (expected a "
+ "non-zero rejection); clean_fname() may have mis-collapsed it")
-print("OK: clean_fname() handled 'a/../test' without crashing")
+print("OK: clean_fname() handled 'a/../test' without crashing, and rejected it")
os.symlink(str(outside), mod / 'cd')
+# A legitimate in-module file, used as a positive control so the leak check
+# below can't pass simply because the daemon's listing machinery is broken.
+(mod / 'realdir').mkdir()
+(mod / 'realdir' / 'in_module.txt').write_text("INSIDE_THE_MODULE\n")
+
my_uid = get_testuid()
root_uid = get_rootuid()
root_gid = get_rootgid()
url = start_test_daemon(conf, DAEMON_PORT)
+# Positive control: a normal recursive listing of an in-module path must
+# enumerate the in-module file. If this fails, the daemon's flist generation is
+# broken and the leak check below would be vacuously satisfied.
+ctl = subprocess.run(
+ rsync_argv('-nrv', f'{url}upload/realdir/', f'{SCRATCHDIR}/dst/'),
+ capture_output=True, text=True,
+)
+if ctl.returncode != 0 or 'in_module.txt' not in ctl.stdout:
+ test_fail("positive control: listing an in-module path did not enumerate "
+ f"in_module.txt (rc={ctl.returncode}); leak check would be vacuous"
+ f"\n{ctl.stdout}{ctl.stderr}")
+
proc = subprocess.run(
rsync_argv('-nrv', f'{url}upload/cd/', f'{SCRATCHDIR}/dst/'),
capture_output=True, text=True,
)
+if proc.returncode >= 128:
+ test_fail(f"leak pull: rsync died from a signal (rc={proc.returncode})")
listfile.write_text(proc.stdout + proc.stderr)
if 'leak_marker.txt' in listfile.read_text():