From: Aleš Mrázek Date: Wed, 30 Jul 2025 13:33:38 +0000 (+0200) Subject: fixup! manager/kafka_client.py: use headers instead of parsing message key X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fenvironments%2Fdocs-jezek-test-jq0zac%2Fdeployments%2F7312;p=thirdparty%2Fknot-resolver.git fixup! manager/kafka_client.py: use headers instead of parsing message key --- diff --git a/python/knot_resolver/manager/kafka_client.py b/python/knot_resolver/manager/kafka_client.py index 0e32b614e..ca0246866 100644 --- a/python/knot_resolver/manager/kafka_client.py +++ b/python/knot_resolver/manager/kafka_client.py @@ -45,7 +45,7 @@ if KAFKA_LIB: if self._consumer: self._consumer.close() - def _consume(self) -> None: # noqa: PLR0912 + def _consume(self) -> None: # noqa: PLR0912, PLR0915 if not self._consumer: return @@ -70,7 +70,7 @@ if KAFKA_LIB: continue if not headers.file_name: - logger.error("Missing 'file-name' header") + logger.error("Missing 'file-name' message header") continue # prepare files names @@ -93,7 +93,9 @@ if KAFKA_LIB: shutil.copy(file_path, file_path_backup) logger.debug(f"Created backup of '{file_path_backup}' file") - with open(file_path_tmp, "w") as file: + # rewrite only on first part, else append + mode = "w" if int(headers.chunk_index) == 1 else "a" + with open(file_path_tmp, mode) as file: file.write(value) config_extensions = (".json", ".yaml", ".yml") @@ -104,6 +106,7 @@ if KAFKA_LIB: os.replace(file_path_tmp, file_path) logger.info(f"Saved data to '{file_path}'") + # config files must be reloaded if file_extension in config_extensions: # trigger delayed configuration reload trigger_reload(self._config) @@ -117,7 +120,6 @@ if KAFKA_LIB: with open(file_path_tmp, mode) as file: file.write(value) logger.debug(f"Saved part {headers.chunk_index} of data to '{file_path_tmp}' file") - except Exception as e: logger.error(f"Processing message failed with error: \n{e}") continue