]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
bug fixes within manager's slice usage (in systemd)
author Vasek Sraier <git@vakabus.cz>
Wed, 23 Mar 2022 22:47:19 +0000 (23:47 +0100)
committer Aleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:54 +0000 (16:17 +0200)
The previous implementation was actually flawed, because:
- we were not detecting failed units on startup
- we were using the same service names and counting on different slices being different namespaces. This is NOT a valid assumption.

manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/datamodel/config_schema.py
manager/knot_resolver_manager/kresd_controller/systemd/__init__.py
manager/knot_resolver_manager/server.py
manager/knot_resolver_manager/utils/modelling.py
manager/tests/unit/datamodel/test_config_schema.py
manager/tests/unit/test_config_store.py

index 44eea9308c052572fa61e9f4d7d31a52ebeb67f5..1f2e7f71134ede7cf7551d0ee24f366b0b73dae3 100644 (file)
@@ -1,9 +1,11 @@
 import logging
 from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
+from knot_resolver_manager.config_store import ConfigStore
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.utils import which
+from knot_resolver_manager.utils.functional import Result
 
 if TYPE_CHECKING:
     from knot_resolver_manager.kresd_controller.interface import KresID
@@ -59,3 +61,40 @@ WATCHDOG_INTERVAL: float = 5
 """
 Used in KresdManager. It's a number of seconds in between system health checks.
 """
+
+
+class _UserConstants:
+    """
+    Class for accessing constants, which are technically not constants as they are user configurable.
+    """
+
+    def __init__(self, config_store: ConfigStore) -> None:
+        self._config_store = config_store
+
+    @property
+    def ID(self) -> str:
+        return self._config_store.get().id
+
+
+_user_constants: Optional[_UserConstants] = None
+
+
+async def _deny_id_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+    if config_old.id != config_new.id:
+        return Result.err(
+            "/id: Based on the groupid, the manager recognizes subprocesses,"
+            " so it is not possible to change it while services are running."
+        )
+    return Result.ok(None)
+
+
+async def init_user_constants(config_store: ConfigStore) -> None:
+    global _user_constants
+    _user_constants = _UserConstants(config_store)
+
+    await config_store.register_verifier(_deny_id_changes)
+
+
+def user_constants() -> _UserConstants:
+    assert _user_constants is not None
+    return _user_constants
index 3b346ffb42abe444deeaff9e9920e8885c1f88b2..0c02232d8b9763f32a2ab198e67f4e34ac3e207f 100644 (file)
@@ -134,3 +134,11 @@ class KresConfig(SchemaNode):
         # it should be removed and relative path used instead as soon as issue
         # https://gitlab.nic.cz/knot/knot-resolver/-/issues/720 is fixed
         return _MAIN_TEMPLATE.render(cfg=self, cwd=os.getcwd())  # pyright: reportUnknownMemberType=false
+
+    @staticmethod
+    def test_instance() -> "KresConfig":
+        """
+        Function used just for testing purposes. Creates an instance of KresConfig without requiring
+        any arguments.
+        """
+        return KresConfig({"id": "test"})
index 9b0ae41728a9fcc5f8e75f8d6a4852e8f924d180..5848fa7d413c5887c1c545faa8d32c1b6b399a81 100644 (file)
@@ -6,6 +6,7 @@ from typing import Dict, Iterable, List, Optional, Union
 
 from knot_resolver_manager import compat
 from knot_resolver_manager.compat.asyncio import to_thread
