--- /dev/null
+import argparse
+import json
+import sys
+from typing import Iterable, List, Optional, Tuple, Type
+
+from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
+from knot_resolver.utils.requests import request
+
+PIDS_TYPE = Iterable
+
+
+@register_command
+class PidsCommand(Command):
+ def __init__(self, namespace: argparse.Namespace) -> None:
+ self.proc_type: Optional[str] = namespace.proc_type
+
+ super().__init__(namespace)
+
+ @staticmethod
+ def register_args_subparser(
+ subparser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+ ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
+ pids = subparser.add_parser("pids", help="list the PIDs of kresd manager subprocesses")
+ pids.add_argument(
+ "proc_type",
+ help="Optional, the type of process to query. May be 'kresd', 'gc', or 'all' (default).",
+ nargs="?",
+ default="all",
+ )
+ return pids, PidsCommand
+
+ @staticmethod
+ def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
+ return {}
+
+ def run(self, args: CommandArgs) -> None:
+ response = request(args.socket, "GET", f"pids/{self.proc_type}")
+
+ if response.status == 200:
+ pids = json.loads(response.body)
+ if isinstance(pids, PIDS_TYPE):
+ for pid in pids:
+ print(pid)
+ else:
+ print(
+ f"Unexpected response type '{type(pids).__name__}' from manager. Expected '{PIDS_TYPE.__name__}'",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+ else:
+ print(response, file=sys.stderr)
+ sys.exit(1)
self._id = kresid
self._config = config
self._registered_worker: bool = False
+ self._pid: Optional[int] = None
self._config_file: Optional[Path] = None
if self.type is SubprocessType.KRESD:
async def _restart(self) -> None:
pass
+ @abstractmethod
+ async def get_pid(self) -> int:
+ pass
+
@abstractmethod
def status(self) -> SubprocessStatus:
pass
fast = _create_fast_proxy(self._config)
fast.startProcess(self.name)
+ @async_in_a_thread
+ def get_pid(self) -> int:
+ if self._pid is None:
+ supervisord = _create_supervisord_proxy(self._config)
+ info = supervisord.getProcessInfo(self.name)
+ self._pid = info["pid"]
+ return self._pid
+
def get_used_config(self) -> KresConfig:
return self._config
Instantiate with `KresManager.create()`, not with the usual constructor!
"""
- def __init__(self, shutdown_trigger: Callable[[int], None], _i_know_what_i_am_doing: bool = False):
+ def __init__(self, _i_know_what_i_am_doing: bool = False):
if not _i_know_what_i_am_doing:
logger.error(
"Trying to create an instance of KresManager using normal constructor. Please use "
self._watchdog_task: Optional["asyncio.Task[None]"] = None
self._fix_counter: _FixCounter = _FixCounter()
self._config_store: ConfigStore
- self._shutdown_trigger: Callable[[int], None] = shutdown_trigger
+ self._shutdown_triggers: List[Callable[[int], None]] = []
@staticmethod
async def create(
subprocess_controller: SubprocessController,
config_store: ConfigStore,
- shutdown_trigger: Callable[[int], None],
) -> "KresManager":
"""
Creates new instance of KresManager.
"""
- inst = KresManager(shutdown_trigger, _i_know_what_i_am_doing=True)
+ inst = KresManager(_i_know_what_i_am_doing=True)
await inst._async_init(subprocess_controller, config_store) # noqa: SLF001
return inst
await self._gc.stop()
self._gc = None
+ def add_shutdown_trigger(self, trigger: Callable[[int], None]) -> None:
+ self._shutdown_triggers.append(trigger)
+
async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
async with self._manager_lock:
if _old.rate_limiting != new.rate_limiting:
logger.debug("Canary process test passed.")
return Result.ok(None)
+ async def get_pids(self, proc_type: Optional[SubprocessType]) -> List[int]:
+ processes = await self._controller.get_all_running_instances()
+ return [await pr.get_pid() for pr in processes if proc_type is None or pr.type == proc_type]
+
async def _reload_system_state(self) -> None:
async with self._manager_lock:
self._workers = []
logger.warning("Collecting all remaining workers...")
await self._reload_system_state()
logger.warning("Terminating...")
- self._shutdown_trigger(1)
+ for trigger in self._shutdown_triggers:
+ trigger(1)
async def _instability_handler(self) -> None:
if self._fix_counter.is_too_high():
from knot_resolver.constants import CONFIG_FILE, USER
from knot_resolver.controller import get_best_controller_implementation
from knot_resolver.controller.exceptions import SubprocessControllerExecError
+from knot_resolver.controller.interface import SubprocessType
from knot_resolver.controller.registered_workers import command_single_registered_worker
from knot_resolver.datamodel import kres_config_json_schema
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
# This is top-level class containing pretty much everything. Instead of global
# variables, we use instance attributes. That's why there are so many and it's
# ok.
- def __init__(self, store: ConfigStore, config_path: Optional[Path]):
+ def __init__(self, store: ConfigStore, config_path: Optional[Path], manager: KresManager):
# config store & server dynamic reconfiguration
self.config_store = store
self._config_path: Optional[Path] = config_path
self._exit_code: int = 0
self._shutdown_event = asyncio.Event()
+ self._manager = manager
async def _reconfigure(self, config: KresConfig) -> None:
await self._reconfigure_listen_address(config)
await self._reload_config()
return web.Response(text="Reloading...")
+ async def _handler_pids(self, request: web.Request) -> web.Response:
+ """
+ Route handler for listing PIDs of subprocesses
+ """
+
+ proc_type: Optional[SubprocessType] = None
+
+ if "path" in request.match_info and len(request.match_info["path"]) > 0:
+ ptstr = request.match_info["path"]
+ if ptstr == "/kresd":
+ proc_type = SubprocessType.KRESD
+ elif ptstr == "/gc":
+ proc_type = SubprocessType.GC
+ elif ptstr == "/all":
+ proc_type = None
+ else:
+ return web.Response(text=f"Invalid process type '{ptstr}'", status=400)
+
+ return web.json_response(
+ await self._manager.get_pids(proc_type),
+ headers={"Access-Control-Allow-Origin": "*"},
+ dumps=partial(json.dumps, indent=4),
+ )
+
def _setup_routes(self) -> None:
self.app.add_routes(
[
web.get("/metrics/json", self._handler_metrics_json),
web.get("/metrics/prometheus", self._handler_metrics_prometheus),
web.post("/cache/clear", self._handler_cache_clear),
+ web.get("/pids{path:.*}", self._handler_pids),
]
)
return ConfigStore(config_validated)
-async def _init_manager(config_store: ConfigStore, server: Server) -> KresManager:
+async def _init_manager(config_store: ConfigStore) -> KresManager:
"""
Called asynchronously when the application initializes.
"""
# Create KresManager. This will perform autodetection of available service managers and
# select the most appropriate to use (or use the one configured directly)
- manager = await KresManager.create(controller, config_store, server.trigger_shutdown)
+ manager = await KresManager.create(controller, config_store)
logger.info("Initial configuration applied. Process manager initialized...")
return manager
await files.init_files_watchdog(config_store)
+ # After we have loaded the configuration, we can start worrying about subprocess management.
+ manager = await _init_manager(config_store)
+
# prepare instance of the server (no side effects)
- server = Server(config_store, config)
+ server = Server(config_store, config, manager)
- # After we have loaded the configuration, we can start worring about subprocess management.
- manager = await _init_manager(config_store, server)
+ # add Server's shutdown trigger to the manager
+ manager.add_shutdown_trigger(server.trigger_shutdown)
except SubprocessControllerExecError as e:
# if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would