# https://bugs.python.org/issue41451
if sys.version_info >= (3, 7):
__slots__ = """
- _conn format _adapters arraysize _closed _results _pgresult _pos
+ _conn format _adapters arraysize _closed _results pgresult _pos
_iresult _rowcount _pgq _tx _last_query _row_factory
__weakref__
""".split()
def _reset(self) -> None:
self._results: List["PGresult"] = []
- self._pgresult: Optional["PGresult"] = None
+ self.pgresult: Optional["PGresult"] = None
+ """The `~psycopg3.pq.PGresult` exposed by the cursor."""
self._pos = 0
self._iresult = 0
self._rowcount = -1
info = pq.misc.connection_summary(self._conn.pgconn)
if self._closed:
status = "closed"
- elif not self._pgresult:
- status = "no result"
+ elif self.pgresult:
+ status = pq.ExecStatus(self.pgresult.status).name
else:
- status = pq.ExecStatus(self._pgresult.status).name
+ status = "no result"
return f"<{cls} [{status}] {info} at 0x{id(self):x}>"
@property
"""The last set of parameters sent to the server, if available."""
return self._pgq.params if self._pgq else None
- @property
- def pgresult(self) -> Optional["PGresult"]:
- """The `~psycopg3.pq.PGresult` exposed by the cursor."""
- return self._pgresult
-
- @pgresult.setter
- def pgresult(self, result: Optional["PGresult"]) -> None:
- self._pgresult = result
-
@property
def description(self) -> Optional[List[Column]]:
"""
`!None` if there is no result to fetch.
"""
- return self._pos if self._pgresult else None
+ return self._pos if self.pgresult else None
def setinputsizes(self, sizes: Sequence[Any]) -> None:
# no-op
else:
return None
+ @property
+ def row_factory(self) -> RowFactory:
+ return self._row_factory
+
+ @row_factory.setter
+ def row_factory(self, row_factory: RowFactory) -> None:
+ self._row_factory = row_factory
+ if self.pgresult:
+ self._tx.make_row = row_factory(self)
+
#
# Generators for the high level operations on the cursor
#
info = pq.misc.connection_summary(cur._conn.pgconn)
if cur._closed:
status = "closed"
- elif not cur._pgresult:
+ elif not cur.pgresult:
status = "no result"
else:
- status = pq.ExecStatus(cur._pgresult.status).name
+ status = pq.ExecStatus(cur.pgresult.status).name
return f"<{cls} {self.name!r} [{status}] {info} at 0x{id(cur):x}>"
def _declare_gen(
import psycopg3
from psycopg3 import sql
from psycopg3.oids import postgres_types as builtins
+from psycopg3.rows import dict_row
from psycopg3.adapt import Format
assert cur.fetchall() == [["Xx"]]
assert cur.nextset()
assert cur.fetchall() == [["Yy", "Zz"]]
- assert cur.nextset() is None
+
+ cur.scroll(-1)
+ cur.row_factory = dict_row
+ assert cur.fetchone() == {"y": "y", "z": "z"}
def test_scroll(conn):
import psycopg3
from psycopg3 import sql
+from psycopg3.rows import dict_row
from psycopg3.adapt import Format
from .test_cursor import my_row_factory
assert await cur.fetchall() == [["Xx"]]
assert cur.nextset()
assert await cur.fetchall() == [["Yy", "Zz"]]
- assert cur.nextset() is None
+
+ await cur.scroll(-1)
+ cur.row_factory = dict_row
+ assert await cur.fetchone() == {"y": "y", "z": "z"}
async def test_scroll(aconn):
from psycopg3 import errors as e
from psycopg3.pq import Format
+from psycopg3.rows import dict_row
def test_funny_name(conn):
return lambda values: [n] + [-v for v in values]
cur = conn.cursor("foo", row_factory=my_row_factory)
- cur.execute("select generate_series(1, 3)", scrollable=True)
+ cur.execute("select generate_series(1, 3) as x", scrollable=True)
rows = cur.fetchall()
cur.scroll(0, "absolute")
while 1:
rows.append(row)
assert rows == [[1, -1], [1, -2], [1, -3]] * 2
+ cur.scroll(0, "absolute")
+ cur.row_factory = dict_row
+ assert cur.fetchone() == {"x": 1}
+
def test_rownumber(conn):
cur = conn.cursor("foo")
import pytest
from psycopg3 import errors as e
+from psycopg3.rows import dict_row
from psycopg3.pq import Format
pytestmark = pytest.mark.asyncio
return lambda values: [n] + [-v for v in values]
cur = aconn.cursor("foo", row_factory=my_row_factory)
- await cur.execute("select generate_series(1, 3)", scrollable=True)
+ await cur.execute("select generate_series(1, 3) as x", scrollable=True)
rows = await cur.fetchall()
await cur.scroll(0, "absolute")
while 1:
rows.append(row)
assert rows == [[1, -1], [1, -2], [1, -3]] * 2
+ await cur.scroll(0, "absolute")
+ cur.row_factory = dict_row
+ assert await cur.fetchone() == {"x": 1}
+
async def test_rownumber(aconn):
cur = aconn.cursor("foo")