+from knot_resolver_manager.constants import user_constants
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.exceptions import SubprocessControllerException
 from knot_resolver_manager.kresd_controller.interface import (
@@ -18,8 +19,6 @@ from knot_resolver_manager.kresd_controller.interface import (
 from knot_resolver_manager.kresd_controller.systemd.dbus_api import (
     SystemdType,
     Unit,
-    is_unit_failed,
-    list_our_slice_processes,
     list_units,
     reset_failed_unit,
     restart_unit,
@@ -34,33 +33,35 @@ from knot_resolver_manager.utils.async_utils import call
 logger = logging.getLogger(__name__)
 
 
-GC_SERVICE_BASE_NAME = "kres_cache_gc.service"
-KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd_([0-9]+).service$")
+GC_SERVICE_BASE_NAME = "kres-cache-gc-{id}.service"
+KRESD_SERVICE_BASE_NMAE = "kresd-{id}-{num}.service"
+KRESD_SERVICE_BASE_PATTERN = re.compile(r"^kresd-([0-9a-zA-Z]*)-([0-9]+).service$")
 
 
 def _is_service_name_ours(name: str) -> bool:
-    is_ours = name == GC_SERVICE_BASE_NAME
-    is_ours |= bool(KRESD_SERVICE_BASE_PATTERN.match(name))
+    is_ours = name == GC_SERVICE_BASE_NAME.format(id=user_constants().ID)
+    m = KRESD_SERVICE_BASE_PATTERN.match(name)
+    is_ours |= m is not None and m.groups()[0] == user_constants().ID
     return is_ours
 
 
 class SystemdKresID(KresID):
     @staticmethod
     def from_string(val: str) -> "SystemdKresID":
-        if val == GC_SERVICE_BASE_NAME:
+        if val == GC_SERVICE_BASE_NAME.format(id=user_constants().ID):
             return SystemdKresID.new(SubprocessType.GC, -1)
         else:
             kid = KRESD_SERVICE_BASE_PATTERN.search(val)
-            if kid:
-                return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[0]))
+            if kid and kid.groups()[0] == user_constants().ID:
+                return SystemdKresID.new(SubprocessType.KRESD, int(kid.groups()[1]))
             else:
                 raise RuntimeError("Trying to parse systemd service name which does not match our expectations")
 
     def __str__(self) -> str:
         if self.subprocess_type is SubprocessType.GC:
-            return GC_SERVICE_BASE_NAME
+            return GC_SERVICE_BASE_NAME.format(id=user_constants().ID)
         elif self.subprocess_type is SubprocessType.KRESD:
-            return f"kresd_{self._id}.service"
+            return KRESD_SERVICE_BASE_NMAE.format(id=user_constants().ID, num=self._id)
         else:
             raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
 
@@ -130,23 +131,35 @@ class SystemdSubprocessController(SubprocessController):
     async def get_all_running_instances(self) -> Iterable[Subprocess]:
         assert self._controller_config is not None
 
-        units = await compat.asyncio.to_thread(list_our_slice_processes, self._controller_config, self._systemd_type)
-
-        async def load(name: str) -> Optional[SystemdSubprocess]:
+        # There are two possibilities (that I knew about when writing this) for how to implement this function. We could
+        #
+        # 1. list all units in the system/session
+        # 2. list processes within our slice
+        #
+        # With the list of all units, we get information about unit states with a single DBus method call. However,
+        # there are usually a lot of units and the message being passed through DBus is quite big.
+        #
+        # The other option is to query processes within our slice. We can extract service names from the result of the
+        # call; however, we won't know whether the units are failed. Actually, we won't know about failed units at all.
+        # This is in general cheaper, as there won't be any processes in the slice. However, missing any failed units
+        # would lead to problems later on - we have to reset the state of those. Therefore, we have to query all units
+        # and use the first method.
+        units = await compat.asyncio.to_thread(list_units, self._systemd_type)
+
+        async def load(unit: Unit) -> Optional[SystemdSubprocess]:
             assert self._controller_config is not None
 
-            if _is_service_name_ours(name):
-                failed = await to_thread(is_unit_failed, self._systemd_type, name)
-                if failed:
+            if _is_service_name_ours(unit.name):
+                if unit.state == "failed":
                     # if a unit is failed, remove it from the system by reseting its state
-                    logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", name)
-                    await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, name)
+                    logger.warning("Unit '%s' is already failed, resetting its state and ignoring it", unit.name)
+                    await compat.asyncio.to_thread(reset_failed_unit, self._systemd_type, unit.name)
                     return None
 
                 return SystemdSubprocess(
                     self._controller_config,
                     self._systemd_type,
-                    SystemdKresID.from_string(name),
+                    SystemdKresID.from_string(unit.name),
                 )
             else:
                 return None
index a35f4a10dd104a6ea6e94157b022f4717cf7b50c..9b2a08e8d78ef8052e2fcd8ac61197dc0cf21ea2 100644 (file)
@@ -19,7 +19,7 @@ from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
 from knot_resolver_manager import log, statistics
 from knot_resolver_manager.compat import asyncio as asyncio_compat
 from knot_resolver_manager.config_store import ConfigStore
-from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME
+from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, PID_FILE_NAME, init_user_constants
 from knot_resolver_manager.datamodel.config_schema import KresConfig
 from knot_resolver_manager.datamodel.server_schema import ManagementSchema
 from knot_resolver_manager.exceptions import DataException, KresManagerException, SchemaException
