]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: also sync nested pipeline when a transaction is active
authorDenis Laxalde <denis@laxalde.org>
Thu, 15 Sep 2022 16:28:21 +0000 (18:28 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 16 Sep 2022 22:43:45 +0000 (23:43 +0100)
Upon enter, transaction() checks the in-transaction status to determine
if a BEGIN (when IDLE) or a SAVEPOINT (otherwise) should be inserted.
However, in pipeline mode, if the pipeline is in "implicit transaction"
(i.e. it has no active BEGIN) and statements have been sent, the
in-transaction status might be ACTIVE if those statements have not yet
completed (typically, when results have not been fetched).

By issuing a sync() before entering a nested pipeline (which will happen
when entering transaction(), if already within a pipeline), we force
completion of those statements, and thus get a predictable
in-transaction status before entering the transaction() block.

Closes #374.

docs/news.rst
psycopg/psycopg/_pipeline.py
tests/test_pipeline.py
tests/test_pipeline_async.py

index edcd55696f341235194bc9f4ebe53896416a5096..1ede13cd81a3371f85404a0eda2e8dec613e9fb0 100644 (file)
@@ -16,6 +16,8 @@ Psycopg 3.1.2 (unreleased)
 - Fix handling of certain invalid time zones causing problems on Windows
   (:ticket:`#371`).
 - Fix segfault occurring when a loader fails initialization (:ticket:`#372`).
+- Fix invalid SAVEPOINT issued when entering `Connection.transaction()` within
+  a pipeline using an implicit transaction (:ticket:`#374`).
 
 
 Current release
index 023de014c50ac8313be838a2235641442ac71e8b..75bb6b2eb31aeabe9c1bb99e75ab27bd3cfcae2f 100644 (file)
@@ -31,6 +31,8 @@ FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
 PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
 BAD = pq.ConnStatus.BAD
 
+ACTIVE = pq.TransactionStatus.ACTIVE
+
 logger = logging.getLogger("psycopg")
 
 
@@ -92,7 +94,13 @@ class BasePipeline:
             )
         if self.level == 0:
             self.pgconn.enter_pipeline_mode()
-        elif self.command_queue:
+        elif self.command_queue or self.pgconn.transaction_status == ACTIVE:
+            # Nested pipeline case.
+            #  Transaction might be ACTIVE when the pipeline uses an "implicit
+            #  transaction", typically in autocommit mode. But when entering a
+            #  Psycopg transaction(), we expect the IDLE state. By sync()-ing,
+            #  we make sure all previous commands are completed and the
+            #  transaction gets back to IDLE.
             yield from self._sync_gen()
         self.level += 1
 
index afd035afb8616262eed1bf874841c2dac638697d..1ee1ae98c4f937045bb32ab727ae2b7fe9ae26b9 100644 (file)
@@ -266,6 +266,23 @@ def test_errors_raised_on_nested_transaction_exit(conn):
     assert cur2.fetchone() == (2,)
 
 
+def test_implicit_transaction(conn):
+    conn.autocommit = True
+    with conn.pipeline():
+        assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
+        conn.execute("select 'before'")
+        # Transaction is ACTIVE because previous command is not completed
+        # since we have not fetched its results.
+        assert conn.pgconn.transaction_status == pq.TransactionStatus.ACTIVE
+        # Upon entering the nested pipeline through "with transaction():", a
+        # sync() is emitted to restore the transaction state to IDLE, as
+        # expected to emit a BEGIN.
+        with conn.transaction():
+            conn.execute("select 'tx'")
+        cur = conn.execute("select 'after'")
+    assert cur.fetchone() == ("after",)
+
+
 @pytest.mark.crdb_skip("deferrable")
 def test_error_on_commit(conn):
     conn.execute(
index 354e641a3a4766ab1a1df0bde428262baee91593..9e026f0c9abcde4dfca1ddc64066d8f4330b44a8 100644 (file)
@@ -267,6 +267,23 @@ async def test_errors_raised_on_nested_transaction_exit(aconn):
     assert await cur2.fetchone() == (2,)
 
 
+async def test_implicit_transaction(aconn):
+    await aconn.set_autocommit(True)
+    async with aconn.pipeline():
+        assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
+        await aconn.execute("select 'before'")
+        # Transaction is ACTIVE because previous command is not completed
+        # since we have not fetched its results.
+        assert aconn.pgconn.transaction_status == pq.TransactionStatus.ACTIVE
+        # Upon entering the nested pipeline through "with transaction():", a
+        # sync() is emitted to restore the transaction state to IDLE, as
+        # expected to emit a BEGIN.
+        async with aconn.transaction():
+            await aconn.execute("select 'tx'")
+        cur = await aconn.execute("select 'after'")
+    assert await cur.fetchone() == ("after",)
+
+
 @pytest.mark.crdb_skip("deferrable")
 async def test_error_on_commit(aconn):
     await aconn.execute(