Update for PEP
diff --git a/python2/crawl.py b/python2/crawl.py index cf4a218..f1d29ca 100644 --- a/python2/crawl.py +++ b/python2/crawl.py
@@ -19,26 +19,29 @@ 'http://www.youtube.com/', 'http://www.blogger.com/'] -def load_url(url): - return urllib2.urlopen(url).read() +def load_url(url, timeout): + return urllib2.urlopen(url, timeout=timeout).read() -def download_urls_sequential(urls): +def download_urls_sequential(urls, timeout=60): url_to_content = {} for url in urls: try: - url_to_content[url] = load_url(url) + url_to_content[url] = load_url(url, timeout=timeout) except: pass return url_to_content -def download_urls_with_executor(urls, executor): +def download_urls_with_executor(urls, executor, timeout=60): try: url_to_content = {} - fs = executor.run_to_futures( - (functools.partial(load_url, url) for url in urls)) - for future in fs.successful_futures(): - url = urls[future.index] - url_to_content[url] = future.result() + future_to_url = dict((executor.submit(load_url, url, timeout), url) + for url in urls) + + for future in futures.as_completed(future_to_url): + try: + url_to_content[future_to_url[future]] = future.result() + except: + pass return url_to_content finally: executor.shutdown() @@ -46,19 +49,20 @@ def main(): for name, fn in [('sequential', functools.partial(download_urls_sequential, URLS)), - ('threads', - functools.partial(download_urls_with_executor, - URLS, - futures.ThreadPoolExecutor(10))), ('processes', functools.partial(download_urls_with_executor, URLS, + futures.ProcessPoolExecutor(10))), + ('threads', + functools.partial(download_urls_with_executor, + URLS, futures.ThreadPoolExecutor(10)))]: - print '%s: ' % name.ljust(12), + print name.ljust(12), start = time.time() url_map = fn() print '%.2f seconds (%d of %d downloaded)' % (time.time() - start, - len(url_map), - len(URLS)) + len(url_map), + len(URLS)) -main() +if __name__ == '__main__': + main()
diff --git a/python2/futures/__init__.py b/python2/futures/__init__.py index 27a5720..8331d53 100644 --- a/python2/futures/__init__.py +++ b/python2/futures/__init__.py
@@ -1,18 +1,18 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Execute computations asynchronously using threads or processes.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (FIRST_COMPLETED, FIRST_EXCEPTION, - ALL_COMPLETED, RETURN_IMMEDIATELY, - CancelledError, TimeoutError, - Future, FutureList) +from futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + Future, + Executor, + wait, + as_completed) +from futures.process import ProcessPoolExecutor from futures.thread import ThreadPoolExecutor - -try: - import multiprocessing -except ImportError: - pass -else: - from futures.process import ProcessPoolExecutor
diff --git a/python2/futures/_base.py b/python2/futures/_base.py index bec7212..ed7a094 100644 --- a/python2/futures/_base.py +++ b/python2/futures/_base.py
@@ -1,54 +1,24 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. __author__ = 'Brian Quinlan (brian@sweetapp.com)' +import collections +import functools import logging import threading import time -try: - from functools import partial -except ImportError: - def partial(func, *args, **keywords): - def newfunc(*fargs, **fkeywords): - newkeywords = keywords.copy() - newkeywords.update(fkeywords) - return func(*(args + fargs), **newkeywords) - newfunc.func = func - newfunc.args = args - newfunc.keywords = keywords - return newfunc - -# The "any" and "all" builtins weren't introduced until Python 2.5. -try: - any -except NameError: - def any(iterable): - for element in iterable: - if element: - return True - return False - -try: - all -except NameError: - def all(iterable): - for element in iterable: - if not element: - return False - return True - FIRST_COMPLETED = 'FIRST_COMPLETED' FIRST_EXCEPTION = 'FIRST_EXCEPTION' ALL_COMPLETED = 'ALL_COMPLETED' -RETURN_IMMEDIATELY = 'RETURN_IMMEDIATELY' # Possible future states (for internal use by the futures package). PENDING = 'PENDING' RUNNING = 'RUNNING' # The future was cancelled by the user... -CANCELLED = 'CANCELLED' -# ...and ThreadEventSink.add_cancelled() was called by a worker. +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' FINISHED = 'FINISHED' @@ -70,184 +40,249 @@ # Logger for internal use by the futures package. LOGGER = logging.getLogger("futures") -_handler = logging.StreamHandler() -LOGGER.addHandler(_handler) -del _handler - -def set_future_exception(future, event_sink, exception): - """Sets a future as having terminated with an exception. - - This function should only be used within the futures package. - - Args: - future: The Future that finished with an exception. - event_sink: The ThreadEventSink accociated with the Future's FutureList. - The event_sink will be notified of the Future's completion, which - may unblock some clients that have called FutureList.wait(). - exception: The expection that executing the Future raised. - """ - future._condition.acquire() - try: - future._exception = exception - event_sink._condition.acquire() - try: - future._state = FINISHED - event_sink.add_exception() - finally: - event_sink._condition.release() - - future._condition.notifyAll() - finally: - future._condition.release() - -def set_future_result(future, event_sink, result): - """Sets a future as having terminated without exception. - - This function should only be used within the futures package. - - Args: - future: The Future that completed. - event_sink: The ThreadEventSink accociated with the Future's FutureList. - The event_sink will be notified of the Future's completion, which - may unblock some clients that have called FutureList.wait(). - result: The value returned by the Future. - """ - future._condition.acquire() - try: - future._result = result - event_sink._condition.acquire() - try: - future._state = FINISHED - event_sink.add_result() - finally: - event_sink._condition.release() - - future._condition.notifyAll() - finally: - future._condition.release() +STDERR_HANDLER = logging.StreamHandler() +LOGGER.addHandler(STDERR_HANDLER) class Error(Exception): + """Base class for all future-related exceptions.""" pass class CancelledError(Error): + """The Future was cancelled.""" pass class TimeoutError(Error): + """The operation exceeded the given deadline.""" pass -class _WaitTracker(object): - """Provides the event that FutureList.wait(...) blocks on. - - """ +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" def __init__(self): self.event = threading.Event() + self.finished_futures = [] - def add_result(self): - raise NotImplementedError() + def add_result(self, future): + self.finished_futures.append(future) - def add_exception(self): - raise NotImplementedError() + def add_exception(self, future): + self.finished_futures.append(future) - def add_cancelled(self): - raise NotImplementedError() + def add_cancelled(self, future): + self.finished_futures.append(future) -class _FirstCompletedWaitTracker(_WaitTracker): - """Used by wait(return_when=FIRST_COMPLETED).""" +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED) and as_completed().""" - def add_result(self): + def add_result(self, future): + super(_FirstCompletedWaiter, self).add_result(future) self.event.set() - def add_exception(self): + def add_exception(self, future): + super(_FirstCompletedWaiter, self).add_exception(future) self.event.set() - def add_cancelled(self): + def add_cancelled(self, future): + super(_FirstCompletedWaiter, self).add_cancelled(future) self.event.set() -class _AllCompletedWaitTracker(_WaitTracker): +class _AllCompletedWaiter(_Waiter): """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" def __init__(self, num_pending_calls, stop_on_exception): self.num_pending_calls = num_pending_calls self.stop_on_exception = stop_on_exception - _WaitTracker.__init__(self) + super(_AllCompletedWaiter, self).__init__() - def add_result(self): + def _decrement_pending_calls(self): self.num_pending_calls -= 1 if not self.num_pending_calls: self.event.set() - def add_exception(self): + def add_result(self, future): + super(_AllCompletedWaiter, self).add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super(_AllCompletedWaiter, self).add_exception(future) if self.stop_on_exception: self.event.set() else: - self.add_result() + self._decrement_pending_calls() - def add_cancelled(self): - self.add_result() + def add_cancelled(self, future): + super(_AllCompletedWaiter, self).add_cancelled(future) + self._decrement_pending_calls() -class ThreadEventSink(object): - """Forwards events to many _WaitTrackers. +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" - Each FutureList has a ThreadEventSink and each call to FutureList.wait() - causes a new _WaitTracker to be added to the ThreadEventSink. This design - allows many threads to call FutureList.wait() on the same FutureList with - different arguments. + def __init__(self, futures): + self.futures = sorted(futures, key=id) - This class should not be used by clients. + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + for f in fs: + f._waiters.append(waiter) + + return waiter + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. """ - def __init__(self): - self._condition = threading.Lock() - self._waiters = [] + if timeout is not None: + end_time = timeout + time.time() - def add(self, e): - self._waiters.append(e) + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = set(fs) - finished + waiter = _create_and_install_waiters(fs, FIRST_COMPLETED) - def remove(self, e): - self._waiters.remove(e) + try: + for future in finished: + yield future - def add_result(self): - for waiter in self._waiters: - waiter.add_result() + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.time() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), len(fs))) - def add_exception(self): - for waiter in self._waiters: - waiter.add_exception() + waiter.event.wait(timeout) - def add_cancelled(self): - for waiter in self._waiters: - waiter.add_cancelled() + for future in waiter.finished_futures[:]: + yield future + waiter.finished_futures.remove(future) + pending.remove(future) + + finally: + for f in fs: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when this function should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. + """ + with _AcquireFutures(fs): + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done + + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) + + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + + waiter = _create_and_install_waiters(fs, return_when) + + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) class Future(object): """Represents the result of an asynchronous computation.""" - def __init__(self, index): + def __init__(self): """Initializes the future. Should not be called by clients.""" self._condition = threading.Condition() self._state = PENDING self._result = None self._exception = None - self._index = index + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) def __repr__(self): - self._condition.acquire() - try: + with self._condition: if self._state == FINISHED: if self._exception: - return '<Future state=%s raised %s>' % ( + return '<Future at %s state=%s raised %s>' % ( + hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state], self._exception.__class__.__name__) else: - return '<Future state=%s returned %s>' % ( + return '<Future at %s state=%s returned %s>' % ( + hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state], self._result.__class__.__name__) - return '<Future state=%s>' % _STATE_TO_DESCRIPTION_MAP[self._state] - finally: - self._condition.release() - - @property - def index(self): - """The index of the future in its FutureList.""" - return self._index + return '<Future at %s state=%s>' % ( + hex(id(self)), + _STATE_TO_DESCRIPTION_MAP[self._state]) def cancel(self): """Cancel the future if possible. @@ -255,40 +290,33 @@ Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. """ - self._condition.acquire() - try: + with self._condition: if self._state in [RUNNING, FINISHED]: return False - if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED]: - self._state = CANCELLED - self._condition.notify_all() - return True - finally: - self._condition.release() + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True def cancelled(self): """Return True if the future has cancelled.""" - self._condition.acquire() - try: + with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] - finally: - self._condition.release() def running(self): - self._condition.acquire() - try: + """Return True if the future is currently executing.""" + with self._condition: return self._state == RUNNING - finally: - self._condition.release() def done(self): """Return True of the future was cancelled or finished executing.""" - self._condition.acquire() - try: + with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - finally: - self._condition.release() def __get_result(self): if self._exception: @@ -296,6 +324,23 @@ else: return self._result + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. + + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + fn(self) + def result(self, timeout=None): """Return the result of the call that the future represents. @@ -312,8 +357,7 @@ timeout. Exception: If the call raised then that exception will be raised. """ - self._condition.acquire() - try: + with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: @@ -327,8 +371,6 @@ return self.__get_result() else: raise TimeoutError() - finally: - self._condition.release() def exception(self, timeout=None): """Return the exception raised by the call that the future represents. @@ -348,8 +390,7 @@ timeout. """ - self._condition.acquire() - try: + with self._condition: if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: raise CancelledError() elif self._state == FINISHED: @@ -363,237 +404,94 @@ return self._exception else: raise TimeoutError() - finally: - self._condition.release() -class FutureList(object): - def __init__(self, futures, event_sink): - """Initializes the FutureList. Should not be called by clients.""" - self._futures = futures - self._event_sink = event_sink + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. - def wait(self, timeout=None, return_when=ALL_COMPLETED): - """Wait for the futures in the list to complete. + Should only be used by Executor implementations and unit tests. - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when the method should return. The options - are: + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting (this is not likely - to be a useful option but it is there to - be symmetrical with the - executor.run_to_futures() method. + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. Raises: - TimeoutError: If the wait condition wasn't satisfied before the - given timeout. + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. """ - if return_when == RETURN_IMMEDIATELY: - return - - # Futures cannot change state without this condition being held. - self._event_sink._condition.acquire() - try: - # Make a quick exit if every future is already done. This check is - # necessary because, if every future is in the - # CANCELLED_AND_NOTIFIED or FINISHED state then the WaitTracker will - # never receive any events. - if all(f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] - for f in self): - return - - if return_when == FIRST_COMPLETED: - completed_tracker = _FirstCompletedWaitTracker() + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True else: - # Calculate how many events are expected before every future - # is complete. This can be done without holding the futures' - # locks because a future cannot transition itself into either - # of the states being looked for. - pending_count = sum( - f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] - for f in self) + LOGGER.critical('Future %s in unexpected state: %s', + id(self.future), + self.future._state) + raise RuntimeError('Future in unexpected state') - if return_when == FIRST_EXCEPTION: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=True) - elif return_when == ALL_COMPLETED: - completed_tracker = _AllCompletedWaitTracker( - pending_count, stop_on_exception=False) + def set_result(self, result): + """Sets the return value of work associated with the future. - self._event_sink.add(completed_tracker) - finally: - self._event_sink._condition.release() - - try: - completed_tracker.event.wait(timeout) - finally: - self._event_sink.remove(completed_tracker) - - def cancel(self, timeout=None): - """Cancel the futures in the list. - - Args: - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Raises: - TimeoutError: If all the futures were not finished before the - given timeout. + Should only be used by Executor implementations and unit tests. """ - for f in self: - f.cancel() - self.wait(timeout=timeout, return_when=ALL_COMPLETED) - if any(not f.done() for f in self): - raise TimeoutError() + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() - def has_running_futures(self): - """Returns True if any futures in the list are still running.""" - return any(self.running_futures()) + def set_exception(self, exception): + """Sets the result of the future as being the given exception. - def has_cancelled_futures(self): - """Returns True if any futures in the list were cancelled.""" - return any(self.cancelled_futures()) - - def has_done_futures(self): - """Returns True if any futures in the list are finished or cancelled.""" - return any(self.done_futures()) - - def has_successful_futures(self): - """Returns True if any futures in the list finished without raising.""" - return any(self.successful_futures()) - - def has_exception_futures(self): - """Returns True if any futures in the list finished by raising.""" - return any(self.exception_futures()) - - def cancelled_futures(self): - """Returns all cancelled futures in the list.""" - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED]) - - def done_futures(self): - """Returns all futures in the list that are finished or cancelled.""" - return (f for f in self - if f._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]) - - def successful_futures(self): - """Returns all futures in the list that finished without raising.""" - return (f for f in self - if f._state == FINISHED and f._exception is None) - - def exception_futures(self): - """Returns all futures in the list that finished by raising.""" - return (f for f in self - if f._state == FINISHED and f._exception is not None) - - def running_futures(self): - """Returns all futures in the list that are still running.""" - return (f for f in self if f._state == RUNNING) - - def __len__(self): - return len(self._futures) - - def __getitem__(self, i): - return self._futures[i] - - def __iter__(self): - return iter(self._futures) - - def __contains__(self, future): - return future in self._futures - - def __repr__(self): - states = dict([(state, 0) for state in _FUTURE_STATES]) - for f in self: - states[f._state] += 1 - - return ('<FutureList #futures=%d ' - '[#pending=%d #cancelled=%d #running=%d #finished=%d]>' % ( - len(self), - states[PENDING], - states[CANCELLED] + states[CANCELLED_AND_NOTIFIED], - states[RUNNING], - states[FINISHED])) + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - """Return a list of futures representing the given calls. - Args: - calls: A sequence of callables that take no arguments. These will - be bound to Futures and returned. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - return_when: Indicates when the method should return. The options - are: + def submit(self, fn, *args, **kwargs): + """Submits a callable to be executed with the given arguments. - FIRST_COMPLETED - Return when any future finishes or is - cancelled. - FIRST_EXCEPTION - Return when any future finishes by raising an - exception. If no future raises and exception - then it is equivalent to ALL_COMPLETED. - ALL_COMPLETED - Return when all futures finish or are cancelled. - RETURN_IMMEDIATELY - Return without waiting. + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. Returns: - A FutureList containing Futures for the given calls. + A Future representing the given call. """ raise NotImplementedError() - def run_to_results(self, calls, timeout=None): - """Returns a iterator of the results of the given calls. - - Args: - calls: A sequence of callables that take no arguments. These will - be called and their results returned. - timeout: The maximum number of seconds to wait. If None, then there - is no limit on the wait time. - - Returns: - An iterator over the results of the given calls. Equivalent to: - (call() for call in calls) but the calls may be evaluated - out-of-order. - - Raises: - TimeoutError: If all the given calls were not completed before the - given timeout. - Exception: If any call() raises. - """ - if timeout is not None: - end_time = timeout + time.time() - - fs = self.run_to_futures(calls, return_when=RETURN_IMMEDIATELY) - - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - except Exception, e: - # Python 2.4 and earlier don't allow yield statements in - # try/finally blocks - try: - fs.cancel(timeout=0) - except TimeoutError: - pass - raise e - - def map(self, func, *iterables, **kwargs): + def map(self, fn, *iterables, **kwargs): """Returns a iterator equivalent to map(fn, iter). Args: - func: A callable that will take take as many arguments as there - are passed iterables. + fn: A callable that will take take as many arguments as there are + passed iterables. timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time. @@ -606,17 +504,38 @@ before the given timeout. Exception: If fn(*args) raises for any values. """ - timeout = kwargs.get('timeout') or None - calls = [partial(func, *args) for args in zip(*iterables)] - return self.run_to_results(calls, timeout=timeout) + timeout = kwargs.get('timeout') + if timeout is not None: + end_time = timeout + time.time() - def shutdown(self): - """Clean-up. No other methods can be called afterwards.""" - raise NotImplementedError() + fs = [self.submit(fn, *args) for args in zip(*iterables)] + + try: + for future in fs: + if timeout is None: + yield future.result() + else: + yield future.result(end_time - time.time()) + finally: + for future in fs: + future.cancel() + + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. + """ + pass def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown() + self.shutdown(wait=True) return False
diff --git a/python2/futures/process.py b/python2/futures/process.py index f0d7fdf..ec48377 100644 --- a/python2/futures/process.py +++ b/python2/futures/process.py
@@ -1,4 +1,5 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Implements ProcessPoolExecutor. @@ -23,9 +24,8 @@ | | | ... | | | | 3, except | | | +----------+ +------------+ +--------+ +-----------+ +---------+ -Executor.run_to_futures() called: -- creates a uniquely numbered _WorkItem for each call and adds them to the - "Work Items" dict +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict - adds the id of the _WorkItem to the "Work Ids" queue Local worker thread: @@ -42,15 +42,11 @@ - reads _CallItems from "Call Q", executes the calls, and puts the resulting _ResultItems in "Request Q" """ - + __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) import atexit +import _base import Queue import multiprocessing import threading @@ -63,7 +59,7 @@ # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the @@ -100,10 +96,11 @@ EXTRA_QUEUED_CALLS = 1 class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call + def __init__(self, future, fn, args, kwargs): self.future = future - self.completion_tracker = completion_tracker + self.fn = fn + self.args = args + self.kwargs = kwargs class _ResultItem(object): def __init__(self, work_id, exception=None, result=None): @@ -112,9 +109,11 @@ self.result = result class _CallItem(object): - def __init__(self, work_id, call): + def __init__(self, work_id, fn, args, kwargs): self.work_id = work_id - self.call = call + self.fn = fn + self.args = args + self.kwargs = kwargs def _process_worker(call_queue, result_queue, shutdown): """Evaluates calls from call_queue and places the results in result_queue. @@ -137,8 +136,8 @@ return else: try: - r = call_item.call() - except Exception, e: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException as e: result_queue.put(_ResultItem(call_item.work_id, exception=e)) else: @@ -172,19 +171,15 @@ else: work_item = pending_work_items[work_id] - if work_item.future.cancelled(): - work_item.future._condition.acquire() - work_item.future._condition.notify_all() - work_item.future._condition.release() - - work_item.completion_tracker.add_cancelled() + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: del pending_work_items[work_id] continue - else: - work_item.future._condition.acquire() - work_item.future._state = RUNNING - work_item.future._condition.release() - call_queue.put(_CallItem(work_id, work_item.call), block=True) def _queue_manangement_worker(executor_reference, processes, @@ -218,6 +213,7 @@ _add_call_item_to_queue(pending_work_items, work_ids_queue, call_queue) + try: result_item = result_queue.get(block=True, timeout=0.1) except Queue.Empty: @@ -244,15 +240,11 @@ del pending_work_items[result_item.work_id] if result_item.exception: - set_future_exception(work_item.future, - work_item.completion_tracker, - result_item.exception) + work_item.future.set_exception(result_item.exception) else: - set_future_result(work_item.future, - work_item.completion_tracker, - result_item.result) + work_item.future.set_result(result_item.result) -class ProcessPoolExecutor(Executor): +class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -296,7 +288,7 @@ self._call_queue, self._result_queue, self._shutdown_process_event)) - self._queue_management_thread.setDaemon(True) + self._queue_management_thread.daemon = True self._queue_management_thread.start() _thread_references.add(weakref.ref(self._queue_management_thread)) @@ -310,36 +302,36 @@ p.start() self._processes.add(p) - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - self._shutdown_lock.acquire() - try: + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: if self._shutdown_thread: - raise RuntimeError('cannot run new futures after shutdown') + raise RuntimeError('cannot schedule new futures after shutdown') - futures = [] - event_sink = ThreadEventSink() + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) - for index, call in enumerate(calls): - f = Future(index) - self._pending_work_items[self._queue_count] = _WorkItem( - call, f, event_sink) - self._work_ids.put(self._queue_count) - futures.append(f) - self._queue_count += 1 + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 self._start_queue_management_thread() self._adjust_process_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - finally: - self._shutdown_lock.release() + return f + submit.__doc__ = _base.Executor.submit.__doc__ - def shutdown(self): - self._shutdown_lock.acquire() - try: + def shutdown(self, wait=True): + with self._shutdown_lock: self._shutdown_thread = True - finally: - self._shutdown_lock.release() + if wait: + if self._queue_management_thread: + self._queue_management_thread.join() + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._shutdown_process_event = None + self._processes = None + shutdown.__doc__ = _base.Executor.shutdown.__doc__ -atexit.register(_python_exit) \ No newline at end of file +atexit.register(_python_exit)
diff --git a/python2/futures/thread.py b/python2/futures/thread.py index 4f410fe..3f1584a 100644 --- a/python2/futures/thread.py +++ b/python2/futures/thread.py
@@ -1,16 +1,12 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. See LICENSE file. +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. """Implements ThreadPoolExecutor.""" __author__ = 'Brian Quinlan (brian@sweetapp.com)' -from futures._base import (PENDING, RUNNING, CANCELLED, - CANCELLED_AND_NOTIFIED, FINISHED, - ALL_COMPLETED, - LOGGER, - set_future_exception, set_future_result, - Executor, Future, FutureList, ThreadEventSink) import atexit +import _base import Queue import threading import weakref @@ -22,15 +18,15 @@ # - The workers would still be running during interpretor shutdown, # meaning that they would fail in unpredictable ways. # - The workers could be killed while evaluating a work item, which could -# be bad if the function being evaluated has external side-effects e.g. +# be bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until they -# finish. +# workers to exit when their work queues are empty and then waits until the +# threads finish. -_thread_references = set() # Weakrefs to every active worker thread. -_shutdown = False # Indicates that the interpreter is shutting down. +_thread_references = set() +_shutdown = False def _python_exit(): global _shutdown @@ -43,11 +39,10 @@ def _remove_dead_thread_references(): """Remove inactive threads from _thread_references. - Should be called periodically to prevent thread objects from accumulating in - scenarios such as: + Should be called periodically to prevent memory leaks in scenarios such as: >>> while True: - >>> ... t = ThreadPoolExecutor(max_workers=5) - >>> ... t.map(int, ['1', '2', '3', '4', '5']) + ... t = ThreadPoolExecutor(max_workers=5) + ... t.map(int, ['1', '2', '3', '4', '5']) """ for thread_reference in set(_thread_references): if thread_reference() is None: @@ -56,38 +51,22 @@ atexit.register(_python_exit) class _WorkItem(object): - def __init__(self, call, future, completion_tracker): - self.call = call + def __init__(self, future, fn, args, kwargs): self.future = future - self.completion_tracker = completion_tracker + self.fn = fn + self.args = args + self.kwargs = kwargs def run(self): - self.future._condition.acquire() - try: - if self.future._state == PENDING: - self.future._state = RUNNING - elif self.future._state == CANCELLED: - self.completion_tracker._condition.acquire() - try: - self.future._state = CANCELLED_AND_NOTIFIED - self.completion_tracker.add_cancelled() - return - finally: - self.completion_tracker._condition.release() - else: - LOGGER.critical('Future %s in unexpected state: %d', - id(self.future), - self.future._state) - return - finally: - self.future._condition.release() + if not self.future.set_running_or_notify_cancel(): + return try: - result = self.call() - except Exception, e: - set_future_exception(self.future, self.completion_tracker, e) + result = self.fn(*self.args, **self.kwargs) + except BaseException as e: + self.future.set_exception(e) else: - set_future_result(self.future, self.completion_tracker, result) + self.future.set_result(result) def _worker(executor_reference, work_queue): try: @@ -105,10 +84,10 @@ del executor else: work_item.run() - except Exception, e: - LOGGER.critical('Exception in worker', exc_info=True) + except BaseException as e: + _base.LOGGER.critical('Exception in worker', exc_info=True) -class ThreadPoolExecutor(Executor): +class ThreadPoolExecutor(_base.Executor): def __init__(self, max_workers): """Initializes a new ThreadPoolExecutor instance. @@ -124,42 +103,34 @@ self._shutdown = False self._shutdown_lock = threading.Lock() + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + def _adjust_thread_count(self): - for _ in range(len(self._threads), - min(self._max_workers, self._work_queue.qsize())): + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. + if len(self._threads) < self._max_workers: t = threading.Thread(target=_worker, args=(weakref.ref(self), self._work_queue)) - t.setDaemon(True) + t.daemon = True t.start() self._threads.add(t) _thread_references.add(weakref.ref(t)) - def run_to_futures(self, calls, timeout=None, return_when=ALL_COMPLETED): - self._shutdown_lock.acquire() - try: - if self._shutdown: - raise RuntimeError('cannot run new futures after shutdown') - - futures = [] - event_sink = ThreadEventSink() - for index, call in enumerate(calls): - f = Future(index) - w = _WorkItem(call, f, event_sink) - self._work_queue.put(w) - futures.append(f) - - self._adjust_thread_count() - fl = FutureList(futures, event_sink) - fl.wait(timeout=timeout, return_when=return_when) - return fl - finally: - self._shutdown_lock.release() - run_to_futures.__doc__ = Executor.run_to_futures.__doc__ - - def shutdown(self): - self._shutdown_lock.acquire() - try: + def shutdown(self, wait=True): + with self._shutdown_lock: self._shutdown = True - finally: - self._shutdown_lock.release() - shutdown.__doc__ = Executor.shutdown.__doc__ + if wait: + for t in self._threads: + t.join() + shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/python2/primes.py b/python2/primes.py index 0b2bf81..fa6c355 100644 --- a/python2/primes.py +++ b/python2/primes.py
@@ -25,29 +25,23 @@ return list(map(is_prime, PRIMES)) def with_process_pool_executor(): - executor = futures.ProcessPoolExecutor(10) - try: + with futures.ProcessPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) - finally: - executor.shutdown() def with_thread_pool_executor(): - executor = futures.ThreadPoolExecutor(10) - try: + with futures.ThreadPoolExecutor(10) as executor: return list(executor.map(is_prime, PRIMES)) - finally: - executor.shutdown() def main(): for name, fn in [('sequential', sequential), ('processes', with_process_pool_executor), ('threads', with_thread_pool_executor)]: - print '%s: ' % name.ljust(12), - + print name.ljust(12), start = time.time() if fn() != [True] * len(PRIMES): print 'failed' else: print '%.2f seconds' % (time.time() - start) -main() \ No newline at end of file +if __name__ == '__main__': + main()
diff --git a/python2/setup.py b/python2/setup.py index 897dc86..fcd05f2 100755 --- a/python2/setup.py +++ b/python2/setup.py
@@ -1,10 +1,10 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 from distutils.core import setup -setup(name='futures', +setup(name='futures3', version='1.0', - description='Java-style futures implementation in Python 2.x', + description='Java-style futures implementation in Python 3.x', author='Brian Quinlan', author_email='brian@sweetapp.com', url='http://code.google.com/p/pythonfutures', @@ -14,5 +14,5 @@ classifiers=['License :: OSI Approved :: BSD License', 'Development Status :: 5 - Production/Stable', 'Intended Audience :: Developers', - 'Programming Language :: Python :: 2'] + 'Programming Language :: Python :: 3'] )
diff --git a/python2/test_futures.py b/python2/test_futures.py index cf74286..2d5672b 100644 --- a/python2/test_futures.py +++ b/python2/test_futures.py
@@ -1,16 +1,25 @@ -import unittest -import threading -import time +import logging import multiprocessing +import re +import StringIO +import sys +import threading from test import test_support +import time +import unittest + +if sys.platform.startswith('win'): + import ctypes + import ctypes.wintypes import futures -import futures._base from futures._base import ( - PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) + PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, + LOGGER, STDERR_HANDLER, wait) +import futures.process def create_future(state=PENDING, exception=None, result=None): - f = Future(0) + f = Future() f._state = state f._exception = exception f._result = result @@ -23,68 +32,104 @@ EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError()) SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) +def mul(x, y): + return x * y + class Call(object): + """A call that can be submitted to a future.Executor for testing. + + The call signals when it is called and waits for an event before finishing. + """ CALL_LOCKS = {} + def _create_event(self): + if sys.platform.startswith('win'): + class SECURITY_ATTRIBUTES(ctypes.Structure): + _fields_ = [("nLength", ctypes.wintypes.DWORD), + ("lpSecurityDescriptor", ctypes.wintypes.LPVOID), + ("bInheritHandle", ctypes.wintypes.BOOL)] + + s = SECURITY_ATTRIBUTES() + s.nLength = ctypes.sizeof(s) + s.lpSecurityDescriptor = None + s.bInheritHandle = True + + handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s), + True, + False, + None) + assert handle is not None + return handle + else: + event = multiprocessing.Event() + self.CALL_LOCKS[id(event)] = event + return id(event) + + def _wait_on_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000) + assert r == 0 + else: + self.CALL_LOCKS[handle].wait() + + def _signal_event(self, handle): + if sys.platform.startswith('win'): + r = ctypes.windll.kernel32.SetEvent(handle) + assert r != 0 + else: + self.CALL_LOCKS[handle].set() + def __init__(self, manual_finish=False, result=42): - called_event = multiprocessing.Event() - can_finish = multiprocessing.Event() + self._called_event = self._create_event() + self._can_finish = self._create_event() self._result = result - self._called_event_id = id(called_event) - self._can_finish_event_id = id(can_finish) - - self.CALL_LOCKS[self._called_event_id] = called_event - self.CALL_LOCKS[self._can_finish_event_id] = can_finish if not manual_finish: - self._can_finish.set() - - @property - def _can_finish(self): - return self.CALL_LOCKS[self._can_finish_event_id] - - @property - def _called_event(self): - return self.CALL_LOCKS[self._called_event_id] + self._signal_event(self._can_finish) def wait_on_called(self): - self._called_event.wait() + self._wait_on_event(self._called_event) def set_can(self): - self._can_finish.set() - - def called(self): - return self._called_event.is_set() + self._signal_event(self._can_finish) def __call__(self): - if self._called_event.is_set(): print('called twice') + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) - self._called_event.set() - self._can_finish.wait() return self._result def close(self): - del self.CALL_LOCKS[self._called_event_id] - del self.CALL_LOCKS[self._can_finish_event_id] + self.set_can() + if sys.platform.startswith('win'): + ctypes.windll.kernel32.CloseHandle(self._called_event) + ctypes.windll.kernel32.CloseHandle(self._can_finish) + else: + del self.CALL_LOCKS[self._called_event] + del self.CALL_LOCKS[self._can_finish] class ExceptionCall(Call): def __call__(self): - assert not self._called_event.is_set(), 'already called' - - self._called_event.set() - self._can_finish.wait() + self._signal_event(self._called_event) + self._wait_on_event(self._can_finish) raise ZeroDivisionError() +class MapCall(Call): + def __init__(self, result=42): + super(MapCall, self).__init__(manual_finish=True, result=result) + + def __call__(self, manual_finish): + if manual_finish: + super(MapCall, self).__call__() + return self._result + class ExecutorShutdownTest(unittest.TestCase): def test_run_after_shutdown(self): - call1 = Call() - try: - self.executor.shutdown() - self.assertRaises(RuntimeError, - self.executor.run_to_futures, - [call1]) - finally: - call1.close() + self.executor.shutdown() + self.assertRaises(RuntimeError, + self.executor.submit, + pow, 2, 5) + def _start_some_futures(self): call1 = Call(manual_finish=True) @@ -92,13 +137,14 @@ call3 = Call(manual_finish=True) try: - self.executor.run_to_futures([call1, call2, call3], - return_when=futures.RETURN_IMMEDIATELY) - + self.executor.submit(call1) + self.executor.submit(call2) + self.executor.submit(call3) + call1.wait_on_called() call2.wait_on_called() call3.wait_on_called() - + call1.set_can() call2.set_can() call3.set_can() @@ -112,7 +158,7 @@ self.executor = futures.ThreadPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_threads_terminate(self): self._start_some_futures() @@ -144,15 +190,15 @@ self.executor = futures.ProcessPoolExecutor(max_workers=5) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) def test_processes_terminate(self): self._start_some_futures() self.assertEqual(len(self.executor._processes), 5) + processes = self.executor._processes self.executor.shutdown() - self.executor._queue_management_thread.join() - for p in self.executor._processes: + for p in processes: p.join() def test_context_manager_shutdown(self): @@ -161,8 +207,7 @@ self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) - executor._queue_management_thread.join() - for p in executor._processes: + for p in self.executor._processes: p.join() def test_del_shutdown(self): @@ -176,312 +221,316 @@ for p in processes: p.join() -class WaitsTest(unittest.TestCase): - def test_concurrent_waits(self): - def wait_for_ALL_COMPLETED(): - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertTrue(f3.done()) - self.assertTrue(f4.done()) - all_completed.release() +class WaitTests(unittest.TestCase): + def test_first_completed(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() - def wait_for_FIRST_COMPLETED(): - fs.wait(return_when=futures.FIRST_COMPLETED) - self.assertTrue(f1.done()) - self.assertFalse(f2.done()) - self.assertFalse(f3.done()) - self.assertFalse(f4.done()) - first_completed.release() - def wait_for_FIRST_EXCEPTION(): - fs.wait(return_when=futures.FIRST_EXCEPTION) - self.assertTrue(f1.done()) - self.assertTrue(f2.done()) - self.assertFalse(f3.done()) - self.assertFalse(f4.done()) - first_exception.release() + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) - all_completed = threading.Semaphore(0) - first_completed = threading.Semaphore(0) - first_exception = threading.Semaphore(0) + t = threading.Thread(target=wait_test) + t.start() + done, not_done = futures.wait( + [CANCELLED_FUTURE, future1, future2], + return_when=futures.FIRST_COMPLETED) + + self.assertEquals(set([future1]), done) + self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done) + finally: + call1.close() + call2.close() + + def test_first_completed_one_already_completed(self): + call1 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, future1], + return_when=futures.FIRST_COMPLETED) + + self.assertEquals(set([SUCCESSFUL_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() + + def test_first_exception(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() call1 = Call(manual_finish=True) call2 = ExceptionCall(manual_finish=True) call3 = Call(manual_finish=True) - call4 = Call() - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - threads = [] - for wait_test in [wait_for_ALL_COMPLETED, - wait_for_FIRST_COMPLETED, - wait_for_FIRST_EXCEPTION]: - t = threading.Thread(target=wait_test) - t.start() - threads.append(t) - - time.sleep(1) # give threads enough time to execute wait + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) - call1.set_can() - first_completed.acquire() - call2.set_can() - first_exception.acquire() - call3.set_can() - all_completed.acquire() - - self.executor.shutdown() - finally: - call1.close() - call2.close() - call3.close() - call4.close() - -class ThreadPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ThreadPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown() - -class ProcessPoolWaitTests(WaitsTest): - def setUp(self): - self.executor = futures.ProcessPoolExecutor(max_workers=1) - - def tearDown(self): - self.executor.shutdown() - -class CancelTests(unittest.TestCase): - def test_cancel_states(self): - call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertEqual(f1.cancel(), False) - self.assertEqual(f2.cancel(), True) - self.assertEqual(f4.cancel(), True) - self.assertEqual(f1.cancelled(), False) - self.assertEqual(f2.cancelled(), True) - self.assertEqual(f3.cancelled(), False) - self.assertEqual(f4.cancelled(), True) - self.assertEqual(f1.done(), False) - self.assertEqual(f2.done(), True) - self.assertEqual(f3.done(), False) - self.assertEqual(f4.done(), True) - - call1.set_can() - fs.wait(return_when=futures.ALL_COMPLETED) - self.assertEqual(f1.result(), 42) - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - self.assertEqual(f3.result(), 42) - self.assertRaises(futures.CancelledError, f4.result) - self.assertRaises(futures.CancelledError, f4.exception) - - self.assertEqual(call2.called(), False) - self.assertEqual(call4.called(), False) - finally: - call1.close() - call2.close() - call3.close() - call4.close() - - def test_wait_for_individual_cancel_while_waiting(self): - def end_call(): - # Wait until the main thread is waiting on the results of the - # future. - time.sleep(1) - f2.cancel() - call1.set_can() - - call1 = Call(manual_finish=True) - call2 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2 = fs - - call1.wait_on_called() - t = threading.Thread(target=end_call) + t = threading.Thread(target=wait_test) t.start() - self.assertRaises(futures.CancelledError, f2.result) - self.assertRaises(futures.CancelledError, f2.exception) - t.join() + finished, pending = futures.wait( + [future1, future2, future3], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set([future3]), pending) + finally: + call1.close() + call2.close() + call3.close() + + def test_first_exception_some_already_complete(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + + call1 = ExceptionCall(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1]), finished) + self.assertEquals(set([CANCELLED_FUTURE, future2]), pending) + + finally: call1.close() call2.close() - def test_wait_with_already_cancelled_futures(self): + def test_first_exception_one_already_failed(self): call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertTrue(f2.cancel()) - self.assertTrue(f3.cancel()) + future1 = self.executor.submit(call1) + + finished, pending = futures.wait( + [EXCEPTION_FUTURE, future1], + return_when=futures.FIRST_EXCEPTION) + + self.assertEquals(set([EXCEPTION_FUTURE]), finished) + self.assertEquals(set([future1]), pending) + finally: + call1.close() + + def test_all_completed(self): + def wait_test(): + while not future1._waiters: + pass call1.set_can() - - fs.wait(return_when=futures.ALL_COMPLETED) + call2.set_can() + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [future1, future2], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([future1, future2]), finished) + self.assertEquals(set(), pending) + + + finally: + call1.close() + call2.close() + + def test_all_completed_some_already_completed(self): + def wait_test(): + while not future1._waiters: + pass + + future4.cancel() + call1.set_can() + call2.set_can() + call3.set_can() + + self.assertTrue( + futures.process.EXTRA_QUEUED_CALLS <= 1, + 'this test assumes that future4 will be cancelled before it is ' + 'queued to run - which might not be the case if ' + 'ProcessPoolExecutor is too aggresive in scheduling futures') + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + call3 = Call(manual_finish=True) + call4 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + future3 = self.executor.submit(call3) + future4 = self.executor.submit(call4) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4], + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([SUCCESSFUL_FUTURE, + CANCELLED_AND_NOTIFIED_FUTURE, + future1, future2, future3, future4]), + finished) + self.assertEquals(set(), pending) finally: call1.close() call2.close() call3.close() call4.close() - def test_cancel_all(self): - call1 = Call(manual_finish=True) - call2 = Call() - call3 = Call() - call4 = Call() - - try: - fs = self.executor.run_to_futures( - [call1, call2, call3, call4], - return_when=futures.RETURN_IMMEDIATELY) - f1, f2, f3, f4 = fs - - call1.wait_on_called() - self.assertRaises(futures.TimeoutError, fs.cancel, timeout=0) + def test_timeout(self): + def wait_test(): + while not future1._waiters: + pass call1.set_can() - fs.cancel() - - self.assertFalse(f1.cancelled()) - self.assertTrue(f2.cancelled()) - self.assertTrue(f3.cancelled()) - self.assertTrue(f4.cancelled()) + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + finished, pending = futures.wait( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2], + timeout=1, + return_when=futures.ALL_COMPLETED) + + self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1]), finished) + self.assertEquals(set([future2]), pending) + + finally: call1.close() call2.close() - call3.close() - call4.close() -class ThreadPoolCancelTests(CancelTests): + +class ThreadPoolWaitTests(WaitTests): def setUp(self): self.executor = futures.ThreadPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) -class ProcessPoolCancelTests(WaitsTest): +class ProcessPoolWaitTests(WaitTests): def setUp(self): self.executor = futures.ProcessPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) + +class AsCompletedTests(unittest.TestCase): + # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. + def test_no_timeout(self): + def wait_test(): + while not future1._waiters: + pass + call1.set_can() + call2.set_can() + + call1 = Call(manual_finish=True) + call2 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + future2 = self.executor.submit(call2) + + t = threading.Thread(target=wait_test) + t.start() + completed = set(futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2])) + self.assertEquals(set( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1, future2]), + completed) + finally: + call1.close() + call2.close() + + def test_zero_timeout(self): + call1 = Call(manual_finish=True) + try: + future1 = self.executor.submit(call1) + completed_futures = set() + try: + for future in futures.as_completed( + [CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE, + future1], + timeout=0): + completed_futures.add(future) + except futures.TimeoutError: + pass + + self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE, + EXCEPTION_FUTURE, + SUCCESSFUL_FUTURE]), + completed_futures) + finally: + call1.close() + +class ThreadPoolAsCompletedTests(AsCompletedTests): + def setUp(self): + self.executor = futures.ThreadPoolExecutor(max_workers=1) + + def tearDown(self): + self.executor.shutdown(wait=True) + +class ProcessPoolAsCompletedTests(AsCompletedTests): + def setUp(self): + self.executor = futures.ProcessPoolExecutor(max_workers=1) + + def tearDown(self): + self.executor.shutdown(wait=True) class ExecutorTest(unittest.TestCase): # Executor.shutdown() and context manager usage is tested by # ExecutorShutdownTest. - def test_run_to_futures(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) - call4 = Call() - call5 = Call() + def test_submit(self): + future = self.executor.submit(pow, 2, 8) + self.assertEquals(256, future.result()) - try: - f1, f2, f3, f4, f5 = self.executor.run_to_futures( - [call1, call2, call3, call4, call5], - return_when=futures.RETURN_IMMEDIATELY) - - call3.wait_on_called() - - # ProcessPoolExecutor uses a thread to propogate results into the - # future. Calling result() ensures that the thread has done its work - # before doing the next set of checks. - f1.result() - f2.result() - - self.assertTrue(f1.done()) - self.assertFalse(f1.running()) - self.assertEqual(f1.index, 0) - - self.assertTrue(f2.done()) - self.assertFalse(f2.running()) - self.assertEqual(f2.index, 1) - - self.assertFalse(f3.done()) - self.assertTrue(f3.running()) - self.assertEqual(f3.index, 2) - - # ProcessPoolExecutor may mark some futures as running before they - # actually are so don't check these ones. - self.assertFalse(f4.done()) - self.assertEqual(f4.index, 3) - - self.assertFalse(f5.done()) - self.assertEqual(f5.index, 4) - finally: - call3.set_can() # Let the call finish executing. - call1.close() - call2.close() - call3.close() - call4.close() - call5.close() - - def test_run_to_results(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(result=3) - try: - self.assertEqual( - list(self.executor.run_to_results([call1, call2, call3])), - [1, 2, 3]) - finally: - call1.close() - call2.close() - call3.close() - - def test_run_to_results_exception(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = ExceptionCall() - try: - i = self.executor.run_to_results([call1, call2, call3]) - - self.assertEqual(i.next(), 1) - self.assertEqual(i.next(), 2) - self.assertRaises(ZeroDivisionError, i.next) - finally: - call1.close() - call2.close() - call3.close() - - def test_run_to_results_timeout(self): - call1 = Call(result=1) - call2 = Call(result=2) - call3 = Call(manual_finish=True) - - try: - i = self.executor.run_to_results([call1, call2, call3], timeout=1) - self.assertEqual(i.next(), 1) - self.assertEqual(i.next(), 2) - self.assertRaises(futures.TimeoutError, i.next) - call3.set_can() - finally: - call1.close() - call2.close() - call3.close() + def test_submit_keyword(self): + future = self.executor.submit(mul, 2, y=8) + self.assertEquals(16, future.result()) def test_map(self): self.assertEqual( @@ -494,36 +543,142 @@ self.assertEqual(i.next(), (0, 1)) self.assertRaises(ZeroDivisionError, i.next) + def test_map_timeout(self): + results = [] + timeout_call = MapCall() + try: + try: + for i in self.executor.map(timeout_call, + [False, False, True], + timeout=1): + results.append(i) + except futures.TimeoutError: + pass + else: + self.fail('expected TimeoutError') + finally: + timeout_call.close() + + self.assertEquals([42, 42], results) + class ThreadPoolExecutorTest(ExecutorTest): def setUp(self): self.executor = futures.ThreadPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) class ProcessPoolExecutorTest(ExecutorTest): def setUp(self): self.executor = futures.ProcessPoolExecutor(max_workers=1) def tearDown(self): - self.executor.shutdown() + self.executor.shutdown(wait=True) class FutureTests(unittest.TestCase): - # Future.index() is tested by ExecutorTest - # Future.cancel() is further tested by CancelTests. + def test_done_callback_with_result(self): + self.callback_result = None + def fn(callback_future): + self.callback_result = callback_future.result() + + f = Future() + f.add_done_callback(fn) + f.set_result(5) + self.assertEquals(5, self.callback_result) + + def test_done_callback_with_exception(self): + self.callback_exception = None + def fn(callback_future): + self.callback_exception = callback_future.exception() + + f = Future() + f.add_done_callback(fn) + f.set_exception(Exception('test')) + self.assertEquals(('test',), self.callback_exception.args) + + def test_done_callback_with_cancel(self): + self.was_cancelled = None + def fn(callback_future): + self.was_cancelled = callback_future.cancelled() + + f = Future() + f.add_done_callback(fn) + self.assertTrue(f.cancel()) + self.assertTrue(self.was_cancelled) + + def test_done_callback_raises(self): + LOGGER.removeHandler(STDERR_HANDLER) + logging_stream = StringIO.StringIO() + handler = logging.StreamHandler(logging_stream) + LOGGER.addHandler(handler) + try: + self.raising_was_called = False + self.fn_was_called = False + + def raising_fn(callback_future): + self.raising_was_called = True + raise Exception('doh!') + + def fn(callback_future): + self.fn_was_called = True + + f = Future() + f.add_done_callback(raising_fn) + f.add_done_callback(fn) + f.set_result(5) + self.assertTrue(self.raising_was_called) + self.assertTrue(self.fn_was_called) + self.assertTrue('Exception: doh!' in logging_stream.getvalue()) + finally: + LOGGER.removeHandler(handler) + LOGGER.addHandler(STDERR_HANDLER) + + def test_done_callback_already_successful(self): + self.callback_result = None + def fn(callback_future): + self.callback_result = callback_future.result() + + f = Future() + f.set_result(5) + f.add_done_callback(fn) + self.assertEquals(5, self.callback_result) + + def test_done_callback_already_failed(self): + self.callback_exception = None + def fn(callback_future): + self.callback_exception = callback_future.exception() + + f = Future() + f.set_exception(Exception('test')) + f.add_done_callback(fn) + self.assertEquals(('test',), self.callback_exception.args) + + def test_done_callback_already_cancelled(self): + self.was_cancelled = None + def fn(callback_future): + self.was_cancelled = callback_future.cancelled() + + f = Future() + self.assertTrue(f.cancel()) + f.add_done_callback(fn) + self.assertTrue(self.was_cancelled) def test_repr(self): - self.assertEqual(repr(PENDING_FUTURE), '<Future state=pending>') - self.assertEqual(repr(RUNNING_FUTURE), '<Future state=running>') - self.assertEqual(repr(CANCELLED_FUTURE), '<Future state=cancelled>') - self.assertEqual(repr(CANCELLED_AND_NOTIFIED_FUTURE), - '<Future state=cancelled>') - self.assertEqual(repr(EXCEPTION_FUTURE), - '<Future state=finished raised IOError>') - self.assertEqual(repr(SUCCESSFUL_FUTURE), - '<Future state=finished returned int>') + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=pending>', + repr(PENDING_FUTURE))) + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=running>', + repr(RUNNING_FUTURE))) + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>', + repr(CANCELLED_FUTURE))) + self.assertTrue(re.match('<Future at 0x[0-9a-f]+L? state=cancelled>', + repr(CANCELLED_AND_NOTIFIED_FUTURE))) + self.assertTrue(re.match( + '<Future at 0x[0-9a-f]+L? state=finished raised IOError>', + repr(EXCEPTION_FUTURE))) + self.assertTrue(re.match( + '<Future at 0x[0-9a-f]+L? state=finished returned int>', + repr(SUCCESSFUL_FUTURE))) - create_future def test_cancel(self): f1 = create_future(state=PENDING) @@ -588,13 +743,11 @@ self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) def test_result_with_success(self): + # TODO(brian@sweetapp.com): This test is timing dependant. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) - with f1._condition: - f1._state = FINISHED - f1._result = 42 - f1._condition.notify_all() + f1.set_result(42) f1 = create_future(state=PENDING) t = threading.Thread(target=notification) @@ -603,12 +756,11 @@ self.assertEquals(f1.result(timeout=5), 42) def test_result_with_cancel(self): + # TODO(brian@sweetapp.com): This test is timing dependant. def notification(): # Wait until the main thread is waiting for the result. time.sleep(1) - with f1._condition: - f1._state = CANCELLED - f1._condition.notify_all() + f1.cancel() f1 = create_future(state=PENDING) t = threading.Thread(target=notification) @@ -644,186 +796,16 @@ self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) -class FutureListTests(unittest.TestCase): - # FutureList.wait() is further tested by WaitsTest. - # FutureList.cancel() is tested by CancelTests. - def test_wait_RETURN_IMMEDIATELY(self): - f = futures.FutureList(futures=None, event_sink=None) - f.wait(return_when=futures.RETURN_IMMEDIATELY) - - def test_wait_timeout(self): - f = futures.FutureList([PENDING_FUTURE], - futures._base.ThreadEventSink()) - - for t in [futures.FIRST_COMPLETED, - futures.FIRST_EXCEPTION, - futures.ALL_COMPLETED]: - f.wait(timeout=0.1, return_when=t) - self.assertFalse(PENDING_FUTURE.done()) - - def test_wait_all_done(self): - f = futures.FutureList([CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - futures._base.ThreadEventSink()) - - f.wait(return_when=futures.ALL_COMPLETED) - - def test_filters(self): - fs = [PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE] - f = futures.FutureList(fs, None) - - self.assertEqual(list(f.running_futures()), [RUNNING_FUTURE]) - self.assertEqual(list(f.cancelled_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE]) - self.assertEqual(list(f.done_futures()), - [CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE, - SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.successful_futures()), - [SUCCESSFUL_FUTURE]) - self.assertEqual(list(f.exception_futures()), - [EXCEPTION_FUTURE]) - - def test_has_running_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_running_futures()) - self.assertTrue( - futures.FutureList([RUNNING_FUTURE], - None).has_running_futures()) - - def test_has_cancelled_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - SUCCESSFUL_FUTURE, - EXCEPTION_FUTURE], - None).has_cancelled_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_cancelled_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_cancelled_futures()) - - def test_has_done_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None).has_done_futures()) - self.assertTrue( - futures.FutureList([CANCELLED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([CANCELLED_AND_NOTIFIED_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_done_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_done_futures()) - - def test_has_successful_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - EXCEPTION_FUTURE], - None).has_successful_futures()) - - self.assertTrue( - futures.FutureList([SUCCESSFUL_FUTURE], - None).has_successful_futures()) - - def test_has_exception_futures(self): - self.assertFalse( - futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE, - CANCELLED_AND_NOTIFIED_FUTURE, - SUCCESSFUL_FUTURE], - None).has_exception_futures()) - - self.assertTrue( - futures.FutureList([EXCEPTION_FUTURE], - None).has_exception_futures()) - - def test_get_item(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(f[0], PENDING_FUTURE) - self.assertEqual(f[1], RUNNING_FUTURE) - self.assertEqual(f[2], CANCELLED_FUTURE) - self.assertRaises(IndexError, f.__getitem__, 3) - - def test_len(self): - f = futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE, - CANCELLED_FUTURE], - None) - self.assertEqual(len(f), 3) - - def test_iter(self): - fs = [PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE] - f = futures.FutureList(fs, None) - self.assertEqual(list(iter(f)), fs) - - def test_contains(self): - f = futures.FutureList([PENDING_FUTURE, - RUNNING_FUTURE], - None) - self.assertTrue(PENDING_FUTURE in f) - self.assertTrue(RUNNING_FUTURE in f) - self.assertFalse(CANCELLED_FUTURE in f) - - def test_repr(self): - pending = create_future(state=PENDING) - cancelled = create_future(state=CANCELLED) - cancelled2 = create_future(state=CANCELLED_AND_NOTIFIED) - running = create_future(state=RUNNING) - finished = create_future(state=FINISHED) - - f = futures.FutureList( - [PENDING_FUTURE] * 4 + [CANCELLED_FUTURE] * 2 + - [CANCELLED_AND_NOTIFIED_FUTURE] + - [RUNNING_FUTURE] * 2 + - [SUCCESSFUL_FUTURE, EXCEPTION_FUTURE] * 3, - None) - - self.assertEqual(repr(f), - '<FutureList #futures=15 ' - '[#pending=4 #cancelled=3 #running=2 #finished=6]>') - def test_main(): - test_support.run_unittest(ProcessPoolCancelTests, - ThreadPoolCancelTests, - ProcessPoolExecutorTest, + test_support.run_unittest(ProcessPoolExecutorTest, ThreadPoolExecutorTest, ProcessPoolWaitTests, ThreadPoolWaitTests, + ProcessPoolAsCompletedTests, + ThreadPoolAsCompletedTests, FutureTests, - FutureListTests, ProcessPoolShutdownTest, ThreadPoolShutdownTest) if __name__ == "__main__": - test_main() \ No newline at end of file + test_main()