]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add thread identity check to add_callback (#2469)
authorMatthew Rocklin <mrocklin@gmail.com>
Sun, 16 Sep 2018 20:09:40 +0000 (16:09 -0400)
committerBen Darnell <ben@bendarnell.com>
Sun, 16 Sep 2018 20:09:40 +0000 (16:09 -0400)
* Add thread identity check to add_callback

Fixes #2463

This reduces the overhead of add_callback when called on the same thread
as the event loop.  This uses asyncio's call_soon rather than
call_soon_threadsafe.

* Separately define add_callback_from_signal

tornado/platform/asyncio.py
tornado/test/httpclient_test.py

index f1b072c7b4ae9e663c0be8019216f7a56740e4b8..0ee5afe588b839919f31c45af19bde8f3a2f0805 100644 (file)
@@ -22,6 +22,7 @@ the same event loop.
 import concurrent.futures
 import functools
 
+from threading import get_ident
 from tornado.gen import convert_yielded
 from tornado.ioloop import IOLoop, _Selectable
 
@@ -60,8 +61,16 @@ class BaseAsyncIOLoop(IOLoop):
             if loop.is_closed():
                 del IOLoop._ioloop_for_asyncio[loop]
         IOLoop._ioloop_for_asyncio[asyncio_loop] = self
+
+        self._thread_identity = 0
+
         super(BaseAsyncIOLoop, self).initialize(**kwargs)
 
+        def assign_thread_identity() -> None:
+            self._thread_identity = get_ident()
+
+        self.add_callback(assign_thread_identity)
+
     def close(self, all_fds: bool=False) -> None:
         self.closing = True
         for fd in list(self.handlers):
@@ -157,8 +166,12 @@ class BaseAsyncIOLoop(IOLoop):
         timeout.cancel()  # type: ignore
 
     def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
+        if get_ident() == self._thread_identity:
+            call_soon = self.asyncio_loop.call_soon
+        else:
+            call_soon = self.asyncio_loop.call_soon_threadsafe
         try:
-            self.asyncio_loop.call_soon_threadsafe(
+            call_soon(
                 self._run_callback,
                 functools.partial(callback, *args, **kwargs))
         except RuntimeError:
@@ -169,7 +182,13 @@ class BaseAsyncIOLoop(IOLoop):
             # eventually execute).
             pass
 
-    add_callback_from_signal = add_callback
+    def add_callback_from_signal(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
+        try:
+            self.asyncio_loop.call_soon_threadsafe(
+                self._run_callback,
+                functools.partial(callback, *args, **kwargs))
+        except RuntimeError:
+            pass
 
     def run_in_executor(self, executor: Optional[concurrent.futures.Executor],
                         func: Callable[..., _T], *args: Any) -> Awaitable[_T]:
index 4b4e3b538361bad5a190818f87640c0f5da67d53..d11ceff863e67ea24d72db02de3ee14fd0fea261 100644 (file)
@@ -547,6 +547,7 @@ class HTTPResponseTestCase(unittest.TestCase):
 class SyncHTTPClientTest(unittest.TestCase):
     def setUp(self):
         self.server_ioloop = IOLoop()
+        event = threading.Event()
 
         @gen.coroutine
         def init_server():
@@ -554,10 +555,15 @@ class SyncHTTPClientTest(unittest.TestCase):
             app = Application([('/', HelloWorldHandler)])
             self.server = HTTPServer(app)
             self.server.add_socket(sock)
-        self.server_ioloop.run_sync(init_server)
+            event.set()
 
-        self.server_thread = threading.Thread(target=self.server_ioloop.start)
+        def start():
+            self.server_ioloop.run_sync(init_server)
+            self.server_ioloop.start()
+
+        self.server_thread = threading.Thread(target=start)
         self.server_thread.start()
+        event.wait()
 
         self.http_client = HTTPClient()