]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: do not fetch in Pipeline.sync()
authorDenis Laxalde <denis.laxalde@dalibo.com>
Mon, 4 Apr 2022 09:41:06 +0000 (11:41 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 7 May 2022 12:51:51 +0000 (14:51 +0200)
During previous refactorings, we made Pipeline.sync() also fetch results
from the server. But this somehow breaks the semantics of the
synchronization point as defined by Postgres because the user might be
interested in emitting Sync message as a way to solely close the current
series of queries in the pipeline: i.e., flush queries from client to
server and reset the pipeline error state. In this respect, the 'fetch'
step should be explicit.

BasePipeline._sync_gen() is changed to only emit a Sync and a new
_exit_gen() method is introduced doing what _sync_gen() previously did.
Accordingly, the warning emitted when calling this _exit_gen() at
pipeline exit is adjusted to say "terminating" instead of "syncing".

psycopg/psycopg/_pipeline.py
psycopg/psycopg/cursor.py

index 63bf765cbc6bd3a0d1a9e817ed4b0f36fde622ea..068018dd2d81ed8e2754303af0a2004f71334d0c 100644 (file)
@@ -86,10 +86,13 @@ class BasePipeline:
 
     def _sync_gen(self) -> PQGen[None]:
         self._enqueue_sync()
+        yield from self._communicate_gen()
+
+    def _exit_gen(self) -> PQGen[None]:
         try:
             # Send any pending commands (e.g. COMMIT or Sync);
             # while processing results, we might get errors...
-            yield from self._communicate_gen()
+            yield from self._sync_gen()
         finally:
             # then fetch all remaining results but without forcing
             # flush since we emitted a sync just before.
@@ -173,11 +176,8 @@ class Pipeline(BasePipeline):
         super().__init__(conn)
 
     def sync(self) -> None:
-        """Sync the pipeline, send any pending command and fetch and process
+        """Sync the pipeline, send any pending command and receive and process
         all available results.
-
-        This is called when exiting the pipeline, but can be used for other
-        purposes (e.g. in nested pipelines).
         """
         try:
             with self._conn.lock:
@@ -196,11 +196,12 @@ class Pipeline(BasePipeline):
         exc_tb: Optional[TracebackType],
     ) -> None:
         try:
-            self.sync()
+            with self._conn.lock:
+                self._conn.wait(self._exit_gen())
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
             if exc_val:
-                logger.warning("error ignored syncing %r: %s", self, exc2)
+                logger.warning("error ignored terminating %r: %s", self, exc2)
             else:
                 raise exc2.with_traceback(None)
         finally:
@@ -249,11 +250,12 @@ class AsyncPipeline(BasePipeline):
         exc_tb: Optional[TracebackType],
     ) -> None:
         try:
-            await self.sync()
+            async with self._conn.lock:
+                await self._conn.wait(self._exit_gen())
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
             if exc_val:
-                logger.warning("error ignored syncing %r: %s", self, exc2)
+                logger.warning("error ignored terminating %r: %s", self, exc2)
             else:
                 raise exc2.with_traceback(None)
         finally:
index 0a534d563d9e4db7653c0d1f0c134d95c41f2de8..8a473a9876f92b21043e607eb35279e392ccf6df 100644 (file)
@@ -241,7 +241,7 @@ class BaseCursor(Generic[ConnectionType, Row]):
         self._last_query = query
 
         if returning:
-            yield from pipeline._sync_gen()
+            yield from pipeline._exit_gen()
 
         for cmd in self._conn._prepared.get_maintenance_commands():
             yield from self._conn._exec_command(cmd)