@@ -305,6 +305,7 @@ async def _load_config(config: ParsedTree) -> KresConfig:
 async def _init_config_store(config: ParsedTree) -> ConfigStore:
     config_validated = await _load_config(config)
     config_store = ConfigStore(config_validated)
+    await init_user_constants(config_store)
     return config_store
 
 
@@ -354,9 +355,6 @@ def _lock_working_directory(attempt: int = 0) -> None:
                 os.kill(pid, 0)
             except OSError as e:
                 if e.errno == errno.ESRCH:
-                    logger.warning(
-                        "Detected old lock file in the working directory, previous instance of the manager must have crashed."
-                    )
                     os.unlink(PID_FILE_NAME)
                     _lock_working_directory(attempt=attempt + 1)
                     return
index 4ca2f26418ba19ab2cd440e6675c5edbfa1dd9a2..7276c65bb153ae590830082ad2c7210dafb9e8b2 100644 (file)
@@ -193,7 +193,9 @@ def _validated_tuple(cls: Type[Any], obj: Tuple[Any, ...], object_path: str) ->
             res.append(_validated_object_type(tp, val, object_path=f"{object_path}[{i}]"))
         except SchemaException as e:
             errs.append(e)
-    if len(errs) > 0:
+    if len(errs) == 1:
+        raise errs[0]
+    elif len(errs) > 1:
         raise AggregateSchemaException(object_path, child_exceptions=errs)
     return tuple(res)
 
@@ -210,7 +212,9 @@ def _validated_dict(cls: Type[Any], obj: Dict[Any, Any], object_path: str) -> Di
                 res[nkey] = nval
             except SchemaException as e:
                 errs.append(e)
-        if len(errs) > 0:
+        if len(errs) == 1:
+            raise errs[0]
+        elif len(errs) > 1:
             raise AggregateSchemaException(object_path, child_exceptions=errs)
         return res
     except AttributeError as e:
@@ -228,7 +232,9 @@ def _validated_list(cls: Type[Any], obj: List[Any], object_path: str) -> List[An
             res.append(_validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
         except SchemaException as e:
             errs.append(e)
-    if len(errs) > 0:
+    if len(errs) == 1:
+        raise errs[0]
+    elif len(errs) > 1:
         raise AggregateSchemaException(object_path, child_exceptions=errs)
     return res
 
index e55777cf5141d7d65b392b8d3ab8362eee423adb..bf2c7a4839332a2fc01ac36064ae72a1c224e2b7 100644 (file)
@@ -10,26 +10,26 @@ from knot_resolver_manager.utils.modelling import SchemaNode
 
 
 def test_dns64_true():
-    config = KresConfig({"dns64": True})
+    config = KresConfig({"id": "test", "dns64": True})
 
     assert config.dns64
     assert config.dns64.prefix == IPv6Network96("64:ff9b::/96")
 
 
 def test_dns64_default_false():
-    config = KresConfig()
+    config = KresConfig.test_instance()
 
     assert config.dns64 == False
 
 
 def test_dnssec_false():
-    config = KresConfig({"dnssec": False})
+    config = KresConfig({"id": "test", "dnssec": False})
 
     assert config.dnssec == False
 
 
 def test_dnssec_default_true():
-    config = KresConfig()
+    config = KresConfig.test_instance()
 
     assert config.dnssec.trust_anchor_sentinel == True
     assert config.dnssec.trust_anchor_signal_query == True
index 8feaa73659d62af9d3c415842fffafc134ec165b..fcceeddb017a95dfa678f61acdfc3f257a03d5bf 100644 (file)
@@ -9,22 +9,22 @@ async def test_only_once():
     count = 0
 
     @only_on_real_changes(lambda config: config.logging.level)
-    async def change_callback(config: KresConfig):
+    async def change_callback(config: KresConfig) -> None:
         nonlocal count
         count += 1
 
-    config = KresConfig()
+    config = KresConfig.test_instance()
     store = ConfigStore(config)
 
     await store.register_on_change_callback(change_callback)
     assert count == 1
 
-    config = KresConfig()
+    config = KresConfig.test_instance()
     config.logging.level = "crit"
     await store.update(config)
     assert count == 2
 
-    config = KresConfig()
+    config = KresConfig.test_instance()
     config.lua.script_only = True
     config.lua.script = "meaningless value"
     await store.update(config)