import concurrent.futures
import functools
+from threading import get_ident
from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop, _Selectable
if loop.is_closed():
del IOLoop._ioloop_for_asyncio[loop]
IOLoop._ioloop_for_asyncio[asyncio_loop] = self
+
+ self._thread_identity = 0
+
super(BaseAsyncIOLoop, self).initialize(**kwargs)
+ def assign_thread_identity() -> None:
+ self._thread_identity = get_ident()
+
+ self.add_callback(assign_thread_identity)
+
def close(self, all_fds: bool=False) -> None:
self.closing = True
for fd in list(self.handlers):
timeout.cancel() # type: ignore
def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
+ if get_ident() == self._thread_identity:
+ call_soon = self.asyncio_loop.call_soon
+ else:
+ call_soon = self.asyncio_loop.call_soon_threadsafe
try:
- self.asyncio_loop.call_soon_threadsafe(
+ call_soon(
self._run_callback,
functools.partial(callback, *args, **kwargs))
except RuntimeError:
# eventually execute).
pass
- add_callback_from_signal = add_callback
+ def add_callback_from_signal(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
+ try:
+ self.asyncio_loop.call_soon_threadsafe(
+ self._run_callback,
+ functools.partial(callback, *args, **kwargs))
+ except RuntimeError:
+ pass
def run_in_executor(self, executor: Optional[concurrent.futures.Executor],
func: Callable[..., _T], *args: Any) -> Awaitable[_T]:
class SyncHTTPClientTest(unittest.TestCase):
def setUp(self):
self.server_ioloop = IOLoop()
+ event = threading.Event()
@gen.coroutine
def init_server():
app = Application([('/', HelloWorldHandler)])
self.server = HTTPServer(app)
self.server.add_socket(sock)
- self.server_ioloop.run_sync(init_server)
+ event.set()
- self.server_thread = threading.Thread(target=self.server_ioloop.start)
+ def start():
+ self.server_ioloop.run_sync(init_server)
+ self.server_ioloop.start()
+
+ self.server_thread = threading.Thread(target=start)
self.server_thread.start()
+ event.wait()
self.http_client = HTTPClient()