]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
datamodel/types/files.py: improve file permission check
authorVladimír Čunát <vladimir.cunat@nic.cz>
Wed, 16 Jul 2025 15:58:20 +0000 (17:58 +0200)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Wed, 16 Jul 2025 15:58:36 +0000 (17:58 +0200)
In particular with ACLs this could have prevented resolver starting
even if the permissions were OK.  os.access() should be accurate
at least when running from manager (not from kresctl).

python/knot_resolver/datamodel/types/files.py

index 9e326999b2436bd98e1b118cd5be382b4266d87c..b19237fa2554b1487672de24bb6835cd0e6118f1 100644 (file)
@@ -1,3 +1,4 @@
+import logging
 import os
 import stat
 from enum import Flag, auto
@@ -10,6 +11,8 @@ from knot_resolver.constants import GROUP, USER
 from knot_resolver.datamodel.globals import get_permissions_default, get_resolve_root, get_strict_validation
 from knot_resolver.utils.modeling.base_value_type import BaseValueType
 
+logger = logging.getLogger(__name__)
+
 
 class UncheckedPath(BaseValueType):
     """
@@ -157,8 +160,7 @@ class _PermissionMode(Flag):
     WRITE = auto()
     EXECUTE = auto()
 
-
-def _kres_accessible(dest_path: Path, perm_mode: _PermissionMode) -> bool:
+def _check_permission(dest_path: Path, perm_mode: _PermissionMode) -> bool:
     chflags = {
         _PermissionMode.READ: [stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH],
         _PermissionMode.WRITE: [stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH],
@@ -208,8 +210,11 @@ class ReadableFile(File):
     ) -> None:
         super().__init__(source_value, parents=parents, object_path=object_path)
 
-        if self.strict_validation and not _kres_accessible(self._value, _PermissionMode.READ):
-            raise ValueError(f"{USER}:{GROUP} has insufficient permissions to read '{self._value}'")
+        if self.strict_validation and not _check_permission(self._value, _PermissionMode.READ):
+            msg = f"{USER}:{GROUP} has insufficient permissions to read '{self._value}'"
+            if not os.access(self._value, os.R_OK):
+                raise ValueError(msg)
+            logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) read the file")
 
 
 class WritableDir(Dir):
@@ -224,10 +229,13 @@ class WritableDir(Dir):
     ) -> None:
         super().__init__(source_value, parents=parents, object_path=object_path)
 
-        if self.strict_validation and not _kres_accessible(
+        if self.strict_validation and not _check_permission(
             self._value, _PermissionMode.WRITE | _PermissionMode.EXECUTE
         ):
-            raise ValueError(f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'")
+            msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'"
+            if not os.access(self._value, os.W_OK | os.X_OK):
+                raise ValueError(msg)
+            logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the directory")
 
 
 class WritableFilePath(FilePath):
@@ -243,7 +251,21 @@ class WritableFilePath(FilePath):
     ) -> None:
         super().__init__(source_value, parents=parents, object_path=object_path)
 
-        if self.strict_validation and not _kres_accessible(
-            self._value.parent, _PermissionMode.WRITE | _PermissionMode.EXECUTE
-        ):
-            raise ValueError(f"{USER}:{GROUP} has insufficient permissions to write/execute'{self._value.parent}'")
+        if self.strict_validation:
+            # check that parent dir is writable
+            if not _check_permission(self._value.parent, _PermissionMode.WRITE | _PermissionMode.EXECUTE):
+                msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value.parent}'"
+                # os.access() on the dir just provides a more precise message,
+                # as the os.access() on the file below check everything in one go
+                if not os.access(self._value.parent, os.W_OK | os.X_OK):
+                    raise ValueError(msg)
+                logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the directory")
+
+            # check that existing file is writable
+            if self._value.exists() and not _check_permission(
+                self._value, _PermissionMode.WRITE
+            ):
+                msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'"
+                if not os.access(self._value, os.W_OK):
+                    raise ValueError(msg)
+                logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the file")