import argparse
import asyncio
+import fcntl
import json
+import os
import plistlib
import re
import shutil
import subprocess
import sys
+import tempfile
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
pass
+class SimulatorLock:
+ # An fcntl-based filesystem lock that can be used to ensure that
+ def __init__(self, timeout):
+ self.filename = Path(tempfile.gettempdir()) / "python-ios-testbed"
+ self.timeout = timeout
+
+ self.fd = None
+
+ async def acquire(self):
+ # Ensure the lockfile exists
+ self.filename.touch(exist_ok=True)
+
+ # Try `timeout` times to acquire the lock file, with a 1 second pause
+ # between each attempt. Report status every 10 seconds.
+ for i in range(0, self.timeout):
+ try:
+ fd = os.open(self.filename, os.O_RDWR | os.O_TRUNC, 0o644)
+ fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except OSError:
+ os.close(fd)
+ if i % 10 == 0:
+ print("... waiting", flush=True)
+ await asyncio.sleep(1)
+ else:
+ self.fd = fd
+ return
+
+ # If we reach the end of the loop, we've exceeded the allowed number of
+ # attempts.
+ raise ValueError("Unable to obtain lock on iOS simulator creation")
+
+ def release(self):
+ # If a lock is held, release it.
+ if self.fd is not None:
+ # Release the lock.
+ fcntl.flock(self.fd, fcntl.LOCK_UN)
+ os.close(self.fd)
+ self.fd = None
+
+
# All subprocesses are executed through this context manager so that no matter
# what happens, they can always be cancelled from another task, and they will
# always be cleaned up on exit.
raise
-async def find_device(initial_devices):
+async def find_device(initial_devices, lock):
while True:
new_devices = set(await list_devices()).difference(initial_devices)
if len(new_devices) == 0:
elif len(new_devices) == 1:
udid = new_devices.pop()
print(f"{datetime.now():%Y-%m-%d %H:%M:%S}: New test simulator detected")
- print(f"UDID: {udid}")
+ print(f"UDID: {udid}", flush=True)
+ lock.release()
return udid
else:
exit(f"Found more than one new device: {new_devices}")
-async def log_stream_task(initial_devices):
+async def log_stream_task(initial_devices, lock):
# Wait up to 5 minutes for the build to complete and the simulator to boot.
- udid = await asyncio.wait_for(find_device(initial_devices), 5 * 60)
+ udid = await asyncio.wait_for(find_device(initial_devices, lock), 5 * 60)
# Stream the iOS device's logs, filtering out messages that come from the
# XCTest test suite (catching NSLog messages from the test method), or
async def xcode_test(location, simulator, verbose):
# Run the test suite on the named simulator
- print("Starting xcodebuild...")
+ print("Starting xcodebuild...", flush=True)
args = [
"xcodebuild",
"test",
location = Path(__file__).parent
print("Updating plist...", end="", flush=True)
update_plist(location, args)
- print(" done.")
+ print(" done.", flush=True)
+
+ # We need to get an exclusive lock on simulator creation, to avoid issues
+ # with multiple simulators starting and being unable to tell which
+ # simulator is due to which testbed instance. See
+ # https://github.com/python/cpython/issues/130294 for details. Wait up to
+ # 10 minutes for a simulator to boot.
+ print("Obtaining lock on simulator creation...", flush=True)
+ simulator_lock = SimulatorLock(timeout=10*60)
+ await simulator_lock.acquire()
+ print("Simulator lock acquired.", flush=True)
# Get the list of devices that are booted at the start of the test run.
# The simulator started by the test suite will be detected as the new
try:
async with asyncio.TaskGroup() as tg:
- tg.create_task(log_stream_task(initial_devices))
+ tg.create_task(log_stream_task(initial_devices, simulator_lock))
tg.create_task(xcode_test(location, simulator=simulator, verbose=verbose))
except* MySystemExit as e:
raise SystemExit(*e.exceptions[0].args) from None
except* subprocess.CalledProcessError as e:
# Extract it from the ExceptionGroup so it can be handled by `main`.
raise e.exceptions[0]
+ finally:
+ simulator_lock.release()
def main():