This project uses [`apkg`](https://gitlab.nic.cz/packaging/apkg) for packaging. See [`distro/README.md`](distro/README.md) for packaging specific instructions.
-Not yet properly implemented. Ideal situation would be a command like `poe package` which would create all possible packages.
-
-Temporary solution to build a wheel/sdist - just call `poetry build`. The result will be in the `dist/` directory.
-
## FAQ
### What all those dev dependencies for?
PYTHONHASHSEED=random \
PYTHONDONTWRITEBYTECODE=1
+RUN dnf install -y knot-resolver procps-ng
# How does this work?
# ===================
# We do not have to install the dependencies every single time. They are cached in the container itself and
# we can rebuild it only when it's definition or the list of dependencies changes.
-COPY pyproject.toml package.json .
+COPY pyproject.toml poetry.lock package.json .
RUN source $HOME/.poetry/env \
&& poetry config --list \
&& poetry env use $(which python3.6) \
runtime/
-cache/
\ No newline at end of file
+cache/
\ No newline at end of file
import logging
-from sys import exc_info
+import sys
from typing import Callable
from knot_resolver_manager.client import KnotManagerClient, count_running_kresds, start_manager_in_background
logger = logging.getLogger(__name__)
-def test_wrapper(test: Test):
+def test_wrapper(test: Test) -> bool:
p = start_manager_in_background(HOST, PORT)
client = KnotManagerClient(BASE_URL)
client.wait_for_initialization()
logger.info("Starting test %s", test.__name__)
try:
- res = test(client)
+ test(client)
+ res = True
except AssertionError:
logger.error("Test %s failed", exc_info=True)
res = False
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
- test_wrapper(worker_count)
- # test_wrapper(crash_resistance)
+ success = True
+ success &= test_wrapper(worker_count)
+ # success &= test_wrapper(crash_resistance)
+ sys.exit(int(not success))
MANAGER_CONFIG_FILE = CONFIGURATION_DIR / "config.yml"
LISTEN_SOCKET_PATH = RUNTIME_DIR / "manager.sock"
+
+"""
+Used in KresdManager. It's a number of seconds in between system health checks.
+"""
+WATCHDOG_INTERVAL: float = 5
--- /dev/null
+import itertools
+import weakref
+from typing import Optional
+
+from knot_resolver_manager.utils import ignore_exceptions_optional
+
+
+class KresID:
+ """
+ ID object. Effectively only a wrapper around an int, so that the references
+ behave normally (bypassing integer interning and other optimizations)
+ """
+
+ def __init__(self, n: int):
+ self._id = n
+ self._repr: Optional[str] = None
+
+ def set_custom_str_representation(self, representation: str):
+ self._repr = representation
+
+ def __str__(self) -> str:
+ if self._repr is None:
+ return str(self._id)
+ else:
+ return self._repr
+
+ def __hash__(self) -> int:
+ return self._id
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, KresID) and self._id == o._id
+
+
+_used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
+
+
+def alloc() -> KresID:
+ for i in itertools.count(start=1):
+ val = KresID(i)
+ if val not in _used:
+ _used.add(val)
+ return val
+
+ raise RuntimeError("Reached an end of an infinite loop. How?")
+
+
+def alloc_from_string(val: str) -> KresID:
+ int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val)
+ if int_val is not None:
+ res = KresID(int_val)
+ assert res not in _used
+ _used.add(res)
+ return res
+ else:
+ res = alloc()
+ res.set_custom_str_representation(val)
+ return res
import asyncio
-import itertools
import logging
-import weakref
+import sys
+from asyncio.futures import Future
from subprocess import SubprocessError
-from typing import List, Optional, Type
+from typing import List, Optional
-from knot_resolver_manager.constants import KRESD_CONFIG_FILE
+import knot_resolver_manager.kresd_controller
+from knot_resolver_manager import kres_id
+from knot_resolver_manager.compat.asyncio import create_task
+from knot_resolver_manager.constants import KRESD_CONFIG_FILE, WATCHDOG_INTERVAL
from knot_resolver_manager.exceptions import ValidationException
-from knot_resolver_manager.kresd_controller import get_best_controller_implementation
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
+from knot_resolver_manager.kresd_controller.interface import (
+ Subprocess,
+ SubprocessController,
+ SubprocessStatus,
+ SubprocessType,
+)
from knot_resolver_manager.utils.async_utils import writefile
from .datamodel import KresConfig
logger = logging.getLogger(__name__)
-class _PrettyID:
- """
- ID object. Effectively only a wrapper around an int, so that the references
- behave normally (bypassing integer interning and other optimizations)
- """
-
- def __init__(self, n: int):
- self._id = n
-
- def __str__(self):
- return str(self._id)
-
- def __hash__(self) -> int:
- return self._id
-
- def __eq__(self, o: object) -> bool:
- return isinstance(o, _PrettyID) and self._id == o._id
-
-
-class _PrettyIDAllocator:
- """
- Pretty numeric ID allocator. Keeps weak refences to the IDs it has
- allocated. The IDs get recycled once the previously allocated ID
- objects get garbage collected
- """
-
- def __init__(self):
- self._used: "weakref.WeakSet[_PrettyID]" = weakref.WeakSet()
-
- def alloc(self) -> _PrettyID:
- for i in itertools.count(start=1):
- val = _PrettyID(i)
- if val not in self._used:
- self._used.add(val)
- return val
-
- raise RuntimeError("Reached an end of an infinite loop. How?")
-
-
class KresManager:
"""
Core of the whole operation. Orchestrates individual instances under some
Instantiate with `KresManager.create()`, not with the usual constructor!
"""
- @classmethod
- async def create(cls: Type["KresManager"], controller: Optional[SubprocessController]) -> "KresManager":
- obj = cls()
- await obj._async_init(controller) # pylint: disable=protected-access
- return obj
+ _instance_lock = asyncio.Lock()
+ _instance: Optional["KresManager"] = None
+
+ @staticmethod
+ async def create_instance(selected_controller: Optional[SubprocessController]) -> "KresManager":
+ """
+ Creates new singleton instance of KresManager. Can be called only once. Afterwards, use
+ `KresManager.get_instance()` to obtain the already existing instance
+ """
+
+ assert KresManager._instance is None
+
+ async with KresManager._instance_lock:
+ # trying to create, but racing and somebody already did it
+ if KresManager._instance is not None:
+ raise AssertionError("Must NOT call `create_instance` multiple times - race detected!")
+
+ # create it for real
+ inst = KresManager(_i_know_what_i_am_doing=True)
+ await inst._async_init(selected_controller) # pylint: disable=protected-access
+ KresManager._instance = inst
+ return inst
+
+ @staticmethod
+ def get_instance() -> "KresManager":
+ """
+ Obtain reference to the singleton instance of this class. If you want to create an instance,
+ use `create_instance()`
+ """
+ assert KresManager._instance is not None
+ return KresManager._instance
async def _async_init(self, selected_controller: Optional[SubprocessController]):
if selected_controller is None:
- self._controller = await get_best_controller_implementation()
+ self._controller = await knot_resolver_manager.kresd_controller.get_best_controller_implementation()
else:
self._controller = selected_controller
await self._controller.initialize_controller()
+ self._watchdog_task = create_task(self._watchdog())
await self.load_system_state()
- def __init__(self):
+ def __init__(self, _i_know_what_i_am_doing: bool = False):
+ if not _i_know_what_i_am_doing:
+ logger.error(
+ "Trying to create an instance of KresManager using normal contructor. Please use "
+ "`KresManager.get_instance()` instead"
+ )
+ sys.exit(1)
+
self._workers: List[Subprocess] = []
self._gc: Optional[Subprocess] = None
self._manager_lock = asyncio.Lock()
self._controller: SubprocessController
self._last_used_config: Optional[KresConfig] = None
- self._id_allocator = _PrettyIDAllocator()
+ self._watchdog_task: Optional["Future[None]"] = None
async def load_system_state(self):
async with self._manager_lock:
await self._collect_already_running_children()
async def _spawn_new_worker(self):
- subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, self._id_allocator.alloc())
+ subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, kres_id.alloc())
await subprocess.start()
self._workers.append(subprocess)
return self._gc is not None
async def _start_gc(self):
- subprocess = await self._controller.create_subprocess(SubprocessType.GC, "gc")
+ subprocess = await self._controller.create_subprocess(SubprocessType.GC, kres_id.alloc())
await subprocess.start()
self._gc = subprocess
await self._ensure_number_of_children(0)
await self._controller.shutdown_controller()
+ if self._watchdog_task is not None:
+ self._watchdog_task.cancel()
+
def get_last_used_config(self) -> Optional[KresConfig]:
return self._last_used_config
+
+ async def _instability_handler(self) -> None:
+ logger.error("Instability callback invoked. No idea how to react, performing suicide. See you later!")
+ sys.exit(1)
+
+ async def _watchdog(self) -> None:
+ while True:
+ await asyncio.sleep(WATCHDOG_INTERVAL)
+
+ # gather current state
+ units = {u.id: u for u in await self._controller.get_subprocess_info()}
+ worker_ids = [x.id for x in self._workers]
+ invoke_callback = False
+
+ for w in worker_ids:
+ if w not in units:
+ logger.error("Expected to find subprocess with id '%s' in the system, but did not.", w)
+ invoke_callback = True
+ continue
+
+ if units[w].status is SubprocessStatus.FAILED:
+ logger.error("Subprocess '%s' is failed.", w)
+ invoke_callback = True
+ continue
+
+ if units[w].status is SubprocessStatus.UNKNOWN:
+ logger.warning("Subprocess '%s' is in unknown state!", w)
+
+ if invoke_callback:
+ await self._instability_handler()
"""
This file contains autodetection logic for available subprocess controllers. Because we have to catch errors
-from imports, you can not see a simple list, but it's more complicated.
+from imports, they are located in functions which are invoked at the end of this file.
"""
# pylint: disable=import-outside-toplevel
+from dataclasses import dataclass
from enum import Enum, auto
-from typing import Iterable
+from typing import Iterable, List
+
+from knot_resolver_manager.kres_id import KresID
class SubprocessType(Enum):
return hash(type(self)) ^ hash(self.type) ^ hash(self.id)
+class SubprocessStatus(Enum):
+ RUNNING = auto()
+ FAILED = auto()
+ UNKNOWN = auto()
+
+
+@dataclass
+class SubprocessInfo:
+ id: str
+ status: SubprocessStatus
+
+
class SubprocessController:
"""
The common Subprocess Controller interface. This is what KresManager requires and what has to be implemented by all
"""
raise NotImplementedError()
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
"""
Return a Subprocess object which can be operated on. The subprocess is not
started or in any way active after this call. That has to be performaed manually
Must NOT be called before initialize_controller()
"""
raise NotImplementedError()
+
+ async def get_subprocess_info(self) -> List[SubprocessInfo]:
+ """
+ Get a status of running subprocesses as seen by the controller. This method actively polls
+ for information.
+ """
+ raise NotImplementedError()
import logging
-from asyncio.futures import Future
-from typing import Any, Iterable, Set
-
-from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
+from typing import Iterable, List, Set
+
+from knot_resolver_manager.compat.asyncio import to_thread
+from knot_resolver_manager.kres_id import KresID, alloc_from_string
+from knot_resolver_manager.kresd_controller.interface import (
+ Subprocess,
+ SubprocessController,
+ SubprocessInfo,
+ SubprocessType,
+)
from .config import (
SupervisordConfig,
is_supervisord_available,
is_supervisord_running,
list_ids_from_existing_config,
+ list_subprocesses,
restart,
start_supervisord,
stop_supervisord,
update_config,
- watchdog,
)
logger = logging.getLogger(__name__)
class SupervisordSubprocess(Subprocess):
- def __init__(self, controller: "SupervisordSubprocessController", id_: object, type_: SubprocessType):
+ def __init__(self, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType):
self._controller: "SupervisordSubprocessController" = controller
- self._id = id_
+ self._id: KresID = id_
self._type: SubprocessType = type_
@property
class SupervisordSubprocessController(SubprocessController):
def __init__(self):
self._running_instances: Set[SupervisordSubprocess] = set()
- self._watchdog_task: "Future[Any]"
def __str__(self):
return "supervisord"
res = await is_supervisord_available()
if not res:
logger.info("Failed to find usable supervisord.")
+
+ logger.debug("Detection - supervisord controller is available for use")
return res
async def _update_config_with_real_state(self):
if running:
ids = await list_ids_from_existing_config()
for tp, id_ in ids:
- self._running_instances.add(SupervisordSubprocess(self, id_, tp))
+ self._running_instances.add(SupervisordSubprocess(self, alloc_from_string(id_), tp))
async def get_all_running_instances(self) -> Iterable[Subprocess]:
await self._update_config_with_real_state()
if not await is_supervisord_running():
config = self._create_config()
await start_supervisord(config)
- self._watchdog_task = create_task(watchdog())
async def shutdown_controller(self) -> None:
- self._watchdog_task.cancel()
await stop_supervisord()
async def start_subprocess(self, subprocess: SupervisordSubprocess):
assert subprocess in self._running_instances
await restart(subprocess.id)
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
return SupervisordSubprocess(self, id_hint, subprocess_type)
+
+ async def get_subprocess_info(self) -> List[SubprocessInfo]:
+ return await to_thread(list_subprocesses)
-import asyncio
import configparser
import logging
import os
import signal
-import sys
from os import kill
from pathlib import Path
from typing import Any, List, Optional, Set, Tuple
import supervisor.xmlrpc
from jinja2 import Template
-from knot_resolver_manager.compat.asyncio import to_thread
from knot_resolver_manager.compat.dataclasses import dataclass
from knot_resolver_manager.constants import (
GC_EXECUTABLE,
SUPERVISORD_SOCK,
SUPERVISORD_SUBPROCESS_LOG_DIR,
)
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessType
+from knot_resolver_manager.kresd_controller.interface import (
+ Subprocess,
+ SubprocessInfo,
+ SubprocessStatus,
+ SubprocessType,
+)
from knot_resolver_manager.utils.async_utils import (
call,
read_resource,
return True
-def list_fatal_subprocesses_ids() -> List[str]:
+def list_subprocesses() -> List[SubprocessInfo]:
proxy = ServerProxy(
"http://127.0.0.1",
transport=supervisor.xmlrpc.SupervisorTransport(None, None, serverurl="unix://" + str(SUPERVISORD_SOCK)),
)
processes: Any = proxy.supervisor.getAllProcessInfo()
- return [pr["name"] for pr in processes if pr["statename"] == "FATAL"]
+
+ def convert(proc: Any) -> SubprocessInfo:
+ conversion_tbl = {
+ "FATAL": SubprocessStatus.FAILED,
+ "EXITED": SubprocessStatus.FAILED,
+ "RUNNING": SubprocessStatus.RUNNING,
+ }
+
+ if proc["statename"] in conversion_tbl:
+ status = conversion_tbl[proc["statename"]]
+ else:
+ status = SubprocessStatus.UNKNOWN
+
+ return SubprocessInfo(id=proc["name"], status=status)
+
+ return [convert(pr) for pr in processes]
def create_id(type_name: SubprocessType, id_: object) -> str:
program_id = section.replace("program:", "")
res.append(parse_id(program_id))
return res
-
-
-async def watchdog() -> None:
- while True:
- # the sleep is split into two, because is very likely a problem will occur
- # just after start
- await asyncio.sleep(10)
-
- logger.debug("Watchdog running and checking system's sanity")
-
- # check that supervisord is running fine
- if not await is_supervisord_running():
- logger.error(
- "Supervisord is not running! It might have crashed, it might "
- "have been stopped. This behavior is unexpected and we can't "
- "really tell what's happening right now. The safest option is "
- "to terminate... Sorry and bye!"
- )
- sys.exit(1)
-
- # check that there are no subprocesses in FATAL state
- fatals = await to_thread(list_fatal_subprocesses_ids)
- if len(fatals) != 0:
- logger.error(
- "Some kresd instances are in a FATAL state. The configuration "
- "provided was probably invalid and passed the validation. You "
- "might find it helpful to look at logfiles for ids %s."
- "Sadly, there is no safer way forward than a simple suicide.",
- fatals,
- )
- logger.info("Killing supervisord")
- await stop_supervisord()
- logger.info("So Long, and Thanks for All the Fish")
- sys.exit(1)
-
- await asyncio.sleep(WATCHDOG_INTERVAL - 10)
from typing import Iterable, List
from knot_resolver_manager import compat
-from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
+from knot_resolver_manager.compat.asyncio import to_thread
+from knot_resolver_manager.kres_id import KresID, alloc, alloc_from_string
+from knot_resolver_manager.kresd_controller.interface import (
+ Subprocess,
+ SubprocessController,
+ SubprocessInfo,
+ SubprocessStatus,
+ SubprocessType,
+)
from knot_resolver_manager.utils.async_utils import call
from . import dbus_api as systemd
def __init__(
self,
type_: SubprocessType,
- id_: object,
+ id_: KresID,
systemd_type: systemd.SystemdType,
persistance_type: SystemdPersistanceType = SystemdPersistanceType.PERSISTENT,
):
self._type = type_
- self._id = id_
+ self._id: KresID = id_
self._systemd_type = systemd_type
self._persistance_type = persistance_type
]
return f"kresd{sep}{self._id}.service"
+ @staticmethod
+ def id_could_be_ours(unit_name: str) -> bool:
+ is_ours = unit_name == "kres-cache-gc.service"
+ is_ours |= unit_name.startswith("kresd") and unit_name.endswith(".service")
+ return is_ours
+
@property
def type(self):
return self._type
async def is_running(self) -> bool:
raise NotImplementedError()
+ async def _on_unexpected_termination(self):
+ logger.warning("Detected unexpected termination of unit %s", self.id)
+
async def start(self):
if self._persistance_type is SystemdPersistanceType.PERSISTENT:
await compat.asyncio.to_thread(systemd.start_unit, self._systemd_type, self.id)
async def get_all_running_instances(self) -> Iterable[Subprocess]:
res: List[SystemdSubprocess] = []
- units = await compat.asyncio.to_thread(systemd.list_units, self._systemd_type)
+ units = await compat.asyncio.to_thread(systemd.list_unit_names, self._systemd_type)
for unit in units:
u: str = unit
if u.startswith("kresd") and u.endswith(".service"):
iden = u.replace("kresd", "")[1:].replace(".service", "")
persistance_type = SystemdPersistanceType.PERSISTENT if "@" in u else SystemdPersistanceType.TRANSIENT
- res.append(SystemdSubprocess(SubprocessType.KRESD, iden, self._systemd_type, persistance_type))
+ res.append(
+ SystemdSubprocess(
+ SubprocessType.KRESD,
+ alloc_from_string(iden),
+ self._systemd_type,
+ persistance_type,
+ )
+ )
elif u == "kres-cache-gc.service":
- res.append(SystemdSubprocess(SubprocessType.GC, "", self._systemd_type))
+ res.append(SystemdSubprocess(SubprocessType.GC, alloc(), self._systemd_type))
return res
async def initialize_controller(self) -> None:
async def shutdown_controller(self) -> None:
pass
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
return SystemdSubprocess(subprocess_type, id_hint, self._systemd_type, self._persistance_type)
+
+ async def get_subprocess_info(self) -> List[SubprocessInfo]:
+ def convert(u: systemd.Unit) -> SubprocessInfo:
+ status_lookup_table = {"failed": SubprocessStatus.FAILED, "running": SubprocessStatus.RUNNING}
+
+ if u.state in status_lookup_table:
+ status = status_lookup_table[u.state]
+ else:
+ status = SubprocessStatus.UNKNOWN
+
+ return SubprocessInfo(id=u.name, status=status)
+
+ return list(map(convert, await to_thread(systemd.list_units, self._systemd_type)))
# pyright: reportUnknownMemberType=false
# pyright: reportMissingTypeStubs=false
+import logging
+from dataclasses import dataclass
from enum import Enum, auto
from threading import Thread
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kresd_controller.interface import SubprocessType
+logger = logging.getLogger(__name__)
+
class SystemdType(Enum):
SYSTEM = auto()
return res
-def list_units(type_: SystemdType) -> List[str]:
- return [str(u[0]) for u in _create_manager_proxy(type_).ListUnits()]
+@dataclass
+class Unit:
+ name: str
+ state: str
+
+
+def _list_units_internal(type_: SystemdType) -> List[Any]:
+ return _create_manager_proxy(type_).ListUnits()
+
+
+def list_units(type_: SystemdType) -> List[Unit]:
+ return [Unit(name=str(u[0]), state=str(u[4])) for u in _list_units_internal(type_)]
+
+
+def list_unit_names(type_: SystemdType) -> List[str]:
+ return [str(u[0]) for u in _list_units_internal(type_)]
+
+
+def list_failed_unit_names(type_: SystemdType) -> List[str]:
+ return [str(u[0]) for u in _list_units_internal(type_) if str(u[3]) == "failed"]
def restart_unit(type_: SystemdType, unit_name: str):
import asyncio
import logging
import sys
-from functools import partial
from http import HTTPStatus
from pathlib import Path
from time import time
from .datamodel import KresConfig
from .kres_manager import KresManager
-_MANAGER = "kres_manager"
_SHUTDOWN_EVENT = "shutdown-event"
logger = logging.getLogger(__name__)
-async def _index(request: web.Request) -> web.Response:
+async def _index(_request: web.Request) -> web.Response:
"""
Dummy index handler to indicate that the server is indeed running...
"""
return json_response(
{
"msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
- "status": "RUNNING" if get_kres_manager(request.app) is not None else "INITIALIZING",
+ "status": "RUNNING",
}
)
document_path = request.match_info["path"]
- manager: KresManager = get_kres_manager(request.app)
+ manager: KresManager = KresManager.get_instance()
if manager is None:
# handle the case when the manager is not yet initialized
return web.Response(
logger.info("Shutdown event triggered...")
-def get_kres_manager(app: web.Application) -> KresManager:
- if _MANAGER not in app:
- raise ValueError("Accessing manager in an application where it was not defined")
-
- return app[_MANAGER]
-
-
class _DefaultSentinel:
pass
async def _init_manager(
config: Union[None, Path, KresConfig, _DefaultSentinel],
subprocess_controller_name: Optional[str],
- app: web.Application,
):
"""
Called asynchronously when the application initializes.
# Create KresManager. This will perform autodetection of available service managers and
# select the most appropriate to use (or use the one configured directly)
- manager = await KresManager.create(controller)
- app[_MANAGER] = manager
+ manager = await KresManager.create_instance(controller)
# Initial configuration of the manager
if config is None:
):
start_time = time()
- app = web.Application(middlewares=[error_handler])
+ # before starting any server, initialize the subprocess controller etc.
+ await _init_manager(config, subprocess_controller_name)
- app[_MANAGER] = None
+ app = web.Application(middlewares=[error_handler])
app[_SHUTDOWN_EVENT] = asyncio.Event()
- app.on_startup.append(partial(_init_manager, config, subprocess_controller_name))
-
# configure routing
setup_routes(app)
await app[_SHUTDOWN_EVENT].wait()
logger.info("Gracefull shutdown triggered. Cleaning up...")
await runner.cleanup()
- await get_kres_manager(app).stop()
+ await KresManager.get_instance().stop()
logger.info(f"The manager run for {round(time() - start_time)} seconds... Hope it served well. Bye!")
__all__ = [
"ignore_exceptions_optional",
"ignore_exceptions",
- "types",
"DataclassParserValidatorMixin",
"Overloaded",
]
import time
from asyncio import create_subprocess_exec, create_subprocess_shell
from pathlib import PurePath
-from typing import List, Optional, Union
+from threading import Thread
+from typing import Generic, List, Optional, TypeVar, Union
from knot_resolver_manager.compat.asyncio import to_thread
async def read_resource(package: str, filename: str) -> Optional[bytes]:
return await to_thread(pkgutil.get_data, package, filename)
+
+
+T = TypeVar("T")
+
+
+class BlockingEventDispatcher(Thread, Generic[T]):
+ def __init__(self, name: str = "blocking_event_dispatcher") -> None:
+ super().__init__(name=name, daemon=True)
+ # warning: the asyncio queue is not thread safe
+ self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue()
+ self._main_event_loop = asyncio.get_event_loop()
+
+ def dispatch_event(self, event: T):
+ """
+ Method to dispatch events from the blocking thread
+ """
+
+ async def add_to_queue():
+ await self._removed_unit_names.put(event)
+
+ self._main_event_loop.call_soon_threadsafe(add_to_queue)
+
+ async def next_event(self) -> T:
+ return await self._removed_unit_names.get()
"logging-fstring-interpolation", # see https://github.com/PyCQA/pylint/issues/1788
"no-else-raise", # not helpful for readability, when we want explicit branches
"raising-bad-type", # handled by type checker
+ "too-many-arguments", # sure, but how can we change the signatures to take less arguments? artificially create objects with arguments? That's stupid...
]
[tool.pylint.SIMILARITIES]