import logging
+import sys
from pathlib import Path
from typing import List, Optional, Tuple
from knot_resolver_manager import compat
from knot_resolver_manager.constants import LISTEN_SOCKET_PATH, LOG_LEVEL, MANAGER_CONFIG_FILE
+from knot_resolver_manager.kresd_controller import list_controller_names
from knot_resolver_manager.server import start_server
from knot_resolver_manager.utils import ignore_exceptions_optional
default=None,
help="Overrides default config location at '" + str(MANAGER_CONFIG_FILE) + "'",
)
-def main(listen: Optional[str], config: Optional[str]):
+@click.option(
+ "--backend",
+ "-b",
+ type=str,
+ nargs=1,
+ required=False,
+ default=None,
+ help="Use specified subprocess controller, default auto detection",
+)
+@click.option("--list-backends", "-l", type=bool, is_flag=True, default=False)
+def main(listen: Optional[str], config: Optional[str], backend: Optional[str], list_backends: bool):
# pylint: disable=expression-not-assigned
"""Knot Resolver Manager
MANAGER_CONFIG_FILE
)
+ # print list of backends and exit (if specified)
+ if list_backends:
+ click.echo("Available subprocess controllers are:")
+ for n in list_controller_names():
+ click.echo(f" - {n}")
+ sys.exit(0)
+
+ # determine where should the manager listen based on the given argument
tcp: List[Tuple[str, int]] = []
unix: List[Path] = []
-
if listen is None:
unix.append(LISTEN_SOCKET_PATH)
else:
else:
unix.append(Path(listen))
+ # where to look for config
config_path = MANAGER_CONFIG_FILE if config is None else Path(config)
- compat.asyncio.run(start_server(tcp=tcp, unix=unix, config=config_path))
+ compat.asyncio.run(start_server(tcp=tcp, unix=unix, config=config_path, subprocess_controller_name=backend))
if __name__ == "__main__":
"""
@classmethod
- async def create(cls: Type["KresManager"]) -> "KresManager":
+ async def create(cls: Type["KresManager"], controller: Optional[SubprocessController]) -> "KresManager":
obj = cls()
- await obj._async_init() # pylint: disable=protected-access
+ await obj._async_init(controller) # pylint: disable=protected-access
return obj
- async def _async_init(self):
- self._controller = await get_best_controller_implementation()
+ async def _async_init(self, selected_controller: Optional[SubprocessController]):
+ if selected_controller is None:
+ self._controller = await get_best_controller_implementation()
+ else:
+ self._controller = selected_controller
await self._controller.initialize_controller()
await self.load_system_state()
import asyncio
import logging
-from typing import List
+from typing import List, Optional
from knot_resolver_manager.kresd_controller.interface import SubprocessController
raise LookupError("Can't find any available service manager!")
+def list_controller_names() -> List[str]:
+ """
+ Returns a list of names of registered controllers. The listed controllers are not necessarly functional.
+ """
+
+ return [str(controller) for controller in sorted(_registered_controllers, key=str)]
+
+
+async def get_controller_by_name(name: str) -> SubprocessController:
+ logger.debug("Subprocess controller selected manualy by the user, testing feasibility...")
+
+ controller: Optional[SubprocessController] = None
+ for c in sorted(_registered_controllers, key=str):
+ if str(c).startswith(name):
+ if str(c) != name:
+ logger.debug("Assuming '%s' is a shortcut for '%s'", name, str(c))
+ controller = c
+ break
+
+ if controller is None:
+ logger.error("Subprocess controller with name '%s' was not found", name)
+ raise LookupError(f"No subprocess controller named '{name}' found")
+
+ if await controller.is_controller_available():
+ logger.info("Selected controller '%s'", str(controller))
+ return controller
+ else:
+ raise LookupError("The selected subprocess controller is not available for use on this system.")
+
+
# run the imports on module load
try_systemd()
try_supervisord()
self._watchdog_task: "Future[Any]"
def __str__(self):
- return type(self).__name__
+ return "supervisord"
def should_be_running(self, subprocess: SupervisordSubprocess):
return subprocess in self._running_instances
def __str__(self):
if self._systemd_type == systemd.SystemdType.SESSION:
if self._persistance_type is SystemdPersistanceType.TRANSIENT:
- return "SystemdController(SESSION, TRANSIENT)"
+ return "systemd-session-transient"
else:
- return "SystemdController(SESSION)"
+ return "systemd-session"
elif self._systemd_type == systemd.SystemdType.SYSTEM:
- return "SystemdController(SYSTEM)"
+ return "systemd"
else:
raise NotImplementedError("unknown systemd type")
from http import HTTPStatus
from pathlib import Path
from time import time
-from typing import Any, List, Tuple, Union
+from typing import Any, List, Optional, Tuple, Union
from aiohttp import web
from aiohttp.web import middleware
from knot_resolver_manager.constants import MANAGER_CONFIG_FILE
from knot_resolver_manager.exceptions import ValidationException
+from knot_resolver_manager.kresd_controller import get_controller_by_name
+from knot_resolver_manager.kresd_controller.interface import SubprocessController
from knot_resolver_manager.utils.async_utils import readfile
from knot_resolver_manager.utils.dataclasses_parservalidator import Format
_DEFAULT_SENTINEL = _DefaultSentinel()
-async def _init_manager(config: Union[None, Path, KresConfig, _DefaultSentinel], app: web.Application):
+async def _init_manager(
+ config: Union[None, Path, KresConfig, _DefaultSentinel],
+ subprocess_controller_name: Optional[str],
+ app: web.Application,
+):
"""
Called asynchronously when the application initializes.
"""
try:
+ # if configured, create a subprocess controller manually
+ controller: Optional[SubprocessController] = None
+ if subprocess_controller_name is not None:
+ controller = await get_controller_by_name(subprocess_controller_name)
+
# Create KresManager. This will perform autodetection of available service managers and
- # select the most appropriate to use
- manager = await KresManager.create()
+ # select the most appropriate to use (or use the one configured directly)
+ manager = await KresManager.create(controller)
app[_MANAGER] = manager
# Initial configuration of the manager
tcp: List[Tuple[str, int]],
unix: List[Path],
config: Union[None, Path, KresConfig, _DefaultSentinel] = _DEFAULT_SENTINEL,
+ subprocess_controller_name: Optional[str] = None,
):
start_time = time()
app[_MANAGER] = None
app[_SHUTDOWN_EVENT] = asyncio.Event()
- app.on_startup.append(partial(_init_manager, config))
+ app.on_startup.append(partial(_init_manager, config, subprocess_controller_name))
# configure routing
setup_routes(app)
# settings.json
cat > .vscode/settings.json <<EOF
{
- "python.pythonPath": "$(poetry env info -p)",
+ "python.defaultInterpreterPath": "$(poetry env info -p)",
"python.venvPath": "~/.cache/pypoetry/virtualenvs"
}
EOF
},
"pathMappings": [
{
- "localRoot": "${workspaceFolder}",
+ "localRoot": "\${workspaceFolder}",
"remoteRoot": "."
}
]
echo Knot Manager API is accessible on http://localhost:5000
echo -------------------------------------------------------
-poetry run python -m knot_resolver_manager 5000 # -c config/kres-manager.yaml
\ No newline at end of file
+poetry run python -m knot_resolver_manager 5000 $@
\ No newline at end of file
echo API will be running on port 5000
echo ----------------------------------------
-poetry run python -m debugpy --listen 0.0.0.0:5678 --wait-for-client -m knot_resolver_manager 5000
\ No newline at end of file
+poetry run python -m debugpy --listen 0.0.0.0:5678 --wait-for-client -m knot_resolver_manager 5000 $@
\ No newline at end of file