From: Vasek Sraier
Date: Tue, 8 Feb 2022 14:10:31 +0000 (+0100)
Subject: manager: monitoring: graphite bridge
X-Git-Tag: v6.0.0a1~43^2~2
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=e2bdc9b68ac1788250ce16f58ed151cfcf74559c;p=thirdparty%2Fknot-resolver.git

manager: monitoring: graphite bridge
---

diff --git a/manager/knot_resolver_manager/compat/asyncio.py b/manager/knot_resolver_manager/compat/asyncio.py
index da84b76ea..9a495e4a8 100644
--- a/manager/knot_resolver_manager/compat/asyncio.py
+++ b/manager/knot_resolver_manager/compat/asyncio.py
@@ -13,7 +13,6 @@ import asyncio
 import functools
 import logging
 import sys
-from asyncio.futures import Future
 from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar
 
 logger = logging.getLogger(__name__)
@@ -48,7 +47,7 @@ async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
         return res
 
 
-def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "Future[T]":
+def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "asyncio.Task[T]":
     # version 3.8 and higher, call directly
     if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
         return asyncio.create_task(coro, name=name)  # type: ignore[attr-defined]
diff --git a/manager/knot_resolver_manager/datamodel/monitoring_schema.py b/manager/knot_resolver_manager/datamodel/monitoring_schema.py
index 660737abc..d023c8f79 100644
--- a/manager/knot_resolver_manager/datamodel/monitoring_schema.py
+++ b/manager/knot_resolver_manager/datamodel/monitoring_schema.py
@@ -1,23 +1,29 @@
-from typing import Optional
+from typing import Union
 
 from typing_extensions import Literal
 
+from knot_resolver_manager.datamodel.types import TimeUnit
 from knot_resolver_manager.utils.modelling import SchemaNode
 
 
 class GraphiteSchema(SchemaNode):
-    endpoint: str
-    prefix: str
-    interval_sec: int
-    tcp: bool
+    host: str
+    port: int = 2003
+    prefix: str = ""
+    interval_sec: TimeUnit = TimeUnit("5s")
+    tcp: bool = False
+
+    def _validate(self):
+        if not 0 < self.port < 65_536:
+            raise ValueError("port must be between 0 and 65536 (both exclusive)")
 
 
 class MonitoringSchema(SchemaNode):
     """
     ---
-    state: configures, whether statistics module will be loaded into resolver
+    enabled: configures, whether statistics module will be loaded into resolver
     graphite: optionally configures where should graphite metrics be sent to
     """
 
-    state: Literal["manager-only", "lazy", "always"] = "always"
-    graphite: Optional[GraphiteSchema] = None
+    enabled: Literal["manager-only", "lazy", "always"] = "always"
+    graphite: Union[Literal[False], GraphiteSchema] = False
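Editor's note: with the schema above, `graphite` is either `False` (the default, bridge disabled) or a mapping with at least `host` set. The fragment below is a sketch of what a parsed monitoring section might look like; the key spelling and nesting are inferred from the schema fields, not taken from the manager's documentation.

```python
# Hypothetical parsed config fragment matching the new MonitoringSchema/GraphiteSchema
# fields above -- key names mirror the schema attributes and are assumptions only.
monitoring = {
    "enabled": "lazy",  # one of "manager-only", "lazy", "always"
    "graphite": {       # or simply False (the default) to keep the bridge off
        "host": "graphite.example.net",  # required, has no default
        "port": 2003,                    # optional, must stay within 1-65535
        "prefix": "kres",                # optional metric name prefix, default ""
        "interval_sec": "5s",            # optional TimeUnit, default "5s"
        "tcp": False,                    # optional, default False
    },
}
```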
diff --git a/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2 b/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2
index 31374d0ef..ebaaa3b34 100644
--- a/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2
+++ b/manager/knot_resolver_manager/datamodel/templates/monitoring.lua.j2
@@ -14,7 +14,7 @@ else
 	end
 end
 
-{% if cfg.monitoring.state == "always" %}
+{% if cfg.monitoring.enabled == "always" %}
 modules.load('stats')
 {% endif %}
 
diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py
index d216e08ea..e1b533119 100644
--- a/manager/knot_resolver_manager/server.py
+++ b/manager/knot_resolver_manager/server.py
@@ -161,7 +161,7 @@ class Server:
 
     async def _handler_metrics(self, _request: web.Request) -> web.Response:
         return web.Response(
-            body=await statistics.report_stats(self.config_store.get()),
+            body=await statistics.report_stats(),
             content_type="text/plain",
             charset="utf8",
         )
@@ -357,6 +357,10 @@ async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_
         # can flush the buffer into the proper place
         await log.logger_init(config_store)
 
+        # With configuration on hand, we can initialize monitoring. We want to do this before any subprocesses are
+        # started, therefore before initializing manager
+        await statistics.init_monitoring(config_store)
+
         # After we have loaded the configuration, we can start worring about subprocess management.
         manager = await _init_manager(config_store)
     except KresManagerException as e:
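Editor's note: the `/metrics` handler touched above now just renders prometheus_client's default registry over HTTP. A minimal standalone sketch of that idea follows; it is not the manager's actual server, and the route and port are chosen arbitrarily for illustration.

```python
# Minimal aiohttp endpoint serving the prometheus_client default registry.
# Illustrative sketch only; the manager's real handler lives in
# knot_resolver_manager/server.py and goes through statistics.report_stats().
from aiohttp import web
from prometheus_client import exposition


async def handler_metrics(_request: web.Request) -> web.Response:
    # generate_latest() renders every collector registered in the default REGISTRY
    return web.Response(body=exposition.generate_latest(), content_type="text/plain", charset="utf8")


app = web.Application()
app.add_routes([web.get("/metrics", handler_metrics)])

if __name__ == "__main__":
    web.run_app(app, port=8080)  # port chosen arbitrarily for the example
```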
diff --git a/manager/knot_resolver_manager/statistics.py b/manager/knot_resolver_manager/statistics.py
index 29368d823..17608013c 100644
--- a/manager/knot_resolver_manager/statistics.py
+++ b/manager/knot_resolver_manager/statistics.py
@@ -4,6 +4,7 @@ import logging
 from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Tuple, TypeVar
 
 from prometheus_client import Histogram, exposition  # type: ignore
+from prometheus_client.bridge.graphite import GraphiteBridge  # type: ignore
 from prometheus_client.core import (  # type: ignore
     REGISTRY,
     CounterMetricFamily,
@@ -12,9 +13,12 @@ from prometheus_client.core import (  # type: ignore
     Metric,
 )
 
+from knot_resolver_manager import compat
+from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.kres_id import KresID
 from knot_resolver_manager.kresd_controller.interface import Subprocess
+from knot_resolver_manager.utils.functional import Result
 
 logger = logging.getLogger(__name__)
 
@@ -73,22 +77,59 @@ def _histogram(
 
 
 class ResolverCollector:
-    def __init__(self) -> None:
+    def __init__(self, config_store: ConfigStore) -> None:
         self._stats_raw: Optional[Dict[KresID, str]] = None
+        self._config_store: ConfigStore = config_store
+        self._collection_task: "Optional[asyncio.Task[None]]" = None
 
-    def set_stats(self, stats_raw: Optional[Dict[KresID, str]]) -> None:
-        self._stats_raw = stats_raw
+    def _trigger_stats_collection(self) -> None:
+        async def collect(config: KresConfig) -> None:
+            if config.monitoring.enabled == "manager-only":
+                self._stats_raw = None
+                return
+
+            lazy = config.monitoring.enabled == "lazy"
+            cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
+            stats_raw = await _command_registered_resolvers(cmd)
+            self._stats_raw = stats_raw
+
+        # we are running inside an event loop, but in a synchronous function and that sucks a lot
+        # it means that we shouldn't block the event loop by performing a blocking stats collection
+        # but it also means that we can't yield to the event loop as this function is synchronous
+        # therefore we can only start a new task, but we can't wait for it
+        # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
+        if self._collection_task is not None and not self._collection_task.done():
+            logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+        else:
+            self._collection_task = compat.asyncio.create_task(collect(self._config_store.get()))
+
+    def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily:
+        return _gauge(
+            "resolver_metrics_loaded",
+            "0 if metrics from resolver instance were not loaded, otherwise 1",
+            label=("instance_id", str(kid)),
+            value=int(loaded),
+        )
 
     def collect(self) -> Generator[Metric, None, None]:
+        # schedule new stats collection
+        self._trigger_stats_collection()
+
+        # if we have no data, return metrics with information about it and exit
         if self._stats_raw is None:
+            for kid in _REGISTERED_RESOLVERS:
+                yield self._create_resolver_metrics_loaded_gauge(kid, False)
             return
 
-        for kid, raw in self._stats_raw.items():
+        # if we have data, parse them
+        for kid in _REGISTERED_RESOLVERS:
             success = False
             try:
-                metrics: Dict[str, int] = json.loads(raw[1:-1])
-                yield from self._parse_resolver_metrics(kid, metrics)
-                success = True
+                if kid in self._stats_raw:
+                    raw = self._stats_raw[kid]
+                    metrics: Dict[str, int] = json.loads(raw[1:-1])
+                    yield from self._parse_resolver_metrics(kid, metrics)
+                    success = True
             except json.JSONDecodeError:
                 logger.warning("Failed to load metrics from resolver instance %s: failed to parse statistics", str(kid))
             except KeyError as e:
@@ -98,12 +139,7 @@ class ResolverCollector:
                     str(e),
                 )
 
-            yield _gauge(
-                "resolver_metrics_loaded",
-                "0 if metrics from resolver instance were not loaded, otherwise 1",
-                label=("instance_id", str(kid)),
-                value=int(success),
-            )
+            yield self._create_resolver_metrics_loaded_gauge(kid, success)
 
     def _parse_resolver_metrics(self, instance_id: KresID, metrics: Any) -> Generator[Metric, None, None]:
         sid = str(instance_id)
@@ -262,10 +298,6 @@ class ResolverCollector:
         )
 
 
-_RESOLVER_COLLECTOR = ResolverCollector()
-REGISTRY.register(_RESOLVER_COLLECTOR)  # type: ignore
-
-
 def unregister_resolver_metrics_for(subprocess: Subprocess) -> None:
     """
     Cancel metric collection from resolver subprocess
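Editor's note: the hunk above removes the eager module-level registration of ResolverCollector; the final hunk below recreates it inside init_monitoring(), and the collector now schedules its own asynchronous refresh from its synchronous collect() method. A condensed sketch of that pattern follows, with a made-up metric name and a dummy data source; it assumes collect() is invoked from code already running inside an asyncio event loop.

```python
# Sketch of the "serve last snapshot, refresh in the background" collector pattern.
# Metric names and the data source are invented; this is not the manager's code.
import asyncio
from typing import Dict, Generator, Optional

from prometheus_client.core import REGISTRY, GaugeMetricFamily, Metric


class LastSnapshotCollector:
    """Serve the last known snapshot synchronously, refresh it asynchronously in the background."""

    def __init__(self) -> None:
        self._snapshot: Optional[Dict[str, float]] = None
        self._task: "Optional[asyncio.Task[None]]" = None

    async def _refresh(self) -> None:
        # stand-in for an awaitable data source (the manager asks resolver subprocesses here)
        await asyncio.sleep(0)
        self._snapshot = {"demo": 42.0}

    def collect(self) -> Generator[Metric, None, None]:
        # collect() must stay synchronous, so only *schedule* the refresh; the data served
        # now is one scrape old, exactly like the ResolverCollector above
        # (assumes a running event loop, hence get_running_loop())
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(self._refresh())

        gauge = GaugeMetricFamily("snapshot_loaded", "1 if a snapshot is available", labels=["instance_id"])
        gauge.add_metric(["demo"], 0.0 if self._snapshot is None else 1.0)
        yield gauge


def init_collector() -> None:
    # registration deferred to an explicit init step, mirroring init_monitoring() below
    REGISTRY.register(LastSnapshotCollector())  # type: ignore[arg-type]
```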
@@ -280,22 +312,54 @@ def register_resolver_metrics_for(subprocess: Subprocess) -> None:
     _REGISTERED_RESOLVERS[subprocess.id] = subprocess
 
 
-async def report_stats(config: KresConfig) -> bytes:
+async def report_stats() -> bytes:
     """
     Collects metrics from everything, returns data string in Prometheus format.
     """
+    return exposition.generate_latest()  # type: ignore
 
-    try:
-        if config.monitoring.state != "manager-only":
-            lazy = config.monitoring.state == "lazy"
-            ON_DEMAND_STATS_QUERY = "collect_lazy_statistics()"
-            STATS_QUERY = "collect_statistics()"
-            cmd = ON_DEMAND_STATS_QUERY if lazy else STATS_QUERY
-            stats_raw = await _command_registered_resolvers(cmd)
-            _RESOLVER_COLLECTOR.set_stats(stats_raw)
 
+async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
+    if old_config.monitoring.graphite is not None and new_config.monitoring.graphite is None:
+        return Result.err(
+            "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
+        )
+
+    if (
+        old_config.monitoring.graphite is not None
+        and new_config.monitoring.graphite is not None
+        and old_config.monitoring.graphite != new_config.monitoring.graphite
+    ):
+        return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
+
+    return Result.ok(None)
+
+
+_graphite_bridge: Optional[GraphiteBridge] = None
+
+
+@only_on_real_changes(lambda c: c.monitoring.graphite)
+async def _configure_graphite_bridge(config: KresConfig) -> None:
+    """
+    Starts graphite bridge if required
+    """
+    global _graphite_bridge
+    if config.monitoring.graphite is not False and _graphite_bridge is None:
+        logger.info("Starting Graphite metrics exporter for [%s]:%d", config.monitoring.graphite.host, config.monitoring.graphite.port)
+        _graphite_bridge = GraphiteBridge((config.monitoring.graphite.host, config.monitoring.graphite.port))
+        _graphite_bridge.start(  # type: ignore
+            interval=config.monitoring.graphite.interval_sec.seconds(), prefix=config.monitoring.graphite.prefix
+        )
+
+
+async def init_monitoring(config_store: ConfigStore) -> None:
+    """
+    Initialize monitoring. Must be called before any other function from this module.
+    """
+    # register metrics collector
+    _RESOLVER_COLLECTOR = ResolverCollector(config_store)
+    REGISTRY.register(_RESOLVER_COLLECTOR)  # type: ignore
 
-        return exposition.generate_latest()  # type: ignore
-    finally:
-        # after the report has been generated, clean everything
-        _RESOLVER_COLLECTOR.set_stats(None)
+    # register graphite bridge
+    await config_store.register_verifier(_deny_turning_off_graphite_bridge)
+    await config_store.register_on_change_callback(_configure_graphite_bridge)
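Editor's note: the GraphiteBridge wired up above is the stock bridge shipped with prometheus_client; it periodically pushes everything in the registry to Graphite's plaintext port. A minimal standalone usage sketch follows, with the target host, prefix, and example counter invented for illustration.

```python
# Stand-alone sketch of prometheus_client's Graphite bridge, the same class that
# _configure_graphite_bridge() starts above; host, prefix and the counter are made up.
from prometheus_client import Counter
from prometheus_client.bridge.graphite import GraphiteBridge

answers_total = Counter("answers_total", "Total number of answers handled")
answers_total.inc()

# push the default registry to Graphite every 5 seconds under a "kres" prefix
bridge = GraphiteBridge(("graphite.example.net", 2003))
bridge.start(interval=5.0, prefix="kres")
```

In the manager this start() call happens at most once per process, guarded by the `_graphite_bridge is None` check, which is also why the verifier above refuses runtime changes to the graphite section.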