| # Copyright 2017 The Emscripten Authors. All rights reserved. |
| # Emscripten is available under two separate licenses, the MIT license and the |
| # University of Illinois/NCSA Open Source License. Both these licenses can be |
| # found in the LICENSE file. |
| |
| import json |
| import multiprocessing |
| import os |
| import sys |
| import tempfile |
| import time |
| import unittest |
| |
| import browser_common |
| import common |
| from color_runner import BufferingMixin |
| from common import errlog |
| |
| from tools import emprofile, utils |
| from tools.utils import WINDOWS |
| |
| EMTEST_VISUALIZE = os.getenv('EMTEST_VISUALIZE') |
| NUM_CORES = None |
| seen_class = set() |
| torn_down = False |
| |
| |
| # Older Python versions have a bug with multiprocessing shared data |
| # structures. https://github.com/emscripten-core/emscripten/issues/25103 |
| # and https://github.com/python/cpython/issues/71936 |
| def python_multiprocessing_structures_are_buggy(): |
| v = sys.version_info |
| return (v.major, v.minor, v.micro) <= (3, 12, 7) or (v.major, v.minor, v.micro) == (3, 13, 0) |
| |
| |
| def cap_max_workers_in_pool(max_workers, is_browser): |
| if max_workers > 1 and is_browser and 'EMTEST_CORES' not in os.environ and 'EMCC_CORES' not in os.environ: |
| # TODO experiment with this number. In browser tests we'll be creating |
| # a browser instance per worker which is expensive. |
| max_workers //= 2 |
| # Python has an issue that it can only use max 61 cores on Windows: https://github.com/python/cpython/issues/89240 |
| if WINDOWS: |
| return min(max_workers, 61) |
| return max_workers |
| |
| |
| def run_test(args): |
| test, allowed_failures_counter, buffer = args |
| # If we have exceeded the number of allowed failures during the test run, |
| # abort executing further tests immediately. |
| if allowed_failures_counter and allowed_failures_counter.value < 0: |
| return None |
| |
| # Handle setUpClass which needs to be called on each worker |
| # TODO: Better handling of exceptions that happen during setUpClass |
| if test.__class__ not in seen_class: |
| seen_class.add(test.__class__) |
| test.__class__.setUpClass() |
| |
| start_time = time.perf_counter() |
| olddir = os.getcwd() |
| result = BufferedParallelTestResult() |
| result.start_time = start_time |
| result.buffer = buffer |
| temp_dir = tempfile.mkdtemp(prefix='emtest_') |
| test.set_temp_dir(temp_dir) |
| try: |
| test(result) |
| except KeyboardInterrupt: |
| # In case of KeyboardInterrupt do not emit buffered stderr/stdout |
| # as we unwind. |
| result._mirrorOutput = False |
| finally: |
| result.elapsed = time.perf_counter() - start_time |
| |
| # Before attempting to delete the tmp dir make sure the current |
| # working directory is not within it. |
| os.chdir(olddir) |
| common.force_delete_dir(temp_dir) |
| |
| # Since we are returning this result to the main thread we need to make sure |
| # that it is serializable/picklable. To do this, we delete any non-picklable |
| # fields from the instance. |
| del result._original_stdout |
| del result._original_stderr |
| |
| return result |
| |
| |
| # Executes the teardown process once. Returns True if the teardown was |
| # performed, False if it was already torn down. |
| def tear_down(): |
| global torn_down |
| if torn_down: |
| return False |
| torn_down = True |
| for cls in seen_class: |
| cls.tearDownClass() |
| return True |
| |
| |
| class ParallelTestSuite(unittest.BaseTestSuite): |
| """Runs a suite of tests in parallel. |
| |
| Creates worker threads, manages the task queue, and combines the results. |
| """ |
| |
| def __init__(self, options): |
| super().__init__() |
| self.max_failures = options.max_failures |
| self.failing_and_slow_first = options.failing_and_slow_first |
| |
| def addTest(self, test): |
| super().addTest(test) |
| test.is_parallel = True |
| |
| def run(self, result): |
| # The 'spawn' method is used on windows and it can be useful to set this on |
| # all platforms when debugging multiprocessing issues. Without this we |
| # default to 'fork' on unix which is better because global state is |
| # inherited by the child process, but can lead to hard-to-debug windows-only |
| # issues. |
| # multiprocessing.set_start_method('spawn') |
| |
| # No need to worry about stdout/stderr buffering since are a not |
| # actually running the test here, only setting the results. |
| buffer = result.buffer |
| result.buffer = False |
| |
| result.core_time = 0 |
| tests = self.get_sorted_tests() |
| self.num_tests = self.countTestCases() |
| contains_browser_test = any(test.is_browser_test() for test in tests) |
| use_cores = cap_max_workers_in_pool(min(self.num_tests, num_cores()), contains_browser_test) |
| errlog(f'Using {use_cores} parallel test processes') |
| with multiprocessing.Manager() as manager: |
| # Give each worker a unique ID. |
| worker_id_counter = manager.Value('i', 0) # 'i' for integer, starting at 0 |
| worker_id_lock = manager.Lock() |
| with multiprocessing.Pool( |
| processes=use_cores, |
| initializer=browser_common.init_worker, |
| initargs=(worker_id_counter, worker_id_lock), |
| ) as pool: |
| if python_multiprocessing_structures_are_buggy(): |
| # When multiprocessing shared structures are buggy we don't support failfast |
| # or the progress bar. |
| allowed_failures_counter = None |
| if self.max_failures < 2**31 - 1: |
| errlog('The version of python being used is not compatible with --failfast and --max-failures options. See https://github.com/python/cpython/issues/71936') |
| sys.exit(1) |
| else: |
| allowed_failures_counter = manager.Value('i', self.max_failures) |
| |
| results = [] |
| args = ((t, allowed_failures_counter, buffer) for t in tests) |
| for res in pool.imap_unordered(run_test, args, chunksize=1): |
| # results may be be None if # of allowed errors was exceeded |
| # and the harness aborted. |
| if res: |
| if res.test_result not in {'success', 'skipped'} and allowed_failures_counter is not None: |
| # Signal existing multiprocess pool runners so that they can exit early if needed. |
| allowed_failures_counter.value -= 1 |
| res.integrate_result(result) |
| results.append(res) |
| |
| # Send a task to each worker to tear down the browser and server. This |
| # relies on the implementation detail in the worker pool that all workers |
| # are cycled through once. |
| num_tear_downs = sum(pool.apply(tear_down, ()) for _ in range(use_cores)) |
| # Assert the assumed behavior above hasn't changed. |
| if num_tear_downs != use_cores and not buffer: |
| errlog(f'Expected {use_cores} teardowns, got {num_tear_downs}') |
| |
| if self.failing_and_slow_first: |
| previous_test_run_results = common.load_previous_test_run_results() |
| for r in results: |
| # Save a test result record with the specific suite name (e.g. "core0.test_foo") |
| test_failed = r.test_result not in {'success', 'skipped'} |
| |
| def update_test_results_to(test_name): |
| fail_frequency = previous_test_run_results[test_name]['fail_frequency'] if test_name in previous_test_run_results else int(test_failed) |
| # Apply exponential moving average with 50% weighting to merge previous fail frequency with new fail frequency |
| fail_frequency = (fail_frequency + int(test_failed)) / 2 |
| previous_test_run_results[test_name] = { |
| 'result': r.test_result, |
| 'duration': r.test_duration, |
| 'fail_frequency': fail_frequency, |
| } |
| |
| update_test_results_to(r.test_name) |
| # Also save a test result record without suite name (e.g. just "test_foo"). This enables different suite runs to order tests |
| # for quick --failfast termination, in case a test fails in multiple suites |
| update_test_results_to(r.test_name.split(' ')[0]) |
| |
| utils.write_file(common.PREVIOUS_TEST_RUN_RESULTS_FILE, json.dumps(previous_test_run_results, indent=2)) |
| |
| if EMTEST_VISUALIZE: |
| self.visualize_results(results) |
| return result |
| |
| def get_sorted_tests(self): |
| """Return a list of this suite's tests, sorted with the @is_slow_test tests first. |
| |
| Future work: measure and store the speed of tests each test sort more accurately. |
| """ |
| if self.failing_and_slow_first: |
| # If we are running with --failing-and-slow-first, then the test list has been |
| # pre-sorted based on previous test run results (see `runner.py`) |
| return list(self) |
| |
| def test_key(test): |
| testMethod = getattr(test, test._testMethodName) |
| is_slow = getattr(testMethod, 'is_slow', False) |
| return (is_slow, str(test)) |
| |
| return sorted(self, key=test_key, reverse=True) |
| |
| def visualize_results(self, results): |
| assert EMTEST_VISUALIZE |
| # Sort the results back into alphabetical order. Running the tests in |
| # parallel causes mis-orderings, this makes the results more readable. |
| results = sorted(results, key=lambda res: str(res.test)) |
| |
| # shared data structures are hard in the python multi-processing world, so |
| # use a file to share the flaky test information across test processes. |
| flaky_tests = utils.read_file(common.flaky_tests_log_filename).split() if os.path.isfile(common.flaky_tests_log_filename) else [] |
| # Extract only the test short names |
| flaky_tests = [x.split('.')[-1] for x in flaky_tests] |
| |
| for r in results: |
| r.log_test_run_for_visualization(flaky_tests) |
| |
| # Generate the parallel test run visualization |
| emprofile.create_profiling_graph(utils.path_from_root('out/graph')) |
| # Cleanup temp files that were used for the visualization |
| emprofile.delete_profiler_logs() |
| utils.delete_file(common.flaky_tests_log_filename) |
| |
| |
| class BufferedParallelTestResult(BufferingMixin, unittest.TestResult): |
| """A picklable struct used to communicate test results across processes.""" |
| |
| def __init__(self): |
| super().__init__() |
| self.test_duration = 0 |
| self.test_result = 'errored' |
| self.test_name = '' |
| self.test = None |
| |
| def test_short_name(self): |
| # Given a test name e.g. "test_atomic_cxx (test_core.core0.test_atomic_cxx)" |
| # returns a short form "test_atomic_cxx" of the test. |
| return self.test_name.split(' ', 1)[0] |
| |
| def addDuration(self, test, elapsed): |
| self.test_duration = elapsed |
| |
| def integrate_result(self, overall_results): |
| """Integrate buffered results from a worker process. |
| |
| This method gets called on the main thread once the buffered result is received. |
| It adds the buffered result to the overall result. |
| """ |
| |
| # Turns a <test, string> pair back into something that looks enough |
| # link a <test, exc_info> pair. The exc_info triple has the exception |
| # type as its first element. This is needed in particular in the |
| # XMLTestRunner. |
| def restore_exc_info(pair): |
| test, exn_string = pair |
| assert self.last_err_type, exn_string |
| return (test, (self.last_err_type, exn_string, None)) |
| |
| # Our fake exc_info triple keep the pre-serialized string in the |
| # second element of the triple so we override _exc_info_to_string |
| # _exc_info_to_string to simply return it. |
| overall_results._exc_info_to_string = lambda x, _y: x[1] |
| |
| overall_results.startTest(self.test) |
| if self.test_result == 'success': |
| overall_results.addSuccess(self.test) |
| elif self.test_result == 'failed': |
| overall_results.addFailure(*restore_exc_info(self.failures[0])) |
| elif self.test_result == 'errored': |
| overall_results.addError(*restore_exc_info(self.errors[0])) |
| elif self.test_result == 'skipped': |
| overall_results.addSkip(*self.skipped[0]) |
| elif self.test_result == 'unexpected success': |
| overall_results.addUnexpectedSuccess(self.unexpectedSuccesses[0]) |
| elif self.test_result == 'expected failure': |
| overall_results.addExpectedFailure(*restore_exc_info(self.expectedFailures[0])) |
| else: |
| assert False, f'unhandled test result {self.test_result}' |
| overall_results.stopTest(self.test) |
| overall_results.core_time += self.test_duration |
| |
| def log_test_run_for_visualization(self, flaky_tests): |
| assert EMTEST_VISUALIZE |
| if self.test_result != 'skipped' or self.test_duration > 0.2: |
| test_result = self.test_result |
| if test_result == 'success' and self.test_short_name() in flaky_tests: |
| test_result = 'warnings' |
| profiler_logs_path = os.path.join(tempfile.gettempdir(), 'emscripten_toolchain_profiler_logs') |
| os.makedirs(profiler_logs_path, exist_ok=True) |
| profiler_log_file = os.path.join(profiler_logs_path, 'toolchain_profiler.pid_0.json') |
| color = { |
| 'success': '#80ff80', |
| 'warnings': '#ffb040', |
| 'skipped': '#a0a0a0', |
| 'expected failure': '#ff8080', |
| 'unexpected success': '#ff8080', |
| 'failed': '#ff8080', |
| 'errored': '#ff8080', |
| }[test_result] |
| # Write profiling entries for emprofile.py tool to visualize. This needs a unique identifier for each |
| # block, so generate one on the fly. |
| dummy_test_task_counter = os.path.getsize(profiler_log_file) if os.path.isfile(profiler_log_file) else 0 |
| # Remove the redundant 'test_' prefix from each test, since character space is at a premium in the visualized graph. |
| test_name = self.test_short_name().removeprefix('test_') |
| with open(profiler_log_file, 'a', encoding='utf-8') as prof: |
| prof.write(f',\n{{"pid":{dummy_test_task_counter},"op":"start","time":{self.start_time},"cmdLine":["{test_name}"],"color":"{color}"}}') |
| prof.write(f',\n{{"pid":{dummy_test_task_counter},"op":"exit","time":{self.start_time + self.test_duration},"returncode":0}}') |
| |
| def startTest(self, test): |
| super().startTest(test) |
| self.test = test |
| self.test_name = str(test) |
| |
| def addSuccess(self, test): |
| super().addSuccess(test) |
| self.test_result = 'success' |
| |
| def addExpectedFailure(self, test, err): |
| super().addExpectedFailure(test, err) |
| self.last_err_type = err[0] |
| self.test_result = 'expected failure' |
| |
| def addUnexpectedSuccess(self, test): |
| super().addUnexpectedSuccess(test) |
| self.test_result = 'unexpected success' |
| |
| def addSkip(self, test, reason): |
| super().addSkip(test, reason) |
| self.test_result = 'skipped' |
| |
| def addFailure(self, test, err): |
| super().addFailure(test, err) |
| self.last_err_type = err[0] |
| self.test_result = 'failed' |
| |
| def addError(self, test, err): |
| super().addError(test, err) |
| self.last_err_type = err[0] |
| self.test_result = 'errored' |
| |
| |
| def num_cores(): |
| if NUM_CORES: |
| return int(NUM_CORES) |
| return utils.get_num_cores() |