from typing import List, Optional
-from knot_resolver_manager.datamodel.types import AnyPath, SizeUnit, TimeUnit
+from knot_resolver_manager.datamodel.types import CheckedPath, SizeUnit, TimeUnit
from knot_resolver_manager.utils import SchemaNode
domain: str
url: str
refresh_interval: TimeUnit = TimeUnit("1d")
- ca_file: Optional[AnyPath] = None
+ ca_file: Optional[CheckedPath] = None
class CacheSchema(SchemaNode):
- storage: AnyPath = AnyPath("/var/cache/knot-resolver")
+ storage: CheckedPath = CheckedPath("/var/cache/knot-resolver")
size_max: SizeUnit = SizeUnit("100M")
ttl_min: TimeUnit = TimeUnit("5s")
ttl_max: TimeUnit = TimeUnit("6d")
from typing import List, Optional
from knot_resolver_manager.datamodel.types import (
- AnyPath,
+ CheckedPath,
IPAddress,
+ IPNetwork,
IPv4Address,
IPv6Address,
Listen,
SizeUnit,
- IPNetwork,
)
from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
class TLSSchema(SchemaNode):
- cert_file: Optional[AnyPath] = None
- key_file: Optional[AnyPath] = None
+ cert_file: Optional[CheckedPath] = None
+ key_file: Optional[CheckedPath] = None
sticket_secret: Optional[str] = None
- sticket_secret_file: Optional[AnyPath] = None
+ sticket_secret_file: Optional[CheckedPath] = None
auto_discovery: bool = False
padding: int = 1
from typing_extensions import Literal
-from knot_resolver_manager.datamodel.types import AnyPath, Listen
+from knot_resolver_manager.datamodel.types import CheckedPath, Listen, UncheckedPath
from knot_resolver_manager.exceptions import DataException
from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
# the default listen path here MUST use the default rundir
listen: Listen = Listen({"unix-socket": "./manager.sock"})
backend: BackendEnum = "auto"
- rundir: AnyPath = AnyPath(".")
+ rundir: UncheckedPath = UncheckedPath(".")
class WebmgmtSchema(SchemaNode):
listen: Listen
tls: bool = False
- cert_file: Optional[AnyPath] = None
- key_file: Optional[AnyPath] = None
+ cert_file: Optional[CheckedPath] = None
+ key_file: Optional[CheckedPath] = None
class ServerSchema(SchemaNode):
from typing import List, Optional
-from knot_resolver_manager.datamodel.types import AnyPath, TimeUnit
+from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit
from knot_resolver_manager.utils import SchemaNode
ttl: Optional[TimeUnit] = None
no_data: bool = True
etc_hosts: bool = False
- root_hints_file: Optional[AnyPath] = None
- hints_files: Optional[List[AnyPath]] = None
+ root_hints_file: Optional[CheckedPath] = None
+ hints_files: Optional[List[CheckedPath]] = None
root_hints: Optional[List[Hint]] = None
hints: Optional[List[Hint]] = None
return self._value
-class AnyPath(CustomValueType):
+class UncheckedPath(CustomValueType):
+ """
+ Wrapper around pathlib.Path object. Can represent pretty much any Path. No checks are
+ performed on the value. The value is taken as is.
+ """
+
def __init__(self, source_value: Any, object_path: str = "/") -> None:
- super().__init__(source_value)
+ super().__init__(source_value, object_path=object_path)
if isinstance(source_value, str):
self._value: Path = Path(source_value)
else:
f"Expected file path in a string, got '{source_value}' with type '{type(source_value)}'", object_path
)
- try:
- self._value = self._value.resolve(strict=False)
- except RuntimeError as e:
- raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e
-
def __str__(self) -> str:
return str(self._value)
def __eq__(self, o: object) -> bool:
- if not isinstance(o, AnyPath):
+ if not isinstance(o, UncheckedPath):
return False
-
+
return o._value == self._value
def __int__(self) -> int:
return str(self._value)
@classmethod
- def json_schema(cls: Type["AnyPath"]) -> Dict[Any, Any]:
+ def json_schema(cls: Type["UncheckedPath"]) -> Dict[Any, Any]:
return {
"type": "string",
}
+class CheckedPath(UncheckedPath):
+ """
+ Like UncheckedPath, but the file path is checked for being valid. So no non-existent directories in the middle,
+ no symlink loops. This however means, that resolving of relative path happens while validating.
+ """
+
+ def __init__(self, source_value: Any, object_path: str = "/") -> None:
+ super().__init__(source_value, object_path=object_path)
+ try:
+ self._value = self._value.resolve(strict=False)
+ except RuntimeError as e:
+ raise SchemaException("Failed to resolve given file path. Is there a symlink loop?", object_path) from e
+
+
class DomainName(CustomValueType):
_re = re.compile(
r"^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|"
class Raw(SchemaNode):
ip: Optional[IPAddress] = None
port: Optional[int] = None
- unix_socket: Optional[AnyPath] = None
+ unix_socket: Optional[CheckedPath] = None
interface: Optional[str] = None
_PREVIOUS_SCHEMA = Raw
typ: ListenType
ip: Optional[IPAddress]
port: Optional[int]
- unix_socket: Optional[AnyPath]
+ unix_socket: Optional[CheckedPath]
interface: Optional[str]
def _typ(self, origin: Raw):
_DEFAULT_SENTINEL = _DefaultSentinel()
-async def _init_config_store(config: Union[Path, ParsedTree, _DefaultSentinel]) -> ConfigStore:
+async def _load_raw_config(config: Union[Path, ParsedTree, _DefaultSentinel]) -> ParsedTree:
# Initial configuration of the manager
if isinstance(config, _DefaultSentinel):
# use default
config = DEFAULT_MANAGER_CONFIG_FILE
if isinstance(config, Path):
if not config.exists():
- raise KresdManagerException(f"Manager is configured to load config file at {config} on startup, but the file does not exist.")
+ raise KresdManagerException(
+ f"Manager is configured to load config file at {config} on startup, but the file does not exist."
+ )
else:
logger.info("Loading initial configuration from %s", config)
config = parse_yaml(await readfile(config))
# validate the initial configuration
assert isinstance(config, ParsedTree)
+ return config
+
+
+async def _load_config(config: ParsedTree) -> KresConfig:
logger.info("Validating initial configuration...")
config_validated = KresConfig(config)
+ return config_validated
+
+async def _init_config_store(config: ParsedTree) -> ConfigStore:
+ config_validated = await _load_config(config)
return ConfigStore(config_validated)
return manager
-async def _validate_working_directory(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+async def _deny_working_directory_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
if config_old.server.management.rundir != config_new.server.management.rundir:
return Result.err("Changing manager's `rundir` during runtime is not allowed.")
-
- if not config_new.server.management.rundir.to_path().exists():
- return Result.err(f"Configured `rundir` directory ({config_new.server.management.rundir}) does not exist!")
return Result.ok(None)
-async def _set_working_directory(config: KresConfig):
+def _set_working_directory(config_raw: ParsedTree):
+ config = KresConfig(config_raw)
+
+ if not config.server.management.rundir.to_path().exists():
+ raise KresdManagerException(f"`rundir` directory ({config.server.management.rundir}) does not exist!")
+
os.chdir(config.server.management.rundir.to_path())
async def start_server(config: Union[Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL):
start_time = time()
- # before starting server, initialize the subprocess controller etc. Any errors during inicialization are fatal
+ # before starting server, initialize the subprocess controller, config store, etc. Any errors during inicialization
+ # are fatal
try:
- config_store = await _init_config_store(config)
- await config_store.register_verifier(_validate_working_directory)
- await config_store.register_on_change_callback(_set_working_directory)
+ # Preprocess config - load from file or in general take it to the last step before validation.
+ config_raw = await _load_raw_config(config)
+
+ # We want to change cwd as soon as possible. Especially before any real config validation, because cwd
+ # is used for resolving relative paths. Thats also a reason, why in practice, we validate the config twice.
+ # Once when setting up the cwd just to read the `rundir` property. When cwd is set, we do it again to resolve
+ # all paths correctly.
+ # Note: the first config validation is done here - therefore all initial config validation errors will
+ # originate from here.
+ _set_working_directory(config_raw)
+
+ # After the working directory is set, we can initialize proper config store with a newly parsed configuration.
+ config_store = await _init_config_store(config_raw)
+
+ # This behaviour described above with paths means, that we MUST NOT allow `rundir` change after initialization.
+ # It would cause strange problems because every other path configuration depends on it. Therefore, we have to
+ # add a check to the config store, which disallows changes.
+ await config_store.register_verifier(_deny_working_directory_changes)
+
+ # After we have loaded the configuration, we can start worring about subprocess management.
manager = await _init_manager(config_store)
except KresdManagerException as e:
logger.error(e)
sys.exit(1)
- except BaseException as e:
- logger.error("Uncaught generic exception during manager inicialization..." , exc_info=True)
+ except BaseException:
+ logger.error("Uncaught generic exception during manager inicialization...", exc_info=True)
sys.exit(1)
+ # At this point, all backend functionality-providing components are initialized. It's therefore save to start
+ # the API server.
server = Server(config_store)
await server.start()
-
- # stop the server gracefully and cleanup everything
logger.info(f"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")
await server.wait_for_shutdown()
+ # After triggering shutdown, we neet to clean everything up
logger.info("Gracefull shutdown triggered.")
logger.info("Stopping API service...")
await server.shutdown()
from knot_resolver_manager.datamodel.types import (
AnyPath,
+ CheckedPath,
DomainName,
IPAddress,
IPAddressPort,
+ IPNetwork,
IPv4Address,
IPv6Address,
- IPNetwork,
IPv6Network96,
Listen,
ListenType,
SizeUnit,
TimeUnit,
+ UncheckedPath,
)
from knot_resolver_manager.exceptions import KresdManagerException
from knot_resolver_manager.utils import SchemaNode
assert o.time.seconds() == 10 * 60
-def test_anypath():
+def test_checked_path():
class TestSchema(SchemaNode):
- p: AnyPath
+ p: CheckedPath
assert str(TestSchema({"p": "/tmp"}).p) == "/tmp"