]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager/kafka_client.py: use asyncio.Task for consumer and consuming messages docs-jezek-test-jq0zac/deployments/7900 jezek-test
authorAleš Mrázek <ales.mrazek@nic.cz>
Thu, 6 Nov 2025 09:21:19 +0000 (10:21 +0100)
committerAleš Mrázek <ales.mrazek@nic.cz>
Thu, 6 Nov 2025 09:39:53 +0000 (10:39 +0100)
python/knot_resolver/manager/kafka_client.py

index 27b1446f14cb32a8abd2161bb18fc2ae3bbe2b28..d2b710a7dfc8c03033baf0329e730df36517ee0a 100644 (file)
@@ -1,8 +1,8 @@
+import asyncio
 import json
 import logging
 import shutil
 from pathlib import Path
-from threading import Timer
 from typing import Any, Dict, List, Optional, Tuple
 
 from knot_resolver.constants import KAFKA_LIB
@@ -10,6 +10,7 @@ from knot_resolver.datamodel import KresConfig
 from knot_resolver.manager.config_store import ConfigStore
 from knot_resolver.manager.exceptions import KresKafkaClientError
 from knot_resolver.manager.triggers import trigger_reload
+from knot_resolver.utils import compat
 from knot_resolver.utils.functional import Result
 from knot_resolver.utils.modeling import try_to_parse
 from knot_resolver.utils.modeling.exceptions import DataParsingError, DataValidationError
@@ -259,8 +260,8 @@ if KAFKA_LIB:
     class KresKafkaClient:
         def __init__(self, config: KresConfig) -> None:
             self._config = config
-            self._consumer_timer: Optional[Timer] = None
             self._consumer: Optional[KafkaConsumer] = None
+            self._consumer_task: Optional["asyncio.Task[None]"] = None
             self._producer: Optional[KafkaProducer] = None
 
             # reduce the verbosity of kafka module logger
@@ -273,9 +274,13 @@ if KAFKA_LIB:
                 broker = str(server)
                 brokers.append(broker.replace("@", ":") if server.port else f"{broker}:9092")
             self._brokers: List[str] = brokers
-            self._consumer_run()
             self._producer_connect()
 
+            if compat.asyncio.is_event_loop_running():
+                self._consumer_task = compat.asyncio.create_task(self._consumer_run())
+            else:
+                self._consumer_task = compat.asyncio.run(self._consumer_run())
+
         def _consumer_connect(self) -> None:
             error_msg_prefix = f"Connecting consumer to Kafka broker(s) '{self._brokers}' has failed with"
             config_kafka = self._config.kafka
@@ -305,8 +310,8 @@ if KAFKA_LIB:
                 logger.error(f"{error_msg_prefix} unknown error:\n{e}")
 
         def deinit(self) -> None:
-            if self._consumer_timer:
-                self._consumer_timer.cancel()
+            if self._consumer_task:
+                self._consumer_task.cancel()
             if self._consumer:
                 self._consumer.close()
                 self._consumer = None
@@ -334,38 +339,28 @@ if KAFKA_LIB:
             except Exception as e:
                 logger.error(f"{error_msg_prefix} unknown error:\n{e}")
 
-        def _consumer_run(self) -> None:
-            keep_consuming = False
-
-            if not self._consumer:
-                # connect to brokers
-                self._consumer_connect()
-            else:
-                # ready to consume messages
-                error_msg_prefix = "Consuming messages failed with"
-                try:
-                    logger.info("Started consuming messages...")
-                    messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll(timeout_ms=100)
-                    logger.debug(f"Successfully consumed {len(messages)} messages")
-                except KafkaError as e:
-                    logger.error(f"{error_msg_prefix} Kafka error:\n{e}")
-                    self._consumer_connect()
-                except Exception as e:
-                    logger.error(f"{error_msg_prefix} unknown error:\n{e}")
+        async def _consumer_run(self) -> None:
+            while True:
+                if not self._consumer:
+                    # connect to brokers
                     self._consumer_connect()
                 else:
-                    # ready to process messages
-                    process_messages(messages, self._config)
-                    if messages:
-                        keep_consuming = True
-
-            if keep_consuming:
-                # keep consuming if received messages
-                self._consumer_run()
-            else:
-                # else start new timer
-                self._consumer_timer = Timer(5, self._consumer_run)
-                self._consumer_timer.start()
+                    # ready to consume messages
+                    error_msg_prefix = "Consuming messages failed with"
+                    try:
+                        logger.info("Started consuming messages...")
+                        messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll(timeout_ms=100)
+                        logger.debug(f"Successfully consumed {len(messages)} messages")
+                        # ready to process messages
+                        process_messages(messages, self._config)
+                        if not messages:
+                            await asyncio.sleep(10)
+                    except KafkaError as e:
+                        logger.error(f"{error_msg_prefix} Kafka error:\n{e}")
+                        self._consumer_connect()
+                    except Exception as e:
+                        logger.error(f"{error_msg_prefix} unknown error:\n{e}")
+                        self._consumer_connect()
 
 
 async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig, _force: bool = False) -> Result[None, str]: