This project uses [`apkg`](https://gitlab.nic.cz/packaging/apkg) for packaging. See [`distro/README.md`](distro/README.md) for packaging specific instructions.
+Not yet properly implemented. Ideal situation would be a command like `poe package` which would create all possible packages.
+
+Temporary solution to build a wheel/sdist - just call `poetry build`. The result will be in the `dist/` directory.
+
## FAQ
### What all those dev dependencies for?
+++ /dev/null
-import itertools
-import weakref
-from typing import Optional
-
-from knot_resolver_manager.utils import ignore_exceptions_optional
-
-
-class KresID:
- """
- ID object. Effectively only a wrapper around an int, so that the references
- behave normally (bypassing integer interning and other optimizations)
- """
-
- def __init__(self, n: int):
- self._id = n
- self._repr: Optional[str] = None
-
- def set_custom_str_representation(self, representation: str):
- self._repr = representation
-
- def __str__(self) -> str:
- if self._repr is None:
- return str(self._id)
- else:
- return self._repr
-
- def __hash__(self) -> int:
- return self._id
-
- def __eq__(self, o: object) -> bool:
- return isinstance(o, KresID) and self._id == o._id
-
-
-_used: "weakref.WeakSet[KresID]" = weakref.WeakSet()
-
-
-def alloc() -> KresID:
- for i in itertools.count(start=1):
- val = KresID(i)
- if val not in _used:
- _used.add(val)
- return val
-
- raise RuntimeError("Reached an end of an infinite loop. How?")
-
-
-def alloc_from_string(val: str) -> KresID:
- int_val = ignore_exceptions_optional(int, None, ValueError)(int)(val)
- if int_val is not None:
- res = KresID(int_val)
- assert res not in _used
- _used.add(res)
- return res
- else:
- res = alloc()
- res.set_custom_str_representation(val)
- return res
import asyncio
+import itertools
import logging
+import weakref
from subprocess import SubprocessError
from typing import List, Optional, Type
-from knot_resolver_manager import kres_id
from knot_resolver_manager.constants import KRESD_CONFIG_FILE
from knot_resolver_manager.exceptions import ValidationException
from knot_resolver_manager.kresd_controller import get_best_controller_implementation
logger = logging.getLogger(__name__)
+class _PrettyID:
+ """
+ ID object. Effectively only a wrapper around an int, so that the references
+ behave normally (bypassing integer interning and other optimizations)
+ """
+
+ def __init__(self, n: int):
+ self._id = n
+
+ def __str__(self):
+ return str(self._id)
+
+ def __hash__(self) -> int:
+ return self._id
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, _PrettyID) and self._id == o._id
+
+
+class _PrettyIDAllocator:
+ """
+ Pretty numeric ID allocator. Keeps weak refences to the IDs it has
+ allocated. The IDs get recycled once the previously allocated ID
+ objects get garbage collected
+ """
+
+ def __init__(self):
+ self._used: "weakref.WeakSet[_PrettyID]" = weakref.WeakSet()
+
+ def alloc(self) -> _PrettyID:
+ for i in itertools.count(start=1):
+ val = _PrettyID(i)
+ if val not in self._used:
+ self._used.add(val)
+ return val
+
+ raise RuntimeError("Reached an end of an infinite loop. How?")
+
+
class KresManager:
"""
Core of the whole operation. Orchestrates individual instances under some
self._manager_lock = asyncio.Lock()
self._controller: SubprocessController
self._last_used_config: Optional[KresConfig] = None
+ self._id_allocator = _PrettyIDAllocator()
async def load_system_state(self):
async with self._manager_lock:
await self._collect_already_running_children()
async def _spawn_new_worker(self):
- subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, kres_id.alloc())
+ subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, self._id_allocator.alloc())
await subprocess.start()
self._workers.append(subprocess)
return self._gc is not None
async def _start_gc(self):
- subprocess = await self._controller.create_subprocess(SubprocessType.GC, kres_id.alloc())
+ subprocess = await self._controller.create_subprocess(SubprocessType.GC, "gc")
await subprocess.start()
self._gc = subprocess
from enum import Enum, auto
from typing import Iterable
-from knot_resolver_manager.kres_id import KresID
-
class SubprocessType(Enum):
KRESD = auto()
"""
raise NotImplementedError()
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
"""
Return a Subprocess object which can be operated on. The subprocess is not
started or in any way active after this call. That has to be performaed manually
from typing import Any, Iterable, Set
from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.kres_id import KresID, alloc_from_string
from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
from .config import (
class SupervisordSubprocess(Subprocess):
- def __init__(self, controller: "SupervisordSubprocessController", id_: KresID, type_: SubprocessType):
+ def __init__(self, controller: "SupervisordSubprocessController", id_: object, type_: SubprocessType):
self._controller: "SupervisordSubprocessController" = controller
- self._id: KresID = id_
+ self._id = id_
self._type: SubprocessType = type_
@property
res = await is_supervisord_available()
if not res:
logger.info("Failed to find usable supervisord.")
-
- logger.debug("Detection - supervisord controller is available for use")
return res
async def _update_config_with_real_state(self):
if running:
ids = await list_ids_from_existing_config()
for tp, id_ in ids:
- self._running_instances.add(SupervisordSubprocess(self, alloc_from_string(id_), tp))
+ self._running_instances.add(SupervisordSubprocess(self, id_, tp))
async def get_all_running_instances(self) -> Iterable[Subprocess]:
await self._update_config_with_real_state()
assert subprocess in self._running_instances
await restart(subprocess.id)
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
return SupervisordSubprocess(self, id_hint, subprocess_type)
import logging
import os
-from asyncio.futures import Future
from enum import Enum, auto
-from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional, Tuple
+from typing import Iterable, List
from knot_resolver_manager import compat
-from knot_resolver_manager.compat.asyncio import create_task
-from knot_resolver_manager.kres_id import KresID, alloc, alloc_from_string
from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
from knot_resolver_manager.utils.async_utils import call
-from knot_resolver_manager.utils.types import NoneType
from . import dbus_api as systemd
logger = logging.getLogger(__name__)
-_CallbackType = Callable[[], Coroutine[Any, NoneType, NoneType]]
-_callbacks: Dict[Tuple[systemd.SystemdType, str], _CallbackType] = dict()
-_dispatcher_task: "Optional[Future[NoneType]]" = None
-
-
-async def _monitor_unit_termination(type_: systemd.SystemdType) -> NoneType:
- dispatcher = systemd.UnitRemovedEventDispatcher(type_)
-
- try:
- dispatcher.start()
-
- while True:
- event = await dispatcher.next_event()
- if (type_, event) in _callbacks:
- await _callbacks[(type_, event)]()
- finally:
- dispatcher.stop()
-
-
-def _register_terminated_callback(type_: systemd.SystemdType, unit_name: str, callback: _CallbackType):
- assert (type_, unit_name) not in _callbacks
- _callbacks[(type_, unit_name)] = callback
-
- global _dispatcher_task
- if _dispatcher_task is None:
- _dispatcher_task = create_task(_monitor_unit_termination(type_))
-
-
-def _unregister_terminated_callback(type_: systemd.SystemdType, unit_name: str):
- del _callbacks[(type_, unit_name)]
-
-
class SystemdPersistanceType(Enum):
PERSISTENT = auto()
TRANSIENT = auto()
def __init__(
self,
type_: SubprocessType,
- id_: KresID,
+ id_: object,
systemd_type: systemd.SystemdType,
persistance_type: SystemdPersistanceType = SystemdPersistanceType.PERSISTENT,
- already_running: bool = False,
):
self._type = type_
- self._id: KresID = id_
+ self._id = id_
self._systemd_type = systemd_type
self._persistance_type = persistance_type
- if already_running:
- _register_terminated_callback(systemd_type, self.id, self._on_unexpected_termination)
-
@property
def id(self):
if self._type is SubprocessType.GC:
async def is_running(self) -> bool:
raise NotImplementedError()
- async def _on_unexpected_termination(self):
- logger.warning("Detected unexpected termination of unit %s", self.id)
-
async def start(self):
- _register_terminated_callback(self._systemd_type, self.id, self._on_unexpected_termination)
-
if self._persistance_type is SystemdPersistanceType.PERSISTENT:
await compat.asyncio.to_thread(systemd.start_unit, self._systemd_type, self.id)
elif self._persistance_type is SystemdPersistanceType.TRANSIENT:
await compat.asyncio.to_thread(systemd.start_transient_unit, self._systemd_type, self.id, self._type)
async def stop(self):
- _unregister_terminated_callback(self._systemd_type, self.id)
await compat.asyncio.to_thread(systemd.stop_unit, self._systemd_type, self.id)
async def restart(self):
):
self._systemd_type = systemd_type
self._persistance_type = persistance_type
- self._unit_removed_event_dispatcher = systemd.UnitRemovedEventDispatcher(systemd_type)
def __str__(self):
if self._systemd_type == systemd.SystemdType.SESSION:
if u.startswith("kresd") and u.endswith(".service"):
iden = u.replace("kresd", "")[1:].replace(".service", "")
persistance_type = SystemdPersistanceType.PERSISTENT if "@" in u else SystemdPersistanceType.TRANSIENT
- res.append(
- SystemdSubprocess(
- SubprocessType.KRESD,
- alloc_from_string(iden),
- self._systemd_type,
- persistance_type,
- already_running=True,
- )
- )
+ res.append(SystemdSubprocess(SubprocessType.KRESD, iden, self._systemd_type, persistance_type))
elif u == "kres-cache-gc.service":
- res.append(SystemdSubprocess(SubprocessType.GC, alloc(), self._systemd_type, already_running=True))
+ res.append(SystemdSubprocess(SubprocessType.GC, "", self._systemd_type))
return res
async def initialize_controller(self) -> None:
async def shutdown_controller(self) -> None:
pass
- async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: KresID) -> Subprocess:
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: object) -> Subprocess:
return SystemdSubprocess(subprocess_type, id_hint, self._systemd_type, self._persistance_type)
from knot_resolver_manager.constants import KRES_CACHE_DIR, KRESD_CONFIG_FILE, RUNTIME_DIR
from knot_resolver_manager.exceptions import SubprocessControllerException
from knot_resolver_manager.kresd_controller.interface import SubprocessType
-from knot_resolver_manager.utils.async_utils import BlockingEventDispatcher
class SystemdType(Enum):
except Exception:
# if this fails in any way, we can assume that the unit is not properly loaded
return False
-
-
-class UnitRemovedEventDispatcher(BlockingEventDispatcher[str]):
- def __init__(self, type_: SystemdType) -> None:
- super().__init__(name="UnitRemovedEventDispatcher")
- self._systemd = _create_manager_proxy(type_)
- self._loop: Any = GLib.MainLoop()
-
- def _handler(self, name: str, _unit: Any):
- self.dispatch_event(name)
-
- def run(self) -> None:
- self._systemd.UnitRemoved.connect(self._handler)
- self._loop.run()
-
- def stop(self) -> None:
- if self.is_alive():
- self._loop.quit()
import time
from asyncio import create_subprocess_exec, create_subprocess_shell
from pathlib import PurePath
-from threading import Thread
-from typing import Generic, List, Optional, TypeVar, Union
+from typing import List, Optional, Union
from knot_resolver_manager.compat.asyncio import to_thread
async def read_resource(package: str, filename: str) -> Optional[bytes]:
return await to_thread(pkgutil.get_data, package, filename)
-
-
-T = TypeVar("T")
-
-
-class BlockingEventDispatcher(Thread, Generic[T]):
- def __init__(self, name: str = "blocking_event_dispatcher") -> None:
- super().__init__(name=name, daemon=True)
- # warning: the asyncio queue is not thread safe
- self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue()
- self._main_event_loop = asyncio.get_event_loop()
-
- def dispatch_event(self, event: T):
- """
- Method to dispatch events from the blocking thread
- """
-
- async def add_to_queue():
- await self._removed_unit_names.put(event)
-
- self._main_event_loop.call_soon_threadsafe(add_to_queue)
-
- async def next_event(self) -> T:
- return await self._removed_unit_names.get()
"logging-fstring-interpolation", # see https://github.com/PyCQA/pylint/issues/1788
"no-else-raise", # not helpful for readability, when we want explicit branches
"raising-bad-type", # handled by type checker
- "too-many-arguments", # sure, but how can we change the signatures to take less arguments? artificially create objects with arguments? That's stupid...
]
[tool.pylint.SIMILARITIES]