help="Overrides default config location at '" + str(DEFAULT_MANAGER_CONFIG_FILE) + "'",
)
@click.option("--list-backends", "-l", type=bool, is_flag=True, default=False)
-def main(config: Optional[str], list_backends: bool):
+def main(config: Optional[str], list_backends: bool) -> None:
# pylint: disable=expression-not-assigned
"""Knot Resolver Manager
def _create_url(self, path: str) -> str:
return urllib.parse.urljoin(self._url, path)
- def stop(self):
+ def stop(self) -> None:
response = requests.post(self._create_url("/stop"))
print(response.text)
- def set_num_workers(self, n: int):
+ def set_num_workers(self, n: int) -> None:
response = requests.post(self._create_url("/config/server/workers"), data=str(n))
print(response.text)
- def set_static_hints(self, hints: Dict[str, List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]]):
+ def set_static_hints(self, hints: Dict[str, List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]]) -> None:
payload = {name: [str(a) for a in addrs] for name, addrs in hints.items()}
response = requests.post(self._create_url("/config/static-hints/hints"), json=payload)
print(response.text)
- def set_listen_ip_address(self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address], port: int):
+ def set_listen_ip_address(self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address], port: int) -> None:
payload = [{"listen": {"ip": str(ip), "port": port}}]
response = requests.post(self._create_url("/config/network/interfaces"), json=payload)
print(response)
- def wait_for_initialization(self, timeout_sec: float = 5, time_step: float = 0.4):
+ def wait_for_initialization(self, timeout_sec: float = 5, time_step: float = 0.4) -> None:
started = time.time()
while True:
try:
help="Set base URL on which the manager communicates",
)
@click.pass_context
-def main(ctx: click.Context, base_url: str):
+def main(ctx: click.Context, base_url: str) -> None:
ctx.ensure_object(dict)
ctx.obj[BASE_URL] = base_url
@main.command(help="Shutdown the manager and all workers")
@click.pass_context
-def stop(ctx: click.Context):
+def stop(ctx: click.Context) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.stop()
@main.command("gen-lua", help="Generate LUA config from a given declarative config")
@click.argument("config_path", type=str, nargs=1)
-def gen_lua(config_path: str):
+def gen_lua(config_path: str) -> None:
try:
with open(config_path, "r", encoding="utf8") as f:
data = f.read()
@main.command(help="Set number of workers")
@click.argument("instances", type=int, nargs=1)
@click.pass_context
-def workers(ctx: click.Context, instances: int):
+def workers(ctx: click.Context, instances: int) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_num_workers(instances)
@click.argument("name", type=str, nargs=1)
@click.argument("ip", type=str, nargs=1)
@click.pass_context
-def one_static_hint(ctx: click.Context, name: str, ip: str):
+def one_static_hint(ctx: click.Context, name: str, ip: str) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_static_hints({name: [ipaddress.ip_address(ip)]})
@click.argument("ip", type=str, nargs=1)
@click.argument("port", type=int, nargs=1)
@click.pass_context
-def listen_ip(ctx: click.Context, ip: str, port: int):
+def listen_ip(ctx: click.Context, ip: str, port: int) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_listen_ip_address(ipaddress.ip_address(ip), port)
@main.command(help="Wait for manager initialization")
@click.pass_context
-def wait(ctx: click.Context):
+def wait(ctx: click.Context) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
try:
client.wait_for_initialization()
from asyncio.futures import Future
from typing import Any, Awaitable, Callable, Coroutine, Optional, TypeVar
-from knot_resolver_manager.utils.types import NoneType
-
logger = logging.getLogger(__name__)
T = TypeVar("T")
async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
# version 3.9 and higher, call directly
if sys.version_info.major >= 3 and sys.version_info.minor >= 9:
- return await asyncio.to_thread(func, *args, **kwargs)
+ return await asyncio.to_thread(func, *args, **kwargs) # type: ignore[attr-defined]
# earlier versions, run with default executor
else:
return res
-def create_task(coro: Coroutine[Any, T, NoneType], name: Optional[str] = None) -> "Future[T]":
+def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "Future[T]":
# version 3.8 and higher, call directly
if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
- return asyncio.create_task(coro, name=name)
+ return asyncio.create_task(coro, name=name) # type: ignore[attr-defined]
# version 3.7 and higher, call directly without the name argument
if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
- return asyncio.create_task(coro)
+ return asyncio.create_task(coro) # type: ignore[attr-defined]
# earlier versions, use older function
else:
return asyncio.ensure_future(coro)
-def run(coro: Coroutine[Any, T, NoneType], debug: Optional[bool] = None) -> Awaitable[T]:
+def run(coro: Awaitable[T], debug: Optional[bool] = None) -> Awaitable[T]:
# ideally copy-paste of this:
# https://github.com/python/cpython/blob/3.9/Lib/asyncio/runners.py#L8
asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
- return loop.run_until_complete(coro)
+ # The following line have a really weird type requirements. I don't understand the reasoning, but it works
+ return loop.run_until_complete(coro) # type: ignore[arg-type]
# asyncio.run would cancel all running tasks, but it would use internal API for that
# so let's ignore it and let the tasks die
-def add_async_signal_handler(signal: int, callback: Callable[[], Awaitable[None]]):
+def add_async_signal_handler(signal: int, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal, lambda: create_task(callback()))
_CUSTOM_DATACLASS_MARKER = "_CUSTOM_DATACLASS_MARKER"
-def dataclass(cls: Any):
+def dataclass(cls: Any) -> Any:
if dataclasses_import_success:
return dataclasses.dataclass(cls)
self._callbacks: List[UpdateCallback] = []
self._update_lock: Lock = Lock()
- async def update(self, config: KresConfig):
+ async def update(self, config: KresConfig) -> None:
# invoke pre-change verifiers
- results: Tuple[Result[None, str], ...] = await asyncio.gather(
- *[ver(self._config, config) for ver in self._verifiers]
+ results: Tuple[Result[None, str], ...] = tuple(
+ await asyncio.gather(*[ver(self._config, config) for ver in self._verifiers])
)
err_res = filter(lambda r: r.is_err(), results)
errs = list(map(lambda r: r.unwrap_err(), err_res))
for call in self._callbacks:
await call(config)
- async def register_verifier(self, verifier: VerifyCallback):
+ async def register_verifier(self, verifier: VerifyCallback) -> None:
self._verifiers.append(verifier)
res = await verifier(self.get(), self.get())
if res.is_err():
raise DataException(f"Initial config verification failed with error: {res.unwrap_err()}")
- async def register_on_change_callback(self, callback: UpdateCallback):
+ async def register_on_change_callback(self, callback: UpdateCallback) -> None:
"""
Registers new callback and immediatelly calls it with current config
"""
original_value_set: Any = False
original_value: Any = None
- async def new_func(config: KresConfig):
+ async def new_func(config: KresConfig) -> None:
nonlocal original_value_set
nonlocal original_value
if not original_value_set:
kind: KindEnum
freebind: bool
- def _listen(self, origin: Raw):
+ def _listen(self, origin: Raw) -> Listen:
if not origin.listen.port:
if origin.kind == "dot":
origin.listen.port = 853
from typing import List, Optional
-from knot_resolver_manager.datamodel.policy_schema import ActionEnum
-from knot_resolver_manager.datamodel.types import CheckedPath, FlagsEnum
+from knot_resolver_manager.datamodel.types import ActionEnum, CheckedPath, FlagsEnum
from knot_resolver_manager.utils import SchemaNode
unix_socket: Optional[CheckedPath]
interface: Optional[str]
- def _typ(self, origin: Raw):
+ def _typ(self, origin: Raw) -> ListenType:
present = {
"ip" if origin.ip is not None else ...,
"port" if origin.port is not None else ...,
"You can use (IP and PORT) or (UNIX_SOCKET) or (INTERFACE and PORT)."
)
- def _port(self, origin: Raw):
+ def _port(self, origin: Raw) -> Optional[int]:
if origin.port is None:
return None
if not 0 <= origin.port <= 65_535:
self._id = n
self._repr: Optional[str] = None
- def set_custom_str_representation(self, representation: str):
+ def set_custom_str_representation(self, representation: str) -> None:
self._repr = representation
def __str__(self) -> str:
Instantiate with `KresManager.create()`, not with the usual constructor!
"""
+ def __init__(self, _i_know_what_i_am_doing: bool = False):
+ if not _i_know_what_i_am_doing:
+ logger.error(
+ "Trying to create an instance of KresManager using normal constructor. Please use "
+ "`KresManager.get_instance()` instead"
+ )
+ sys.exit(1)
+
+ self._workers: List[Subprocess] = []
+ self._gc: Optional[Subprocess] = None
+ self._manager_lock = asyncio.Lock()
+ self._controller: SubprocessController
+ self._watchdog_task: Optional["Future[None]"] = None
+
@staticmethod
async def create(selected_controller: Optional[SubprocessController], config_store: ConfigStore) -> "KresManager":
"""
await inst._async_init(selected_controller, config_store) # pylint: disable=protected-access
return inst
- async def _async_init(self, selected_controller: Optional[SubprocessController], config_store: ConfigStore):
+ async def _async_init(self, selected_controller: Optional[SubprocessController], config_store: ConfigStore) -> None:
if selected_controller is None:
self._controller = await knot_resolver_manager.kresd_controller.get_best_controller_implementation(
config_store.get()
await config_store.register_verifier(self.validate_config)
await config_store.register_on_change_callback(self.apply_config)
- def __init__(self, _i_know_what_i_am_doing: bool = False):
- if not _i_know_what_i_am_doing:
- logger.error(
- "Trying to create an instance of KresManager using normal constructor. Please use "
- "`KresManager.get_instance()` instead"
- )
- sys.exit(1)
-
- self._workers: List[Subprocess] = []
- self._gc: Optional[Subprocess] = None
- self._manager_lock = asyncio.Lock()
- self._controller: SubprocessController
- self._watchdog_task: Optional["Future[None]"] = None
-
- async def _load_system_state(self):
+ async def _load_system_state(self) -> None:
async with self._manager_lock:
await self._collect_already_running_children()
- async def _spawn_new_worker(self, config: KresConfig):
+ async def _spawn_new_worker(self, config: KresConfig) -> None:
subprocess = await self._controller.create_subprocess(config, SubprocessType.KRESD, kres_id.alloc())
await subprocess.start()
self._workers.append(subprocess)
- async def _stop_a_worker(self):
+ async def _stop_a_worker(self) -> None:
if len(self._workers) == 0:
raise IndexError("Can't stop a kresd when there are no running")
kresd = self._workers.pop()
await kresd.stop()
- async def _collect_already_running_children(self):
+ async def _collect_already_running_children(self) -> None:
for subp in await self._controller.get_all_running_instances():
if subp.type == SubprocessType.KRESD:
self._workers.append(subp)
else:
raise RuntimeError("unexpected subprocess type")
- async def _rolling_restart(self, new_config: KresConfig):
+ async def _rolling_restart(self, new_config: KresConfig) -> None:
for kresd in self._workers:
await kresd.apply_new_config(new_config)
await asyncio.sleep(1)
- async def _ensure_number_of_children(self, config: KresConfig, n: int):
+ async def _ensure_number_of_children(self, config: KresConfig, n: int) -> None:
# kill children that are not needed
while len(self._workers) > n:
await self._stop_a_worker()
def _is_gc_running(self) -> bool:
return self._gc is not None
- async def _start_gc(self, config: KresConfig):
+ async def _start_gc(self, config: KresConfig) -> None:
subprocess = await self._controller.create_subprocess(config, SubprocessType.GC, kres_id.alloc())
await subprocess.start()
self._gc = subprocess
- async def _stop_gc(self):
+ async def _stop_gc(self) -> None:
assert self._gc is not None
await self._gc.stop()
self._gc = None
logger.debug("Canary process test passed.")
return Result.ok(None)
- async def apply_config(self, config: KresConfig):
+ async def apply_config(self, config: KresConfig) -> None:
async with self._manager_lock:
logger.debug("Applying new config to all workers")
await self._ensure_number_of_children(config, config.server.workers)
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from xmlrpc.client import ServerProxy
-import supervisor.xmlrpc
+import supervisor.xmlrpc # type: ignore[import]
from jinja2 import Template
from knot_resolver_manager.compat.asyncio import to_thread
raise NotImplementedError("This subprocess type is not supported")
-async def _write_config_file(config: KresConfig, instances: Set["SupervisordSubprocess"]):
+async def _write_config_file(config: KresConfig, instances: Set["SupervisordSubprocess"]) -> None:
@dataclass
class SupervisordConfig:
unix_http_server: str
template = template.decode("utf8")
config_string = Template(template).render( # pyright: reportUnknownMemberType=false
instances=[
- _Instance(
+ _Instance( # type: ignore[call-arg]
type=i.type.name,
logfile=supervisord_subprocess_log_dir(config) / f"{i.id}.log",
id=str(i.id),
)
for i in instances
],
- config=SupervisordConfig(
+ config=SupervisordConfig( # type: ignore[call-arg]
unix_http_server=supervisord_sock_file(config),
pid_file=supervisord_pid_file(config),
workdir=str(config.server.rundir.to_path().absolute()),
os.rename(supervisord_config_file_tmp(config), supervisord_config_file(config))
-async def _start_supervisord(config: KresConfig):
+async def _start_supervisord(config: KresConfig) -> None:
await _write_config_file(config, set())
res = await call(f'supervisord --configuration="{supervisord_config_file(config).absolute()}"', shell=True)
assert res == 0
-async def _stop_supervisord(config: KresConfig):
+async def _stop_supervisord(config: KresConfig) -> None:
pid = int(await readfile(supervisord_pid_file(config)))
kill(pid, signal.SIGINT)
await wait_for_process_termination(pid)
-async def _update_config(config: KresConfig, instances: Set["SupervisordSubprocess"]):
+async def _update_config(config: KresConfig, instances: Set["SupervisordSubprocess"]) -> None:
await _write_config_file(config, instances)
await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" update', shell=True)
-async def _restart(config: KresConfig, id_: KresID):
+async def _restart(config: KresConfig, id_: KresID) -> None:
await call(f'supervisorctl -c "{supervisord_config_file(config).absolute()}" restart {id_}', shell=True)
def __str__(self):
return "supervisord"
- def should_be_running(self, subprocess: SupervisordSubprocess):
+ def should_be_running(self, subprocess: SupervisordSubprocess) -> bool:
return subprocess in self._running_instances
async def is_controller_available(self, config: KresConfig) -> bool:
logger.debug("Detection - supervisord controller is available for use")
return res
- async def _update_config_with_real_state(self, config: KresConfig):
+ async def _update_config_with_real_state(self, config: KresConfig) -> None:
assert self._controller_config is not None
running = await _is_supervisord_running(config)
assert self._controller_config is not None
await _stop_supervisord(self._controller_config)
- async def start_subprocess(self, subprocess: SupervisordSubprocess):
+ async def start_subprocess(self, subprocess: SupervisordSubprocess) -> None:
assert self._controller_config is not None
assert subprocess not in self._running_instances
self._running_instances.add(subprocess)
await _update_config(self._controller_config, self._running_instances)
- async def stop_subprocess(self, subprocess: SupervisordSubprocess):
+ async def stop_subprocess(self, subprocess: SupervisordSubprocess) -> None:
assert self._controller_config is not None
assert subprocess in self._running_instances
self._running_instances.remove(subprocess)
await _update_config(self._controller_config, self._running_instances)
- async def restart_subprocess(self, subprocess: SupervisordSubprocess):
+ async def restart_subprocess(self, subprocess: SupervisordSubprocess) -> None:
assert self._controller_config is not None
assert subprocess in self._running_instances
await _restart(self._controller_config, subprocess.id)
import os
from enum import Enum, auto
from threading import Thread
-from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple
-from gi.repository import GLib
-from pydbus import SystemBus
-from pydbus.bus import SessionBus
+from gi.repository import GLib # type: ignore[import]
+from pydbus import SystemBus # type: ignore[import]
+from pydbus.bus import SessionBus # type: ignore[import]
from typing_extensions import Literal
from knot_resolver_manager.compat.dataclasses import dataclass
return _create_object_proxy(type_, ".systemd1")
-def _wait_for_job_completion(systemd: Any, job_creating_func: Callable[[], str]):
+def _wait_for_job_completion(systemd: Any, job_creating_func: Callable[[], str]) -> None:
"""
Takes a function returning a systemd job path, executes it while simultaneously waiting
for its completion. This prevents race conditions.
def _wait_for_job_completion_handler(loop: Any) -> Any:
completed_jobs: Dict[str, str] = {}
- def event_hander(_job_id: Any, path: Any, _unit: Any, state: Any):
+ def event_hander(_job_id: Any, path: Any, _unit: Any, state: Any) -> None:
nonlocal result_state
nonlocal completed_jobs
return event_hander
- def event_loop_isolation_thread():
+ def event_loop_isolation_thread() -> None:
loop: Any = GLib.MainLoop()
systemd.JobRemoved.connect(_wait_for_job_completion_handler(loop))
loop.run()
def get_unit_file_state(
type_: SystemdType,
unit_name: str,
-) -> Union[Literal["disabled"], Literal["enabled"]]:
+) -> Literal["disabled", "enabled"]:
res = str(_create_manager_proxy(type_).GetUnitFileState(unit_name))
assert res == "disabled" or res == "enabled"
- return res
+ return res # type: ignore
@dataclass
def list_units(type_: SystemdType) -> List[Unit]:
- return [Unit(name=str(u[0]), state=str(u[4])) for u in _list_units_internal(type_)]
+ return [Unit(name=str(u[0]), state=str(u[4])) for u in _list_units_internal(type_)] # type: ignore[call-arg]
def list_unit_names(type_: SystemdType) -> List[str]:
return [str(u[0]) for u in _list_units_internal(type_)]
-def reset_failed_unit(typ: SystemdType, unit_name: str):
+def reset_failed_unit(typ: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(typ)
systemd.ResetFailedUnit(unit_name)
-def restart_unit(type_: SystemdType, unit_name: str):
+def restart_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
def job():
def start_transient_kresd_unit(
config: KresConfig, type_: SystemdType, kres_id: KresID, subprocess_type: SubprocessType
-):
+) -> None:
name, properties = {
SubprocessType.KRESD: (f"kresd_{kres_id}.service", _kresd_unit_properties(config, kres_id)),
SubprocessType.GC: (GC_SERVICE_NAME, _gc_unit_properties(config)),
raise SubprocessControllerException(f"Failed to start systemd transient service '{name}'") from e
-def start_unit(type_: SystemdType, unit_name: str):
+def start_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
- def job():
+ def job() -> Any:
return systemd.StartUnit(unit_name, "fail")
_wait_for_job_completion(systemd, job)
-def stop_unit(type_: SystemdType, unit_name: str):
+def stop_unit(type_: SystemdType, unit_name: str) -> None:
systemd = _create_manager_proxy(type_)
- def job():
+ def job() -> Any:
return systemd.StopUnit(unit_name, "fail")
_wait_for_job_completion(systemd, job)
logger = logging.getLogger(__name__)
-async def _set_log_level(config: KresConfig):
+async def _set_log_level(config: KresConfig) -> None:
levels_map = {
"crit": "CRITICAL",
"err": "ERROR",
logging.getLogger().setLevel(target)
-async def _set_logging_handler(config: KresConfig):
+async def _set_logging_handler(config: KresConfig) -> None:
target: Optional[LogTargetEnum] = config.logging.target
if target is None:
target = "stdout"
+ handler: logging.Handler
if target == "syslog":
handler = logging.handlers.SysLogHandler(address="/dev/log")
handler.setFormatter(logging.Formatter("%(name)s:%(message)s"))
@only_on_real_changes(lambda config: config.logging)
-async def _configure_logger(config: KresConfig):
+async def _configure_logger(config: KresConfig) -> None:
await _set_logging_handler(config)
await _set_log_level(config)
-async def logger_init(config_store: ConfigStore):
+async def logger_init(config_store: ConfigStore) -> None:
await config_store.register_on_change_callback(_configure_logger)
-def logger_startup():
+def logger_startup() -> None:
logging.getLogger().setLevel(STARTUP_LOG_LEVEL)
err_handler = logging.StreamHandler(sys.stderr)
err_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
@middleware
-async def error_handler(request: web.Request, handler: Any):
+async def error_handler(request: web.Request, handler: Any) -> web.Response:
"""
Generic error handler for route handlers.
self.shutdown_event = asyncio.Event()
- async def _reconfigure(self, config: KresConfig):
+ async def _reconfigure(self, config: KresConfig) -> None:
await self._reconfigure_listen_address(config)
async def _deny_listen_address_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
return Result.ok(None)
- async def sigint_handler(self):
+ async def sigint_handler(self) -> None:
logger.info("Received SIGINT, triggering graceful shutdown")
self.shutdown_event.set()
logger.error(f"Reloading of the configuration file failed: {e}")
logger.error("Configuration have NOT been changed.")
- async def start(self):
+ async def start(self) -> None:
self._setup_routes()
asyncio_compat.add_async_signal_handler(signal.SIGINT, self.sigint_handler)
asyncio_compat.add_async_signal_handler(signal.SIGHUP, self.sighup_handler)
await self.config_store.register_verifier(self._deny_listen_address_changes)
await self.config_store.register_on_change_callback(self._reconfigure)
- async def wait_for_shutdown(self):
+ async def wait_for_shutdown(self) -> None:
await self.shutdown_event.wait()
async def _handler_index(self, _request: web.Request) -> web.Response:
logger.info("Shutdown event triggered...")
return web.Response(text="Shutting down...")
- def _setup_routes(self):
+ def _setup_routes(self) -> None:
self.app.add_routes(
[
web.get("/", self._handler_index),
]
)
- async def _reconfigure_listen_address(self, config: KresConfig):
+ async def _reconfigure_listen_address(self, config: KresConfig) -> None:
async with self.listen_lock:
mgn = config.server.management
return
# start the new listen address
+ nsite: Union[web.TCPSite, web.UnixSite]
if mgn.listen.typ is ListenType.UNIX_SOCKET:
nsite = web.UnixSite(self.runner, str(mgn.listen.unix_socket))
logger.info(f"Starting API HTTP server on http+unix://{mgn.listen.unix_socket}")
self.listen = mgn.listen
self.site = nsite
- async def shutdown(self):
+ async def shutdown(self) -> None:
if self.site is not None:
await self.site.stop()
await self.runner.cleanup()
return Result.ok(None)
-def _set_working_directory(config_raw: ParsedTree):
+def _set_working_directory(config_raw: ParsedTree) -> None:
config = KresConfig(config_raw)
if not config.server.rundir.to_path().exists():
os.chdir(config.server.rundir.to_path())
-async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_FILE):
+async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_FILE) -> None:
start_time = time()
manager: Optional[KresManager] = None
asynchronously read whole file and return its content
"""
- def readfile_sync(path: Union[str, PurePath]):
+ def readfile_sync(path: Union[str, PurePath]) -> str:
with open(path, "r", encoding="utf8") as f:
return f.read()
return await to_thread(readfile_sync, path)
-async def writefile(path: Union[str, PurePath], content: str):
+async def writefile(path: Union[str, PurePath], content: str) -> None:
"""
asynchronously set content of a file to a given string `content`.
"""
- def writefile_sync(path: Union[str, PurePath], content: str):
+ def writefile_sync(path: Union[str, PurePath], content: str) -> int:
with open(path, "w", encoding="utf8") as f:
return f.write(content)
await to_thread(writefile_sync, path, content)
-async def wait_for_process_termination(pid: int, sleep_sec: float = 0):
+async def wait_for_process_termination(pid: int, sleep_sec: float = 0) -> None:
"""
will wait for any process (does not have to be a child process) given by its PID to terminate
sleep_sec configures the granularity, with which we should return
"""
- def wait_sync(pid: int, sleep_sec: float):
+ def wait_sync(pid: int, sleep_sec: float) -> None:
while True:
try:
os.kill(pid, 0)
self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue()
self._main_event_loop = asyncio.get_event_loop()
- def dispatch_event(self, event: T):
+ def dispatch_event(self, event: T) -> None:
"""
Method to dispatch events from the blocking thread
"""
return {"type": "string"}
elif is_literal(typ):
- val = get_generic_type_arguments(typ)
- return {"enum": val}
+ lit = get_generic_type_arguments(typ)
+ return {"enum": lit}
elif is_union(typ):
variants = get_generic_type_arguments(typ)
TSource = Union[NoneType, ParsedTree, "SchemaNode", Dict[str, Any]]
-def _create_untouchable(name: str):
+def _create_untouchable(name: str) -> object:
class _Untouchable:
def __getattribute__(self, item_name: str) -> Any:
raise RuntimeError(f"You are not supposed to access object '{name}'.")
_PREVIOUS_SCHEMA: Optional[Type["SchemaNode"]] = None
- def _assign_default(self, name: str, python_type: Any, object_path: str):
+ def _assign_default(self, name: str, python_type: Any, object_path: str) -> None:
cls = self.__class__
default = getattr(cls, name, None)
value = _validated_object_type(python_type, default, object_path=f"{object_path}/{name}")
setattr(self, name, value)
- def _assign_field(self, name: str, python_type: Any, value: Any, object_path: str):
+ def _assign_field(self, name: str, python_type: Any, value: Any, object_path: str) -> None:
value = _validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
setattr(self, name, value)
def to_raw(self) -> Union[Dict[str, Any], str, int, bool]:
return self._data
- def __getitem__(self, key: str):
+ def __getitem__(self, key: str) -> Any:
assert isinstance(self._data, dict)
return self._data[ParsedTree._convert_internal_field_name_to_external(key)]
- def __contains__(self, key: str):
+ def __contains__(self, key: str) -> bool:
assert isinstance(self._data, dict)
return ParsedTree._convert_internal_field_name_to_external(key) in self._data
def get_optional_inner_type(optional: Type[Optional[T]]) -> Type[T]:
assert is_optional(optional)
- return get_generic_type_arguments(optional)[0]
+ t: Type[T] = get_generic_type_arguments(optional)[0]
+ return t
def is_internal_field_name(field_name: str) -> bool:
optional = false
python-versions = "*"
+[[package]]
+name = "types-dataclasses"
+version = "0.6.4"
+description = "Typing stubs for dataclasses"
+category = "dev"
+optional = false
+python-versions = "*"
+
[[package]]
name = "types-jinja2"
version = "2.11.9"
[metadata]
lock-version = "1.1"
python-versions = "^3.6.8"
-content-hash = "db994629f529ada321967b8bb6e8d5c536461ec2fed89d986a2f71c348c24b30"
+content-hash = "5c4e322d694d291900f743ff1b4881c80311eae7320b41dae7edfbb2eb982d45"
[metadata.files]
aiohttp = [
{file = "types-click-7.1.8.tar.gz", hash = "sha256:b6604968be6401dc516311ca50708a0a28baa7a0cb840efd7412f0dbbff4e092"},
{file = "types_click-7.1.8-py3-none-any.whl", hash = "sha256:8cb030a669e2e927461be9827375f83c16b8178c365852c060a34e24871e7e81"},
]
+types-dataclasses = [
+ {file = "types-dataclasses-0.6.4.tar.gz", hash = "sha256:2f7ab6c565cf05cc7f27f31a4c2fcc803384e319aab292807b857ddf1473429f"},
+ {file = "types_dataclasses-0.6.4-py3-none-any.whl", hash = "sha256:fef6ed4742ca27996530c6d549cd704772a4a86e4781841c9bb387001e369ec3"},
+]
types-jinja2 = [
{file = "types-Jinja2-2.11.9.tar.gz", hash = "sha256:dbdc74a40aba7aed520b7e4d89e8f0fe4286518494208b35123bcf084d4b8c81"},
{file = "types_Jinja2-2.11.9-py3-none-any.whl", hash = "sha256:60a1e21e8296979db32f9374d8a239af4cb541ff66447bb915d8ad398f9c63b2"},
mypy = "^0.930"
types-click = "^7.1.8"
types-Jinja2 = "^2.11.9"
+types-dataclasses = "^0.6.4"
[tool.poe.tasks]
run = { cmd = "scripts/run", help = "Run the manager" }
[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"
+
+
+[tool.mypy]
+python_version = "3.6"
+#strict = true
+disallow_any_generics = true
+disallow_subclassing_any = true
+disallow_untyped_calls = false
+disallow_untyped_decorators = true
+pretty = true
+show_error_codes = true
+allow_redefinition = true
+disallow_untyped_defs = false
+strict_equality = true
+disallow_incomplete_defs = true
+check_untyped_defs = true
+implicit_reexport = false
+no_implicit_optional = true
check_rv $?
echo
+# check types with mypy
+echo -e "${yellow}Type checking using mypy...${reset}"
+mypy knot_resolver_manager
+check_rv $?
+echo
+
# check that setup.py is not behind pyproject.toml
echo -e "${yellow}Checking setup.py${reset}"