]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
fixup! manager/kafka_client.py: use headers instead of parsing message key docs-jezek-test-jq0zac/deployments/7312
authorAleš Mrázek <ales.mrazek@nic.cz>
Wed, 30 Jul 2025 13:33:38 +0000 (15:33 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Wed, 30 Jul 2025 13:33:38 +0000 (15:33 +0200)
python/knot_resolver/manager/kafka_client.py

index 0e32b614e75d633670bf6b50f4a3cb9169d76cde..ca02468665277fe584ed0f460ecf8a7d2c81723c 100644 (file)
@@ -45,7 +45,7 @@ if KAFKA_LIB:
             if self._consumer:
                 self._consumer.close()
 
-        def _consume(self) -> None:  # noqa: PLR0912
+        def _consume(self) -> None:  # noqa: PLR0912, PLR0915
             if not self._consumer:
                 return
 
@@ -70,7 +70,7 @@ if KAFKA_LIB:
                             continue
 
                         if not headers.file_name:
-                            logger.error("Missing 'file-name' header")
+                            logger.error("Missing 'file-name' message header")
                             continue
 
                         # prepare files names
@@ -93,7 +93,9 @@ if KAFKA_LIB:
                                 shutil.copy(file_path, file_path_backup)
                                 logger.debug(f"Created backup of '{file_path_backup}' file")
 
-                            with open(file_path_tmp, "w") as file:
+                            # rewrite only on first part, else append
+                            mode = "w" if int(headers.chunk_index) == 1 else "a"
+                            with open(file_path_tmp, mode) as file:
                                 file.write(value)
 
                             config_extensions = (".json", ".yaml", ".yml")
@@ -104,6 +106,7 @@ if KAFKA_LIB:
                             os.replace(file_path_tmp, file_path)
                             logger.info(f"Saved data to '{file_path}'")
 
+                            # config files must be reloaded
                             if file_extension in config_extensions:
                                 # trigger delayed configuration reload
                                 trigger_reload(self._config)
@@ -117,7 +120,6 @@ if KAFKA_LIB:
                             with open(file_path_tmp, mode) as file:
                                 file.write(value)
                             logger.debug(f"Saved part {headers.chunk_index} of data to '{file_path_tmp}' file")
-
                     except Exception as e:
                         logger.error(f"Processing message failed with error: \n{e}")
                         continue