+import logging
import os
import stat
from enum import Flag, auto
from knot_resolver.datamodel.globals import get_permissions_default, get_resolve_root, get_strict_validation
from knot_resolver.utils.modeling.base_value_type import BaseValueType
+logger = logging.getLogger(__name__)
+
class UncheckedPath(BaseValueType):
"""
WRITE = auto()
EXECUTE = auto()
-
-def _kres_accessible(dest_path: Path, perm_mode: _PermissionMode) -> bool:
+def _check_permission(dest_path: Path, perm_mode: _PermissionMode) -> bool:
chflags = {
_PermissionMode.READ: [stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH],
_PermissionMode.WRITE: [stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH],
) -> None:
super().__init__(source_value, parents=parents, object_path=object_path)
- if self.strict_validation and not _kres_accessible(self._value, _PermissionMode.READ):
- raise ValueError(f"{USER}:{GROUP} has insufficient permissions to read '{self._value}'")
+ if self.strict_validation and not _check_permission(self._value, _PermissionMode.READ):
+ msg = f"{USER}:{GROUP} has insufficient permissions to read '{self._value}'"
+ if not os.access(self._value, os.R_OK):
+ raise ValueError(msg)
+ logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) read the file")
class WritableDir(Dir):
) -> None:
super().__init__(source_value, parents=parents, object_path=object_path)
- if self.strict_validation and not _kres_accessible(
+ if self.strict_validation and not _check_permission(
self._value, _PermissionMode.WRITE | _PermissionMode.EXECUTE
):
- raise ValueError(f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'")
+ msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'"
+ if not os.access(self._value, os.W_OK | os.X_OK):
+ raise ValueError(msg)
+ logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the directory")
class WritableFilePath(FilePath):
) -> None:
super().__init__(source_value, parents=parents, object_path=object_path)
- if self.strict_validation and not _kres_accessible(
- self._value.parent, _PermissionMode.WRITE | _PermissionMode.EXECUTE
- ):
- raise ValueError(f"{USER}:{GROUP} has insufficient permissions to write/execute'{self._value.parent}'")
+ if self.strict_validation:
+ # check that parent dir is writable
+ if not _check_permission(self._value.parent, _PermissionMode.WRITE | _PermissionMode.EXECUTE):
+ msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value.parent}'"
+ # os.access() on the dir just provides a more precise message,
+ # as the os.access() on the file below check everything in one go
+ if not os.access(self._value.parent, os.W_OK | os.X_OK):
+ raise ValueError(msg)
+ logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the directory")
+
+ # check that existing file is writable
+ if self._value.exists() and not _check_permission(
+ self._value, _PermissionMode.WRITE
+ ):
+ msg = f"{USER}:{GROUP} has insufficient permissions to write/execute '{self._value}'"
+ if not os.access(self._value, os.W_OK):
+ raise ValueError(msg)
+ logger.info(f"{msg}, but the resolver can somehow (ACLs, ...) write to the file")