]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
fixup! manager/kafka_client.py: callback and verifier fix docs-jezek-test-jq0zac/deployments/7752
authorAleš Mrázek <ales.mrazek@nic.cz>
Mon, 13 Oct 2025 12:17:47 +0000 (14:17 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Mon, 13 Oct 2025 12:17:47 +0000 (14:17 +0200)
python/knot_resolver/manager/kafka_client.py

index c7c2955a79f01fe14b0a3b6c52ba08330ccec1ce..a33f8e667451e553154dbee2c666bf4929c7d5e5 100644 (file)
@@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Tuple
 
 from knot_resolver.constants import KAFKA_LIB
 from knot_resolver.datamodel import KresConfig
-from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
+from knot_resolver.manager.config_store import ConfigStore
 from knot_resolver.manager.exceptions import KresKafkaClientError
 from knot_resolver.manager.triggers import trigger_reload, trigger_renew
 from knot_resolver.utils.functional import Result
@@ -327,16 +327,6 @@ if KAFKA_LIB:
                 self._consumer_timer.start()
 
 
-@only_on_real_changes_update(kafka_config)
-async def _init_kafka_client(config: KresConfig, _force: bool = False) -> None:
-    if KAFKA_LIB and config.kafka.enable:
-        global _kafka
-        if _kafka:
-            _kafka.deinit()
-        logger.info("Initializing Kafka client")
-        _kafka = KresKafkaClient(config)
-
-
 async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig, _force: bool = False) -> Result[None, str]:
     if old_config.kafka != new_config.kafka:
         return Result.err("Changing 'kafka' configuration is not allowed at runtime.")
@@ -344,7 +334,14 @@ async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig, _fo
 
 
 async def init_kafka_client(config_store: ConfigStore) -> None:
-    await config_store.register_on_change_callback(_init_kafka_client)
+    config = config_store.get()
+    if KAFKA_LIB and config.kafka.enable:
+        global _kafka
+        if _kafka:
+            _kafka.deinit()
+        logger.info("Initializing Kafka client")
+        _kafka = KresKafkaClient(config)
+    # await config_store.register_on_change_callback(_init_kafka_client)
     await config_store.register_verifier(_deny_kafka_change)