]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: drop use of typing.Union
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 30 May 2024 00:40:30 +0000 (02:40 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 4 Jun 2024 14:52:16 +0000 (16:52 +0200)
31 files changed:
psycopg/psycopg/_adapters_map.py
psycopg/psycopg/_connection_base.py
psycopg/psycopg/_copy_base.py
psycopg/psycopg/_pipeline.py
psycopg/psycopg/_queries.py
psycopg/psycopg/_tpc.py
psycopg/psycopg/_tz.py
psycopg/psycopg/abc.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/crdb/connection.py
psycopg/psycopg/dbapi20.py
psycopg/psycopg/errors.py
psycopg/psycopg/generators.py
psycopg/psycopg/pq/abc.py
psycopg/psycopg/pq/misc.py
psycopg/psycopg/sql.py
psycopg/psycopg/transaction.py
psycopg/psycopg/types/enum.py
psycopg/psycopg/types/json.py
psycopg/psycopg/types/multirange.py
psycopg/psycopg/types/net.py
psycopg/psycopg/types/numeric.py
psycopg/psycopg/types/string.py
psycopg_pool/psycopg_pool/abc.py
tests/_test_cursor.py
tests/fix_faker.py
tests/test_dns_srv.py
tests/test_typing.py
tests/typing_example.py
tools/build/copy_to_binary.py

index d76c8247fa0076d22bfc1bbd8f47880f0afd4de1..28b63956b42ca71161a9074fa71ecd81e8db7c27 100644 (file)
@@ -6,8 +6,7 @@ Mapping from types/oids to Dumpers/Loaders
 
 from __future__ import annotations
 
-from typing import Any, Dict, List, Type, Union
-from typing import cast, TYPE_CHECKING
+from typing import Any, Dict, List, Type, cast, TYPE_CHECKING
 
 from . import pq
 from . import errors as e
@@ -62,7 +61,7 @@ class AdaptersMap:
 
     types: TypesRegistry
 
-    _dumpers: Dict[PyFormat, Dict[Union[type, str], Type[Dumper]]]
+    _dumpers: Dict[PyFormat, Dict[type | str, Type[Dumper]]]
     _dumpers_by_oid: List[Dict[int, Type[Dumper]]]
     _loaders: List[Dict[int, Type[Loader]]]
 
@@ -110,9 +109,7 @@ class AdaptersMap:
     def connection(self) -> "BaseConnection[Any]" | None:
         return None
 
-    def register_dumper(
-        self, cls: Union[type, str, None], dumper: Type[Dumper]
-    ) -> None:
+    def register_dumper(self, cls: type | str | None, dumper: Type[Dumper]) -> None:
         """
         Configure the context to use `!dumper` to convert objects of type `!cls`.
 
@@ -163,7 +160,7 @@ class AdaptersMap:
 
             self._dumpers_by_oid[dumper.format][dumper.oid] = dumper
 
-    def register_loader(self, oid: Union[int, str], loader: Type["Loader"]) -> None:
+    def register_loader(self, oid: int | str, loader: Type["Loader"]) -> None:
         """
         Configure the context to use `!loader` to convert data of oid `!oid`.
 
index 2b862e8038ac4e8acb0bfd0d9791cd23cc3fe0af..9230408622c4bc1d219fdd4a4317569b9d03c2f0 100644 (file)
@@ -9,7 +9,7 @@ from __future__ import annotations
 import sys
 import logging
 from typing import Callable, Generic
-from typing import List, NamedTuple, Tuple, Union
+from typing import List, NamedTuple, Tuple
 from typing import TYPE_CHECKING
 from weakref import ref, ReferenceType
 from warnings import warn
@@ -625,7 +625,7 @@ class BaseConnection(Generic[Row]):
         self._check_tpc()
         return Xid.from_parts(format_id, gtrid, bqual)
 
-    def _tpc_begin_gen(self, xid: Union[Xid, str]) -> PQGen[None]:
+    def _tpc_begin_gen(self, xid: Xid | str) -> PQGen[None]:
         self._check_tpc()
 
         if not isinstance(xid, Xid):
@@ -661,7 +661,7 @@ class BaseConnection(Generic[Row]):
             yield from self._pipeline._sync_gen()
 
     def _tpc_finish_gen(
-        self, action: LiteralString, xid: Union[Xid, str, None]
+        self, action: LiteralString, xid: Xid | str | None
     ) -> PQGen[None]:
         fname = f"tpc_{action.lower()}()"
         if xid is None:
index 3d239ad50c7c0d1b99ccf6ada4860310aef34b08..60b455cca3ea2a6ec03c041a115ee2e8f57a3745 100644 (file)
@@ -10,8 +10,7 @@ import re
 import sys
 import struct
 from abc import ABC, abstractmethod
-from typing import Any, Dict, Generic, List, Match
-from typing import Sequence, Tuple, Union, TYPE_CHECKING
+from typing import Any, Dict, Generic, List, Match, Sequence, Tuple, TYPE_CHECKING
 
 from . import pq
 from . import adapt
@@ -117,7 +116,7 @@ class BaseCopy(Generic[ConnectionType]):
         if self._finished:
             raise TypeError("copy blocks can be used only once")
 
-    def set_types(self, types: Sequence[Union[int, str]]) -> None:
+    def set_types(self, types: Sequence[int | str]) -> None:
         """
         Set the types expected in a COPY operation.
 
@@ -203,7 +202,7 @@ class Formatter(ABC):
     def parse_row(self, data: Buffer) -> Tuple[Any, ...] | None: ...
 
     @abstractmethod
-    def write(self, buffer: Union[Buffer, str]) -> Buffer: ...
+    def write(self, buffer: Buffer | str) -> Buffer: ...
 
     @abstractmethod
     def write_row(self, row: Sequence[Any]) -> Buffer: ...
@@ -225,7 +224,7 @@ class TextFormatter(Formatter):
         else:
             return None
 
-    def write(self, buffer: Union[Buffer, str]) -> Buffer:
+    def write(self, buffer: Buffer | str) -> Buffer:
         data = self._ensure_bytes(buffer)
         self._signature_sent = True
         return data
@@ -246,7 +245,7 @@ class TextFormatter(Formatter):
         buffer, self._write_buffer = self._write_buffer, bytearray()
         return buffer
 
-    def _ensure_bytes(self, data: Union[Buffer, str]) -> Buffer:
+    def _ensure_bytes(self, data: Buffer | str) -> Buffer:
         if isinstance(data, str):
             return data.encode(self._encoding)
         else:
@@ -277,7 +276,7 @@ class BinaryFormatter(Formatter):
 
         return parse_row_binary(data, self.transformer)
 
-    def write(self, buffer: Union[Buffer, str]) -> Buffer:
+    def write(self, buffer: Buffer | str) -> Buffer:
         data = self._ensure_bytes(buffer)
         self._signature_sent = True
         return data
@@ -317,7 +316,7 @@ class BinaryFormatter(Formatter):
         buffer, self._write_buffer = self._write_buffer, bytearray()
         return buffer
 
-    def _ensure_bytes(self, data: Union[Buffer, str]) -> Buffer:
+    def _ensure_bytes(self, data: Buffer | str) -> Buffer:
         if isinstance(data, str):
             raise TypeError("cannot copy str data in binary mode: use bytes instead")
         else:
index 268872234f52f68d42960e7643cb826a396d52b5..72dfe911134effa67594aeb823226495f59b3b8a 100644 (file)
@@ -8,7 +8,7 @@ from __future__ import annotations
 
 import logging
 from types import TracebackType
-from typing import Any, List, Union, Tuple, Type, TYPE_CHECKING
+from typing import Any, List, Tuple, Type, TYPE_CHECKING
 
 from . import pq
 from . import errors as e
@@ -28,9 +28,9 @@ if TYPE_CHECKING:
     from .connection_async import AsyncConnection
 
 
-PendingResult: TypeAlias = Union[
-    None, Tuple["BaseCursor[Any, Any]", Tuple[Key, Prepare, bytes] | None]
-]
+PendingResult: TypeAlias = (
+    Tuple["BaseCursor[Any, Any]", Tuple[Key, Prepare, bytes] | None] | None
+)
 
 FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
 PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
index b868d51d03548daaac8f18d5b3b2a72cb30aada0..71a11ed09c5462de89b4fbac3d44e3e64413de71 100644 (file)
@@ -8,7 +8,7 @@ from __future__ import annotations
 
 import re
 from typing import Any, Callable, Dict, List, Mapping, Match, NamedTuple
-from typing import Sequence, Tuple, Union, TYPE_CHECKING
+from typing import Sequence, Tuple, TYPE_CHECKING
 from functools import lru_cache
 
 from . import pq
@@ -28,7 +28,7 @@ MAX_CACHED_STATEMENT_PARAMS = 50
 
 class QueryPart(NamedTuple):
     pre: bytes
-    item: Union[int, str]
+    item: int | str
     format: PyFormat
 
 
@@ -401,7 +401,7 @@ def _split_query(
             )
 
         # Index or name
-        item: Union[int, str]
+        item: int | str
         item = m.group(1).decode(encoding) if m.group(1) else i
 
         if not phtype:
index 839db682902827161d17fb6220a1ba95430ac7ed..c809d5e84086945b2958bcca41a6867503b86a76 100644 (file)
@@ -9,7 +9,6 @@ from __future__ import annotations
 import re
 import datetime as dt
 from base64 import b64encode, b64decode
-from typing import Union
 from dataclasses import dataclass, replace
 
 _re_xid = re.compile(r"^(\d+)_([^_]*)_([^_]*)$")
@@ -48,7 +47,7 @@ class Xid:
     def __len__(self) -> int:
         return 3
 
-    def __getitem__(self, index: int) -> Union[int, str, None]:
+    def __getitem__(self, index: int) -> int | str | None:
         return (self.format_id, self.gtrid, self.bqual)[index]
 
     @classmethod
index b7ae957398e0e425587b55d1e82932c5a76aebc3..acc38c8eb19b3a398bf085ac5b52cf6eaaa5db48 100644 (file)
@@ -7,7 +7,7 @@ Timezone utility functions.
 from __future__ import annotations
 
 import logging
-from typing import Dict, Union
+from typing import Dict
 from datetime import timezone, tzinfo
 
 from .pq.abc import PGconn
@@ -15,7 +15,7 @@ from ._compat import ZoneInfo
 
 logger = logging.getLogger("psycopg")
 
-_timezones: Dict[Union[None, bytes], tzinfo] = {
+_timezones: Dict[bytes | None, tzinfo] = {
     None: timezone.utc,
     b"UTC": timezone.utc,
 }
index 74bc3f1be690ea14cb8e785e1bc91067917595db..0e6061fc4e09a1f5bb85efaf096f39a87a8e756c 100644 (file)
@@ -7,15 +7,14 @@ Protocol objects representing different implementations of the same classes.
 from __future__ import annotations
 
 from typing import Any, Dict, Callable, Generator, Mapping
-from typing import List, Protocol, Sequence, Tuple, Union
-from typing import TYPE_CHECKING
+from typing import List, Protocol, Sequence, Tuple, TYPE_CHECKING
 
 from . import pq
 from ._enums import PyFormat as PyFormat
 from ._compat import LiteralString, TypeAlias, TypeVar
 
 if TYPE_CHECKING:
-    from . import sql
+    from . import sql  # noqa: F401
     from .rows import Row, RowMaker
     from .pq.abc import PGresult
     from .waiting import Wait, Ready
@@ -25,14 +24,14 @@ if TYPE_CHECKING:
 NoneType: type = type(None)
 
 # An object implementing the buffer protocol
-Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
+Buffer: TypeAlias = bytes | bytearray | memoryview
 
-Query: TypeAlias = Union[LiteralString, bytes, "sql.SQL", "sql.Composed"]
-Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
+Query: TypeAlias = LiteralString | bytes | "sql.SQL" | "sql.Composed"
+Params: TypeAlias = Sequence[Any] | Mapping[str, Any]
 ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
 PipelineCommand: TypeAlias = Callable[[], None]
-DumperKey: TypeAlias = Union[type, Tuple["DumperKey", ...]]
-ConnParam: TypeAlias = Union[str, int, None]
+DumperKey: TypeAlias = type | Tuple["DumperKey", ...]
+ConnParam: TypeAlias = str | int | None
 ConnDict: TypeAlias = Dict[str, ConnParam]
 ConnMapping: TypeAlias = Mapping[str, ConnParam]
 
@@ -41,13 +40,13 @@ ConnMapping: TypeAlias = Mapping[str, ConnParam]
 
 RV = TypeVar("RV")
 
-PQGenConn: TypeAlias = Generator[Tuple[int, "Wait"], Union["Ready", int], RV]
+PQGenConn: TypeAlias = Generator[Tuple[int, "Wait"], "Ready" | int, RV]
 """Generator for processes where the connection file number can change.
 
 This can happen in connection and reset, but not in normal querying.
 """
 
-PQGen: TypeAlias = Generator["Wait", Union["Ready", int], RV]
+PQGen: TypeAlias = Generator["Wait", "Ready" | int, RV]
 """Generator for processes where the connection file number won't change.
 """
 
index 7adad16c401cefe79a871b4f347525aaf2ab8233..b4f71857dd0dffc4668037debb59fe0585efc7e9 100644 (file)
@@ -13,7 +13,7 @@ import logging
 from time import monotonic
 from types import TracebackType
 from typing import Any, Generator, Iterator, List
-from typing import Type, Union, cast, overload, TYPE_CHECKING
+from typing import Type, cast, overload, TYPE_CHECKING
 from contextlib import contextmanager
 
 from . import pq
@@ -208,7 +208,7 @@ class Connection(BaseConnection[Row]):
         row_factory: RowFactory[Any] | None = None,
         scrollable: bool | None = None,
         withhold: bool = False,
-    ) -> Union[Cursor[Any], ServerCursor[Any]]:
+    ) -> Cursor[Any] | ServerCursor[Any]:
         """
         Return a new `Cursor` to send commands and queries to the connection.
         """
@@ -217,7 +217,7 @@ class Connection(BaseConnection[Row]):
         if not row_factory:
             row_factory = self.row_factory
 
-        cur: Union[Cursor[Any], ServerCursor[Any]]
+        cur: Cursor[Any] | ServerCursor[Any]
         if name:
             cur = self.server_cursor_factory(
                 self,
@@ -438,7 +438,7 @@ class Connection(BaseConnection[Row]):
         with self.lock:
             self.wait(self._set_deferrable_gen(value))
 
-    def tpc_begin(self, xid: Union[Xid, str]) -> None:
+    def tpc_begin(self, xid: Xid | str) -> None:
         """
         Begin a TPC transaction with the given transaction ID `!xid`.
         """
@@ -455,14 +455,14 @@ class Connection(BaseConnection[Row]):
         except e.ObjectNotInPrerequisiteState as ex:
             raise e.NotSupportedError(str(ex)) from None
 
-    def tpc_commit(self, xid: Union[Xid, str, None] = None) -> None:
+    def tpc_commit(self, xid: Xid | str | None = None) -> None:
         """
         Commit a prepared two-phase transaction.
         """
         with self.lock:
             self.wait(self._tpc_finish_gen("COMMIT", xid))
 
-    def tpc_rollback(self, xid: Union[Xid, str, None] = None) -> None:
+    def tpc_rollback(self, xid: Xid | str | None = None) -> None:
         """
         Roll back a prepared two-phase transaction.
         """
index cf3f7230dd3c653177bdf5ed389edc772dd6dc06..37421579cd128d71aac41a5dfcecbc644f702432 100644 (file)
@@ -10,7 +10,7 @@ import logging
 from time import monotonic
 from types import TracebackType
 from typing import Any, AsyncGenerator, AsyncIterator, List
-from typing import Type, Union, cast, overload, TYPE_CHECKING
+from typing import Type, cast, overload, TYPE_CHECKING
 from contextlib import asynccontextmanager
 
 from . import pq
@@ -224,7 +224,7 @@ class AsyncConnection(BaseConnection[Row]):
         row_factory: AsyncRowFactory[Any] | None = None,
         scrollable: bool | None = None,
         withhold: bool = False,
-    ) -> Union[AsyncCursor[Any], AsyncServerCursor[Any]]:
+    ) -> AsyncCursor[Any] | AsyncServerCursor[Any]:
         """
         Return a new `AsyncCursor` to send commands and queries to the connection.
         """
@@ -233,7 +233,7 @@ class AsyncConnection(BaseConnection[Row]):
         if not row_factory:
             row_factory = self.row_factory
 
-        cur: Union[AsyncCursor[Any], AsyncServerCursor[Any]]
+        cur: AsyncCursor[Any] | AsyncServerCursor[Any]
         if name:
             cur = self.server_cursor_factory(
                 self,
@@ -478,7 +478,7 @@ class AsyncConnection(BaseConnection[Row]):
                 f" please use 'await .set_{attribute}()' instead."
             )
 
-    async def tpc_begin(self, xid: Union[Xid, str]) -> None:
+    async def tpc_begin(self, xid: Xid | str) -> None:
         """
         Begin a TPC transaction with the given transaction ID `!xid`.
         """
@@ -495,14 +495,14 @@ class AsyncConnection(BaseConnection[Row]):
         except e.ObjectNotInPrerequisiteState as ex:
             raise e.NotSupportedError(str(ex)) from None
 
-    async def tpc_commit(self, xid: Union[Xid, str, None] = None) -> None:
+    async def tpc_commit(self, xid: Xid | str | None = None) -> None:
         """
         Commit a prepared two-phase transaction.
         """
         async with self.lock:
             await self.wait(self._tpc_finish_gen("COMMIT", xid))
 
-    async def tpc_rollback(self, xid: Union[Xid, str, None] = None) -> None:
+    async def tpc_rollback(self, xid: Xid | str | None = None) -> None:
         """
         Roll back a prepared two-phase transaction.
         """
index 950b6d66e40b1de5f0f64994654a9a08c2387d92..5792e6f6b410c25443435fe4ddd2cc3fc138b654 100644 (file)
@@ -7,7 +7,7 @@ CockroachDB-specific connections.
 from __future__ import annotations
 
 import re
-from typing import Any, Union, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
 
 from .. import errors as e
 from ..rows import Row
@@ -26,9 +26,7 @@ class _CrdbConnectionMixin:
     pgconn: "PGconn"
 
     @classmethod
-    def is_crdb(
-        cls, conn: Union[Connection[Any], AsyncConnection[Any], "PGconn"]
-    ) -> bool:
+    def is_crdb(cls, conn: Connection[Any] | AsyncConnection[Any] | "PGconn") -> bool:
         """
         Return `!True` if the server connected to `!conn` is CockroachDB.
         """
index 2536f3bcd563b6fdba608fb8573a217092761687..204e5712c7b00da12dfb0834e6ae963e66755d95 100644 (file)
@@ -9,7 +9,7 @@ from __future__ import annotations
 import time
 import datetime as dt
 from math import floor
-from typing import Any, Sequence, Union
+from typing import Any, Sequence
 
 from . import _oids
 from .abc import AdaptContext, Buffer
@@ -78,7 +78,7 @@ class Binary:
 
 
 class BinaryBinaryDumper(BytesBinaryDumper):
-    def dump(self, obj: Union[Buffer, Binary]) -> Buffer | None:
+    def dump(self, obj: Buffer | Binary) -> Buffer | None:
         if isinstance(obj, Binary):
             return super().dump(obj.obj)
         else:
@@ -86,7 +86,7 @@ class BinaryBinaryDumper(BytesBinaryDumper):
 
 
 class BinaryTextDumper(BytesDumper):
-    def dump(self, obj: Union[Buffer, Binary]) -> Buffer | None:
+    def dump(self, obj: Buffer | Binary) -> Buffer | None:
         if isinstance(obj, Binary):
             return super().dump(obj.obj)
         else:
index 082976be9d0b3810281123857b1b29386928dd1b..b90b478a4cd1159e616d17b39e33cdd5e487bc62 100644 (file)
@@ -22,7 +22,7 @@ from __future__ import annotations
 
 from dataclasses import dataclass, field, fields
 from typing import Any, Callable, Dict, List, NoReturn, Sequence, Tuple, Type
-from typing import Union, TYPE_CHECKING
+from typing import TYPE_CHECKING
 from asyncio import CancelledError
 
 from .pq.abc import PGconn, PGresult
@@ -32,7 +32,7 @@ from ._compat import TypeAlias, TypeGuard
 if TYPE_CHECKING:
     from .pq.misc import PGnotify, ConninfoOption
 
-ErrorInfo: TypeAlias = Union[None, PGresult, Dict[int, bytes | None]]
+ErrorInfo: TypeAlias = None | PGresult | Dict[int, bytes | None]
 
 _sqlcodes: Dict[str, "Type[Error]"] = {}
 
@@ -299,7 +299,7 @@ class Error(Exception):
         """
         return Diagnostic(self._info, encoding=self._encoding)
 
-    def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
+    def __reduce__(self) -> str | Tuple[Any, ...]:
         res = super().__reduce__()
         if isinstance(res, tuple) and len(res) >= 3:
             # To make the exception picklable
@@ -514,7 +514,7 @@ class Diagnostic:
 
         return None
 
-    def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
+    def __reduce__(self) -> str | Tuple[Any, ...]:
         res = super().__reduce__()
         if isinstance(res, tuple) and len(res) >= 3:
             res[2]["_info"] = _info_to_dict(self._info)
index 823c655358e2ac756616954eca5a71a1a5727d2d..f90addf17ad3e647866f4b00633a6c416d0371c6 100644 (file)
@@ -24,7 +24,7 @@ from __future__ import annotations
 
 import logging
 from time import monotonic
-from typing import List, Union
+from typing import List
 
 from . import pq
 from . import errors as e
@@ -305,7 +305,7 @@ def notifies(pgconn: PGconn) -> PQGen[List[pq.PGnotify]]:
     return ns
 
 
-def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]:
+def copy_from(pgconn: PGconn) -> PQGen[memoryview | PGresult]:
     while True:
         nbytes, data = pgconn.get_copy_data(1)
         if nbytes != 0:
