from knot_resolver_manager.constants import kresd_config_file
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
-from knot_resolver_manager.statistics import register_resolver_metrics_for, unregister_resolver_metrics_for
from knot_resolver_manager.utils.async_utils import writefile
+from knot_resolver_manager.kresd_controller.registered_workers import register_worker, unregister_worker
+
logger = logging.getLogger(__name__)
def __init__(self, config: KresConfig, kid: KresID) -> None:
self._id = kid
self._config = config
- self._metrics_registered: bool = False
+ self._registered_worker: bool = False
async def start(self) -> None:
# create config file
try:
await self._start()
if self.type is SubprocessType.KRESD:
- register_resolver_metrics_for(self)
- self._metrics_registered = True
+ register_worker(self)
+ self._registered_worker = True
except SubprocessControllerException as e:
kresd_config_file(self._config, self.id).unlink()
raise e
await self._restart()
async def stop(self) -> None:
- if self._metrics_registered:
- unregister_resolver_metrics_for(self)
+ if self._registered_worker:
+ unregister_worker(self)
await self._stop()
await self.cleanup()
--- /dev/null
+import asyncio
+import logging
+from typing import TYPE_CHECKING, Dict, List, Tuple
+
+if TYPE_CHECKING:
+ from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess
+
+
+logger = logging.getLogger(__name__)
+
+
+_REGISTERED_WORKERS: "Dict[KresID, Subprocess]" = {}
+
+
+def get_registered_workers_kids() -> "List[KresID]":
+ return list(_REGISTERED_WORKERS.keys())
+
+
+async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, str]":
+ for sub in _REGISTERED_WORKERS.values():
+ return sub.id, await sub.command(cmd)
+ raise SubprocessControllerException(
+ "Unable to execute the command. There is no kresd worker running to execute the command."
+ "Try start/restart the resolver.",
+ )
+
+
+async def command_registered_workers(cmd: str) -> "Dict[KresID, str]":
+ async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]":
+ return sub.id, await sub.command(cmd)
+
+ pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_WORKERS.values()))
+ return dict(pairs)
+
+
+def unregister_worker(subprocess: "Subprocess") -> None:
+ """
+ Unregister kresd worker "Subprocess" from the list.
+ """
+ del _REGISTERED_WORKERS[subprocess.id]
+
+
+def register_worker(subprocess: "Subprocess") -> None:
+ """
+ Register kresd worker "Subprocess" on the list.
+ """
+ _REGISTERED_WORKERS[subprocess.id] = subprocess
from knot_resolver_manager import compat
from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes
from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.kresd_controller.registered_workers import (
+ command_registered_workers,
+ get_registered_workers_kids,
+)
from knot_resolver_manager.utils.functional import Result
if TYPE_CHECKING:
- from knot_resolver_manager.kresd_controller.interface import KresID, Subprocess
+ from knot_resolver_manager.kresd_controller.interface import KresID
logger = logging.getLogger(__name__)
"manager_request_reconfigure_latency", "Time it takes to change configuration"
)
-_REGISTERED_RESOLVERS: "Dict[KresID, Subprocess]" = {}
-
T = TypeVar("T")
return decorator
-async def _command_registered_resolvers(cmd: str) -> "Dict[KresID, str]":
- async def single_pair(sub: "Subprocess") -> "Tuple[KresID, str]":
- return sub.id, await sub.command(cmd)
-
- pairs = await asyncio.gather(*(single_pair(inst) for inst in _REGISTERED_RESOLVERS.values()))
- return dict(pairs)
-
-
def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily:
c = CounterMetricFamily(name, description, labels=(label[0],))
c.add_metric((label[1],), value) # type: ignore
lazy = config.monitoring.enabled == "lazy"
cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
logger.debug("Collecting kresd stats with method '%s'", cmd)
- stats_raw = await _command_registered_resolvers(cmd)
+ stats_raw = await command_registered_workers(cmd)
self._stats_raw = stats_raw
# if this function was not called by the prometheus library and calling collect() is imminent,
# if we have no data, return metrics with information about it and exit
if self._stats_raw is None:
- for kid in _REGISTERED_RESOLVERS:
+ for kid in get_registered_workers_kids():
yield self._create_resolver_metrics_loaded_gauge(kid, False)
return
# if we have data, parse them
- for kid in _REGISTERED_RESOLVERS:
+ for kid in get_registered_workers_kids():
success = False
try:
if kid in self._stats_raw:
_resolver_collector: Optional[ResolverCollector] = None
-def unregister_resolver_metrics_for(subprocess: "Subprocess") -> None:
- """
- Cancel metric collection from resolver "Subprocess"
- """
- del _REGISTERED_RESOLVERS[subprocess.id]
-
-
-def register_resolver_metrics_for(subprocess: "Subprocess") -> None:
- """
- Register resolver "Subprocess" for metric collection
- """
- _REGISTERED_RESOLVERS[subprocess.id] = subprocess
-
-
async def report_stats() -> bytes:
"""
Collects metrics from everything, returns data string in Prometheus format.