]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test(crdb): adapt pipeline tests
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 25 May 2022 00:03:37 +0000 (02:03 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 12 Jul 2022 11:58:34 +0000 (12:58 +0100)
Transaction-related tests currently fail.

tests/test_pipeline.py
tests/test_pipeline_async.py

index 2eba02a1afb7a48ae9593d831d6514276b8c5c21..ea11367ff3fdf0edbc0e40126bc5d81934ebcf81 100644 (file)
@@ -115,10 +115,9 @@ def test_server_cursor(conn):
 
 
 def test_cannot_insert_multiple_commands(conn):
-    with pytest.raises(psycopg.errors.SyntaxError) as cm:
+    with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
         with conn.pipeline():
             conn.execute("select 1; select 2")
-    assert cm.value.sqlstate == "42601"
 
 
 def test_copy(conn):
@@ -267,6 +266,7 @@ def test_errors_raised_on_nested_transaction_exit(conn):
     assert cur2.fetchone() == (2,)
 
 
+@pytest.mark.crdb("skip", reason="deferrable")
 def test_error_on_commit(conn):
     conn.execute(
         """
@@ -305,14 +305,14 @@ def test_executemany(conn):
     )
     with conn.pipeline(), conn.cursor() as cur:
         cur.executemany(
-            "insert into execmanypipeline(num) values (%s) returning id",
+            "insert into execmanypipeline(num) values (%s) returning num",
             [(10,), (20,)],
             returning=True,
         )
         assert cur.rowcount == 2
-        assert cur.fetchone() == (1,)
+        assert cur.fetchone() == (10,)
         assert cur.nextset()
-        assert cur.fetchone() == (2,)
+        assert cur.fetchone() == (20,)
         assert cur.nextset() is None
 
 
@@ -337,6 +337,7 @@ def test_executemany_no_returning(conn):
         assert cur.nextset() is None
 
 
+@pytest.mark.crdb("skip", reason="temp tables")
 def test_executemany_trace(conn, trace):
     conn.autocommit = True
     cur = conn.cursor()
@@ -354,6 +355,7 @@ def test_executemany_trace(conn, trace):
     assert len([i for i in items if i.type == "Sync"]) == 1
 
 
+@pytest.mark.crdb("skip", reason="temp tables")
 def test_executemany_trace_returning(conn, trace):
     conn.autocommit = True
     cur = conn.cursor()
@@ -380,7 +382,9 @@ def test_prepared(conn):
     conn.autocommit = True
     with conn.pipeline():
         c1 = conn.execute("select %s::int", [10], prepare=True)
-        c2 = conn.execute("select count(*) from pg_prepared_statements")
+        c2 = conn.execute(
+            "select count(*) from pg_prepared_statements where name != ''"
+        )
 
         (r,) = c1.fetchone()
         assert r == 10
@@ -394,7 +398,7 @@ def test_auto_prepare(conn):
     conn.prepared_threshold = 5
     with conn.pipeline():
         cursors = [
-            conn.execute("select count(*) from pg_prepared_statements")
+            conn.execute("select count(*) from pg_prepared_statements where name != ''")
             for i in range(10)
         ]
 
@@ -451,9 +455,10 @@ def test_transaction_nested_no_statement(conn):
 
 
 def test_outer_transaction(conn):
+    with conn.transaction():
+        conn.execute("drop table if exists outertx")
     with conn.transaction():
         with conn.pipeline():
-            conn.execute("drop table if exists outertx")
             conn.execute("create table outertx as (select 1)")
             cur = conn.execute("select * from outertx")
     (r,) = cur.fetchone()
@@ -506,18 +511,19 @@ def test_message_0x33(conn):
 def test_concurrency(conn):
     with conn.transaction():
         conn.execute("drop table if exists pipeline_concurrency")
+        conn.execute("drop table if exists accessed")
+    with conn.transaction():
         conn.execute(
             "create unlogged table pipeline_concurrency ("
             " id serial primary key,"
             " value integer"
             ")"
         )
-        conn.execute("drop table if exists accessed")
         conn.execute("create unlogged table accessed as (select now() as value)")
 
     def update(value):
         cur = conn.execute(
-            "insert into pipeline_concurrency(value) values (%s) returning id",
+            "insert into pipeline_concurrency(value) values (%s) returning value",
             (value,),
         )
         conn.execute("update accessed set value = now()")
index b5e8c5484d68ddc3bb8f56e6323fb86a6382d980..8b67abd1e6464e508d2c755ca68457a50798497d 100644 (file)
@@ -116,10 +116,9 @@ async def test_server_cursor(aconn):
 
 
 async def test_cannot_insert_multiple_commands(aconn):
-    with pytest.raises(psycopg.errors.SyntaxError) as cm:
+    with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
         async with aconn.pipeline():
             await aconn.execute("select 1; select 2")
-    assert cm.value.sqlstate == "42601"
 
 
 async def test_copy(aconn):
@@ -268,6 +267,7 @@ async def test_errors_raised_on_nested_transaction_exit(aconn):
     assert await cur2.fetchone() == (2,)
 
 
+@pytest.mark.crdb("skip", reason="deferrable")
 async def test_error_on_commit(aconn):
     await aconn.execute(
         """
@@ -306,14 +306,14 @@ async def test_executemany(aconn):
     )
     async with aconn.pipeline(), aconn.cursor() as cur:
         await cur.executemany(
-            "insert into execmanypipeline(num) values (%s) returning id",
+            "insert into execmanypipeline(num) values (%s) returning num",
             [(10,), (20,)],
             returning=True,
         )
         assert cur.rowcount == 2
-        assert (await cur.fetchone()) == (1,)
+        assert (await cur.fetchone()) == (10,)
         assert cur.nextset()
-        assert (await cur.fetchone()) == (2,)
+        assert (await cur.fetchone()) == (20,)
         assert cur.nextset() is None
 
 
@@ -338,6 +338,7 @@ async def test_executemany_no_returning(aconn):
         assert cur.nextset() is None
 
 
+@pytest.mark.crdb("skip", reason="temp tables")
 async def test_executemany_trace(aconn, trace):
     await aconn.set_autocommit(True)
     cur = aconn.cursor()
@@ -355,6 +356,7 @@ async def test_executemany_trace(aconn, trace):
     assert len([i for i in items if i.type == "Sync"]) == 1
 
 
+@pytest.mark.crdb("skip", reason="temp tables")
 async def test_executemany_trace_returning(aconn, trace):
     await aconn.set_autocommit(True)
     cur = aconn.cursor()
@@ -381,7 +383,9 @@ async def test_prepared(aconn):
     await aconn.set_autocommit(True)
     async with aconn.pipeline():
         c1 = await aconn.execute("select %s::int", [10], prepare=True)
-        c2 = await aconn.execute("select count(*) from pg_prepared_statements")
+        c2 = await aconn.execute(
+            "select count(*) from pg_prepared_statements where name != ''"
+        )
 
         (r,) = await c1.fetchone()
         assert r == 10
@@ -394,7 +398,9 @@ async def test_auto_prepare(aconn):
     aconn.prepared_threshold = 5
     async with aconn.pipeline():
         cursors = [
-            await aconn.execute("select count(*) from pg_prepared_statements")
+            await aconn.execute(
+                "select count(*) from pg_prepared_statements where name != ''"
+            )
             for i in range(10)
         ]
 
@@ -451,9 +457,10 @@ async def test_transaction_nested_no_statement(aconn):
 
 
 async def test_outer_transaction(aconn):
+    async with aconn.transaction():
+        await aconn.execute("drop table if exists outertx")
     async with aconn.transaction():
         async with aconn.pipeline():
-            await aconn.execute("drop table if exists outertx")
             await aconn.execute("create table outertx as (select 1)")
             cur = await aconn.execute("select * from outertx")
     (r,) = await cur.fetchone()
@@ -508,18 +515,19 @@ async def test_message_0x33(aconn):
 async def test_concurrency(aconn):
     async with aconn.transaction():
         await aconn.execute("drop table if exists pipeline_concurrency")
+        await aconn.execute("drop table if exists accessed")
+    async with aconn.transaction():
         await aconn.execute(
             "create unlogged table pipeline_concurrency ("
             " id serial primary key,"
             " value integer"
             ")"
         )
-        await aconn.execute("drop table if exists accessed")
         await aconn.execute("create unlogged table accessed as (select now() as value)")
 
     async def update(value):
         cur = await aconn.execute(
-            "insert into pipeline_concurrency(value) values (%s) returning id",
+            "insert into pipeline_concurrency(value) values (%s) returning value",
             (value,),
         )
         await aconn.execute("update accessed set value = now()")