# Copyright (C) 2020-2021 The Psycopg Team
cimport cython
-from cpython.ref cimport Py_INCREF
+from cpython.ref cimport Py_INCREF, Py_DECREF
from cpython.set cimport PySet_Add, PySet_Contains
from cpython.dict cimport PyDict_GetItem, PyDict_SetItem
from cpython.list cimport (
cdef object make_row = self.make_row
if make_row is not tuple:
- for i in range(len(records)):
- records[i] = make_row(records[i])
+ for i in range(row1 - row0):
+ brecord = PyList_GET_ITEM(records, i)
+ record = PyObject_CallFunctionObjArgs(
+ make_row, <PyObject *>brecord, NULL)
+ Py_INCREF(record)
+ PyList_SET_ITEM(records, i, record)
+ Py_DECREF(<object>brecord)
return records
def load_row(self, int row) -> Optional[Row]:
cdef object make_row = self.make_row
if make_row is not tuple:
- record = make_row(record)
+ record = PyObject_CallFunctionObjArgs(
+ make_row, <PyObject *>record, NULL)
return record
cpdef object load_sequence(self, record: Sequence[Optional[bytes]]):
import pytest
import psycopg3
-from psycopg3 import sql
+from psycopg3 import sql, rows
from psycopg3.oids import postgres_types as builtins
-from psycopg3.rows import dict_row
from psycopg3.adapt import Format
assert cur.fetchall() == [["Yy", "Zz"]]
cur.scroll(-1)
- cur.row_factory = dict_row
+ cur.row_factory = rows.dict_row
assert cur.fetchone() == {"y": "y", "z": "z"}
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
-def test_leak(dsn, faker, fmt, fetch):
+@pytest.mark.parametrize(
+ "row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
+)
+def test_leak(dsn, faker, fmt, fetch, row_factory):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
+ row_factory = getattr(rows, row_factory)
n = []
for i in range(3):
with psycopg3.connect(dsn) as conn:
- with conn.cursor(binary=Format.as_pq(fmt)) as cur:
+ with conn.cursor(
+ binary=Format.as_pq(fmt), row_factory=row_factory
+ ) as cur:
cur.execute(faker.drop_stmt)
cur.execute(faker.create_stmt)
cur.executemany(faker.insert_stmt, faker.records)
import datetime as dt
import psycopg3
-from psycopg3 import sql
-from psycopg3.rows import dict_row
+from psycopg3 import sql, rows
from psycopg3.adapt import Format
from .test_cursor import my_row_factory
assert await cur.fetchall() == [["Yy", "Zz"]]
await cur.scroll(-1)
- cur.row_factory = dict_row
+ cur.row_factory = rows.dict_row
assert await cur.fetchone() == {"y": "y", "z": "z"}
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
-async def test_leak(dsn, faker, fmt, fetch):
+@pytest.mark.parametrize(
+ "row_factory", ["tuple_row", "dict_row", "namedtuple_row"]
+)
+async def test_leak(dsn, faker, fmt, fetch, row_factory):
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
+ row_factory = getattr(rows, row_factory)
n = []
for i in range(3):
async with await psycopg3.AsyncConnection.connect(dsn) as conn:
- async with conn.cursor(binary=Format.as_pq(fmt)) as cur:
+ async with conn.cursor(
+ binary=Format.as_pq(fmt), row_factory=row_factory
+ ) as cur:
await cur.execute(faker.drop_stmt)
await cur.execute(faker.create_stmt)
await cur.executemany(faker.insert_stmt, faker.records)