]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: move final pipeline steps into a communicate() method
authorDenis Laxalde <denis.laxalde@dalibo.com>
Tue, 29 Mar 2022 06:53:57 +0000 (08:53 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:23:22 +0000 (01:23 +0200)
psycopg/psycopg/_pipeline.py

index 99a0103398773809a253925962b80959cc92ab6b..4a73286e66d786e296ad0c38eef6124eb2731238 100644 (file)
@@ -160,6 +160,24 @@ class Pipeline(BasePipeline):
     def __init__(self, conn: "Connection[Any]") -> None:
         super().__init__(conn)
 
+    def communicate(self) -> None:
+        """Sync the pipeline, send any pending command and fetch and process
+        all available results.
+
+        This is called when exiting the pipeline, but can be used for other
+        purposes (e.g. in nested pipelines).
+        """
+        with self._conn.lock:
+            self.sync()
+            try:
+                # Send any pending commands (e.g. COMMIT or Sync);
+                # while processing results, we might get errors...
+                self._conn.wait(self._communicate_gen())
+            finally:
+                # then fetch all remaining results but without forcing
+                # flush since we emitted a sync just before.
+                self._conn.wait(self._fetch_gen(flush=False))
+
     def __enter__(self) -> "Pipeline":
         self._enter()
         return self
@@ -171,16 +189,7 @@ class Pipeline(BasePipeline):
         exc_tb: Optional[TracebackType],
     ) -> None:
         try:
-            with self._conn.lock:
-                self.sync()
-                try:
-                    # Send any pending commands (e.g. COMMIT or Sync);
-                    # while processing results, we might get errors...
-                    self._conn.wait(self._communicate_gen())
-                finally:
-                    # then fetch all remaining results but without forcing
-                    # flush since we emitted a sync just before.
-                    self._conn.wait(self._fetch_gen(flush=False))
+            self.communicate()
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
             if exc_val:
@@ -200,6 +209,24 @@ class AsyncPipeline(BasePipeline):
     def __init__(self, conn: "AsyncConnection[Any]") -> None:
         super().__init__(conn)
 
+    async def communicate(self) -> None:
+        """Sync the pipeline, send any pending command and fetch and process
+        all available results.
+
+        This is called when exiting the pipeline, but can be used for other
+        purposes (e.g. in nested pipelines).
+        """
+        async with self._conn.lock:
+            self.sync()
+            try:
+                # Send any pending commands (e.g. COMMIT or Sync);
+                # while processing results, we might get errors...
+                await self._conn.wait(self._communicate_gen())
+            finally:
+                # then fetch all remaining results but without forcing
+                # flush since we emitted a sync just before.
+                await self._conn.wait(self._fetch_gen(flush=False))
+
     async def __aenter__(self) -> "AsyncPipeline":
         self._enter()
         return self
@@ -211,16 +238,7 @@ class AsyncPipeline(BasePipeline):
         exc_tb: Optional[TracebackType],
     ) -> None:
         try:
-            async with self._conn.lock:
-                self.sync()
-                try:
-                    # Send any pending commands (e.g. COMMIT or Sync);
-                    # while processing results, we might get errors...
-                    await self._conn.wait(self._communicate_gen())
-                finally:
-                    # then fetch all remaining results but without forcing
-                    # flush since we emitted a sync just before.
-                    await self._conn.wait(self._fetch_gen(flush=False))
+            await self.communicate()
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
             if exc_val: