From 4cc6c576080f01ec6a1642effa64910a355e8356 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20Mr=C3=A1zek?= Date: Wed, 30 Jul 2025 15:33:38 +0200 Subject: [PATCH] fixup! manager/kafka_client.py: use headers instead of parsing message key --- python/knot_resolver/manager/kafka_client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/python/knot_resolver/manager/kafka_client.py b/python/knot_resolver/manager/kafka_client.py index 0e32b614e..ca0246866 100644 --- a/python/knot_resolver/manager/kafka_client.py +++ b/python/knot_resolver/manager/kafka_client.py @@ -45,7 +45,7 @@ if KAFKA_LIB: if self._consumer: self._consumer.close() - def _consume(self) -> None: # noqa: PLR0912 + def _consume(self) -> None: # noqa: PLR0912, PLR0915 if not self._consumer: return @@ -70,7 +70,7 @@ if KAFKA_LIB: continue if not headers.file_name: - logger.error("Missing 'file-name' header") + logger.error("Missing 'file-name' message header") continue # prepare files names @@ -93,7 +93,9 @@ if KAFKA_LIB: shutil.copy(file_path, file_path_backup) logger.debug(f"Created backup of '{file_path_backup}' file") - with open(file_path_tmp, "w") as file: + # rewrite only on first part, else append + mode = "w" if int(headers.chunk_index) == 1 else "a" + with open(file_path_tmp, mode) as file: file.write(value) config_extensions = (".json", ".yaml", ".yml") @@ -104,6 +106,7 @@ if KAFKA_LIB: os.replace(file_path_tmp, file_path) logger.info(f"Saved data to '{file_path}'") + # config files must be reloaded if file_extension in config_extensions: # trigger delayed configuration reload trigger_reload(self._config) @@ -117,7 +120,6 @@ if KAFKA_LIB: with open(file_path_tmp, mode) as file: file.write(value) logger.debug(f"Saved part {headers.chunk_index} of data to '{file_path_tmp}' file") - except Exception as e: logger.error(f"Processing message failed with error: \n{e}") continue -- 2.47.2