| """A tasklet decorator. |
| |
| Tasklets are a way to write concurrently running functions without |
| threads; tasklets are executed by an event loop and can suspend |
| themselves blocking for I/O or some other operation using a yield |
| statement. The notion of a blocking operation is abstracted into the |
| Future class, but a tasklet may also yield an RPC in order to wait for |
| that RPC to complete. |
| |
| The @tasklet decorator wraps generator function so that when it is |
| called, a Future is returned while the generator is executed by the |
| event loop. Within the tasklet, any yield of a Future waits for and |
| returns the Future's result. For example: |
| |
| @tasklet |
| def foo(): |
| a = yield <some Future> |
| b = yield <another Future> |
| raise Return(a + b) |
| |
| def main(): |
| f = foo() |
| x = f.get_result() |
| print x |
| |
| Note that blocking until the Future's result is available using |
| get_result() is somewhat inefficient (though not vastly -- it is not |
| busy-waiting). In most cases such code should be rewritten as a tasklet |
| instead: |
| |
| @tasklet |
| def main_tasklet(): |
| f = foo() |
| x = yield f |
| print x |
| |
| Calling a tasklet automatically schedules it with the event loop: |
| |
| def main(): |
| f = main_tasklet() |
| eventloop.run() # Run until no tasklets left to do |
| f.done() # Returns True |
| |
| As a special feature, if the wrapped function is not a generator |
| function, its return value is returned via the Future. This makes the |
| following two equivalent: |
| |
| @tasklet |
| def foo(): |
| return 42 |
| |
| @tasklet |
| def foo(): |
| if False: yield # The presence of 'yield' makes foo a generator |
| raise Return(42) # Or, after PEP 380, return 42 |
| |
| This feature (inspired by Monocle) is handy in case you are |
| implementing an interface that expects tasklets but you have no need to |
| suspend -- there's no need to insert a dummy yield in order to make |
| the tasklet into a generator. |
| """ |
| |
| import collections |
| import logging |
| import os |
| import sys |
| import types |
| |
| from .google_imports import apiproxy_stub_map |
| from .google_imports import apiproxy_rpc |
| from .google_imports import datastore |
| from .google_imports import datastore_errors |
| from .google_imports import datastore_rpc |
| from .google_imports import namespace_manager |
| |
| from . import eventloop |
| from . import utils |
| |
| __all__ = ['Return', 'tasklet', 'synctasklet', 'toplevel', 'sleep', |
| 'add_flow_exception', 'get_return_value', |
| 'get_context', 'set_context', |
| 'make_default_context', 'make_context', |
| 'Future', 'MultiFuture', 'QueueFuture', 'SerialQueueFuture', |
| 'ReducingFuture', |
| ] |
| |
| _logging_debug = utils.logging_debug |
| |
| |
| def _is_generator(obj): |
| """Helper to test for a generator object. |
| |
| NOTE: This tests for the (iterable) object returned by calling a |
| generator function, not for a generator function. |
| """ |
| return isinstance(obj, types.GeneratorType) |
| |
| |
| class _State(utils.threading_local): |
| """Hold thread-local state.""" |
| |
| current_context = None |
| |
| def __init__(self): |
| super(_State, self).__init__() |
| self.all_pending = set() |
| |
| def add_pending(self, fut): |
| _logging_debug('all_pending: add %s', fut) |
| self.all_pending.add(fut) |
| |
| def remove_pending(self, fut, status='success'): |
| if fut in self.all_pending: |
| _logging_debug('all_pending: %s: remove %s', status, fut) |
| self.all_pending.remove(fut) |
| else: |
| _logging_debug('all_pending: %s: not found %s', status, fut) |
| |
| def clear_all_pending(self): |
| if self.all_pending: |
| logging.info('all_pending: clear %s', self.all_pending) |
| self.all_pending.clear() |
| else: |
| _logging_debug('all_pending: clear no-op') |
| |
| def dump_all_pending(self, verbose=False): |
| pending = [] |
| for fut in self.all_pending: |
| if verbose: |
| line = fut.dump() + ('\n' + '-'*40) |
| else: |
| line = fut.dump_stack() |
| pending.append(line) |
| return '\n'.join(pending) |
| |
| |
| _state = _State() |
| |
| |
| # Tuple of exceptions that should not be logged (except in debug mode). |
| _flow_exceptions = () |
| |
| |
| def add_flow_exception(exc): |
| """Add an exception that should not be logged. |
| |
| The argument must be a subclass of Exception. |
| """ |
| global _flow_exceptions |
| if not isinstance(exc, type) or not issubclass(exc, Exception): |
| raise TypeError('Expected an Exception subclass, got %r' % (exc,)) |
| as_set = set(_flow_exceptions) |
| as_set.add(exc) |
| _flow_exceptions = tuple(as_set) |
| |
| |
| def _init_flow_exceptions(): |
| """Internal helper to initialize _flow_exceptions. |
| |
| This automatically adds webob.exc.HTTPException, if it can be imported. |
| """ |
| global _flow_exceptions |
| _flow_exceptions = () |
| add_flow_exception(datastore_errors.Rollback) |
| try: |
| from webob import exc |
| except ImportError: |
| pass |
| else: |
| add_flow_exception(exc.HTTPException) |
| |
| |
| _init_flow_exceptions() |
| |
| |
| class Future(object): |
| """A Future has 0 or more callbacks. |
| |
| The callbacks will be called when the result is ready. |
| |
| NOTE: This is somewhat inspired but not conformant to the Future interface |
| defined by PEP 3148. It is also inspired (and tries to be somewhat |
| compatible with) the App Engine specific UserRPC and MultiRpc classes. |
| """ |
| # TODO: Trim the API; there are too many ways to do the same thing. |
| # TODO: Compare to Monocle's much simpler Callback class. |
| |
| # Constants for state property. |
| IDLE = apiproxy_rpc.RPC.IDLE # Not yet running (unused) |
| RUNNING = apiproxy_rpc.RPC.RUNNING # Not yet completed. |
| FINISHING = apiproxy_rpc.RPC.FINISHING # Completed. |
| |
| # XXX Add docstrings to all methods. Separate PEP 3148 API from RPC API. |
| |
| _geninfo = None # Extra info about suspended generator. |
| |
| def __init__(self, info=None): |
| # TODO: Make done a method, to match PEP 3148? |
| __ndb_debug__ = 'SKIP' # Hide this frame from self._where |
| self._info = info # Info from the caller about this Future's purpose. |
| self._where = utils.get_stack() |
| self._context = None |
| self._reset() |
| |
| def _reset(self): |
| self._done = False |
| self._result = None |
| self._exception = None |
| self._traceback = None |
| self._callbacks = [] |
| self._immediate_callbacks = [] |
| _state.add_pending(self) |
| self._next = None # Links suspended Futures together in a stack. |
| |
| # TODO: Add a __del__ that complains if neither get_exception() nor |
| # check_success() was ever called? What if it's not even done? |
| |
| def __repr__(self): |
| if self._done: |
| if self._exception is not None: |
| state = 'exception %s: %s' % (self._exception.__class__.__name__, |
| self._exception) |
| else: |
| state = 'result %r' % (self._result,) |
| else: |
| state = 'pending' |
| line = '?' |
| for line in self._where: |
| if 'tasklets.py' not in line: |
| break |
| if self._info: |
| line += ' for %s' % self._info |
| if self._geninfo: |
| line += ' %s' % self._geninfo |
| return '<%s %x created by %s; %s>' % ( |
| self.__class__.__name__, id(self), line, state) |
| |
| def dump(self): |
| return '%s\nCreated by %s' % (self.dump_stack(), |
| '\n called by '.join(self._where)) |
| |
| def dump_stack(self): |
| lines = [] |
| fut = self |
| while fut is not None: |
| lines.append(str(fut)) |
| fut = fut._next |
| return '\n waiting for '.join(lines) |
| |
| def add_callback(self, callback, *args, **kwds): |
| if self._done: |
| eventloop.queue_call(None, callback, *args, **kwds) |
| else: |
| self._callbacks.append((callback, args, kwds)) |
| |
| def add_immediate_callback(self, callback, *args, **kwds): |
| if self._done: |
| callback(*args, **kwds) |
| else: |
| self._immediate_callbacks.append((callback, args, kwds)) |
| |
| def set_result(self, result): |
| if self._done: |
| raise RuntimeError('Result cannot be set twice.') |
| self._result = result |
| self._done = True |
| _state.remove_pending(self) |
| for callback, args, kwds in self._immediate_callbacks: |
| callback(*args, **kwds) |
| for callback, args, kwds in self._callbacks: |
| eventloop.queue_call(None, callback, *args, **kwds) |
| |
| def set_exception(self, exc, tb=None): |
| if not isinstance(exc, BaseException): |
| raise TypeError('exc must be an Exception; received %r' % exc) |
| if self._done: |
| raise RuntimeError('Exception cannot be set twice.') |
| self._exception = exc |
| self._traceback = tb |
| self._done = True |
| _state.remove_pending(self, status='fail') |
| for callback, args, kwds in self._immediate_callbacks: |
| callback(*args, **kwds) |
| for callback, args, kwds in self._callbacks: |
| eventloop.queue_call(None, callback, *args, **kwds) |
| |
| def done(self): |
| return self._done |
| |
| @property |
| def state(self): |
| # This is just for compatibility with UserRPC and MultiRpc. |
| # A Future is considered running as soon as it is created. |
| if self._done: |
| return self.FINISHING |
| else: |
| return self.RUNNING |
| |
| def wait(self): |
| if self._done: |
| return |
| ev = eventloop.get_event_loop() |
| while not self._done: |
| if not ev.run1(): |
| logging.info('Deadlock in %s', self) |
| logging.info('All pending Futures:\n%s', _state.dump_all_pending()) |
| _logging_debug('All pending Futures (verbose):\n%s', |
| _state.dump_all_pending(verbose=True)) |
| self.set_exception(RuntimeError('Deadlock waiting for %s' % self)) |
| |
| def get_exception(self): |
| self.wait() |
| return self._exception |
| |
| def get_traceback(self): |
| self.wait() |
| return self._traceback |
| |
| def check_success(self): |
| self.wait() |
| if self._exception is not None: |
| raise self._exception.__class__, self._exception, self._traceback |
| |
| def get_result(self): |
| self.check_success() |
| return self._result |
| |
| # TODO: Have a tasklet that does this |
| @classmethod |
| def wait_any(cls, futures): |
| # TODO: Flatten MultiRpcs. |
| waiting_on = set(futures) |
| ev = eventloop.get_event_loop() |
| while waiting_on: |
| for f in waiting_on: |
| if f.state == cls.FINISHING: |
| return f |
| ev.run1() |
| return None |
| |
| # TODO: Have a tasklet that does this |
| @classmethod |
| def wait_all(cls, futures): |
| # TODO: Flatten MultiRpcs. |
| waiting_on = set(futures) |
| ev = eventloop.get_event_loop() |
| while waiting_on: |
| waiting_on = set(f for f in waiting_on if f.state == cls.RUNNING) |
| ev.run1() |
| |
| def _help_tasklet_along(self, ns, ds_conn, gen, val=None, exc=None, tb=None): |
| # XXX Docstring |
| info = utils.gen_info(gen) |
| __ndb_debug__ = info |
| try: |
| save_context = get_context() |
| save_namespace = namespace_manager.get_namespace() |
| save_ds_connection = datastore._GetConnection() |
| try: |
| set_context(self._context) |
| if ns != save_namespace: |
| namespace_manager.set_namespace(ns) |
| if ds_conn is not save_ds_connection: |
| datastore._SetConnection(ds_conn) |
| if exc is not None: |
| _logging_debug('Throwing %s(%s) into %s', |
| exc.__class__.__name__, exc, info) |
| value = gen.throw(exc.__class__, exc, tb) |
| else: |
| _logging_debug('Sending %r to %s', val, info) |
| value = gen.send(val) |
| self._context = get_context() |
| finally: |
| ns = namespace_manager.get_namespace() |
| ds_conn = datastore._GetConnection() |
| set_context(save_context) |
| if save_namespace != ns: |
| namespace_manager.set_namespace(save_namespace) |
| if save_ds_connection is not ds_conn: |
| datastore._SetConnection(save_ds_connection) |
| |
| except StopIteration, err: |
| result = get_return_value(err) |
| _logging_debug('%s returned %r', info, result) |
| self.set_result(result) |
| return |
| |
| except GeneratorExit: |
| # In Python 2.5, this derives from Exception, but we don't want |
| # to handle it like other Exception instances. So we catch and |
| # re-raise it immediately. See issue 127. http://goo.gl/2p5Pn |
| # TODO: Remove when Python 2.5 is no longer supported. |
| raise |
| |
| except Exception, err: |
| _, _, tb = sys.exc_info() |
| if isinstance(err, _flow_exceptions): |
| # Flow exceptions aren't logged except in "heavy debug" mode, |
| # and then only at DEBUG level, without a traceback. |
| _logging_debug('%s raised %s(%s)', |
| info, err.__class__.__name__, err) |
| elif utils.DEBUG and logging.getLogger().level < logging.DEBUG: |
| # In "heavy debug" mode, log a warning with traceback. |
| # (This is the same condition as used in utils.logging_debug().) |
| logging.warning('%s raised %s(%s)', |
| info, err.__class__.__name__, err, exc_info=True) |
| else: |
| # Otherwise, log a warning without a traceback. |
| logging.warning('%s raised %s(%s)', info, err.__class__.__name__, err) |
| self.set_exception(err, tb) |
| return |
| |
| else: |
| _logging_debug('%s yielded %r', info, value) |
| if isinstance(value, (apiproxy_stub_map.UserRPC, |
| datastore_rpc.MultiRpc)): |
| # TODO: Tail recursion if the RPC is already complete. |
| eventloop.queue_rpc(value, self._on_rpc_completion, |
| value, ns, ds_conn, gen) |
| return |
| if isinstance(value, Future): |
| # TODO: Tail recursion if the Future is already done. |
| if self._next: |
| raise RuntimeError('Future has already completed yet next is %r' % |
| self._next) |
| self._next = value |
| self._geninfo = utils.gen_info(gen) |
| _logging_debug('%s is now blocked waiting for %s', self, value) |
| value.add_callback(self._on_future_completion, value, ns, ds_conn, gen) |
| return |
| if isinstance(value, (tuple, list)): |
| # Arrange for yield to return a list of results (not Futures). |
| info = 'multi-yield from %s' % utils.gen_info(gen) |
| mfut = MultiFuture(info) |
| try: |
| for subfuture in value: |
| mfut.add_dependent(subfuture) |
| mfut.complete() |
| except GeneratorExit: |
| raise |
| except Exception, err: |
| _, _, tb = sys.exc_info() |
| mfut.set_exception(err, tb) |
| mfut.add_callback(self._on_future_completion, mfut, ns, ds_conn, gen) |
| return |
| if _is_generator(value): |
| # TODO: emulate PEP 380 here? |
| raise NotImplementedError('Cannot defer to another generator.') |
| raise RuntimeError('A tasklet should not yield a plain value: ' |
| '%.200s yielded %.200r' % (info, value)) |
| |
| def _on_rpc_completion(self, rpc, ns, ds_conn, gen): |
| try: |
| result = rpc.get_result() |
| except GeneratorExit: |
| raise |
| except Exception, err: |
| _, _, tb = sys.exc_info() |
| self._help_tasklet_along(ns, ds_conn, gen, exc=err, tb=tb) |
| else: |
| self._help_tasklet_along(ns, ds_conn, gen, result) |
| |
| def _on_future_completion(self, future, ns, ds_conn, gen): |
| if self._next is future: |
| self._next = None |
| self._geninfo = None |
| _logging_debug('%s is no longer blocked waiting for %s', self, future) |
| exc = future.get_exception() |
| if exc is not None: |
| self._help_tasklet_along(ns, ds_conn, gen, |
| exc=exc, tb=future.get_traceback()) |
| else: |
| val = future.get_result() # This won't raise an exception. |
| self._help_tasklet_along(ns, ds_conn, gen, val) |
| |
| def sleep(dt): |
| """Public function to sleep some time. |
| |
| Example: |
| yield tasklets.sleep(0.5) # Sleep for half a sec. |
| """ |
| fut = Future('sleep(%.3f)' % dt) |
| eventloop.queue_call(dt, fut.set_result, None) |
| return fut |
| |
| |
| class MultiFuture(Future): |
| """A Future that depends on multiple other Futures. |
| |
| This is used internally by 'v1, v2, ... = yield f1, f2, ...'; the |
| semantics (e.g. error handling) are constrained by that use case. |
| |
| The protocol from the caller's POV is: |
| |
| mf = MultiFuture() |
| mf.add_dependent(<some other Future>) -OR- mf.putq(<some value>) |
| mf.add_dependent(<some other Future>) -OR- mf.putq(<some value>) |
| . |
| . (More mf.add_dependent() and/or mf.putq() calls) |
| . |
| mf.complete() # No more dependents will be added. |
| . |
| . (Time passes) |
| . |
| results = mf.get_result() |
| |
| Now, results is a list of results from all dependent Futures in |
| the order in which they were added. |
| |
| It is legal to add the same dependent multiple times. |
| |
| Callbacks can be added at any point. |
| |
| From a dependent Future POV, there's nothing to be done: a callback |
| is automatically added to each dependent Future which will signal |
| its completion to the MultiFuture. |
| |
| Error handling: if any dependent future raises an error, it is |
| propagated to mf. To force an early error, you can call |
| mf.set_exception() instead of mf.complete(). After this you can't |
| call mf.add_dependent() or mf.putq() any more. |
| """ |
| |
| def __init__(self, info=None): |
| __ndb_debug__ = 'SKIP' # Hide this frame from self._where |
| self._full = False |
| self._dependents = set() |
| self._results = [] |
| super(MultiFuture, self).__init__(info=info) |
| |
| def __repr__(self): |
| # TODO: This may be invoked before __init__() returns, |
| # from Future.__init__(). Beware. |
| line = super(MultiFuture, self).__repr__() |
| lines = [line] |
| for fut in self._results: |
| lines.append(fut.dump_stack().replace('\n', '\n ')) |
| return '\n waiting for '.join(lines) |
| |
| # TODO: Maybe rename this method, since completion of a Future/RPC |
| # already means something else. But to what? |
| def complete(self): |
| if self._full: |
| raise RuntimeError('MultiFuture cannot complete twice.') |
| self._full = True |
| if not self._dependents: |
| self._finish() |
| |
| # TODO: Maybe don't overload set_exception() with this? |
| def set_exception(self, exc, tb=None): |
| self._full = True |
| super(MultiFuture, self).set_exception(exc, tb) |
| |
| def _finish(self): |
| if not self._full: |
| raise RuntimeError('MultiFuture cannot finish until completed.') |
| if self._dependents: |
| raise RuntimeError('MultiFuture cannot finish whilst waiting for ' |
| 'dependents %r' % self._dependents) |
| if self._done: |
| raise RuntimeError('MultiFuture done before finishing.') |
| try: |
| result = [r.get_result() for r in self._results] |
| except GeneratorExit: |
| raise |
| except Exception, err: |
| _, _, tb = sys.exc_info() |
| self.set_exception(err, tb) |
| else: |
| self.set_result(result) |
| |
| def putq(self, value): |
| if isinstance(value, Future): |
| fut = value |
| else: |
| fut = Future() |
| fut.set_result(value) |
| self.add_dependent(fut) |
| |
| def add_dependent(self, fut): |
| if isinstance(fut, list): |
| mfut = MultiFuture() |
| map(mfut.add_dependent, fut) |
| mfut.complete() |
| fut = mfut |
| elif not isinstance(fut, Future): |
| raise TypeError('Expected Future, received %s: %r' % (type(fut), fut)) |
| if self._full: |
| raise RuntimeError('MultiFuture cannot add a dependent once complete.') |
| self._results.append(fut) |
| if fut not in self._dependents: |
| self._dependents.add(fut) |
| fut.add_callback(self._signal_dependent_done, fut) |
| |
| def _signal_dependent_done(self, fut): |
| self._dependents.remove(fut) |
| if self._full and not self._dependents and not self._done: |
| self._finish() |
| |
| |
| class QueueFuture(Future): |
| """A Queue following the same protocol as MultiFuture. |
| |
| However, instead of returning results as a list, it lets you |
| retrieve results as soon as they are ready, one at a time, using |
| getq(). The Future itself finishes with a result of None when the |
| last result is ready (regardless of whether it was retrieved). |
| |
| The getq() method returns a Future which blocks until the next |
| result is ready, and then returns that result. Each getq() call |
| retrieves one unique result. Extra getq() calls after the last |
| result is already returned return EOFError as their Future's |
| exception. (I.e., q.getq() returns a Future as always, but yieding |
| that Future raises EOFError.) |
| |
| NOTE: Values can also be pushed directly via .putq(value). However |
| there is no flow control -- if the producer is faster than the |
| consumer, the queue will grow unbounded. |
| """ |
| # TODO: Refactor to share code with MultiFuture. |
| |
| def __init__(self, info=None): |
| self._full = False |
| self._dependents = set() |
| self._completed = collections.deque() |
| self._waiting = collections.deque() |
| # Invariant: at least one of _completed and _waiting is empty. |
| # Also: _full and not _dependents <==> _done. |
| super(QueueFuture, self).__init__(info=info) |
| |
| # TODO: __repr__ |
| |
| def complete(self): |
| if self._full: |
| raise RuntimeError('MultiFuture cannot complete twice.') |
| self._full = True |
| if not self._dependents: |
| self.set_result(None) |
| self._mark_finished() |
| |
| def set_exception(self, exc, tb=None): |
| self._full = True |
| super(QueueFuture, self).set_exception(exc, tb) |
| if not self._dependents: |
| self._mark_finished() |
| |
| def putq(self, value): |
| if isinstance(value, Future): |
| fut = value |
| else: |
| fut = Future() |
| fut.set_result(value) |
| self.add_dependent(fut) |
| |
| def add_dependent(self, fut): |
| if not isinstance(fut, Future): |
| raise TypeError('fut must be a Future instance; received %r' % fut) |
| if self._full: |
| raise RuntimeError('QueueFuture add dependent once complete.') |
| if fut not in self._dependents: |
| self._dependents.add(fut) |
| fut.add_callback(self._signal_dependent_done, fut) |
| |
| def _signal_dependent_done(self, fut): |
| if not fut.done(): |
| raise RuntimeError('Future not done before signalling dependant done.') |
| self._dependents.remove(fut) |
| exc = fut.get_exception() |
| tb = fut.get_traceback() |
| val = None |
| if exc is None: |
| val = fut.get_result() |
| if self._waiting: |
| waiter = self._waiting.popleft() |
| self._pass_result(waiter, exc, tb, val) |
| else: |
| self._completed.append((exc, tb, val)) |
| if self._full and not self._dependents and not self._done: |
| self.set_result(None) |
| self._mark_finished() |
| |
| def _mark_finished(self): |
| if not self.done(): |
| raise RuntimeError('Future not done before marking as finished.') |
| while self._waiting: |
| waiter = self._waiting.popleft() |
| self._pass_eof(waiter) |
| |
| def getq(self): |
| fut = Future() |
| if self._completed: |
| exc, tb, val = self._completed.popleft() |
| self._pass_result(fut, exc, tb, val) |
| elif self._full and not self._dependents: |
| self._pass_eof(fut) |
| else: |
| self._waiting.append(fut) |
| return fut |
| |
| def _pass_eof(self, fut): |
| if not self._done: |
| raise RuntimeError('QueueFuture cannot pass EOF until done.') |
| exc = self.get_exception() |
| if exc is not None: |
| tb = self.get_traceback() |
| else: |
| exc = EOFError('Queue is empty') |
| tb = None |
| self._pass_result(fut, exc, tb, None) |
| |
| def _pass_result(self, fut, exc, tb, val): |
| if exc is not None: |
| fut.set_exception(exc, tb) |
| else: |
| fut.set_result(val) |
| |
| |
| class SerialQueueFuture(Future): |
| """Like QueueFuture but maintains the order of insertion. |
| |
| This class is used by Query operations. |
| |
| Invariants: |
| |
| - At least one of _queue and _waiting is empty. |
| - The Futures in _waiting are always pending. |
| |
| (The Futures in _queue may be pending or completed.) |
| |
| In the discussion below, add_dependent() is treated the same way as |
| putq(). |
| |
| If putq() is ahead of getq(), the situation is like this: |
| |
| putq() |
| v |
| _queue: [f1, f2, ...]; _waiting: [] |
| ^ |
| getq() |
| |
| Here, putq() appends a Future to the right of _queue, and getq() |
| removes one from the left. |
| |
| If getq() is ahead of putq(), it's like this: |
| |
| putq() |
| v |
| _queue: []; _waiting: [f1, f2, ...] |
| ^ |
| getq() |
| |
| Here, putq() removes a Future from the left of _waiting, and getq() |
| appends one to the right. |
| |
| When both are empty, putq() appends a Future to the right of _queue, |
| while getq() appends one to the right of _waiting. |
| |
| The _full flag means that no more calls to putq() will be made; it |
| is set by calling either complete() or set_exception(). |
| |
| Calling complete() signals that no more putq() calls will be made. |
| If getq() is behind, subsequent getq() calls will eat up _queue |
| until it is empty, and after that will return a Future that passes |
| EOFError (note that getq() itself never raises EOFError). If getq() |
| is ahead when complete() is called, the Futures in _waiting are all |
| passed an EOFError exception (thereby eating up _waiting). |
| |
| If, instead of complete(), set_exception() is called, the exception |
| and traceback set there will be used instead of EOFError. |
| """ |
| |
| def __init__(self, info=None): |
| self._full = False |
| self._queue = collections.deque() |
| self._waiting = collections.deque() |
| super(SerialQueueFuture, self).__init__(info=info) |
| |
| # TODO: __repr__ |
| |
| def complete(self): |
| if self._full: |
| raise RuntimeError('SerialQueueFuture cannot complete twice.') |
| self._full = True |
| while self._waiting: |
| waiter = self._waiting.popleft() |
| waiter.set_exception(EOFError('Queue is empty')) |
| if not self._queue: |
| self.set_result(None) |
| |
| def set_exception(self, exc, tb=None): |
| self._full = True |
| super(SerialQueueFuture, self).set_exception(exc, tb) |
| while self._waiting: |
| waiter = self._waiting.popleft() |
| waiter.set_exception(exc, tb) |
| |
| def putq(self, value): |
| if isinstance(value, Future): |
| fut = value |
| else: |
| if self._waiting: |
| waiter = self._waiting.popleft() |
| waiter.set_result(value) |
| return |
| fut = Future() |
| fut.set_result(value) |
| self.add_dependent(fut) |
| |
| def add_dependent(self, fut): |
| if not isinstance(fut, Future): |
| raise TypeError('fut must be a Future instance; received %r' % fut) |
| if self._full: |
| raise RuntimeError('SerialQueueFuture cannot add dependent ' |
| 'once complete.') |
| if self._waiting: |
| waiter = self._waiting.popleft() |
| fut.add_callback(_transfer_result, fut, waiter) |
| else: |
| self._queue.append(fut) |
| |
| def getq(self): |
| if self._queue: |
| fut = self._queue.popleft() |
| # TODO: Isn't it better to call self.set_result(None) in complete()? |
| if not self._queue and self._full and not self._done: |
| self.set_result(None) |
| else: |
| fut = Future() |
| if self._full: |
| if not self._done: |
| raise RuntimeError('self._queue should be non-empty.') |
| err = self.get_exception() |
| if err is not None: |
| tb = self.get_traceback() |
| else: |
| err = EOFError('Queue is empty') |
| tb = None |
| fut.set_exception(err, tb) |
| else: |
| self._waiting.append(fut) |
| return fut |
| |
| |
| def _transfer_result(fut1, fut2): |
| """Helper to transfer result or errors from one Future to another.""" |
| exc = fut1.get_exception() |
| if exc is not None: |
| tb = fut1.get_traceback() |
| fut2.set_exception(exc, tb) |
| else: |
| val = fut1.get_result() |
| fut2.set_result(val) |
| |
| |
| class ReducingFuture(Future): |
| """A Queue following the same protocol as MultiFuture. |
| |
| However the result, instead of being a list of results of dependent |
| Futures, is computed by calling a 'reducer' tasklet. The reducer tasklet |
| takes a list of values and returns a single value. It may be called |
| multiple times on sublists of values and should behave like |
| e.g. sum(). |
| |
| NOTE: The reducer input values may be reordered compared to the |
| order in which they were added to the queue. |
| """ |
| # TODO: Refactor to reuse some code with MultiFuture. |
| |
| def __init__(self, reducer, info=None, batch_size=20): |
| self._reducer = reducer |
| self._batch_size = batch_size |
| self._full = False |
| self._dependents = set() |
| self._completed = collections.deque() |
| self._queue = collections.deque() |
| super(ReducingFuture, self).__init__(info=info) |
| |
| # TODO: __repr__ |
| |
| def complete(self): |
| if self._full: |
| raise RuntimeError('ReducingFuture cannot complete twice.') |
| self._full = True |
| if not self._dependents: |
| self._mark_finished() |
| |
| def set_exception(self, exc, tb=None): |
| self._full = True |
| self._queue.clear() |
| super(ReducingFuture, self).set_exception(exc, tb) |
| |
| def putq(self, value): |
| if isinstance(value, Future): |
| fut = value |
| else: |
| fut = Future() |
| fut.set_result(value) |
| self.add_dependent(fut) |
| |
| def add_dependent(self, fut): |
| if self._full: |
| raise RuntimeError('ReducingFuture cannot add dependent once complete.') |
| self._internal_add_dependent(fut) |
| |
| def _internal_add_dependent(self, fut): |
| if not isinstance(fut, Future): |
| raise TypeError('fut must be a Future; received %r' % fut) |
| if fut not in self._dependents: |
| self._dependents.add(fut) |
| fut.add_callback(self._signal_dependent_done, fut) |
| |
| def _signal_dependent_done(self, fut): |
| if not fut.done(): |
| raise RuntimeError('Future not done before signalling dependant done.') |
| self._dependents.remove(fut) |
| if self._done: |
| return # Already done. |
| try: |
| val = fut.get_result() |
| except GeneratorExit: |
| raise |
| except Exception, err: |
| _, _, tb = sys.exc_info() |
| self.set_exception(err, tb) |
| return |
| self._queue.append(val) |
| if len(self._queue) >= self._batch_size: |
| todo = list(self._queue) |
| self._queue.clear() |
| try: |
| nval = self._reducer(todo) |
| except GeneratorExit: |
| raise |
| except Exception, err: |
| _, _, tb = sys.exc_info() |
| self.set_exception(err, tb) |
| return |
| if isinstance(nval, Future): |
| self._internal_add_dependent(nval) |
| else: |
| self._queue.append(nval) |
| if self._full and not self._dependents: |
| self._mark_finished() |
| |
| def _mark_finished(self): |
| if not self._queue: |
| self.set_result(None) |
| elif len(self._queue) == 1: |
| self.set_result(self._queue.pop()) |
| else: |
| todo = list(self._queue) |
| self._queue.clear() |
| try: |
| nval = self._reducer(todo) |
| except GeneratorExit: |
| raise |
| except Exception, err: |
| _, _, tb = sys.exc_info() |
| self.set_exception(err, tb) |
| return |
| if isinstance(nval, Future): |
| self._internal_add_dependent(nval) |
| else: |
| self.set_result(nval) |
| |
| |
| # Alias for StopIteration used to mark return values. |
| # To use this, raise Return(<your return value>). The semantics |
| # are exactly the same as raise StopIteration(<your return value>) |
| # but using Return clarifies that you are intending this to be the |
| # return value of a tasklet. |
| # TODO: According to Monocle authors Steve and Greg Hazel, Twisted |
| # used an exception to signal a return value from a generator early |
| # on, and they found out it was error-prone. Should I worry? |
| Return = StopIteration |
| |
| |
| def get_return_value(err): |
| # XXX Docstring |
| if not err.args: |
| result = None |
| elif len(err.args) == 1: |
| result = err.args[0] |
| else: |
| result = err.args |
| return result |
| |
| |
| def tasklet(func): |
| # XXX Docstring |
| |
| @utils.wrapping(func) |
| def tasklet_wrapper(*args, **kwds): |
| # XXX Docstring |
| |
| # TODO: make most of this a public function so you can take a bare |
| # generator and turn it into a tasklet dynamically. (Monocle has |
| # this I believe.) |
| # __ndb_debug__ = utils.func_info(func) |
| fut = Future('tasklet %s' % utils.func_info(func)) |
| fut._context = get_context() |
| try: |
| result = func(*args, **kwds) |
| except StopIteration, err: |
| # Just in case the function is not a generator but still uses |
| # the "raise Return(...)" idiom, we'll extract the return value. |
| result = get_return_value(err) |
| if _is_generator(result): |
| ns = namespace_manager.get_namespace() |
| ds_conn = datastore._GetConnection() |
| eventloop.queue_call(None, fut._help_tasklet_along, ns, ds_conn, result) |
| else: |
| fut.set_result(result) |
| return fut |
| |
| return tasklet_wrapper |
| |
| |
| def synctasklet(func): |
| """Decorator to run a function as a tasklet when called. |
| |
| Use this to wrap a request handler function that will be called by |
| some web application framework (e.g. a Django view function or a |
| webapp.RequestHandler.get method). |
| """ |
| taskletfunc = tasklet(func) # wrap at declaration time. |
| @utils.wrapping(func) |
| def synctasklet_wrapper(*args, **kwds): |
| __ndb_debug__ = utils.func_info(func) |
| return taskletfunc(*args, **kwds).get_result() |
| return synctasklet_wrapper |
| |
| |
| def toplevel(func): |
| """A sync tasklet that sets a fresh default Context. |
| |
| Use this for toplevel view functions such as |
| webapp.RequestHandler.get() or Django view functions. |
| """ |
| synctaskletfunc = synctasklet(func) # wrap at declaration time. |
| @utils.wrapping(func) |
| def add_context_wrapper(*args, **kwds): |
| __ndb_debug__ = utils.func_info(func) |
| _state.clear_all_pending() |
| # Create and install a new context. |
| ctx = make_default_context() |
| try: |
| set_context(ctx) |
| return synctaskletfunc(*args, **kwds) |
| finally: |
| set_context(None) |
| ctx.flush().check_success() |
| eventloop.run() # Ensure writes are flushed, etc. |
| return add_context_wrapper |
| |
| |
| _CONTEXT_KEY = '__CONTEXT__' |
| |
| |
| def get_context(): |
| # XXX Docstring |
| ctx = None |
| if os.getenv(_CONTEXT_KEY): |
| ctx = _state.current_context |
| if ctx is None: |
| ctx = make_default_context() |
| set_context(ctx) |
| return ctx |
| |
| |
| def make_default_context(): |
| # XXX Docstring |
| return make_context() |
| |
| |
| @utils.positional(0) |
| def make_context(conn=None, config=None): |
| # XXX Docstring |
| from . import context # Late import to deal with circular imports. |
| return context.Context(conn=conn, config=config) |
| |
| |
| def set_context(new_context): |
| # XXX Docstring |
| os.environ[_CONTEXT_KEY] = '1' |
| _state.current_context = new_context |
| |
| |
| # TODO: Rework the following into documentation. |
| |
| # A tasklet/coroutine/generator can yield the following things: |
| # - Another tasklet/coroutine/generator; this is entirely equivalent to |
| # "for x in g: yield x"; this is handled entirely by the @tasklet wrapper. |
| # (Actually, not. @tasklet returns a function that when called returns |
| # a Future. You can use the pep380 module's @gwrap decorator to support |
| # yielding bare generators though.) |
| # - An RPC (or MultiRpc); the tasklet will be resumed when this completes. |
| # This does not use the RPC's callback mechanism. |
| # - A Future; the tasklet will be resumed when the Future is done. |
| # This uses the Future's callback mechanism. |
| |
| # A Future can be used in several ways: |
| # - Yield it from a tasklet; see above. |
| # - Check (poll) its status via f.done. |
| # - Call its wait() method, perhaps indirectly via check_success() |
| # or get_result(). This invokes the event loop. |
| # - Call the Future.wait_any() or Future.wait_all() method. |
| # This is waits for any or all Futures and RPCs in the argument list. |
| |
| # XXX HIRO XXX |
| |
| # - A tasklet is a (generator) function decorated with @tasklet. |
| |
| # - Calling a tasklet schedules the function for execution and returns a Future. |
| |
| # - A function implementing a tasklet may: |
| # = yield a Future; this waits for the Future which returns f.get_result(); |
| # = yield an RPC; this waits for the RPC and then returns rpc.get_result(); |
| # = raise Return(result); this sets the outer Future's result; |
| # = raise StopIteration or return; this sets the outer Future's result; |
| # = raise another exception: this sets the outer Future's exception. |
| |
| # - If a function implementing a tasklet is not a generator it will be |
| # immediately executed to completion and the tasklet wrapper will |
| # return a Future that is already done. (XXX Alternative behavior: |
| # it schedules the call to be run by the event loop.) |
| |
| # - Code not running in a tasklet can call f.get_result() or f.wait() on |
| # a future. This is implemented by a simple loop like the following: |
| |
| # while not self._done: |
| # eventloop.run1() |
| |
| # - Here eventloop.run1() runs one "atomic" part of the event loop: |
| # = either it calls one immediately ready callback; |
| # = or it waits for the first RPC to complete; |
| # = or it sleeps until the first callback should be ready; |
| # = or it raises an exception indicating all queues are empty. |
| |
| # - It is possible but suboptimal to call rpc.get_result() or |
| # rpc.wait() directly on an RPC object since this will not allow |
| # other callbacks to run as they become ready. Wrapping an RPC in a |
| # Future will take care of this issue. |
| |
| # - The important insight is that when a generator function |
| # implementing a tasklet yields, raises or returns, there is always a |
| # wrapper that catches this event and either turns it into a |
| # callback sent to the event loop, or sets the result or exception |
| # for the tasklet's Future. |