]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
controller: moving workers registration helpers out of the statistics module
authorAleš Mrázek <ales.mrazek@nic.cz>
Mon, 22 Jan 2024 12:57:10 +0000 (13:57 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Wed, 14 Feb 2024 12:14:30 +0000 (13:14 +0100)
manager/knot_resolver_manager/kresd_controller/interface.py
manager/knot_resolver_manager/kresd_controller/registered_workers.py [new file with mode: 0644]
manager/knot_resolver_manager/statistics.py

index fe5306fa7420fa5308932f816ddc38040e5e004b..aa339c265a18b7a4a3dc1abb042744ca760dcf4d 100644 (file)
@@ -10,9 +10,10 @@ from weakref import WeakValueDictionary
 from knot_resolver_manager.constants import kresd_config_file
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.exceptions import SubprocessControllerException
-from knot_resolver_manager.statistics import register_resolver_metrics_for, unregister_resolver_metrics_for
 from knot_resolver_manager.utils.async_utils import writefile
 
+from knot_resolver_manager.kresd_controller.registered_workers import register_worker, unregister_worker
+
 logger = logging.getLogger(__name__)
 
 
@@ -100,7 +101,7 @@ class Subprocess(ABC):
     def __init__(self, config: KresConfig, kid: KresID) -> None:
         self._id = kid
         self._config = config
-        self._metrics_registered: bool = False
+        self._registered_worker: bool = False
 
     async def start(self) -> None:
         # create config file
@@ -109,8 +110,8 @@ class Subprocess(ABC):
         try:
             await self._start()
             if self.type is SubprocessType.KRESD:
-                register_resolver_metrics_for(self)
-                self._metrics_registered = True
+                register_worker(self)
+                self._registered_worker = True
         except SubprocessControllerException as e:
             kresd_config_file(self._config, self.id).unlink()
             raise e
@@ -126,8 +127,8 @@ class Subprocess(ABC):
         await self._restart()
 
     async def stop(self) -> None:
-        if self._metrics_registered:
-            unregister_resolver_metrics_for(self)
+        if self._registered_worker:
+            unregister_worker(self)
         await self._stop()
         await self.cleanup()
 
diff --git a/manager/knot_resolver_manager/kresd_controller/registered_workers.py b/manager/knot_resolver_manager/kresd_controller/registered_workers.py
new file mode 100644 (file)
index 0000000..af34713
--- /dev/null
@@ -0,0 +1,47 @@
+import asyncio
+import logging
+from typing import TYPE_CHECKING, Dict, List, Tuple
+
+if TYPE_CHECKING:
+    from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess
+
+
+logger = logging.getLogger(__name__)
+
+
+_REGISTERED_WORKERS: "Dict[KresID, Subprocess]" = {}
+
+
+def get_registered_workers_kids() -> "List[KresID]":
+    return list(_REGISTERED_WORKERS.keys())
+
+
+async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, str]":
+    for sub in _REGISTERED_WORKERS.values():
+        return sub.id, await sub.command(cmd)
+    raise SubprocessControllerException(
+        "Unable to execute the command. There is no kresd worker running to execute the command."
+        "Try start/restart the resolver.",
+    )
+
+
+async def command_registered_workers(cmd: str) -> "Dict[KresID, str]":
+    async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]":
+        return sub.id, await sub.command(cmd)
+
+    pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_WORKERS.values()))
+    return dict(pairs)
+
+
+def unregister_worker(subprocess: "Subprocess") -> None:
+    """
+    Unregister kresd worker "Subprocess" from the list.
+    """
+    del _REGISTERED_WORKERS[subprocess.id]
+
+
+def register_worker(subprocess: "Subprocess") -> None:
+    """
+    Register kresd worker "Subprocess" on the list.
+    """
+    _REGISTERED_WORKERS[subprocess.id] = subprocess
index 72eb7ab639b305fe20c303e601fcf6ab0241c0c8..4b2c2e7d0eb25be4aa17220836a6640708cc989e 100644 (file)
@@ -11,10 +11,14 @@ from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetri
 from knot_resolver_manager import compat
 from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
 from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.kresd_controller.registered_workers import (
+    command_registered_workers,
+    get_registered_workers_kids,
+)
 from knot_resolver_manager.utils.functional import Result
 
 if TYPE_CHECKING:
-    from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess
+    from knot_resolver_manager.kresd_controller.interface import KresID
 
 
 logger = logging.getLogger(__name__)
@@ -23,8 +27,6 @@ MANAGER_REQUEST_RECONFIGURE_LATENCY = Histogram(
     "manager_request_reconfigure_latency", "Time it takes to change configuration"
 )
 
-_REGISTERED_RESOLVERS: "Dict[KresID, Subprocess]" = {}
-
 
 T = TypeVar("T")
 
@@ -45,14 +47,6 @@ def async_timing_histogram(metric: Histogram) -> Callable[[Callable[..., Awaitab
     return decorator
 
 
-async def _command_registered_resolvers(cmd: str) -> "Dict[KresID, str]":
-    async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]":
-        return sub.id, await sub.command(cmd)
-
-    pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_RESOLVERS.values()))
-    return dict(pairs)
-
-
 def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily:
     c = CounterMetricFamily(name, description, labels=(label[0],))
     c.add_metric((label[1],), value)  # type: ignore
@@ -99,7 +93,7 @@ class ResolverCollector:
         lazy = config.monitoring.enabled == "lazy"
         cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
         logger.debug("Collecting kresd stats with method '%s'", cmd)
-        stats_raw = await _command_registered_resolvers(cmd)
+        stats_raw = await command_registered_workers(cmd)
         self._stats_raw = stats_raw
 
         # if this function was not called by the prometheus library and calling collect() is imminent,
@@ -145,12 +139,12 @@ class ResolverCollector:
 
         # if we have no data, return metrics with information about it and exit
         if self._stats_raw is None:
-            for kid in _REGISTERED_RESOLVERS:
+            for kid in get_registered_workers_kids():
                 yield self._create_resolver_metrics_loaded_gauge(kid, False)
             return
 
         # if we have data, parse them
-        for kid in _REGISTERED_RESOLVERS:
+        for kid in get_registered_workers_kids():
             success = False
             try:
                 if kid in self._stats_raw:
@@ -333,20 +327,6 @@ class ResolverCollector:
 _resolver_collector: Optional[ResolverCollector] = None
 
 
-def unregister_resolver_metrics_for(subprocess: "Subprocess") -> None:
-    """
-    Cancel metric collection from resolver "Subprocess"
-    """
-    del _REGISTERED_RESOLVERS[subprocess.id]
-
-
-def register_resolver_metrics_for(subprocess: "Subprocess") -> None:
-    """
-    Register resolver "Subprocess" for metric collection
-    """
-    _REGISTERED_RESOLVERS[subprocess.id] = subprocess
-
-
 async def report_stats() -> bytes:
     """
     Collects metrics from everything, returns data string in Prometheus format.