import logging
from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
+from knot_resolver_manager.config_store import ConfigStore
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.utils import which
+from knot_resolver_manager.utils.functional import Result
if TYPE_CHECKING:
from knot_resolver_manager.kresd_controller.interface import KresID
"""
Used in KresdManager. It's a number of seconds in between system health checks.
"""
+
+
+class _UserConstants:
+    """
+    Accessor for values that are used like constants but are in fact user configurable.
+
+    Nothing is cached here: every access asks the config store for its current configuration.
+    """
+
+    def __init__(self, config_store: ConfigStore) -> None:
+        self._config_store = config_store
+
+    @property
+    def ID(self) -> str:
+        """The `id` value of the configuration currently returned by the config store."""
+        current_config = self._config_store.get()
+        return current_config.id
+
+
+# Module-wide singleton; stays None until init_user_constants() assigns it.
+_user_constants: Optional[_UserConstants] = None
+
+
+async def _deny_id_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+    """
+    Config verifier that rejects any change of the `id` field.
+
+    Returns `Result.ok(None)` when both configurations carry the same id, otherwise `Result.err`
+    with a message explaining why the id has to stay fixed.
+    """
+    id_changed = config_old.id != config_new.id
+    if not id_changed:
+        return Result.ok(None)
+
+    return Result.err(
+        "/id: Based on the groupid, the manager recognizes subprocesses,"
+        " so it is not possible to change it while services are running."
+    )
+
+
+async def init_user_constants(config_store: ConfigStore) -> None:
+ """
+ Create the module-wide _UserConstants instance backed by the given config store and register
+ _deny_id_changes as a verifier on that store.
+ """
+ global _user_constants
+ _user_constants = _UserConstants(config_store)
+
+ # the verifier reports an error whenever the id differs between the old and the new configuration
+ await config_store.register_verifier(_deny_id_changes)
+
+
+def user_constants() -> _UserConstants:
+    """
+    Return the module-wide _UserConstants instance.
+
+    The instance exists only after init_user_constants() has run; before that the assertion below fails.
+    """
+    instance = _user_constants
+    assert instance is not None
+    return instance
# it should be removed and relative path used instead as soon as issue
# https://gitlab.nic.cz/knot/knot-resolver/-/issues/720 is fixed
return _MAIN_TEMPLATE.render(cfg=self, cwd=os.getcwd()) # pyright: reportUnknownMemberType=false
+
+ @staticmethod
+ def test_instance() -> "KresConfig":
+ """
+ Function used just for testing purposes. Creates an instance of KresConfig without requiring
+ any arguments. The instance is built from the minimal configuration {"id": "test"}.
+ """
+ return KresConfig({"id": "test"})
from knot_resolver_manager import compat
from knot_resolver_manager.compat.asyncio import to_thread
+from knot_resolver_manager.constants import user_constants
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kresd_controller.interface import (
from knot_resolver_manager.kresd_controller.systemd.dbus_api import (
SystemdType,
Unit,
- is_unit_failed,
- list_our_slice_processes,
list_units,
reset_failed_unit,
restart_unit,
logger = logging.getLogger(__name__)
-GC_SERVICE_BASE_NAME = "kres_cache_gc.service"
-KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$")
+# Templates of the systemd unit names. `{id}` is filled in with the user-configured id and `{num}` with the
+# `_id` of the kresd instance.
+GC_SERVICE_BASE_NAME = "kres-cache-gc-{id}.service"
+KRESD_SERVICE_BASE_NAME = "kresd-{id}-{num}.service"
+# Misspelled alias of KRESD_SERVICE_BASE_NAME, kept so that existing references keep working.
+KRESD_SERVICE_BASE_NMAE = KRESD_SERVICE_BASE_NAME
+# Parses names rendered from KRESD_SERVICE_BASE_NAME: the first group is the id, the second the instance number.
+# The dot is escaped so that only a literal ".service" suffix is accepted (an unescaped "." matches any character).
+KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd-([0-9a-zA-Z]*)-([0-9]+)\.service$")
def _is_service_name_ours(name: str) -> bool:
- is_ours = name == GC_SERVICE_BASE_NAME
- is_ours |= bool(KRESD_SERVICE_BASE_PATTERN.match(name))
+ # A unit is ours when it is the GC unit rendered for our id, or a kresd unit whose id part
+ # (first regex group) equals our id.
+ is_ours = name == GC_SERVICE_BASE_NAME.format(id=user_constants().ID)
+ m = KRESD_SERVICE_BASE_PATTERN.match(name)
+ is_ours |= m is not None and m.groups()[0] == user_constants().ID
return is_ours
class SystemdKresID(KresID):
@staticmethod
def from_string(val: str) -> "SystemdKresID":
+ # Maps a systemd unit name back to a SystemdKresID. Only names carrying our own id are accepted;
+ # anything else raises RuntimeError. The GC unit is always given the id -1.
- if val == GC_SERVICE_BASE_NAME:
+ if val == GC_SERVICE_BASE_NAME.format(id=user_constants().ID):
return SystemdKresID.new(SubprocessType.GC, -1)
else:
kid = KRESD_SERVICE_BASE_PATTERN.search(val)
- if kid:
- return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[0]))
+ # groups()[0] is the id embedded in the unit name, groups()[1] the kresd instance number
+ if kid and kid.groups()[0] == user_constants().ID:
+ return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[1]))
else:
raise RuntimeError("Trying to parse systemd service name which does not match our expectations")
def __str__(self) -> str:
+ # Renders the systemd unit name of this subprocess; the user-configured id is embedded in the name.
if self.subprocess_type is SubprocessType.GC:
- return GC_SERVICE_BASE_NAME
+ return GC_SERVICE_BASE_NAME.format(id=user_constants().ID)
elif self.subprocess_type is SubprocessType.KRESD:
- return f"kresd_{self._id}.service"
+ # NOTE(review): "NMAE" in the constant's name is a typo of "NAME"
+ return KRESD_SERVICE_BASE_NMAE.format(id=user_constants().ID, num=self._id)
else:
raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
async def get_all_running_instances(self) -> Iterable[Subprocess]:
assert self._controller_config is not None
- units = await compat.asyncio.to_thread(list_our_slice_processes, self._controller_config, self._systemd_type)
-
- async def load(name: str) -> Optional[SystemdSubprocess]:
+ # There are two ways (that I knew of when writing this) to implement this function. We could
+ #
+ # 1. list all units in the system/session
+ # 2. list processes within our slice
+ #
+ # With the list of all units, we get information about unit states with a single DBus method call. However,
+ # there are usually a lot of units and the message being passed through DBus is quite big.
+ #
+ # The other option is to query processes within our slice. We can extract service names from the result of
+ # the call, but we won't know whether the units are failed. Actually, we won't learn about failed units at
+ # all. This is in general cheaper as there won't be any processes in the slice. However, missing any failed
+ # units would lead to problems later on - we have to reset the state of those. Therefore, we have to query
+ # all units and use the first method.
+ units = await compat.asyncio.to_thread(list_units, self._systemd_type)
+
+ async def load(unit: Unit) -> Optional[SystemdSubprocess]:
assert self._controller_config is not None
- if _is_service_name_ours(name):
- failed = await to_thread(is_unit_failed, self._systemd_type, name)
- if failed:
+ if _is_service_name_ours(unit.name):
+ if unit.state == "failed":
# if a unit is failed, remove it from the system by reseting its state
- logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", name)
- await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, name)
+ logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name)
+ await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, unit.name)
return None
return SystemdSubprocess(
self._controller_config,
self._systemd_type,
- SystemdKresID.from_string(name),
+ SystemdKresID.from_string(unit.name),
)
else:
return None
from knot_resolver_manager import log, statistics
from knot_resolver_manager.compat import asyncio as asyncio_compat
from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME
+from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.datamodel.server_schema import ManagementSchema
from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException
async def _init_config_store(config: ParsedTree) -> ConfigStore:
config_validated = await _load_config(config)
config_store = ConfigStore(config_validated)
+ # set up the user-configurable constants on top of the freshly created store before it is returned
+ await init_user_constants(config_store)
return config_store
os.kill(pid, 0)
except OSError as e:
if e.errno == errno.ESRCH:
- logger.warning(
- "Detected old lock file in the working directory, previous instance of the manager must have crashed."
- )
os.unlink(PID_FILE_NAME)
_lock_working_directory(attempt=attempt + 1)
return
res.append(_validated_object_type(tp, val, object_path=f"{object_path}[{i}]"))
except SchemaException as e:
errs.append(e)
- if len(errs) > 0:
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
raise AggregateSchemaException(object_path, child_exceptions=errs)
return tuple(res)
res[nkey] = nval
except SchemaException as e:
errs.append(e)
- if len(errs) > 0:
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
raise AggregateSchemaException(object_path, child_exceptions=errs)
return res
except AttributeError as e:
res.append(_validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
except SchemaException as e:
errs.append(e)
- if len(errs) > 0:
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
raise AggregateSchemaException(object_path, child_exceptions=errs)
return res
def test_dns64_true():
+ # enabling dns64 with a plain `True` is expected to yield the prefix 64:ff9b::/96
- config = KresConfig({"dns64": True})
+ config = KresConfig({"id": "test", "dns64": True})
assert config.dns64
assert config.dns64.prefix == IPv6Network96("64:ff9b::/96")
def test_dns64_default_false():
+ # a config that does not mention dns64 is expected to have it disabled
- config = KresConfig()
+ config = KresConfig.test_instance()
assert config.dns64 == False
def test_dnssec_false():
+ # an explicit `False` is expected to be kept as the value of config.dnssec
- config = KresConfig({"dnssec": False})
+ config = KresConfig({"id": "test", "dnssec": False})
assert config.dnssec == False
def test_dnssec_default_true():
+ # a config that does not mention dnssec is expected to have both trust anchor options enabled
- config = KresConfig()
+ config = KresConfig.test_instance()
assert config.dnssec.trust_anchor_sentinel == True
assert config.dnssec.trust_anchor_signal_query == True
count = 0
@only_on_real_changes(lambda config: config.logging.level)
- async def change_callback(config: KresConfig):
+ async def change_callback(config: KresConfig) -> None:
nonlocal count
count += 1
- config = KresConfig()
+ config = KresConfig.test_instance()
store = ConfigStore(config)
await store.register_on_change_callback(change_callback)
assert count == 1
- config = KresConfig()
+ config = KresConfig.test_instance()
config.logging.level = "crit"
await store.update(config)
assert count == 2
- config = KresConfig()
+ config = KresConfig.test_instance()
config.lua.script_only = True
config.lua.script = "meaningless value"
await store.update(config)