From: Daniele Varrazzo Date: Mon, 9 May 2022 23:03:13 +0000 (+0200) Subject: fix: fix integration between pipelines and nested transaction X-Git-Tag: 3.1~113^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3047730f1ca6a53a4b86435ecfca0dd101eef67e;p=thirdparty%2Fpsycopg.git fix: fix integration between pipelines and nested transaction Refactor to remove all the knowledge about pipelines from the transaction object. Replaced by just syncing before entering the transaction. Had to drop the pipeline exit, which will now fetch results whatever the level. If not doing so, a final sync should have been forced to exit with a clean state from an inner transaction block. --- diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index 7aab4f2f7..e207f5834 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -90,16 +90,15 @@ class BasePipeline: yield from self._fetch_gen(flush=False) def _exit_gen(self) -> PQGen[None]: - """Exit current pipeline by sending a Sync and, unless within a nested - pipeline, also fetch back all remaining results. + """ + Exit current pipeline by sending a Sync and fetch back all remaining results. """ try: self._enqueue_sync() yield from self._communicate_gen() finally: - if self.level == 1: - # No need to force flush since we emitted a sync just before. - yield from self._fetch_gen(flush=False) + # No need to force flush since we emitted a sync just before. + yield from self._fetch_gen(flush=False) def _communicate_gen(self) -> PQGen[None]: """Communicate with pipeline to send commands and possibly fetch diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index f8a43bb92..a80ab619d 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -871,6 +871,7 @@ class Connection(BaseConnection[Row]): """ tx = Transaction(self, savepoint_name, force_rollback) if self._pipeline: + self._pipeline.sync() with tx, self.pipeline(): yield tx else: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 606c68386..6e8fab8bc 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -283,6 +283,7 @@ class AsyncConnection(BaseConnection[Row]): """ tx = AsyncTransaction(self, savepoint_name, force_rollback) if self._pipeline: + await self._pipeline.sync() async with tx, self.pipeline(): yield tx else: diff --git a/psycopg/psycopg/transaction.py b/psycopg/psycopg/transaction.py index be46ead97..a7bfa2b50 100644 --- a/psycopg/psycopg/transaction.py +++ b/psycopg/psycopg/transaction.py @@ -90,16 +90,6 @@ class BaseTransaction(Generic[ConnectionType]): raise TypeError("transaction blocks can be used only once") self._entered = True - # If we are in pipeline mode and connection idle, we might be in a - # nested statement but we haven't received yet the result to go - # INTRANS. This would make _push_savepoint() fail. If so, synchronize - # with the server. - if ( - self._conn._pipeline - and self.pgconn.transaction_status == TransactionStatus.IDLE - ): - yield from self._conn._pipeline._sync_gen() - self._push_savepoint() for command in self._get_enter_commands(): yield from self._conn._exec_command(command) @@ -138,9 +128,6 @@ class BaseTransaction(Generic[ConnectionType]): for command in self._get_commit_commands(): yield from self._conn._exec_command(command) - if self._conn._pipeline: - yield from self._conn._pipeline._sync_gen() - def _rollback_gen(self, exc_val: Optional[BaseException]) -> PQGen[bool]: if isinstance(exc_val, Rollback): logger.debug(f"{self._conn}: Explicit rollback from: ", exc_info=True) @@ -150,13 +137,6 @@ class BaseTransaction(Generic[ConnectionType]): if ex: raise ex - # Get out of a "pipeline aborted" state - if ( - self._conn._pipeline - and self.pgconn.pipeline_status == pq.PipelineStatus.ABORTED - ): - yield from self._conn._pipeline._sync_gen() - for command in self._get_rollback_commands(): yield from self._conn._exec_command(command)