from knot_resolver.constants import KAFKA_LIB
from knot_resolver.datamodel import KresConfig
-from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
+from knot_resolver.manager.config_store import ConfigStore
from knot_resolver.manager.exceptions import KresKafkaClientError
from knot_resolver.manager.triggers import trigger_reload, trigger_renew
from knot_resolver.utils.functional import Result
self._consumer_timer.start()
-@only_on_real_changes_update(kafka_config)
-async def _init_kafka_client(config: KresConfig, _force: bool = False) -> None:
- if KAFKA_LIB and config.kafka.enable:
- global _kafka
- if _kafka:
- _kafka.deinit()
- logger.info("Initializing Kafka client")
- _kafka = KresKafkaClient(config)
-
-
async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig, _force: bool = False) -> Result[None, str]:
if old_config.kafka != new_config.kafka:
return Result.err("Changing 'kafka' configuration is not allowed at runtime.")
async def init_kafka_client(config_store: ConfigStore) -> None:
- await config_store.register_on_change_callback(_init_kafka_client)
+ config = config_store.get()
+ if KAFKA_LIB and config.kafka.enable:
+ global _kafka
+ if _kafka:
+ _kafka.deinit()
+ logger.info("Initializing Kafka client")
+ _kafka = KresKafkaClient(config)
+ # await config_store.register_on_change_callback(_init_kafka_client)
await config_store.register_verifier(_deny_kafka_change)