import asyncio
import importlib
-import json
import logging
-from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple
+from typing import Any, Dict, Generator, List, Optional, Tuple
-from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
+from knot_resolver.controller.interface import KresID
+from knot_resolver.controller.registered_workers import get_registered_workers_kresids
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
from knot_resolver.utils import compat
from knot_resolver.utils.functional import Result
-from knot_resolver.utils.modeling.parsing import DataFormat
-if TYPE_CHECKING:
- from knot_resolver.controller.interface import KresID
+from .collect import collect_kresd_workers_metrics
-logger = logging.getLogger(__name__)
-
-
-_prometheus_support = False
+_prometheus_client = False
if importlib.util.find_spec("prometheus_client"):
- _prometheus_support = True
+ _prometheus_client = True
+logger = logging.getLogger(__name__)
+
+if _prometheus_client:
-if _prometheus_support:
from prometheus_client import exposition # type: ignore
from prometheus_client.bridge.graphite import GraphiteBridge # type: ignore
from prometheus_client.core import GaugeMetricFamily # type: ignore
from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetricFamily, Metric
+ _graphite_bridge: Optional[GraphiteBridge] = None
+
+ _metrics_collector: Optional["KresPrometheusMetricsCollector"] = None
+
def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily:
c = CounterMetricFamily(name, description, labels=(label[0],))
c.add_metric((label[1],), value) # type: ignore
value=int(loaded),
)
- async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
- if old_config.monitoring.graphite and not new_config.monitoring.graphite:
- return Result.err(
- "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
- )
-
- if (
- old_config.monitoring.graphite is not None
- and new_config.monitoring.graphite is not None
- and old_config.monitoring.graphite != new_config.monitoring.graphite
- ):
- return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
-
- return Result.ok(None)
-
- _graphite_bridge: Optional[GraphiteBridge] = None
-
- @only_on_real_changes_update(lambda c: c.monitoring.graphite)
- async def _configure_graphite_bridge(config: KresConfig) -> None:
- """
- Starts graphite bridge if required
- """
- global _graphite_bridge
- if config.monitoring.graphite is not False and _graphite_bridge is None:
- logger.info(
- "Starting Graphite metrics exporter for [%s]:%d",
- str(config.monitoring.graphite.host),
- int(config.monitoring.graphite.port),
- )
- _graphite_bridge = GraphiteBridge(
- (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
- )
- _graphite_bridge.start( # type: ignore
- interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
- )
-
-
-class ResolverCollector:
- def __init__(self, config_store: ConfigStore) -> None:
- self._stats_raw: "Optional[Dict[KresID, object]]" = None
- self._config_store: ConfigStore = config_store
- self._collection_task: "Optional[asyncio.Task[None]]" = None
- self._skip_immediate_collection: bool = False
-
- if _prometheus_support:
+ class KresPrometheusMetricsCollector:
+ def __init__(self, config_store: ConfigStore) -> None:
+ self._stats_raw: "Optional[Dict[KresID, object]]" = None
+ self._config_store: ConfigStore = config_store
+ self._collection_task: "Optional[asyncio.Task[None]]" = None
+ self._skip_immediate_collection: bool = False
def collect(self) -> Generator[Metric, None, None]:
# schedule new stats collection
metrics = self._stats_raw[kresid]
yield from _parse_resolver_metrics(kresid, metrics)
success = True
- except json.JSONDecodeError:
- logger.warning(
- "Failed to load metrics from resolver instance %s: failed to parse statistics", str(kresid)
- )
except KeyError as e:
logger.warning(
"Failed to load metrics from resolver instance %s: attempted to read missing statistic %s",
# this function prevents the collector registry from invoking the collect function on startup
return []
- def report_json(self) -> str:
- # schedule new stats collection
- self._trigger_stats_collection()
-
- # if we have no data, return metrics with information about it and exit
- if self._stats_raw is None:
- no_stats_dict: Dict[str, None] = {}
- for kresid in get_registered_workers_kresids():
- no_stats_dict[str(kresid)] = None
- return DataFormat.JSON.dict_dump(no_stats_dict)
+ async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None:
+ if self._skip_immediate_collection:
+ # this would happen because we are calling this function first manually before stat generation,
+ # and once again immediately afterwards caused by the prometheus library's stat collection
+ #
+ # this is a code made to solve problem with calling async functions from sync methods
+ self._skip_immediate_collection = False
+ return
- stats_dict: Dict[str, object] = {}
- for kresid, stats in self._stats_raw.items():
- stats_dict[str(kresid)] = stats
+ config = self._config_store.get()
+ self._stats_raw = await collect_kresd_workers_metrics(config)
- return DataFormat.JSON.dict_dump(stats_dict)
+ # if this function was not called by the prometheus library and calling collect() is imminent,
+ # we should block the next collection cycle as it would be useless
+ if not _triggered_from_prometheus_library:
+ self._skip_immediate_collection = True
- async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None:
- if self._skip_immediate_collection:
- # this would happen because we are calling this function first manually before stat generation,
- # and once again immediately afterwards caused by the prometheus library's stat collection
+ def _trigger_stats_collection(self) -> None:
+ # we are running inside an event loop, but in a synchronous function and that sucks a lot
+ # it means that we shouldn't block the event loop by performing a blocking stats collection
+ # but it also means that we can't yield to the event loop as this function is synchronous
+ # therefore we can only start a new task, but we can't wait for it
+ # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
#
- # this is a code made to solve problem with calling async functions from sync methods
- self._skip_immediate_collection = False
- return
-
- config = self._config_store.get()
-
- if config.monitoring.enabled == "manager-only":
- logger.debug("Skipping kresd stat collection due to configuration")
- self._stats_raw = None
- return
-
- lazy = config.monitoring.enabled == "lazy"
- cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
- logger.debug("Collecting kresd stats with method '%s'", cmd)
- stats_raw = await command_registered_workers(cmd)
- self._stats_raw = stats_raw
-
- # if this function was not called by the prometheus library and calling collect() is imminent,
- # we should block the next collection cycle as it would be useless
- if not _triggered_from_prometheus_library:
- self._skip_immediate_collection = True
-
- def _trigger_stats_collection(self) -> None:
- # we are running inside an event loop, but in a synchronous function and that sucks a lot
- # it means that we shouldn't block the event loop by performing a blocking stats collection
- # but it also means that we can't yield to the event loop as this function is synchronous
- # therefore we can only start a new task, but we can't wait for it
- # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
- #
- # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
- # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
- # function for details
-
- if compat.asyncio.is_event_loop_running():
- # when running, we can schedule the new data collection
- if self._collection_task is not None and not self._collection_task.done():
- logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
- else:
- self._collection_task = compat.asyncio.create_task(
- self.collect_kresd_stats(_triggered_from_prometheus_library=True)
- )
-
- else:
- # when not running, we can start a new loop (we are not in the manager's main thread)
- compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
-
-
-_resolver_collector: Optional[ResolverCollector] = None
-
+ # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
+ # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
+ # function for details
+
+ if compat.asyncio.is_event_loop_running():
+ # when running, we can schedule the new data collection
+ if self._collection_task is not None and not self._collection_task.done():
+ logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+ else:
+ self._collection_task = compat.asyncio.create_task(
+ self.collect_kresd_stats(_triggered_from_prometheus_library=True)
+ )
-async def _collect_stats() -> None:
- # manually trigger stat collection so that we do not have to wait for it
- if _resolver_collector is not None:
- await _resolver_collector.collect_kresd_stats()
- else:
- raise RuntimeError("Function invoked before initializing the module!")
+ else:
+ # when not running, we can start a new loop (we are not in the manager's main thread)
+ compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
+ @only_on_real_changes_update(lambda c: c.monitoring.graphite)
+ async def _init_graphite_bridge(config: KresConfig) -> None:
+ """
+ Starts graphite bridge if required
+ """
+ global _graphite_bridge
+ if config.monitoring.graphite is not False and _graphite_bridge is None:
+ logger.info(
+ "Starting Graphite metrics exporter for [%s]:%d",
+ str(config.monitoring.graphite.host),
+ int(config.monitoring.graphite.port),
+ )
+ _graphite_bridge = GraphiteBridge(
+ (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
+ )
+ _graphite_bridge.start( # type: ignore
+ interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
+ )
-async def report_stats(prometheus_format: bool = False) -> Optional[bytes]:
- """
- Collects metrics from everything, returns data string in JSON (default) or Prometheus format.
- """
+ async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
+ if old_config.monitoring.graphite and not new_config.monitoring.graphite:
+ return Result.err(
+ "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
+ )
- # manually trigger stat collection so that we do not have to wait for it
- if _resolver_collector is not None:
- await _resolver_collector.collect_kresd_stats()
- else:
- raise RuntimeError("Function invoked before initializing the module!")
+ if (
+ old_config.monitoring.graphite is not None
+ and new_config.monitoring.graphite is not None
+ and old_config.monitoring.graphite != new_config.monitoring.graphite
+ ):
+ return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
- if prometheus_format:
- if _prometheus_support:
- return exposition.generate_latest() # type: ignore
- return None
- return _resolver_collector.report_json().encode()
+ return Result.ok(None)
-async def init_monitoring(config_store: ConfigStore) -> None:
+async def init_prometheus(config_store: ConfigStore) -> None:
"""
- Initialize monitoring. Must be called before any other function from this module.
+ Initialize metrics collection. Must be called before any other function from this module.
"""
- global _resolver_collector
- _resolver_collector = ResolverCollector(config_store)
-
- if _prometheus_support:
- # register metrics collector
- REGISTRY.register(_resolver_collector) # type: ignore
+ if _prometheus_client:
+ # init and register metrics collector
+ global _metrics_collector
+ _metrics_collector = KresPrometheusMetricsCollector(config_store)
+ REGISTRY.register(_metrics_collector) # type: ignore
# register graphite bridge
await config_store.register_verifier(_deny_turning_off_graphite_bridge)
- await config_store.register_on_change_callback(_configure_graphite_bridge)
+ await config_store.register_on_change_callback(_init_graphite_bridge)
+
+
+async def report_prometheus() -> Optional[bytes]:
+ if _prometheus_client:
+ # manually trigger stat collection so that we do not have to wait for it
+ if _metrics_collector is not None:
+ await _metrics_collector.collect_kresd_stats()
+ else:
+ raise RuntimeError("Function invoked before initializing the module!")
+ return exposition.generate_latest() # type: ignore
+ return None