-from .exceptions import KresBaseException
from .constants import VERSION
+from .exceptions import KresBaseException
__version__ = VERSION
from urllib.parse import quote
from knot_resolver.constants import (
+ API_SOCK_PATH_DEFAULT,
+ API_SOCK_PATH_ENV_VAR,
CONFIG_FILE_PATH_DEFAULT,
CONFIG_FILE_PATH_ENV_VAR,
- API_SOCK_PATH_ENV_VAR,
- API_SOCK_PATH_DEFAULT,
)
from knot_resolver.datamodel.types import IPAddressPort
from knot_resolver.utils.modeling import parsing
from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
from knot_resolver.datamodel import KresConfig
-from knot_resolver.datamodel.globals import (
- Context,
- reset_global_validation_context,
- set_global_validation_context,
-)
+from knot_resolver.datamodel.globals import Context, reset_global_validation_context, set_global_validation_context
from knot_resolver.utils.modeling import try_to_parse
from knot_resolver.utils.modeling.exceptions import DataParsingError, DataValidationError
from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
from knot_resolver.datamodel import KresConfig
-from knot_resolver.datamodel.globals import (
- Context,
- reset_global_validation_context,
- set_global_validation_context,
-)
+from knot_resolver.datamodel.globals import Context, reset_global_validation_context, set_global_validation_context
from knot_resolver.utils.modeling import try_to_parse
from knot_resolver.utils.modeling.exceptions import DataParsingError, DataValidationError
from knot_resolver.constants import VERSION
+from .client import KRES_CLIENT_NAME, KresClient
from .command import install_commands_parsers
-from .client import KresClient, KRES_CLIENT_NAME
def auto_import_commands() -> None:
import logging
-
from importlib.metadata import version
from importlib.util import find_spec
from pathlib import Path
import logging
from typing import List, Optional
-from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.controller.interface import SubprocessController
+from knot_resolver.datamodel.config_schema import KresConfig
logger = logging.getLogger(__name__)
from typing import Dict, Iterable, Optional, Type, TypeVar
from weakref import WeakValueDictionary
-from knot_resolver.manager.constants import kresd_config_file, policy_loader_config_file
-from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.controller.exceptions import SubprocessControllerException
from knot_resolver.controller.registered_workers import register_worker, unregister_worker
+from knot_resolver.datamodel.config_schema import KresConfig
+from knot_resolver.manager.constants import kresd_config_file, policy_loader_config_file
from knot_resolver.utils.async_utils import writefile
logger = logging.getLogger(__name__)
import supervisor.xmlrpc # type: ignore[import]
from knot_resolver.compat.asyncio import async_in_a_thread
-from knot_resolver.manager.constants import supervisord_config_file, supervisord_pid_file, supervisord_sock_file
-from knot_resolver.datamodel.config_schema import KresConfig
-from knot_resolver.controller.exceptions import SubprocessControllerExecException, SubprocessControllerException
+from knot_resolver.controller.exceptions import SubprocessControllerException, SubprocessControllerExecException
from knot_resolver.controller.interface import (
KresID,
Subprocess,
SubprocessType,
)
from knot_resolver.controller.supervisord.config_file import SupervisordKresID, write_config_file
+from knot_resolver.datamodel.config_schema import KresConfig
+from knot_resolver.manager.constants import supervisord_config_file, supervisord_pid_file, supervisord_sock_file
from knot_resolver.utils import which
from knot_resolver.utils.async_utils import call, readfile
import logging
import os
-
-from typing import Literal
from dataclasses import dataclass
from pathlib import Path
+from typing import Literal
+
from jinja2 import Template
-from knot_resolver.constants import (
- kres_cache_gc_executable,
- kresd_executable,
-)
+from knot_resolver.constants import kres_cache_gc_executable, kresd_executable
+from knot_resolver.controller.interface import KresID, SubprocessType
+from knot_resolver.datamodel.config_schema import KresConfig
+from knot_resolver.datamodel.logging_schema import LogTargetEnum
from knot_resolver.manager.constants import (
kres_cache_dir,
kresd_config_file_supervisord_pattern,
supervisord_subprocess_log_dir,
user_constants,
)
-from knot_resolver.datamodel.config_schema import KresConfig
-from knot_resolver.datamodel.logging_schema import LogTargetEnum
-from knot_resolver.controller.interface import KresID, SubprocessType
from knot_resolver.utils.async_utils import read_resource, writefile
logger = logging.getLogger(__name__)
import socket
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
-from knot_resolver.constants import RUN_DIR_DEFAULT, API_SOCK_PATH_DEFAULT, WORKERS_MAX_DEFAULT
-
+from knot_resolver.constants import API_SOCK_PATH_DEFAULT, RUN_DIR_DEFAULT, WORKERS_MAX_DEFAULT
from knot_resolver.datamodel.cache_schema import CacheSchema
from knot_resolver.datamodel.dns64_schema import Dns64Schema
from knot_resolver.datamodel.dnssec_schema import DnssecSchema
from typing import Any, List, Literal, Optional, Union
-from knot_resolver.datamodel.types import (
- DomainName,
- IPAddressOptionalPort,
- ListOrItem,
- PinSha256,
- ReadableFile,
-)
+from knot_resolver.datamodel.types import DomainName, IPAddressOptionalPort, ListOrItem, PinSha256, ReadableFile
from knot_resolver.utils.modeling import ConfigSchema
from typing import Optional
-from knot_resolver.datamodel.types import WritableFilePath, IPAddressPort
+from knot_resolver.datamodel.types import IPAddressPort, WritableFilePath
from knot_resolver.utils.modeling import ConfigSchema
from knot_resolver.datamodel.types import (
EscapedStr32B,
- WritableFilePath,
Int0_512,
Int0_65535,
InterfaceOptionalPort,
PortNumber,
ReadableFile,
SizeUnit,
+ WritableFilePath,
)
from knot_resolver.utils.modeling import ConfigSchema
import os
import stat
-from enum import auto, Flag
+from enum import Flag, auto
from grp import getgrnam
from pathlib import Path
from pwd import getpwnam
from typing import Any, Dict, Tuple, Type, TypeVar
-from knot_resolver.constants import USER_DEFAULT, GROUP_DEFAULT
+from knot_resolver.constants import GROUP_DEFAULT, USER_DEFAULT
from knot_resolver.datamodel.globals import get_resolve_root, get_strict_validation
from knot_resolver.utils.modeling.base_value_type import BaseValueType
import re
from typing import Any, Dict, Optional, Type, Union
-from knot_resolver.datamodel.types.base_types import (
- IntRangeBase,
- PatternBase,
- StrBase,
- StringLengthBase,
- UnitBase,
-)
+from knot_resolver.datamodel.types.base_types import IntRangeBase, PatternBase, StrBase, StringLengthBase, UnitBase
from knot_resolver.utils.modeling import BaseValueType
from typing import Optional
-from knot_resolver.datamodel.types import WritableFilePath, InterfacePort, ReadableFile
+from knot_resolver.datamodel.types import InterfacePort, ReadableFile, WritableFilePath
from knot_resolver.utils.modeling import ConfigSchema
from knot_resolver.manager.main import main
-
if __name__ == "__main__":
main()
from asyncio import Lock
from typing import Any, Awaitable, Callable, List, Tuple
-from knot_resolver.datamodel import KresConfig
from knot_resolver import KresBaseException
+from knot_resolver.datamodel import KresConfig
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.exceptions import DataParsingError
from knot_resolver.utils.modeling.types import NoneType
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
- from knot_resolver.manager.config_store import ConfigStore
- from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.controller.interface import KresID
+ from knot_resolver.datamodel.config_schema import KresConfig
+ from knot_resolver.manager.config_store import ConfigStore
def kres_cache_dir(config: "KresConfig") -> Path:
from typing import Any, Callable, List, Optional
from knot_resolver.compat.asyncio import create_task
-from knot_resolver.manager.config_store import (
- ConfigStore,
- only_on_real_changes_update,
- only_on_real_changes_verifier,
-)
-from knot_resolver.constants import (
- FIX_COUNTER_ATTEMPTS_MAX,
- FIX_COUNTER_DECREASE_INTERVAL_SEC,
- WATCHDOG_INTERVAL_SEC,
-)
+from knot_resolver.constants import FIX_COUNTER_ATTEMPTS_MAX, FIX_COUNTER_DECREASE_INTERVAL_SEC, WATCHDOG_INTERVAL_SEC
from knot_resolver.controller.exceptions import SubprocessControllerException
-from knot_resolver.controller.interface import (
- Subprocess,
- SubprocessController,
- SubprocessStatus,
- SubprocessType,
-)
-from knot_resolver.controller.registered_workers import (
- command_registered_workers,
- get_registered_workers_kresids,
-)
+from knot_resolver.controller.interface import Subprocess, SubprocessController, SubprocessStatus, SubprocessType
+from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
+from knot_resolver.datamodel import KresConfig
+from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update, only_on_real_changes_verifier
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.types import NoneType
-from knot_resolver.datamodel import KresConfig
-
logger = logging.getLogger(__name__)
from typing import Optional
from knot_resolver.constants import LOGGING_LEVEL_STARTUP
-from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.datamodel.logging_schema import LogTargetEnum
+from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
logger = logging.getLogger(__name__)
from pathlib import Path
from typing import NoReturn
-from knot_resolver.constants import VERSION
-
from knot_resolver import compat
-from knot_resolver.constants import CONFIG_FILE_PATH_DEFAULT, CONFIG_FILE_PATH_ENV_VAR
+from knot_resolver.constants import CONFIG_FILE_PATH_DEFAULT, CONFIG_FILE_PATH_ENV_VAR, VERSION
from knot_resolver.manager.log import logger_startup
from knot_resolver.manager.server import start_server
import knot_resolver.utils.custom_atexit as atexit
from knot_resolver import KresBaseException
-from knot_resolver.constants import CONFIG_FILE_PATH_DEFAULT, PID_FILE_NAME
-from knot_resolver.manager import log, statistics
from knot_resolver.compat import asyncio as asyncio_compat
-from knot_resolver.manager.config_store import ConfigStore
-from knot_resolver.manager.constants import init_user_constants
+from knot_resolver.constants import CONFIG_FILE_PATH_DEFAULT, PID_FILE_NAME
+from knot_resolver.controller import get_best_controller_implementation
+from knot_resolver.controller.exceptions import SubprocessControllerExecException
+from knot_resolver.controller.registered_workers import command_single_registered_worker
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without_validation
from knot_resolver.datamodel.globals import Context, set_global_validation_context
from knot_resolver.datamodel.management_schema import ManagementSchema
-from knot_resolver.controller import get_best_controller_implementation
-from knot_resolver.controller.exceptions import SubprocessControllerExecException
-from knot_resolver.controller.registered_workers import command_single_registered_worker
+from knot_resolver.manager import log, statistics
+from knot_resolver.manager.config_store import ConfigStore
+from knot_resolver.manager.constants import init_user_constants
from knot_resolver.utils import ignore_exceptions_optional
from knot_resolver.utils.async_utils import readfile
from knot_resolver.utils.etag import structural_etag
from knot_resolver.utils.functional import Result
-from knot_resolver.utils.modeling.exceptions import (
- AggregateDataValidationError,
- DataParsingError,
- DataValidationError,
-)
+from knot_resolver.utils.modeling.exceptions import AggregateDataValidationError, DataParsingError, DataValidationError
from knot_resolver.utils.modeling.parsing import DataFormat, try_to_parse
from knot_resolver.utils.modeling.query import query
from knot_resolver.utils.modeling.types import NoneType
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple
from knot_resolver import compat
-from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
+from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
from knot_resolver.datamodel.config_schema import KresConfig
-from knot_resolver.controller.registered_workers import (
- command_registered_workers,
- get_registered_workers_kresids,
-)
+from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.parsing import DataFormat
from abc import ABC, abstractmethod # pylint: disable=[no-name-in-module]
from typing import Any, List, Literal, Optional, Tuple, Union
-
from knot_resolver.utils.modeling.base_schema import BaseSchema, map_object
from knot_resolver.utils.modeling.json_pointer import json_ptr_resolve
import pytest
from pytest import raises
-from knot_resolver.datamodel.types.base_types import IntRangeBase, StringLengthBase
from knot_resolver import KresBaseException
+from knot_resolver.datamodel.types.base_types import IntRangeBase, StringLengthBase
@pytest.mark.parametrize("min,max", [(0, None), (None, 0), (1, 65535), (-65535, -1)])
import pytest
-from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
from knot_resolver.datamodel.config_schema import KresConfig
+from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
@pytest.mark.asyncio # type: ignore