from typing import Any, Awaitable, Callable, Dict, Generator, List, Optional, Tuple, TypeVar
from prometheus_client import Histogram, exposition # type: ignore
+from prometheus_client.bridge.graphite import GraphiteBridge # type: ignore
from prometheus_client.core import ( # type: ignore
REGISTRY,
CounterMetricFamily,
Metric,
)
+from knot_resolver_manager import compat
+from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.kres_id import KresID
from knot_resolver_manager.kresd_controller.interface import Subprocess
+from knot_resolver_manager.utils.functional import Result
logger = logging.getLogger(__name__)
class ResolverCollector:
- def __init__(self) -> None:
+ def __init__(self, config_store: ConfigStore) -> None:
self._stats_raw: Optional[Dict[KresID, str]] = None
+ self._config_store: ConfigStore = config_store
+ self._collection_task: "Optional[asyncio.Task[None]]" = None
- def set_stats(self, stats_raw: Optional[Dict[KresID, str]]) -> None:
- self._stats_raw = stats_raw
+ def _trigger_stats_collection(self) -> None:
+ async def collect(config: KresConfig) -> None:
+ if config.monitoring.enabled == "manager-only":
+ self._stats_raw = None
+ return
+
+ lazy = config.monitoring.enabled == "lazy"
+ cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
+ stats_raw = await _command_registered_resolvers(cmd)
+ self._stats_raw = stats_raw
+
+ # we are running inside an event loop, but in a synchronous function and that sucks a lot
+ # it means that we shouldn't block the event loop by performing a blocking stats collection
+ # but it also means that we can't yield to the event loop as this function is synchronous
+ # therefore we can only start a new task, but we can't wait for it
+ # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
+ if self._collection_task is not None and not self._collection_task.done():
+ logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
+ else:
+ self._collection_task = compat.asyncio.create_task(collect(self._config_store.get()))
+
+ def _create_resolver_metrics_loaded_gauge(self, kid: KresID, loaded: bool) -> GaugeMetricFamily:
+ return _gauge(
+ "resolver_metrics_loaded",
+ "0 if metrics from resolver instance were not loaded, otherwise 1",
+ label=("instance_id", str(kid)),
+ value=int(loaded),
+ )
def collect(self) -> Generator[Metric, None, None]:
+ # schedule new stats collection
+ self._trigger_stats_collection()
+
+ # if we have no data, return metrics with information about it and exit
if self._stats_raw is None:
+ for kid in _REGISTERED_RESOLVERS:
+ yield self._create_resolver_metrics_loaded_gauge(kid, False)
return
- for kid, raw in self._stats_raw.items():
+ # if we have data, parse them
+ for kid in _REGISTERED_RESOLVERS:
success = False
try:
- metrics: Dict[str, int] = json.loads(raw[1:-1])
- yield from self._parse_resolver_metrics(kid, metrics)
- success = True
+ if kid in self._stats_raw:
+ raw = self._stats_raw[kid]
+ metrics: Dict[str, int] = json.loads(raw[1:-1])
+ yield from self._parse_resolver_metrics(kid, metrics)
+ success = True
except json.JSONDecodeError:
logger.warning("Failed to load metrics from resolver instance %s: failed to parse statistics", str(kid))
except KeyError as e:
str(e),
)
- yield _gauge(
- "resolver_metrics_loaded",
- "0 if metrics from resolver instance were not loaded, otherwise 1",
- label=("instance_id", str(kid)),
- value=int(success),
- )
+ yield self._create_resolver_metrics_loaded_gauge(kid, success)
def _parse_resolver_metrics(self, instance_id: KresID, metrics: Any) -> Generator[Metric, None, None]:
sid = str(instance_id)
)
-_RESOLVER_COLLECTOR = ResolverCollector()
-REGISTRY.register(_RESOLVER_COLLECTOR) # type: ignore
-
-
def unregister_resolver_metrics_for(subprocess: Subprocess) -> None:
"""
Cancel metric collection from resolver subprocess
_REGISTERED_RESOLVERS[subprocess.id] = subprocess
-async def report_stats(config: KresConfig) -> bytes:
+async def report_stats() -> bytes:
"""
Collects metrics from everything, returns data string in Prometheus format.
"""
+ return exposition.generate_latest() # type: ignore
- try:
- if config.monitoring.state != "manager-only":
- lazy = config.monitoring.state == "lazy"
- ON_DEMAND_STATS_QUERY = "collect_lazy_statistics()"
- STATS_QUERY = "collect_statistics()"
- cmd = ON_DEMAND_STATS_QUERY if lazy else STATS_QUERY
- stats_raw = await _command_registered_resolvers(cmd)
- _RESOLVER_COLLECTOR.set_stats(stats_raw)
+async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
+ if old_config.monitoring.graphite is not None and new_config.monitoring.graphite is None:
+ return Result.err(
+ "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
+ )
+
+ if (
+ old_config.monitoring.graphite is not None
+ and new_config.monitoring.graphite is not None
+ and old_config.monitoring.graphite != new_config.monitoring.graphite
+ ):
+ return Result.err("Changing graphite exporter configuration in runtime is not allowed.")
+
+ return Result.ok(None)
+
+
+_graphite_bridge: Optional[GraphiteBridge] = None
+
+
+@only_on_real_changes(lambda c: c.monitoring.graphite)
+async def _configure_graphite_bridge(config: KresConfig) -> None:
+ """
+ Starts graphite bridge if required
+ """
+ global _graphite_bridge
+ if config.monitoring.graphite is not False and _graphite_bridge is None:
+ logger.info("Starting Graphite metrics exporter for [%s]:%d", config.monitoring.graphite.host, config.monitoring.graphite.port)
+ _graphite_bridge = GraphiteBridge((config.monitoring.graphite.host, config.monitoring.graphite.port))
+ _graphite_bridge.start( # type: ignore
+ interval=config.monitoring.graphite.interval_sec.seconds(), prefix=config.monitoring.graphite.prefix
+ )
+
+
+async def init_monitoring(config_store: ConfigStore) -> None:
+ """
+ Initialize monitoring. Must be called before any other function from this module.
+ """
+ # register metrics collector
+ _RESOLVER_COLLECTOR = ResolverCollector(config_store)
+ REGISTRY.register(_RESOLVER_COLLECTOR) # type: ignore
- return exposition.generate_latest() # type: ignore
- finally:
- # after the report has been generated, clean everything
- _RESOLVER_COLLECTOR.set_stats(None)
+ # register graphite bridge
+ await config_store.register_verifier(_deny_turning_off_graphite_bridge)
+ await config_store.register_on_change_callback(_configure_graphite_bridge)