From: Frantisek Tobias Date: Thu, 15 Aug 2024 11:41:44 +0000 (+0200) Subject: datamodel: file permission checks: Created new types to check if files can be opened X-Git-Tag: v6.0.9~23^2~8 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3dd77dbd7696345970d866ab254e324e2ea95666;p=thirdparty%2Fknot-resolver.git datamodel: file permission checks: Created new types to check if files can be opened --- diff --git a/manager/knot_resolver_manager/datamodel/types/__init__.py b/manager/knot_resolver_manager/datamodel/types/__init__.py index 350cf2133..a87c5c7c1 100644 --- a/manager/knot_resolver_manager/datamodel/types/__init__.py +++ b/manager/knot_resolver_manager/datamodel/types/__init__.py @@ -1,5 +1,5 @@ from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum -from .files import AbsoluteDir, Dir, File, FilePath +from .files import AbsoluteDir, Dir, File, FilePath, WritableFile, ReadableFile from .generic_types import ListOrItem from .types import ( DomainName, @@ -60,6 +60,8 @@ __all__ = [ "SizeUnit", "TimeUnit", "AbsoluteDir", + "ReadableFile", + "WritableFile", "File", "FilePath", "Dir", diff --git a/manager/knot_resolver_manager/datamodel/types/files.py b/manager/knot_resolver_manager/datamodel/types/files.py index 49b51f713..ec2fcca54 100644 --- a/manager/knot_resolver_manager/datamodel/types/files.py +++ b/manager/knot_resolver_manager/datamodel/types/files.py @@ -1,3 +1,5 @@ +from logging import debug +from os import close from pathlib import Path from typing import Any, Dict, Tuple, Type, TypeVar @@ -135,3 +137,44 @@ class FilePath(UncheckedPath): raise ValueError(f"path '{self._value}' does not point inside an existing directory") if self.strict_validation and self._value.is_dir(): raise ValueError(f"path '{self._value}' points to a directory when we expected a file") + + +class ReadableFile(UncheckedPath): + """ + File, that is enforced to be: + - readable by kresd + """ + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + super().__init__(source_value, parents=parents, object_path=object_path) + try: + f = open(self._value, "r") + except IOError as e: + if e.args == (13, 'permission denied'): + raise ValueError(f"file'{self._value}' isn't readable") + raise ValueError(f"Unexpected error '{e}'") + + f.close() + + +class WritableFile(UncheckedPath): + """ + File, that is enforced to be: + - writable by kresd + """ + def __init__( + self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/" + ) -> None: + print(type(self)) + super().__init__(source_value, parents=parents, object_path=object_path) + try: + f = open(self._value, "w") + except IOError as e: + if e.args == (13, 'permission denied'): + raise ValueError(f"file'{self._value}' isn't readable") + raise ValueError(f"Unexpected error '{e}'") + + f.close() + + diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py index b27cadb33..b5ebd6c2f 100644 --- a/manager/knot_resolver_manager/server.py +++ b/manager/knot_resolver_manager/server.py @@ -18,6 +18,7 @@ from aiohttp.web_response import json_response from aiohttp.web_runner import AppRunner, TCPSite, UnixSite from typing_extensions import Literal +from knot_resolver_manager.datamodel.types.files import ReadableFile, WritableFile import knot_resolver_manager.utils.custom_atexit as atexit from knot_resolver_manager import log, statistics from knot_resolver_manager.compat import asyncio as asyncio_compat @@ -508,6 +509,7 @@ async def start_server(config: Path = DEFAULT_MANAGER_CONFIG_FILE) -> int: # This function is quite long, but it describes how manager runs. So let's silence pylint # pylint: disable=too-many-statements + ReadableFile(config) start_time = time() working_directory_on_startup = os.getcwd() manager: Optional[KresManager] = None @@ -586,6 +588,11 @@ async def start_server(config: Path = DEFAULT_MANAGER_CONFIG_FILE) -> int: logger.error(e) return 1 + except PermissionError as e: + logger.error(f"Reading of the configuration file failed: {e}") + # logger.error("Insufficient permissions") + return 1 + except BaseException: logger.error("Uncaught generic exception during manager inicialization...", exc_info=True) return 1