From: Aleš Mrázek Date: Thu, 6 Nov 2025 09:21:19 +0000 (+0100) Subject: manager/kafka_client.py: use asyncio.Task for consumer and consuming messages X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=refs%2Fheads%2Fjezek-test;p=thirdparty%2Fknot-resolver.git manager/kafka_client.py: use asyncio.Task for consumer and consuming messages --- diff --git a/python/knot_resolver/manager/kafka_client.py b/python/knot_resolver/manager/kafka_client.py index 27b1446f1..d2b710a7d 100644 --- a/python/knot_resolver/manager/kafka_client.py +++ b/python/knot_resolver/manager/kafka_client.py @@ -1,8 +1,8 @@ +import asyncio import json import logging import shutil from pathlib import Path -from threading import Timer from typing import Any, Dict, List, Optional, Tuple from knot_resolver.constants import KAFKA_LIB @@ -10,6 +10,7 @@ from knot_resolver.datamodel import KresConfig from knot_resolver.manager.config_store import ConfigStore from knot_resolver.manager.exceptions import KresKafkaClientError from knot_resolver.manager.triggers import trigger_reload +from knot_resolver.utils import compat from knot_resolver.utils.functional import Result from knot_resolver.utils.modeling import try_to_parse from knot_resolver.utils.modeling.exceptions import DataParsingError, DataValidationError @@ -259,8 +260,8 @@ if KAFKA_LIB: class KresKafkaClient: def __init__(self, config: KresConfig) -> None: self._config = config - self._consumer_timer: Optional[Timer] = None self._consumer: Optional[KafkaConsumer] = None + self._consumer_task: Optional["asyncio.Task[None]"] = None self._producer: Optional[KafkaProducer] = None # reduce the verbosity of kafka module logger @@ -273,9 +274,13 @@ if KAFKA_LIB: broker = str(server) brokers.append(broker.replace("@", ":") if server.port else f"{broker}:9092") self._brokers: List[str] = brokers - self._consumer_run() self._producer_connect() + if compat.asyncio.is_event_loop_running(): + self._consumer_task = compat.asyncio.create_task(self._consumer_run()) + else: + self._consumer_task = compat.asyncio.run(self._consumer_run()) + def _consumer_connect(self) -> None: error_msg_prefix = f"Connecting consumer to Kafka broker(s) '{self._brokers}' has failed with" config_kafka = self._config.kafka @@ -305,8 +310,8 @@ if KAFKA_LIB: logger.error(f"{error_msg_prefix} unknown error:\n{e}") def deinit(self) -> None: - if self._consumer_timer: - self._consumer_timer.cancel() + if self._consumer_task: + self._consumer_task.cancel() if self._consumer: self._consumer.close() self._consumer = None @@ -334,38 +339,28 @@ if KAFKA_LIB: except Exception as e: logger.error(f"{error_msg_prefix} unknown error:\n{e}") - def _consumer_run(self) -> None: - keep_consuming = False - - if not self._consumer: - # connect to brokers - self._consumer_connect() - else: - # ready to consume messages - error_msg_prefix = "Consuming messages failed with" - try: - logger.info("Started consuming messages...") - messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll(timeout_ms=100) - logger.debug(f"Successfully consumed {len(messages)} messages") - except KafkaError as e: - logger.error(f"{error_msg_prefix} Kafka error:\n{e}") - self._consumer_connect() - except Exception as e: - logger.error(f"{error_msg_prefix} unknown error:\n{e}") + async def _consumer_run(self) -> None: + while True: + if not self._consumer: + # connect to brokers self._consumer_connect() else: - # ready to process messages - process_messages(messages, self._config) - if messages: - keep_consuming = True - - if keep_consuming: - # keep consuming if received messages - self._consumer_run() - else: - # else start new timer - self._consumer_timer = Timer(5, self._consumer_run) - self._consumer_timer.start() + # ready to consume messages + error_msg_prefix = "Consuming messages failed with" + try: + logger.info("Started consuming messages...") + messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll(timeout_ms=100) + logger.debug(f"Successfully consumed {len(messages)} messages") + # ready to process messages + process_messages(messages, self._config) + if not messages: + await asyncio.sleep(10) + except KafkaError as e: + logger.error(f"{error_msg_prefix} Kafka error:\n{e}") + self._consumer_connect() + except Exception as e: + logger.error(f"{error_msg_prefix} unknown error:\n{e}") + self._consumer_connect() async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig, _force: bool = False) -> Result[None, str]: