| import argparse |
| import asyncio |
| import json |
| import plistlib |
| import shutil |
| import subprocess |
| import sys |
| from contextlib import asynccontextmanager |
| from datetime import datetime |
| from pathlib import Path |
| |
| |
# Positional arguments for bytes.decode(), applied to all subprocess output in
# this file: decode as UTF-8, and render undecodable bytes as backslash escapes
# instead of raising.
DECODE_ARGS = ("UTF-8", "backslashreplace")
| |
| |
# sys.exit misbehaves when called from a task running inside a TaskGroup
# (https://github.com/python/cpython/issues/101515). Tasks call this
# replacement instead; it raises an ordinary exception that run_testbed
# converts back into a real SystemExit.
def exit(*args):
    """Raise MySystemExit carrying ``args`` (an exit status or message)."""
    raise MySystemExit(*args)
| |
| |
class MySystemExit(Exception):
    """Ordinary-exception stand-in for SystemExit, raised by ``exit()``."""
| |
| |
async def _stop_process(process, command):
    """Terminate a still-running subprocess, escalating to SIGKILL if needed."""
    # Give Xcode a generous grace period to clean up after itself, so that no
    # stale emulators are left behind.
    grace_period = 10
    process.terminate()
    try:
        await asyncio.wait_for(process.wait(), grace_period)
    except TimeoutError:
        print(
            f"Command {command} did not terminate after {grace_period} seconds "
            f" - sending SIGKILL"
        )
        process.kill()

        # A killed process must still be waited for; otherwise the warning
        # "Exception ignored in __del__" is emitted.
        await asyncio.wait_for(process.wait(), timeout=1)


# Every subprocess in this file is launched through this context manager, so
# that it can always be cancelled from another task and is always cleaned up
# on exit, whatever happens inside the `async with` body.
@asynccontextmanager
async def async_process(*args, **kwargs):
    """Start a subprocess and guarantee it has been stopped on exit.

    ``args`` and ``kwargs`` are forwarded to
    ``asyncio.create_subprocess_exec``. The process object is yielded to the
    ``async with`` body; if it is still running when the body finishes (or
    raises, or is cancelled), it is terminated.
    """
    process = await asyncio.create_subprocess_exec(*args, **kwargs)
    try:
        yield process
    finally:
        still_running = process.returncode is None
        if still_running:
            await _stop_process(process, args)
| |
| |
async def async_check_output(*args, **kwargs):
    """Run a command to completion and return its decoded standard output.

    ``args`` is the command line; ``kwargs`` are forwarded to
    ``async_process``. stdout and stderr are both captured.

    Raises ``subprocess.CalledProcessError`` (carrying the decoded stdout and
    stderr) if the command exits with a non-zero status.
    """
    async with async_process(
        *args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
    ) as process:
        raw_out, raw_err = await process.communicate()
        output = raw_out.decode(*DECODE_ARGS)
        if process.returncode != 0:
            raise subprocess.CalledProcessError(
                process.returncode,
                args,
                output,
                raw_err.decode(*DECODE_ARGS),
            )
        return output
| |
| |
# Return a list of UDIDs associated with booted simulators
async def list_devices():
    """Return the UDIDs of all booted iOS simulators in the "testing" set."""
    # Ask simctl for the testing simulators, in JSON format.
    raw_json = await async_check_output(
        "xcrun", "simctl", "--set", "testing", "list", "-j"
    )
    devices_by_runtime = json.loads(raw_json)["devices"]

    booted_udids = []
    for runtime, simulators in devices_by_runtime.items():
        # Only runtimes whose final dotted component starts with "iOS" count.
        if not runtime.rpartition(".")[2].startswith("iOS"):
            continue
        for simulator in simulators:
            if simulator["state"] == "Booted":
                booted_udids.append(simulator["udid"])
    return booted_udids
| |
| |
async def find_device(initial_devices):
    """Poll once per second until exactly one new booted simulator appears.

    ``initial_devices`` is the collection of UDIDs that were already booted;
    any other booted UDID is considered new. Returns the new simulator's UDID.
    If more than one new simulator shows up at once, aborts via ``exit()``.
    """
    while True:
        booted_now = await list_devices()
        new_devices = set(booted_now).difference(initial_devices)
        count = len(new_devices)
        if count > 1:
            exit(f"Found more than one new device: {new_devices}")
        elif count == 1:
            udid = new_devices.pop()
            print(f"{datetime.now():%Y-%m-%d %H:%M:%S}: New test simulator detected")
            print(f"UDID: {udid}")
            return udid
        else:
            await asyncio.sleep(1)
