self.gen = gen
self.result_future = result_future
self.future = _null_future
- self.yield_point = None
- self.pending_callbacks = None
- self.results = None
self.running = False
self.finished = False
- self.had_exception = False
self.io_loop = IOLoop.current()
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
- def register_callback(self, key):
- """Adds ``key`` to the list of callbacks."""
- if self.pending_callbacks is None:
- # Lazily initialize the old-style YieldPoint data structures.
- self.pending_callbacks = set()
- self.results = {}
- if key in self.pending_callbacks:
- raise KeyReuseError("key %r is already pending" % (key,))
- self.pending_callbacks.add(key)
-
- def is_ready(self, key):
- """Returns true if a result is available for ``key``."""
- if self.pending_callbacks is None or key not in self.pending_callbacks:
- raise UnknownKeyError("key %r is not pending" % (key,))
- return key in self.results
-
- def set_result(self, key, result):
- """Sets the result for ``key`` and attempts to resume the generator."""
- self.results[key] = result
- if self.yield_point is not None and self.yield_point.is_ready():
- try:
- future_set_result_unless_cancelled(self.future,
- self.yield_point.get_result())
- except:
- future_set_exc_info(self.future, sys.exc_info())
- self.yield_point = None
- self.run()
-
- def pop_result(self, key):
- """Returns the result for ``key`` and unregisters it."""
- self.pending_callbacks.remove(key)
- return self.results.pop(key)
-
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
try:
value = future.result()
except Exception:
- self.had_exception = True
exc_info = sys.exc_info()
future = None
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
- if self.pending_callbacks and not self.had_exception:
- # If we ran cleanly without waiting on all callbacks
- # raise an error (really more of a warning). If we
- # had an exception then some callbacks may have been
- # orphaned, so skip the check in that case.
- raise LeakedCallbackError(
- "finished without waiting for callbacks %r" %
- self.pending_callbacks)
future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
self.result_future = None