index 0f6c29edf71d6c9e4c2bd3d9ad422020010a08bb..fadd71e35d29ee388bb30ba5d040227bb6d39a51 100644 (file)
@@ -6,8 +6,7 @@ Protocol objects to represent objects exposed by different pq implementations.
 
 from __future__ import annotations
 
-from typing import Any, Callable, List, Protocol, Sequence, Tuple
-from typing import Union, TYPE_CHECKING
+from typing import Any, Callable, List, Protocol, Sequence, Tuple, TYPE_CHECKING
 
 from ._enums import Format, Trace
 from .._compat import TypeAlias
@@ -16,7 +15,7 @@ if TYPE_CHECKING:
     from .misc import PGnotify, ConninfoOption, PGresAttDesc
 
 # An object implementing the buffer protocol (ish)
-Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
+Buffer: TypeAlias = bytes | bytearray | memoryview
 
 
 class PGconn(Protocol):
index 793c95d89c53a4324bcc689f46eba7886c711823..6a5294ba0798be313989b501602e7f7a3c07b930 100644 (file)
@@ -11,7 +11,7 @@ import os
 import sys
 import logging
 import ctypes.util
-from typing import cast, NamedTuple, Union
+from typing import cast, NamedTuple
 
 from .abc import PGconn, PGresult
 from ._enums import ConnStatus, TransactionStatus, PipelineStatus
