.. admonition:: TODO
Document the other types
+
+ Document that empty arrays don't roundtrip in text mode and that in
+ binary they require a cast to be used in any context.
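+
+ A minimal sketch of the behaviour to document (assuming a ``conn``
+ connection; ``%s`` and ``%b`` are the text and binary placeholders)::
+
+     cur = conn.cursor()
+     # text mode: the empty array comes back as the string '{}', not as []
+     cur.execute("select %s", ([],))
+     cur.fetchone()
+     # binary mode: a cast to the target array type is needed
+     cur.execute("select %b::int[]", ([],))
+     cur.fetchone()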
# mapping oid, fmt -> Loader instance
self._loaders_cache: Tuple[LoaderCache, LoaderCache] = ({}, {})
+ self._row_dumpers: List[Optional["Dumper"]] = []
+
# sequence of load functions from value to python,
# one per result column
self._row_loaders: List[LoadFunc] = []
) -> Tuple[List[Any], Tuple[int, ...]]:
ps: List[Optional[bytes]] = [None] * len(params)
ts = [self._unknown_oid] * len(params)
+
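+ # Dumpers are cached per parameter position, so a params sequence
+ # (e.g. in executemany()) is assumed to keep the same Python type
+ # at each position.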
+ dumpers = self._row_dumpers
+ if not dumpers:
+ dumpers = self._row_dumpers = [None] * len(params)
+
for i in range(len(params)):
param = params[i]
if param is not None:
- dumper = self.get_dumper(param, formats[i])
+ dumper = dumpers[i]
+ if not dumper:
+ dumper = dumpers[i] = self.get_dumper(param, formats[i])
ps[i] = dumper.dump(param)
ts[i] = dumper.oid
else:
# TODO: Can be probably generalised to handle other recursive types
subobj = self._find_list_element(obj)
- if subobj is None:
- subobj = ""
key = (cls, type(subobj))
try:
f" to format {Format(format).name}"
)
- d = self._dumpers_cache[format][key] = dcls(cls, self)
+ d = dcls(cls, self)
if cls is list:
- sub_dumper = self.get_dumper(subobj, format)
- cast("BaseListDumper", d).set_sub_dumper(sub_dumper)
-
+ if subobj is not None:
+ sub_dumper = self.get_dumper(subobj, format)
+ cast("BaseListDumper", d).set_sub_dumper(sub_dumper)
+ elif format == Format.TEXT:
+ # Special case: dumping an empty list, or a list containing no
+ # non-None element. In text mode we cast it as unknown, so that
+ # postgres can cast it automatically to something useful. In
+ # binary there doesn't seem to be a representation for an unknown
+ # array, so we dump it as text[] instead. This means that
+ # placeholders receiving a binary array should almost always be
+ # cast to the target type.
+ d.oid = self._unknown_oid
+
+ self._dumpers_cache[format][key] = d
return d
def load_rows(self, row0: int, row1: int) -> List[Tuple[Any, ...]]:
class BaseListDumper(Dumper):
-
- _oid = TEXT_ARRAY_OID
-
def __init__(self, cls: type, context: Optional[AdaptContext] = None):
super().__init__(cls, context)
tx = Transformer(context)
def dump(self, obj: List[Any]) -> bytes:
if not obj:
- return _struct_head.pack(0, 0, TEXT_OID)
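+ # empty array: zero dimensions, no-nulls flag, element oid only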
+ return _struct_head.pack(0, 0, self.sub_oid)
data: List[bytes] = [b"", b""] # placeholders to avoid a resize
dims: List[int] = []
# Copyright (C) 2020 The Psycopg Team
+cimport cython
from cpython.ref cimport Py_INCREF
from cpython.set cimport PySet_Add, PySet_Contains
from cpython.dict cimport PyDict_GetItem, PyDict_SetItem
# ...more members, which we ignore
+@cython.freelist(16)
cdef class RowLoader:
- cdef object pyloader
+ cdef object loadfunc
cdef CLoader cloader
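+# Per-parameter adaptation info cached by position: the bound dump function,
+# the parameter oid and, when available, the CDumper with a C implementation
+# (the dump-side counterpart of RowLoader).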
+@cython.freelist(16)
+cdef class RowDumper:
+ cdef object dumpfunc
+ cdef object oid
+ cdef CDumper cdumper
+
+
cdef class Transformer:
"""
An object that can adapt efficiently between Python and PostgreSQL.
cdef dict _binary_loaders
cdef pq.PGresult _pgresult
cdef int _nfields, _ntuples
+ cdef list _row_dumpers
cdef list _row_loaders
cdef int _unknown_oid
self._binary_loaders = {}
self.pgresult = None
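+ # per-position RowDumper cache, created lazily by dump_sequence()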
+ self._row_dumpers = None
self._row_loaders = []
@property
cdef RowLoader _get_row_loader(self, PyObject *oid, PyObject *fmt):
cdef RowLoader row_loader = RowLoader()
loader = self._c_get_loader(oid, fmt)
- row_loader.pyloader = loader.load
+ row_loader.loadfunc = loader.load
if isinstance(loader, CLoader):
row_loader.cloader = loader
key = cls
else:
subobj = self._find_list_element(obj, set())
- if subobj is None:
- subobj = ""
key = (cls, type(subobj))
cache = self._binary_dumpers if format else self._text_dumpers
if dcls is None:
raise e.ProgrammingError(
f"cannot adapt type {cls.__name__}"
- f" to format {Format(format).name}"
- )
+ f" to format {Format(format).name}")
d = PyObject_CallFunctionObjArgs(
dcls, <PyObject *>cls, <PyObject *>self, NULL)
if cls is list:
- sub_dumper = self.get_dumper(subobj, format)
- d.set_sub_dumper(sub_dumper)
+ if subobj is not None:
+ sub_dumper = self.get_dumper(subobj, format)
+ d.set_sub_dumper(sub_dumper)
+ elif format == FORMAT_TEXT:
+ # Special case: dumping an empty list, or a list containing no
+ # non-None element. In text mode we cast it as unknown, so that
+ # postgres can cast it automatically to something useful. In
+ # binary there doesn't seem to be a representation for an unknown
+ # array, so we dump it as text[] instead. This means that
+ # placeholders receiving a binary array should almost always be
+ # cast to the target type.
+ d.oid = self._unknown_oid
PyDict_SetItem(cache, key, d)
return d
cdef tuple ts = PyTuple_New(nparams)
cdef object dumped, oid
cdef Py_ssize_t size
+ cdef PyObject *dumper_ptr # borrowed pointer to row dumper
+
+ if self._row_dumpers is None:
+ self._row_dumpers = PyList_New(nparams)
+
+ dumpers = self._row_dumpers
cdef int i
for i in range(nparams):
param = params[i]
if param is not None:
- format = formats[i]
- dumper = self.get_dumper(param, format)
- if isinstance(dumper, CDumper):
+ dumper_ptr = PyList_GET_ITEM(dumpers, i)
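+ # PyList_New() leaves the items NULL, so NULL means "no dumper cached yet"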
+ if dumper_ptr == NULL:
+ format = formats[i]
+ tmp_dumper = self._get_row_dumper(param, format)
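+ # PyList_SET_ITEM steals a reference, so INCREF before storing the dumper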
+ Py_INCREF(tmp_dumper)
+ PyList_SET_ITEM(dumpers, i, tmp_dumper)
+ dumper_ptr = <PyObject *>tmp_dumper
+
+ oid = (<RowDumper>dumper_ptr).oid
+ if (<RowDumper>dumper_ptr).cdumper is not None:
dumped = PyByteArray_FromStringAndSize("", 0)
- size = (<CDumper>dumper).cdump(param, <bytearray>dumped, 0)
+ size = (<RowDumper>dumper_ptr).cdumper.cdump(
+ param, <bytearray>dumped, 0)
PyByteArray_Resize(dumped, size)
- oid = (<CDumper>dumper).oid
else:
- dumped = dumper.dump(param)
- oid = dumper.oid
+ dumped = PyObject_CallFunctionObjArgs(
+ (<RowDumper>dumper_ptr).dumpfunc,
+ <PyObject *>param, NULL)
else:
dumped = None
oid = self._unknown_oid
return ps, ts
+ cdef RowDumper _get_row_dumper(self, object param, object fmt):
+ cdef RowDumper row_dumper = RowDumper()
+
+ dumper = self.get_dumper(param, fmt)
+ row_dumper.dumpfunc = dumper.dump
+ row_dumper.oid = dumper.oid
+
+ if isinstance(dumper, CDumper):
+ row_dumper.cdumper = <CDumper>dumper
+ else:
+ row_dumper.cdumper = None
+
+ return row_dumper
+
def load_rows(self, int row0, int row1) -> List[Tuple[Any, ...]]:
if self._pgresult is None:
raise e.InterfaceError("result not set")
# TODO: no copy
b = attval.value[:attval.len]
pyval = PyObject_CallFunctionObjArgs(
- (<RowLoader>loader).pyloader, <PyObject *>b, NULL)
+ (<RowLoader>loader).loadfunc, <PyObject *>b, NULL)
Py_INCREF(pyval)
PyTuple_SET_ITEM(<object>brecord, col, pyval)
# TODO: no copy
b = attval.value[:attval.len]
pyval = PyObject_CallFunctionObjArgs(
- (<RowLoader>loader).pyloader, <PyObject *>b, NULL)
+ (<RowLoader>loader).loadfunc, <PyObject *>b, NULL)
Py_INCREF(pyval)
PyTuple_SET_ITEM(record, col, pyval)
pyval = (<RowLoader>loader).cloader.cload(ptr, size)
else:
pyval = PyObject_CallFunctionObjArgs(
- (<RowLoader>loader).pyloader, <PyObject *>item, NULL)
+ (<RowLoader>loader).loadfunc, <PyObject *>item, NULL)
Py_INCREF(pyval)
PyTuple_SET_ITEM(out, col, pyval)
cur.executemany(query, [(10, "hello"), (20, "world")])
+def test_executemany_null_first(conn):
+ cur = conn.cursor()
+ cur.executemany("select %s, %s", [[1, None], [3, 4]])
+ with pytest.raises(TypeError):
+ cur.executemany("select %s, %s", [[1, ""], [3, 4]])
+
+
def test_rowcount(conn):
cur = conn.cursor()
assert cur.query == b"select $1, $2"
assert cur.params == [b"3", b"4"]
- with pytest.raises(psycopg3.DataError):
+ with pytest.raises((psycopg3.DataError, TypeError)):
cur.executemany("select %s::int", [[1], ["x"], [2]])
assert cur.query == b"select $1::int"
- assert cur.params == [b"x"]
+ # TODO: cannot really check this: after introducing row_dumpers this
+ # fails while dumping the parameter, not while passing the query.
+ # assert cur.params == [b"x"]
class TestColumn:
await cur.executemany(query, [(10, "hello"), (20, "world")])
+async def test_executemany_null_first(aconn):
+ cur = await aconn.cursor()
+ await cur.executemany("select %s, %s", [[1, None], [3, 4]])
+ with pytest.raises(TypeError):
+ await cur.executemany("select %s, %s", [[1, ""], [3, 4]])
+
+
async def test_rowcount(aconn):
cur = await aconn.cursor()
+ await cur.execute("select 1 from generate_series(1, 0)")
+ assert cur.rowcount == 0
+
await cur.execute("select 1 from generate_series(1, 42)")
assert cur.rowcount == 42
assert res == [(1,), (2,), (3,)]
+async def test_iter_stop(aconn):
+ cur = await aconn.cursor()
+ await cur.execute("select generate_series(1, 3)")
+ async for rec in cur:
+ assert rec == (1,)
+ break
+
+ async for rec in cur:
+ assert rec == (2,)
+ break
+
+ assert (await cur.fetchone()) == (3,)
+ async for rec in cur:
+ assert False
+
+
async def test_query_params_execute(aconn):
cur = await aconn.cursor()
assert cur.query is None
assert cur.query == b"select $1, $2"
assert cur.params == [b"3", b"4"]
- with pytest.raises(psycopg3.DataError):
+ with pytest.raises((psycopg3.DataError, TypeError)):
await cur.executemany("select %s::int", [[1], ["x"], [2]])
assert cur.query == b"select $1::int"
- assert cur.params == [b"x"]
-
-
-async def test_iter_stop(aconn):
- cur = await aconn.cursor()
- await cur.execute("select generate_series(1, 3)")
- async for rec in cur:
- assert rec == (1,)
- break
-
- async for rec in cur:
- assert rec == (2,)
- break
-
- assert (await cur.fetchone()) == (3,)
- async for rec in cur:
- assert False
+ # TODO: cannot really check this: after introducing row_dumpers this
+ # fails while dumping the parameter, not while passing the query.
+ # assert cur.params == [b"x"]
async def test_str(aconn):
# pro tip: don't get confused with the types
f1, f2 = conn.execute(f"select {ph}, {ph}", (objs, [])).fetchone()
assert f1 == objs
+ if f2 == "{}":
+ pytest.xfail("text empty arrays don't roundtrip well")
assert f2 == []
+
+
+def test_empty_list_text(conn):
+ cur = conn.cursor()
+ cur.execute("create table test (id serial primary key, data date[])")
+ with conn.transaction():
+ try:
+ cur.execute("insert into test (data) values (%s)", ([],))
+ except psycopg3.errors.DatatypeMismatch:
+ if conn.pgconn.server_version < 100000:
+ pytest.xfail("on PG 9.6 empty arrays are passed as text")
+ else:
+ raise
+ cur.execute("select data from test")
+ assert cur.fetchone() == ([],)
+
+ # test untyped list in a filter
+ cur.execute("select data from test where id = any(%s)", ([1],))
+ assert cur.fetchone()
+ cur.execute("select data from test where id = any(%s)", ([],))
+ assert not cur.fetchone()
+
+
+def test_empty_list_binary(conn):
+ cur = conn.cursor()
+ cur.execute("create table test (id serial primary key, data date[])")
+ with pytest.raises(psycopg3.errors.DatatypeMismatch):
+ with conn.transaction():
+ cur.execute("insert into test (data) values (%b)", ([],))
+ cur.execute("insert into test (data) values (%b::date[])", ([],))
+
+ cur.execute("select data from test")
+ assert cur.fetchone() == ([],)
+
+ # test untyped list in a filter
+ cur.execute("select data from test where id = any(%b)", ([1],))
+ assert cur.fetchone()
+ with pytest.raises(psycopg3.errors.UndefinedFunction):
+ with conn.transaction():
+ cur.execute("select data from test where id = any(%b)", ([],))
+ cur.execute("select data from test where id = any(%b::int[])", ([],))
+ assert not cur.fetchone()
+
+
+@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+def test_empty_list_after_choice(conn, fmt_in):
+ ph = "%s" if fmt_in == Format.TEXT else "%b"
+ cur = conn.cursor()
+ cur.execute("create table test (id serial primary key, data float[])")
+ cur.executemany(
+ f"insert into test (data) values ({ph})", [([1.0],), ([],)]
+ )
+ cur.execute("select data from test order by id")
+ assert cur.fetchall() == [([1.0],), ([],)]
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-@pytest.mark.parametrize("b", [True, False, None])
+@pytest.mark.parametrize("b", [True, False])
def test_roundtrip_bool(conn, b, fmt_in, fmt_out):
cur = conn.cursor(format=fmt_out)
ph = "%s" if fmt_in == Format.TEXT else "%b"