git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: verified and fixed bug with graphite metrics exporter
author Vasek Sraier <git@vakabus.cz>
Wed, 9 Feb 2022 14:37:55 +0000 (15:37 +0100)
committer Aleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:54 +0000 (16:17 +0200)
manager/knot_resolver_manager/compat/asyncio.py
manager/knot_resolver_manager/statistics.py

index 9a495e4a87d558d92f0e769d38122d9700e2389f..70485022b58295f3a6fdb36362010c264072ea5e 100644 (file)
@@ -3,16 +3,19 @@
 # pylint: disable=no-member
 
 # We disable pyright checks because it can't find method that don't exist in this Python version
-# so the reported error is correct, but due to the version checking conditions, it never happens
+# so the reported error is correct, but due to the version checking conditions, it never happens.
+# Due to backporting, we are also using private methods and non-existent members of classes
 #
 # pyright: reportUnknownMemberType=false
 # pyright: reportUnknownVariableType=false
 # pyright: reportGeneralTypeIssues=false
+# pyright: reportPrivateUsage=false
 
 import asyncio
 import functools
 import logging
 import sys
+from asyncio import AbstractEventLoop, coroutines, events, tasks
 from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar
 
 logger = logging.getLogger(__name__)
@@ -61,25 +64,73 @@ def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "asyncio.Task
         return asyncio.ensure_future(coro)
 
 
-def run(coro: Awaitable[T], debug: Optional[bool] = None) -> Awaitable[T]:
-    # ideally copy-paste of this:
+def is_event_loop_running() -> bool:
+    loop = events._get_running_loop()  # pylint: disable=protected-access
+    return loop is not None and loop.is_running()
+
+
+def run(coro: Awaitable[T], debug: Optional[bool] = None) -> T:
+    # Adapted version of this:
     # https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L8
 
     # version 3.7 and higher, call directly
     # disabled due to incompatibilities
-    # if sys.version_info.major >= 3 and sys.version_info.minor >= 7 and False:
-    #    return asyncio.run(coro, debug=debug)
-    # else:
-    # earlier versions, run with default executor
-    # Explicitelly create a new loop to match behaviour of asyncio.run
-    loop = asyncio.events.new_event_loop()
-    asyncio.set_event_loop(loop)
-    if debug is not None:
-        loop.set_debug(debug)
-    # The following line have a really weird type requirements. I don't understand the reasoning, but it works
-    return loop.run_until_complete(coro)  # type: ignore[arg-type]
-    # asyncio.run would cancel all running tasks, but it would use internal API for that
-    # so let's ignore it and let the tasks die
+    if sys.version_info.major >= 3 and sys.version_info.minor >= 7:
+        return asyncio.run(coro, debug=debug)  # type: ignore[attr-defined]
+
+    # earlier versions, use backported version of the function
+    if events._get_running_loop() is not None:  # pylint: disable=protected-access
+        raise RuntimeError("asyncio.run() cannot be called from a running event loop")
+
+    if not coroutines.iscoroutine(coro):
+        raise ValueError(f"a coroutine was expected, got {repr(coro)}")
+
+    loop = events.new_event_loop()
+    try:
+        events.set_event_loop(loop)
+        if debug is not None:
+            loop.set_debug(debug)
+        return loop.run_until_complete(coro)
+    finally:
+        try:
+            _cancel_all_tasks(loop)
+            loop.run_until_complete(loop.shutdown_asyncgens())
+            if hasattr(loop, "shutdown_default_executor"):
+                loop.run_until_complete(loop.shutdown_default_executor())  # type: ignore[attr-defined]
+        finally:
+            events.set_event_loop(None)
+            loop.close()
+
+
+def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
+    # Backported from:
+    # https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L55-L74
+    #
+    to_cancel = tasks.Task.all_tasks(loop)
+    if not to_cancel:
+        return
+
+    for task in to_cancel:
+        task.cancel()
+
+    if sys.version_info.minor >= 7:
+        # since 3.7, the loop argument is implicitly the running loop
+        # since 3.10, the loop argument is removed
+        loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
+    else:
+        loop.run_until_complete(tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
+
+    for task in to_cancel:
+        if task.cancelled():
+            continue
+        if task.exception() is not None:
+            loop.call_exception_handler(
+                {
+                    "message": "unhandled exception during asyncio.run() shutdown",
+                    "exception": task.exception(),
+                    "task": task,
+                }
+            )
 
 
 def add_async_signal_handler(signal: int, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
index 130503d05205b33f34ddc44adf2ceb1983305f1c..5e3b0168af2edf3bff106b0e930f31d8ccec4c70 100644 (file)
@@ -120,12 +120,19 @@ class ResolverCollector:
         # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
         # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
         # function for details
-        if self._collection_task is not None and not self._collection_task.done():
-            logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+
+        if compat.asyncio.is_event_loop_running():
+            # when running, we can schedule the new data collection
+            if self._collection_task is not None and not self._collection_task.done():
+                logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+            else:
+                self._collection_task = compat.asyncio.create_task(
+                    self.collect_kresd_stats(_triggered_from_prometheus_library=True)
+                )
+
         else:
-            self._collection_task = compat.asyncio.create_task(
-                self.collect_kresd_stats(_triggered_from_prometheus_library=True)
-            )
+            # when not running, we can start a new loop (we are not in the manager's main thread)
+            compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
 
     def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily:
         return _gauge(