import asyncio
from typing import Any, List, Type
+from uuid import uuid4
from knot_resolver_manager.constants import KRESD_CONFIG_FILE
-from knot_resolver_manager.kresd_controller import BaseKresdController, get_best_controller_implementation
+from knot_resolver_manager.kresd_controller import get_best_controller_implementation
+from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
from knot_resolver_manager.utils.async_utils import writefile
from .datamodel import KresConfig
await self.load_system_state()
def __init__(self):
- self._children: List[BaseKresdController] = []
+ self._children: List[Subprocess] = []
self._children_lock = asyncio.Lock()
- self._controller: Type[BaseKresdController]
+ self._controller: SubprocessController
async def load_system_state(self):
async with self._children_lock:
await self._collect_already_running_children()
async def _spawn_new_child(self):
- kresd = self._controller()
- await kresd.start()
- self._children.append(kresd)
+ subprocess = await self._controller.create_subprocess(SubprocessType.KRESD, str(uuid4()))
+ await subprocess.start()
+ self._children.append(subprocess)
async def _stop_a_child(self):
if len(self._children) == 0:
import asyncio
-from typing import Type
+import logging
+from typing import Tuple
-from knot_resolver_manager.kresd_controller.base import BaseKresdController
-from knot_resolver_manager.kresd_controller.supervisord import SupervisordKresdController
-from knot_resolver_manager.kresd_controller.systemd import SystemdKresdController
+from knot_resolver_manager.kresd_controller.interface import SubprocessController
+from knot_resolver_manager.kresd_controller.supervisord import SupervisordSubprocessController
+from knot_resolver_manager.kresd_controller.systemd import SystemdSubprocessController
+from knot_resolver_manager.kresd_controller.systemd.dbus_api import SystemdType
# In this tuple, every supported controller should be listed. In the order of preference (preferred first)
-_registered_controllers = (SystemdKresdController, SupervisordKresdController)
+_registered_controllers: Tuple[SubprocessController, ...] = (
+ SystemdSubprocessController(SystemdType.SESSION),
+ SystemdSubprocessController(SystemdType.SYSTEM),
+ SupervisordSubprocessController(),
+)
+logger = logging.getLogger(__name__)
-async def get_best_controller_implementation() -> Type[BaseKresdController]:
+
+async def get_best_controller_implementation() -> SubprocessController:
# check all controllers concurrently
res = await asyncio.gather(*(cont.is_controller_available() for cont in _registered_controllers))
# take the first one on the list which is available
for avail, controller in zip(res, _registered_controllers):
if avail:
+ logger.info("Selected controller '%s'", str(controller))
return controller
# or fail
+++ /dev/null
-import asyncio
-from typing import Iterable, Optional
-from uuid import uuid4
-
-
-class BaseKresdController:
- """
- The common Kresd Controller interface. This is what KresManager requires and what has to be implemented by all
- controllers.
- """
-
- def __init__(self, kresd_id: Optional[str] = None):
- self._lock = asyncio.Lock()
- self.id: str = kresd_id or str(uuid4())
-
- @staticmethod
- async def is_controller_available() -> bool:
- raise NotImplementedError()
-
- async def is_running(self) -> bool:
- raise NotImplementedError()
-
- async def start(self) -> None:
- raise NotImplementedError()
-
- async def stop(self) -> None:
- raise NotImplementedError()
-
- async def restart(self) -> None:
- raise NotImplementedError()
-
- @staticmethod
- async def get_all_running_instances() -> Iterable["BaseKresdController"]:
- raise NotImplementedError()
-
- @staticmethod
- async def initialize_controller() -> None:
- raise NotImplementedError()
--- /dev/null
+from enum import Enum, auto
+from typing import Iterable
+
+
+class SubprocessType(Enum):
+ KRESD = auto()
+ GC = auto()
+
+
+class Subprocess:
+ """
+ One SubprocessInstance corresponds to one manager's subprocess
+ """
+
+ @property
+ def type(self) -> SubprocessType:
+ raise NotImplementedError()
+
+ @property
+ def id(self) -> str:
+ raise NotImplementedError()
+
+ async def is_running(self) -> bool:
+ raise NotImplementedError()
+
+ async def start(self) -> None:
+ raise NotImplementedError()
+
+ async def stop(self) -> None:
+ raise NotImplementedError()
+
+ async def restart(self) -> None:
+ raise NotImplementedError()
+
+ def __eq__(self, o: object) -> bool:
+ return isinstance(o, type(self)) and o.type == self.type and o.id == self.id
+
+ def __hash__(self) -> int:
+ return hash(type(self)) ^ hash(self.type) ^ hash(self.id)
+
+
+class SubprocessController:
+ """
+ The common Subprocess Controller interface. This is what KresManager requires and what has to be implemented by all
+ controllers.
+ """
+
+ async def is_controller_available(self) -> bool:
+ raise NotImplementedError()
+
+ async def get_all_running_instances(self) -> Iterable[Subprocess]:
+ raise NotImplementedError()
+
+ async def initialize_controller(self) -> None:
+ """
+ Should be called when we want to really start using the controller.
+ """
+ raise NotImplementedError()
+
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: str) -> Subprocess:
+ """
+ Return a Subprocess object which can be operated on. The subprocess is not
+ started or in any way active after this call. That has to be performaed manually
+ using the returned object itself.
+
+ Must NOT be called before initialize_controller()
+ """
+ raise NotImplementedError()
import logging
-from typing import Iterable
+from typing import Iterable, Set
-from knot_resolver_manager.kresd_controller.base import BaseKresdController
+from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
from .config import (
SupervisordConfig,
+ create_id,
is_supervisord_available,
is_supervisord_running,
list_ids_from_existing_config,
logger = logging.getLogger(__name__)
-class SupervisordKresdController(BaseKresdController):
- # ignore the type issue bellow. It's valid, but the type-checker does not understand dataclasses
- _config = SupervisordConfig(instances=[]) # type: ignore
+class SupervisordSubprocess(Subprocess):
+ def __init__(self, controller: "SupervisordSubprocessController", id_: str, type_: SubprocessType):
+ self._controller: "SupervisordSubprocessController" = controller
+ self._id: str = id_
+ self._type: SubprocessType = type_
+
+ @property
+ def type(self) -> SubprocessType:
+ return self._type
+
+ @property
+ def id(self) -> str:
+ return create_id(self._type, self._id)
async def is_running(self) -> bool:
- return self.id in SupervisordKresdController._config.instances
+ return self._controller.should_be_running(self)
+
+ async def start(self) -> None:
+ return await self._controller.start_subprocess(self)
- async def start(self):
- # note: O(n) test, but the number of instances will be very small
- if self.id in SupervisordKresdController._config.instances:
- raise RuntimeError("Can't start an instance with the same ID as already started instance")
+ async def stop(self) -> None:
+ return await self._controller.stop_subprocess(self)
- SupervisordKresdController._config.instances.append(self.id)
- await update_config(SupervisordKresdController._config)
+ async def restart(self) -> None:
+ return await self._controller.restart_subprocess(self)
- async def stop(self):
- # note: O(n) test, but the number of instances will be very small
- if self.id not in SupervisordKresdController._config.instances:
- raise RuntimeError("Can't stop an instance that is not started")
- SupervisordKresdController._config.instances.remove(self.id)
- await update_config(SupervisordKresdController._config)
+class SupervisordSubprocessController(SubprocessController):
+ def __init__(self):
+ self._running_instances: Set[SupervisordSubprocess] = set()
- async def restart(self):
- # note: O(n) test, but the number of instances will be very small
- if self.id not in SupervisordKresdController._config.instances:
- raise RuntimeError("Can't restart an instance that is not started")
+ def __str__(self):
+ return type(self).__name__
- await restart(self.id)
+ def should_be_running(self, subprocess: SupervisordSubprocess):
+ return subprocess in self._running_instances
- @staticmethod
- async def is_controller_available() -> bool:
- return await is_supervisord_available()
+ async def is_controller_available(self) -> bool:
+ res = await is_supervisord_available()
+ if not res:
+ logger.info("Failed to find usable supervisord.")
+ return res
- @staticmethod
- async def get_all_running_instances() -> Iterable["BaseKresdController"]:
+ async def _update_config_with_real_state(self):
running = await is_supervisord_running()
if running:
ids = await list_ids_from_existing_config()
- return [SupervisordKresdController(id) for id in ids]
- else:
- return []
+ for tp, id_ in ids:
+ self._running_instances.add(SupervisordSubprocess(self, id_, tp))
+
+ async def get_all_running_instances(self) -> Iterable[Subprocess]:
+ await self._update_config_with_real_state()
+ return iter(self._running_instances)
+
+ def _create_config(self) -> SupervisordConfig:
+ return SupervisordConfig(instances=self._running_instances) # type: ignore
- @staticmethod
- async def initialize_controller() -> None:
+ async def initialize_controller(self) -> None:
if not await is_supervisord_running():
- await start_supervisord(SupervisordKresdController._config)
+ config = self._create_config()
+ await start_supervisord(config)
+
+ async def start_subprocess(self, subprocess: SupervisordSubprocess):
+ assert subprocess not in self._running_instances
+ self._running_instances.add(subprocess)
+ await update_config(self._create_config())
+
+ async def stop_subprocess(self, subprocess: SupervisordSubprocess):
+ assert subprocess in self._running_instances
+ self._running_instances.remove(subprocess)
+ await update_config(self._create_config())
+
+ async def restart_subprocess(self, subprocess: SupervisordSubprocess):
+ assert subprocess in self._running_instances
+ await restart(subprocess.id)
+
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: str) -> Subprocess:
+ return SupervisordSubprocess(self, id_hint, subprocess_type)
+import configparser
import os.path
import signal
from os import kill
from pathlib import Path
-from typing import List
+from typing import List, Set, Tuple
from jinja2 import Template
from knot_resolver_manager.compat.dataclasses import dataclass
+from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessType
from knot_resolver_manager.utils.async_utils import call, readfile, wait_for_process_termination, writefile
CONFIG_FILE = "/tmp/knot-resolver-manager-supervisord.conf"
@dataclass
class SupervisordConfig:
- instances: List[str]
+ instances: Set[Subprocess]
unix_http_server: str = SERVER_SOCK
pid_file: str = PID_FILE
return False
-async def list_ids_from_existing_config() -> List[str]:
+def create_id(type_name: SubprocessType, id_: str) -> str:
+ return f"{type_name.name}_{id_}"
+
+
+def parse_id(id_: str) -> Tuple[SubprocessType, str]:
+ tp, id_ = id_.split("_", maxsplit=1)
+ return (SubprocessType[tp], id_)
+
+
+async def list_ids_from_existing_config() -> List[Tuple[SubprocessType, str]]:
config = await readfile(CONFIG_FILE)
- res: List[str] = []
- for line in config.splitlines():
- if line.startswith("[program:"):
- id_ = line.replace("[program:", "").replace("]", "").strip()
- res.append(id_)
+ cp = configparser.ConfigParser()
+ cp.read_string(config)
+
+ res: List[Tuple[SubprocessType, str]] = []
+ for section in cp.sections():
+ if section.startswith("program:"):
+ program_id = section.replace("program:", "")
+ res.append(parse_id(program_id))
return res
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
-{% for id in config.instances -%}
-[program:{{ id }}]
+
+
+{% for instance in config.instances %}
+
+[program:{{ instance.id }}]
+
+{%- if instance.type.name == "KRESD" %}
+
directory=/var/lib/knot-resolver
command=/usr/sbin/kresd -c /usr/lib/knot-resolver/distro-preconfig.lua -c /etc/knot-resolver/kresd.conf -n
-environment=SYSTEMD_INSTANCE={{ id }}
+environment=SYSTEMD_INSTANCE={{ instance.id }}
+{%- elif instance.type.name == "GC" %}
+
+directory=/var/lib/knot-resolver
+command=/usr/sbin/kres-cache-gc -c /var/cache/knot-resolver -d 1000
+
+{%- else %}
+
+{# other subprocess types are not implemented, fail #}
+{{ 0 / 0 }}
+
+{% endif %}
-{% endfor %}
\ No newline at end of file
+{%- endfor -%}
\ No newline at end of file
from typing import Iterable, List
from knot_resolver_manager import compat
-from knot_resolver_manager.kresd_controller.base import BaseKresdController
+from knot_resolver_manager.kresd_controller.interface import Subprocess, SubprocessController, SubprocessType
from knot_resolver_manager.utils.async_utils import call
from . import dbus_api as systemd
logger = logging.getLogger(__name__)
-class SystemdKresdController(BaseKresdController):
+class SystemdSubprocess(Subprocess):
+ def __init__(self, type_: SubprocessType, id_: str, systemd_type: systemd.SystemdType):
+ self._type = type_
+ self._id = id_
+ self._systemd_type = systemd_type
+
+ @property
+ def id(self):
+ return self._id
+
+ @property
+ def type(self):
+ return self._type
+
async def is_running(self) -> bool:
raise NotImplementedError()
async def start(self):
- await compat.asyncio.to_thread(systemd.start_unit, f"kresd@{self.id}.service")
+ await compat.asyncio.to_thread(systemd.start_unit, self._systemd_type, f"kresd@{self.id}.service")
async def stop(self):
- await compat.asyncio.to_thread(systemd.stop_unit, f"kresd@{self.id}.service")
+ await compat.asyncio.to_thread(systemd.stop_unit, self._systemd_type, f"kresd@{self.id}.service")
async def restart(self):
- await compat.asyncio.to_thread(systemd.restart_unit, f"kresd@{self.id}.service")
+ await compat.asyncio.to_thread(systemd.restart_unit, self._systemd_type, f"kresd@{self.id}.service")
+
- @staticmethod
- async def is_controller_available() -> bool:
+class SystemdSubprocessController(SubprocessController):
+ def __init__(self, systemd_type: systemd.SystemdType):
+ self._systemd_type = systemd_type
+
+ def __str__(self):
+ if self._systemd_type == systemd.SystemdType.SESSION:
+ return "SystemdController(SESSION)"
+ elif self._systemd_type == systemd.SystemdType.SYSTEM:
+ return "SystemdController(SYSTEM)"
+ else:
+ raise NotImplementedError("unknown systemd type")
+
+ async def is_controller_available(self) -> bool:
# try to run systemctl (should be quite fast)
- ret = await call("systemctl status", shell=True, discard_output=True)
+ cmd = f"systemctl {'--user' if self._systemd_type == systemd.SystemdType.SESSION else ''} status"
+ ret = await call(cmd, shell=True, discard_output=True)
if ret != 0:
+ logger.info(
+ "Calling '%s' failed. Assumming systemd (%s) is not running/installed.", cmd, self._systemd_type
+ )
return False
# if that passes, try to list units
try:
- _ = await compat.asyncio.to_thread(systemd.list_units)
+ if not compat.asyncio.to_thread(
+ systemd.has_some_exec_start_commands, self._systemd_type, "kresd@1.service"
+ ):
+ logger.info("Systemd (%s) accessible, but no 'kresd@.service' unit detected.", self._systemd_type)
+ return False
+
return True
except BaseException: # we want every possible exception to be caught
- logger.warning("systemd DBus API backend failed to initialize")
+ logger.warning("Communicating with systemd DBus API failed", exc_info=True)
return False
- @staticmethod
- async def get_all_running_instances() -> Iterable["BaseKresdController"]:
- res: List[SystemdKresdController] = []
- units = await compat.asyncio.to_thread(systemd.list_units)
+ async def get_all_running_instances(self) -> Iterable[Subprocess]:
+ res: List[SystemdSubprocess] = []
+ units = await compat.asyncio.to_thread(systemd.list_units, self._systemd_type)
for unit in units:
u: str = unit
if u.startswith("kresd@") and u.endswith(".service"):
iden = u.replace("kresd@", "").replace(".service", "")
- res.append(SystemdKresdController(kresd_id=iden))
+ res.append(SystemdSubprocess(SubprocessType.KRESD, iden, self._systemd_type))
return res
- @staticmethod
- async def initialize_controller() -> None:
+ async def initialize_controller(self) -> None:
pass
+
+ async def create_subprocess(self, subprocess_type: SubprocessType, id_hint: str) -> Subprocess:
+ return SystemdSubprocess(subprocess_type, id_hint, self._systemd_type)
# pyright: reportUnknownMemberType=false
# pyright: reportMissingTypeStubs=false
+from enum import Enum, auto
from threading import Thread
from typing import Any, List, Union
from gi.repository import GLib
from pydbus import SystemBus
+from pydbus.bus import SessionBus
from typing_extensions import Literal
# ugly global result variable, but this module will be used once in a every
result_state = None
+class SystemdType(Enum):
+ SYSTEM = auto()
+ SESSION = auto()
+
+
class SystemdException(Exception):
pass
-def _create_manager_interface() -> Any:
- bus: Any = SystemBus()
+def _create_manager_interface(type_: SystemdType) -> Any:
+ bus: Any = SystemBus() if type_ is SystemdType.SYSTEM else SessionBus()
systemd = bus.get(".systemd1")
return systemd
def get_unit_file_state(
+ type_: SystemdType,
unit_name: str,
) -> Union[Literal["disabled"], Literal["enabled"]]:
- res = str(_create_manager_interface().GetUnitFileState(unit_name))
+ res = str(_create_manager_interface(type_).GetUnitFileState(unit_name))
assert res == "disabled" or res == "enabled"
return res
-def list_units() -> List[str]:
- return [str(u[0]) for u in _create_manager_interface().ListUnits()]
+def list_units(type_: SystemdType) -> List[str]:
+ return [str(u[0]) for u in _create_manager_interface(type_).ListUnits()]
-def restart_unit(unit_name: str):
- systemd = _create_manager_interface()
+def restart_unit(type_: SystemdType, unit_name: str):
+ systemd = _create_manager_interface(type_)
job = systemd.RestartUnit(unit_name, "fail")
_wait_for_job_completion(systemd, job)
-def start_unit(unit_name: str):
- systemd = _create_manager_interface()
+def start_unit(type_: SystemdType, unit_name: str):
+ systemd = _create_manager_interface(type_)
job = systemd.StartUnit(unit_name, "fail")
_wait_for_job_completion(systemd, job)
-def stop_unit(unit_name: str):
- systemd = _create_manager_interface()
+def stop_unit(type_: SystemdType, unit_name: str):
+ systemd = _create_manager_interface(type_)
job = systemd.StopUnit(unit_name, "fail")
_wait_for_job_completion(systemd, job)
+
+
+def list_unit_files(type_: SystemdType) -> List[str]:
+ systemd = _create_manager_interface(type_)
+ files = systemd.ListUnitFiles()
+ return [str(x[0]) for x in files]
+
+
+def has_some_exec_start_commands(type_: SystemdType, unit_name: str) -> bool:
+ systemd = _create_manager_interface(type_)
+ try:
+ unit_object = systemd.LoadUnit(unit_name)
+ return len(unit_object.ExecStart) != 0
+ except Exception:
+ # if this fails in any way, we can assume that the unit is not properly loaded
+ return False
import asyncio
import logging
+import sys
from http import HTTPStatus
from pathlib import Path
from time import time
"""
Called asynchronously when the application initializes.
"""
- # Create KresManager. This will perform autodetection of available service managers and
- # select the most appropriate to use
- manager = await KresManager.create()
- app[_MANAGER] = manager
-
- # Initial static configuration of the manager
- # optional step, could be skipped
- if config_path is not None:
- if not config_path.exists():
- logger.warning(
- "Manager is configured to load config file at %s on startup, but the file does not exist.",
- config_path,
- )
- else:
- initial_config = KresConfig.from_yaml(await readfile(config_path))
- await manager.apply_config(initial_config)
-
- logger.info("Process manager initialized...")
+ try:
+ # Create KresManager. This will perform autodetection of available service managers and
+ # select the most appropriate to use
+ manager = await KresManager.create()
+ app[_MANAGER] = manager
+
+ # Initial static configuration of the manager
+ # optional step, could be skipped
+ if config_path is not None:
+ if not config_path.exists():
+ logger.warning(
+ "Manager is configured to load config file at %s on startup, but the file does not exist.",
+ config_path,
+ )
+ else:
+ initial_config = KresConfig.from_yaml(await readfile(config_path))
+ await manager.apply_config(initial_config)
+
+ logger.info("Process manager initialized...")
+ except BaseException:
+ logger.error("Manager initialization failed... Shutting down!", exc_info=True)
+ sys.exit(1)
app.on_startup.append(init_manager)
-from typing import Any, Callable, Optional, Type, TypeVar
+from typing import Any, Callable, Iterable, Optional, Type, TypeVar
from .dataclasses_parservalidator import DataclassParserValidatorMixin, ValidationException
from .overload import Overloaded
return ignore_exceptions_optional(type(default), default, *exceptions)
+def foldl(oper: Callable[[T, T], T], default: T, arr: Iterable[T]) -> T:
+ val = default
+ for x in arr:
+ val = oper(val, x)
+ return val
+
+
+def contains_element_matching(cond: Callable[[T], bool], arr: Iterable[T]) -> bool:
+ return foldl(lambda x, y: x or y, False, map(cond, arr))
+
+
__all__ = [
"ignore_exceptions_optional",
"ignore_exceptions",
[[package]]
name = "apkg"
-version = "0.0.5.dev8+g272a73b"
-description = "minimalist cross-distro packaging automation tool"
+version = "0.1.1.dev1+g08244e6"
+description = "cross-distro upstream packaging automation tool"
category = "dev"
optional = false
python-versions = "*"
[package.dependencies]
blessings = "*"
cached-property = "*"
+click = "*"
distro = "*"
-docopt = "*"
htmllistparse = "*"
jinja2 = "*"
packaging = "*"
type = "git"
url = "https://gitlab.nic.cz/packaging/apkg.git"
reference = "master"
-resolved_reference = "272a73b7aab7cadf6d4fd2bb06a351145d88920b"
+resolved_reference = "08244e6e0b5842931a5ab27ba976c50e66d887d7"
[[package]]
name = "appdirs"
optional = false
python-versions = "*"
-[[package]]
-name = "docopt"
-version = "0.6.2"
-description = "Pythonic argument parser, that will make you smile"
-category = "dev"
-optional = false
-python-versions = "*"
-
[[package]]
name = "docutils"
version = "0.17.1"
python-dateutil = ">=2.6.0"
"ruamel.yaml" = ">=0.14.2"
+[[package]]
+name = "supervisor"
+version = "4.2.2"
+description = "A system for controlling process state under UNIX"
+category = "dev"
+optional = false
+python-versions = "*"
+
+[package.extras]
+testing = ["pytest", "pytest-cov"]
+
[[package]]
name = "toml"
version = "0.10.2"
[metadata]
lock-version = "1.1"
python-versions = "^3.6.12"
-content-hash = "ab70f3d6702baa7a8567eea9d58e0b5c9650e5c6b1c786309604dbabe8d44af6"
+content-hash = "32f7e392d56071f55fff1d440f2c90327b26caa637e91d93601dce19fcd48a8e"
[metadata.files]
aiohttp = [
{file = "distro-1.5.0-py2.py3-none-any.whl", hash = "sha256:df74eed763e18d10d0da624258524ae80486432cd17392d9c3d96f5e83cd2799"},
{file = "distro-1.5.0.tar.gz", hash = "sha256:0e58756ae38fbd8fc3020d54badb8eae17c5b9dcbed388b17bb55b8a5928df92"},
]
-docopt = [
- {file = "docopt-0.6.2.tar.gz", hash = "sha256:49b3a825280bd66b3aa83585ef59c4a8c82f2c8a522dbe754a8bc8d08c85c491"},
-]
docutils = [
{file = "docutils-0.17.1-py2.py3-none-any.whl", hash = "sha256:cf316c8370a737a022b72b56874f6602acf974a37a9fba42ec2876387549fc61"},
{file = "docutils-0.17.1.tar.gz", hash = "sha256:686577d2e4c32380bb50cbb22f575ed742d58168cee37e99117a854bcd88f125"},
strictyaml = [
{file = "strictyaml-1.4.0.tar.gz", hash = "sha256:9f4e5814f96e17e82044a7dbbb8a0976048f2841b073e8114c2a2c025e7f81b9"},
]
+supervisor = [
+ {file = "supervisor-4.2.2-py2.py3-none-any.whl", hash = "sha256:4adf63c8f18cf42313171ce06a73c9ae2c5e88ef27c2bb0de3b8405368595c04"},
+ {file = "supervisor-4.2.2.tar.gz", hash = "sha256:5b2b8882ec8a3c3733cce6965cc098b6d80b417f21229ab90b18fe551d619f90"},
+]
toml = [
{file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
{file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"},
debugpy = "^1.2.1"
apkg = { git = "https://gitlab.nic.cz/packaging/apkg.git", branch = "master" }
Sphinx = "^4.0.2"
+supervisor = "^4.2.2"
[tool.poe.tasks]
run = { cmd = "scripts/run", help = "Run the manager" }
f"--volume={str(src)}:{str(dst)}:O"
for src, dst in ro_mounts.items()
]
- command = ["podman", "run", "--rm", "-d", *options, image]
+ command = ["podman", "run", "--rm", "-d", "--security-opt=seccomp=unconfined", *options, image]
proc = subprocess.run(
command, shell=False, executable=PODMAN_EXECUTABLE, stdout=subprocess.PIPE
)
def _build(tag: str):
- command = ["podman", "build", "-f", str(GIT_ROOT / "containers" / tag / "Containerfile"), "-t", _full_name_from_tag(tag), str(GIT_ROOT)]
+ command = ["podman", "build", "--security-opt=seccomp=unconfined", "-f", str(GIT_ROOT / "containers" / tag / "Containerfile"), "-t", _full_name_from_tag(tag), str(GIT_ROOT)]
ret = subprocess.call(command, shell=False, executable=PODMAN_EXECUTABLE)
assert ret == 0
--- /dev/null
+#!/bin/bash
+
+# ensure consistent behaviour
+src_dir="$(dirname "$(realpath "$0")")"
+source $src_dir/_env.sh
+
+
+# Debian 10 packaging
+if [[ -f "/etc/debian_version" && "$(cat /etc/debian_version)" == 10.* ]] ; then
+ # if running on debian 10, just run the packaging process
+
+ # if possible, make sure we use the latest apkg
+ if [[ "$(id -u)" == "0" ]]; then
+ python3 -m pip install -U apkg
+ apt update
+ fi
+
+ apkg build -i
+elif [[ "${container:-}" != "podman" ]]; then
+ # if not running debian 10 and not running inside a container, launch a build container and try again
+ poe container run -c --artifact /code/pkg/pkgs/debian-10:pkg/pkgs/debian-10 --artifact /code/pkg/srcpkgs/debian-10:pkg/srcpkgs/debian-10 -- debian10 scripts/package
+else
+ echo -e "${red}Debian 10 package creation skipped...${reset}"
+fi
\ No newline at end of file
--- /dev/null
+from knot_resolver_manager.utils import containsElementMatching, foldl
+
+
+def test_foldl():
+ lst = list(range(10))
+
+ assert foldl(lambda x, y: x + y, 0, lst) == sum(range(10))
+ assert foldl(lambda x, y: x + y, 55, lst) == sum(range(10)) + 55
+
+
+def test_containsElementMatching():
+ lst = list(range(10))
+
+ assert containsElementMatching(lambda e: e == 5, lst)
+ assert not containsElementMatching(lambda e: e == 11, lst)