- python: 3.6
addons:
- postgresql: '9.5'
+ postgresql: '10'
apt:
packages:
- - postgresql-client-9.5
+ - postgresql-client-10
env:
- TOXENV=py36
- TOXDIR=psycopg3_c
- - PGVER=9.5
+ - PGVER=10
- PSYCOPG3_IMPL=c
- PGPORT=5432
# skip tests failing on importing psycopg3_c.pq on subprocess
- python: 3.6
addons:
- postgresql: '9.6'
+ postgresql: '11'
apt:
packages:
- - postgresql-client-9.6
+ - postgresql-11
+ - postgresql-client-11
env:
- TOXENV=py36
- TOXDIR=psycopg3
- - PGVER=9.6
+ - PGVER=11
- PSYCOPG3_IMPL=python
- - PGPORT=5432
+ - PGPORT=5433
- python: 3.7
addons:
- postgresql: '10'
+ postgresql: '11'
apt:
packages:
- - postgresql-client-10
+ - postgresql-11
+ - postgresql-client-11
env:
- TOXENV=py37
- TOXDIR=psycopg3_c
- - PGVER=10
+ - PGVER=11
- PSYCOPG3_IMPL=c
- - PGPORT=5432
+ - PGPORT=5433
- PYTEST_ADDOPTS="-m 'not subprocess'"
- python: 3.7
addons:
- postgresql: '11'
+ postgresql: '12'
apt:
packages:
- - postgresql-11
- - postgresql-client-11
+ - postgresql-12
+ - postgresql-client-12
env:
- TOXENV=py37
- TOXDIR=psycopg3
from . import errors as e
from .pq import Format
-from .oids import INVALID_OID, TEXT_OID
+from .oids import INVALID_OID
from .proto import LoadFunc, AdaptContext
if TYPE_CHECKING:
_pgresult: Optional["PGresult"] = None
def __init__(self, context: Optional[AdaptContext] = None):
- self._unknown_oid = INVALID_OID
# WARNING: don't store context, or you'll create a loop with the Cursor
if context:
self._adapters = context.adapters
- conn = self._connection = context.connection
-
- # PG 9.6 gives an error if an unknown oid is emitted as column
- if conn and conn.pgconn.server_version < 100000:
- self._unknown_oid = TEXT_OID
+ self._connection = context.connection
else:
from .adapt import global_adapters
# representation for unknown array, so let's dump it as text[].
# This means that placeholders receiving a binary array should
# be almost always cast to the target type.
- d.oid = self._unknown_oid
+ d.oid = INVALID_OID
self._dumpers_cache[format][key] = d
return d
from . import pq
from . import proto
from .pq import Format as Format
-from .oids import builtins, TEXT_OID
+from .oids import builtins
from .proto import AdaptContext
if TYPE_CHECKING:
self.oid = self._oid
"""The oid to pass to the server, if known."""
- # Postgres 9.6 doesn't deal well with unknown oids
- if (
- not self.oid
- and self.connection
- and self.connection.pgconn.server_version < 100000
- ):
- self.oid = TEXT_OID
-
@abstractmethod
def dump(self, obj: Any) -> bytes:
"""Convert the object *obj* to PostgreSQL representation."""
conn = context.connection if context is not None else None
self._pgconn = conn.pgconn if conn is not None else None
- # default oid is implicitly set to 0, subclasses may override it
- # PG 9.6 goes a bit bonker sending unknown oids, so use text instead
- # (this does cause side effect, and requres casts more often than >= 10)
- if (
- self.oid == 0
- and self._pgconn is not None
- and self._pgconn.server_version < 100000
- ):
- self.oid = oids.TEXT_OID
-
cdef Py_ssize_t cdump(self, obj, bytearray rv, Py_ssize_t offset) except -1:
"""Store the Postgres representation *obj* into *rv* at *offset*
cdef int _nfields, _ntuples
cdef list _row_dumpers
cdef list _row_loaders
- cdef int _unknown_oid
def __cinit__(self, context: Optional["AdaptContext"] = None):
- self._unknown_oid = oids.INVALID_OID
if context is not None:
self.adapters = context.adapters
self.connection = context.connection
-
- # PG 9.6 gives an error if an unknown oid is emitted as column
- if self.connection and self.connection.pgconn.server_version < 100000:
- self._unknown_oid = oids.TEXT_OID
else:
from psycopg3.adapt import global_adapters
self.adapters = global_adapters
# representation for unknown array, so let's dump it as text[].
# This means that placeholders receiving a binary array should
# be almost always cast to the target type.
- d.oid = self._unknown_oid
+ d.oid = oids.INVALID_OID
PyDict_SetItem(cache, key, d)
return d
dempty = t.get_dumper([], fmt_out)
assert t.get_dumper([None, [None]], fmt_out) is dempty
if fmt_out == Format.TEXT:
- if conn.pgconn.server_version >= 100000:
- assert dempty.oid == 0
- else:
- assert dempty.oid == builtins["text"].oid
+ assert dempty.oid == 0
else:
assert dempty.oid == builtins["text"].array_oid
assert dempty.sub_oid == builtins["text"].oid
cur = conn.cursor()
# Currently string are passed as unknown oid to libpq. This is because
# unknown is more easily cast by postgres to different types (see jsonb
- # later). However Postgres < 10 refuses to emit unknown types.
- if conn.pgconn.server_version >= 100000:
- cur.execute("select %s, %s", ["hello", 10])
- assert cur.fetchone() == ("hello", 10)
- else:
- # We used to tolerate an error on roundtrip for unknown on pg < 10
- # however after introducing prepared statements the error happens
- # in every context, so now we cannot just use unknown oid on PG < 10
- # with pytest.raises(psycopg3.errors.IndeterminateDatatype):
- # cur.execute("select %s, %s", ["hello", 10])
- # conn.rollback()
- # cur.execute("select %s::text, %s", ["hello", 10])
- cur.execute("select %s, %s", ["hello", 10])
- assert cur.fetchone() == ("hello", 10)
-
- # It would be nice if above all postgres version behaved consistently.
- # However this below shouldn't break either.
- # (unfortunately it does: a cast is required for pre 10 versions)
- cast = "" if conn.pgconn.server_version >= 100000 else "::jsonb"
+ # later).
+ cur.execute("select %s, %s", ["hello", 10])
+ assert cur.fetchone() == ("hello", 10)
+
cur.execute("create table testjson(data jsonb)")
- cur.execute(f"insert into testjson (data) values (%s{cast})", ["{}"])
+ cur.execute("insert into testjson (data) values (%s)", ["{}"])
assert cur.execute("select data from testjson").fetchone() == ({},)
def test_execute_binary_result(conn):
- cur = conn.cursor(format=psycopg3.pq.Format.BINARY)
+ cur = conn.cursor(format=Format.BINARY)
cur.execute("select %s::text, %s::text", ["foo", None])
assert cur.pgresult.fformat(0) == 1
cur.executemany(query, [(10, "hello"), (20, "world")])
-def test_executemany_null_first(conn):
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+def test_executemany_null_first(conn, fmt):
+ ph = "%s" if fmt == Format.TEXT else "%b"
cur = conn.cursor()
- cur.executemany("select %s, %s", [[1, None], [3, 4]])
- with pytest.raises(TypeError):
- cur.executemany("select %s, %s", [[1, ""], [3, 4]])
+ cur.execute("create table testmany (a bigint, b bigint)")
+ cur.executemany(
+ f"insert into testmany values ({ph}, {ph})", [[1, None], [3, 4]]
+ )
+ with pytest.raises(
+ (
+ psycopg3.errors.InvalidTextRepresentation,
+ psycopg3.errors.ProtocolViolation,
+ )
+ ):
+ cur.executemany(
+ f"insert into testmany values ({ph}, {ph})", [[1, ""], [3, 4]]
+ )
def test_rowcount(conn):
import weakref
import psycopg3
+from psycopg3.adapt import Format
pytestmark = pytest.mark.asyncio
async def test_execute_binary_result(aconn):
- cur = await aconn.cursor(format=psycopg3.pq.Format.BINARY)
+ cur = await aconn.cursor(format=Format.BINARY)
await cur.execute("select %s::text, %s::text", ["foo", None])
assert cur.pgresult.fformat(0) == 1
await cur.executemany(query, [(10, "hello"), (20, "world")])
-async def test_executemany_null_first(aconn):
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+async def test_executemany_null_first(aconn, fmt):
+ ph = "%s" if fmt == Format.TEXT else "%b"
cur = await aconn.cursor()
- await cur.executemany("select %s, %s", [[1, None], [3, 4]])
- with pytest.raises(TypeError):
- await cur.executemany("select %s, %s", [[1, ""], [3, 4]])
+ await cur.execute("create table testmany (a bigint, b bigint)")
+ await cur.executemany(
+ f"insert into testmany values ({ph}, {ph})", [[1, None], [3, 4]]
+ )
+ with pytest.raises(
+ (
+ psycopg3.errors.InvalidTextRepresentation,
+ psycopg3.errors.ProtocolViolation,
+ )
+ ):
+ await cur.executemany(
+ f"insert into testmany values ({ph}, {ph})", [[1, ""], [3, 4]]
+ )
async def test_rowcount(aconn):
def test_untyped_json(conn):
conn.prepare_threshold = 1
conn.execute("create table testjson(data jsonb)")
- if conn.pgconn.server_version >= 100000:
- cast, t = "", "jsonb"
- else:
- cast, t = "::jsonb", "text"
for i in range(2):
- conn.execute(f"insert into testjson (data) values (%s{cast})", ["{}"])
+ conn.execute("insert into testjson (data) values (%s)", ["{}"])
cur = conn.execute("select parameter_types from pg_prepared_statements")
- assert cur.fetchall() == [([t],)]
+ assert cur.fetchall() == [(["jsonb"],)]
async def test_untyped_json(aconn):
aconn.prepare_threshold = 1
await aconn.execute("create table testjson(data jsonb)")
- if aconn.pgconn.server_version >= 100000:
- cast, t = "", "jsonb"
- else:
- cast, t = "::jsonb", "text"
-
for i in range(2):
- await aconn.execute(
- f"insert into testjson (data) values (%s{cast})", ["{}"]
- )
+ await aconn.execute("insert into testjson (data) values (%s)", ["{}"])
cur = await aconn.execute(
"select parameter_types from pg_prepared_statements"
)
- assert await cur.fetchall() == [([t],)]
+ assert await cur.fetchall() == [(["jsonb"],)]
ph = "%s" if fmt_in == Format.TEXT else "%b"
objs = list(range(3))
# pro tip: don't get confused with the types
- f1, f2 = conn.execute(f"select {ph}, {ph}", (objs, [])).fetchone()
+ conn.execute("create table testarrays (col1 bigint[], col2 bigint[])")
+ f1, f2 = conn.execute(
+ f"insert into testarrays values ({ph}, {ph}) returning *", (objs, [])
+ ).fetchone()
assert f1 == objs
- if f2 == "{}":
- pytest.xfail("text empty arrays don't roundtrip well")
assert f2 == []
cur = conn.cursor()
cur.execute("create table test (id serial primary key, data date[])")
with conn.transaction():
- try:
- cur.execute("insert into test (data) values (%s)", ([],))
- except psycopg3.errors.DatatypeMismatch:
- if conn.pgconn.server_version < 100000:
- pytest.xfail("on PG 9.6 empty arrays are passed as text")
- else:
- raise
+ cur.execute("insert into test (data) values (%s)", ([],))
cur.execute("select data from test")
assert cur.fetchone() == ([],)
def test_roundtrip_bool(conn, b, fmt_in, fmt_out):
cur = conn.cursor(format=fmt_out)
ph = "%s" if fmt_in == Format.TEXT else "%b"
- cast = "" if conn.pgconn.server_version > 100000 else "::bool"
- result = cur.execute(f"select {ph}{cast}", (b,)).fetchone()[0]
+ result = cur.execute(f"select {ph}", (b,)).fetchone()[0]
assert cur.pgresult.fformat(0) == fmt_out
if b is not None:
assert cur.pgresult.ftype(0) == builtins["bool"].oid