# pylint: disable=no-member
# We disable pyright checks because it can't find method that don't exist in this Python version
-# so the reported error is correct, but due to the version checking conditions, it never happens
+# so the reported error is correct, but due to the version checking conditions, it never happens.
+# Due to backporting, we are also using private methods and non-existent members of classes
#
# pyright: reportUnknownMemberType=false
# pyright: reportUnknownVariableType=false
# pyright: reportGeneralTypeIssues=false
+# pyright: reportPrivateUsage=false
import asyncio
import functools
import logging
import sys
+from asyncio import AbstractEventLoop, coroutines, events, tasks
from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar
logger = logging.getLogger(__name__)
return asyncio.ensure_future(coro)
-def run(coro: Awaitable[T], debug: Optional[bool] = None) -> Awaitable[T]:
- # ideally copy-paste of this:
+def is_event_loop_running() -> bool:
+ loop = events._get_running_loop() # pylint: disable=protected-access
+ return loop is not None and loop.is_running()
+
+
+def run(coro: Awaitable[T], debug: Optional[bool] = None) -> T:
+ # Adapted version of this:
# https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L8
# version 3.7 and higher, call directly
# disabled due to incompatibilities
- # if sys.version_info.major >= 3 and sys.version_info.minor >= 7 and False:
- # return asyncio.run(coro, debug=debug)
- # else:
- # earlier versions, run with default executor
- # Explicitelly create a new loop to match behaviour of asyncio.run
- loop = asyncio.events.new_event_loop()
- asyncio.set_event_loop(loop)
- if debug is not None:
- loop.set_debug(debug)
- # The following line have a really weird type requirements. I don't understand the reasoning, but it works
- return loop.run_until_complete(coro) # type: ignore[arg-type]
- # asyncio.run would cancel all running tasks, but it would use internal API for that
- # so let's ignore it and let the tasks die
+ if sys.version_info.major >= 3 and sys.version_info.minor >= 7:
+ return asyncio.run(coro, debug=debug) # type: ignore[attr-defined]
+
+ # earlier versions, use backported version of the function
+ if events._get_running_loop() is not None: # pylint: disable=protected-access
+ raise RuntimeError("asyncio.run() cannot be called from a running event loop")
+
+ if not coroutines.iscoroutine(coro):
+ raise ValueError(f"a coroutine was expected, got {repr(coro)}")
+
+ loop = events.new_event_loop()
+ try:
+ events.set_event_loop(loop)
+ if debug is not None:
+ loop.set_debug(debug)
+ return loop.run_until_complete(coro)
+ finally:
+ try:
+ _cancel_all_tasks(loop)
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ if hasattr(loop, "shutdown_default_executor"):
+ loop.run_until_complete(loop.shutdown_default_executor()) # type: ignore[attr-defined]
+ finally:
+ events.set_event_loop(None)
+ loop.close()
+
+
+def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
+ # Backported from:
+ # https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L55-L74
+ #
+ to_cancel = tasks.Task.all_tasks(loop)
+ if not to_cancel:
+ return
+
+ for task in to_cancel:
+ task.cancel()
+
+ if sys.version_info.minor >= 7:
+ # since 3.7, the loop argument is implicitely the running loop
+ # since 3.10, the loop argument is removed
+ loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
+ else:
+ loop.run_until_complete(tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
+
+ for task in to_cancel:
+ if task.cancelled():
+ continue
+ if task.exception() is not None:
+ loop.call_exception_handler(
+ {
+ "message": "unhandled exception during asyncio.run() shutdown",
+ "exception": task.exception(),
+ "task": task,
+ }
+ )
def add_async_signal_handler(signal: int, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
# this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
# the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
# function for details
- if self._collection_task is not None and not self._collection_task.done():
- logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+
+ if compat.asyncio.is_event_loop_running():
+ # when running, we can schedule the new data collection
+ if self._collection_task is not None and not self._collection_task.done():
+ logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+ else:
+ self._collection_task = compat.asyncio.create_task(
+ self.collect_kresd_stats(_triggered_from_prometheus_library=True)
+ )
+
else:
- self._collection_task = compat.asyncio.create_task(
- self.collect_kresd_stats(_triggered_from_prometheus_library=True)
- )
+ # when not running, we can start a new loop (we are not in the manager's main thread)
+ compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily:
return _gauge(