]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Replace Union[S, T] with S | T
authorŠtěpán Balážik <stepan@isc.org>
Mon, 9 Feb 2026 14:40:36 +0000 (15:40 +0100)
committerŠtěpán Balážik <stepan@isc.org>
Fri, 20 Feb 2026 14:17:32 +0000 (15:17 +0100)
Generated with: ruff check --extend-select UP007 --fix && black .

bin/tests/system/isctest/asyncserver.py
bin/tests/system/isctest/hypothesis/strategies.py
bin/tests/system/isctest/kasp.py
bin/tests/system/isctest/template.py
bin/tests/system/isctest/vars/algorithms.py
bin/tests/system/resolver/ans2/ans.py
bin/tests/system/resolver/resolver_ans.py

index 8cb9efc6058da44feb1d1b75a198f52f234d92fa..b43cd32d8ffb858a640df86524e68f703ffea42b 100644 (file)
@@ -12,16 +12,7 @@ information regarding copyright ownership.
 """
 
 from dataclasses import dataclass, field
-from typing import (
-    Any,
-    AsyncGenerator,
-    Callable,
-    Coroutine,
-    Optional,
-    Sequence,
-    Union,
-    cast,
-)
+from typing import Any, AsyncGenerator, Callable, Coroutine, Optional, Sequence, cast
 
 import abc
 import asyncio
@@ -318,7 +309,7 @@ class ResponseAction(abc.ABC):
     """
 
     @abc.abstractmethod
-    async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+    async def perform(self) -> Optional[dns.message.Message | bytes]:
         """
         This method is expected to carry out arbitrary actions (e.g. wait for a
         specific amount of time, modify the answer, etc.) and then return the
@@ -345,7 +336,7 @@ class DnsResponseSend(ResponseAction):
     delay: float = 0.0
     acknowledge_hand_rolled_response: bool = False
 
-    async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+    async def perform(self) -> Optional[dns.message.Message | bytes]:
         """
         Yield a potentially delayed response that is a dns.message.Message.
         """
@@ -391,7 +382,7 @@ class BytesResponseSend(ResponseAction):
     response: bytes
     delay: float = 0.0
 
-    async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+    async def perform(self) -> Optional[dns.message.Message | bytes]:
         """
         Yield a potentially delayed response that is a sequence of bytes.
         """
@@ -408,7 +399,7 @@ class ResponseDrop(ResponseAction):
     Action which does nothing - as if a packet was dropped.
     """
 
-    async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+    async def perform(self) -> Optional[dns.message.Message | bytes]:
         return None
 
 
@@ -426,7 +417,7 @@ class CloseConnection(ResponseAction):
 
     delay: float = 0.0
 
-    async def perform(self) -> Optional[Union[dns.message.Message, bytes]]:
+    async def perform(self) -> Optional[dns.message.Message | bytes]:
         if self.delay > 0:
             logging.info("Waiting %.1fs before closing TCP connection", self.delay)
             await asyncio.sleep(self.delay)
@@ -1050,9 +1041,9 @@ class AsyncDnsServer(AsyncServer):
         /,
         default_rcode: dns.rcode.Rcode = dns.rcode.REFUSED,
         default_aa: bool = False,
-        keyring: Union[
-            dict[dns.name.Name, dns.tsig.Key], None, _NoKeyringType
-        ] = _NoKeyringType(),
+        keyring: (
+            dict[dns.name.Name, dns.tsig.Key] | None | _NoKeyringType
+        ) = _NoKeyringType(),
         acknowledge_manual_dname_handling: bool = False,
     ) -> None:
         super().__init__(self._handle_udp, self._handle_tcp, "ans.pid")
@@ -1295,7 +1286,7 @@ class AsyncDnsServer(AsyncServer):
         )
 
     def _log_response(
-        self, qctx: QueryContext, response: Optional[Union[dns.message.Message, bytes]]
+        self, qctx: QueryContext, response: Optional[dns.message.Message | bytes]
     ) -> None:
         if not response:
             logging.info(
@@ -1395,7 +1386,7 @@ class AsyncDnsServer(AsyncServer):
 
     async def _prepare_responses(
         self, qctx: QueryContext
-    ) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
+    ) -> AsyncGenerator[Optional[dns.message.Message | bytes], None]:
         """
         Yield response(s) either from response handlers or zone data.
         """
