]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
server: dynamic configuration of listen address and log level, refactoring of startup
authorVasek Sraier <git@vakabus.cz>
Fri, 1 Oct 2021 20:39:16 +0000 (22:39 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Fri, 8 Apr 2022 14:17:53 +0000 (16:17 +0200)
13 files changed:
manager/etc/knot-resolver/config.yml
manager/integration/.gitignore [new file with mode: 0644]
manager/integration/config.yml [new file with mode: 0644]
manager/integration/runner.py
manager/knot_resolver_manager/__main__.py
manager/knot_resolver_manager/client/__init__.py
manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/datamodel/server_schema.py
manager/knot_resolver_manager/datamodel/types.py
manager/knot_resolver_manager/kres_manager.py
manager/knot_resolver_manager/server.py
manager/knot_resolver_manager/utils/modelling.py
manager/scripts/run

index b05b9e02e2c917913c477d9645a1d7ec4eab3dce..cc18a162fb97d811f40feb97dd3dead9b15d6e9e 100644 (file)
@@ -1,11 +1,16 @@
-server:
-    workers: 1
-network:
-    interfaces:
-      - listen:
-            ip: 127.0.0.1
-            port: 5353
 cache:
-    storage: etc/knot-resolver/cache
+  storage: etc/knot-resolver/cache
 logging:
-    level: debug
+  level: debug
+network:
+  interfaces:
+    - listen:
+        ip: 127.0.0.1
+        port: 5353
+server:
+  workers: 1
+  management:
+    listen:
+      ip: 127.0.0.1
+      port: 5000
+    log-level: DEBUG
diff --git a/manager/integration/.gitignore b/manager/integration/.gitignore
new file mode 100644 (file)
index 0000000..6e25fa8
--- /dev/null
@@ -0,0 +1 @@
+cache/
\ No newline at end of file
diff --git a/manager/integration/config.yml b/manager/integration/config.yml
new file mode 100644 (file)
index 0000000..859bbd1
--- /dev/null
@@ -0,0 +1,15 @@
+network:
+  interfaces:
+    - listen:
+        ip: 127.0.0.1
+        port: 5353
+server:
+  workers: 1
+  management:
+    listen:
+      ip: 127.0.0.1
+      port: 5001
+    log-level: DEBUG
+cache:
+  storage: integration/cache
+
index 55df72fef1b9765de9e599939355756b8019a1be..45b2111e35720971877db11f513bea95fa4c61c0 100644 (file)
@@ -1,5 +1,6 @@
 import logging
 import sys
+from pathlib import Path
 from typing import Callable
 
 from knot_resolver_manager.client import KnotManagerClient, count_running_kresds, start_manager_in_background
@@ -16,7 +17,7 @@ logger = logging.getLogger(__name__)
 
 
 def test_wrapper(test: Test) -> bool:
-    p = start_manager_in_background(HOST, PORT)
+    p = start_manager_in_background(Path("integration/config.yml"))
     client = KnotManagerClient(BASE_URL)
     client.wait_for_initialization()
 
@@ -63,7 +64,7 @@ def crash_resistance(client: KnotManagerClient):
     assert cnt == 2, f"Expected 2 kresd instances, found {cnt}"
 
     # start the server again
-    p = start_manager_in_background("localhost", PORT, initial_config=None)
+    p = start_manager_in_background(Path("integration/config.yml"))
     try:
         client.wait_for_initialization()
     except TimeoutError as e:
index ee6ab5e9a224621c9a552f046e7dd06fd694f60a..211764eac74cc8f8e1bb4fdf3e255504cafb02b3 100644 (file)
@@ -1,19 +1,17 @@
 import logging
 import sys
 from pathlib import Path
-from typing import List, Optional, Tuple
+from typing import Optional
 
 import click
 
 from knot_resolver_manager import compat
-from knot_resolver_manager.constants import LISTEN_SOCKET_PATH, LOG_LEVEL, MANAGER_CONFIG_FILE
+from knot_resolver_manager.constants import MANAGER_CONFIG_FILE, STARTUP_LOG_LEVEL
 from knot_resolver_manager.kresd_controller import list_controller_names
 from knot_resolver_manager.server import start_server
-from knot_resolver_manager.utils import ignore_exceptions_optional
 
 
 @click.command()
-@click.argument("listen", type=str, nargs=1, required=False, default=None)
 @click.option(
     "--config",
     "-c",
@@ -23,17 +21,8 @@ from knot_resolver_manager.utils import ignore_exceptions_optional
     default=None,
     help="Overrides default config location at '" + str(MANAGER_CONFIG_FILE) + "'",
 )
-@click.option(
-    "--backend",
-    "-b",
-    type=str,
-    nargs=1,
-    required=False,
-    default=None,
-    help="Use specified subprocess controller, default auto detection",
-)
 @click.option("--list-backends", "-l", type=bool, is_flag=True, default=False)
-def main(listen: Optional[str], config: Optional[str], backend: Optional[str], list_backends: bool):
+def main(config: Optional[str], list_backends: bool):
     # pylint: disable=expression-not-assigned
 
     """Knot Resolver Manager
@@ -49,24 +38,12 @@ def main(listen: Optional[str], config: Optional[str], backend: Optional[str], l
             click.echo(f" - {n}")
         sys.exit(0)
 
-    # determine where should the manager listen based on the given argument
-    tcp: List[Tuple[str, int]] = []
-    unix: List[Path] = []
-    if listen is None:
-        unix.append(LISTEN_SOCKET_PATH)
-    else:
-        port = ignore_exceptions_optional(int, None, ValueError)(int)(listen)
-        if port is not None:
-            tcp.append(("localhost", port))
-        else:
-            unix.append(Path(listen))
-
     # where to look for config
     config_path = MANAGER_CONFIG_FILE if config is None else Path(config)
 
-    compat.asyncio.run(start_server(tcp=tcp, unix=unix, config=config_path, subprocess_controller_name=backend))
+    compat.asyncio.run(start_server(config=config_path))
 
 
 if __name__ == "__main__":
-    logging.basicConfig(level=LOG_LEVEL)
+    logging.basicConfig(level=STARTUP_LOG_LEVEL)
     main()  # pylint: disable=no-value-for-parameter
index c1d0df7eb475aded6a648c46b4c874ffd6c08a7a..5c0d53a89c21552a16600bba7c9fd1f0ec1b3510 100644 (file)
@@ -3,6 +3,7 @@ import multiprocessing
 import subprocess
 import time
 import urllib.parse
+from pathlib import Path
 from typing import Union
 
 import requests
@@ -64,13 +65,11 @@ _DEFAULT_SENTINEL = _DefaultSentinel()
 
 
 def start_manager_in_background(
-    host: str, port: int, initial_config: Union[None, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL
+    initial_config: Union[Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL
 ) -> multiprocessing.Process:
     if isinstance(initial_config, _DefaultSentinel):
-        p = multiprocessing.Process(target=compat.asyncio.run, args=(start_server(tcp=[(host, port)], unix=[]),))
+        p = multiprocessing.Process(target=compat.asyncio.run, args=(start_server(),))
     else:
-        p = multiprocessing.Process(
-            target=compat.asyncio.run, args=(start_server(tcp=[(host, port)], unix=[], config=initial_config),)
-        )
+        p = multiprocessing.Process(target=compat.asyncio.run, args=(start_server(config=initial_config),))
     p.start()
     return p
index ebdb97d5f59666c8d9fb728154238c5d3768ab15..2fd284c851ba97dfff501be959f69ff713ec9c73 100644 (file)
@@ -1,7 +1,7 @@
 import logging
 from pathlib import Path
 
-LOG_LEVEL = logging.DEBUG
+STARTUP_LOG_LEVEL = logging.DEBUG
 
 CONFIGURATION_DIR = Path("etc/knot-resolver").absolute()
 CONFIGURATION_DIR.mkdir(exist_ok=True)
index f27876430c13d8d88e7b8ee36fce0c539c05d1c8..2fb59114468232f43f12f4c01b64ad55d396746b 100644 (file)
@@ -32,12 +32,14 @@ def _cpu_count() -> int:
 
 
 BackendEnum = LiteralEnum["auto", "systemd", "supervisord"]
+LogLevelEnum = LiteralEnum["CRITICAL", "FATAL", "ERROR", "WARN", "WARNING", "INFO", "DEBUG"]
 
 
 class ManagementSchema(SchemaNode):
     listen: Listen = Listen({"unix-socket": "/tmp/manager.sock"})
     backend: BackendEnum = "auto"
     rundir: AnyPath = AnyPath(".")
+    log_level: LogLevelEnum = "INFO"
 
 
 class WebmgmtSchema(SchemaNode):
index e70bd89d3ad93e70dfcf9103473893545b8dd183..acf8ec7a2b4cbea2ac04c1f74cd9f2808d32ebce 100644 (file)
@@ -181,6 +181,28 @@ class Listen(SchemaNode):
         # we already check that it's there is only one option in the `_typ` method
         pass
 
+    def __str__(self) -> str:
+        if self.typ is ListenType.IP_AND_PORT:
+            return f"{self.ip} @ {self.port}"
+        elif self.typ is ListenType.UNIX_SOCKET:
+            return f"{self.unix_socket}"
+        elif self.typ is ListenType.INTERFACE_AND_PORT:
+            return f"{self.interface} @ {self.port}"
+        else:
+            raise NotImplementedError()
+
+    def __eq__(self, o: object) -> bool:
+        if not isinstance(o, Listen):
+            return False
+
+        return (
+            self.port == o.port
+            and self.ip == o.ip
+            and self.typ == o.typ
+            and self.unix_socket == o.unix_socket
+            and self.interface == o.interface
+        )
+
 
 class IPNetwork(CustomValueType):
     def __init__(self, source_value: Any, object_path: str = "/") -> None:
index 995d1bf9b713099a48040ec523e63457a60a6706..a8920258260455b9cf3055fc7f9e661ed2ad2f81 100644 (file)
@@ -36,35 +36,17 @@ class KresManager:
     _instance: Optional["KresManager"] = None
 
     @staticmethod
-    async def create_instance(selected_controller: Optional[SubprocessController]) -> "KresManager":
+    async def create_instance(selected_controller: Optional[SubprocessController], config: KresConfig) -> "KresManager":
         """
-        Creates new singleton instance of KresManager. Can be called only once. Afterwards, use
-        `KresManager.get_instance()` to obtain the already existing instance
+        Creates new instance of KresManager.
         """
 
-        assert KresManager._instance is None
+        inst = KresManager(config, _i_know_what_i_am_doing=True)
+        await inst._async_init(selected_controller, config)  # pylint: disable=protected-access
+        KresManager._instance = inst
+        return inst
 
-        async with KresManager._instance_lock:
-            # trying to create, but racing and somebody already did it
-            if KresManager._instance is not None:
-                raise AssertionError("Must NOT call `create_instance` multiple times - race detected!")
-
-            # create it for real
-            inst = KresManager(_i_know_what_i_am_doing=True)
-            await inst._async_init(selected_controller)  # pylint: disable=protected-access
-            KresManager._instance = inst
-            return inst
-
-    @staticmethod
-    def get_instance() -> "KresManager":
-        """
-        Obtain reference to the singleton instance of this class. If you want to create an instance,
-        use `create_instance()`
-        """
-        assert KresManager._instance is not None
-        return KresManager._instance
-
-    async def _async_init(self, selected_controller: Optional[SubprocessController]):
+    async def _async_init(self, selected_controller: Optional[SubprocessController], config: KresConfig):
         if selected_controller is None:
             self._controller = await knot_resolver_manager.kresd_controller.get_best_controller_implementation()
         else:
@@ -72,8 +54,9 @@ class KresManager:
         await self._controller.initialize_controller()
         self._watchdog_task = create_task(self._watchdog())
         await self.load_system_state()
+        await self.apply_config(config)
 
-    def __init__(self, _i_know_what_i_am_doing: bool = False):
+    def __init__(self, config: KresConfig, _i_know_what_i_am_doing: bool = False):
         if not _i_know_what_i_am_doing:
             logger.error(
                 "Trying to create an instance of KresManager using normal contructor. Please use "
@@ -86,7 +69,7 @@ class KresManager:
         self._manager_lock = asyncio.Lock()
         self._controller: SubprocessController
         self._last_used_config_raw: Optional[ParsedTree]
-        self._last_used_config: Optional[KresConfig] = None
+        self._last_used_config: KresConfig = config
         self._watchdog_task: Optional["Future[None]"] = None
 
     async def load_system_state(self):
@@ -146,11 +129,8 @@ class KresManager:
         lua_config = config.render_lua()
         await writefile(KRESD_CONFIG_FILE, lua_config)
 
-    async def apply_config(self, config_raw: ParsedTree):
+    async def apply_config(self, config: KresConfig):
         async with self._manager_lock:
-            logger.debug("Validating configuration...")
-            config = KresConfig(config_raw)
-
             logger.debug("Writing new config to file...")
             await self._write_config(config)
 
@@ -166,7 +146,6 @@ class KresManager:
 
             logger.debug("Canary process test passed, Applying new config to all workers")
             self._last_used_config = config
-            self._last_used_config_raw = config_raw
             await self._ensure_number_of_children(config.server.workers)
             await self._rolling_restart()
 
@@ -186,12 +165,9 @@ class KresManager:
         if self._watchdog_task is not None:
             self._watchdog_task.cancel()
 
-    def get_last_used_config(self) -> Optional[KresConfig]:
+    def get_last_used_config(self) -> KresConfig:
         return self._last_used_config
 
-    def get_last_used_config_raw(self) -> Optional[ParsedTree]:
-        return self._last_used_config_raw
-
     async def _instability_handler(self) -> None:
         logger.error(
             "Instability callback invoked. Something is wrong, no idea how to react."
index 912f82a45e296d3ff4e7634be169cbbd2429c7da..e1e2e45cee0fc5ba51e65110d6142e369f241c68 100644 (file)
@@ -4,73 +4,29 @@ import sys
 from http import HTTPStatus
 from pathlib import Path
 from time import time
-from typing import Any, List, Optional, Tuple, Union
+from typing import Any, Optional, Union
 
 from aiohttp import web
 from aiohttp.web import middleware
+from aiohttp.web_app import Application
 from aiohttp.web_response import json_response
+from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
 
 from knot_resolver_manager.constants import MANAGER_CONFIG_FILE
+from knot_resolver_manager.datamodel.config_schema import KresConfig
+from knot_resolver_manager.datamodel.types import Listen, ListenType
 from knot_resolver_manager.exceptions import DataException, KresdManagerException, TreeException
 from knot_resolver_manager.kresd_controller import get_controller_by_name
 from knot_resolver_manager.kresd_controller.interface import SubprocessController
 from knot_resolver_manager.utils.async_utils import readfile
 from knot_resolver_manager.utils.parsing import ParsedTree, parse, parse_yaml
+from knot_resolver_manager.utils.types import NoneType
 
 from .kres_manager import KresManager
 
-_SHUTDOWN_EVENT = "shutdown-event"
-
 logger = logging.getLogger(__name__)
 
 
-async def _index(_request: web.Request) -> web.Response:
-    """
-    Dummy index handler to indicate that the server is indeed running...
-    """
-    return json_response(
-        {
-            "msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
-            "status": "RUNNING",
-        }
-    )
-
-
-async def _apply_config(request: web.Request) -> web.Response:
-    """
-    Route handler for changing resolver configuration
-    """
-
-    document_path = request.match_info["path"]
-
-    manager: KresManager = KresManager.get_instance()
-    if manager is None:
-        # handle the case when the manager is not yet initialized
-        return web.Response(
-            status=503, headers={"Retry-After": "3"}, text="Knot Resolver Manager is not yet fully initialized"
-        )
-
-    # parse the incoming data
-    last: ParsedTree = manager.get_last_used_config_raw() or ParsedTree({})
-    new_partial: ParsedTree = parse(await request.text(), request.content_type)
-    config = last.update(document_path, new_partial)
-
-    # apply config
-    await manager.apply_config(config)
-
-    # return success
-    return web.Response()
-
-
-async def _stop(request: web.Request) -> web.Response:
-    """
-    Route handler for shutting down the server (and whole manager)
-    """
-
-    stop_server(request.app)
-    return web.Response(text="Shutting down...")
-
-
 @middleware
 async def error_handler(request: web.Request, handler: Any):
     """
@@ -94,13 +50,134 @@ async def error_handler(request: web.Request, handler: Any):
             return web.Response(text=f"Request processing failed: {e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
 
 
-def setup_routes(app: web.Application):
-    app.add_routes([web.get("/", _index), web.post(r"/config{path:.*}", _apply_config), web.post("/stop", _stop)])
+class Server:
+    # pylint: disable=too-many-instance-attributes
+    # This is top-level class containing pretty much everything. Instead of global
+    # variables, we use instance attributes. That's why there are so many and it's
+    # ok.
+    def __init__(self, manager: KresManager):
+
+        self.manager = manager
+        self.app = Application(middlewares=[error_handler])
+        self.runner = AppRunner(self.app)
+
+        self.listen: Optional[Listen] = None
+        self.site: Union[NoneType, TCPSite, UnixSite] = None
+        self.listen_lock = asyncio.Lock()
+
+        self.log_level = "dummy"
+
+        self.shutdown_event = asyncio.Event()
+
+    async def _reconfigure(self, config: KresConfig):
+        self._set_log_level(config)
+        await self._reconfigure_listen_address(config)
+
+    async def start(self):
+        config = self.manager.get_last_used_config()
+        self.setup_routes()
+        await self.runner.setup()
+        await self._reconfigure(config)
 
+    async def wait_for_shutdown(self):
+        await self.shutdown_event.wait()
 
-def stop_server(app: web.Application):
-    app[_SHUTDOWN_EVENT].set()
-    logger.info("Shutdown event triggered...")
+    async def _handler_index(self, _request: web.Request) -> web.Response:
+        """
+        Dummy index handler to indicate that the server is indeed running...
+        """
+        return json_response(
+            {
+                "msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
+                "status": "RUNNING",
+            }
+        )
+
+    async def _handler_apply_config(self, request: web.Request) -> web.Response:
+        """
+        Route handler for changing resolver configuration
+        """
+
+        # parse the incoming data
+        document_path = request.match_info["path"]
+        last: ParsedTree = self.manager.get_last_used_config().get_unparsed_data()
+        new_partial: ParsedTree = parse(await request.text(), request.content_type)
+        config = last.update(document_path, new_partial)
+
+        # validate config
+        config_validated = KresConfig(config)
+
+        # apply config
+        await self._reconfigure(config_validated)
+        await self.manager.apply_config(config_validated)
+
+        # return success
+        return web.Response()
+
+    def _set_log_level(self, config: KresConfig):
+        if self.log_level != config.server.management.log_level:
+            # expects one existing log handler on the root
+            h = logging.getLogger().handlers
+            assert len(h) == 1
+            target = config.server.management.log_level
+            logger.warning(f"Changing log level to '{target}'")
+            h[0].setLevel(target)
+            self.log_level = target
+
+    async def _handler_stop(self, _request: web.Request) -> web.Response:
+        """
+        Route handler for shutting down the server (and whole manager)
+        """
+
+        self.shutdown_event.set()
+        logger.info("Shutdown event triggered...")
+        return web.Response(text="Shutting down...")
+
+    def setup_routes(self):
+        self.app.add_routes(
+            [
+                web.get("/", self._handler_index),
+                web.post(r"/config{path:.*}", self._handler_apply_config),
+                web.post("/stop", self._handler_stop),
+            ]
+        )
+
+    async def _reconfigure_listen_address(self, config: KresConfig):
+        async with self.listen_lock:
+            mgn = config.server.management
+
+            # if the listen address did not change, do nothing
+            if self.listen == mgn.listen:
+                return
+
+            # start the new listen address
+            if mgn.listen.typ is ListenType.UNIX_SOCKET:
+                nsite = web.UnixSite(self.runner, str(mgn.listen.unix_socket))
+                logger.info(f"Starting API HTTP server on http+unix://{mgn.listen.unix_socket}")
+            elif mgn.listen.typ is ListenType.IP_AND_PORT:
+                nsite = web.TCPSite(self.runner, str(mgn.listen.ip), mgn.listen.port)
+                logger.info(f"Starting API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
+            else:
+                raise KresdManagerException(f"Requested API on unsupported configuration format {mgn.listen.typ}")
+            await nsite.start()
+
+            # stop the old listen
+            assert (self.listen is None) == (self.site is None)
+            if self.listen is not None and self.site is not None:
+                if self.listen.typ is ListenType.UNIX_SOCKET:
+                    logger.info(f"Stopping API HTTP server on http+unix://{mgn.listen.unix_socket}")
+                elif mgn.listen.typ is ListenType.IP_AND_PORT:
+                    logger.info(f"Stopping API HTTP server on http://{mgn.listen.ip}:{mgn.listen.port}")
+                await self.site.stop()
+
+            # save new state
+            self.listen = mgn.listen
+            self.site = nsite
+
+    async def shutdown(self):
+        if self.site is not None:
+            await self.site.stop()
+        await self.runner.cleanup()
 
 
 class _DefaultSentinel:
@@ -110,27 +187,12 @@ class _DefaultSentinel:
 _DEFAULT_SENTINEL = _DefaultSentinel()
 
 
-async def _init_manager(
-    config: Union[None, Path, ParsedTree, _DefaultSentinel],
-    subprocess_controller_name: Optional[str],
-):
+async def _init_manager(config: Union[Path, ParsedTree, _DefaultSentinel]) -> KresManager:
     """
     Called asynchronously when the application initializes.
     """
     try:
-        # if configured, create a subprocess controller manually
-        controller: Optional[SubprocessController] = None
-        if subprocess_controller_name is not None:
-            controller = await get_controller_by_name(subprocess_controller_name)
-
-        # Create KresManager. This will perform autodetection of available service managers and
-        # select the most appropriate to use (or use the one configured directly)
-        manager = await KresManager.create_instance(controller)
-
         # Initial configuration of the manager
-        if config is None:
-            # do nothing, there won't be any initial config
-            pass
         if isinstance(config, _DefaultSentinel):
             # use default
             config = MANAGER_CONFIG_FILE
@@ -144,51 +206,43 @@ async def _init_manager(
             else:
                 logger.info("Loading initial configuration from %s", config)
                 config = parse_yaml(await readfile(config))
-        if isinstance(config, ParsedTree):
-            await manager.apply_config(config)
-            logger.info("Initial configuration applied...")
 
-        logger.info("Process manager initialized...")
+        # validate the initial configuration
+        assert isinstance(config, ParsedTree)
+        logger.info("Validating initial configuration...")
+        config_validated = KresConfig(config)
+
+        # if configured, create a subprocess controller manually
+        controller: Optional[SubprocessController] = None
+        if config_validated.server.management.backend != "auto":
+            controller = await get_controller_by_name(config_validated.server.management.backend)
+
+        # Create KresManager. This will perform autodetection of available service managers and
+        # select the most appropriate to use (or use the one configured directly)
+        manager = await KresManager.create_instance(controller, config_validated)
+
+        logger.info("Initial configuration applied. Process manager initialized...")
+        return manager
     except BaseException:
         logger.error("Manager initialization failed... Shutting down!", exc_info=True)
         sys.exit(1)
 
 
-async def start_server(
-    tcp: List[Tuple[str, int]],
-    unix: List[Path],
-    config: Union[None, Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL,
-    subprocess_controller_name: Optional[str] = None,
-):
+async def start_server(config: Union[Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL):
     start_time = time()
 
-    # before starting any server, initialize the subprocess controller etc.
-    await _init_manager(config, subprocess_controller_name)
-
-    app = web.Application(middlewares=[error_handler])
-    app[_SHUTDOWN_EVENT] = asyncio.Event()
-
-    # configure routing
-    setup_routes(app)
+    # before starting server, initialize the subprocess controller etc.
+    manager = await _init_manager(config)
 
-    # run forever, listen at the appropriate place
-    runner = web.AppRunner(app)
-    await runner.setup()
-
-    for host, port in tcp:
-        site = web.TCPSite(runner, host, port)
-        await site.start()
-        logger.info(f"HTTP server started listening on http://{host}:{port} ===")
-    for file in unix:
-        file.parent.mkdir(exist_ok=True)
-        site = web.UnixSite(runner, str(file))
-        await site.start()
-        logger.info(f"HTTP server started listening on on http+unix://{file} ===")
+    server = Server(manager)
+    await server.start()
 
     # stop the server gracefully and cleanup everything
     logger.info(f"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")
-    await app[_SHUTDOWN_EVENT].wait()
+
+    await server.wait_for_shutdown()
+
     logger.info("Gracefull shutdown triggered. Cleaning up...")
-    await runner.cleanup()
-    await KresManager.get_instance().stop()
+    await server.shutdown()
+    await manager.stop()
     logger.info(f"The manager run for {round(time() - start_time)} seconds... Hope it served well. Bye!")
index f207efbb81a07ced1f9658de3a85fd6801f82362..c4f0b91371f2b613f241c15ef1e32f3871c283b2 100644 (file)
@@ -315,14 +315,19 @@ class SchemaNode:
         return used_keys
 
     def __init__(self, source: TSource = None, object_path: str = ""):
-        # construct lower level schema node first if configured to do so
-        if self._PREVIOUS_SCHEMA is not None:
-            source = self._PREVIOUS_SCHEMA(source, object_path=object_path)  # pylint: disable=not-callable
-
         # make sure that all raw data checks passed on the source object
+        if source is None:
+            source = ParsedTree({})
         if isinstance(source, dict):
             source = ParsedTree(source)
 
+        # save source
+        self._source: Union[ParsedTree, SchemaNode] = source
+
+        # construct lower level schema node first if configured to do so
+        if self._PREVIOUS_SCHEMA is not None:
+            source = self._PREVIOUS_SCHEMA(source, object_path=object_path)  # pylint: disable=not-callable
+
         # assign fields
         used_keys = self._assign_fields(source, object_path)
 
@@ -342,6 +347,12 @@ class SchemaNode:
         except ValueError as e:
             raise SchemaException(e.args[0] if len(e.args) > 0 else "Validation error", object_path) from e
 
+    def get_unparsed_data(self) -> ParsedTree:
+        if isinstance(self._source, SchemaNode):
+            return self._source.get_unparsed_data()
+        else:
+            return self._source
+
     def _get_converted_value(self, key: str, source: TSource, object_path: str) -> Any:
         """
         Get a value of a field by invoking appropriate transformation function.
index f267064a012bba708dd50ed3c92677be61df0c82..fdd5839eb8eaffc951e3145eadb38e77d9fd665c 100755 (executable)
@@ -7,4 +7,4 @@ source $src_dir/_env.sh
 echo Knot Manager API is accessible on http://localhost:5000
 echo -------------------------------------------------------
 
-poetry run python -m knot_resolver_manager 5000 $@
\ No newline at end of file
+poetry run python -m knot_resolver_manager $@
\ No newline at end of file