atexit.register(_atexit_callback)
-if sys.version_info >= (3, 10):
-
- def _get_event_loop() -> asyncio.AbstractEventLoop:
- try:
- return asyncio.get_running_loop()
- except RuntimeError:
- pass
-
- return asyncio.get_event_loop_policy().get_event_loop()
-
-else:
- from asyncio import get_event_loop as _get_event_loop
-
class BaseAsyncIOLoop(IOLoop):
def initialize( # type: ignore
handler_func(fileobj, events)
def start(self) -> None:
- try:
- old_loop = _get_event_loop()
- except (RuntimeError, AssertionError):
- old_loop = None # type: ignore
- try:
- asyncio.set_event_loop(self.asyncio_loop)
- self.asyncio_loop.run_forever()
- finally:
- asyncio.set_event_loop(old_loop)
+ self.asyncio_loop.run_forever()
def stop(self) -> None:
self.asyncio_loop.stop()