)
continue
- if not headers.file_name:
- logger.error("Missing 'file-name' message header")
- continue
-
# prepare files names
- file_path = Path(headers.file_name)
+ file_name = headers.file_name if headers.file_name else key
+ file_path = Path(file_name)
if not file_path.is_absolute():
file_path = self._config.kafka.files_dir.to_path() / file_path
file_path_tmp = f"{file_path}.tmp"
file_path_backup = f"{file_path}.backup"
- _, file_extension = os.path.splitext(headers.file_name)
+ _, file_extension = os.path.splitext(file_name)
- if not headers.chunk_index:
- logger.error("Missing 'chunk-index' message header")
- elif not headers.total_chunks:
- logger.error("Missing 'total-chunks' message header")
# received full data in one message
# or last chunk of data
- elif headers.chunk_index == headers.total_chunks:
+ if headers.chunk_index == headers.total_chunks:
if file_path.exists():
shutil.copy(file_path, file_path_backup)
logger.debug(f"Created backup of '{file_path_backup}' file")
# rewrite only on first part, else append
- mode = "w" if int(headers.chunk_index) == 1 else "a"
+ mode = (
+ "w"
+ if (headers.chunk_index and int(headers.chunk_index)) or not headers.total_chunks == 1
+ else "a"
+ )
with open(file_path_tmp, mode) as file:
file.write(value)
# received part of data
else:
# rewrite only on first part, else append
- mode = "w" if int(headers.chunk_index) == 1 else "a"
+ mode = "w" if headers.chunk_index and int(headers.chunk_index) == 1 else "a"
with open(file_path_tmp, mode) as file:
file.write(value)
logger.debug(f"Saved part {headers.chunk_index} of data to '{file_path_tmp}' file")