import shutil
from threading import Timer
from typing import Dict, List, Optional
+from urllib.parse import quote
from knot_resolver.constants import KAFKA_LIB
from knot_resolver.datamodel import KresConfig
from knot_resolver.manager.config_store import ConfigStore
-from knot_resolver.manager.triggers import trigger_reload
+from knot_resolver.manager.triggers import trigger_renew
from knot_resolver.utils.functional import Result
+from knot_resolver.utils.modeling.parsing import DataFormat, data_combine, parse_json
+from knot_resolver.utils.requests import SocketDesc, request
logger = logging.getLogger(__name__)
if self._consumer:
self._consumer.close()
- def _consume(self) -> None:
+ def _consume(self) -> None: # noqa: PLR0915
if not self._consumer:
return
for record in records:
try:
key: str = record.key.decode("utf-8")
- key_split = key.split(":")
value: str = record.value.decode("utf-8")
+ # messages without key
+ # config
+ if not key:
+ config_orig = self._config.get_unparsed_data()
+ parsed = parse_json(value)
+ config_new = data_combine(config_orig, parsed)
+
+ file_name = "config.kafka.json"
+ file_name_tmp = f"{file_name}.tmp"
+ file_name_backup = f"{file_name}.backup"
+ shutil.copy(file_name, file_name_backup)
+ with open(file_name_tmp, "w") as file:
+ file.write(value)
+
+ management = self._config.management
+ socket = SocketDesc(
+ f'http+unix://{quote(str(management.unix_socket), safe="")}/',
+ 'Key "/management/unix-socket" in validated configuration',
+ )
+ if management.interface:
+ socket = SocketDesc(
+ f"http://{management.interface.addr}:{management.interface.port}",
+ 'Key "/management/interface" in validated configuration',
+ )
+
+ body = DataFormat.JSON.dict_dump(config_new)
+ response = request(socket, "PUT", "v1/config", body)
+
+ if response.status != 200:
+ logger.error(f"Failed to apply new config:\n{response.body}")
+ continue
+ os.replace(file_name_tmp, file_name)
+ continue
+
+ # messages with key
+ # RPZ or other files
+
logger.info(f"Received message with '{key}' key")
+ key_split = key.split(":")
# prepare files names
file_name = key_split[0]
os.replace(file_name, file_name_tmp)
logger.info(f"Saved data to '{file_name}'")
- # trigger delayed configuration reload
- trigger_reload(self._config)
+ # trigger delayed configuration renew
+ trigger_renew(self._config)
else:
logger.error("Failed to parse message key")
except Exception as e:
- logger.error(f"Processing message failed with error: {e}")
+ logger.error(f"Processing message failed with error: \n{e}")
continue
self._consumer_timer = Timer(5, self._consume)