import logging
from pathlib import Path
from threading import Timer
-from typing import Any, List, Optional
+from typing import Any, Dict, List, Optional
from knot_resolver.constants import WATCHDOG_LIB
from knot_resolver.controller.registered_workers import command_registered_workers
]
+FilesToWatch = Dict[Path, str]
+
+
if WATCHDOG_LIB:
from watchdog.events import (
FileSystemEvent,
)
from watchdog.observers import Observer
- _tls_cert_watchdog: Optional["TLSCertWatchDog"] = None
-
- class TLSCertEventHandler(FileSystemEventHandler):
- def __init__(self, files: List[Path], cmd: str) -> None:
+ class FilesWatchdogEventHandler(FileSystemEventHandler):
+ def __init__(self, files: FilesToWatch) -> None:
self._files = files
- self._cmd = cmd
self._timer: Optional[Timer] = None
- def _reload(self) -> None:
+ def _reload(self, cmd: str) -> None:
def command() -> None:
if compat.asyncio.is_event_loop_running():
- compat.asyncio.create_task(command_registered_workers(self._cmd))
+ compat.asyncio.create_task(command_registered_workers(cmd))
else:
- compat.asyncio.run(command_registered_workers(self._cmd))
+ compat.asyncio.run(command_registered_workers(cmd))
logger.info("Reloading of TLS certificate files has finished")
# skipping if reload was already triggered
def on_created(self, event: FileSystemEvent) -> None:
src_path = Path(str(event.src_path))
- if src_path in self._files:
+ if src_path in self._files.keys():
logger.info(f"Watched file '{src_path}' has been created")
- self._reload()
+ self._reload(self._files[src_path])
def on_deleted(self, event: FileSystemEvent) -> None:
src_path = Path(str(event.src_path))
- if src_path in self._files:
+ if src_path in self._files.keys():
logger.warning(f"Watched file '{src_path}' has been deleted")
if self._timer:
self._timer.cancel()
- for file in self._files:
+ for file in self._files.keys():
if file.parent == src_path:
logger.warning(f"Watched directory '{src_path}' has been deleted")
if self._timer:
def on_modified(self, event: FileSystemEvent) -> None:
src_path = Path(str(event.src_path))
- if src_path in self._files:
+ if src_path in self._files.keys():
logger.info(f"Watched file '{src_path}' has been modified")
- self._reload()
+ self._reload(self._files[src_path])
- class TLSCertWatchDog:
- def __init__(self, cert_file: Path, key_file: Path) -> None:
- self._observer = Observer()
+ _files_watchdog: Optional["FilesWatchdog"] = None
- cmd = f"net.tls('{cert_file}', '{key_file}')"
-
- cert_files: List[Path] = []
- cert_files.append(cert_file)
- cert_files.append(key_file)
+ class FilesWatchdog:
+ def __init__(self, files_to_watch: FilesToWatch) -> None:
+ self._observer = Observer()
- cert_dirs: List[Path] = []
- cert_dirs.append(cert_file.parent)
- if cert_file.parent != key_file.parent:
- cert_dirs.append(key_file.parent)
+ event_handler = FilesWatchdogEventHandler(files_to_watch)
+ dirs_to_watch: List[Path] = []
+ for file in files_to_watch.keys():
+ if file.parent not in dirs_to_watch:
+ dirs_to_watch.append(file.parent)
- event_handler = TLSCertEventHandler(cert_files, cmd)
- for d in cert_dirs:
+ for d in dirs_to_watch:
self._observer.schedule(
event_handler,
str(d),
@only_on_real_changes_update(tls_cert_files_config)
-async def _init_tls_cert_watchdog(config: KresConfig) -> None:
+async def _init_files_watchdog(config: KresConfig) -> None:
if WATCHDOG_LIB:
- global _tls_cert_watchdog
+ global _files_watchdog
- if _tls_cert_watchdog:
- _tls_cert_watchdog.stop()
+ if _files_watchdog:
+ _files_watchdog.stop()
+ files_to_watch: FilesToWatch = {}
+ # network.tls
if config.network.tls.files_watchdog and config.network.tls.cert_file and config.network.tls.key_file:
- logger.info("Initializing TLS certificate files WatchDog")
- _tls_cert_watchdog = TLSCertWatchDog(
- config.network.tls.cert_file.to_path(),
- config.network.tls.key_file.to_path(),
- )
- _tls_cert_watchdog.start()
+ net_tls = f"net.tls('{config.network.tls.cert_file}', '{config.network.tls.key_file}')"
+ files_to_watch[config.network.tls.cert_file.to_path()] = net_tls
+ files_to_watch[config.network.tls.key_file.to_path()] = net_tls
+
+ if files_to_watch:
+ logger.info("Initializing files watchdog")
+ _files_watchdog = FilesWatchdog(files_to_watch)
+ _files_watchdog.start()
async def init_files_watchdog(config_store: ConfigStore) -> None:
- # watchdog for TLS certificate files
- await config_store.register_on_change_callback(_init_tls_cert_watchdog)
+ # register files watchdog callback
+ await config_store.register_on_change_callback(_init_files_watchdog)