from ._compat import TypeVar
Worker: TypeAlias = threading.Thread
-AWorker: TypeAlias = "asyncio.Task[None]"
+AWorker: TypeAlias = asyncio.Task[None]
T = TypeVar("T")
from psycopg import types
if cls.__module__.startswith(types.__name__):
- if new := cast("type[RV]", getattr(_psycopg, cls.__name__, None)):
+ if new := cast(type[RV], getattr(_psycopg, cls.__name__, None)):
self._optimised[cls] = new
return new
from .pq.abc import PGresult
from ._connection_base import BaseConnection
-Key: TypeAlias = "tuple[bytes, tuple[int, ...]]"
+Key: TypeAlias = tuple[bytes, tuple[int, ...]]
COMMAND_OK = pq.ExecStatus.COMMAND_OK
TUPLES_OK = pq.ExecStatus.TUPLES_OK
# Counter to generate prepared statements names
self._prepared_idx = 0
- self._to_flush = deque["bytes | None"]()
+ self._to_flush = deque[bytes | None]()
@staticmethod
def key(query: PostgresQuery) -> Key:
from ._connection_base import BaseConnection
DumperCache: TypeAlias = "dict[DumperKey, abc.Dumper]"
-OidDumperCache: TypeAlias = "dict[int, abc.Dumper]"
-LoaderCache: TypeAlias = "dict[int, abc.Loader]"
+OidDumperCache: TypeAlias = dict[int, abc.Dumper]
+LoaderCache: TypeAlias = dict[int, abc.Loader]
TEXT = pq.Format.TEXT
PY_TEXT = PyFormat.TEXT
from .abc import Buffer
PackInt: TypeAlias = Callable[[int], bytes]
-UnpackInt: TypeAlias = Callable[[Buffer], "tuple[int]"]
+UnpackInt: TypeAlias = Callable[[Buffer], tuple[int]]
PackFloat: TypeAlias = Callable[[float], bytes]
-UnpackFloat: TypeAlias = Callable[[Buffer], "tuple[float]"]
+UnpackFloat: TypeAlias = Callable[[Buffer], tuple[float]]
class UnpackLen(Protocol):
from .connection_async import AsyncConnection
T = TypeVar("T", bound="TypeInfo")
-RegistryKey: TypeAlias = "str | int | tuple[type, int]"
+RegistryKey: TypeAlias = str | int | tuple[type, int]
class TypeInfo:
from ._compat import LiteralString, TypeVar
if TYPE_CHECKING:
- from . import sql # noqa: F401
+ from . import sql
from .rows import Row, RowMaker
from .pq.abc import PGresult
- from .waiting import Ready, Wait # noqa: F401
+ from .waiting import Ready, Wait
from ._adapters_map import AdaptersMap
from ._connection_base import BaseConnection
RV = TypeVar("RV")
-PQGenConn: TypeAlias = Generator["tuple[int, Wait]", "Ready | int", RV]
+PQGenConn: TypeAlias = Generator[tuple[int, "Wait"], "Ready | int", RV]
"""Generator for processes where the connection file number can change.
This can happen in connection and reset, but not in normal querying.
# Adaptation types
-DumpFunc: TypeAlias = Callable[[Any], "Buffer | None"]
+DumpFunc: TypeAlias = Callable[[Any], Buffer | None]
LoadFunc: TypeAlias = Callable[[Buffer], Any]
if TYPE_CHECKING:
from .pq.misc import ConninfoOption, PGnotify
-ErrorInfo: TypeAlias = "PGresult | dict[int, bytes | None] | None"
+ErrorInfo: TypeAlias = PGresult | dict[int, bytes | None] | None
_sqlcodes: dict[str, type[Error]] = {}
E = TypeVar("E", bound=Enum)
-EnumDumpMap: TypeAlias = "dict[E, bytes]"
-EnumLoadMap: TypeAlias = "dict[bytes, E]"
-EnumMapping: TypeAlias = "Mapping[E, str] | Sequence[tuple[E, str]] | None"
+EnumDumpMap: TypeAlias = dict[E, bytes]
+EnumLoadMap: TypeAlias = dict[bytes, E]
+EnumMapping: TypeAlias = Mapping[E, str] | Sequence[tuple[E, str]] | None
# Hashable versions
-_HEnumDumpMap: TypeAlias = "tuple[tuple[E, bytes], ...]"
-_HEnumLoadMap: TypeAlias = "tuple[tuple[bytes, E], ...]"
+_HEnumDumpMap: TypeAlias = tuple[tuple[E, bytes], ...]
+_HEnumLoadMap: TypeAlias = tuple[tuple[bytes, E], ...]
TEXT = Format.TEXT
BINARY = Format.BINARY
raise TypeError("no info passed. Is the requested enum available?")
if enum is None:
- enum = cast("type[E]", _make_enum(info.name, tuple(info.labels)))
+ enum = cast(type[E], _make_enum(info.name, tuple(info.labels)))
info.enum = enum
adapters = context.adapters if context else postgres.adapters
"""Lookup list for small ints to bytes conversions."""
-Hstore: TypeAlias = "dict[str, str | None]"
+Hstore: TypeAlias = dict[str, str | None]
class BaseHstoreDumper(RecursiveDumper):
from ..adapt import AdaptersMap, Buffer, Dumper, Loader, PyFormat
from ..errors import DataError
-JsonDumpsFunction: TypeAlias = Callable[[Any], "str | bytes"]
-JsonLoadsFunction: TypeAlias = Callable[["str | bytes"], Any]
+JsonDumpsFunction: TypeAlias = Callable[[Any], str | bytes]
+JsonLoadsFunction: TypeAlias = Callable[[str | bytes], Any]
def set_json_dumps(
sleep = time.sleep
Worker: TypeAlias = threading.Thread
-AWorker: TypeAlias = "asyncio.Task[None]"
+AWorker: TypeAlias = asyncio.Task[None]
def current_thread_name() -> str:
from ._compat import TypeVar
if TYPE_CHECKING:
- from typing import Any # noqa: F401
+ from typing import Any
from psycopg import AsyncConnection, Connection # noqa: F401
from psycopg.rows import TupleRow # noqa: F401
self,
conninfo: str = "",
*,
- connection_class: type[CT] = cast("type[CT]", Connection),
+ connection_class: type[CT] = cast(type[CT], Connection),
kwargs: dict[str, Any] | None = None,
min_size: int = 0,
max_size: int | None = None,
self,
conninfo: str = "",
*,
- connection_class: type[ACT] = cast("type[ACT]", AsyncConnection),
+ connection_class: type[ACT] = cast(type[ACT], AsyncConnection),
kwargs: dict[str, Any] | None = None,
min_size: int = 0, # Note: min_size default value changed to 0.
max_size: int | None = None,
self,
conninfo: str = "",
*,
- connection_class: type[CT] = cast("type[CT]", Connection),
+ connection_class: type[CT] = cast(type[CT], Connection),
kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
self,
conninfo: str = "",
*,
- connection_class: type[ACT] = cast("type[ACT]", AsyncConnection),
+ connection_class: type[ACT] = cast(type[ACT], AsyncConnection),
kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
ensure_table(cur, sample_tabledef)
with cur.copy(f"copy copy_in from stdin {copyopt(format)}") as copy:
- row: "tuple[Any, ...]"
+ row: tuple[Any, ...]
for row in sample_records:
if format == pq.Format.BINARY:
row = tuple((Int4(i) if isinstance(i, int) else i for i in row))
await ensure_table_async(cur, sample_tabledef)
async with cur.copy(f"copy copy_in from stdin {copyopt(format)}") as copy:
- row: "tuple[Any, ...]"
+ row: tuple[Any, ...]
for row in sample_records:
if format == pq.Format.BINARY:
row = tuple(Int4(i) if isinstance(i, int) else i for i in row)