]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: supervisord: assert number of workers before starting a new one
authorVasek Sraier <git@vakabus.cz>
Fri, 5 Aug 2022 14:07:47 +0000 (16:07 +0200)
committerVasek Sraier <git@vakabus.cz>
Fri, 5 Aug 2022 14:07:47 +0000 (16:07 +0200)
this change resulted in small refactoring of the ID system

manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/kresd_controller/interface.py
manager/knot_resolver_manager/kresd_controller/supervisord/__init__.py
manager/knot_resolver_manager/kresd_controller/supervisord/config_file.py

index 8b4b775c4ce81d119071dc72eec0df87da7c69b7..f45c307fdda6d9f82df1138e77544ce2c9c7bbe8 100644 (file)
@@ -30,7 +30,7 @@ def kresd_cache_dir(config: "KresConfig") -> Path:
 
 
 def kresd_config_file(_config: "KresConfig", kres_id: "KresID") -> Path:
-    return Path(f"{kres_id}.conf")
+    return Path(f"kresd{int(kres_id)}.conf")
 
 
 def kresd_config_file_supervisord_pattern(_config: "KresConfig") -> Path:
index ec88b2949d1b3461a0eee6a0fe28a88ef33ac761..36cddc137c5cdcd111a8678c038030273fc08201 100644 (file)
@@ -174,8 +174,8 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
                 #   if it crashes and the startup fails, then well, it's not running anymore... :)
                 await self._spawn_new_worker(new)
             except (SubprocessError, SubprocessControllerException):
-                logger.error("kresd with the new config failed to start, rejecting config")
-                return Result.err("Canary kresd instance failed to start. Config is invalid.")
+                logger.error("Kresd with the new config failed to start, rejecting config")
+                return Result.err("canary kresd process failed to start. Config might be invalid.")
 
             logger.debug("Canary process test passed.")
             return Result.ok(None)
index df5ebd741cb98a32a037b306ed9bd98749776a5f..3db894fcb611ad2be09e08c2c0b97f273b42fe7a 100644 (file)
@@ -28,27 +28,13 @@ class KresID:
     ID object used for identifying subprocesses.
     """
 
-    _used: "WeakValueDictionary[int, KresID]" = WeakValueDictionary()
+    _used: "Dict[SubprocessType, WeakValueDictionary[int, KresID]]" = {k: WeakValueDictionary() for k in SubprocessType}
 
     @classmethod
     def alloc(cls: Type[T], typ: SubprocessType) -> T:
-        # we split them in order to make the numbers nice (no gaps, pretty naming)
-        # there are no strictly technical reasons to do this
-        #
-        # GC - negative IDs
-        # KRESD - positive IDs
-        if typ is SubprocessType.GC:
-            start = -1
-            step = -1
-        elif typ is SubprocessType.KRESD:
-            start = 1
-            step = 1
-        else:
-            raise RuntimeError(f"Unexpected subprocess type {typ}")
-
         # find free ID closest to zero
-        for i in itertools.count(start=start, step=step):
-            if i not in cls._used:
+        for i in itertools.count(start=0, step=1):
+            if i not in cls._used[typ]:
                 res = cls.new(typ, i)
                 return res
 
@@ -56,14 +42,14 @@ class KresID:
 
     @classmethod
     def new(cls: "Type[T]", typ: SubprocessType, n: int) -> "T":
-        if n in cls._used:
+        if n in cls._used[typ]:
             # Ignoring typing here, because I can't find a way how to make the _used dict
             # typed based on subclass. I am not even sure that it's different between subclasses,
             # it's probably still the same dict. But we don't really care about it
-            return cls._used[n]  # type: ignore
+            return cls._used[typ][n]  # type: ignore
         else:
             val = cls(typ, n, _i_know_what_i_am_doing=True)
-            cls._used[n] = val
+            cls._used[typ][n] = val
             return val
 
     def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False):
@@ -101,6 +87,9 @@ class KresID:
         """
         raise NotImplementedError()
 
+    def __int__(self) -> int:
+        return self._id
+
 
 class Subprocess:
     """
index 49e53a631220e473d255b0a6c9ea83e045a1465f..7229254ed6eaecaf21cc38347ae69fc1128d0b4b 100644 (file)
@@ -166,7 +166,11 @@ def _list_running_subprocesses(config: KresConfig) -> Dict[SupervisordKresID, Su
     processes = [pr for pr in processes if pr["name"] != "manager"]
 
     # convert all the names
-    return {SupervisordKresID.from_string(pr["name"]): convert(pr) for pr in processes if pr["statename"] != "STOPPED"}
+    return {
+        SupervisordKresID.from_string(f"{pr['group']}:{pr['name']}"): convert(pr)
+        for pr in processes
+        if pr["statename"] != "STOPPED"
+    }
 
 
 class SupervisordSubprocess(Subprocess):
@@ -182,31 +186,31 @@ class SupervisordSubprocess(Subprocess):
             super().__init__(config, base_id)
         self._controller: "SupervisordSubprocessController" = controller
 
-    def _name(self):
-        if self.type is SubprocessType.GC:
-            return str(self.id)
-        else:
-            return f"kresd:{self.id}"
+    @property
+    def name(self):
+        return str(self.id)
 
     @async_in_a_thread
     def _start(self) -> None:
+        # +1 for canary process (same as in config_file.py)
+        assert int(self.id) <= int(self._config.max_workers) + 1, "trying to spawn more than allowed limit of workers"
         try:
             supervisord = _create_fast_proxy(self._config)
-            supervisord.startProcess(self._name())
+            supervisord.startProcess(self.name)
         except Fault as e:
             raise SubprocessControllerException(f"failed to start '{self.id}'") from e
 
     @async_in_a_thread
     def _stop(self) -> None:
         supervisord = _create_supervisord_proxy(self._config)
-        supervisord.stopProcess(self._name())
+        supervisord.stopProcess(self.name)
 
     @async_in_a_thread
     def _restart(self) -> None:
         supervisord = _create_supervisord_proxy(self._config)
-        supervisord.stopProcess(self._name())
+        supervisord.stopProcess(self.name)
         fast = _create_fast_proxy(self._config)
-        fast.startProcess(self._name())
+        fast.startProcess(self.name)
 
     def get_used_config(self) -> KresConfig:
         return self._config
index 1fd1e1b56a65698c0dbd6f4cf78c199dbddde264..3a0d545424f70596fde7f91bde802c34d61d511d 100644 (file)
@@ -28,17 +28,17 @@ class SupervisordKresID(KresID):
 
     @staticmethod
     def from_string(val: str) -> "SupervisordKresID":
-        if val == "gc":
-            return SupervisordKresID.new(SubprocessType.GC, -1)
+        if val == "gc:gc":
+            return SupervisordKresID.new(SubprocessType.GC, 0)
         else:
-            val = val.replace("kresd", "")
+            val = val.replace("kresd:kresd", "")
             return SupervisordKresID.new(SubprocessType.KRESD, int(val))
 
     def __str__(self) -> str:
         if self.subprocess_type is SubprocessType.GC:
             return "gc"
         elif self.subprocess_type is SubprocessType.KRESD:
-            return f"kresd{self._id}"
+            return f"kresd:kresd{self._id}"
         else:
             raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")