import time
import uuid
-from tornado.concurrent import Future, return_future, chain_future, future_set_exc_info
+from tornado.concurrent import (Future, return_future, chain_future,
+ future_set_exc_info,
+ future_set_result_unless_cancelled)
from tornado import gen
from tornado import httpclient
from tornado import escape
claimed_id = self.get_argument("openid.claimed_id", None)
if claimed_id:
user["claimed_id"] = claimed_id
- future.set_result(user)
+ future_set_result_unless_cancelled(future, user)
def get_auth_http_client(self):
"""Returns the `.AsyncHTTPClient` instance to be used for auth requests.
future.set_exception(AuthError("Error getting user"))
return
user["access_token"] = access_token
- future.set_result(user)
+ future_set_result_unless_cancelled(future, user)
def _oauth_request_parameters(self, url, access_token, parameters={},
method="GET"):
(response.error, response.request.url)))
return
- future.set_result(escape.json_decode(response.body))
+ future_set_result_unless_cancelled(future, escape.json_decode(response.body))
def get_auth_http_client(self):
"""Returns the `.AsyncHTTPClient` instance to be used for auth requests.
"Error response %s fetching %s" % (response.error,
response.request.url)))
return
- future.set_result(escape.json_decode(response.body))
+ future_set_result_unless_cancelled(future, escape.json_decode(response.body))
def _oauth_consumer_token(self):
self.require_setting("twitter_consumer_key", "Twitter OAuth")
return
args = escape.json_decode(response.body)
- future.set_result(args)
+ future_set_result_unless_cancelled(future, args)
class FacebookGraphMixin(OAuth2Mixin):
def _on_get_user_info(self, future, session, fields, user):
if user is None:
- future.set_result(None)
+ future_set_result_unless_cancelled(future, None)
return
fieldmap = {}
# This should change in Tornado 5.0.
fieldmap.update({"access_token": session["access_token"],
"session_expires": str(session.get("expires_in"))})
- future.set_result(fieldmap)
+ future_set_result_unless_cancelled(future, fieldmap)
@_auth_return_future
def facebook_request(self, path, callback, access_token=None,
def submit(self, fn, *args, **kwargs):
future = Future()
try:
- future.set_result(fn(*args, **kwargs))
+ future_set_result_unless_cancelled(future, fn(*args, **kwargs))
except Exception:
future_set_exc_info(future, sys.exc_info())
return future
def wrapper(*args, **kwargs):
future = Future()
callback, args, kwargs = replacer.replace(
- lambda value=_NO_RESULT: future.set_result(value),
+ lambda value=_NO_RESULT: future_set_result_unless_cancelled(future, value),
args, kwargs)
def handle_error(typ, value, tb):
IOLoop.current().add_future(a, copy)
+def future_set_result_unless_cancelled(future, value):
+ """Set the given ``value`` as the `Future`'s result, if not cancelled.
+
+ Avoids asyncio.InvalidStateError when calling set_result() on
+ a cancelled `asyncio.Future`.
+
+ .. versionadded:: 5.0
+ """
+ if not future.cancelled():
+ future.set_result(value)
+
+
def future_set_exc_info(future, exc_info):
"""Set the given ``exc_info`` as the `Future`'s exception.
import types
import weakref
-from tornado.concurrent import Future, is_future, chain_future, future_set_exc_info, future_add_done_callback
+from tornado.concurrent import (Future, is_future, chain_future, future_set_exc_info,
+ future_add_done_callback, future_set_result_unless_cancelled)
from tornado.ioloop import IOLoop
from tornado.log import app_log
from tornado import stack_context
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
- future.set_result(_value_from_stopiteration(e))
+ future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
except Exception:
future_set_exc_info(future, sys.exc_info())
else:
# used in the absence of cycles). We can avoid the
# cycle by clearing the local variable after we return it.
future = None
- future.set_result(result)
+ future_set_result_unless_cancelled(future, result)
return future
wrapper.__wrapped__ = wrapped
def set_result(result):
if future.done():
return
- future.set_result(result)
+ future_set_result_unless_cancelled(future, result)
with stack_context.ExceptionStackContext(handle_exception):
func(*args, callback=_argument_adapter(set_result), **kwargs)
return future
future = _create_future()
if not children:
- future.set_result({} if keys is not None else [])
+ future_set_result_unless_cancelled(future,
+ {} if keys is not None else [])
def callback(f):
unfinished_children.remove(f)
future_set_exc_info(future, sys.exc_info())
if not future.done():
if keys is not None:
- future.set_result(dict(zip(keys, result_list)))
+ future_set_result_unless_cancelled(future,
+ dict(zip(keys, result_list)))
else:
- future.set_result(result_list)
+ future_set_result_unless_cancelled(future, result_list)
listening = set()
for f in children:
.. versionadded:: 4.1
"""
f = _create_future()
- IOLoop.current().call_later(duration, lambda: f.set_result(None))
+ IOLoop.current().call_later(duration,
+ lambda: future_set_result_unless_cancelled(f, None))
return f
self.results[key] = result
if self.yield_point is not None and self.yield_point.is_ready():
try:
- self.future.set_result(self.yield_point.get_result())
+ future_set_result_unless_cancelled(self.future,
+ self.yield_point.get_result())
except:
future_set_exc_info(self.future, sys.exc_info())
self.yield_point = None
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
- self.result_future.set_result(_value_from_stopiteration(e))
+ future_set_result_unless_cancelled(self.result_future,
+ _value_from_stopiteration(e))
self.result_future = None
self._deactivate_stack_context()
return
try:
yielded.start(self)
if yielded.is_ready():
- self.future.set_result(
+ future_set_result_unless_cancelled(self.future,
yielded.get_result())
else:
self.yield_point = yielded
import re
-from tornado.concurrent import Future, future_add_done_callback
+from tornado.concurrent import (Future, future_add_done_callback,
+ future_set_result_unless_cancelled)
from tornado.escape import native_str, utf8
from tornado import gen
from tornado import httputil
self._close_callback = None
callback()
if not self._finish_future.done():
- self._finish_future.set_result(None)
+ future_set_result_unless_cancelled(self._finish_future, None)
self._clear_callbacks()
def close(self):
self.stream.close()
self._clear_callbacks()
if not self._finish_future.done():
- self._finish_future.set_result(None)
+ future_set_result_unless_cancelled(self._finish_future, None)
def detach(self):
"""Take control of the underlying stream.
stream = self.stream
self.stream = None
if not self._finish_future.done():
- self._finish_future.set_result(None)
+ future_set_result_unless_cancelled(self._finish_future, None)
return stream
def set_body_timeout(self, timeout):
if self._write_future is not None:
future = self._write_future
self._write_future = None
- future.set_result(None)
+ future_set_result_unless_cancelled(future, None)
def _can_keep_alive(self, start_line, headers):
if self.params.no_keep_alive:
# default state for the next request.
self.stream.set_nodelay(False)
if not self._finish_future.done():
- self._finish_future.set_result(None)
+ future_set_result_unless_cancelled(self._finish_future, None)
def _parse_headers(self, data):
# The lstrip removes newlines that some implementations sometimes
import time
import weakref
-from tornado.concurrent import Future
+from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.escape import utf8, native_str
from tornado import gen, httputil, stack_context
from tornado.ioloop import IOLoop
if raise_error and response.error:
future.set_exception(response.error)
else:
- future.set_result(response)
+ future_set_result_unless_cancelled(future, response)
self.fetch_impl(request, handle_response)
return future
import collections
from tornado import gen, ioloop
-from tornado.concurrent import Future
+from tornado.concurrent import Future, future_set_result_unless_cancelled
__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock']
if timeout:
def on_timeout():
if not waiter.done():
- waiter.set_result(False)
+ future_set_result_unless_cancelled(waiter, False)
self._garbage_collect()
io_loop = ioloop.IOLoop.current()
timeout_handle = io_loop.add_timeout(timeout, on_timeout)
waiters.append(waiter)
for waiter in waiters:
- waiter.set_result(True)
+ future_set_result_unless_cancelled(waiter, True)
def notify_all(self):
"""Wake all waiters."""
from binascii import hexlify
-from tornado.concurrent import Future
+from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado import ioloop
from tornado.iostream import PipeIOStream
from tornado.log import gen_log
# Unfortunately we don't have the original args any more.
future.set_exception(CalledProcessError(ret, None))
else:
- future.set_result(ret)
+ future_set_result_unless_cancelled(future, ret)
self.set_exit_callback(callback)
return future
import heapq
from tornado import gen, ioloop
-from tornado.concurrent import Future
+from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.locks import Event
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
assert self.empty(), "queue non-empty, why are getters waiting?"
getter = self._getters.popleft()
self.__put_internal(item)
- getter.set_result(self._get())
+ future_set_result_unless_cancelled(getter, self._get())
elif self.full():
raise QueueFull
else:
assert self.full(), "queue not full, why are putters waiting?"
item, putter = self._putters.popleft()
self.__put_internal(item)
- putter.set_result(None)
+ future_set_result_unless_cancelled(putter, None)
return self._get()
elif self.qsize():
return self._get()
import sys
import traceback
-from tornado.concurrent import Future, return_future, ReturnValueIgnoredError, run_on_executor
+from tornado.concurrent import (Future, return_future, ReturnValueIgnoredError,
+ run_on_executor, future_set_result_unless_cancelled)
from tornado.escape import utf8, to_unicode
from tornado import gen
from tornado.ioloop import IOLoop
futures = None
+class MiscFutureTest(AsyncTestCase):
+
+ def test_future_set_result_unless_cancelled(self):
+ fut = Future()
+ future_set_result_unless_cancelled(fut, 42)
+ self.assertEqual(fut.result(), 42)
+ self.assertFalse(fut.cancelled())
+
+ fut = Future()
+ fut.cancel()
+ is_cancelled = fut.cancelled()
+ future_set_result_unless_cancelled(fut, 42)
+ self.assertEqual(fut.cancelled(), is_cancelled)
+ if not is_cancelled:
+ self.assertEqual(fut.result(), 42)
+
+
class ReturnFutureTest(AsyncTestCase):
@return_future
def sync_future(self, callback):
from inspect import isclass
from io import BytesIO
-from tornado.concurrent import Future
+from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado import escape
from tornado import gen
from tornado import httputil
if self._prepared_future is not None:
# Tell the Application we've finished with prepare()
# and are ready for the body to arrive.
- self._prepared_future.set_result(None)
+ future_set_result_unless_cancelled(self._prepared_future, None)
if self._finished:
return
def finish(self):
if self.stream_request_body:
- self.request.body.set_result(None)
+ future_set_result_unless_cancelled(self.request.body, None)
else:
self.request.body = b''.join(self.chunks)
self.request._parse_body()
import tornado.web
import zlib
-from tornado.concurrent import Future
+from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.escape import utf8, native_str, to_unicode
from tornado import gen, httpclient, httputil
from tornado.ioloop import IOLoop, PeriodicCallback
# ability to see exceptions.
self.final_callback = None
- self.connect_future.set_result(self)
+ future_set_result_unless_cancelled(self.connect_future, self)
def write_message(self, message, binary=False):
"""Sends a message to the WebSocket server."""
assert self.read_future is None
future = Future()
if self.read_queue:
- future.set_result(self.read_queue.popleft())
+ future_set_result_unless_cancelled(future, self.read_queue.popleft())
else:
self.read_future = future
if callback is not None:
if self._on_message_callback:
self._on_message_callback(message)
elif self.read_future is not None:
- self.read_future.set_result(message)
+ future_set_result_unless_cancelled(self.read_future, message)
self.read_future = None
else:
self.read_queue.append(message)