# when changing this, change the help message in main()
_SOCKET_PATH = "/tmp/manager.sock"
+_MANAGER = "kres_manager"
-async def hello(_request: web.Request) -> web.Response:
- return web.Response(text="Hello, world")
+async def index(_request: web.Request) -> web.Response:
+ return web.Response(text="Knot Resolver Manager is running! The configuration endpoint is at /config")
async def apply_config(request: web.Request) -> web.Response:
+ manager: KresManager = request.app[_MANAGER]
+ if manager is None:
+ # handle the case when the manager is not yet initialized
+ return web.Response(
+ status=503, headers={"Retry-After": "3"}, text="Knot Resolver Manager is not yet fully initialized"
+ )
+
+ # process the request
config = KresConfig.from_json(await request.text())
- manager: KresManager = request.app["kres_manager"]
await manager.apply_config(config)
return web.Response(text="OK")
app = web.Application()
# initialize KresManager
- manager = KresManager()
- app["kres_manager"] = manager
+ app[_MANAGER] = None
async def init_manager(app: web.Application):
- manager = app["kres_manager"]
- await manager.load_system_state()
+ manager = await KresManager.create()
+ app[_MANAGER] = manager
if config is not None:
# TODO Use config loaded from the file system
pass
app.on_startup.append(init_manager)
# configure routing
- app.add_routes([web.get("/", hello), web.post("/config", apply_config)])
+ app.add_routes([web.get("/", index), web.post("/config", apply_config)])
# run forever, listen at the appropriate place
maybe_port = ignore_exceptions(None, ValueError, TypeError)(int)(listen)
import asyncio
-from typing import List, Optional
-from uuid import uuid4
+from typing import Any, List, Type
-from . import compat, configuration, systemd
-from .datamodel import KresConfig
-
-
-class Kresd:
- def __init__(self, kresd_id: Optional[str] = None):
- self._lock = asyncio.Lock()
- self._id: str = kresd_id or str(uuid4())
-
- # if we got existing id, mark for restart
- self._needs_restart: bool = id is not None
+from knot_resolver_manager.kresd_controller import BaseKresdController, get_best_controller_implementation
- async def is_running(self) -> bool:
- raise NotImplementedError()
+from . import configuration
+from .datamodel import KresConfig
- async def start(self):
- await compat.asyncio.to_thread(systemd.start_unit, f"kresd@{self._id}.service")
- async def stop(self):
- await compat.asyncio.to_thread(systemd.stop_unit, f"kresd@{self._id}.service")
+class KresManager:
+ """
+ Core of the whole operation. Orchestrates individual instances under some
+ service manager like systemd.
- async def restart(self):
- await compat.asyncio.to_thread(systemd.restart_unit, f"kresd@{self._id}.service")
+ Instantiate with `KresManager.create()`, not with the usual constructor!
+ """
- def mark_for_restart(self):
- self._needs_restart = True
+ @classmethod
+ async def create(cls: Type["KresManager"], *args: Any, **kwargs: Any) -> "KresManager":
+ obj = cls()
+ await obj._async_init(*args, **kwargs) # pylint: disable=protected-access
+ return obj
+ async def _async_init(self):
+ self._controller = await get_best_controller_implementation()
+ await self.load_system_state()
-class KresManager:
def __init__(self):
- self._children: List[Kresd] = []
+ self._children: List[BaseKresdController] = []
self._children_lock = asyncio.Lock()
+ self._controller: Type[BaseKresdController]
async def load_system_state(self):
async with self._children_lock:
await self._collect_already_running_children()
async def _spawn_new_child(self):
- kresd = Kresd()
+ kresd = self._controller()
await kresd.start()
self._children.append(kresd)
await kresd.stop()
async def _collect_already_running_children(self):
- units = await compat.asyncio.to_thread(systemd.list_units)
- for unit in units:
- u: str = unit
- if u.startswith("kresd@") and u.endswith(".service"):
- iden = u.replace("kresd@", "").replace(".service", "")
- self._children.append(Kresd(kresd_id=iden))
+ self._children.extend(await self._controller.get_all_running_instances())
async def _rolling_restart(self):
for kresd in self._children:
--- /dev/null
+import asyncio
+from typing import Type
+
+from knot_resolver_manager.kresd_controller.base import BaseKresdController
+from knot_resolver_manager.kresd_controller.systemd import SystemdKresdController
+
+
+# In this tuple, every supported controller should be listed. In the order of preference (preferred first)
+_registered_controllers = (SystemdKresdController,)
+
+
+async def get_best_controller_implementation() -> Type[BaseKresdController]:
+ # check all controllers concurrently
+ res = await asyncio.gather(*(cont.is_controller_available() for cont in _registered_controllers))
+
+ # take the first one on the list which is available
+ for avail, controller in zip(res, _registered_controllers):
+ if avail:
+ return controller
+
+ # or fail
+ raise LookupError("Can't find any available service manager!")
--- /dev/null
+import asyncio
+from typing import Iterable, Optional
+from uuid import uuid4
+
+
+class BaseKresdController:
+ """
+ The common Kresd Controller interface. This is what KresManager requires and what has to be implemented by all
+ controllers.
+ """
+
+ def __init__(self, kresd_id: Optional[str] = None):
+ self._lock = asyncio.Lock()
+ self.id: str = kresd_id or str(uuid4())
+
+ @staticmethod
+ async def is_controller_available() -> bool:
+ raise NotImplementedError()
+
+ async def is_running(self) -> bool:
+ raise NotImplementedError()
+
+ async def start(self) -> None:
+ raise NotImplementedError()
+
+ async def stop(self) -> None:
+ raise NotImplementedError()
+
+ async def restart(self) -> None:
+ raise NotImplementedError()
+
+ @staticmethod
+ async def get_all_running_instances() -> Iterable["BaseKresdController"]:
+ raise NotImplementedError()
--- /dev/null
+from typing import Iterable, List
+
+from knot_resolver_manager import compat
+from knot_resolver_manager.kresd_controller.base import BaseKresdController
+
+from . import dbus_api as systemd
+
+
+class SystemdKresdController(BaseKresdController):
+ async def is_running(self) -> bool:
+ raise NotImplementedError()
+
+ async def start(self):
+ await compat.asyncio.to_thread(systemd.start_unit, f"kresd@{self.id}.service")
+
+ async def stop(self):
+ await compat.asyncio.to_thread(systemd.stop_unit, f"kresd@{self.id}.service")
+
+ async def restart(self):
+ await compat.asyncio.to_thread(systemd.restart_unit, f"kresd@{self.id}.service")
+
+ @staticmethod
+ async def is_controller_available() -> bool:
+ # TODO: implement a proper check
+ return True
+
+ @staticmethod
+ async def get_all_running_instances() -> Iterable["BaseKresdController"]:
+ res: List[SystemdKresdController] = []
+ units = await compat.asyncio.to_thread(systemd.list_units)
+ for unit in units:
+ u: str = unit
+ if u.startswith("kresd@") and u.endswith(".service"):
+ iden = u.replace("kresd@", "").replace(".service", "")
+ res.append(SystemdKresdController(kresd_id=iden))
+ return res