| # -*- coding: utf-8 -*- |
| import errno |
| import platform |
| import re |
| import warnings |
| |
| import pep8 |
| |
| from flake8 import __version__ |
| from flake8 import callbacks |
| from flake8.reporter import (multiprocessing, BaseQReport, FileQReport, |
| QueueReport) |
| from flake8 import util |
| |
# Bound ``search`` of a case-insensitive pattern matching a "# flake8: noqa"
# (or "# flake8=noqa") comment; call it with a line of source text.
_flake8_noqa = re.compile(r'\s*# flake8[:=]\s*noqa', re.I).search

# Patterns that get_style_guide() appends to the user's exclude list.
EXTRA_EXCLUDE = ['.tox', '.eggs', '*.egg']

# Extend pep8's PROJECT_CONFIG tuple with '.flake8'
# (presumably the per-project config file names pep8 looks for -- confirm).
pep8.PROJECT_CONFIG += ('.flake8',)
| |
| |
| def _load_entry_point(entry_point, verify_requirements): |
| """Based on the version of setuptools load an entry-point correctly. |
| |
| setuptools 11.3 deprecated `require=False` in the call to EntryPoint.load. |
| To load entry points correctly after that without requiring all |
| dependencies be present, the proper way is to call EntryPoint.resolve. |
| |
| This function will provide backwards compatibility for older versions of |
| setuptools while also ensuring we do the right thing for the future. |
| """ |
| if hasattr(entry_point, 'resolve') and hasattr(entry_point, 'require'): |
| if verify_requirements: |
| entry_point.require() |
| plugin = entry_point.resolve() |
| else: |
| plugin = entry_point.load(require=verify_requirements) |
| |
| return plugin |
| |
| |
def _register_extensions():
    """Discover and register every ``flake8.extension`` entry point.

    Each entry point is loaded without verifying its requirements and
    registered with pep8 under the entry point's name.

    Returns a 4-tuple ``(extensions, parser_hooks, options_hooks,
    ignored_hooks)``:

    * ``extensions`` -- ``(name, version)`` pairs; pep8 is added first.
    * ``parser_hooks`` -- ``add_options`` attributes of the plugins that
      have one.
    * ``options_hooks`` -- ``parse_options`` attributes of the plugins that
      have one.
    * ``ignored_hooks`` -- entry point names of the plugins whose
      ``off_by_default`` attribute is exactly ``True``.
    """
    registered = util.OrderedSet()
    registered.add(('pep8', pep8.__version__))
    parser_hooks, options_hooks, ignored_hooks = [], [], []
    result = (registered, parser_hooks, options_hooks, ignored_hooks)

    try:
        from pkg_resources import iter_entry_points
    except ImportError:
        # pkg_resources is unavailable, so there is nothing to discover.
        return result

    hook_targets = (
        ('add_options', parser_hooks),
        ('parse_options', options_hooks),
    )
    for entry_point in iter_entry_points('flake8.extension'):
        # Do not verify that the requirements versions are valid
        plugin = _load_entry_point(entry_point, verify_requirements=False)
        pep8.register_check(plugin, codes=[entry_point.name])
        registered.add((plugin.name, plugin.version))
        for hook_name, collected in hook_targets:
            if hasattr(plugin, hook_name):
                collected.append(getattr(plugin, hook_name))
        if getattr(plugin, 'off_by_default', False) is True:
            ignored_hooks.append(entry_point.name)
    return result
| |
| |
def get_parser():
    """Build the option parser used by flake8.

    This wraps ``pep8.get_parser``: every extension is registered first, the
    ``--repeat``, ``--testsuite`` and ``--doctest`` options are dropped when
    present, and flake8's own options are added together with those
    contributed by the extensions' ``add_options`` hooks.

    Returns a ``(parser, options_hooks)`` pair, where ``options_hooks`` holds
    the extensions' ``parse_options`` hooks. The names of the extensions that
    are off by default are stored on ``parser.ignored_extensions``.
    """
    extensions, parser_hooks, options_hooks, ignored = _register_extensions()
    plugin_versions = ', '.join('%s: %s' % ext for ext in extensions)
    version_text = '%s (%s) %s' % (
        __version__, plugin_versions, get_python_version())
    parser = pep8.get_parser('flake8', version_text)

    for unwanted in ('--repeat', '--testsuite', '--doctest'):
        try:
            parser.remove_option(unwanted)
        except ValueError:
            # Nothing to drop for this option; move on to the next one.
            continue

    if multiprocessing:
        parser.config_options.append('jobs')
        parser.add_option(
            '-j', '--jobs',
            type='string',
            default='auto',
            help="number of jobs to run simultaneously, "
                 "or 'auto'. This is ignored on Windows.",
        )

    parser.add_option(
        '--exit-zero',
        action='store_true',
        help="exit with code 0 even if there are errors",
    )

    # Extension options are added after --exit-zero and before the
    # callback-based options below.
    for add_extension_options in parser_hooks:
        add_extension_options(parser)

    # --install-hook and --output-file are handled through optparse callbacks.
    parser.add_option(
        '--install-hook',
        dest='install_hook',
        default=False,
        action='callback',
        callback=callbacks.install_vcs_hook,
        help='Install the appropriate hook for this '
             'repository.',
    )
    parser.add_option(
        '--output-file',
        default=None,
        type='string',
        nargs=1,
        action='callback',
        callback=callbacks.redirect_stdout,
        help='Redirect report to a file.',
    )
    parser.add_option(
        '--enable-extensions',
        dest='enabled_extensions',
        default='',
        type='string',
        help='Enable plugins and extensions that are disabled '
             'by default',
    )
    parser.ignored_extensions = ignored
    return parser, options_hooks
| |
| |
class NoQAStyleGuide(pep8.StyleGuide):
    """pep8 style guide that skips files carrying a ``flake8: noqa`` comment."""

    def input_file(self, filename, lines=None, expected=None, line_offset=0):
        """Run all checks on a Python source file.

        Returns 0 without running any check as soon as one line of the file
        matches the ``# flake8: noqa`` pattern; otherwise returns the result
        of the checker's ``check_all``.
        """
        if self.options.verbose:
            print('checking %s' % filename)
        checker = self.checker_class(
            filename, lines=lines, options=self.options)
        # Any "flake8: noqa" comments to ignore the entire file?
        for source_line in checker.lines:
            if _flake8_noqa(source_line):
                return 0
        return checker.check_all(expected=expected, line_offset=line_offset)
| |
| |
class StyleGuide(object):
    """A wrapper StyleGuide object for Flake8 usage.

    This allows for OSErrors to be caught in the styleguide and special logic
    to be used to handle those errors. Every public method and property
    delegates to the wrapped style guide stored in ``_styleguide``.
    """

    # Reasoning for error numbers is in-line below
    serial_retry_errors = set([
        # ENOSPC: Added by sigmavirus24
        # > On some operating systems (OSX), multiprocessing may cause an
        # > ENOSPC error while trying to create a Semaphore.
        # > In those cases, we should replace the customized Queue Report
        # > class with pep8's StandardReport class to ensure users don't run
        # > into this problem.
        # > (See also: https://gitlab.com/pycqa/flake8/issues/74)
        errno.ENOSPC,
        # NOTE(sigmavirus24): When adding to this list, include the reasoning
        # on the lines before the error code and always append your error
        # code. Further, please always add a trailing `,` to reduce the visual
        # noise in diffs.
    ])

    def __init__(self, **kwargs):
        """Wrap an injected style guide, or build a ``NoQAStyleGuide``.

        :param styleguide: optional, pre-built style guide to wrap. This
            allows us to inject a mocked StyleGuide in the tests.
        :param kwargs: forwarded to ``NoQAStyleGuide`` when no ``styleguide``
            keyword is supplied.
        """
        # Only build the default when nothing was injected: using it as the
        # default of ``kwargs.pop`` would construct (and discard) a real
        # style guide on every call, with ``styleguide`` passed as an option.
        if 'styleguide' in kwargs:
            self._styleguide = kwargs.pop('styleguide')
        else:
            self._styleguide = NoQAStyleGuide(**kwargs)

    @property
    def options(self):
        """The ``options`` attribute of the wrapped style guide."""
        return self._styleguide.options

    @property
    def paths(self):
        """The ``paths`` attribute of the wrapped style guide."""
        return self._styleguide.paths

    def _retry_serial(self, func, *args, **kwargs):
        """This will retry the passed function in serial if necessary.

        In the event that we encounter an OSError with an errno in
        :attr:`serial_retry_errors`, this function will retry this function
        using pep8's default Report class which operates in serial. Any other
        OSError is re-raised unchanged.
        """
        try:
            return func(*args, **kwargs)
        except OSError as oserr:
            if oserr.errno not in self.serial_retry_errors:
                raise
            # Swap in the serial reporter before the single retry below.
            self.init_report(pep8.StandardReport)
        return func(*args, **kwargs)

    def check_files(self, paths=None):
        """Delegate to the wrapped ``check_files``, retrying in serial."""
        return self._retry_serial(self._styleguide.check_files, paths=paths)

    def excluded(self, filename, parent=None):
        """Delegate to the wrapped style guide's ``excluded``."""
        return self._styleguide.excluded(filename, parent=parent)

    def init_report(self, reporter=None):
        """Delegate to the wrapped style guide's ``init_report``."""
        return self._styleguide.init_report(reporter)

    def input_file(self, filename, lines=None, expected=None, line_offset=0):
        """Delegate to the wrapped ``input_file``, retrying in serial."""
        return self._retry_serial(
            self._styleguide.input_file,
            filename=filename,
            lines=lines,
            expected=expected,
            line_offset=line_offset,
        )
