``psycopg`` release notes
=========================
+Psycopg 3.1.6 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Fix `cursor.copy()` with cursors using row factories (:ticket:`#460`).
+
+
Current release
---------------
"""
nfields = res.nfields
- if nfields or res.status == TUPLES_OK or res.status == SINGLE_TUPLE:
+ if res.status == TUPLES_OK or res.status == SINGLE_TUPLE:
return nfields
else:
return None
@pytest.mark.parametrize("format", Format)
-def test_copy_out_iter(conn, format):
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+def test_copy_out_iter(conn, format, row_factory):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
else:
want = sample_binary_rows
- cur = conn.cursor()
+ rf = getattr(psycopg.rows, row_factory)
+ cur = conn.cursor(row_factory=rf)
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
assert list(copy) == want
assert conn.info.transaction_status == conn.TransactionStatus.INTRANS
+@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+def test_copy_out_no_result(conn, format, row_factory):
+ rf = getattr(psycopg.rows, row_factory)
+ cur = conn.cursor(row_factory=rf)
+ with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})"):
+ with pytest.raises(e.ProgrammingError):
+ cur.fetchone()
+
+
@pytest.mark.parametrize("ph, params", [("%s", (10,)), ("%(n)s", {"n": 10})])
def test_copy_out_param(conn, ph, params):
cur = conn.cursor()
@pytest.mark.parametrize("format", Format)
-async def test_copy_out_iter(aconn, format):
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+async def test_copy_out_iter(aconn, format, row_factory):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
else:
want = sample_binary_rows
- cur = aconn.cursor()
+ rf = getattr(psycopg.rows, row_factory)
+ cur = aconn.cursor(row_factory=rf)
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
assert aconn.info.transaction_status == aconn.TransactionStatus.INTRANS
+@pytest.mark.parametrize("format", Format)
+@pytest.mark.parametrize("row_factory", ["tuple_row", "dict_row", "namedtuple_row"])
+async def test_copy_out_no_result(aconn, format, row_factory):
+ rf = getattr(psycopg.rows, row_factory)
+ cur = aconn.cursor(row_factory=rf)
+ async with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})"):
+ with pytest.raises(e.ProgrammingError):
+ await cur.fetchone()
+
+
@pytest.mark.parametrize("ph, params", [("%s", (10,)), ("%(n)s", {"n": 10})])
async def test_copy_out_param(aconn, ph, params):
cur = aconn.cursor()