# ruff: noqa: SLF001
import re
-from typing import Any, Dict, Type, Union
+from typing import Any, Dict, Pattern, Type, Union
-from knot_resolver.utils.compat.typing import Pattern
from knot_resolver.utils.modeling import BaseValueType
"""
import argparse
+import asyncio
import sys
from typing import NoReturn
from knot_resolver.constants import CONFIG_FILE, VERSION
from knot_resolver.manager.logger import logger_startup
from knot_resolver.manager.server import start_server
-from knot_resolver.utils import compat
def parse_args() -> argparse.Namespace:
# parse arguments
args = parse_args()
- exit_code = compat.asyncio.run(start_server(config=args.config))
+ exit_code = asyncio.run(start_server(config=args.config))
sys.exit(exit_code)
only_on_real_changes_verifier,
)
from knot_resolver.manager.files import files_reload
-from knot_resolver.utils.compat.asyncio import create_task
+import asyncio
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.types import NoneType
# initialize subprocess controller
logger.debug("Starting controller")
await self._controller.initialize_controller(config_store.get())
- self._processes_watchdog_task = create_task(self._processes_watchdog())
+ self._processes_watchdog_task = asyncio.create_task(self._processes_watchdog())
logger.debug("Looking for already running workers")
await self._collect_already_running_workers()
+import asyncio
+
from knot_resolver.controller.registered_workers import get_registered_workers_kresids
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
-from knot_resolver.utils import compat
+from knot_resolver.utils.compat import asyncio as asyncio_compat
from knot_resolver.utils.functional import Result
from .collect import collect_kresd_workers_metrics
# the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
# function for details
- if compat.asyncio.is_event_loop_running():
+ if asyncio_compat.is_event_loop_running():
# when running, we can schedule the new data collection
if self._collection_task is not None and not self._collection_task.done():
logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
else:
- self._collection_task = compat.asyncio.create_task(
+ self._collection_task = asyncio.create_task(
self.collect_kresd_stats(_triggered_from_prometheus_library=True)
)
else:
# when not running, we can start a new loop (we are not in the manager's main thread)
- compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
+ asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
@only_on_real_changes_update(lambda c: c.monitoring.graphite)
async def _init_graphite_bridge(config: KresConfig, force: bool = False) -> None:
else:
update_with = parse_from_mime_type(await request.text(), request.content_type)
document_path = request.match_info["path"]
- getheaders = ignore_exceptions_optional(List[str], None, KeyError)(request.headers.getall)
+ getheaders = ignore_exceptions_optional(KeyError, None)(request.headers.getall)
etags = getheaders("if-match")
not_etags = getheaders("if-none-match")
current_config: Dict[str, Any] = self.config_store.get().get_unparsed_data()
+import asyncio
import logging
from threading import Timer
from typing import Dict, Optional
from knot_resolver.controller.registered_workers import command_registered_workers
from knot_resolver.datamodel import KresConfig
-from knot_resolver.utils import compat
+from knot_resolver.utils.compat import asyncio as asyncio_compat
from knot_resolver.utils.requests import SocketDesc, request
logger = logging.getLogger(__name__)
def trigger_cmd(self, cmd: str) -> None:
def _cmd() -> None:
- if compat.asyncio.is_event_loop_running():
- compat.asyncio.create_task(command_registered_workers(cmd))
+ if asyncio_compat.is_event_loop_running():
+ asyncio.create_task(command_registered_workers(cmd)) # noqa: RUF006
else:
- compat.asyncio.run(command_registered_workers(cmd))
+ asyncio.run(command_registered_workers(cmd))
logger.info(f"Sending '{cmd}' command to reload watched files has finished")
# skipping if command was already triggered