Generated with: ruff check --extend-select UP007 --fix && black .
"""
from dataclasses import dataclass, field
-from typing import (
- Any,
- AsyncGenerator,
- Callable,
- Coroutine,
- Optional,
- Sequence,
- Union,
- cast,
-)
+from typing import Any, AsyncGenerator, Callable, Coroutine, Optional, Sequence, cast
import abc
import asyncio
"""
@abc.abstractmethod
- async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+ async def perform(self) -> Optional[dns.message.Message | bytes]:
"""
This method is expected to carry out arbitrary actions (e.g. wait for a
specific amount of time, modify the answer, etc.) and then return the
delay: float = 0.0
acknowledge_hand_rolled_response: bool = False
- async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+ async def perform(self) -> Optional[dns.message.Message | bytes]:
"""
Yield a potentially delayed response that is a dns.message.Message.
"""
response: bytes
delay: float = 0.0
- async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+ async def perform(self) -> Optional[dns.message.Message | bytes]:
"""
Yield a potentially delayed response that is a sequence of bytes.
"""
Action which does nothing - as if a packet was dropped.
"""
- async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+ async def perform(self) -> Optional[dns.message.Message | bytes]:
return None
delay: float = 0.0
- async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+ async def perform(self) -> Optional[dns.message.Message | bytes]:
if self.delay > 0:
logging.info("Waiting %.1fs before closing TCP connection", self.delay)
await asyncio.sleep(self.delay)
/,
default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED,
default_aa: bool = False,
- keyring: Union[
- dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
- ] = _NoKeyringType(),
+ keyring: (
+ dict[dns.name.Name, dns.tsig.Key] | None | _NoKeyringType
+ ) = _NoKeyringType(),
acknowledge_manual_dname_handling: bool = False,
) -> None:
super().__init__(self._handle_udp, self._handle_tcp, "ans.pid")
)
def _log_response(
- self, qctx: QueryContext, response: Optional[Union[dns.message.Message, bytes]]
+ self, qctx: QueryContext, response: Optional[dns.message.Message | bytes]
) -> None:
if not response:
logging.info(
async def _prepare_responses(
self, qctx: QueryContext
- ) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
+ ) -> AsyncGenerator[Optional[dns.message.Message | bytes], None]:
"""
Yield response(s) either from response handlers or zone data.
"""
async def _prepare_responses(
self, qctx: QueryContext
- ) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
+ ) -> AsyncGenerator[Optional[dns.message.Message | bytes], None]:
"""
Detect and handle control queries, falling back to normal processing
for non-control queries.
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import Union
from warnings import warn
import collections.abc
draw,
*,
prefix: dns.name.Name = dns.name.empty,
- suffix: Union[
- dns.name.Name, collections.abc.Iterable[dns.name.Name]
- ] = dns.name.root,
+ suffix: dns.name.Name | collections.abc.Iterable[dns.name.Name] = dns.name.root,
min_labels: int = 1,
max_labels: int = 128,
) -> dns.name.Name:
from functools import total_ordering
from pathlib import Path
from re import compile as Re
-from typing import Optional, Union
+from typing import Optional
import glob
import os
def __str__(self) -> str:
return self.value.strftime(self.FORMAT)
- def __add__(self, other: Union[timedelta, int]):
+ def __add__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value + other
return result
- def __sub__(self, other: Union[timedelta, int]):
+ def __sub__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value - other
return result
- def __iadd__(self, other: Union[timedelta, int]):
+ def __iadd__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value += other
- def __isub__(self, other: Union[timedelta, int]):
+ def __isub__(self, other: timedelta | int):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value -= other
flags: int = 257,
keytag_min: int = 0,
keytag_max: int = 65535,
- offset: Union[timedelta, int] = 0,
+ offset: timedelta | int = 0,
):
self.name = name
self.key = None
operations for KASP tests.
"""
- def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None):
+ def __init__(self, name: str, keydir: Optional[str | Path] = None):
self.name = name
if keydir is None:
self.keydir = Path()
line = key.split()
# defaults
- metadata: dict[str, Union[str, int]] = {}
+ metadata: dict[str, str | int] = {}
timing: dict[str, KeyTimingMetadata] = {}
private = True
legacy = False
from dataclasses import dataclass
from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any, Optional
import jinja2
Engine for rendering jinja2 templates in system test directories.
"""
- def __init__(self, directory: Union[str, Path], env_vars=ALL):
+ def __init__(self, directory: str | Path, env_vars=ALL):
"""
Initialize the template engine for `directory`, optionally overriding
the `env_vars` that will be used when rendering the templates (defaults
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
-from typing import NamedTuple, Optional, Union
+from typing import NamedTuple, Optional
import os
import platform
class AlgorithmSet(NamedTuple):
"""Collection of DEFAULT, ALTERNATIVE and DISABLED algorithms"""
- default: Union[Algorithm, list[Algorithm]]
+ default: Algorithm | list[Algorithm]
"""DEFAULT is the algorithm for testing."""
- alternative: Union[Algorithm, list[Algorithm]]
+ alternative: Algorithm | list[Algorithm]
"""ALTERNATIVE is an alternative algorithm for test cases that require more
than one algorithm (for example algorithm rollover)."""
- disabled: Union[Algorithm, list[Algorithm]]
+ disabled: Algorithm | list[Algorithm]
"""DISABLED is an algorithm that is used for tests against the
"disable-algorithms" configuration option."""
information regarding copyright ownership.
"""
-from typing import AsyncGenerator, Union
+from typing import AsyncGenerator
import dns.edns
import dns.name
def _cname_rrsets(
- qname: Union[dns.name.Name, str],
+ qname: dns.name.Name | str,
) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
return (
rrset(qname, dns.rdatatype.CNAME, f"{qname}"),
information regarding copyright ownership.
"""
-from typing import AsyncGenerator, NamedTuple, Union
+from typing import AsyncGenerator, NamedTuple
import abc
def rrset(
- qname: Union[dns.name.Name, str],
+ qname: dns.name.Name | str,
rtype: dns.rdatatype.RdataType,
rdata: str,
ttl: int = 300,
def rrset_from_list(
- qname: Union[dns.name.Name, str],
+ qname: dns.name.Name | str,
rtype: dns.rdatatype.RdataType,
rdata_list: list[str],
ttl: int = 300,
return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)
-def soa_rrset(qname: Union[dns.name.Name, str]) -> dns.rrset.RRset:
+def soa_rrset(qname: dns.name.Name | str) -> dns.rrset.RRset:
return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0")
def delegation_rrsets(
- owner: Union[dns.name.Name, str],
+ owner: dns.name.Name | str,
server_number: int,
- ns_name: Union[dns.name.Name, str, None] = None,
+ ns_name: dns.name.Name | str | None = None,
) -> DelegationRRsets:
if ns_name is None:
ns_name = f"ns.{owner}"
def setup_delegation(
- qctx: QueryContext, owner: Union[dns.name.Name, str], server_number: int
+ qctx: QueryContext, owner: dns.name.Name | str, server_number: int
) -> None:
delegation = delegation_rrsets(owner, server_number)
qctx.response.authority.append(delegation.ns_rrset)