from __future__ import annotations
-from typing import Any, Dict, List, Type, cast, TYPE_CHECKING
+from typing import Any, Dict, Type, cast, TYPE_CHECKING
from . import pq
from . import errors as e
types: TypesRegistry
_dumpers: Dict[PyFormat, Dict[type | str, Type[Dumper]]]
- _dumpers_by_oid: List[Dict[int, Type[Dumper]]]
- _loaders: List[Dict[int, Type[Loader]]]
+ _dumpers_by_oid: list[Dict[int, Type[Dumper]]]
+ _loaders: list[Dict[int, Type[Loader]]]
# Record if a dumper or loader has an optimised version.
_optimised: Dict[type, type] = {}
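# A minimal usage sketch of the registries these maps back: registering a
# custom dumper on the global adapters map. `Point` is a hypothetical type;
# `Dumper` and `psycopg.adapters` are the real public entry points.
from dataclasses import dataclass

import psycopg
from psycopg.adapt import Dumper

@dataclass
class Point:
    x: float
    y: float

class PointDumper(Dumper):
    oid = 600  # the builtin "point" type oid

    def dump(self, obj: Point) -> bytes:
        return f"({obj.x},{obj.y})".encode()

psycopg.adapters.register_dumper(Point, PointDumper)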
import sys
import logging
-from typing import Callable, Generic
-from typing import List, NamedTuple, Tuple
-from typing import TYPE_CHECKING
+from typing import Callable, Generic, NamedTuple, Tuple, TYPE_CHECKING
from weakref import ref, ReferenceType
from warnings import warn
from functools import partial
# None, but set to a copy of the global adapters map as soon as requested.
self._adapters: AdaptersMap | None = None
- self._notice_handlers: List[NoticeHandler] = []
- self._notify_handlers: List[NotifyHandler] = []
+ self._notice_handlers: list[NoticeHandler] = []
+ self._notify_handlers: list[NotifyHandler] = []
# Number of transaction blocks currently entered
self._num_transactions = 0
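# A minimal usage sketch of the handler lists initialised above; the
# connection string is hypothetical.
import psycopg

conn = psycopg.connect("dbname=test")
conn.add_notice_handler(lambda diag: print("NOTICE:", diag.message_primary))
conn.add_notify_handler(lambda n: print("NOTIFY:", n.channel, n.payload))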
import sys
import struct
from abc import ABC, abstractmethod
-from typing import Any, Dict, Generic, List, Match, Sequence, Tuple, TYPE_CHECKING
+from typing import Any, Dict, Generic, Match, Sequence, Tuple, TYPE_CHECKING
from . import pq
from . import adapt
def _parse_row_binary(data: Buffer, tx: Transformer) -> Tuple[Any, ...]:
- row: List[Buffer | None] = []
+ row: list[Buffer | None] = []
nfields = _unpack_int2(data, 0)[0]
pos = 2
for i in range(nfields):
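# A self-contained sketch of the binary row layout parsed above: an int16
# field count, then for each field an int32 length (-1 meaning NULL)
# followed by that many bytes of data.
from __future__ import annotations

import struct

def parse_row_binary_sketch(data: bytes) -> list[bytes | None]:
    row: list[bytes | None] = []
    (nfields,) = struct.unpack_from("!h", data, 0)
    pos = 2
    for _ in range(nfields):
        (length,) = struct.unpack_from("!i", data, pos)
        pos += 4
        if length == -1:
            row.append(None)
        else:
            row.append(bytes(data[pos : pos + length]))
            pos += length
    return row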
from __future__ import annotations
from functools import partial
-from typing import Any, Generic, Iterable, List
-from typing import NoReturn, Sequence, Tuple, Type
+from typing import Any, Generic, Iterable, NoReturn, Sequence, Tuple, Type
from typing import TYPE_CHECKING
from . import pq
self._reset()
def _reset(self, reset_query: bool = True) -> None:
- self._results: List["PGresult"] = []
+ self._results: list["PGresult"] = []
self.pgresult: "PGresult" | None = None
self._pos = 0
self._iresult = 0
return self._closed
@property
- def description(self) -> List[Column] | None:
+ def description(self) -> list[Column] | None:
"""
A list of `Column` objects describing the current resultset.
pgq.convert(query, params)
return pgq
- def _check_results(self, results: List["PGresult"]) -> None:
+ def _check_results(self, results: list["PGresult"]) -> None:
"""
Verify that the results of a query are valid.
self._make_row = self._make_row_maker()
- def _set_results(self, results: List["PGresult"]) -> None:
+ def _set_results(self, results: list["PGresult"]) -> None:
if self._execmany_returning is None:
# Received from execute()
self._results[:] = results
import re
import warnings
from random import randint
-from typing import Any, DefaultDict, Dict, List, NamedTuple, Sequence
-from typing import TYPE_CHECKING
+from typing import Any, DefaultDict, Dict, NamedTuple, Sequence, TYPE_CHECKING
from collections import defaultdict
try:
return self._return_params(params, hps)
- def _get_attempts(self, params: Dict[str, Any]) -> List[HostPort]:
+ def _get_attempts(self, params: Dict[str, Any]) -> list[HostPort]:
"""
Return the list of hosts, and for each host whether an SRV lookup must be tried.
return out if srv_found else []
- def _resolve_srv(self, hp: HostPort) -> List[HostPort]:
+ def _resolve_srv(self, hp: HostPort) -> list[HostPort]:
try:
ans = resolver.resolve(hp.host, "SRV")
except DNSException:
ans = ()
return self._get_solved_entries(hp, ans)
- async def _resolve_srv_async(self, hp: HostPort) -> List[HostPort]:
+ async def _resolve_srv_async(self, hp: HostPort) -> list[HostPort]:
try:
ans = await async_resolver.resolve(hp.host, "SRV")
except DNSException:
def _get_solved_entries(
self, hp: HostPort, entries: "Sequence[SRV]"
- ) -> List[HostPort]:
+ ) -> list[HostPort]:
if not entries:
# No SRV entry found. Delegate a QNAME=target lookup to the libpq
if hp.target and hp.port.lower() != "srv":
]
def _return_params(
- self, params: Dict[str, Any], hps: List[HostPort]
+ self, params: Dict[str, Any], hps: list[HostPort]
) -> Dict[str, Any]:
if not hps:
# Nothing found, we ended up with an empty list
out["port"] = ",".join(str(hp.port) for hp in hps)
return out
- def sort_rfc2782(self, ans: "Sequence[SRV]") -> "List[SRV]":
+ def sort_rfc2782(self, ans: "Sequence[SRV]") -> "list[SRV]":
"""
Implement the priority/weight ordering defined in RFC 2782.
"""
# Divide the entries by priority:
- priorities: DefaultDict[int, "List[SRV]"] = defaultdict(list)
- out: "List[SRV]" = []
+ priorities: DefaultDict[int, "list[SRV]"] = defaultdict(list)
+ out: "list[SRV]" = []
for entry in ans:
priorities[entry.priority].append(entry)
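# A minimal sketch of the RFC 2782 selection this method implements for a
# single priority group: entries with weight 0 are placed first, then each
# pick is made with probability proportional to its weight.
from __future__ import annotations

import random
from typing import NamedTuple

class SRV(NamedTuple):
    priority: int
    weight: int
    port: int
    target: str

def shuffle_by_weight(group: list[SRV]) -> list[SRV]:
    group = sorted(group, key=lambda e: e.weight)  # weight-0 entries first
    out: list[SRV] = []
    while group:
        total = 0
        cum = []
        for e in group:
            total += e.weight
            cum.append(total)
        r = random.randint(0, total)
        i = next(i for i, c in enumerate(cum) if c >= r)
        out.append(group.pop(i))
    return out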
import logging
from types import TracebackType
-from typing import Any, List, Tuple, Type, TYPE_CHECKING
+from typing import Any, Tuple, Type, TYPE_CHECKING
from . import pq
from . import errors as e
raise exception
def _process_results(
- self, queued: PendingResult, results: List["PGresult"]
+ self, queued: PendingResult, results: list["PGresult"]
) -> None:
"""Process a results set fetched from the current pipeline.
from __future__ import annotations
-from typing import Any, Dict, List, Sequence, Tuple
-from typing import DefaultDict, TYPE_CHECKING
+from typing import Any, DefaultDict, Dict, Sequence, Tuple, TYPE_CHECKING
from collections import defaultdict
from . import pq
""".split()
types: Tuple[int, ...] | None
- formats: List[pq.Format] | None
+ formats: list[pq.Format] | None
_adapters: "AdaptersMap"
_pgresult: "PGresult" | None
# mapping fmt, oid -> Loader instance
self._loaders: Tuple[LoaderCache, LoaderCache] = ({}, {})
- self._row_dumpers: List[abc.Dumper] | None = None
+ self._row_dumpers: list[abc.Dumper] | None = None
# sequence of load functions from value to python
# the length of the result columns
- self._row_loaders: List[LoadFunc] = []
+ self._row_loaders: list[LoadFunc] = []
# mapping oid -> type sql representation
self._oid_types: Dict[int, bytes] = {}
self, params: Sequence[Any], formats: Sequence[PyFormat]
) -> Sequence[Buffer | None]:
nparams = len(params)
- out: List[Buffer | None] = [None] * nparams
+ out: list[Buffer | None] = [None] * nparams
# If we have dumpers, it means set_dumper_types had been called, in
# which case self.types and self.formats are set to sequences of the
return dumper
- def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> List[Row]:
+ def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> list[Row]:
res = self._pgresult
if not res:
raise e.InterfaceError("result not set")
records = []
for row in range(row0, row1):
- record: List[Any] = [None] * self._nfields
+ record: list[Any] = [None] * self._nfields
for col in range(self._nfields):
val = res.get_value(row, col)
if val is not None:
if not 0 <= row < self._ntuples:
return None
- record: List[Any] = [None] * self._nfields
+ record: list[Any] = [None] * self._nfields
for col in range(self._nfields):
val = res.get_value(row, col)
if val is not None:
from __future__ import annotations
import re
-from typing import Any, Callable, Dict, List, Mapping, Match, NamedTuple
+from typing import Any, Callable, Dict, Mapping, Match, NamedTuple
from typing import Sequence, Tuple, TYPE_CHECKING
from functools import lru_cache
self.types: Tuple[int, ...] = ()
# The formats requested by the user and the ones to actually pass to Postgres
- self._want_formats: List[PyFormat] | None = None
+ self._want_formats: list[PyFormat] | None = None
self.formats: Sequence[pq.Format] | None = None
self._encoding = conn_encoding(transformer.connection)
- self._parts: List[QueryPart]
+ self._parts: list[QueryPart]
self.query = b""
- self._order: List[str] | None = None
+ self._order: list[str] | None = None
def convert(self, query: Query, vars: Params | None) -> None:
"""
@staticmethod
def validate_and_reorder_params(
- parts: List[QueryPart], vars: Params, order: List[str] | None
+ parts: list[QueryPart], vars: Params, order: list[str] | None
) -> Sequence[Any]:
"""
Verify the compatibility between a query and a set of params.
# The type of the _query2pg() and _query2pg_nocache() methods
_Query2Pg: TypeAlias = Callable[
- [bytes, str], Tuple[bytes, List[PyFormat], List[str] | None, List[QueryPart]]
+ [bytes, str], Tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]
]
def _query2pg_nocache(
query: bytes, encoding: str
-) -> Tuple[bytes, List[PyFormat], List[str] | None, List[QueryPart]]:
+) -> Tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]:
"""
Convert Python query and params into something Postgres understands.
``parts`` (splits of queries and placeholders).
"""
parts = _split_query(query, encoding)
- order: List[str] | None = None
- chunks: List[bytes] = []
+ order: list[str] | None = None
+ chunks: list[bytes] = []
formats = []
if isinstance(parts[0].item, int):
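# A toy sketch (much simpler than _query2pg above) of the core rewrite:
# turn named Python-style placeholders into PostgreSQL $n parameters,
# remembering the order in which names first appear.
from __future__ import annotations

import re

def to_dollar_params(query: str) -> tuple[str, list[str]]:
    order: list[str] = []

    def repl(m: re.Match[str]) -> str:
        name = m.group(1)
        if name not in order:
            order.append(name)
        return f"${order.index(name) + 1}"

    return re.sub(r"%\((\w+)\)s", repl, query), order

# to_dollar_params("select %(a)s + %(b)s, %(a)s")
# -> ("select $1 + $2, $1", ["a", "b"])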
_Query2PgClient: TypeAlias = Callable[
- [bytes, str], Tuple[bytes, List[str] | None, List[QueryPart]]
+ [bytes, str], Tuple[bytes, list[str] | None, list[QueryPart]]
]
def _query2pg_client_nocache(
query: bytes, encoding: str
-) -> Tuple[bytes, List[str] | None, List[QueryPart]]:
+) -> Tuple[bytes, list[str] | None, list[QueryPart]]:
"""
Convert Python query and params into a template to perform client-side binding.
"""
parts = _split_query(query, encoding, collapse_double_percent=False)
- order: List[str] | None = None
- chunks: List[bytes] = []
+ order: list[str] | None = None
+ chunks: list[bytes] = []
if isinstance(parts[0].item, int):
for part in parts[:-1]:
def _split_query(
query: bytes, encoding: str = "ascii", collapse_double_percent: bool = True
-) -> List[QueryPart]:
- parts: List[Tuple[bytes, Match[bytes] | None]] = []
+) -> list[QueryPart]:
+ parts: list[Tuple[bytes, Match[bytes] | None]] = []
cur = 0
# pairs [(fragment, match)], with the last match None
from __future__ import annotations
from typing import Any, Dict, Callable, Generator, Mapping
-from typing import List, Protocol, Sequence, Tuple, TYPE_CHECKING
+from typing import Protocol, Sequence, Tuple, TYPE_CHECKING
from . import pq
from ._enums import PyFormat as PyFormat
class Transformer(Protocol):
types: Tuple[int, ...] | None
- formats: List[pq.Format] | None
+ formats: list[pq.Format] | None
def __init__(self, context: AdaptContext | None = None): ...
def load_rows(
self, row0: int, row1: int, make_row: "RowMaker[Row]"
- ) -> List["Row"]: ...
+ ) -> list["Row"]: ...
def load_row(self, row: int, make_row: "RowMaker[Row]") -> "Row" | None: ...
import logging
from time import monotonic
from types import TracebackType
-from typing import Any, Generator, Iterator, List
+from typing import Any, Generator, Iterator
from typing import Type, cast, overload, TYPE_CHECKING
from contextlib import contextmanager
with self.lock:
self.wait(self._tpc_finish_gen("ROLLBACK", xid))
- def tpc_recover(self) -> List[Xid]:
+ def tpc_recover(self) -> list[Xid]:
self._check_tpc()
status = self.info.transaction_status
with self.cursor(row_factory=args_row(Xid._from_record)) as cur:
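# A usage sketch of the two-phase commit flow tpc_recover() completes; the
# connection string and table are hypothetical.
import psycopg

conn = psycopg.connect("dbname=test")
xid = conn.xid(42, "my-transaction", "my-branch")
conn.tpc_begin(xid)
conn.execute("insert into t values (1)")
conn.tpc_prepare()
conn.tpc_commit()  # or: conn.tpc_rollback()
# After a crash, conn.tpc_recover() lists the Xids still prepared.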
import logging
from time import monotonic
from types import TracebackType
-from typing import Any, AsyncGenerator, AsyncIterator, List
+from typing import Any, AsyncGenerator, AsyncIterator
from typing import Type, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
async with self.lock:
await self.wait(self._tpc_finish_gen("ROLLBACK", xid))
- async def tpc_recover(self) -> List[Xid]:
+ async def tpc_recover(self) -> list[Xid]:
self._check_tpc()
status = self.info.transaction_status
async with self.cursor(row_factory=args_row(Xid._from_record)) as cur:
from __future__ import annotations
from types import TracebackType
-from typing import Any, Iterator, Iterable, List, Type
-from typing import TYPE_CHECKING, overload
+from typing import Any, Iterable, Iterator, Type, TYPE_CHECKING, overload
from contextlib import contextmanager
from . import pq
self._pos += 1
return record
- def fetchmany(self, size: int = 0) -> List[Row]:
+ def fetchmany(self, size: int = 0) -> list[Row]:
"""
Return the next `!size` records from the current recordset.
self._pos += len(records)
return records
- def fetchall(self) -> List[Row]:
+ def fetchall(self) -> list[Row]:
"""
Return all the remaining records from the current recordset.
from __future__ import annotations
from types import TracebackType
-from typing import Any, AsyncIterator, Iterable, List, Type
-from typing import TYPE_CHECKING, overload
+from typing import Any, AsyncIterator, Iterable, Type, TYPE_CHECKING, overload
from contextlib import asynccontextmanager
from . import pq
self._pos += 1
return record
- async def fetchmany(self, size: int = 0) -> List[Row]:
+ async def fetchmany(self, size: int = 0) -> list[Row]:
"""
Return the next `!size` records from the current recordset.
self._pos += len(records)
return records
- async def fetchall(self) -> List[Row]:
+ async def fetchall(self) -> list[Row]:
"""
Return all the remaining records from the current recordset.
from __future__ import annotations
from dataclasses import dataclass, field, fields
-from typing import Any, Callable, Dict, List, NoReturn, Sequence, Tuple, Type
-from typing import TYPE_CHECKING
+from typing import Any, Callable, Dict, NoReturn, Sequence, Tuple, Type, TYPE_CHECKING
from asyncio import CancelledError
from .pq.abc import PGconn, PGresult
raise an `~psycopg.OperationalError`.
"""
- info: List["ConninfoOption"] = field(default_factory=list)
+ info: list[ConninfoOption] = field(default_factory=list)
db: bytes = b""
user: bytes = b""
import logging
from time import monotonic
-from typing import List
from . import pq
from . import errors as e
raise e.InternalError(f"unexpected poll status: {status}")
-def _execute(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def _execute(pgconn: PGconn) -> PQGen[list[PGresult]]:
"""
Generator sending a query and returning results without blocking.
pgconn.consume_input()
-def _fetch_many(pgconn: PGconn) -> PQGen[List[PGresult]]:
+def _fetch_many(pgconn: PGconn) -> PQGen[list[PGresult]]:
"""
Generator retrieving results from the database without blocking.
Return the list of results returned by the database (whether success
or error).
"""
- results: List[PGresult] = []
+ results: list[PGresult] = []
while True:
res = yield from _fetch(pgconn)
if not res:
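# A minimal sketch of how these PQGen generators are driven: each yield asks
# the caller to wait for socket readiness, then the generator is resumed.
# The send protocol here is a simplified assumption; the real waiting
# functions live in psycopg.waiting.
import selectors

def wait_sketch(gen, fileno: int):
    sel = selectors.DefaultSelector()
    try:
        want = next(gen)  # first yield: which readiness the generator needs
        while True:
            sel.register(fileno, want)
            (key, ready), = sel.select()  # block until the socket is ready
            sel.unregister(fileno)
            want = gen.send(ready)  # resume with the readiness mask
    except StopIteration as ex:
        return ex.value  # the generator's return value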
def _pipeline_communicate(
pgconn: PGconn, commands: Deque[PipelineCommand]
-) -> PQGen[List[List[PGresult]]]:
+) -> PQGen[list[list[PGresult]]]:
"""Generator to send queries from a connection in pipeline mode while also
receiving results.
pgconn.consume_input()
_consume_notifies(pgconn)
- res: List[PGresult] = []
+ res: list[PGresult] = []
while not pgconn.is_busy():
r = pgconn.get_result()
if r is None:
pgconn.notify_handler(n)
-def notifies(pgconn: PGconn) -> PQGen[List[pq.PGnotify]]:
+def notifies(pgconn: PGconn) -> PQGen[list[pq.PGnotify]]:
yield WAIT_R
pgconn.consume_input()
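# A usage sketch of the high-level API this generator backs: blocking
# iteration over notifications on a connection (DSN hypothetical).
import psycopg

conn = psycopg.connect("dbname=test", autocommit=True)
conn.execute("listen mychan")
for n in conn.notifies():
    print(n.channel, n.payload)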
# Copyright (C) 2020 The Psycopg Team
+from __future__ import annotations
+
import os
import logging
-from typing import Callable, List, Type
+from typing import Callable, Type
from . import abc
from .misc import ConninfoOption, PGnotify, PGresAttDesc
impl = os.environ.get("PSYCOPG_IMPL", "").lower()
module = None
- attempts: List[str] = []
+ attempts: list[str] = []
def handle_error(name: str, e: Exception) -> None:
if not impl:
import ctypes.util
from ctypes import Structure, CFUNCTYPE, POINTER
from ctypes import c_char, c_char_p, c_int, c_size_t, c_ubyte, c_uint, c_void_p
-from typing import Any, List, NoReturn, Tuple
+from typing import Any, NoReturn, Tuple
from .misc import find_libpq_full_path, version_pretty
from ..errors import NotSupportedError
class PGconn_struct(Structure):
- _fields_: List[Tuple[str, type]] = []
+ _fields_: list[Tuple[str, type]] = []
class PGresult_struct(Structure):
- _fields_: List[Tuple[str, type]] = []
+ _fields_: list[Tuple[str, type]] = []
class PQconninfoOption_struct(Structure):
class PGcancelConn_struct(Structure):
- _fields_: List[Tuple[str, type]] = []
+ _fields_: list[Tuple[str, type]] = []
class PGcancel_struct(Structure):
- _fields_: List[Tuple[str, type]] = []
+ _fields_: list[Tuple[str, type]] = []
class PGresAttDesc_struct(Structure):
from __future__ import annotations
-from typing import Any, Callable, List, Protocol, Sequence, Tuple, TYPE_CHECKING
+from typing import Any, Callable, Protocol, Sequence, Tuple, TYPE_CHECKING
from ._enums import Format, Trace
from .._compat import TypeAlias
def finish(self) -> None: ...
@property
- def info(self) -> List["ConninfoOption"]: ...
+ def info(self) -> list["ConninfoOption"]: ...
def reset(self) -> None: ...
@property
def oid_value(self) -> int: ...
- def set_attributes(self, descriptions: List["PGresAttDesc"]) -> None: ...
+ def set_attributes(self, descriptions: list["PGresAttDesc"]) -> None: ...
class PGcancelConn(Protocol):
class Conninfo(Protocol):
@classmethod
- def get_defaults(cls) -> List["ConninfoOption"]: ...
+ def get_defaults(cls) -> list["ConninfoOption"]: ...
@classmethod
- def parse(cls, conninfo: bytes) -> List["ConninfoOption"]: ...
+ def parse(cls, conninfo: bytes) -> list["ConninfoOption"]: ...
@classmethod
- def _options_from_array(cls, opts: Sequence[Any]) -> List["ConninfoOption"]: ...
+ def _options_from_array(cls, opts: Sequence[Any]) -> list["ConninfoOption"]: ...
class Escaping(Protocol):
from ctypes import Array, POINTER, cast, string_at, create_string_buffer, byref
from ctypes import addressof, c_char_p, c_int, c_size_t, c_ulong, c_void_p, py_object
-from typing import Any, Callable, List, Sequence, Tuple
+from typing import Any, Callable, Sequence, Tuple
from typing import cast as t_cast, TYPE_CHECKING
from .. import errors as e
return addressof(self._pgconn_ptr.contents) # type: ignore[attr-defined]
@property
- def info(self) -> List["ConninfoOption"]:
+ def info(self) -> list["ConninfoOption"]:
self._ensure_pgconn()
opts = impl.PQconninfo(self._pgconn_ptr)
if not opts:
def oid_value(self) -> int:
return impl.PQoidValue(self._pgresult_ptr)
- def set_attributes(self, descriptions: List[PGresAttDesc]) -> None:
+ def set_attributes(self, descriptions: list[PGresAttDesc]) -> None:
structs = [
impl.PGresAttDesc_struct(*desc) for desc in descriptions # type: ignore
]
"""
@classmethod
- def get_defaults(cls) -> List[ConninfoOption]:
+ def get_defaults(cls) -> list[ConninfoOption]:
opts = impl.PQconndefaults()
if not opts:
raise MemoryError("couldn't allocate connection defaults")
impl.PQconninfoFree(opts)
@classmethod
- def parse(cls, conninfo: bytes) -> List[ConninfoOption]:
+ def parse(cls, conninfo: bytes) -> list[ConninfoOption]:
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
@classmethod
def _options_from_array(
cls, opts: Sequence[impl.PQconninfoOption_struct]
- ) -> List[ConninfoOption]:
+ ) -> list[ConninfoOption]:
rv = []
skws = "keyword envvar compiled val label dispchar".split()
for opt in opts:
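# A usage sketch of the higher-level wrappers over these parsing helpers:
from psycopg.conninfo import conninfo_to_dict, make_conninfo

conninfo_to_dict("dbname=test user=postgres")
# -> {'dbname': 'test', 'user': 'postgres'}
make_conninfo("dbname=test", user="postgres")
# -> 'dbname=test user=postgres'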
from __future__ import annotations
import functools
-from typing import Any, Callable, Dict, List, NamedTuple, NoReturn
+from typing import Any, Callable, Dict, NamedTuple, NoReturn
from typing import TYPE_CHECKING, Protocol, Sequence, Tuple, Type
from collections import namedtuple
raise e.InterfaceError("the cursor doesn't have a result")
-def _get_names(cursor: "BaseCursor[Any, Any]") -> List[str] | None:
+def _get_names(cursor: "BaseCursor[Any, Any]") -> list[str] | None:
res = cursor.pgresult
if not res:
return None
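# A minimal sketch of the row-factory protocol those names feed: given a
# cursor, return a callable that turns a tuple of values into a row object
# (this is essentially what psycopg.rows.dict_row does).
def dict_row_sketch(cursor):
    names = [c.name for c in cursor.description or ()]

    def make_row(values):
        return dict(zip(names, values))

    return make_row

# cur = conn.cursor(row_factory=dict_row_sketch)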
from __future__ import annotations
-from typing import Any, AsyncIterator, List, Iterable, Iterator
-from typing import TYPE_CHECKING, overload
+from typing import Any, AsyncIterator, Iterable, Iterator, TYPE_CHECKING, overload
from warnings import warn
from . import pq
query = sql.SQL("CLOSE {}").format(sql.Identifier(self._name))
yield from self._conn._exec_command(query)
- def _fetch_gen(self, num: int | None) -> PQGen[List[Row]]:
+ def _fetch_gen(self, num: int | None) -> PQGen[list[Row]]:
if self.closed:
raise e.InterfaceError("the cursor is closed")
# If we are stealing the cursor, make sure we know its shape
else:
return None
- def fetchmany(self, size: int = 0) -> List[Row]:
+ def fetchmany(self, size: int = 0) -> list[Row]:
if not size:
size = self.arraysize
with self._conn.lock:
self._pos += len(recs)
return recs
- def fetchall(self) -> List[Row]:
+ def fetchall(self) -> list[Row]:
with self._conn.lock:
recs = self._conn.wait(self._fetch_gen(None))
self._pos += len(recs)
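# A usage sketch of the server-side cursor these methods drive: a named
# cursor DECLAREs the query server-side and FETCHes in batches, so the whole
# result set never has to fit in client memory (DSN hypothetical).
import psycopg

with psycopg.connect("dbname=test") as conn:
    with conn.cursor(name="big") as cur:
        cur.execute("select generate_series(1, 1000000)")
        while recs := cur.fetchmany(1000):
            ...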
else:
return None
- async def fetchmany(self, size: int = 0) -> List[Row]:
+ async def fetchmany(self, size: int = 0) -> list[Row]:
if not size:
size = self.arraysize
async with self._conn.lock:
self._pos += len(recs)
return recs
- async def fetchall(self) -> List[Row]:
+ async def fetchall(self) -> list[Row]:
async with self._conn.lock:
recs = await self._conn.wait(self._fetch_gen(None))
self._pos += len(recs)
import codecs
import string
from abc import ABC, abstractmethod
-from typing import Any, Iterator, Iterable, List, Sequence
+from typing import Any, Iterator, Iterable, Sequence
from .pq import Escaping
from .abc import AdaptContext
instance).
"""
- _obj: List[Composable]
+ _obj: list[Composable]
def __init__(self, seq: Sequence[Any]):
seq = [obj if isinstance(obj, Composable) else Literal(obj) for obj in seq]
SELECT * FROM "people" WHERE name = 'O''Rourke'
"""
- rv: List[Composable] = []
+ rv: list[Composable] = []
autonum: int | None = 0
# TODO: this is probably not the right way to whitelist pre
# pyre complains. Will wait for mypy to complain too to fix.
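# A usage sketch of the composition objects handled above, matching the
# quoting shown in the docstring example:
from psycopg import sql

q = sql.SQL("SELECT * FROM {} WHERE name = {}").format(
    sql.Identifier("people"), sql.Literal("O'Rourke")
)
# q.as_string(conn) renders: SELECT * FROM "people" WHERE name = 'O''Rourke'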
import re
import struct
from math import prod
-from typing import Any, cast, Callable, List, Pattern, Set, Tuple, Type
+from typing import Any, cast, Callable, Pattern, Set, Tuple, Type
from .. import pq
from .. import errors as e
sdclass = context.adapters.get_dumper_by_oid(self.element_oid, self.format)
self.sub_dumper = sdclass(NoneType, context)
- def _find_list_element(self, L: List[Any], format: PyFormat) -> Any:
+ def _find_list_element(self, L: list[Any], format: PyFormat) -> Any:
"""
Find the first non-null element of a possibly nested list
"""
else:
return max(imax, -imin - 1)
- def _flatiter(self, L: List[Any], seen: Set[int]) -> Any:
+ def _flatiter(self, L: list[Any], seen: Set[int]) -> Any:
if id(L) in seen:
raise e.DataError("cannot dump a recursive list")
class ListDumper(BaseListDumper):
delimiter = b","
- def get_key(self, obj: List[Any], format: PyFormat) -> DumperKey:
+ def get_key(self, obj: list[Any], format: PyFormat) -> DumperKey:
if self.oid:
return self.cls
sd = self._tx.get_dumper(item, format)
return (self.cls, sd.get_key(item, format))
- def upgrade(self, obj: List[Any], format: PyFormat) -> "BaseListDumper":
+ def upgrade(self, obj: list[Any], format: PyFormat) -> "BaseListDumper":
# If we have an oid we don't need to upgrade
if self.oid:
return self
# backslash-escaped.
_re_esc = re.compile(rb'(["\\])')
- def dump(self, obj: List[Any]) -> Buffer | None:
- tokens: List[Buffer] = []
+ def dump(self, obj: list[Any]) -> Buffer | None:
+ tokens: list[Buffer] = []
needs_quotes = _get_needs_quotes_regexp(self.delimiter).search
- def dump_list(obj: List[Any]) -> None:
+ def dump_list(obj: list[Any]) -> None:
if not obj:
tokens.append(b"{}")
return
class ListBinaryDumper(BaseListDumper):
format = pq.Format.BINARY
- def get_key(self, obj: List[Any], format: PyFormat) -> DumperKey:
+ def get_key(self, obj: list[Any], format: PyFormat) -> DumperKey:
if self.oid:
return self.cls
sd = self._tx.get_dumper(item, format)
return (self.cls, sd.get_key(item, format))
- def upgrade(self, obj: List[Any], format: PyFormat) -> "BaseListDumper":
+ def upgrade(self, obj: list[Any], format: PyFormat) -> "BaseListDumper":
# If we have an oid we don't need to upgrade
if self.oid:
return self
return dumper
- def dump(self, obj: List[Any]) -> Buffer | None:
+ def dump(self, obj: list[Any]) -> Buffer | None:
# Postgres won't take unknown for element oid: fall back on text
sub_oid = self.sub_dumper and self.sub_dumper.oid or TEXT_OID
if not obj:
return _pack_head(0, 0, sub_oid)
- data: List[Buffer] = [b"", b""] # placeholders to avoid a resize
- dims: List[int] = []
+ data: list[Buffer] = [b"", b""] # placeholders to avoid a resize
+ dims: list[int] = []
hasnull = 0
- def calc_dims(L: List[Any]) -> None:
+ def calc_dims(L: list[Any]) -> None:
if isinstance(L, self.cls):
if not L:
raise e.DataError("lists cannot contain empty lists")
calc_dims(obj)
- def dump_list(L: List[Any], dim: int) -> None:
+ def dump_list(L: list[Any], dim: int) -> None:
nonlocal hasnull
if len(L) != dims[dim]:
raise e.DataError("nested lists have inconsistent lengths")
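# A self-contained sketch of the binary array prefix that _pack_head and the
# per-dimension words build, assuming the PostgreSQL array wire layout:
# int32 ndims, int32 has-null flag, int32 element oid, then one
# (int32 length, int32 lower bound) pair per dimension (bounds are 1-based).
from __future__ import annotations

import struct

def pack_array_head_sketch(dims: list[int], hasnull: int, oid: int) -> bytes:
    out = struct.pack("!iii", len(dims), hasnull, oid)
    for dim in dims:
        out += struct.pack("!ii", dim, 1)
    return out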
delimiter = b","
base_oid: int
- def load(self, data: Buffer) -> List[Any]:
+ def load(self, data: Buffer) -> list[Any]:
loader = self._tx.get_loader(self.base_oid, self.format)
return _load_text(data, loader, self.delimiter)
class ArrayBinaryLoader(RecursiveLoader):
format = pq.Format.BINARY
- def load(self, data: Buffer) -> List[Any]:
+ def load(self, data: Buffer) -> list[Any]:
return _load_binary(data, self._tx)
loader: Loader,
delimiter: bytes = b",",
__re_unescape: Pattern[bytes] = re.compile(rb"\\(.)"),
-) -> List[Any]:
+) -> list[Any]:
rv = None
- stack: List[Any] = []
- a: List[Any] = []
+ stack: list[Any] = []
+ a: list[Any] = []
rv = a
load = loader.load
)
-def _load_binary(data: Buffer, tx: Transformer) -> List[Any]:
+def _load_binary(data: Buffer, tx: Transformer) -> list[Any]:
ndims, hasnull, oid = _unpack_head(data)
load = tx.get_loader(oid, PQ_BINARY).load
dims = [_unpack_dim(data, i)[0] for i in range(12, p, 8)]
nelems = prod(dims)
- out: List[Any] = [None] * nelems
+ out: list[Any] = [None] * nelems
for i in range(nelems):
size = unpack_len(data, p)[0]
p += 4
import re
import struct
from collections import namedtuple
-from typing import Any, Callable, cast, Dict, Iterator, List
+from typing import Any, Callable, cast, Dict, Iterator
from typing import NamedTuple, Sequence, Tuple, Type, TYPE_CHECKING
from .. import pq
if not obj:
return start + end
- parts: List[abc.Buffer] = [start]
+ parts: list[abc.Buffer] = [start]
for item in obj:
if item is None:
class CompositeLoader(RecordLoader):
factory: Callable[..., Any]
- fields_types: List[int]
+ fields_types: list[int]
_types_set = False
def load(self, data: abc.Buffer) -> Any:
from __future__ import annotations
import re
-from typing import Dict, List, Type
+from typing import Dict, Type
from .. import errors as e
from .. import postgres
if not obj:
return b""
- tokens: List[str] = []
+ tokens: list[str] = []
def add_token(s: str) -> None:
tokens.append('"')
from __future__ import annotations
from decimal import Decimal
-from typing import Any, Generic, List, Iterable, MutableSequence
+from typing import Any, Generic, Iterable, MutableSequence
from typing import Type, overload, TYPE_CHECKING
from datetime import date, datetime
"""
def __init__(self, items: Iterable[Range[T]] = ()):
- self._ranges: List[Range[T]] = list(map(self._check_type, items))
+ self._ranges: list[Range[T]] = list(map(self._check_type, items))
def _check_type(self, item: Any) -> Range[Any]:
if not isinstance(item, Range):
else:
dump = fail_dump
- out: List[Buffer] = [b"{"]
+ out: list[Buffer] = [b"{"]
for r in obj:
out.append(dump_range_text(r, dump))
out.append(b",")
else:
dump = fail_dump
- out: List[Buffer] = [pack_len(len(obj))]
+ out: list[Buffer] = [pack_len(len(obj))]
for r in obj:
data = dump_range_binary(r, dump)
out.append(pack_len(len(data)))
from __future__ import annotations
import re
-from typing import Any, Dict, Generic, List, Type, Tuple
+from typing import Any, Dict, Generic, Type, Tuple
from typing import cast, TYPE_CHECKING
from decimal import Decimal
from datetime import date, datetime
if obj.isempty:
return b"empty"
- parts: List[Buffer] = [b"[" if obj.lower_inc else b"("]
+ parts: list[Buffer] = [b"[" if obj.lower_inc else b"("]
def dump_item(item: Any) -> Buffer:
ad = dump(item)
# Copyright (C) 2023 The Psycopg Team
+from __future__ import annotations
+
import os
-from typing import Any, List
+from typing import Any
import tomli
from setuptools import build_meta
-def get_requires_for_build_wheel(config_settings: Any = None) -> List[str]:
+def get_requires_for_build_wheel(config_settings: Any = None) -> list[str]:
if not os.path.exists("psycopg_c/_psycopg.pyx"):
# Cython files don't exist: we must be in a sdist and we can trust
# that the .c files we have packaged exist.
with open("pyproject.toml", "rb") as f:
pyprj = tomli.load(f)
- rv: List[str] = pyprj["cython-backend"]["cython-requires"]
+ rv: list[str] = pyprj["cython-backend"]["cython-requires"]
return rv
# Copyright (C) 2020 The Psycopg Team
-from typing import Any, List, Sequence, Tuple
+from __future__ import annotations
+
+from typing import Any, Sequence, Tuple
from psycopg import pq, abc, BaseConnection
from psycopg.rows import Row, RowMaker
class Transformer(abc.AdaptContext):
types: Tuple[int, ...] | None
- formats: List[pq.Format] | None
+ formats: list[pq.Format] | None
def __init__(self, context: abc.AdaptContext | None = None): ...
@classmethod
def from_context(cls, context: abc.AdaptContext | None) -> "Transformer": ...
) -> Sequence[abc.Buffer | None]: ...
def as_literal(self, obj: Any) -> bytes: ...
def get_dumper(self, obj: Any, format: PyFormat) -> abc.Dumper: ...
- def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> List[Row]: ...
+ def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> list[Row]: ...
def load_row(self, row: int, make_row: RowMaker[Row]) -> Row | None: ...
def load_sequence(self, record: Sequence[abc.Buffer | None]) -> Tuple[Any, ...]: ...
def get_loader(self, oid: int, format: pq.Format) -> abc.Loader: ...
def cancel(
cancel_conn: PGcancelConn, *, timeout: float = 0.0
) -> abc.PQGenConn[None]: ...
-def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
+def execute(pgconn: PGconn) -> abc.PQGen[list[PGresult]]: ...
def send(pgconn: PGconn) -> abc.PQGen[None]: ...
-def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
+def fetch_many(pgconn: PGconn) -> abc.PQGen[list[PGresult]]: ...
def fetch(pgconn: PGconn) -> abc.PQGen[PGresult | None]: ...
def pipeline_communicate(
pgconn: PGconn, commands: Deque[abc.PipelineCommand]
-) -> abc.PQGen[List[List[PGresult]]]: ...
+) -> abc.PQGen[list[list[PGresult]]]: ...
def wait_c(
gen: abc.PQGen[abc.RV], fileno: int, interval: float | None = None
) -> abc.RV: ...
# Arrays optimization
def array_load_text(
data: abc.Buffer, loader: abc.Loader, delimiter: bytes = b","
-) -> List[Any]: ...
-def array_load_binary(data: abc.Buffer, tx: abc.Transformer) -> List[Any]: ...
+) -> list[Any]: ...
+def array_load_binary(data: abc.Buffer, tx: abc.Transformer) -> list[Any]: ...
from cpython.object cimport PyObject_CallFunctionObjArgs
-from typing import List
from time import monotonic
from psycopg import errors as e
raise e.InternalError(f"unexpected poll status: {status}")
-def execute(pq.PGconn pgconn) -> PQGen[List[abc.PGresult]]:
+def execute(pq.PGconn pgconn) -> PQGen[list[abc.PGresult]]:
"""
Generator sending a query and returning results without blocking.
f"consuming input failed: {error_message(pgconn)}")
-def fetch_many(pq.PGconn pgconn) -> PQGen[List[PGresult]]:
+def fetch_many(pq.PGconn pgconn) -> PQGen[list[PGresult]]:
"""
Generator retrieving results from the database without blocking.
def pipeline_communicate(
pq.PGconn pgconn, commands: Deque[PipelineCommand]
-) -> PQGen[List[List[PGresult]]]:
+) -> PQGen[list[list[PGresult]]]:
"""Generator to send queries from a connection in pipeline mode while also
receiving results.
_consume_notifies(pgconn)
- res: List[PGresult] = []
+ res: list[PGresult] = []
while True:
with nogil:
ibres = libpq.PQisBusy(pgconn_ptr)
from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs
-from typing import Any, Dict, Iterable, List, Sequence, Tuple
+from typing import Any, Dict, Iterable, Sequence, Tuple
from psycopg import errors as e
from psycopg.pq import Format as PqFormat
self.formats = pqformats
return out
- def load_rows(self, int row0, int row1, object make_row) -> List[Row]:
+ def load_rows(self, int row0, int row1, object make_row) -> list[Row]:
if self._pgresult is None:
raise e.InterfaceError("result not set")
class Conninfo:
@classmethod
- def get_defaults(cls) -> List[ConninfoOption]:
+ def get_defaults(cls) -> list[ConninfoOption]:
cdef libpq.PQconninfoOption *opts = libpq.PQconndefaults()
if opts is NULL:
raise MemoryError("couldn't allocate connection defaults")
return rv
@classmethod
- def parse(cls, const char *conninfo) -> List[ConninfoOption]:
+ def parse(cls, const char *conninfo) -> list[ConninfoOption]:
cdef char *errmsg = NULL
cdef libpq.PQconninfoOption *opts = libpq.PQconninfoParse(conninfo, &errmsg)
if opts is NULL:
return None
@property
- def info(self) -> List["ConninfoOption"]:
+ def info(self) -> list["ConninfoOption"]:
_ensure_pgconn(self)
cdef libpq.PQconninfoOption *opts = libpq.PQconninfo(self._pgconn_ptr)
if opts is NULL:
def oid_value(self) -> int:
return libpq.PQoidValue(self._pgresult_ptr)
- def set_attributes(self, descriptions: List[PGresAttDesc]):
+ def set_attributes(self, descriptions: list[PGresAttDesc]):
cdef Py_ssize_t num = len(descriptions)
cdef libpq.PGresAttDesc *attrs = <libpq.PGresAttDesc *>PyMem_Malloc(
num * sizeof(libpq.PGresAttDesc))
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, Iterator, cast, Dict, Generic, List
+from typing import Any, Iterator, cast, Dict, Generic
from typing import Type
from weakref import ref
from contextlib import contextmanager
self._pool_full_event: Event | None = None
self._sched_runner: Worker | None = None
- self._workers: List[Worker] = []
+ self._workers: list[Worker] = []
super().__init__(
conninfo,
from abc import ABC, abstractmethod
from time import monotonic
from types import TracebackType
-from typing import Any, AsyncIterator, cast, Dict, Generic, List
+from typing import Any, AsyncIterator, cast, Dict, Generic
from typing import Type
from weakref import ref
from contextlib import asynccontextmanager
self._pool_full_event: AEvent | None = None
self._sched_runner: AWorker | None = None
- self._workers: List[AWorker] = []
+ self._workers: list[AWorker] = []
super().__init__(
conninfo,
import logging
from time import monotonic
from heapq import heappush, heappop
-from typing import Any, Callable, List
+from typing import Any, Callable
from ._task import Task
from ._acompat import Lock, Event
class Scheduler:
def __init__(self) -> None:
- self._queue: List[Task] = []
+ self._queue: list[Task] = []
self._lock = Lock()
self._event = Event()
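# A minimal sketch of the heap discipline the scheduler uses: tasks are kept
# as a min-heap ordered by their scheduled time, so the next task to fire is
# always at index 0. `TaskSketch` is a hypothetical stand-in for Task.
from __future__ import annotations

from dataclasses import dataclass, field
from heapq import heappush, heappop
from typing import Any, Callable

@dataclass(order=True)
class TaskSketch:
    time: float
    action: Callable[[], Any] = field(compare=False)

queue: list[TaskSketch] = []
heappush(queue, TaskSketch(2.0, lambda: print("later")))
heappush(queue, TaskSketch(1.0, lambda: print("sooner")))
next_task = heappop(queue)  # the entry with the smallest time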
import logging
from time import monotonic
from heapq import heappush, heappop
-from typing import Any, Callable, List
+from typing import Any, Callable
from ._task import Task
from ._acompat import ALock, AEvent
class AsyncScheduler:
def __init__(self) -> None:
- self._queue: List[Task] = []
+ self._queue: list[Task] = []
self._lock = ALock()
self._event = AEvent()
Support module for test_connection[_async].py
"""
-from typing import Any, List
+from __future__ import annotations
+
+from typing import Any
from dataclasses import dataclass
import pytest
class ParamDef:
name: str
guc: str
- values: List[Any]
+ values: list[Any]
non_default: str
from __future__ import annotations
import re
-from typing import Any, List, Match
+from typing import Any, Match
import pytest
import psycopg
def my_row_factory(
- cursor: psycopg.Cursor[List[str]] | psycopg.AsyncCursor[List[str]],
-) -> RowMaker[List[str]]:
+ cursor: psycopg.Cursor[list[str]] | psycopg.AsyncCursor[list[str]],
+) -> RowMaker[list[str]]:
if cursor.description is not None:
titles = [c.name for c in cursor.description]
+from __future__ import annotations
+
import asyncio
import selectors
import sys
-from typing import Any, Dict, List
+from typing import Any, Dict
import pytest
return backend, options
-allow_fail_messages: List[str] = []
+allow_fail_messages: list[str] = []
def pytest_sessionfinish(session, exitstatus):
from math import isnan
from uuid import UUID
from random import choice, random, randrange
-from typing import Any, List, Set, Tuple
+from typing import Any, Set, Tuple
from decimal import Decimal
from contextlib import contextmanager, asynccontextmanager
self.records = []
self._schema = None
- self._types: List[type] | None = None
+ self._types: list[type] | None = None
self._types_names = None
self._makers = {}
self.table_name = sql.Identifier("fake_table")
return [sql.Identifier(f"fld_{i}") for i in range(len(self.schema))]
@property
- def types(self) -> List[type]:
+ def types(self) -> list[type]:
if not self._types:
def key(cls: type) -> str:
return self._types
@types.setter
- def types(self, types: List[type]) -> None:
+ def types(self, types: list[type]) -> None:
self._types = types
@property
)
def choose_schema(self, ncols=20):
- schema: List[Tuple[type, ...] | type] = []
+ schema: list[Tuple[type, ...] | type] = []
while len(schema) < ncols:
s = self.make_schema(choice(self.types))
if s is not None:
return l1 <= u2 and l2 <= u1
- out: List[Range[Any]] = []
+ out: list[Range[Any]] = []
for i in range(length):
r = self.make_Range((Range, spec[1]), **kwargs)
if r.isempty:
return spec[0](empty=True)
while True:
- bounds: List[Any] = []
+ bounds: list[Any] = []
while len(bounds) < 2:
if random() < no_bound_chance:
bounds.append(None)
if not length:
length = randrange(self.str_max_length)
- rv: List[int] = []
+ rv: list[int] = []
while len(rv) < length:
c = randrange(1, 128) if random() < 0.5 else randrange(1, 0x110000)
if not (0xD800 <= c <= 0xDBFF or 0xDC00 <= c <= 0xDFFF):
+from __future__ import annotations
+
import os
import sys
import ctypes
-from typing import Iterator, List, NamedTuple
+from typing import Iterator, NamedTuple
from tempfile import TemporaryFile
import pytest
direction: str
length: int
type: str
- content: List[bytes]
+ content: list[bytes]
# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file 'test_pool_async.py'
# DO NOT CHANGE! Change the original file instead.
+from __future__ import annotations
+
import logging
import weakref
from time import time
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, Tuple
import pytest
add_orig = pool.ConnectionPool._add_to_pool
monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", add_time)
- times: List[float] = []
+ times: list[float] = []
t0 = time()
with pool.ConnectionPool(dsn, min_size=5, num_workers=2) as p:
with pool.ConnectionPool(dsn, min_size=min_size, max_size=4, num_workers=3) as p:
p.wait(1.0)
- results: List[Tuple[int, float]] = []
+ results: list[Tuple[int, float]] = []
ts = [spawn(worker, args=(i,)) for i in range(len(want_times))]
gather(*ts)
def test_shrink(dsn, monkeypatch):
from psycopg_pool.pool import ShrinkPool
- results: List[Tuple[int, int]] = []
+ results: list[Tuple[int, int]] = []
def run_hacked(self, pool):
n0 = pool._nconns
with p.connection() as conn:
conn.execute("select pg_sleep(%s)", [t])
- size: List[int] = []
+ size: list[int] = []
with pool.ConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
s = spawn(sampler)
+from __future__ import annotations
+
import logging
import weakref
from time import time
-from typing import Any, Dict, List, Tuple
+from typing import Any, Dict, Tuple
import pytest
add_orig = pool.AsyncConnectionPool._add_to_pool
monkeypatch.setattr(pool.AsyncConnectionPool, "_add_to_pool", add_time)
- times: List[float] = []
+ times: list[float] = []
t0 = time()
async with pool.AsyncConnectionPool(dsn, min_size=5, num_workers=2) as p:
dsn, min_size=min_size, max_size=4, num_workers=3
) as p:
await p.wait(1.0)
- results: List[Tuple[int, float]] = []
+ results: list[Tuple[int, float]] = []
ts = [spawn(worker, args=(i,)) for i in range(len(want_times))]
await gather(*ts)
async def test_shrink(dsn, monkeypatch):
from psycopg_pool.pool_async import ShrinkPool
- results: List[Tuple[int, int]] = []
+ results: list[Tuple[int, int]] = []
async def run_hacked(self, pool):
n0 = pool._nconns
async with p.connection() as conn:
await conn.execute("select pg_sleep(%s)", [t])
- size: List[int] = []
+ size: list[int] = []
async with pool.AsyncConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
s = spawn(sampler)
# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file 'test_pool_common_async.py'
# DO NOT CHANGE! Change the original file instead.
+from __future__ import annotations
+
import logging
from time import time
-from typing import Any, List, Tuple
+from typing import Any, Tuple
import pytest
t1 = time()
results.append((n, t1 - t0, pid))
- results: List[Tuple[int, float, int]] = []
+ results: list[Tuple[int, float, int]] = []
with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
p.wait()
ts = [spawn(worker, args=(i,)) for i in range(6)]
else:
success.append(True)
- errors: List[Exception] = []
- success: List[bool] = []
+ errors: list[Exception] = []
+ success: list[bool] = []
with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3) as p:
p.wait()
t1 = time()
results.append((n, t1 - t0, pid))
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: list[Tuple[int, float, int]] = []
+ errors: list[Tuple[int, float, Exception]] = []
with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
ts = [spawn(worker, args=(i,)) for i in range(4)]
raise
with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
- results: List[int] = []
+ results: list[int] = []
ts = [
spawn(worker, args=(i, timeout))
for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
t1 = time()
results.append((n, t1 - t0, pid))
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: list[Tuple[int, float, int]] = []
+ errors: list[Tuple[int, float, Exception]] = []
with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1) as p:
ts = [spawn(worker, args=(i,)) for i in range(4)]
with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
p.wait()
- success: List[str] = []
+ success: list[str] = []
t1 = spawn(w1)
# Wait until w1 has received a connection
+from __future__ import annotations
+
import logging
from time import time
-from typing import Any, List, Tuple
+from typing import Any, Tuple
import pytest
t1 = time()
results.append((n, t1 - t0, pid))
- results: List[Tuple[int, float, int]] = []
+ results: list[Tuple[int, float, int]] = []
async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
await p.wait()
ts = [spawn(worker, args=(i,)) for i in range(6)]
else:
success.append(True)
- errors: List[Exception] = []
- success: List[bool] = []
+ errors: list[Exception] = []
+ success: list[bool] = []
async with pool_cls(
dsn, min_size=min_size(pool_cls), max_size=1, max_waiting=3
t1 = time()
results.append((n, t1 - t0, pid))
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: list[Tuple[int, float, int]] = []
+ errors: list[Tuple[int, float, Exception]] = []
async with pool_cls(
dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
raise
async with pool_cls(dsn, min_size=min_size(pool_cls, 2), max_size=2) as p:
- results: List[int] = []
+ results: list[int] = []
ts = [
spawn(worker, args=(i, timeout))
for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
t1 = time()
results.append((n, t1 - t0, pid))
- results: List[Tuple[int, float, int]] = []
- errors: List[Tuple[int, float, Exception]] = []
+ results: list[Tuple[int, float, int]] = []
+ errors: list[Tuple[int, float, Exception]] = []
async with pool_cls(
dsn, min_size=min_size(pool_cls, 2), max_size=2, timeout=0.1
async with pool_cls(dsn, min_size=min_size(pool_cls), max_size=1) as p:
await p.wait()
- success: List[str] = []
+ success: list[str] = []
t1 = spawn(w1)
# Wait until w1 has received a connection
# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file 'test_pool_null_async.py'
# DO NOT CHANGE! Change the original file instead.
+from __future__ import annotations
+
import logging
-from typing import Any, Dict, List
+from typing import Any, Dict
import pytest
from packaging.version import parse as ver # noqa: F401 # used in skipif
@pytest.mark.timing
@pytest.mark.crdb_skip("backend pid")
def test_max_lifetime(dsn):
- pids: List[int] = []
+ pids: list[int] = []
def worker():
with p.connection() as conn:
+from __future__ import annotations
+
import logging
-from typing import Any, Dict, List
+from typing import Any, Dict
import pytest
from packaging.version import parse as ver # noqa: F401 # used in skipif
@pytest.mark.timing
@pytest.mark.crdb_skip("backend pid")
async def test_max_lifetime(dsn):
- pids: List[int] = []
+ pids: list[int] = []
async def worker():
async with p.connection() as conn:
+from __future__ import annotations
+
import os
import sys
import time
import asyncio
import logging
from enum import Enum
-from typing import Any, Dict, List, Generator
+from typing import Any, Dict, Generator
from argparse import ArgumentParser, Namespace
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor
asyncpg = "asyncpg"
-ids: List[int] = []
-data: List[Dict[str, Any]] = []
+ids: list[int] = []
+data: list[Dict[str, Any]] = []
def main() -> None:
import datetime as dt
from types import ModuleType
-from typing import Any, List
+from typing import Any
import pytest
assert dempty.oid == 0
assert dempty.dump([]) == b"{}"
- L: List[List[Any]] = []
+ L: list[list[Any]] = []
L.append(L)
with pytest.raises(psycopg.DataError):
assert t.get_dumper(L, fmt_in)
import threading
import multiprocessing
import subprocess as sp
-from typing import List
import pytest
@pytest.mark.slow
@pytest.mark.crdb_skip("cancel")
def test_cancel(conn):
- errors: List[Exception] = []
+ errors: list[Exception] = []
cur = conn.cursor()
t = threading.Thread(target=canceller, args=(conn, errors))
@pytest.mark.slow
@pytest.mark.crdb_skip("cancel")
def test_cancel_stream(conn):
- errors: List[Exception] = []
+ errors: list[Exception] = []
cur = conn.cursor()
t = threading.Thread(target=canceller, args=(conn, errors))
+from __future__ import annotations
+
import sys
import time
import signal
import subprocess as sp
from asyncio import create_task
from asyncio.queues import Queue
-from typing import List
import pytest
with pytest.raises(e.QueryCanceled):
await cur.execute("select pg_sleep(2)")
- errors: List[Exception] = []
+ errors: list[Exception] = []
workers = [worker(), canceller(aconn, errors)]
t0 = time.time()
async for row in cur.stream("select pg_sleep(2)"):
pass
- errors: List[Exception] = []
+ errors: list[Exception] = []
workers = [worker(), canceller(aconn, errors)]
t0 = time.time()
# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file 'test_connection_async.py'
# DO NOT CHANGE! Change the original file instead.
+from __future__ import annotations
+
import sys
import time
import pytest
import logging
import weakref
-from typing import Any, List
+from typing import Any
import psycopg
from psycopg import pq, errors as e
@skip_async
@pytest.mark.crdb("skip", reason="transaction isolation")
def test_set_transaction_param_all_property(conn):
- params: List[Any] = tx_params[:]
+ params: list[Any] = tx_params[:]
params[2] = params[2].values[0]
for param in params:
@pytest.mark.crdb("skip", reason="transaction isolation")
def test_set_transaction_param_all(conn):
- params: List[Any] = tx_params[:]
+ params: list[Any] = tx_params[:]
params[2] = params[2].values[0]
for param in params:
+from __future__ import annotations
+
import sys
import time
import pytest
import logging
import weakref
-from typing import Any, List
+from typing import Any
import psycopg
from psycopg import pq, errors as e
@skip_async
@pytest.mark.crdb("skip", reason="transaction isolation")
def test_set_transaction_param_all_property(conn):
- params: List[Any] = tx_params[:]
+ params: list[Any] = tx_params[:]
params[2] = params[2].values[0]
for param in params:
@pytest.mark.crdb("skip", reason="transaction isolation")
async def test_set_transaction_param_all(aconn):
- params: List[Any] = tx_params[:]
+ params: list[Any] = tx_params[:]
params[2] = params[2].values[0]
for param in params:
import weakref
import datetime as dt
-from typing import Any, List
+from typing import Any
from packaging.version import parse as ver
import pytest
assert cur.rownumber == 2
cur.fetchmany(10)
assert cur.rownumber == 12
- rns: List[int] = []
+ rns: list[int] = []
for i in cur:
assert cur.rownumber
rns.append(cur.rownumber)
import weakref
import datetime as dt
-from typing import Any, List
+from typing import Any
from packaging.version import parse as ver
import pytest
assert cur.rownumber == 2
await cur.fetchmany(10)
assert cur.rownumber == 12
- rns: List[int] = []
+ rns: list[int] = []
async for i in cur:
assert cur.rownumber
rns.append(cur.rownumber)
from __future__ import annotations
-from typing import List
-
import pytest
import psycopg
ans = fake_hosts[qname, rdtype]
except KeyError:
raise DNSException(f"unknown test host: {qname} {rdtype}")
- rv: List[A | SRV] = []
+ rv: list[A | SRV] = []
if rdtype == "A":
for entry in ans:
+from __future__ import annotations
+
import sys
import pickle
-from typing import List
from weakref import ref
import pytest
diag = e.Diagnostic(res)
to_check: pq.DiagnosticField
- checked: List[pq.DiagnosticField] = []
+ checked: list[pq.DiagnosticField] = []
def check_val(self, v):
nonlocal to_check
import time
from collections import deque
from functools import partial
-from typing import List
import pytest
def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses):
- actual_statuses: List[pq.ExecStatus] = []
+ actual_statuses: list[pq.ExecStatus] = []
while len(actual_statuses) != len(expected_statuses):
if commands:
gen = generators.pipeline_communicate(pgconn, commands)
[
(
"conn.cursor()",
- "List[Tuple[Any, ...]]",
+ "list[Tuple[Any, ...]]",
),
(
"conn.cursor(row_factory=rows.dict_row)",
- "List[Dict[str, Any]]",
+ "list[Dict[str, Any]]",
),
(
"conn.cursor(row_factory=thing_row)",
- "List[Thing]",
+ "list[Thing]",
),
],
)
stmts = "\n".join(f" {line}" for line in stmts.splitlines())
src = f"""\
-from typing import Any, Callable, Dict, List, NamedTuple, Sequence, Tuple
+from __future__ import annotations
+
+from typing import Any, Callable, Dict, NamedTuple, Sequence, Tuple
import psycopg
from psycopg import rows
+from __future__ import annotations
+
import gc
from math import prod
-from typing import List, Any
+from typing import Any
from decimal import Decimal
import pytest
@pytest.mark.parametrize("fmt_out", pq.Format)
def test_load_nested_array(conn, fmt_out):
dims = [3, 4, 5, 6]
- a: List[Any] = list(range(prod(dims)))
+ a: list[Any] = list(range(prod(dims)))
for dim in dims[-1:0:-1]:
a = [a[i : i + dim] for i in range(0, len(a), dim)]
%(prog)s "host=localhost port=11111 user=postgres password=password"
"""
+from __future__ import annotations
+
import re
import argparse
import subprocess as sp
-from typing import List
from pathlib import Path
import psycopg
sp.check_call(["black", "-q", fn])
-def get_version_comment(conn: Connection) -> List[str]:
+def get_version_comment(conn: Connection) -> list[str]:
if conn.info.vendor == "PostgreSQL":
version = version_pretty(conn.info.server_version)
elif conn.info.vendor == "CockroachDB":
return ["", f" # Generated from {conn.info.vendor} {version}", ""]
-def get_py_oids(conn: Connection) -> List[str]:
+def get_py_oids(conn: Connection) -> list[str]:
lines = []
for typname, oid in conn.execute(
"""
}
-def get_py_types(conn: Connection) -> List[str]:
+def get_py_types(conn: Connection) -> list[str]:
# Note: "record" is a pseudotype but still a useful one to have.
# "pg_lsn" is a documented public type and useful in streaming replication
lines = []
return lines
-def get_py_ranges(conn: Connection) -> List[str]:
+def get_py_ranges(conn: Connection) -> list[str]:
lines = []
for typname, oid, typarray, rngsubtype in conn.execute(
"""
return lines
-def get_py_multiranges(conn: Connection) -> List[str]:
+def get_py_multiranges(conn: Connection) -> list[str]:
lines = []
for typname, oid, typarray, rngtypid, rngsubtype in conn.execute(
"""
return lines
-def get_cython_oids(conn: Connection) -> List[str]:
+def get_cython_oids(conn: Connection) -> list[str]:
lines = []
for typname, oid in conn.execute(
"""
return lines
-def update_file(fn: Path, new: List[str]) -> None:
+def update_file(fn: Path, new: list[str]) -> None:
with fn.open("r") as f:
lines = f.read().splitlines()
istart, iend = [