from typing import List, Optional
from knot_resolver_manager.datamodel.policy_schema import ActionEnum
-from knot_resolver_manager.datamodel.types import AnyPath
+from knot_resolver_manager.datamodel.types import CheckedPath
from knot_resolver_manager.datamodel.view_schema import FlagsEnum
from knot_resolver_manager.utils import SchemaNode
class RPZSchema(SchemaNode):
action: ActionEnum
- file: AnyPath
+ file: CheckedPath
watch: bool = True
views: Optional[List[str]] = None
options: Optional[List[FlagsEnum]] = None
from typing_extensions import Literal
-from knot_resolver_manager.datamodel.types import CheckedPath, Listen, UncheckedPath
+from knot_resolver_manager.datamodel.types import CheckedPath, DomainName, Listen, UncheckedPath
from knot_resolver_manager.exceptions import DataException
from knot_resolver_manager.utils import SchemaNode
from knot_resolver_manager.utils.types import LiteralEnum
BackendEnum = LiteralEnum["auto", "systemd", "supervisord"]
+class WatchDogSchema(SchemaNode):
+ qname: DomainName
+ qtype: str
+
+
class ManagementSchema(SchemaNode):
"""
Configuration of the Manager itself.
listen: Specifies where does the manager listen with its API. Can't be changed in runtime!
backend: Forces manager to use a specific service manager. Defaults to autodetection.
rundir: Directory where the manager can create files and which will be manager's cwd
+ watchdog: Systemd watchdog configuration. Can only be used with systemd backend.
"""
# the default listen path here MUST use the default rundir
listen: Listen = Listen({"unix-socket": "./manager.sock"})
backend: BackendEnum = "auto"
rundir: UncheckedPath = UncheckedPath(".")
+ backend: BackendEnum = "auto"
+ watchdog: Union[Literal[False], WatchDogSchema] = False
+
+ def _validate(self) -> None:
+ if self.watchdog and self.backend not in ["auto", "systemd"]:
+ raise ValueError("'watchdog' can only be configured for 'systemd' backend")
class WebmgmtSchema(SchemaNode):
nsid.name('{{ cfg.server.nsid }}_' .. worker.id)
{% endif %}
+{% if cfg.server.management.watchdog -%}
+modules.load('watchdog')
+watchdog.config({ qname = '{{ cfg.server.management.watchdog.qname }}', qtype = kres.type.{{ cfg.server.management.watchdog.qtype }} })
+{% else %}
+modules.unload('watchdog')
+{%- endif %}
+
{% if cfg.server.webmgmt %}
-- server.webmgmt
modules.load('http')
--- /dev/null
+from pytest import raises
+
+from knot_resolver_manager.datamodel.server_schema import ManagementSchema
+from knot_resolver_manager.exceptions import KresdManagerException
+
+
+def test_management_watchdog():
+ assert ManagementSchema({"watchdog": {"qname": "nic.cz.", "qtype": "A"}})
+
+ with raises(KresdManagerException):
+ ManagementSchema({"backend": "supervisord", "watchdog": {"qname": "nic.cz.", "qtype": "A"}})