]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: monitoring: graphite bridge
authorVasek Sraier <git@vakabus.cz>
Tue, 8 Feb 2022 14:10:31 +0000 (15:10 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:54 +0000 (16:17 +0200)
manager/knot_resolver_manager/compat/asyncio.py
manager/knot_resolver_manager/datamodel/monitoring_schema.py
manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2
manager/knot_resolver_manager/server.py
manager/knot_resolver_manager/statistics.py

index da84b76ea9a22aff8c573e836f4fb5dc16baa7d3..9a495e4a87d558d92f0e769d38122d9700e2389f 100644 (file)
@@ -13,7 +13,6 @@ import asyncio
 import functools
 import logging
 import sys
-from asyncio.futures import Future
 from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar
 
 logger = logging.getLogger(__name__)
@@ -48,7 +47,7 @@ async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
         return res
 
 
-def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "Future[T]":
+def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "asyncio.Task[T]":
     # version 3.8 and higher, call directly
     if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
         return asyncio.create_task(coro, name=name)  # type: ignore[attr-defined]
index 660737abc345075f22885b24971d4f6a43f17ffc..d023c8f792236f9564a0c163baa3ef5eda7e1903 100644 (file)
@@ -1,23 +1,29 @@
-from typing import Optional
+from typing import Union
 
 from typing_extensions import Literal
 
+from knot_resolver_manager.datamodel.types import TimeUnit
 from knot_resolver_manager.utils.modelling import SchemaNode
 
 
 class GraphiteSchema(SchemaNode):
-    endpoint: str
-    prefix: str
-    interval_sec: int
-    tcp: bool
+    host: str
+    port: int = 2003
+    prefix: str = ""
+    interval_sec: TimeUnit = TimeUnit("5s")
+    tcp: bool = False
+
+    def _validate(self):
+        if not 0 < self.port < 65_536:
+            raise ValueError("port must be between 0 and 65536 (both exclusive)")
 
 
 class MonitoringSchema(SchemaNode):
     """
     ---
-    state: configures, whether statistics module will be loaded into resolver
+    enabled: configures whether the statistics module will be loaded into the resolver
     graphite: optionally configures where should graphite metrics be sent to
     """
 
-    state: Literal["manager-only", "lazy", "always"] = "always"
-    graphite: Optional[GraphiteSchema] = None
+    enabled: Literal["manager-only", "lazy", "always"] = "always"
+    graphite: Union[Literal[False], GraphiteSchema] = False
index 31374d0ef42b72e8fb46dbbd2a48dd2978b806f3..ebaaa3b346353e38fd0ad1aa55f26ec5e6c3134b 100644 (file)
@@ -14,7 +14,7 @@ else
        end
 end
 
-{% if cfg.monitoring.state == "always" %}
+{% if cfg.monitoring.enabled == "always" %}
 modules.load('stats')
 {% endif %}
 
index d216e08ea73a620736cad5d3816e0f755f651bbb..e1b533119c716d61a12d91351ec6ff559a7b2a29 100644 (file)
@@ -161,7 +161,7 @@ class Server:
 
     async def _handler_metrics(self, _request: web.Request) -> web.Response:
         return web.Response(
-            body=await statistics.report_stats(self.config_store.get()),
+            body=await statistics.report_stats(),
             content_type="text/plain",
             charset="utf8",
         )
@@ -357,6 +357,10 @@ async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_
         # can flush the buffer into the proper place
         await log.logger_init(config_store)
 
+        # With configuration on hand, we can initialize monitoring. We want to do this before any subprocesses are
+        # started, therefore before initializing manager
+        await statistics.init_monitoring(config_store)
+
         # After we have loaded the configuration, we can start worrying about subprocess management.
         manager = await _init_manager(config_store)
     except KresManagerException as e:
index 29368d823e9d25209dbce2b1af4901ada5133fb0..17608013c7de6b7468862187849331ed2766794c 100644 (file)
@@ -4,6 +4,7 @@ import logging
 from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Tuple, TypeVar
 
 from prometheus_client import Histogram, exposition  # type: ignore
+from prometheus_client.bridge.graphite import GraphiteBridge  # type: ignore
 from prometheus_client.core import (  # type: ignore
     REGISTRY,
     CounterMetricFamily,
@@ -12,9 +13,12 @@ from prometheus_client.core import (  # type: ignore
     Metric,
 )
 
+from knot_resolver_manager import compat
+from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.kres_id import KresID
 from knot_resolver_manager.kresd_controller.interface import Subprocess
+from knot_resolver_manager.utils.functional import Result
 
 logger = logging.getLogger(__name__)
 
@@ -73,22 +77,59 @@ def _histogram(
 
 
 class ResolverCollector:
-    def __init__(self) -> None:
+    def __init__(self, config_store: ConfigStore) -> None:
         self._stats_raw: Optional[Dict[KresID, str]] = None
+        self._config_store: ConfigStore = config_store
+        self._collection_task: "Optional[asyncio.Task[None]]" = None
 
-    def set_stats(self, stats_raw: Optional[Dict[KresID, str]]) -> None:
-        self._stats_raw = stats_raw
+    def _trigger_stats_collection(self) -> None:
+        async def collect(config: KresConfig) -> None:
+            if config.monitoring.enabled == "manager-only":
+                self._stats_raw = None
+                return
+
+            lazy = config.monitoring.enabled == "lazy"
+            cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
+            stats_raw = await _command_registered_resolvers(cmd)
+            self._stats_raw = stats_raw
+
+        # we are running inside an event loop, but in a synchronous function and that sucks a lot
+        # it means that we shouldn't block the event loop by performing a blocking stats collection
+        # but it also means that we can't yield to the event loop as this function is synchronous
+        # therefore we can only start a new task, but we can't wait for it
+        # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
+        if self._collection_task is not None and not self._collection_task.done():
+            logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+        else:
+            self._collection_task = compat.asyncio.create_task(collect(self._config_store.get()))
+
+    def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily:
+        return _gauge(
+            "resolver_metrics_loaded",
+            "0 if metrics from resolver instance were not loaded, otherwise 1",
+            label=("instance_id", str(kid)),
+            value=int(loaded),
+        )
 
     def collect(self) -> Generator[Metric, None, None]:
+        # schedule new stats collection
+        self._trigger_stats_collection()
+
+        # if we have no data, return metrics with information about it and exit
         if self._stats_raw is None:
+            for kid in _REGISTERED_RESOLVERS:
+                yield self._create_resolver_metrics_loaded_gauge(kid, False)
             return
 
-        for kid, raw in self._stats_raw.items():
+        # if we have data, parse them
+        for kid in _REGISTERED_RESOLVERS:
             success = False
             try:
-                metrics: Dict[str, int] = json.loads(raw[1:-1])
-                yield from self._parse_resolver_metrics(kid, metrics)
-                success = True
+                if kid in self._stats_raw:
+                    raw = self._stats_raw[kid]
+                    metrics: Dict[str, int] = json.loads(raw[1:-1])
+                    yield from self._parse_resolver_metrics(kid, metrics)
+                    success = True
             except json.JSONDecodeError:
                 logger.warning("Failed to load metrics from resolver instance %s: failed to parse statistics", str(kid))
             except KeyError as e:
@@ -98,12 +139,7 @@ class ResolverCollector:
                     str(e),
                 )
 
-            yield _gauge(
-                "resolver_metrics_loaded",
-                "0 if metrics from resolver instance were not loaded, otherwise 1",
-                label=("instance_id", str(kid)),
-                value=int(success),
-            )
+            yield self._create_resolver_metrics_loaded_gauge(kid, success)
 
     def _parse_resolver_metrics(self, instance_id: KresID, metrics: Any) -> Generator[Metric, None, None]:
         sid = str(instance_id)
@@ -262,10 +298,6 @@ class ResolverCollector:
         )
 
 
-_RESOLVER_COLLECTOR = ResolverCollector()
-REGISTRY.register(_RESOLVER_COLLECTOR)  # type: ignore
-
-
 def unregister_resolver_metrics_for(subprocess: Subprocess) -> None:
     """
     Cancel metric collection from resolver subprocess
@@ -280,22 +312,54 @@ def register_resolver_metrics_for(subprocess: Subprocess) -> None:
     _REGISTERED_RESOLVERS[subprocess.id] = subprocess
 
 
-async def report_stats(config: KresConfig) -> bytes:
+async def report_stats() -> bytes:
     """
     Collects metrics from everything, returns data string in Prometheus format.
     """
+    return exposition.generate_latest()  # type: ignore
 
-    try:
-        if config.monitoring.state != "manager-only":
-            lazy = config.monitoring.state == "lazy"
-            ON_DEMAND_STATS_QUERY = "collect_lazy_statistics()"
-            STATS_QUERY = "collect_statistics()"
 
-            cmd = ON_DEMAND_STATS_QUERY if lazy else STATS_QUERY
-            stats_raw = await _command_registered_resolvers(cmd)
-            _RESOLVER_COLLECTOR.set_stats(stats_raw)
+async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
+    if old_config.monitoring.graphite is not False and new_config.monitoring.graphite is False:  # graphite is Union[Literal[False], GraphiteSchema], never None
+        return Result.err(
+            "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
+        )
+
+    if (
+        old_config.monitoring.graphite is not False
+        and new_config.monitoring.graphite is not False
+        and old_config.monitoring.graphite != new_config.monitoring.graphite
+    ):
+        return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
+
+    return Result.ok(None)
+
+
+_graphite_bridge: Optional[GraphiteBridge] = None
+
+
+@only_on_real_changes(lambda c: c.monitoring.graphite)
+async def _configure_graphite_bridge(config: KresConfig) -> None:
+    """
+    Starts graphite bridge if required
+    """
+    global _graphite_bridge
+    if config.monitoring.graphite is not False and _graphite_bridge is None:
+        logger.info("Starting Graphite metrics exporter for [%s]:%d", config.monitoring.graphite.host, config.monitoring.graphite.port)
+        _graphite_bridge = GraphiteBridge((config.monitoring.graphite.host, config.monitoring.graphite.port))
+        _graphite_bridge.start(  # type: ignore
+            interval=config.monitoring.graphite.interval_sec.seconds(), prefix=config.monitoring.graphite.prefix
+        )
+
+
+async def init_monitoring(config_store: ConfigStore) -> None:
+    """
+    Initialize monitoring. Must be called before any other function from this module.
+    """
+    # register metrics collector
+    _RESOLVER_COLLECTOR = ResolverCollector(config_store)
+    REGISTRY.register(_RESOLVER_COLLECTOR)  # type: ignore
 
-        return exposition.generate_latest()  # type: ignore
-    finally:
-        # after the report has been generated, clean everything
-        _RESOLVER_COLLECTOR.set_stats(None)
+    # register graphite bridge
+    await config_store.register_verifier(_deny_turning_off_graphite_bridge)
+    await config_store.register_on_change_callback(_configure_graphite_bridge)