]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
datamodel: file permission checks: Created new types to check if files can be opened
authorFrantisek Tobias <frantisek.tobias@nic.cz>
Thu, 15 Aug 2024 11:41:44 +0000 (13:41 +0200)
committerAleš Mrázek <ales.mrazek@nic.cz>
Mon, 2 Sep 2024 14:42:37 +0000 (16:42 +0200)
manager/knot_resolver_manager/datamodel/types/__init__.py
manager/knot_resolver_manager/datamodel/types/files.py
manager/knot_resolver_manager/server.py

index 350cf2133888e9aa47879b70a1cb86e2d74c952c..a87c5c7c147c3b6e894ce5f764865a50222e75c3 100644 (file)
@@ -1,5 +1,5 @@
 from .enums import DNSRecordTypeEnum, PolicyActionEnum, PolicyFlagEnum
-from .files import AbsoluteDir, Dir, File, FilePath
+from .files import AbsoluteDir, Dir, File, FilePath, WritableFile, ReadableFile
 from .generic_types import ListOrItem
 from .types import (
     DomainName,
@@ -60,6 +60,8 @@ __all__ = [
     "SizeUnit",
     "TimeUnit",
     "AbsoluteDir",
+    "ReadableFile",
+    "WritableFile",
     "File",
     "FilePath",
     "Dir",
index 49b51f713b29708fc25e24bca942f67f8ebb4bd2..ec2fcca541ccfe1160caf74c477b94c2e51e79f2 100644 (file)
@@ -1,3 +1,5 @@
+from logging import debug
+from os import close
 from pathlib import Path
 from typing import Any, Dict, Tuple, Type, TypeVar
 
@@ -135,3 +137,44 @@ class FilePath(UncheckedPath):
             raise ValueError(f"path '{self._value}' does not point inside an existing directory")
         if self.strict_validation and self._value.is_dir():
             raise ValueError(f"path '{self._value}' points to a directory when we expected a file")
+
+
+class ReadableFile(UncheckedPath):
+    """
+    File, that is enforced to be:
+    - readable by kresd
+    """
+    def __init__(
+        self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+    ) -> None:
+        super().__init__(source_value, parents=parents, object_path=object_path)
+        try:
+            f = open(self._value, "r")
+        except IOError as e:
+            if e.args == (13, 'permission denied'):
+                raise ValueError(f"file'{self._value}' isn't readable")
+            raise ValueError(f"Unexpected error '{e}'")
+
+            f.close()
+
+
+class WritableFile(UncheckedPath):
+    """
+    File, that is enforced to be:
+    - writable by kresd
+    """
+    def __init__(
+        self, source_value: Any, parents: Tuple["UncheckedPath", ...] = tuple(), object_path: str = "/"
+    ) -> None:
+        print(type(self))
+        super().__init__(source_value, parents=parents, object_path=object_path)
+        try:
+            f = open(self._value, "w")
+        except IOError as e:
+            if e.args == (13, 'permission denied'):
+                raise ValueError(f"file'{self._value}' isn't readable")
+            raise ValueError(f"Unexpected error '{e}'")
+
+        f.close()
+
+
index b27cadb339fc04571c53ed9170b7e1c896b4187a..b5ebd6c2f771c1c5b45766353538a64cd115c394 100644 (file)
@@ -18,6 +18,7 @@ from aiohttp.web_response import json_response
 from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
 from typing_extensions import Literal
 
+from knot_resolver_manager.datamodel.types.files import ReadableFile, WritableFile
 import knot_resolver_manager.utils.custom_atexit as atexit
 from knot_resolver_manager import log, statistics
 from knot_resolver_manager.compat import asyncio as asyncio_compat
@@ -508,6 +509,7 @@ async def start_server(config: Path = DEFAULT_MANAGER_CONFIG_FILE) -> int:
     # This function is quite long, but it describes how manager runs. So let's silence pylint
     # pylint: disable=too-many-statements
 
+    ReadableFile(config)
     start_time = time()
     working_directory_on_startup = os.getcwd()
     manager: Optional[KresManager] = None
@@ -586,6 +588,11 @@ async def start_server(config: Path = DEFAULT_MANAGER_CONFIG_FILE) -> int:
         logger.error(e)
         return 1
 
+    except PermissionError as e:
+        logger.error(f"Reading of the configuration file failed: {e}")
+        # logger.error("Insufficient permissions")
+        return 1
+
     except BaseException:
         logger.error("Uncaught generic exception during manager inicialization...", exc_info=True)
         return 1