"""
import argparse
+import concurrent.futures
import glob
import os
-import signal
import subprocess
import sys
-import time
+import threading
def parse_args():
p = argparse.ArgumentParser(description='Run rsync test suite')
p.add_argument('tests', nargs='*', metavar='TEST',
help='Test names or patterns to run (default: all)')
+ p.add_argument('-j', '--parallel', type=int, default=1, metavar='N',
+ help='Run up to N tests in parallel (default: 1)')
p.add_argument('--valgrind', action='store_true',
help='Run rsync under valgrind (logs to per-process files)')
p.add_argument('--valgrind-opts', default='', metavar='OPTS',
return cmd[:2] if cmd[0] == 'setacl' else cmd[:2]
except (FileNotFoundError, subprocess.TimeoutExpired):
continue
- # Also check if setfacl supports -k via --help
try:
r = subprocess.run(['setfacl', '--help'], capture_output=True, text=True, timeout=5)
if '-k,' in r.stdout or '-k,' in r.stderr:
os.chmod(scratchdir, os.stat(scratchdir).st_mode & ~0o2000) # clear setgid
except OSError:
pass
- # Symlink to source directory
src_link = os.path.join(scratchdir, 'src')
if not os.path.exists(src_link):
if os.path.isabs(srcdir):
return tests
-def build_rsync_cmd(rsync_bin, args, extra_rsync_opts, scratchbase):
+def build_rsync_cmd(rsync_bin, args, scratchbase):
"""Build the RSYNC command string for tests."""
parts = []
if args.valgrind:
parts.append(rsync_bin)
if args.protocol is not None:
parts.append(f'--protocol={args.protocol}')
- if extra_rsync_opts:
- parts.extend(extra_rsync_opts)
return ' '.join(parts)
-def run_test(testscript, scratchdir, env, timeout):
- """Run a single test script with timeout. Returns exit code."""
class TestResult:
    """Outcome of a single test-script execution.

    Attributes:
        testbase: Base name of the test (script name without the
            ``.test`` suffix).
        result: Exit code of the test script. By the suite's convention
            0 = pass, 77 = skipped, 78 = expected failure, anything
            else = failure.
        output: Pre-formatted, ready-to-print report text (log dumps
            plus the PASS/SKIP/XFAIL/FAIL summary line).
        skipped_reason: Contents of the test's ``whyskipped`` file when
            result == 77, otherwise ''.
    """

    __slots__ = ('testbase', 'result', 'output', 'skipped_reason')

    def __init__(self, testbase, result, output='', skipped_reason=''):
        self.testbase = testbase
        self.result = result
        self.output = output
        self.skipped_reason = skipped_reason

    def __repr__(self):
        # Aids debugging of the parallel runner; output text is omitted
        # because log dumps can be very large.
        return (f'{type(self).__name__}(testbase={self.testbase!r}, '
                f'result={self.result!r}, '
                f'skipped_reason={self.skipped_reason!r})')

+
+def run_one_test(testscript, testbase, scratchdir, base_env, timeout,
+ srcdir, tooldir, setfacl_nodef, always_log):
+ """Run a single test. Returns a TestResult.
+
+ This function is safe to call from multiple threads — it uses only
+ per-test state (unique scratchdir, copy of env).
+ """
+ prep_scratch(scratchdir, srcdir, tooldir, setfacl_nodef)
+
+ env = base_env.copy()
+ env['scratchdir'] = scratchdir
+
logfile = os.path.join(scratchdir, 'test.log')
try:
with open(logfile, 'w') as log:
env=env, timeout=timeout,
cwd=env.get('TOOLDIR', '.')
)
- return proc.returncode
+ result = proc.returncode
except subprocess.TimeoutExpired:
- sys.stderr.write(f"TIMEOUT: {testscript} took over {timeout} seconds\n")
- return 1
+ result = 1
+ with open(logfile, 'a') as log:
+ log.write(f"\nTIMEOUT: test took over {timeout} seconds\n")
+
+ # Build output text
+ output_parts = []
+
+ show_log = always_log or (result not in (0, 77, 78))
+ if show_log:
+ output_parts.append(f'----- {testbase} log follows')
+ try:
+ with open(logfile) as f:
+ output_parts.append(f.read().rstrip())
+ except FileNotFoundError:
+ pass
+ output_parts.append(f'----- {testbase} log ends')
+ rsyncd_log = os.path.join(scratchdir, 'rsyncd.log')
+ if os.path.isfile(rsyncd_log):
+ output_parts.append(f'----- {testbase} rsyncd.log follows')
+ with open(rsyncd_log) as f:
+ output_parts.append(f.read().rstrip())
+ output_parts.append(f'----- {testbase} rsyncd.log ends')
+
+ skipped_reason = ''
+ if result == 0:
+ output_parts.append(f'PASS {testbase}')
+ elif result == 77:
+ whyfile = os.path.join(scratchdir, 'whyskipped')
+ try:
+ with open(whyfile) as f:
+ skipped_reason = f.read().strip()
+ except FileNotFoundError:
+ pass
+ output_parts.append(f'SKIP {testbase} ({skipped_reason})')
+ elif result == 78:
+ output_parts.append(f'XFAIL {testbase}')
+ else:
+ output_parts.append(f'FAIL {testbase}')
+
+ return TestResult(testbase, result, '\n'.join(output_parts), skipped_reason)
+
+
+# Lock for serializing output in parallel mode
+_print_lock = threading.Lock()
def main():
scratchbase = os.path.join(os.environ.get('scratchbase', tooldir), 'testtmp')
os.makedirs(scratchbase, exist_ok=True)
- # Read shconfig for ECHO_N/ECHO_C/ECHO_T, HOST_OS, etc.
shconfig = read_shconfig(os.path.join(tooldir, 'shconfig'))
-
- # Determine TLS args and setfacl
tls_args = get_tls_args(os.path.join(tooldir, 'config.h'))
setfacl_nodef = find_setfacl_nodef(scratchbase)
+ rsync_cmd = build_rsync_cmd(rsync_bin, args, scratchbase)
- # Collect extra rsync options from remaining argv (after --)
- extra_rsync_opts = []
-
- # Build RSYNC command
- rsync_cmd = build_rsync_cmd(rsync_bin, args, extra_rsync_opts, scratchbase)
-
- # Validate
if not os.path.isfile(rsync_bin):
sys.stderr.write(f"rsync_bin {rsync_bin} is not a file\n")
sys.exit(2)
print(f' preserve_scratch={"yes" if args.preserve_scratch else "no"}')
if args.valgrind:
print(f' valgrind=enabled (logs in valgrind.*.log)')
+ if args.parallel > 1:
+ print(f' parallel={args.parallel}')
print(f' scratchbase={scratchbase}')
- # Build environment for test scripts
- # For Solaris compatibility
+ # Build base environment for test scripts
path = os.environ.get('PATH', '')
if os.path.isdir('/usr/xpg4/bin'):
path = '/usr/xpg4/bin:' + path
- test_env = os.environ.copy()
- test_env.update({
+ base_env = os.environ.copy()
+ base_env.update({
'PATH': path,
'POSIXLY_CORRECT': '1',
'TOOLDIR': tooldir,
'TESTRUN_TIMEOUT': str(args.timeout),
'HOME': scratchbase,
})
- # Pass through shconfig values
for k, v in shconfig.items():
if v:
- test_env[k] = v
- # setfacl_nodef as a shell-friendly string
+ base_env[k] = v
if setfacl_nodef:
- test_env['setfacl_nodef'] = ' '.join(setfacl_nodef)
+ base_env['setfacl_nodef'] = ' '.join(setfacl_nodef)
else:
- test_env['setfacl_nodef'] = 'true'
-
+ base_env['setfacl_nodef'] = 'true'
if args.log_level > 8:
- test_env['RUNSHFLAGS'] = '-e -x'
+ base_env['RUNSHFLAGS'] = '-e -x'
# Collect tests
tests = collect_tests(suitedir, args.tests)
skipped = 0
skipped_list = []
- for testscript in tests:
- testbase = os.path.basename(testscript).replace('.test', '')
- scratchdir = os.path.join(scratchbase, testbase)
-
- prep_scratch(scratchdir, srcdir, tooldir, setfacl_nodef)
-
- test_env['scratchdir'] = scratchdir
-
- # Longer timeout for hardlinks test
- timeout = 600 if 'hardlinks' in testbase else args.timeout
-
- result = run_test(testscript, scratchdir, test_env, timeout)
-
- logfile = os.path.join(scratchdir, 'test.log')
-
- # Show log on failure or if always_log
- if args.always_log or (result not in (0, 77, 78)):
- print(f'----- {testbase} log follows')
- try:
- with open(logfile) as f:
- print(f.read(), end='')
- except FileNotFoundError:
- pass
- print(f'----- {testbase} log ends')
- rsyncd_log = os.path.join(scratchdir, 'rsyncd.log')
- if os.path.isfile(rsyncd_log):
- print(f'----- {testbase} rsyncd.log follows')
- with open(rsyncd_log) as f:
- print(f.read(), end='')
- print(f'----- {testbase} rsyncd.log ends')
-
- if result == 0:
- print(f'PASS {testbase}')
+ def process_result(tr):
+ """Process a TestResult and update counters. Returns True if test failed."""
+ nonlocal passed, failed, skipped
+ with _print_lock:
+ if tr.output:
+ print(tr.output)
+ scratchdir = os.path.join(scratchbase, tr.testbase)
+ if tr.result == 0:
passed += 1
if not args.preserve_scratch and os.path.isdir(scratchdir):
subprocess.run(['rm', '-rf', scratchdir], capture_output=True)
- elif result == 77:
- whyfile = os.path.join(scratchdir, 'whyskipped')
- why = ''
- try:
- with open(whyfile) as f:
- why = f.read().strip()
- except FileNotFoundError:
- pass
- print(f'SKIP {testbase} ({why})')
- skipped_list.append(testbase)
+ return False
+ elif tr.result == 77:
+ skipped_list.append(tr.testbase)
skipped += 1
if not args.preserve_scratch and os.path.isdir(scratchdir):
subprocess.run(['rm', '-rf', scratchdir], capture_output=True)
- elif result == 78:
- print(f'XFAIL {testbase}')
+ return False
+ elif tr.result == 78:
failed += 1
+ return True
else:
- print(f'FAIL {testbase}')
failed += 1
- if args.stop_on_fail:
+ return True
+
+ if args.parallel > 1:
+ # Parallel execution
+ with concurrent.futures.ThreadPoolExecutor(max_workers=args.parallel) as executor:
+ futures = {}
+ for testscript in tests:
+ testbase = os.path.basename(testscript).replace('.test', '')
+ scratchdir = os.path.join(scratchbase, testbase)
+ timeout = 600 if 'hardlinks' in testbase else args.timeout
+ f = executor.submit(
+ run_one_test, testscript, testbase, scratchdir,
+ base_env, timeout, srcdir, tooldir, setfacl_nodef,
+ args.always_log
+ )
+ futures[f] = testbase
+
+ for f in concurrent.futures.as_completed(futures):
+ tr = f.result()
+ is_fail = process_result(tr)
+ if is_fail and args.stop_on_fail:
+ # Cancel pending futures
+ for pending in futures:
+ pending.cancel()
+ break
+ else:
+ # Sequential execution
+ for testscript in tests:
+ testbase = os.path.basename(testscript).replace('.test', '')
+ scratchdir = os.path.join(scratchbase, testbase)
+ timeout = 600 if 'hardlinks' in testbase else args.timeout
+ tr = run_one_test(
+ testscript, testbase, scratchdir,
+ base_env, timeout, srcdir, tooldir, setfacl_nodef,
+ args.always_log
+ )
+ is_fail = process_result(tr)
+ if is_fail and args.stop_on_fail:
break
# Check valgrind logs for errors
try:
with open(vlog) as f:
content = f.read()
- # Check for non-zero error summary
for line in content.splitlines():
if 'ERROR SUMMARY:' in line and 'ERROR SUMMARY: 0 errors' not in line:
vg_errors += 1
if vg_errors > 0:
print(f' {vg_errors} valgrind error(s) found (see logs in {scratchbase})')
- skipped_str = ','.join(skipped_list)
+ skipped_str = ','.join(sorted(skipped_list))
if full_run and args.expect_skipped != 'IGNORE':
print('----- skipped results:')
print(f' expected: {args.expect_skipped}')