From: Denis Laxalde Date: Thu, 6 Oct 2022 14:41:05 +0000 (+0200) Subject: fix: lock the connection during all stream() lifetime X-Git-Tag: 3.1.4~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fea393ac2a48a4d60096adb6ee3e91f8e0a3a339;p=thirdparty%2Fpsycopg.git fix: lock the connection during all stream() lifetime Namely, also lock the connection when closing the generator, in the finally clause, where we are fetching results. --- diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 7fe4b4773..789954de5 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -772,8 +772,9 @@ class Cursor(BaseCursor["Connection[Any]", Row]): if self._pgconn.pipeline_status: raise e.ProgrammingError("stream() cannot be used in pipeline mode") - try: - with self._conn.lock: + with self._conn.lock: + + try: self._conn.wait(self._stream_send_gen(query, params, binary=binary)) first = True while self._conn.wait(self._stream_fetchone_gen(first)): @@ -782,25 +783,26 @@ class Cursor(BaseCursor["Connection[Any]", Row]): yield rec first = False - except e.Error as ex: - raise ex.with_traceback(None) + except e.Error as ex: + raise ex.with_traceback(None) + + finally: + if self._pgconn.transaction_status == ACTIVE: + # Try to cancel the query, then consume the results + # already received. + self._conn.cancel() + try: + while self._conn.wait(self._stream_fetchone_gen(first=False)): + pass + except Exception: + pass - finally: - if self._pgconn.transaction_status == ACTIVE: - # Try to cancel the query, then consume the results already received. - self._conn.cancel() - try: - while self._conn.wait(self._stream_fetchone_gen(first=False)): + # Try to get out of ACTIVE state. Just do a single attempt, which + # should work to recover from an error or query cancelled. + try: + self._conn.wait(self._stream_fetchone_gen(first=False)) + except Exception: pass - except Exception: - pass - - # Try to get out of ACTIVE state. Just do a single attempt, which - # should work to recover from an error or query cancelled. - try: - self._conn.wait(self._stream_fetchone_gen(first=False)) - except Exception: - pass def fetchone(self) -> Optional[Row]: """ diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index 4a108175b..8971d4097 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -134,8 +134,9 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): if self._pgconn.pipeline_status: raise e.ProgrammingError("stream() cannot be used in pipeline mode") - try: - async with self._conn.lock: + async with self._conn.lock: + + try: await self._conn.wait( self._stream_send_gen(query, params, binary=binary) ) @@ -146,25 +147,28 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): yield rec first = False - except e.Error as ex: - raise ex.with_traceback(None) + except e.Error as ex: + raise ex.with_traceback(None) + + finally: + if self._pgconn.transaction_status == ACTIVE: + # Try to cancel the query, then consume the results + # already received. + self._conn.cancel() + try: + while await self._conn.wait( + self._stream_fetchone_gen(first=False) + ): + pass + except Exception: + pass - finally: - if self._pgconn.transaction_status == ACTIVE: - # Try to cancel the query, then consume the results already received. - self._conn.cancel() - try: - while await self._conn.wait(self._stream_fetchone_gen(first=False)): + # Try to get out of ACTIVE state. Just do a single attempt, which + # should work to recover from an error or query cancelled. + try: + await self._conn.wait(self._stream_fetchone_gen(first=False)) + except Exception: pass - except Exception: - pass - - # Try to get out of ACTIVE state. Just do a single attempt, which - # should work to recover from an error or query cancelled. - try: - await self._conn.wait(self._stream_fetchone_gen(first=False)) - except Exception: - pass async def fetchone(self) -> Optional[Row]: await self._fetch_pipeline()