From: Oto Šťáva Date: Tue, 22 Aug 2023 07:55:03 +0000 (+0200) Subject: manager: add option to list PIDs X-Git-Tag: v6.0.10~9^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2b86418ba49a56f5063337e1e6cde2ee54abf73a;p=thirdparty%2Fknot-resolver.git manager: add option to list PIDs This commit is the groundwork for adding support for debugging via GDB (or another compatible debugger). This way, we can retrieve the subprocesses' PIDs and pass those to the debugger. --- diff --git a/python/knot_resolver/client/commands/pids.py b/python/knot_resolver/client/commands/pids.py new file mode 100644 index 000000000..c14c734e8 --- /dev/null +++ b/python/knot_resolver/client/commands/pids.py @@ -0,0 +1,52 @@ +import argparse +import json +import sys +from typing import Iterable, List, Optional, Tuple, Type + +from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command +from knot_resolver.utils.requests import request + +PIDS_TYPE = Iterable + + +@register_command +class PidsCommand(Command): + def __init__(self, namespace: argparse.Namespace) -> None: + self.proc_type: Optional[str] = namespace.proc_type + + super().__init__(namespace) + + @staticmethod + def register_args_subparser( + subparser: "argparse._SubParsersAction[argparse.ArgumentParser]", + ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]: + pids = subparser.add_parser("pids", help="list the PIDs of kresd manager subprocesses") + pids.add_argument( + "proc_type", + help="Optional, the type of process to query. May be 'kresd', 'gc', or 'all' (default).", + nargs="?", + default="all", + ) + return pids, PidsCommand + + @staticmethod + def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords: + return {} + + def run(self, args: CommandArgs) -> None: + response = request(args.socket, "GET", f"pids/{self.proc_type}") + + if response.status == 200: + pids = json.loads(response.body) + if isinstance(pids, PIDS_TYPE): + for pid in pids: + print(pid) + else: + print( + f"Unexpected response type '{type(pids).__name__}' from manager. Expected '{PIDS_TYPE.__name__}'", + file=sys.stderr, + ) + sys.exit(1) + else: + print(response, file=sys.stderr) + sys.exit(1) diff --git a/python/knot_resolver/controller/interface.py b/python/knot_resolver/controller/interface.py index 43c24257e..49808d01a 100644 --- a/python/knot_resolver/controller/interface.py +++ b/python/knot_resolver/controller/interface.py @@ -109,6 +109,7 @@ class Subprocess(ABC): self._id = kresid self._config = config self._registered_worker: bool = False + self._pid: Optional[int] = None self._config_file: Optional[Path] = None if self.type is SubprocessType.KRESD: @@ -189,6 +190,10 @@ class Subprocess(ABC): async def _restart(self) -> None: pass + @abstractmethod + async def get_pid(self) -> int: + pass + @abstractmethod def status(self) -> SubprocessStatus: pass diff --git a/python/knot_resolver/controller/supervisord/__init__.py b/python/knot_resolver/controller/supervisord/__init__.py index 347ac1e71..ddb9b29b1 100644 --- a/python/knot_resolver/controller/supervisord/__init__.py +++ b/python/knot_resolver/controller/supervisord/__init__.py @@ -223,6 +223,14 @@ class SupervisordSubprocess(Subprocess): fast = _create_fast_proxy(self._config) fast.startProcess(self.name) + @async_in_a_thread + def get_pid(self) -> int: + if self._pid is None: + supervisord = _create_supervisord_proxy(self._config) + info = supervisord.getProcessInfo(self.name) + self._pid = info["pid"] + return self._pid + def get_used_config(self) -> KresConfig: return self._config diff --git a/python/knot_resolver/manager/manager.py b/python/knot_resolver/manager/manager.py index f9c687087..aef7ae855 100644 --- a/python/knot_resolver/manager/manager.py +++ b/python/knot_resolver/manager/manager.py @@ -63,7 +63,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes Instantiate with `KresManager.create()`, not with the usual constructor! """ - def __init__(self, shutdown_trigger: Callable[[int], None], _i_know_what_i_am_doing: bool = False): + def __init__(self, _i_know_what_i_am_doing: bool = False): if not _i_know_what_i_am_doing: logger.error( "Trying to create an instance of KresManager using normal constructor. Please use " @@ -80,19 +80,18 @@ class KresManager: # pylint: disable=too-many-instance-attributes self._watchdog_task: Optional["asyncio.Task[None]"] = None self._fix_counter: _FixCounter = _FixCounter() self._config_store: ConfigStore - self._shutdown_trigger: Callable[[int], None] = shutdown_trigger + self._shutdown_triggers: List[Callable[[int], None]] = [] @staticmethod async def create( subprocess_controller: SubprocessController, config_store: ConfigStore, - shutdown_trigger: Callable[[int], None], ) -> "KresManager": """ Creates new instance of KresManager. """ - inst = KresManager(shutdown_trigger, _i_know_what_i_am_doing=True) + inst = KresManager(_i_know_what_i_am_doing=True) await inst._async_init(subprocess_controller, config_store) # noqa: SLF001 return inst @@ -211,6 +210,9 @@ class KresManager: # pylint: disable=too-many-instance-attributes await self._gc.stop() self._gc = None + def add_shutdown_trigger(self, trigger: Callable[[int], None]) -> None: + self._shutdown_triggers.append(trigger) + async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]: async with self._manager_lock: if _old.rate_limiting != new.rate_limiting: @@ -233,6 +235,10 @@ class KresManager: # pylint: disable=too-many-instance-attributes logger.debug("Canary process test passed.") return Result.ok(None) + async def get_pids(self, proc_type: Optional[SubprocessType]) -> List[int]: + processes = await self._controller.get_all_running_instances() + return [await pr.get_pid() for pr in processes if proc_type is None or pr.type == proc_type] + async def _reload_system_state(self) -> None: async with self._manager_lock: self._workers = [] @@ -338,7 +344,8 @@ class KresManager: # pylint: disable=too-many-instance-attributes logger.warning("Collecting all remaining workers...") await self._reload_system_state() logger.warning("Terminating...") - self._shutdown_trigger(1) + for trigger in self._shutdown_triggers: + trigger(1) async def _instability_handler(self) -> None: if self._fix_counter.is_too_high(): diff --git a/python/knot_resolver/manager/server.py b/python/knot_resolver/manager/server.py index b09ff7b99..5e0e484ed 100644 --- a/python/knot_resolver/manager/server.py +++ b/python/knot_resolver/manager/server.py @@ -21,6 +21,7 @@ from aiohttp.web_runner import AppRunner, TCPSite, UnixSite from knot_resolver.constants import CONFIG_FILE, USER from knot_resolver.controller import get_best_controller_implementation from knot_resolver.controller.exceptions import SubprocessControllerExecError +from knot_resolver.controller.interface import SubprocessType from knot_resolver.controller.registered_workers import command_single_registered_worker from knot_resolver.datamodel import kres_config_json_schema from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema @@ -87,7 +88,7 @@ class Server: # This is top-level class containing pretty much everything. Instead of global # variables, we use instance attributes. That's why there are so many and it's # ok. - def __init__(self, store: ConfigStore, config_path: Optional[Path]): + def __init__(self, store: ConfigStore, config_path: Optional[Path], manager: KresManager): # config store & server dynamic reconfiguration self.config_store = store @@ -100,6 +101,7 @@ class Server: self._config_path: Optional[Path] = config_path self._exit_code: int = 0 self._shutdown_event = asyncio.Event() + self._manager = manager async def _reconfigure(self, config: KresConfig) -> None: await self._reconfigure_listen_address(config) @@ -323,6 +325,30 @@ class Server: await self._reload_config() return web.Response(text="Reloading...") + async def _handler_pids(self, request: web.Request) -> web.Response: + """ + Route handler for listing PIDs of subprocesses + """ + + proc_type: Optional[SubprocessType] = None + + if "path" in request.match_info and len(request.match_info["path"]) > 0: + ptstr = request.match_info["path"] + if ptstr == "/kresd": + proc_type = SubprocessType.KRESD + elif ptstr == "/gc": + proc_type = SubprocessType.GC + elif ptstr == "/all": + proc_type = None + else: + return web.Response(text=f"Invalid process type '{ptstr}'", status=400) + + return web.json_response( + await self._manager.get_pids(proc_type), + headers={"Access-Control-Allow-Origin": "*"}, + dumps=partial(json.dumps, indent=4), + ) + def _setup_routes(self) -> None: self.app.add_routes( [ @@ -339,6 +365,7 @@ class Server: web.get("/metrics/json", self._handler_metrics_json), web.get("/metrics/prometheus", self._handler_metrics_prometheus), web.post("/cache/clear", self._handler_cache_clear), + web.get("/pids{path:.*}", self._handler_pids), ] ) @@ -410,7 +437,7 @@ async def _init_config_store(config: Dict[str, Any]) -> ConfigStore: return ConfigStore(config_validated) -async def _init_manager(config_store: ConfigStore, server: Server) -> KresManager: +async def _init_manager(config_store: ConfigStore) -> KresManager: """ Called asynchronously when the application initializes. """ @@ -420,7 +447,7 @@ async def _init_manager(config_store: ConfigStore, server: Server) -> KresManage # Create KresManager. This will perform autodetection of available service managers and # select the most appropriate to use (or use the one configured directly) - manager = await KresManager.create(controller, config_store, server.trigger_shutdown) + manager = await KresManager.create(controller, config_store) logger.info("Initial configuration applied. Process manager initialized...") return manager @@ -559,11 +586,14 @@ async def start_server(config: Path = CONFIG_FILE) -> int: # noqa: PLR0915 await files.init_files_watchdog(config_store) + # After we have loaded the configuration, we can start worrying about subprocess management. + manager = await _init_manager(config_store) + # prepare instance of the server (no side effects) - server = Server(config_store, config) + server = Server(config_store, config, manager) - # After we have loaded the configuration, we can start worring about subprocess management. - manager = await _init_manager(config_store, server) + # add Server's shutdown trigger to the manager + manager.add_shutdown_trigger(server.trigger_shutdown) except SubprocessControllerExecError as e: # if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would