if self._consumer:
self._consumer.close()
- def _consume(self) -> None: # noqa: PLR0912
+ def _consume(self) -> None: # noqa: PLR0912, PLR0915
if not self._consumer:
return
continue
if not headers.file_name:
- logger.error("Missing 'file-name' header")
+ logger.error("Missing 'file-name' message header")
continue
# prepare files names
shutil.copy(file_path, file_path_backup)
logger.debug(f"Created backup of '{file_path_backup}' file")
- with open(file_path_tmp, "w") as file:
+ # rewrite only on first part, else append
+ mode = "w" if int(headers.chunk_index) == 1 else "a"
+ with open(file_path_tmp, mode) as file:
file.write(value)
config_extensions = (".json", ".yaml", ".yml")
os.replace(file_path_tmp, file_path)
logger.info(f"Saved data to '{file_path}'")
+ # config files must be reloaded
if file_extension in config_extensions:
# trigger delayed configuration reload
trigger_reload(self._config)
with open(file_path_tmp, mode) as file:
file.write(value)
logger.debug(f"Saved part {headers.chunk_index} of data to '{file_path_tmp}' file")
-
except Exception as e:
logger.error(f"Processing message failed with error: \n{e}")
continue