blob: cf2783b6cd9ece2e64db4eb0a6182f7d65c317c9 [file] [log] [blame]
# Copyright 2017 The Emscripten Authors. All rights reserved.
# Emscripten is available under two separate licenses, the MIT license and the
# University of Illinois/NCSA Open Source License. Both these licenses can be
# found in the LICENSE file.
import multiprocessing
import os
import sys
import tempfile
import time
import unittest
import common
from tools.shared import cap_max_workers_in_pool
NUM_CORES = None
seen_class = set()
def run_test(test):
olddir = os.getcwd()
result = BufferedParallelTestResult()
temp_dir = tempfile.mkdtemp(prefix='emtest_')
test.set_temp_dir(temp_dir)
try:
if test.__class__ not in seen_class:
seen_class.add(test.__class__)
test.__class__.setUpClass()
test(result)
except unittest.SkipTest as e:
result.addSkip(test, e)
except Exception as e:
result.addError(test, e)
# Before attempting to delete the tmp dir make sure the current
# working directory is not within it.
os.chdir(olddir)
common.force_delete_dir(temp_dir)
return result
class ParallelTestSuite(unittest.BaseTestSuite):
"""Runs a suite of tests in parallel.
Creates worker threads, manages the task queue, and combines the results.
"""
def __init__(self, max_cores):
super().__init__()
self.max_cores = max_cores
def addTest(self, test):
super().addTest(test)
test.is_parallel = True
def run(self, result):
# The 'spawn' method is used on windows and it can be useful to set this on
# all platforms when debugging multiprocessing issues. Without this we
# default to 'fork' on unix which is better because global state is
# inherited by the child process, but can lead to hard-to-debug windows-only
# issues.
# multiprocessing.set_start_method('spawn')
tests = list(self.reversed_tests())
use_cores = cap_max_workers_in_pool(min(self.max_cores, len(tests), num_cores()))
print('Using %s parallel test processes' % use_cores)
pool = multiprocessing.Pool(use_cores)
results = [pool.apply_async(run_test, (t,)) for t in tests]
results = [r.get() for r in results]
pool.close()
pool.join()
return self.combine_results(result, results)
def reversed_tests(self):
"""A list of this suite's tests, sorted reverse alphabetical order.
Many of the tests in test_core are intentionally named so that long tests
fall toward the end of the alphabet (e.g. test_the_bullet). Tests are
loaded in alphabetical order, so here we reverse that in order to start
running longer tasks earlier, which should lead to better core utilization.
Future work: measure slowness of tests and sort accordingly.
"""
return reversed(sorted(self, key=str))
def combine_results(self, result, buffered_results):
print()
print('DONE: combining results on main thread')
print()
# Sort the results back into alphabetical order. Running the tests in
# parallel causes mis-orderings, this makes the results more readable.
results = sorted(buffered_results, key=lambda res: str(res.test))
for r in results:
r.updateResult(result)
return result
class BufferedParallelTestResult():
"""A picklable struct used to communicate test results across processes
Fulfills the interface for unittest.TestResult
"""
def __init__(self):
self.buffered_result = None
@property
def test(self):
return self.buffered_result.test
def addDuration(self, test, elapsed):
pass
def updateResult(self, result):
result.startTest(self.test)
self.buffered_result.updateResult(result)
result.stopTest(self.test)
def startTest(self, test):
self.start_time = time.perf_counter()
def stopTest(self, test):
# TODO(sbc): figure out a way to display this duration information again when
# these results get passed back to the TextTestRunner/TextTestResult.
if hasattr(time, 'perf_counter'):
self.buffered_result.duration = time.perf_counter() - self.start_time
def addSuccess(self, test):
if hasattr(time, 'perf_counter'):
print(test, '... ok (%.2fs)' % (time.perf_counter() - self.start_time), file=sys.stderr)
self.buffered_result = BufferedTestSuccess(test)
def addExpectedFailure(self, test, err):
if hasattr(time, 'perf_counter'):
print(test, '... expected failure (%.2fs)' % (time.perf_counter() - self.start_time), file=sys.stderr)
self.buffered_result = BufferedTestExpectedFailure(test, err)
def addUnexpectedSuccess(self, test):
if hasattr(time, 'perf_counter'):
print(test, '... unexpected success (%.2fs)' % (time.perf_counter() - self.start_time), file=sys.stderr)
self.buffered_result = BufferedTestUnexpectedSuccess(test)
def addSkip(self, test, reason):
print(test, "... skipped '%s'" % reason, file=sys.stderr)
self.buffered_result = BufferedTestSkip(test, reason)
def addFailure(self, test, err):
print(test, '... FAIL', file=sys.stderr)
self.buffered_result = BufferedTestFailure(test, err)
def addError(self, test, err):
print(test, '... ERROR', file=sys.stderr)
self.buffered_result = BufferedTestError(test, err)
class BufferedTestBase():
"""Abstract class that holds test result data, split by type of result."""
def __init__(self, test, err=None):
self.test = test
if err:
exctype, value, tb = err
self.error = exctype, value, FakeTraceback(tb)
def updateResult(self, result):
assert False, 'Base class should not be used directly'
class BufferedTestSuccess(BufferedTestBase):
def updateResult(self, result):
result.addSuccess(self.test)
class BufferedTestSkip(BufferedTestBase):
def __init__(self, test, reason):
self.test = test
self.reason = reason
def updateResult(self, result):
result.addSkip(self.test, self.reason)
def fixup_fake_exception(fake_exc):
ex = fake_exc[2]
if ex.tb_frame.f_code.positions is None:
return
while ex is not None:
# .co_positions is supposed to be a function that returns an enumerable
# to the list of code positions. Create a function object wrapper around
# the data
def make_wrapper(rtn):
return lambda: rtn
ex.tb_frame.f_code.co_positions = make_wrapper(ex.tb_frame.f_code.positions)
ex = ex.tb_next
class BufferedTestFailure(BufferedTestBase):
def updateResult(self, result):
fixup_fake_exception(self.error)
result.addFailure(self.test, self.error)
class BufferedTestExpectedFailure(BufferedTestBase):
def updateResult(self, result):
fixup_fake_exception(self.error)
result.addExpectedFailure(self.test, self.error)
class BufferedTestError(BufferedTestBase):
def updateResult(self, result):
fixup_fake_exception(self.error)
result.addError(self.test, self.error)
class BufferedTestUnexpectedSuccess(BufferedTestBase):
def updateResult(self, result):
fixup_fake_exception(self.error)
result.addUnexpectedSuccess(self.test)
class FakeTraceback():
"""A fake version of a traceback object that is picklable across processes.
Python's traceback objects contain hidden stack information that isn't able
to be pickled. Further, traceback objects aren't constructable from Python,
so we need a dummy object that fulfills its interface.
The fields we expose are exactly those which are used by
unittest.TextTestResult to show a text representation of a traceback. Any
other use is not intended.
"""
def __init__(self, tb):
self.tb_frame = FakeFrame(tb.tb_frame)
self.tb_lineno = tb.tb_lineno
self.tb_next = FakeTraceback(tb.tb_next) if tb.tb_next is not None else None
self.tb_lasti = tb.tb_lasti
class FakeFrame():
def __init__(self, f):
self.f_code = FakeCode(f.f_code)
# f.f_globals is not picklable, not used in stack traces, and needs to be iterable
self.f_globals = []
class FakeCode():
def __init__(self, co):
self.co_filename = co.co_filename
self.co_name = co.co_name
# co.co_positions() returns an iterator. Flatten it to a list so that it can
# be pickled to the parent process
if hasattr(co, 'co_positions'):
self.positions = list(co.co_positions())
else:
self.positions = None
def num_cores():
if NUM_CORES:
return int(NUM_CORES)
return multiprocessing.cpu_count()