Update to point to Catapult.
diff --git a/README.rst b/README.rst index dc03841..3b5f0db 100644 --- a/README.rst +++ b/README.rst
@@ -2,67 +2,8 @@ ======================= typ is a simple program for testing command line executables and Python code. -When testing Python code, it is basically a wrapper around the standard -unittest module, but it provides the following bits of additional -functionality: +The code for this project was folded into part of Chromium's Catapult project: -* Parallel test execution. -* Clean output in the style of the Ninja build tool. -* A more flexible mechanism for discovering tests from the - command line and controlling how they are run: +<https://source.chromium.org/chromium/chromium/src/+/main:third_party/catapult/third_party/typ/> - * Support for importing tests by directory, filename, or module. - * Support for specifying tests to skip, tests to run in parallel, - and tests that need to be run by themselves - -* Support for producing traces of test times compatible with Chrome's - tracing infrastructure (trace_viewer). -* Integrated test coverage reporting (including parallel coverage). -* Integrated support for debugging tests. -* Support for uploading test results automatically to a server - (useful for continuous integration monitoring of test results). -* An abstraction of operating system functionality called the - Host class. This can be used by other python code to write more - portable and easily testable code by wrapping the multiprocessing, - os, subprocess, and time modules. -* Simple libraries for integrating Ninja-style statistics and line - printing into your own code (the Stats and Printer classes). -* Support for processing arbitrary arguments from calling code to - test cases. -* Support for once-per-process setup and teardown hooks. - -(These last two bullet points allow one to write tests that do not require -Python globals). - -History -------- - -typ originated out of work on the Blink and Chromium projects, as a way to -provide a friendlier interface to the Python unittest modules. - -Work remaining --------------- - -typ is still a work in progress, but it's getting close to being done. -Things remaining for 1.0, roughly in priority order: - -- Implement a non-python file format for testing command line interfaces -- Write documentation - -Possible future work --------------------- - -- MainTestCase.check() improvements: - - - check all arguments and show all errors at once? - - make multi-line regexp matches easier to follow? - -- --debugger improvements: - - - make it skip the initial breakpoint? - -- Support testing javascript, java, c++/gtest-style binaries? -- Support for test sharding in addition to parallel execution (so that - run-webkit-tests can re-use as much of the code as possible)? -- Support for non-unittest runtest invocation (for run-webkit-tests, - other harnesses?) +It is not currently being maintained as a separate project.
diff --git a/codereview.settings b/codereview.settings deleted file mode 100644 index 8fbef92..0000000 --- a/codereview.settings +++ /dev/null
@@ -1,3 +0,0 @@ -# This file is used by gcl to get repository specific information. -CODE_REVIEW_SERVER: codereview.chromium.org -PROJECT: typ
diff --git a/pylintrc b/pylintrc deleted file mode 100644 index 9ccea6f..0000000 --- a/pylintrc +++ /dev/null
@@ -1,272 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -[MASTER] - -# Specify a configuration file. -#rcfile= - -# Python code to execute, usually for sys.path manipulation such as -# pygtk.require(). -#init-hook= - -# Profiled execution. -profile=no - -# Add files or directories to the blacklist. They should be base names, not -# paths. -ignore=CVS - -# Pickle collected data for later comparisons. -persistent=yes - -# List of plugins (as comma separated values of python modules names) to load, -# usually to register additional checkers. -load-plugins= - - -[MESSAGES CONTROL] - -# Enable the message, report, category or checker with the given id(s). You can -# either give multiple identifier separated by comma (,) or put this option -# multiple time. -#enable= - -# Disable the message, report, category or checker with the given id(s). You -# can either give multiple identifier separated by comma (,) or put this option -# multiple time (only on the command line, not in the configuration file where -# it should appear only once). -# CHANGED: -# C0111: Missing docstring -# I0011: Locally disabling WNNNN -# R0201: Method could be a function -# R0801: Similar lines -# W0141: Used builtin function 'map' -# W0142: Used * or ** magic -# W0511: TODO -# W0703: Catch "Exception" -disable=C0111,I0011,R0201,R0801,W0141,W0142,W0511,W0703 - - -[REPORTS] - -# Set the output format. Available formats are text, parseable, colorized, msvs -# (visual studio) and html -output-format=text - -# Put messages in a separate file for each module / package specified on the -# command line instead of printing them on stdout. Reports (if any) will be -# written in a file name "pylint_global.[txt|html]". -files-output=no - -# Tells whether to display a full report or only the messages -# CHANGED: -reports=no - -# Python expression which should return a note less than 10 (10 is the highest -# note). You have access to the variables errors warning, statement which -# respectively contain the number of errors / warnings messages and the total -# number of statements analyzed. This is used by the global evaluation report -# (RP0004). -evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10) - -# Add a comment according to your evaluation note. This is used by the global -# evaluation report (RP0004). -comment=no - - -[VARIABLES] - -# Tells whether we should check for unused import in __init__ files. -init-import=no - -# A regular expression matching the beginning of the name of dummy variables -# (i.e. not used). -dummy-variables-rgx=_|dummy - -# List of additional names supposed to be defined in builtins. Remember that -# you should avoid to define new builtins when possible. -additional-builtins= - - -[TYPECHECK] - -# Tells whether missing members accessed in mixin class should be ignored. A -# mixin class is detected if its name ends with "mixin" (case insensitive). -ignore-mixin-members=yes - -# List of classes names for which member attributes should not be checked -# (useful for classes with attributes dynamically set). -ignored-classes= - -# When zope mode is activated, add a predefined set of Zope acquired attributes -# to generated-members. -zope=no - -# List of members which are set dynamically and missed by pylint inference -# system, and so shouldn't trigger E0201 when accessed. Python regular -# expressions are accepted. -generated-members= - - -[MISCELLANEOUS] - -# List of note tags to take in consideration, separated by a comma. -notes=FIXME,XXX,TODO - - -[SIMILARITIES] - -# Minimum lines number of a similarity. -min-similarity-lines=4 - -# Ignore comments when computing similarities. -ignore-comments=yes - -# Ignore docstrings when computing similarities. -ignore-docstrings=yes - - -[FORMAT] - -# Maximum number of characters on a single line. -# max-line-length=200 -max-line-length=80 - -# Maximum number of lines in a module -# max-module-lines=1000 - -# String used as indentation unit. This is usually " " (4 spaces) or "\t" (1 -# tab). -# CHANGED: -indent-string=' ' - - -[BASIC] - -# Required attributes for module, separated by a comma -required-attributes= - -# List of builtins function names that should not be used, separated by a comma -bad-functions=map,filter,apply,input - -# Regular expression which should only match correct module names -module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ - -# Regular expression which should only match correct module level names -const-rgx=(([a-zA-Z_][a-zA-Z0-9_]*)|(__.*__))$ - -# Regular expression which should only match correct class names -class-rgx=[A-Z_][a-zA-Z0-9]+$ - -# Regular expression which should only match correct function names -function-rgx=[a-z_][a-z0-9_]{0,40}$ - -# Regular expression which should only match correct method names -method-rgx=[a-z_][a-z0-9_]{0,48}$ - -# Regular expression which should only match correct instance attribute names -attr-rgx=[a-z_][a-z0-9_]{0,30}$ - -# Regular expression which should only match correct argument names -argument-rgx=[a-z_][a-z0-9_]{0,30}$ - -# Regular expression which should only match correct variable names -variable-rgx=[a-zA-Z0-9_]{0,30}$ - -# Regular expression which should only match correct list comprehension / -# generator expression variable names -inlinevar-rgx=[A-Za-z_][A-Za-z0-9_]*$ - -# Good variable names which should always be accepted, separated by a comma -good-names=i,j,k,ex,Run,_ - -# Bad variable names which should always be refused, separated by a comma -bad-names=foo,bar,baz,toto,tutu,tata - -# Regular expression which should only match functions or classes name which do -# not require a docstring -no-docstring-rgx=__.*__ - - -[DESIGN] - -# Maximum number of arguments for function / method -max-args=8 - -# Argument names that match this expression will be ignored. Default to name -# with leading underscore -ignored-argument-names=_.* - -# Maximum number of locals for function / method body -max-locals=32 - -# Maximum number of return / yield for function / method body -max-returns=32 - -# Maximum number of branch for function / method body -max-branches=32 - -# Maximum number of statements in function / method body -max-statements=65 - -# Maximum number of parents for a class (see R0901). -max-parents=7 - -# Maximum number of attributes for a class (see R0902). -max-attributes=16 - -# Minimum number of public methods for a class (see R0903). -min-public-methods=0 - -# Maximum number of public methods for a class (see R0904). -max-public-methods=100 - - -[CLASSES] - -# List of interface methods to ignore, separated by a comma. This is used for -# instance to not check methods defines in Zope's Interface base class. -ignore-iface-methods=isImplementedBy,deferred,extends,names,namesAndDescriptions,queryDescriptionFor,getBases,getDescriptionFor,getDoc,getName,getTaggedValue,getTaggedValueTags,isEqualOrExtendedBy,setTaggedValue,isImplementedByInstancesOf,adaptWith,is_implemented_by - -# List of method names used to declare (i.e. assign) instance attributes. -defining-attr-methods=__init__,__new__,setUp - -# List of valid names for the first argument in a class method. -valid-classmethod-first-arg=cls - - -[IMPORTS] - -# Deprecated modules which should not be used, separated by a comma -deprecated-modules=regsub,string,TERMIOS,Bastion,rexec - -# Create a graph of every (i.e. internal and external) dependencies in the -# given file (report RP0402 must not be disabled) -import-graph= - -# Create a graph of external dependencies in the given file (report RP0402 must -# not be disabled) -ext-import-graph= - -# Create a graph of internal dependencies in the given file (report RP0402 must -# not be disabled) -int-import-graph= - - -[EXCEPTIONS] - -# Exceptions that will emit a warning when being caught. Defaults to -# "Exception" -overgeneral-exceptions=Exception
diff --git a/run b/run deleted file mode 100755 index 401296b..0000000 --- a/run +++ /dev/null
@@ -1,98 +0,0 @@ -#!/usr/bin/env python - -from __future__ import print_function - -import argparse -import os -import subprocess -import sys - -from tools import cov - - -class Runner(object): - - def __init__(self): - self._verbose = False - self._repo_dir = os.path.abspath(os.path.dirname(__file__)) - self._path_to_cov = os.path.join(self._repo_dir, 'tools', 'cov.py') - self._path_to_runner = os.path.join(self._repo_dir, 'typ', 'runner.py') - self._python = sys.executable - - def main(self, argv): - parser = argparse.ArgumentParser(prog='run') - parser.add_argument('-v', '--verbose', action='store_true') - subps = parser.add_subparsers() - - subp = subps.add_parser('clean', help='Remove any local files.') - subp.set_defaults(func=self.run_clean) - - subp = subps.add_parser('coverage', - help='Run the tests and report code coverage.') - subp.set_defaults(func=self.run_coverage) - cov.add_arguments(subp) - - subp = subps.add_parser('help', - help='Get help on a subcommand.') - subp.add_argument(nargs='?', action='store', dest='subcommand', - help='The command to get help for.') - subp.set_defaults(func=self.run_help) - - subp = subps.add_parser('lint', - help='run lint over the source') - subp.set_defaults(func=self.run_lint) - - subp = subps.add_parser('tests', - help='run the tests') - subp.set_defaults(func=self.run_tests) - - args = parser.parse_args(argv) - - self._verbose = args.verbose - args.func(args) - - def call(self, *args, **kwargs): - if self._verbose: - print(' '.join(args[0])) - ret = subprocess.call(*args, **kwargs) - if ret != 0: - sys.exit(ret) - - def run_clean(self, _args): - self.call(['git', 'clean', '-fxd']) - - def run_coverage(self, args): - if not args.path: - args.path = [self._repo_dir] - if not args.source: - args.source = [os.path.join(self._repo_dir, 'typ')] - argv = cov.argv_from_args(args) - cov_args = [self._path_to_runner, '-j', '1'] - self.call([self._python, self._path_to_cov] + argv + cov_args) - - def run_help(self, args): - if args.subcommand: - self.main([args.subcommand, '--help']) - self.main(['--help']) - - def run_lint(self, _args): - self.call('pylint --rcfile=pylintrc */*.py */*/*.py', shell=True) - - def run_tests(self, _args): - # Test running the typ module directly if it is in sys.path. - self.call([ - self._python, '-m', 'typ', - 'typ.tests.main_test.TestMain.test_basic', - ]) - - # Testing running the runner directly if nothing is in sys.path. - home_dir = os.environ['HOME'] - self.call([self._python, self._path_to_runner, - 'typ.tests.main_test.TestMain.test_basic'], cwd=home_dir) - - # Run the remaining tests. - self.call([self._python, self._path_to_runner]) - - -if __name__ == '__main__': - sys.exit(Runner().main(sys.argv[1:]))
diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 3c6e79c..0000000 --- a/setup.cfg +++ /dev/null
@@ -1,2 +0,0 @@ -[bdist_wheel] -universal=1
diff --git a/setup.py b/setup.py deleted file mode 100644 index 2886c18..0000000 --- a/setup.py +++ /dev/null
@@ -1,59 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -from setuptools import setup, find_packages - -here = os.path.abspath(os.path.dirname(__file__)) -if here not in sys.path: - sys.path.insert(0, here) - -from typ.version import VERSION - -with open(os.path.join(here, 'README.rst')) as fp: - readme = fp.read().strip() - -readme_lines = readme.splitlines() - -setup( - name='typ', - packages=find_packages(), - package_data={'': ['../README.rst']}, - entry_points={ - 'console_scripts': [ - 'typ=typ.runner:main', - ] - }, - install_requires=[ - ], - version=VERSION, - author='Dirk Pranke', - author_email='dpranke@chromium.org', - description=readme_lines[3], - long_description=('\n' + '\n'.join(readme_lines)), - url='https://github.com/dpranke/typ', - license='Apache', - classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Topic :: Software Development :: Testing', - ], -)
diff --git a/tools/__init__.py b/tools/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/tools/__init__.py +++ /dev/null
diff --git a/tools/cov.py b/tools/cov.py deleted file mode 100755 index ff97728..0000000 --- a/tools/cov.py +++ /dev/null
@@ -1,142 +0,0 @@ -#!/usr/bin/python -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import argparse -import sys -import textwrap - -is_python3 = bool(sys.version_info.major == 3) - - -ALL_PRAGMAS = ['no cover', 'no win32', 'python2', 'python3', 'untested', - 'win32'] -DEFAULT_PRAGMAS = ALL_PRAGMAS[:] - -if is_python3: - DEFAULT_PRAGMAS.remove('python3') -else: - DEFAULT_PRAGMAS.remove('python2') - -if sys.platform == 'win32': - DEFAULT_PRAGMAS.remove('win32') -else: - DEFAULT_PRAGMAS.remove('no win32') - - -def add_arguments(parser): - parser.add_argument('--no-pragmas', action='store_true', default=False, - help='Show all uncovered lines (no pragmas).') - parser.add_argument('--path', action='append', default=[], - help='Prepend given directories to sys.path.') - parser.add_argument('--pragma', action='append', default=[], - help=('The coverage pragmas to honor ' - '(defaults to %s).' % DEFAULT_PRAGMAS)) - parser.add_argument('--show', action='append', default=[], - help='Show code protected by the specified pragmas ' - '(uses all pragmas *except* for the ones ' - 'specified).') - parser.add_argument('--show-missing', action='store_true', - default=False, help='Show missing lines.') - parser.add_argument('--source', action='append', default=[], - help='Limit coverage data to the given directories.') - - parser.formatter_class = argparse.RawTextHelpFormatter - parser.epilog = textwrap.dedent(""" - Valid pragma values are: - 'no cover': The default coverage pragma, this now means we - truly cannot cover it. - 'no win32': Code that only executes when not on Windows. - 'python2': Code that only executes under Python2. - 'python3': Code that only executes under Python3. - 'untested': Code that does not yet have tests. - 'win32': Code that only executes on Windows. - - In typ, we aim for 'no cover' to only apply to code that executes only - when coverage is not available (and hence can never be counted). Most - code, if annotated at all, should be 'untested', and we should strive - for 'untested' to not be used, either. - """) - - -def argv_from_args(args): - argv = [] - if args.no_pragmas: - argv.append('--no-pragmas') - for arg in args.path: - argv.extend(['--path', arg]) - for arg in args.show: - argv.extend(['--show', arg]) - if args.show_missing: - argv.append('--show-missing') - for arg in args.source: - argv.extend(['--source', arg]) - for arg in args.pragma: - argv.extend(['--pragma', arg]) - return argv - - -def main(argv=None): - parser = argparse.ArgumentParser() - add_arguments(parser) - args, remaining_args = parser.parse_known_args(argv) - - for path in args.path: - if path not in sys.path: - sys.path.append(path) - - try: - import coverage - from coverage.execfile import run_python_module, run_python_file - except ImportError: - print("Error: coverage is not available.") - sys.exit(1) - - cov = coverage.coverage(source=args.source) - cov.erase() - cov.clear_exclude() - - if args.no_pragmas: - args.pragma = [] - - args.pragma = args.pragma or DEFAULT_PRAGMAS - - if args.show: - args.show_missing = True - for pragma in args.show: - if pragma in args.pragma: - args.pragma.remove(pragma) - - for pragma in args.pragma: - cov.exclude('pragma: %s' % pragma) - - ret = 0 - cov.start() - try: - if remaining_args[0] == '-m': - run_python_module(remaining_args[1], remaining_args[1:]) - else: - run_python_file(remaining_args[0], remaining_args) - except SystemExit as e: - ret = e.code - cov.stop() - cov.save() - cov.report(show_missing=args.show_missing) - return ret - - -if __name__ == '__main__': - sys.exit(main())
diff --git a/typ/__init__.py b/typ/__init__.py deleted file mode 100644 index ab04414..0000000 --- a/typ/__init__.py +++ /dev/null
@@ -1,93 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Test Your Project - -typ is a simple program for testing command line executables and Python code. - -When testing Python code, it is basically a wrapper around the standard -unittest module, but it provides the following bits of additional -functionality: - - * Parallel test execution. - - * Clean output in the style of the Ninja build tool. - - * A more flexible mechanism for discovering tests from the - command line and controlling how they are run: - * Support for importing tests by directory, filename, or module. - * Support for specifying tests to skip, tests to run in parallel, - and tests that need to be run by themselves - - * Support for producing traces of test times compatible with Chrome's - tracing infrastructure (trace_viewer). - - * Integrated test coverage reporting. - - * Integrated support for debugging tests. - - * Support for uploading test results automatically to a server - (useful for continuous integration monitoring of test results). - - * An abstraction of operating system functionality called the - Host class. This can be used by other python code to write more - portable and easily testable code by wrapping the multiprocessing, - os, subprocess, and time modules. - - * Simple libraries for integrating Ninja-style statistics and line - printing into your own code (the Stats and Printer classes). - - * Support for processing arbitrary arguments from calling code to - test cases. - - * Support for once-per-process setup and teardown hooks. - (These last two bullet points allow one to write tests that do not - require Python globals). -""" - -from typ.arg_parser import ArgumentParser -from typ.fakes.host_fake import FakeHost -from typ.host import Host -from typ.json_results import exit_code_from_full_results -from typ.json_results import make_full_results, make_upload_request -from typ.json_results import Result, ResultSet, ResultType -from typ.runner import Runner, TestInput, TestSet, WinMultiprocessing, main -from typ.stats import Stats -from typ.printer import Printer -from typ.test_case import convert_newlines, TestCase, MainTestCase -from typ.version import VERSION - - -__all__ = [ - 'ArgumentParser', - 'FakeHost', - 'Host', - 'MainTestCase', - 'Printer', - 'Result', - 'ResultSet', - 'ResultType', - 'Runner', - 'Stats', - 'TestCase', - 'TestInput', - 'TestSet', - 'VERSION', - 'WinMultiprocessing', - 'convert_newlines', - 'exit_code_from_full_results', - 'main', - 'make_full_results', - 'make_upload_request', -]
diff --git a/typ/__main__.py b/typ/__main__.py deleted file mode 100644 index 0e026e8..0000000 --- a/typ/__main__.py +++ /dev/null
@@ -1,21 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys # pragma: no cover - -from typ import main # pragma: no cover - - -if __name__ == '__main__': # pragma: no cover - sys.exit(main())
diff --git a/typ/arg_parser.py b/typ/arg_parser.py deleted file mode 100644 index 707adab..0000000 --- a/typ/arg_parser.py +++ /dev/null
@@ -1,342 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import argparse -import optparse - -from typ.host import Host - - -class _Bailout(Exception): - pass - - -DEFAULT_COVERAGE_OMIT = ['*/typ/*', '*/site-packages/*'] -DEFAULT_STATUS_FORMAT = '[%f/%t] ' -DEFAULT_SUFFIXES = ['*_test.py', '*_unittest.py'] - - -class ArgumentParser(argparse.ArgumentParser): - - @staticmethod - def add_option_group(parser, title, discovery=False, - running=False, reporting=False, skip=None): - # TODO: Get rid of this when telemetry upgrades to argparse. - ap = ArgumentParser(add_help=False, version=False, discovery=discovery, - running=running, reporting=reporting) - optlist = ap.optparse_options(skip=skip) - group = optparse.OptionGroup(parser, title) - group.add_options(optlist) - parser.add_option_group(group) - - def __init__(self, host=None, add_help=True, version=True, discovery=True, - reporting=True, running=True): - super(ArgumentParser, self).__init__(prog='typ', add_help=add_help) - - self._host = host or Host() - self.exit_status = None - - self.usage = '%(prog)s [options] [tests...]' - - if version: - self.add_argument('-V', '--version', action='store_true', - help='Print the typ version and exit.') - - if discovery: - self.add_argument('-f', '--file-list', metavar='FILENAME', - action='store', - help=('Takes the list of tests from the file ' - '(use "-" for stdin).')) - self.add_argument('--all', action='store_true', - help=('Run all the tests, including the ones ' - 'normally skipped.')) - self.add_argument('--isolate', metavar='glob', default=[], - action='append', - help=('Globs of tests to run in isolation ' - '(serially).')) - self.add_argument('--skip', metavar='glob', default=[], - action='append', - help=('Globs of test names to skip (' - 'defaults to %(default)s).')) - self.add_argument('--suffixes', metavar='glob', default=[], - action='append', - help=('Globs of test filenames to look for (' - 'can specify multiple times; defaults ' - 'to %s).' % DEFAULT_SUFFIXES)) - - if reporting: - self.add_argument('--builder-name', - help=('Builder name to include in the ' - 'uploaded data.')) - self.add_argument('-c', '--coverage', action='store_true', - help='Reports coverage information.') - self.add_argument('--coverage-source', action='append', - default=[], - help=('Directories to include when running and ' - 'reporting coverage (defaults to ' - '--top-level-dirs plus --path)')) - self.add_argument('--coverage-omit', action='append', - default=[], - help=('Globs to omit when reporting coverage ' - '(defaults to %s).' % - DEFAULT_COVERAGE_OMIT)) - self.add_argument('--coverage-annotate', action='store_true', - help=('Produce an annotate source report.')) - self.add_argument('--coverage-show-missing', action='store_true', - help=('Show missing line ranges in coverage ' - 'report.')) - self.add_argument('--master-name', - help=('Buildbot master name to include in the ' - 'uploaded data.')) - self.add_argument('--metadata', action='append', default=[], - help=('Optional key=value metadata that will ' - 'be included in the results.')) - self.add_argument('--test-results-server', - help=('If specified, uploads the full results ' - 'to this server.')) - self.add_argument('--test-type', - help=('Name of test type to include in the ' - 'uploaded data (e.g., ' - '"telemetry_unittests").')) - self.add_argument('--write-full-results-to', metavar='FILENAME', - action='store', - help=('If specified, writes the full results to ' - 'that path.')) - self.add_argument('--write-trace-to', metavar='FILENAME', - action='store', - help=('If specified, writes the trace to ' - 'that path.')) - self.add_argument('tests', nargs='*', default=[], - help=argparse.SUPPRESS) - - if running: - self.add_argument('-d', '--debugger', action='store_true', - help='Runs the tests under the debugger.') - self.add_argument('-j', '--jobs', metavar='N', type=int, - default=self._host.cpu_count(), - help=('Runs N jobs in parallel ' - '(defaults to %(default)s).')) - self.add_argument('-l', '--list-only', action='store_true', - help='Lists all the test names found and exits.') - self.add_argument('-n', '--dry-run', action='store_true', - help=argparse.SUPPRESS) - self.add_argument('-q', '--quiet', action='store_true', - default=False, - help=('Runs as quietly as possible ' - '(only prints errors).')) - self.add_argument('-s', '--status-format', - default=self._host.getenv('NINJA_STATUS', - DEFAULT_STATUS_FORMAT), - help=argparse.SUPPRESS) - self.add_argument('-t', '--timing', action='store_true', - help='Prints timing info.') - self.add_argument('-v', '--verbose', action='count', default=0, - help=('Prints more stuff (can specify multiple ' - 'times for more output).')) - self.add_argument('--passthrough', action='store_true', - default=False, - help='Prints all output while running.') - self.add_argument('--total-shards', default=1, type=int, - help=('Total number of shards being used for ' - 'this test run. (The user of ' - 'this script is responsible for spawning ' - 'all of the shards.)')) - self.add_argument('--shard-index', default=0, type=int, - help=('Shard index (0..total_shards-1) of this ' - 'test run.')) - self.add_argument('--retry-limit', type=int, default=0, - help='Retries each failure up to N times.') - self.add_argument('--terminal-width', type=int, - default=self._host.terminal_width(), - help=argparse.SUPPRESS) - self.add_argument('--overwrite', action='store_true', - default=None, - help=argparse.SUPPRESS) - self.add_argument('--no-overwrite', action='store_false', - dest='overwrite', default=None, - help=argparse.SUPPRESS) - - if discovery or running: - self.add_argument('-P', '--path', action='append', default=[], - help=('Adds dir to sys.path (can specify ' - 'multiple times).')) - self.add_argument('--top-level-dir', action='store', default=None, - help=argparse.SUPPRESS) - self.add_argument('--top-level-dirs', action='append', default=[], - help=('Sets the top directory of project ' - '(used when running subdirs).')) - - def parse_args(self, args=None, namespace=None): - try: - rargs = super(ArgumentParser, self).parse_args(args=args, - namespace=namespace) - except _Bailout: - return None - - for val in rargs.metadata: - if '=' not in val: - self._print_message('Error: malformed --metadata "%s"' % val) - self.exit_status = 2 - - if rargs.test_results_server: - if not rargs.builder_name: - self._print_message('Error: --builder-name must be specified ' - 'along with --test-result-server') - self.exit_status = 2 - if not rargs.master_name: - self._print_message('Error: --master-name must be specified ' - 'along with --test-result-server') - self.exit_status = 2 - if not rargs.test_type: - self._print_message('Error: --test-type must be specified ' - 'along with --test-result-server') - self.exit_status = 2 - - if rargs.total_shards < 1: - self._print_message('Error: --total-shards must be at least 1') - self.exit_status = 2 - - if rargs.shard_index < 0: - self._print_message('Error: --shard-index must be at least 0') - self.exit_status = 2 - - if rargs.shard_index >= rargs.total_shards: - self._print_message('Error: --shard-index must be no more than ' - 'the number of shards (%i) minus 1' % - rargs.total_shards) - self.exit_status = 2 - - if not rargs.suffixes: - rargs.suffixes = DEFAULT_SUFFIXES - - if not rargs.coverage_omit: - rargs.coverage_omit = DEFAULT_COVERAGE_OMIT - - if rargs.debugger: # pragma: no cover - rargs.jobs = 1 - rargs.passthrough = True - - if rargs.overwrite is None: - rargs.overwrite = self._host.stdout.isatty() and not rargs.verbose - - return rargs - - # Redefining built-in 'file' pylint: disable=W0622 - - def _print_message(self, msg, file=None): - self._host.print_(msg=msg, stream=file, end='\n') - - def print_help(self, file=None): - self._print_message(msg=self.format_help(), file=file) - - def error(self, message, bailout=True): # pylint: disable=W0221 - self.exit(2, '%s: error: %s\n' % (self.prog, message), bailout=bailout) - - def exit(self, status=0, message=None, # pylint: disable=W0221 - bailout=True): - self.exit_status = status - if message: - self._print_message(message, file=self._host.stderr) - if bailout: - raise _Bailout() - - def optparse_options(self, skip=None): - skip = skip or [] - options = [] - for action in self._actions: - args = [flag for flag in action.option_strings if flag not in skip] - if not args or action.help == '==SUPPRESS==': - # must either be a positional argument like 'tests' - # or an option we want to skip altogether. - continue - - kwargs = { - 'default': action.default, - 'dest': action.dest, - 'help': action.help, - 'metavar': action.metavar, - 'type': action.type, - 'action': _action_str(action) - } - options.append(optparse.make_option(*args, **kwargs)) - return options - - def argv_from_args(self, args): - default_parser = ArgumentParser(host=self._host) - default_args = default_parser.parse_args([]) - argv = [] - tests = [] - d = vars(args) - for k in sorted(d.keys()): - v = d[k] - argname = _argname_from_key(k) - action = self._action_for_key(k) - if not action: - continue - action_str = _action_str(action) - if k == 'tests': - tests = v - continue - if getattr(default_args, k) == v: - # this arg has the default value, so skip it. - continue - - assert action_str in ['append', 'count', 'store', 'store_true'] - if action_str == 'append': - for el in v: - argv.append(argname) - argv.append(el) - elif action_str == 'count': - for _ in range(v): - argv.append(argname) - elif action_str == 'store': - argv.append(argname) - argv.append(str(v)) - else: - # action_str == 'store_true' - argv.append(argname) - - return argv + tests - - def _action_for_key(self, key): - for action in self._actions: - if action.dest == key: - return action - - # Assume foreign argument: something used by the embedder of typ, for - # example. - return None - - -def _action_str(action): - # Access to a protected member pylint: disable=W0212 - assert action.__class__ in ( - argparse._AppendAction, - argparse._CountAction, - argparse._StoreAction, - argparse._StoreTrueAction - ) - - if isinstance(action, argparse._AppendAction): - return 'append' - if isinstance(action, argparse._CountAction): - return 'count' - if isinstance(action, argparse._StoreAction): - return 'store' - if isinstance(action, argparse._StoreTrueAction): - return 'store_true' - - -def _argname_from_key(key): - return '--' + key.replace('_', '-')
diff --git a/typ/fakes/__init__.py b/typ/fakes/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/typ/fakes/__init__.py +++ /dev/null
diff --git a/typ/fakes/host_fake.py b/typ/fakes/host_fake.py deleted file mode 100644 index 89c0075..0000000 --- a/typ/fakes/host_fake.py +++ /dev/null
@@ -1,292 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy -import io -import logging -import sys - -from typ.host import _TeedStream - - -is_python3 = bool(sys.version_info.major == 3) - -if is_python3: # pragma: python3 - # pylint: disable=redefined-builtin,invalid-name - unicode = str - - -class FakeHost(object): - # "too many instance attributes" pylint: disable=R0902 - # "redefining built-in" pylint: disable=W0622 - # "unused arg" pylint: disable=W0613 - - python_interpreter = 'python' - is_python3 = bool(sys.version_info.major == 3) - - def __init__(self): - self.logger = logging.getLogger() - self.stdin = io.StringIO() - self.stdout = io.StringIO() - self.stderr = io.StringIO() - self.platform = 'linux2' - self.env = {} - self.sep = '/' - self.dirs = set([]) - self.files = {} - self.fetches = [] - self.fetch_responses = {} - self.written_files = {} - self.last_tmpdir = None - self.current_tmpno = 0 - self.mtimes = {} - self.cmds = [] - self.cwd = '/tmp' - self._orig_logging_handlers = [] - - def __getstate__(self): - d = copy.copy(self.__dict__) - del d['stderr'] - del d['stdout'] - del d['stdin'] - del d['logger'] - del d['_orig_logging_handlers'] - return d - - def __setstate__(self, d): - for k, v in d.items(): - setattr(self, k, v) - self.logger = logging.getLogger() - self.stdin = io.StringIO() - self.stdout = io.StringIO() - self.stderr = io.StringIO() - - def abspath(self, *comps): - relpath = self.join(*comps) - if relpath.startswith('/'): - return relpath - return self.join(self.cwd, relpath) - - def add_to_path(self, *comps): - absolute_path = self.abspath(*comps) - if absolute_path not in sys.path: - sys.path.append(absolute_path) - - def basename(self, path): - return path.split(self.sep)[-1] - - def call(self, argv, stdin=None, env=None): - self.cmds.append(argv) - return 0, '', '' - - def call_inline(self, argv): - return self.call(argv)[0] - - def chdir(self, *comps): - path = self.join(*comps) - if not path.startswith('/'): - path = self.join(self.cwd, path) - self.cwd = path - - def cpu_count(self): - return 1 - - def dirname(self, path): - return '/'.join(path.split('/')[:-1]) - - def exists(self, *comps): - path = self.abspath(*comps) - return ((path in self.files and self.files[path] is not None) or - path in self.dirs) - - def files_under(self, top): - files = [] - top = self.abspath(top) - for f in self.files: - if self.files[f] is not None and f.startswith(top): - files.append(self.relpath(f, top)) - return files - - def for_mp(self): - return self - - def getcwd(self): - return self.cwd - - def getenv(self, key, default=None): - return self.env.get(key, default) - - def getpid(self): - return 1 - - def isdir(self, *comps): - path = self.abspath(*comps) - return path in self.dirs - - def isfile(self, *comps): - path = self.abspath(*comps) - return path in self.files and self.files[path] is not None - - def join(self, *comps): - p = '' - for c in comps: - if c in ('', '.'): - continue - elif c.startswith('/'): - p = c - elif p: - p += '/' + c - else: - p = c - - # Handle ./ - p = p.replace('/./', '/') - - # Handle ../ - while '/..' in p: - comps = p.split('/') - idx = comps.index('..') - comps = comps[:idx-1] + comps[idx+1:] - p = '/'.join(comps) - return p - - def maybe_mkdir(self, *comps): - path = self.abspath(self.join(*comps)) - if path not in self.dirs: - self.dirs.add(path) - - def mktempfile(self, delete=True): - curno = self.current_tmpno - self.current_tmpno += 1 - f = io.StringIO() - f.name = '__im_tmp/tmpfile_%u' % curno - return f - - def mkdtemp(self, suffix='', prefix='tmp', dir=None, **_kwargs): - if dir is None: - dir = self.sep + '__im_tmp' - curno = self.current_tmpno - self.current_tmpno += 1 - self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix)) - self.dirs.add(self.last_tmpdir) - return self.last_tmpdir - - def mtime(self, *comps): - return self.mtimes.get(self.join(*comps), 0) - - def print_(self, msg='', end='\n', stream=None): - stream = stream or self.stdout - stream.write(msg + end) - stream.flush() - - def read_binary_file(self, *comps): - return self._read(comps) - - def read_text_file(self, *comps): - return self._read(comps) - - def _read(self, comps): - return self.files[self.abspath(*comps)] - - def realpath(self, *comps): - return self.abspath(*comps) - - def relpath(self, path, start): - return path.replace(start + '/', '') - - def remove(self, *comps): - path = self.abspath(*comps) - self.files[path] = None - self.written_files[path] = None - - def rmtree(self, *comps): - path = self.abspath(*comps) - for f in self.files: - if f.startswith(path): - self.files[f] = None - self.written_files[f] = None - self.dirs.remove(path) - - def terminal_width(self): - return 80 - - def splitext(self, path): - idx = path.rfind('.') - if idx == -1: - return (path, '') - return (path[:idx], path[idx:]) - - def time(self): - return 0 - - def write_binary_file(self, path, contents): - self._write(path, contents) - - def write_text_file(self, path, contents): - self._write(path, contents) - - def _write(self, path, contents): - full_path = self.abspath(path) - self.maybe_mkdir(self.dirname(full_path)) - self.files[full_path] = contents - self.written_files[full_path] = contents - - def fetch(self, url, data=None, headers=None): - resp = self.fetch_responses.get(url, FakeResponse(unicode(''), url)) - self.fetches.append((url, data, headers, resp)) - return resp - - def _tap_output(self): - self.stdout = _TeedStream(self.stdout) - self.stderr = _TeedStream(self.stderr) - if True: - sys.stdout = self.stdout - sys.stderr = self.stderr - - def _untap_output(self): - assert isinstance(self.stdout, _TeedStream) - self.stdout = self.stdout.stream - self.stderr = self.stderr.stream - if True: - sys.stdout = self.stdout - sys.stderr = self.stderr - - def capture_output(self, divert=True): - self._tap_output() - self._orig_logging_handlers = self.logger.handlers - if self._orig_logging_handlers: - self.logger.handlers = [logging.StreamHandler(self.stderr)] - self.stdout.capture(divert=divert) - self.stderr.capture(divert=divert) - - def restore_output(self): - assert isinstance(self.stdout, _TeedStream) - out, err = (self.stdout.restore(), self.stderr.restore()) - self.logger.handlers = self._orig_logging_handlers - self._untap_output() - return out, err - - -class FakeResponse(io.StringIO): - - def __init__(self, response, url, code=200): - io.StringIO.__init__(self, response) - self._url = url - self.code = code - - def geturl(self): - return self._url - - def getcode(self): - return self.code
diff --git a/typ/fakes/test_result_server_fake.py b/typ/fakes/test_result_server_fake.py deleted file mode 100644 index 2cc876c..0000000 --- a/typ/fakes/test_result_server_fake.py +++ /dev/null
@@ -1,79 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""A fake implementation of test-results.appspot.com.""" - -import io -import sys -import threading - - -if sys.version_info.major == 2: # pragma: python2 - from SimpleHTTPServer import SimpleHTTPRequestHandler as HTTPRequestHandler - from SocketServer import TCPServer -else: # pragma: python3 - assert sys.version_info.major == 3 - # pylint: disable=invalid-name, redefined-builtin - unicode = str - from http.server import BaseHTTPRequestHandler # pylint: disable=F0401 - HTTPRequestHandler = BaseHTTPRequestHandler - from socketserver import TCPServer # pylint: disable=F0401 - - -def start(code=200): - server = _Server(code=code) - thread = threading.Thread(target=_run, args=(server,)) - server.main_thread = thread - thread.daemon = True - thread.start() - return server - - -def _run(server): - server.serve_forever(0.05) - - -class _Server(TCPServer): - - def __init__(self, code): - self.allow_reuse_address = True - TCPServer.__init__(self, ('localhost', 0), _RequestHandler) - self.log = io.StringIO() - self.requests = [] - self.main_thread = None - self.code = code - - def stop(self): - self.shutdown() - self.main_thread.join() - return self.requests - - -class _RequestHandler(HTTPRequestHandler): - - def __init__(self, *args, **kwargs): - HTTPRequestHandler.__init__(self, *args, **kwargs) - - # 'Invalid Name' pylint: disable=C0103 - def do_POST(self): - path = self.path - length = int(self.headers['content-length']) - payload = self.rfile.read(length) - self.server.requests.append(('post', path, payload)) - self.send_response(self.server.code) - self.end_headers() - - # 'Redefining built-in' pylint: disable=W0622 - def log_message(self, format, *args): - self.server.log.write(unicode('%s\n' % (format % args)))
diff --git a/typ/fakes/tests/__init__.py b/typ/fakes/tests/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/typ/fakes/tests/__init__.py +++ /dev/null
diff --git a/typ/fakes/tests/host_fake_test.py b/typ/fakes/tests/host_fake_test.py deleted file mode 100644 index a16b7ba..0000000 --- a/typ/fakes/tests/host_fake_test.py +++ /dev/null
@@ -1,83 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys - -from typ.tests import host_test -from typ.fakes.host_fake import FakeHost, FakeResponse - -is_python3 = bool(sys.version_info.major == 3) - -if is_python3: # pragma: python3 - # redefining built-in 'unicode' pylint: disable=W0622 - unicode = str - -class TestFakeHost(host_test.TestHost): - - def host(self): - return FakeHost() - - def test_add_to_path(self): - # TODO: FakeHost uses the real sys.path, and then gets - # confused becayse host.abspath() doesn't work right for - # windows-style paths. - if sys.platform != 'win32': - super(TestFakeHost, self).test_add_to_path() - - def test_call(self): - h = self.host() - ret, out, err = h.call(['echo', 'hello, world']) - self.assertEqual(ret, 0) - self.assertEqual(out, '') - self.assertEqual(err, '') - self.assertEqual(h.cmds, [['echo', 'hello, world']]) - - def test_call_inline(self): - h = self.host() - ret = h.call_inline(['echo', 'hello, world']) - self.assertEqual(ret, 0) - - def test_capture_output(self): - h = self.host() - self.host = lambda: h - super(TestFakeHost, self).test_capture_output() - - # This tests that the super-method only tested the - # divert=True case, and things were diverted properly. - self.assertEqual(h.stdout.getvalue(), '') - self.assertEqual(h.stderr.getvalue(), '') - - h.capture_output(divert=False) - h.print_('on stdout') - h.print_('on stderr', stream=h.stderr) - out, err = h.restore_output() - self.assertEqual(out, 'on stdout\n') - self.assertEqual(err, 'on stderr\n') - self.assertEqual(h.stdout.getvalue(), 'on stdout\n') - self.assertEqual(h.stderr.getvalue(), 'on stderr\n') - - def test_for_mp(self): - h = self.host() - self.assertNotEqual(h.for_mp(), None) - - def test_fetch(self): - h = self.host() - url = 'http://localhost/test' - resp = FakeResponse(unicode('foo'), url) - h.fetch_responses[url] = resp - actual_resp = h.fetch(url) - self.assertEqual(actual_resp.geturl(), url) - self.assertEqual(actual_resp.getcode(), 200) - self.assertEqual(resp, actual_resp) - self.assertEqual(h.fetches, [(url, None, None, actual_resp)])
diff --git a/typ/fakes/tests/test_result_server_fake_test.py b/typ/fakes/tests/test_result_server_fake_test.py deleted file mode 100644 index 87316c8..0000000 --- a/typ/fakes/tests/test_result_server_fake_test.py +++ /dev/null
@@ -1,36 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from typ.fakes import test_result_server_fake -from typ import Host - - -class TestResultServerFakeTest(unittest.TestCase): - def test_basic_upload(self): - host = Host() - server = None - posts = [] - try: - server = test_result_server_fake.start() - url = 'http://%s:%d/testfile/upload' % server.server_address - if server: - resp = host.fetch(url, 'foo=bar') - finally: - if server: - posts = server.stop() - self.assertEqual(posts, [('post', '/testfile/upload', - 'foo=bar'.encode('utf8'))]) - self.assertNotEqual(server.log.getvalue(), '')
diff --git a/typ/host.py b/typ/host.py deleted file mode 100644 index 42890db..0000000 --- a/typ/host.py +++ /dev/null
@@ -1,286 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import io -import logging -import multiprocessing -import os -import shutil -import subprocess -import sys -import tempfile -import time - - -if sys.version_info.major == 2: # pragma: python2 - from urllib2 import urlopen, Request -else: # pragma: python3 - # pylint: disable=E0611 - assert sys.version_info.major == 3 - from urllib.request import urlopen, Request # pylint: disable=F0401,E0611 - - -class Host(object): - python_interpreter = sys.executable - is_python3 = bool(sys.version_info.major == 3) - - pathsep = os.pathsep - sep = os.sep - env = os.environ - - _orig_stdout = sys.stdout - _orig_stderr = sys.stderr - - def __init__(self): - self.logger = logging.getLogger() - self._orig_logging_handlers = None - self.stdout = sys.stdout - self.stderr = sys.stderr - self.stdin = sys.stdin - self.env = os.environ - self.platform = sys.platform - - def abspath(self, *comps): - return os.path.abspath(self.join(*comps)) - - def add_to_path(self, *comps): - absolute_path = self.abspath(*comps) - if absolute_path not in sys.path: - sys.path.insert(0, absolute_path) - - def basename(self, path): - return os.path.basename(path) - - def call(self, argv, stdin=None, env=None): - if stdin: - stdin_pipe = subprocess.PIPE - else: - stdin_pipe = None - proc = subprocess.Popen(argv, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, stdin=stdin_pipe, - env=env) - if stdin_pipe: - proc.stdin.write(stdin.encode('utf-8')) - stdout, stderr = proc.communicate() - - # pylint type checking bug - pylint: disable=E1103 - return proc.returncode, stdout.decode('utf-8'), stderr.decode('utf-8') - - def call_inline(self, argv, env=None): - if isinstance(self.stdout, _TeedStream): # pragma: no cover - ret, out, err = self.call(argv, env) - self.print_(out, end='') - self.print_(err, end='', stream=self.stderr) - return ret - return subprocess.call(argv, stdin=self.stdin, stdout=self.stdout, - stderr=self.stderr, env=env) - - def chdir(self, *comps): - return os.chdir(self.join(*comps)) - - def cpu_count(self): - return multiprocessing.cpu_count() - - def dirname(self, *comps): - return os.path.dirname(self.join(*comps)) - - def exists(self, *comps): - return os.path.exists(self.join(*comps)) - - def files_under(self, top): - all_files = [] - for root, _, files in os.walk(top): - for f in files: - relpath = self.relpath(os.path.join(root, f), top) - all_files.append(relpath) - return all_files - - def getcwd(self): - return os.getcwd() - - def getenv(self, key, default=None): - return self.env.get(key, default) - - def getpid(self): - return os.getpid() - - def for_mp(self): - return None - - def isdir(self, *comps): - return os.path.isdir(os.path.join(*comps)) - - def isfile(self, *comps): - return os.path.isfile(os.path.join(*comps)) - - def join(self, *comps): - return os.path.join(*comps) - - def maybe_mkdir(self, *comps): - path = self.abspath(self.join(*comps)) - if not self.exists(path): - os.makedirs(path) - - def mktempfile(self, delete=True): - return tempfile.NamedTemporaryFile(delete=delete) - - def mkdtemp(self, **kwargs): - return tempfile.mkdtemp(**kwargs) - - def mtime(self, *comps): - return os.stat(self.join(*comps)).st_mtime - - def print_(self, msg='', end='\n', stream=None): - stream = stream or self.stdout - stream.write(str(msg) + end) - stream.flush() - - def read_text_file(self, *comps): - return self._read(comps, 'r') - - def read_binary_file(self, *comps): - return self._read(comps, 'rb') - - def _read(self, comps, mode): - path = self.join(*comps) - with open(path, mode) as f: - return f.read() - - def realpath(self, *comps): - return os.path.realpath(os.path.join(*comps)) - - def relpath(self, path, start): - return os.path.relpath(path, start) - - def remove(self, *comps): - os.remove(self.join(*comps)) - - def rmtree(self, path): - shutil.rmtree(path, ignore_errors=True) - - def splitext(self, path): - return os.path.splitext(path) - - def time(self): - return time.time() - - def write_text_file(self, path, contents): - return self._write(path, contents, mode='w') - - def write_binary_file(self, path, contents): - return self._write(path, contents, mode='wb') - - def _write(self, path, contents, mode): - with open(path, mode) as f: - f.write(contents) - - def fetch(self, url, data=None, headers=None): - headers = headers or {} - return urlopen(Request(url, data.encode('utf8'), headers)) - - def terminal_width(self): - """Returns 0 if the width cannot be determined.""" - try: - if sys.platform == 'win32': # pragma: win32 - # From http://code.activestate.com/recipes/ \ - # 440694-determine-size-of-console-window-on-windows/ - from ctypes import windll, create_string_buffer - - STDERR_HANDLE = -12 - handle = windll.kernel32.GetStdHandle(STDERR_HANDLE) - - SCREEN_BUFFER_INFO_SZ = 22 - buf = create_string_buffer(SCREEN_BUFFER_INFO_SZ) - - if windll.kernel32.GetConsoleScreenBufferInfo(handle, buf): - import struct - fields = struct.unpack("hhhhHhhhhhh", buf.raw) - left = fields[5] - right = fields[7] - - # Note that we return 1 less than the width since writing - # into the rightmost column automatically performs a - # line feed. - return right - left - return 0 - else: # pragma: no win32 - import fcntl - import struct - import termios - packed = fcntl.ioctl(self.stderr.fileno(), - termios.TIOCGWINSZ, '\0' * 8) - _, columns, _, _ = struct.unpack('HHHH', packed) - return columns - except Exception: - return 0 - - def _tap_output(self): - self.stdout = sys.stdout = _TeedStream(self.stdout) - self.stderr = sys.stderr = _TeedStream(self.stderr) - - def _untap_output(self): - assert isinstance(self.stdout, _TeedStream) - self.stdout = sys.stdout = self.stdout.stream - self.stderr = sys.stderr = self.stderr.stream - - def capture_output(self, divert=True): - self._tap_output() - self._orig_logging_handlers = self.logger.handlers - if self._orig_logging_handlers: - self.logger.handlers = [logging.StreamHandler(self.stderr)] - self.stdout.capture(divert) - self.stderr.capture(divert) - - def restore_output(self): - assert isinstance(self.stdout, _TeedStream) - out, err = (self.stdout.restore(), self.stderr.restore()) - self.logger.handlers = self._orig_logging_handlers - self._untap_output() - return out, err - - -class _TeedStream(io.StringIO): - - def __init__(self, stream): - super(_TeedStream, self).__init__() - self.stream = stream - self.capturing = False - self.diverting = False - - def write(self, msg, *args, **kwargs): - if self.capturing: - if (sys.version_info.major == 2 and - isinstance(msg, str)): # pragma: python2 - msg = unicode(msg) - super(_TeedStream, self).write(msg, *args, **kwargs) - if not self.diverting: - self.stream.write(msg, *args, **kwargs) - - def flush(self): - if self.capturing: - super(_TeedStream, self).flush() - if not self.diverting: - self.stream.flush() - - def capture(self, divert=True): - self.truncate(0) - self.capturing = True - self.diverting = divert - - def restore(self): - msg = self.getvalue() - self.truncate(0) - self.capturing = False - self.diverting = False - return msg
diff --git a/typ/json_results.py b/typ/json_results.py deleted file mode 100644 index f048d05..0000000 --- a/typ/json_results.py +++ /dev/null
@@ -1,212 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from collections import OrderedDict - -import json - - -class ResultType(object): - Pass = 'Pass' - Failure = 'Failure' - ImageOnlyFailure = 'ImageOnlyFailure' - Timeout = 'Timeout' - Crash = 'Crash' - Skip = 'Skip' - - values = (Pass, Failure, ImageOnlyFailure, Timeout, Crash, Skip) - - -class Result(object): - # too many instance attributes pylint: disable=R0902 - # too many arguments pylint: disable=R0913 - - def __init__(self, name, actual, started, took, worker, - expected=None, unexpected=False, - flaky=False, code=0, out='', err='', pid=0): - self.name = name - self.actual = actual - self.started = started - self.took = took - self.worker = worker - self.expected = expected or [ResultType.Pass] - self.unexpected = unexpected - self.flaky = flaky - self.code = code - self.out = out - self.err = err - self.pid = pid - - -class ResultSet(object): - - def __init__(self): - self.results = [] - - def add(self, result): - self.results.append(result) - - -TEST_SEPARATOR = '.' - - -def make_full_results(metadata, seconds_since_epoch, all_test_names, results): - """Convert the typ results to the Chromium JSON test result format. - - See http://www.chromium.org/developers/the-json-test-results-format - """ - - # We use OrderedDicts here so that the output is stable. - full_results = OrderedDict() - full_results['version'] = 3 - full_results['interrupted'] = False - full_results['path_delimiter'] = TEST_SEPARATOR - full_results['seconds_since_epoch'] = seconds_since_epoch - - for md in metadata: - key, val = md.split('=', 1) - full_results[key] = val - - passing_tests = _passing_test_names(results) - failed_tests = failed_test_names(results) - skipped_tests = set(all_test_names) - passing_tests - failed_tests - - full_results['num_failures_by_type'] = OrderedDict() - full_results['num_failures_by_type']['FAIL'] = len(failed_tests) - full_results['num_failures_by_type']['PASS'] = len(passing_tests) - full_results['num_failures_by_type']['SKIP'] = len(skipped_tests) - - full_results['tests'] = OrderedDict() - - for test_name in all_test_names: - value = _results_for_test(test_name, results) - if test_name in skipped_tests: - value['expected'] = 'SKIP' - else: - value['expected'] = 'PASS' - if value['actual'].endswith('FAIL'): - value['is_unexpected'] = True - _add_path_to_trie(full_results['tests'], test_name, value) - - return full_results - - -def make_upload_request(test_results_server, builder, master, testtype, - full_results): - if test_results_server.startswith('http'): - url = '%s/testfile/upload' % test_results_server - else: - url = 'https://%s/testfile/upload' % test_results_server - attrs = [('builder', builder), - ('master', master), - ('testtype', testtype)] - content_type, data = _encode_multipart_form_data(attrs, full_results) - return url, content_type, data - - -def exit_code_from_full_results(full_results): - return 1 if num_failures(full_results) else 0 - - -def num_failures(full_results): - return full_results['num_failures_by_type']['FAIL'] - - -def num_passes(full_results): - return full_results['num_failures_by_type']['PASS'] - - -def num_skips(full_results): - return full_results['num_failures_by_type']['SKIP'] - - -def failed_test_names(results): - names = set() - for r in results.results: - if r.actual == ResultType.Failure: - names.add(r.name) - elif ((r.actual == ResultType.Pass or r.actual == ResultType.Skip) - and r.name in names): - # This check indicates that a test failed, and then either passed - # or was skipped on a retry. It is somewhat counterintuitive - # that a test that failed and then skipped wouldn't be considered - # failed, but that's at least consistent with a test that is - # skipped every time. - names.remove(r.name) - return names - - -def _passing_test_names(results): - return set(r.name for r in results.results if r.actual == ResultType.Pass) - - -def _results_for_test(test_name, results): - value = OrderedDict() - actuals = [] - times = [] - for r in results.results: - if r.name == test_name: - if r.actual == ResultType.Failure: - actuals.append('FAIL') - elif r.actual == ResultType.Pass: - actuals.append('PASS') - elif r.actual == ResultType.Skip: - actuals.append('SKIP') - - # The time a test takes is a floating point number of seconds; - # if we were to encode this unmodified, then when we converted it - # to JSON it might make the file significantly larger. Instead - # we truncate the file to ten-thousandths of a second, which is - # probably more than good enough for most tests. - times.append(round(r.took, 4)) - if not actuals: # pragma: untested - actuals.append('SKIP') - value['actual'] = ' '.join(actuals) - value['times'] = times - return value - -def _add_path_to_trie(trie, path, value): - if TEST_SEPARATOR not in path: - trie[path] = value - return - directory, rest = path.split(TEST_SEPARATOR, 1) - if directory not in trie: - trie[directory] = {} - _add_path_to_trie(trie[directory], rest, value) - - -def _encode_multipart_form_data(attrs, test_results): - # Cloned from webkitpy/common/net/file_uploader.py - BOUNDARY = '-J-S-O-N-R-E-S-U-L-T-S---B-O-U-N-D-A-R-Y-' - CRLF = '\r\n' - lines = [] - - for key, value in attrs: - lines.append('--' + BOUNDARY) - lines.append('Content-Disposition: form-data; name="%s"' % key) - lines.append('') - lines.append(value) - - lines.append('--' + BOUNDARY) - lines.append('Content-Disposition: form-data; name="file"; ' - 'filename="full_results.json"') - lines.append('Content-Type: application/json') - lines.append('') - lines.append(json.dumps(test_results)) - - lines.append('--' + BOUNDARY + '--') - lines.append('') - body = CRLF.join(lines) - content_type = 'multipart/form-data; boundary=%s' % BOUNDARY - return content_type, body
diff --git a/typ/pool.py b/typ/pool.py deleted file mode 100644 index 6200a8a..0000000 --- a/typ/pool.py +++ /dev/null
@@ -1,204 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy -import multiprocessing -import pickle -import traceback - -from typ.host import Host - - -def make_pool(host, jobs, callback, context, pre_fn, post_fn): - _validate_args(context, pre_fn, post_fn) - if jobs > 1: - return _ProcessPool(host, jobs, callback, context, pre_fn, post_fn) - else: - return _AsyncPool(host, jobs, callback, context, pre_fn, post_fn) - - -class _MessageType(object): - Request = 'Request' - Response = 'Response' - Close = 'Close' - Done = 'Done' - Error = 'Error' - Interrupt = 'Interrupt' - - values = [Request, Response, Close, Done, Error, Interrupt] - - -def _validate_args(context, pre_fn, post_fn): - try: - _ = pickle.dumps(context) - except Exception as e: - raise ValueError('context passed to make_pool is not picklable: %s' - % str(e)) - try: - _ = pickle.dumps(pre_fn) - except pickle.PickleError: - raise ValueError('pre_fn passed to make_pool is not picklable') - try: - _ = pickle.dumps(post_fn) - except pickle.PickleError: - raise ValueError('post_fn passed to make_pool is not picklable') - - -class _ProcessPool(object): - - def __init__(self, host, jobs, callback, context, pre_fn, post_fn): - self.host = host - self.jobs = jobs - self.requests = multiprocessing.Queue() - self.responses = multiprocessing.Queue() - self.workers = [] - self.discarded_responses = [] - self.closed = False - self.erred = False - for worker_num in range(1, jobs + 1): - w = multiprocessing.Process(target=_loop, - args=(self.requests, self.responses, - host.for_mp(), worker_num, - callback, context, - pre_fn, post_fn)) - w.start() - self.workers.append(w) - - def send(self, msg): - self.requests.put((_MessageType.Request, msg)) - - def get(self): - msg_type, resp = self.responses.get() - if msg_type == _MessageType.Error: - self._handle_error(resp) - elif msg_type == _MessageType.Interrupt: - raise KeyboardInterrupt - assert msg_type == _MessageType.Response - return resp - - def close(self): - for _ in self.workers: - self.requests.put((_MessageType.Close, None)) - self.closed = True - - def join(self): - # TODO: one would think that we could close self.requests in close(), - # above, and close self.responses below, but if we do, we get - # weird tracebacks in the daemon threads multiprocessing starts up. - # Instead, we have to hack the innards of multiprocessing. It - # seems likely that there's a bug somewhere, either in this module or - # in multiprocessing. - # pylint: disable=protected-access - if self.host.is_python3: # pragma: python3 - multiprocessing.queues.is_exiting = lambda: True - else: # pragma: python2 - multiprocessing.util._exiting = True - - if not self.closed: - # We must be aborting; terminate the workers rather than - # shutting down cleanly. - for w in self.workers: - w.terminate() - w.join() - return [] - - final_responses = [] - error = None - interrupted = None - for w in self.workers: - while True: - msg_type, resp = self.responses.get() - if msg_type == _MessageType.Error: - error = resp - break - if msg_type == _MessageType.Interrupt: - interrupted = True - break - if msg_type == _MessageType.Done: - final_responses.append(resp[1]) - break - self.discarded_responses.append(resp) - - for w in self.workers: - w.join() - - # TODO: See comment above at the beginning of the function for - # why this is commented out. - # self.responses.close() - - if error: - self._handle_error(error) - if interrupted: - raise KeyboardInterrupt - return final_responses - - def _handle_error(self, msg): - worker_num, tb = msg - self.erred = True - raise Exception("Error from worker %d (traceback follows):\n%s" % - (worker_num, tb)) - - -# 'Too many arguments' pylint: disable=R0913 - -def _loop(requests, responses, host, worker_num, - callback, context, pre_fn, post_fn, should_loop=True): - host = host or Host() - try: - context_after_pre = pre_fn(host, worker_num, context) - keep_looping = True - while keep_looping: - message_type, args = requests.get(block=True) - if message_type == _MessageType.Close: - responses.put((_MessageType.Done, - (worker_num, post_fn(context_after_pre)))) - break - assert message_type == _MessageType.Request - resp = callback(context_after_pre, args) - responses.put((_MessageType.Response, resp)) - keep_looping = should_loop - except KeyboardInterrupt as e: - responses.put((_MessageType.Interrupt, (worker_num, str(e)))) - except Exception as e: - responses.put((_MessageType.Error, - (worker_num, traceback.format_exc(e)))) - - -class _AsyncPool(object): - - def __init__(self, host, jobs, callback, context, pre_fn, post_fn): - self.host = host or Host() - self.jobs = jobs - self.callback = callback - self.context = copy.deepcopy(context) - self.msgs = [] - self.closed = False - self.post_fn = post_fn - self.context_after_pre = pre_fn(self.host, 1, self.context) - self.final_context = None - - def send(self, msg): - self.msgs.append(msg) - - def get(self): - return self.callback(self.context_after_pre, self.msgs.pop(0)) - - def close(self): - self.closed = True - self.final_context = self.post_fn(self.context_after_pre) - - def join(self): - if not self.closed: - self.close() - return [self.final_context]
diff --git a/typ/printer.py b/typ/printer.py deleted file mode 100644 index 473537b..0000000 --- a/typ/printer.py +++ /dev/null
@@ -1,40 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class Printer(object): - - def __init__(self, print_, should_overwrite, cols): - self.print_ = print_ - self.should_overwrite = should_overwrite - self.cols = cols - self.last_line = '' - - def flush(self): - if self.last_line: - self.print_('') - self.last_line = '' - - def update(self, msg, elide=True): - msg_len = len(msg) - if elide and self.cols and msg_len > self.cols - 5: - new_len = int((self.cols - 5) / 2) - msg = msg[:new_len] + '...' + msg[-new_len:] - if self.should_overwrite and self.last_line: - self.print_('\r' + ' ' * len(self.last_line) + '\r', end='') - elif self.last_line: - self.print_('') - self.print_(msg, end='') - last_nl = msg.rfind('\n') - self.last_line = msg[last_nl + 1:]
diff --git a/typ/runner.py b/typ/runner.py deleted file mode 100644 index 62d92c7..0000000 --- a/typ/runner.py +++ /dev/null
@@ -1,993 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import fnmatch -import importlib -import inspect -import json -import os -import pdb -import sys -import unittest -import traceback - -from collections import OrderedDict - -# This ensures that absolute imports of typ modules will work when -# running typ/runner.py as a script even if typ is not installed. -# We need this entry in addition to the one in __main__.py to ensure -# that typ/runner.py works when invoked via subprocess on windows in -# _spawn_main(). -path_to_file = os.path.realpath(__file__) -if path_to_file.endswith('.pyc'): # pragma: no cover - path_to_file = path_to_file[:-1] -dir_above_typ = os.path.dirname(os.path.dirname(path_to_file)) -if dir_above_typ not in sys.path: # pragma: no cover - sys.path.append(dir_above_typ) - - -from typ import json_results -from typ.arg_parser import ArgumentParser -from typ.host import Host -from typ.pool import make_pool -from typ.stats import Stats -from typ.printer import Printer -from typ.test_case import TestCase as TypTestCase -from typ.version import VERSION - - -Result = json_results.Result -ResultSet = json_results.ResultSet -ResultType = json_results.ResultType - - -def main(argv=None, host=None, win_multiprocessing=None, **defaults): - host = host or Host() - runner = Runner(host=host) - if win_multiprocessing is not None: - runner.win_multiprocessing = win_multiprocessing - return runner.main(argv, **defaults) - - -class TestInput(object): - - def __init__(self, name, msg='', timeout=None, expected=None): - self.name = name - self.msg = msg - self.timeout = timeout - self.expected = expected - - -class TestSet(object): - - def __init__(self, parallel_tests=None, isolated_tests=None, - tests_to_skip=None): - - def promote(tests): - tests = tests or [] - return [test if isinstance(test, TestInput) else TestInput(test) - for test in tests] - - self.parallel_tests = promote(parallel_tests) - self.isolated_tests = promote(isolated_tests) - self.tests_to_skip = promote(tests_to_skip) - - -class WinMultiprocessing(object): - ignore = 'ignore' - importable = 'importable' - spawn = 'spawn' - - values = [ignore, importable, spawn] - - -class _AddTestsError(Exception): - pass - - -class Runner(object): - - def __init__(self, host=None): - self.args = None - self.classifier = None - self.cov = None - self.context = None - self.coverage_source = None - self.host = host or Host() - self.loader = unittest.loader.TestLoader() - self.printer = None - self.setup_fn = None - self.stats = None - self.teardown_fn = None - self.top_level_dir = None - self.top_level_dirs = [] - self.win_multiprocessing = WinMultiprocessing.spawn - self.final_responses = [] - - # initialize self.args to the defaults. - parser = ArgumentParser(self.host) - self.parse_args(parser, []) - - def main(self, argv=None, **defaults): - parser = ArgumentParser(self.host) - self.parse_args(parser, argv, **defaults) - if parser.exit_status is not None: - return parser.exit_status - - try: - ret, _, _ = self.run() - return ret - except KeyboardInterrupt: - self.print_("interrupted, exiting", stream=self.host.stderr) - return 130 - - def parse_args(self, parser, argv, **defaults): - for attrname in defaults: - if not hasattr(self.args, attrname): - parser.error("Unknown default argument name '%s'" % attrname, - bailout=False) - return - parser.set_defaults(**defaults) - self.args = parser.parse_args(args=argv) - if parser.exit_status is not None: - return - - def print_(self, msg='', end='\n', stream=None): - self.host.print_(msg, end, stream=stream) - - def run(self, test_set=None): - - ret = 0 - h = self.host - - if self.args.version: - self.print_(VERSION) - return ret, None, None - - should_spawn = self._check_win_multiprocessing() - if should_spawn: - return self._spawn(test_set) - - ret = self._set_up_runner() - if ret: - return ret, None, None - - find_start = h.time() - if self.cov: # pragma: no cover - self.cov.erase() - self.cov.start() - - full_results = None - result_set = ResultSet() - - if not test_set: - ret, test_set = self.find_tests(self.args) - find_end = h.time() - - if not ret: - ret, full_results = self._run_tests(result_set, test_set) - - if self.cov: # pragma: no cover - self.cov.stop() - self.cov.save() - test_end = h.time() - - trace = self._trace_from_results(result_set) - if full_results: - self._summarize(full_results) - self._write(self.args.write_full_results_to, full_results) - upload_ret = self._upload(full_results) - if not ret: - ret = upload_ret - reporting_end = h.time() - self._add_trace_event(trace, 'run', find_start, reporting_end) - self._add_trace_event(trace, 'discovery', find_start, find_end) - self._add_trace_event(trace, 'testing', find_end, test_end) - self._add_trace_event(trace, 'reporting', test_end, reporting_end) - self._write(self.args.write_trace_to, trace) - self.report_coverage() - else: - upload_ret = 0 - - return ret, full_results, trace - - def _check_win_multiprocessing(self): - wmp = self.win_multiprocessing - - ignore, importable, spawn = WinMultiprocessing.values - - if wmp not in WinMultiprocessing.values: - raise ValueError('illegal value %s for win_multiprocessing' % - wmp) - - h = self.host - if wmp == ignore and h.platform == 'win32': # pragma: win32 - raise ValueError('Cannot use WinMultiprocessing.ignore for ' - 'win_multiprocessing when actually running ' - 'on Windows.') - - if wmp == ignore or self.args.jobs == 1: - return False - - if wmp == importable: - if self._main_is_importable(): - return False - raise ValueError('The __main__ module (%s) ' # pragma: no cover - 'may not be importable' % - sys.modules['__main__'].__file__) - - assert wmp == spawn - return True - - def _main_is_importable(self): # pragma: untested - path = sys.modules['__main__'].__file__ - if not path: - return False - if path.endswith('.pyc'): - path = path[:-1] - if not path.endswith('.py'): - return False - if path.endswith('__main__.py'): - # main modules are not directly importable. - return False - - path = self.host.realpath(path) - for d in sys.path: - if path.startswith(self.host.realpath(d)): - return True - return False # pragma: no cover - - def _spawn(self, test_set): - # TODO: Handle picklable hooks, rather than requiring them to be None. - assert self.classifier is None - assert self.context is None - assert self.setup_fn is None - assert self.teardown_fn is None - assert test_set is None - h = self.host - - if self.args.write_trace_to: # pragma: untested - should_delete_trace = False - else: - should_delete_trace = True - fp = h.mktempfile(delete=False) - fp.close() - self.args.write_trace_to = fp.name - - if self.args.write_full_results_to: # pragma: untested - should_delete_results = False - else: - should_delete_results = True - fp = h.mktempfile(delete=False) - fp.close() - self.args.write_full_results_to = fp.name - - argv = ArgumentParser(h).argv_from_args(self.args) - ret = h.call_inline([h.python_interpreter, path_to_file] + argv) - - trace = self._read_and_delete(self.args.write_trace_to, - should_delete_trace) - full_results = self._read_and_delete(self.args.write_full_results_to, - should_delete_results) - return ret, full_results, trace - - def _set_up_runner(self): - h = self.host - args = self.args - - self.stats = Stats(args.status_format, h.time, args.jobs) - self.printer = Printer( - self.print_, args.overwrite, args.terminal_width) - - if self.args.top_level_dirs and self.args.top_level_dir: - self.print_( - 'Cannot specify both --top-level-dir and --top-level-dirs', - stream=h.stderr) - return 1 - - self.top_level_dirs = args.top_level_dirs - if not self.top_level_dirs and args.top_level_dir: - self.top_level_dirs = [args.top_level_dir] - - if not self.top_level_dirs: - for test in [t for t in args.tests if h.exists(t)]: - if h.isdir(test): - top_dir = test - else: - top_dir = h.dirname(test) - while h.exists(top_dir, '__init__.py'): - top_dir = h.dirname(top_dir) - top_dir = h.realpath(top_dir) - if not top_dir in self.top_level_dirs: - self.top_level_dirs.append(top_dir) - if not self.top_level_dirs: - top_dir = h.getcwd() - while h.exists(top_dir, '__init__.py'): - top_dir = h.dirname(top_dir) - top_dir = h.realpath(top_dir) - self.top_level_dirs.append(top_dir) - - if not self.top_level_dir and self.top_level_dirs: - self.top_level_dir = self.top_level_dirs[0] - - for path in self.top_level_dirs: - h.add_to_path(path) - - for path in args.path: - h.add_to_path(path) - - if args.coverage: # pragma: no cover - try: - import coverage - except ImportError: - return 1 - - source = self.args.coverage_source - if not source: - source = self.top_level_dirs + self.args.path - self.coverage_source = source - self.cov = coverage.coverage(source=self.coverage_source, - data_suffix=True) - self.cov.erase() - return 0 - - def find_tests(self, args): - test_set = TestSet() - - orig_skip = unittest.skip - orig_skip_if = unittest.skipIf - if args.all: - unittest.skip = lambda reason: lambda x: x - unittest.skipIf = lambda condition, reason: lambda x: x - - try: - names = self._name_list_from_args(args) - classifier = self.classifier or _default_classifier(args) - - for name in names: - try: - self._add_tests_to_set(test_set, args.suffixes, - self.top_level_dirs, classifier, - name) - except (AttributeError, ImportError, SyntaxError) as e: - ex_str = traceback.format_exc() - self.print_('Failed to load "%s" in find_tests: %s' % - (name, e)) - self.print_(' %s' % - '\n '.join(ex_str.splitlines())) - self.print_(ex_str) - return 1, None - except _AddTestsError as e: - self.print_(str(e)) - return 1, None - - # TODO: Add support for discovering setupProcess/teardownProcess? - - shard_index = args.shard_index - total_shards = args.total_shards - assert total_shards >= 1 - assert shard_index >= 0 and shard_index < total_shards, ( - 'shard_index (%d) must be >= 0 and < total_shards (%d)' % - (shard_index, total_shards)) - test_set.parallel_tests = _sort_inputs( - test_set.parallel_tests)[shard_index::total_shards] - test_set.isolated_tests = _sort_inputs( - test_set.isolated_tests)[shard_index::total_shards] - test_set.tests_to_skip = _sort_inputs( - test_set.tests_to_skip)[shard_index::total_shards] - return 0, test_set - finally: - unittest.skip = orig_skip - unittest.skipIf = orig_skip_if - - def _name_list_from_args(self, args): - if args.tests: - names = args.tests - elif args.file_list: - if args.file_list == '-': - s = self.host.stdin.read() - else: - s = self.host.read_text_file(args.file_list) - names = [line.strip() for line in s.splitlines()] - else: - names = self.top_level_dirs - return names - - def _add_tests_to_set(self, test_set, suffixes, top_level_dirs, classifier, - name): - h = self.host - loader = self.loader - add_tests = _test_adder(test_set, classifier) - - found = set() - for d in top_level_dirs: - if h.isfile(name): - rpath = h.relpath(name, d) - if rpath.startswith('..'): - continue - if rpath.endswith('.py'): - rpath = rpath[:-3] - module = rpath.replace(h.sep, '.') - if module not in found: - found.add(module) - add_tests(loader.loadTestsFromName(module)) - elif h.isdir(name): - rpath = h.relpath(name, d) - if rpath.startswith('..'): - continue - for suffix in suffixes: - if not name in found: - found.add(name + '/' + suffix) - add_tests(loader.discover(name, suffix, d)) - else: - possible_dir = name.replace('.', h.sep) - if h.isdir(d, possible_dir): - for suffix in suffixes: - path = h.join(d, possible_dir) - if not path in found: - found.add(path + '/' + suffix) - suite = loader.discover(path, suffix, d) - add_tests(suite) - elif not name in found: - found.add(name) - add_tests(loader.loadTestsFromName(name)) - - # pylint: disable=no-member - if hasattr(loader, 'errors') and loader.errors: # pragma: python3 - # In Python3's version of unittest, loader failures get converted - # into failed test cases, rather than raising exceptions. However, - # the errors also get recorded so you can err out immediately. - raise ImportError(loader.errors) - - def _run_tests(self, result_set, test_set): - h = self.host - - all_tests = [ti.name for ti in - _sort_inputs(test_set.parallel_tests + - test_set.isolated_tests + - test_set.tests_to_skip)] - - if self.args.list_only: - self.print_('\n'.join(all_tests)) - return 0, None - - self._run_one_set(self.stats, result_set, test_set) - - failed_tests = sorted(json_results.failed_test_names(result_set)) - retry_limit = self.args.retry_limit - - while retry_limit and failed_tests: - if retry_limit == self.args.retry_limit: - self.flush() - self.args.overwrite = False - self.printer.should_overwrite = False - self.args.verbose = min(self.args.verbose, 1) - - self.print_('') - self.print_('Retrying failed tests (attempt #%d of %d)...' % - (self.args.retry_limit - retry_limit + 1, - self.args.retry_limit)) - self.print_('') - - stats = Stats(self.args.status_format, h.time, 1) - stats.total = len(failed_tests) - tests_to_retry = TestSet(isolated_tests=list(failed_tests)) - retry_set = ResultSet() - self._run_one_set(stats, retry_set, tests_to_retry) - result_set.results.extend(retry_set.results) - failed_tests = json_results.failed_test_names(retry_set) - retry_limit -= 1 - - if retry_limit != self.args.retry_limit: - self.print_('') - - full_results = json_results.make_full_results(self.args.metadata, - int(h.time()), - all_tests, result_set) - - return (json_results.exit_code_from_full_results(full_results), - full_results) - - def _run_one_set(self, stats, result_set, test_set): - stats.total = (len(test_set.parallel_tests) + - len(test_set.isolated_tests) + - len(test_set.tests_to_skip)) - self._skip_tests(stats, result_set, test_set.tests_to_skip) - self._run_list(stats, result_set, - test_set.parallel_tests, self.args.jobs) - self._run_list(stats, result_set, - test_set.isolated_tests, 1) - - def _skip_tests(self, stats, result_set, tests_to_skip): - for test_input in tests_to_skip: - last = self.host.time() - stats.started += 1 - self._print_test_started(stats, test_input) - now = self.host.time() - result = Result(test_input.name, actual=ResultType.Skip, - started=last, took=(now - last), worker=0, - expected=[ResultType.Skip], - out=test_input.msg) - result_set.add(result) - stats.finished += 1 - self._print_test_finished(stats, result) - - def _run_list(self, stats, result_set, test_inputs, jobs): - h = self.host - running_jobs = set() - - jobs = min(len(test_inputs), jobs) - if not jobs: - return - - child = _Child(self) - pool = make_pool(h, jobs, _run_one_test, child, - _setup_process, _teardown_process) - try: - while test_inputs or running_jobs: - while test_inputs and (len(running_jobs) < self.args.jobs): - test_input = test_inputs.pop(0) - stats.started += 1 - pool.send(test_input) - running_jobs.add(test_input.name) - self._print_test_started(stats, test_input) - - result = pool.get() - running_jobs.remove(result.name) - result_set.add(result) - stats.finished += 1 - self._print_test_finished(stats, result) - pool.close() - finally: - self.final_responses.extend(pool.join()) - - def _print_test_started(self, stats, test_input): - if self.args.quiet: - # Print nothing when --quiet was passed. - return - - # If -vvv was passed, print when the test is queued to be run. - # We don't actually know when the test picked up to run, because - # that is handled by the child process (where we can't easily - # print things). Otherwise, only print when the test is started - # if we know we can overwrite the line, so that we do not - # get multiple lines of output as noise (in -vvv, we actually want - # the noise). - test_start_msg = stats.format() + test_input.name - if self.args.verbose > 2: - self.update(test_start_msg + ' queued', elide=False) - if self.args.overwrite: - self.update(test_start_msg, elide=(not self.args.verbose)) - - def _print_test_finished(self, stats, result): - stats.add_time() - - assert result.actual in [ResultType.Failure, ResultType.Skip, - ResultType.Pass] - if result.actual == ResultType.Failure: - result_str = ' failed' - elif result.actual == ResultType.Skip: - result_str = ' was skipped' - elif result.actual == ResultType.Pass: - result_str = ' passed' - - if result.unexpected: - result_str += ' unexpectedly' - if self.args.timing: - timing_str = ' %.4fs' % result.took - else: - timing_str = '' - suffix = '%s%s' % (result_str, timing_str) - out = result.out - err = result.err - if result.code: - if out or err: - suffix += ':\n' - self.update(stats.format() + result.name + suffix, elide=False) - for l in out.splitlines(): - self.print_(' %s' % l) - for l in err.splitlines(): - self.print_(' %s' % l) - elif not self.args.quiet: - if self.args.verbose > 1 and (out or err): - suffix += ':\n' - self.update(stats.format() + result.name + suffix, - elide=(not self.args.verbose)) - if self.args.verbose > 1: - for l in out.splitlines(): - self.print_(' %s' % l) - for l in err.splitlines(): - self.print_(' %s' % l) - if self.args.verbose: - self.flush() - - def update(self, msg, elide): - self.printer.update(msg, elide) - - def flush(self): - self.printer.flush() - - def _summarize(self, full_results): - num_passes = json_results.num_passes(full_results) - num_failures = json_results.num_failures(full_results) - num_skips = json_results.num_skips(full_results) - - if self.args.quiet and num_failures == 0: - return - - if self.args.timing: - timing_clause = ' in %.1fs' % (self.host.time() - - self.stats.started_time) - else: - timing_clause = '' - self.update('%d test%s passed%s, %d skipped, %d failure%s.' % - (num_passes, - '' if num_passes == 1 else 's', - timing_clause, - num_skips, - num_failures, - '' if num_failures == 1 else 's'), elide=False) - self.print_() - - def _read_and_delete(self, path, delete): - h = self.host - obj = None - if h.exists(path): - contents = h.read_text_file(path) - if contents: - obj = json.loads(contents) - if delete: - h.remove(path) - return obj - - def _write(self, path, obj): - if path: - self.host.write_text_file(path, json.dumps(obj, indent=2) + '\n') - - def _upload(self, full_results): - h = self.host - if not self.args.test_results_server: - return 0 - - url, content_type, data = json_results.make_upload_request( - self.args.test_results_server, self.args.builder_name, - self.args.master_name, self.args.test_type, - full_results) - - try: - h.fetch(url, data, {'Content-Type': content_type}) - return 0 - except Exception as e: - h.print_('Uploading the JSON results raised "%s"' % str(e)) - return 1 - - def report_coverage(self): - if self.args.coverage: # pragma: no cover - self.host.print_() - import coverage - cov = coverage.coverage(data_suffix=True) - cov.combine() - cov.report(show_missing=self.args.coverage_show_missing, - omit=self.args.coverage_omit) - if self.args.coverage_annotate: - cov.annotate(omit=self.args.coverage_omit) - - def _add_trace_event(self, trace, name, start, end): - event = { - 'name': name, - 'ts': int((start - self.stats.started_time) * 1000000), - 'dur': int((end - start) * 1000000), - 'ph': 'X', - 'pid': self.host.getpid(), - 'tid': 0, - } - trace['traceEvents'].append(event) - - def _trace_from_results(self, result_set): - trace = OrderedDict() - trace['traceEvents'] = [] - trace['otherData'] = {} - for m in self.args.metadata: - k, v = m.split('=') - trace['otherData'][k] = v - - for result in result_set.results: - started = int((result.started - self.stats.started_time) * 1000000) - took = int(result.took * 1000000) - event = OrderedDict() - event['name'] = result.name - event['dur'] = took - event['ts'] = started - event['ph'] = 'X' # "Complete" events - event['pid'] = result.pid - event['tid'] = result.worker - - args = OrderedDict() - args['expected'] = sorted(str(r) for r in result.expected) - args['actual'] = str(result.actual) - args['out'] = result.out - args['err'] = result.err - args['code'] = result.code - args['unexpected'] = result.unexpected - args['flaky'] = result.flaky - event['args'] = args - - trace['traceEvents'].append(event) - return trace - - -def _matches(name, globs): - return any(fnmatch.fnmatch(name, glob) for glob in globs) - - -def _default_classifier(args): - def default_classifier(test_set, test): - name = test.id() - if not args.all and _matches(name, args.skip): - test_set.tests_to_skip.append(TestInput(name, - 'skipped by request')) - elif _matches(name, args.isolate): - test_set.isolated_tests.append(TestInput(name)) - else: - test_set.parallel_tests.append(TestInput(name)) - return default_classifier - - -def _test_adder(test_set, classifier): - def add_tests(obj): - if isinstance(obj, unittest.suite.TestSuite): - for el in obj: - add_tests(el) - elif (obj.id().startswith('unittest.loader.LoadTestsFailure') or - obj.id().startswith('unittest.loader.ModuleImportFailure')): - # Access to protected member pylint: disable=W0212 - module_name = obj._testMethodName - try: - method = getattr(obj, obj._testMethodName) - method() - except Exception as e: - if 'LoadTests' in obj.id(): - raise _AddTestsError('%s.load_tests() failed: %s' - % (module_name, str(e))) - else: - raise _AddTestsError(str(e)) - else: - assert isinstance(obj, unittest.TestCase) - classifier(test_set, obj) - return add_tests - - -class _Child(object): - - def __init__(self, parent): - self.host = None - self.worker_num = None - self.all = parent.args.all - self.debugger = parent.args.debugger - self.coverage = parent.args.coverage and parent.args.jobs > 1 - self.coverage_source = parent.coverage_source - self.dry_run = parent.args.dry_run - self.loader = parent.loader - self.passthrough = parent.args.passthrough - self.context = parent.context - self.setup_fn = parent.setup_fn - self.teardown_fn = parent.teardown_fn - self.context_after_setup = None - self.top_level_dir = parent.top_level_dir - self.top_level_dirs = parent.top_level_dirs - self.loaded_suites = {} - self.cov = None - - -def _setup_process(host, worker_num, child): - child.host = host - child.worker_num = worker_num - # pylint: disable=protected-access - - if child.coverage: # pragma: no cover - import coverage - child.cov = coverage.coverage(source=child.coverage_source, - data_suffix=True) - child.cov._warn_no_data = False - child.cov.start() - - if child.setup_fn: - child.context_after_setup = child.setup_fn(child, child.context) - else: - child.context_after_setup = child.context - return child - - -def _teardown_process(child): - res = None - e = None - if child.teardown_fn: - try: - res = child.teardown_fn(child, child.context_after_setup) - except Exception as e: - pass - - if child.cov: # pragma: no cover - child.cov.stop() - child.cov.save() - - return (child.worker_num, res, e) - - -def _run_one_test(child, test_input): - h = child.host - pid = h.getpid() - test_name = test_input.name - - start = h.time() - - # It is important to capture the output before loading the test - # to ensure that - # 1) the loader doesn't logs something we don't captured - # 2) neither the loader nor the test case grab a reference to the - # uncaptured stdout or stderr that later is used when the test is run. - # This comes up when using the FakeTestLoader and testing typ itself, - # but could come up when testing non-typ code as well. - h.capture_output(divert=not child.passthrough) - - ex_str = '' - try: - orig_skip = unittest.skip - orig_skip_if = unittest.skipIf - if child.all: - unittest.skip = lambda reason: lambda x: x - unittest.skipIf = lambda condition, reason: lambda x: x - - try: - suite = child.loader.loadTestsFromName(test_name) - except Exception as e: - ex_str = ('loadTestsFromName("%s") failed: %s\n%s\n' % - (test_name, e, traceback.format_exc())) - try: - suite = _load_via_load_tests(child, test_name) - ex_str += ('\nload_via_load_tests(\"%s\") returned %d tests\n' % - (test_name, len(list(suite)))) - except Exception as e: # pragma: untested - suite = [] - ex_str += ('\nload_via_load_tests("%s") failed: %s\n%s\n' % - (test_name, e, traceback.format_exc())) - finally: - unittest.skip = orig_skip - unittest.skipIf = orig_skip_if - - tests = list(suite) - if len(tests) != 1: - err = 'Failed to load "%s" in run_one_test' % test_name - if ex_str: # pragma: untested - err += '\n ' + '\n '.join(ex_str.splitlines()) - - h.restore_output() - return Result(test_name, ResultType.Failure, start, 0, - child.worker_num, unexpected=True, code=1, - err=err, pid=pid) - - test_case = tests[0] - if isinstance(test_case, TypTestCase): - test_case.child = child - test_case.context = child.context_after_setup - - test_result = unittest.TestResult() - out = '' - err = '' - try: - if child.dry_run: - pass - elif child.debugger: # pragma: no cover - _run_under_debugger(h, test_case, suite, test_result) - else: - suite.run(test_result) - finally: - out, err = h.restore_output() - - took = h.time() - start - return _result_from_test_result(test_result, test_name, start, took, out, - err, child.worker_num, pid) - - -def _run_under_debugger(host, test_case, suite, - test_result): # pragma: no cover - # Access to protected member pylint: disable=W0212 - test_func = getattr(test_case, test_case._testMethodName) - fname = inspect.getsourcefile(test_func) - lineno = inspect.getsourcelines(test_func)[1] + 1 - dbg = pdb.Pdb(stdout=host.stdout.stream) - dbg.set_break(fname, lineno) - dbg.runcall(suite.run, test_result) - - -def _result_from_test_result(test_result, test_name, start, took, out, err, - worker_num, pid): - flaky = False - if test_result.failures: - expected = [ResultType.Pass] - actual = ResultType.Failure - code = 1 - unexpected = True - err = err + test_result.failures[0][1] - elif test_result.errors: - expected = [ResultType.Pass] - actual = ResultType.Failure - code = 1 - unexpected = True - err = err + test_result.errors[0][1] - elif test_result.skipped: - expected = [ResultType.Skip] - actual = ResultType.Skip - err = err + test_result.skipped[0][1] - code = 0 - unexpected = False - elif test_result.expectedFailures: - expected = [ResultType.Failure] - actual = ResultType.Failure - code = 1 - err = err + test_result.expectedFailures[0][1] - unexpected = False - elif test_result.unexpectedSuccesses: - expected = [ResultType.Failure] - actual = ResultType.Pass - code = 0 - unexpected = True - else: - expected = [ResultType.Pass] - actual = ResultType.Pass - code = 0 - unexpected = False - - return Result(test_name, actual, start, took, worker_num, - expected, unexpected, flaky, code, out, err, pid) - - -def _load_via_load_tests(child, test_name): - # If we couldn't import a test directly, the test may be only loadable - # via unittest's load_tests protocol. See if we can find a load_tests - # entry point that will work for this test. - loader = child.loader - comps = test_name.split('.') - new_suite = unittest.TestSuite() - - while comps: - name = '.'.join(comps) - module = None - suite = None - if name not in child.loaded_suites: - try: - module = importlib.import_module(name) - except ImportError: - pass - if module: - suite = loader.loadTestsFromModule(module) - child.loaded_suites[name] = suite - suite = child.loaded_suites[name] - if suite: - for test_case in suite: - assert isinstance(test_case, unittest.TestCase) - if test_case.id() == test_name: # pragma: untested - new_suite.addTest(test_case) - break - comps.pop() - return new_suite - - -def _sort_inputs(inps): - return sorted(inps, key=lambda inp: inp.name) - - -if __name__ == '__main__': # pragma: no cover - sys.modules['__main__'].__file__ = path_to_file - sys.exit(main(win_multiprocessing=WinMultiprocessing.importable))
diff --git a/typ/stats.py b/typ/stats.py deleted file mode 100644 index 0cd408d..0000000 --- a/typ/stats.py +++ /dev/null
@@ -1,83 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -class Stats(object): - - def __init__(self, status_format, time_fn, size): - self.fmt = status_format - self.finished = 0 - self.started = 0 - self.total = 0 - self.started_time = time_fn() - self._times = [] - self._size = size - self._time = time_fn - self._times.append(self.started_time) - - def add_time(self): - if len(self._times) > self._size: - self._times.pop(0) - self._times.append(self._time()) - - def format(self): - # Too many statements pylint: disable=R0915 - out = '' - p = 0 - end = len(self.fmt) - while p < end: - c = self.fmt[p] - if c == '%' and p < end - 1: - cn = self.fmt[p + 1] - if cn == 'c': - elapsed = self._times[-1] - self._times[0] - if elapsed > 0: - out += '%5.1f' % ((len(self._times) - 1) / elapsed) - else: - out += '-' - elif cn == 'e': - now = self._time() - assert now >= self.started_time - out += '%-5.3f' % (now - self.started_time) - elif cn == 'f': - out += str(self.finished) - elif cn == 'o': - now = self._time() - if now > self.started_time: - out += '%5.1f' % (self.finished * 1.0 / - (now - self.started_time)) - else: - out += '-' - elif cn == 'p': - if self.total: - out += '%5.1f' % (self.started * 100.0 / self.total) - else: - out += '-' - elif cn == 'r': - out += str(self.started - self.finished) - elif cn == 's': - out += str(self.started) - elif cn == 't': - out += str(self.total) - elif cn == 'u': - out += str(self.total - self.finished) - elif cn == '%': - out += '%' - else: - out += c + cn - p += 2 - else: - out += c - p += 1 - return out
diff --git a/typ/test_case.py b/typ/test_case.py deleted file mode 100644 index d709a57..0000000 --- a/typ/test_case.py +++ /dev/null
@@ -1,127 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import fnmatch -import shlex -import unittest - - -def convert_newlines(msg): - """A routine that mimics Python's universal_newlines conversion.""" - return msg.replace('\r\n', '\n').replace('\r', '\n') - - -class TestCase(unittest.TestCase): - child = None - context = None - maxDiff = 80 * 66 - - -class MainTestCase(TestCase): - prog = None - files_to_ignore = [] - - def _write_files(self, host, files): - for path, contents in list(files.items()): - dirname = host.dirname(path) - if dirname: - host.maybe_mkdir(dirname) - host.write_text_file(path, contents) - - def _read_files(self, host, tmpdir): - out_files = {} - for f in host.files_under(tmpdir): - if any(fnmatch.fnmatch(f, pat) for pat in self.files_to_ignore): - continue - key = f.replace(host.sep, '/') - out_files[key] = host.read_text_file(tmpdir, f) - return out_files - - def assert_files(self, expected_files, actual_files, files_to_ignore=None): - files_to_ignore = files_to_ignore or [] - for k, v in expected_files.items(): - self.assertMultiLineEqual(expected_files[k], v) - interesting_files = set(actual_files.keys()).difference( - files_to_ignore) - self.assertEqual(interesting_files, set(expected_files.keys())) - - def make_host(self): - # If we are ever called by unittest directly, and not through typ, - # this will probably fail. - assert self.child - return self.child.host - - def call(self, host, argv, stdin, env): - return host.call(argv, stdin=stdin, env=env) - - def check(self, cmd=None, stdin=None, env=None, aenv=None, files=None, - prog=None, cwd=None, host=None, - ret=None, out=None, rout=None, err=None, rerr=None, - exp_files=None, - files_to_ignore=None, universal_newlines=True): - # Too many arguments pylint: disable=R0913 - prog = prog or self.prog or [] - host = host or self.make_host() - argv = shlex.split(cmd) if isinstance(cmd, str) else cmd or [] - - tmpdir = None - orig_wd = host.getcwd() - try: - tmpdir = host.mkdtemp() - host.chdir(tmpdir) - if files: - self._write_files(host, files) - if cwd: - host.chdir(cwd) - if aenv: - env = host.env.copy() - env.update(aenv) - - if self.child.debugger: # pragma: no cover - host.print_('') - host.print_('cd %s' % tmpdir, stream=host.stdout.stream) - host.print_(' '.join(prog + argv), stream=host.stdout.stream) - host.print_('') - import pdb - dbg = pdb.Pdb(stdout=host.stdout.stream) - dbg.set_trace() - - result = self.call(host, prog + argv, stdin=stdin, env=env) - - actual_ret, actual_out, actual_err = result - actual_files = self._read_files(host, tmpdir) - finally: - host.chdir(orig_wd) - if tmpdir: - host.rmtree(tmpdir) - - if universal_newlines: - actual_out = convert_newlines(actual_out) - if universal_newlines: - actual_err = convert_newlines(actual_err) - - if ret is not None: - self.assertEqual(ret, actual_ret) - if out is not None: - self.assertMultiLineEqual(out, actual_out) - if rout is not None: - self.assertRegexpMatches(actual_out, rout) - if err is not None: - self.assertMultiLineEqual(err, actual_err) - if rerr is not None: - self.assertRegexpMatches(actual_err, rerr) - if exp_files: - self.assert_files(exp_files, actual_files, files_to_ignore) - - return actual_ret, actual_out, actual_err, actual_files
diff --git a/typ/tests/__init__.py b/typ/tests/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/typ/tests/__init__.py +++ /dev/null
diff --git a/typ/tests/arg_parser_test.py b/typ/tests/arg_parser_test.py deleted file mode 100644 index 3d443f6..0000000 --- a/typ/tests/arg_parser_test.py +++ /dev/null
@@ -1,83 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import optparse -import unittest - -from typ import ArgumentParser - - -class ArgumentParserTest(unittest.TestCase): - - def test_optparse_options(self): - parser = optparse.OptionParser() - ArgumentParser.add_option_group(parser, 'foo', - discovery=True, - running=True, - reporting=True, - skip='[-d]') - options, _ = parser.parse_args(['-j', '1']) - self.assertEqual(options.jobs, 1) - - def test_argv_from_args(self): - - def check(argv, expected=None): - parser = ArgumentParser() - args = parser.parse_args(argv) - actual_argv = parser.argv_from_args(args) - expected = expected or argv - self.assertEqual(expected, actual_argv) - - check(['--version']) - check(['--coverage', '--coverage-omit', 'foo']) - check(['--jobs', '3']) - check(['-vv'], ['--verbose', '--verbose']) - - def test_argv_from_args_foreign_argument(self): - parser = ArgumentParser() - parser.add_argument('--some-foreign-argument', default=False, - action='store_true') - args = parser.parse_args(['--some-foreign-argument', '--verbose']) - self.assertEqual(['--verbose'], ArgumentParser().argv_from_args(args)) - - def test_valid_shard_options(self): - parser = ArgumentParser() - - parser.parse_args(['--total-shards', '1']) - self.assertEqual(parser.exit_status, None) - - parser.parse_args(['--total-shards', '5', '--shard-index', '4']) - self.assertEqual(parser.exit_status, None) - - parser.parse_args(['--total-shards', '5', '--shard-index', '0']) - self.assertEqual(parser.exit_status, None) - - - def test_invalid_shard_options(self): - parser = ArgumentParser() - - parser.parse_args(['--total-shards', '0']) - self.assertEqual(parser.exit_status, 2) - - parser.parse_args(['--total-shards', '-1']) - self.assertEqual(parser.exit_status, 2) - - parser.parse_args(['--total-shards', '5', '--shard-index', '-1']) - self.assertEqual(parser.exit_status, 2) - - parser.parse_args(['--total-shards', '5', '--shard-index', '5']) - self.assertEqual(parser.exit_status, 2) - - parser.parse_args(['--total-shards', '5', '--shard-index', '6']) - self.assertEqual(parser.exit_status, 2)
diff --git a/typ/tests/host_test.py b/typ/tests/host_test.py deleted file mode 100644 index 5100a2a..0000000 --- a/typ/tests/host_test.py +++ /dev/null
@@ -1,216 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import pickle -import sys -import unittest - -from typ.host import Host - - -class TestHost(unittest.TestCase): - - def host(self): - return Host() - - def test_capture_output(self): - try: - logging.basicConfig() - h = self.host() - h.capture_output() - h.print_('on stdout') - h.print_('on stderr', stream=h.stderr) - logging.critical('critical log failure') - out, err = h.restore_output() - self.assertEqual(out, 'on stdout\n') - self.assertEqual(err, 'on stderr\ncritical log failure\n') - finally: - h.logger.handlers = [] - - # TODO: Add tests for divert=False or eliminate the flag? - - def test_abspath_and_realpath(self): - h = self.host() - self.assertNotEqual(h.abspath(h.getcwd()), None) - self.assertNotEqual(h.realpath(h.getcwd()), None) - - def test_chdir(self): - h = self.host() - orig_cwd = h.getcwd() - h.chdir('.') - self.assertEqual(orig_cwd, h.getcwd()) - h.chdir('..') - self.assertNotEqual(orig_cwd, h.getcwd()) - - def test_files(self): - h = self.host() - orig_cwd = h.getcwd() - try: - now = h.time() - - # TODO: MacOS does goofy things with temp dirs by default, so - # we can't compare for equality. Figure out how to get the normpath - # from mkdtemp - dirpath = h.mkdtemp(suffix='host_test') - self.assertTrue(h.isdir(dirpath)) - h.chdir(dirpath) - self.assertIn(dirpath, h.getcwd()) - - h.maybe_mkdir('bar') - self.assertTrue(h.exists(dirpath, 'bar')) - self.assertTrue(h.isdir(dirpath, 'bar')) - self.assertFalse(h.isfile(dirpath, 'bar')) - - bar_path = h.join(dirpath, 'bar') - self.assertEqual(dirpath, h.dirname(bar_path)) - - h.write_text_file('bar/foo.txt', 'foo') - self.assertTrue(h.exists('bar', 'foo.txt')) - self.assertEqual(h.read_text_file('bar/foo.txt'), 'foo') - self.assertTrue(h.exists(dirpath, 'bar', 'foo.txt')) - self.assertTrue(h.isfile(dirpath, 'bar', 'foo.txt')) - self.assertFalse(h.isdir(dirpath, 'bar', 'foo.txt')) - - h.write_binary_file('binfile', b'bin contents') - self.assertEqual(h.read_binary_file('binfile'), - b'bin contents') - - self.assertEqual(sorted(h.files_under(dirpath)), - ['bar' + h.sep + 'foo.txt', 'binfile']) - - mtime = h.mtime(dirpath, 'bar', 'foo.txt') - self.assertGreaterEqual(now, mtime - 0.1) - h.remove(dirpath, 'bar', 'foo.txt') - self.assertFalse(h.exists(dirpath, 'bar', 'foo.txt')) - self.assertFalse(h.isfile(dirpath, 'bar', 'foo.txt')) - - h.chdir(orig_cwd) - h.rmtree(dirpath) - self.assertFalse(h.exists(dirpath)) - self.assertFalse(h.isdir(dirpath)) - finally: - h.chdir(orig_cwd) - - def test_terminal_width(self): - h = self.host() - self.assertGreaterEqual(h.terminal_width(), 0) - - def test_for_mp_and_pickling(self): - h = self.host() - mp_host = h.for_mp() - s = pickle.dumps(mp_host) - pickle.loads(s) - - def test_cpu_count(self): - h = self.host() - self.assertGreaterEqual(h.cpu_count(), 1) - - def test_getenv(self): - h = self.host() - self.assertNotEqual(h.getenv('PATH', ''), None) - - def test_getpid(self): - h = self.host() - self.assertNotEqual(h.getpid(), 0) - - def test_basename(self): - h = self.host() - self.assertEqual(h.basename('foo.txt'), 'foo.txt') - self.assertEqual(h.basename('foo/bar.txt'), 'bar.txt') - - def test_mktempfile(self, delete=False): # pylint: disable=unused-argument - h = self.host() - f = h.mktempfile() - f.close() - self.assertNotEqual(f.name, None) - - def test_splitext(self): - h = self.host() - self.assertEqual(h.splitext('foo'), ('foo', '')) - self.assertEqual(h.splitext('foo.txt'), ('foo', '.txt')) - self.assertEqual(h.splitext('foo/bar'), ('foo/bar', '')) - self.assertEqual(h.splitext('foo/bar.txt'), ('foo/bar', '.txt')) - - def test_print(self): - h = self.host() - - class FakeStream(object): - - def __init__(self): - self.contents = None - self.flush_called = False - - def write(self, m): - self.contents = m - - def flush(self): - self.flush_called = True - - s = FakeStream() - h.print_('hello', stream=s) - self.assertEqual(s.contents, 'hello\n') - self.assertTrue(s.flush_called) - - s = FakeStream() - h.stdout = s - h.print_('hello') - self.assertEqual(s.contents, 'hello\n') - - s = FakeStream() - h.stdout = s - h.print_('hello', '') - self.assertEqual(s.contents, 'hello') - - def test_call(self): - h = self.host() - ret, out, err = h.call( - [h.python_interpreter, - '-c', 'import sys; sys.stdout.write(sys.stdin.read())'], - stdin='foo', env={}) - self.assertEqual(ret, 0) - self.assertEqual(out, 'foo') - self.assertEqual(err, '') - - ret, out, err = h.call( - [h.python_interpreter, - '-c', 'import sys; sys.stderr.write("err\\n")']) - self.assertEqual(ret, 0) - self.assertEqual(out, '') - self.assertIn(err, ('err\n', 'err\r\n')) - - def test_call_inline(self): - h = self.host() - h.stdout = None - h.stderr = None - ret = h.call_inline([h.python_interpreter, - '-c', 'import sys; sys.exit(0)']) - self.assertEqual(ret, 0) - - def test_add_to_path(self): - orig_sys_path = sys.path[:] - try: - h = self.host() - h.add_to_path(sys.path[-1]) - self.assertEqual(sys.path, orig_sys_path) - - dirpath = h.mkdtemp() - h.add_to_path(dirpath) - self.assertNotEqual(sys.path, orig_sys_path) - finally: - sys.path = orig_sys_path - - def test_platform(self): - h = self.host() - self.assertNotEqual(h.platform, None)
diff --git a/typ/tests/json_results_test.py b/typ/tests/json_results_test.py deleted file mode 100644 index 76446f9..0000000 --- a/typ/tests/json_results_test.py +++ /dev/null
@@ -1,113 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from typ import json_results - - -class TestMakeUploadRequest(unittest.TestCase): - maxDiff = 4096 - - def test_basic_upload(self): - results = json_results.ResultSet() - full_results = json_results.make_full_results([], 0, [], results) - url, content_type, data = json_results.make_upload_request( - 'localhost', 'fake_builder_name', 'fake_master', 'fake_test_type', - full_results) - - self.assertEqual( - content_type, - 'multipart/form-data; ' - 'boundary=-J-S-O-N-R-E-S-U-L-T-S---B-O-U-N-D-A-R-Y-') - - self.assertEqual(url, 'https://localhost/testfile/upload') - self.assertMultiLineEqual( - data, - ('---J-S-O-N-R-E-S-U-L-T-S---B-O-U-N-D-A-R-Y-\r\n' - 'Content-Disposition: form-data; name="builder"\r\n' - '\r\n' - 'fake_builder_name\r\n' - '---J-S-O-N-R-E-S-U-L-T-S---B-O-U-N-D-A-R-Y-\r\n' - 'Content-Disposition: form-data; name="master"\r\n' - '\r\n' - 'fake_master\r\n' - '---J-S-O-N-R-E-S-U-L-T-S---B-O-U-N-D-A-R-Y-\r\n' - 'Content-Disposition: form-data; name="testtype"\r\n' - '\r\n' - 'fake_test_type\r\n' - '---J-S-O-N-R-E-S-U-L-T-S---B-O-U-N-D-A-R-Y-\r\n' - 'Content-Disposition: form-data; name="file"; ' - 'filename="full_results.json"\r\n' - 'Content-Type: application/json\r\n' - '\r\n' - '{"version": 3, "interrupted": false, "path_delimiter": ".", ' - '"seconds_since_epoch": 0, ' - '"num_failures_by_type": {"FAIL": 0, "PASS": 0, "SKIP": 0}, ' - '"tests": {}}\r\n' - '---J-S-O-N-R-E-S-U-L-T-S---B-O-U-N-D-A-R-Y---\r\n')) - - -class TestMakeFullResults(unittest.TestCase): - maxDiff = 2048 - - def test_basic(self): - test_names = ['foo_test.FooTest.test_fail', - 'foo_test.FooTest.test_pass', - 'foo_test.FooTest.test_skip'] - - result_set = json_results.ResultSet() - result_set.add( - json_results.Result('foo_test.FooTest.test_fail', - json_results.ResultType.Failure, 0, 0.1, 0, - unexpected=True)) - result_set.add(json_results.Result('foo_test.FooTest.test_pass', - json_results.ResultType.Pass, - 0, 0.2, 0)) - result_set.add(json_results.Result('foo_test.FooTest.test_skip', - json_results.ResultType.Skip, - 0, 0.3, 0, unexpected=False)) - - full_results = json_results.make_full_results( - ['foo=bar'], 0, test_names, result_set) - expected_full_results = { - 'foo': 'bar', - 'interrupted': False, - 'num_failures_by_type': { - 'FAIL': 1, - 'PASS': 1, - 'SKIP': 1}, - 'path_delimiter': '.', - 'seconds_since_epoch': 0, - 'tests': { - 'foo_test': { - 'FooTest': { - 'test_fail': { - 'expected': 'PASS', - 'actual': 'FAIL', - 'times': [0.1], - 'is_unexpected': True, - }, - 'test_pass': { - 'expected': 'PASS', - 'actual': 'PASS', - 'times': [0.2], - }, - 'test_skip': { - 'expected': 'SKIP', - 'actual': 'SKIP', - 'times': [0.3], - }}}}, - 'version': 3} - self.assertEqual(full_results, expected_full_results)
diff --git a/typ/tests/main_test.py b/typ/tests/main_test.py deleted file mode 100644 index f33d0b3..0000000 --- a/typ/tests/main_test.py +++ /dev/null
@@ -1,829 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import io -import json -import os -import sys -import textwrap - -from typ import main -from typ import test_case -from typ import Host -from typ import VERSION -from typ.fakes import test_result_server_fake - - -is_python3 = bool(sys.version_info.major == 3) - -if is_python3: # pragma: python3 - # pylint: disable=redefined-builtin,invalid-name - unicode = str - -d = textwrap.dedent - - -PASS_TEST_PY = """ -import unittest -class PassingTest(unittest.TestCase): - def test_pass(self): - pass -""" - - -PASS_TEST_FILES = {'pass_test.py': PASS_TEST_PY} - - -FAIL_TEST_PY = """ -import unittest -class FailingTest(unittest.TestCase): - def test_fail(self): - self.fail() -""" - - -FAIL_TEST_FILES = {'fail_test.py': FAIL_TEST_PY} - - -OUTPUT_TEST_PY = """ -import sys -import unittest - -class PassTest(unittest.TestCase): - def test_out(self): - sys.stdout.write("hello on stdout\\n") - sys.stdout.flush() - - def test_err(self): - sys.stderr.write("hello on stderr\\n") - -class FailTest(unittest.TestCase): - def test_out_err_fail(self): - sys.stdout.write("hello on stdout\\n") - sys.stdout.flush() - sys.stderr.write("hello on stderr\\n") - self.fail() -""" - - -OUTPUT_TEST_FILES = {'output_test.py': OUTPUT_TEST_PY} - - -SF_TEST_PY = """ -import sys -import unittest - -class SkipMethods(unittest.TestCase): - @unittest.skip('reason') - def test_reason(self): - self.fail() - - @unittest.skipIf(True, 'reason') - def test_skip_if_true(self): - self.fail() - - @unittest.skipIf(False, 'reason') - def test_skip_if_false(self): - self.fail() - - -class SkipSetup(unittest.TestCase): - def setUp(self): - self.skipTest('setup failed') - - def test_notrun(self): - self.fail() - - -@unittest.skip('skip class') -class SkipClass(unittest.TestCase): - def test_method(self): - self.fail() - -class SetupClass(unittest.TestCase): - @classmethod - def setUpClass(cls): - sys.stdout.write('in setupClass\\n') - sys.stdout.flush() - assert False, 'setupClass failed' - - def test_method1(self): - pass - - def test_method2(self): - pass - -class ExpectedFailures(unittest.TestCase): - @unittest.expectedFailure - def test_fail(self): - self.fail() - - @unittest.expectedFailure - def test_pass(self): - pass -""" - - -SF_TEST_FILES = {'sf_test.py': SF_TEST_PY} - - -LOAD_TEST_PY = """ -import unittest - - -class BaseTest(unittest.TestCase): - pass - - -def method_fail(self): - self.fail() - - -def method_pass(self): - pass - - -def load_tests(_, _2, _3): - setattr(BaseTest, "test_fail", method_fail) - setattr(BaseTest, "test_pass", method_pass) - suite = unittest.TestSuite() - suite.addTest(BaseTest("test_fail")) - suite.addTest(BaseTest("test_pass")) - return suite -""" - -LOAD_TEST_FILES = {'load_test.py': LOAD_TEST_PY} - - - -path_to_main = os.path.join( - os.path.dirname(os.path.dirname(os.path.abspath(__file__))), - 'runner.py') - - -class TestCli(test_case.MainTestCase): - prog = [sys.executable, path_to_main] - files_to_ignore = ['*.pyc'] - - def test_bad_arg(self): - self.check(['--bad-arg'], ret=2, out='', - rerr='.*: error: unrecognized arguments: --bad-arg\n') - self.check(['-help'], ret=2, out='', - rerr=(".*: error: argument -h/--help: " - "ignored explicit argument 'elp'\n")) - - def test_bad_metadata(self): - self.check(['--metadata', 'foo'], ret=2, err='', - out='Error: malformed --metadata "foo"\n') - - def test_basic(self): - self.check([], files=PASS_TEST_FILES, - ret=0, - out=('[1/1] pass_test.PassingTest.test_pass passed\n' - '1 test passed, 0 skipped, 0 failures.\n'), err='') - - def test_coverage(self): - try: - import coverage # pylint: disable=W0612 - files = { - 'pass_test.py': PASS_TEST_PY, - 'fail_test.py': FAIL_TEST_PY, - } - self.check(['-c', 'pass_test'], files=files, ret=0, err='', - out=d("""\ - [1/1] pass_test.PassingTest.test_pass passed - 1 test passed, 0 skipped, 0 failures. - - Name Stmts Miss Cover - ---------------------------------- - fail_test.py 4 4 0% - pass_test.py 4 0 100% - ---------------------------------- - TOTAL 8 4 50% - """)) - except ImportError: # pragma: no cover - # We can never cover this line, since running coverage means - # that import will succeed. - self.check(['-c'], files=PASS_TEST_FILES, ret=1, - out='Error: coverage is not installed\n', err='') - - def test_debugger(self): - if sys.version_info.major == 3: # pragma: python3 - return - else: # pragma: python2 - _, out, _, _ = self.check(['-d'], stdin='quit()\n', - files=PASS_TEST_FILES, ret=0, err='') - self.assertIn('(Pdb) ', out) - - def test_dryrun(self): - self.check(['-n'], files=PASS_TEST_FILES, ret=0, err='', - out=d("""\ - [1/1] pass_test.PassingTest.test_pass passed - 1 test passed, 0 skipped, 0 failures. - """)) - - def test_error(self): - files = {'err_test.py': d("""\ - import unittest - class ErrTest(unittest.TestCase): - def test_err(self): - foo = bar - """)} - _, out, _, _ = self.check([''], files=files, ret=1, err='') - self.assertIn('[1/1] err_test.ErrTest.test_err failed unexpectedly', - out) - self.assertIn('0 tests passed, 0 skipped, 1 failure', out) - - def test_fail(self): - _, out, _, _ = self.check([], files=FAIL_TEST_FILES, ret=1, err='') - self.assertIn('fail_test.FailingTest.test_fail failed unexpectedly', - out) - - def test_fail_then_pass(self): - files = {'fail_then_pass_test.py': d("""\ - import unittest - count = 0 - class FPTest(unittest.TestCase): - def test_count(self): - global count - count += 1 - if count == 1: - self.fail() - """)} - _, out, _, files = self.check(['--retry-limit', '3', - '--write-full-results-to', - 'full_results.json'], - files=files, ret=0, err='') - self.assertIn('Retrying failed tests (attempt #1 of 3)', out) - self.assertNotIn('Retrying failed tests (attempt #2 of 3)', out) - self.assertIn('1 test passed, 0 skipped, 0 failures.\n', out) - results = json.loads(files['full_results.json']) - self.assertEqual( - results['tests'][ - 'fail_then_pass_test']['FPTest']['test_count']['actual'], - 'FAIL PASS') - - def test_fail_then_skip(self): - files = {'fail_then_skip_test.py': d("""\ - import unittest - count = 0 - class FPTest(unittest.TestCase): - def test_count(self): - global count - count += 1 - if count == 1: - self.fail() - elif count == 2: - self.skipTest('') - """)} - _, out, _, files = self.check(['--retry-limit', '3', - '--write-full-results-to', - 'full_results.json'], - files=files, ret=0, err='') - self.assertIn('Retrying failed tests (attempt #1 of 3)', out) - self.assertNotIn('Retrying failed tests (attempt #2 of 3)', out) - self.assertIn('0 tests passed, 1 skipped, 0 failures.\n', out) - results = json.loads(files['full_results.json']) - self.assertEqual( - results['tests'][ - 'fail_then_skip_test']['FPTest']['test_count']['actual'], - 'FAIL SKIP') - - def test_failures_are_not_elided(self): - _, out, _, _ = self.check(['--terminal-width=20'], - files=FAIL_TEST_FILES, ret=1, err='') - self.assertIn('[1/1] fail_test.FailingTest.test_fail failed ' - 'unexpectedly:\n', out) - - def test_file_list(self): - files = PASS_TEST_FILES - self.check(['-f', '-'], files=files, stdin='pass_test\n', ret=0) - self.check(['-f', '-'], files=files, stdin='pass_test.PassingTest\n', - ret=0) - self.check(['-f', '-'], files=files, - stdin='pass_test.PassingTest.test_pass\n', - ret=0) - files = {'pass_test.py': PASS_TEST_PY, - 'test_list.txt': 'pass_test.PassingTest.test_pass\n'} - self.check(['-f', 'test_list.txt'], files=files, ret=0) - - def test_find(self): - files = PASS_TEST_FILES - self.check(['-l'], files=files, ret=0, - out='pass_test.PassingTest.test_pass\n') - self.check(['-l', 'pass_test'], files=files, ret=0, err='', - out='pass_test.PassingTest.test_pass\n') - self.check(['-l', 'pass_test.py'], files=files, ret=0, err='', - out='pass_test.PassingTest.test_pass\n') - self.check(['-l', './pass_test.py'], files=files, ret=0, err='', - out='pass_test.PassingTest.test_pass\n') - self.check(['-l', '.'], files=files, ret=0, err='', - out='pass_test.PassingTest.test_pass\n') - self.check(['-l', 'pass_test.PassingTest.test_pass'], files=files, - ret=0, err='', - out='pass_test.PassingTest.test_pass\n') - self.check(['-l', '.'], files=files, ret=0, err='', - out='pass_test.PassingTest.test_pass\n') - - def test_find_from_subdirs(self): - files = { - 'foo/__init__.py': '', - 'foo/pass_test.py': PASS_TEST_PY, - 'bar/__init__.py': '', - 'bar/tmp': '', - - } - self.check(['-l', '../foo/pass_test.py'], files=files, cwd='bar', - ret=0, err='', - out='foo.pass_test.PassingTest.test_pass\n') - self.check(['-l', 'foo'], files=files, cwd='bar', - ret=0, err='', - out='foo.pass_test.PassingTest.test_pass\n') - self.check(['-l', '--path', '../foo', 'pass_test'], - files=files, cwd='bar', ret=0, err='', - out='pass_test.PassingTest.test_pass\n') - - def test_multiple_top_level_dirs(self): - files = { - 'foo/bar/__init__.py': '', - 'foo/bar/pass_test.py': PASS_TEST_PY, - 'baz/quux/__init__.py': '', - 'baz/quux/second_test.py': PASS_TEST_PY, - } - self.check(['-l', 'foo/bar', 'baz/quux'], files=files, - ret=0, err='', - out=( - 'bar.pass_test.PassingTest.test_pass\n' - 'quux.second_test.PassingTest.test_pass\n' - )) - self.check(['-l', 'foo/bar/pass_test.py', 'baz/quux'], files=files, - ret=0, err='', - out=( - 'bar.pass_test.PassingTest.test_pass\n' - 'quux.second_test.PassingTest.test_pass\n' - )) - self.check(['-l', '--top-level-dirs', 'foo', '--top-level-dirs', 'baz'], - files=files, - ret=0, err='', - out=( - 'bar.pass_test.PassingTest.test_pass\n' - 'quux.second_test.PassingTest.test_pass\n' - )) - - def test_single_top_level_dir(self): - files = { - 'foo/bar/__init__.py': '', - 'foo/bar/pass_test.py': PASS_TEST_PY, - 'baz/quux/__init__.py': '', - 'baz/quux/second_test.py': PASS_TEST_PY, - } - self.check(['-l', '--top-level-dir', 'foo'], - files=files, - ret=0, err='', - out=( - 'bar.pass_test.PassingTest.test_pass\n' - )) - - def test_can_not_have_both_top_level_flags(self): - files = { - 'foo/bar/__init__.py': '', - 'foo/bar/pass_test.py': PASS_TEST_PY, - 'baz/quux/__init__.py': '', - 'baz/quux/second_test.py': PASS_TEST_PY, - } - self.check( - ['-l', '--top-level-dir', 'foo', '--top-level-dirs', 'bar'], - files=files, - ret=1, out='', - err='Cannot specify both --top-level-dir and --top-level-dirs\n') - - def test_help(self): - self.check(['--help'], ret=0, rout='.*', err='') - - def test_import_failure_missing_file(self): - _, out, _, _ = self.check(['-l', 'foo'], ret=1, err='') - self.assertIn('Failed to load "foo" in find_tests', out) - self.assertIn('No module named', out) - - def test_import_failure_missing_package(self): - files = {'foo.py': d("""\ - import unittest - import package_that_does_not_exist - - class ImportFailureTest(unittest.TestCase): - def test_case(self): - pass - """)} - _, out, _, _ = self.check(['-l', 'foo.py'], files=files, ret=1, err='') - self.assertIn('Failed to load "foo.py" in find_tests', out) - self.assertIn('No module named', out) - - def test_import_failure_no_tests(self): - files = {'foo.py': 'import unittest'} - self.check(['-l', 'foo'], files=files, ret=0, err='', - out='\n') - - def test_import_failure_syntax_error(self): - files = {'syn_test.py': d("""\ - import unittest - - class SyntaxErrorTest(unittest.TestCase): - def test_syntax_error_in_test(self): - syntax error - """)} - _, out, _, _ = self.check([], files=files, ret=1, err='') - self.assertIn('Failed to import test module: syn_test', out) - self.assertIn('SyntaxError: invalid syntax', out) - - def test_interrupt(self): - files = {'interrupt_test.py': d("""\ - import unittest - class Foo(unittest.TestCase): - def test_interrupt(self): - raise KeyboardInterrupt() - """)} - self.check(['-j', '1'], files=files, ret=130, out='', - err='interrupted, exiting\n') - - def test_isolate(self): - self.check(['--isolate', '*test_pass*'], files=PASS_TEST_FILES, ret=0, - out=('[1/1] pass_test.PassingTest.test_pass passed\n' - '1 test passed, 0 skipped, 0 failures.\n'), err='') - - def test_load_tests_failure(self): - files = {'foo_test.py': d("""\ - import unittest - - def load_tests(_, _2, _3): - raise ValueError('this should fail') - """)} - _, out, _, _ = self.check([], files=files, ret=1, err='') - self.assertIn('this should fail', out) - - def test_load_tests_single_worker(self): - files = LOAD_TEST_FILES - _, out, _, _ = self.check(['-j', '1', '-v'], files=files, ret=1, - err='') - self.assertIn('[1/2] load_test.BaseTest.test_fail failed', out) - self.assertIn('[2/2] load_test.BaseTest.test_pass passed', out) - self.assertIn('1 test passed, 0 skipped, 1 failure.\n', out) - - def test_load_tests_multiple_workers(self): - _, out, _, _ = self.check([], files=LOAD_TEST_FILES, ret=1, err='') - - # The output for this test is nondeterministic since we may run - # two tests in parallel. So, we just test that some of the substrings - # we care about are present. - self.assertIn('test_pass passed', out) - self.assertIn('test_fail failed', out) - self.assertIn('1 test passed, 0 skipped, 1 failure.\n', out) - - def test_missing_builder_name(self): - self.check(['--test-results-server', 'localhost'], ret=2, - out=('Error: --builder-name must be specified ' - 'along with --test-result-server\n' - 'Error: --master-name must be specified ' - 'along with --test-result-server\n' - 'Error: --test-type must be specified ' - 'along with --test-result-server\n'), err='') - - def test_ninja_status_env(self): - self.check(['-v', 'output_test.PassTest.test_out'], - files=OUTPUT_TEST_FILES, aenv={'NINJA_STATUS': 'ns: '}, - out=d("""\ - ns: output_test.PassTest.test_out passed - 1 test passed, 0 skipped, 0 failures. - """), err='') - - def test_output_for_failures(self): - _, out, _, _ = self.check(['output_test.FailTest'], - files=OUTPUT_TEST_FILES, - ret=1, err='') - self.assertIn('[1/1] output_test.FailTest.test_out_err_fail ' - 'failed unexpectedly:\n' - ' hello on stdout\n' - ' hello on stderr\n', out) - - def test_quiet(self): - self.check(['-q'], files=PASS_TEST_FILES, ret=0, err='', out='') - - def test_retry_limit(self): - _, out, _, _ = self.check(['--retry-limit', '2'], - files=FAIL_TEST_FILES, ret=1, err='') - self.assertIn('Retrying failed tests', out) - lines = out.splitlines() - self.assertEqual(len([l for l in lines - if 'test_fail failed unexpectedly:' in l]), - 3) - - def test_skip(self): - _, out, _, _ = self.check(['--skip', '*test_fail*'], - files=FAIL_TEST_FILES, ret=0) - self.assertIn('0 tests passed, 1 skipped, 0 failures.', out) - - files = {'fail_test.py': FAIL_TEST_PY, - 'pass_test.py': PASS_TEST_PY} - self.check(['-j', '1', '--skip', '*test_fail*'], files=files, ret=0, - out=('[1/2] fail_test.FailingTest.test_fail was skipped\n' - '[2/2] pass_test.PassingTest.test_pass passed\n' - '1 test passed, 1 skipped, 0 failures.\n'), err='') - - # This tests that we print test_started updates for skipped tests - # properly. It also tests how overwriting works. - _, out, _, _ = self.check(['-j', '1', '--overwrite', '--skip', - '*test_fail*'], files=files, ret=0, - err='', universal_newlines=False) - - # We test this string separately and call out.strip() to - # avoid the trailing \r\n we get on windows, while keeping - # the \r's elsewhere in the string. - self.assertMultiLineEqual( - out.strip(), - ('[0/2] fail_test.FailingTest.test_fail\r' - ' \r' - '[1/2] fail_test.FailingTest.test_fail was skipped\r' - ' \r' - '[1/2] pass_test.PassingTest.test_pass\r' - ' \r' - '[2/2] pass_test.PassingTest.test_pass passed\r' - ' \r' - '1 test passed, 1 skipped, 0 failures.')) - - def test_skips_and_failures(self): - _, out, _, _ = self.check(['-j', '1', '-v', '-v'], files=SF_TEST_FILES, - ret=1, err='') - - # We do a bunch of assertIn()'s to work around the non-portable - # tracebacks. - self.assertIn(('[1/9] sf_test.ExpectedFailures.test_fail failed:\n' - ' Traceback '), out) - self.assertIn(('[2/9] sf_test.ExpectedFailures.test_pass ' - 'passed unexpectedly'), out) - self.assertIn(('[3/9] sf_test.SetupClass.test_method1 ' - 'failed unexpectedly:\n' - ' in setupClass\n'), out) - self.assertIn(('[4/9] sf_test.SetupClass.test_method2 ' - 'failed unexpectedly:\n' - ' in setupClass\n'), out) - self.assertIn(('[5/9] sf_test.SkipClass.test_method was skipped:\n' - ' skip class\n'), out) - self.assertIn(('[6/9] sf_test.SkipMethods.test_reason was skipped:\n' - ' reason\n'), out) - self.assertIn(('[7/9] sf_test.SkipMethods.test_skip_if_false ' - 'failed unexpectedly:\n' - ' Traceback'), out) - self.assertIn(('[8/9] sf_test.SkipMethods.test_skip_if_true ' - 'was skipped:\n' - ' reason\n' - '[9/9] sf_test.SkipSetup.test_notrun was skipped:\n' - ' setup failed\n' - '1 test passed, 4 skipped, 4 failures.\n'), out) - - def test_skip_and_all(self): - # --all should override --skip - _, out, _, _ = self.check(['--skip', '*test_pass'], - files=PASS_TEST_FILES, ret=0, err='') - self.assertIn('0 tests passed, 1 skipped, 0 failures.', out) - - _, out, _, _ = self.check(['--all', '--skip', '*test_pass'], - files=PASS_TEST_FILES, ret=0, err='') - self.assertIn('1 test passed, 0 skipped, 0 failures.', out) - - def test_skip_decorators_and_all(self): - _, out, _, _ = self.check(['--all', '-j', '1', '-v', '-v'], - files=SF_TEST_FILES, ret=1, err='') - self.assertIn('sf_test.SkipClass.test_method failed', out) - self.assertIn('sf_test.SkipMethods.test_reason failed', out) - self.assertIn('sf_test.SkipMethods.test_skip_if_true failed', out) - self.assertIn('sf_test.SkipMethods.test_skip_if_false failed', out) - - # --all does not override explicit calls to skipTest(), only - # the decorators. - self.assertIn('sf_test.SkipSetup.test_notrun was skipped', out) - - def test_sharding(self): - - def run(shard_index, total_shards, tests): - files = {'shard_test.py': textwrap.dedent( - """\ - import unittest - class ShardTest(unittest.TestCase): - def test_01(self): - pass - - def test_02(self): - pass - - def test_03(self): - pass - - def test_04(self): - pass - - def test_05(self): - pass - """)} - _, out, _, _ = self.check( - ['--shard-index', str(shard_index), - '--total-shards', str(total_shards), - '--jobs', '1'], - files=files) - - exp_out = '' - total_tests = len(tests) - for i, test in enumerate(tests): - exp_out += ('[%d/%d] shard_test.ShardTest.test_%s passed\n' % - (i + 1, total_tests, test)) - exp_out += '%d test%s passed, 0 skipped, 0 failures.\n' % ( - total_tests, "" if total_tests == 1 else "s") - self.assertEqual(out, exp_out) - - run(0, 1, ['01', '02', '03', '04', '05']) - run(0, 2, ['01', '03', '05']) - run(1, 2, ['02', '04']) - run(0, 6, ['01']) - - def test_subdir(self): - files = { - 'foo/__init__.py': '', - 'foo/bar/__init__.py': '', - 'foo/bar/pass_test.py': PASS_TEST_PY - } - self.check(['foo/bar'], files=files, ret=0, err='', - out=d("""\ - [1/1] foo.bar.pass_test.PassingTest.test_pass passed - 1 test passed, 0 skipped, 0 failures. - """)) - - def test_timing(self): - self.check(['-t'], files=PASS_TEST_FILES, ret=0, err='', - rout=(r'\[1/1\] pass_test.PassingTest.test_pass passed ' - r'\d+.\d+s\n' - r'1 test passed in \d+.\d+s, 0 skipped, 0 failures.')) - - def test_test_results_server(self): - server = test_result_server_fake.start() - self.assertNotEqual(server, None, 'could not start fake server') - - try: - self.check(['--test-results-server', - 'http://%s:%d' % server.server_address, - '--master-name', 'fake_master', - '--builder-name', 'fake_builder', - '--test-type', 'typ_tests', - '--metadata', 'foo=bar'], - files=PASS_TEST_FILES, ret=0, err='', - out=('[1/1] pass_test.PassingTest.test_pass passed\n' - '1 test passed, 0 skipped, 0 failures.\n')) - - finally: - posts = server.stop() - - self.assertEqual(len(posts), 1) - payload = posts[0][2].decode('utf8') - self.assertIn('"test_pass": {"actual": "PASS"', - payload) - self.assertTrue(payload.endswith('--\r\n')) - self.assertNotEqual(server.log.getvalue(), '') - - def test_test_results_server_error(self): - server = test_result_server_fake.start(code=500) - self.assertNotEqual(server, None, 'could not start fake server') - - try: - self.check(['--test-results-server', - 'http://%s:%d' % server.server_address, - '--master-name', 'fake_master', - '--builder-name', 'fake_builder', - '--test-type', 'typ_tests', - '--metadata', 'foo=bar'], - files=PASS_TEST_FILES, ret=1, err='', - out=('[1/1] pass_test.PassingTest.test_pass passed\n' - '1 test passed, 0 skipped, 0 failures.\n' - 'Uploading the JSON results raised ' - '"HTTP Error 500: Internal Server Error"\n')) - - finally: - _ = server.stop() - - def test_test_results_server_not_running(self): - self.check(['--test-results-server', 'http://localhost:99999', - '--master-name', 'fake_master', - '--builder-name', 'fake_builder', - '--test-type', 'typ_tests', - '--metadata', 'foo=bar'], - files=PASS_TEST_FILES, ret=1, err='', - rout=(r'\[1/1\] pass_test.PassingTest.test_pass passed\n' - '1 test passed, 0 skipped, 0 failures.\n' - 'Uploading the JSON results raised .*\n')) - - def test_verbose_2(self): - self.check(['-vv', '-j', '1', 'output_test.PassTest'], - files=OUTPUT_TEST_FILES, ret=0, - out=d("""\ - [1/2] output_test.PassTest.test_err passed: - hello on stderr - [2/2] output_test.PassTest.test_out passed: - hello on stdout - 2 tests passed, 0 skipped, 0 failures. - """), err='') - - def test_verbose_3(self): - self.check(['-vvv', '-j', '1', 'output_test.PassTest'], - files=OUTPUT_TEST_FILES, ret=0, - out=d("""\ - [0/2] output_test.PassTest.test_err queued - [1/2] output_test.PassTest.test_err passed: - hello on stderr - [1/2] output_test.PassTest.test_out queued - [2/2] output_test.PassTest.test_out passed: - hello on stdout - 2 tests passed, 0 skipped, 0 failures. - """), err='') - - def test_version(self): - self.check('--version', ret=0, out=(VERSION + '\n')) - - def test_write_full_results_to(self): - _, _, _, files = self.check(['--write-full-results-to', - 'results.json'], files=PASS_TEST_FILES) - self.assertIn('results.json', files) - results = json.loads(files['results.json']) - self.assertEqual(results['interrupted'], False) - self.assertEqual(results['path_delimiter'], '.') - - # The time it takes to run the test varies, so we test that - # we got a single entry greater than zero, but then delete it from - # the result so we can do an exact match on the rest of the trie. - result = results['tests']['pass_test']['PassingTest']['test_pass'] - self.assertEqual(len(result['times']), 1) - self.assertGreater(result['times'][0], 0) - result.pop('times') - self.assertEqual(results['tests'], - {u'pass_test': { - u'PassingTest': { - u'test_pass': { - u'actual': u'PASS', - u'expected': u'PASS', - } - } - }}) - - def test_write_trace_to(self): - _, _, _, files = self.check(['--write-trace-to', 'trace.json'], - files=PASS_TEST_FILES) - self.assertIn('trace.json', files) - trace_obj = json.loads(files['trace.json']) - self.assertEqual(trace_obj['otherData'], {}) - self.assertEqual(len(trace_obj['traceEvents']), 5) - event = trace_obj['traceEvents'][0] - self.assertEqual(event['name'], 'pass_test.PassingTest.test_pass') - self.assertEqual(event['ph'], 'X') - self.assertEqual(event['tid'], 1) - self.assertEqual(event['args']['expected'], ['Pass']) - self.assertEqual(event['args']['actual'], 'Pass') - - -class TestMain(TestCli): - prog = [] - - def make_host(self): - return Host() - - def call(self, host, argv, stdin, env): - stdin = unicode(stdin) - host.stdin = io.StringIO(stdin) - if env: - host.getenv = env.get - host.capture_output() - orig_sys_path = sys.path[:] - orig_sys_modules = list(sys.modules.keys()) - - try: - ret = main(argv + ['-j', '1'], host) - finally: - out, err = host.restore_output() - modules_to_unload = [] - for k in sys.modules: - if k not in orig_sys_modules: - modules_to_unload.append(k) - for k in modules_to_unload: - del sys.modules[k] - sys.path = orig_sys_path - - return ret, out, err - - def test_debugger(self): - # TODO: this test seems to hang under coverage. - pass
diff --git a/typ/tests/pool_test.py b/typ/tests/pool_test.py deleted file mode 100644 index 319c396..0000000 --- a/typ/tests/pool_test.py +++ /dev/null
@@ -1,172 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import unittest - -from typ import test_case -from typ.host import Host -from typ.pool import make_pool, _MessageType, _ProcessPool, _loop - - -def _pre(host, worker_num, context): # pylint: disable=W0613 - context['pre'] = True - return context - - -def _post(context): - context['post'] = True - return context - - -def _echo(context, msg): - return '%s/%s/%s' % (context['pre'], context['post'], msg) - - -def _error(context, msg): # pylint: disable=W0613 - raise Exception('_error() raised Exception') - - -def _interrupt(context, msg): # pylint: disable=W0613 - raise KeyboardInterrupt() - - -def _stub(*args): # pylint: disable=W0613 - return None - - -class TestPool(test_case.TestCase): - - def run_basic_test(self, jobs): - host = Host() - context = {'pre': False, 'post': False} - pool = make_pool(host, jobs, _echo, context, _pre, _post) - pool.send('hello') - pool.send('world') - msg1 = pool.get() - msg2 = pool.get() - pool.close() - final_contexts = pool.join() - self.assertEqual(set([msg1, msg2]), - set(['True/False/hello', - 'True/False/world'])) - expected_context = {'pre': True, 'post': True} - expected_final_contexts = [expected_context for _ in range(jobs)] - self.assertEqual(final_contexts, expected_final_contexts) - - def run_through_loop(self, callback=None, pool=None): - callback = callback or _stub - if pool: - host = pool.host - else: - host = Host() - pool = _ProcessPool(host, 0, _stub, None, _stub, _stub) - pool.send('hello') - - worker_num = 1 - _loop(pool.requests, pool.responses, host, worker_num, callback, - None, _stub, _stub, should_loop=False) - return pool - - def test_async_close(self): - host = Host() - pool = make_pool(host, 1, _echo, None, _stub, _stub) - pool.join() - - def test_basic_one_job(self): - self.run_basic_test(1) - - def test_basic_two_jobs(self): - self.run_basic_test(2) - - def test_join_discards_messages(self): - host = Host() - context = {'pre': False, 'post': False} - pool = make_pool(host, 2, _echo, context, _pre, _post) - pool.send('hello') - pool.close() - pool.join() - self.assertEqual(len(pool.discarded_responses), 1) - - @unittest.skipIf(sys.version_info.major == 3, 'fails under python3') - def test_join_gets_an_error(self): - host = Host() - pool = make_pool(host, 2, _error, None, _stub, _stub) - pool.send('hello') - pool.close() - try: - pool.join() - except Exception as e: - self.assertIn('_error() raised Exception', str(e)) - - def test_join_gets_an_interrupt(self): - host = Host() - pool = make_pool(host, 2, _interrupt, None, _stub, _stub) - pool.send('hello') - pool.close() - self.assertRaises(KeyboardInterrupt, pool.join) - - def test_loop(self): - pool = self.run_through_loop() - resp = pool.get() - self.assertEqual(resp, None) - pool.requests.put((_MessageType.Close, None)) - pool.close() - self.run_through_loop(pool=pool) - pool.join() - - def test_loop_fails_to_respond(self): - # This tests what happens if _loop() tries to send a response - # on a closed queue; we can't simulate this directly through the - # api in a single thread. - pool = self.run_through_loop() - pool.requests.put((_MessageType.Request, None)) - pool.requests.put((_MessageType.Close, None)) - self.run_through_loop(pool=pool) - pool.join() - - @unittest.skipIf(sys.version_info.major == 3, 'fails under python3') - def test_loop_get_raises_error(self): - pool = self.run_through_loop(_error) - self.assertRaises(Exception, pool.get) - pool.requests.put((_MessageType.Close, None)) - pool.close() - pool.join() - - def test_loop_get_raises_interrupt(self): - pool = self.run_through_loop(_interrupt) - self.assertRaises(KeyboardInterrupt, pool.get) - pool.requests.put((_MessageType.Close, None)) - pool.close() - pool.join() - - def test_pickling_errors(self): - def unpicklable_fn(): # pragma: no cover - pass - - host = Host() - jobs = 2 - self.assertRaises(Exception, make_pool, - host, jobs, _stub, unpicklable_fn, None, None) - self.assertRaises(Exception, make_pool, - host, jobs, _stub, None, unpicklable_fn, None) - self.assertRaises(Exception, make_pool, - host, jobs, _stub, None, None, unpicklable_fn) - - def test_no_close(self): - host = Host() - context = {'pre': False, 'post': False} - pool = make_pool(host, 2, _echo, context, _pre, _post) - final_contexts = pool.join() - self.assertEqual(final_contexts, [])
diff --git a/typ/tests/printer_test.py b/typ/tests/printer_test.py deleted file mode 100644 index b2310bc..0000000 --- a/typ/tests/printer_test.py +++ /dev/null
@@ -1,61 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from typ.printer import Printer - - -class TestPrinter(unittest.TestCase): - - def setUp(self): - # 'Invalid name' pylint: disable=C0103 - self.out = [] - - def print_(self, msg, end='\n'): - self.out.append(msg + end) - - def test_basic(self): - pr = Printer(self.print_, False, 80) - pr.update('foo') - pr.flush() - self.assertEqual(self.out, ['foo', '\n']) - - def test_elide(self): - pr = Printer(self.print_, False, 8) - pr.update('hello world') - pr.flush() - self.assertEqual(self.out, ['h...d', '\n']) - - def test_overwrite(self): - pr = Printer(self.print_, True, 80) - pr.update('hello world') - pr.update('goodbye world') - pr.flush() - self.assertEqual(self.out, - ['hello world', - '\r \r', - 'goodbye world', - '\n']) - - def test_last_line_flushed_when_not_overwriting(self): - pr = Printer(self.print_, False, 80) - pr.update('foo\nbar') - pr.update('baz') - pr.flush() - self.assertEqual(self.out, - ['foo\nbar', - '\n', - 'baz', - '\n'])
diff --git a/typ/tests/runner_test.py b/typ/tests/runner_test.py deleted file mode 100644 index 1d14416..0000000 --- a/typ/tests/runner_test.py +++ /dev/null
@@ -1,223 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys -import tempfile -import unittest - -from textwrap import dedent as d - - -from typ import Host, Runner, TestCase, TestSet, TestInput -from typ import WinMultiprocessing - - -def _setup_process(child, context): # pylint: disable=W0613 - return context - - -def _teardown_process(child, context): # pylint: disable=W0613 - return context - -def _teardown_throws(child, context): # pylint: disable=W0613 - raise Exception("exception in teardown") - - -class RunnerTests(TestCase): - def test_context(self): - r = Runner() - r.args.tests = ['typ.tests.runner_test.ContextTests'] - r.context = {'foo': 'bar'} - r.setup_fn = _setup_process - r.teardown_fn = _teardown_process - r.win_multiprocessing = WinMultiprocessing.importable - ret, _, _ = r.run() - self.assertEqual(ret, 0) - - @unittest.skipIf(sys.version_info.major == 3, 'fails under python3') - def test_exception_in_teardown(self): - r = Runner() - r.args.tests = ['typ.tests.runner_test.ContextTests'] - r.context = {'foo': 'bar'} - r.setup_fn = _setup_process - r.teardown_fn = _teardown_throws - r.win_multiprocessing = WinMultiprocessing.importable - ret, _, _ = r.run() - self.assertEqual(ret, 0) - self.assertEqual(r.final_responses[0][2].message, - 'exception in teardown') - - def test_bad_default(self): - r = Runner() - ret = r.main([], foo='bar') - self.assertEqual(ret, 2) - - def test_good_default(self): - r = Runner() - ret = r.main([], tests=['typ.tests.runner_test.ContextTests']) - self.assertEqual(ret, 0) - - -class TestSetTests(TestCase): - # This class exists to test the failures that can come up if you - # create your own test sets and bypass find_tests(); failures that - # would normally be caught there can occur later during test execution. - - def test_missing_name(self): - test_set = TestSet() - test_set.parallel_tests = [TestInput('nonexistent test')] - r = Runner() - r.args.jobs = 1 - ret, _, _ = r.run(test_set) - self.assertEqual(ret, 1) - - def test_failing_load_test(self): - h = Host() - orig_wd = h.getcwd() - tmpdir = None - try: - tmpdir = h.mkdtemp() - h.chdir(tmpdir) - h.write_text_file('load_test.py', d("""\ - import unittest - def load_tests(_, _2, _3): - assert False - """)) - test_set = TestSet() - test_set.parallel_tests = [TestInput('load_test.BaseTest.test_x')] - r = Runner() - r.args.jobs = 1 - ret, _, trace = r.run(test_set) - self.assertEqual(ret, 1) - self.assertIn('BaseTest', - trace['traceEvents'][0]['args']['err']) - finally: - h.chdir(orig_wd) - if tmpdir: - h.rmtree(tmpdir) - - -class TestWinMultiprocessing(TestCase): - def make_host(self): - return Host() - - def call(self, argv, platform=None, win_multiprocessing=None, **kwargs): - h = self.make_host() - orig_wd = h.getcwd() - tmpdir = None - try: - tmpdir = h.mkdtemp() - h.chdir(tmpdir) - h.capture_output() - if platform is not None: - h.platform = platform - r = Runner(h) - if win_multiprocessing is not None: - r.win_multiprocessing = win_multiprocessing - ret = r.main(argv, **kwargs) - finally: - out, err = h.restore_output() - h.chdir(orig_wd) - if tmpdir: - h.rmtree(tmpdir) - - return ret, out, err - - def test_bad_value(self): - self.assertRaises(ValueError, self.call, [], win_multiprocessing='foo') - - def test_ignore(self): - h = self.make_host() - if h.platform == 'win32': # pragma: win32 - self.assertRaises(ValueError, self.call, [], - win_multiprocessing=WinMultiprocessing.ignore) - else: - result = self.call([], - win_multiprocessing=WinMultiprocessing.ignore) - ret, out, err = result - self.assertEqual(ret, 0) - self.assertEqual(out, '0 tests passed, 0 skipped, 0 failures.\n') - self.assertEqual(err, '') - - def test_real_unimportable_main(self): - h = self.make_host() - tmpdir = None - orig_wd = h.getcwd() - out = err = None - out_str = err_str = '' - try: - tmpdir = h.mkdtemp() - h.chdir(tmpdir) - out = tempfile.NamedTemporaryFile(delete=False) - err = tempfile.NamedTemporaryFile(delete=False) - path_above_typ = h.realpath(h.dirname(__file__), '..', '..') - env = h.env.copy() - if 'PYTHONPATH' in env: # pragma: untested - env['PYTHONPATH'] = '%s%s%s' % (env['PYTHONPATH'], - h.pathsep, - path_above_typ) - else: # pragma: untested. - env['PYTHONPATH'] = path_above_typ - - h.write_text_file('test', d(""" - import sys - import typ - importable = typ.WinMultiprocessing.importable - sys.exit(typ.main(win_multiprocessing=importable)) - """)) - h.stdout = out - h.stderr = err - ret = h.call_inline([h.python_interpreter, h.join(tmpdir, 'test')], - env=env) - finally: - h.chdir(orig_wd) - if tmpdir: - h.rmtree(tmpdir) - if out: - out.close() - out = open(out.name) - out_str = out.read() - out.close() - h.remove(out.name) - if err: - err.close() - err = open(err.name) - err_str = err.read() - err.close() - h.remove(err.name) - - self.assertEqual(ret, 1) - self.assertEqual(out_str, '') - self.assertIn('ValueError: The __main__ module ', - err_str) - - def test_single_job(self): - ret, out, err = self.call(['-j', '1'], platform='win32') - self.assertEqual(ret, 0) - self.assertEqual('0 tests passed, 0 skipped, 0 failures.\n', out ) - self.assertEqual(err, '') - - def test_spawn(self): - ret, out, err = self.call([]) - self.assertEqual(ret, 0) - self.assertEqual('0 tests passed, 0 skipped, 0 failures.\n', out) - self.assertEqual(err, '') - - -class ContextTests(TestCase): - def test_context(self): - # This test is mostly intended to be called by - # RunnerTests.test_context, above. It is not interesting on its own. - if self.context: - self.assertEquals(self.context['foo'], 'bar')
diff --git a/typ/tests/stats_test.py b/typ/tests/stats_test.py deleted file mode 100644 index 3154768..0000000 --- a/typ/tests/stats_test.py +++ /dev/null
@@ -1,74 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from typ.stats import Stats - - -class TestStats(unittest.TestCase): - - def test_basic(self): - s = Stats('foo', lambda: 0, 32) - self.assertEqual(s.format(), 'foo') - - def test_edges(self): - s = Stats('[%s/%f/%t/%r/%p]', lambda: 0, 32) - self.assertEqual(s.format(), '[0/0/0/0/-]') - s.started = 3 - s.total = 5 - s.finished = 1 - self.assertEqual(s.format(), '[3/1/5/2/ 60.0]') - - s.started = 5 - s.finished = 5 - self.assertEqual(s.format(), '[5/5/5/0/100.0]') - - def test_elapsed_time(self): - times = [0.0, 0.4] - s = Stats('[%e]', lambda: times.pop(0), 32) - self.assertEqual(s.format(), '[0.400]') - - s = Stats('[%e]', lambda: 0, 32) - self.assertEqual(s.format(), '[0.000]') - - def test_current_rate(self): - times = [0.0, 0.1, 0.2] - s = Stats('[%c]', lambda: times.pop(0), 1) - self.assertEquals(s.format(), '[-]') - s.add_time() - s.add_time() - self.assertEquals(s.format(), '[ 10.0]') - - def test_overall_rate(self): - times = [0, 0, 5] - s = Stats('[%o]', lambda: times.pop(0), 32) - self.assertEqual(s.format(), '[-]') - s.started = 3 - s.finished = 1 - s.total = 5 - self.assertEqual(s.format(), '[ 0.2]') - - def test_escaped_percent(self): - s = Stats('%%', lambda: 0, 32) - self.assertEqual(s.format(), '%') - - def test_unrecognized_escape(self): - s = Stats('%x', lambda: 0, 32) - self.assertEqual(s.format(), '%x') - - def test_remaining(self): - s = Stats('%u', lambda: 0, 32) - s.total = 2 - self.assertEqual(s.format(), '2')
diff --git a/typ/tests/test_case_test.py b/typ/tests/test_case_test.py deleted file mode 100644 index 4d22bd9..0000000 --- a/typ/tests/test_case_test.py +++ /dev/null
@@ -1,54 +0,0 @@ -# Copyright 2014 Dirk Pranke. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typ import test_case - - -class TestFuncs(test_case.MainTestCase): - - def test_convert_newlines(self): - cn = test_case.convert_newlines - self.assertEqual(cn('foo'), 'foo') - self.assertEqual(cn('foo\nbar\nbaz'), 'foo\nbar\nbaz') - self.assertEqual(cn('foo\rbar\nbaz\r'), 'foo\nbar\nbaz\n') - self.assertEqual(cn('foo\r\nbar\r\nbaz\r\nmeh\n'), - 'foo\nbar\nbaz\nmeh\n') - - -class TestMainTestCase(test_case.MainTestCase): - - def test_basic(self): - h = self.make_host() - files = { - 'test.py': """ -import os -import sys -sys.stdout.write("in: %s\\n" % sys.stdin.read()) -sys.stdout.write("out: %s\\n" % os.environ['TEST_VAR']) -sys.stderr.write("err\\n") -with open("../results", "w") as fp: - fp.write(open("../input", "r").read() + " written") -""", - 'input': 'results', - 'subdir/x': 'y', - } - exp_files = files.copy() - exp_files['results'] = 'results written' - self.check(prog=[h.python_interpreter, '../test.py'], - stdin='hello on stdin', - env={'TEST_VAR': 'foo'}, - cwd='subdir', - files=files, - ret=0, out='in: hello on stdin\nout: foo\n', - err='err\n', exp_files=exp_files)
diff --git a/typ/version.py b/typ/version.py deleted file mode 100644 index a2ca94e..0000000 --- a/typ/version.py +++ /dev/null
@@ -1,15 +0,0 @@ -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -VERSION = '0.11.0'