.. autoclass:: psycopg.rows.RowFactory()
- .. method:: __call__(cursor: AnyCursor[Row]) -> RowMaker[Row]
+ .. method:: __call__(cursor: Cursor[Row]) -> RowMaker[Row]
Inspect the result on a cursor and return a `RowMaker` to convert rows.
- `!AnyCursor` may be either a `~psycopg.Cursor` or an
- `~psycopg.AsyncCursor`.
+.. autoclass:: psycopg.rows.AsyncRowFactory()
+ .. method:: __call__(cursor: AsyncCursor[Row]) -> RowMaker[Row]
+
+ Inspect the result on a cursor and return a `RowMaker` to convert rows.
+
+Note that it's easy to implement an object implementing both `!RowFactory` and
+`!AsyncRowFactory`: usually, everything you need to implement a row factory is
+to access `~Cursor.description`, which is provided by both the cursor flavours.
+The `psycopg` module also exposes a class `AnyCursor` which you may use if you
+want to use the same row factory for both sync and async cursors.
`~RowFactory` objects can be implemented as a class, for instance:
import warnings
import threading
from types import TracebackType
-from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
+from typing import Any, AsyncIterator, Callable, cast, Generic, Iterator, List
from typing import NamedTuple, Optional, Type, TypeVar, Union
from typing import overload, TYPE_CHECKING
from weakref import ref, ReferenceType
from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
from .abc import ConnectionType, Params, PQGen, PQGenConn, Query, RV
from .sql import Composable
-from .rows import Row, RowFactory, tuple_row, TupleRow
+from .rows import Row, RowFactory, AsyncRowFactory, tuple_row, TupleRow
from ._enums import IsolationLevel
from .compat import asynccontextmanager
from .cursor import Cursor, AsyncCursor
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- def __init__(self, pgconn: "PGconn", row_factory: RowFactory[Row]):
+ def __init__(
+ self,
+ pgconn: "PGconn",
+ row_factory: Union[RowFactory[Row], AsyncRowFactory[Row]],
+ ):
self.pgconn = pgconn # TODO: document this
self._row_factory = row_factory
self._autocommit = False
# implement the AdaptContext protocol
return self
- @property
- def row_factory(self) -> RowFactory[Row]:
- """Writable attribute to control how result rows are formed."""
- return self._row_factory
-
- @row_factory.setter
- def row_factory(self, row_factory: RowFactory[Row]) -> None:
- self._row_factory = row_factory
-
def fileno(self) -> int:
"""Return the file descriptor of the connection.
self._closed = True
self.pgconn.finish()
+ @property
+ def row_factory(self) -> RowFactory[Row]:
+ """Writable attribute to control how result rows are formed."""
+ return cast(RowFactory[Row], self._row_factory)
+
+ @row_factory.setter
+ def row_factory(self, row_factory: RowFactory[Row]) -> None:
+ self._row_factory = row_factory
+
@overload
def cursor(self, *, binary: bool = False) -> Cursor[Row]:
...
cursor_factory: Type[AsyncCursor[Row]]
server_cursor_factory: Type[AsyncServerCursor[Row]]
- def __init__(self, pgconn: "PGconn", row_factory: RowFactory[Row]):
+ def __init__(self, pgconn: "PGconn", row_factory: AsyncRowFactory[Row]):
super().__init__(pgconn, row_factory)
self.lock = asyncio.Lock()
self.cursor_factory = AsyncCursor
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: RowFactory[Row],
+ row_factory: AsyncRowFactory[Row],
**kwargs: Union[None, int, str],
) -> "AsyncConnection[Row]":
...
self._closed = True
self.pgconn.finish()
+ @property
+ def row_factory(self) -> AsyncRowFactory[Row]:
+ """Writable attribute to control how result rows are formed."""
+ return cast(AsyncRowFactory[Row], self._row_factory)
+
+ @row_factory.setter
+ def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None:
+ self._row_factory = row_factory
+
@overload
def cursor(self, *, binary: bool = False) -> AsyncCursor[Row]:
...
@overload
def cursor(
- self, *, binary: bool = False, row_factory: RowFactory[CursorRow]
+ self, *, binary: bool = False, row_factory: AsyncRowFactory[CursorRow]
) -> AsyncCursor[CursorRow]:
...
name: str,
*,
binary: bool = False,
- row_factory: RowFactory[CursorRow],
+ row_factory: AsyncRowFactory[CursorRow],
scrollable: Optional[bool] = None,
withhold: bool = False,
) -> AsyncServerCursor[CursorRow]:
name: str = "",
*,
binary: bool = False,
- row_factory: Optional[RowFactory[Any]] = None,
+ row_factory: Optional[AsyncRowFactory[Any]] = None,
scrollable: Optional[bool] = None,
withhold: bool = False,
) -> Union[AsyncCursor[Any], AsyncServerCursor[Any]]:
import sys
from types import TracebackType
from typing import Any, AsyncIterator, Callable, Generic, Iterator, List
-from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING
+from typing import Optional, NoReturn, Sequence, Type, TYPE_CHECKING, TypeVar
from contextlib import contextmanager
from . import pq
from .pq import ExecStatus, Format
from .abc import ConnectionType, Query, Params, PQGen
from .copy import Copy, AsyncCopy
-from .rows import Row, RowFactory
+from .rows import Row, RowMaker, RowFactory, AsyncRowFactory
from .compat import asynccontextmanager
from ._column import Column
from ._cmodule import _psycopg
ExecStatus = pq.ExecStatus
_tx: "Transformer"
+ _make_row: RowMaker[Row]
def __init__(
self,
connection: ConnectionType,
- *,
- row_factory: RowFactory[Row],
):
self._conn = connection
self.format = Format.TEXT
self._adapters = adapt.AdaptersMap(connection.adapters)
- self._row_factory = row_factory
self.arraysize = 1
self._closed = False
self._last_query: Optional[Query] = None
if self._iresult < len(self._results):
self.pgresult = self._results[self._iresult]
self._tx.set_pgresult(self._results[self._iresult])
- self._make_row = self._row_factory(self)
+ self._make_row = self._make_row_maker()
self._pos = 0
nrows = self.pgresult.command_tuples
self._rowcount = nrows if nrows is not None else -1
else:
return None
- @property
- def row_factory(self) -> RowFactory[Row]:
- """Writable attribute to control how result rows are formed."""
- return self._row_factory
-
- @row_factory.setter
- def row_factory(self, row_factory: RowFactory[Row]) -> None:
- self._row_factory = row_factory
- if self.pgresult:
- self._make_row = row_factory(self)
+ def _make_row_maker(self) -> RowMaker[Row]:
+ raise NotImplementedError
#
# Generators for the high level operations on the cursor
self.pgresult = res
self._tx.set_pgresult(res, set_loaders=first)
if first:
- self._make_row = self._row_factory(self)
+ self._make_row = self._make_row_maker()
return res
elif res.status in (ExecStatus.TUPLES_OK, ExecStatus.COMMAND_OK):
self._results = list(results)
self.pgresult = results[0]
self._tx.set_pgresult(results[0])
- self._make_row = self._row_factory(self)
+ self._make_row = self._make_row_maker()
nrows = self.pgresult.command_tuples
if nrows is not None:
if self._rowcount < 0:
else:
self._rowcount += nrows
- return
-
def _raise_from_results(self, results: Sequence["PGresult"]) -> NoReturn:
statuses = {res.status for res in results}
badstats = statuses.difference(self._status_ok)
AnyCursor = BaseCursor[Any, Row]
+C = TypeVar("C", bound="BaseCursor[Any, Any]")
+
+
class Cursor(BaseCursor["Connection[Any]", Row]):
__module__ = "psycopg"
__slots__ = ()
- def __enter__(self) -> "Cursor[Row]":
+ def __init__(
+ self, connection: "Connection[Any]", *, row_factory: RowFactory[Row]
+ ):
+ super().__init__(connection)
+ self._row_factory = row_factory
+
+ def __enter__(self: C) -> C:
return self
def __exit__(
"""
self._close()
+ @property
+ def row_factory(self) -> RowFactory[Row]:
+ """Writable attribute to control how result rows are formed."""
+ return self._row_factory
+
+ @row_factory.setter
+ def row_factory(self, row_factory: RowFactory[Row]) -> None:
+ self._row_factory = row_factory
+ if self.pgresult:
+ self._make_row = row_factory(self)
+
+ def _make_row_maker(self) -> RowMaker[Row]:
+ return self._row_factory(self)
+
def execute(
- self,
+ self: C,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> "Cursor[Row]":
+ ) -> C:
"""
Execute a query or command to the database.
"""
__module__ = "psycopg"
__slots__ = ()
- async def __aenter__(self) -> "AsyncCursor[Row]":
+ def __init__(
+ self,
+ connection: "AsyncConnection[Any]",
+ *,
+ row_factory: AsyncRowFactory[Row],
+ ):
+ super().__init__(connection)
+ self._row_factory = row_factory
+
+ async def __aenter__(self: C) -> C:
return self
async def __aexit__(
async def close(self) -> None:
self._close()
+ @property
+ def row_factory(self) -> AsyncRowFactory[Row]:
+ return self._row_factory
+
+ @row_factory.setter
+ def row_factory(self, row_factory: AsyncRowFactory[Row]) -> None:
+ self._row_factory = row_factory
+ if self.pgresult:
+ self._make_row = row_factory(self)
+
+ def _make_row_maker(self) -> RowMaker[Row]:
+ return self._row_factory(self)
+
async def execute(
- self,
+ self: C,
query: Query,
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> "AsyncCursor[Row]":
+ ) -> C:
try:
async with self._conn.lock:
await self._conn.wait(
from .compat import Protocol
if TYPE_CHECKING:
- from .cursor import AnyCursor
+ from .cursor import AnyCursor, Cursor, AsyncCursor
# Row factories
use the values to create a dictionary for each record.
"""
- def __call__(self, __cursor: "AnyCursor[Row]") -> RowMaker[Row]:
+ def __call__(self, __cursor: "Cursor[Row]") -> RowMaker[Row]:
+ ...
+
+
+class AsyncRowFactory(Protocol[Row]):
+ """
+ Callable protocol taking an `~psycopg.AsyncCursor` and returning a `RowMaker`.
+ """
+
+ def __call__(self, __cursor: "AsyncCursor[Row]") -> RowMaker[Row]:
...
# Copyright (C) 2020-2021 The Psycopg Team
import warnings
-from types import TracebackType
-from typing import AsyncIterator, Generic, List, Iterator, Optional
-from typing import Sequence, Type, TYPE_CHECKING
+from typing import Any, AsyncIterator, cast, Generic, List, Iterator, Optional
+from typing import Sequence, TYPE_CHECKING
from . import pq
from . import sql
from . import errors as e
from .abc import ConnectionType, Query, Params, PQGen
-from .rows import Row, RowFactory
-from .cursor import BaseCursor, execute
+from .rows import Row, RowFactory, AsyncRowFactory
+from .cursor import C, BaseCursor, Cursor, AsyncCursor, execute
if TYPE_CHECKING:
- from typing import Any # noqa: F401
from .connection import BaseConnection # noqa: F401
from .connection import Connection, AsyncConnection # noqa: F401
return sql.SQL(" ").join(parts)
-class ServerCursor(BaseCursor["Connection[Any]", Row]):
+class ServerCursor(Cursor[Row]):
__module__ = "psycopg"
__slots__ = ("_helper", "itersize")
def __repr__(self) -> str:
return self._helper._repr(self)
- def __enter__(self) -> "ServerCursor[Row]":
- return self
-
- def __exit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- self.close()
-
@property
def name(self) -> str:
"""The name of the cursor."""
if self.closed:
return
self._conn.wait(self._helper._close_gen(self))
- self._close()
+ super().close()
def execute(
- self,
+ self: C,
query: Query,
params: Optional[Params] = None,
- ) -> "ServerCursor[Row]":
+ **kwargs: Any,
+ ) -> C:
"""
Open a cursor to execute a query to the database.
"""
- query = self._helper._make_declare_statement(self, query)
+ if kwargs:
+ raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
+ helper = cast(ServerCursor[Row], self)._helper
+ query = helper._make_declare_statement(self, query)
with self._conn.lock:
- self._conn.wait(self._helper._declare_gen(self, query, params))
+ self._conn.wait(helper._declare_gen(self, query, params))
return self
def executemany(self, query: Query, params_seq: Sequence[Params]) -> None:
self._pos = value
-class AsyncServerCursor(BaseCursor["AsyncConnection[Any]", Row]):
+class AsyncServerCursor(AsyncCursor[Row]):
__module__ = "psycopg"
__slots__ = ("_helper", "itersize")
connection: "AsyncConnection[Any]",
name: str,
*,
- row_factory: RowFactory[Row],
+ row_factory: AsyncRowFactory[Row],
scrollable: Optional[bool] = None,
withhold: bool = False,
):
def __repr__(self) -> str:
return self._helper._repr(self)
- async def __aenter__(self) -> "AsyncServerCursor[Row]":
- return self
-
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- await self.close()
-
@property
def name(self) -> str:
return self._helper.name
if self.closed:
return
await self._conn.wait(self._helper._close_gen(self))
- self._close()
+ await super().close()
async def execute(
- self,
+ self: C,
query: Query,
params: Optional[Params] = None,
- ) -> "AsyncServerCursor[Row]":
- query = self._helper._make_declare_statement(self, query)
+ **kwargs: Any,
+ ) -> C:
+ if kwargs:
+ raise TypeError(f"keyword not supported: {list(kwargs)[0]}")
+ helper = cast(AsyncServerCursor[Row], self)._helper
+ query = helper._make_declare_statement(self, query)
async with self._conn.lock:
- await self._conn.wait(
- self._helper._declare_gen(self, query, params)
- )
+ await self._conn.wait(helper._declare_gen(self, query, params))
return self
async def executemany(
from __future__ import annotations
from dataclasses import dataclass
-from typing import Any, Callable, Optional, Sequence, Tuple
+from typing import Any, Callable, Optional, Sequence, Tuple, Union
-from psycopg import AnyCursor, Connection, Cursor, ServerCursor, connect
+from psycopg import Connection, Cursor, ServerCursor, connect
+from psycopg import AsyncConnection, AsyncCursor, AsyncServerCursor
-def int_row_factory(cursor: AnyCursor[int]) -> Callable[[Sequence[int]], int]:
+def int_row_factory(
+ cursor: Union[Cursor[int], AsyncCursor[int]]
+) -> Callable[[Sequence[int]], int]:
return lambda values: values[0] if values else 42
@classmethod
def row_factory(
- cls, cursor: AnyCursor[Person]
+ cls, cursor: Union[Cursor[Person], AsyncCursor[Person]]
) -> Callable[[Sequence[str]], Person]:
def mkrow(values: Sequence[str]) -> Person:
name, address = values
persons[0].address
+async def async_check_row_factory_cursor() -> None:
+ """Type-check connection.cursor(..., row_factory=<MyRowFactory>) case."""
+ conn = await AsyncConnection.connect()
+
+ cur1: AsyncCursor[Any]
+ cur1 = conn.cursor()
+ r1: Optional[Any]
+ r1 = await cur1.fetchone()
+ r1 is not None
+
+ cur2: AsyncCursor[int]
+ r2: Optional[int]
+ async with conn.cursor(row_factory=int_row_factory) as cur2:
+ await cur2.execute("select 1")
+ r2 = await cur2.fetchone()
+ r2 and r2 > 0
+
+ cur3: AsyncServerCursor[Person]
+ persons: Sequence[Person]
+ async with conn.cursor(name="s", row_factory=Person.row_factory) as cur3:
+ await cur3.execute("select * from persons where name like 'al%'")
+ persons = await cur3.fetchall()
+ persons[0].address
+
+
def check_row_factory_connection() -> None:
"""Type-check connect(..., row_factory=<MyRowFactory>) or
Connection.row_factory cases.
cur3.execute("select 42")
r3 = cur3.fetchone()
r3 and len(r3)
+
+
+async def async_check_row_factory_connection() -> None:
+ """Type-check connect(..., row_factory=<MyRowFactory>) or
+ Connection.row_factory cases.
+ """
+ conn1: AsyncConnection[int]
+ cur1: AsyncCursor[int]
+ r1: Optional[int]
+ conn1 = await AsyncConnection.connect(row_factory=int_row_factory)
+ cur1 = await conn1.execute("select 1")
+ r1 = await cur1.fetchone()
+ r1 != 0
+ async with conn1.cursor() as cur1:
+ await cur1.execute("select 2")
+
+ conn2: AsyncConnection[Person]
+ cur2: AsyncCursor[Person]
+ r2: Optional[Person]
+ conn2 = await AsyncConnection.connect(row_factory=Person.row_factory)
+ cur2 = await conn2.execute("select * from persons")
+ r2 = await cur2.fetchone()
+ r2 and r2.name
+ async with conn2.cursor() as cur2:
+ await cur2.execute("select 2")
+
+ cur3: AsyncCursor[Tuple[Any, ...]]
+ r3: Optional[Tuple[Any, ...]]
+ conn3 = await AsyncConnection.connect()
+ cur3 = await conn3.execute("select 3")
+ async with conn3.cursor() as cur3:
+ await cur3.execute("select 42")
+ r3 = await cur3.fetchone()
+ r3 and len(r3)