]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager/kafka_client.py: init kafka using config_store callback docs-jezek-test-jq0zac/deployments/7317 jezek-test
authorAleš Mrázek <ales.mrazek@nic.cz>
Thu, 31 Jul 2025 09:27:14 +0000 (11:27 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Thu, 31 Jul 2025 09:27:14 +0000 (11:27 +0200)
python/knot_resolver/manager/kafka_client.py

index 89b60894a5568d1436ddf4fc7feb952c649d47de..a777428bdeaed605c1c2f39e7ee32c73d00afc0a 100644 (file)
@@ -3,11 +3,11 @@ import os
 import shutil
 from pathlib import Path
 from threading import Timer
-from typing import Dict, List, Optional
+from typing import Any, Dict, List, Optional
 
 from knot_resolver.constants import KAFKA_LIB
 from knot_resolver.datamodel import KresConfig
-from knot_resolver.manager.config_store import ConfigStore
+from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
 from knot_resolver.manager.triggers import trigger_reload, trigger_renew
 from knot_resolver.utils.functional import Result
 from knot_resolver.utils.modeling.parsing import parse_json
@@ -15,6 +15,13 @@ from knot_resolver.utils.modeling.parsing import parse_json
 logger = logging.getLogger(__name__)
 
 
+def kafka_config(config: KresConfig) -> List[Any]:
+    return [
+        config.hostname,
+        config.kafka,
+    ]
+
+
 if KAFKA_LIB:
     from kafka import KafkaConsumer  # type: ignore[import-untyped,import-not-found]
     from kafka.consumer.fetcher import ConsumerRecord  # type: ignore[import-untyped,import-not-found]
@@ -154,6 +161,16 @@ if KAFKA_LIB:
                 self._consumer = None
 
 
+@only_on_real_changes_update(kafka_config)
+async def _init_kafka_client(config: KresConfig) -> None:
+    if KAFKA_LIB:
+        global _kafka
+        if _kafka:
+            _kafka.deinit()
+        logger.info("Initializing Kafka client")
+        _kafka = KresKafkaClient(config)
+
+
 async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
     if old_config.kafka != new_config.kafka:
         return Result.err("Changing 'kafka' configuration is not allowed at runtime.")
@@ -161,13 +178,8 @@ async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig) ->
 
 
 async def init_kafka_client(config_store: ConfigStore) -> None:
-    config = config_store.get()
-
-    if config.kafka.enable and KAFKA_LIB:
-        logger.info("Initializing Kafka client")
-        global _kafka
-        _kafka = KresKafkaClient(config)
-        await config_store.register_verifier(_deny_kafka_change)
+    await config_store.register_on_change_callback(_init_kafka_client)
+    await config_store.register_verifier(_deny_kafka_change)
 
 
 def deinit_kafka_client() -> None: