]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Fixed tests with PostgreSQL 9.6 and previous
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 29 Oct 2020 14:06:19 +0000 (15:06 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 29 Oct 2020 14:06:19 +0000 (15:06 +0100)
Such database version doesn't allow to emit unknown oid literals.
However specifying a different type for unknown, such as text, will fail
as soon as the value is used in a typed context, which is a far more
useful use case.

tests/test_cursor.py
tests/test_cursor_async.py
tests/types/test_singletons.py

index 9a9038dca8ac856391c3067589a5bda347ff6fcd..8ffbb3cc56415c51b5ae6baeeefd1a61fe9ecc51 100644 (file)
@@ -63,7 +63,8 @@ def test_execute_many_results(conn):
 
 def test_execute_sequence(conn):
     cur = conn.cursor()
-    rv = cur.execute("select %s, %s, %s", [1, "foo", None])
+    cast = "::text" if conn.pgconn.server_version < 100000 else ""
+    rv = cur.execute(f"select %s, %s, %s{cast}", [1, "foo", None])
     assert rv is cur
     assert len(cur._results) == 1
     assert cur.pgresult.get_value(0, 0) == b"1"
@@ -83,7 +84,8 @@ def test_execute_empty_query(conn, query):
 
 def test_fetchone(conn):
     cur = conn.cursor()
-    cur.execute("select %s, %s, %s", [1, "foo", None])
+    cast = "::text" if conn.pgconn.server_version < 100000 else ""
+    cur.execute(f"select %s, %s, %s{cast}", [1, "foo", None])
     assert cur.pgresult.fformat(0) == 0
 
     row = cur.fetchone()
@@ -96,7 +98,8 @@ def test_fetchone(conn):
 
 def test_execute_binary_result(conn):
     cur = conn.cursor(format=psycopg3.pq.Format.BINARY)
-    cur.execute("select %s, %s", ["foo", None])
+    cast = "::text" if conn.pgconn.server_version < 100000 else ""
+    cur.execute(f"select %s, %s{cast}", ["foo", None])
     assert cur.pgresult.fformat(0) == 1
 
     row = cur.fetchone()
index dabada93f4ae020a5f7c7f76d77c62dee75a895a..3462b093bcb5fdf343c45797fa3fec3656a202b0 100644 (file)
@@ -66,7 +66,8 @@ async def test_execute_many_results(aconn):
 
 async def test_execute_sequence(aconn):
     cur = aconn.cursor()
-    rv = await cur.execute("select %s, %s, %s", [1, "foo", None])
+    cast = "::text" if aconn.pgconn.server_version < 100000 else ""
+    rv = await cur.execute(f"select %s, %s, %s{cast}", [1, "foo", None])
     assert rv is cur
     assert len(cur._results) == 1
     assert cur.pgresult.get_value(0, 0) == b"1"
@@ -86,7 +87,8 @@ async def test_execute_empty_query(aconn, query):
 
 async def test_fetchone(aconn):
     cur = aconn.cursor()
-    await cur.execute("select %s, %s, %s", [1, "foo", None])
+    cast = "::text" if aconn.pgconn.server_version < 100000 else ""
+    await cur.execute(f"select %s, %s, %s{cast}", [1, "foo", None])
     assert cur.pgresult.fformat(0) == 0
 
     row = await cur.fetchone()
@@ -99,7 +101,8 @@ async def test_fetchone(aconn):
 
 async def test_execute_binary_result(aconn):
     cur = aconn.cursor(format=psycopg3.pq.Format.BINARY)
-    await cur.execute("select %s, %s", ["foo", None])
+    cast = "::text" if aconn.pgconn.server_version < 100000 else ""
+    await cur.execute(f"select %s, %s{cast}", ["foo", None])
     assert cur.pgresult.fformat(0) == 1
 
     row = await cur.fetchone()
index 14ff018d8c1ccd8095f2037c7afc5b529f89030f..c3c41e2837801b6053c689aee17979eb679bacac 100644 (file)
@@ -11,7 +11,8 @@ from psycopg3.types import builtins
 def test_roundtrip_bool(conn, b, fmt_in, fmt_out):
     cur = conn.cursor(format=fmt_out)
     ph = "%s" if fmt_in == Format.TEXT else "%b"
-    result = cur.execute(f"select {ph}", (b,)).fetchone()[0]
+    cast = "" if conn.pgconn.server_version > 100000 else "::bool"
+    result = cur.execute(f"select {ph}{cast}", (b,)).fetchone()[0]
     assert cur.pgresult.fformat(0) == fmt_out
     if b is not None:
         assert cur.pgresult.ftype(0) == builtins["bool"].oid