import sys
import logging
-from typing import Callable, Generic, NamedTuple, Tuple, TYPE_CHECKING
+from typing import Callable, Generic, NamedTuple, TYPE_CHECKING
from weakref import ref, ReferenceType
from warnings import warn
from functools import partial
self._closed = False # closed by an explicit close()
self._prepared: PrepareManager = PrepareManager()
- self._tpc: Tuple[Xid, bool] | None = None # xid, prepared
+ self._tpc: tuple[Xid, bool] | None = None # xid, prepared
wself = ref(self)
pgconn.notice_handler = partial(BaseConnection._notice_handler, wself)
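Every hunk in this patch applies the same mechanical change: drop `Tuple` from the `typing` imports and annotate with the built-in `tuple[...]` generic from PEP 585 instead. The pattern in miniature, outside any psycopg code:

    # Before: typing.Tuple, needed when builtins were not subscriptable
    from typing import Tuple
    def parse(data: bytes) -> Tuple[int, str]: ...

    # After: the built-in generic. tuple[...] is subscriptable at runtime
    # on Python >= 3.9, and in annotations everywhere under
    # `from __future__ import annotations`.
    def parse(data: bytes) -> tuple[int, str]: ...

Note that quoted forward references inside the aliases must keep their quotes: a `TypeAlias` right-hand side is evaluated at runtime, so names imported only under TYPE_CHECKING have to stay strings.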
from abc import ABC, abstractmethod
from types import TracebackType
-from typing import Any, Iterator, Tuple, Sequence, TYPE_CHECKING
+from typing import Any, Iterator, Sequence, TYPE_CHECKING
from . import pq
from . import errors as e
"""
return self.connection.wait(self._read_gen())
- def rows(self) -> Iterator[Tuple[Any, ...]]:
+ def rows(self) -> Iterator[tuple[Any, ...]]:
"""
Iterate on the result of a :sql:`COPY TO` operation record by record.
break
yield record
- def read_row(self) -> Tuple[Any, ...] | None:
+ def read_row(self) -> tuple[Any, ...] | None:
"""
Read a parsed row of data from a table after a :sql:`COPY TO` operation.
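For context, `rows()` and `read_row()` are the high-level readers for a COPY operation; a usage sketch (connection and table names are assumed, not part of this patch):

    with conn.cursor() as cur:
        with cur.copy("COPY mytable TO STDOUT") as copy:
            for record in copy.rows():      # or: copy.read_row()
                ...                         # record is a tuple[Any, ...]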
from abc import ABC, abstractmethod
from types import TracebackType
-from typing import Any, AsyncIterator, Tuple, Sequence, TYPE_CHECKING
+from typing import Any, AsyncIterator, Sequence, TYPE_CHECKING
from . import pq
from . import errors as e
"""
return await self.connection.wait(self._read_gen())
- async def rows(self) -> AsyncIterator[Tuple[Any, ...]]:
+ async def rows(self) -> AsyncIterator[tuple[Any, ...]]:
"""
Iterate on the result of a :sql:`COPY TO` operation record by record.
break
yield record
- async def read_row(self) -> Tuple[Any, ...] | None:
+ async def read_row(self) -> tuple[Any, ...] | None:
"""
Read a parsed row of data from a table after a :sql:`COPY TO` operation.
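The async variant is used the same way, with async iteration (again a sketch with assumed names):

    async with conn.cursor() as cur:
        async with cur.copy("COPY mytable TO STDOUT") as copy:
            async for record in copy.rows():
                ...                         # record is a tuple[Any, ...]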
import sys
import struct
from abc import ABC, abstractmethod
-from typing import Any, Generic, Match, Sequence, Tuple, TYPE_CHECKING
+from typing import Any, Generic, Match, Sequence, TYPE_CHECKING
from . import pq
from . import adapt
self.cursor._rowcount = nrows if nrows is not None else -1
return memoryview(b"")
- def _read_row_gen(self) -> PQGen[Tuple[Any, ...] | None]:
+ def _read_row_gen(self) -> PQGen[tuple[Any, ...] | None]:
data = yield from self._read_gen()
if not data:
return None
self._row_mode = False # true if the user is using write_row()
@abstractmethod
- def parse_row(self, data: Buffer) -> Tuple[Any, ...] | None: ...
+ def parse_row(self, data: Buffer) -> tuple[Any, ...] | None: ...
@abstractmethod
def write(self, buffer: Buffer | str) -> Buffer: ...
super().__init__(transformer)
self._encoding = encoding
- def parse_row(self, data: Buffer) -> Tuple[Any, ...] | None:
+ def parse_row(self, data: Buffer) -> tuple[Any, ...] | None:
if data:
return parse_row_text(data, self.transformer)
else:
super().__init__(transformer)
self._signature_sent = False
- def parse_row(self, data: Buffer) -> Tuple[Any, ...] | None:
+ def parse_row(self, data: Buffer) -> tuple[Any, ...] | None:
if not self._signature_sent:
if data[: len(_binary_signature)] != _binary_signature:
raise e.DataError(
return out
-def _parse_row_text(data: Buffer, tx: Transformer) -> Tuple[Any, ...]:
+def _parse_row_text(data: Buffer, tx: Transformer) -> tuple[Any, ...]:
if not isinstance(data, bytes):
data = bytes(data)
fields = data.split(b"\t")
return tx.load_sequence(row)
-def _parse_row_binary(data: Buffer, tx: Transformer) -> Tuple[Any, ...]:
+def _parse_row_binary(data: Buffer, tx: Transformer) -> tuple[Any, ...]:
row: list[Buffer | None] = []
nfields = _unpack_int2(data, 0)[0]
pos = 2
from __future__ import annotations
from functools import partial
-from typing import Any, Generic, Iterable, NoReturn, Sequence, Tuple
+from typing import Any, Generic, Iterable, NoReturn, Sequence
from typing import TYPE_CHECKING
from . import pq
def _get_prepared(
self, pgq: PostgresQuery, prepare: bool | None = None
- ) -> Tuple[Prepare, bytes]:
+ ) -> tuple[Prepare, bytes]:
return self._conn._prepared.get(pgq, prepare)
def _stream_send_gen(
import logging
from types import TracebackType
-from typing import Any, Tuple, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
from . import pq
from . import errors as e
PendingResult: TypeAlias = (
- Tuple["BaseCursor[Any, Any]", Tuple[Key, Prepare, bytes] | None] | None
+ tuple["BaseCursor[Any, Any]", tuple[Key, Prepare, bytes] | None] | None
)
FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
from __future__ import annotations
from enum import IntEnum, auto
-from typing import Sequence, Tuple, TYPE_CHECKING
+from typing import Sequence, TYPE_CHECKING
from collections import OrderedDict
from . import pq
from .pq.abc import PGresult
from ._connection_base import BaseConnection
-Key: TypeAlias = Tuple[bytes, Tuple[int, ...]]
+Key: TypeAlias = tuple[bytes, tuple[int, ...]]
COMMAND_OK = pq.ExecStatus.COMMAND_OK
TUPLES_OK = pq.ExecStatus.TUPLES_OK
def get(
self, query: PostgresQuery, prepare: bool | None = None
- ) -> Tuple[Prepare, bytes]:
+ ) -> tuple[Prepare, bytes]:
"""
Check if a query is prepared, tell back whether to prepare it.
"""
from __future__ import annotations
-from typing import Any, Sequence, Tuple, DefaultDict, TYPE_CHECKING
+from typing import Any, Sequence, DefaultDict, TYPE_CHECKING
from collections import defaultdict
from . import pq
_oid_dumpers _oid_types _row_dumpers _row_loaders
""".split()
- types: Tuple[int, ...] | None
+ types: tuple[int, ...] | None
formats: list[pq.Format] | None
_adapters: "AdaptersMap"
# mapping fmt, oid -> Dumper instance
# Not often used, so create it only if needed.
- self._oid_dumpers: Tuple[OidDumperCache, OidDumperCache] | None
+ self._oid_dumpers: tuple[OidDumperCache, OidDumperCache] | None
self._oid_dumpers = None
# mapping fmt, oid -> Loader instance
- self._loaders: Tuple[LoaderCache, LoaderCache] = ({}, {})
+ self._loaders: tuple[LoaderCache, LoaderCache] = ({}, {})
self._row_dumpers: list[abc.Dumper] | None = None
return make_row(record)
- def load_sequence(self, record: Sequence[Buffer | None]) -> Tuple[Any, ...]:
+ def load_sequence(self, record: Sequence[Buffer | None]) -> tuple[Any, ...]:
if len(self._row_loaders) != len(record):
raise e.ProgrammingError(
f"cannot load sequence of {len(record)} items:"
import re
from typing import Any, Callable, Mapping, Match, NamedTuple
-from typing import Sequence, Tuple, TYPE_CHECKING
+from typing import Sequence, TYPE_CHECKING
from functools import lru_cache
from . import pq
self.params: Sequence[Buffer | None] | None = None
# these are tuples so they can be used as keys e.g. in prepared stmts
- self.types: Tuple[int, ...] = ()
+ self.types: tuple[int, ...] = ()
# The format requested by the user and the ones to really pass Postgres
self._want_formats: list[PyFormat] | None = None
# The type of the _query2pg() and _query2pg_nocache() methods
_Query2Pg: TypeAlias = Callable[
- [bytes, str], Tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]
+ [bytes, str], tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]
]
def _query2pg_nocache(
query: bytes, encoding: str
-) -> Tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]:
+) -> tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]:
"""
Convert Python query and params into something Postgres understands.
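In practice the conversion rewrites client-side placeholders into PostgreSQL `$n` parameters; illustratively (shapes hedged, not the exact return values):

    # b"select %s, %s"         ->  b"select $1, $2"
    # b"select %(a)s, %(a)s"   ->  b"select $1, $1"   (repeated named
    #                               placeholders map to one parameter,
    #                               which is what the `seen` dict below tracks)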
formats.append(part.format)
elif isinstance(parts[0].item, str):
- seen: dict[str, Tuple[bytes, PyFormat]] = {}
+ seen: dict[str, tuple[bytes, PyFormat]] = {}
order = []
for part in parts[:-1]:
assert isinstance(part.item, str)
_Query2PgClient: TypeAlias = Callable[
- [bytes, str], Tuple[bytes, list[str] | None, list[QueryPart]]
+ [bytes, str], tuple[bytes, list[str] | None, list[QueryPart]]
]
def _query2pg_client_nocache(
query: bytes, encoding: str
-) -> Tuple[bytes, list[str] | None, list[QueryPart]]:
+) -> tuple[bytes, list[str] | None, list[QueryPart]]:
"""
Convert Python query and params into a template to perform client-side binding
"""
chunks.append(b"%s")
elif isinstance(parts[0].item, str):
- seen: dict[str, Tuple[bytes, PyFormat]] = {}
+ seen: dict[str, tuple[bytes, PyFormat]] = {}
order = []
for part in parts[:-1]:
assert isinstance(part.item, str)
def _split_query(
query: bytes, encoding: str = "ascii", collapse_double_percent: bool = True
) -> list[QueryPart]:
- parts: list[Tuple[bytes, Match[bytes] | None]] = []
+ parts: list[tuple[bytes, Match[bytes] | None]] = []
cur = 0
# pairs [(fragment, match)], with the last match None
from __future__ import annotations
import struct
-from typing import Callable, cast, Protocol, Tuple
+from typing import Callable, cast, Protocol
from . import errors as e
from .abc import Buffer
from ._compat import TypeAlias
PackInt: TypeAlias = Callable[[int], bytes]
-UnpackInt: TypeAlias = Callable[[Buffer], Tuple[int]]
+UnpackInt: TypeAlias = Callable[[Buffer], tuple[int]]
PackFloat: TypeAlias = Callable[[float], bytes]
-UnpackFloat: TypeAlias = Callable[[Buffer], Tuple[float]]
+UnpackFloat: TypeAlias = Callable[[Buffer], tuple[float]]
class UnpackLen(Protocol):
- def __call__(self, data: Buffer, start: int | None) -> Tuple[int]: ...
+ def __call__(self, data: Buffer, start: int | None) -> tuple[int]: ...
pack_int2 = cast(PackInt, struct.Struct("!h").pack)
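The `cast(...)` wrappers here exist purely for typing: `struct.Struct.unpack` is annotated as returning `tuple[Any, ...]`, and the cast narrows that to the known shape. The same pattern in isolation:

    import struct
    from typing import Callable, cast

    _struct_int2 = struct.Struct("!h")  # one big-endian int16
    unpack_int2 = cast(Callable[[bytes], tuple[int]], _struct_int2.unpack)
    assert unpack_int2(b"\x00\x2a") == (42,)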
from __future__ import annotations
from typing import Any, Callable, Generator, Mapping
-from typing import Protocol, Sequence, Tuple, TYPE_CHECKING
+from typing import Protocol, Sequence, TYPE_CHECKING
from . import pq
from ._enums import PyFormat as PyFormat
Params: TypeAlias = Sequence[Any] | Mapping[str, Any]
ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
PipelineCommand: TypeAlias = Callable[[], None]
-DumperKey: TypeAlias = type | Tuple["DumperKey", ...]
+DumperKey: TypeAlias = type | tuple["DumperKey", ...]
ConnParam: TypeAlias = str | int | None
ConnDict: TypeAlias = dict[str, ConnParam]
ConnMapping: TypeAlias = Mapping[str, ConnParam]
RV = TypeVar("RV")
-PQGenConn: TypeAlias = Generator[Tuple[int, "Wait"], "Ready | int", RV]
+PQGenConn: TypeAlias = Generator[tuple[int, "Wait"], "Ready | int", RV]
"""Generator for processes where the connection file number can change.
This can happen in connection and reset, but not in normal querying.
class Transformer(Protocol):
- types: Tuple[int, ...] | None
+ types: tuple[int, ...] | None
formats: list[pq.Format] | None
def __init__(self, context: AdaptContext | None = None): ...
def load_row(self, row: int, make_row: "RowMaker[Row]") -> "Row" | None: ...
- def load_sequence(self, record: Sequence[Buffer | None]) -> Tuple[Any, ...]: ...
+ def load_sequence(self, record: Sequence[Buffer | None]) -> tuple[Any, ...]: ...
def get_loader(self, oid: int, format: pq.Format) -> Loader: ...
from __future__ import annotations
-from typing import Tuple, TYPE_CHECKING
+from typing import TYPE_CHECKING
from functools import partial
from ._queries import PostgresQuery, PostgresClientQuery
def _get_prepared(
self, pgq: PostgresQuery, prepare: bool | None = None
- ) -> Tuple[Prepare, bytes]:
+ ) -> tuple[Prepare, bytes]:
return (Prepare.NO, b"")
from __future__ import annotations
from dataclasses import dataclass, field, fields
-from typing import Any, Callable, NoReturn, Sequence, Tuple, TYPE_CHECKING
+from typing import Any, Callable, NoReturn, Sequence, TYPE_CHECKING
from asyncio import CancelledError
from .pq.abc import PGconn, PGresult
"""
return Diagnostic(self._info, encoding=self._encoding)
- def __reduce__(self) -> str | Tuple[Any, ...]:
+ def __reduce__(self) -> str | tuple[Any, ...]:
res = super().__reduce__()
if isinstance(res, tuple) and len(res) >= 3:
# To make the exception picklable
return None
- def __reduce__(self) -> str | Tuple[Any, ...]:
+ def __reduce__(self) -> str | tuple[Any, ...]:
res = super().__reduce__()
if isinstance(res, tuple) and len(res) >= 3:
res[2]["_info"] = _info_to_dict(self._info)
import ctypes.util
from ctypes import Structure, CFUNCTYPE, POINTER
from ctypes import c_char, c_char_p, c_int, c_size_t, c_ubyte, c_uint, c_void_p
-from typing import Any, NoReturn, Tuple
+from typing import Any, NoReturn
from .misc import find_libpq_full_path, version_pretty
from ..errors import NotSupportedError
class PGconn_struct(Structure):
- _fields_: list[Tuple[str, type]] = []
+ _fields_: list[tuple[str, type]] = []
class PGresult_struct(Structure):
- _fields_: list[Tuple[str, type]] = []
+ _fields_: list[tuple[str, type]] = []
class PQconninfoOption_struct(Structure):
class PGcancelConn_struct(Structure):
- _fields_: list[Tuple[str, type]] = []
+ _fields_: list[tuple[str, type]] = []
class PGcancel_struct(Structure):
- _fields_: list[Tuple[str, type]] = []
+ _fields_: list[tuple[str, type]] = []
class PGresAttDesc_struct(Structure):
from __future__ import annotations
-from typing import Any, Callable, Protocol, Sequence, Tuple, TYPE_CHECKING
+from typing import Any, Callable, Protocol, Sequence, TYPE_CHECKING
from ._enums import Format, Trace
from .._compat import TypeAlias
def put_copy_end(self, error: bytes | None = None) -> int: ...
- def get_copy_data(self, async_: int) -> Tuple[int, memoryview]: ...
+ def get_copy_data(self, async_: int) -> tuple[int, memoryview]: ...
def trace(self, fileno: int) -> None: ...
from ctypes import Array, POINTER, cast, string_at, create_string_buffer, byref
from ctypes import addressof, c_char_p, c_int, c_size_t, c_ulong, c_void_p, py_object
-from typing import Any, Callable, Sequence, Tuple
+from typing import Any, Callable, Sequence
from typing import cast as t_cast, TYPE_CHECKING
from .. import errors as e
raise e.OperationalError(f"sending copy end failed: {error_message(self)}")
return rv
- def get_copy_data(self, async_: int) -> Tuple[int, memoryview]:
+ def get_copy_data(self, async_: int) -> tuple[int, memoryview]:
buffer_ptr = c_char_p()
nbytes = impl.PQgetCopyData(self._pgconn_ptr, byref(buffer_ptr), async_)
if nbytes == -2:
import functools
from typing import Any, Callable, NamedTuple, NoReturn
-from typing import TYPE_CHECKING, Protocol, Sequence, Tuple
+from typing import TYPE_CHECKING, Protocol, Sequence
from collections import namedtuple
from . import pq
def __call__(self, __cursor: "BaseCursor[Any, Any]") -> RowMaker[Row]: ...
-TupleRow: TypeAlias = Tuple[Any, ...]
+TupleRow: TypeAlias = tuple[Any, ...]
"""
An alias for the type returned by `tuple_row()` (i.e. a tuple of any content).
"""
import re
import struct
from math import prod
-from typing import Any, cast, Callable, Pattern, Set, Tuple
+from typing import Any, cast, Callable, Pattern, Set
from .. import pq
from .. import errors as e
_struct_head = struct.Struct("!III") # ndims, hasnull, elem oid
_pack_head = cast(Callable[[int, int, int], bytes], _struct_head.pack)
-_unpack_head = cast(Callable[[Buffer], Tuple[int, int, int]], _struct_head.unpack_from)
+_unpack_head = cast(Callable[[Buffer], tuple[int, int, int]], _struct_head.unpack_from)
_struct_dim = struct.Struct("!II") # dim, lower bound
_pack_dim = cast(Callable[[int, int], bytes], _struct_dim.pack)
-_unpack_dim = cast(Callable[[Buffer, int], Tuple[int, int]], _struct_dim.unpack_from)
+_unpack_dim = cast(Callable[[Buffer, int], tuple[int, int]], _struct_dim.unpack_from)
PY_TEXT = PyFormat.TEXT
PQ_BINARY = pq.Format.BINARY
import struct
from collections import namedtuple
from typing import Any, Callable, cast, Iterator
-from typing import NamedTuple, Sequence, Tuple, TYPE_CHECKING
+from typing import NamedTuple, Sequence, TYPE_CHECKING
from .. import pq
from .. import abc
_struct_oidlen = struct.Struct("!Ii")
_pack_oidlen = cast(Callable[[int, int], bytes], _struct_oidlen.pack)
_unpack_oidlen = cast(
- Callable[[abc.Buffer, int], Tuple[int, int]], _struct_oidlen.unpack_from
+ Callable[[abc.Buffer, int], tuple[int, int]], _struct_oidlen.unpack_from
)
# Should be this, but it doesn't work
# oid = _oids.RECORD_OID
- def dump(self, obj: Tuple[Any, ...]) -> Buffer | None:
+ def dump(self, obj: tuple[Any, ...]) -> Buffer | None:
return self._dump_sequence(obj, b"(", b")", b",")
format = pq.Format.BINARY
# Subclasses must set this info
- _field_types: Tuple[int, ...]
+ _field_types: tuple[int, ...]
def __init__(self, cls: type, context: abc.AdaptContext | None = None):
super().__init__(cls, context)
nfields = len(self._field_types)
self._formats = (PyFormat.from_pq(self.format),) * nfields
- def dump(self, obj: Tuple[Any, ...]) -> Buffer | None:
+ def dump(self, obj: tuple[Any, ...]) -> Buffer | None:
out = bytearray(pack_len(len(obj)))
adapted = self._tx.dump_sequence(obj, self._formats)
for i in range(len(obj)):
class RecordLoader(BaseCompositeLoader):
- def load(self, data: abc.Buffer) -> Tuple[Any, ...]:
+ def load(self, data: abc.Buffer) -> tuple[Any, ...]:
if data == b"()":
return ()
# Usually there will be only one, but if there is more than one
# row in the same query (in different columns, or even in different
# records), oids might differ and we'd need separate transformers.
- self._txs: dict[Tuple[int, ...], abc.Transformer] = {}
+ self._txs: dict[tuple[int, ...], abc.Transformer] = {}
- def load(self, data: abc.Buffer) -> Tuple[Any, ...]:
+ def load(self, data: abc.Buffer) -> tuple[Any, ...]:
nfields = unpack_len(data, 0)[0]
offset = 4
oids = []
@cache
-def _make_nt(name: str, fields: Tuple[str, ...]) -> type[NamedTuple]:
+def _make_nt(name: str, fields: tuple[str, ...]) -> type[NamedTuple]:
return namedtuple(name, fields) # type: ignore[return-value]
@cache
def _make_loader(
- name: str, types: Tuple[int, ...], factory: Callable[..., Any]
+ name: str, types: tuple[int, ...], factory: Callable[..., Any]
) -> type[BaseCompositeLoader]:
return type(
f"{name.title()}Loader",
@cache
def _make_binary_dumper(
- name: str, oid: int, field_types: Tuple[int, ...]
+ name: str, oid: int, field_types: tuple[int, ...]
) -> type[TupleBinaryDumper]:
return type(
f"{name.title()}BinaryDumper",
import re
import struct
from datetime import date, datetime, time, timedelta, timezone
-from typing import Any, Callable, cast, Tuple, TYPE_CHECKING
+from typing import Any, Callable, cast, TYPE_CHECKING
from .. import _oids
from ..pq import Format
_struct_timetz = struct.Struct("!qi") # microseconds, sec tz offset
_pack_timetz = cast(Callable[[int, int], bytes], _struct_timetz.pack)
-_unpack_timetz = cast(Callable[[Buffer], Tuple[int, int]], _struct_timetz.unpack)
+_unpack_timetz = cast(Callable[[Buffer], tuple[int, int]], _struct_timetz.unpack)
_struct_interval = struct.Struct("!qii") # microseconds, days, months
_pack_interval = cast(Callable[[int, int, int], bytes], _struct_interval.pack)
_unpack_interval = cast(
- Callable[[Buffer], Tuple[int, int, int]], _struct_interval.unpack
+ Callable[[Buffer], tuple[int, int, int]], _struct_interval.unpack
)
utc = timezone.utc
from __future__ import annotations
from enum import Enum
-from typing import Any, Generic, Mapping, Sequence, Tuple, cast, TYPE_CHECKING
+from typing import Any, Generic, Mapping, Sequence, cast, TYPE_CHECKING
from .. import sql
from .. import postgres
EnumDumpMap: TypeAlias = dict[E, bytes]
EnumLoadMap: TypeAlias = dict[bytes, E]
-EnumMapping: TypeAlias = Mapping[E, str] | Sequence[Tuple[E, str]] | None
+EnumMapping: TypeAlias = Mapping[E, str] | Sequence[tuple[E, str]] | None
# Hashable versions
-_HEnumDumpMap: TypeAlias = Tuple[Tuple[E, bytes], ...]
-_HEnumLoadMap: TypeAlias = Tuple[Tuple[bytes, E], ...]
+_HEnumDumpMap: TypeAlias = tuple[tuple[E, bytes], ...]
+_HEnumLoadMap: TypeAlias = tuple[tuple[bytes, E], ...]
TEXT = Format.TEXT
BINARY = Format.BINARY
@cache
-def _make_enum(name: str, labels: Tuple[str, ...]) -> Enum:
+def _make_enum(name: str, labels: tuple[str, ...]) -> Enum:
return Enum(name.title(), labels, module=__name__)
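These mappings back psycopg's documented enum adaptation; a usage sketch (the Postgres enum type name `mood` is assumed):

    from enum import Enum
    from psycopg.types.enum import EnumInfo, register_enum

    class Mood(Enum):
        happy = "happy"
        sad = "sad"

    info = EnumInfo.fetch(conn, "mood")   # introspect the server-side enum
    register_enum(info, conn, Mood, mapping={m: m.value for m in Mood})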
from __future__ import annotations
import json
-from typing import Any, Callable, Tuple
+from typing import Any, Callable
from .. import abc
from .. import _oids
return _default_dumpers[cls, format]
-_default_dumpers: dict[Tuple[type[_JsonWrapper], PyFormat], type[Dumper]] = {
+_default_dumpers: dict[tuple[type[_JsonWrapper], PyFormat], type[Dumper]] = {
(Json, PyFormat.BINARY): JsonBinaryDumper,
(Json, PyFormat.TEXT): JsonDumper,
(Jsonb, PyFormat.BINARY): JsonbBinaryDumper,
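`_default_dumpers` is the lookup table behind the public `Json`/`Jsonb` wrappers; their use looks like (a sketch, table name assumed):

    from psycopg.types.json import Json, Jsonb

    conn.execute("insert into jtest (data) values (%s)", [Jsonb({"a": 1})])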
import struct
from abc import ABC, abstractmethod
from math import log
-from typing import Any, Callable, DefaultDict, Tuple, cast, TYPE_CHECKING
+from typing import Any, Callable, DefaultDict, cast, TYPE_CHECKING
from decimal import Decimal, DefaultContext, Context
from .. import _oids
_contexts[i] = DefaultContext
_unpack_numeric_head = cast(
- Callable[[Buffer], Tuple[int, int, int, int]],
+ Callable[[Buffer], tuple[int, int, int, int]],
struct.Struct("!HhHH").unpack_from,
)
_pack_numeric_head = cast(
oid = _oids.NUMERIC_OID
# If numpy is available, the dumped object might be a numpy integer too
- int_classes: type | Tuple[type, ...] = ()
+ int_classes: type | tuple[type, ...] = ()
def __init__(self, cls: type, context: AdaptContext | None = None):
super().__init__(cls, context)
from __future__ import annotations
import re
-from typing import Any, Generic, Tuple, cast, TYPE_CHECKING
+from typing import Any, Generic, cast, TYPE_CHECKING
from decimal import Decimal
from datetime import date, datetime
return load_range_text(data, self._load)[0]
-def load_range_text(data: Buffer, load: LoadFunc) -> Tuple[Range[Any], int]:
+def load_range_text(data: Buffer, load: LoadFunc) -> tuple[Range[Any], int]:
if data == b"empty":
return Range(empty=True), 5
from __future__ import annotations
-from typing import Any, Sequence, Tuple
+from typing import Any, Sequence
from psycopg import pq, abc, BaseConnection
from psycopg.rows import Row, RowMaker
from psycopg._compat import Deque
class Transformer(abc.AdaptContext):
- types: Tuple[int, ...] | None
+ types: tuple[int, ...] | None
formats: list[pq.Format] | None
def __init__(self, context: abc.AdaptContext | None = None): ...
@classmethod
def get_dumper(self, obj: Any, format: PyFormat) -> abc.Dumper: ...
def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> list[Row]: ...
def load_row(self, row: int, make_row: RowMaker[Row]) -> Row | None: ...
- def load_sequence(self, record: Sequence[abc.Buffer | None]) -> Tuple[Any, ...]: ...
+ def load_sequence(self, record: Sequence[abc.Buffer | None]) -> tuple[Any, ...]: ...
def get_loader(self, oid: int, format: pq.Format) -> abc.Loader: ...
# Generators
def format_row_binary(
row: Sequence[Any], tx: abc.Transformer, out: bytearray | None = None
) -> bytearray: ...
-def parse_row_text(data: abc.Buffer, tx: abc.Transformer) -> Tuple[Any, ...]: ...
-def parse_row_binary(data: abc.Buffer, tx: abc.Transformer) -> Tuple[Any, ...]: ...
+def parse_row_text(data: abc.Buffer, tx: abc.Transformer) -> tuple[Any, ...]: ...
+def parse_row_binary(data: abc.Buffer, tx: abc.Transformer) -> tuple[Any, ...]: ...
# Arrays optimization
def array_load_text(
return 0
-def parse_row_binary(data, tx: Transformer) -> Tuple[Any, ...]:
+def parse_row_binary(data, tx: Transformer) -> tuple[Any, ...]:
cdef unsigned char *ptr
cdef Py_ssize_t bufsize
_buffer_as_string_and_size(data, <char **>&ptr, &bufsize)
return tx.load_sequence(row)
-def parse_row_text(data, tx: Transformer) -> Tuple[Any, ...]:
+def parse_row_text(data, tx: Transformer) -> tuple[Any, ...]:
cdef unsigned char *fstart
cdef Py_ssize_t size
_buffer_as_string_and_size(data, <char **>&fstart, &size)
from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs
-from typing import Any, Iterable, Sequence, Tuple
+from typing import Any, Iterable, Sequence
from psycopg import errors as e
from psycopg.pq import Format as PqFormat
raise e.OperationalError(f"sending copy end failed: {error_message(self)}")
return rv
- def get_copy_data(self, int async_) -> Tuple[int, memoryview]:
+ def get_copy_data(self, int async_) -> tuple[int, memoryview]:
cdef char *buffer_ptr = NULL
cdef int nbytes
nbytes = libpq.PQgetCopyData(self._pgconn_ptr, &buffer_ptr, async_)
from time import monotonic
from random import random
-from typing import Any, Tuple, TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
from psycopg import errors as e
"""`!True` if the pool is closed."""
return self._closed
- def _check_size(self, min_size: int, max_size: int | None) -> Tuple[int, int]:
+ def _check_size(self, min_size: int, max_size: int | None) -> tuple[int, int]:
if max_size is None:
max_size = min_size
from math import isnan
from uuid import UUID
from random import choice, random, randrange
-from typing import Any, Set, Tuple
+from typing import Any, Set
from decimal import Decimal
from contextlib import contextmanager, asynccontextmanager
)
def choose_schema(self, ncols=20):
- schema: list[Tuple[type, ...] | type] = []
+ schema: list[tuple[type, ...] | type] = []
while len(schema) < ncols:
s = self.make_schema(choice(self.types))
if s is not None:
return rv
- def make_schema(self, cls: type) -> Tuple[type, ...] | type | None:
+ def make_schema(self, cls: type) -> tuple[type, ...] | type | None:
"""Create a schema spec from a Python type.
A schema specifies what Postgres type to generate when a Python type
+from __future__ import annotations
+
import gc
import sys
-from typing import Tuple
import pytest
)
-NO_COUNT_TYPES: Tuple[type, ...] = ()
+NO_COUNT_TYPES: tuple[type, ...] = ()
if sys.version_info[:2] == (3, 10):
# On my laptop there are occasional creations of a single one of these objects
import logging
import weakref
from time import time
-from typing import Any, Tuple
+from typing import Any
import pytest
with pool.ConnectionPool(dsn, min_size=min_size, max_size=4, num_workers=3) as p:
p.wait(1.0)
- results: list[Tuple[int, float]] = []
+ results: list[tuple[int, float]] = []
ts = [spawn(worker, args=(i,)) for i in range(len(want_times))]
gather(*ts)
def test_shrink(dsn, monkeypatch):
from psycopg_pool.pool import ShrinkPool
- results: list[Tuple[int, int]] = []
+ results: list[tuple[int, int]] = []
def run_hacked(self, pool):
n0 = pool._nconns
import logging
import weakref
from time import time
-from typing import Any, Tuple
+from typing import Any
import pytest
dsn, min_size=min_size, max_size=4, num_workers=3
) as p:
await p.wait(1.0)
- results: list[Tuple[int, float]] = []
+ results: list[tuple[int, float]] = []
ts = [spawn(worker, args=(i,)) for i in range(len(want_times))]
await gather(*ts)
async def test_shrink(dsn, monkeypatch):
from psycopg_pool.pool_async import ShrinkPool
- results: list[Tuple[int, int]] = []
+ results: list[tuple[int, int]] = []
async def run_hacked(self, pool):
n0 = pool._nconns
import logging
from time import time
-from typing import Any, Tuple
+from typing import Any
import pytest
t1 = time()
results.append((n, t1 - t0, pid))
- results: list[Tuple[int, float, int]] = []
+ results: list[tuple[int, float, int]] = []
with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
p.wait()
ts = [spawn(worker, args=(i,)) for i in range(6)]
t1 = time()
results.append((n, t1 - t0, pid))
- results: list[Tuple[int, float, int]] = []
- errors: list[Tuple[int, float, Exception]] = []
+ results: list[tuple[int, float, int]] = []
+ errors: list[tuple[int, float, Exception]] = []
with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
ts = [spawn(worker, args=(i,)) for i in range(4)]
t1 = time()
results.append((n, t1 - t0, pid))
- results: list[Tuple[int, float, int]] = []
- errors: list[Tuple[int, float, Exception]] = []
+ results: list[tuple[int, float, int]] = []
+ errors: list[tuple[int, float, Exception]] = []
with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
ts = [spawn(worker, args=(i,)) for i in range(4)]
import logging
from time import time
-from typing import Any, Tuple
+from typing import Any
import pytest
t1 = time()
results.append((n, t1 - t0, pid))
- results: list[Tuple[int, float, int]] = []
+ results: list[tuple[int, float, int]] = []
async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
await p.wait()
ts = [spawn(worker, args=(i,)) for i in range(6)]
t1 = time()
results.append((n, t1 - t0, pid))
- results: list[Tuple[int, float, int]] = []
- errors: list[Tuple[int, float, Exception]] = []
+ results: list[tuple[int, float, int]] = []
+ errors: list[tuple[int, float, Exception]] = []
async with pool_cls(
dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
t1 = time()
results.append((n, t1 - t0, pid))
- results: list[Tuple[int, float, int]] = []
- errors: list[Tuple[int, float, Exception]] = []
+ results: list[tuple[int, float, int]] = []
+ errors: list[tuple[int, float, Exception]] = []
async with pool_cls(
dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
import logging
from contextlib import contextmanager
from functools import partial
-from typing import Any, Iterator, Sequence, Tuple
+from typing import Any, Iterator, Sequence
from psycopg import AsyncConnection, Connection
from psycopg import pq, waiting
@contextmanager
def prepare_pipeline_demo_pq(
pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
-) -> Iterator[Tuple[Deque[PipelineCommand], Deque[str]]]:
+) -> Iterator[tuple[Deque[PipelineCommand], Deque[str]]]:
"""Set up pipeline demo with initial queries and yield commands and
results queue for pipeline_communicate().
"""
from __future__ import annotations
from dataclasses import dataclass
-from typing import Any, Callable, Sequence, Tuple
+from typing import Any, Callable, Sequence
from psycopg import Connection, Cursor, ServerCursor, connect, rows
from psycopg import AsyncConnection, AsyncCursor, AsyncServerCursor
with conn2.cursor() as cur2:
cur2.execute("select 2")
- cur3: Cursor[Tuple[Any, ...]]
- r3: Tuple[Any, ...] | None
+ cur3: Cursor[tuple[Any, ...]]
+ r3: tuple[Any, ...] | None
conn3 = connect()
cur3 = conn3.execute("select 3")
with conn3.cursor() as cur3:
async with conn2.cursor() as cur2:
await cur2.execute("select 2")
- cur3: AsyncCursor[Tuple[Any, ...]]
- r3: Tuple[Any, ...] | None
+ cur3: AsyncCursor[tuple[Any, ...]]
+ r3: tuple[Any, ...] | None
conn3 = await AsyncConnection.connect()
cur3 = await conn3.execute("select 3")
async with conn3.cursor() as cur3:
def check_row_factories() -> None:
conn1 = connect(row_factory=rows.tuple_row)
- v1: Tuple[Any, ...] = conn1.execute("").fetchall()[0]
+ v1: tuple[Any, ...] = conn1.execute("").fetchall()[0]
conn2 = connect(row_factory=rows.dict_row)
v2: dict[str, Any] = conn2.execute("").fetchall()[0]
import re
import sys
import operator
-from typing import Callable, Tuple
+from typing import Callable
from contextlib import contextmanager
*,
skip: bool = False,
op: str | None = None,
- version_tuple: Tuple[int, ...] = (),
+ version_tuple: tuple[int, ...] = (),
whose: str = "(wanted)",
postgres_rule: bool = False,
):
_OP_NAMES = {">=": "ge", "<=": "le", ">": "gt", "<": "lt", "==": "eq", "!=": "ne"}
- def _match_version(self, got_tuple: Tuple[int, ...]) -> bool:
+ def _match_version(self, got_tuple: tuple[int, ...]) -> bool:
if not self.version_tuple:
return True
assert len(version_tuple) <= 2
version_tuple = version_tuple[:1] + (0,) + version_tuple[1:]
- op: Callable[[Tuple[int, ...], Tuple[int, ...]], bool]
+ op: Callable[[tuple[int, ...], tuple[int, ...]], bool]
op = getattr(operator, self._OP_NAMES[self.op])
return op(got_tuple, version_tuple)
- def _parse_int_version(self, version: int | None) -> Tuple[int, ...]:
+ def _parse_int_version(self, version: int | None) -> tuple[int, ...]:
if version is None:
return ()
version, ver_fix = divmod(version, 100)
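For orientation, a self-contained sketch of the decoding this helper performs (hypothetical name, mirroring the divmod logic above; libpq encodes e.g. 14.2 as 140002):

    def parse_int_version_sketch(version: int) -> tuple[int, ...]:
        version, ver_fix = divmod(version, 100)
        ver_major, ver_minor = divmod(version, 100)
        return (ver_major, ver_minor, ver_fix)

    assert parse_int_version_sketch(140002) == (14, 0, 2)
    assert parse_int_version_sketch(90620) == (9, 6, 20)   # pre-10 scheme: 9.6.20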