From: Vladimír Čunát Date: Wed, 16 Jul 2025 15:58:20 +0000 (+0200) Subject: datamodel/types/files.py: improve file permission check X-Git-Tag: v6.0.15~5^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=da67d3b4e2cfabb019de03d85b7f5943cf8b92e8;p=thirdparty%2Fknot-resolver.git datamodel/types/files.py: improve file permission check In particular with ACLs this could have prevented resolver starting even if the permissions were OK. os.access() should be accurate at least when running from manager (not from kresctl). --- diff --git a/python/knot_resolver/datamodel/types/files.py b/python/knot_resolver/datamodel/types/files.py index 9e326999b..b19237fa2 100644 --- a/python/knot_resolver/datamodel/types/files.py +++ b/python/knot_resolver/datamodel/types/files.py @@ -1,3 +1,4 @@ +import logging import os import stat from enum import Flag, auto @@ -10,6 +11,8 @@ from knot_resolver.constants import GROUP, USER from knot_resolver.datamodel.globals import get_permissions_default, get_resolve_root, get_strict_validation from knot_resolver.utils.modeling.base_value_type import BaseValueType +logger = logging.getLogger(__name__) + class UncheckedPath(BaseValueType): """ @@ -157,8 +160,7 @@ class _PermissionMode(Flag): WRITE = auto() EXECUTE = auto() - -def _kres_accessible(dest_path: Path, perm_mode: _PermissionMode) -> bool: +def _check_permission(dest_path: Path, perm_mode: _PermissionMode) -> bool: chflags = { _PermissionMode.READ: [stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH], _PermissionMode.WRITE: [stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH], @@ -208,8 +210,11 @@ class ReadableFile(File): ) -> None: super().__init__(source_value, parents=parents, object_path=object_path) - if self.strict_validation and not _kres_accessible(self._value, _PermissionMode.READ): - raise ValueError(f"{USER}:{GROUP} has insufficient permissions to read '{self._value}'") + if self.strict_validation and not _check_permission(self._value, _PermissionMode.READ): + msg = f"{USER}:{GROUP} has insufficient permissions to read '{self._value}'" + if not os.access(self._value, os.R_OK): + raise ValueError(msg) + logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) read the file") class WritableDir(Dir): @@ -224,10 +229,13 @@ class WritableDir(Dir): ) -> None: super().__init__(source_value, parents=parents, object_path=object_path) - if self.strict_validation and not _kres_accessible( + if self.strict_validation and not _check_permission( self._value, _PermissionMode.WRITE | _PermissionMode.EXECUTE ): - raise ValueError(f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'") + msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'" + if not os.access(self._value, os.W_OK | os.X_OK): + raise ValueError(msg) + logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the directory") class WritableFilePath(FilePath): @@ -243,7 +251,21 @@ class WritableFilePath(FilePath): ) -> None: super().__init__(source_value, parents=parents, object_path=object_path) - if self.strict_validation and not _kres_accessible( - self._value.parent, _PermissionMode.WRITE | _PermissionMode.EXECUTE - ): - raise ValueError(f"{USER}:{GROUP} has insufficient permissions to write/execute'{self._value.parent}'") + if self.strict_validation: + # check that parent dir is writable + if not _check_permission(self._value.parent, _PermissionMode.WRITE | _PermissionMode.EXECUTE): + msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value.parent}'" + # os.access() on the dir just provides a more precise message, + # as the os.access() on the file below check everything in one go + if not os.access(self._value.parent, os.W_OK | os.X_OK): + raise ValueError(msg) + logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the directory") + + # check that existing file is writable + if self._value.exists() and not _check_permission( + self._value, _PermissionMode.WRITE + ): + msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'" + if not os.access(self._value, os.W_OK): + raise ValueError(msg) + logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the file")