def __init__(
self,
- template: "AdaptersMap" | None = None,
+ template: AdaptersMap | None = None,
types: TypesRegistry | None = None,
):
if template:
# implement the AdaptContext protocol too
@property
- def adapters(self) -> "AdaptersMap":
+ def adapters(self) -> AdaptersMap:
return self
@property
- def connection(self) -> "BaseConnection[Any]" | None:
+ def connection(self) -> BaseConnection[Any] | None:
return None
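The change running through this whole diff is the removal of quoted forward references: with `from __future__ import annotations` (PEP 563) in effect, annotations are stored as strings and never evaluated at definition time, so the quotes are redundant. A minimal stdlib-only sketch:

from __future__ import annotations

class Node:
    # An unquoted self-reference is fine: the annotation is never evaluated
    # when the class body runs, only stored as a string.
    def next_node(self) -> Node | None:
        return None

print(Node.next_node.__annotations__)  # {'return': 'Node | None'}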
def register_dumper(self, cls: type | str | None, dumper: type[Dumper]) -> None:
class Column(Sequence[Any]):
__module__ = "psycopg"
- def __init__(self, cursor: "BaseCursor[Any, Any]", index: int):
+ def __init__(self, cursor: BaseCursor[Any, Any], index: int):
res = cursor.pgresult
assert res
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- def __init__(self, pgconn: "PGconn"):
+ def __init__(self, pgconn: PGconn):
self.pgconn = pgconn
self._autocommit = False
# Attribute is only set if the connection comes from a pool, so we can
# also tell apart a connection sitting in the pool (when _pool = None)
- self._pool: "BasePool" | None
+ self._pool: BasePool | None
self._pipeline: BasePipeline | None = None
@staticmethod
def _notice_handler(
- wself: "ReferenceType[BaseConnection[Row]]", res: "PGresult"
+ wself: "ReferenceType[BaseConnection[Row]]", res: PGresult
) -> None:
self = wself()
if not (self and self._notice_handlers):
def _exec_command(
self, command: Query, result_format: pq.Format = TEXT
- ) -> PQGen["PGresult" | None]:
+ ) -> PQGen[PGresult | None]:
"""
Generator to send a command to the backend and receive the result.
if TYPE_CHECKING:
from .abc import Buffer
from .cursor import Cursor
- from .connection import Connection # noqa: F401
+ from .connection import Connection
COPY_IN = pq.ExecStatus.COPY_IN
COPY_OUT = pq.ExecStatus.COPY_OUT
ACTIVE = pq.TransactionStatus.ACTIVE
-class Copy(BaseCopy["Connection[Any]"]):
+class Copy(BaseCopy[Connection[Any]]):
"""Manage an asynchronous :sql:`COPY` operation.
:param cursor: the cursor where the operation is performed.
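For context, a hedged usage sketch of the API this class backs (table and column names are placeholders, `cur` is an open psycopg cursor):

with cur.copy("COPY mytable (id, name) FROM STDIN") as copy:
    copy.write_row((1, "hello"))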
if TYPE_CHECKING:
from .abc import Buffer
from .cursor_async import AsyncCursor
- from .connection_async import AsyncConnection # noqa: F401
+ from .connection_async import AsyncConnection
COPY_IN = pq.ExecStatus.COPY_IN
COPY_OUT = pq.ExecStatus.COPY_OUT
ACTIVE = pq.TransactionStatus.ACTIVE
-class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
+class AsyncCopy(BaseCopy[AsyncConnection[Any]]):
"""Manage an asynchronous :sql:`COPY` operation.
:param cursor: the cursor where the operation is performed.
if TYPE_CHECKING:
from ._cursor_base import BaseCursor
- from .connection import Connection # noqa: F401
- from .connection_async import AsyncConnection # noqa: F401
PY_TEXT = adapt.PyFormat.TEXT
PY_BINARY = adapt.PyFormat.BINARY
def __init__(
self,
- cursor: "BaseCursor[ConnectionType, Any]",
+ cursor: BaseCursor[ConnectionType, Any],
*,
binary: bool | None = None,
):
ExecStatus = pq.ExecStatus
- _tx: "Transformer"
+ _tx: Transformer
_make_row: RowMaker[Row]
- _pgconn: "PGconn"
+ _pgconn: PGconn
_query_cls: type[PostgresQuery] = PostgresQuery
def __init__(self, connection: ConnectionType):
self._reset()
def _reset(self, reset_query: bool = True) -> None:
- self._results: list["PGresult"] = []
- self.pgresult: "PGresult" | None = None
+ self._results: list[PGresult] = []
+ self.pgresult: PGresult | None = None
self._pos = 0
self._iresult = 0
self._rowcount = -1
self._last_query = query
yield from send(self._pgconn)
- def _stream_fetchone_gen(self, first: bool) -> PQGen["PGresult" | None]:
+ def _stream_fetchone_gen(self, first: bool) -> PQGen[PGresult | None]:
res = yield from fetch(self._pgconn)
if res is None:
return None
pgq.convert(query, params)
return pgq
- def _check_results(self, results: list["PGresult"]) -> None:
+ def _check_results(self, results: list[PGresult]) -> None:
"""
Verify that the results of a query are valid.
if status != TUPLES_OK and status != COMMAND_OK and status != EMPTY_QUERY:
self._raise_for_result(res)
- def _raise_for_result(self, result: "PGresult") -> NoReturn:
+ def _raise_for_result(self, result: PGresult) -> NoReturn:
"""
Raise an appropriate error for an unexpected database result.
"""
self._make_row = self._make_row_maker()
- def _set_results(self, results: list["PGresult"]) -> None:
+ def _set_results(self, results: list[PGresult]) -> None:
if self._execmany_returning is None:
# Received from execute()
self._results[:] = results
else:
raise e.ProgrammingError("the last operation didn't produce a result")
- def _check_copy_result(self, result: "PGresult") -> None:
+ def _check_copy_result(self, result: PGresult) -> None:
"""
Check that the value returned in a copy() operation is a legit COPY.
"""
pg_codecs = {v: k.encode() for k, v in _py_codecs.items()}
-def conn_encoding(conn: "BaseConnection[Any] | None") -> str:
+def conn_encoding(conn: BaseConnection[Any] | None) -> str:
"""
Return the Python encoding name of a psycopg connection.
return "utf-8"
-def pgconn_encoding(pgconn: "PGconn") -> str:
+def pgconn_encoding(pgconn: PGconn) -> str:
"""
Return the Python encoding name of a libpq connection.
command_queue: Deque[PipelineCommand]
result_queue: Deque[PendingResult]
- def __init__(self, conn: "BaseConnection[Any]") -> None:
+ def __init__(self, conn: BaseConnection[Any]) -> None:
self._conn = conn
self.pgconn = conn.pgconn
self.command_queue = Deque[PipelineCommand]()
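A hedged sketch of what this queue machinery enables at the user level (queries are placeholders):

# Both statements go out in a single round trip; results become available
# at the pipeline's sync points, at the latest when the block exits.
with conn.pipeline():
    cur1 = conn.execute("SELECT 1")
    cur2 = conn.execute("SELECT 2")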
if exception is not None:
raise exception
- def _process_results(
- self, queued: PendingResult, results: list["PGresult"]
- ) -> None:
+ def _process_results(self, queued: PendingResult, results: list[PGresult]) -> None:
"""Process a results set fetched from the current pipeline.
This matches 'results' with its respective element in the pipeline
"""Handler for connection in pipeline mode."""
__module__ = "psycopg"
- _conn: "Connection[Any]"
+ _conn: Connection[Any]
- def __init__(self, conn: "Connection[Any]") -> None:
+ def __init__(self, conn: Connection[Any]) -> None:
super().__init__(conn)
def sync(self) -> None:
"""Handler for async connection in pipeline mode."""
__module__ = "psycopg"
- _conn: "AsyncConnection[Any]"
+ _conn: AsyncConnection[Any]
- def __init__(self, conn: "AsyncConnection[Any]") -> None:
+ def __init__(self, conn: AsyncConnection[Any]) -> None:
super().__init__(conn)
async def sync(self) -> None:
from __future__ import annotations
from enum import IntEnum, auto
-from typing import Sequence, TYPE_CHECKING
+from typing import Any, Sequence, TYPE_CHECKING
from collections import OrderedDict
from . import pq
from ._queries import PostgresQuery
if TYPE_CHECKING:
- from typing import Any
from .pq.abc import PGresult
from ._connection_base import BaseConnection
# The query is not to be prepared yet
return Prepare.NO, b""
- def _should_discard(self, prep: Prepare, results: Sequence["PGresult"]) -> bool:
+ def _should_discard(self, prep: Prepare, results: Sequence[PGresult]) -> bool:
"""Check if we need to discard our entire state: it should happen on
rollback or on dropping objects, because the same object may get
recreated and postgres would fail internal lookups.
return False
@staticmethod
- def _check_results(results: Sequence["PGresult"]) -> bool:
+ def _check_results(results: Sequence[PGresult]) -> bool:
"""Return False if 'results' are invalid for prepared statement cache."""
if len(results) != 1:
# We cannot prepare a multiple statement
key: Key,
prep: Prepare,
name: bytes,
- results: Sequence["PGresult"],
+ results: Sequence[PGresult],
) -> None:
"""Validate cached entry with 'key' by checking query 'results'.
else:
return False
- def maintain_gen(self, conn: "BaseConnection[Any]") -> PQGen[None]:
+ def maintain_gen(self, conn: BaseConnection[Any]) -> PQGen[None]:
"""
Generator to send the commands to perform periodic maintenance
types: tuple[int, ...] | None
formats: list[pq.Format] | None
- _adapters: "AdaptersMap"
- _pgresult: "PGresult" | None
+ _adapters: AdaptersMap
+ _pgresult: PGresult | None
_none_oid: int
def __init__(self, context: AdaptContext | None = None):
return cls(context)
@property
- def connection(self) -> "BaseConnection[Any]" | None:
+ def connection(self) -> BaseConnection[Any] | None:
return self._conn
@property
return self._encoding
@property
- def adapters(self) -> "AdaptersMap":
+ def adapters(self) -> AdaptersMap:
return self._adapters
@property
- def pgresult(self) -> "PGresult" | None:
+ def pgresult(self) -> PGresult | None:
return self._pgresult
def set_pgresult(
self,
- result: "PGresult" | None,
+ result: PGresult | None,
*,
set_loaders: bool = True,
format: pq.Format | None = None,
_tx _want_formats _parts _encoding _order
""".split()
- def __init__(self, transformer: "Transformer"):
+ def __init__(self, transformer: Transformer):
self._tx = transformer
self.params: Sequence[Buffer | None] | None = None
from ._compat import LiteralString, TypeAlias, TypeVar
if TYPE_CHECKING:
- from . import sql # noqa: F401
+ from . import sql
from .rows import Row, RowMaker
from .pq.abc import PGresult
from .waiting import Wait, Ready
# An object implementing the buffer protocol
Buffer: TypeAlias = bytes | bytearray | memoryview
-Query: TypeAlias = LiteralString | bytes | "sql.SQL" | "sql.Composed"
+Query: TypeAlias = LiteralString | bytes | sql.SQL | sql.Composed
Params: TypeAlias = Sequence[Any] | Mapping[str, Any]
ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
PipelineCommand: TypeAlias = Callable[[], None]
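A short illustration of the forms the `Query` alias admits (query strings are placeholders):

from psycopg import sql
from psycopg.abc import Query

q1: Query = "SELECT * FROM t WHERE id = %s"               # a LiteralString
q2: Query = b"SELECT 1"                                   # raw bytes
q3: Query = sql.SQL("SELECT {}").format(sql.Literal(42))  # a sql.Composed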
This can happen in connection and reset, but not in normal querying.
"""
-PQGen: TypeAlias = Generator["Wait", "Ready" | int, RV]
+PQGen: TypeAlias = Generator[Wait, Ready | int, RV]
"""Generator for processes where the connection file number won't change.
"""
"""
@property
- def adapters(self) -> "AdaptersMap":
+ def adapters(self) -> AdaptersMap:
"""The adapters configuration that this object uses."""
...
@property
- def connection(self) -> "BaseConnection[Any]" | None:
+ def connection(self) -> BaseConnection[Any] | None:
"""The connection used by this object, if available.
:rtype: `~psycopg.Connection` or `~psycopg.AsyncConnection` or `!None`
"""
...
- def upgrade(self, obj: Any, format: PyFormat) -> "Dumper":
+ def upgrade(self, obj: Any, format: PyFormat) -> Dumper:
"""Return a new dumper to manage `!obj`.
:param obj: The object to convert
def __init__(self, context: AdaptContext | None = None): ...
@classmethod
- def from_context(cls, context: AdaptContext | None) -> "Transformer": ...
+ def from_context(cls, context: AdaptContext | None) -> Transformer: ...
@property
- def connection(self) -> "BaseConnection[Any]" | None: ...
+ def connection(self) -> BaseConnection[Any] | None: ...
@property
def encoding(self) -> str: ...
@property
- def adapters(self) -> "AdaptersMap": ...
+ def adapters(self) -> AdaptersMap: ...
@property
- def pgresult(self) -> "PGresult" | None: ...
+ def pgresult(self) -> PGresult | None: ...
def set_pgresult(
self,
- result: "PGresult" | None,
+ result: PGresult | None,
*,
set_loaders: bool = True,
format: pq.Format | None = None,
def get_dumper(self, obj: Any, format: PyFormat) -> Dumper: ...
- def load_rows(
- self, row0: int, row1: int, make_row: "RowMaker[Row]"
- ) -> list["Row"]: ...
+ def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> list[Row]: ...
- def load_row(self, row: int, make_row: "RowMaker[Row]") -> "Row" | None: ...
+ def load_row(self, row: int, make_row: RowMaker[Row]) -> Row | None: ...
def load_sequence(self, record: Sequence[Buffer | None]) -> tuple[Any, ...]: ...
# Objects exported here
from ._enums import PyFormat as PyFormat
from ._transformer import Transformer as Transformer
-from ._adapters_map import AdaptersMap as AdaptersMap # noqa: F401
+from ._adapters_map import AdaptersMap as AdaptersMap # noqa: F401 # reexport
if TYPE_CHECKING:
from ._connection_base import BaseConnection
def __init__(self, cls: type, context: abc.AdaptContext | None = None):
self.cls = cls
- self.connection: "BaseConnection[Any]" | None
+ self.connection: BaseConnection[Any] | None
self.connection = context.connection if context else None
def __repr__(self) -> str:
def __init__(self, oid: int, context: abc.AdaptContext | None = None):
self.oid = oid
- self.connection: "BaseConnection[Any]" | None
+ self.connection: BaseConnection[Any] | None
self.connection = context.connection if context else None
@abstractmethod
from __future__ import annotations
-from typing import TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
from functools import partial
from ._queries import PostgresQuery, PostgresClientQuery
from .cursor_async import AsyncCursor
if TYPE_CHECKING:
- from typing import Any # noqa: F401
- from .connection import Connection # noqa: F401
- from .connection_async import AsyncConnection # noqa: F401
+ from .connection import Connection
+ from .connection_async import AsyncConnection
TEXT = pq.Format.TEXT
BINARY = pq.Format.BINARY
return (Prepare.NO, b"")
-class ClientCursor(ClientCursorMixin["Connection[Any]", Row], Cursor[Row]):
+class ClientCursor(ClientCursorMixin[Connection[Any], Row], Cursor[Row]):
__module__ = "psycopg"
-class AsyncClientCursor(
- ClientCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]
-):
+class AsyncClientCursor(ClientCursorMixin[AsyncConnection[Any], Row], AsyncCursor[Row]):
__module__ = "psycopg"
def __init__(
self,
- pgconn: "PGconn",
+ pgconn: PGconn,
row_factory: RowFactory[Row] = cast(RowFactory[Row], tuple_row),
):
super().__init__(pgconn)
def __init__(
self,
- pgconn: "PGconn",
+ pgconn: PGconn,
row_factory: AsyncRowFactory[Row] = cast(AsyncRowFactory[Row], tuple_row),
):
super().__init__(pgconn)
class _CrdbConnectionMixin:
_adapters: AdaptersMap | None
- pgconn: "PGconn"
+ pgconn: PGconn
@classmethod
- def is_crdb(cls, conn: Connection[Any] | AsyncConnection[Any] | "PGconn") -> bool:
+ def is_crdb(cls, conn: Connection[Any] | AsyncConnection[Any] | PGconn) -> bool:
"""
Return `!True` if the server connected to `!conn` is CockroachDB.
"""
ACTIVE = pq.TransactionStatus.ACTIVE
-class Cursor(BaseCursor["Connection[Any]", Row]):
+class Cursor(BaseCursor[Connection[Any], Row]):
__module__ = "psycopg"
__slots__ = ()
ACTIVE = pq.TransactionStatus.ACTIVE
-class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
+class AsyncCursor(BaseCursor[AsyncConnection[Any], Row]):
__module__ = "psycopg"
__slots__ = ()
nonblocking: int = 0
- notice_handler: Callable[["PGresult"], None] | None = None
- notify_handler: Callable[["PGnotify"], None] | None = None
+ notice_handler: Callable[[PGresult], None] | None = None
+ notify_handler: Callable[[PGnotify], None] | None = None
@staticmethod
def _raise() -> NoReturn:
class PGconnDebug:
"""Wrapper for a PQconn logging all its access."""
- _pgconn: "abc.PGconn"
+ _pgconn: abc.PGconn
- def __init__(self, pgconn: "abc.PGconn"):
+ def __init__(self, pgconn: abc.PGconn):
super().__setattr__("_pgconn", pgconn)
def __repr__(self) -> str:
from typing import Any, Callable, Protocol, Sequence, TYPE_CHECKING
from ._enums import Format, Trace
-from .._compat import TypeAlias
+from .._compat import Self, TypeAlias
if TYPE_CHECKING:
from .misc import PGnotify, ConninfoOption, PGresAttDesc
class PGconn(Protocol):
- notice_handler: Callable[["PGresult"], None] | None
- notify_handler: Callable[["PGnotify"], None] | None
+ notice_handler: Callable[[PGresult], None] | None
+ notify_handler: Callable[[PGnotify], None] | None
@classmethod
- def connect(cls, conninfo: bytes) -> "PGconn": ...
+ def connect(cls, conninfo: bytes) -> Self: ...
@classmethod
- def connect_start(cls, conninfo: bytes) -> "PGconn": ...
+ def connect_start(cls, conninfo: bytes) -> Self: ...
def connect_poll(self) -> int: ...
def finish(self) -> None: ...
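The move from a hard-coded `"PGconn"` return annotation to `Self` is more than cosmetic: on a subclass, the alternate constructor now types as the subclass. A minimal sketch with hypothetical classes (`Self` comes from `.._compat` here, from `typing` on Python 3.11+):

from typing import Self

class Base:
    @classmethod
    def create(cls) -> Self:
        return cls()

class Child(Base):
    pass

child = Child.create()  # type checkers infer Child, not Base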
@property
- def info(self) -> list["ConninfoOption"]: ...
+ def info(self) -> list[ConninfoOption]: ...
def reset(self) -> None: ...
@property
def ssl_in_use(self) -> bool: ...
- def exec_(self, command: bytes) -> "PGresult": ...
+ def exec_(self, command: bytes) -> PGresult: ...
def send_query(self, command: bytes) -> None: ...
param_types: Sequence[int] | None = None,
param_formats: Sequence[int] | None = None,
result_format: int = Format.TEXT,
- ) -> "PGresult": ...
+ ) -> PGresult: ...
def send_query_params(
self,
name: bytes,
command: bytes,
param_types: Sequence[int] | None = None,
- ) -> "PGresult": ...
+ ) -> PGresult: ...
def exec_prepared(
self,
param_values: Sequence[Buffer] | None,
param_formats: Sequence[int] | None = None,
result_format: int = 0,
- ) -> "PGresult": ...
+ ) -> PGresult: ...
- def describe_prepared(self, name: bytes) -> "PGresult": ...
+ def describe_prepared(self, name: bytes) -> PGresult: ...
def send_describe_prepared(self, name: bytes) -> None: ...
- def describe_portal(self, name: bytes) -> "PGresult": ...
+ def describe_portal(self, name: bytes) -> PGresult: ...
def send_describe_portal(self, name: bytes) -> None: ...
- def close_prepared(self, name: bytes) -> "PGresult": ...
+ def close_prepared(self, name: bytes) -> PGresult: ...
def send_close_prepared(self, name: bytes) -> None: ...
- def close_portal(self, name: bytes) -> "PGresult": ...
+ def close_portal(self, name: bytes) -> PGresult: ...
def send_close_portal(self, name: bytes) -> None: ...
- def get_result(self) -> "PGresult" | None: ...
+ def get_result(self) -> PGresult | None: ...
def consume_input(self) -> None: ...
def set_chunked_rows_mode(self, size: int) -> None: ...
- def cancel_conn(self) -> "PGcancelConn": ...
+ def cancel_conn(self) -> PGcancelConn: ...
- def get_cancel(self) -> "PGcancel": ...
+ def get_cancel(self) -> PGcancel: ...
- def notifies(self) -> "PGnotify" | None: ...
+ def notifies(self) -> PGnotify | None: ...
def put_copy_data(self, buffer: Buffer) -> int: ...
def change_password(self, user: bytes, passwd: bytes) -> None: ...
- def make_empty_result(self, exec_status: int) -> "PGresult": ...
+ def make_empty_result(self, exec_status: int) -> PGresult: ...
@property
def pipeline_status(self) -> int: ...
@property
def oid_value(self) -> int: ...
- def set_attributes(self, descriptions: list["PGresAttDesc"]) -> None: ...
+ def set_attributes(self, descriptions: list[PGresAttDesc]) -> None: ...
class PGcancelConn(Protocol):
class Conninfo(Protocol):
@classmethod
- def get_defaults(cls) -> list["ConninfoOption"]: ...
+ def get_defaults(cls) -> list[ConninfoOption]: ...
@classmethod
- def parse(cls, conninfo: bytes) -> list["ConninfoOption"]: ...
+ def parse(cls, conninfo: bytes) -> list[ConninfoOption]: ...
@classmethod
- def _options_from_array(cls, opts: Sequence[Any]) -> list["ConninfoOption"]: ...
+ def _options_from_array(cls, opts: Sequence[Any]) -> list[ConninfoOption]: ...
class Escaping(Protocol):
def __init__(self, pgconn_ptr: impl.PGconn_struct):
self._pgconn_ptr: impl.PGconn_struct | None = pgconn_ptr
- self.notice_handler: Callable[["abc.PGresult"], None] | None = None
+ self.notice_handler: Callable[[abc.PGresult], None] | None = None
self.notify_handler: Callable[[PGnotify], None] | None = None
# Keep alive for the lifetime of PGconn
return f"<{cls} {info} at 0x{id(self):x}>"
@classmethod
- def connect(cls, conninfo: bytes) -> "PGconn":
+ def connect(cls, conninfo: bytes) -> PGconn:
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
return cls(pgconn_ptr)
@classmethod
- def connect_start(cls, conninfo: bytes) -> "PGconn":
+ def connect_start(cls, conninfo: bytes) -> PGconn:
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
return addressof(self._pgconn_ptr.contents) # type: ignore[attr-defined]
@property
- def info(self) -> list["ConninfoOption"]:
+ def info(self) -> list[ConninfoOption]:
self._ensure_pgconn()
opts = impl.PQconninfo(self._pgconn_ptr)
if not opts:
def ssl_in_use(self) -> bool:
return self._call_bool(impl.PQsslInUse)
- def exec_(self, command: bytes) -> "PGresult":
+ def exec_(self, command: bytes) -> PGresult:
if not isinstance(command, bytes):
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
def exec_params(
self,
command: bytes,
- param_values: Sequence["abc.Buffer" | None] | None,
+ param_values: Sequence[abc.Buffer | None] | None,
param_types: Sequence[int] | None = None,
param_formats: Sequence[int] | None = None,
result_format: int = Format.TEXT,
- ) -> "PGresult":
+ ) -> PGresult:
args = self._query_params_args(
command, param_values, param_types, param_formats, result_format
)
def send_query_params(
self,
command: bytes,
- param_values: Sequence["abc.Buffer" | None] | None,
+ param_values: Sequence[abc.Buffer | None] | None,
param_types: Sequence[int] | None = None,
param_formats: Sequence[int] | None = None,
result_format: int = Format.TEXT,
def send_query_prepared(
self,
name: bytes,
- param_values: Sequence["abc.Buffer" | None] | None,
+ param_values: Sequence[abc.Buffer | None] | None,
param_formats: Sequence[int] | None = None,
result_format: int = Format.TEXT,
) -> None:
def _query_params_args(
self,
command: bytes,
- param_values: Sequence["abc.Buffer" | None] | None,
+ param_values: Sequence[abc.Buffer | None] | None,
param_types: Sequence[int] | None = None,
param_formats: Sequence[int] | None = None,
result_format: int = Format.TEXT,
name: bytes,
command: bytes,
param_types: Sequence[int] | None = None,
- ) -> "PGresult":
+ ) -> PGresult:
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
def exec_prepared(
self,
name: bytes,
- param_values: Sequence["abc.Buffer"] | None,
+ param_values: Sequence[abc.Buffer] | None,
param_formats: Sequence[int] | None = None,
result_format: int = 0,
- ) -> "PGresult":
+ ) -> PGresult:
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
)
return PGresult(rv)
- def describe_prepared(self, name: bytes) -> "PGresult":
+ def describe_prepared(self, name: bytes) -> PGresult:
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
f"sending describe prepared failed: {error_message(self)}"
)
- def describe_portal(self, name: bytes) -> "PGresult":
+ def describe_portal(self, name: bytes) -> PGresult:
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
f"sending describe portal failed: {error_message(self)}"
)
- def close_prepared(self, name: bytes) -> "PGresult":
+ def close_prepared(self, name: bytes) -> PGresult:
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
f"sending close prepared failed: {error_message(self)}"
)
- def close_portal(self, name: bytes) -> "PGresult":
+ def close_portal(self, name: bytes) -> PGresult:
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
f"sending close portal failed: {error_message(self)}"
)
- def get_result(self) -> "PGresult" | None:
+ def get_result(self) -> PGresult | None:
rv = impl.PQgetResult(self._pgconn_ptr)
return PGresult(rv) if rv else None
if not impl.PQsetChunkedRowsMode(self._pgconn_ptr, size):
raise e.OperationalError("setting chunked rows mode failed")
- def cancel_conn(self) -> "PGcancelConn":
+ def cancel_conn(self) -> PGcancelConn:
"""
Create a connection over which a cancel request can be sent.
raise e.OperationalError("couldn't create cancelConn object")
return PGcancelConn(rv)
- def get_cancel(self) -> "PGcancel":
+ def get_cancel(self) -> PGcancel:
"""
Create an object with the information needed to cancel a command.
else:
return None
- def put_copy_data(self, buffer: "abc.Buffer") -> int:
+ def put_copy_data(self, buffer: abc.Buffer) -> int:
if not isinstance(buffer, bytes):
buffer = bytes(buffer)
rv = impl.PQputCopyData(self._pgconn_ptr, buffer, len(buffer))
f"failed to change password change command: {error_message(self)}"
)
- def make_empty_result(self, exec_status: int) -> "PGresult":
+ def make_empty_result(self, exec_status: int) -> PGresult:
rv = impl.PQmakeEmptyPGresult(self._pgconn_ptr, exec_status)
if not rv:
raise MemoryError("couldn't allocate empty PGresult")
def __init__(self, conn: PGconn | None = None):
self.conn = conn
- def escape_literal(self, data: "abc.Buffer") -> bytes:
+ def escape_literal(self, data: abc.Buffer) -> bytes:
if not self.conn:
raise e.OperationalError("escape_literal failed: no connection provided")
impl.PQfreemem(out)
return rv
- def escape_identifier(self, data: "abc.Buffer") -> bytes:
+ def escape_identifier(self, data: abc.Buffer) -> bytes:
if not self.conn:
raise e.OperationalError("escape_identifier failed: no connection provided")
impl.PQfreemem(out)
return rv
- def escape_string(self, data: "abc.Buffer") -> bytes:
+ def escape_string(self, data: abc.Buffer) -> bytes:
if not isinstance(data, bytes):
data = bytes(data)
return out.value
- def escape_bytea(self, data: "abc.Buffer") -> bytes:
+ def escape_bytea(self, data: abc.Buffer) -> bytes:
len_out = c_size_t()
# TODO: might be able to do without a copy but it's a mess.
# the C library does it better anyway, so maybe not worth optimising
impl.PQfreemem(out)
return rv
- def unescape_bytea(self, data: "abc.Buffer") -> bytes:
+ def unescape_bytea(self, data: abc.Buffer) -> bytes:
# not needed, but let's keep it symmetric with the escaping:
# if a connection is passed in, it must be valid.
if self.conn:
from __future__ import annotations
-from typing import TYPE_CHECKING
+from typing import Any, TYPE_CHECKING
from .abc import ConnectionType, Query, Params
from .sql import Composable
from ._cursor_base import BaseCursor
if TYPE_CHECKING:
- from typing import Any # noqa: F401
- from .connection import Connection # noqa: F401
- from .connection_async import AsyncConnection # noqa: F401
+ from .connection import Connection
+ from .connection_async import AsyncConnection
class PostgresRawQuery(PostgresQuery):
_query_cls = PostgresRawQuery
-class RawCursor(RawCursorMixin["Connection[Any]", Row], Cursor[Row]):
+class RawCursor(RawCursorMixin[Connection[Any], Row], Cursor[Row]):
__module__ = "psycopg"
-class AsyncRawCursor(RawCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]):
+class AsyncRawCursor(RawCursorMixin[AsyncConnection[Any], Row], AsyncCursor[Row]):
__module__ = "psycopg"
use the values to create a dictionary for each record.
"""
- def __call__(self, __cursor: "Cursor[Any]") -> RowMaker[Row]: ...
+ def __call__(self, __cursor: Cursor[Any]) -> RowMaker[Row]: ...
class AsyncRowFactory(Protocol[Row]):
Like `RowFactory`, taking an async cursor as argument.
"""
- def __call__(self, __cursor: "AsyncCursor[Any]") -> RowMaker[Row]: ...
+ def __call__(self, __cursor: AsyncCursor[Any]) -> RowMaker[Row]: ...
class BaseRowFactory(Protocol[Row]):
Like `RowFactory`, taking either type of cursor as argument.
"""
- def __call__(self, __cursor: "BaseCursor[Any, Any]") -> RowMaker[Row]: ...
+ def __call__(self, __cursor: BaseCursor[Any, Any]) -> RowMaker[Row]: ...
TupleRow: TypeAlias = tuple[Any, ...]
"""
-def tuple_row(cursor: "BaseCursor[Any, Any]") -> "RowMaker[TupleRow]":
+def tuple_row(cursor: BaseCursor[Any, Any]) -> RowMaker[TupleRow]:
r"""Row factory to represent rows as simple tuples.
This is the default factory, used when `~psycopg.Connection.connect()` or
return tuple
-def dict_row(cursor: "BaseCursor[Any, Any]") -> "RowMaker[DictRow]":
+def dict_row(cursor: BaseCursor[Any, Any]) -> RowMaker[DictRow]:
"""Row factory to represent rows as dictionaries.
The dictionary keys are taken from the column names of the returned columns.
return dict_row_
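A hedged usage sketch of these factories (connection string is a placeholder):

import psycopg
from psycopg.rows import dict_row

with psycopg.connect("dbname=test", row_factory=dict_row) as conn:
    row = conn.execute("SELECT 1 AS x").fetchone()
    # row == {'x': 1} rather than the default tuple (1,)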
-def namedtuple_row(
- cursor: "BaseCursor[Any, Any]",
-) -> "RowMaker[NamedTuple]":
+def namedtuple_row(cursor: BaseCursor[Any, Any]) -> RowMaker[NamedTuple]:
"""Row factory to represent rows as `~collections.namedtuple`.
The field names are taken from the column names of the returned columns,
:rtype: `!Callable[[Cursor],` `RowMaker`\[~T]]
"""
- def class_row_(cursor: "BaseCursor[Any, Any]") -> "RowMaker[T]":
+ def class_row_(cursor: BaseCursor[Any, Any]) -> RowMaker[T]:
names = _get_names(cursor)
if names is None:
return no_result
returned by the query as positional arguments.
"""
- def args_row_(cur: "BaseCursor[Any, T]") -> "RowMaker[T]":
+ def args_row_(cur: BaseCursor[Any, T]) -> RowMaker[T]:
def args_row__(values: Sequence[Any]) -> T:
return func(*values)
returned by the query as keyword arguments.
"""
- def kwargs_row_(cursor: "BaseCursor[Any, T]") -> "RowMaker[T]":
+ def kwargs_row_(cursor: BaseCursor[Any, T]) -> RowMaker[T]:
names = _get_names(cursor)
if names is None:
return no_result
return kwargs_row_
-def scalar_row(cursor: "BaseCursor[Any, Any]") -> "RowMaker[Any]":
+def scalar_row(cursor: BaseCursor[Any, Any]) -> RowMaker[Any]:
"""
Generate a row factory returning the first column
as a scalar value.
raise e.InterfaceError("the cursor doesn't have a result")
-def _get_names(cursor: "BaseCursor[Any, Any]") -> list[str] | None:
+def _get_names(cursor: BaseCursor[Any, Any]) -> list[str] | None:
res = cursor.pgresult
if not res:
return None
]
-def _get_nfields(res: "PGresult") -> int | None:
+def _get_nfields(res: PGresult) -> int | None:
"""
Return the number of columns in a result, if it returns tuples, else None
return sql.SQL(" ").join(parts)
-class ServerCursor(ServerCursorMixin["Connection[Any]", Row], Cursor[Row]):
+class ServerCursor(ServerCursorMixin[Connection[Any], Row], Cursor[Row]):
__module__ = "psycopg"
__slots__ = ()
@overload
def __init__(
self,
- connection: "Connection[Row]",
+ connection: Connection[Row],
name: str,
*,
scrollable: bool | None = None,
@overload
def __init__(
self,
- connection: "Connection[Any]",
+ connection: Connection[Any],
name: str,
*,
row_factory: RowFactory[Row],
def __init__(
self,
- connection: "Connection[Any]",
+ connection: Connection[Any],
name: str,
*,
row_factory: RowFactory[Row] | None = None,
self._pos = value
-class AsyncServerCursor(
- ServerCursorMixin["AsyncConnection[Any]", Row], AsyncCursor[Row]
-):
+class AsyncServerCursor(ServerCursorMixin[AsyncConnection[Any], Row], AsyncCursor[Row]):
__module__ = "psycopg"
__slots__ = ()
@overload
def __init__(
self,
- connection: "AsyncConnection[Row]",
+ connection: AsyncConnection[Row],
name: str,
*,
scrollable: bool | None = None,
@overload
def __init__(
self,
- connection: "AsyncConnection[Any]",
+ connection: AsyncConnection[Any],
name: str,
*,
row_factory: AsyncRowFactory[Row],
def __init__(
self,
- connection: "AsyncConnection[Any]",
+ connection: AsyncConnection[Any],
name: str,
*,
row_factory: AsyncRowFactory[Row] | None = None,
import logging
from types import TracebackType
-from typing import Generic, Iterator, TYPE_CHECKING
+from typing import Any, Generic, Iterator, TYPE_CHECKING
from . import pq
from . import sql
from .pq.misc import connection_summary
if TYPE_CHECKING:
- from typing import Any
from .connection import Connection
from .connection_async import AsyncConnection
)
-class Transaction(BaseTransaction["Connection[Any]"]):
+class Transaction(BaseTransaction[Connection[Any]]):
"""
Returned by `Connection.transaction()` to handle a transaction block.
"""
__module__ = "psycopg"
@property
- def connection(self) -> "Connection[Any]":
+ def connection(self) -> Connection[Any]:
"""The connection the object is managing."""
return self._conn
return False
-class AsyncTransaction(BaseTransaction["AsyncConnection[Any]"]):
+class AsyncTransaction(BaseTransaction[AsyncConnection[Any]]):
"""
Returned by `AsyncConnection.transaction()` to handle a transaction block.
"""
__module__ = "psycopg"
@property
- def connection(self) -> "AsyncConnection[Any]":
+ def connection(self) -> AsyncConnection[Any]:
return self._conn
async def __aenter__(self) -> Self:
self.python_type: type | None = None
@classmethod
- def _get_info_query(cls, conn: "BaseConnection[Any]") -> abc.Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> abc.Query:
return sql.SQL(
"""\
SELECT
raise DataError(f"can't parse interval: {e}") from None
-def _get_datestyle(conn: "BaseConnection[Any]" | None) -> bytes:
+def _get_datestyle(conn: BaseConnection[Any] | None) -> bytes:
if conn:
ds = conn.pgconn.parameter_status(b"DateStyle")
if ds:
return b"ISO, DMY"
-def _get_intervalstyle(conn: "BaseConnection[Any]" | None) -> bytes:
+def _get_intervalstyle(conn: BaseConnection[Any] | None) -> bytes:
if conn:
ints = conn.pgconn.parameter_status(b"IntervalStyle")
if ints:
def _get_timestamp_load_error(
- conn: "BaseConnection[Any]" | None, data: Buffer, ex: Exception | None = None
+ conn: BaseConnection[Any] | None, data: Buffer, ex: Exception | None = None
) -> Exception:
s = bytes(data).decode("utf8", "replace")
self.enum: type[Enum] | None = None
@classmethod
- def _get_info_query(cls, conn: "BaseConnection[Any]") -> Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> Query:
return sql.SQL(
"""\
SELECT name, oid, array_oid, array_agg(label) AS labels
self.subtype_oid = subtype_oid
@classmethod
- def _get_info_query(cls, conn: "BaseConnection[Any]") -> Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> Query:
if conn.info.server_version < 140000:
raise e.NotSupportedError(
"multirange types are only available from PostgreSQL 14"
self.subtype_oid = subtype_oid
@classmethod
- def _get_info_query(cls, conn: "BaseConnection[Any]") -> Query:
+ def _get_info_query(cls, conn: BaseConnection[Any]) -> Query:
return sql.SQL(
"""\
SELECT t.typname AS name, t.oid AS oid, t.typarray AS array_oid,
class ByteaLoader(Loader):
- _escaping: "EscapingProto"
+ _escaping: EscapingProto
def __init__(self, oid: int, context: AdaptContext | None = None):
super().__init__(oid, context)
import uuid
# Importing the uuid module is slow, so import it only on request.
-UUID: Callable[..., "uuid.UUID"] = None # type: ignore[assignment]
+UUID: Callable[..., uuid.UUID] = None # type: ignore[assignment]
class UUIDDumper(Dumper):
oid = _oids.UUID_OID
- def dump(self, obj: "uuid.UUID") -> Buffer | None:
+ def dump(self, obj: uuid.UUID) -> Buffer | None:
return obj.hex.encode()
class UUIDBinaryDumper(UUIDDumper):
format = Format.BINARY
- def dump(self, obj: "uuid.UUID") -> Buffer | None:
+ def dump(self, obj: uuid.UUID) -> Buffer | None:
return obj.bytes
if UUID is None:
from uuid import UUID
- def load(self, data: Buffer) -> "uuid.UUID":
+ def load(self, data: Buffer) -> uuid.UUID:
if isinstance(data, memoryview):
data = bytes(data)
return UUID(data.decode())
class UUIDBinaryLoader(UUIDLoader):
format = Format.BINARY
- def load(self, data: Buffer) -> "uuid.UUID":
+ def load(self, data: Buffer) -> uuid.UUID:
if isinstance(data, memoryview):
data = bytes(data)
return UUID(bytes=data)
return None
@property
- def info(self) -> list["ConninfoOption"]:
+ def info(self) -> list[ConninfoOption]:
_ensure_pgconn(self)
cdef libpq.PQconninfoOption *opts = libpq.PQconninfo(self._pgconn_ptr)
if opts is NULL:
AsyncConnectionCB: TypeAlias = Callable[[ACT], Awaitable[None]]
# Callbacks invoked with the pool on connection failure
-ConnectFailedCB: TypeAlias = Callable[["ConnectionPool[Any]"], None]
+ConnectFailedCB: TypeAlias = Callable[[ConnectionPool[Any]], None]
AsyncConnectFailedCB: TypeAlias = (
- Callable[["AsyncConnectionPool[Any]"], None]
- | Callable[["AsyncConnectionPool[Any]"], Awaitable[None]]
+ Callable[[AsyncConnectionPool[Any]], None]
+ | Callable[[AsyncConnectionPool[Any]], Awaitable[None]]
)
else:
raise PoolClosed(f"the pool {self.name!r} is not open yet")
- def _check_pool_putconn(self, conn: "BaseConnection[Any]") -> None:
+ def _check_pool_putconn(self, conn: BaseConnection[Any]) -> None:
pool = getattr(conn, "_pool", None)
if pool is self:
return
"""
return value * (1.0 + ((max_pc - min_pc) * random()) + min_pc)
- def _set_connection_expiry_date(self, conn: "BaseConnection[Any]") -> None:
+ def _set_connection_expiry_date(self, conn: BaseConnection[Any]) -> None:
"""Set an expiry date on a connection.
Add some randomness to avoid mass reconnection.
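Worked reading of the jitter formula above, as a standalone sketch (the percentage bounds are assumed example values):

from random import random

def jitter(value: float, min_pc: float, max_pc: float) -> float:
    # random() lies in [0, 1), so the factor spans [1 + min_pc, 1 + max_pc).
    return value * (1.0 + ((max_pc - min_pc) * random()) + min_pc)

# With min_pc=-0.05 and max_pc=0.05, a 600 s expiry lands in [570, 630) s.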