@@ -1609,7 +1600,7 @@ class ControllableAsyncDnsServer(AsyncDnsServer):
 
     async def _prepare_responses(
         self, qctx: QueryContext
-    ) -> AsyncGenerator[Optional[Union[dns.message.Message, bytes]], None]:
+    ) -> AsyncGenerator[Optional[dns.message.Message | bytes], None]:
         """
         Detect and handle control queries, falling back to normal processing
         for non-control queries.
index 74a86c6a644acc8606333a0f5921b14d5d784cae..60d42dcea7b7e0fd7c58e7fdc4fcb466a5346ca0 100644 (file)
@@ -11,7 +11,6 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-from typing import Union
 from warnings import warn
 
 import collections.abc
@@ -40,9 +39,7 @@ def dns_names(
     draw,
     *,
     prefix: dns.name.Name = dns.name.empty,
-    suffix: Union[
-        dns.name.Name, collections.abc.Iterable[dns.name.Name]
-    ] = dns.name.root,
+    suffix: dns.name.Name | collections.abc.Iterable[dns.name.Name] = dns.name.root,
     min_labels: int = 1,
     max_labels: int = 128,
 ) -> dns.name.Name:
index 5e8b1877c7ddcfb4866bcb18333e8fbddea300d7..737511dd5ccb25884e112e652d620e42796506d1 100644 (file)
@@ -14,7 +14,7 @@ from datetime import datetime, timedelta, timezone
 from functools import total_ordering
 from pathlib import Path
 from re import compile as Re
-from typing import Optional, Union
+from typing import Optional
 
 import glob
 import os
@@ -134,26 +134,26 @@ class KeyTimingMetadata:
     def __str__(self) -> str:
         return self.value.strftime(self.FORMAT)
 
-    def __add__(self, other: Union[timedelta, int]):
+    def __add__(self, other: timedelta | int):
         if isinstance(other, int):
             other = timedelta(seconds=other)
         result = KeyTimingMetadata.__new__(KeyTimingMetadata)
         result.value = self.value + other
         return result
 
-    def __sub__(self, other: Union[timedelta, int]):
+    def __sub__(self, other: timedelta | int):
         if isinstance(other, int):
             other = timedelta(seconds=other)
         result = KeyTimingMetadata.__new__(KeyTimingMetadata)
         result.value = self.value - other
         return result
 
-    def __iadd__(self, other: Union[timedelta, int]):
+    def __iadd__(self, other: timedelta | int):
         if isinstance(other, int):
             other = timedelta(seconds=other)
         self.value += other
 
-    def __isub__(self, other: Union[timedelta, int]):
+    def __isub__(self, other: timedelta | int):
         if isinstance(other, int):
             other = timedelta(seconds=other)
         self.value -= other
@@ -188,7 +188,7 @@ class KeyProperties:
         flags: int = 257,
         keytag_min: int = 0,
         keytag_max: int = 65535,
-        offset: Union[timedelta, int] = 0,
+        offset: timedelta | int = 0,
     ):
         self.name = name
         self.key = None
@@ -384,7 +384,7 @@ class Key:
     operations for KASP tests.
     """
 
-    def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None):
+    def __init__(self, name: str, keydir: Optional[str | Path] = None):
         self.name = name
         if keydir is None:
             self.keydir = Path()
@@ -1663,7 +1663,7 @@ def policy_to_properties(ttl, keys: list[str]) -> list[KeyProperties]:
         line = key.split()
 
         # defaults
-        metadata: dict[str, Union[str, int]] = {}
+        metadata: dict[str, str | int] = {}
         timing: dict[str, KeyTimingMetadata] = {}
         private = True
         legacy = False
index cf6e7e93d8740764c79398b05957fc30ad65f1dd..6fea61b44939b70dab76383d0901d6d4cd2852d2 100644 (file)
@@ -13,7 +13,7 @@
 
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any, Optional
 
 import jinja2
 
