import logging
import os
import socket
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
from typing_extensions import Literal
from knot_resolver_manager.datamodel.dns64_schema import Dns64Schema
from knot_resolver_manager.datamodel.dnssec_schema import DnssecSchema
from knot_resolver_manager.datamodel.forward_schema import ForwardSchema
-from knot_resolver_manager.datamodel.local_data_schema import LocalDataSchema
+from knot_resolver_manager.datamodel.local_data_schema import LocalDataSchema, RPZSchema, RuleSchema
from knot_resolver_manager.datamodel.logging_schema import LoggingSchema
from knot_resolver_manager.datamodel.lua_schema import LuaSchema
from knot_resolver_manager.datamodel.management_schema import ManagementSchema
from knot_resolver_manager.datamodel.webmgmt_schema import WebmgmtSchema
from knot_resolver_manager.utils.modeling import ConfigSchema
from knot_resolver_manager.utils.modeling.base_schema import lazy_default
+from knot_resolver_manager.utils.modeling.exceptions import AggregateDataValidationError, DataValidationError
_DEFAULT_RUNDIR = "/var/run/knot-resolver"
return MAX_WORKERS
+def _get_views_tags(views: List[ViewSchema]) -> List[str]:
+ tags = []
+ for view in views:
+ if view.tags:
+ tags += [str(tag) for tag in view.tags if tag not in tags]
+ return tags
+
+
+def _check_local_data_tags(
+ views_tags: List[str], rules_or_rpz: Union[List[RuleSchema], List[RPZSchema]]
+) -> Tuple[List[str], List[DataValidationError]]:
+ tags = []
+ errs = []
+
+ i = 0
+ for rule in rules_or_rpz:
+ tags_not_in = []
+ if rule.tags:
+ for tag in rule.tags:
+ tag_str = str(tag)
+ if tag_str not in tags:
+ tags.append(tag_str)
+ if tag_str not in views_tags:
+ tags_not_in.append(tag_str)
+ if len(tags_not_in) > 0:
+ errs.append(
+ DataValidationError(
+ f"some tags {tags_not_in} not found in '/views' tags", f"/local-data/rules[{i}]/tags"
+ )
+ )
+ i += 1
+ return tags, errs
+
+
class KresConfig(ConfigSchema):
class Raw(ConfigSchema):
"""
"refusing to run with more then 10 workers per cpu core, the system wouldn't behave nicely"
)
+ # get all tags from views
+ views_tags = []
+ if self.views:
+ views_tags = _get_views_tags(self.views)
+
+ # get local-data tags and check its existence in views
+ errs = []
+ local_data_tags = []
+ if self.local_data.rules:
+ rules_tags, rules_errs = _check_local_data_tags(views_tags, self.local_data.rules)
+ errs += rules_errs
+ local_data_tags += rules_tags
+ if self.local_data.rpz:
+ rpz_tags, rpz_errs = _check_local_data_tags(views_tags, self.local_data.rpz)
+ errs += rpz_errs
+ local_data_tags += rpz_tags
+
+ # look for unused tags in /views
+ unused_tags = views_tags.copy()
+ for tag in local_data_tags:
+ if tag in unused_tags:
+ unused_tags.remove(tag)
+ if len(unused_tags) > 1:
+ errs.append(DataValidationError(f"unused tags {unused_tags} found", "/views"))
+
+ # raise all validation errors
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
+ raise AggregateDataValidationError("/", errs)
+
def render_lua(self) -> str:
# FIXME the `cwd` argument is used only for configuring control socket path
# it should be removed and relative path used instead as soon as issue