manager/kafka_client.py: support for binary files
author Aleš Mrázek <ales.mrazek@nic.cz>
Wed, 8 Oct 2025 12:41:30 +0000 (14:41 +0200)
committer Vladimír Čunát <vladimir.cunat@nic.cz>
Thu, 9 Oct 2025 09:04:36 +0000 (11:04 +0200)
python/knot_resolver/manager/kafka_client.py

index d8ab89463d81a258a93f38382fd0eecc9d39658f..a6cc2cd9ccbe5d49801a57d4d9843f99ae373a9a 100644 (file)
@@ -30,6 +30,7 @@ if KAFKA_LIB:
     from kafka.structs import TopicPartition  # type: ignore[import-untyped,import-not-found]
 
     config_file_extensions = (".json", ".yaml", ".yml")
+    binary_file_extensions = ".pt"
 
     _kafka: Optional["KresKafkaClient"] = None
 
@@ -125,7 +126,7 @@ if KAFKA_LIB:
 
     def process_record(config: KresConfig, record: ConsumerRecord) -> None:  # noqa: PLR0912, PLR0915
         key: str = record.key.decode("utf-8")
-        value: str = record.value.decode("utf-8")
+        value: bytes = record.value
         headers = Headers(record.headers)
 
         logger.info(f"Received message with '{key}' key")
@@ -154,7 +155,7 @@ if KAFKA_LIB:
 
         # received complete data in one message
         if not index and not total or index == 1 and total == 1:
-            with open(file_tmp_path, "w") as file:
+            with open(file_tmp_path, "wb") as file:
                 file.write(value)
             logger.debug(f"Saved complete data to '{file_tmp_path}' file")
             file_is_ready = True
@@ -164,7 +165,7 @@ if KAFKA_LIB:
             file_chunk_path = create_file_chunk_path(file_path, index)
             # create chunks dir if not exists
             file_chunk_path.parent.mkdir(exist_ok=True)
-            with open(file_chunk_path, "w") as file:
+            with open(file_chunk_path, "wb") as file:
                 file.write(value)
             logger.debug(f"Saved chunk {index} of data to '{file_chunk_path}' file")
 
@@ -197,7 +198,7 @@ if KAFKA_LIB:
             # configuration files (.yaml, .json, ...all)
             if file_extension in config_file_extensions:
                 # validate configuration
-                KresConfig(try_to_parse(value))
+                KresConfig(try_to_parse(value.decode("utf-8")))
 
                 # backup and replace file with new data
                 backup_and_replace(file_tmp_path, file_path)
@@ -208,12 +209,14 @@ if KAFKA_LIB:
                 # trigger reload
                 trigger_reload(config)
 
-            # other files (.rpz, ...)
+            # other files (.rpz, .pt, ...)
             else:
                 # backup and replace file with new data
                 backup_and_replace(file_tmp_path, file_path)
                 # trigger renew
                 trigger_renew(config)
+                if file_extension in binary_file_extensions:
+                    pass
 
     logger.info("Successfully processed message")