]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: fix integration between pipelines and nested transaction
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 9 May 2022 23:03:13 +0000 (01:03 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 9 May 2022 23:03:13 +0000 (01:03 +0200)
Refactor to remove all the knowledge about pipelines from the
transaction object. Replaced by just syncing before entering the
transaction.

Had to drop the pipeline exit, which will now fetch results whatever the
level. If not doing so, a final sync should have been forced to exit
with a clean state from an inner transaction block.

psycopg/psycopg/_pipeline.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/transaction.py

index 7aab4f2f7a6da6bf8804754ba297b2e33cf2f466..e207f58341be16143dcdc1e88378c788ba45481a 100644 (file)
@@ -90,16 +90,15 @@ class BasePipeline:
         yield from self._fetch_gen(flush=False)
 
     def _exit_gen(self) -> PQGen[None]:
-        """Exit current pipeline by sending a Sync and, unless within a nested
-        pipeline, also fetch back all remaining results.
+        """
+        Exit current pipeline by sending a Sync and fetch back all remaining results.
         """
         try:
             self._enqueue_sync()
             yield from self._communicate_gen()
         finally:
-            if self.level == 1:
-                # No need to force flush since we emitted a sync just before.
-                yield from self._fetch_gen(flush=False)
+            # No need to force flush since we emitted a sync just before.
+            yield from self._fetch_gen(flush=False)
 
     def _communicate_gen(self) -> PQGen[None]:
         """Communicate with pipeline to send commands and possibly fetch
index f8a43bb9241b7fd999d6a21868061f74a7f6585b..a80ab619dda69f219eff16863e23ffe5791eb90a 100644 (file)
@@ -871,6 +871,7 @@ class Connection(BaseConnection[Row]):
         """
         tx = Transaction(self, savepoint_name, force_rollback)
         if self._pipeline:
+            self._pipeline.sync()
             with tx, self.pipeline():
                 yield tx
         else:
index 606c68386e10ab34825aac26bd83308265c07a89..6e8fab8bc7a171d897d3acc2b87e5374fa260eac 100644 (file)
@@ -283,6 +283,7 @@ class AsyncConnection(BaseConnection[Row]):
         """
         tx = AsyncTransaction(self, savepoint_name, force_rollback)
         if self._pipeline:
+            await self._pipeline.sync()
             async with tx, self.pipeline():
                 yield tx
         else:
index be46ead978093cf9aaa93216657e198ae7eed422..a7bfa2b501a7f08af191569648a44f732ff49e6c 100644 (file)
@@ -90,16 +90,6 @@ class BaseTransaction(Generic[ConnectionType]):
             raise TypeError("transaction blocks can be used only once")
         self._entered = True
 
-        # If we are in pipeline mode and connection idle, we might be in a
-        # nested statement but we haven't received yet the result to go
-        # INTRANS. This would make _push_savepoint() fail. If so, synchronize
-        # with the server.
-        if (
-            self._conn._pipeline
-            and self.pgconn.transaction_status == TransactionStatus.IDLE
-        ):
-            yield from self._conn._pipeline._sync_gen()
-
         self._push_savepoint()
         for command in self._get_enter_commands():
             yield from self._conn._exec_command(command)
@@ -138,9 +128,6 @@ class BaseTransaction(Generic[ConnectionType]):
         for command in self._get_commit_commands():
             yield from self._conn._exec_command(command)
 
-        if self._conn._pipeline:
-            yield from self._conn._pipeline._sync_gen()
-
     def _rollback_gen(self, exc_val: Optional[BaseException]) -> PQGen[bool]:
         if isinstance(exc_val, Rollback):
             logger.debug(f"{self._conn}: Explicit rollback from: ", exc_info=True)
@@ -150,13 +137,6 @@ class BaseTransaction(Generic[ConnectionType]):
         if ex:
             raise ex
 
-        # Get out of a "pipeline aborted" state
-        if (
-            self._conn._pipeline
-            and self.pgconn.pipeline_status == pq.PipelineStatus.ABORTED
-        ):
-            yield from self._conn._pipeline._sync_gen()
-
         for command in self._get_rollback_commands():
             yield from self._conn._exec_command(command)