From: Aleš Mrázek
Date: Thu, 21 Mar 2024 13:53:54 +0000 (+0100)
Subject: manager: api: metrics: JSON support as default
X-Git-Tag: v6.0.8~27^2~7
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0496fb8c52d47fd51e46d95a8ec45e1deb02f023;p=thirdparty%2Fknot-resolver.git

manager: api: metrics: JSON support as default

- /metrics - returns 301, redirects to /metrics/json
- /metrics/json - exports metrics in JSON format
- /metrics/prometheus - optional, exports metrics in Prometheus format, returns 404 if not supported
---

diff --git a/doc/user/manager-api.rst b/doc/user/manager-api.rst
index cc297cc47..21efb350b 100644
--- a/doc/user/manager-api.rst
+++ b/doc/user/manager-api.rst
@@ -85,7 +85,10 @@ List of API endpoints
 - ``GET /schema`` returns JSON schema of the configuration data model
 - ``GET /schema/ui`` redirect to an external website with the JSON schema visualization
-- ``GET /metrics`` provides Prometheus metrics
+- ``GET /metrics`` returns 301 (Moved Permanently) and redirects to ``/metrics/json``
+- ``GET /metrics/json`` provides aggregated metrics in JSON format
+- ``GET /metrics/prometheus`` provides metrics in Prometheus format
+  The ``prometheus-client`` Python package needs to be installed. If not installed, it returns 404 (Not Found).
 - ``GET /`` static response that could be used to determine, whether the Manager is running
 - ``POST /stop`` gracefully stops the Manager, empty request body
 - ``POST /cache/clear`` purges cache records matching the specified criteria, see :ref:`cache clearing `
diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py
index a6fc408d0..b27cadb33 100644
--- a/manager/knot_resolver_manager/server.py
+++ b/manager/knot_resolver_manager/server.py
@@ -188,7 +188,6 @@ class Server:
             }
         )
 
-    @statistics.async_timing_histogram(statistics.MANAGER_REQUEST_RECONFIGURE_LATENCY)
     async def _handler_config_query(self, request: web.Request) -> web.Response:
         """
         Route handler for changing resolver configuration
@@ -238,9 +237,24 @@ class Server:
         res.headers.add("ETag", f'"{structural_etag(new_config)}"')
         return res
 
-    async def _handler_metrics(self, _request: web.Request) -> web.Response:
+    async def _handler_metrics(self, request: web.Request) -> web.Response:
+        raise web.HTTPMovedPermanently("/metrics/json")
+
+    async def _handler_metrics_json(self, _request: web.Request) -> web.Response:
         return web.Response(
             body=await statistics.report_stats(),
+            content_type="application/json",
+            charset="utf8",
+        )
+
+    async def _handler_metrics_prometheus(self, _request: web.Request) -> web.Response:
+
+        metrics_report = await statistics.report_stats(prometheus_format=True)
+        if not metrics_report:
+            raise web.HTTPNotFound()
+
+        return web.Response(
+            body=metrics_report,
             content_type="text/plain",
             charset="utf8",
         )
@@ -330,6 +344,8 @@ class Server:
                 web.get("/schema", self._handler_schema),
                 web.get("/schema/ui", self._handle_view_schema),
                 web.get("/metrics", self._handler_metrics),
+                web.get("/metrics/json", self._handler_metrics_json),
+                web.get("/metrics/prometheus", self._handler_metrics_prometheus),
                 web.post("/cache/clear", self._handler_cache_clear),
             ]
         )
diff --git a/manager/knot_resolver_manager/statistics.py b/manager/knot_resolver_manager/statistics.py
index aa51c423d..51e8708d6 100644
--- a/manager/knot_resolver_manager/statistics.py
+++ b/manager/knot_resolver_manager/statistics.py
@@ -1,12 +1,8 @@
 import asyncio
+import importlib
 import json
 import logging
-from typing import TYPE_CHECKING, Any, Awaitable, 
Callable, Dict, Generator, List, Optional, Tuple, TypeVar - -from prometheus_client import Histogram, exposition # type: ignore -from prometheus_client.bridge.graphite import GraphiteBridge # type: ignore -from prometheus_client.core import GaugeMetricFamily # type: ignore -from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetricFamily, Metric +from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple from knot_resolver_manager import compat from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes @@ -16,159 +12,43 @@ from knot_resolver_manager.kresd_controller.registered_workers import ( get_registered_workers_kresids, ) from knot_resolver_manager.utils.functional import Result +from knot_resolver_manager.utils.modeling.parsing import DataFormat if TYPE_CHECKING: from knot_resolver_manager.kresd_controller.interface import KresID - logger = logging.getLogger(__name__) -MANAGER_REQUEST_RECONFIGURE_LATENCY = Histogram( - "manager_request_reconfigure_latency", "Time it takes to change configuration" -) - - -T = TypeVar("T") - - -def async_timing_histogram(metric: Histogram) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]: - """ - Decorator which can be used to report duration on async functions - """ - - def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: - async def wrapper(*args: Any, **kwargs: Any) -> T: - with metric.time(): - res = await func(*args, **kwargs) - return res - - return wrapper - - return decorator - - -def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily: - c = CounterMetricFamily(name, description, labels=(label[0],)) - c.add_metric((label[1],), value) # type: ignore - return c - - -def _gauge(name: str, description: str, label: Tuple[str, str], value: float) -> GaugeMetricFamily: - c = GaugeMetricFamily(name, description, labels=(label[0],)) - c.add_metric((label[1],), value) # type: ignore - return c - - -def _histogram( - name: str, description: str, label: Tuple[str, str], buckets: List[Tuple[str, int]], sum_value: float -) -> HistogramMetricFamily: - c = HistogramMetricFamily(name, description, labels=(label[0],)) - c.add_metric((label[1],), buckets, sum_value=sum_value) # type: ignore - return c - - -class ResolverCollector: - def __init__(self, config_store: ConfigStore) -> None: - self._stats_raw: "Optional[Dict[KresID, object]]" = None - self._config_store: ConfigStore = config_store - self._collection_task: "Optional[asyncio.Task[None]]" = None - self._skip_immediate_collection: bool = False - - async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None: - if self._skip_immediate_collection: - # this would happen because we are calling this function first manually before stat generation, - # and once again immediately afterwards caused by the prometheus library's stat collection - # - # this is a code made to solve problem with calling async functions from sync methods - self._skip_immediate_collection = False - return - - config = self._config_store.get() - - if config.monitoring.enabled == "manager-only": - logger.debug("Skipping kresd stat collection due to configuration") - self._stats_raw = None - return - - lazy = config.monitoring.enabled == "lazy" - cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()" - logger.debug("Collecting kresd stats with method '%s'", cmd) - stats_raw = await command_registered_workers(cmd) - self._stats_raw = 
stats_raw - - # if this function was not called by the prometheus library and calling collect() is imminent, - # we should block the next collection cycle as it would be useless - if not _triggered_from_prometheus_library: - self._skip_immediate_collection = True - def _trigger_stats_collection(self) -> None: - # we are running inside an event loop, but in a synchronous function and that sucks a lot - # it means that we shouldn't block the event loop by performing a blocking stats collection - # but it also means that we can't yield to the event loop as this function is synchronous - # therefore we can only start a new task, but we can't wait for it - # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough) - # - # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering - # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned - # function for details +_prometheus_support = False +if importlib.util.find_spec("prometheus_client"): + _prometheus_support = True - if compat.asyncio.is_event_loop_running(): - # when running, we can schedule the new data collection - if self._collection_task is not None and not self._collection_task.done(): - logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!") - else: - self._collection_task = compat.asyncio.create_task( - self.collect_kresd_stats(_triggered_from_prometheus_library=True) - ) - else: - # when not running, we can start a new loop (we are not in the manager's main thread) - compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True)) +if _prometheus_support: + from prometheus_client import exposition # type: ignore + from prometheus_client.bridge.graphite import GraphiteBridge # type: ignore + from prometheus_client.core import GaugeMetricFamily # type: ignore + from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetricFamily, Metric - def _create_resolver_metrics_loaded_gauge(self, kresid: "KresID", loaded: bool) -> GaugeMetricFamily: - return _gauge( - "resolver_metrics_loaded", - "0 if metrics from resolver instance were not loaded, otherwise 1", - label=("instance_id", str(kresid)), - value=int(loaded), - ) + def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily: + c = CounterMetricFamily(name, description, labels=(label[0],)) + c.add_metric((label[1],), value) # type: ignore + return c - def collect(self) -> Generator[Metric, None, None]: - # schedule new stats collection - self._trigger_stats_collection() + def _gauge(name: str, description: str, label: Tuple[str, str], value: float) -> GaugeMetricFamily: + c = GaugeMetricFamily(name, description, labels=(label[0],)) + c.add_metric((label[1],), value) # type: ignore + return c - # if we have no data, return metrics with information about it and exit - if self._stats_raw is None: - for kresid in get_registered_workers_kresids(): - yield self._create_resolver_metrics_loaded_gauge(kresid, False) - return - - # if we have data, parse them - for kresid in get_registered_workers_kresids(): - success = False - try: - if kresid in self._stats_raw: - metrics = self._stats_raw[kresid] - yield from self._parse_resolver_metrics(kresid, metrics) - success = True - except json.JSONDecodeError: - logger.warning( - "Failed to load metrics from resolver instance %s: failed to parse statistics", str(kresid) - ) - except KeyError as 
e: - logger.warning( - "Failed to load metrics from resolver instance %s: attempted to read missing statistic %s", - str(kresid), - str(e), - ) + def _histogram( + name: str, description: str, label: Tuple[str, str], buckets: List[Tuple[str, int]], sum_value: float + ) -> HistogramMetricFamily: + c = HistogramMetricFamily(name, description, labels=(label[0],)) + c.add_metric((label[1],), buckets, sum_value=sum_value) # type: ignore + return c - yield self._create_resolver_metrics_loaded_gauge(kresid, success) - - def describe(self) -> List[Metric]: - # this function prevents the collector registry from invoking the collect function on startup - return [] - - def _parse_resolver_metrics(self, instance_id: "KresID", metrics: Any) -> Generator[Metric, None, None]: + def _parse_resolver_metrics(instance_id: "KresID", metrics: Any) -> Generator[Metric, None, None]: sid = str(instance_id) # response latency histogram @@ -179,216 +59,349 @@ class ResolverCollector: "Time it takes to respond to queries in seconds", label=("instance_id", sid), buckets=[ - (bnp, metrics[f"answer.{duration}"]) + (bnp, metrics["answer"][f"{duration}"]) for bnp, duration in zip(BUCKET_NAMES_PROMETHEUS, BUCKET_NAMES_IN_RESOLVER) ], - sum_value=metrics["answer.sum_ms"] / 1_000, + sum_value=metrics["answer"]["sum_ms"] / 1_000, ) yield _counter( "resolver_request_total", "total number of DNS requests (including internal client requests)", label=("instance_id", sid), - value=metrics["request.total"], + value=metrics["request"]["total"], ) yield _counter( "resolver_request_internal", "number of internal requests generated by Knot Resolver (e.g. DNSSEC trust anchor updates)", label=("instance_id", sid), - value=metrics["request.internal"], + value=metrics["request"]["internal"], ) yield _counter( "resolver_request_udp", "number of external requests received over plain UDP (RFC 1035)", label=("instance_id", sid), - value=metrics["request.udp"], + value=metrics["request"]["udp"], ) yield _counter( "resolver_request_tcp", "number of external requests received over plain TCP (RFC 1035)", label=("instance_id", sid), - value=metrics["request.tcp"], + value=metrics["request"]["tcp"], ) yield _counter( "resolver_request_dot", "number of external requests received over DNS-over-TLS (RFC 7858)", label=("instance_id", sid), - value=metrics["request.dot"], + value=metrics["request"]["dot"], ) yield _counter( "resolver_request_doh", "number of external requests received over DNS-over-HTTP (RFC 8484)", label=("instance_id", sid), - value=metrics["request.doh"], + value=metrics["request"]["doh"], ) yield _counter( "resolver_request_xdp", "number of external requests received over plain UDP via an AF_XDP socket", label=("instance_id", sid), - value=metrics["request.xdp"], + value=metrics["request"]["xdp"], ) yield _counter( "resolver_answer_total", "total number of answered queries", label=("instance_id", sid), - value=metrics["answer.total"], + value=metrics["answer"]["total"], ) yield _counter( "resolver_answer_cached", "number of queries answered from cache", label=("instance_id", sid), - value=metrics["answer.cached"], + value=metrics["answer"]["cached"], ) yield _counter( "resolver_answer_rcode_noerror", "number of NOERROR answers", label=("instance_id", sid), - value=metrics["answer.noerror"], + value=metrics["answer"]["noerror"], ) yield _counter( "resolver_answer_rcode_nodata", "number of NOERROR answers without any data", label=("instance_id", sid), - value=metrics["answer.nodata"], + value=metrics["answer"]["nodata"], ) yield 
_counter( "resolver_answer_rcode_nxdomain", "number of NXDOMAIN answers", label=("instance_id", sid), - value=metrics["answer.nxdomain"], + value=metrics["answer"]["nxdomain"], ) yield _counter( "resolver_answer_rcode_servfail", "number of SERVFAIL answers", label=("instance_id", sid), - value=metrics["answer.servfail"], + value=metrics["answer"]["servfail"], ) yield _counter( "resolver_answer_flag_aa", "number of authoritative answers", label=("instance_id", sid), - value=metrics["answer.aa"], + value=metrics["answer"]["aa"], ) yield _counter( "resolver_answer_flag_tc", "number of truncated answers", label=("instance_id", sid), - value=metrics["answer.tc"], + value=metrics["answer"]["tc"], ) yield _counter( "resolver_answer_flag_ra", "number of answers with recursion available flag", label=("instance_id", sid), - value=metrics["answer.ra"], + value=metrics["answer"]["ra"], ) yield _counter( "resolver_answer_flags_rd", "number of recursion desired (in answer!)", label=("instance_id", sid), - value=metrics["answer.rd"], + value=metrics["answer"]["rd"], ) yield _counter( "resolver_answer_flag_ad", "number of authentic data (DNSSEC) answers", label=("instance_id", sid), - value=metrics["answer.ad"], + value=metrics["answer"]["ad"], ) yield _counter( "resolver_answer_flag_cd", "number of checking disabled (DNSSEC) answers", label=("instance_id", sid), - value=metrics["answer.cd"], + value=metrics["answer"]["cd"], ) yield _counter( "resolver_answer_flag_do", "number of DNSSEC answer OK", label=("instance_id", sid), - value=metrics["answer.do"], + value=metrics["answer"]["do"], ) yield _counter( "resolver_answer_flag_edns0", "number of answers with EDNS0 present", label=("instance_id", sid), - value=metrics["answer.edns0"], + value=metrics["answer"]["edns0"], ) yield _counter( "resolver_query_edns", "number of queries with EDNS present", label=("instance_id", sid), - value=metrics["query.edns"], + value=metrics["query"]["edns"], ) yield _counter( "resolver_query_dnssec", "number of queries with DNSSEC DO=1", label=("instance_id", sid), - value=metrics["query.dnssec"], + value=metrics["query"]["dnssec"], + ) + + def _create_resolver_metrics_loaded_gauge(kresid: "KresID", loaded: bool) -> GaugeMetricFamily: + return _gauge( + "resolver_metrics_loaded", + "0 if metrics from resolver instance were not loaded, otherwise 1", + label=("instance_id", str(kresid)), + value=int(loaded), ) + async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]: + if old_config.monitoring.graphite and not new_config.monitoring.graphite: + return Result.err( + "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know." 
+ ) + + if ( + old_config.monitoring.graphite is not None + and new_config.monitoring.graphite is not None + and old_config.monitoring.graphite != new_config.monitoring.graphite + ): + return Result.err("Changing graphite exporter configuration in runtime is not allowed.") + + return Result.ok(None) + + _graphite_bridge: Optional[GraphiteBridge] = None + + @only_on_real_changes(lambda c: c.monitoring.graphite) + async def _configure_graphite_bridge(config: KresConfig) -> None: + """ + Starts graphite bridge if required + """ + global _graphite_bridge + if config.monitoring.graphite is not False and _graphite_bridge is None: + logger.info( + "Starting Graphite metrics exporter for [%s]:%d", + str(config.monitoring.graphite.host), + int(config.monitoring.graphite.port), + ) + _graphite_bridge = GraphiteBridge( + (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port)) + ) + _graphite_bridge.start( # type: ignore + interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix) + ) -_resolver_collector: Optional[ResolverCollector] = None +class ResolverCollector: + def __init__(self, config_store: ConfigStore) -> None: + self._stats_raw: "Optional[Dict[KresID, object]]" = None + self._config_store: ConfigStore = config_store + self._collection_task: "Optional[asyncio.Task[None]]" = None + self._skip_immediate_collection: bool = False -async def report_stats() -> bytes: - """ - Collects metrics from everything, returns data string in Prometheus format. - """ + if _prometheus_support: - # manually trigger stat collection so that we do not have to wait for it - if _resolver_collector is not None: - await _resolver_collector.collect_kresd_stats() - else: - raise RuntimeError("Function invoked before initializing the module!") + def collect(self) -> Generator[Metric, None, None]: + # schedule new stats collection + self._trigger_stats_collection() - # generate the report - return exposition.generate_latest() # type: ignore + # if we have no data, return metrics with information about it and exit + if self._stats_raw is None: + for kresid in get_registered_workers_kresids(): + yield _create_resolver_metrics_loaded_gauge(kresid, False) + return + # if we have data, parse them + for kresid in get_registered_workers_kresids(): + success = False + try: + if kresid in self._stats_raw: + metrics = self._stats_raw[kresid] + yield from _parse_resolver_metrics(kresid, metrics) + success = True + except json.JSONDecodeError: + logger.warning( + "Failed to load metrics from resolver instance %s: failed to parse statistics", str(kresid) + ) + except KeyError as e: + logger.warning( + "Failed to load metrics from resolver instance %s: attempted to read missing statistic %s", + str(kresid), + str(e), + ) + + yield _create_resolver_metrics_loaded_gauge(kresid, success) + + def describe(self) -> List[Metric]: + # this function prevents the collector registry from invoking the collect function on startup + return [] + + def report_json(self) -> str: + # schedule new stats collection + self._trigger_stats_collection() -async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]: - if old_config.monitoring.graphite and not new_config.monitoring.graphite: - return Result.err( - "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know." 
- ) + # if we have no data, return metrics with information about it and exit + if self._stats_raw is None: + no_stats_dict: Dict[str, None] = {} + for kresid in get_registered_workers_kresids(): + no_stats_dict[str(kresid)] = None + return DataFormat.JSON.dict_dump(no_stats_dict) + + stats_dict: Dict[str, object] = {} + for kresid, stats in self._stats_raw.items(): + stats_dict[str(kresid)] = stats + + return DataFormat.JSON.dict_dump(stats_dict) + + async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None: + if self._skip_immediate_collection: + # this would happen because we are calling this function first manually before stat generation, + # and once again immediately afterwards caused by the prometheus library's stat collection + # + # this is a code made to solve problem with calling async functions from sync methods + self._skip_immediate_collection = False + return + + config = self._config_store.get() + + if config.monitoring.enabled == "manager-only": + logger.debug("Skipping kresd stat collection due to configuration") + self._stats_raw = None + return + + lazy = config.monitoring.enabled == "lazy" + cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()" + logger.debug("Collecting kresd stats with method '%s'", cmd) + stats_raw = await command_registered_workers(cmd) + self._stats_raw = stats_raw - if ( - old_config.monitoring.graphite is not None - and new_config.monitoring.graphite is not None - and old_config.monitoring.graphite != new_config.monitoring.graphite - ): - return Result.err("Changing graphite exporter configuration in runtime is not allowed.") + # if this function was not called by the prometheus library and calling collect() is imminent, + # we should block the next collection cycle as it would be useless + if not _triggered_from_prometheus_library: + self._skip_immediate_collection = True - return Result.ok(None) + def _trigger_stats_collection(self) -> None: + # we are running inside an event loop, but in a synchronous function and that sucks a lot + # it means that we shouldn't block the event loop by performing a blocking stats collection + # but it also means that we can't yield to the event loop as this function is synchronous + # therefore we can only start a new task, but we can't wait for it + # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough) + # + # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering + # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned + # function for details + if compat.asyncio.is_event_loop_running(): + # when running, we can schedule the new data collection + if self._collection_task is not None and not self._collection_task.done(): + logger.warning("Statistics collection task is still running. 
Skipping scheduling of a new one!") + else: + self._collection_task = compat.asyncio.create_task( + self.collect_kresd_stats(_triggered_from_prometheus_library=True) + ) -_graphite_bridge: Optional[GraphiteBridge] = None + else: + # when not running, we can start a new loop (we are not in the manager's main thread) + compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True)) + + +_resolver_collector: Optional[ResolverCollector] = None -@only_on_real_changes(lambda c: c.monitoring.graphite) -async def _configure_graphite_bridge(config: KresConfig) -> None: +async def _collect_stats() -> None: + # manually trigger stat collection so that we do not have to wait for it + if _resolver_collector is not None: + await _resolver_collector.collect_kresd_stats() + else: + raise RuntimeError("Function invoked before initializing the module!") + + +async def report_stats(prometheus_format: bool = False) -> Optional[bytes]: """ - Starts graphite bridge if required + Collects metrics from everything, returns data string in JSON (default) or Prometheus format. """ - global _graphite_bridge - if config.monitoring.graphite is not False and _graphite_bridge is None: - logger.info( - "Starting Graphite metrics exporter for [%s]:%d", - str(config.monitoring.graphite.host), - int(config.monitoring.graphite.port), - ) - _graphite_bridge = GraphiteBridge((str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))) - _graphite_bridge.start( # type: ignore - interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix) - ) + + # manually trigger stat collection so that we do not have to wait for it + if _resolver_collector is not None: + await _resolver_collector.collect_kresd_stats() + else: + raise RuntimeError("Function invoked before initializing the module!") + + if prometheus_format: + if _prometheus_support: + return exposition.generate_latest() # type: ignore + return None + return _resolver_collector.report_json().encode() async def init_monitoring(config_store: ConfigStore) -> None: """ Initialize monitoring. Must be called before any other function from this module. """ - # register metrics collector global _resolver_collector _resolver_collector = ResolverCollector(config_store) - REGISTRY.register(_resolver_collector) # type: ignore - # register graphite bridge - await config_store.register_verifier(_deny_turning_off_graphite_bridge) - await config_store.register_on_change_callback(_configure_graphite_bridge) + if _prometheus_support: + # register metrics collector + REGISTRY.register(_resolver_collector) # type: ignore + + # register graphite bridge + await config_store.register_verifier(_deny_turning_off_graphite_bridge) + await config_store.register_on_change_callback(_configure_graphite_bridge)
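
Not part of the patch: a minimal client sketch of the endpoint behaviour introduced above, assuming the manager's HTTP API is reachable at http://localhost:5000 (the real address or unix socket depends on your `management` configuration). It only illustrates the 301 redirect from /metrics and the optional 404 on /metrics/prometheus described in the commit message.

    import json
    import urllib.error
    import urllib.request

    BASE = "http://localhost:5000"  # assumption: adjust to your management API address

    # GET /metrics answers 301 (Moved Permanently); urllib follows it to /metrics/json.
    with urllib.request.urlopen(f"{BASE}/metrics") as resp:
        print(resp.geturl())             # ends with /metrics/json after the redirect
        stats = json.loads(resp.read())  # JSON object keyed by kresd instance ID
        print(sorted(stats.keys()))

    # GET /metrics/prometheus is optional; 404 means prometheus-client is not installed.
    try:
        with urllib.request.urlopen(f"{BASE}/metrics/prometheus") as resp:
            print(resp.read().decode("utf-8"))  # Prometheus text exposition format
    except urllib.error.HTTPError as e:
        if e.code == 404:
            print("Prometheus export not available (prometheus-client not installed)")
        else:
            raise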