@@ -76,7 +76,7 @@ def find_libpq_full_path() -> str | None:
     return libname
 
 
-def error_message(obj: Union[PGconn, PGresult], encoding: str = "utf8") -> str:
+def error_message(obj: PGconn | PGresult, encoding: str = "utf8") -> str:
     """
     Return an error message from a `PGconn` or `PGresult`.
 
index c27b6657a8fc63cb77f43759458c475f657fbe77..89647d2f89010de3c48e9aef01247f02733de798 100644 (file)
@@ -9,7 +9,7 @@ from __future__ import annotations
 import codecs
 import string
 from abc import ABC, abstractmethod
-from typing import Any, Iterator, Iterable, List, Sequence, Union
+from typing import Any, Iterator, Iterable, List, Sequence
 
 from .pq import Escaping
 from .abc import AdaptContext
@@ -146,7 +146,7 @@ class Composed(Composable):
         else:
             return NotImplemented
 
-    def join(self, joiner: Union["SQL", LiteralString]) -> "Composed":
+    def join(self, joiner: "SQL" | LiteralString) -> "Composed":
         """
         Return a new `!Composed` interposing the `!joiner` with the `!Composed` items.
 
@@ -435,7 +435,7 @@ class Placeholder(Composable):
 
     """
 
-    def __init__(self, name: str = "", format: Union[str, PyFormat] = PyFormat.AUTO):
+    def __init__(self, name: str = "", format: str | PyFormat = PyFormat.AUTO):
         super().__init__(name)
         if not isinstance(name, str):
             raise TypeError(f"expected string as name, got {name!r}")
index da9f9804893dfdb0bd7b0756b0ee3eff27f90025..7aa7b11714aca1f41b9a84f030913903c51a609b 100644 (file)
@@ -9,7 +9,7 @@ from __future__ import annotations
 import logging
 
 from types import TracebackType
-from typing import Generic, Iterator, Type, Union, TYPE_CHECKING
+from typing import Generic, Iterator, Type, TYPE_CHECKING
 
 from . import pq
 from . import sql
@@ -41,10 +41,7 @@ class Rollback(Exception):
 
     __module__ = "psycopg"
 
-    def __init__(
-        self,
-        transaction: Union["Transaction", "AsyncTransaction", None] = None,
-    ):
+    def __init__(self, transaction: "Transaction" | "AsyncTransaction" | None = None):
         self.transaction = transaction
 
     def __repr__(self) -> str:
index 0ae3d951779558862899994236278891ea7009e9..d8551906af1712491e5635695f4e58bb4bbde2ed 100644 (file)
@@ -6,7 +6,7 @@ from __future__ import annotations
 
 from enum import Enum
 from typing import Any, Dict, Generic, Mapping, Sequence
-from typing import Tuple, Type, Union, cast, TYPE_CHECKING
+from typing import Tuple, Type, cast, TYPE_CHECKING
 
 from .. import sql
 from .. import postgres
@@ -25,7 +25,7 @@ E = TypeVar("E", bound=Enum)
 
 EnumDumpMap: TypeAlias = Dict[E, bytes]
 EnumLoadMap: TypeAlias = Dict[bytes, E]
-EnumMapping: TypeAlias = Union[Mapping[E, str], Sequence[Tuple[E, str]], None]
+EnumMapping: TypeAlias = Mapping[E, str] | Sequence[Tuple[E, str]] | None
 
 # Hashable versions
 _HEnumDumpMap: TypeAlias = Tuple[Tuple[E, bytes], ...]
index 8e2422de48d52e15736a2166fbaa515e65e18c83..55944decc5db29e58e20cbed35781af948941472 100644 (file)
@@ -7,7 +7,7 @@ Adapters for JSON types.
 from __future__ import annotations
 
 import json
-from typing import Any, Callable, Dict, Tuple, Type, Union
+from typing import Any, Callable, Dict, Tuple, Type
 
 from .. import abc
 from .. import _oids
@@ -17,8 +17,8 @@ from ..adapt import Buffer, Dumper, Loader, PyFormat, AdaptersMap
 from ..errors import DataError
 from .._compat import cache
 
-JsonDumpsFunction = Callable[[Any], Union[str, bytes]]
-JsonLoadsFunction = Callable[[Union[str, bytes]], Any]
+JsonDumpsFunction = Callable[[Any], str | bytes]
+JsonLoadsFunction = Callable[[str | bytes], Any]
 
 
 def set_json_dumps(
index 5704c80015cb1aae30b3ba40d09d80e05237e513..67d5e97402dcac98ec8a20d69306de70f3f1f746 100644 (file)
@@ -8,7 +8,7 @@ from __future__ import annotations
 
 from decimal import Decimal
 from typing import Any, Generic, List, Iterable, MutableSequence
-from typing import Type, Union, overload, TYPE_CHECKING
+from typing import Type, overload, TYPE_CHECKING
 from datetime import date, datetime
 
 from .. import sql
@@ -98,7 +98,7 @@ class Multirange(MutableSequence[Range[T]]):
     @overload
     def __getitem__(self, index: slice) -> "Multirange[T]": ...
 
-    def __getitem__(self, index: Union[int, slice]) -> "Union[Range[T],Multirange[T]]":
+    def __getitem__(self, index: int | slice) -> "Range[T] | Multirange[T]":
         if isinstance(index, int):
             return self._ranges[index]
         else:
@@ -114,9 +114,7 @@ class Multirange(MutableSequence[Range[T]]):
     def __setitem__(self, index: slice, value: Iterable[Range[T]]) -> None: ...
 
     def __setitem__(
-        self,
-        index: Union[int, slice],
-        value: Union[Range[T], Iterable[Range[T]]],
+        self, index: int | slice, value: Range[T] | Iterable[Range[T]]
     ) -> None:
         if isinstance(index, int):
             self._check_type(value)
@@ -127,7 +125,7 @@ class Multirange(MutableSequence[Range[T]]):
             value = map(self._check_type, value)
             self._ranges[index] = value
 
-    def __delitem__(self, index: Union[int, slice]) -> None:
+    def __delitem__(self, index: int | slice) -> None:
         del self._ranges[index]
 
     def insert(self, index: int, value: Range[T]) -> None:
index 593b272e9ad46a00133b98516e1f187654610662..854ae98e8deed5bf39ade674fab95bc145901542 100644 (file)
@@ -6,7 +6,7 @@ Adapters for network types.
 
 from __future__ import annotations
 
-from typing import Callable, Type, Union, TYPE_CHECKING
+from typing import Callable, Type, TYPE_CHECKING
 
 from .. import _oids
 from ..pq import Format
@@ -17,9 +17,9 @@ from .._compat import TypeAlias
 if TYPE_CHECKING:
     import ipaddress
 
-Address: TypeAlias = Union["ipaddress.IPv4Address", "ipaddress.IPv6Address"]
-Interface: TypeAlias = Union["ipaddress.IPv4Interface", "ipaddress.IPv6Interface"]
-Network: TypeAlias = Union["ipaddress.IPv4Network", "ipaddress.IPv6Network"]
+Address: TypeAlias = "ipaddress.IPv4Address" | "ipaddress.IPv6Address"
+Interface: TypeAlias = "ipaddress.IPv4Interface" | "ipaddress.IPv6Interface"
+Network: TypeAlias = "ipaddress.IPv4Network" | "ipaddress.IPv6Network"
 
 # These objects will be imported lazily
 ip_address: Callable[[str], Address] = None  # type: ignore[assignment]
@@ -96,7 +96,7 @@ class InetBinaryDumper(_AIBinaryDumper, _LazyIpaddress):
         super().__init__(cls, context)
         self._ensure_module()
 
-    def dump(self, obj: Union[Address, Interface]) -> Buffer | None:
+    def dump(self, obj: Address | Interface) -> Buffer | None:
         packed = obj.packed
         family = PGSQL_AF_INET if obj.version == 4 else PGSQL_AF_INET6
         if isinstance(obj, (IPv4Interface, IPv6Interface)):
@@ -126,7 +126,7 @@ class _LazyIpaddressLoader(Loader, _LazyIpaddress):
 
 
 class InetLoader(_LazyIpaddressLoader):
-    def load(self, data: Buffer) -> Union[Address, Interface]:
+    def load(self, data: Buffer) -> Address | Interface:
         if isinstance(data, memoryview):
             data = bytes(data)
 
@@ -139,7 +139,7 @@ class InetLoader(_LazyIpaddressLoader):
 class InetBinaryLoader(_LazyIpaddressLoader):
     format = Format.BINARY
 
-    def load(self, data: Buffer) -> Union[Address, Interface]:
+    def load(self, data: Buffer) -> Address | Interface:
         if isinstance(data, memoryview):
             data = bytes(data)
 
index c3ece299150115c6820b1158c56e414e466dfd6d..fc22e81bf1fb233dd8d3767418f35bad96146c06 100644 (file)
@@ -10,8 +10,7 @@ import sys
 import struct
 from abc import ABC, abstractmethod
 from math import log
-from typing import Any, Callable, DefaultDict, Dict, Tuple, Union
-from typing import cast, TYPE_CHECKING
+from typing import Any, Callable, DefaultDict, Dict, Tuple, cast, TYPE_CHECKING
 from decimal import Decimal, DefaultContext, Context
 
 from .. import _oids
@@ -371,7 +370,7 @@ class _MixedNumericDumper(Dumper, ABC):
     oid = _oids.NUMERIC_OID
 
     # If numpy is available, the dumped object might be a numpy integer too
-    int_classes: Union[type, Tuple[type, ...]] = ()
+    int_classes: type | Tuple[type, ...] = ()
 
     def __init__(self, cls: type, context: AdaptContext | None = None):
         super().__init__(cls, context)
@@ -387,11 +386,11 @@ class _MixedNumericDumper(Dumper, ABC):
                 _MixedNumericDumper.int_classes = int
 
     @abstractmethod
-    def dump(self, obj: Union[Decimal, int, "numpy.integer[Any]"]) -> Buffer | None: ...
+    def dump(self, obj: Decimal | int | "numpy.integer[Any]") -> Buffer | None: ...
 
 
 class NumericDumper(_MixedNumericDumper):
-    def dump(self, obj: Union[Decimal, int, "numpy.integer[Any]"]) -> Buffer | None:
+    def dump(self, obj: Decimal | int | "numpy.integer[Any]") -> Buffer | None:
         if isinstance(obj, self.int_classes):
             return str(obj).encode()
         elif isinstance(obj, Decimal):
@@ -405,7 +404,7 @@ class NumericDumper(_MixedNumericDumper):
 class NumericBinaryDumper(_MixedNumericDumper):
     format = Format.BINARY
 
-    def dump(self, obj: Union[Decimal, int, "numpy.integer[Any]"]) -> Buffer | None:
+    def dump(self, obj: Decimal | int | "numpy.integer[Any]") -> Buffer | None:
         if type(obj) is int:
             return dump_int_to_numeric_binary(obj)
         elif isinstance(obj, Decimal):
@@ -426,7 +425,7 @@ def dump_decimal_to_text(obj: Decimal) -> bytes:
         return str(obj).encode()
 
 
-def dump_decimal_to_numeric_binary(obj: Decimal) -> Union[bytearray, bytes]:
+def dump_decimal_to_numeric_binary(obj: Decimal) -> bytearray | bytes:
     sign, digits, exp = obj.as_tuple()
     if exp == "n" or exp == "N":
         return NUMERIC_NAN_BIN
index 2d17a94fcb7b6819449cee03e73d65b855e1735f..cb3b24e12035f84277f958b7e155c2f3ebc90341 100644 (file)
@@ -6,7 +6,7 @@ Adapters for textual types.
 
 from __future__ import annotations
 
-from typing import Union, TYPE_CHECKING
+from typing import TYPE_CHECKING
 
 from .. import _oids
 from ..pq import Format, Escaping
@@ -110,7 +110,7 @@ class TextLoader(Loader):
         enc = conn_encoding(self.connection)
         self._encoding = enc if enc != "ascii" else ""
 
-    def load(self, data: Buffer) -> Union[bytes, str]:
+    def load(self, data: Buffer) -> bytes | str:
         if self._encoding:
             if isinstance(data, memoryview):
                 data = bytes(data)
index 07209a64c2854b7e968a75cef78752dfecfee58b..5f04d316efc5867fb50beda8274ba2c9a79ebe5a 100644 (file)
@@ -6,7 +6,7 @@ Types used in the psycopg_pool package
 
 from __future__ import annotations
 
-from typing import Any, Awaitable, Callable, Union, TYPE_CHECKING
+from typing import Any, Awaitable, Callable, TYPE_CHECKING
 
 from ._compat import TypeAlias, TypeVar
 
@@ -26,7 +26,7 @@ AsyncConnectionCB: TypeAlias = Callable[[ACT], Awaitable[None]]
 
 # Callbacks to pass the pool to on connection failure
 ConnectFailedCB: TypeAlias = Callable[["ConnectionPool[Any]"], None]
-AsyncConnectFailedCB: TypeAlias = Union[
-    Callable[["AsyncConnectionPool[Any]"], None],
-    Callable[["AsyncConnectionPool[Any]"], Awaitable[None]],
-]
+AsyncConnectFailedCB: TypeAlias = (
+    Callable[["AsyncConnectionPool[Any]"], None]
+    | Callable[["AsyncConnectionPool[Any]"], Awaitable[None]]
+)
index 4f773bea450256497ace13ffcd7e3ace5d7637de..c0f463d5f814f4cd00127202efc40882b8e6e73a 100644 (file)
@@ -2,8 +2,10 @@
 Support module for test_cursor[_async].py
 """
 
