From: Aleš Mrázek Date: Mon, 22 Jan 2024 12:57:10 +0000 (+0100) Subject: controller: moving workers registration helpers out of the statistics module X-Git-Tag: v6.0.7~23^2~14 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cfc4f764fff40f8cd07e8708137e0e38b98bc32e;p=thirdparty%2Fknot-resolver.git controller: moving workers registration helpers out of the statistics module --- diff --git a/manager/knot_resolver_manager/kresd_controller/interface.py b/manager/knot_resolver_manager/kresd_controller/interface.py index fe5306fa7..aa339c265 100644 --- a/manager/knot_resolver_manager/kresd_controller/interface.py +++ b/manager/knot_resolver_manager/kresd_controller/interface.py @@ -10,9 +10,10 @@ from weakref import WeakValueDictionary from knot_resolver_manager.constants import kresd_config_file from knot_resolver_manager.datamodel.config_schema import KresConfig from knot_resolver_manager.exceptions import SubprocessControllerException -from knot_resolver_manager.statistics import register_resolver_metrics_for, unregister_resolver_metrics_for from knot_resolver_manager.utils.async_utils import writefile +from knot_resolver_manager.kresd_controller.registered_workers import register_worker, unregister_worker + logger = logging.getLogger(__name__) @@ -100,7 +101,7 @@ class Subprocess(ABC): def __init__(self, config: KresConfig, kid: KresID) -> None: self._id = kid self._config = config - self._metrics_registered: bool = False + self._registered_worker: bool = False async def start(self) -> None: # create config file @@ -109,8 +110,8 @@ class Subprocess(ABC): try: await self._start() if self.type is SubprocessType.KRESD: - register_resolver_metrics_for(self) - self._metrics_registered = True + register_worker(self) + self._registered_worker = True except SubprocessControllerException as e: kresd_config_file(self._config, self.id).unlink() raise e @@ -126,8 +127,8 @@ class Subprocess(ABC): await self._restart() async def stop(self) -> None: - if self._metrics_registered: - unregister_resolver_metrics_for(self) + if self._registered_worker: + unregister_worker(self) await self._stop() await self.cleanup() diff --git a/manager/knot_resolver_manager/kresd_controller/registered_workers.py b/manager/knot_resolver_manager/kresd_controller/registered_workers.py new file mode 100644 index 000000000..af3471353 --- /dev/null +++ b/manager/knot_resolver_manager/kresd_controller/registered_workers.py @@ -0,0 +1,47 @@ +import asyncio +import logging +from typing import TYPE_CHECKING, Dict, List, Tuple + +if TYPE_CHECKING: + from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess + + +logger = logging.getLogger(__name__) + + +_REGISTERED_WORKERS: "Dict[KresID, Subprocess]" = {} + + +def get_registered_workers_kids() -> "List[KresID]": + return list(_REGISTERED_WORKERS.keys()) + + +async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, str]": + for sub in _REGISTERED_WORKERS.values(): + return sub.id, await sub.command(cmd) + raise SubprocessControllerException( + "Unable to execute the command. There is no kresd worker running to execute the command." + "Try start/restart the resolver.", + ) + + +async def command_registered_workers(cmd: str) -> "Dict[KresID, str]": + async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]": + return sub.id, await sub.command(cmd) + + pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_WORKERS.values())) + return dict(pairs) + + +def unregister_worker(subprocess: "Subprocess") -> None: + """ + Unregister kresd worker "Subprocess" from the list. + """ + del _REGISTERED_WORKERS[subprocess.id] + + +def register_worker(subprocess: "Subprocess") -> None: + """ + Register kresd worker "Subprocess" on the list. + """ + _REGISTERED_WORKERS[subprocess.id] = subprocess diff --git a/manager/knot_resolver_manager/statistics.py b/manager/knot_resolver_manager/statistics.py index 72eb7ab63..4b2c2e7d0 100644 --- a/manager/knot_resolver_manager/statistics.py +++ b/manager/knot_resolver_manager/statistics.py @@ -11,10 +11,14 @@ from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetri from knot_resolver_manager import compat from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes from knot_resolver_manager.datamodel.config_schema import KresConfig +from knot_resolver_manager.kresd_controller.registered_workers import ( + command_registered_workers, + get_registered_workers_kids, +) from knot_resolver_manager.utils.functional import Result if TYPE_CHECKING: - from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess + from knot_resolver_manager.kresd_controller.interface import KresID logger = logging.getLogger(__name__) @@ -23,8 +27,6 @@ MANAGER_REQUEST_RECONFIGURE_LATENCY = Histogram( "manager_request_reconfigure_latency", "Time it takes to change configuration" ) -_REGISTERED_RESOLVERS: "Dict[KresID, Subprocess]" = {} - T = TypeVar("T") @@ -45,14 +47,6 @@ def async_timing_histogram(metric: Histogram) -> Callable[[Callable[..., Awaitab return decorator -async def _command_registered_resolvers(cmd: str) -> "Dict[KresID, str]": - async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]": - return sub.id, await sub.command(cmd) - - pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_RESOLVERS.values())) - return dict(pairs) - - def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily: c = CounterMetricFamily(name, description, labels=(label[0],)) c.add_metric((label[1],), value) # type: ignore @@ -99,7 +93,7 @@ class ResolverCollector: lazy = config.monitoring.enabled == "lazy" cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()" logger.debug("Collecting kresd stats with method '%s'", cmd) - stats_raw = await _command_registered_resolvers(cmd) + stats_raw = await command_registered_workers(cmd) self._stats_raw = stats_raw # if this function was not called by the prometheus library and calling collect() is imminent, @@ -145,12 +139,12 @@ class ResolverCollector: # if we have no data, return metrics with information about it and exit if self._stats_raw is None: - for kid in _REGISTERED_RESOLVERS: + for kid in get_registered_workers_kids(): yield self._create_resolver_metrics_loaded_gauge(kid, False) return # if we have data, parse them - for kid in _REGISTERED_RESOLVERS: + for kid in get_registered_workers_kids(): success = False try: if kid in self._stats_raw: @@ -333,20 +327,6 @@ class ResolverCollector: _resolver_collector: Optional[ResolverCollector] = None -def unregister_resolver_metrics_for(subprocess: "Subprocess") -> None: - """ - Cancel metric collection from resolver "Subprocess" - """ - del _REGISTERED_RESOLVERS[subprocess.id] - - -def register_resolver_metrics_for(subprocess: "Subprocess") -> None: - """ - Register resolver "Subprocess" for metric collection - """ - _REGISTERED_RESOLVERS[subprocess.id] = subprocess - - async def report_stats() -> bytes: """ Collects metrics from everything, returns data string in Prometheus format.