From: Aleš Mrázek Date: Thu, 31 Jul 2025 09:27:14 +0000 (+0200) Subject: manager/kafka_client.py: init kafka using config_store callback X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fheads%2Fjezek-test;p=thirdparty%2Fknot-resolver.git manager/kafka_client.py: init kafka using config_store callback --- diff --git a/python/knot_resolver/manager/kafka_client.py b/python/knot_resolver/manager/kafka_client.py index 89b60894a..a777428bd 100644 --- a/python/knot_resolver/manager/kafka_client.py +++ b/python/knot_resolver/manager/kafka_client.py @@ -3,11 +3,11 @@ import os import shutil from pathlib import Path from threading import Timer -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional from knot_resolver.constants import KAFKA_LIB from knot_resolver.datamodel import KresConfig -from knot_resolver.manager.config_store import ConfigStore +from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update from knot_resolver.manager.triggers import trigger_reload, trigger_renew from knot_resolver.utils.functional import Result from knot_resolver.utils.modeling.parsing import parse_json @@ -15,6 +15,13 @@ from knot_resolver.utils.modeling.parsing import parse_json logger = logging.getLogger(__name__) +def kafka_config(config: KresConfig) -> List[Any]: + return [ + config.hostname, + config.kafka, + ] + + if KAFKA_LIB: from kafka import KafkaConsumer # type: ignore[import-untyped,import-not-found] from kafka.consumer.fetcher import ConsumerRecord # type: ignore[import-untyped,import-not-found] @@ -154,6 +161,16 @@ if KAFKA_LIB: self._consumer = None +@only_on_real_changes_update(kafka_config) +async def _init_kafka_client(config: KresConfig) -> None: + if KAFKA_LIB: + global _kafka + if _kafka: + _kafka.deinit() + logger.info("Initializing Kafka client") + _kafka = KresKafkaClient(config) + + async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]: if old_config.kafka != new_config.kafka: return Result.err("Changing 'kafka' configuration is not allowed at runtime.") @@ -161,13 +178,8 @@ async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig) -> async def init_kafka_client(config_store: ConfigStore) -> None: - config = config_store.get() - - if config.kafka.enable and KAFKA_LIB: - logger.info("Initializing Kafka client") - global _kafka - _kafka = KresKafkaClient(config) - await config_store.register_verifier(_deny_kafka_change) + await config_store.register_on_change_callback(_init_kafka_client) + await config_store.register_verifier(_deny_kafka_change) def deinit_kafka_client() -> None: