from kafka.structs import TopicPartition # type: ignore[import-untyped,import-not-found]
config_file_extensions = (".json", ".yaml", ".yml")
+ binary_file_extensions = ".pt"
_kafka: Optional["KresKafkaClient"] = None
def process_record(config: KresConfig, record: ConsumerRecord) -> None: # noqa: PLR0912, PLR0915
key: str = record.key.decode("utf-8")
- value: str = record.value.decode("utf-8")
+ value: bytes = record.value
headers = Headers(record.headers)
logger.info(f"Received message with '{key}' key")
# received complete data in one message
if not index and not total or index == 1 and total == 1:
- with open(file_tmp_path, "w") as file:
+ with open(file_tmp_path, "wb") as file:
file.write(value)
logger.debug(f"Saved complete data to '{file_tmp_path}' file")
file_is_ready = True
file_chunk_path = create_file_chunk_path(file_path, index)
# create chunks dir if not exists
file_chunk_path.parent.mkdir(exist_ok=True)
- with open(file_chunk_path, "w") as file:
+ with open(file_chunk_path, "wb") as file:
file.write(value)
logger.debug(f"Saved chunk {index} of data to '{file_chunk_path}' file")
# configuration files (.yaml, .json, ...all)
if file_extension in config_file_extensions:
# validate configuration
- KresConfig(try_to_parse(value))
+ KresConfig(try_to_parse(value.decode("utf-8")))
# backup and replace file with new data
backup_and_replace(file_tmp_path, file_path)
# trigger reload
trigger_reload(config)
- # other files (.rpz, ...)
+ # other files (.rpz, .pt, ...)
else:
# backup and replace file with new data
backup_and_replace(file_tmp_path, file_path)
# trigger renew
trigger_renew(config)
+ if file_extension in binary_file_extensions:
+ pass
logger.info("Successfully processed message")