| |
| |
async def log_stream_task(initial_devices):
    """Find the simulator started by the test run and echo its log to stdout.

    ``initial_devices`` holds the UDIDs that were booted before the run
    started; it is passed through to ``find_device``.
    """
    # The build has to complete and the simulator has to boot before a new
    # device shows up; allow up to 5 minutes for that.
    udid = await asyncio.wait_for(find_device(initial_devices), 5 * 60)

    # Only show log messages that come from the XCTest test suite (catching
    # NSLog messages from the test method), or from Python itself (catching
    # stdout/stderr content routed to the system log with
    # config->use_system_logger).
    predicate = (
        'senderImagePath ENDSWITH "/iOSTestbedTests.xctest/iOSTestbedTests"'
        ' OR senderImagePath ENDSWITH "/Python.framework/Python"'
    )
    command = [
        "xcrun", "simctl", "--set", "testing",
        "spawn", udid,
        "log", "stream",
        "--style", "compact",
        "--predicate", predicate,
    ]

    async with async_process(
        *command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    ) as process:
        # True while the most recently seen line was a "Messages dropped"
        # warning.
        suppress_dupes = False
        while True:
            line = (await process.stdout.readline()).decode(*DECODE_ARGS)
            if not line:
                # An empty read means the stream has ended.
                break

            # The iOS log streamer can sometimes lag; when it does, it outputs
            # a warning about messages being dropped... often multiple times.
            # Only the first warning of each consecutive run is printed.
            is_drop_warning = line.startswith("=== Messages dropped ")
            if not (is_drop_warning and suppress_dupes):
                sys.stdout.write(line)
            suppress_dupes = is_drop_warning
            sys.stdout.flush()
| |
| |
async def xcode_test(location, simulator, verbose):
    """Run the testbed's test suite with xcodebuild on the named simulator.

    ``location`` is the testbed project directory, ``simulator`` is the
    simulator name used in the xcodebuild destination, and ``verbose``
    controls whether ``-quiet`` is passed to xcodebuild. xcodebuild's combined
    output is echoed to stdout; the function always finishes by calling
    ``exit()`` with xcodebuild's return code.
    """
    print("Starting xcodebuild...")
    project = location / "iOSTestbed.xcodeproj"
    # The result bundle is named after the time the run started.
    result_bundle = location / f"{datetime.now():%Y%m%d-%H%M%S}.xcresult"
    derived_data = location / "DerivedData"

    command = [
        "xcodebuild",
        "test",
        "-project", str(project),
        "-scheme", "iOSTestbed",
        "-destination", f"platform=iOS Simulator,name={simulator}",
        "-resultBundlePath", str(result_bundle),
        "-derivedDataPath", str(derived_data),
    ]
    if not verbose:
        command.append("-quiet")

    async with async_process(
        *command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    ) as process:
        while True:
            line = (await process.stdout.readline()).decode(*DECODE_ARGS)
            if not line:
                break
            sys.stdout.write(line)
            sys.stdout.flush()

        # Output has ended, so the process should exit almost immediately.
        status = await asyncio.wait_for(process.wait(), timeout=1)
        exit(status)
| |
| |
| def clone_testbed( |
| source: Path, |
| target: Path, |
| framework: Path, |
| apps: list[Path], |
| ) -> None: |
| if target.exists(): |
| print(f"{target} already exists; aborting without creating project.") |
| sys.exit(10) |
| |
| if framework is None: |
| if not ( |
| source / "Python.xcframework/ios-arm64_x86_64-simulator/bin" |
| ).is_dir(): |
| print( |
| f"The testbed being cloned ({source}) does not contain " |
| f"a simulator framework. Re-run with --framework" |
| ) |
| sys.exit(11) |
| else: |
| if not framework.is_dir(): |
| print(f"{framework} does not exist.") |
| sys.exit(12) |
| elif not ( |
| framework.suffix == ".xcframework" |
| or (framework / "Python.framework").is_dir() |
| ): |
| print( |
| f"{framework} is not an XCframework, " |
| f"or a simulator slice of a framework build." |
| ) |
| sys.exit(13) |
| |
| print("Cloning testbed project:") |
| print(f" Cloning {source}...", end="", flush=True) |
| shutil.copytree(source, target, symlinks=True) |
| print(" done") |
| |
| if framework is not None: |
| if framework.suffix == ".xcframework": |
| print(" Installing XCFramework...", end="", flush=True) |
| xc_framework_path = (target / "Python.xcframework").resolve() |
| if xc_framework_path.is_dir(): |
| shutil.rmtree(xc_framework_path) |
| else: |
| xc_framework_path.unlink() |
| xc_framework_path.symlink_to( |
| framework.relative_to(xc_framework_path.parent, walk_up=True) |
| ) |
| print(" done") |
| else: |
| print(" Installing simulator framework...", end="", flush=True) |
| sim_framework_path = ( |
| target / "Python.xcframework" / "ios-arm64_x86_64-simulator" |
| ).resolve() |
| if sim_framework_path.is_dir(): |
| shutil.rmtree(sim_framework_path) |
| else: |
| sim_framework_path.unlink() |
| sim_framework_path.symlink_to( |
| framework.relative_to(sim_framework_path.parent, walk_up=True) |
| ) |
| print(" done") |
| else: |
| print(" Using pre-existing iOS framework.") |
| |
| for app_src in apps: |
| print(f" Installing app {app_src.name!r}...", end="", flush=True) |
| app_target = target / f"iOSTestbed/app/{app_src.name}" |
| if app_target.is_dir(): |
| shutil.rmtree(app_target) |
| shutil.copytree(app_src, app_target) |
| print(" done") |
| |
| print(f"Successfully cloned testbed: {target.resolve()}") |
| |
| |
def update_plist(testbed_path, args):
    """Store the test runner arguments in the testbed's Info.plist file.

    ``testbed_path`` is the testbed project directory; ``args`` is written
    under the ``TestArgs`` key of ``iOSTestbed/iOSTestbed-Info.plist``,
    replacing any previous value. All other keys are preserved.
    """
    info_plist = testbed_path / "iOSTestbed" / "iOSTestbed-Info.plist"
    info = plistlib.loads(info_plist.read_bytes())

    info["TestArgs"] = args

    # Serialize completely before opening the file for writing, so that a
    # serialization failure cannot leave a truncated Info.plist behind.
    payload = plistlib.dumps(info)
    info_plist.write_bytes(payload)