+from __future__ import annotations
+
 import re
-from typing import Any, List, Match, Union
+from typing import Any, List, Match
 
 import pytest
 import psycopg
@@ -47,7 +49,7 @@ def ph(cur: Any, query: str) -> str:
 
 
 def my_row_factory(
-    cursor: Union[psycopg.Cursor[List[str]], psycopg.AsyncCursor[List[str]]]
+    cursor: psycopg.Cursor[List[str]] | psycopg.AsyncCursor[List[str]],
 ) -> RowMaker[List[str]]:
     if cursor.description is not None:
         titles = [c.name for c in cursor.description]
index 0256a94d5cac9ee93844f15e9d604c52b30b52b7..7821429e9a6a163fb0d8a1e21acc5b0975605459 100644 (file)
@@ -6,7 +6,7 @@ import ipaddress
 from math import isnan
 from uuid import UUID
 from random import choice, random, randrange
-from typing import Any, List, Set, Tuple, Union
+from typing import Any, List, Set, Tuple
 from decimal import Decimal
 from contextlib import contextmanager, asynccontextmanager
 
@@ -199,7 +199,7 @@ class Faker:
         )
 
     def choose_schema(self, ncols=20):
-        schema: List[Union[Tuple[type, ...], type]] = []
+        schema: List[Tuple[type, ...] | type] = []
         while len(schema) < ncols:
             s = self.make_schema(choice(self.types))
             if s is not None:
