import threading
import time
-from rsyncfns import SCRATCHDIR, rsync_argv, test_fail, test_skipped
+from rsyncfns import SCRATCHDIR, claim_ports, rsync_argv, test_fail, test_skipped
if shutil.which('python3') is None:
workdir.mkdir(parents=True, exist_ok=True)
os.chdir(workdir)
-# In-process listener: bind a TCP socket, capture the chosen port,
-# accept one client, read up to end-of-headers (or 64 KiB), reply
-# with exactly 1023 'X' bytes and no '\n', then close. We use a
-# thread rather than spawning python3 again -- simpler synchronisation,
-# same effect on the rsync side.
+# Reserve our proxy port across any concurrent test processes; another
+# test asking for the same port blocks here until we exit.
+PROXY_PORT = 12873
+claim_ports(PROXY_PORT)
+
+# In-process listener: bind to the claimed port, accept one client, read
+# up to end-of-headers (or 64 KiB), reply with exactly 1023 'X' bytes and
+# no '\n', then close. A thread is simpler than spawning python3 again
+# and has the same effect on the rsync side.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-listener.bind(('127.0.0.1', 0))
-port = listener.getsockname()[1]
+listener.bind(('127.0.0.1', PROXY_PORT))
+port = PROXY_PORT
listener.listen(1)
from __future__ import annotations
+import fcntl
import os
import shlex
import shutil
# --- rsync invocation ------------------------------------------------------
+# --- TCP port coordination across parallel tests ---------------------------
+
+_PORT_LOCK_PATH = '/tmp/rsync_test.lck'
+_port_lock_fd = None
+
+
+def claim_ports(*ports: int) -> 'None':
+ """Reserve the given TCP port numbers for the rest of this process.
+
+ Uses POSIX byte-range locks on /tmp/rsync_test.lck (one byte per port,
+ offset = port number) so that any number of tests can run in parallel
+ without colliding on a port: if another test has already claimed any of
+ the requested ports the call blocks until that test exits. The kernel
+ drops POSIX advisory locks automatically when the holding process
+ terminates, so a crashed test releases its ports without manual
+ cleanup.
+
+ Ports are claimed in sorted order, so two callers that ask for the same
+ set in different orders can't deadlock against each other.
+
+ Call once near the top of any test that binds to a specific TCP port,
+ BEFORE the bind:
+
+ from rsyncfns import claim_ports
+ claim_ports(12873)
+ listener = socket.socket(...)
+ listener.bind(('127.0.0.1', 12873))
+
+ The lock file lives in /tmp so it's shared across all rsync test
+ processes on the host. Ports outside the claim_ports() ecosystem are
+ not protected -- nothing stops an unrelated process from binding the
+ port. For the rsync testsuite that's fine; we just need to avoid
+ collisions between concurrent test scripts.
+ """
+ global _port_lock_fd
+ if _port_lock_fd is None:
+ _port_lock_fd = os.open(
+ _PORT_LOCK_PATH,
+ os.O_CREAT | os.O_RDWR,
+ 0o666,
+ )
+ # The mode arg to os.open is masked by umask; on a runner with a
+ # restrictive umask the lock file ends up 0o644, and a second user
+ # sharing the machine can't open it RDWR. Force 0o666 explicitly.
+ # EPERM is fine: we're not the owner and the bits were already
+ # broad enough that the first owner's create satisfied us.
+ try:
+ os.fchmod(_port_lock_fd, 0o666)
+ except PermissionError:
+ pass
+ for port in sorted(ports):
+ # F_SETLKW via fcntl.lockf(LOCK_EX, length, start): exclusive
+ # byte-range lock on byte `port`, blocking until acquired.
+ fcntl.lockf(_port_lock_fd, fcntl.LOCK_EX, 1, port)
+
+
def rsync_argv(*args: str) -> list:
"""Return the argv for invoking rsync with the given extra arguments.