import asyncio
import atexit
import concurrent.futures
+import contextvars
import errno
import functools
import select
_closed = False
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
+ self._main_thread_ctx = contextvars.copy_context()
+
self._real_loop = real_loop
self._select_cond = threading.Condition()
# clean up if we get to this point but the event loop is closed without
# starting.
self._real_loop.call_soon(
- lambda: self._real_loop.create_task(thread_manager_anext())
+ lambda: self._real_loop.create_task(thread_manager_anext()),
+ context=self._main_thread_ctx,
)
self._readers: Dict[_FileDescriptorLike, Callable] = {}
raise
try:
- self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
+ self._real_loop.call_soon_threadsafe(
+ self._handle_select, rs, ws, context=self._main_thread_ctx
+ )
except RuntimeError:
# "Event loop is closed". Swallow the exception for
# consistency with PollIOLoop (and logical consistency
# under the License.
import asyncio
+import contextvars
import threading
import time
import unittest
to_asyncio_future,
AddThreadSelectorEventLoop,
)
-from tornado.testing import AsyncTestCase, gen_test, setup_with_context_manager
+from tornado.testing import (
+ AsyncTestCase,
+ gen_test,
+ setup_with_context_manager,
+ AsyncHTTPTestCase,
+)
from tornado.test.util import ignore_deprecation
+from tornado.web import Application, RequestHandler
class AsyncIOLoopTest(AsyncTestCase):
asyncio.set_event_loop_policy(self.AnyThreadEventLoopPolicy())
self.assertIsInstance(self.executor.submit(IOLoop.current).result(), IOLoop)
self.executor.submit(lambda: asyncio.get_event_loop().close()).result() # type: ignore
+
+
+class SelectorThreadContextvarsTest(AsyncHTTPTestCase):
+ ctx_value = "foo"
+ test_endpoint = "/"
+ tornado_test_ctx = contextvars.ContextVar("tornado_test_ctx", default="default")
+ tornado_test_ctx.set(ctx_value)
+
+ def get_app(self) -> Application:
+ tornado_test_ctx = self.tornado_test_ctx
+
+ class Handler(RequestHandler):
+ async def get(self):
+ # On the Windows platform,
+ # when a asyncio.events.Handle is created
+ # in the SelectorThread without providing a context,
+ # it will copy the current thread's context,
+ # which can lead to the loss of the main thread's context
+ # when executing the handle.
+ # Therefore, it is necessary to
+ # save a copy of the main thread's context in the SelectorThread
+ # for creating the handle.
+ self.write(tornado_test_ctx.get())
+
+ return Application([(self.test_endpoint, Handler)])
+
+ def test_context_vars(self):
+ self.assertEqual(self.ctx_value, self.fetch(self.test_endpoint).body.decode())