]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
datamodel: file permission checks: #814 created function to check that kresd_user...
authorFrantisek Tobias <frantisek.tobias@nic.cz>
Tue, 20 Aug 2024 07:32:38 +0000 (09:32 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Mon, 2 Sep 2024 14:42:37 +0000 (16:42 +0200)
manager/knot_resolver_manager/constants.py
manager/knot_resolver_manager/datamodel/types/__init__.py
manager/knot_resolver_manager/datamodel/types/files.py
manager/knot_resolver_manager/server.py
python/knot_resolver.py.in
python/meson.build

index 90ceed9f85f7df1eae49b7ad9f6c660fb2c49688..9253c2db15857c15e2266154a36534d4ed45950b 100644 (file)
@@ -35,6 +35,14 @@ def kres_gc_executable() -> Path:
     return knot_resolver.sbin_dir / "kres-cache-gc"
 
 
+def kresd_user():
+    return None if knot_resolver is None else knot_resolver.user
+
+
+def kresd_group():
+    return None if knot_resolver is None else knot_resolver.group
+
+
 def kresd_cache_dir(config: "KresConfig") -> Path:
     return config.cache.storage.to_path()
 
index a87c5c7c147c3b6e894ce5f764865a50222e75c3..52ab1cf8cf3eb596124ac3e468cc6eeab2aeb292 100644 (file)
@@ -1,5 +1,5 @@
 from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum
-from .files import AbsoluteDir, Dir, File, FilePath, WritableFile, ReadableFile
+from .files import AbsoluteDir, Dir, File, FilePath, WritableDir, ReadableFile
 from .generic_types import ListOrItem
 from .types import (
     DomainName,
@@ -61,7 +61,7 @@ __all__ = [
     "TimeUnit",
     "AbsoluteDir",
     "ReadableFile",
-    "WritableFile",
+    "WritableDir",
     "File",
     "FilePath",
     "Dir",
index ec2fcca541ccfe1160caf74c477b94c2e51e79f2..df95a886b2bae0db6814615663e8abe134d4c423 100644 (file)
@@ -1,8 +1,11 @@
-from logging import debug
-from os import close
 from pathlib import Path
 from typing import Any, Dict, Tuple, Type, TypeVar
+import os
+import stat
+from pwd import getpwnam
+from grp import getgrnam
 
+from knot_resolver_manager.constants import kresd_user, kresd_group
 from knot_resolver_manager.datamodel.globals import get_resolve_root, get_strict_validation
 from knot_resolver_manager.utils.modeling.base_value_type import BaseValueType
 
@@ -135,11 +138,49 @@ class FilePath(UncheckedPath):
         p = self._value.parent
         if self.strict_validation and (not p.exists() or not p.is_dir()):
             raise ValueError(f"path '{self._value}' does not point inside an existing directory")
+
+        # WARNING: is_dir() fails for knot-resolver owned paths when using kresctl to validate config
         if self.strict_validation and self._value.is_dir():
             raise ValueError(f"path '{self._value}' points to a directory when we expected a file")
 
 
-class ReadableFile(UncheckedPath):
+READ_MODE = 0
+WRITE_MODE = 1
+EXECUTE_MODE = 2
+
+def kresd_accesible(dest_path: Path, perm_mode: int) -> bool:
+    chflags = [
+        [stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH],
+        [stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH],
+        [stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH],
+    ]
+
+    username = kresd_user()
+    groupname = kresd_group()
+
+    if username is None or groupname is None:
+        return True
+
+    user_uid = getpwnam(username).pw_uid
+    user_gid = getgrnam(groupname).gr_gid
+
+    dest_stat = os.stat(dest_path)
+    dest_uid = dest_stat.st_uid
+    dest_gid = dest_stat.st_gid
+
+    _mode = dest_stat.st_mode
+
+    if user_uid == dest_uid:
+        return bool(_mode & chflags[perm_mode][0])
+
+    b_groups = os.getgrouplist(os.getlogin(), user_gid)
+    if user_gid == dest_gid or dest_gid in b_groups:
+        return bool(_mode & chflags[perm_mode][1])
+
+    return bool(_mode & chflags[perm_mode][2])
+
+
+class ReadableFile(File):
     """
     File, that is enforced to be:
     - readable by kresd
@@ -148,33 +189,20 @@ class ReadableFile(UncheckedPath):
         self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
     ) -> None:
         super().__init__(source_value, parents=parents, object_path=object_path)
-        try:
-            f = open(self._value, "r")
-        except IOError as e:
-            if e.args == (13, 'permission denied'):
-                raise ValueError(f"file'{self._value}' isn't readable")
-            raise ValueError(f"Unexpected error '{e}'")
 
-            f.close()
+        if self.strict_validation and not kresd_accesible(self._value, READ_MODE):
+            raise ValueError(f"{kresd_user()}:{kresd_group()} has insuficient permissions to read \"{self._value}\"")
 
 
-class WritableFile(UncheckedPath):
+class WritableDir(Dir):
     """
-    File, that is enforced to be:
-    - writable by kresd
+    Dif, that is enforced to be:
+    - writable to by kresd
     """
     def __init__(
         self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
     ) -> None:
-        print(type(self))
         super().__init__(source_value, parents=parents, object_path=object_path)
-        try:
-            f = open(self._value, "w")
-        except IOError as e:
-            if e.args == (13, 'permission denied'):
-                raise ValueError(f"file'{self._value}' isn't readable")
-            raise ValueError(f"Unexpected error '{e}'")
-
-        f.close()
-
 
+        if self.strict_validation and not kresd_accesible(self._value, WRITE_MODE):
+            raise ValueError(f"{kresd_user()}:{kresd_group()} has insuficient permissions to write to \"{self._value}\"")
index b5ebd6c2f771c1c5b45766353538a64cd115c394..b7cd021227fa286867ab060b6ba506febc2b2c27 100644 (file)
@@ -17,8 +17,6 @@ from aiohttp.web_app import Application
 from aiohttp.web_response import json_response
 from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
 from typing_extensions import Literal
-
-from knot_resolver_manager.datamodel.types.files import ReadableFile, WritableFile
 import knot_resolver_manager.utils.custom_atexit as atexit
 from knot_resolver_manager import log, statistics
 from knot_resolver_manager.compat import asyncio as asyncio_compat
@@ -509,7 +507,6 @@ async def start_server(config: Path = DEFAULT_MANAGER_CONFIG_FILE) -> int:
     # This function is quite long, but it describes how manager runs. So let's silence pylint
     # pylint: disable=too-many-statements
 
-    ReadableFile(config)
     start_time = time()
     working_directory_on_startup = os.getcwd()
     manager: Optional[KresManager] = None
index 262f7a8402e0a867feea187a0c22dbd91c44591a..e6b2accbd231391aa9cfe01e47c65815021bb3fa 100644 (file)
@@ -8,3 +8,5 @@ etc_dir = Path("@etc_dir@")
 run_dir = Path("@run_dir@")
 lib_dir = Path("@lib_dir@")
 modules_dir = Path("@modules_dir@")
+user = "@user@"
+group = "@group@"
index e209df542f998bd8260c926f320401d3830c0f7a..c6b2a27ed19063d663d89b80c573a007b8807f52 100644 (file)
@@ -9,6 +9,8 @@ python_config.set('etc_dir', etc_dir)
 python_config.set('run_dir', run_dir)
 python_config.set('lib_dir', lib_dir)
 python_config.set('modules_dir', modules_dir)
+python_config.set('user', user)
+python_config.set('group', group)
 
 configure_file(
   input: 'knot_resolver.py.in',