]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
python/knot_resolver/manager: added metrics module
authorAleš Mrázek <ales.mrazek@nic.cz>
Thu, 5 Sep 2024 13:13:54 +0000 (15:13 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 6 Sep 2024 22:28:31 +0000 (00:28 +0200)
- statistics.py: removed
- metrics: prometheus support is in separate module

python/knot_resolver/manager/metrics/__init__.py [new file with mode: 0644]
python/knot_resolver/manager/metrics/collect.py [new file with mode: 0644]
python/knot_resolver/manager/metrics/prometheus.py [moved from python/knot_resolver/manager/statistics.py with 66% similarity]
python/knot_resolver/manager/server.py
setup.py

diff --git a/python/knot_resolver/manager/metrics/__init__.py b/python/knot_resolver/manager/metrics/__init__.py
new file mode 100644 (file)
index 0000000..7e3a968
--- /dev/null
@@ -0,0 +1,4 @@
+from .collect import report_json
+from .prometheus import init_prometheus, report_prometheus
+
+__all__ = ["init_prometheus", "report_json", "report_prometheus"]
diff --git a/python/knot_resolver/manager/metrics/collect.py b/python/knot_resolver/manager/metrics/collect.py
new file mode 100644 (file)
index 0000000..cc9a071
--- /dev/null
@@ -0,0 +1,38 @@
+import logging
+from typing import Dict, Optional
+
+from knot_resolver.controller.interface import KresID
+from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
+from knot_resolver.datamodel import KresConfig
+from knot_resolver.utils.modeling.parsing import DataFormat
+
+logger = logging.getLogger(__name__)
+
+
+async def collect_kresd_workers_metrics(config: KresConfig) -> Optional[Dict[KresID, object]]:
+    if config.monitoring.enabled == "manager-only":
+        logger.debug("Skipping kresd stat collection due to configuration")
+        return None
+
+    cmd = "collect_statistics()"
+    if config.monitoring.enabled == "lazy":
+        cmd = "collect_lazy_statistics()"
+    logger.debug(f"Collecting stats from all kresd workers using method '{cmd}'")
+
+    metrics_dict = await command_registered_workers(cmd)
+    return metrics_dict
+
+
+async def report_json(config: KresConfig) -> bytes:
+    metrics_raw = await collect_kresd_workers_metrics(config)
+    metrics_dict: Dict[str, Optional[object]] = {}
+
+    if metrics_raw:
+        for kresd_id, kresd_metrics in metrics_raw.items():
+            metrics_dict[str(kresd_id)] = kresd_metrics
+    else:
+        # if we have no metrics, return None for every kresd worker
+        for kresd_id in get_registered_workers_kresids():
+            metrics_dict[str(kresd_id)] = None
+
+    return DataFormat.JSON.dict_dump(metrics_dict).encode()
similarity index 66%
rename from python/knot_resolver/manager/statistics.py
rename to python/knot_resolver/manager/metrics/prometheus.py
index 1dac843dfb564e333dbfaafbf66b1b656a79d7d4..ba5f63343caba79752b4bbbb4d837dfa7923e6f8 100644 (file)
@@ -1,33 +1,34 @@
 import asyncio
 import importlib
-import json
 import logging
-from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple
+from typing import Any, Dict, Generator, List, Optional, Tuple
 
-from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
+from knot_resolver.controller.interface import KresID
+from knot_resolver.controller.registered_workers import get_registered_workers_kresids
 from knot_resolver.datamodel.config_schema import KresConfig
 from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
 from knot_resolver.utils import compat
 from knot_resolver.utils.functional import Result
-from knot_resolver.utils.modeling.parsing import DataFormat
 
-if TYPE_CHECKING:
-    from knot_resolver.controller.interface import KresID
+from .collect import collect_kresd_workers_metrics
 
-logger = logging.getLogger(__name__)
-
-
-_prometheus_support = False
+_prometheus_client = False
 if importlib.util.find_spec("prometheus_client"):
-    _prometheus_support = True
+    _prometheus_client = True
 
+logger = logging.getLogger(__name__)
+
+if _prometheus_client:
 
-if _prometheus_support:
     from prometheus_client import exposition  # type: ignore
     from prometheus_client.bridge.graphite import GraphiteBridge  # type: ignore
     from prometheus_client.core import GaugeMetricFamily  # type: ignore
     from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetricFamily, Metric
 
+    _graphite_bridge: Optional[GraphiteBridge] = None
+
+    _metrics_collector: Optional["KresPrometheusMetricsCollector"] = None
+
     def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily:
         c = CounterMetricFamily(name, description, labels=(label[0],))
         c.add_metric((label[1],), value)  # type: ignore
@@ -236,51 +237,12 @@ if _prometheus_support:
             value=int(loaded),
         )
 
