__module__ = "psycopg3"
- def __init__(self, cursor: "BaseCursor[Any]", index: int):
+ def __init__(self, cursor: "BaseCursor[Any, Any]", index: int):
res = cursor.pgresult
assert res
dumper = cache[key1] = dumper.upgrade(obj, format)
return dumper
- def load_rows(self, row0: int, row1: int, make_row: RowMaker) -> List[Row]:
+ def load_rows(
+ self, row0: int, row1: int, make_row: RowMaker[Row]
+ ) -> List[Row]:
res = self._pgresult
if not res:
raise e.InterfaceError("result not set")
f"rows must be included between 0 and {self._ntuples}"
)
- records: List[Row] = []
+ records = []
for row in range(row0, row1):
record: List[Any] = [None] * self._nfields
for col in range(self._nfields):
return records
- def load_row(self, row: int, make_row: RowMaker) -> Optional[Row]:
+ def load_row(self, row: int, make_row: RowMaker[Row]) -> Optional[Row]:
res = self._pgresult
if not res:
return None
if val is not None:
record[col] = self._row_loaders[col](val)
- return make_row(record) # type: ignore[no-any-return]
+ return make_row(record)
def load_sequence(
self, record: Sequence[Optional[bytes]]
name = name.as_string(conn)
cur = conn.cursor(binary=True, row_factory=dict_row)
cur.execute(cls._info_query, {"name": name})
- recs: Sequence[Dict[str, Any]] = cur.fetchall()
+ recs = cur.fetchall()
return cls._fetch(name, recs)
@classmethod
cur = conn.cursor(binary=True, row_factory=dict_row)
await cur.execute(cls._info_query, {"name": name})
- recs: Sequence[Dict[str, Any]] = await cur.fetchall()
+ recs = await cur.fetchall()
return cls._fetch(name, recs)
@classmethod
from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
from .sql import Composable
from .rows import tuple_row
-from .proto import PQGen, PQGenConn, RV, RowFactory, Query, Params
+from .proto import PQGen, PQGenConn, RV, Row, RowFactory, Query, Params
from .proto import AdaptContext, ConnectionType
from .cursor import Cursor, AsyncCursor
from .conninfo import make_conninfo, ConnectionInfo
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- row_factory: RowFactory = tuple_row
+ row_factory: RowFactory[Any] = tuple_row
def __init__(self, pgconn: "PGconn"):
self.pgconn = pgconn # TODO: document this
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: RowFactory,
+ row_factory: RowFactory[Any],
**kwargs: Any,
) -> PQGenConn[ConnectionType]:
"""Generator to connect to the database and create a new instance."""
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: RowFactory = tuple_row,
+ row_factory: RowFactory[Any] = tuple_row,
**kwargs: Any,
) -> "Connection":
"""
self._closed = True
self.pgconn.finish()
+ @overload
+ def cursor(self, *, binary: bool = False) -> Cursor[Any]:
+ ...
+
@overload
def cursor(
- self, *, binary: bool = False, row_factory: Optional[RowFactory] = None
- ) -> Cursor:
+ self, *, binary: bool = False, row_factory: RowFactory[Row]
+ ) -> Cursor[Row]:
+ ...
+
+ @overload
+ def cursor(self, name: str, *, binary: bool = False) -> ServerCursor[Any]:
...
@overload
def cursor(
- self,
- name: str,
- *,
- binary: bool = False,
- row_factory: Optional[RowFactory] = None,
- ) -> ServerCursor:
+ self, name: str, *, binary: bool = False, row_factory: RowFactory[Row]
+ ) -> ServerCursor[Row]:
...
def cursor(
name: str = "",
*,
binary: bool = False,
- row_factory: Optional[RowFactory] = None,
- ) -> Union[Cursor, ServerCursor]:
+ row_factory: Optional[RowFactory[Any]] = None,
+ ) -> Union[Cursor[Any], ServerCursor[Any]]:
"""
Return a new cursor to send commands and queries to the connection.
"""
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> Cursor:
+ ) -> Cursor[Any]:
"""Execute a query and return a cursor to read its results."""
- cur = self.cursor()
+ cur: Cursor[Any] = self.cursor()
try:
return cur.execute(query, params, prepare=prepare)
except e.Error as ex:
conninfo: str = "",
*,
autocommit: bool = False,
- row_factory: RowFactory = tuple_row,
+ row_factory: RowFactory[Any] = tuple_row,
**kwargs: Any,
) -> "AsyncConnection":
return await cls._wait_conn(
self._closed = True
self.pgconn.finish()
+ @overload
+ def cursor(self, *, binary: bool = False) -> AsyncCursor[Any]:
+ ...
+
@overload
def cursor(
- self, *, binary: bool = False, row_factory: Optional[RowFactory] = None
- ) -> AsyncCursor:
+ self, *, binary: bool = False, row_factory: RowFactory[Row]
+ ) -> AsyncCursor[Row]:
...
@overload
def cursor(
- self,
- name: str,
- *,
- binary: bool = False,
- row_factory: Optional[RowFactory] = None,
- ) -> AsyncServerCursor:
+ self, name: str, *, binary: bool = False
+ ) -> AsyncServerCursor[Any]:
+ ...
+
+ @overload
+ def cursor(
+ self, name: str, *, binary: bool = False, row_factory: RowFactory[Row]
+ ) -> AsyncServerCursor[Row]:
...
def cursor(
name: str = "",
*,
binary: bool = False,
- row_factory: Optional[RowFactory] = None,
- ) -> Union[AsyncCursor, AsyncServerCursor]:
+ row_factory: Optional[RowFactory[Any]] = None,
+ ) -> Union[AsyncCursor[Any], AsyncServerCursor[Any]]:
"""
Return a new `AsyncCursor` to send commands and queries to the connection.
"""
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> AsyncCursor:
- cur = self.cursor()
+ ) -> AsyncCursor[Any]:
+ cur: AsyncCursor[Any] = self.cursor()
try:
return await cur.execute(query, params, prepare=prepare)
except e.Error as ex:
formatter: "Formatter"
- def __init__(self, cursor: "BaseCursor[ConnectionType]"):
+ def __init__(self, cursor: "BaseCursor[ConnectionType, Any]"):
self.cursor = cursor
self.connection = cursor.connection
self._pgconn = self.connection.pgconn
__module__ = "psycopg3"
- def __init__(self, cursor: "Cursor"):
+ def __init__(self, cursor: "Cursor[Any]"):
super().__init__(cursor)
self._queue: queue.Queue[Optional[bytes]] = queue.Queue(
maxsize=self.QUEUE_SIZE
__module__ = "psycopg3"
- def __init__(self, cursor: "AsyncCursor"):
+ def __init__(self, cursor: "AsyncCursor[Any]"):
super().__init__(cursor)
self._queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue(
maxsize=self.QUEUE_SIZE
from .pq import ExecStatus, Format
from .copy import Copy, AsyncCopy
-from .rows import tuple_row
from .proto import ConnectionType, Query, Params, PQGen
from .proto import Row, RowFactory
from ._column import Column
execute = generators.execute
-class BaseCursor(Generic[ConnectionType]):
+class BaseCursor(Generic[ConnectionType, Row]):
# Slots with __weakref__ and generic bases don't work on Py 3.6
# https://bugs.python.org/issue41451
if sys.version_info >= (3, 7):
connection: ConnectionType,
*,
format: Format = Format.TEXT,
- row_factory: RowFactory = tuple_row,
+ row_factory: RowFactory[Row],
):
self._conn = connection
self.format = format
return None
@property
- def row_factory(self) -> RowFactory:
+ def row_factory(self) -> RowFactory[Row]:
"""Writable attribute to control how result rows are formed."""
return self._row_factory
@row_factory.setter
- def row_factory(self, row_factory: RowFactory) -> None:
+ def row_factory(self, row_factory: RowFactory[Row]) -> None:
self._row_factory = row_factory
if self.pgresult:
self._make_row = row_factory(self)
self._pgq = pgq
-class Cursor(BaseCursor["Connection"]):
+class Cursor(BaseCursor["Connection", Row]):
__module__ = "psycopg3"
__slots__ = ()
- def __enter__(self) -> "Cursor":
+ def __enter__(self) -> "Cursor[Row]":
return self
def __exit__(
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> "Cursor":
+ ) -> "Cursor[Row]":
"""
Execute a query or command to the database.
"""
if not size:
size = self.arraysize
- records: List[Row] = self._tx.load_rows(
+ records = self._tx.load_rows(
self._pos,
min(self._pos + size, self.pgresult.ntuples),
self._make_row,
"""
self._check_result()
assert self.pgresult
- records: List[Row] = self._tx.load_rows(
+ records = self._tx.load_rows(
self._pos, self.pgresult.ntuples, self._make_row
)
self._pos = self.pgresult.ntuples
yield copy
-class AsyncCursor(BaseCursor["AsyncConnection"]):
+class AsyncCursor(BaseCursor["AsyncConnection", Row]):
__module__ = "psycopg3"
__slots__ = ()
- async def __aenter__(self) -> "AsyncCursor":
+ async def __aenter__(self) -> "AsyncCursor[Row]":
return self
async def __aexit__(
params: Optional[Params] = None,
*,
prepare: Optional[bool] = None,
- ) -> "AsyncCursor":
+ ) -> "AsyncCursor[Row]":
try:
async with self._conn.lock:
await self._conn.wait(
if not size:
size = self.arraysize
- records: List[Row] = self._tx.load_rows(
+ records = self._tx.load_rows(
self._pos,
min(self._pos + size, self.pgresult.ntuples),
self._make_row,
async def fetchall(self) -> List[Row]:
self._check_result()
assert self.pgresult
- records: List[Row] = self._tx.load_rows(
+ records = self._tx.load_rows(
self._pos, self.pgresult.ntuples, self._make_row
)
self._pos = self.pgresult.ntuples
# Row factories
Row = TypeVar("Row")
+Row_co = TypeVar("Row_co", covariant=True)
-class RowMaker(Protocol):
- def __call__(self, __values: Sequence[Any]) -> Any:
+class RowMaker(Protocol[Row_co]):
+ def __call__(self, __values: Sequence[Any]) -> Row_co:
...
-class RowFactory(Protocol):
- def __call__(self, __cursor: "BaseCursor[Any]") -> RowMaker:
+class RowFactory(Protocol[Row]):
+ def __call__(self, __cursor: "BaseCursor[Any, Row]") -> RowMaker[Row]:
...
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
...
- def load_rows(self, row0: int, row1: int, make_row: RowMaker) -> List[Row]:
+ def load_rows(
+ self, row0: int, row1: int, make_row: RowMaker[Row]
+ ) -> List[Row]:
...
- def load_row(self, row: int, make_row: RowMaker) -> Optional[Row]:
+ def load_row(self, row: int, make_row: RowMaker[Row]) -> Optional[Row]:
...
def load_sequence(
from .cursor import BaseCursor
+TupleRow = Tuple[Any, ...]
+
+
def tuple_row(
- cursor: "BaseCursor[Any]",
-) -> Callable[[Sequence[Any]], Tuple[Any, ...]]:
+ cursor: "BaseCursor[Any, TupleRow]",
+) -> Callable[[Sequence[Any]], TupleRow]:
"""Row factory to represent rows as simple tuples.
This is the default factory.
return tuple
+DictRow = Dict[str, Any]
+
+
def dict_row(
- cursor: "BaseCursor[Any]",
-) -> Callable[[Sequence[Any]], Dict[str, Any]]:
+ cursor: "BaseCursor[Any, DictRow]",
+) -> Callable[[Sequence[Any]], DictRow]:
"""Row factory to represent rows as dicts.
Note that this is not compatible with the DBAPI, which expects the records
def namedtuple_row(
- cursor: "BaseCursor[Any]",
+ cursor: "BaseCursor[Any, NamedTuple]",
) -> Callable[[Sequence[Any]], NamedTuple]:
"""Row factory to represent rows as `~collections.namedtuple`."""
from . import pq
from . import sql
from . import errors as e
-from .rows import tuple_row
from .cursor import BaseCursor, execute
from .proto import ConnectionType, Query, Params, PQGen, Row, RowFactory
DEFAULT_ITERSIZE = 100
-class ServerCursorHelper(Generic[ConnectionType]):
+class ServerCursorHelper(Generic[ConnectionType, Row]):
__slots__ = ("name", "described")
"""Helper object for common ServerCursor code.
self.name = name
self.described = False
- def _repr(self, cur: BaseCursor[ConnectionType]) -> str:
+ def _repr(self, cur: BaseCursor[ConnectionType, Row]) -> str:
cls = f"{cur.__class__.__module__}.{cur.__class__.__qualname__}"
info = pq.misc.connection_summary(cur._conn.pgconn)
if cur._closed:
def _declare_gen(
self,
- cur: BaseCursor[ConnectionType],
+ cur: BaseCursor[ConnectionType, Row],
query: Query,
params: Optional[Params] = None,
) -> PQGen[None]:
# The above result only returned COMMAND_OK. Get the cursor shape
yield from self._describe_gen(cur)
- def _describe_gen(self, cur: BaseCursor[ConnectionType]) -> PQGen[None]:
+ def _describe_gen(
+ self, cur: BaseCursor[ConnectionType, Row]
+ ) -> PQGen[None]:
conn = cur._conn
conn.pgconn.send_describe_portal(
self.name.encode(conn.client_encoding)
cur._execute_results(results)
self.described = True
- def _close_gen(self, cur: BaseCursor[ConnectionType]) -> PQGen[None]:
+ def _close_gen(self, cur: BaseCursor[ConnectionType, Row]) -> PQGen[None]:
# if the connection is not in a sane state, don't even try
if cur._conn.pgconn.transaction_status not in (
pq.TransactionStatus.IDLE,
yield from cur._conn._exec_command(query)
def _fetch_gen(
- self, cur: BaseCursor[ConnectionType], num: Optional[int]
+ self, cur: BaseCursor[ConnectionType, Row], num: Optional[int]
) -> PQGen[List[Row]]:
# If we are stealing the cursor, make sure we know its shape
if not self.described:
return cur._tx.load_rows(0, res.ntuples, cur._make_row)
def _scroll_gen(
- self, cur: BaseCursor[ConnectionType], value: int, mode: str
+ self, cur: BaseCursor[ConnectionType, Row], value: int, mode: str
) -> PQGen[None]:
if mode not in ("relative", "absolute"):
raise ValueError(
def _make_declare_statement(
self,
- cur: BaseCursor[ConnectionType],
+ cur: BaseCursor[ConnectionType, Row],
query: Query,
scrollable: Optional[bool],
hold: bool,
return sql.SQL(" ").join(parts)
-class ServerCursor(BaseCursor["Connection"]):
+class ServerCursor(BaseCursor["Connection", Row]):
__module__ = "psycopg3"
__slots__ = ("_helper", "itersize")
name: str,
*,
format: pq.Format = pq.Format.TEXT,
- row_factory: RowFactory = tuple_row,
+ row_factory: RowFactory[Row],
):
super().__init__(connection, format=format, row_factory=row_factory)
- self._helper: ServerCursorHelper["Connection"]
+ self._helper: ServerCursorHelper["Connection", Row]
self._helper = ServerCursorHelper(name)
self.itersize: int = DEFAULT_ITERSIZE
def __repr__(self) -> str:
return self._helper._repr(self)
- def __enter__(self) -> "ServerCursor":
+ def __enter__(self) -> "ServerCursor[Row]":
return self
def __exit__(
*,
scrollable: Optional[bool] = None,
hold: bool = False,
- ) -> "ServerCursor":
+ ) -> "ServerCursor[Row]":
"""
Open a cursor to execute a query to the database.
"""
def fetchone(self) -> Optional[Row]:
with self._conn.lock:
- recs: List[Row] = self._conn.wait(self._helper._fetch_gen(self, 1))
+ recs = self._conn.wait(self._helper._fetch_gen(self, 1))
if recs:
self._pos += 1
return recs[0]
if not size:
size = self.arraysize
with self._conn.lock:
- recs: List[Row] = self._conn.wait(
- self._helper._fetch_gen(self, size)
- )
+ recs = self._conn.wait(self._helper._fetch_gen(self, size))
self._pos += len(recs)
return recs
def fetchall(self) -> Sequence[Row]:
with self._conn.lock:
- recs: List[Row] = self._conn.wait(
- self._helper._fetch_gen(self, None)
- )
+ recs = self._conn.wait(self._helper._fetch_gen(self, None))
self._pos += len(recs)
return recs
def __iter__(self) -> Iterator[Row]:
while True:
with self._conn.lock:
- recs: List[Row] = self._conn.wait(
+ recs = self._conn.wait(
self._helper._fetch_gen(self, self.itersize)
)
for rec in recs:
self._pos = value
-class AsyncServerCursor(BaseCursor["AsyncConnection"]):
+class AsyncServerCursor(BaseCursor["AsyncConnection", Row]):
__module__ = "psycopg3"
__slots__ = ("_helper", "itersize")
name: str,
*,
format: pq.Format = pq.Format.TEXT,
- row_factory: RowFactory = tuple_row,
+ row_factory: RowFactory[Row],
):
super().__init__(connection, format=format, row_factory=row_factory)
- self._helper: ServerCursorHelper["AsyncConnection"]
+ self._helper: ServerCursorHelper["AsyncConnection", Row]
self._helper = ServerCursorHelper(name)
self.itersize: int = DEFAULT_ITERSIZE
def __repr__(self) -> str:
return self._helper._repr(self)
- async def __aenter__(self) -> "AsyncServerCursor":
+ async def __aenter__(self) -> "AsyncServerCursor[Row]":
return self
async def __aexit__(
*,
scrollable: Optional[bool] = None,
hold: bool = False,
- ) -> "AsyncServerCursor":
+ ) -> "AsyncServerCursor[Row]":
query = self._helper._make_declare_statement(
self, query, scrollable=scrollable, hold=hold
)
async def fetchone(self) -> Optional[Row]:
async with self._conn.lock:
- recs: List[Row] = await self._conn.wait(
- self._helper._fetch_gen(self, 1)
- )
+ recs = await self._conn.wait(self._helper._fetch_gen(self, 1))
if recs:
self._pos += 1
return recs[0]
if not size:
size = self.arraysize
async with self._conn.lock:
- recs: List[Row] = await self._conn.wait(
- self._helper._fetch_gen(self, size)
- )
+ recs = await self._conn.wait(self._helper._fetch_gen(self, size))
self._pos += len(recs)
return recs
async def fetchall(self) -> Sequence[Row]:
async with self._conn.lock:
- recs: List[Row] = await self._conn.wait(
- self._helper._fetch_gen(self, None)
- )
+ recs = await self._conn.wait(self._helper._fetch_gen(self, None))
self._pos += len(recs)
return recs
async def __aiter__(self) -> AsyncIterator[Row]:
while True:
async with self._conn.lock:
- recs: List[Row] = await self._conn.wait(
+ recs = await self._conn.wait(
self._helper._fetch_gen(self, self.itersize)
)
for rec in recs:
) -> Tuple[List[Any], Tuple[int, ...], Sequence[pq.Format]]: ...
def get_dumper(self, obj: Any, format: Format) -> Dumper: ...
def load_rows(
- self, row0: int, row1: int, make_row: proto.RowMaker
+ self, row0: int, row1: int, make_row: proto.RowMaker[proto.Row]
) -> List[proto.Row]: ...
def load_row(
- self, row: int, make_row: proto.RowMaker
+ self, row: int, make_row: proto.RowMaker[proto.Row]
) -> Optional[proto.Row]: ...
def load_sequence(
self, record: Sequence[Optional[bytes]]