From: Ben Darnell Date: Mon, 5 Jun 2017 00:52:51 +0000 (-0400) Subject: concurrent: Introduce future_add_done_callback X-Git-Tag: v5.0.0~45^2~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=89b46a51720776a57cbdf89e8615a09cc020404a;p=thirdparty%2Ftornado.git concurrent: Introduce future_add_done_callback This function guarantees synchronous execution of the callback for Futures that are already done, allowing the Future implementation to change to one that does not provide this guarantee. --- diff --git a/tornado/concurrent.py b/tornado/concurrent.py index 5f723dfed..135f1e14a 100644 --- a/tornado/concurrent.py +++ b/tornado/concurrent.py @@ -498,7 +498,7 @@ def return_future(f): callback() else: callback(future.result()) - future.add_done_callback(wrap(run_callback)) + future_add_done_callback(future, wrap(run_callback)) return future return wrapper @@ -520,7 +520,7 @@ def chain_future(a, b): b.set_exception(a.exception()) else: b.set_result(a.result()) - a.add_done_callback(copy) + future_add_done_callback(a, copy) def future_set_exc_info(future, exc_info): @@ -537,3 +537,18 @@ def future_set_exc_info(future, exc_info): else: # asyncio.Future future.set_exception(exc_info[1]) + + +def future_add_done_callback(future, callback): + """Arrange to call ``callback`` when ``future`` is complete. + + ``callback`` is invoked with one argument, the ``future``. + + If ``future`` is already done, ``callback`` is invoked immediately. + This may differ from the behavior of `.Future.add_done_callback`, + which makes no such guarantee. + """ + if future.done(): + callback(future) + else: + future.add_done_callback(callback) diff --git a/tornado/gen.py b/tornado/gen.py index 92eef367a..86a53be6b 100644 --- a/tornado/gen.py +++ b/tornado/gen.py @@ -85,7 +85,7 @@ import textwrap import types import weakref -from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info +from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info, future_add_done_callback from tornado.ioloop import IOLoop from tornado.log import app_log from tornado import stack_context @@ -200,7 +200,7 @@ def engine(func): # The engine interface doesn't give us any way to return # errors but to raise them into the stack context. # Save the stack context here to use when the Future has resolved. - future.add_done_callback(stack_context.wrap(final_callback)) + future_add_done_callback(future, stack_context.wrap(final_callback)) return wrapper @@ -440,7 +440,7 @@ class WaitIterator(object): self._running_future = None for future in futures: - future.add_done_callback(self._done_callback) + future_add_done_callback(future, self._done_callback) def done(self): """Returns True if this iterator has no more results.""" @@ -838,7 +838,7 @@ def multi_future(children, quiet_exceptions=()): for f in children: if f not in listening: listening.add(f) - f.add_done_callback(callback) + future_add_done_callback(f, callback) return future @@ -910,14 +910,14 @@ def with_timeout(timeout, future, quiet_exceptions=()): def timeout_callback(): result.set_exception(TimeoutError("Timeout")) # In case the wrapped future goes on to fail, log it. - future.add_done_callback(error_callback) + future_add_done_callback(future, error_callback) timeout_handle = io_loop.add_timeout( timeout, timeout_callback) if isinstance(future, Future): # We know this future will resolve on the IOLoop, so we don't # need the extra thread-safety of IOLoop.add_future (and we also # don't care about StackContext here. - future.add_done_callback( + future_add_done_callback(future, lambda future: io_loop.remove_timeout(timeout_handle)) else: # concurrent.futures.Futures may resolve on any thread, so we diff --git a/tornado/http1connection.py b/tornado/http1connection.py index ec05b7b52..39d776c75 100644 --- a/tornado/http1connection.py +++ b/tornado/http1connection.py @@ -23,7 +23,7 @@ from __future__ import absolute_import, division, print_function import re -from tornado.concurrent import Future +from tornado.concurrent import Future, future_add_done_callback from tornado.escape import native_str, utf8 from tornado import gen from tornado import httputil @@ -470,7 +470,7 @@ class HTTP1Connection(httputil.HTTPConnection): if self._pending_write is None: self._finish_request(None) else: - self._pending_write.add_done_callback(self._finish_request) + future_add_done_callback(self._pending_write, self._finish_request) def _on_write_complete(self, future): exc = future.exception() diff --git a/tornado/ioloop.py b/tornado/ioloop.py index fd8f369cc..e587d0f33 100644 --- a/tornado/ioloop.py +++ b/tornado/ioloop.py @@ -44,7 +44,7 @@ import time import traceback import math -from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info +from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info, future_add_done_callback from tornado.log import app_log, gen_log from tornado.platform.auto import set_close_exec, Waker from tornado import stack_context @@ -636,8 +636,8 @@ class IOLoop(Configurable): """ assert is_future(future) callback = stack_context.wrap(callback) - future.add_done_callback( - lambda future: self.add_callback(callback, future)) + future_add_done_callback(future, + lambda future: self.add_callback(callback, future)) def run_in_executor(self, executor, func, *args): """Runs a function in a ``concurrent.futures.Executor``. If diff --git a/tornado/tcpclient.py b/tornado/tcpclient.py index 6d5355b32..2aa6ccf04 100644 --- a/tornado/tcpclient.py +++ b/tornado/tcpclient.py @@ -24,7 +24,7 @@ import time import numbers import datetime -from tornado.concurrent import Future +from tornado.concurrent import Future, future_add_done_callback from tornado.ioloop import IOLoop from tornado.iostream import IOStream from tornado import gen @@ -105,8 +105,8 @@ class _Connector(object): return stream, future = self.connect(af, addr) self.streams.add(stream) - future.add_done_callback(functools.partial(self.on_connect_done, - addrs, af, addr)) + future_add_done_callback(future, functools.partial(self.on_connect_done, + addrs, af, addr)) def on_connect_done(self, addrs, af, addr, future): self.remaining -= 1