]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: sync pipeline after implicit BEGIN 401/head
authorDenis Laxalde <denis.laxalde@dalibo.com>
Tue, 4 Oct 2022 15:53:26 +0000 (17:53 +0200)
committerDenis Laxalde <denis.laxalde@dalibo.com>
Tue, 4 Oct 2022 17:22:02 +0000 (19:22 +0200)
Prior to this change, the following code:

    with conn.pipeline():
       conn.execute("select 'x'").fetchone()
       conn.execute("select 'y'")

would produce the following libpq trace:

    F 13 Parse  "" "BEGIN" 0
    F 14 Bind  "" "" 0 0 1 0
    F 6 Describe  P ""
    F 9 Execute  "" 0
    F 18 Parse  "" "select 'x'" 0
    F 14 Bind  "" "" 0 0 1 0
    F 6 Describe  P ""
    F 9 Execute  "" 0
    F 4 Flush
    B 4 ParseComplete
    B 4 BindComplete
    B 4 NoData
    B 10 CommandComplete  "BEGIN"
    B 4 ParseComplete
    B 4 BindComplete
    B 33 RowDescription  1 "?column?" NNNN 0 NNNN 65535 -1 0
    B 11 DataRow  1 1 'x'
    B 13 CommandComplete  "SELECT 1"
    F 13 Parse  "" "BEGIN" 0
    F 14 Bind  "" "" 0 0 1 0
    F 6 Describe  P ""
    F 9 Execute  "" 0
    F 18 Parse  "" "select 'y'" 0
    F 14 Bind  "" "" 0 0 1 0
    F 6 Describe  P ""
    F 9 Execute  "" 0
    B 4 ParseComplete
    B 4 BindComplete
    B 4 NoData
    B NN NoticeResponse  S "WARNING" V "WARNING" C "25001" M "there is already a transaction in progress" F "SSSS" L "SSSS" R "SSSS" \x00
    F 4 Sync
    B 10 CommandComplete  "BEGIN"
    B 4 ParseComplete
    B 4 BindComplete
    B 33 RowDescription  1 "?column?" NNNN 0 NNNN 65535 -1 0
    B 11 DataRow  1 1 'y'
    B 13 CommandComplete  "SELECT 1"
    B 5 ReadyForQuery  T

where we can see that the BEGIN statement (due to the connection being
in non-autocommit mode) is emitted twice, as notified by the server.

This is because the transaction state after the implicit BEGIN is not
"correct" (i.e. should be INTRANS, but is IDLE) since the result from
respective statement has not been received yet.

By syncing after the BEGIN, we fetch result from this command thus get
the transaction state INTRANS for following queries. This is similar to
what happens with explicit transaction, by using nested pipelines.

docs/news.rst
psycopg/psycopg/connection.py
tests/test_pipeline.py
tests/test_pipeline_async.py

index 0d14c1774fce63e2f2890bd2591307ff2545c39c..bfc968b4cc058b2ec521d402d2b043c08e9603fe 100644 (file)
@@ -19,6 +19,8 @@ Psycopg 3.1.3
   for non-ascii attribute names (:ticket:`#386`).
 - Fix handling of queries with escaped percent signs (``%%``) in `ClientCursor`
   (:ticket:`#399`).
+- Fix possible duplicated BEGIN statements emitted in pipeline mode
+  (:ticket:`#401`).
 
 
 Psycopg 3.1.2
index bc15717cda21ef0116042ee89fd763b64defc015..dbd185b69284d7f34dc0bb4bbc76aff7af734fbc 100644 (file)
@@ -490,6 +490,8 @@ class BaseConnection(Generic[Row]):
             return
 
         yield from self._exec_command(self._get_tx_start_command())
+        if self._pipeline:
+            yield from self._pipeline._sync_gen()
 
     def _get_tx_start_command(self) -> bytes:
         if self._begin_statement:
index 1ee1ae98c4f937045bb32ab727ae2b7fe9ae26b9..78d13591a1b4a9eb915b62071559c3e9ad7d93fe 100644 (file)
@@ -133,8 +133,7 @@ def test_pipeline_processed_at_exit(conn):
         with conn.pipeline() as p:
             cur.execute("select 1")
 
-            # PQsendQuery[BEGIN], PQsendQuery
-            assert len(p.result_queue) == 2
+            assert len(p.result_queue) == 1
 
         assert cur.fetchone() == (1,)
 
@@ -159,8 +158,7 @@ def test_pipeline(conn):
         c1.execute("select 1")
         c2.execute("select 2")
 
-        # PQsendQuery[BEGIN], PQsendQuery(2)
-        assert len(p.result_queue) == 3
+        assert len(p.result_queue) == 2
 
         (r1,) = c1.fetchone()
         assert r1 == 1
@@ -525,6 +523,21 @@ def test_message_0x33(conn):
     assert not notices
 
 
+def test_transaction_state_implicit_begin(conn, trace):
+    # Regression test to ensure that the transaction state is correct after
+    # the implicit BEGIN statement (in non-autocommit mode).
+    notices = []
+    conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+    t = trace.trace(conn)
+    with conn.pipeline():
+        conn.execute("select 'x'").fetchone()
+        conn.execute("select 'y'")
+    assert not notices
+    assert [
+        e.content[0] for e in t if e.type == "Parse" and b"BEGIN" in e.content[0]
+    ] == [b' "" "BEGIN" 0']
+
+
 def test_concurrency(conn):
     with conn.transaction():
         conn.execute("drop table if exists pipeline_concurrency")
index 9e026f0c9abcde4dfca1ddc64066d8f4330b44a8..337a60fe8f4faaf85f17d42cd5f1f45ec211f88a 100644 (file)
@@ -134,8 +134,7 @@ async def test_pipeline_processed_at_exit(aconn):
         async with aconn.pipeline() as p:
             await cur.execute("select 1")
 
-            # PQsendQuery[BEGIN], PQsendQuery
-            assert len(p.result_queue) == 2
+            assert len(p.result_queue) == 1
 
         assert await cur.fetchone() == (1,)
 
@@ -160,8 +159,7 @@ async def test_pipeline(aconn):
         await c1.execute("select 1")
         await c2.execute("select 2")
 
-        # PQsendQuery[BEGIN], PQsendQuery(2)
-        assert len(p.result_queue) == 3
+        assert len(p.result_queue) == 2
 
         (r1,) = await c1.fetchone()
         assert r1 == 1
@@ -529,6 +527,21 @@ async def test_message_0x33(aconn):
     assert not notices
 
 
+async def test_transaction_state_implicit_begin(aconn, trace):
+    # Regression test to ensure that the transaction state is correct after
+    # the implicit BEGIN statement (in non-autocommit mode).
+    notices = []
+    aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+    t = trace.trace(aconn)
+    async with aconn.pipeline():
+        await (await aconn.execute("select 'x'")).fetchone()
+        await aconn.execute("select 'y'")
+    assert not notices
+    assert [
+        e.content[0] for e in t if e.type == "Parse" and b"BEGIN" in e.content[0]
+    ] == [b' "" "BEGIN" 0']
+
+
 async def test_concurrency(aconn):
     async with aconn.transaction():
         await aconn.execute("drop table if exists pipeline_concurrency")