if self._consumer:
self._consumer.close()
- def _consume(self) -> None: # noqa: PLR0915
+ def _consume(self) -> None: # noqa: PLR0912, PLR0915
if not self._consumer:
return
# messages without key
# config
if not key:
+ logger.info("Received configuration message")
+
config_orig = self._config.get_unparsed_data()
parsed = parse_json(value)
config_new = data_combine(config_orig, parsed)
file_path = self._config.kafka.files_dir.to_path() / "config.json"
file_path_tmp = f"{file_path}.tmp"
file_path_backup = f"{file_path}.backup"
- shutil.copy(file_path, file_path_backup)
+
+ if file_path.exists():
+ shutil.copy(file_path, file_path_backup)
with open(file_path_tmp, "w") as file:
file.write(value)
logger.error(f"Failed to apply new config:\n{response.body}")
continue
os.replace(file_path_tmp, file_path)
+ logger.info(f"Successfully applied config, saved to '{file_path}'")
continue
# messages with key
logger.debug(f"Saved part {file_part} of data to '{file_path_tmp}' file")
# received END of data
elif file_part and file_part == "END":
- shutil.copy(file_path, file_path_backup)
- logger.debug(f"Created backup of '{file_path_backup}' file")
+ if file_path.exists():
+ shutil.copy(file_path, file_path_backup)
+ logger.debug(f"Created backup of '{file_path_backup}' file")
- os.replace(file_path, file_path_tmp)
- logger.info(f"Saved data to '{file_path}'")
+ os.replace(file_path_tmp, file_path)
+ logger.info(f"Saved file data to '{file_path}'")
# trigger delayed configuration renew
trigger_renew(self._config)