-    async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
-        if old_config.monitoring.graphite and not new_config.monitoring.graphite:
-            return Result.err(
-                "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
-            )
-
-        if (
-            old_config.monitoring.graphite is not None
-            and new_config.monitoring.graphite is not None
-            and old_config.monitoring.graphite != new_config.monitoring.graphite
-        ):
-            return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
-
-        return Result.ok(None)
-
-    _graphite_bridge: Optional[GraphiteBridge] = None
-
-    @only_on_real_changes_update(lambda c: c.monitoring.graphite)
-    async def _configure_graphite_bridge(config: KresConfig) -> None:
-        """
-        Starts graphite bridge if required
-        """
-        global _graphite_bridge
-        if config.monitoring.graphite is not False and _graphite_bridge is None:
-            logger.info(
-                "Starting Graphite metrics exporter for [%s]:%d",
-                str(config.monitoring.graphite.host),
-                int(config.monitoring.graphite.port),
-            )
-            _graphite_bridge = GraphiteBridge(
-                (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
-            )
-            _graphite_bridge.start(  # type: ignore
-                interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
-            )
-
-
-class ResolverCollector:
-    def __init__(self, config_store: ConfigStore) -> None:
-        self._stats_raw: "Optional[Dict[KresID, object]]" = None
-        self._config_store: ConfigStore = config_store
-        self._collection_task: "Optional[asyncio.Task[None]]" = None
-        self._skip_immediate_collection: bool = False
-
-    if _prometheus_support:
+    class KresPrometheusMetricsCollector:
+        def __init__(self, config_store: ConfigStore) -> None:
+            self._stats_raw: "Optional[Dict[KresID, object]]" = None
+            self._config_store: ConfigStore = config_store
+            self._collection_task: "Optional[asyncio.Task[None]]" = None
+            self._skip_immediate_collection: bool = False
 
         def collect(self) -> Generator[Metric, None, None]:
             # schedule new stats collection
@@ -300,10 +262,6 @@ class ResolverCollector:
                         metrics = self._stats_raw[kresid]
                         yield from _parse_resolver_metrics(kresid, metrics)
                         success = True
-                except json.JSONDecodeError:
-                    logger.warning(
-                        "Failed to load metrics from resolver instance %s: failed to parse statistics", str(kresid)
-                    )
                 except KeyError as e:
                     logger.warning(
                         "Failed to load metrics from resolver instance %s: attempted to read missing statistic %s",
@@ -317,115 +275,103 @@ class ResolverCollector:
             # this function prevents the collector registry from invoking the collect function on startup
             return []
 
-    def report_json(self) -> str:
-        # schedule new stats collection
-        self._trigger_stats_collection()
-
-        # if we have no data, return metrics with information about it and exit
-        if self._stats_raw is None:
-            no_stats_dict: Dict[str, None] = {}
-            for kresid in get_registered_workers_kresids():
-                no_stats_dict[str(kresid)] = None
-            return DataFormat.JSON.dict_dump(no_stats_dict)
+        async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None:
+            if self._skip_immediate_collection:
+                # this would happen because we are calling this function first manually before stat generation,
+                # and once again immediately afterwards caused by the prometheus library's stat collection
+                #
+                # this is a code made to solve problem with calling async functions from sync methods
+                self._skip_immediate_collection = False
+                return
 
-        stats_dict: Dict[str, object] = {}
-        for kresid, stats in self._stats_raw.items():
-            stats_dict[str(kresid)] = stats
+            config = self._config_store.get()
+            self._stats_raw = await collect_kresd_workers_metrics(config)
 
-        return DataFormat.JSON.dict_dump(stats_dict)
+            # if this function was not called by the prometheus library and calling collect() is imminent,
+            # we should block the next collection cycle as it would be useless
+            if not _triggered_from_prometheus_library:
+                self._skip_immediate_collection = True
 
-    async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None:
-        if self._skip_immediate_collection:
-            # this would happen because we are calling this function first manually before stat generation,
-            # and once again immediately afterwards caused by the prometheus library's stat collection
+        def _trigger_stats_collection(self) -> None:
+            # we are running inside an event loop, but in a synchronous function and that sucks a lot
+            # it means that we shouldn't block the event loop by performing a blocking stats collection
+            # but it also means that we can't yield to the event loop as this function is synchronous
+            # therefore we can only start a new task, but we can't wait for it
+            # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
             #
-            # this is a code made to solve problem with calling async functions from sync methods
-            self._skip_immediate_collection = False
-            return
-
-        config = self._config_store.get()
-
-        if config.monitoring.enabled == "manager-only":
-            logger.debug("Skipping kresd stat collection due to configuration")
-            self._stats_raw = None
-            return
-
-        lazy = config.monitoring.enabled == "lazy"
-        cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
-        logger.debug("Collecting kresd stats with method '%s'", cmd)
-        stats_raw = await command_registered_workers(cmd)
-        self._stats_raw = stats_raw
-
-        # if this function was not called by the prometheus library and calling collect() is imminent,
-        # we should block the next collection cycle as it would be useless
-        if not _triggered_from_prometheus_library:
-            self._skip_immediate_collection = True
-
-    def _trigger_stats_collection(self) -> None:
-        # we are running inside an event loop, but in a synchronous function and that sucks a lot
-        # it means that we shouldn't block the event loop by performing a blocking stats collection
-        # but it also means that we can't yield to the event loop as this function is synchronous
-        # therefore we can only start a new task, but we can't wait for it
-        # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
-        #
-        # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
-        # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
-        # function for details
-
-        if compat.asyncio.is_event_loop_running():
-            # when running, we can schedule the new data collection
-            if self._collection_task is not None and not self._collection_task.done():
-                logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
-            else:
-                self._collection_task = compat.asyncio.create_task(
-                    self.collect_kresd_stats(_triggered_from_prometheus_library=True)
-                )
-
-        else:
-            # when not running, we can start a new loop (we are not in the manager's main thread)
-            compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
-
-
-_resolver_collector: Optional[ResolverCollector] = None
-
+            # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
+            # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
+            # function for details
+
+            if compat.asyncio.is_event_loop_running():
+                # when running, we can schedule the new data collection
+                if self._collection_task is not None and not self._collection_task.done():
+                    logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+                else:
+                    self._collection_task = compat.asyncio.create_task(
+                        self.collect_kresd_stats(_triggered_from_prometheus_library=True)
+                    )
 
-async def _collect_stats() -> None:
-    # manually trigger stat collection so that we do not have to wait for it
-    if _resolver_collector is not None:
-        await _resolver_collector.collect_kresd_stats()
-    else:
-        raise RuntimeError("Function invoked before initializing the module!")
+            else:
+                # when not running, we can start a new loop (we are not in the manager's main thread)
+                compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
 
+    @only_on_real_changes_update(lambda c: c.monitoring.graphite)
+    async def _init_graphite_bridge(config: KresConfig) -> None:
+        """
+        Starts graphite bridge if required
+        """
+        global _graphite_bridge
+        if config.monitoring.graphite is not False and _graphite_bridge is None:
+            logger.info(
+                "Starting Graphite metrics exporter for [%s]:%d",
+                str(config.monitoring.graphite.host),
+                int(config.monitoring.graphite.port),
+            )
+            _graphite_bridge = GraphiteBridge(
+                (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
+            )
+            _graphite_bridge.start(  # type: ignore
+                interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
+            )
 
-async def report_stats(prometheus_format: bool = False) -> Optional[bytes]:
-    """
-    Collects metrics from everything, returns data string in JSON (default) or Prometheus format.
-    """
+    async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
+        if old_config.monitoring.graphite and not new_config.monitoring.graphite:
+            return Result.err(
+                "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
+            )
 
-    # manually trigger stat collection so that we do not have to wait for it
-    if _resolver_collector is not None:
-        await _resolver_collector.collect_kresd_stats()
-    else:
-        raise RuntimeError("Function invoked before initializing the module!")
+        if (
+            old_config.monitoring.graphite is not None
+            and new_config.monitoring.graphite is not None
+            and old_config.monitoring.graphite != new_config.monitoring.graphite
+        ):
+            return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
 
-    if prometheus_format:
-        if _prometheus_support:
-            return exposition.generate_latest()  # type: ignore
-        return None
-    return _resolver_collector.report_json().encode()
+        return Result.ok(None)
 
 
-async def init_monitoring(config_store: ConfigStore) -> None:
+async def init_prometheus(config_store: ConfigStore) -> None:
     """
-    Initialize monitoring. Must be called before any other function from this module.
+    Initialize metrics collection. Must be called before any other function from this module.
     """
-    global _resolver_collector
-    _resolver_collector = ResolverCollector(config_store)
-
-    if _prometheus_support:
-        # register metrics collector
-        REGISTRY.register(_resolver_collector)  # type: ignore
+    if _prometheus_client:
+        # init and register metrics collector
+        global _metrics_collector
+        _metrics_collector = KresPrometheusMetricsCollector(config_store)
+        REGISTRY.register(_metrics_collector)  # type: ignore
 
         # register graphite bridge
         await config_store.register_verifier(_deny_turning_off_graphite_bridge)
-        await config_store.register_on_change_callback(_configure_graphite_bridge)
+        await config_store.register_on_change_callback(_init_graphite_bridge)
+
+
+async def report_prometheus() -> Optional[bytes]:
+    if _prometheus_client:
+        # manually trigger stat collection so that we do not have to wait for it
+        if _metrics_collector is not None:
+            await _metrics_collector.collect_kresd_stats()
+        else:
+            raise RuntimeError("Function invoked before initializing the module!")
+        return exposition.generate_latest()  # type: ignore
+    return None
index e109cb77a6e0441ce05b402bac61d602c7823a5a..d9f7f9eeac4a6db7838e0b8ab96b168cdbdd99d0 100644 (file)
@@ -25,7 +25,7 @@ from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
 from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without_validation
 from knot_resolver.datamodel.globals import Context, set_global_validation_context
 from knot_resolver.datamodel.management_schema import ManagementSchema
-from knot_resolver.manager import statistics
+from knot_resolver.manager import metrics
 from knot_resolver.utils import custom_atexit as atexit
 from knot_resolver.utils import ignore_exceptions_optional
 from knot_resolver.utils.async_utils import readfile
@@ -105,7 +105,7 @@ class Server:
     async def _deny_management_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
         if config_old.management != config_new.management:
             return Result.err(
-                "/server/management: Changing management API address/unix-socket dynamically is not allowed as it's really dangerous."
+                "/server/management: Changing management API address/uTruenix-socket dynamically is not allowed as it's really dangerous."
                 " If you really need this feature, please contact the developers and explain why. Technically,"
                 " there are no problems in supporting it. We are only blocking the dynamic changes because"
                 " we think the consequences of leaving this footgun unprotected are worse than its usefulness."
@@ -239,15 +239,18 @@ class Server:
         raise web.HTTPMovedPermanently("/metrics/json")
 
     async def _handler_metrics_json(self, _request: web.Request) -> web.Response:
+
+        config = self.config_store.get()
+
         return web.Response(
-            body=await statistics.report_stats(),
+            body=await metrics.report_json(config),
             content_type="application/json",
             charset="utf8",
         )
 
     async def _handler_metrics_prometheus(self, _request: web.Request) -> web.Response:
 
-        metrics_report = await statistics.report_stats(prometheus_format=True)
+        metrics_report = await metrics.report_prometheus()
         if not metrics_report:
             raise web.HTTPNotFound()
 
@@ -555,7 +558,7 @@ async def start_server(config: Path = CONFIG_FILE_PATH_DEFAULT) -> int:
 
         # With configuration on hand, we can initialize monitoring. We want to do this before any subprocesses are
         # started, therefore before initializing manager
-        await statistics.init_monitoring(config_store)
+        await metrics.init_prometheus(config_store)
 
         # prepare instance of the server (no side effects)
         server = Server(config_store, config)
index 904ded57d336917729c7d5163c210410bcf7a8a4..4234d8b0d4d1975fecd401b56abdd93179ca6f40 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -15,6 +15,7 @@ packages = \
  'knot_resolver.datamodel.templates',
  'knot_resolver.datamodel.types',
  'knot_resolver.manager',
+ 'knot_resolver.manager.metrics',
  'knot_resolver.utils',
  'knot_resolver.utils.compat',
  'knot_resolver.utils.modeling']