f'http+unix://{quote(management["unix-socket"], safe="")}/',
f'Key "/management/unix-socket" in "{config}" file',
)
- elif "interface" in management:
+ if "interface" in management:
ip = IPAddressPort(management["interface"], object_path=f"/{mkey}/interface")
return SocketDesc(
f"http://{ip.addr}:{ip.port}",
def operation_to_method(operation: Operations) -> Literal["PUT", "GET", "DELETE"]:
if operation == Operations.SET:
return "PUT"
- elif operation == Operations.DELETE:
+ if operation == Operations.DELETE:
return "DELETE"
return "GET"
config_subparsers = config.add_subparsers(help="operation type")
# GET operation
- get = config_subparsers.add_parser("get", help="Get current configuration from the resolver.")
- get.set_defaults(operation=Operations.GET, format=DataFormat.YAML)
+ get_op = config_subparsers.add_parser("get", help="Get current configuration from the resolver.")
+ get_op.set_defaults(operation=Operations.GET, format=DataFormat.YAML)
- get.add_argument(
+ get_op.add_argument(
"-p",
"--path",
help=path_help,
type=str,
default="",
)
- get.add_argument(
+ get_op.add_argument(
"file",
help="Optional, path to the file where to save exported configuration data. If not specified, data will be printed.",
type=str,
nargs="?",
)
- get_formats = get.add_mutually_exclusive_group()
+ get_formats = get_op.add_mutually_exclusive_group()
get_formats.add_argument(
"--json",
help="Get configuration data in JSON format.",
)
# SET operation
- set = config_subparsers.add_parser("set", help="Set new configuration for the resolver.")
- set.set_defaults(operation=Operations.SET)
+ set_op = config_subparsers.add_parser("set", help="Set new configuration for the resolver.")
+ set_op.set_defaults(operation=Operations.SET)
- set.add_argument(
+ set_op.add_argument(
"-p",
"--path",
help=path_help,
default="",
)
- value_or_file = set.add_mutually_exclusive_group()
+ value_or_file = set_op.add_mutually_exclusive_group()
value_or_file.add_argument(
"file",
help="Optional, path to file with new configuraion.",
)
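For orientation, the subparsers above map onto command-line calls roughly as follows. This is an illustrative sketch only: the top-level command name (kresctl) and the example paths are assumptions, not taken from this diff.

# hypothetical invocations of the get/set/delete subcommands defined above
#   kresctl config get -p /workers            # print a subtree of the running config
#   kresctl config get exported.yml           # export the configuration to a file
#   kresctl config set -p /dns64 rules.yml    # replace a subtree with data from a file
#   kresctl config delete -p /workers/0       # delete a property or a list item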
# DELETE operation
- delete = config_subparsers.add_parser(
+ delete_op = config_subparsers.add_parser(
"delete", help="Delete given configuration property or list item at the given index."
)
- delete.set_defaults(operation=Operations.DELETE)
- delete.add_argument(
+ delete_op.set_defaults(operation=Operations.DELETE)
+ delete_op.add_argument(
"-p",
"--path",
help=path_help,
if await controller.is_controller_available(config):
logger.info("Selected controller '%s'", str(controller))
return controller
- else:
- raise LookupError("The selected subprocess controller is not available for use on this system.")
+ raise LookupError("The selected subprocess controller is not available for use on this system.")
# run the imports on module load
from knot_resolver import KresBaseException
-class SubprocessControllerException(KresBaseException):
+class SubprocessControllerError(KresBaseException):
pass
-class SubprocessControllerExecException(Exception):
+class SubprocessControllerExecError(Exception):
"""
Exception that is used to deliberately terminate system startup
and make exec() of something else. This is used by the subprocess controller
from typing import Dict, Iterable, Optional, Type, TypeVar
from weakref import WeakValueDictionary
-from knot_resolver.controller.exceptions import SubprocessControllerException
+from knot_resolver.controller.exceptions import SubprocessControllerError
from knot_resolver.controller.registered_workers import register_worker, unregister_worker
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.manager.constants import kresd_config_file, policy_loader_config_file
# find free ID closest to zero
for i in itertools.count(start=0, step=1):
if i not in cls._used[typ]:
- res = cls.new(typ, i)
- return res
+ return cls.new(typ, i)
raise RuntimeError("Reached an end of an infinite loop. How?")
# typed based on subclass. I am not even sure that it's different between subclasses,
# it's probably still the same dict. But we don't really care about it
return cls._used[typ][n] # type: ignore
- else:
- val = cls(typ, n, _i_know_what_i_am_doing=True)
- cls._used[typ][n] = val
- return val
+ val = cls(typ, n, _i_know_what_i_am_doing=True)
+ cls._used[typ][n] = val
+ return val
def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False):
if not _i_know_what_i_am_doing:
if self.type is SubprocessType.KRESD:
register_worker(self)
self._registered_worker = True
- except SubprocessControllerException as e:
+ except SubprocessControllerError as e:
if config_file:
config_file.unlink()
raise e
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple
-from .exceptions import SubprocessControllerException
+from .exceptions import SubprocessControllerError
if TYPE_CHECKING:
from knot_resolver.controller.interface import KresID, Subprocess
async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, object]":
for sub in _REGISTERED_WORKERS.values():
return sub.id, await sub.command(cmd)
- raise SubprocessControllerException(
+ raise SubprocessControllerError(
"Unable to execute the command. There is no kresd worker running to execute the command."
"Try start/restart the resolver.",
)
import supervisor.xmlrpc # type: ignore[import]
-from knot_resolver.controller.exceptions import SubprocessControllerException, SubprocessControllerExecException
+from knot_resolver.controller.exceptions import SubprocessControllerError, SubprocessControllerExecError
from knot_resolver.controller.interface import (
KresID,
Subprocess,
logger.debug("Starting supervisord")
res = await call(["supervisord", "--configuration", str(supervisord_config_file(config).absolute())])
if res != 0:
- raise SubprocessControllerException(f"Supervisord exited with exit code {res}")
+ raise SubprocessControllerError(f"Supervisord exited with exit code {res}")
async def _exec_supervisord(config: KresConfig) -> NoReturn:
logger.debug("Writing supervisord config")
await write_config_file(config)
logger.debug("Execing supervisord")
- raise SubprocessControllerExecException(
+ raise SubprocessControllerExecError(
[
str(which.which("supervisord")),
"supervisord",
supervisord = _create_supervisord_proxy(config)
supervisord.reloadConfig()
except Fault as e:
- raise SubprocessControllerException("supervisord reload failed") from e
+ raise SubprocessControllerError("supervisord reload failed") from e
@async_in_a_thread
pid = await _get_supervisord_pid(config)
if pid is None:
return False
- elif not _is_process_runinng(pid):
+ if not _is_process_runinng(pid):
supervisord_pid_file(config).unlink()
return False
- else:
- return True
+ return True
def _create_proxy(config: KresConfig) -> ServerProxy:
supervisord = _create_supervisord_proxy(config)
processes: Any = supervisord.getAllProcessInfo()
except Fault as e:
- raise SubprocessControllerException(f"failed to get info from all running processes: {e}") from e
+ raise SubprocessControllerError(f"failed to get info from all running processes: {e}") from e
# there will be a manager process as well, but we don't want to report anything on ourselves
processes = [pr for pr in processes if pr["name"] != "manager"]
supervisord = _create_supervisord_proxy(self._config)
status = supervisord.getProcessInfo(self.name)
except Fault as e:
- raise SubprocessControllerException(f"failed to get status from '{self.id}' process: {e}") from e
+ raise SubprocessControllerError(f"failed to get status from '{self.id}' process: {e}") from e
return _convert_subprocess_status(status)
@async_in_a_thread
supervisord = _create_fast_proxy(self._config)
supervisord.startProcess(self.name)
except Fault as e:
- raise SubprocessControllerException(f"failed to start '{self.id}'") from e
+ raise SubprocessControllerError(f"failed to start '{self.id}'") from e
@async_in_a_thread
def _stop(self) -> None:
for id_ in states
if states[id_] == SubprocessStatus.RUNNING
]
- else:
- return []
+ return []
async def initialize_controller(self, config: KresConfig) -> None:
self._controller_config = config
# the double name is checked because that's how we read it from supervisord
if val in ("cache-gc", "cache-gc:cache-gc"):
return SupervisordKresID.new(SubprocessType.GC, 0)
- elif val in ("policy-loader", "policy-loader:policy-loader"):
+ if val in ("policy-loader", "policy-loader:policy-loader"):
return SupervisordKresID.new(SubprocessType.POLICY_LOADER, 0)
- else:
- val = val.replace("kresd:kresd", "")
- return SupervisordKresID.new(SubprocessType.KRESD, int(val))
+ val = val.replace("kresd:kresd", "")
+ return SupervisordKresID.new(SubprocessType.KRESD, int(val))
def __str__(self) -> str:
if self.subprocess_type is SubprocessType.GC:
return "cache-gc"
- elif self.subprocess_type is SubprocessType.POLICY_LOADER:
+ if self.subprocess_type is SubprocessType.POLICY_LOADER:
return "policy-loader"
- elif self.subprocess_type is SubprocessType.KRESD:
+ if self.subprocess_type is SubprocessType.KRESD:
return f"kresd:kresd{self._id}"
- else:
- raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
+ raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
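To make the naming convention concrete, a minimal sketch (assuming new(typ, n) stores n as the numeric id, as the __str__ body above suggests):

assert str(SupervisordKresID.new(SubprocessType.GC, 0)) == "cache-gc"
assert str(SupervisordKresID.new(SubprocessType.POLICY_LOADER, 0)) == "policy-loader"
assert str(SupervisordKresID.new(SubprocessType.KRESD, 3)) == "kresd:kresd3"  # the "double name" parsed above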
def kres_cache_gc_args(config: KresConfig) -> str:
# RPC API methods
- def _getGroupAndProcess(self, name):
+ def _getGroupAndProcess(self, name): # noqa: N802
# get process to start from name
group_name, process_name = split_namespec(name)
return group, process
- def startProcess(self, name, wait=True):
+ def startProcess(self, name, wait=True): # noqa: N802
"""Start a process
@param string name Process name (or ``group:name``, or ``group:*``)
# eventually fail
try:
filename, argv = process.get_execv_args()
- except NotFound as why:
- raise RPCError(Faults.NO_FILE, why.args[0])
+ except NotFound as e:
+ raise RPCError(Faults.NO_FILE, e.args[0]) from e
except (BadCommand, NotExecutable, NoPermission) as why:
- raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])
+ raise RPCError(Faults.NOT_EXECUTABLE, why.args[0]) from why
if process.get_state() in RUNNING_STATES:
raise RPCError(Faults.ALREADY_STARTED, name)
systemd_notify(READY="1", STATUS="Ready")
-def ServerOptions_get_signal(self):
+def get_server_options_signal(self):
sig = self.signal_receiver.get_signal()
if sig == signal.SIGHUP and superd is not None:
superd.options.logger.info("received SIGHUP, forwarding to the process 'manager'")
subscribe(ProcessStateRunningEvent, check_for_runnning_manager)
# forward SIGHUP to manager
- ServerOptions.get_signal = ServerOptions_get_signal
+ ServerOptions.get_signal = get_server_options_signal
# this method is called by supervisord when loading the plugin,
# it should return XML-RPC object, which we don't care about
FORWARD_MSG_FORMAT: str = "%(name)s[%(pid)d]%(stream)s: %(data)s"
-def POutputDispatcher_log(self: POutputDispatcher, data: bytearray):
+def p_output_dispatcher_log(self: POutputDispatcher, data: bytearray):
if data:
# parse the input
if not isinstance(data, bytes):
supervisord.options.logger.handlers = supervisord_handlers
# replace output handler for subprocesses
- POutputDispatcher._log = POutputDispatcher_log
+ POutputDispatcher._log = p_output_dispatcher_log # noqa: SLF001
# we forward stdio in all cases, even when logging to syslog. This should prevent the unfortunate
# case of swallowing an error message leaving the users confused. To make the forwarded lines obvious
# type: ignore
-# pylint: disable=protected-access
+# ruff: noqa: SLF001
# pylint: disable=c-extension-no-member
+
import os
import signal
import time
res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
if res is None:
- return None # there was some junk
+ return # there was some junk
pid, data = res
# pylint: disable=undefined-loop-variable
break
else:
logger.warn(f"ignoring ready notification from unregistered PID={pid}")
- return None
+ return
if data.startswith(b"READY=1"):
# handle case, when some process is really ready
else:
# handle case, when we got something unexpected
logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
- return None
+ return
def handle_write_event(self):
raise ValueError("this dispatcher is not writable")
res = first(*args, **kwargs)
if isinstance(res, tuple):
return second(*res)
- else:
- return second(res)
+ return second(res)
return wrapper
def monkeypatch(supervisord: Supervisor) -> None:
"""Inject ourselves into supervisord code"""
- # append notify socket handler to event loopo
+ # append notify socket handler to event loop
supervisord.get_process_map = chain(supervisord.get_process_map, partial(supervisord_get_process_map, supervisord))
# prepend timeout handler to transition method
# raise all validation errors
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError("/", errs)
def render_lua(self) -> str:
for server in servers:
if isinstance(server, IPAddressOptionalPort) and server.port:
return int(server.port) != 53
- elif isinstance(server, ForwardServerSchema):
+ if isinstance(server, ForwardServerSchema):
return is_port_custom(server.address.to_std())
return False
if not is_obj_type_valid(target, cast(Type[Any], LogTargetEnum)):
raise ValueError(f"logging target '{target}' read from $KRES_LOGGING_TARGET is invalid")
return cast(LogTargetEnum, target)
- else:
- return raw.target
+ return raw.target
def _validate(self):
if self.groups is None:
if origin.port:
return origin.port
# default port number based on kind
- elif origin.interface:
+ if origin.interface:
if origin.kind == "dot":
return PortNumber(853)
- elif origin.kind in ["doh-legacy", "doh2"]:
+ if origin.kind in ["doh-legacy", "doh2"]:
return PortNumber(443)
return PortNumber(53)
return None
+# ruff: noqa: SLF001
+
import re
from typing import Any, Dict, Type
val, unit = grouped.groups()
if unit is None:
raise ValueError(f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path)
- elif unit not in type(self)._units:
+ if unit not in type(self)._units:
raise ValueError(
f"Used unexpected unit '{unit}' for {type(self).__name__}."
f" Accepted units are {list(type(self)._units.keys())}",
_min: int = 1
-class Int0_512(IntRangeBase):
+class Int0_512(IntRangeBase): # noqa: N801
_min: int = 0
_max: int = 512
-class Int0_65535(IntRangeBase):
+class Int0_65535(IntRangeBase): # noqa: N801
_min: int = 0
_max: int = 65_535
object_path,
) from e
- if type(self)._re.match(punycode):
+ if type(self)._re.match(punycode): # noqa: SLF001
self._punycode = punycode
else:
raise ValueError(
-class KresBaseException(Exception):
+class KresBaseException(Exception): # noqa: N818
"""
Base class for all custom exceptions we use in Knot Resolver.
"""
if os.environ.get("KRES_SUPRESS_LOG_PREFIX") == "true":
# In this case, we are running under supervisord and it's adding prefixes to our output
return "[%(levelname)s] %(name)s: %(message)s"
- else:
- # In this case, we are running standalone during inicialization and we need to add a prefix to each line
- # by ourselves to make it consistent
- assert config.logging.target != "syslog"
- stream = ""
- if config.logging.target == "stderr":
- stream = " (stderr)"
-
- pid = os.getpid()
- return f"%(asctime)s manager[{pid}]{stream}: [%(levelname)s] %(name)s: %(message)s"
+ # In this case, we are running standalone during initialization and we need to add a prefix to each line
+ # by ourselves to make it consistent
+ assert config.logging.target != "syslog"
+ stream = ""
+ if config.logging.target == "stderr":
+ stream = " (stderr)"
+
+ pid = os.getpid()
+ return f"%(asctime)s manager[{pid}]{stream}: [%(levelname)s] %(name)s: %(message)s"
async def _set_log_level(config: KresConfig) -> None:
from subprocess import SubprocessError
from typing import Any, Callable, List, Optional
-from knot_resolver.controller.exceptions import SubprocessControllerException
+from knot_resolver.controller.exceptions import SubprocessControllerError
from knot_resolver.controller.interface import Subprocess, SubprocessController, SubprocessStatus, SubprocessType
from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
from knot_resolver.datamodel import KresConfig
"Trying to create an instance of KresManager using normal constructor. Please use "
"`KresManager.get_instance()` instead"
)
- assert False
+ raise AssertionError
self._workers: List[Subprocess] = []
self._gc: Optional[Subprocess] = None
"""
inst = KresManager(shutdown_trigger, _i_know_what_i_am_doing=True)
- await inst._async_init(subprocess_controller, config_store) # pylint: disable=protected-access
+ await inst._async_init(subprocess_controller, config_store) # noqa: SLF001
return inst
async def _async_init(self, subprocess_controller: SubprocessController, config_store: ConfigStore) -> None:
# if it keeps running, the config is valid and others will soon join as well
# if it crashes and the startup fails, then well, it's not running anymore... :)
await self._spawn_new_worker(new)
- except (SubprocessError, SubprocessControllerException):
+ except (SubprocessError, SubprocessControllerError):
logger.error("Kresd with the new config failed to start, rejecting config")
return Result.err("canary kresd process failed to start. Config might be invalid.")
else:
logger.debug("Stopping cache GC")
await self._stop_gc()
- except SubprocessControllerException as e:
+ except SubprocessControllerError as e:
if _noretry:
raise
- elif self._fix_counter.is_too_high():
+ if self._fix_counter.is_too_high():
logger.error(f"Failed to apply config: {e}")
logger.error("There have already been problems recently, refusing to try to fix it.")
await (
while not self._is_policy_loader_exited():
await asyncio.sleep(1)
- except (SubprocessError, SubprocessControllerException) as e:
+ except (SubprocessError, SubprocessControllerError) as e:
logger.error(f"Failed to load policy rules: {e}")
return Result.err("kresd 'policy-loader' process failed to start. Config might be invalid.")
logger.error("Failed attempting to fix an error. Forcefully shutting down.", exc_info=True)
await self.forced_shutdown()
- async def _watchdog(self) -> None: # pylint: disable=too-many-branches
+ async def _watchdog(self) -> None: # pylint: disable=too-many-branches # noqa: PLR0912
while True:
await asyncio.sleep(WATCHDOG_INTERVAL_SEC)
cmd = "collect_lazy_statistics()"
logger.debug(f"Collecting stats from all kresd workers using method '{cmd}'")
- metrics_dict = await command_registered_workers(cmd)
- return metrics_dict
+ return await command_registered_workers(cmd)
async def report_json(config: KresConfig) -> bytes:
sid = str(instance_id)
# response latency histogram
- BUCKET_NAMES_IN_RESOLVER = ("1ms", "10ms", "50ms", "100ms", "250ms", "500ms", "1000ms", "1500ms", "slow")
- BUCKET_NAMES_PROMETHEUS = ("0.001", "0.01", "0.05", "0.1", "0.25", "0.5", "1.0", "1.5", "+Inf")
+ bucket_names_in_resolver = ("1ms", "10ms", "50ms", "100ms", "250ms", "500ms", "1000ms", "1500ms", "slow")
+ bucket_names_in_prometheus = ("0.001", "0.01", "0.05", "0.1", "0.25", "0.5", "1.0", "1.5", "+Inf")
yield _histogram(
"resolver_response_latency",
"Time it takes to respond to queries in seconds",
label=("instance_id", sid),
buckets=[
(bnp, metrics["answer"][f"{duration}"])
- for bnp, duration in zip(BUCKET_NAMES_PROMETHEUS, BUCKET_NAMES_IN_RESOLVER)
+ for bnp, duration in zip(bucket_names_in_prometheus, bucket_names_in_resolver)
],
sum_value=metrics["answer"]["sum_ms"] / 1_000,
)
from knot_resolver.constants import CONFIG_FILE, USER
from knot_resolver.controller import get_best_controller_implementation
-from knot_resolver.controller.exceptions import SubprocessControllerExecException
+from knot_resolver.controller.exceptions import SubprocessControllerExecError
from knot_resolver.controller.registered_workers import command_single_registered_worker
from knot_resolver.datamodel import kres_config_json_schema
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
raise KresManagerException(
f"Manager is configured to load config file at {config} on startup, but the file does not exist."
)
- else:
- logger.info(f"Loading configuration from '{config}' file.")
- config = try_to_parse(await readfile(config))
+ logger.info(f"Loading configuration from '{config}' file.")
+ config = try_to_parse(await readfile(config))
# validate the initial configuration
assert isinstance(config, dict)
async def _load_config(config: Dict[str, Any]) -> KresConfig:
- config_validated = KresConfig(config)
- return config_validated
+ return KresConfig(config)
async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
config_validated = await _load_config(config)
- config_store = ConfigStore(config_validated)
- return config_store
+ return ConfigStore(config_validated)
async def _init_manager(config_store: ConfigStore, server: Server) -> KresManager:
"Another manager is running in the same working directory."
f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
) from e
- else:
- raise KresManagerException(
- "Another manager is running in the same working directory."
- f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
- ) from e
+ raise KresManagerException(
+ "Another manager is running in the same working directory."
+ f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
+ ) from e
# now we know that we are the only manager running in this directory
sys.exit(128 + signal.SIGTERM)
-async def start_server(config: Path = CONFIG_FILE) -> int:
+async def start_server(config: Path = CONFIG_FILE) -> int: # noqa: PLR0915
# This function is quite long, but it describes how manager runs. So let's silence pylint
# pylint: disable=too-many-statements
# After we have loaded the configuration, we can start worrying about subprocess management.
manager = await _init_manager(config_store, server)
- except SubprocessControllerExecException as e:
+ except SubprocessControllerExecError as e:
# if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would
# be a subprocess manager like supervisord, which wants to make sure the manager runs under supervisord in
# the process tree. So now we stop everything, and exec what we are told to. We are assuming, that the thing
except BaseException as e:
if isinstance(e, exceptions):
return default
- else:
- raise e
+ raise e
return f
return await asyncio.to_thread(func, *args, **kwargs) # type: ignore[attr-defined]
# earlier versions, run with default executor
- else:
- loop = asyncio.get_event_loop()
- pfunc = functools.partial(func, *args, **kwargs)
- res = await loop.run_in_executor(None, pfunc)
- return res
+ loop = asyncio.get_event_loop()
+ pfunc = functools.partial(func, *args, **kwargs)
+ return await loop.run_in_executor(None, pfunc)
def async_in_a_thread(func: Callable[..., T]) -> Callable[..., Coroutine[None, None, T]]:
return asyncio.create_task(coro) # type: ignore[attr-defined,arg-type]
# earlier versions, use older function
- else:
- return asyncio.ensure_future(coro)
+ return asyncio.ensure_future(coro)
def is_event_loop_running() -> bool:
- loop = events._get_running_loop() # pylint: disable=protected-access
+ loop = events._get_running_loop() # noqa: SLF001
return loop is not None and loop.is_running()
return asyncio.run(coro, debug=debug) # type: ignore[attr-defined,arg-type]
# earlier versions, use backported version of the function
- if events._get_running_loop() is not None: # pylint: disable=protected-access
+ if events._get_running_loop() is not None: # noqa: SLF001
raise RuntimeError("asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(coro):
if isinstance(obj, Serializable):
return obj.to_dict()
- elif isinstance(obj, (BaseValueType, BaseGenericTypeWrapper)):
+ if isinstance(obj, (BaseValueType, BaseGenericTypeWrapper)):
o = obj.serialize()
# if Serializable.is_serializable(o):
return Serializable.serialize(o)
# return o
- elif isinstance(obj, list):
+ if isinstance(obj, list):
res: List[Any] = [Serializable.serialize(i) for i in cast(List[Any], obj)]
return res
return obj
-class _lazy_default(Generic[T], Serializable):
+class _LazyDefault(Generic[T], Serializable):
"""
Wrapper for default values in BaseSchema classes which defers their instantiation until the schema
itself is being instantiated
def lazy_default(constructor: Callable[..., T], *args: Any, **kwargs: Any) -> T:
"""We use a factory function because you can't lie about the return type in `__new__`"""
- return _lazy_default(constructor, *args, **kwargs) # type: ignore
+ return _LazyDefault(constructor, *args, **kwargs) # type: ignore
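A minimal usage sketch for the factory above; ExampleSchema and its field are hypothetical, the point is only that the default is wrapped in _LazyDefault and instantiated later, when a schema object is actually built:

class ExampleSchema(BaseSchema):
    # hypothetical field: Int0_512(1) is not constructed at class-definition time,
    # only once an ExampleSchema instance is created
    max_answers: Int0_512 = lazy_default(Int0_512, 1)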
def _split_docstring(docstring: str) -> Tuple[str, Optional[str]]:
return schema
-def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
+def _describe_type(typ: Type[Any]) -> Dict[Any, Any]: # noqa: PLR0911, PLR0912
# pylint: disable=too-many-branches
if inspect.isclass(typ) and issubclass(typ, BaseSchema):
return typ.json_schema(include_schema_definition=False)
- elif inspect.isclass(typ) and issubclass(typ, BaseValueType):
+ if inspect.isclass(typ) and issubclass(typ, BaseValueType):
return typ.json_schema()
- elif is_generic_type_wrapper(typ):
+ if is_generic_type_wrapper(typ):
wrapped = get_generic_type_wrapper_argument(typ)
return _describe_type(wrapped)
- elif is_none_type(typ):
+ if is_none_type(typ):
return {"type": "null"}
- elif typ is int:
+ if typ is int:
return {"type": "integer"}
- elif typ is bool:
+ if typ is bool:
return {"type": "boolean"}
- elif typ is str:
+ if typ is str:
return {"type": "string"}
- elif is_literal(typ):
+ if is_literal(typ):
lit = get_generic_type_arguments(typ)
return {"type": "string", "enum": lit}
- elif is_optional(typ):
+ if is_optional(typ):
desc = _describe_type(get_optional_inner_type(typ))
if "type" in desc:
desc["type"] = [desc["type"], "null"]
return desc
- else:
- return {"anyOf": [{"type": "null"}, desc]}
+ return {"anyOf": [{"type": "null"}, desc]}
- elif is_union(typ):
+ if is_union(typ):
variants = get_generic_type_arguments(typ)
return {"anyOf": [_describe_type(v) for v in variants]}
- elif is_list(typ):
+ if is_list(typ):
return {"type": "array", "items": _describe_type(get_generic_type_argument(typ))}
- elif is_dict(typ):
+ if is_dict(typ):
key, val = get_generic_type_arguments(typ)
if inspect.isclass(key) and issubclass(key, BaseValueType):
return {"type": "object", "additionalProperties": _describe_type(val)}
- elif inspect.isclass(typ) and issubclass(typ, enum.Enum): # same as our is_enum(typ), but inlined for type checker
+ if inspect.isclass(typ) and issubclass(typ, enum.Enum): # same as our is_enum(typ), but inlined for type checker
return {"type": "string", "enum": [str(v) for v in typ]}
raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
errs.append(e)
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return tuple(res)
errs.append(e)
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return res
except AttributeError as e:
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return res
# we are willing to cast any primitive value to string, but no compound values are allowed
if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
return str(obj)
- elif is_obj_type(obj, bool):
+ if is_obj_type(obj, bool):
raise DataValidationError(
"Expected str, found bool. Be careful, that YAML parsers consider even"
' "no" and "yes" as a bool. Search for the Norway Problem for more'
" details. And please use quotes explicitly.",
object_path,
)
- else:
- raise DataValidationError(
- f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
- )
+ raise DataValidationError(
+ f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
+ )
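The error message above refers to the YAML 1.1 "Norway Problem"; a standalone PyYAML illustration of why bare yes/no values are rejected here instead of being silently cast to strings:

import yaml

yaml.safe_load("enabled: no")    # -> {'enabled': False}, parsed as a bool
yaml.safe_load("enabled: 'no'")  # -> {'enabled': 'no'}, stays a string once quoted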
def _create_int(self, obj: Any, object_path: str) -> int:
# we don't want to make an int out of anything else than other int
inner: Type[Any] = get_optional_inner_type(tp)
if obj is None:
return None
- else:
- return self.map_object(inner, obj, object_path=object_path)
+ return self.map_object(inner, obj, object_path=object_path)
def _create_bool(self, obj: Any, object_path: str) -> bool:
if is_obj_type(obj, bool):
return obj
- else:
- raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
+ raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
def _create_literal(self, tp: Type[Any], obj: Any, object_path: str) -> Any:
expected = get_generic_type_arguments(tp)
if obj in expected:
return obj
- else:
- raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
+ raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
def _create_base_schema_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseSchema":
if isinstance(obj, (dict, BaseSchema)):
if isinstance(obj, tp):
# if we already have a custom value type, just pass it through
return obj
- else:
- # no validation performed, the implementation does it in the constuctor
- try:
- return tp(obj, object_path=object_path)
- except ValueError as e:
- if len(e.args) > 0 and isinstance(e.args[0], str):
- msg = e.args[0]
- else:
- msg = f"Failed to validate value against {tp} type"
- raise DataValidationError(msg, object_path) from e
+ # no validation performed, the implementation does it in the constructor
+ try:
+ return tp(obj, object_path=object_path)
+ except ValueError as e:
+ if len(e.args) > 0 and isinstance(e.args[0], str):
+ msg = e.args[0]
+ else:
+ msg = f"Failed to validate value against {tp} type"
+ raise DataValidationError(msg, object_path) from e
def _create_default(self, obj: Any) -> Any:
- if isinstance(obj, _lazy_default):
+ if isinstance(obj, _LazyDefault):
return obj.instantiate() # type: ignore
- else:
- return obj
+ return obj
- def map_object(
+ def map_object( # noqa: PLR0911, PLR0912
self,
tp: Type[Any],
obj: Any,
return self._create_default(default)
# NoneType
- elif is_none_type(tp):
+ if is_none_type(tp):
if obj is None:
return None
- else:
- raise DataValidationError(f"expected None, found '{obj}'.", object_path)
+ raise DataValidationError(f"expected None, found '{obj}'.", object_path)
# Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
- elif is_optional(tp):
+ if is_optional(tp):
return self._create_optional(tp, obj, object_path)
# Union[*variants]
- elif is_union(tp):
+ if is_union(tp):
return self._create_union(tp, obj, object_path)
# after this, there is no place for a None object
- elif obj is None:
+ if obj is None:
raise DataValidationError(f"unexpected value 'None' for type {tp}", object_path)
# int
- elif tp is int:
+ if tp is int:
return self._create_int(obj, object_path)
# str
- elif tp is str:
+ if tp is str:
return self._create_str(obj, object_path)
# bool
- elif tp is bool:
+ if tp is bool:
return self._create_bool(obj, object_path)
# float
- elif tp is float:
+ if tp is float:
raise NotImplementedError(
"Floating point values are not supported in the object mapper."
" Please implement them and be careful with type coercions"
)
# Literal[T]
- elif is_literal(tp):
+ if is_literal(tp):
return self._create_literal(tp, obj, object_path)
# Dict[K,V]
- elif is_dict(tp):
+ if is_dict(tp):
return self._create_dict(tp, obj, object_path)
# any Enums (probably used only internally in DataValidator)
- elif is_enum(tp):
+ if is_enum(tp):
if isinstance(obj, tp):
return obj
- else:
- raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path)
+ raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path)
# List[T]
- elif is_list(tp):
+ if is_list(tp):
return self._create_list(tp, obj, object_path)
# Tuple[A,B,C,D,...]
- elif is_tuple(tp):
+ if is_tuple(tp):
return self._create_tuple(tp, obj, object_path)
# type of obj and cls type match
- elif is_obj_type(obj, tp):
+ if is_obj_type(obj, tp):
return obj
# when the specified type is Any, just return the given value
# on mypy version 1.11.0 comparison-overlap error started popping up
# https://github.com/python/mypy/issues/17665
- elif tp == Any: # type: ignore[comparison-overlap]
+ if tp == Any: # type: ignore[comparison-overlap]
return obj
# BaseValueType subclasses
- elif inspect.isclass(tp) and issubclass(tp, BaseValueType):
+ if inspect.isclass(tp) and issubclass(tp, BaseValueType):
return self.create_value_type_object(tp, obj, object_path)
# BaseGenericTypeWrapper subclasses
- elif is_generic_type_wrapper(tp):
+ if is_generic_type_wrapper(tp):
inner_type = get_generic_type_wrapper_argument(tp)
obj_valid = self.map_object(inner_type, obj, object_path)
return tp(obj_valid, object_path=object_path) # type: ignore
# nested BaseSchema subclasses
- elif inspect.isclass(tp) and issubclass(tp, BaseSchema):
+ if inspect.isclass(tp) and issubclass(tp, BaseSchema):
return self._create_base_schema_object(tp, obj, object_path)
# if the object matches, just pass it through
- elif inspect.isclass(tp) and isinstance(obj, tp):
+ if inspect.isclass(tp) and isinstance(obj, tp):
return obj
# default error handler
- else:
- raise DataValidationError(
- f"Type {tp} cannot be parsed. This is a implementation error. "
- "Please fix your types in the class or improve the parser/validator.",
- object_path,
- )
+ raise DataValidationError(
+ f"Type {tp} cannot be parsed. This is a implementation error. "
+ "Please fix your types in the class or improve the parser/validator.",
+ object_path,
+ )
def is_obj_type_valid(self, obj: Any, tp: Type[Any]) -> bool:
"""
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, errs)
return used_keys
if argc == 1:
# it is a static method
return func(source)
- elif argc == 2:
+ if argc == 2:
# it is an instance method
return func(_create_untouchable("obj"), source)
- else:
- raise RuntimeError("Transformation function has wrong number of arguments")
+ raise RuntimeError("Transformation function has wrong number of arguments")
except ValueError as e:
if len(e.args) > 0 and isinstance(e.args[0], str):
msg = e.args[0]
worry about a different BaseSchema class, when we want to have dynamically renamed fields.
"""
# As this is a delegated constructor, we must ignore protected access warnings
- # pylint: disable=protected-access
# sanity check
if not isinstance(source, (BaseSchema, dict)): # type: ignore
raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
# construct lower level schema first if configured to do so
- if obj._LAYER is not None:
- source = obj._LAYER(source, object_path=object_path) # pylint: disable=not-callable
+ if obj._LAYER is not None: # noqa: SLF001
+ source = obj._LAYER(source, object_path=object_path) # pylint: disable=not-callable # noqa: SLF001
# assign fields
used_keys = self._assign_fields(obj, source, object_path)
# validate the constructed value
try:
- obj._validate()
+ obj._validate() # noqa: SLF001
except ValueError as e:
raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path or "/") from e
def get_unparsed_data(self) -> Dict[str, Any]:
if isinstance(self.__source, BaseSchema):
return self.__source.get_unparsed_data()
- elif isinstance(self.__source, Renamed):
+ if isinstance(self.__source, Renamed):
return self.__source.original()
- else:
- return self.__source
+ return self.__source
def __getitem__(self, key: str) -> Any:
if not hasattr(self, key):
indentation_level += 1
msg_parts.append("Configuration validation error detected:")
- INDENT = indentation_level * "\t"
- msg_parts.append(f"{INDENT}{self.msg()}")
+ indent = indentation_level * "\t"
+ msg_parts.append(f"{indent}{self.msg()}")
for c in self._child_exceptions:
msg_parts.append(c.recursive_msg(indentation_level + 1))
f"JSON pointer cannot reference nested non-existent object: object at ptr '{current_ptr}' already points to None, cannot nest deeper with token '{token}'"
)
- elif isinstance(current, (bool, int, float, str)):
+ if isinstance(current, (bool, int, float, str)):
raise ValueError(f"object at '{current_ptr}' is a scalar, JSON pointer cannot point into it")
- else:
- parent = current
- if isinstance(current, list):
- if token == "-":
- current = None
- else:
- try:
- token = int(token)
- current = current[token]
- except ValueError as e:
- raise ValueError(
- f"invalid JSON pointer: list '{current_ptr}' require numbers as keys, instead got '{token}'"
- ) from e
-
- elif isinstance(current, dict):
- current = current.get(token, None)
+ parent = current
+ if isinstance(current, list):
+ if token == "-":
+ current = None
+ else:
+ try:
+ token_num = int(token)
+ current = current[token_num]
+ except ValueError as e:
+ raise ValueError(
+ f"invalid JSON pointer: list '{current_ptr}' require numbers as keys, instead got '{token}'"
+ ) from e
+
+ elif isinstance(current, dict):
+ current = current.get(token, None)
current_ptr += f"/{token}"
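For readers following the pointer walk above, a small sketch of what the resolution yields, assuming json_ptr_resolve returns the (parent, current, last_token) triple its callers unpack later in this patch:

doc = {"root": {"workers": ["a", "b"]}}
parent, obj, token = json_ptr_resolve(doc, "/root/workers/1")
# parent is the ["a", "b"] list, obj is "b", token is the final path token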
# RaiseDuplicatesLoader extends yaml.SafeLoader, so this should be safe
# https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
return renamed(yaml.load(text, Loader=_RaiseDuplicatesLoader)) # type: ignore
- elif self is DataFormat.JSON:
+ if self is DataFormat.JSON:
return renamed(json.loads(text, object_pairs_hook=_json_raise_duplicates))
- else:
- raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
+ raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
def dict_dump(self, data: Union[Dict[str, Any], Renamed], indent: Optional[int] = None) -> str:
if isinstance(data, Renamed):
if self is DataFormat.YAML:
return yaml.safe_dump(data, indent=indent) # type: ignore
- elif self is DataFormat.JSON:
+ if self is DataFormat.JSON:
return json.dumps(data, indent=indent)
- else:
- raise NotImplementedError(f"Exporting to '{self}' format is not implemented")
+ raise NotImplementedError(f"Exporting to '{self}' format is not implemented")
def parse_yaml(data: str) -> Any:
# and we may not know which one is the actual one.
raise DataParsingError( # pylint: disable=raise-missing-from
f"failed to parse data, JSON: {je}, YAML: {ye}"
- )
+ ) from ye
assert isinstance(token, int)
parent.insert(token, self.value)
else:
- assert False, "never happens"
+ raise AssertionError("never happens")
return fakeroot
newobj = copy.deepcopy(obj)
fakeroot = RemoveOp({"op": "remove", "path": self.source}).eval(fakeroot)
- fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
- return fakeroot
+ return AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
class CopyOp(Op):
_parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
newobj = copy.deepcopy(obj)
- fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
- return fakeroot
+ return AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
class TestOp(Op):
parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
return fakeroot["root"], obj
- elif method == "delete":
+ if method == "delete":
fakeroot = RemoveOp({"op": "remove", "path": ptr}).eval(fakeroot)
return fakeroot["root"], None
- elif method == "put":
+ if method == "put":
parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
assert parent is not None # we know this due to the fakeroot
if isinstance(parent, list) and token == "-":
parent[token] = payload
return fakeroot["root"], None
- elif method == "patch":
+ if method == "patch":
tp = List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]]
transaction: tp = map_object(tp, payload)
return fakeroot["root"], None
- else:
- assert False, "invalid operation, never happens"
+ raise AssertionError("invalid operation, never happens")
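As a concrete illustration of the "patch" branch above, the payload is expected to be a standard JSON Patch list that map_object converts into the Op subclasses; the paths and values below are hypothetical:

payload = [
    {"op": "test", "path": "/logging/level", "value": "info"},
    {"op": "replace", "path": "/logging/level", "value": "debug"},
    {"op": "remove", "path": "/workers"},
]
# map_object(List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]], payload)
# yields the transaction that is then applied to the fakeroot one operation at a time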
def renamed(obj: Any) -> Any:
if isinstance(obj, dict):
return RenamedDict(**obj)
- elif isinstance(obj, list):
+ if isinstance(obj, list):
return RenamedList(obj)
- else:
- return obj
+ return obj
__all__ = ["renamed", "Renamed"]
def is_literal(tp: Any) -> bool:
if sys.version_info.minor == 6:
return isinstance(tp, type(Literal))
- else:
- return getattr(tp, "__origin__", None) == Literal
+ return getattr(tp, "__origin__", None) == Literal
def is_generic_type_wrapper(tp: Any) -> bool:
default: List[Any] = []
if sys.version_info.minor == 6 and is_literal(tp):
return getattr(tp, "__values__")
- else:
- return getattr(tp, "__args__", default)
+ return getattr(tp, "__args__", default)
def get_generic_type_argument(tp: Any) -> Any: