From: Daniele Varrazzo Date: Sat, 13 Apr 2024 10:23:38 +0000 (+0200) Subject: fix: avoid to swallow exceptions in public cancel methods X-Git-Tag: 3.2.0~41^2~1 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=62b43bbd8949c35fdeb0d30f3b867ddd940e132c;p=thirdparty%2Fpsycopg.git fix: avoid to swallow exceptions in public cancel methods Only ignore errors in every internal usage, in order to cancel as a best effort and don't clobber a likely more meaningful exception. --- diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index aede7e95c..3c2ecf16e 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -293,16 +293,9 @@ class BaseConnection(Generic[Row]): def cancel(self) -> None: """Cancel the current operation on the connection.""" - if not self._should_cancel(): - return - - # Don't fail cancelling (which might happen on connection closing) to - # avoid clobbering eventual exceptions with ours, which is less important. - try: + if self._should_cancel(): c = self.pgconn.get_cancel() c.cancel() - except Exception as ex: - logger.warning("couldn't try to cancel query: %s", ex) def _should_cancel(self) -> bool: """Check whether the current command should actually be cancelled when diff --git a/psycopg/psycopg/_copy.py b/psycopg/psycopg/_copy.py index 844b74de5..d4db84a2d 100644 --- a/psycopg/psycopg/_copy.py +++ b/psycopg/psycopg/_copy.py @@ -160,7 +160,7 @@ class Copy(BaseCopy["Connection[Any]"]): # (which might or might not have been already transferred entirely to # the client, so we won't necessary see the exception associated with # canceling). - self.connection.cancel_safe() + self.connection._try_cancel(timeout=5.0) self.connection.wait(self._end_copy_out_gen()) diff --git a/psycopg/psycopg/_copy_async.py b/psycopg/psycopg/_copy_async.py index 16bc714be..a371f185d 100644 --- a/psycopg/psycopg/_copy_async.py +++ b/psycopg/psycopg/_copy_async.py @@ -159,7 +159,7 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]): # (which might or might not have been already transferred entirely to # the client, so we won't necessary see the exception associated with # canceling). - await self.connection.cancel_safe() + await self.connection._try_cancel(timeout=5.0) await self.connection.wait(self._end_copy_out_gen()) diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index a4680d33d..ad233c375 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -282,15 +282,18 @@ class Connection(BaseConnection[Row]): return if capabilities.has_cancel_safe(): - try: - waiting.wait_conn( - self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL - ) - except Exception as ex: - logger.warning("couldn't try to cancel query: %s", ex) + waiting.wait_conn( + self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL + ) else: self.cancel() + def _try_cancel(self, *, timeout: float = 30.0) -> None: + try: + self.cancel_safe(timeout=timeout) + except Exception as ex: + logger.warning("query cancellation failed: %s", ex) + @contextmanager def transaction( self, savepoint_name: Optional[str] = None, force_rollback: bool = False @@ -396,7 +399,7 @@ class Connection(BaseConnection[Row]): if self.pgconn.transaction_status == ACTIVE: # On Ctrl-C, try to cancel the query in the server, otherwise # the connection will remain stuck in ACTIVE state. - self.cancel_safe() + self._try_cancel(timeout=5.0) try: waiting.wait(gen, self.pgconn.socket, interval=interval) except e.QueryCanceled: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index f718cbe3e..785018522 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -299,19 +299,21 @@ class AsyncConnection(BaseConnection[Row]): return if capabilities.has_cancel_safe(): - try: - await waiting.wait_conn_async( - self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL - ) - except Exception as ex: - logger.warning("couldn't try to cancel query: %s", ex) - + await waiting.wait_conn_async( + self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL + ) else: if True: # ASYNC await to_thread(self.cancel) else: self.cancel() + async def _try_cancel(self, *, timeout: float = 30.0) -> None: + try: + await self.cancel_safe(timeout=timeout) + except Exception as ex: + logger.warning("query cancellation failed: %s", ex) + @asynccontextmanager async def transaction( self, savepoint_name: Optional[str] = None, force_rollback: bool = False @@ -419,7 +421,7 @@ class AsyncConnection(BaseConnection[Row]): if self.pgconn.transaction_status == ACTIVE: # On Ctrl-C, try to cancel the query in the server, otherwise # the connection will remain stuck in ACTIVE state. - await self.cancel_safe() + await self._try_cancel(timeout=5.0) try: await waiting.wait_async(gen, self.pgconn.socket, interval=interval) except e.QueryCanceled: diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 099076b51..b49c3e5af 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -159,7 +159,7 @@ class Cursor(BaseCursor["Connection[Any]", Row]): if self._pgconn.transaction_status == ACTIVE: # Try to cancel the query, then consume the results # already received. - self._conn.cancel_safe() + self._conn._try_cancel(timeout=5.0) try: while self._conn.wait(self._stream_fetchone_gen(first=False)): pass diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index 485cf0e93..46700da52 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -164,7 +164,7 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): if self._pgconn.transaction_status == ACTIVE: # Try to cancel the query, then consume the results # already received. - await self._conn.cancel_safe() + await self._conn._try_cancel(timeout=5.0) try: while await self._conn.wait( self._stream_fetchone_gen(first=False) diff --git a/tests/test_connection.py b/tests/test_connection.py index c9a7c7bfe..2acf0d747 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -825,6 +825,25 @@ def test_cancel_safe_closed(conn): conn.cancel_safe() +@pytest.mark.slow +@pytest.mark.timing +def test_cancel_safe_error(conn_cls, proxy, caplog): + caplog.set_level(logging.WARNING, logger="psycopg") + proxy.start() + with conn_cls.connect(proxy.client_dsn) as conn: + proxy.stop() + with pytest.raises( + e.OperationalError, match="(Connection refused)|(connect\\(\\) failed)" + ) as ex: + conn.cancel_safe(timeout=2) + assert not caplog.records + + # Note: testing an internal method. It's ok if this behaviour changes + conn._try_cancel(timeout=2.0) + assert len(caplog.records) == 1 + caplog.records[0].message == str(ex.value) + + @pytest.mark.slow @pytest.mark.timing @pytest.mark.libpq(">= 17") diff --git a/tests/test_connection_async.py b/tests/test_connection_async.py index 7d0932ea2..f5c355b25 100644 --- a/tests/test_connection_async.py +++ b/tests/test_connection_async.py @@ -829,6 +829,25 @@ async def test_cancel_safe_closed(aconn): await aconn.cancel_safe() +@pytest.mark.slow +@pytest.mark.timing +async def test_cancel_safe_error(aconn_cls, proxy, caplog): + caplog.set_level(logging.WARNING, logger="psycopg") + proxy.start() + async with await aconn_cls.connect(proxy.client_dsn) as aconn: + proxy.stop() + with pytest.raises( + e.OperationalError, match=r"(Connection refused)|(connect\(\) failed)" + ) as ex: + await aconn.cancel_safe(timeout=2) + assert not caplog.records + + # Note: testing an internal method. It's ok if this behaviour changes + await aconn._try_cancel(timeout=2.0) + assert len(caplog.records) == 1 + caplog.records[0].message == str(ex.value) + + @pytest.mark.slow @pytest.mark.timing @pytest.mark.libpq(">= 17")