From: Daniele Varrazzo Date: Wed, 25 May 2022 00:03:37 +0000 (+0200) Subject: test(crdb): adapt pipeline tests X-Git-Tag: 3.1~49^2~45 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=674444b1f7c519e4cf4527d9e4e7b2d78aed75c5;p=thirdparty%2Fpsycopg.git test(crdb): adapt pipeline tests Transaction-related tests currently fail. --- diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 2eba02a1a..ea11367ff 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -115,10 +115,9 @@ def test_server_cursor(conn): def test_cannot_insert_multiple_commands(conn): - with pytest.raises(psycopg.errors.SyntaxError) as cm: + with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)): with conn.pipeline(): conn.execute("select 1; select 2") - assert cm.value.sqlstate == "42601" def test_copy(conn): @@ -267,6 +266,7 @@ def test_errors_raised_on_nested_transaction_exit(conn): assert cur2.fetchone() == (2,) +@pytest.mark.crdb("skip", reason="deferrable") def test_error_on_commit(conn): conn.execute( """ @@ -305,14 +305,14 @@ def test_executemany(conn): ) with conn.pipeline(), conn.cursor() as cur: cur.executemany( - "insert into execmanypipeline(num) values (%s) returning id", + "insert into execmanypipeline(num) values (%s) returning num", [(10,), (20,)], returning=True, ) assert cur.rowcount == 2 - assert cur.fetchone() == (1,) + assert cur.fetchone() == (10,) assert cur.nextset() - assert cur.fetchone() == (2,) + assert cur.fetchone() == (20,) assert cur.nextset() is None @@ -337,6 +337,7 @@ def test_executemany_no_returning(conn): assert cur.nextset() is None +@pytest.mark.crdb("skip", reason="temp tables") def test_executemany_trace(conn, trace): conn.autocommit = True cur = conn.cursor() @@ -354,6 +355,7 @@ def test_executemany_trace(conn, trace): assert len([i for i in items if i.type == "Sync"]) == 1 +@pytest.mark.crdb("skip", reason="temp tables") def test_executemany_trace_returning(conn, trace): conn.autocommit = True cur = conn.cursor() @@ -380,7 +382,9 @@ def test_prepared(conn): conn.autocommit = True with conn.pipeline(): c1 = conn.execute("select %s::int", [10], prepare=True) - c2 = conn.execute("select count(*) from pg_prepared_statements") + c2 = conn.execute( + "select count(*) from pg_prepared_statements where name != ''" + ) (r,) = c1.fetchone() assert r == 10 @@ -394,7 +398,7 @@ def test_auto_prepare(conn): conn.prepared_threshold = 5 with conn.pipeline(): cursors = [ - conn.execute("select count(*) from pg_prepared_statements") + conn.execute("select count(*) from pg_prepared_statements where name != ''") for i in range(10) ] @@ -451,9 +455,10 @@ def test_transaction_nested_no_statement(conn): def test_outer_transaction(conn): + with conn.transaction(): + conn.execute("drop table if exists outertx") with conn.transaction(): with conn.pipeline(): - conn.execute("drop table if exists outertx") conn.execute("create table outertx as (select 1)") cur = conn.execute("select * from outertx") (r,) = cur.fetchone() @@ -506,18 +511,19 @@ def test_message_0x33(conn): def test_concurrency(conn): with conn.transaction(): conn.execute("drop table if exists pipeline_concurrency") + conn.execute("drop table if exists accessed") + with conn.transaction(): conn.execute( "create unlogged table pipeline_concurrency (" " id serial primary key," " value integer" ")" ) - conn.execute("drop table if exists accessed") conn.execute("create unlogged table accessed as (select now() as value)") def update(value): cur = conn.execute( - "insert into pipeline_concurrency(value) values (%s) returning id", + "insert into pipeline_concurrency(value) values (%s) returning value", (value,), ) conn.execute("update accessed set value = now()") diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index b5e8c5484..8b67abd1e 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -116,10 +116,9 @@ async def test_server_cursor(aconn): async def test_cannot_insert_multiple_commands(aconn): - with pytest.raises(psycopg.errors.SyntaxError) as cm: + with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)): async with aconn.pipeline(): await aconn.execute("select 1; select 2") - assert cm.value.sqlstate == "42601" async def test_copy(aconn): @@ -268,6 +267,7 @@ async def test_errors_raised_on_nested_transaction_exit(aconn): assert await cur2.fetchone() == (2,) +@pytest.mark.crdb("skip", reason="deferrable") async def test_error_on_commit(aconn): await aconn.execute( """ @@ -306,14 +306,14 @@ async def test_executemany(aconn): ) async with aconn.pipeline(), aconn.cursor() as cur: await cur.executemany( - "insert into execmanypipeline(num) values (%s) returning id", + "insert into execmanypipeline(num) values (%s) returning num", [(10,), (20,)], returning=True, ) assert cur.rowcount == 2 - assert (await cur.fetchone()) == (1,) + assert (await cur.fetchone()) == (10,) assert cur.nextset() - assert (await cur.fetchone()) == (2,) + assert (await cur.fetchone()) == (20,) assert cur.nextset() is None @@ -338,6 +338,7 @@ async def test_executemany_no_returning(aconn): assert cur.nextset() is None +@pytest.mark.crdb("skip", reason="temp tables") async def test_executemany_trace(aconn, trace): await aconn.set_autocommit(True) cur = aconn.cursor() @@ -355,6 +356,7 @@ async def test_executemany_trace(aconn, trace): assert len([i for i in items if i.type == "Sync"]) == 1 +@pytest.mark.crdb("skip", reason="temp tables") async def test_executemany_trace_returning(aconn, trace): await aconn.set_autocommit(True) cur = aconn.cursor() @@ -381,7 +383,9 @@ async def test_prepared(aconn): await aconn.set_autocommit(True) async with aconn.pipeline(): c1 = await aconn.execute("select %s::int", [10], prepare=True) - c2 = await aconn.execute("select count(*) from pg_prepared_statements") + c2 = await aconn.execute( + "select count(*) from pg_prepared_statements where name != ''" + ) (r,) = await c1.fetchone() assert r == 10 @@ -394,7 +398,9 @@ async def test_auto_prepare(aconn): aconn.prepared_threshold = 5 async with aconn.pipeline(): cursors = [ - await aconn.execute("select count(*) from pg_prepared_statements") + await aconn.execute( + "select count(*) from pg_prepared_statements where name != ''" + ) for i in range(10) ] @@ -451,9 +457,10 @@ async def test_transaction_nested_no_statement(aconn): async def test_outer_transaction(aconn): + async with aconn.transaction(): + await aconn.execute("drop table if exists outertx") async with aconn.transaction(): async with aconn.pipeline(): - await aconn.execute("drop table if exists outertx") await aconn.execute("create table outertx as (select 1)") cur = await aconn.execute("select * from outertx") (r,) = await cur.fetchone() @@ -508,18 +515,19 @@ async def test_message_0x33(aconn): async def test_concurrency(aconn): async with aconn.transaction(): await aconn.execute("drop table if exists pipeline_concurrency") + await aconn.execute("drop table if exists accessed") + async with aconn.transaction(): await aconn.execute( "create unlogged table pipeline_concurrency (" " id serial primary key," " value integer" ")" ) - await aconn.execute("drop table if exists accessed") await aconn.execute("create unlogged table accessed as (select now() as value)") async def update(value): cur = await aconn.execute( - "insert into pipeline_concurrency(value) values (%s) returning id", + "insert into pipeline_concurrency(value) values (%s) returning value", (value,), ) await aconn.execute("update accessed set value = now()")