git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
python: applied changes from utils/* modules
author    Aleš Mrázek <ales.mrazek@nic.cz>
          Mon, 12 Jan 2026 13:06:29 +0000 (14:06 +0100)
committer Aleš Mrázek <ales.mrazek@nic.cz>
          Fri, 30 Jan 2026 23:05:10 +0000 (00:05 +0100)
python/knot_resolver/datamodel/types/base_types.py
python/knot_resolver/manager/main.py
python/knot_resolver/manager/manager.py
python/knot_resolver/manager/metrics/prometheus.py
python/knot_resolver/manager/server.py
python/knot_resolver/manager/triggers.py

python/knot_resolver/datamodel/types/base_types.py
index c034fdb8523307e3cc25292064705ae6a3b8382f..fb131b5ca17a0085e73391a774112c2e42c4df03 100644 (file)
@@ -1,9 +1,8 @@
 # ruff: noqa: SLF001
 
 import re
-from typing import Any, Dict, Type, Union
+from typing import Any, Dict, Pattern, Type, Union
 
-from knot_resolver.utils.compat.typing import Pattern
 from knot_resolver.utils.modeling import BaseValueType
 
 
python/knot_resolver/manager/main.py
index 1068373365c0431e1ecdbd3d68933739f5f66679..e629d2507b8768edbb452d9c23d50d3f5f468d39 100644 (file)
@@ -6,13 +6,13 @@ file to allow us to exclude the __main__.py file from black's autoformatting
 """
 
 import argparse
+import asyncio
 import sys
 from typing import NoReturn
 
 from knot_resolver.constants import CONFIG_FILE, VERSION
 from knot_resolver.manager.logger import logger_startup
 from knot_resolver.manager.server import start_server
-from knot_resolver.utils import compat
 
 
 def parse_args() -> argparse.Namespace:
@@ -48,5 +48,5 @@ def main() -> NoReturn:
     # parse arguments
     args = parse_args()
 
-    exit_code = compat.asyncio.run(start_server(config=args.config))
+    exit_code = asyncio.run(start_server(config=args.config))
     sys.exit(exit_code)
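The main.py hunk above swaps the knot_resolver.utils.compat wrapper for the standard library's asyncio.run. A minimal sketch of the resulting entrypoint pattern, using a stand-in coroutine and an illustrative config path rather than the real start_server and CONFIG_FILE:

import asyncio
import sys
from typing import NoReturn

async def start_server(config: str) -> int:
    # stand-in for knot_resolver.manager.server.start_server;
    # the real coroutine runs the manager and returns an exit code
    return 0

def main() -> NoReturn:
    # asyncio.run creates a fresh event loop, runs the coroutine to
    # completion, and closes the loop -- no compat shim required
    exit_code = asyncio.run(start_server(config="/etc/knot-resolver/config.yaml"))
    sys.exit(exit_code)

if __name__ == "__main__":
    main()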
python/knot_resolver/manager/manager.py
index b19ac8428208b5b3b25d441c74ba59c0426df2fb..3a99d333783ef32d01ad7bb145f89af736a54112 100644 (file)
@@ -18,7 +18,6 @@ from knot_resolver.manager.config_store import (
     only_on_real_changes_verifier,
 )
 from knot_resolver.manager.files import files_reload
-from knot_resolver.utils.compat.asyncio import create_task
 from knot_resolver.utils.functional import Result
 from knot_resolver.utils.modeling.types import NoneType
 
@@ -103,7 +102,7 @@ class KresManager:  # pylint: disable=too-many-instance-attributes
         # initialize subprocess controller
         logger.debug("Starting controller")
         await self._controller.initialize_controller(config_store.get())
-        self._processes_watchdog_task = create_task(self._processes_watchdog())
+        self._processes_watchdog_task = asyncio.create_task(self._processes_watchdog())
         logger.debug("Looking for already running workers")
         await self._collect_already_running_workers()
 
python/knot_resolver/manager/metrics/prometheus.py
index dd92ce9b632d7204fc8d8d8323265bf892643a13..dbabd44432fda05244a412b1ac436b5d73331011 100644 (file)
@@ -7,7 +7,7 @@ from knot_resolver.controller.interface import KresID
 from knot_resolver.controller.registered_workers import get_registered_workers_kresids
 from knot_resolver.datamodel.config_schema import KresConfig
 from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
-from knot_resolver.utils import compat
+from knot_resolver.utils.compat import asyncio as asyncio_compat
 from knot_resolver.utils.functional import Result
 
 from .collect import collect_kresd_workers_metrics
@@ -387,18 +387,18 @@ if PROMETHEUS_LIB:
             # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
             # function for details
 
-            if compat.asyncio.is_event_loop_running():
+            if asyncio_compat.is_event_loop_running():
                 # when running, we can schedule the new data collection
                 if self._collection_task is not None and not self._collection_task.done():
                     logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
                 else:
-                    self._collection_task = compat.asyncio.create_task(
+                    self._collection_task = asyncio.create_task(
                         self.collect_kresd_stats(_triggered_from_prometheus_library=True)
                     )
 
             else:
                 # when not running, we can start a new loop (we are not in the manager's main thread)
-                compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
+                asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))
 
     @only_on_real_changes_update(lambda c: c.monitoring.graphite)
     async def _init_graphite_bridge(config: KresConfig, force: bool = False) -> None:
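prometheus.py now keeps the compat module only for the is_event_loop_running() check, while scheduling and one-off runs go through stdlib asyncio. A sketch of that dispatch, assuming a helper equivalent to asyncio_compat.is_event_loop_running() and a stand-in collection coroutine:

import asyncio
from typing import Optional

def event_loop_is_running() -> bool:
    # assumed equivalent of asyncio_compat.is_event_loop_running()
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

async def collect_kresd_stats() -> None:
    ...  # gather metrics from the registered workers

_collection_task: "Optional[asyncio.Task[None]]" = None

def on_prometheus_scrape() -> None:
    global _collection_task
    if event_loop_is_running():
        # inside the manager's loop: schedule collection without blocking,
        # unless the previous collection is still in flight
        if _collection_task is None or _collection_task.done():
            _collection_task = asyncio.create_task(collect_kresd_stats())
    else:
        # called from a thread with no running loop: run to completion
        asyncio.run(collect_kresd_stats())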
python/knot_resolver/manager/server.py
index c0e5312eefeb8209cb7455acbae4288e9f7a1da5..cb04690ee7a7fa2b49aff1d56499493daf4b870a 100644 (file)
@@ -223,7 +223,7 @@ class Server:
         else:
             update_with = parse_from_mime_type(await request.text(), request.content_type)
         document_path = request.match_info["path"]
-        getheaders = ignore_exceptions_optional(List[str], None, KeyError)(request.headers.getall)
+        getheaders = ignore_exceptions_optional(KeyError, None)(request.headers.getall)
         etags = getheaders("if-match")
         not_etags = getheaders("if-none-match")
         current_config: Dict[str, Any] = self.config_store.get().get_unparsed_data()
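In the server.py hunk the call shape of ignore_exceptions_optional changes: the explicit List[str] return-type argument is gone and the helper now takes just the exception type and the fallback value. A hedged sketch of a decorator with that assumed shape; the real helper in knot_resolver.utils is not shown here and may differ:

from typing import Callable, Optional, Tuple, Type, TypeVar, Union

T = TypeVar("T")
Exceptions = Union[Type[BaseException], Tuple[Type[BaseException], ...]]

def ignore_exceptions_optional(
    exceptions: Exceptions,
    default: Optional[T],
) -> Callable[[Callable[..., T]], Callable[..., Optional[T]]]:
    # wrap a callable so that the listed exceptions yield `default` instead
    def decorator(func: Callable[..., T]) -> Callable[..., Optional[T]]:
        def wrapper(*args: object, **kwargs: object) -> Optional[T]:
            try:
                return func(*args, **kwargs)
            except exceptions:
                return default
        return wrapper
    return decorator

Mirroring the hunk above, getheaders("if-match") would then return the header values, or None when aiohttp's getall raises KeyError for an absent header.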
python/knot_resolver/manager/triggers.py
index f823f3ab3fd7ab18b726abf36f17d6da5d2d472f..8f639baabda82eb6b50565bd63246c5b73743230 100644 (file)
@@ -1,3 +1,4 @@
+import asyncio
 import logging
 from threading import Timer
 from typing import Dict, Optional
@@ -5,7 +6,7 @@ from urllib.parse import quote
 
 from knot_resolver.controller.registered_workers import command_registered_workers
 from knot_resolver.datamodel import KresConfig
-from knot_resolver.utils import compat
+from knot_resolver.utils.compat import asyncio as asyncio_compat
 from knot_resolver.utils.requests import SocketDesc, request
 
 logger = logging.getLogger(__name__)
@@ -37,10 +38,10 @@ class Triggers:
 
     def trigger_cmd(self, cmd: str) -> None:
         def _cmd() -> None:
-            if compat.asyncio.is_event_loop_running():
-                compat.asyncio.create_task(command_registered_workers(cmd))
+            if asyncio_compat.is_event_loop_running():
+                asyncio.create_task(command_registered_workers(cmd))  # noqa: RUF006
             else:
-                compat.asyncio.run(command_registered_workers(cmd))
+                asyncio.run(command_registered_workers(cmd))
             logger.info(f"Sending '{cmd}' command to reload watched files has finished")
 
         # skipping if command was already triggered
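The # noqa: RUF006 above silences ruff's dangling-task rule: when the result of asyncio.create_task is discarded, the event loop holds only a weak reference to the task, so it can be garbage-collected and cancelled before it finishes. A sketch of the alternative the rule usually points to, keeping references in a set until each task completes:

import asyncio
from typing import Any, Coroutine, Set

_background_tasks: "Set[asyncio.Task[None]]" = set()

def fire_and_forget(coro: "Coroutine[Any, Any, None]") -> None:
    task = asyncio.create_task(coro)
    # hold a strong reference so the task cannot be garbage-collected
    _background_tasks.add(task)
    # drop the reference once the task finishes
    task.add_done_callback(_background_tasks.discard)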