class Context:
resolve_root: Optional[Path]
strict_validation: bool
+ permissions_default: bool
- def __init__(self, resolve_root: Optional[Path], strict_validation: bool = True) -> None:
+ def __init__(
+ self, resolve_root: Optional[Path], strict_validation: bool = True, permissions_default: bool = True
+ ) -> None:
self.resolve_root = resolve_root
self.strict_validation = strict_validation
+ self.permissions_default = permissions_default
_global_context: Context = Context(None)
def get_strict_validation() -> bool:
return _global_context.strict_validation
+
+
+def get_permissions_default() -> bool:
+ return _global_context.permissions_default
-import logging
import os
import stat
from enum import Flag, auto
from typing import Any, Dict, Tuple, Type, TypeVar
from knot_resolver.constants import GROUP, USER
-from knot_resolver.datamodel.globals import get_resolve_root, get_strict_validation
+from knot_resolver.datamodel.globals import get_permissions_default, get_resolve_root, get_strict_validation
from knot_resolver.utils.modeling.base_value_type import BaseValueType
-logger = logging.Logger(__name__)
-
class UncheckedPath(BaseValueType):
"""
_PermissionMode.EXECUTE: [stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH],
}
- # process working user id
- pwuid = os.getuid()
- pwgid = os.getgid()
-
- # defaults
- user_uid = getpwnam(USER).pw_uid
- user_gid = getgrnam(GROUP).gr_gid
-
- # if current user do not match intended user
- # log warning message and check permissions for current user running the manager
- if pwuid != user_uid:
- logger.warning(
- f"Knot Resolver does not run under the intended '{USER}' user, '{getpwuid(pwuid).pw_name}' instead."
- " This may or may not affect the configuration validation and the proper functioning of the resolver."
- )
- user_uid = pwuid
- user_gid = pwgid
+ if get_permissions_default():
+ user_uid = getpwnam(USER).pw_uid
+ user_gid = getgrnam(GROUP).gr_gid
+ username = USER
+ else:
+ user_uid = os.getuid()
+ user_gid = os.getgid()
+ username = getpwuid(user_uid).pw_name
dest_stat = os.stat(dest_path)
dest_uid = dest_stat.st_uid
def accessible(perm: _PermissionMode) -> bool:
if user_uid == dest_uid:
return bool(dest_mode & chflags[perm][0])
- b_groups = os.getgrouplist(getpwuid(pwuid).pw_name, user_gid)
+ b_groups = os.getgrouplist(username, user_gid)
if user_gid == dest_gid or dest_gid in b_groups:
return bool(dest_mode & chflags[perm][1])
return bool(dest_mode & chflags[perm][2])
config_raw = await _load_raw_config(config)
# before processing any configuration, set validation context
- # - resolve_root = root against which all relative paths will be resolved
- set_global_validation_context(Context(config.parent, True))
+ # - resolve_root: root against which all relative paths will be resolved
+ # - strict_validation: check for path existence during configuration validation
+ # - permissions_default: validate dirs/files rwx permissions against default user:group in constants
+ set_global_validation_context(Context(config.parent, True, False))
# We want to change cwd as soon as possible. Some parts of the codebase are using os.getcwd() to get the
# working directory.