From: Vasek Sraier Date: Wed, 9 Feb 2022 14:37:55 +0000 (+0100) Subject: manager: verified and fix bug with graphite metrics exporter X-Git-Tag: v6.0.0a1~43^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f74c3aa2c80dee083731ea7aea2f684568880461;p=thirdparty%2Fknot-resolver.git manager: verified and fix bug with graphite metrics exporter --- diff --git a/manager/knot_resolver_manager/compat/asyncio.py b/manager/knot_resolver_manager/compat/asyncio.py index 9a495e4a8..70485022b 100644 --- a/manager/knot_resolver_manager/compat/asyncio.py +++ b/manager/knot_resolver_manager/compat/asyncio.py @@ -3,16 +3,19 @@ # pylint: disable=no-member # We disable pyright checks because it can't find method that don't exist in this Python version -# so the reported error is correct, but due to the version checking conditions, it never happens +# so the reported error is correct, but due to the version checking conditions, it never happens. +# Due to backporting, we are also using private methods and non-existent members of classes # # pyright: reportUnknownMemberType=false # pyright: reportUnknownVariableType=false # pyright: reportGeneralTypeIssues=false +# pyright: reportPrivateUsage=false import asyncio import functools import logging import sys +from asyncio import AbstractEventLoop, coroutines, events, tasks from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar logger = logging.getLogger(__name__) @@ -61,25 +64,73 @@ def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "asyncio.Task return asyncio.ensure_future(coro) -def run(coro: Awaitable[T], debug: Optional[bool] = None) -> Awaitable[T]: - # ideally copy-paste of this: +def is_event_loop_running() -> bool: + loop = events._get_running_loop() # pylint: disable=protected-access + return loop is not None and loop.is_running() + + +def run(coro: Awaitable[T], debug: Optional[bool] = None) -> T: + # Adapted version of this: # 
https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L8 # version 3.7 and higher, call directly # disabled due to incompatibilities - # if sys.version_info.major >= 3 and sys.version_info.minor >= 7 and False: - # return asyncio.run(coro, debug=debug) - # else: - # earlier versions, run with default executor - # Explicitelly create a new loop to match behaviour of asyncio.run - loop = asyncio.events.new_event_loop() - asyncio.set_event_loop(loop) - if debug is not None: - loop.set_debug(debug) - # The following line have a really weird type requirements. I don't understand the reasoning, but it works - return loop.run_until_complete(coro) # type: ignore[arg-type] - # asyncio.run would cancel all running tasks, but it would use internal API for that - # so let's ignore it and let the tasks die + if sys.version_info.major >= 3 and sys.version_info.minor >= 7: + return asyncio.run(coro, debug=debug) # type: ignore[attr-defined] + + # earlier versions, use backported version of the function + if events._get_running_loop() is not None: # pylint: disable=protected-access + raise RuntimeError("asyncio.run() cannot be called from a running event loop") + + if not coroutines.iscoroutine(coro): + raise ValueError(f"a coroutine was expected, got {repr(coro)}") + + loop = events.new_event_loop() + try: + events.set_event_loop(loop) + if debug is not None: + loop.set_debug(debug) + return loop.run_until_complete(coro) + finally: + try: + _cancel_all_tasks(loop) + loop.run_until_complete(loop.shutdown_asyncgens()) + if hasattr(loop, "shutdown_default_executor"): + loop.run_until_complete(loop.shutdown_default_executor()) # type: ignore[attr-defined] + finally: + events.set_event_loop(None) + loop.close() + + +def _cancel_all_tasks(loop: AbstractEventLoop) -> None: + # Backported from: + # https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L55-L74 + # + to_cancel = tasks.Task.all_tasks(loop) + if not to_cancel: + return + + for task in to_cancel: + 
task.cancel() + + if sys.version_info.minor >= 7: + # since 3.7, the loop argument is implicitly the running loop + # since 3.10, the loop argument is removed + loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True)) + else: + loop.run_until_complete(tasks.gather(*to_cancel, loop=loop, return_exceptions=True)) + + for task in to_cancel: + if task.cancelled(): + continue + if task.exception() is not None: + loop.call_exception_handler( + { + "message": "unhandled exception during asyncio.run() shutdown", + "exception": task.exception(), + "task": task, + } + ) def add_async_signal_handler(signal: int, callback: Callable[[], Coroutine[Any, Any, None]]) -> None: diff --git a/manager/knot_resolver_manager/statistics.py b/manager/knot_resolver_manager/statistics.py index 130503d05..5e3b0168a 100644 --- a/manager/knot_resolver_manager/statistics.py +++ b/manager/knot_resolver_manager/statistics.py @@ -120,12 +120,19 @@ class ResolverCollector: # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned # function for details - if self._collection_task is not None and not self._collection_task.done(): - logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!") + + if compat.asyncio.is_event_loop_running(): + # when running, we can schedule the new data collection + if self._collection_task is not None and not self._collection_task.done(): + logger.warning("Statistics collection task is still running. 
Skipping scheduling of a new one!") + else: + self._collection_task = compat.asyncio.create_task( + self.collect_kresd_stats(_triggered_from_prometheus_library=True) + ) + else: - self._collection_task = compat.asyncio.create_task( - self.collect_kresd_stats(_triggered_from_prometheus_library=True) - ) + # when not running, we can start a new loop (we are not in the manager's main thread) + compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True)) def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily: return _gauge(