from psycopg import types
if cls.__module__.startswith(types.__name__):
- new = cast(type[RV], getattr(_psycopg, cls.__name__, None))
+ new = cast("type[RV]", getattr(_psycopg, cls.__name__, None))
if new:
self._optimised[cls] = new
return new
if TYPE_CHECKING:
from .abc import Buffer
from .cursor import Cursor
- from .connection import Connection
+ from .connection import Connection # noqa: F401
COPY_IN = pq.ExecStatus.COPY_IN
COPY_OUT = pq.ExecStatus.COPY_OUT
ACTIVE = pq.TransactionStatus.ACTIVE
-class Copy(BaseCopy[Connection[Any]]):
+class Copy(BaseCopy["Connection[Any]"]):
"""Manage an asynchronous :sql:`COPY` operation.
:param cursor: the cursor where the operation is performed.
if TYPE_CHECKING:
from .abc import Buffer
from .cursor_async import AsyncCursor
- from .connection_async import AsyncConnection
+ from .connection_async import AsyncConnection # noqa: F401
COPY_IN = pq.ExecStatus.COPY_IN
COPY_OUT = pq.ExecStatus.COPY_OUT
ACTIVE = pq.TransactionStatus.ACTIVE
-class AsyncCopy(BaseCopy[AsyncConnection[Any]]):
+class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
"""Manage an asynchronous :sql:`COPY` operation.
:param cursor: the cursor where the operation is performed.
from ._compat import Deque, Self, TypeAlias
from .pq.misc import connection_summary
from ._encodings import pgconn_encoding
-from ._preparing import Key, Prepare
from .generators import pipeline_communicate, fetch_many, send
from ._capabilities import capabilities
if TYPE_CHECKING:
from .pq.abc import PGresult
- from ._cursor_base import BaseCursor
from .connection import Connection
+ from ._preparing import Key, Prepare # noqa: F401
+ from ._cursor_base import BaseCursor # noqa: F401
from ._connection_base import BaseConnection
from .connection_async import AsyncConnection
PendingResult: TypeAlias = (
- tuple[BaseCursor[Any, Any], tuple[Key, Prepare, bytes] | None] | None
+ "tuple[BaseCursor[Any, Any], tuple[Key, Prepare, bytes] | None] | None"
)
FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
from .pq.abc import PGresult
from ._connection_base import BaseConnection
-Key: TypeAlias = tuple[bytes, tuple[int, ...]]
+Key: TypeAlias = "tuple[bytes, tuple[int, ...]]"
COMMAND_OK = pq.ExecStatus.COMMAND_OK
TUPLES_OK = pq.ExecStatus.TUPLES_OK
# Counter to generate prepared statements names
self._prepared_idx = 0
- self._to_flush = Deque[bytes | None]()
+ self._to_flush = Deque["bytes | None"]()
@staticmethod
def key(query: PostgresQuery) -> Key:
from . import pq
from . import abc
from . import errors as e
-from .abc import Buffer, LoadFunc, AdaptContext, PyFormat, DumperKey, NoneType
+from .abc import Buffer, LoadFunc, AdaptContext, PyFormat, NoneType
from .rows import Row, RowMaker
from ._oids import INVALID_OID, TEXT_OID
from ._compat import TypeAlias
from ._encodings import conn_encoding
if TYPE_CHECKING:
+ from .abc import DumperKey # noqa: F401
from .adapt import AdaptersMap
from .pq.abc import PGresult
from ._connection_base import BaseConnection
-DumperCache: TypeAlias = dict[DumperKey, abc.Dumper]
-OidDumperCache: TypeAlias = dict[int, abc.Dumper]
-LoaderCache: TypeAlias = dict[int, abc.Loader]
+DumperCache: TypeAlias = "dict[DumperKey, abc.Dumper]"
+OidDumperCache: TypeAlias = "dict[int, abc.Dumper]"
+LoaderCache: TypeAlias = "dict[int, abc.Loader]"
TEXT = pq.Format.TEXT
PY_TEXT = PyFormat.TEXT
# The type of the _query2pg() and _query2pg_nocache() methods
_Query2Pg: TypeAlias = Callable[
- [bytes, str], tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]
+ [bytes, str], "tuple[bytes, list[PyFormat], list[str] | None, list[QueryPart]]"
]
_Query2PgClient: TypeAlias = Callable[
- [bytes, str], tuple[bytes, list[str] | None, list[QueryPart]]
+ [bytes, str], "tuple[bytes, list[str] | None, list[QueryPart]]"
]
from ._compat import TypeAlias
PackInt: TypeAlias = Callable[[int], bytes]
-UnpackInt: TypeAlias = Callable[[Buffer], tuple[int]]
+UnpackInt: TypeAlias = Callable[[Buffer], "tuple[int]"]
PackFloat: TypeAlias = Callable[[float], bytes]
-UnpackFloat: TypeAlias = Callable[[Buffer], tuple[float]]
+UnpackFloat: TypeAlias = Callable[[Buffer], "tuple[float]"]
class UnpackLen(Protocol):
from . import pq
from ._enums import PyFormat as PyFormat
-from ._compat import LiteralString, TypeAlias, TypeVar
+from ._compat import TypeAlias, TypeVar
if TYPE_CHECKING:
- from . import sql
+ from . import sql # noqa: F401
from .rows import Row, RowMaker
from .pq.abc import PGresult
- from .waiting import Wait, Ready
+ from .waiting import Wait, Ready # noqa: F401
+ from ._compat import LiteralString # noqa: F401
from ._adapters_map import AdaptersMap
from ._connection_base import BaseConnection
NoneType: type = type(None)
# An object implementing the buffer protocol
-Buffer: TypeAlias = bytes | bytearray | memoryview
+Buffer: TypeAlias = "bytes | bytearray | memoryview"
-Query: TypeAlias = LiteralString | bytes | sql.SQL | sql.Composed
-Params: TypeAlias = Sequence[Any] | Mapping[str, Any]
+Query: TypeAlias = "LiteralString | bytes | sql.SQL | sql.Composed"
+Params: TypeAlias = "Sequence[Any] | Mapping[str, Any]"
ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
PipelineCommand: TypeAlias = Callable[[], None]
-DumperKey: TypeAlias = type | tuple["DumperKey", ...]
-ConnParam: TypeAlias = str | int | None
-ConnDict: TypeAlias = dict[str, ConnParam]
+DumperKey: TypeAlias = "type | tuple[DumperKey, ...]"
+ConnParam: TypeAlias = "str | int | None"
+ConnDict: TypeAlias = "dict[str, ConnParam]"
ConnMapping: TypeAlias = Mapping[str, ConnParam]
RV = TypeVar("RV")
-PQGenConn: TypeAlias = Generator[tuple[int, Wait], Ready | int, RV]
+PQGenConn: TypeAlias = Generator["tuple[int, Wait]", "Ready | int", RV]
"""Generator for processes where the connection file number can change.
This can happen in connection and reset, but not in normal querying.
"""
-PQGen: TypeAlias = Generator[Wait, Ready | int, RV]
+PQGen: TypeAlias = Generator["Wait", "Ready | int", RV]
"""Generator for processes where the connection file number won't change.
"""
# Adaptation types
-DumpFunc: TypeAlias = Callable[[Any], Buffer | None]
+DumpFunc: TypeAlias = Callable[[Any], "Buffer | None"]
LoadFunc: TypeAlias = Callable[[Buffer], Any]
from __future__ import annotations
-from typing import Any, TYPE_CHECKING
+from typing import TYPE_CHECKING
from functools import partial
from ._queries import PostgresQuery, PostgresClientQuery
from .cursor_async import AsyncCursor
if TYPE_CHECKING:
- from .connection import Connection
- from .connection_async import AsyncConnection
+ from typing import Any # noqa: F401
+ from .connection import Connection # noqa: F401
+ from .connection_async import AsyncConnection # noqa: F401
TEXT = pq.Format.TEXT
BINARY = pq.Format.BINARY
return (Prepare.NO, b"")
-class ClientCursor(ClientCursorMixin[Connection[Any], Row], Cursor[Row]):
+class ClientCursor(ClientCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
-class AsyncClientCursor(ClientCursorMixin[AsyncConnection[Any], Row], AsyncCursor[Row]):
+class AsyncClientCursor(
+ ClientCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]
+):
__module__ = "psycopg"
ACTIVE = pq.TransactionStatus.ACTIVE
-class Cursor(BaseCursor[Connection[Any], Row]):
+class Cursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
ACTIVE = pq.TransactionStatus.ACTIVE
-class AsyncCursor(BaseCursor[AsyncConnection[Any], Row]):
+class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
if TYPE_CHECKING:
from .pq.misc import PGnotify, ConninfoOption
-ErrorInfo: TypeAlias = None | PGresult | dict[int, bytes | None]
+ErrorInfo: TypeAlias = "PGresult | dict[int, bytes | None] | None"
_sqlcodes: dict[str, type[Error]] = {}
import inspect
import logging
-from typing import Any, Callable, TYPE_CHECKING
+from typing import Any, Callable
from functools import wraps
from .._compat import Self, TypeVar
+from . import abc
from . import PGconn
from .misc import connection_summary
-if TYPE_CHECKING:
- from . import abc
-
Func = TypeVar("Func", bound=Callable[..., Any])
logger = logging.getLogger("psycopg.debug")
from .misc import PGnotify, ConninfoOption, PGresAttDesc
# An object implementing the buffer protocol (ish)
-Buffer: TypeAlias = bytes | bytearray | memoryview
+Buffer: TypeAlias = "bytes | bytearray | memoryview"
class PGconn(Protocol):
from __future__ import annotations
-from typing import Any, TYPE_CHECKING
-
+from typing import TYPE_CHECKING
from .abc import ConnectionType, Query, Params
from .sql import Composable
from .rows import Row
from ._cursor_base import BaseCursor
if TYPE_CHECKING:
- from .connection import Connection
- from .connection_async import AsyncConnection
+ from typing import Any # noqa: F401
+ from .connection import Connection # noqa: F401
+ from .connection_async import AsyncConnection # noqa: F401
class PostgresRawQuery(PostgresQuery):
_query_cls = PostgresRawQuery
-class RawCursor(RawCursorMixin[Connection[Any], Row], Cursor[Row]):
+class RawCursor(RawCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
-class AsyncRawCursor(RawCursorMixin[AsyncConnection[Any], Row], AsyncCursor[Row]):
+class AsyncRawCursor(RawCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]):
__module__ = "psycopg"
def __call__(self, __cursor: BaseCursor[Any, Any]) -> RowMaker[Row]: ...
-TupleRow: TypeAlias = tuple[Any, ...]
+TupleRow: TypeAlias = "tuple[Any, ...]"
"""
An alias for the type returned by `tuple_row()` (i.e. a tuple of any content).
"""
-DictRow: TypeAlias = dict[str, Any]
+DictRow: TypeAlias = "dict[str, Any]"
"""
An alias for the type returned by `dict_row()`
return sql.SQL(" ").join(parts)
-class ServerCursor(ServerCursorMixin[Connection[Any], Row], Cursor[Row]):
+class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
__module__ = "psycopg"
__slots__ = ()
self._pos = value
-class AsyncServerCursor(ServerCursorMixin[AsyncConnection[Any], Row], AsyncCursor[Row]):
+class AsyncServerCursor(
+ ServerCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]
+):
__module__ = "psycopg"
__slots__ = ()
)
-class Transaction(BaseTransaction[Connection[Any]]):
+class Transaction(BaseTransaction["Connection[Any]"]):
"""
Returned by `Connection.transaction()` to handle a transaction block.
"""
return False
-class AsyncTransaction(BaseTransaction[AsyncConnection[Any]]):
+class AsyncTransaction(BaseTransaction["AsyncConnection[Any]"]):
"""
Returned by `AsyncConnection.transaction()` to handle a transaction block.
"""
_struct_head = struct.Struct("!III") # ndims, hasnull, elem oid
_pack_head = cast(Callable[[int, int, int], bytes], _struct_head.pack)
-_unpack_head = cast(Callable[[Buffer], tuple[int, int, int]], _struct_head.unpack_from)
+_unpack_head = cast(
+ Callable[[Buffer], "tuple[int, int, int]"], _struct_head.unpack_from
+)
_struct_dim = struct.Struct("!II") # dim, lower bound
_pack_dim = cast(Callable[[int, int], bytes], _struct_dim.pack)
-_unpack_dim = cast(Callable[[Buffer, int], tuple[int, int]], _struct_dim.unpack_from)
+_unpack_dim = cast(Callable[[Buffer, int], "tuple[int, int]"], _struct_dim.unpack_from)
PY_TEXT = PyFormat.TEXT
PQ_BINARY = pq.Format.BINARY
_struct_oidlen = struct.Struct("!Ii")
_pack_oidlen = cast(Callable[[int, int], bytes], _struct_oidlen.pack)
_unpack_oidlen = cast(
- Callable[[abc.Buffer, int], tuple[int, int]], _struct_oidlen.unpack_from
+ Callable[[abc.Buffer, int], "tuple[int, int]"], _struct_oidlen.unpack_from
)
_struct_timetz = struct.Struct("!qi") # microseconds, sec tz offset
_pack_timetz = cast(Callable[[int, int], bytes], _struct_timetz.pack)
-_unpack_timetz = cast(Callable[[Buffer], tuple[int, int]], _struct_timetz.unpack)
+_unpack_timetz = cast(Callable[[Buffer], "tuple[int, int]"], _struct_timetz.unpack)
_struct_interval = struct.Struct("!qii") # microseconds, days, months
_pack_interval = cast(Callable[[int, int, int], bytes], _struct_interval.pack)
_unpack_interval = cast(
- Callable[[Buffer], tuple[int, int, int]], _struct_interval.unpack
+ Callable[[Buffer], "tuple[int, int, int]"], _struct_interval.unpack
)
utc = timezone.utc
E = TypeVar("E", bound=Enum)
-EnumDumpMap: TypeAlias = dict[E, bytes]
-EnumLoadMap: TypeAlias = dict[bytes, E]
-EnumMapping: TypeAlias = Mapping[E, str] | Sequence[tuple[E, str]] | None
+EnumDumpMap: TypeAlias = "dict[E, bytes]"
+EnumLoadMap: TypeAlias = "dict[bytes, E]"
+EnumMapping: TypeAlias = "Mapping[E, str] | Sequence[tuple[E, str]] | None"
# Hashable versions
-_HEnumDumpMap: TypeAlias = tuple[tuple[E, bytes], ...]
-_HEnumLoadMap: TypeAlias = tuple[tuple[bytes, E], ...]
+_HEnumDumpMap: TypeAlias = "tuple[tuple[E, bytes], ...]"
+_HEnumLoadMap: TypeAlias = "tuple[tuple[bytes, E], ...]"
TEXT = Format.TEXT
BINARY = Format.BINARY
raise TypeError("no info passed. Is the requested enum available?")
if enum is None:
- enum = cast(type[E], _make_enum(info.name, tuple(info.labels)))
+ enum = cast("type[E]", _make_enum(info.name, tuple(info.labels)))
info.enum = enum
adapters = context.adapters if context else postgres.adapters
)
-Hstore: TypeAlias = dict[str, str | None]
+Hstore: TypeAlias = "dict[str, str | None]"
class BaseHstoreDumper(RecursiveDumper):
from ..pq import Format
from ..adapt import Buffer, Dumper, Loader, PyFormat, AdaptersMap
from ..errors import DataError
-from .._compat import cache
+from .._compat import cache, TypeAlias
-JsonDumpsFunction = Callable[[Any], str | bytes]
-JsonLoadsFunction = Callable[[str | bytes], Any]
+JsonDumpsFunction: TypeAlias = Callable[[Any], "str | bytes"]
+JsonLoadsFunction: TypeAlias = Callable[["str | bytes"], Any]
def set_json_dumps(
if TYPE_CHECKING:
import ipaddress
-Address: TypeAlias = "ipaddress.IPv4Address" | "ipaddress.IPv6Address"
-Interface: TypeAlias = "ipaddress.IPv4Interface" | "ipaddress.IPv6Interface"
-Network: TypeAlias = "ipaddress.IPv4Network" | "ipaddress.IPv6Network"
+Address: TypeAlias = "ipaddress.IPv4Address | ipaddress.IPv6Address"
+Interface: TypeAlias = "ipaddress.IPv4Interface | ipaddress.IPv6Interface"
+Network: TypeAlias = "ipaddress.IPv4Network | ipaddress.IPv6Network"
# These objects will be imported lazily
ip_address: Callable[[str], Address] = None # type: ignore[assignment]
_contexts[i] = DefaultContext
_unpack_numeric_head = cast(
- Callable[[Buffer], tuple[int, int, int, int]],
+ Callable[[Buffer], "tuple[int, int, int, int]"],
struct.Struct("!HhHH").unpack_from,
)
_pack_numeric_head = cast(
from __future__ import annotations
-from typing import Any, Awaitable, Callable, TYPE_CHECKING
+from typing import Awaitable, Callable, TYPE_CHECKING
from ._compat import TypeAlias, TypeVar
if TYPE_CHECKING:
- from .pool import ConnectionPool
- from .pool_async import AsyncConnectionPool
+ from typing import Any # noqa: F401
+ from .pool import ConnectionPool # noqa: F401
+ from .pool_async import AsyncConnectionPool # noqa: F401
from psycopg import Connection, AsyncConnection # noqa: F401
from psycopg.rows import TupleRow # noqa: F401
AsyncConnectionCB: TypeAlias = Callable[[ACT], Awaitable[None]]
# Callbacks to pass the pool to on connection failure
-ConnectFailedCB: TypeAlias = Callable[[ConnectionPool[Any]], None]
+ConnectFailedCB: TypeAlias = "Callable[[ConnectionPool[Any]], None]"
AsyncConnectFailedCB: TypeAlias = (
- Callable[[AsyncConnectionPool[Any]], None]
- | Callable[[AsyncConnectionPool[Any]], Awaitable[None]]
+ "Callable[[AsyncConnectionPool[Any]], None]"
+ "| Callable[[AsyncConnectionPool[Any]], Awaitable[None]]"
)
self,
conninfo: str = "",
*,
- connection_class: type[CT] = cast(type[CT], Connection),
+ connection_class: type[CT] = cast("type[CT]", Connection),
kwargs: dict[str, Any] | None = None,
min_size: int = 0,
max_size: int | None = None,
self,
conninfo: str = "",
*,
- connection_class: type[ACT] = cast(type[ACT], AsyncConnection),
+ connection_class: type[ACT] = cast("type[ACT]", AsyncConnection),
kwargs: dict[str, Any] | None = None,
min_size: int = 0, # Note: min_size default value changed to 0.
max_size: int | None = None,
self,
conninfo: str = "",
*,
- connection_class: type[CT] = cast(type[CT], Connection),
+ connection_class: type[CT] = cast("type[CT]", Connection),
kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
self,
conninfo: str = "",
*,
- connection_class: type[ACT] = cast(type[ACT], AsyncConnection),
+ connection_class: type[ACT] = cast("type[ACT]", AsyncConnection),
kwargs: dict[str, Any] | None = None,
min_size: int = 4,
max_size: int | None = None,
import logging
import weakref
from time import time
-from typing import Any
+from typing import Any, Dict
import pytest
pool.ConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(dict[str, Any]):
+class MyRow(Dict[str, Any]):
pass
import logging
import weakref
from time import time
-from typing import Any
+from typing import Any, Dict
import pytest
pool.AsyncConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(dict[str, Any]):
+class MyRow(Dict[str, Any]):
pass
from __future__ import annotations
import logging
-from typing import Any
+from typing import Any, Dict
import pytest
from packaging.version import parse as ver # noqa: F401 # used in skipif
pool.NullConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(dict[str, Any]):
+class MyRow(Dict[str, Any]):
pass
from __future__ import annotations
import logging
-from typing import Any
+from typing import Any, Dict
import pytest
from packaging.version import parse as ver # noqa: F401 # used in skipif
pool.AsyncNullConnectionPool(min_size=min_size, max_size=max_size)
-class MyRow(dict[str, Any]):
+class MyRow(Dict[str, Any]):
pass
# WARNING: this file is auto-generated by 'async_to_sync.py'
# from the original file 'test_pipeline_async.py'
# DO NOT CHANGE! Change the original file instead.
+from __future__ import annotations
+
import logging
from typing import Any
from operator import attrgetter
+from __future__ import annotations
+
import logging
from typing import Any
from operator import attrgetter
+from __future__ import annotations
+
import pytest
import datetime as dt
from typing import Any
self.generic_visit(node)
return node
+ def visit_Call(self, node: ast.Call) -> ast.AST:
+ match node:
+ case ast.Call(func=ast.Name(id="cast")):
+ node.args[0] = self._convert_if_literal_string(node.args[0])
+
+ self.generic_visit(node)
+ return node
+
def visit_FunctionDef(self, node: ast.FunctionDef) -> ast.AST:
self._fix_docstring(node.body)
if node.decorator_list: