From: Denis Laxalde Date: Tue, 4 Oct 2022 15:53:26 +0000 (+0200) Subject: fix: sync pipeline after implicit BEGIN X-Git-Tag: 3.1.3^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fpull%2F401%2Fhead;p=thirdparty%2Fpsycopg.git fix: sync pipeline after implicit BEGIN Prior to this change, the following code: with conn.pipeline(): conn.execute("select 'x'").fetchone() conn.execute("select 'y'") would produce the following libpq trace: F 13 Parse "" "BEGIN" 0 F 14 Bind "" "" 0 0 1 0 F 6 Describe P "" F 9 Execute "" 0 F 18 Parse "" "select 'x'" 0 F 14 Bind "" "" 0 0 1 0 F 6 Describe P "" F 9 Execute "" 0 F 4 Flush B 4 ParseComplete B 4 BindComplete B 4 NoData B 10 CommandComplete "BEGIN" B 4 ParseComplete B 4 BindComplete B 33 RowDescription 1 "?column?" NNNN 0 NNNN 65535 -1 0 B 11 DataRow 1 1 'x' B 13 CommandComplete "SELECT 1" F 13 Parse "" "BEGIN" 0 F 14 Bind "" "" 0 0 1 0 F 6 Describe P "" F 9 Execute "" 0 F 18 Parse "" "select 'y'" 0 F 14 Bind "" "" 0 0 1 0 F 6 Describe P "" F 9 Execute "" 0 B 4 ParseComplete B 4 BindComplete B 4 NoData B NN NoticeResponse S "WARNING" V "WARNING" C "25001" M "there is already a transaction in progress" F "SSSS" L "SSSS" R "SSSS" \x00 F 4 Sync B 10 CommandComplete "BEGIN" B 4 ParseComplete B 4 BindComplete B 33 RowDescription 1 "?column?" NNNN 0 NNNN 65535 -1 0 B 11 DataRow 1 1 'y' B 13 CommandComplete "SELECT 1" B 5 ReadyForQuery T where we can see that the BEGIN statement (due to the connection being in non-autocommit mode) is emitted twice, as notified by the server. This is because the transaction state after the implicit BEGIN is not "correct" (i.e. should be INTRANS, but is IDLE) since the result from respective statement has not been received yet. By syncing after the BEGIN, we fetch result from this command thus get the transaction state INTRANS for following queries. This is similar to what happens with explicit transaction, by using nested pipelines. --- diff --git a/docs/news.rst b/docs/news.rst index 0d14c1774..bfc968b4c 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -19,6 +19,8 @@ Psycopg 3.1.3 for non-ascii attribute names (:ticket:`#386`). - Fix handling of queries with escaped percent signs (``%%``) in `ClientCursor` (:ticket:`#399`). +- Fix possible duplicated BEGIN statements emitted in pipeline mode + (:ticket:`#401`). Psycopg 3.1.2 diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index bc15717cd..dbd185b69 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -490,6 +490,8 @@ class BaseConnection(Generic[Row]): return yield from self._exec_command(self._get_tx_start_command()) + if self._pipeline: + yield from self._pipeline._sync_gen() def _get_tx_start_command(self) -> bytes: if self._begin_statement: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 1ee1ae98c..78d13591a 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -133,8 +133,7 @@ def test_pipeline_processed_at_exit(conn): with conn.pipeline() as p: cur.execute("select 1") - # PQsendQuery[BEGIN], PQsendQuery - assert len(p.result_queue) == 2 + assert len(p.result_queue) == 1 assert cur.fetchone() == (1,) @@ -159,8 +158,7 @@ def test_pipeline(conn): c1.execute("select 1") c2.execute("select 2") - # PQsendQuery[BEGIN], PQsendQuery(2) - assert len(p.result_queue) == 3 + assert len(p.result_queue) == 2 (r1,) = c1.fetchone() assert r1 == 1 @@ -525,6 +523,21 @@ def test_message_0x33(conn): assert not notices +def test_transaction_state_implicit_begin(conn, trace): + # Regression test to ensure that the transaction state is correct after + # the implicit BEGIN statement (in non-autocommit mode). + notices = [] + conn.add_notice_handler(lambda diag: notices.append(diag.message_primary)) + t = trace.trace(conn) + with conn.pipeline(): + conn.execute("select 'x'").fetchone() + conn.execute("select 'y'") + assert not notices + assert [ + e.content[0] for e in t if e.type == "Parse" and b"BEGIN" in e.content[0] + ] == [b' "" "BEGIN" 0'] + + def test_concurrency(conn): with conn.transaction(): conn.execute("drop table if exists pipeline_concurrency") diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index 9e026f0c9..337a60fe8 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -134,8 +134,7 @@ async def test_pipeline_processed_at_exit(aconn): async with aconn.pipeline() as p: await cur.execute("select 1") - # PQsendQuery[BEGIN], PQsendQuery - assert len(p.result_queue) == 2 + assert len(p.result_queue) == 1 assert await cur.fetchone() == (1,) @@ -160,8 +159,7 @@ async def test_pipeline(aconn): await c1.execute("select 1") await c2.execute("select 2") - # PQsendQuery[BEGIN], PQsendQuery(2) - assert len(p.result_queue) == 3 + assert len(p.result_queue) == 2 (r1,) = await c1.fetchone() assert r1 == 1 @@ -529,6 +527,21 @@ async def test_message_0x33(aconn): assert not notices +async def test_transaction_state_implicit_begin(aconn, trace): + # Regression test to ensure that the transaction state is correct after + # the implicit BEGIN statement (in non-autocommit mode). + notices = [] + aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary)) + t = trace.trace(aconn) + async with aconn.pipeline(): + await (await aconn.execute("select 'x'")).fetchone() + await aconn.execute("select 'y'") + assert not notices + assert [ + e.content[0] for e in t if e.type == "Parse" and b"BEGIN" in e.content[0] + ] == [b' "" "BEGIN" 0'] + + async def test_concurrency(aconn): async with aconn.transaction(): await aconn.execute("drop table if exists pipeline_concurrency")