git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: add option to list PIDs
author Oto Šťáva <oto.stava@nic.cz>
Tue, 22 Aug 2023 07:55:03 +0000 (09:55 +0200)
committer Aleš Mrázek <ales.mrazek@nic.cz>
Tue, 3 Dec 2024 10:44:41 +0000 (11:44 +0100)
This commit is the groundwork for adding support for debugging via GDB
(or another compatible debugger). This way, we can retrieve the
subprocesses' PIDs and pass those to the debugger.

python/knot_resolver/client/commands/pids.py [new file with mode: 0644]
python/knot_resolver/controller/interface.py
python/knot_resolver/controller/supervisord/__init__.py
python/knot_resolver/manager/manager.py
python/knot_resolver/manager/server.py

diff --git a/python/knot_resolver/client/commands/pids.py b/python/knot_resolver/client/commands/pids.py
new file mode 100644 (file)
index 0000000..c14c734
--- /dev/null
@@ -0,0 +1,52 @@
+import argparse
+import json
+import sys
+from typing import Iterable, List, Optional, Tuple, Type
+
+from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
+from knot_resolver.utils.requests import request
+
+PIDS_TYPE = Iterable
+
+
+@register_command
+class PidsCommand(Command):
+    def __init__(self, namespace: argparse.Namespace) -> None:
+        self.proc_type: Optional[str] = namespace.proc_type
+
+        super().__init__(namespace)
+
+    @staticmethod
+    def register_args_subparser(
+        subparser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+    ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
+        pids = subparser.add_parser("pids", help="list the PIDs of kresd manager subprocesses")
+        pids.add_argument(
+            "proc_type",
+            help="Optional, the type of process to query. May be 'kresd', 'gc', or 'all' (default).",
+            nargs="?",
+            default="all",
+        )
+        return pids, PidsCommand
+
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
+        return {}
+
+    def run(self, args: CommandArgs) -> None:
+        response = request(args.socket, "GET", f"pids/{self.proc_type}")
+
+        if response.status == 200:
+            pids = json.loads(response.body)
+            if isinstance(pids, PIDS_TYPE):
+                for pid in pids:
+                    print(pid)
+            else:
+                print(
+                    f"Unexpected response type '{type(pids).__name__}' from manager. Expected '{PIDS_TYPE.__name__}'",
+                    file=sys.stderr,
+                )
+                sys.exit(1)
+        else:
+            print(response, file=sys.stderr)
+            sys.exit(1)
index 43c24257e866f47aa7f5c2a0f854ff5e7c3bd6b6..49808d01ac1befb360646d0282a43c97f34105b3 100644 (file)
@@ -109,6 +109,7 @@ class Subprocess(ABC):
         self._id = kresid
         self._config = config
         self._registered_worker: bool = False
+        self._pid: Optional[int] = None
 
         self._config_file: Optional[Path] = None
         if self.type is SubprocessType.KRESD:
@@ -189,6 +190,10 @@ class Subprocess(ABC):
     async def _restart(self) -> None:
         pass
 
+    @abstractmethod
+    async def get_pid(self) -> int:
+        pass
+
     @abstractmethod
     def status(self) -> SubprocessStatus:
         pass
index 347ac1e717809667302edbc0a76c976e626fa967..ddb9b29b1265b9fd91883963217e45c171094b7e 100644 (file)
@@ -223,6 +223,14 @@ class SupervisordSubprocess(Subprocess):
         fast = _create_fast_proxy(self._config)
         fast.startProcess(self.name)
 
+    @async_in_a_thread
+    def get_pid(self) -> int:
+        if self._pid is None:
+            supervisord = _create_supervisord_proxy(self._config)
+            info = supervisord.getProcessInfo(self.name)
+            self._pid = info["pid"]
+        return self._pid
+
     def get_used_config(self) -> KresConfig:
         return self._config
 
index f9c68708769d91884e3c07855b13f227a58e563f..aef7ae85571cdf388fad9d35418b0b809bb8ce7d 100644 (file)
@@ -63,7 +63,7 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
     Instantiate with `KresManager.create()`, not with the usual constructor!
     """
 
-    def __init__(self, shutdown_trigger: Callable[[int], None], _i_know_what_i_am_doing: bool = False):
+    def __init__(self, _i_know_what_i_am_doing: bool = False):
         if not _i_know_what_i_am_doing:
             logger.error(
                 "Trying to create an instance of KresManager using normal constructor. Please use "
@@ -80,19 +80,18 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         self._watchdog_task: Optional["asyncio.Task[None]"] = None
         self._fix_counter: _FixCounter = _FixCounter()
         self._config_store: ConfigStore
-        self._shutdown_trigger: Callable[[int], None] = shutdown_trigger
+        self._shutdown_triggers: List[Callable[[int], None]] = []
 
     @staticmethod
     async def create(
         subprocess_controller: SubprocessController,
         config_store: ConfigStore,
-        shutdown_trigger: Callable[[int], None],
     ) -> "KresManager":
         """
         Creates new instance of KresManager.
         """
 
-        inst = KresManager(shutdown_trigger, _i_know_what_i_am_doing=True)
+        inst = KresManager(_i_know_what_i_am_doing=True)
         await inst._async_init(subprocess_controller, config_store)  # noqa: SLF001
         return inst
 
@@ -211,6 +210,9 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         await self._gc.stop()
         self._gc = None
 
+    def add_shutdown_trigger(self, trigger: Callable[[int], None]) -> None:
+        self._shutdown_triggers.append(trigger)
+
     async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
         async with self._manager_lock:
             if _old.rate_limiting != new.rate_limiting:
@@ -233,6 +235,10 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
             logger.debug("Canary process test passed.")
             return Result.ok(None)
 
+    async def get_pids(self, proc_type: Optional[SubprocessType]) -> List[int]:
+        processes = await self._controller.get_all_running_instances()
+        return [await pr.get_pid() for pr in processes if proc_type is None or pr.type == proc_type]
+
     async def _reload_system_state(self) -> None:
         async with self._manager_lock:
             self._workers = []
@@ -338,7 +344,8 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         logger.warning("Collecting all remaining workers...")
         await self._reload_system_state()
         logger.warning("Terminating...")
-        self._shutdown_trigger(1)
+        for trigger in self._shutdown_triggers:
+            trigger(1)
 
     async def _instability_handler(self) -> None:
         if self._fix_counter.is_too_high():
index b09ff7b99a234440d43bc68535c726579280e9ad..5e0e484edd55c6b8db0b1a66628ba00f9474d781 100644 (file)
@@ -21,6 +21,7 @@ from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
 from knot_resolver.constants import CONFIG_FILE, USER
 from knot_resolver.controller import get_best_controller_implementation
 from knot_resolver.controller.exceptions import SubprocessControllerExecError
+from knot_resolver.controller.interface import SubprocessType
 from knot_resolver.controller.registered_workers import command_single_registered_worker
 from knot_resolver.datamodel import kres_config_json_schema
 from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
@@ -87,7 +88,7 @@ class Server:
     # This is top-level class containing pretty much everything. Instead of global
     # variables, we use instance attributes. That's why there are so many and it's
     # ok.
-    def __init__(self, store: ConfigStore, config_path: Optional[Path]):
+    def __init__(self, store: ConfigStore, config_path: Optional[Path], manager: KresManager):
         # config store & server dynamic reconfiguration
         self.config_store = store
 
@@ -100,6 +101,7 @@ class Server:
         self._config_path: Optional[Path] = config_path
         self._exit_code: int = 0
         self._shutdown_event = asyncio.Event()
+        self._manager = manager
 
     async def _reconfigure(self, config: KresConfig) -> None:
         await self._reconfigure_listen_address(config)
@@ -323,6 +325,30 @@ class Server:
         await self._reload_config()
         return web.Response(text="Reloading...")
 
+    async def _handler_pids(self, request: web.Request) -> web.Response:
+        """
+        Route handler for listing PIDs of subprocesses
+        """
+
+        proc_type: Optional[SubprocessType] = None
+
+        if "path" in request.match_info and len(request.match_info["path"]) > 0:
+            ptstr = request.match_info["path"]
+            if ptstr == "/kresd":
+                proc_type = SubprocessType.KRESD
+            elif ptstr == "/gc":
+                proc_type = SubprocessType.GC
+            elif ptstr == "/all":
+                proc_type = None
+            else:
+                return web.Response(text=f"Invalid process type '{ptstr}'", status=400)
+
+        return web.json_response(
+            await self._manager.get_pids(proc_type),
+            headers={"Access-Control-Allow-Origin": "*"},
+            dumps=partial(json.dumps, indent=4),
+        )
+
     def _setup_routes(self) -> None:
         self.app.add_routes(
             [
@@ -339,6 +365,7 @@ class Server:
                 web.get("/metrics/json", self._handler_metrics_json),
                 web.get("/metrics/prometheus", self._handler_metrics_prometheus),
                 web.post("/cache/clear", self._handler_cache_clear),
+                web.get("/pids{path:.*}", self._handler_pids),
             ]
         )
 
@@ -410,7 +437,7 @@ async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
     return ConfigStore(config_validated)
 
 
-async def _init_manager(config_store: ConfigStore, server: Server) -> KresManager:
+async def _init_manager(config_store: ConfigStore) -> KresManager:
     """
     Called asynchronously when the application initializes.
     """
@@ -420,7 +447,7 @@ async def _init_manager(config_store: ConfigStore, server: Server) -> KresManage
 
     # Create KresManager. This will perform autodetection of available service managers and
     # select the most appropriate to use (or use the one configured directly)
-    manager = await KresManager.create(controller, config_store, server.trigger_shutdown)
+    manager = await KresManager.create(controller, config_store)
 
     logger.info("Initial configuration applied. Process manager initialized...")
     return manager
@@ -559,11 +586,14 @@ async def start_server(config: Path = CONFIG_FILE) -> int:  # noqa: PLR0915
 
         await files.init_files_watchdog(config_store)
 
+        # After we have loaded the configuration, we can start worrying about subprocess management.
+        manager = await _init_manager(config_store)
+
         # prepare instance of the server (no side effects)
-        server = Server(config_store, config)
+        server = Server(config_store, config, manager)
 
-        # After we have loaded the configuration, we can start worring about subprocess management.
-        manager = await _init_manager(config_store, server)
+        # add Server's shutdown trigger to the manager
+        manager.add_shutdown_trigger(server.trigger_shutdown)
 
     except SubprocessControllerExecError as e:
         # if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would