| |
| |
| async def run_testbed(simulator: str, args: list[str], verbose: bool=False): |
| location = Path(__file__).parent |
| print("Updating plist...", end="", flush=True) |
| update_plist(location, args) |
| print(" done.") |
| |
| # Get the list of devices that are booted at the start of the test run. |
| # The simulator started by the test suite will be detected as the new |
| # entry that appears on the device list. |
| initial_devices = await list_devices() |
| |
| try: |
| async with asyncio.TaskGroup() as tg: |
| tg.create_task(log_stream_task(initial_devices)) |
| tg.create_task(xcode_test(location, simulator=simulator, verbose=verbose)) |
| except* MySystemExit as e: |
| raise SystemExit(*e.exceptions[0].args) from None |
| except* subprocess.CalledProcessError as e: |
| # Extract it from the ExceptionGroup so it can be handled by `main`. |
| raise e.exceptions[0] |
| |
| |
def _build_parser():
    """Build the command-line parser with its ``clone`` and ``run`` subcommands."""
    parser = argparse.ArgumentParser(
        description=(
            "Manages the process of testing a Python project in the iOS simulator."
        ),
    )

    subcommands = parser.add_subparsers(dest="subcommand")

    clone = subcommands.add_parser(
        "clone",
        description=(
            "Clone the testbed project, copying in an iOS Python framework and "
            "any specified application code."
        ),
        help="Clone a testbed project to a new location.",
    )
    clone.add_argument(
        "--framework",
        help=(
            "The location of the XCFramework (or simulator-only slice of an "
            "XCFramework) to use when running the testbed"
        ),
    )
    clone.add_argument(
        "--app",
        dest="apps",
        action="append",
        default=[],
        help="The location of any code to include in the testbed project",
    )
    clone.add_argument(
        "location",
        help="The path where the testbed will be cloned.",
    )

    run = subcommands.add_parser(
        "run",
        usage="%(prog)s [-h] [--simulator SIMULATOR] -- <test arg> [<test arg> ...]",
        description=(
            "Run a testbed project. The arguments provided after `--` will be "
            "passed to the running iOS process as if they were arguments to "
            "`python -m`."
        ),
        help="Run a testbed project",
    )
    run.add_argument(
        "--simulator",
        default="iPhone SE (3rd Generation)",
        help="The name of the simulator to use (default: 'iPhone SE (3rd Generation)')",
    )
    run.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose output",
    )

    return parser


def main():
    """Command-line entry point: dispatch to the ``clone`` or ``run`` subcommand.

    Everything on the command line after a literal ``--`` is treated as test
    arguments rather than testbed options. Exits with status 20 if ``run`` is
    used on a testbed without a compiled framework, 21 if ``run`` is given no
    test arguments, and 1 if no subcommand is given.
    """
    parser = _build_parser()

    # Split the command line at the first "--": options for this script come
    # before it, arguments for the test run come after it.
    try:
        pos = sys.argv.index("--")
    except ValueError:
        testbed_args = sys.argv[1:]
        test_args = []
    else:
        testbed_args = sys.argv[1:pos]
        test_args = sys.argv[pos + 1 :]

    context = parser.parse_args(testbed_args)

    if context.subcommand == "clone":
        clone_testbed(
            source=Path(__file__).parent,
            target=Path(context.location),
            framework=Path(context.framework).resolve() if context.framework else None,
            apps=[Path(app) for app in context.apps],
        )
    elif context.subcommand == "run":
        if test_args:
            if not (
                Path(__file__).parent / "Python.xcframework/ios-arm64_x86_64-simulator/bin"
            ).is_dir():
                print(
                    f"Testbed does not contain a compiled iOS framework. Use "
                    f"`python {sys.argv[0]} clone ...` to create a runnable "
                    f"clone of this testbed."
                )
                sys.exit(20)

            asyncio.run(
                run_testbed(
                    simulator=context.simulator,
                    verbose=context.verbose,
                    args=test_args,
                )
            )
        else:
            print(f"Must specify test arguments (e.g., {sys.argv[0]} run -- test)")
            print()
            parser.print_help(sys.stderr)
            sys.exit(21)
    else:
        parser.print_help(sys.stderr)
        sys.exit(1)
| |
| |
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()