+import asyncio
import json
import logging
import shutil
from pathlib import Path
-from threading import Timer
from typing import Any, Dict, List, Optional, Tuple
from knot_resolver.constants import KAFKA_LIB
from knot_resolver.manager.config_store import ConfigStore
from knot_resolver.manager.exceptions import KresKafkaClientError
from knot_resolver.manager.triggers import trigger_reload
+from knot_resolver.utils import compat
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling import try_to_parse
from knot_resolver.utils.modeling.exceptions import DataParsingError, DataValidationError
class KresKafkaClient:
def __init__(self, config: KresConfig) -> None:
self._config = config
- self._consumer_timer: Optional[Timer] = None
self._consumer: Optional[KafkaConsumer] = None
+ self._consumer_task: Optional["asyncio.Task[None]"] = None
self._producer: Optional[KafkaProducer] = None
# reduce the verbosity of kafka module logger
broker = str(server)
brokers.append(broker.replace("@", ":") if server.port else f"{broker}:9092")
self._brokers: List[str] = brokers
- self._consumer_run()
self._producer_connect()
+ if compat.asyncio.is_event_loop_running():
+ self._consumer_task = compat.asyncio.create_task(self._consumer_run())
+ else:
+ self._consumer_task = compat.asyncio.run(self._consumer_run())
+
def _consumer_connect(self) -> None:
error_msg_prefix = f"Connecting consumer to Kafka broker(s) '{self._brokers}' has failed with"
config_kafka = self._config.kafka
logger.error(f"{error_msg_prefix} unknown error:\n{e}")
def deinit(self) -> None:
- if self._consumer_timer:
- self._consumer_timer.cancel()
+ if self._consumer_task:
+ self._consumer_task.cancel()
if self._consumer:
self._consumer.close()
self._consumer = None
except Exception as e:
logger.error(f"{error_msg_prefix} unknown error:\n{e}")
- def _consumer_run(self) -> None:
- keep_consuming = False
-
- if not self._consumer:
- # connect to brokers
- self._consumer_connect()
- else:
- # ready to consume messages
- error_msg_prefix = "Consuming messages failed with"
- try:
- logger.info("Started consuming messages...")
- messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll(timeout_ms=100)
- logger.debug(f"Successfully consumed {len(messages)} messages")
- except KafkaError as e:
- logger.error(f"{error_msg_prefix} Kafka error:\n{e}")
- self._consumer_connect()
- except Exception as e:
- logger.error(f"{error_msg_prefix} unknown error:\n{e}")
+ async def _consumer_run(self) -> None:
+ while True:
+ if not self._consumer:
+ # connect to brokers
self._consumer_connect()
else:
- # ready to process messages
- process_messages(messages, self._config)
- if messages:
- keep_consuming = True
-
- if keep_consuming:
- # keep consuming if received messages
- self._consumer_run()
- else:
- # else start new timer
- self._consumer_timer = Timer(5, self._consumer_run)
- self._consumer_timer.start()
+ # ready to consume messages
+ error_msg_prefix = "Consuming messages failed with"
+ try:
+ logger.info("Started consuming messages...")
+ messages: Dict[TopicPartition, List[ConsumerRecord]] = self._consumer.poll(timeout_ms=100)
+ logger.debug(f"Successfully consumed {len(messages)} messages")
+ # ready to process messages
+ process_messages(messages, self._config)
+ if not messages:
+ await asyncio.sleep(10)
+ except KafkaError as e:
+ logger.error(f"{error_msg_prefix} Kafka error:\n{e}")
+ self._consumer_connect()
+ except Exception as e:
+ logger.error(f"{error_msg_prefix} unknown error:\n{e}")
+ self._consumer_connect()
async def _deny_kafka_change(old_config: KresConfig, new_config: KresConfig, _force: bool = False) -> Result[None, str]: