From 15f4acc4bac776cd314090feb2fe5a7028a629c8 Mon Sep 17 00:00:00 2001
From: =?utf8?q?Ale=C5=A1=20Mr=C3=A1zek?=
Date: Thu, 5 Sep 2024 15:13:54 +0200
Subject: [PATCH] python/knot_resolver/manager: added metrics module

- statistics.py: removed
- metrics: Prometheus support is in a separate module
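Illustrative usage (editor's sketch, not prescribed by this patch): after
this change, the manager serves per-worker statistics as JSON from its
/metrics/json endpoint (the old /metrics path redirects there), and
report_prometheus() produces the Prometheus text format when the optional
prometheus_client package is installed. The snippet below assumes a
hypothetical deployment where the management HTTP API listens on
localhost:5000; depending on configuration, the API may instead be exposed
over a unix socket.

    # Sketch: fetch per-worker statistics from the manager's JSON endpoint.
    import json
    import urllib.request

    with urllib.request.urlopen("http://localhost:5000/metrics/json") as resp:
        stats = json.load(resp)  # maps each kresd worker ID to its stats, or None

    for worker_id, worker_stats in stats.items():
        print(worker_id, "->", "no data" if worker_stats is None else "collected")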
---
 .../knot_resolver/manager/metrics/__init__.py |   4 +
 .../knot_resolver/manager/metrics/collect.py  |  38 +++
 .../{statistics.py => metrics/prometheus.py}  | 260 +++++++-----------
 python/knot_resolver/manager/server.py        |  11 +-
 setup.py                                      |   1 +
 5 files changed, 153 insertions(+), 161 deletions(-)
 create mode 100644 python/knot_resolver/manager/metrics/__init__.py
 create mode 100644 python/knot_resolver/manager/metrics/collect.py
 rename python/knot_resolver/manager/{statistics.py => metrics/prometheus.py} (66%)

diff --git a/python/knot_resolver/manager/metrics/__init__.py b/python/knot_resolver/manager/metrics/__init__.py
new file mode 100644
index 000000000..7e3a968d1
--- /dev/null
+++ b/python/knot_resolver/manager/metrics/__init__.py
@@ -0,0 +1,4 @@
+from .collect import report_json
+from .prometheus import init_prometheus, report_prometheus
+
+__all__ = ["init_prometheus", "report_json", "report_prometheus"]
diff --git a/python/knot_resolver/manager/metrics/collect.py b/python/knot_resolver/manager/metrics/collect.py
new file mode 100644
index 000000000..cc9a0712a
--- /dev/null
+++ b/python/knot_resolver/manager/metrics/collect.py
@@ -0,0 +1,38 @@
+import logging
+from typing import Dict, Optional
+
+from knot_resolver.controller.interface import KresID
+from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
+from knot_resolver.datamodel import KresConfig
+from knot_resolver.utils.modeling.parsing import DataFormat
+
+logger = logging.getLogger(__name__)
+
+
+async def collect_kresd_workers_metrics(config: KresConfig) -> Optional[Dict[KresID, object]]:
+    if config.monitoring.enabled == "manager-only":
+        logger.debug("Skipping kresd stat collection due to configuration")
+        return None
+
+    cmd = "collect_statistics()"
+    if config.monitoring.enabled == "lazy":
+        cmd = "collect_lazy_statistics()"
+    logger.debug(f"Collecting stats from all kresd workers using method '{cmd}'")
+
+    metrics_dict = await command_registered_workers(cmd)
+    return metrics_dict
+
+
+async def report_json(config: KresConfig) -> bytes:
+    metrics_raw = await collect_kresd_workers_metrics(config)
+    metrics_dict: Dict[str, Optional[object]] = {}
+
+    if metrics_raw:
+        for kresd_id, kresd_metrics in metrics_raw.items():
+            metrics_dict[str(kresd_id)] = kresd_metrics
+    else:
+        # if we have no metrics, return None for every kresd worker
+        for kresd_id in get_registered_workers_kresids():
+            metrics_dict[str(kresd_id)] = None
+
+    return DataFormat.JSON.dict_dump(metrics_dict).encode()
diff --git a/python/knot_resolver/manager/statistics.py b/python/knot_resolver/manager/metrics/prometheus.py
similarity index 66%
rename from python/knot_resolver/manager/statistics.py
rename to python/knot_resolver/manager/metrics/prometheus.py
index 1dac843df..ba5f63343 100644
--- a/python/knot_resolver/manager/statistics.py
+++ b/python/knot_resolver/manager/metrics/prometheus.py
@@ -1,33 +1,34 @@
 import asyncio
 import importlib
-import json
 import logging
-from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple
+from typing import Any, Dict, Generator, List, Optional, Tuple
 
-from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
+from knot_resolver.controller.interface import KresID
+from knot_resolver.controller.registered_workers import get_registered_workers_kresids
 from knot_resolver.datamodel.config_schema import KresConfig
 from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
 from knot_resolver.utils import compat
 from knot_resolver.utils.functional import Result
-from knot_resolver.utils.modeling.parsing import DataFormat
 
-if TYPE_CHECKING:
-    from knot_resolver.controller.interface import KresID
+from .collect import collect_kresd_workers_metrics
 
-logger = logging.getLogger(__name__)
-
-
-_prometheus_support = False
+_prometheus_client = False
 if importlib.util.find_spec("prometheus_client"):
-    _prometheus_support = True
+    _prometheus_client = True
 
+logger = logging.getLogger(__name__)
+
+if _prometheus_client:
 
-if _prometheus_support:
     from prometheus_client import exposition  # type: ignore
     from prometheus_client.bridge.graphite import GraphiteBridge  # type: ignore
     from prometheus_client.core import GaugeMetricFamily  # type: ignore
     from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetricFamily, Metric
 
+    _graphite_bridge: Optional[GraphiteBridge] = None
+
+    _metrics_collector: Optional["KresPrometheusMetricsCollector"] = None
+
     def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily:
         c = CounterMetricFamily(name, description, labels=(label[0],))
         c.add_metric((label[1],), value)  # type: ignore
@@ -236,51 +237,12 @@ if _prometheus_support:
             value=int(loaded),
         )
 
-    async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
-        if old_config.monitoring.graphite and not new_config.monitoring.graphite:
-            return Result.err(
-                "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
-            )
-
-        if (
-            old_config.monitoring.graphite is not None
-            and new_config.monitoring.graphite is not None
-            and old_config.monitoring.graphite != new_config.monitoring.graphite
-        ):
-            return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
-
-        return Result.ok(None)
-
-    _graphite_bridge: Optional[GraphiteBridge] = None
-
-    @only_on_real_changes_update(lambda c: c.monitoring.graphite)
-    async def _configure_graphite_bridge(config: KresConfig) -> None:
-        """
-        Starts graphite bridge if required
-        """
-        global _graphite_bridge
-        if config.monitoring.graphite is not False and _graphite_bridge is None:
-            logger.info(
-                "Starting Graphite metrics exporter for [%s]:%d",
-                str(config.monitoring.graphite.host),
-                int(config.monitoring.graphite.port),
-            )
-            _graphite_bridge = GraphiteBridge(
-                (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
-            )
-            _graphite_bridge.start(  # type: ignore
-                interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
-            )
-
-
-class ResolverCollector:
-    def __init__(self, config_store: ConfigStore) -> None:
-        self._stats_raw: "Optional[Dict[KresID, object]]" = None
-        self._config_store: ConfigStore = config_store
-        self._collection_task: "Optional[asyncio.Task[None]]" = None
-        self._skip_immediate_collection: bool = False
-
-    if _prometheus_support:
+    class KresPrometheusMetricsCollector:
+        def __init__(self, config_store: ConfigStore) -> None:
+            self._stats_raw: "Optional[Dict[KresID, object]]" = None
+            self._config_store: ConfigStore = config_store
+            self._collection_task: "Optional[asyncio.Task[None]]" = None
+            self._skip_immediate_collection: bool = False
 
     def collect(self) -> Generator[Metric, None, None]:
         # schedule new stats collection
@@ -300,10 +262,6 @@ class ResolverCollector:
             metrics = self._stats_raw[kresid]
             yield from _parse_resolver_metrics(kresid, metrics)
             success = True
-        except json.JSONDecodeError:
-            logger.warning(
-                "Failed to load metrics from resolver instance %s: failed to parse statistics", str(kresid)
-            )
         except KeyError as e:
             logger.warning(
                 "Failed to load metrics from resolver instance %s: attempted to read missing statistic %s",
@@ -317,115 +275,103 @@ class ResolverCollector:
     # this function prevents the collector registry from invoking the collect function on startup
     return []
 
-    def report_json(self) -> str:
-        # schedule new stats collection
-        self._trigger_stats_collection()
-
-        # if we have no data, return metrics with information about it and exit
-        if self._stats_raw is None:
-            no_stats_dict: Dict[str, None] = {}
-            for kresid in get_registered_workers_kresids():
-                no_stats_dict[str(kresid)] = None
-            return DataFormat.JSON.dict_dump(no_stats_dict)
+        async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None:
+            if self._skip_immediate_collection:
+                # this would happen because we are calling this function first manually before stat generation,
+                # and once again immediately afterwards caused by the prometheus library's stat collection
+                #
+                # this is a code made to solve problem with calling async functions from sync methods
+                self._skip_immediate_collection = False
+                return
 
-        stats_dict: Dict[str, object] = {}
-        for kresid, stats in self._stats_raw.items():
-            stats_dict[str(kresid)] = stats
+            config = self._config_store.get()
+            self._stats_raw = await collect_kresd_workers_metrics(config)
 
-        return DataFormat.JSON.dict_dump(stats_dict)
+            # if this function was not called by the prometheus library and calling collect() is imminent,
+            # we should block the next collection cycle as it would be useless
+            if not _triggered_from_prometheus_library:
+                self._skip_immediate_collection = True
 
-    async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None:
-        if self._skip_immediate_collection:
-            # this would happen because we are calling this function first manually before stat generation,
-            # and once again immediately afterwards caused by the prometheus library's stat collection
+        def _trigger_stats_collection(self) -> None:
+            # we are running inside an event loop, but in a synchronous function and that sucks a lot
+            # it means that we shouldn't block the event loop by performing a blocking stats collection
+            # but it also means that we can't yield to the event loop as this function is synchronous
+            # therefore we can only start a new task, but we can't wait for it
+            # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
             #
-            # this is a code made to solve problem with calling async functions from sync methods
-            self._skip_immediate_collection = False
-            return
-
-        config = self._config_store.get()
-
-        if config.monitoring.enabled == "manager-only":
-            logger.debug("Skipping kresd stat collection due to configuration")
-            self._stats_raw = None
-            return
-
-        lazy = config.monitoring.enabled == "lazy"
-        cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
-        logger.debug("Collecting kresd stats with method '%s'", cmd)
-        stats_raw = await command_registered_workers(cmd)
-        self._stats_raw = stats_raw
-
-        # if this function was not called by the prometheus library and calling collect() is imminent,
-        # we should block the next collection cycle as it would be useless
-        if not _triggered_from_prometheus_library:
-            self._skip_immediate_collection = True
-
-    def _trigger_stats_collection(self) -> None:
-        # we are running inside an event loop, but in a synchronous function and that sucks a lot
-        # it means that we shouldn't block the event loop by performing a blocking stats collection
-        # but it also means that we can't yield to the event loop as this function is synchronous
-        # therefore we can only start a new task, but we can't wait for it
-        # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
-        #
+            # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
+            # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
+            # function for details
+
+            if compat.asyncio.is_event_loop_running():
+                # when running, we can schedule the new data collection
+                if self._collection_task is not None and not self._collection_task.done():
+                    logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+                else:
+                    self._collection_task = compat.asyncio.create_task(
+                        self.collect_kresd_stats(_triggered_from_prometheus_library=True)
+                    )
 
-        if compat.asyncio.is_event_loop_running():
-            # when running, we can schedule the new data collection
-            if self._collection_task is not None and not self._collection_task.done():
-                logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
-            else:
-                self._collection_task = compat.asyncio.create_task(
-                    self.collect_kresd_stats(_triggered_from_prometheus_library=True)
-                )
-
-        else:
-            # when not running, we can start a new loop (we are not in the manager's main thread)
-            compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
-
-
-_resolver_collector: Optional[ResolverCollector] = None
-
+            else:
+                # when not running, we can start a new loop (we are not in the manager's main thread)
+                compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
 
-async def _collect_stats() -> None:
-    # manually trigger stat collection so that we do not have to wait for it
-    if _resolver_collector is not None:
-        await _resolver_collector.collect_kresd_stats()
-    else:
-        raise RuntimeError("Function invoked before initializing the module!")
+    @only_on_real_changes_update(lambda c: c.monitoring.graphite)
+    async def _init_graphite_bridge(config: KresConfig) -> None:
+        """
+        Starts graphite bridge if required
+        """
+        global _graphite_bridge
+        if config.monitoring.graphite is not False and _graphite_bridge is None:
+            logger.info(
+                "Starting Graphite metrics exporter for [%s]:%d",
+                str(config.monitoring.graphite.host),
+                int(config.monitoring.graphite.port),
+            )
+            _graphite_bridge = GraphiteBridge(
+                (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
+            )
+            _graphite_bridge.start(  # type: ignore
+                interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
+            )
 
-async def report_stats(prometheus_format: bool = False) -> Optional[bytes]:
-    """
-    Collects metrics from everything, returns data string in JSON (default) or Prometheus format.
-    """
+    async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
+        if old_config.monitoring.graphite and not new_config.monitoring.graphite:
+            return Result.err(
+                "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
+            )
 
-    # manually trigger stat collection so that we do not have to wait for it
-    if _resolver_collector is not None:
-        await _resolver_collector.collect_kresd_stats()
-    else:
-        raise RuntimeError("Function invoked before initializing the module!")
+        if (
+            old_config.monitoring.graphite is not None
+            and new_config.monitoring.graphite is not None
+            and old_config.monitoring.graphite != new_config.monitoring.graphite
+        ):
+            return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
 
-    if prometheus_format:
-        if _prometheus_support:
-            return exposition.generate_latest()  # type: ignore
-        return None
-    return _resolver_collector.report_json().encode()
+        return Result.ok(None)
 
-async def init_monitoring(config_store: ConfigStore) -> None:
+async def init_prometheus(config_store: ConfigStore) -> None:
     """
-    Initialize monitoring. Must be called before any other function from this module.
+    Initialize metrics collection. Must be called before any other function from this module.
""" - global _resolver_collector - _resolver_collector = ResolverCollector(config_store) - - if _prometheus_support: - # register metrics collector - REGISTRY.register(_resolver_collector) # type: ignore + if _prometheus_client: + # init and register metrics collector + global _metrics_collector + _metrics_collector = KresPrometheusMetricsCollector(config_store) + REGISTRY.register(_metrics_collector) # type: ignore # register graphite bridge await config_store.register_verifier(_deny_turning_off_graphite_bridge) - await config_store.register_on_change_callback(_configure_graphite_bridge) + await config_store.register_on_change_callback(_init_graphite_bridge) + + +async def report_prometheus() -> Optional[bytes]: + if _prometheus_client: + # manually trigger stat collection so that we do not have to wait for it + if _metrics_collector is not None: + await _metrics_collector.collect_kresd_stats() + else: + raise RuntimeError("Function invoked before initializing the module!") + return exposition.generate_latest() # type: ignore + return None diff --git a/python/knot_resolver/manager/server.py b/python/knot_resolver/manager/server.py index e109cb77a..d9f7f9eea 100644 --- a/python/knot_resolver/manager/server.py +++ b/python/knot_resolver/manager/server.py @@ -25,7 +25,7 @@ from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without_validation from knot_resolver.datamodel.globals import Context, set_global_validation_context from knot_resolver.datamodel.management_schema import ManagementSchema -from knot_resolver.manager import statistics +from knot_resolver.manager import metrics from knot_resolver.utils import custom_atexit as atexit from knot_resolver.utils import ignore_exceptions_optional from knot_resolver.utils.async_utils import readfile @@ -105,7 +105,7 @@ class Server: async def _deny_management_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]: if config_old.management != config_new.management: return Result.err( - "/server/management: Changing management API address/unix-socket dynamically is not allowed as it's really dangerous." + "/server/management: Changing management API address/uTruenix-socket dynamically is not allowed as it's really dangerous." " If you really need this feature, please contact the developers and explain why. Technically," " there are no problems in supporting it. We are only blocking the dynamic changes because" " we think the consequences of leaving this footgun unprotected are worse than its usefulness." @@ -239,15 +239,18 @@ class Server: raise web.HTTPMovedPermanently("/metrics/json") async def _handler_metrics_json(self, _request: web.Request) -> web.Response: + + config = self.config_store.get() + return web.Response( - body=await statistics.report_stats(), + body=await metrics.report_json(config), content_type="application/json", charset="utf8", ) async def _handler_metrics_prometheus(self, _request: web.Request) -> web.Response: - metrics_report = await statistics.report_stats(prometheus_format=True) + metrics_report = await metrics.report_prometheus() if not metrics_report: raise web.HTTPNotFound() @@ -555,7 +558,7 @@ async def start_server(config: Path = CONFIG_FILE_PATH_DEFAULT) -> int: # With configuration on hand, we can initialize monitoring. 
We want to do this before any subprocesses are # started, therefore before initializing manager - await statistics.init_monitoring(config_store) + await metrics.init_prometheus(config_store) # prepare instance of the server (no side effects) server = Server(config_store, config) diff --git a/setup.py b/setup.py index 904ded57d..4234d8b0d 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,7 @@ packages = \ 'knot_resolver.datamodel.templates', 'knot_resolver.datamodel.types', 'knot_resolver.manager', + 'knot_resolver.manager.metrics', 'knot_resolver.utils', 'knot_resolver.utils.compat', 'knot_resolver.utils.modeling'] -- 2.47.2