_instance: Optional["KresManager"] = None
@staticmethod
- async def create_instance(selected_controller: Optional[SubprocessController]) -> "KresManager":
+ async def create_instance(selected_controller: Optional[SubprocessController], config: KresConfig) -> "KresManager":
"""
- Creates new singleton instance of KresManager. Can be called only once. Afterwards, use
- `KresManager.get_instance()` to obtain the already existing instance
+ Creates new instance of KresManager.
"""
- assert KresManager._instance is None
+ inst = KresManager(config, _i_know_what_i_am_doing=True)
+ await inst._async_init(selected_controller, config) # pylint: disable=protected-access
+ KresManager._instance = inst
+ return inst
- async with KresManager._instance_lock:
- # trying to create, but racing and somebody already did it
- if KresManager._instance is not None:
- raise AssertionError("Must NOT call `create_instance` multiple times - race detected!")
-
- # create it for real
- inst = KresManager(_i_know_what_i_am_doing=True)
- await inst._async_init(selected_controller) # pylint: disable=protected-access
- KresManager._instance = inst
- return inst
-
- @staticmethod
- def get_instance() -> "KresManager":
- """
- Obtain reference to the singleton instance of this class. If you want to create an instance,
- use `create_instance()`
- """
- assert KresManager._instance is not None
- return KresManager._instance
-
- async def _async_init(self, selected_controller: Optional[SubprocessController]):
+ async def _async_init(self, selected_controller: Optional[SubprocessController], config: KresConfig):
if selected_controller is None:
self._controller = await knot_resolver_manager.kresd_controller.get_best_controller_implementation()
else:
await self._controller.initialize_controller()
self._watchdog_task = create_task(self._watchdog())
await self.load_system_state()
+ await self.apply_config(config)
- def __init__(self, _i_know_what_i_am_doing: bool = False):
+ def __init__(self, config: KresConfig, _i_know_what_i_am_doing: bool = False):
if not _i_know_what_i_am_doing:
logger.error(
"Trying to create an instance of KresManager using normal contructor. Please use "
self._manager_lock = asyncio.Lock()
self._controller: SubprocessController
self._last_used_config_raw: Optional[ParsedTree]
- self._last_used_config: Optional[KresConfig] = None
+ self._last_used_config: KresConfig = config
self._watchdog_task: Optional["Future[None]"] = None
async def load_system_state(self):
lua_config = config.render_lua()
await writefile(KRESD_CONFIG_FILE, lua_config)
- async def apply_config(self, config_raw: ParsedTree):
+ async def apply_config(self, config: KresConfig):
async with self._manager_lock:
- logger.debug("Validating configuration...")
- config = KresConfig(config_raw)
-
logger.debug("Writing new config to file...")
await self._write_config(config)
logger.debug("Canary process test passed, Applying new config to all workers")
self._last_used_config = config
- self._last_used_config_raw = config_raw
await self._ensure_number_of_children(config.server.workers)
await self._rolling_restart()
if self._watchdog_task is not None:
self._watchdog_task.cancel()
- def get_last_used_config(self) -> Optional[KresConfig]:
+ def get_last_used_config(self) -> KresConfig:
return self._last_used_config
- def get_last_used_config_raw(self) -> Optional[ParsedTree]:
- return self._last_used_config_raw
-
async def _instability_handler(self) -> None:
logger.error(
"Instability callback invoked. Something is wrong, no idea how to react."
from http import HTTPStatus
from pathlib import Path
from time import time
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Optional, Union
from aiohttp import web
from aiohttp.web import middleware
+from aiohttp.web_app import Application
from aiohttp.web_response import json_response
+from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
from knot_resolver_manager.constants import MANAGER_CONFIG_FILE
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.datamodel.types import Listen, ListenType
from knot_resolver_manager.exceptions import DataException, KresdManagerException, TreeException
from knot_resolver_manager.kresd_controller import get_controller_by_name
from knot_resolver_manager.kresd_controller.interface import SubprocessController
from knot_resolver_manager.utils.async_utils import readfile
from knot_resolver_manager.utils.parsing import ParsedTree, parse, parse_yaml
+from knot_resolver_manager.utils.types import NoneType
from .kres_manager import KresManager
-_SHUTDOWN_EVENT = "shutdown-event"
-
logger = logging.getLogger(__name__)
-async def _index(_request: web.Request) -> web.Response:
- """
- Dummy index handler to indicate that the server is indeed running...
- """
- return json_response(
- {
- "msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
- "status": "RUNNING",
- }
- )
-
-
-async def _apply_config(request: web.Request) -> web.Response:
- """
- Route handler for changing resolver configuration
- """
-
- document_path = request.match_info["path"]
-
- manager: KresManager = KresManager.get_instance()
- if manager is None:
- # handle the case when the manager is not yet initialized
- return web.Response(
- status=503, headers={"Retry-After": "3"}, text="Knot Resolver Manager is not yet fully initialized"
- )
-
- # parse the incoming data
- last: ParsedTree = manager.get_last_used_config_raw() or ParsedTree({})
- new_partial: ParsedTree = parse(await request.text(), request.content_type)
- config = last.update(document_path, new_partial)
-
- # apply config
- await manager.apply_config(config)
-
- # return success
- return web.Response()
-
-
-async def _stop(request: web.Request) -> web.Response:
- """
- Route handler for shutting down the server (and whole manager)
- """
-
- stop_server(request.app)
- return web.Response(text="Shutting down...")
-
-
@middleware
async def error_handler(request: web.Request, handler: Any):
"""
return web.Response(text=f"Request processing failed: {e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
-def setup_routes(app: web.Application):
- app.add_routes([web.get("/", _index), web.post(r"/config{path:.*}", _apply_config), web.post("/stop", _stop)])
+class Server:
+ # pylint: disable=too-many-instance-attributes
+ # This is top-level class containing pretty much everything. Instead of global
+ # variables, we use instance attributes. That's why there are so many and it's
+ # ok.
+ def __init__(self, manager: KresManager):
+
+ self.manager = manager
+ self.app = Application(middlewares=[error_handler])
+ self.runner = AppRunner(self.app)
+
+ self.listen: Optional[Listen] = None
+ self.site: Union[NoneType, TCPSite, UnixSite] = None
+ self.listen_lock = asyncio.Lock()
+
+ self.log_level = "dummy"
+
+ self.shutdown_event = asyncio.Event()
+
+ async def _reconfigure(self, config: KresConfig):
+ self._set_log_level(config)
+ await self._reconfigure_listen_address(config)
+
+ async def start(self):
+ config = self.manager.get_last_used_config()
+ self.setup_routes()
+ await self.runner.setup()
+ await self._reconfigure(config)
+ async def wait_for_shutdown(self):
+ await self.shutdown_event.wait()
-def stop_server(app: web.Application):
- app[_SHUTDOWN_EVENT].set()
- logger.info("Shutdown event triggered...")
+ async def _handler_index(self, _request: web.Request) -> web.Response:
+ """
+ Dummy index handler to indicate that the server is indeed running...
+ """
+ return json_response(
+ {
+ "msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
+ "status": "RUNNING",
+ }
+ )
+
+ async def _handler_apply_config(self, request: web.Request) -> web.Response:
+ """
+ Route handler for changing resolver configuration
+ """
+
+ # parse the incoming data
+ document_path = request.match_info["path"]
+ last: ParsedTree = self.manager.get_last_used_config().get_unparsed_data()
+ new_partial: ParsedTree = parse(await request.text(), request.content_type)
+ config = last.update(document_path, new_partial)
+
+ # validate config
+ config_validated = KresConfig(config)
+
+ # apply config
+ await self._reconfigure(config_validated)
+ await self.manager.apply_config(config_validated)
+
+ # return success
+ return web.Response()
+
+ def _set_log_level(self, config: KresConfig):
+ if self.log_level != config.server.management.log_level:
+ # expects one existing log handler on the root
+ h = logging.getLogger().handlers
+ assert len(h) == 1
+ target = config.server.management.log_level
+ logger.warning(f"Changing log level to '{target}'")
+ h[0].setLevel(target)
+ self.log_level = target
+
+ async def _handler_stop(self, _request: web.Request) -> web.Response:
+ """
+ Route handler for shutting down the server (and whole manager)
+ """
+
+ self.shutdown_event.set()
+ logger.info("Shutdown event triggered...")
+ return web.Response(text="Shutting down...")
+
+ def setup_routes(self):
+ self.app.add_routes(
+ [
+ web.get("/", self._handler_index),
+ web.post(r"/config{path:.*}", self._handler_apply_config),
+ web.post("/stop", self._handler_stop),
+ ]
+ )
+
+ async def _reconfigure_listen_address(self, config: KresConfig):
+ async with self.listen_lock:
+ mgn = config.server.management
+
+ # if the listen address did not change, do nothing
+ if self.listen == mgn.listen:
+ return
+
+ # start the new listen address
+ if mgn.listen.typ is ListenType.UNIX_SOCKET:
+ nsite = web.UnixSite(self.runner, str(mgn.listen.unix_socket))
+ logger.info(f"Starting API HTTP server on http+unix://{mgn.listen.unix_socket}")
+ elif mgn.listen.typ is ListenType.IP_AND_PORT:
+ nsite = web.TCPSite(self.runner, str(mgn.listen.ip), mgn.listen.port)
+ logger.info(f"Starting API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
+ else:
+ raise KresdManagerException(f"Requested API on unsupported configuration format {mgn.listen.typ}")
+ await nsite.start()
+
+ # stop the old listen
+ assert (self.listen is None) == (self.site is None)
+ if self.listen is not None and self.site is not None:
+ if self.listen.typ is ListenType.UNIX_SOCKET:
+ logger.info(f"Stopping API HTTP server on http+unix://{mgn.listen.unix_socket}")
+ elif mgn.listen.typ is ListenType.IP_AND_PORT:
+ logger.info(f"Stopping API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
+ await self.site.stop()
+
+ # save new state
+ self.listen = mgn.listen
+ self.site = nsite
+
+ async def shutdown(self):
+ if self.site is not None:
+ await self.site.stop()
+ await self.runner.cleanup()
class _DefaultSentinel:
_DEFAULT_SENTINEL = _DefaultSentinel()
-async def _init_manager(
- config: Union[None, Path, ParsedTree, _DefaultSentinel],
- subprocess_controller_name: Optional[str],
-):
+async def _init_manager(config: Union[Path, ParsedTree, _DefaultSentinel]) -> KresManager:
"""
Called asynchronously when the application initializes.
"""
try:
- # if configured, create a subprocess controller manually
- controller: Optional[SubprocessController] = None
- if subprocess_controller_name is not None:
- controller = await get_controller_by_name(subprocess_controller_name)
-
- # Create KresManager. This will perform autodetection of available service managers and
- # select the most appropriate to use (or use the one configured directly)
- manager = await KresManager.create_instance(controller)
-
# Initial configuration of the manager
- if config is None:
- # do nothing, there won't be any initial config
- pass
if isinstance(config, _DefaultSentinel):
# use default
config = MANAGER_CONFIG_FILE
else:
logger.info("Loading initial configuration from %s", config)
config = parse_yaml(await readfile(config))
- if isinstance(config, ParsedTree):
- await manager.apply_config(config)
- logger.info("Initial configuration applied...")
- logger.info("Process manager initialized...")
+ # validate the initial configuration
+ assert isinstance(config, ParsedTree)
+ logger.info("Validating initial configuration...")
+ config_validated = KresConfig(config)
+
+ # if configured, create a subprocess controller manually
+ controller: Optional[SubprocessController] = None
+ if config_validated.server.management.backend != "auto":
+ controller = await get_controller_by_name(config_validated.server.management.backend)
+
+ # Create KresManager. This will perform autodetection of available service managers and
+ # select the most appropriate to use (or use the one configured directly)
+ manager = await KresManager.create_instance(controller, config_validated)
+
+ logger.info("Initial configuration applied. Process manager initialized...")
+ return manager
except BaseException:
logger.error("Manager initialization failed... Shutting down!", exc_info=True)
sys.exit(1)
-async def start_server(
- tcp: List[Tuple[str, int]],
- unix: List[Path],
- config: Union[None, Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL,
- subprocess_controller_name: Optional[str] = None,
-):
+async def start_server(config: Union[Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL):
start_time = time()
- # before starting any server, initialize the subprocess controller etc.
- await _init_manager(config, subprocess_controller_name)
-
- app = web.Application(middlewares=[error_handler])
- app[_SHUTDOWN_EVENT] = asyncio.Event()
-
- # configure routing
- setup_routes(app)
+ # before starting server, initialize the subprocess controller etc.
+ manager = await _init_manager(config)
- # run forever, listen at the appropriate place
- runner = web.AppRunner(app)
- await runner.setup()
-
- for host, port in tcp:
- site = web.TCPSite(runner, host, port)
- await site.start()
- logger.info(f"HTTP server started listening on http://{host}:{port} ===")
- for file in unix:
- file.parent.mkdir(exist_ok=True)
- site = web.UnixSite(runner, str(file))
- await site.start()
- logger.info(f"HTTP server started listening on on http+unix://{file} ===")
+ server = Server(manager)
+ await server.start()
# stop the server gracefully and cleanup everything
logger.info(f"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")
- await app[_SHUTDOWN_EVENT].wait()
+
+ await server.wait_for_shutdown()
+
logger.info("Gracefull shutdown triggered. Cleaning up...")
- await runner.cleanup()
- await KresManager.get_instance().stop()
+ await server.shutdown()
+ await manager.stop()
logger.info(f"The manager run for {round(time() - start_time)} seconds... Hope it served well. Bye!")