From: Denis Laxalde Date: Thu, 15 Sep 2022 16:28:21 +0000 (+0200) Subject: fix: also sync nested pipeline when a transaction is active X-Git-Tag: 3.1.2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d41d8fc6a09718babb570ec7a891ef047bd6121c;p=thirdparty%2Fpsycopg.git fix: also sync nested pipeline when a transaction is active Upon enter, transaction() checks the in-transaction status to determine if a BEGIN (when IDLE) or a SAVEPOINT (otherwise) should be inserted. However, in pipeline mode, if the pipeline is in "implicit transaction" (i.e. it has no active BEGIN) and statements have been sent, the in-transaction status might be ACTIVE if those statements have not yet completed (typically, when results have not been fetched). By issuing a sync() before entering a nested pipeline (which will happen when entering transaction(), if already within a pipeline), we force completion of those statements, and thus get a predictable in-transaction status before entering the transaction() block. Closes #374. --- diff --git a/docs/news.rst b/docs/news.rst index edcd55696..1ede13cd8 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -16,6 +16,8 @@ Psycopg 3.1.2 (unreleased) - Fix handling of certain invalid time zones causing problems on Windows (:ticket:`#371`). - Fix segfault occurring when a loader fails initialization (:ticket:`#372`). +- Fix invalid SAVEPOINT issued when entering `Connection.transaction()` within + a pipeline using an implicit transaction (:ticket:`#374`). Current release diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index 023de014c..75bb6b2eb 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -31,6 +31,8 @@ FATAL_ERROR = pq.ExecStatus.FATAL_ERROR PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED BAD = pq.ConnStatus.BAD +ACTIVE = pq.TransactionStatus.ACTIVE + logger = logging.getLogger("psycopg") @@ -92,7 +94,13 @@ class BasePipeline: ) if self.level == 0: self.pgconn.enter_pipeline_mode() - elif self.command_queue: + elif self.command_queue or self.pgconn.transaction_status == ACTIVE: + # Nested pipeline case. + # Transaction might be ACTIVE when the pipeline uses an "implicit + # transaction", typically in autocommit mode. But when entering a + # Psycopg transaction(), we expect the IDLE state. By sync()-ing, + # we make sure all previous commands are completed and the + # transaction gets back to IDLE. yield from self._sync_gen() self.level += 1 diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index afd035afb..1ee1ae98c 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -266,6 +266,23 @@ def test_errors_raised_on_nested_transaction_exit(conn): assert cur2.fetchone() == (2,) +def test_implicit_transaction(conn): + conn.autocommit = True + with conn.pipeline(): + assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE + conn.execute("select 'before'") + # Transaction is ACTIVE because previous command is not completed + # since we have not fetched its results. + assert conn.pgconn.transaction_status == pq.TransactionStatus.ACTIVE + # Upon entering the nested pipeline through "with transaction():", a + # sync() is emitted to restore the transaction state to IDLE, as + # expected to emit a BEGIN. + with conn.transaction(): + conn.execute("select 'tx'") + cur = conn.execute("select 'after'") + assert cur.fetchone() == ("after",) + + @pytest.mark.crdb_skip("deferrable") def test_error_on_commit(conn): conn.execute( diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index 354e641a3..9e026f0c9 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -267,6 +267,23 @@ async def test_errors_raised_on_nested_transaction_exit(aconn): assert await cur2.fetchone() == (2,) +async def test_implicit_transaction(aconn): + await aconn.set_autocommit(True) + async with aconn.pipeline(): + assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE + await aconn.execute("select 'before'") + # Transaction is ACTIVE because previous command is not completed + # since we have not fetched its results. + assert aconn.pgconn.transaction_status == pq.TransactionStatus.ACTIVE + # Upon entering the nested pipeline through "with transaction():", a + # sync() is emitted to restore the transaction state to IDLE, as + # expected to emit a BEGIN. + async with aconn.transaction(): + await aconn.execute("select 'tx'") + cur = await aconn.execute("select 'after'") + assert await cur.fetchone() == ("after",) + + @pytest.mark.crdb_skip("deferrable") async def test_error_on_commit(aconn): await aconn.execute(