| """Implements InterpreterPoolExecutor.""" |
| |
| import contextlib |
| import pickle |
| import textwrap |
| from . import thread as _thread |
| import _interpreters |
| import _interpqueues |
| |
| |
| class ExecutionFailed(_interpreters.InterpreterError): |
| """An unhandled exception happened during execution.""" |
| |
| def __init__(self, excinfo): |
| msg = excinfo.formatted |
| if not msg: |
| if excinfo.type and excinfo.msg: |
| msg = f'{excinfo.type.__name__}: {excinfo.msg}' |
| else: |
| msg = excinfo.type.__name__ or excinfo.msg |
| super().__init__(msg) |
| self.excinfo = excinfo |
| |
| def __str__(self): |
| try: |
| formatted = self.excinfo.errdisplay |
| except Exception: |
| return super().__str__() |
| else: |
| return textwrap.dedent(f""" |
| {super().__str__()} |
| |
| Uncaught in the interpreter: |
| |
| {formatted} |
| """.strip()) |
| |
| |
| UNBOUND = 2 # error; this should not happen. |
| |
| |
| class WorkerContext(_thread.WorkerContext): |
| |
| @classmethod |
| def prepare(cls, initializer, initargs, shared): |
| def resolve_task(fn, args, kwargs): |
| if isinstance(fn, str): |
| # XXX Circle back to this later. |
| raise TypeError('scripts not supported') |
| if args or kwargs: |
| raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}') |
| data = textwrap.dedent(fn) |
| kind = 'script' |
| # Make sure the script compiles. |
| # Ideally we wouldn't throw away the resulting code |
| # object. However, there isn't much to be done until |
| # code objects are shareable and/or we do a better job |
| # of supporting code objects in _interpreters.exec(). |
| compile(data, '<string>', 'exec') |
| else: |
| # Functions defined in the __main__ module can't be pickled, |
| # so they can't be used here. In the future, we could possibly |
| # borrow from multiprocessing to work around this. |
| data = pickle.dumps((fn, args, kwargs)) |
| kind = 'function' |
| return (data, kind) |
| |
| if initializer is not None: |
| try: |
| initdata = resolve_task(initializer, initargs, {}) |
| except ValueError: |
| if isinstance(initializer, str) and initargs: |
| raise ValueError(f'an initializer script does not take args, got {initargs!r}') |
| raise # re-raise |
| else: |
| initdata = None |
| def create_context(): |
| return cls(initdata, shared) |
| return create_context, resolve_task |
| |
| @classmethod |
| @contextlib.contextmanager |
| def _capture_exc(cls, resultsid): |
| try: |
| yield |
| except BaseException as exc: |
| # Send the captured exception out on the results queue, |
| # but still leave it unhandled for the interpreter to handle. |
| err = pickle.dumps(exc) |
| _interpqueues.put(resultsid, (None, err), 1, UNBOUND) |
| raise # re-raise |
| |
| @classmethod |
| def _send_script_result(cls, resultsid): |
| _interpqueues.put(resultsid, (None, None), 0, UNBOUND) |
| |
| @classmethod |
| def _call(cls, func, args, kwargs, resultsid): |
| with cls._capture_exc(resultsid): |
| res = func(*args or (), **kwargs or {}) |
| # Send the result back. |
| try: |
| _interpqueues.put(resultsid, (res, None), 0, UNBOUND) |
| except _interpreters.NotShareableError: |
| res = pickle.dumps(res) |
| _interpqueues.put(resultsid, (res, None), 1, UNBOUND) |
| |
| @classmethod |
| def _call_pickled(cls, pickled, resultsid): |
| with cls._capture_exc(resultsid): |
| fn, args, kwargs = pickle.loads(pickled) |
| cls._call(fn, args, kwargs, resultsid) |
| |
| def __init__(self, initdata, shared=None): |
| self.initdata = initdata |
| self.shared = dict(shared) if shared else None |
| self.interpid = None |
| self.resultsid = None |
| |
| def __del__(self): |
| if self.interpid is not None: |
| self.finalize() |
| |
| def _exec(self, script): |
| assert self.interpid is not None |
| excinfo = _interpreters.exec(self.interpid, script, restrict=True) |
| if excinfo is not None: |
| raise ExecutionFailed(excinfo) |
| |
| def initialize(self): |
| assert self.interpid is None, self.interpid |
| self.interpid = _interpreters.create(reqrefs=True) |
| try: |
| _interpreters.incref(self.interpid) |
| |
| maxsize = 0 |
| fmt = 0 |
| self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND) |
| |
| self._exec(f'from {__name__} import WorkerContext') |
| |
| if self.shared: |
| _interpreters.set___main___attrs( |
| self.interpid, self.shared, restrict=True) |
| |
| if self.initdata: |
| self.run(self.initdata) |
| except BaseException: |
| self.finalize() |
| raise # re-raise |
| |
| def finalize(self): |
| interpid = self.interpid |
| resultsid = self.resultsid |
| self.resultsid = None |
| self.interpid = None |
| if resultsid is not None: |
| try: |
| _interpqueues.destroy(resultsid) |
| except _interpqueues.QueueNotFoundError: |
| pass |
| if interpid is not None: |
| try: |
| _interpreters.decref(interpid) |
| except _interpreters.InterpreterNotFoundError: |
| pass |
| |
| def run(self, task): |
| data, kind = task |
| if kind == 'script': |
| raise NotImplementedError('script kind disabled') |
| script = f""" |
| with WorkerContext._capture_exc({self.resultsid}): |
| {textwrap.indent(data, ' ')} |
| WorkerContext._send_script_result({self.resultsid})""" |
| elif kind == 'function': |
| script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})' |
| else: |
| raise NotImplementedError(kind) |
| |
| try: |
| self._exec(script) |
| except ExecutionFailed as exc: |
| exc_wrapper = exc |
| else: |
| exc_wrapper = None |
| |
| # Return the result, or raise the exception. |
| while True: |
| try: |
| obj = _interpqueues.get(self.resultsid) |
| except _interpqueues.QueueNotFoundError: |
| raise # re-raise |
| except _interpqueues.QueueError: |
| continue |
| except ModuleNotFoundError: |
| # interpreters.queues doesn't exist, which means |
| # QueueEmpty doesn't. Act as though it does. |
| continue |
| else: |
| break |
| (res, excdata), pickled, unboundop = obj |
| assert unboundop is None, unboundop |
| if excdata is not None: |
| assert res is None, res |
| assert pickled |
| assert exc_wrapper is not None |
| exc = pickle.loads(excdata) |
| raise exc from exc_wrapper |
| return pickle.loads(res) if pickled else res |
| |
| |
| class BrokenInterpreterPool(_thread.BrokenThreadPool): |
| """ |
| Raised when a worker thread in an InterpreterPoolExecutor failed initializing. |
| """ |
| |
| |
| class InterpreterPoolExecutor(_thread.ThreadPoolExecutor): |
| |
| BROKEN = BrokenInterpreterPool |
| |
| @classmethod |
| def prepare_context(cls, initializer, initargs, shared): |
| return WorkerContext.prepare(initializer, initargs, shared) |
| |
| def __init__(self, max_workers=None, thread_name_prefix='', |
| initializer=None, initargs=(), shared=None): |
| """Initializes a new InterpreterPoolExecutor instance. |
| |
| Args: |
| max_workers: The maximum number of interpreters that can be used to |
| execute the given calls. |
| thread_name_prefix: An optional name prefix to give our threads. |
| initializer: A callable or script used to initialize |
| each worker interpreter. |
| initargs: A tuple of arguments to pass to the initializer. |
| shared: A mapping of shareabled objects to be inserted into |
| each worker interpreter. |
| """ |
| super().__init__(max_workers, thread_name_prefix, |
| initializer, initargs, shared=shared) |