request_delegate = delegate.start_request(self, conn)
try:
ret = await conn.read_response(request_delegate)
- except (iostream.StreamClosedError, iostream.UnsatisfiableReadError):
+ except (
+ iostream.StreamClosedError,
+ iostream.UnsatisfiableReadError,
+ asyncio.CancelledError,
+ ):
return
except _QuietException:
# This exception was already logged.
def _run_callback(self, callback: Callable[[], Any]) -> None:
"""Runs a callback with error handling.
- For use in subclasses.
+ .. versionchanged:: 6.0
+
+ CancelledErrors are no longer logged.
"""
try:
ret = callback()
pass
else:
self.add_future(ret, self._discard_future_result)
+ except asyncio.CancelledError:
+ pass
except Exception:
app_log.error("Exception in callback %r", callback, exc_info=True)
import sys
import re
-from tornado.concurrent import Future
+from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado import ioloop
from tornado.log import gen_log
from tornado.netutil import ssl_wrap_socket, _client_ssl_defaults, _server_ssl_defaults
if self._read_future is not None:
future = self._read_future
self._read_future = None
- future.set_result(result)
+ future_set_result_unless_cancelled(future, result)
self._maybe_add_error_listener()
def _try_inline_read(self) -> None:
if index > self._total_write_done_index:
break
self._write_futures.popleft()
- future.set_result(None)
+ future_set_result_unless_cancelled(future, None)
def _consume(self, loc: int) -> bytes:
# Consume loc bytes from the read buffer and return them
if self._connect_future is not None:
future = self._connect_future
self._connect_future = None
- future.set_result(self)
+ future_set_result_unless_cancelled(future, self)
self._connecting = False
def set_nodelay(self, value: bool) -> None:
if self._ssl_connect_future is not None:
future = self._ssl_connect_future
self._ssl_connect_future = None
- future.set_result(self)
+ future_set_result_unless_cancelled(future, self)
def _verify_cert(self, peercert: Any) -> bool:
"""Returns True if peercert is valid according to the configured
@gen.coroutine
def slow_stop():
+ yield self.server.close_all_connections()
# The number of iterations is difficult to predict. Typically,
# one is sufficient, although sometimes it needs more.
for i in range(5):
from tornado import gen, ioloop
from tornado.httpserver import HTTPServer
+from tornado.locks import Event
from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, bind_unused_port, gen_test
from tornado.web import Application
import asyncio
import contextlib
+import gc
import os
import platform
import traceback
self.wait(timeout=0.15)
+class LeakTest(AsyncTestCase):
+ def tearDown(self):
+ super().tearDown()
+ # Trigger a gc to make warnings more deterministic.
+ gc.collect()
+
+ def test_leaked_coroutine(self):
+ # This test verifies that "leaked" coroutines are shut down
+ # without triggering warnings like "task was destroyed but it
+ # is pending". If this test were to fail, it would fail
+ # because runtests.py detected unexpected output to stderr.
+ event = Event()
+
+ async def callback():
+ try:
+ await event.wait()
+ except asyncio.CancelledError:
+ pass
+
+ self.io_loop.add_callback(callback)
+ self.io_loop.add_callback(self.stop)
+ self.wait()
+
+
class AsyncHTTPTestCaseTest(AsyncHTTPTestCase):
def setUp(self):
super(AsyncHTTPTestCaseTest, self).setUp()
for the tornado.autoreload module to rerun the tests when code changes.
"""
+import asyncio
from collections.abc import Generator
import functools
import inspect
self.io_loop.make_current()
def tearDown(self) -> None:
+ # Native coroutines tend to produce warnings if they're not
+ # allowed to run to completion. It's difficult to ensure that
+ # this always happens in tests, so cancel any tasks that are
+ # still pending by the time we get here.
+ asyncio_loop = self.io_loop.asyncio_loop # type: ignore
+ if hasattr(asyncio, "all_tasks"): # py37
+ tasks = asyncio.all_tasks(asyncio_loop) # type: ignore
+ else:
+ tasks = asyncio.Task.all_tasks(asyncio_loop)
+ # Tasks that are done may still appear here and may contain
+ # non-cancellation exceptions, so filter them out.
+ tasks = [t for t in tasks if not t.done()]
+ for t in tasks:
+ t.cancel()
+ # Allow the tasks to run and finalize themselves (which means
+ # raising a CancelledError inside the coroutine). This may
+ # just transform the "task was destroyed but it is pending"
+ # warning into a "uncaught CancelledError" warning, but
+ # catching CancelledErrors in coroutines that may leak is
+ # simpler than ensuring that no coroutines leak.
+ if tasks:
+ done, pending = self.io_loop.run_sync(lambda: asyncio.wait(tasks))
+ assert not pending
+ # If any task failed with anything but a CancelledError, raise it.
+ for f in done:
+ try:
+ f.result()
+ except asyncio.CancelledError:
+ pass
+
# Clean up Subprocess, so it can be used again with a new ioloop.
Subprocess.uninitialize()
self.io_loop.clear_current()