]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager/kafka_client.py: iterate over consumed messages
authorAleš Mrázek <ales.mrazek@nic.cz>
Mon, 14 Jul 2025 14:32:22 +0000 (16:32 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Mon, 20 Oct 2025 11:53:45 +0000 (13:53 +0200)
python/knot_resolver/manager/kafka_client.py

index bbcec98cce6b1e1428b9b9a2c27d54038cb84285..972d3ab4e6d656264b8c3d3c30f3c5e95ff25512 100644 (file)
@@ -1,6 +1,6 @@
 import logging
 from threading import Timer
-from typing import Any, Dict, Optional
+from typing import Dict, List, Optional
 
 from knot_resolver.constants import KAFKA_LIB
 from knot_resolver.datamodel import KresConfig
@@ -12,7 +12,9 @@ logger = logging.getLogger(__name__)
 
 if KAFKA_LIB:
     from kafka import KafkaConsumer  # type: ignore[import-untyped]
+    from kafka.consumer.fetcher import ConsumerRecord  # type: ignore[import-untyped]
     from kafka.errors import NoBrokersAvailable  # type: ignore[import-untyped]
+    from kafka.structs import TopicPartition  # type: ignore[import-untyped]
 
     _kafka: Optional["KresKafkaClient"] = None
 
@@ -36,8 +38,22 @@ if KAFKA_LIB:
                 return
 
             logger.info("Consuming...")
-            messages: Dict[Any, Any] = self._consumer.poll()
-            logger.info(messages)
+            messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll()
+
+            for _partition, records in messages.items():
+                for record in records:
+                    key: str = record.key.decode("utf-8")
+                    value: str = record.value.decode("utf-8")
+
+                    # views and rpz config
+                    if key == "config":
+                        pass
+                    # start of rpz file
+                    elif key[-2] == "." and key[-1].isdigit():
+                        pass
+                    # end of rpz file
+                    elif key.endswith(".END") and value is None:
+                        pass
 
             self._consumer_timer = Timer(5, self._consume)
             self._consumer_timer.start()