cdef PyObject *fmt = <PyObject *>PG_TEXT
cdef PyObject *row_dumper
- # try to get preloaded dumpers from set_types
- if not tx._row_dumpers:
- tx._row_dumpers = PyList_New(rowlen)
-
dumpers = tx._row_dumpers
- if PyList_GET_SIZE(dumpers) != rowlen:
+ if dumpers and PyList_GET_SIZE(dumpers) != rowlen:
raise e.DataError(f"expected {len(dumpers)} values in row, got {rowlen}")
for i in range(rowlen):
_append_text_none(out, &pos, with_tab)
continue
- row_dumper = PyList_GET_ITEM(dumpers, i)
- if not row_dumper:
+ if dumpers:
+ # pinned dumpers from set_types are authoritative
+ row_dumper = PyList_GET_ITEM(dumpers, i)
+ else:
+ # no pinned dumpers, thus free value dumping
row_dumper = tx.get_row_dumper(<PyObject *>item, fmt)
- Py_INCREF(<object>row_dumper)
- PyList_SET_ITEM(dumpers, i, <object>row_dumper)
if (<RowDumper>row_dumper).cdumper is not None:
# A cdumper can resize if necessary and copy in place
assert data == [(1, None, "hello"), (2, None, "world")]
+def test_copy_in_text_no_pinning(conn):
+ cur = conn.cursor()
+ cols = [
+ "col1 serial primary key",
+ "col2 int",
+ "col3 int",
+ "col4 double precision",
+ "col5 double precision",
+ ]
+ ensure_table(cur, ",".join(cols))
+
+ with cur.copy(
+ "copy copy_in (col2,col3,col4,col5) from stdin (format text)"
+ ) as copy:
+ # no pinned dumpers: type check & cast done on postgres side
+ # allows to mix castable reprs more freely
+ # slower than pinned, late errors from postgres jeopardizing copy cursor
+ copy.write_row([1, "2", 3, "4.1"])
+ copy.write_row(["1", 2, 3.0, 4])
+
+ cur.execute("select col2,col3,col4,col5 from copy_in order by 1")
+ data = cur.fetchall()
+ assert data == [(1, 2, 3, 4.1), (1, 2, 3, 4)]
+
+
+def test_copy_in_text_pinned(conn):
+ # FIXME: this test works currently only in c,
+ # as c/python dumpers differ in what they accept as valid input
+ # here: python int & float text dumpers always allow str as input
+ from psycopg._cmodule import _psycopg
+
+ if not _psycopg:
+ return
+ cur = conn.cursor()
+ cols = [
+ "col1 serial primary key",
+ "col2 int",
+ "col3 int",
+ "col4 double precision",
+ "col5 double precision",
+ ]
+ ensure_table(cur, ",".join(cols))
+
+ with cur.copy(
+ "copy copy_in (col2,col3,col4,col5) from stdin (format text)"
+ ) as copy:
+ # pinned dumpers from set_types: type check & cast done on psycopg side
+ # much faster, allows catching errors early without postgres involvement
+ copy.set_types(["int4", "int4", "double precision", "double precision"])
+ copy.write_row([1, 2, 3, 4.1])
+ with pytest.raises(
+ (e.DataError, TypeError)
+ ): # FIXME: should errors from dumpers be harmonized?
+ copy.write_row([1.0, 2, 3, 4.1])
+ with pytest.raises((e.DataError, TypeError)):
+ copy.write_row([1, "2", 3, 4.1])
+ with pytest.raises((e.DataError, TypeError)):
+ copy.write_row([1, 2, 3, "4.1"])
+
+ cur.execute("select col2,col3,col4,col5 from copy_in order by 1")
+ data = cur.fetchall()
+ assert data == [(1, 2, 3, 4.1)]
+
+
def test_copy_in_allchars(conn):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
assert data == [(1, None, "hello"), (2, None, "world")]
+async def test_copy_in_text_no_pinning(aconn):
+ cur = aconn.cursor()
+ cols = [
+ "col1 serial primary key",
+ "col2 int",
+ "col3 int",
+ "col4 double precision",
+ "col5 double precision",
+ ]
+ await ensure_table_async(cur, ",".join(cols))
+
+ async with cur.copy(
+ "copy copy_in (col2,col3,col4,col5) from stdin (format text)"
+ ) as copy:
+ # no pinned dumpers: type check & cast done on postgres side
+ # allows to mix castable reprs more freely
+ # slower than pinned, late errors from postgres jeopardizing copy cursor
+ await copy.write_row([1, "2", 3, "4.1"])
+ await copy.write_row(["1", 2, 3.0, 4])
+
+ await cur.execute("select col2,col3,col4,col5 from copy_in order by 1")
+ data = await cur.fetchall()
+ assert data == [(1, 2, 3, 4.1), (1, 2, 3, 4)]
+
+
+async def test_copy_in_text_pinned(aconn):
+ # FIXME: this test works currently only in c,
+ # as c/python dumpers differ in what they accept as valid input
+ # here: python int & float text dumpers always allow str as input
+ from psycopg._cmodule import _psycopg
+
+ if not _psycopg:
+ return
+ cur = aconn.cursor()
+ cols = [
+ "col1 serial primary key",
+ "col2 int",
+ "col3 int",
+ "col4 double precision",
+ "col5 double precision",
+ ]
+ await ensure_table_async(cur, ",".join(cols))
+
+ async with cur.copy(
+ "copy copy_in (col2,col3,col4,col5) from stdin (format text)"
+ ) as copy:
+ # pinned dumpers from set_types: type check & cast done on psycopg side
+ # much faster, allows catching errors early without postgres involvement
+ copy.set_types(["int4", "int4", "double precision", "double precision"])
+ await copy.write_row([1, 2, 3, 4.1])
+ with pytest.raises(
+ (e.DataError, TypeError)
+ ): # FIXME: should errors from dumpers be harmonized?
+ await copy.write_row([1.0, 2, 3, 4.1])
+ with pytest.raises((e.DataError, TypeError)):
+ await copy.write_row([1, "2", 3, 4.1])
+ with pytest.raises((e.DataError, TypeError)):
+ await copy.write_row([1, 2, 3, "4.1"])
+
+ await cur.execute("select col2,col3,col4,col5 from copy_in order by 1")
+ data = await cur.fetchall()
+ assert data == [(1, 2, 3, 4.1)]
+
+
async def test_copy_in_allchars(aconn):
cur = aconn.cursor()
await ensure_table_async(cur, sample_tabledef)