| |
| |
| def _parse_multi_options(options, split_token=','): |
| r"""Split and strip and discard empties. |
| |
| Turns the following: |
| |
| A, |
| B, |
| |
| into ["A", "B"]. |
| |
| Credit: Kristian Glass as contributed to pep8 |
| """ |
| if options: |
| return [o.strip() for o in options.split(split_token) if o.strip()] |
| else: |
| return options |
| |
| |
def _disable_extensions(parser, options):
    """Add the off-by-default extensions to ``options.ignore``.

    Extensions named in ``parser.ignored_extensions`` (when the parser has
    that attribute) stay ignored unless they are listed in
    ``options.enabled_extensions``. ``options.ignore`` is replaced by a tuple
    holding the union of its previous entries and the still-ignored names.
    """
    still_ignored = set(getattr(parser, 'ignored_extensions', []))
    requested = set(_parse_multi_options(options.enabled_extensions))

    # Anything that was explicitly enabled is no longer ignored by default.
    still_ignored.difference_update(requested)

    # Merge what is left into the codes that are already ignored.
    options.ignore = tuple(still_ignored.union(options.ignore))
| |
| |
def get_style_guide(**kwargs):
    """Parse the options and configure the checker.

    Keyword arguments are forwarded to ``StyleGuide``; any ``parser`` entry
    is replaced by the parser built by ``get_parser()``.

    Returns the configured ``StyleGuide`` wrapper defined in this module
    (a plain ``object`` subclass, not a sub-class of ``pep8.StyleGuide``).
    """
    kwargs['parser'], options_hooks = get_parser()
    styleguide = StyleGuide(**kwargs)
    options = styleguide.options

    # Fold the off-by-default extensions that were not explicitly enabled
    # into options.ignore.
    _disable_extensions(kwargs['parser'], options)

    # Turn a non-list, non-empty exclude value into normalized paths and an
    # empty one into a fresh list, so that it can be extended below
    # (assumes pep8.normalize_paths returns a list -- confirm).
    if options.exclude and not isinstance(options.exclude, list):
        options.exclude = pep8.normalize_paths(options.exclude)
    elif not options.exclude:
        options.exclude = []

    # Add patterns in EXTRA_EXCLUDE to the list of excluded patterns
    options.exclude.extend(pep8.normalize_paths(EXTRA_EXCLUDE))

    # Let every extension that provides parse_options see the options.
    for options_hook in options_hooks:
        options_hook(options)

    # Emit one warning per reason for which --jobs is going to be ignored.
    if util.warn_when_using_jobs(options):
        if not multiprocessing:
            warnings.warn("The multiprocessing module is not available. "
                          "Ignoring --jobs arguments.")
        if util.is_windows():
            warnings.warn("The --jobs option is not available on Windows. "
                          "Ignoring --jobs arguments.")
        if util.is_using_stdin(styleguide.paths):
            warnings.warn("The --jobs option is not compatible with supplying "
                          "input using - . Ignoring --jobs arguments.")
        if options.diff:
            warnings.warn("The --diff option was specified with --jobs but "
                          "they are not compatible. Ignoring --jobs arguments."
                          )

    # --diff always disables jobs, whether or not a warning was issued.
    if options.diff:
        options.jobs = None

    force_disable_jobs = util.force_disable_jobs(styleguide)

    if multiprocessing and options.jobs and not force_disable_jobs:
        # A purely numeric --jobs value is used as given; anything else
        # (such as the default 'auto') means "use the CPU count".
        if options.jobs.isdigit():
            n_jobs = int(options.jobs)
        else:
            try:
                n_jobs = multiprocessing.cpu_count()
            except NotImplementedError:
                n_jobs = 1
        # The queue-based reporters are only installed for more than one job;
        # otherwise options.jobs keeps its original value.
        if n_jobs > 1:
            options.jobs = n_jobs
            # Pick the reporter from the quiet level: QueueReport when not
            # quiet, FileQReport for quiet == 1, BaseQReport otherwise.
            reporter = QueueReport
            if options.quiet:
                reporter = BaseQReport
                if options.quiet == 1:
                    reporter = FileQReport
            report = styleguide.init_report(reporter)
            report.input_file = styleguide.input_file
            # NOTE(review): ``runner`` is assigned on the wrapper StyleGuide,
            # not on the style guide it wraps, and nothing visible here reads
            # it back -- confirm that its consumer looks it up on this object.
            styleguide.runner = report.task_queue.put

    return styleguide
| |
| |
def get_python_version():
    """Describe the running interpreter as ``"<impl> <version> on <system>"``.

    The implementation name is left out when the ``platform`` module does not
    provide ``python_implementation`` (Python 2.5).
    """
    # The implementation isn't all that important, so default to nothing.
    implementation = ''
    try:
        implementation = platform.python_implementation() + " "
    except AttributeError:  # Python 2.5
        pass
    version_and_system = '%s on %s' % (
        platform.python_version(), platform.system())
    return implementation + version_and_system