import logging
from threading import Timer
-from typing import Any, Dict, Optional
+from typing import Dict, List, Optional
from knot_resolver.constants import KAFKA_LIB
from knot_resolver.datamodel import KresConfig
if KAFKA_LIB:
from kafka import KafkaConsumer # type: ignore[import-untyped]
+ from kafka.consumer.fetcher import ConsumerRecord # type: ignore[import-untyped]
from kafka.errors import NoBrokersAvailable # type: ignore[import-untyped]
+ from kafka.structs import TopicPartition # type: ignore[import-untyped]
_kafka: Optional["KresKafkaClient"] = None
return
logger.info("Consuming...")
- messages: Dict[Any, Any] = self._consumer.poll()
- logger.info(messages)
+ messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll()
+
+ for _partition, records in messages.items():
+ for record in records:
+ key: str = record.key.decode("utf-8")
+ value: str = record.value.decode("utf-8")
+
+ # views and rpz config
+ if key == "config":
+ pass
+ # start of rpz file
+ elif key[-2] == "." and key[-1].isdigit():
+ pass
+ # end of rpz file
+ elif key.endswith(".END") and value is None:
+ pass
self._consumer_timer = Timer(5, self._consume)
self._consumer_timer.start()