if self._iresult < len(self._results):
self.pgresult = self._results[self._iresult]
self._tx.set_pgresult(self._results[self._iresult])
+ if self._row_factory:
+ self._tx.make_row = self._row_factory(self)
self._pos = 0
nrows = self.pgresult.command_tuples
self._rowcount = nrows if nrows is not None else -1
def test_row_factory(conn):
- def my_row_factory(cur):
- return lambda values: [-v for v in values]
-
cur = conn.cursor(row_factory=my_row_factory)
- cur.execute("select generate_series(1, 3)")
- r = cur.fetchall()
- assert r == [[-1], [-2], [-3]]
+ cur.execute("select 'foo' as bar")
+ (r,) = cur.fetchone()
+ assert r == "FOObar"
- cur.execute("select 42; select generate_series(1,3)")
- assert cur.fetchall() == [[-42]]
+ cur.execute("select 'x' as x; select 'y' as y, 'z' as z")
+ assert cur.fetchall() == [["Xx"]]
assert cur.nextset()
- assert cur.fetchall() == [[-1], [-2], [-3]]
+ assert cur.fetchall() == [["Yy", "Zz"]]
assert cur.nextset() is None
assert (
n[0] == n[1] == n[2]
), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
+def my_row_factory(cursor):
+ assert cursor.description is not None
+ titles = [c.name for c in cursor.description]
+
+ def mkrow(values):
+ return [
+ f"{value.upper()}{title}" for title, value in zip(titles, values)
+ ]
+
+ return mkrow
import psycopg3
from psycopg3 import sql
from psycopg3.adapt import Format
+from .test_cursor import my_row_factory
pytestmark = pytest.mark.asyncio
async def test_row_factory(aconn):
- def my_row_factory(cursor):
- def mkrow(values):
- assert cursor.description is not None
- titles = [c.name for c in cursor.description]
- return [
- f"{value.upper()}{title}"
- for title, value in zip(titles, values)
- ]
-
- return mkrow
-
cur = aconn.cursor(row_factory=my_row_factory)
await cur.execute("select 'foo' as bar")
(r,) = await cur.fetchone()