self._row_loaders: List[LoadFunc] = []
@property
- def connection(self) -> Optional["BaseConnection"]:
+ def connection(self) -> Optional["BaseConnection[Any]"]:
return self._conn
@property
@classmethod
def fetch(
- cls: Type[T], conn: "Connection", name: Union[str, "Identifier"]
+ cls: Type[T], conn: "Connection[Any]", name: Union[str, "Identifier"]
) -> Optional[T]:
"""
Query a system catalog to read information about a type.
@classmethod
async def fetch_async(
- cls: Type[T], conn: "AsyncConnection", name: Union[str, "Identifier"]
+ cls: Type[T],
+ conn: "AsyncConnection[Any]",
+ name: Union[str, "Identifier"],
) -> Optional[T]:
"""
Query a system catalog to read information about a type.
def __init__(self, cls: type, context: Optional[AdaptContext] = None):
self.cls = cls
- self.connection: Optional["BaseConnection"] = (
+ self.connection: Optional["BaseConnection[Any]"] = (
context.connection if context else None
)
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
self.oid = oid
- self.connection: Optional["BaseConnection"] = (
+ self.connection: Optional["BaseConnection[Any]"] = (
context.connection if context else None
)
return self
@property
- def connection(self) -> Optional["BaseConnection"]:
+ def connection(self) -> Optional["BaseConnection[Any]"]:
return None
def register_dumper(
import warnings
import threading
from types import TracebackType
-from typing import Any, AsyncIterator, Callable, Iterator, List, NamedTuple
-from typing import Optional, overload, Type, Union, TYPE_CHECKING
+from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
+from typing import NamedTuple, Optional, Type, Union, TYPE_CHECKING, overload
from weakref import ref, ReferenceType
from functools import partial
from contextlib import contextmanager
from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
from .sql import Composable
from .rows import tuple_row
-from .proto import PQGen, PQGenConn, RV, Row, RowFactory, Query, Params
-from .proto import AdaptContext, ConnectionType
+from .proto import AdaptContext, ConnectionType, Params, PQGen, PQGenConn
+from .proto import Query, Row, RowConn, RowFactory, RV
from .cursor import Cursor, AsyncCursor
from .conninfo import make_conninfo, ConnectionInfo
from .generators import notifies
NotifyHandler = Callable[[Notify], None]
-class BaseConnection(AdaptContext):
+class BaseConnection(AdaptContext, Generic[RowConn]):
"""
Base class for different types of connections.
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- def __init__(self, pgconn: "PGconn", row_factory: RowFactory[Any]):
+ def __init__(self, pgconn: "PGconn", row_factory: RowFactory[RowConn]):
self.pgconn = pgconn # TODO: document this
self._row_factory = row_factory
self._autocommit = False
return self._adapters
@property
- def connection(self) -> "BaseConnection":
+ def connection(self) -> "BaseConnection[RowConn]":
# implement the AdaptContext protocol
return self
@property
- def row_factory(self) -> RowFactory[Any]:
+ def row_factory(self) -> RowFactory[RowConn]:
"""Writable attribute to control how result rows are formed."""
return self._row_factory
@row_factory.setter
- def row_factory(self, row_factory: RowFactory[Any]) -> None:
+ def row_factory(self, row_factory: RowFactory[RowConn]) -> None:
self._row_factory = row_factory
def fileno(self) -> int:
@staticmethod
def _notice_handler(
- wself: "ReferenceType[BaseConnection]", res: "PGresult"
+ wself: "ReferenceType[BaseConnection[RowConn]]", res: "PGresult"
) -> None:
self = wself()
if not (self and self._notice_handler):
@staticmethod
def _notify_handler(
- wself: "ReferenceType[BaseConnection]", pgn: pq.PGnotify
+ wself: "ReferenceType[BaseConnection[RowConn]]", pgn: pq.PGnotify
) -> None:
self = wself()
if not (self and self._notify_handlers):
yield from self._exec_command(b"rollback")
-class Connection(BaseConnection):
+class Connection(BaseConnection[RowConn]):
"""
Wrapper for a connection to the database.
"""
__module__ = "psycopg3"
- def __init__(self, pgconn: "PGconn", row_factory: RowFactory[Any]):
+ def __init__(self, pgconn: "PGconn", row_factory: RowFactory[RowConn]):
super().__init__(pgconn, row_factory)
self.lock = threading.Lock()
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: Optional[RowFactory[Any]] = None,
+ row_factory: Optional[RowFactory[RowConn]] = None,
**kwargs: Any,
- ) -> "Connection":
+ ) -> "Connection[RowConn]":
"""
Connect to a database server and return a new `Connection` instance.
)
)
- def __enter__(self) -> "Connection":
+ def __enter__(self) -> "Connection[RowConn]":
return self
def __exit__(
self.pgconn.finish()
@overload
- def cursor(self, *, binary: bool = False) -> Cursor[Any]:
+ def cursor(self, *, binary: bool = False) -> Cursor[RowConn]:
...
@overload
...
@overload
- def cursor(self, name: str, *, binary: bool = False) -> ServerCursor[Any]:
+ def cursor(
+ self, name: str, *, binary: bool = False
+ ) -> ServerCursor[RowConn]:
...
@overload
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> Cursor[Any]:
+ ) -> Cursor[RowConn]:
"""Execute a query and return a cursor to read its results."""
- cur: Cursor[Any] = self.cursor()
+ cur = self.cursor()
try:
return cur.execute(query, params, prepare=prepare)
except e.Error as ex:
self.wait(self._set_client_encoding_gen(name))
-class AsyncConnection(BaseConnection):
+class AsyncConnection(BaseConnection[RowConn]):
"""
Asynchronous wrapper for a connection to the database.
"""
__module__ = "psycopg3"
- def __init__(self, pgconn: "PGconn", row_factory: RowFactory[Any]):
+ def __init__(self, pgconn: "PGconn", row_factory: RowFactory[RowConn]):
super().__init__(pgconn, row_factory)
self.lock = asyncio.Lock()
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: Optional[RowFactory[Any]] = None,
+ row_factory: Optional[RowFactory[RowConn]] = None,
**kwargs: Any,
- ) -> "AsyncConnection":
+ ) -> "AsyncConnection[RowConn]":
return await cls._wait_conn(
cls._connect_gen(
conninfo,
)
)
- async def __aenter__(self) -> "AsyncConnection":
+ async def __aenter__(self) -> "AsyncConnection[RowConn]":
return self
async def __aexit__(
self.pgconn.finish()
@overload
- def cursor(self, *, binary: bool = False) -> AsyncCursor[Any]:
+ def cursor(self, *, binary: bool = False) -> AsyncCursor[RowConn]:
...
@overload
@overload
def cursor(
self, name: str, *, binary: bool = False
- ) -> AsyncServerCursor[Any]:
+ ) -> AsyncServerCursor[RowConn]:
...
@overload
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> AsyncCursor[Any]:
- cur: AsyncCursor[Any] = self.cursor()
+ ) -> AsyncCursor[RowConn]:
+ cur = self.cursor()
try:
return await cur.execute(query, params, prepare=prepare)
except e.Error as ex:
self._finished = True
-class Copy(BaseCopy["Connection"]):
+class Copy(BaseCopy["Connection[Any]"]):
"""Manage a :sql:`COPY` operation."""
__module__ = "psycopg3"
self._worker = None # break the loop
-class AsyncCopy(BaseCopy["AsyncConnection"]):
+class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
"""Manage an asynchronous :sql:`COPY` operation."""
__module__ = "psycopg3"
self._pgq = pgq
-class Cursor(BaseCursor["Connection", Row]):
+class Cursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg3"
__slots__ = ()
yield copy
-class AsyncCursor(BaseCursor["AsyncConnection", Row]):
+class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
__module__ = "psycopg3"
__slots__ = ()
logger = logging.getLogger("psycopg3.pool")
-class AsyncConnectionPool(BasePool[AsyncConnection]):
+class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
def __init__(
self,
conninfo: str = "",
*,
- connection_class: Type[AsyncConnection] = AsyncConnection,
+ connection_class: Type[AsyncConnection[Any]] = AsyncConnection,
configure: Optional[
- Callable[[AsyncConnection], Awaitable[None]]
+ Callable[[AsyncConnection[Any]], Awaitable[None]]
+ ] = None,
+ reset: Optional[
+ Callable[[AsyncConnection[Any]], Awaitable[None]]
] = None,
- reset: Optional[Callable[[AsyncConnection], Awaitable[None]]] = None,
**kwargs: Any,
):
# https://bugs.python.org/issue42600
@asynccontextmanager
async def connection(
self, timeout: Optional[float] = None
- ) -> AsyncIterator[AsyncConnection]:
+ ) -> AsyncIterator[AsyncConnection[Any]]:
conn = await self.getconn(timeout=timeout)
t0 = monotonic()
try:
async def getconn(
self, timeout: Optional[float] = None
- ) -> AsyncConnection:
+ ) -> AsyncConnection[Any]:
logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
# Critical section: decide here if there's a connection ready
logger.info("connection given by %r", self.name)
return conn
- async def putconn(self, conn: AsyncConnection) -> None:
+ async def putconn(self, conn: AsyncConnection[Any]) -> None:
# Quick check to discard the wrong connection
pool = getattr(conn, "_pool", None)
if pool is not self:
ex,
)
- async def _connect(self) -> AsyncConnection:
+ async def _connect(self) -> AsyncConnection[Any]:
"""Return a new connection configured for the pool."""
self._stats[self._CONNECTIONS_NUM] += 1
t0 = monotonic()
else:
self._growing = False
- async def _return_connection(self, conn: AsyncConnection) -> None:
+ async def _return_connection(self, conn: AsyncConnection[Any]) -> None:
"""
Return a connection to the pool after usage.
"""
await self._add_to_pool(conn)
- async def _add_to_pool(self, conn: AsyncConnection) -> None:
+ async def _add_to_pool(self, conn: AsyncConnection[Any]) -> None:
"""
Add a connection to the pool.
if self._pool_full_event and len(self._pool) >= self._nconns:
self._pool_full_event.set()
- async def _reset_connection(self, conn: AsyncConnection) -> None:
+ async def _reset_connection(self, conn: AsyncConnection[Any]) -> None:
"""
Bring a connection to IDLE state or close it.
"""
await conn.close()
async def _shrink_pool(self) -> None:
- to_close: Optional[AsyncConnection] = None
+ to_close: Optional[AsyncConnection[Any]] = None
async with self._lock:
# Reset the min number of connections used
__slots__ = ("conn", "error", "_cond")
def __init__(self) -> None:
- self.conn: Optional[AsyncConnection] = None
+ self.conn: Optional[AsyncConnection[Any]] = None
self.error: Optional[Exception] = None
# The AsyncClient behaves in a way similar to an Event, but we need
# will be lost.
self._cond = asyncio.Condition()
- async def wait(self, timeout: float) -> AsyncConnection:
+ async def wait(self, timeout: float) -> AsyncConnection[Any]:
"""Wait for a connection to be set and return it.
Raise an exception if the wait times out or if fail() is called.
assert self.error
raise self.error
- async def set(self, conn: AsyncConnection) -> bool:
+ async def set(self, conn: AsyncConnection[Any]) -> bool:
"""Signal the client waiting that a connection is ready.
Return True if the client has "accepted" the connection, False
class ReturnConnection(MaintenanceTask):
"""Clean up and return a connection to the pool."""
- def __init__(self, pool: "AsyncConnectionPool", conn: "AsyncConnection"):
+ def __init__(
+ self, pool: "AsyncConnectionPool", conn: "AsyncConnection[Any]"
+ ):
super().__init__(pool)
self.conn = conn
"""
def __init__(
- self, pool: "AsyncConnectionPool", task: MaintenanceTask, delay: float
+ self,
+ pool: "AsyncConnectionPool",
+ task: MaintenanceTask,
+ delay: float,
):
super().__init__(pool)
self.task = task
from time import monotonic
from queue import Queue, Empty
from types import TracebackType
-from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Type
+from typing import Any, Callable, Deque, Dict, Iterator, List
+from typing import Optional, Type
from weakref import ref
from contextlib import contextmanager
from collections import deque
logger = logging.getLogger("psycopg3.pool")
-class ConnectionPool(BasePool[Connection]):
+class ConnectionPool(BasePool[Connection[Any]]):
def __init__(
self,
conninfo: str = "",
*,
- connection_class: Type[Connection] = Connection,
- configure: Optional[Callable[[Connection], None]] = None,
- reset: Optional[Callable[[Connection], None]] = None,
+ connection_class: Type[Connection[Any]] = Connection,
+ configure: Optional[Callable[[Connection[Any]], None]] = None,
+ reset: Optional[Callable[[Connection[Any]], None]] = None,
**kwargs: Any,
):
self.connection_class = connection_class
@contextmanager
def connection(
self, timeout: Optional[float] = None
- ) -> Iterator[Connection]:
+ ) -> Iterator[Connection[Any]]:
"""Context manager to obtain a connection from the pool.
Returned the connection immediately if available, otherwise wait up to
self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
self.putconn(conn)
- def getconn(self, timeout: Optional[float] = None) -> Connection:
+ def getconn(self, timeout: Optional[float] = None) -> Connection[Any]:
"""Obtain a contection from the pool.
You should preferrably use `connection()`. Use this function only if
logger.info("connection given by %r", self.name)
return conn
- def putconn(self, conn: Connection) -> None:
+ def putconn(self, conn: Connection[Any]) -> None:
"""Return a connection to the loving hands of its pool.
Use this function only paired with a `getconn()`. You don't need to use
ex,
)
- def _connect(self) -> Connection:
+ def _connect(self) -> Connection[Any]:
"""Return a new connection configured for the pool."""
self._stats[self._CONNECTIONS_NUM] += 1
t0 = monotonic()
else:
self._growing = False
- def _return_connection(self, conn: Connection) -> None:
+ def _return_connection(self, conn: Connection[Any]) -> None:
"""
Return a connection to the pool after usage.
"""
self._add_to_pool(conn)
- def _add_to_pool(self, conn: Connection) -> None:
+ def _add_to_pool(self, conn: Connection[Any]) -> None:
"""
Add a connection to the pool.
if self._pool_full_event and len(self._pool) >= self._nconns:
self._pool_full_event.set()
- def _reset_connection(self, conn: Connection) -> None:
+ def _reset_connection(self, conn: Connection[Any]) -> None:
"""
Bring a connection to IDLE state or close it.
"""
conn.close()
def _shrink_pool(self) -> None:
- to_close: Optional[Connection] = None
+ to_close: Optional[Connection[Any]] = None
with self._lock:
# Reset the min number of connections used
__slots__ = ("conn", "error", "_cond")
def __init__(self) -> None:
- self.conn: Optional[Connection] = None
+ self.conn: Optional[Connection[Any]] = None
self.error: Optional[Exception] = None
# The WaitingClient behaves in a way similar to an Event, but we need
# will be lost.
self._cond = threading.Condition()
- def wait(self, timeout: float) -> Connection:
+ def wait(self, timeout: float) -> Connection[Any]:
"""Wait for a connection to be set and return it.
Raise an exception if the wait times out or if fail() is called.
assert self.error
raise self.error
- def set(self, conn: Connection) -> bool:
+ def set(self, conn: Connection[Any]) -> bool:
"""Signal the client waiting that a connection is ready.
Return True if the client has "accepted" the connection, False
class ReturnConnection(MaintenanceTask):
"""Clean up and return a connection to the pool."""
- def __init__(self, pool: "ConnectionPool", conn: "Connection"):
+ def __init__(self, pool: "ConnectionPool", conn: "Connection[Any]"):
super().__init__(pool)
self.conn = conn
Query = Union[str, bytes, "Composable"]
Params = Union[Sequence[Any], Mapping[str, Any]]
-ConnectionType = TypeVar("ConnectionType", bound="BaseConnection")
+ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
# Waiting protocol types
Row = TypeVar("Row")
Row_co = TypeVar("Row_co", covariant=True)
+# Type variable for Connection (other are for Cursor).
+RowConn = TypeVar("RowConn")
class RowMaker(Protocol[Row_co]):
...
@property
- def connection(self) -> Optional["BaseConnection"]:
+ def connection(self) -> Optional["BaseConnection[Any]"]:
...
...
@property
- def connection(self) -> Optional["BaseConnection"]:
+ def connection(self) -> Optional["BaseConnection[Any]"]:
...
@property
from .proto import ConnectionType, Query, Params, PQGen, Row, RowFactory
if TYPE_CHECKING:
+ from typing import Any # noqa: F401
from .connection import BaseConnection # noqa: F401
from .connection import Connection, AsyncConnection # noqa: F401
return sql.SQL(" ").join(parts)
-class ServerCursor(BaseCursor["Connection", Row]):
+class ServerCursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg3"
__slots__ = ("_helper", "itersize")
def __init__(
self,
- connection: "Connection",
+ connection: "Connection[Any]",
name: str,
*,
format: pq.Format = pq.Format.TEXT,
row_factory: RowFactory[Row],
):
super().__init__(connection, format=format, row_factory=row_factory)
- self._helper: ServerCursorHelper["Connection", Row]
+ self._helper: ServerCursorHelper["Connection[Any]", Row]
self._helper = ServerCursorHelper(name)
self.itersize: int = DEFAULT_ITERSIZE
self._pos = value
-class AsyncServerCursor(BaseCursor["AsyncConnection", Row]):
+class AsyncServerCursor(BaseCursor["AsyncConnection[Any]", Row]):
__module__ = "psycopg3"
__slots__ = ("_helper", "itersize")
def __init__(
self,
- connection: "AsyncConnection",
+ connection: "AsyncConnection[Any]",
name: str,
*,
format: pq.Format = pq.Format.TEXT,
row_factory: RowFactory[Row],
):
super().__init__(connection, format=format, row_factory=row_factory)
- self._helper: ServerCursorHelper["AsyncConnection", Row]
+ self._helper: ServerCursorHelper["AsyncConnection[Any]", Row]
self._helper = ServerCursorHelper(name)
self.itersize: int = DEFAULT_ITERSIZE
from .pq.proto import PGresult
if TYPE_CHECKING:
+ from typing import Any # noqa: F401
from .connection import Connection, AsyncConnection # noqa: F401
logger = logging.getLogger(__name__)
return False
-class Transaction(BaseTransaction["Connection"]):
+class Transaction(BaseTransaction["Connection[Any]"]):
"""
Returned by `Connection.transaction()` to handle a transaction block.
"""
__module__ = "psycopg3"
@property
- def connection(self) -> "Connection":
+ def connection(self) -> "Connection[Any]":
"""The connection the object is managing."""
return self._conn
return self._conn.wait(self._exit_gen(exc_type, exc_val, exc_tb))
-class AsyncTransaction(BaseTransaction["AsyncConnection"]):
+class AsyncTransaction(BaseTransaction["AsyncConnection[Any]"]):
"""
Returned by `AsyncConnection.transaction()` to handle a transaction block.
"""
__module__ = "psycopg3"
@property
- def connection(self) -> "AsyncConnection":
+ def connection(self) -> "AsyncConnection[Any]":
return self._conn
async def __aenter__(self) -> "AsyncTransaction":
class Transformer(proto.AdaptContext):
def __init__(self, context: Optional[proto.AdaptContext] = None): ...
@property
- def connection(self) -> Optional[BaseConnection]: ...
+ def connection(self) -> Optional[BaseConnection[Any]]: ...
@property
def adapters(self) -> AdaptersMap: ...
@property
from __future__ import annotations
from dataclasses import dataclass
-from typing import Any, Callable, Optional, Sequence
+from typing import Any, Callable, Optional, Sequence, Tuple
-from psycopg3 import BaseCursor, Cursor, ServerCursor, connect
+from psycopg3 import BaseCursor, Connection, Cursor, ServerCursor, connect
def int_row_factory(
def check_row_factory_cursor() -> None:
"""Type-check connection.cursor(..., row_factory=<MyRowFactory>) case."""
- conn = connect()
+ conn = connect() # type: ignore[var-annotated] # Connection[Any]
cur1: Cursor[Any]
cur1 = conn.cursor()
def check_row_factory_connection() -> None:
"""Type-check connect(..., row_factory=<MyRowFactory>) or
Connection.row_factory cases.
-
- This example is incomplete because Connection is not generic on Row, hence
- all the Any, which we aim at getting rid of.
"""
- cur1: Cursor[Any]
- r1: Any
+ conn1: Connection[int]
+ cur1: Cursor[int]
+ r1: Optional[int]
conn1 = connect(row_factory=int_row_factory)
cur1 = conn1.execute("select 1")
r1 = cur1.fetchone()
with conn1.cursor() as cur1:
cur1.execute("select 2")
- cur2: Cursor[Any]
- r2: Any
+ conn2: Connection[Person]
+ cur2: Cursor[Person]
+ r2: Optional[Person]
conn2 = connect(row_factory=Person.row_factory)
cur2 = conn2.execute("select * from persons")
r2 = cur2.fetchone()
with conn2.cursor() as cur2:
cur2.execute("select 2")
- cur3: Cursor[Any]
- r3: Optional[Any]
- conn3 = connect()
+ cur3: Cursor[Tuple[Any, ...]]
+ r3: Optional[Tuple[Any, ...]]
+ conn3 = connect() # type: ignore[var-annotated]
cur3 = conn3.execute("select 3")
with conn3.cursor() as cur3:
cur3.execute("select 42")