import shutil
from pathlib import Path
from threading import Timer
-from typing import Dict, List, Optional
+from typing import Any, Dict, List, Optional
from knot_resolver.constants import KAFKA_LIB
from knot_resolver.datamodel import KresConfig
-from knot_resolver.manager.config_store import ConfigStore
+from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
from knot_resolver.manager.triggers import trigger_reload, trigger_renew
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.parsing import parse_json
logger = logging.getLogger(__name__)
+def kafka_config(config: KresConfig) -> List[Any]:
+ return [
+ config.hostname,
+ config.kafka,
+ ]
+
+
if KAFKA_LIB:
from kafka import KafkaConsumer # type: ignore[import-untyped,import-not-found]
from kafka.consumer.fetcher import ConsumerRecord # type: ignore[import-untyped,import-not-found]
self._consumer = None
+@only_on_real_changes_update(kafka_config)
+async def _init_kafka_client(config: KresConfig) -> None:
+ if KAFKA_LIB:
+ global _kafka
+ if _kafka:
+ _kafka.deinit()
+ logger.info("Initializing Kafka client")
+ _kafka = KresKafkaClient(config)
+
+
async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
if old_config.kafka != new_config.kafka:
return Result.err("Changing 'kafka' configuration is not allowed at runtime.")
async def init_kafka_client(config_store: ConfigStore) -> None:
- config = config_store.get()
-
- if config.kafka.enable and KAFKA_LIB:
- logger.info("Initializing Kafka client")
- global _kafka
- _kafka = KresKafkaClient(config)
- await config_store.register_verifier(_deny_kafka_change)
+ await config_store.register_on_change_callback(_init_kafka_client)
+ await config_store.register_verifier(_deny_kafka_change)
def deinit_kafka_client() -> None: