From fea393ac2a48a4d60096adb6ee3e91f8e0a3a339 Mon Sep 17 00:00:00 2001 From: Denis Laxalde Date: Thu, 6 Oct 2022 16:41:05 +0200 Subject: [PATCH] fix: lock the connection during all stream() lifetime Namely, also lock the connection when closing the generator, in the finally clause, where we are fetching results. --- psycopg/psycopg/cursor.py | 40 ++++++++++++++++--------------- psycopg/psycopg/cursor_async.py | 42 ++++++++++++++++++--------------- 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 7fe4b4773..789954de5 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -772,8 +772,9 @@ class Cursor(BaseCursor["Connection[Any]", Row]): if self._pgconn.pipeline_status: raise e.ProgrammingError("stream() cannot be used in pipeline mode") - try: - with self._conn.lock: + with self._conn.lock: + + try: self._conn.wait(self._stream_send_gen(query, params, binary=binary)) first = True while self._conn.wait(self._stream_fetchone_gen(first)): @@ -782,25 +783,26 @@ class Cursor(BaseCursor["Connection[Any]", Row]): yield rec first = False - except e.Error as ex: - raise ex.with_traceback(None) + except e.Error as ex: + raise ex.with_traceback(None) + + finally: + if self._pgconn.transaction_status == ACTIVE: + # Try to cancel the query, then consume the results + # already received. + self._conn.cancel() + try: + while self._conn.wait(self._stream_fetchone_gen(first=False)): + pass + except Exception: + pass - finally: - if self._pgconn.transaction_status == ACTIVE: - # Try to cancel the query, then consume the results already received. - self._conn.cancel() - try: - while self._conn.wait(self._stream_fetchone_gen(first=False)): + # Try to get out of ACTIVE state. Just do a single attempt, which + # should work to recover from an error or query cancelled. + try: + self._conn.wait(self._stream_fetchone_gen(first=False)) + except Exception: pass - except Exception: - pass - - # Try to get out of ACTIVE state. Just do a single attempt, which - # should work to recover from an error or query cancelled. - try: - self._conn.wait(self._stream_fetchone_gen(first=False)) - except Exception: - pass def fetchone(self) -> Optional[Row]: """ diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index 4a108175b..8971d4097 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -134,8 +134,9 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): if self._pgconn.pipeline_status: raise e.ProgrammingError("stream() cannot be used in pipeline mode") - try: - async with self._conn.lock: + async with self._conn.lock: + + try: await self._conn.wait( self._stream_send_gen(query, params, binary=binary) ) @@ -146,25 +147,28 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): yield rec first = False - except e.Error as ex: - raise ex.with_traceback(None) + except e.Error as ex: + raise ex.with_traceback(None) + + finally: + if self._pgconn.transaction_status == ACTIVE: + # Try to cancel the query, then consume the results + # already received. + self._conn.cancel() + try: + while await self._conn.wait( + self._stream_fetchone_gen(first=False) + ): + pass + except Exception: + pass - finally: - if self._pgconn.transaction_status == ACTIVE: - # Try to cancel the query, then consume the results already received. - self._conn.cancel() - try: - while await self._conn.wait(self._stream_fetchone_gen(first=False)): + # Try to get out of ACTIVE state. Just do a single attempt, which + # should work to recover from an error or query cancelled. + try: + await self._conn.wait(self._stream_fetchone_gen(first=False)) + except Exception: pass - except Exception: - pass - - # Try to get out of ACTIVE state. Just do a single attempt, which - # should work to recover from an error or query cancelled. - try: - await self._conn.wait(self._stream_fetchone_gen(first=False)) - except Exception: - pass async def fetchone(self) -> Optional[Row]: await self._fetch_pipeline() -- 2.47.2