From: Frantisek Tobias Date: Tue, 20 Aug 2024 07:32:38 +0000 (+0200) Subject: datamodel: file permission checks: #814 created function to check that kresd_user... X-Git-Tag: v6.0.9~23^2~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5cbc876119b1f9c5ae1e760fc22ae7b31554e6f9;p=thirdparty%2Fknot-resolver.git datamodel: file permission checks: #814 created function to check that kresd_user() can access the files and directories --- diff --git a/manager/knot_resolver_manager/constants.py b/manager/knot_resolver_manager/constants.py index 90ceed9f8..9253c2db1 100644 --- a/manager/knot_resolver_manager/constants.py +++ b/manager/knot_resolver_manager/constants.py @@ -35,6 +35,14 @@ def kres_gc_executable() -> Path: return knot_resolver.sbin_dir / "kres-cache-gc" +def kresd_user(): + return None if knot_resolver is None else knot_resolver.user + + +def kresd_group(): + return None if knot_resolver is None else knot_resolver.group + + def kresd_cache_dir(config: "KresConfig") -> Path: return config.cache.storage.to_path() diff --git a/manager/knot_resolver_manager/datamodel/types/__init__.py b/manager/knot_resolver_manager/datamodel/types/__init__.py index a87c5c7c1..52ab1cf8c 100644 --- a/manager/knot_resolver_manager/datamodel/types/__init__.py +++ b/manager/knot_resolver_manager/datamodel/types/__init__.py @@ -1,5 +1,5 @@ from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum -from .files import AbsoluteDir, Dir, File, FilePath, WritableFile, ReadableFile +from .files import AbsoluteDir, Dir, File, FilePath, WritableDir, ReadableFile from .generic_types import ListOrItem from .types import ( DomainName, @@ -61,7 +61,7 @@ __all__ = [ "TimeUnit", "AbsoluteDir", "ReadableFile", - "WritableFile", + "WritableDir", "File", "FilePath", "Dir", diff --git a/manager/knot_resolver_manager/datamodel/types/files.py b/manager/knot_resolver_manager/datamodel/types/files.py index ec2fcca54..df95a886b 100644 --- a/manager/knot_resolver_manager/datamodel/types/files.py +++ b/manager/knot_resolver_manager/datamodel/types/files.py @@ -1,8 +1,11 @@ -from logging import debug -from os import close from pathlib import Path from typing import Any, Dict, Tuple, Type, TypeVar +import os +import stat +from pwd import getpwnam +from grp import getgrnam +from knot_resolver_manager.constants import kresd_user, kresd_group from knot_resolver_manager.datamodel.globals import get_resolve_root, get_strict_validation from knot_resolver_manager.utils.modeling.base_value_type import BaseValueType @@ -135,11 +138,49 @@ class FilePath(UncheckedPath): p = self._value.parent if self.strict_validation and (not p.exists() or not p.is_dir()): raise ValueError(f"path '{self._value}' does not point inside an existing directory") + + # WARNING: is_dir() fails for knot-resolver owned paths when using kresctl to validate config if self.strict_validation and self._value.is_dir(): raise ValueError(f"path '{self._value}' points to a directory when we expected a file") -class ReadableFile(UncheckedPath): +READ_MODE = 0 +WRITE_MODE = 1 +EXECUTE_MODE = 2 + +def kresd_accesible(dest_path: Path, perm_mode: int) -> bool: + chflags = [ + [stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH], + [stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH], + [stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH], + ] + + username = kresd_user() + groupname = kresd_group() + + if username is None or groupname is None: + return True + + user_uid = getpwnam(username).pw_uid + user_gid = getgrnam(groupname).gr_gid + + dest_stat = os.stat(dest_path) + dest_uid = dest_stat.st_uid + dest_gid = dest_stat.st_gid + + _mode = dest_stat.st_mode + + if user_uid == dest_uid: + return bool(_mode & chflags[perm_mode][0]) + + b_groups = os.getgrouplist(os.getlogin(), user_gid) + if user_gid == dest_gid or dest_gid in b_groups: + return bool(_mode & chflags[perm_mode][1]) + + return bool(_mode & chflags[perm_mode][2]) + + +class ReadableFile(File): """ File, that is enforced to be: - readable by kresd @@ -148,33 +189,20 @@ class ReadableFile(UncheckedPath): self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" ) -> None: super().__init__(source_value, parents=parents, object_path=object_path) - try: - f = open(self._value, "r") - except IOError as e: - if e.args == (13, 'permission denied'): - raise ValueError(f"file'{self._value}' isn't readable") - raise ValueError(f"Unexpected error '{e}'") - f.close() + if self.strict_validation and not kresd_accesible(self._value, READ_MODE): + raise ValueError(f"{kresd_user()}:{kresd_group()} has insuficient permissions to read \"{self._value}\"") -class WritableFile(UncheckedPath): +class WritableDir(Dir): """ - File, that is enforced to be: - - writable by kresd + Dif, that is enforced to be: + - writable to by kresd """ def __init__( self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" ) -> None: - print(type(self)) super().__init__(source_value, parents=parents, object_path=object_path) - try: - f = open(self._value, "w") - except IOError as e: - if e.args == (13, 'permission denied'): - raise ValueError(f"file'{self._value}' isn't readable") - raise ValueError(f"Unexpected error '{e}'") - - f.close() - + if self.strict_validation and not kresd_accesible(self._value, WRITE_MODE): + raise ValueError(f"{kresd_user()}:{kresd_group()} has insuficient permissions to write to \"{self._value}\"") diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py index b5ebd6c2f..b7cd02122 100644 --- a/manager/knot_resolver_manager/server.py +++ b/manager/knot_resolver_manager/server.py @@ -17,8 +17,6 @@ from aiohttp.web_app import Application from aiohttp.web_response import json_response from aiohttp.web_runner import AppRunner, TCPSite, UnixSite from typing_extensions import Literal - -from knot_resolver_manager.datamodel.types.files import ReadableFile, WritableFile import knot_resolver_manager.utils.custom_atexit as atexit from knot_resolver_manager import log, statistics from knot_resolver_manager.compat import asyncio as asyncio_compat @@ -509,7 +507,6 @@ async def start_server(config: Path = DEFAULT_MANAGER_CONFIG_FILE) -> int: # This function is quite long, but it describes how manager runs. So let's silence pylint # pylint: disable=too-many-statements - ReadableFile(config) start_time = time() working_directory_on_startup = os.getcwd() manager: Optional[KresManager] = None diff --git a/python/knot_resolver.py.in b/python/knot_resolver.py.in index 262f7a840..e6b2accbd 100644 --- a/python/knot_resolver.py.in +++ b/python/knot_resolver.py.in @@ -8,3 +8,5 @@ etc_dir = Path("@etc_dir@") run_dir = Path("@run_dir@") lib_dir = Path("@lib_dir@") modules_dir = Path("@modules_dir@") +user = "@user@" +group = "@group@" diff --git a/python/meson.build b/python/meson.build index e209df542..c6b2a27ed 100644 --- a/python/meson.build +++ b/python/meson.build @@ -9,6 +9,8 @@ python_config.set('etc_dir', etc_dir) python_config.set('run_dir', run_dir) python_config.set('lib_dir', lib_dir) python_config.set('modules_dir', modules_dir) +python_config.set('user', user) +python_config.set('group', group) configure_file( input: 'knot_resolver.py.in',