@@ -245,7 +245,7 @@ class Faker:
 
         return rv
 
-    def make_schema(self, cls: type) -> Union[Tuple[type, ...], type, None]:
+    def make_schema(self, cls: type) -> Tuple[type, ...] | type | None:
         """Create a schema spec from a Python type.
 
         A schema specifies what Postgres type to generate when a Python type
@@ -669,7 +669,7 @@ class Faker:
             return spec[0](empty=True)
 
         while True:
-            bounds: List[Union[Any, None]] = []
+            bounds: List[Any] = []
             while len(bounds) < 2:
                 if random() < no_bound_chance:
                     bounds.append(None)
@@ -730,7 +730,7 @@ class Faker:
             want = type(want)(want.lower, want.upper, want.bounds[0] + ")")
 
         # Normalise discrete ranges
-        unit: Union[dt.timedelta, int, None]
+        unit: dt.timedelta | int | None
         if spec[1] is dt.date:
             unit = dt.timedelta(days=1)
         elif type(spec[1]) is type and issubclass(spec[1], int):
index 5244271314da21c9163a982b75146d30b8e3725d..1ab4535cb011adca18a980808dfb7a0832b1badf 100644 (file)
@@ -1,4 +1,6 @@
-from typing import List, Union
+from __future__ import annotations
+
+from typing import List
 
 import pytest
 
@@ -132,7 +134,7 @@ def get_fake_srv_function(monkeypatch):
             ans = fake_hosts[qname, rdtype]
         except KeyError:
             raise DNSException(f"unknown test host: {qname} {rdtype}")
-        rv: List[Union[A, SRV]] = []
+        rv: List[A | SRV] = []
 
         if rdtype == "A":
             for entry in ans:
index 76555aac329b05d46e396df924db9b4f1da06966..35bd48c26da7beb8496f2b792d9d300a1e91c74a 100644 (file)
@@ -377,8 +377,7 @@ def _test_reveal(stmts, type, mypy):
     stmts = "\n".join(f"    {line}" for line in stmts.splitlines())
 
     src = f"""\
