"instances": 1
},
"lua": {
- "script": [
+ "script_list": [
"-- SPDX-License-Identifier: CC0-1.0",
"-- vim:syntax=lua:set ts=4 sw=4:",
"-- Refer to manual: https://knot-resolver.readthedocs.org/en/stable/",
"instances": %s
},
"lua": {
- "script": [
+ "script_list": [
"-- SPDX-License-Identifier: CC0-1.0",
"-- vim:syntax=lua:set ts=4 sw=4:",
"-- Refer to manual: https://knot-resolver.readthedocs.org/en/stable/",
import click
from aiohttp import web
-from . import configuration
from .datamodel import KresConfig
from .kres_manager import KresManager
from .utils import ignore_exceptions
async def apply_config(request: web.Request) -> web.Response:
- config: KresConfig = await configuration.parse_json(await request.text())
+ config = KresConfig.from_json(await request.text())
manager: KresManager = request.app["kres_manager"]
await manager.apply_config(config)
return web.Response(text="OK")
app["kres_manager"] = manager
async def init_manager(app: web.Application):
- await app["kres_manager"].load_system_state()
+ manager = app["kres_manager"]
+ await manager.load_system_state()
+ if config is not None:
+ # TODO Use config loaded from the file system
+ pass
app.on_startup.append(init_manager)
-import json
from typing import Text
-import yaml
from jinja2 import Environment, Template
from .datamodel import KresConfig
return await _LUA_TEMPLATE.render_async(cfg=config)
-async def parse_yaml(yaml_str: str) -> KresConfig:
- data = yaml.safe_load(yaml_str)
- config = KresConfig(**data)
- await config.validate()
- return config
-
-
-async def parse_json(json_str: str) -> KresConfig:
- data = json.loads(json_str)
- config: KresConfig = KresConfig(**data)
- await config.validate()
- return config
-
-
async def load_file(path: str) -> KresConfig:
try:
with open(path, "r") as file:
except FileNotFoundError:
# return defaults
return KresConfig()
- return parse_yaml(yaml_str)
+ return KresConfig.from_yaml(yaml_str)
-from typing import List, Union
+from typing import List, Optional
-from .utils import dataclass_nested
+from knot_resolver_manager.utils.dataclasses_parservalidator import DataclassParserValidatorMixin
+
+from .compat.dataclasses import dataclass
class DataValidationError(Exception):
pass
-@dataclass_nested
-class ServerConfig:
+@dataclass
+class ServerConfig(DataclassParserValidatorMixin):
instances: int = 1
- async def validate(self):
+ def validate(self):
if self.instances < 0:
raise DataValidationError("Number of workers must be non-negative")
-@dataclass_nested
-class LuaConfig:
- script: Union[str, List[str], None] = None
+@dataclass
+class LuaConfig(DataclassParserValidatorMixin):
+ script_list: Optional[List[str]] = None
+ script: Optional[str] = None
def __post_init__(self):
# Concatenate array to single string
- if isinstance(self.script, List):
- self.script = "\n".join(self.script)
+ if self.script_list is not None:
+ self.script = "\n".join(self.script_list)
+
+ def validate(self):
+ assert self.script_list is not None or self.script is not None
-@dataclass_nested
-class KresConfig:
+@dataclass
+class KresConfig(DataclassParserValidatorMixin):
server: ServerConfig = ServerConfig()
lua: LuaConfig = LuaConfig()
- async def validate(self):
- await self.server.validate()
+ def validate(self):
+ pass
from typing import Any, Callable, Optional, Type, TypeVar
-from .dataclasses_nested import dataclass_nested
-from .dataclasses_yaml import StrictyamlParser, dataclass_strictyaml, dataclass_strictyaml_schema
+from .dataclasses_parservalidator import DataclassParserValidatorMixin, ValidationException
+from .overload import Overloaded
T = TypeVar("T")
__all__ = [
- "dataclass_strictyaml_schema",
- "dataclass_strictyaml",
- "StrictyamlParser",
"ignore_exceptions",
- "dataclass_nested",
"types",
+ "DataclassParserValidatorMixin",
+ "ValidationException",
+ "Overloaded",
]
+++ /dev/null
-from ..compat.dataclasses import dataclass, is_dataclass
-
-
-# source: https://www.geeksforgeeks.org/creating-nested-dataclass-objects-in-python/
-# decorator to wrap original __init__
-def dataclass_nested(*args, **kwargs):
- def wrapper(check_class):
- # passing class to investigate
- check_class = dataclass(check_class, **kwargs)
- o_init = check_class.__init__
-
- def __init__(self, *args, **kwargs):
- for name, value in kwargs.items():
- # getting field type
- ft = check_class.__annotations__.get(name, None)
- if is_dataclass(ft) and isinstance(value, dict):
- obj = ft(**value)
- kwargs[name] = obj
- o_init(self, *args, **kwargs)
-
- check_class.__init__ = __init__
- return check_class
-
- return wrapper(args[0]) if args else wrapper
--- /dev/null
+import json
+from typing import Any, Type, TypeVar
+
+import yaml
+
+from knot_resolver_manager.utils.types import (
+ get_generic_type_argument,
+ get_generic_type_arguments,
+ get_optional_inner_type,
+ is_dict,
+ is_list,
+ is_optional,
+ is_tuple,
+)
+
+from ..compat.dataclasses import is_dataclass
+
+
+class ValidationException(Exception):
+ pass
+
+
+def _from_dictlike_obj(cls: Any, obj: Any, default: Any, use_default: bool) -> Any:
+ # default values
+ if obj is None and use_default:
+ return default
+
+ # primitive types
+ if cls in (int, float, str):
+ return cls(obj)
+
+ # Optional[T]
+ if is_optional(cls):
+ if obj is None:
+ return None
+ else:
+ return _from_dictlike_obj(get_optional_inner_type(cls), obj, ..., False)
+
+ # Dict[K,V]
+ elif is_dict(cls):
+ key_type, val_type = get_generic_type_arguments(cls)
+ return {
+ _from_dictlike_obj(key_type, key, ..., False): _from_dictlike_obj(val_type, val, ..., False)
+ for key, val in obj.items()
+ }
+
+ # List[T]
+ elif is_list(cls):
+ inner_type = get_generic_type_argument(cls)
+ return [_from_dictlike_obj(inner_type, val, ..., False) for val in obj]
+
+ # Tuple[A,B,C,D,...]
+ elif is_tuple(cls):
+ types = get_generic_type_arguments(cls)
+ return tuple(_from_dictlike_obj(typ, val, ..., False) for typ, val in zip(types, obj))
+
+ # nested dataclass
+ elif is_dataclass(cls):
+ anot = cls.__dict__.get("__annotations__", {})
+ kwargs = {}
+ for name, python_type in anot.items():
+ value = obj[name] if name in obj else None
+ use_default = hasattr(cls, name)
+ default = getattr(cls, name, ...)
+ kwargs[name] = _from_dictlike_obj(python_type, value, default, use_default)
+ return cls(**kwargs)
+
+ # default error handler
+ else:
+ raise ValidationException(
+ f"Type {cls} cannot be parsed. This is a implementation error. "
+ "Please fix your types in the dataclass or improve the parser/validator."
+ )
+
+
+_T = TypeVar("_T", bound="DataclassParserValidatorMixin")
+
+
+class DataclassParserValidatorMixin:
+ def validate_recursive(self) -> None:
+ for field_name in dir(self):
+ # skip internal fields
+ if field_name.startswith("_"):
+ continue
+
+ field = getattr(self, field_name)
+ if is_dataclass(field):
+ if not isinstance(field, DataclassParserValidatorMixin):
+ raise ValidationException(
+ f"Nested dataclass in the field {field_name} does not include the ParserValidatorMixin"
+ )
+ field.validate_recursive()
+
+ self.validate()
+
+ def validate(self) -> None:
+ raise NotImplementedError(f"Validation function is not implemented in class {type(self).__name__}")
+
+ @classmethod
+ def from_yaml(cls: Type[_T], text: str, default: _T = ..., use_default: bool = False) -> _T:
+ data = yaml.safe_load(text)
+ config: _T = _from_dictlike_obj(cls, data, default, use_default)
+ config.validate_recursive()
+ return config
+
+ @classmethod
+ def from_json(cls: Type[_T], text: str, default: _T = ..., use_default: bool = False) -> _T:
+ data = json.loads(text)
+ config: _T = _from_dictlike_obj(cls, data, default, use_default)
+ config.validate_recursive()
+ return config
+++ /dev/null
-from typing import Any, Dict, List, Tuple, Type, TypeVar, Union
-
-import strictyaml
-from strictyaml import YAML, EmptyDict, FixedSeq, Float, Int, Map, MapPattern, Seq, Str, load
-
-
-class _DummyType:
- pass
-
-
-NoneType = type(None)
-
-
-_TYPE_MAP = {
- int: Int,
- str: Str,
- float: Float,
- List: Seq,
- Dict: MapPattern,
- Tuple: FixedSeq,
- Union: _DummyType,
-}
-
-_SCHEMA_FIELD_NAME = "STRICTYAML_SCHEMA"
-
-
-class StrictYAMLSchemaGenerationError(Exception):
- pass
-
-
-class StrictYAMLValueMappingError(Exception):
- pass
-
-
-def _get_strictyaml_type(python_type):
- # another already processed class
- if hasattr(python_type, _SCHEMA_FIELD_NAME):
- return getattr(python_type, _SCHEMA_FIELD_NAME)
-
- # compount types like List
- elif (
- hasattr(python_type, "__origin__")
- and hasattr(python_type, "__args__")
- and getattr(python_type, "__origin__") in _TYPE_MAP
- ):
- origin = getattr(python_type, "__origin__")
- args = getattr(python_type, "__args__")
-
- # special case for Optional[T]
- if origin == Union and len(args) == 2 and args[1] == NoneType:
- # for some weird reason, the optional wrapper is on the key, not on the value type
- return _get_strictyaml_type(args[0])
-
- type_constructor = _TYPE_MAP[origin]
- type_arguments = [_get_strictyaml_type(a) for a in args]
- print(type_constructor, type_arguments)
-
- # special case for Tuple
- if origin == Tuple:
- return type_constructor(type_arguments)
-
- # default behaviour
- return type_constructor(*type_arguments)
-
- # error handlers for non existent primitive types
- elif python_type not in _TYPE_MAP:
- raise StrictYAMLSchemaGenerationError(f"Type {python_type} is not supported for YAML schema generation")
-
- # remaining primitive and untyped types
- else:
- return _TYPE_MAP[python_type]()
-
-
-def dataclass_strictyaml_schema(cls):
- anot = cls.__dict__.get("__annotations__", {})
-
- if len(anot) == 0:
- schema = EmptyDict()
- else:
- fields = {}
- for name, python_type in anot.items():
- # special case for Optional[T], because it's weird
- # https://hitchdev.com/strictyaml/using/alpha/compound/optional-keys-with-defaults/
- if (
- hasattr(python_type, "__origin__")
- and hasattr(python_type, "__args__")
- and getattr(python_type, "__origin__") == Union
- and len(getattr(python_type, "__args__")) == 2
- and getattr(python_type, "__args__")[1] == NoneType
- ):
- name = strictyaml.Optional(name)
- fields[name] = _get_strictyaml_type(python_type)
- schema = Map(fields)
-
- setattr(cls, _SCHEMA_FIELD_NAME, schema)
-
- return cls
-
-
-def _yamlobj_to_dataclass(cls, obj: YAML) -> Any:
- # native values recursion helper
- if cls in (int, float):
- return cls(obj)
- if cls == str:
- return str(obj.text)
- # compount types
- if (
- hasattr(cls, "__origin__")
- and hasattr(cls, "__args__")
- and getattr(cls, "__origin__") in (Union, Dict, List, Tuple)
- ):
- origin = getattr(cls, "__origin__")
- args = getattr(cls, "__args__")
-
- # Optional[T]
- if origin == Union and len(args) == 2 and args[1] == NoneType:
- return _yamlobj_to_dataclass(args[0], obj) if obj is not None else None
-
- # Dict[K, V]
- elif origin == Dict and len(args) == 2:
- return {
- _yamlobj_to_dataclass(args[0], key): _yamlobj_to_dataclass(args[1], val) for key, val in obj.items()
- }
-
- # List[T]
- elif origin == List and len(args) == 1:
- return [_yamlobj_to_dataclass(args[0], val) for val in obj]
-
- # Tuple
- elif origin == Tuple:
- return tuple(_yamlobj_to_dataclass(typ, val) for typ, val in zip(args, obj))
-
- # ^ that's full list of native types
- # the remaining code handles cases when cls is a dataclasses
-
- # assert that no weird class without schema gets here
- if not hasattr(cls, _SCHEMA_FIELD_NAME):
- raise Exception(
- f"{str(cls)} does not have a schema field and is not primitive - don't know how to parse. "
- + "Did you forget to add @dataclass_strictyaml_schema to nested dataclass?"
- )
-
- anot = cls.__dict__.get("__annotations__", {})
- kwargs = {}
- for name, python_type in anot.items():
- kwargs[name] = _yamlobj_to_dataclass(python_type, obj[name] if name in obj else None)
- return cls(**kwargs)
-
-
-def _from_yaml(cls, text: str):
- schema = getattr(cls, _SCHEMA_FIELD_NAME)
-
- yamlobj = load(text, schema)
- return _yamlobj_to_dataclass(cls, yamlobj)
-
-
-def dataclass_strictyaml(cls):
- if not hasattr(cls, _SCHEMA_FIELD_NAME):
- cls = dataclass_strictyaml_schema(cls)
-
- setattr(cls, "from_yaml", classmethod(_from_yaml))
- return cls
-
-
-_T = TypeVar("_T", bound="StrictyamlParser")
-
-
-class StrictyamlParser:
- @classmethod
- def from_yaml(cls: Type[_T], text: str) -> _T:
- if not hasattr(cls, _SCHEMA_FIELD_NAME):
- dataclass_strictyaml_schema(cls)
-
- return _from_yaml(cls, text)
-from knot_resolver_manager.utils.types import NoneType, get_optional_inner_type, is_optional
+from collections import defaultdict
from typing import Any, Callable, Dict, Generic, List, Tuple, Type, TypeVar
+from knot_resolver_manager.utils.types import NoneType, get_optional_inner_type, is_optional
+
T = TypeVar("T")
-class OverloadedFunctionException(Exception): pass
-class overloaded(Generic[T]):
+class OverloadedFunctionException(Exception):
+ pass
+
+
+class Overloaded(Generic[T]):
def __init__(self):
self._vtable: Dict[Tuple[Any], Callable[..., T]] = {}
result = [p + [NoneType] for p in result] + [p + [tp] for p in result]
else:
result = [p + [arg_type] for p in result]
-
+
# make tuples
return [tuple(x) for x in result]
-
+
def add(self, *args: Type[Any], **kwargs: Type[Any]) -> Callable[[Callable[..., T]], Callable[..., T]]:
if len(kwargs) != 0:
- raise OverloadedFunctionException("Sorry, named arguments are not supported. You can however implement them and make them functional... ;)")
+ raise OverloadedFunctionException(
+ "Sorry, named arguments are not supported. "
+ "You can however implement them and make them functional... ;)"
+ )
- def wrapper(func: Callable[...,T]) -> Callable[...,T]:
- signatures = overloaded._create_signatures(*args)
+ def wrapper(func: Callable[..., T]) -> Callable[..., T]:
+ signatures = Overloaded._create_signatures(*args)
for signature in signatures:
if signature in self._vtable:
- raise OverloadedFunctionException("Sorry, signature {signature} is already defined. You can't make a second definition of the same signature.")
+ raise OverloadedFunctionException(
+ "Sorry, signature {signature} is already defined. "
+ "You can't make a second definition of the same signature."
+ )
self._vtable[signature] = func
- def inner_wrapper(*args: Any, **kwargs: Any) -> T:
- return self(*args, **kwargs)
- return inner_wrapper
+ return self
+
return wrapper
-
+
+ def default(self, func: Callable[..., T]) -> Callable[..., T]:
+ if isinstance(self._vtable, defaultdict):
+ raise OverloadedFunctionException(
+ "Sorry, you can't define multiple default handlers in an overloaded function"
+ )
+
+ self._vtable = defaultdict(lambda: func, self._vtable)
+ return self
+
def __call__(self, *args: Any, **kwargs: Any) -> T:
if len(kwargs) != 0:
- raise OverloadedFunctionException("Sorry, named arguments are not supported. You can however implement them and make them functional... ;)")
+ raise OverloadedFunctionException(
+ "Sorry, named arguments are not supported. "
+ "You can however implement them and make them functional... ;)"
+ )
signature = tuple(type(a) for a in args)
if signature not in self._vtable:
- raise OverloadedFunctionException(f"Function overload with signature {signature} is not registered and can't be called.")
+ raise OverloadedFunctionException(
+ f"Function overload with signature {signature} is not registered and can't be called."
+ )
return self._vtable[signature](*args)
-
+
def _print_vtable(self):
for signature in self._vtable:
print(f"{signature} registered")
- print()
\ No newline at end of file
+ print()
+from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, Union
+
NoneType = type(None)
+
+
+def is_optional(tp: Any) -> bool:
+ origin = getattr(tp, "__origin__", None)
+ args = getattr(tp, "__args__", [])
+
+ return origin == Union and len(args) == 2 and args[1] == NoneType
+
+
+def is_dict(tp: Any) -> bool:
+ return getattr(tp, "__origin__", None) == Dict
+
+
+def is_list(tp: Any) -> bool:
+ return getattr(tp, "__origin__", None) == List
+
+
+def is_tuple(tp: Any) -> bool:
+ return getattr(tp, "__origin__", None) == Tuple
+
+
+def is_union(tp: Any) -> bool:
+ """Returns False if it is Union but looks like Optional"""
+ return not is_optional(tp) and getattr(tp, "__origin__", None) == Union
+
+
+def get_generic_type_arguments(tp: Any) -> List[Any]:
+ return list(getattr(tp, "__args__", []))
+
+
+def get_generic_type_argument(tp: Any) -> Any:
+ """ same as function get_generic_type_arguments, but expects just one type argument"""
+
+ args = get_generic_type_arguments(tp)
+ assert len(args) == 1
+ return args[0]
+
+
+T = TypeVar("T")
+
+
+def get_optional_inner_type(optional: Type[Optional[T]]) -> Type[T]:
+ assert is_optional(optional)
+ return get_generic_type_arguments(optional)[0]
"include": [
"knot_resolver_manager"
],
- "exclude": [
- "knot_resolver_manager/utils/dataclasses_yaml.py"
- ],
+ "exclude": [],
"typeCheckingMode": "strict"
}
\ No newline at end of file
echo The debug server will be listening on port localhost:5678
echo Use VSCode remote attach feature to connect to the debug server
echo The manager will start after you connect
+echo API will be running on port 5000
echo ----------------------------------------
-poe container-run -p 5678 --code -- knot-manager:dev python -m debugpy --listen 0.0.0.0:5678 --wait-for-client -m knot_resolver_manager
\ No newline at end of file
+poe container-run -p 5000 -p 5678 --code -- knot-manager:dev python -m debugpy --listen 0.0.0.0:5678 --wait-for-client -m knot_resolver_manager 5000
\ No newline at end of file
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
-import pytest
-import strictyaml
-from strictyaml import EmptyDict, FixedSeq, Float, Int, Map, MapPattern, Seq, Str
-
-from knot_resolver_manager.utils import dataclass_strictyaml_schema
-from knot_resolver_manager.utils.dataclasses_yaml import StrictyamlParser, dataclass_strictyaml
+from knot_resolver_manager.utils.dataclasses_parservalidator import DataclassParserValidatorMixin
def test_parsing_primitive():
@dataclass
- @dataclass_strictyaml
- class TestClass:
+ class TestClass(DataclassParserValidatorMixin):
i: int
s: str
f: float
+ def validate(self):
+ pass
+
yaml = """i: 5
s: "test"
f: 3.14"""
def test_parsing_nested():
@dataclass
- @dataclass_strictyaml_schema
- class Lower:
+ class Lower(DataclassParserValidatorMixin):
i: int
+ def validate(self):
+ pass
+
@dataclass
- class Upper(StrictyamlParser):
+ class Upper(DataclassParserValidatorMixin):
l: Lower
+ def validate(self):
+ pass
+
yaml = """l:
i: 5"""
def test_simple_compount_types():
@dataclass
- class TestClass(StrictyamlParser):
+ class TestClass(DataclassParserValidatorMixin):
l: List[int]
d: Dict[str, str]
t: Tuple[str, int]
o: Optional[int]
+ def validate(self):
+ pass
+
yaml = """l:
- 1
- 2
def test_nested_compount_types():
@dataclass
- class TestClass(StrictyamlParser):
+ class TestClass(DataclassParserValidatorMixin):
o: Optional[Dict[str, str]]
+ def validate(self):
+ pass
+
yaml = """o:
key: val"""
def test_nested_compount_types2():
@dataclass
- class TestClass(StrictyamlParser):
+ class TestClass(DataclassParserValidatorMixin):
i: int
o: Optional[Dict[str, str]]
+ def validate(self):
+ pass
+
yaml = "i: 5"
obj = TestClass.from_yaml(yaml)
def test_real_failing_dummy_confdata():
@dataclass
- class ConfData(StrictyamlParser):
+ class ConfData(DataclassParserValidatorMixin):
num_workers: int = 1
lua_config: Optional[str] = None
- async def validate(self) -> bool:
+ def validate(self):
if self.num_workers < 0:
raise Exception("Number of workers must be non-negative")
- return True
-
# prepare the payload
lua_config = "dummy"
config = f"""
+++ /dev/null
-from dataclasses import dataclass
-from typing import Dict, List, Optional, Tuple
-
-import pytest
-import strictyaml
-from strictyaml import EmptyDict, FixedSeq, Float, Int, Map, MapPattern, Seq, Str
-
-from knot_resolver_manager.utils import dataclass_strictyaml_schema
-from knot_resolver_manager.utils.dataclasses_yaml import dataclass_strictyaml
-
-
-def _schema_eq(schema1, schema2) -> bool:
- """
- Hacky way to determine, whether two schemas are the same... It works well, so why not... :)
- """
- return str(schema1) == str(schema2)
-
-
-def test_empty_class():
- @dataclass_strictyaml_schema
- class TestClass:
- pass
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, EmptyDict())
-
-
-def test_int_field():
- @dataclass_strictyaml_schema
- class TestClass:
- field: int
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": Int()}))
-
-
-def test_string_field():
- @dataclass_strictyaml_schema
- class TestClass:
- field: str
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": Str()}))
-
-
-def test_float_field():
- @dataclass_strictyaml_schema
- class TestClass:
- field: float
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": Float()}))
-
-
-def test_multiple_fields():
- @dataclass_strictyaml_schema
- class TestClass:
- field1: str
- field2: int
- field3: float
-
- assert _schema_eq(
- TestClass.STRICTYAML_SCHEMA,
- Map({"field1": Str(), "field2": Int(), "field3": Float()}),
- )
-
-
-def test_list_field():
- @dataclass_strictyaml_schema
- class TestClass:
- field: List[str]
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": Seq(Str())}))
-
-
-def test_dict_field():
- @dataclass_strictyaml_schema
- class TestClass:
- field: Dict[str, int]
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": MapPattern(Str(), Int())}))
-
-
-def test_optional_field():
- @dataclass_strictyaml_schema
- class TestClass:
- field: Optional[int]
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({strictyaml.Optional("field"): Int()}))
-
-
-def test_nested_dict_list():
- @dataclass_strictyaml_schema
- class TestClass:
- field: Dict[str, List[int]]
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": MapPattern(Str(), Seq(Int()))}))
-
-
-@pytest.mark.xfail(strict=True)
-def test_nested_dict_key_list():
- """
- List can't be a dict key, so this should fail
- """
-
- @dataclass_strictyaml_schema
- class TestClass:
- field: Dict[List[int], List[int]]
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": MapPattern(Seq(Int()), Seq(Int()))}))
-
-
-def test_nested_list():
- @dataclass_strictyaml_schema
- class TestClass:
- field: List[List[List[List[int]]]]
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": Seq(Seq(Seq(Seq(Int()))))}))
-
-
-def test_tuple_field():
- @dataclass_strictyaml_schema
- class TestClass:
- field: Tuple[str, int]
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": FixedSeq([Str(), Int()])}))
-
-
-def test_nested_tuple():
- @dataclass_strictyaml_schema
- class TestClass:
- field: Tuple[str, Dict[str, int], List[List[int]]]
-
- assert _schema_eq(
- TestClass.STRICTYAML_SCHEMA,
- Map({"field": FixedSeq([Str(), MapPattern(Str(), Int()), Seq(Seq(Int()))])}),
- )
-
-
-def test_chained_classes():
- @dataclass_strictyaml_schema
- class TestClass:
- field: int
-
- @dataclass_strictyaml_schema
- class CompoundClass:
- c: TestClass
-
- assert _schema_eq(CompoundClass.STRICTYAML_SCHEMA, Map({"c": Map({"field": Int()})}))
-
-
-def test_combined_with_dataclass():
- from dataclasses import dataclass
-
- @dataclass
- @dataclass_strictyaml_schema
- class TestClass:
- field: int
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": Int()}))
-
-
-def test_combined_with_dataclass2():
- from dataclasses import dataclass
-
- @dataclass_strictyaml_schema
- @dataclass
- class TestClass:
- field: int
-
- assert _schema_eq(TestClass.STRICTYAML_SCHEMA, Map({"field": Int()}))
-
-
-def test_parsing_primitive():
- @dataclass
- @dataclass_strictyaml
- class TestClass:
- i: int
- s: str
- f: float
-
- yaml = """i: 5
-s: "test"
-f: 3.14"""
-
- obj = TestClass.from_yaml(yaml)
-
- assert obj.i == 5
- assert obj.s == "test"
- assert obj.f == 3.14
-
-
-def test_parsing_nested():
- @dataclass
- @dataclass_strictyaml
- class Lower:
- i: int
-
- @dataclass
- @dataclass_strictyaml
- class Upper:
- l: Lower
-
- yaml = """l:
- i: 5"""
-
- obj = Upper.from_yaml(yaml)
- assert obj.l.i == 5
-
-
-def test_simple_compount_types():
- @dataclass
- @dataclass_strictyaml
- class TestClass:
- l: List[int]
- d: Dict[str, str]
- t: Tuple[str, int]
- o: Optional[int]
-
- yaml = """l:
- - 1
- - 2
- - 3
- - 4
- - 5
-d:
- something: else
- w: all
-t:
- - test
- - 5"""
-
- obj = TestClass.from_yaml(yaml)
-
- assert obj.l == [1, 2, 3, 4, 5]
- assert obj.d == {"something": "else", "w": "all"}
- assert obj.t == ("test", 5)
- assert obj.o is None
-
-
-def test_nested_compount_types():
- @dataclass
- @dataclass_strictyaml
- class TestClass:
- o: Optional[Dict[str, str]]
-
- yaml = """o:
- key: val"""
-
- obj = TestClass.from_yaml(yaml)
-
- assert obj.o == {"key": "val"}
-
-
-def test_nested_compount_types2():
- @dataclass
- @dataclass_strictyaml
- class TestClass:
- i: int
- o: Optional[Dict[str, str]]
-
- yaml = "i: 5"
-
- obj = TestClass.from_yaml(yaml)
-
- assert obj.o is None
from typing import Optional
-from knot_resolver_manager.utils.overload import overloaded
+
+from knot_resolver_manager.utils import Overloaded
def test_simple():
- func: overloaded[None] = overloaded()
+ func: Overloaded[None] = Overloaded()
@func.add(int)
def f1(a: int) -> None:
assert type(a) == int
-
+
@func.add(str)
def f2(a: str) -> None:
assert type(a) == str
-
+
func("test")
func(5)
f1("test")
def test_optional():
- func: overloaded[int] = overloaded()
+ # pyright: reportUnusedFunction=false
+
+ func: Overloaded[int] = Overloaded()
@func.add(Optional[int], str)
def f1(a: Optional[int], b: str) -> int:
assert a is None or type(a) == int
assert type(b) == str
return -1
-
+
@func.add(Optional[str], int)
def f2(a: Optional[str], b: int) -> int:
assert a is None or type(a) == str
assert type(b) == int
return 1
-
func(None, 5)
func("str", 5)
func(None, "str")
- func(5, "str")
\ No newline at end of file
+ func(5, "str")
--- /dev/null
+from typing import List
+
+from knot_resolver_manager.utils.types import is_list
+
+
+def test_is_list():
+ assert is_list(List[str])
+ assert is_list(List[int])
This type stub file was generated by pyright.
"""
-from jinja2.bccache import BytecodeCache as BytecodeCache, FileSystemBytecodeCache as FileSystemBytecodeCache, MemcachedBytecodeCache as MemcachedBytecodeCache
-from jinja2.environment import Environment as Environment, Template as Template
-from jinja2.exceptions import TemplateAssertionError as TemplateAssertionError, TemplateError as TemplateError, TemplateNotFound as TemplateNotFound, TemplateSyntaxError as TemplateSyntaxError, TemplatesNotFound as TemplatesNotFound, UndefinedError as UndefinedError
-from jinja2.filters import contextfilter as contextfilter, environmentfilter as environmentfilter, evalcontextfilter as evalcontextfilter
-from jinja2.loaders import BaseLoader as BaseLoader, ChoiceLoader as ChoiceLoader, DictLoader as DictLoader, FileSystemLoader as FileSystemLoader, FunctionLoader as FunctionLoader, ModuleLoader as ModuleLoader, PackageLoader as PackageLoader, PrefixLoader as PrefixLoader
-from jinja2.runtime import DebugUndefined as DebugUndefined, StrictUndefined as StrictUndefined, Undefined as Undefined, make_logging_undefined as make_logging_undefined
-from jinja2.utils import Markup as Markup, clear_caches as clear_caches, contextfunction as contextfunction, environmentfunction as environmentfunction, escape as escape, evalcontextfunction as evalcontextfunction, is_undefined as is_undefined, select_autoescape as select_autoescape
-
+from jinja2.bccache import BytecodeCache as BytecodeCache
+from jinja2.bccache import FileSystemBytecodeCache as FileSystemBytecodeCache
+from jinja2.bccache import MemcachedBytecodeCache as MemcachedBytecodeCache
+from jinja2.environment import Environment as Environment
+from jinja2.environment import Template as Template
+from jinja2.exceptions import TemplateAssertionError as TemplateAssertionError
+from jinja2.exceptions import TemplateError as TemplateError
+from jinja2.exceptions import TemplateNotFound as TemplateNotFound
+from jinja2.exceptions import TemplatesNotFound as TemplatesNotFound
+from jinja2.exceptions import TemplateSyntaxError as TemplateSyntaxError
+from jinja2.exceptions import UndefinedError as UndefinedError
+from jinja2.filters import contextfilter as contextfilter
+from jinja2.filters import environmentfilter as environmentfilter
+from jinja2.filters import evalcontextfilter as evalcontextfilter
+from jinja2.loaders import BaseLoader as BaseLoader
+from jinja2.loaders import ChoiceLoader as ChoiceLoader
+from jinja2.loaders import DictLoader as DictLoader
+from jinja2.loaders import FileSystemLoader as FileSystemLoader
+from jinja2.loaders import FunctionLoader as FunctionLoader
+from jinja2.loaders import ModuleLoader as ModuleLoader
+from jinja2.loaders import PackageLoader as PackageLoader
+from jinja2.loaders import PrefixLoader as PrefixLoader
+from jinja2.runtime import DebugUndefined as DebugUndefined
+from jinja2.runtime import StrictUndefined as StrictUndefined
+from jinja2.runtime import Undefined as Undefined
+from jinja2.runtime import make_logging_undefined as make_logging_undefined
+from jinja2.utils import Markup as Markup
+from jinja2.utils import clear_caches as clear_caches
+from jinja2.utils import contextfunction as contextfunction
+from jinja2.utils import environmentfunction as environmentfunction
+from jinja2.utils import escape as escape
+from jinja2.utils import evalcontextfunction as evalcontextfunction
+from jinja2.utils import is_undefined as is_undefined
+from jinja2.utils import select_autoescape as select_autoescape
get_next: Any
def with_metaclass(meta, *bases):
...
-
xid_continue: str
def allexcept(*args):
...
-
def dump_bytecode(self, bucket):
...
-
-
-
"""
from typing import Any, Optional
+
from jinja2.visitor import NodeVisitor
operators: Any
def visit_ScopedEvalContextModifier(self, node, frame):
...
-
-
-
"""
from typing import Any, Dict, Optional
+
from jinja2.filters import FILTERS
from jinja2.tests import TESTS
import sys
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Iterator, List, Optional, Sequence, Text, Type, Union
+
from .bccache import BytecodeCache
from .loaders import BaseLoader
from .runtime import Context, Undefined
def __next__(self):
...
-
-
-
class FilterArgumentError(TemplateRuntimeError):
...
-
-
def tokeniter(self, source, name, filename: Optional[Any] = ..., state: Optional[Any] = ...):
...
-
-
-
import sys
from types import ModuleType
from typing import Any, Callable, Iterable, List, Optional, Text, Tuple, Union
+
from .environment import Environment
if sys.version_info >= (3, 7):
def load(self, environment, name, globals: Optional[Any] = ...):
...
-
-
-
"""
from typing import Any
+
from jinja2.compiler import CodeGenerator
class TrackingCodeGenerator(CodeGenerator):
def find_referenced_templates(ast):
...
-
class ScopedEvalContextModifier(EvalContextModifier):
fields: Any
...
-
-
"""
from typing import Any
+
from jinja2.visitor import NodeTransformer
def optimize(node, environment):
visit_Filter: Any
visit_Test: Any
visit_CondExpr: Any
-
-
def parse(self):
...
-
-
-
"""
from typing import Any, Dict, Optional, Text, Union
+
from jinja2.environment import Environment
to_string: Any
__bool__: Any
__hash__: Any
...
-
-
"""
from typing import Any
+
from jinja2.environment import Environment
MAX_RANGE: int
class ImmutableSandboxedEnvironment(SandboxedEnvironment):
def is_safe_attribute(self, obj, attr, value):
...
-
-
-
This type stub file was generated by pyright.
"""
-from _typeshed import AnyPath
-from typing import Any, Callable, IO, Iterable, Optional, Protocol, Text, TypeVar, Union
-from typing_extensions import Literal
+from typing import IO, Any, Callable, Iterable, Optional, Protocol, Text, TypeVar, Union
+
from markupsafe import Markup as Markup
+from typing_extensions import Literal
+
+from _typeshed import AnyPath
missing: Any
internal_code: Any
def __call__(self):
...
-
-
-
def visit_list(self, node, *args, **kwargs):
...
-
-
-