def get_socket_from_config(config: Path, optional_file: bool) -> Optional[SocketDesc]:
try:
- with open(config, "r") as f:
+ with open(config, "r", encoding="utf8") as f:
data = parsing.try_to_parse(f.read())
mkey = "management"
if mkey in data:
# Backported from:
# https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L55-L74
#
- to_cancel = tasks.Task.all_tasks(loop)
+ to_cancel = tasks.all_tasks(loop)
if not to_cancel:
return
# since 3.10, the loop argument is removed
loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
else:
- loop.run_until_complete(tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
+ loop.run_until_complete(tasks.gather(*to_cancel, loop=loop, return_exceptions=True)) # type: ignore[call-overload]
for task in to_cancel:
if task.cancelled():
This module contains rather simplistic reimplementation of dataclasses due to them being unsupported on Python 3.6
"""
-
from typing import Any, Dict, Set, Type
dataclasses_import_success = False
def _validate(self) -> None:
if self.pin_sha256 and (self.hostname or self.ca_file):
- ValueError("'pin-sha256' cannot be configurad together with 'hostname' or 'ca-file'")
+ raise ValueError("'pin-sha256' cannot be configurad together with 'hostname' or 'ca-file'")
class ForwardOptionsSchema(ConfigSchema):
We supported multiple subprocess controllers while developing it. It now all converged onto just supervisord.
The interface however remains so that different controllers can be added in the future.
"""
+
# pylint: disable=import-outside-toplevel
import asyncio
from knot_resolver_manager.constants import kresd_config_file
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import SubprocessControllerException
-from knot_resolver_manager.utils.async_utils import writefile
-
from knot_resolver_manager.kresd_controller.registered_workers import register_worker, unregister_worker
+from knot_resolver_manager.utils.async_utils import writefile
logger = logging.getLogger(__name__)
from typing import NoReturn
from knot_resolver_manager import compat
-from knot_resolver_manager.constants import DEFAULT_MANAGER_CONFIG_FILE, CONFIG_FILE_ENV_VAR
+from knot_resolver_manager.constants import CONFIG_FILE_ENV_VAR, DEFAULT_MANAGER_CONFIG_FILE
from knot_resolver_manager.log import logger_startup
from knot_resolver_manager.server import start_server
https://www.rfc-editor.org/rfc/rfc6901
"""
-
from typing import Any, Optional, Tuple, Union
# JSONPtrAddressable = Optional[Union[Dict[str, "JSONPtrAddressable"], List["JSONPtrAddressable"], int, float, bool, str, None]]