-from typing import Any, Callable, Dict, List, NamedTuple, Sequence
-from typing import Tuple, Union
+from typing import Any, Callable, Dict, List, NamedTuple, Sequence, Tuple
 import psycopg
 from psycopg import rows
 
@@ -387,7 +386,7 @@ class Thing:
         self.kwargs = kwargs
 
 def thing_row(
-    cur: Union[psycopg.Cursor[Any], psycopg.AsyncCursor[Any]],
+    cur: psycopg.Cursor[Any] | psycopg.AsyncCursor[Any],
 ) -> Callable[[Sequence[Any]], Thing]:
     assert cur.description
     names = [d.name for d in cur.description]
index 139c92a216853a5ff6a71d6caac6310a424af8cb..68b9a1365b5ab90b29f430feb5a36f6b9d2b91ed 100644 (file)
@@ -3,14 +3,14 @@
 from __future__ import annotations
 
 from dataclasses import dataclass
-from typing import Any, Callable, Dict, Sequence, Tuple, Union
+from typing import Any, Callable, Dict, Sequence, Tuple
 
 from psycopg import Connection, Cursor, ServerCursor, connect, rows
 from psycopg import AsyncConnection, AsyncCursor, AsyncServerCursor
 
 
 def int_row_factory(
-    cursor: Union[Cursor[Any], AsyncCursor[Any]]
+    cursor: Cursor[Any] | AsyncCursor[Any],
 ) -> Callable[[Sequence[int]], int]:
     return lambda values: values[0] if values else 42
 
@@ -22,7 +22,7 @@ class Person:
 
     @classmethod
     def row_factory(
-        cls, cursor: Union[Cursor[Any], AsyncCursor[Any]]
+        cls, cursor: Cursor[Any] | AsyncCursor[Any]
     ) -> Callable[[Sequence[str]], Person]:
         def mkrow(values: Sequence[str]) -> Person:
             name, address = values
index 7cab25cb857dc985825cf2f74e2d5fec73da0261..4834d6b8045721ab868c6892513856bf25eb6b0f 100755 (executable)
@@ -2,11 +2,12 @@
 
 # Create the psycopg-binary package by renaming and patching psycopg-c
 
+from __future__ import annotations
+
 import os
 import re
 import shutil
 from pathlib import Path
-from typing import Union
 
 curdir = Path(__file__).parent
 pdir = curdir / "../.."
@@ -16,7 +17,7 @@ if target.exists():
     raise Exception(f"path {target} already exists")
 
 
-def sed_i(pattern: str, repl: str, filename: Union[str, Path]) -> None:
+def sed_i(pattern: str, repl: str, filename: str | Path) -> None:
     with open(filename, "rb") as f:
         data = f.read()
     newdata = re.sub(pattern.encode("utf8"), repl.encode("utf8"), data)
@@ -29,9 +30,7 @@ shutil.copytree(pdir / "psycopg_c", target)
 shutil.move(str(target / "psycopg_c"), str(target / "psycopg_binary"))
 shutil.move(str(target / "README-binary.rst"), str(target / "README.rst"))
 sed_i("psycopg-c", "psycopg-binary", target / "setup.cfg")
-sed_i(
-    r"__impl__\s*=.*", '__impl__ = "binary"', target / "psycopg_binary/pq.pyx"
-)
+sed_i(r"__impl__\s*=.*", '__impl__ = "binary"', target / "psycopg_binary/pq.pyx")
 for dirpath, dirnames, filenames in os.walk(target):
     for filename in filenames:
         if os.path.splitext(filename)[1] not in (".pyx", ".pxd", ".py"):