@@ -26,7 +26,7 @@ class TemplateEngine:
     Engine for rendering jinja2 templates in system test directories.
     """
 
-    def __init__(self, directory: Union[str, Path], env_vars=ALL):
+    def __init__(self, directory: str | Path, env_vars=ALL):
         """
         Initialize the template engine for `directory`, optionally overriding
         the `env_vars` that will be used when rendering the templates (defaults
index 1630e8215badf080894a051fad7d6e7f7a994124..a65a108031674c9f4bb6d37ff98161cfc82dfbd8 100644 (file)
@@ -9,7 +9,7 @@
 # See the COPYRIGHT file distributed with this work for additional
 # information regarding copyright ownership.
 
-from typing import NamedTuple, Optional, Union
+from typing import NamedTuple, Optional
 
 import os
 import platform
@@ -74,14 +74,14 @@ class Algorithm(NamedTuple):
 class AlgorithmSet(NamedTuple):
     """Collection of DEFAULT, ALTERNATIVE and DISABLED algorithms"""
 
-    default: Union[Algorithm, list[Algorithm]]
+    default: Algorithm | list[Algorithm]
     """DEFAULT is the algorithm for testing."""
 
-    alternative: Union[Algorithm, list[Algorithm]]
+    alternative: Algorithm | list[Algorithm]
     """ALTERNATIVE is an alternative algorithm for test cases that require more
     than one algorithm (for example algorithm rollover)."""
 
-    disabled: Union[Algorithm, list[Algorithm]]
+    disabled: Algorithm | list[Algorithm]
     """DISABLED is an algorithm that is used for tests against the
     "disable-algorithms" configuration option."""
 
index 165f6c14415f92f37cdcad0f8fb06cf867a1b038..88413d14e9b49c30d712c54f49d5187db1761f65 100644 (file)
@@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
 information regarding copyright ownership.
 """
 
-from typing import AsyncGenerator, Union
+from typing import AsyncGenerator
 
 import dns.edns
 import dns.name
@@ -54,7 +54,7 @@ class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
 
 
 def _cname_rrsets(
-    qname: Union[dns.name.Name, str],
+    qname: dns.name.Name | str,
 ) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
     return (
         rrset(qname, dns.rdatatype.CNAME, f"{qname}"),
index d06b14938669fd147844892d664c08480f5376c1..285cf44b97507e665ecbc74456faba18829919a6 100644 (file)
@@ -11,7 +11,7 @@ See the COPYRIGHT file distributed with this work for additional
 information regarding copyright ownership.
 """
 
-from typing import AsyncGenerator, NamedTuple, Union
+from typing import AsyncGenerator, NamedTuple
 
 import abc
 
@@ -29,7 +29,7 @@ from isctest.asyncserver import (
 
 
 def rrset(
-    qname: Union[dns.name.Name, str],
+    qname: dns.name.Name | str,
     rtype: dns.rdatatype.RdataType,
     rdata: str,
     ttl: int = 300,
@@ -38,7 +38,7 @@ def rrset(
 
 
 def rrset_from_list(
-    qname: Union[dns.name.Name, str],
+    qname: dns.name.Name | str,
     rtype: dns.rdatatype.RdataType,
     rdata_list: list[str],
     ttl: int = 300,
@@ -46,7 +46,7 @@ def rrset_from_list(
     return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)
 
 
-def soa_rrset(qname: Union[dns.name.Name, str]) -> dns.rrset.RRset:
+def soa_rrset(qname: dns.name.Name | str) -> dns.rrset.RRset:
     return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0")
 
 
@@ -56,9 +56,9 @@ class DelegationRRsets(NamedTuple):
 
 
 def delegation_rrsets(
-    owner: Union[dns.name.Name, str],
+    owner: dns.name.Name | str,
     server_number: int,
-    ns_name: Union[dns.name.Name, str, None] = None,
+    ns_name: dns.name.Name | str | None = None,
 ) -> DelegationRRsets:
     if ns_name is None:
         ns_name = f"ns.{owner}"
@@ -68,7 +68,7 @@ def delegation_rrsets(
 
 
 def setup_delegation(
-    qctx: QueryContext, owner: Union[dns.name.Name, str], server_number: int
+    qctx: QueryContext, owner: dns.name.Name | str, server_number: int
 ) -> None:
     delegation = delegation_rrsets(owner, server_number)
     qctx.response.authority.append(delegation.ns_rrset)