import logging
from pathlib import Path
-from threading import Timer
from typing import Any, Dict, List, Optional
-from urllib.parse import quote
from knot_resolver.constants import WATCHDOG_LIB
-from knot_resolver.controller.registered_workers import command_registered_workers
from knot_resolver.datamodel import KresConfig
from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
-from knot_resolver.utils import compat
-from knot_resolver.utils.requests import SocketDesc, request
+from knot_resolver.manager.triggers import cancel_cmd, trigger_cmd, trigger_renew
logger = logging.getLogger(__name__)
def __init__(self, files: FilesToWatch, config: KresConfig) -> None:
self._files = files
self._config = config
- self._policy_timer: Optional[Timer] = None
- self._timers: Dict[str, Timer] = {}
def _trigger(self, cmd: Optional[str]) -> None:
- def policy_reload() -> None:
- management = self._config.management
- socket = SocketDesc(
- f'http+unix://{quote(str(management.unix_socket), safe="")}/',
- 'Key "/management/unix-socket" in validated configuration',
- )
- if management.interface:
- socket = SocketDesc(
- f"http://{management.interface.addr}:{management.interface.port}",
- 'Key "/management/interface" in validated configuration',
- )
-
- response = request(socket, "POST", "renew")
- if response.status != 200:
- logger.error(f"Failed to reload policy rules: {response.body}")
- logger.info("Reloading policy rules has finished")
-
- if not cmd:
- # skipping if reload was already triggered
- if self._policy_timer and self._policy_timer.is_alive():
- logger.info("Skipping reloading policy rules, it was already triggered")
- return
- # start a 5sec timer
- logger.info("Delayed policy rules reload has started")
- self._policy_timer = Timer(5, policy_reload)
- self._policy_timer.start()
- return
-
- def command() -> None:
- if compat.asyncio.is_event_loop_running():
- compat.asyncio.create_task(command_registered_workers(cmd))
- else:
- compat.asyncio.run(command_registered_workers(cmd))
- logger.info(f"Sending '{cmd}' command to reload watched files has finished")
-
- # skipping if command was already triggered
- if cmd in self._timers and self._timers[cmd].is_alive():
- logger.info(f"Skipping sending '{cmd}' command, it was already triggered")
- return
- # start a 5sec timer
- logger.info(f"Delayed send of '{cmd}' command has started")
- self._timers[cmd] = Timer(5, command)
- self._timers[cmd].start()
+ if cmd:
+ trigger_cmd(self._config, cmd)
+ trigger_renew(self._config)
def on_created(self, event: FileSystemEvent) -> None:
src_path = Path(str(event.src_path))
if src_path in self._files.keys():
logger.warning(f"Watched file '{src_path}' has been deleted")
cmd = self._files[src_path]
- if cmd in self._timers:
- self._timers[cmd].cancel()
+ if cmd:
+ cancel_cmd(cmd)
for file in self._files.keys():
if file.parent == src_path:
logger.warning(f"Watched directory '{src_path}' has been deleted")
cmd = self._files[file]
- if cmd in self._timers:
- self._timers[cmd].cancel()
+ if cmd:
+ cancel_cmd(cmd)
def on_moved(self, event: FileSystemEvent) -> None:
src_path = Path(str(event.src_path))