| #!/usr/bin/env python3 |
| # |
| # Copyright 2016 WebAssembly Community Group participants |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import argparse |
| import difflib |
| import fnmatch |
| import multiprocessing |
| import os |
| import re |
| import shlex |
| import shutil |
| import struct |
| import subprocess |
| import sys |
| import threading |
| import time |
| |
| import find_exe |
| from utils import Error |
| |
| IS_WINDOWS = sys.platform == 'win32' |
| TEST_DIR = os.path.dirname(os.path.abspath(__file__)) |
| REPO_ROOT_DIR = os.path.dirname(TEST_DIR) |
| OUT_DIR = os.path.join(REPO_ROOT_DIR, 'out') |
| DEFAULT_TIMEOUT = 120 # seconds |
| SLOW_TIMEOUT_MULTIPLIER = 3 |
| |
| |
| # default configurations for tests |
| TOOLS = { |
| 'wat2wasm': [ |
| ('RUN', '%(wat2wasm)s'), |
| ('ARGS', ['%(in_file)s', '-o', '%(out_dir)s/out.wasm']), |
| ('VERBOSE-ARGS', ['-v']), |
| ], |
| 'wast2json': [ |
| ('RUN', '%(wast2json)s'), |
| ('ARGS', ['%(in_file)s', '-o', '%(out_dir)s/out.json']), |
| ('VERBOSE-ARGS', ['-v']), |
| ], |
| 'wat-desugar': [ |
| ('RUN', '%(wat-desugar)s'), |
| ('ARGS', ['%(in_file)s']), |
| ], |
| 'run-objdump': [ |
| ('RUN', '%(wat2wasm)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-objdump)s -r -d %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['-v']), |
| ], |
| 'run-objdump-gen-wasm': [ |
| ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-objdump)s -r -d %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['-v']), |
| ], |
| 'run-objdump-spec': [ |
| ('RUN', '%(wast2json)s %(in_file)s -o %(temp_file)s.json'), |
| # NOTE: wasm files must be passed in manually via ARGS1 |
| ('RUN', '%(wasm-objdump)s -r -d'), |
| ('VERBOSE-ARGS', ['-v']), |
| ], |
| 'run-roundtrip': [ |
| ('RUN', 'test/run-roundtrip.py'), |
| ('ARGS', [ |
| '%(in_file)s', |
| '-v', |
| '--bindir=%(bindir)s', |
| '--no-error-cmdline', |
| '-o', |
| '%(out_dir)s', |
| ]), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-interp': [ |
| ('RUN', '%(wat2wasm)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-interp)s %(temp_file)s.wasm --run-all-exports'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-interp-wasi': [ |
| ('RUN', '%(wat2wasm)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-interp)s --wasi %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-interp-spec': [ |
| ('RUN', '%(wast2json)s %(in_file)s -o %(temp_file)s.json'), |
| ('RUN', '%(spectest-interp)s %(temp_file)s.json'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-gen-wasm': [ |
| ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-validate)s %(temp_file)s.wasm'), |
| ('RUN', '%(wasm2wat)s %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-gen-wasm-bad': [ |
| ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-validate)s %(temp_file)s.wasm'), |
| ('ERROR', '1'), |
| ('RUN', '%(wasm2wat)s %(temp_file)s.wasm'), |
| ('ERROR', '1'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-gen-wasm-interp': [ |
| ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-interp)s --run-all-exports %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-gen-wasm-strip': [ |
| ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-strip)s %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-objdump)s -h %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-gen-wasm-decompile': [ |
| ('RUN', '%(gen_wasm_py)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-validate)s %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-decompile)s %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-opcodecnt': [ |
| ('RUN', '%(wat2wasm)s %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-opcodecnt)s %(temp_file)s.wasm'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-gen-spec-js': [ |
| ('RUN', '%(wast2json)s %(in_file)s -o %(temp_file)s.json'), |
| ('RUN', '%(gen_spec_js_py)s %(temp_file)s.json'), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-spec-wasm2c': [ |
| ('RUN', 'test/run-spec-wasm2c.py'), |
| ('ARGS', [ |
| '%(in_file)s', |
| '--bindir=%(bindir)s', |
| '--no-error-cmdline', |
| '--cflags=-DWABT_BIG_ENDIAN=' + '01'[struct.pack('<h', *struct.unpack('=h', b'\x00\x01'))[0]], |
| '-o', |
| '%(out_dir)s', |
| ]), |
| ('VERBOSE-ARGS', ['--print-cmd', '-v']), |
| ], |
| 'run-wasm-decompile': [ |
| ('RUN', '%(wat2wasm)s --enable-all %(in_file)s -o %(temp_file)s.wasm'), |
| ('RUN', '%(wasm-decompile)s --enable-all %(temp_file)s.wasm'), |
| ] |
| } |
| |
| # TODO(binji): Add Windows support for compiling using run-spec-wasm2c.py |
| if IS_WINDOWS: |
| TOOLS['run-spec-wasm2c'].append(('SKIP', '')) |
| |
| ROUNDTRIP_TOOLS = ('wat2wasm',) |
| |
| |
| class NoRoundtripError(Error): |
| pass |
| |
| |
| def Indent(s, spaces): |
| return ''.join(' ' * spaces + l for l in s.splitlines(1)) |
| |
| |
| def DiffLines(expected, actual): |
| def Decode(s): |
| return s.decode('utf-8', 'replace') |
| |
| expected_lines = [Decode(line) for line in expected.splitlines() if line] |
| actual_lines = [Decode(line) for line in actual.splitlines() if line] |
| return list( |
| difflib.unified_diff(expected_lines, actual_lines, fromfile='expected', |
| tofile='actual', lineterm='')) |
| |
| |
| class Cell(object): |
| |
| def __init__(self, value): |
| self.value = [value] |
| |
| def Set(self, value): |
| self.value[0] = value |
| |
| def Get(self): |
| return self.value[0] |
| |
| |
| def SplitArgs(value): |
| if isinstance(value, list): |
| return value |
| return shlex.split(value) |
| |
| |
| def FixPythonExecutable(args): |
| """Given an argument list beginning with a `*.py` file, return one with that |
| uses sys.executable as arg[0]. |
| """ |
| exe, rest = args[0], args[1:] |
| if os.path.splitext(exe)[1] == '.py': |
| return [sys.executable, os.path.join(REPO_ROOT_DIR, exe)] + rest |
| return args |
| |
| |
| class CommandTemplate(object): |
| |
| def __init__(self, exe): |
| self.args = SplitArgs(exe) |
| self.verbose_args = [] |
| self.stdin = None |
| self.expected_returncode = 0 |
| |
| def AppendArgs(self, args): |
| self.args += SplitArgs(args) |
| |
| def SetExpectedReturncode(self, returncode): |
| self.expected_returncode = returncode |
| |
| def AppendVerboseArgs(self, args_list): |
| for level, level_args in enumerate(args_list): |
| while level >= len(self.verbose_args): |
| self.verbose_args.append([]) |
| self.verbose_args[level] += SplitArgs(level_args) |
| |
| def SetStdin(self, filename): |
| self.stdin = filename |
| |
| def _Format(self, cmd, variables): |
| return [arg % variables for arg in cmd] |
| |
| def GetCommand(self, variables, extra_args=None, verbose_level=0): |
| args = self.args[:] |
| vl = 0 |
| while vl < verbose_level and vl < len(self.verbose_args): |
| args += self.verbose_args[vl] |
| vl += 1 |
| if extra_args: |
| args += extra_args |
| args = self._Format(args, variables) |
| stdin = self.stdin |
| if stdin: |
| stdin = stdin % variables |
| return Command(self, FixPythonExecutable(args), stdin) |
| |
| |
| class Command(object): |
| |
| def __init__(self, template, args, stdin): |
| self.template = template |
| self.args = args |
| self.stdin = stdin |
| |
| def GetExpectedReturncode(self): |
| return self.template.expected_returncode |
| |
| def Run(self, cwd, timeout, console_out=False, env=None): |
| process = None |
| is_timeout = Cell(False) |
| |
| def KillProcess(timeout=True): |
| if process: |
| try: |
| if IS_WINDOWS: |
| # http://stackoverflow.com/a/10830753: deleting child processes in |
| # Windows |
| subprocess.call(['taskkill', '/F', '/T', '/PID', str(process.pid)]) |
| else: |
| os.killpg(os.getpgid(process.pid), 15) |
| except OSError: |
| pass |
| is_timeout.Set(timeout) |
| |
| try: |
| start_time = time.time() |
| kwargs = {} |
| if not IS_WINDOWS: |
| kwargs['preexec_fn'] = os.setsid |
| stdin_data = None |
| if self.stdin: |
| stdin_data = open(self.stdin, 'rb').read() |
| |
| # http://stackoverflow.com/a/10012262: subprocess with a timeout |
| # http://stackoverflow.com/a/22582602: kill subprocess and children |
| process = subprocess.Popen(self.args, cwd=cwd, env=env, |
| stdout=None if console_out else subprocess.PIPE, |
| stderr=None if console_out else subprocess.PIPE, |
| stdin=None if not self.stdin else subprocess.PIPE, |
| **kwargs) |
| timer = threading.Timer(timeout, KillProcess) |
| try: |
| timer.start() |
| stdout, stderr = process.communicate(input=stdin_data) |
| finally: |
| returncode = process.returncode |
| process = None |
| timer.cancel() |
| if is_timeout.Get(): |
| raise Error('TIMEOUT') |
| duration = time.time() - start_time |
| except OSError as e: |
| raise Error(str(e)) |
| finally: |
| KillProcess(False) |
| |
| return RunResult(self, stdout, stderr, returncode, duration) |
| |
| def __str__(self): |
| return ' '.join(self.args) |
| |
| |
| class RunResult(object): |
| |
| def __init__(self, cmd=None, stdout='', stderr='', returncode=0, duration=0): |
| self.cmd = cmd |
| self.stdout = stdout |
| self.stderr = stderr |
| self.returncode = returncode |
| self.duration = duration |
| |
| def GetExpectedReturncode(self): |
| return self.cmd.GetExpectedReturncode() |
| |
| def Failed(self): |
| return self.returncode != self.GetExpectedReturncode() |
| |
| def __repr__(self): |
| return 'RunResult(%s, %s, %s, %s, %s)' % ( |
| self.cmd, self.stdout, self.stderr, self.returncode, self.duration) |
| |
| |
| class TestResult(object): |
| |
| def __init__(self): |
| self.results = [] |
| self.stdout = b'' |
| self.stderr = b'' |
| self.duration = 0 |
| |
| def GetLastCommand(self): |
| return self.results[-1].cmd |
| |
| def GetLastFailure(self): |
| return [r for r in self.results if r.Failed()][-1] |
| |
| def Failed(self): |
| return any(r.Failed() for r in self.results) |
| |
| def Append(self, result): |
| self.results.append(result) |
| |
| if result.stdout is not None: |
| self.stdout += result.stdout |
| |
| if result.stderr is not None: |
| self.stderr += result.stderr |
| |
| self.duration += result.duration |
| |
| |
| class TestInfo(object): |
| |
| def __init__(self): |
| self.filename = '' |
| self.header = [] |
| self.input_filename = '' |
| self.input_ = '' |
| self.expected_stdout = '' |
| self.expected_stderr = '' |
| self.tool = None |
| self.cmds = [] |
| self.env = {} |
| self.slow = False |
| self.skip = False |
| self.is_roundtrip = False |
| |
| def CreateRoundtripInfo(self, fold_exprs): |
| if self.tool not in ROUNDTRIP_TOOLS: |
| raise NoRoundtripError() |
| |
| if len(self.cmds) != 1: |
| raise NoRoundtripError() |
| |
| result = TestInfo() |
| result.SetTool('run-roundtrip') |
| result.filename = self.filename |
| result.header = self.header |
| result.input_filename = self.input_filename |
| result.input_ = self.input_ |
| result.expected_stdout = '' |
| result.expected_stderr = '' |
| |
| # TODO(binji): It's kind of cheesy to keep the enable flag based on prefix. |
| # Maybe it would be nicer to add a new directive ENABLE instead. |
| old_cmd = self.cmds[0] |
| new_cmd = result.cmds[0] |
| new_cmd.AppendArgs([f for f in old_cmd.args if f.startswith('--enable')]) |
| if fold_exprs: |
| new_cmd.AppendArgs('--fold-exprs') |
| |
| result.env = self.env |
| result.slow = self.slow |
| result.skip = self.skip |
| result.is_roundtrip = True |
| result.fold_exprs = fold_exprs |
| return result |
| |
| def GetName(self): |
| name = self.filename |
| if self.is_roundtrip: |
| if self.fold_exprs: |
| name += ' (roundtrip fold-exprs)' |
| else: |
| name += ' (roundtrip)' |
| return name |
| |
| def GetGeneratedInputFilename(self): |
| # All tests should be generated in their own directories, even if they |
| # share the same input filename. We want the input filename to be correct, |
| # though, so we use the directory of the test (.txt) file, but the basename |
| # of the input file (likely a .wast). |
| |
| path = OUT_DIR |
| if self.input_filename: |
| basename = os.path.basename(self.input_filename) |
| else: |
| basename = os.path.basename(self.filename) |
| |
| path = os.path.join(path, os.path.dirname(self.filename), basename) |
| |
| if self.is_roundtrip: |
| dirname = os.path.dirname(path) |
| basename = os.path.basename(path) |
| if self.fold_exprs: |
| path = os.path.join(dirname, 'roundtrip_folded', basename) |
| else: |
| path = os.path.join(dirname, 'roundtrip', basename) |
| |
| return path |
| |
| def SetTool(self, tool): |
| if tool not in TOOLS: |
| raise Error('Unknown tool: %s' % tool) |
| self.tool = tool |
| for tool_key, tool_value in TOOLS[tool]: |
| self.ParseDirective(tool_key, tool_value) |
| |
| def GetCommand(self, index): |
| try: |
| return self.cmds[index] |
| except IndexError: |
| raise Error('Invalid command index: %s' % index) |
| |
| def GetLastCommand(self): |
| return self.GetCommand(len(self.cmds) - 1) |
| |
| def ApplyToCommandBySuffix(self, suffix, fn): |
| if suffix == '': |
| fn(self.GetLastCommand()) |
| elif re.match(r'^\d+$', suffix): |
| fn(self.GetCommand(int(suffix))) |
| elif suffix == '*': |
| for cmd in self.cmds: |
| fn(cmd) |
| else: |
| raise Error('Invalid directive suffix: %s' % suffix) |
| |
| def ParseDirective(self, key, value): |
| if key == 'RUN': |
| self.cmds.append(CommandTemplate(value)) |
| elif key == 'STDIN_FILE': |
| self.input_filename = value |
| elif key.startswith('ARGS'): |
| suffix = key[len('ARGS'):] |
| self.ApplyToCommandBySuffix(suffix, lambda cmd: cmd.AppendArgs(value)) |
| elif key.startswith('ERROR'): |
| suffix = key[len('ERROR'):] |
| self.ApplyToCommandBySuffix( |
| suffix, lambda cmd: cmd.SetExpectedReturncode(int(value))) |
| elif key == 'SLOW': |
| self.slow = True |
| elif key == 'SKIP': |
| self.skip = True |
| elif key == 'VERBOSE-ARGS': |
| self.GetLastCommand().AppendVerboseArgs(value) |
| elif key in ['TODO', 'NOTE']: |
| pass |
| elif key == 'TOOL': |
| self.SetTool(value) |
| elif key == 'STDIN': |
| self.GetLastCommand().SetStdin(value) |
| elif key == 'ENV': |
| # Pattern: FOO=1 BAR=stuff |
| self.env = dict(x.split('=') for x in value.split()) |
| else: |
| raise Error('Unknown directive: %s' % key) |
| |
| def Parse(self, filename): |
| self.filename = filename |
| |
| test_path = os.path.join(REPO_ROOT_DIR, filename) |
| # Read/write as binary because spec tests may have invalid UTF-8 codes. |
| with open(test_path, 'rb') as f: |
| state = 'header' |
| empty = True |
| header_lines = [] |
| input_lines = [] |
| stdout_lines = [] |
| stderr_lines = [] |
| for line in f.readlines(): |
| empty = False |
| m = re.match(b'\\s*\\(;; (STDOUT|STDERR) ;;;$', line.strip()) |
| if m: |
| directive = m.group(1).decode('utf-8') |
| if directive == 'STDERR': |
| state = 'stderr' |
| continue |
| elif directive == 'STDOUT': |
| state = 'stdout' |
| continue |
| else: |
| m = re.match(b'\\s*;;;(.*)$', line) |
| if m: |
| directive = m.group(1).decode('utf-8').strip() |
| if state == 'header': |
| key, value = directive.split(':', 1) |
| key = key.strip() |
| value = value.strip() |
| self.ParseDirective(key, value) |
| elif state in ('stdout', 'stderr'): |
| if not re.match(r'%s ;;\)$' % state.upper(), directive): |
| raise Error('Bad directive in %s block: %s' % (state, |
| directive)) |
| state = 'none' |
| else: |
| raise Error('Unexpected directive: %s' % directive) |
| elif state == 'header': |
| state = 'input' |
| |
| if state == 'header': |
| header_lines.append(line.decode('utf-8')) |
| if state == 'input': |
| if self.input_filename: |
| raise Error('Can\'t have STDIN_FILE and input') |
| input_lines.append(line) |
| elif state == 'stderr': |
| stderr_lines.append(line) |
| elif state == 'stdout': |
| stdout_lines.append(line) |
| if empty: |
| raise Error('empty test file') |
| |
| self.header = ''.join(header_lines) |
| self.input_ = b''.join(input_lines) |
| self.expected_stdout = b''.join(stdout_lines) |
| self.expected_stderr = b''.join(stderr_lines) |
| |
| if not self.cmds: |
| raise Error('test has no commands') |
| |
| def CreateInputFile(self): |
| gen_input_path = self.GetGeneratedInputFilename() |
| gen_input_dir = os.path.dirname(gen_input_path) |
| try: |
| os.makedirs(gen_input_dir) |
| except OSError: |
| if not os.path.isdir(gen_input_dir): |
| raise |
| |
| # Read/write as binary because spec tests may have invalid UTF-8 codes. |
| with open(gen_input_path, 'wb') as gen_input_file: |
| if self.input_filename: |
| input_path = os.path.join(REPO_ROOT_DIR, self.input_filename) |
| with open(input_path, 'rb') as input_file: |
| gen_input_file.write(input_file.read()) |
| else: |
| # add an empty line for each header line so the line numbers match |
| gen_input_file.write(('\n' * self.header.count('\n')).encode('ascii')) |
| gen_input_file.write(self.input_) |
| gen_input_file.flush() |
| return gen_input_file.name |
| |
| def Rebase(self, stdout, stderr): |
| test_path = os.path.join(REPO_ROOT_DIR, self.filename) |
| with open(test_path, 'wb') as f: |
| f.write(self.header.encode('ascii')) |
| f.write(self.input_) |
| if stderr: |
| f.write(b'(;; STDERR ;;;\n') |
| f.write(stderr) |
| f.write(b';;; STDERR ;;)\n') |
| if stdout: |
| f.write(b'(;; STDOUT ;;;\n') |
| f.write(stdout) |
| f.write(b';;; STDOUT ;;)\n') |
| |
| def Diff(self, stdout, stderr): |
| msg = '' |
| if self.expected_stderr != stderr: |
| diff_lines = DiffLines(self.expected_stderr, stderr) |
| if len(diff_lines) > 0: |
| msg += 'STDERR MISMATCH:\n' + '\n'.join(diff_lines) + '\n' |
| |
| if self.expected_stdout != stdout: |
| diff_lines = DiffLines(self.expected_stdout, stdout) |
| if len(diff_lines) > 0: |
| msg += 'STDOUT MISMATCH:\n' + '\n'.join(diff_lines) + '\n' |
| |
| if msg: |
| raise Error(msg) |
| |
| |
| class Status(object): |
| |
| def __init__(self, isatty): |
| self.isatty = isatty |
| self.start_time = None |
| self.last_length = 0 |
| self.last_finished = None |
| self.skipped = 0 |
| self.passed = 0 |
| self.failed = 0 |
| self.total = 0 |
| self.failed_tests = [] |
| |
| def Start(self, total): |
| self.total = total |
| self.start_time = time.time() |
| |
| def Passed(self, info, duration): |
| self.passed += 1 |
| if self.isatty: |
| self._Clear() |
| self._PrintShortStatus(info) |
| else: |
| sys.stderr.write('+ %s (%.3fs)\n' % (info.GetName(), duration)) |
| |
| def Failed(self, info, error_msg, result=None): |
| self.failed += 1 |
| self.failed_tests.append((info, result)) |
| if self.isatty: |
| self._Clear() |
| sys.stderr.write('- %s\n%s\n' % (info.GetName(), Indent(error_msg, 2))) |
| |
| def Skipped(self, info): |
| self.skipped += 1 |
| if not self.isatty: |
| sys.stderr.write('. %s (skipped)\n' % info.GetName()) |
| |
| def Done(self): |
| if self.isatty: |
| sys.stderr.write('\n') |
| |
| def _PrintShortStatus(self, info): |
| assert(self.isatty) |
| total_duration = time.time() - self.start_time |
| name = info.GetName() if info else '' |
| if (self.total - self.skipped): |
| percent = 100 * (self.passed + self.failed) / (self.total - self.skipped) |
| else: |
| percent = 100 |
| status = '[+%d|-%d|%%%d] (%.2fs) %s' % (self.passed, self.failed, |
| percent, total_duration, name) |
| self.last_length = len(status) |
| self.last_finished = info |
| sys.stderr.write(status) |
| sys.stderr.flush() |
| |
| def _Clear(self): |
| assert(self.isatty) |
| sys.stderr.write('\r%s\r' % (' ' * self.last_length)) |
| |
| |
| def FindTestFiles(ext, filter_pattern_re, exclude_dirs): |
| tests = [] |
| for root, dirs, files in os.walk(TEST_DIR): |
| for ex in exclude_dirs: |
| if ex in dirs: |
| # Filtering out dirs here causes os.walk not to descend into them |
| dirs.remove(ex) |
| for f in files: |
| path = os.path.join(root, f) |
| if os.path.splitext(f)[1] == ext: |
| tests.append(os.path.relpath(path, REPO_ROOT_DIR)) |
| tests.sort() |
| return [test for test in tests if re.match(filter_pattern_re, test)] |
| |
| |
| def GetAllTestInfo(test_names, status): |
| infos = [] |
| for test_name in test_names: |
| info = TestInfo() |
| try: |
| info.Parse(test_name) |
| infos.append(info) |
| except Error as e: |
| status.Failed(info, str(e)) |
| return infos |
| |
| |
| def RunTest(info, options, variables, verbose_level=0): |
| timeout = options.timeout |
| if info.slow: |
| timeout *= SLOW_TIMEOUT_MULTIPLIER |
| |
| # Clone variables dict so it can be safely modified. |
| variables = dict(variables) |
| |
| cwd = REPO_ROOT_DIR |
| env = dict(os.environ) |
| env.update(info.env) |
| gen_input_path = info.CreateInputFile() |
| rel_gen_input_path = ( |
| os.path.relpath(gen_input_path, cwd).replace(os.path.sep, '/')) |
| variables['in_file'] = rel_gen_input_path |
| |
| # Each test runs with a unique output directory which is removed before |
| # we run the test. |
| out_dir = os.path.splitext(rel_gen_input_path)[0] |
| if os.path.isdir(out_dir): |
| shutil.rmtree(out_dir) |
| os.makedirs(out_dir) |
| variables['out_dir'] = out_dir |
| |
| # Temporary files typically are placed in `out_dir` and use the same test's |
| # basename. This name does include an extension. |
| input_basename = os.path.basename(rel_gen_input_path) |
| variables['temp_file'] = os.path.join(out_dir, |
| os.path.splitext(input_basename)[0]) |
| |
| test_result = TestResult() |
| |
| for cmd_template in info.cmds: |
| cmd = cmd_template.GetCommand(variables, options.arg, verbose_level) |
| if options.print_cmd: |
| print(cmd) |
| |
| try: |
| result = cmd.Run(cwd, timeout, verbose_level > 0, env) |
| except (Error, KeyboardInterrupt) as e: |
| return e |
| |
| test_result.Append(result) |
| if result.Failed(): |
| break |
| |
| return test_result |
| |
| |
| def HandleTestResult(status, info, result, rebase=False): |
| try: |
| if isinstance(result, (Error, KeyboardInterrupt)): |
| raise result |
| |
| if info.is_roundtrip: |
| if result.Failed(): |
| if result.GetLastFailure().returncode == 2: |
| # run-roundtrip.py returns 2 if the file couldn't be parsed. |
| # it's likely a "bad-*" file. |
| status.Skipped(info) |
| else: |
| raise Error(result.stderr) |
| else: |
| status.Passed(info, result.duration) |
| else: |
| if result.Failed(): |
| # This test has already failed, but diff it anyway. |
| last_failure = result.GetLastFailure() |
| msg = 'expected error code %d, got %d.' % ( |
| last_failure.GetExpectedReturncode(), |
| last_failure.returncode) |
| try: |
| info.Diff(result.stdout, result.stderr) |
| except Error as e: |
| msg += '\n' + str(e) |
| raise Error(msg) |
| else: |
| if rebase: |
| info.Rebase(result.stdout, result.stderr) |
| else: |
| info.Diff(result.stdout, result.stderr) |
| status.Passed(info, result.duration) |
| except Error as e: |
| status.Failed(info, str(e), result) |
| |
| |
| # Source : http://stackoverflow.com/questions/3041986/python-command-line-yes-no-input |
| def YesNoPrompt(question, default='yes'): |
| """Ask a yes/no question via input() and return their answer. |
| |
| "question" is a string that is presented to the user. |
| "default" is the presumed answer if the user just hits <Enter>. |
| It must be "yes" (the default), "no" or None (meaning |
| an answer is required of the user). |
| |
| The "answer" return value is True for "yes" or False for "no". |
| """ |
| valid = {'yes': True, 'y': True, 'ye': True, 'no': False, 'n': False} |
| if default is None: |
| prompt = ' [y/n] ' |
| elif default == 'yes': |
| prompt = ' [Y/n] ' |
| elif default == 'no': |
| prompt = ' [y/N] ' |
| else: |
| raise ValueError('invalid default answer: \'%s\'' % default) |
| |
| while True: |
| sys.stdout.write(question + prompt) |
| choice = input().lower() |
| if default is not None and choice == '': |
| return valid[default] |
| elif choice in valid: |
| return valid[choice] |
| else: |
| sys.stdout.write('Please respond with \'yes\' or \'no\' ' |
| '(or \'y\' or \'n\').\n') |
| |
| |
| def RunMultiThreaded(infos_to_run, status, options, variables): |
| pool = multiprocessing.Pool(options.jobs) |
| try: |
| results = [(info, pool.apply_async(RunTest, (info, options, variables))) |
| for info in infos_to_run] |
| while results: |
| new_results = [] |
| for info, result in results: |
| if result.ready(): |
| HandleTestResult(status, info, result.get(0), options.rebase) |
| else: |
| new_results.append((info, result)) |
| time.sleep(0.01) |
| results = new_results |
| pool.close() |
| finally: |
| pool.terminate() |
| pool.join() |
| |
| |
| def RunSingleThreaded(infos_to_run, status, options, variables): |
| continued_errors = 0 |
| |
| for info in infos_to_run: |
| result = RunTest(info, options, variables) |
| HandleTestResult(status, info, result, options.rebase) |
| if status.failed > continued_errors: |
| if options.fail_fast: |
| break |
| elif options.stop_interactive: |
| rerun_verbose = YesNoPrompt(question='Rerun with verbose option?', |
| default='no') |
| if rerun_verbose: |
| RunTest(info, options, variables, verbose_level=2) |
| should_continue = YesNoPrompt(question='Continue testing?', |
| default='yes') |
| if not should_continue: |
| break |
| elif options.verbose: |
| RunTest(info, options, variables, verbose_level=1) |
| continued_errors += 1 |
| |
| |
| def GetDefaultJobCount(): |
| cpu_count = multiprocessing.cpu_count() |
| if cpu_count <= 1: |
| return 1 |
| else: |
| return cpu_count // 2 |
| |
| |
| def main(args): |
| parser = argparse.ArgumentParser() |
| parser.add_argument('-a', '--arg', |
| help='additional args to pass to executable', |
| action='append') |
| parser.add_argument('--bindir', metavar='PATH', |
| default=find_exe.GetDefaultPath(), |
| help='directory to search for all executables.') |
| parser.add_argument('-v', '--verbose', help='print more diagnotic messages.', |
| action='store_true') |
| parser.add_argument('-f', '--fail-fast', help='Exit on first failure. ' |
| 'Extra options with \'--jobs 1\'', action='store_true') |
| parser.add_argument('--stop-interactive', |
| help='Enter interactive mode on errors. ' |
| 'Extra options with \'--jobs 1\'', action='store_true') |
| parser.add_argument('-l', '--list', help='list all tests.', |
| action='store_true') |
| parser.add_argument('-r', '--rebase', |
| help='rebase a test to its current output.', |
| action='store_true') |
| parser.add_argument('-j', '--jobs', |
| help='number of jobs to use to run tests', type=int, |
| default=GetDefaultJobCount()) |
| parser.add_argument('-t', '--timeout', type=float, default=DEFAULT_TIMEOUT, |
| help='per test timeout in seconds') |
| parser.add_argument('--no-roundtrip', |
| help='don\'t run roundtrip.py on all tests', |
| action='store_false', default=True, dest='roundtrip') |
| parser.add_argument('-p', '--print-cmd', |
| help='print the commands that are run.', |
| action='store_true') |
| parser.add_argument('patterns', metavar='pattern', nargs='*', |
| help='test patterns.') |
| options = parser.parse_args(args) |
| |
| if options.jobs != 1: |
| if options.fail_fast: |
| parser.error('--fail-fast only works with -j1') |
| if options.stop_interactive: |
| parser.error('--stop-interactive only works with -j1') |
| |
| if options.patterns: |
| exclude_dirs = [] |
| pattern_re = '|'.join( |
| fnmatch.translate('*%s*' % p) for p in options.patterns) |
| else: |
| pattern_re = '.*' |
| # By default, exclude wasi tests because WASI support is not include |
| # by int the build by default. |
| # TODO(sbc): Find some way to detect the WASI support. |
| exclude_dirs = ['wasi'] |
| |
| test_names = FindTestFiles('.txt', pattern_re, exclude_dirs) |
| |
| if options.list: |
| for test_name in test_names: |
| print(test_name) |
| return 0 |
| if not test_names: |
| print('no tests match that filter') |
| return 1 |
| |
| variables = {} |
| variables['test_dir'] = os.path.abspath(TEST_DIR) |
| variables['bindir'] = options.bindir |
| variables['gen_wasm_py'] = os.path.join(TEST_DIR, 'gen-wasm.py') |
| variables['gen_spec_js_py'] = os.path.join(TEST_DIR, 'gen-spec-js.py') |
| for exe_basename in find_exe.EXECUTABLES: |
| exe_override = os.path.join(options.bindir, exe_basename) |
| variables[exe_basename] = find_exe.FindExecutable(exe_basename, |
| exe_override) |
| |
| status = Status(sys.stderr.isatty() and not options.verbose) |
| infos = GetAllTestInfo(test_names, status) |
| infos_to_run = [] |
| for info in infos: |
| if info.skip: |
| status.Skipped(info) |
| continue |
| infos_to_run.append(info) |
| |
| if options.roundtrip: |
| for fold_exprs in False, True: |
| try: |
| infos_to_run.append(info.CreateRoundtripInfo(fold_exprs=fold_exprs)) |
| except NoRoundtripError: |
| pass |
| |
| if not os.path.exists(OUT_DIR): |
| os.makedirs(OUT_DIR) |
| |
| status.Start(len(infos_to_run)) |
| |
| try: |
| if options.jobs > 1: |
| RunMultiThreaded(infos_to_run, status, options, variables) |
| else: |
| RunSingleThreaded(infos_to_run, status, options, variables) |
| except KeyboardInterrupt: |
| print('\nInterrupted testing\n') |
| |
| status.Done() |
| |
| ret = 0 |
| if status.failed: |
| sys.stderr.write('**** FAILED %s\n' % ('*' * (80 - 14))) |
| for info, result in status.failed_tests: |
| if isinstance(result, TestResult): |
| msg = result.GetLastCommand() |
| elif isinstance(result, Error): |
| msg = result |
| else: |
| msg = '' |
| sys.stderr.write('- %s\n %s\n' % (info.GetName(), msg)) |
| ret = 1 |
| |
| return ret |
| |
| |
| if __name__ == '__main__': |
| try: |
| sys.exit(main(sys.argv[1:])) |
| except Error as e: |
| sys.stderr.write(str(e) + '\n') |
| sys.exit(1) |