]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager/kafka_client.py: consume config
authorAleš Mrázek <ales.mrazek@nic.cz>
Fri, 25 Jul 2025 12:32:54 +0000 (14:32 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Thu, 9 Oct 2025 08:58:45 +0000 (10:58 +0200)
python/knot_resolver/manager/kafka_client.py

index bc316ab336f6ed7dd1527f6f12a563ac5cf6a749..2c3344923d9e7a6af106b81d16412e27c7020280 100644 (file)
@@ -3,12 +3,15 @@ import os
 import shutil
 from threading import Timer
 from typing import Dict, List, Optional
+from urllib.parse import quote
 
 from knot_resolver.constants import KAFKA_LIB
 from knot_resolver.datamodel import KresConfig
 from knot_resolver.manager.config_store import ConfigStore
-from knot_resolver.manager.triggers import trigger_reload
+from knot_resolver.manager.triggers import trigger_renew
 from knot_resolver.utils.functional import Result
+from knot_resolver.utils.modeling.parsing import DataFormat, data_combine, parse_json
+from knot_resolver.utils.requests import SocketDesc, request
 
 logger = logging.getLogger(__name__)
 
@@ -36,7 +39,7 @@ if KAFKA_LIB:
             if self._consumer:
                 self._consumer.close()
 
-        def _consume(self) -> None:
+        def _consume(self) -> None:  # noqa: PLR0915
             if not self._consumer:
                 return
 
@@ -47,10 +50,47 @@ if KAFKA_LIB:
                 for record in records:
                     try:
                         key: str = record.key.decode("utf-8")
-                        key_split = key.split(":")
                         value: str = record.value.decode("utf-8")
 
+                        # messages without key
+                        # config
+                        if not key:
+                            config_orig = self._config.get_unparsed_data()
+                            parsed = parse_json(value)
+                            config_new = data_combine(config_orig, parsed)
+
+                            file_name = "config.kafka.json"
+                            file_name_tmp = f"{file_name}.tmp"
+                            file_name_backup = f"{file_name}.backup"
+                            shutil.copy(file_name, file_name_backup)
+                            with open(file_name_tmp, "w") as file:
+                                file.write(value)
+
+                            management = self._config.management
+                            socket = SocketDesc(
+                                f'http+unix://{quote(str(management.unix_socket), safe="")}/',
+                                'Key "/management/unix-socket" in validated configuration',
+                            )
+                            if management.interface:
+                                socket = SocketDesc(
+                                    f"http://{management.interface.addr}:{management.interface.port}",
+                                    'Key "/management/interface" in validated configuration',
+                                )
+
+                            body = DataFormat.JSON.dict_dump(config_new)
+                            response = request(socket, "PUT", "v1/config", body)
+
+                            if response.status != 200:
+                                logger.error(f"Failed to apply new config:\n{response.body}")
+                                continue
+                            os.replace(file_name_tmp, file_name)
+                            continue
+
+                        # messages with key
+                        # RPZ or other files
+
                         logger.info(f"Received message with '{key}' key")
+                        key_split = key.split(":")
 
                         # prepare files names
                         file_name = key_split[0]
@@ -75,12 +115,12 @@ if KAFKA_LIB:
                             os.replace(file_name, file_name_tmp)
                             logger.info(f"Saved data to '{file_name}'")
 
-                            # trigger delayed configuration reload
-                            trigger_reload(self._config)
+                            # trigger delayed configuration renew
+                            trigger_renew(self._config)
                         else:
                             logger.error("Failed to parse message key")
                     except Exception as e:
-                        logger.error(f"Processing message failed with error: {e}")
+                        logger.error(f"Processing message failed with error: \n{e}")
                         continue
 
             self._consumer_timer = Timer(5, self._consume)