+++ /dev/null
-from typing import List, Optional, Union
-
-from knot_resolver.datamodel.forward_schema import ForwardServerSchema
-from knot_resolver.datamodel.network_schema import AddressRenumberingSchema
-from knot_resolver.datamodel.types import (
- DNSRecordTypeEnum,
- IPAddressOptionalPort,
- PolicyActionEnum,
- PolicyFlagEnum,
- TimeUnit,
-)
-from knot_resolver.utils.modeling import ConfigSchema
-
-
-class FilterSchema(ConfigSchema):
- """
- Query filtering configuration.
-
- ---
- suffix: Filter based on the suffix of the query name.
- pattern: Filter based on the pattern that match query name.
- qtype: Filter based on the DNS query type.
- """
-
- suffix: Optional[str] = None
- pattern: Optional[str] = None
- qtype: Optional[DNSRecordTypeEnum] = None
-
-
-class AnswerSchema(ConfigSchema):
- """
- Configuration of custom resource record for DNS answer.
-
- ---
- rtype: Type of DNS resource record.
- rdata: Data of DNS resource record.
- ttl: Time-to-live value for defined answer.
- nodata: Answer with NODATA If requested type is not configured in the answer. Otherwise policy rule is ignored.
- """
-
- rtype: DNSRecordTypeEnum
- rdata: str
- ttl: TimeUnit = TimeUnit("1s")
- nodata: bool = False
-
-
-def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None:
- servers = ["mirror", "forward", "stub"]
-
- def _field(ac: str) -> str:
- if ac in servers:
- return "servers"
- return "message" if ac == "deny" else ac
-
- configurable_actions = ["deny", "reroute", "answer"] + servers
-
- # checking for missing mandatory fields for actions
- field = _field(policy_action.action)
- if policy_action.action in configurable_actions and not getattr(policy_action, field):
- raise ValueError(f"missing mandatory field '{field}' for '{policy_action.action}' action")
-
- # checking for unnecessary fields
- for ac in configurable_actions + ["deny"]:
- field = _field(ac)
- if getattr(policy_action, field) and _field(policy_action.action) != field:
- raise ValueError(f"'{field}' field can only be defined for '{ac}' action")
-
- # ForwardServerSchema is valid only for 'forward' action
- if policy_action.servers:
- for server in policy_action.servers: # pylint: disable=not-an-iterable
- if policy_action.action != "forward" and isinstance(server, ForwardServerSchema):
- raise ValueError(
- f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{policy_action.action}'"
- )
-
-
-class ActionSchema(ConfigSchema):
- """
- Configuration of policy action.
-
- ---
- action: Policy action.
- message: Deny message for 'deny' action.
- reroute: Configuration for 'reroute' action.
- answer: Answer definition for 'answer' action.
- servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
- """
-
- action: PolicyActionEnum
- message: Optional[str] = None
- reroute: Optional[List[AddressRenumberingSchema]] = None
- answer: Optional[AnswerSchema] = None
- servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
-
- def _validate(self) -> None:
- _validate_policy_action(self)
-
-
-class PolicySchema(ConfigSchema):
- """
- Configuration of policy rule.
-
- ---
- action: Policy rule action.
- priority: Policy rule priority.
- filter: Query filtering configuration.
- views: Use policy rule only for clients defined by views.
- options: Configuration flags for policy rule.
- message: Deny message for 'deny' action.
- reroute: Configuration for 'reroute' action.
- answer: Answer definition for 'answer' action.
- servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
- """
-
- action: PolicyActionEnum
- priority: Optional[int] = None
- filter: Optional[FilterSchema] = None
- views: Optional[List[str]] = None
- options: Optional[List[PolicyFlagEnum]] = None
- message: Optional[str] = None
- reroute: Optional[List[AddressRenumberingSchema]] = None
- answer: Optional[AnswerSchema] = None
- servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
-
- def _validate(self) -> None:
- _validate_policy_action(self)
+++ /dev/null
-from typing import List, Optional
-
-from knot_resolver.datamodel.types import PolicyActionEnum, PolicyFlagEnum, ReadableFile
-from knot_resolver.utils.modeling import ConfigSchema
-
-
-class RPZSchema(ConfigSchema):
- """
- Configuration or Response Policy Zone (RPZ).
-
- ---
- action: RPZ rule action, typically 'deny'.
- file: Path to the RPZ zone file.
- watch: Reload the file when it changes.
- views: Use RPZ rule only for clients defined by views.
- options: Configuration flags for RPZ rule.
- message: Deny message for 'deny' action.
- """
-
- action: PolicyActionEnum
- file: ReadableFile
- watch: bool = True
- views: Optional[List[str]] = None
- options: Optional[List[PolicyFlagEnum]] = None
- message: Optional[str] = None
-
- def _validate(self) -> None:
- if self.message and not self.action == "deny":
- raise ValueError("'message' field can only be defined for 'deny' action")
+++ /dev/null
-from typing import List, Literal, Optional
-
-from knot_resolver.datamodel.policy_schema import ActionSchema
-from knot_resolver.utils.modeling import ConfigSchema
-
-
-class SliceSchema(ConfigSchema):
- """
- Split the entire DNS namespace into distinct slices.
-
- ---
- function: Slicing function that returns index based on query
- views: Use this Slice only for clients defined by views.
- actions: Actions for slice.
- """
-
- function: Literal["randomize-psl"] = "randomize-psl"
- views: Optional[List[str]] = None
- actions: List[ActionSchema]
+++ /dev/null
-from typing import Dict, List, Optional
-
-from knot_resolver.datamodel.types import DomainName, IPAddress, ReadableFile, TimeUnit
-from knot_resolver.utils.modeling import ConfigSchema
-
-
-class StaticHintsSchema(ConfigSchema):
- """
- Static hints for forward records (A/AAAA) and reverse records (PTR)
-
- ---
- ttl: TTL value used for records added from static hints.
- nodata: Use NODATA synthesis. NODATA will be synthesised for matching hint name, but mismatching type.
- etc_hosts: Add hints from '/etc/hosts' file.
- root_hints: Direct addition of root hints pairs (hostname, list of addresses).
- root_hints_file: Path to root hints in zonefile. Replaces all current root hints.
- hints: Direct addition of hints pairs (hostname, list of addresses).
- hints_files: Path to hints in hosts-like file.
- """
-
- ttl: Optional[TimeUnit] = None
- nodata: bool = True
- etc_hosts: bool = False
- root_hints: Optional[Dict[DomainName, List[IPAddress]]] = None
- root_hints_file: Optional[ReadableFile] = None
- hints: Optional[Dict[DomainName, List[IPAddress]]] = None
- hints_files: Optional[List[ReadableFile]] = None
+++ /dev/null
-from typing import List, Optional, Union
-
-from knot_resolver.datamodel.types import DomainName, IPAddressOptionalPort, PolicyFlagEnum
-from knot_resolver.utils.modeling import ConfigSchema
-
-
-class StubServerSchema(ConfigSchema):
- """
- Configuration of Stub server.
-
- ---
- address: IP address of Stub server.
- """
-
- address: IPAddressOptionalPort
-
-
-class StubZoneSchema(ConfigSchema):
- """
- Configuration of Stub Zone.
-
- ---
- subtree: Domain name of the zone.
- servers: IP address of Stub server.
- views: Use this Stub Zone only for clients defined by views.
- options: Configuration flags for Stub Zone.
- """
-
- subtree: DomainName
- servers: Union[List[IPAddressOptionalPort], List[StubServerSchema]]
- views: Optional[List[str]] = None
- options: Optional[List[PolicyFlagEnum]] = None
+++ /dev/null
-{% if cfg.static_hints.etc_hosts or cfg.static_hints.root_hints_file or cfg.static_hints.hints_files or cfg.static_hints.root_hints or cfg.static_hints.hints %}
-modules.load('hints > iterate')
-
-{% if cfg.static_hints.ttl %}
--- static-hints.ttl
-hints.ttl({{ cfg.static_hints.ttl.seconds()|string }})
-{% endif %}
-
--- static-hints.no-data
-hints.use_nodata({{ 'true' if cfg.static_hints.nodata else 'false' }})
-
-{% if cfg.static_hints.etc_hosts %}
--- static-hints.etc-hosts
-hints.add_hosts('/etc/hosts')
-{% endif %}
-
-{% if cfg.static_hints.root_hints_file %}
--- static-hints.root-hints-file
-hints.root_file('{{ cfg.static_hints.root_hints_file }}')
-{% endif %}
-
-{% if cfg.static_hints.hints_files %}
--- static-hints.hints-files
-{% for item in cfg.static_hints.hints_files %}
-hints.add_hosts('{{ item }}')
-{% endfor %}
-{% endif %}
-
-{% if cfg.static_hints.root_hints %}
--- static-hints.root-hints
-hints.root({
-{% for name, addrs in cfg.static_hints.root_hints.items() %}
-['{{ name.punycode() }}'] = {
-{% for addr in addrs %}
- '{{ addr }}',
-{% endfor %}
- },
-{% endfor %}
-})
-{% endif %}
-
-{% if cfg.static_hints.hints %}
--- static-hints.hints
-{% for name, addrs in cfg.static_hints.hints.items() %}
-{% for addr in addrs %}
-hints.set('{{ name.punycode() }} {{ addr }}')
-{% endfor %}
-{% endfor %}
-{% endif %}
-
-{% endif %}
\ No newline at end of file
from typing import List
from knot_resolver.datamodel.network_schema import AddressRenumberingSchema
-from knot_resolver.datamodel.policy_schema import AnswerSchema
from knot_resolver.datamodel.templates import template_from_str
-from knot_resolver.datamodel.types import PolicyFlagEnum
def test_policy_add():
tmpl.render(reroute=r)
== f"policy.REROUTE({{['{r[0].source}']='{r[0].destination}'}},{{['{r[1].source}']='{r[1].destination}'}},)"
)
-
-
-def test_policy_answer():
- ans = AnswerSchema({"rtype": "AAAA", "rdata": "192.0.2.7"})
- tmpl_str = """{% from 'macros/policy_macros.lua.j2' import policy_answer %}
-{{ policy_answer(ans) }}"""
-
- tmpl = template_from_str(tmpl_str)
- assert (
- tmpl.render(ans=ans)
- == f"policy.ANSWER({{[kres.type.{ans.rtype}]={{rdata=kres.str2ip('{ans.rdata}'),ttl={ans.ttl.seconds()}}}}},{str(ans.nodata).lower()})"
- )
+++ /dev/null
-from typing import Any, Dict
-
-import pytest
-from pytest import raises
-
-from knot_resolver.datamodel.policy_schema import ActionSchema, PolicySchema
-from knot_resolver.datamodel.types import PolicyActionEnum
-from knot_resolver.utils.modeling.exceptions import DataValidationError
-from knot_resolver.utils.modeling.types import get_generic_type_arguments
-
-noconfig_actions = [
- "pass",
- "drop",
- "refuse",
- "tc",
- "debug-always",
- "debug-cache-miss",
- "qtrace",
- "reqtrace",
-]
-configurable_actions = ["deny", "reroute", "answer", "mirror", "forward", "stub"]
-policy_actions = get_generic_type_arguments(PolicyActionEnum)
-
-
-@pytest.mark.parametrize("val", [item for item in policy_actions if item not in configurable_actions])
-def test_policy_action_valid(val: Any):
- PolicySchema({"action": val})
- ActionSchema({"action": val})
-
-
-@pytest.mark.parametrize("val", [{"action": "invalid-action"}])
-def test_action_invalid(val: Dict[str, Any]):
- with raises(DataValidationError):
- PolicySchema(val)
- with raises(DataValidationError):
- ActionSchema(val)
-
-
-@pytest.mark.parametrize(
- "val",
- [
- {"action": "deny", "message": "this is deny message"},
- {
- "action": "reroute",
- "reroute": [
- {"source": "192.0.2.0/24", "destination": "127.0.0.0"},
- {"source": "10.10.10.0/24", "destination": "192.168.1.0"},
- ],
- },
- {"action": "answer", "answer": {"rtype": "AAAA", "rdata": "192.0.2.7"}},
- {"action": "mirror", "servers": ["192.0.2.1@5353", "2001:148f:ffff::1"]},
- {"action": "forward", "servers": ["192.0.2.1@5353", "2001:148f:ffff::1"]},
- {"action": "stub", "servers": ["192.0.2.1@5353", "2001:148f:ffff::1"]},
- {"action": "forward", "servers": [{"address": ["127.0.0.1@5353"]}]},
- ],
-)
-def test_policy_valid(val: Dict[str, Any]):
- PolicySchema(val)
- ActionSchema(val)
-
-
-@pytest.mark.parametrize(
- "val",
- [
- {"action": "reroute"},
- {"action": "answer"},
- {"action": "mirror"},
- {"action": "pass", "reroute": [{"source": "192.0.2.0/24", "destination": "127.0.0.0"}]},
- {"action": "pass", "answer": {"rtype": "AAAA", "rdata": "::1"}},
- {"action": "pass", "servers": ["127.0.0.1@5353"]},
- {"action": "mirror", "servers": [{"address": ["127.0.0.1@5353"]}]},
- ],
-)
-def test_policy_invalid(val: Dict[str, Any]):
- with raises(DataValidationError):
- PolicySchema(val)
- with raises(DataValidationError):
- ActionSchema(val)
-
-
-@pytest.mark.parametrize(
- "val",
- noconfig_actions,
-)
-def test_policy_message_invalid(val: str):
- with raises(DataValidationError):
- PolicySchema({"action": f"{val}", "message": "this is deny message"})
- with raises(DataValidationError):
- ActionSchema({"action": f"{val}", "message": "this is deny message"})
+++ /dev/null
-import pytest
-from pytest import raises
-
-from knot_resolver.datamodel.rpz_schema import RPZSchema
-from knot_resolver.utils.modeling.exceptions import DataValidationError
-
-
-@pytest.mark.parametrize(
- "val",
- [
- "pass",
- "drop",
- "refuse",
- "tc",
- "debug-always",
- "debug-cache-miss",
- "qtrace",
- "reqtrace",
- ],
-)
-def test_message_invalid(val: str):
- with raises(DataValidationError):
- RPZSchema({"action": f"{val}", "file": "whitelist.rpz", "message": "this is deny message"})