]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: lock the connection during all stream() lifetime
authorDenis Laxalde <denis.laxalde@dalibo.com>
Thu, 6 Oct 2022 14:41:05 +0000 (16:41 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 6 Oct 2022 16:10:36 +0000 (17:10 +0100)
Namely, also lock the connection when closing the generator, in the
finally clause, where we are fetching results.

psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py

index 7fe4b4773f606f834f6700c2ddba06b364740354..789954de5b5fc471036fccef6cf59f71e07165da 100644 (file)
@@ -772,8 +772,9 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         if self._pgconn.pipeline_status:
             raise e.ProgrammingError("stream() cannot be used in pipeline mode")
 
-        try:
-            with self._conn.lock:
+        with self._conn.lock:
+
+            try:
                 self._conn.wait(self._stream_send_gen(query, params, binary=binary))
                 first = True
                 while self._conn.wait(self._stream_fetchone_gen(first)):
@@ -782,25 +783,26 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
                     yield rec
                     first = False
 
-        except e.Error as ex:
-            raise ex.with_traceback(None)
+            except e.Error as ex:
+                raise ex.with_traceback(None)
+
+            finally:
+                if self._pgconn.transaction_status == ACTIVE:
+                    # Try to cancel the query, then consume the results
+                    # already received.
+                    self._conn.cancel()
+                    try:
+                        while self._conn.wait(self._stream_fetchone_gen(first=False)):
+                            pass
+                    except Exception:
+                        pass
 
-        finally:
-            if self._pgconn.transaction_status == ACTIVE:
-                # Try to cancel the query, then consume the results already received.
-                self._conn.cancel()
-                try:
-                    while self._conn.wait(self._stream_fetchone_gen(first=False)):
+                    # Try to get out of ACTIVE state. Just do a single attempt, which
+                    # should work to recover from an error or query cancelled.
+                    try:
+                        self._conn.wait(self._stream_fetchone_gen(first=False))
+                    except Exception:
                         pass
-                except Exception:
-                    pass
-
-                # Try to get out of ACTIVE state. Just do a single attempt, which
-                # should work to recover from an error or query cancelled.
-                try:
-                    self._conn.wait(self._stream_fetchone_gen(first=False))
-                except Exception:
-                    pass
 
     def fetchone(self) -> Optional[Row]:
         """
index 4a108175b09d3b6a711fdcadbf325e1834f9618e..8971d40971596a8ce8b6b10cad64afc03837ec37 100644 (file)
@@ -134,8 +134,9 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
         if self._pgconn.pipeline_status:
             raise e.ProgrammingError("stream() cannot be used in pipeline mode")
 
-        try:
-            async with self._conn.lock:
+        async with self._conn.lock:
+
+            try:
                 await self._conn.wait(
                     self._stream_send_gen(query, params, binary=binary)
                 )
@@ -146,25 +147,28 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
                     yield rec
                     first = False
 
-        except e.Error as ex:
-            raise ex.with_traceback(None)
+            except e.Error as ex:
+                raise ex.with_traceback(None)
+
+            finally:
+                if self._pgconn.transaction_status == ACTIVE:
+                    # Try to cancel the query, then consume the results
+                    # already received.
+                    self._conn.cancel()
+                    try:
+                        while await self._conn.wait(
+                            self._stream_fetchone_gen(first=False)
+                        ):
+                            pass
+                    except Exception:
+                        pass
 
-        finally:
-            if self._pgconn.transaction_status == ACTIVE:
-                # Try to cancel the query, then consume the results already received.
-                self._conn.cancel()
-                try:
-                    while await self._conn.wait(self._stream_fetchone_gen(first=False)):
+                    # Try to get out of ACTIVE state. Just do a single attempt, which
+                    # should work to recover from an error or query cancelled.
+                    try:
+                        await self._conn.wait(self._stream_fetchone_gen(first=False))
+                    except Exception:
                         pass
-                except Exception:
-                    pass
-
-                # Try to get out of ACTIVE state. Just do a single attempt, which
-                # should work to recover from an error or query cancelled.
-                try:
-                    await self._conn.wait(self._stream_fetchone_gen(first=False))
-                except Exception:
-                    pass
 
     async def fetchone(self) -> Optional[Row]:
         await self._fetch_pipeline()