from __future__ import annotations
-from typing import Any, Dict, List, Type, Union
-from typing import cast, TYPE_CHECKING
+from typing import Any, Dict, List, Type, cast, TYPE_CHECKING
from . import pq
from . import errors as e
types: TypesRegistry
- _dumpers: Dict[PyFormat, Dict[Union[type, str], Type[Dumper]]]
+ _dumpers: Dict[PyFormat, Dict[type | str, Type[Dumper]]]
_dumpers_by_oid: List[Dict[int, Type[Dumper]]]
_loaders: List[Dict[int, Type[Loader]]]
def connection(self) -> "BaseConnection[Any]" | None:
return None
- def register_dumper(
- self, cls: Union[type, str, None], dumper: Type[Dumper]
- ) -> None:
+ def register_dumper(self, cls: type | str | None, dumper: Type[Dumper]) -> None:
"""
Configure the context to use `!dumper` to convert objects of type `!cls`.
self._dumpers_by_oid[dumper.format][dumper.oid] = dumper
- def register_loader(self, oid: Union[int, str], loader: Type["Loader"]) -> None:
+ def register_loader(self, oid: int | str, loader: Type["Loader"]) -> None:
"""
Configure the context to use `!loader` to convert data of oid `!oid`.
import sys
import logging
from typing import Callable, Generic
-from typing import List, NamedTuple, Tuple, Union
+from typing import List, NamedTuple, Tuple
from typing import TYPE_CHECKING
from weakref import ref, ReferenceType
from warnings import warn
self._check_tpc()
return Xid.from_parts(format_id, gtrid, bqual)
- def _tpc_begin_gen(self, xid: Union[Xid, str]) -> PQGen[None]:
+ def _tpc_begin_gen(self, xid: Xid | str) -> PQGen[None]:
self._check_tpc()
if not isinstance(xid, Xid):
yield from self._pipeline._sync_gen()
def _tpc_finish_gen(
- self, action: LiteralString, xid: Union[Xid, str, None]
+ self, action: LiteralString, xid: Xid | str | None
) -> PQGen[None]:
fname = f"tpc_{action.lower()}()"
if xid is None:
import sys
import struct
from abc import ABC, abstractmethod
-from typing import Any, Dict, Generic, List, Match
-from typing import Sequence, Tuple, Union, TYPE_CHECKING
+from typing import Any, Dict, Generic, List, Match, Sequence, Tuple, TYPE_CHECKING
from . import pq
from . import adapt
if self._finished:
raise TypeError("copy blocks can be used only once")
- def set_types(self, types: Sequence[Union[int, str]]) -> None:
+ def set_types(self, types: Sequence[int | str]) -> None:
"""
Set the types expected in a COPY operation.
def parse_row(self, data: Buffer) -> Tuple[Any, ...] | None: ...
@abstractmethod
- def write(self, buffer: Union[Buffer, str]) -> Buffer: ...
+ def write(self, buffer: Buffer | str) -> Buffer: ...
@abstractmethod
def write_row(self, row: Sequence[Any]) -> Buffer: ...
else:
return None
- def write(self, buffer: Union[Buffer, str]) -> Buffer:
+ def write(self, buffer: Buffer | str) -> Buffer:
data = self._ensure_bytes(buffer)
self._signature_sent = True
return data
buffer, self._write_buffer = self._write_buffer, bytearray()
return buffer
- def _ensure_bytes(self, data: Union[Buffer, str]) -> Buffer:
+ def _ensure_bytes(self, data: Buffer | str) -> Buffer:
if isinstance(data, str):
return data.encode(self._encoding)
else:
return parse_row_binary(data, self.transformer)
- def write(self, buffer: Union[Buffer, str]) -> Buffer:
+ def write(self, buffer: Buffer | str) -> Buffer:
data = self._ensure_bytes(buffer)
self._signature_sent = True
return data
buffer, self._write_buffer = self._write_buffer, bytearray()
return buffer
- def _ensure_bytes(self, data: Union[Buffer, str]) -> Buffer:
+ def _ensure_bytes(self, data: Buffer | str) -> Buffer:
if isinstance(data, str):
raise TypeError("cannot copy str data in binary mode: use bytes instead")
else:
import logging
from types import TracebackType
-from typing import Any, List, Union, Tuple, Type, TYPE_CHECKING
+from typing import Any, List, Tuple, Type, TYPE_CHECKING
from . import pq
from . import errors as e
from .connection_async import AsyncConnection
-PendingResult: TypeAlias = Union[
- None, Tuple["BaseCursor[Any, Any]", Tuple[Key, Prepare, bytes] | None]
-]
+PendingResult: TypeAlias = (
+ Tuple["BaseCursor[Any, Any]", Tuple[Key, Prepare, bytes] | None] | None
+)
FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
import re
from typing import Any, Callable, Dict, List, Mapping, Match, NamedTuple
-from typing import Sequence, Tuple, Union, TYPE_CHECKING
+from typing import Sequence, Tuple, TYPE_CHECKING
from functools import lru_cache
from . import pq
class QueryPart(NamedTuple):
pre: bytes
- item: Union[int, str]
+ item: int | str
format: PyFormat
)
# Index or name
- item: Union[int, str]
+ item: int | str
item = m.group(1).decode(encoding) if m.group(1) else i
if not phtype:
import re
import datetime as dt
from base64 import b64encode, b64decode
-from typing import Union
from dataclasses import dataclass, replace
_re_xid = re.compile(r"^(\d+)_([^_]*)_([^_]*)$")
def __len__(self) -> int:
return 3
- def __getitem__(self, index: int) -> Union[int, str, None]:
+ def __getitem__(self, index: int) -> int | str | None:
return (self.format_id, self.gtrid, self.bqual)[index]
@classmethod
from __future__ import annotations
import logging
-from typing import Dict, Union
+from typing import Dict
from datetime import timezone, tzinfo
from .pq.abc import PGconn
logger = logging.getLogger("psycopg")
-_timezones: Dict[Union[None, bytes], tzinfo] = {
+_timezones: Dict[bytes | None, tzinfo] = {
None: timezone.utc,
b"UTC": timezone.utc,
}
from __future__ import annotations
from typing import Any, Dict, Callable, Generator, Mapping
-from typing import List, Protocol, Sequence, Tuple, Union
-from typing import TYPE_CHECKING
+from typing import List, Protocol, Sequence, Tuple, TYPE_CHECKING
from . import pq
from ._enums import PyFormat as PyFormat
from ._compat import LiteralString, TypeAlias, TypeVar
if TYPE_CHECKING:
- from . import sql
+ from . import sql # noqa: F401
from .rows import Row, RowMaker
from .pq.abc import PGresult
from .waiting import Wait, Ready
NoneType: type = type(None)
# An object implementing the buffer protocol
-Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
+Buffer: TypeAlias = bytes | bytearray | memoryview
-Query: TypeAlias = Union[LiteralString, bytes, "sql.SQL", "sql.Composed"]
-Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
+Query: TypeAlias = LiteralString | bytes | "sql.SQL" | "sql.Composed"
+Params: TypeAlias = Sequence[Any] | Mapping[str, Any]
ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
PipelineCommand: TypeAlias = Callable[[], None]
-DumperKey: TypeAlias = Union[type, Tuple["DumperKey", ...]]
-ConnParam: TypeAlias = Union[str, int, None]
+DumperKey: TypeAlias = type | Tuple["DumperKey", ...]
+ConnParam: TypeAlias = str | int | None
ConnDict: TypeAlias = Dict[str, ConnParam]
ConnMapping: TypeAlias = Mapping[str, ConnParam]
RV = TypeVar("RV")
-PQGenConn: TypeAlias = Generator[Tuple[int, "Wait"], Union["Ready", int], RV]
+PQGenConn: TypeAlias = Generator[Tuple[int, "Wait"], "Ready" | int, RV]
"""Generator for processes where the connection file number can change.
This can happen in connection and reset, but not in normal querying.
"""
-PQGen: TypeAlias = Generator["Wait", Union["Ready", int], RV]
+PQGen: TypeAlias = Generator["Wait", "Ready" | int, RV]
"""Generator for processes where the connection file number won't change.
"""
from time import monotonic
from types import TracebackType
from typing import Any, Generator, Iterator, List
-from typing import Type, Union, cast, overload, TYPE_CHECKING
+from typing import Type, cast, overload, TYPE_CHECKING
from contextlib import contextmanager
from . import pq
row_factory: RowFactory[Any] | None = None,
scrollable: bool | None = None,
withhold: bool = False,
- ) -> Union[Cursor[Any], ServerCursor[Any]]:
+ ) -> Cursor[Any] | ServerCursor[Any]:
"""
Return a new `Cursor` to send commands and queries to the connection.
"""
if not row_factory:
row_factory = self.row_factory
- cur: Union[Cursor[Any], ServerCursor[Any]]
+ cur: Cursor[Any] | ServerCursor[Any]
if name:
cur = self.server_cursor_factory(
self,
with self.lock:
self.wait(self._set_deferrable_gen(value))
- def tpc_begin(self, xid: Union[Xid, str]) -> None:
+ def tpc_begin(self, xid: Xid | str) -> None:
"""
Begin a TPC transaction with the given transaction ID `!xid`.
"""
except e.ObjectNotInPrerequisiteState as ex:
raise e.NotSupportedError(str(ex)) from None
- def tpc_commit(self, xid: Union[Xid, str, None] = None) -> None:
+ def tpc_commit(self, xid: Xid | str | None = None) -> None:
"""
Commit a prepared two-phase transaction.
"""
with self.lock:
self.wait(self._tpc_finish_gen("COMMIT", xid))
- def tpc_rollback(self, xid: Union[Xid, str, None] = None) -> None:
+ def tpc_rollback(self, xid: Xid | str | None = None) -> None:
"""
Roll back a prepared two-phase transaction.
"""
from time import monotonic
from types import TracebackType
from typing import Any, AsyncGenerator, AsyncIterator, List
-from typing import Type, Union, cast, overload, TYPE_CHECKING
+from typing import Type, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
from . import pq
row_factory: AsyncRowFactory[Any] | None = None,
scrollable: bool | None = None,
withhold: bool = False,
- ) -> Union[AsyncCursor[Any], AsyncServerCursor[Any]]:
+ ) -> AsyncCursor[Any] | AsyncServerCursor[Any]:
"""
Return a new `AsyncCursor` to send commands and queries to the connection.
"""
if not row_factory:
row_factory = self.row_factory
- cur: Union[AsyncCursor[Any], AsyncServerCursor[Any]]
+ cur: AsyncCursor[Any] | AsyncServerCursor[Any]
if name:
cur = self.server_cursor_factory(
self,
f" please use 'await .set_{attribute}()' instead."
)
- async def tpc_begin(self, xid: Union[Xid, str]) -> None:
+ async def tpc_begin(self, xid: Xid | str) -> None:
"""
Begin a TPC transaction with the given transaction ID `!xid`.
"""
except e.ObjectNotInPrerequisiteState as ex:
raise e.NotSupportedError(str(ex)) from None
- async def tpc_commit(self, xid: Union[Xid, str, None] = None) -> None:
+ async def tpc_commit(self, xid: Xid | str | None = None) -> None:
"""
Commit a prepared two-phase transaction.
"""
async with self.lock:
await self.wait(self._tpc_finish_gen("COMMIT", xid))
- async def tpc_rollback(self, xid: Union[Xid, str, None] = None) -> None:
+ async def tpc_rollback(self, xid: Xid | str | None = None) -> None:
"""
Roll back a prepared two-phase transaction.
"""
from __future__ import annotations
import re
-from typing import Any, Union, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
from .. import errors as e
from ..rows import Row
pgconn: "PGconn"
@classmethod
- def is_crdb(
- cls, conn: Union[Connection[Any], AsyncConnection[Any], "PGconn"]
- ) -> bool:
+ def is_crdb(cls, conn: Connection[Any] | AsyncConnection[Any] | "PGconn") -> bool:
"""
Return `!True` if the server connected to `!conn` is CockroachDB.
"""
import time
import datetime as dt
from math import floor
-from typing import Any, Sequence, Union
+from typing import Any, Sequence
from . import _oids
from .abc import AdaptContext, Buffer
class BinaryBinaryDumper(BytesBinaryDumper):
- def dump(self, obj: Union[Buffer, Binary]) -> Buffer | None:
+ def dump(self, obj: Buffer | Binary) -> Buffer | None:
if isinstance(obj, Binary):
return super().dump(obj.obj)
else:
class BinaryTextDumper(BytesDumper):
- def dump(self, obj: Union[Buffer, Binary]) -> Buffer | None:
+ def dump(self, obj: Buffer | Binary) -> Buffer | None:
if isinstance(obj, Binary):
return super().dump(obj.obj)
else:
from dataclasses import dataclass, field, fields
from typing import Any, Callable, Dict, List, NoReturn, Sequence, Tuple, Type
-from typing import Union, TYPE_CHECKING
+from typing import TYPE_CHECKING
from asyncio import CancelledError
from .pq.abc import PGconn, PGresult
if TYPE_CHECKING:
from .pq.misc import PGnotify, ConninfoOption
-ErrorInfo: TypeAlias = Union[None, PGresult, Dict[int, bytes | None]]
+ErrorInfo: TypeAlias = None | PGresult | Dict[int, bytes | None]
_sqlcodes: Dict[str, "Type[Error]"] = {}
"""
return Diagnostic(self._info, encoding=self._encoding)
- def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
+ def __reduce__(self) -> str | Tuple[Any, ...]:
res = super().__reduce__()
if isinstance(res, tuple) and len(res) >= 3:
# To make the exception picklable
return None
- def __reduce__(self) -> Union[str, Tuple[Any, ...]]:
+ def __reduce__(self) -> str | Tuple[Any, ...]:
res = super().__reduce__()
if isinstance(res, tuple) and len(res) >= 3:
res[2]["_info"] = _info_to_dict(self._info)
import logging
from time import monotonic
-from typing import List, Union
+from typing import List
from . import pq
from . import errors as e
return ns
-def copy_from(pgconn: PGconn) -> PQGen[Union[memoryview, PGresult]]:
+def copy_from(pgconn: PGconn) -> PQGen[memoryview | PGresult]:
while True:
nbytes, data = pgconn.get_copy_data(1)
if nbytes != 0:
from __future__ import annotations
-from typing import Any, Callable, List, Protocol, Sequence, Tuple
-from typing import Union, TYPE_CHECKING
+from typing import Any, Callable, List, Protocol, Sequence, Tuple, TYPE_CHECKING
from ._enums import Format, Trace
from .._compat import TypeAlias
from .misc import PGnotify, ConninfoOption, PGresAttDesc
# An object implementing the buffer protocol (ish)
-Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
+Buffer: TypeAlias = bytes | bytearray | memoryview
class PGconn(Protocol):
import sys
import logging
import ctypes.util
-from typing import cast, NamedTuple, Union
+from typing import cast, NamedTuple
from .abc import PGconn, PGresult
from ._enums import ConnStatus, TransactionStatus, PipelineStatus
return libname
-def error_message(obj: Union[PGconn, PGresult], encoding: str = "utf8") -> str:
+def error_message(obj: PGconn | PGresult, encoding: str = "utf8") -> str:
"""
Return an error message from a `PGconn` or `PGresult`.
import codecs
import string
from abc import ABC, abstractmethod
-from typing import Any, Iterator, Iterable, List, Sequence, Union
+from typing import Any, Iterator, Iterable, List, Sequence
from .pq import Escaping
from .abc import AdaptContext
else:
return NotImplemented
- def join(self, joiner: Union["SQL", LiteralString]) -> "Composed":
+ def join(self, joiner: "SQL" | LiteralString) -> "Composed":
"""
Return a new `!Composed` interposing the `!joiner` with the `!Composed` items.
"""
- def __init__(self, name: str = "", format: Union[str, PyFormat] = PyFormat.AUTO):
+ def __init__(self, name: str = "", format: str | PyFormat = PyFormat.AUTO):
super().__init__(name)
if not isinstance(name, str):
raise TypeError(f"expected string as name, got {name!r}")
import logging
from types import TracebackType
-from typing import Generic, Iterator, Type, Union, TYPE_CHECKING
+from typing import Generic, Iterator, Type, TYPE_CHECKING
from . import pq
from . import sql
__module__ = "psycopg"
- def __init__(
- self,
- transaction: Union["Transaction", "AsyncTransaction", None] = None,
- ):
+ def __init__(self, transaction: "Transaction" | "AsyncTransaction" | None = None):
self.transaction = transaction
def __repr__(self) -> str:
from enum import Enum
from typing import Any, Dict, Generic, Mapping, Sequence
-from typing import Tuple, Type, Union, cast, TYPE_CHECKING
+from typing import Tuple, Type, cast, TYPE_CHECKING
from .. import sql
from .. import postgres
EnumDumpMap: TypeAlias = Dict[E, bytes]
EnumLoadMap: TypeAlias = Dict[bytes, E]
-EnumMapping: TypeAlias = Union[Mapping[E, str], Sequence[Tuple[E, str]], None]
+EnumMapping: TypeAlias = Mapping[E, str] | Sequence[Tuple[E, str]] | None
# Hashable versions
_HEnumDumpMap: TypeAlias = Tuple[Tuple[E, bytes], ...]
from __future__ import annotations
import json
-from typing import Any, Callable, Dict, Tuple, Type, Union
+from typing import Any, Callable, Dict, Tuple, Type
from .. import abc
from .. import _oids
from ..errors import DataError
from .._compat import cache
-JsonDumpsFunction = Callable[[Any], Union[str, bytes]]
-JsonLoadsFunction = Callable[[Union[str, bytes]], Any]
+JsonDumpsFunction = Callable[[Any], str | bytes]
+JsonLoadsFunction = Callable[[str | bytes], Any]
def set_json_dumps(
from decimal import Decimal
from typing import Any, Generic, List, Iterable, MutableSequence
-from typing import Type, Union, overload, TYPE_CHECKING
+from typing import Type, overload, TYPE_CHECKING
from datetime import date, datetime
from .. import sql
@overload
def __getitem__(self, index: slice) -> "Multirange[T]": ...
- def __getitem__(self, index: Union[int, slice]) -> "Union[Range[T],Multirange[T]]":
+ def __getitem__(self, index: int | slice) -> "Range[T] | Multirange[T]":
if isinstance(index, int):
return self._ranges[index]
else:
def __setitem__(self, index: slice, value: Iterable[Range[T]]) -> None: ...
def __setitem__(
- self,
- index: Union[int, slice],
- value: Union[Range[T], Iterable[Range[T]]],
+ self, index: int | slice, value: Range[T] | Iterable[Range[T]]
) -> None:
if isinstance(index, int):
self._check_type(value)
value = map(self._check_type, value)
self._ranges[index] = value
- def __delitem__(self, index: Union[int, slice]) -> None:
+ def __delitem__(self, index: int | slice) -> None:
del self._ranges[index]
def insert(self, index: int, value: Range[T]) -> None:
from __future__ import annotations
-from typing import Callable, Type, Union, TYPE_CHECKING
+from typing import Callable, Type, TYPE_CHECKING
from .. import _oids
from ..pq import Format
if TYPE_CHECKING:
import ipaddress
-Address: TypeAlias = Union["ipaddress.IPv4Address", "ipaddress.IPv6Address"]
-Interface: TypeAlias = Union["ipaddress.IPv4Interface", "ipaddress.IPv6Interface"]
-Network: TypeAlias = Union["ipaddress.IPv4Network", "ipaddress.IPv6Network"]
+Address: TypeAlias = "ipaddress.IPv4Address" | "ipaddress.IPv6Address"
+Interface: TypeAlias = "ipaddress.IPv4Interface" | "ipaddress.IPv6Interface"
+Network: TypeAlias = "ipaddress.IPv4Network" | "ipaddress.IPv6Network"
# These objects will be imported lazily
ip_address: Callable[[str], Address] = None # type: ignore[assignment]
super().__init__(cls, context)
self._ensure_module()
- def dump(self, obj: Union[Address, Interface]) -> Buffer | None:
+ def dump(self, obj: Address | Interface) -> Buffer | None:
packed = obj.packed
family = PGSQL_AF_INET if obj.version == 4 else PGSQL_AF_INET6
if isinstance(obj, (IPv4Interface, IPv6Interface)):
class InetLoader(_LazyIpaddressLoader):
- def load(self, data: Buffer) -> Union[Address, Interface]:
+ def load(self, data: Buffer) -> Address | Interface:
if isinstance(data, memoryview):
data = bytes(data)
class InetBinaryLoader(_LazyIpaddressLoader):
format = Format.BINARY
- def load(self, data: Buffer) -> Union[Address, Interface]:
+ def load(self, data: Buffer) -> Address | Interface:
if isinstance(data, memoryview):
data = bytes(data)
import struct
from abc import ABC, abstractmethod
from math import log
-from typing import Any, Callable, DefaultDict, Dict, Tuple, Union
-from typing import cast, TYPE_CHECKING
+from typing import Any, Callable, DefaultDict, Dict, Tuple, cast, TYPE_CHECKING
from decimal import Decimal, DefaultContext, Context
from .. import _oids
oid = _oids.NUMERIC_OID
# If numpy is available, the dumped object might be a numpy integer too
- int_classes: Union[type, Tuple[type, ...]] = ()
+ int_classes: type | Tuple[type, ...] = ()
def __init__(self, cls: type, context: AdaptContext | None = None):
super().__init__(cls, context)
_MixedNumericDumper.int_classes = int
@abstractmethod
- def dump(self, obj: Union[Decimal, int, "numpy.integer[Any]"]) -> Buffer | None: ...
+ def dump(self, obj: Decimal | int | "numpy.integer[Any]") -> Buffer | None: ...
class NumericDumper(_MixedNumericDumper):
- def dump(self, obj: Union[Decimal, int, "numpy.integer[Any]"]) -> Buffer | None:
+ def dump(self, obj: Decimal | int | "numpy.integer[Any]") -> Buffer | None:
if isinstance(obj, self.int_classes):
return str(obj).encode()
elif isinstance(obj, Decimal):
class NumericBinaryDumper(_MixedNumericDumper):
format = Format.BINARY
- def dump(self, obj: Union[Decimal, int, "numpy.integer[Any]"]) -> Buffer | None:
+ def dump(self, obj: Decimal | int | "numpy.integer[Any]") -> Buffer | None:
if type(obj) is int:
return dump_int_to_numeric_binary(obj)
elif isinstance(obj, Decimal):
return str(obj).encode()
-def dump_decimal_to_numeric_binary(obj: Decimal) -> Union[bytearray, bytes]:
+def dump_decimal_to_numeric_binary(obj: Decimal) -> bytearray | bytes:
sign, digits, exp = obj.as_tuple()
if exp == "n" or exp == "N":
return NUMERIC_NAN_BIN
from __future__ import annotations
-from typing import Union, TYPE_CHECKING
+from typing import TYPE_CHECKING
from .. import _oids
from ..pq import Format, Escaping
enc = conn_encoding(self.connection)
self._encoding = enc if enc != "ascii" else ""
- def load(self, data: Buffer) -> Union[bytes, str]:
+ def load(self, data: Buffer) -> bytes | str:
if self._encoding:
if isinstance(data, memoryview):
data = bytes(data)
from __future__ import annotations
-from typing import Any, Awaitable, Callable, Union, TYPE_CHECKING
+from typing import Any, Awaitable, Callable, TYPE_CHECKING
from ._compat import TypeAlias, TypeVar
# Callbacks to pass the pool to on connection failure
ConnectFailedCB: TypeAlias = Callable[["ConnectionPool[Any]"], None]
-AsyncConnectFailedCB: TypeAlias = Union[
- Callable[["AsyncConnectionPool[Any]"], None],
- Callable[["AsyncConnectionPool[Any]"], Awaitable[None]],
-]
+AsyncConnectFailedCB: TypeAlias = (
+ Callable[["AsyncConnectionPool[Any]"], None]
+ | Callable[["AsyncConnectionPool[Any]"], Awaitable[None]]
+)
Support module for test_cursor[_async].py
"""
+from __future__ import annotations
+
import re
-from typing import Any, List, Match, Union
+from typing import Any, List, Match
import pytest
import psycopg
def my_row_factory(
- cursor: Union[psycopg.Cursor[List[str]], psycopg.AsyncCursor[List[str]]]
+ cursor: psycopg.Cursor[List[str]] | psycopg.AsyncCursor[List[str]],
) -> RowMaker[List[str]]:
if cursor.description is not None:
titles = [c.name for c in cursor.description]
from math import isnan
from uuid import UUID
from random import choice, random, randrange
-from typing import Any, List, Set, Tuple, Union
+from typing import Any, List, Set, Tuple
from decimal import Decimal
from contextlib import contextmanager, asynccontextmanager
)
def choose_schema(self, ncols=20):
- schema: List[Union[Tuple[type, ...], type]] = []
+ schema: List[Tuple[type, ...] | type] = []
while len(schema) < ncols:
s = self.make_schema(choice(self.types))
if s is not None:
return rv
- def make_schema(self, cls: type) -> Union[Tuple[type, ...], type, None]:
+ def make_schema(self, cls: type) -> Tuple[type, ...] | type | None:
"""Create a schema spec from a Python type.
A schema specifies what Postgres type to generate when a Python type
return spec[0](empty=True)
while True:
- bounds: List[Union[Any, None]] = []
+ bounds: List[Any] = []
while len(bounds) < 2:
if random() < no_bound_chance:
bounds.append(None)
want = type(want)(want.lower, want.upper, want.bounds[0] + ")")
# Normalise discrete ranges
- unit: Union[dt.timedelta, int, None]
+ unit: dt.timedelta | int | None
if spec[1] is dt.date:
unit = dt.timedelta(days=1)
elif type(spec[1]) is type and issubclass(spec[1], int):
-from typing import List, Union
+from __future__ import annotations
+
+from typing import List
import pytest
ans = fake_hosts[qname, rdtype]
except KeyError:
raise DNSException(f"unknown test host: {qname} {rdtype}")
- rv: List[Union[A, SRV]] = []
+ rv: List[A | SRV] = []
if rdtype == "A":
for entry in ans:
stmts = "\n".join(f" {line}" for line in stmts.splitlines())
src = f"""\
-from typing import Any, Callable, Dict, List, NamedTuple, Sequence
-from typing import Tuple, Union
+from typing import Any, Callable, Dict, List, NamedTuple, Sequence, Tuple
import psycopg
from psycopg import rows
self.kwargs = kwargs
def thing_row(
- cur: Union[psycopg.Cursor[Any], psycopg.AsyncCursor[Any]],
+ cur: psycopg.Cursor[Any] | psycopg.AsyncCursor[Any],
) -> Callable[[Sequence[Any]], Thing]:
assert cur.description
names = [d.name for d in cur.description]
from __future__ import annotations
from dataclasses import dataclass
-from typing import Any, Callable, Dict, Sequence, Tuple, Union
+from typing import Any, Callable, Dict, Sequence, Tuple
from psycopg import Connection, Cursor, ServerCursor, connect, rows
from psycopg import AsyncConnection, AsyncCursor, AsyncServerCursor
def int_row_factory(
- cursor: Union[Cursor[Any], AsyncCursor[Any]]
+ cursor: Cursor[Any] | AsyncCursor[Any],
) -> Callable[[Sequence[int]], int]:
return lambda values: values[0] if values else 42
@classmethod
def row_factory(
- cls, cursor: Union[Cursor[Any], AsyncCursor[Any]]
+ cls, cursor: Cursor[Any] | AsyncCursor[Any]
) -> Callable[[Sequence[str]], Person]:
def mkrow(values: Sequence[str]) -> Person:
name, address = values
# Create the psycopg-binary package by renaming and patching psycopg-c
+from __future__ import annotations
+
import os
import re
import shutil
from pathlib import Path
-from typing import Union
curdir = Path(__file__).parent
pdir = curdir / "../.."
raise Exception(f"path {target} already exists")
-def sed_i(pattern: str, repl: str, filename: Union[str, Path]) -> None:
+def sed_i(pattern: str, repl: str, filename: str | Path) -> None:
with open(filename, "rb") as f:
data = f.read()
newdata = re.sub(pattern.encode("utf8"), repl.encode("utf8"), data)
shutil.move(str(target / "psycopg_c"), str(target / "psycopg_binary"))
shutil.move(str(target / "README-binary.rst"), str(target / "README.rst"))
sed_i("psycopg-c", "psycopg-binary", target / "setup.cfg")
-sed_i(
- r"__impl__\s*=.*", '__impl__ = "binary"', target / "psycopg_binary/pq.pyx"
-)
+sed_i(r"__impl__\s*=.*", '__impl__ = "binary"', target / "psycopg_binary/pq.pyx")
for dirpath, dirnames, filenames in os.walk(target):
for filename in filenames:
if os.path.splitext(filename)[1] not in (".pyx", ".pxd", ".py"):