from .exceptions import KresManagerException
-VerifyCallback = Callable[[KresConfig, KresConfig], Awaitable[Result[None, str]]]
-UpdateCallback = Callable[[KresConfig], Awaitable[None]]
+VerifyCallback = Callable[[KresConfig, KresConfig, bool], Awaitable[Result[None, str]]]
+UpdateCallback = Callable[[KresConfig, bool], Awaitable[None]]
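+# The trailing bool in both signatures is the new 'force' flag: it is threaded through
+# verifiers and update callbacks so a forced reload/renew can re-apply the configuration
+# even when the watched values did not change.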
class ConfigStore:
self._callbacks: List[UpdateCallback] = []
self._update_lock: Lock = Lock()
- async def update(self, config: KresConfig) -> None:
+ async def update(self, config: KresConfig, force: bool = False) -> None:
# invoke pre-change verifiers
results: Tuple[Result[None, str], ...] = tuple(
- await asyncio.gather(*[ver(self._config, config) for ver in self._verifiers])
+ await asyncio.gather(*[ver(self._config, config, force) for ver in self._verifiers])
)
err_res = filter(lambda r: r.is_err(), results)
errs = list(map(lambda r: r.unwrap_err(), err_res))
# invoke change callbacks
for call in self._callbacks:
- await call(config)
+ await call(config, force)
- async def renew(self) -> None:
- await self.update(self._config)
+ async def renew(self, force: bool = False) -> None:
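+ # re-applies the currently stored config; with force=True, change-filtered callbacks run even though nothing changed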
+ await self.update(self._config, force)
async def register_verifier(self, verifier: VerifyCallback) -> None:
self._verifiers.append(verifier)
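+ # the initial verification pass always runs unforced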
- res = await verifier(self.get(), self.get())
+ res = await verifier(self.get(), self.get(), False)
if res.is_err():
raise DataParsingError(f"Initial config verification failed with error: {res.unwrap_err()}")
"""
self._callbacks.append(callback)
- await callback(self.get())
+ await callback(self.get(), False)
def get(self) -> KresConfig:
return self._config
original_value_set: Any = False
original_value: Any = None
- async def new_func_update(config: KresConfig) -> None:
+ async def new_func_update(config: KresConfig, force: bool = False) -> None:
nonlocal original_value_set
nonlocal original_value
if not original_value_set:
original_value_set = True
elif original_value == selector(config):
- await orig_func(config)
+ await orig_func(config, force)
+ elif force:
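+ # when forced, run the callback even though the watched value changed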
+ await orig_func(config, force)
original_value = selector(config)
return new_func_update
original_value_set: Any = False
original_value: Any = None
- async def new_func_update(config: KresConfig) -> None:
+ async def new_func_update(config: KresConfig, force: bool) -> None:
nonlocal original_value_set
nonlocal original_value
if not original_value_set:
original_value_set = True
- original_value = selector(config)
- await orig_func(config)
+ await orig_func(config, force)
elif original_value != selector(config):
- original_value = selector(config)
- await orig_func(config)
+ await orig_func(config, force)
+ elif force:
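+ # when forced, run the callback even though the watched value is unchanged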
+ await orig_func(config, force)
+ original_value = selector(config)
return new_func_update
original_value_set: Any = False
original_value: Any = None
- async def new_func_verifier(old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
+ async def new_func_verifier(old: KresConfig, new: KresConfig, force: bool) -> Result[NoneType, str]:
nonlocal original_value_set
nonlocal original_value
if not original_value_set:
original_value_set = True
original_value = selector(new)
- await orig_func(old, new)
+ await orig_func(old, new, force)
elif original_value != selector(new):
original_value = selector(new)
- await orig_func(old, new)
+ await orig_func(old, new, force)
return Result.ok(None)
return new_func_verifier
logger = logging.getLogger(__name__)
-async def files_reload(config: KresConfig) -> None:
+async def files_reload(config: KresConfig, force: bool = False) -> None:
cert_file = config.network.tls.cert_file
key_file = config.network.tls.key_file
@only_on_real_changes_update(watched_files_config)
-async def _init_files_watchdog(config: KresConfig) -> None:
+async def _init_files_watchdog(config: KresConfig, force: bool = False) -> None:
if WATCHDOG_LIB:
global _files_watchdog
@only_on_real_changes_update(lambda config: config.logging)
-async def _configure_logger(config: KresConfig) -> None:
+async def _configure_logger(config: KresConfig, force: bool = False) -> None:
await _set_logging_handler(config)
await _set_log_level(config)
return self._counter >= FIX_COUNTER_ATTEMPTS_MAX
-async def _deny_max_worker_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+async def _deny_max_worker_changes(
+ config_old: KresConfig, config_new: KresConfig, force: bool = False
+) -> Result[None, str]:
if config_old.max_workers != config_new.max_workers:
return Result.err(
"Changing 'max-workers', the maximum number of workers allowed to run, is not allowed at runtime."
def add_shutdown_trigger(self, trigger: Callable[[int], None]) -> None:
self._shutdown_triggers.append(trigger)
- async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
+ async def validate_config(self, _old: KresConfig, new: KresConfig, force: bool = False) -> Result[NoneType, str]:
async with self._manager_lock:
if _old.rate_limiting != new.rate_limiting:
logger.debug("Unlinking shared ratelimiting memory")
self._gc = None
await self._collect_already_running_workers()
- async def reset_workers_policy_rules(self, _config: KresConfig) -> None:
+ async def reset_workers_policy_rules(self, _config: KresConfig, force: bool = False) -> None:
# command all running 'kresd' workers to reset their old policy rules,
# unless the workers have already been started with the new config, in which case no reset is needed
if self._workers_reset_needed and get_registered_workers_kresids():
" the workers are already running with new configuration"
)
- async def set_new_tls_sticket_secret(self, config: KresConfig) -> None:
+ async def set_new_tls_sticket_secret(self, config: KresConfig, force: bool = False) -> None:
if config.network.tls.sticket_secret or config.network.tls.sticket_secret_file:
logger.debug("User-configured TLS resumption secret found - skipping auto-generation.")
return
if res not in (0, True):
logger.error("Failed to set TLS session ticket secret in %s: %s", worker, res)
- async def apply_config(self, config: KresConfig, _noretry: bool = False) -> None:
+ async def apply_config(self, config: KresConfig, force: bool = False, _noretry: bool = False) -> None:
try:
async with self._manager_lock:
logger.debug("Applying config to all workers")
await self._reload_system_state()
await self.apply_config(config, _noretry=True)
+ logger.info("Config applied successfully to all workers")
self._workers_reset_needed = False
- async def load_policy_rules(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
+ async def load_policy_rules(self, _old: KresConfig, new: KresConfig, force: bool = False) -> Result[NoneType, str]:
try:
async with self._manager_lock:
logger.debug("Running kresd 'policy-loader'")
compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
@only_on_real_changes_update(lambda c: c.monitoring.graphite)
- async def _init_graphite_bridge(config: KresConfig) -> None:
+ async def _init_graphite_bridge(config: KresConfig, force: bool = False) -> None:
"""
Starts graphite bridge if required
"""
interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
)
- async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
+ async def _deny_turning_off_graphite_bridge(
+ old_config: KresConfig, new_config: KresConfig, force: bool = False
+ ) -> Result[None, str]:
if old_config.monitoring.graphite and not new_config.monitoring.graphite:
return Result.err(
"You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
self._shutdown_event = asyncio.Event()
self._manager = manager
- async def _reconfigure(self, config: KresConfig) -> None:
+ async def _reconfigure(self, config: KresConfig, force: bool = False) -> None:
await self._reconfigure_listen_address(config)
- async def _deny_management_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+ async def _deny_management_changes(
+ self, config_old: KresConfig, config_new: KresConfig, force: bool = False
+ ) -> Result[None, str]:
if config_old.management != config_new.management:
return Result.err(
"/server/management: Changing management API address/uTruenix-socket dynamically is not allowed as it's really dangerous."
)
return Result.ok(None)
- async def _reload_config(self) -> None:
+ async def _reload_config(self, force: bool = False) -> None:
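+ # 'force' is forwarded to ConfigStore.update so callbacks run even if the reloaded file is unchanged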
if self._config_path is None:
logger.warning("The manager was started with inlined configuration - can't reload")
else:
data = data_combine(data, file_data)
config = KresConfig(data)
- await self.config_store.update(config)
+ await self.config_store.update(config, force)
logger.info("Configuration file successfully reloaded")
except FileNotFoundError:
logger.error(
logger.error(f"Reloading of the configuration file failed: {e}")
logger.error("Configuration has NOT been changed.")
- async def _renew_config(self) -> None:
+ async def _renew_config(self, force: bool = False) -> None:
try:
- await self.config_store.renew()
+ await self.config_store.renew(force)
logger.info("Configuration successfully renewed")
except KresManagerException as e:
logger.error(f"Renewing the configuration failed: {e}")
logger.info("Shutdown event triggered...")
return web.Response(text="Shutting down...")
- async def _handler_reload(self, _request: web.Request) -> web.Response:
+ async def _handler_reload(self, request: web.Request) -> web.Response:
"""
- Route handler for reloading the server
+ Route handler for reloading the configuration
"""
logger.info("Reloading event triggered...")
- await self._reload_config()
+ await self._reload_config(force=request.path.endswith("/force"))
return web.Response(text="Reloading...")
- async def _handler_renew(self, _request: web.Request) -> web.Response:
+ async def _handler_renew(self, request: web.Request) -> web.Response:
"""
Route handler for renewing the configuration
"""
logger.info("Renewing configuration event triggered...")
- await self._renew_config()
+ await self._renew_config(force=request.path.endswith("/force"))
return web.Response(text="Renewing configuration...")
async def _handler_processes(self, request: web.Request) -> web.Response:
web.patch(r"/v1/config{path:.*}", self._handler_config_query),
web.post("/stop", self._handler_stop),
web.post("/reload", self._handler_reload),
+ web.post("/reload/force", self._handler_reload),
web.post("/renew", self._handler_renew),
+ web.post("/renew/force", self._handler_renew),
web.get("/schema", self._handler_schema),
web.get("/schema/ui", self._handle_view_schema),
web.get("/metrics", self._handler_metrics),
return manager
-async def _deny_working_directory_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+async def _deny_working_directory_changes(
+ config_old: KresConfig, config_new: KresConfig, force: bool = False
+) -> Result[None, str]:
if config_old.rundir != config_new.rundir:
return Result.err("Changing manager's `rundir` during runtime is not allowed.")
count = 0
@only_on_real_changes_update(lambda config: config.logging.level)
- async def change_callback(config: KresConfig) -> None:
+ async def change_callback(config: KresConfig, force: bool = False) -> None:
nonlocal count
count += 1