]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: avoid to swallow exceptions in public cancel methods
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 13 Apr 2024 10:23:38 +0000 (12:23 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 17 Apr 2024 21:51:20 +0000 (23:51 +0200)
Only ignore errors in every internal usage, in order to cancel as a best
effort and don't clobber a likely more meaningful exception.

psycopg/psycopg/_connection_base.py
psycopg/psycopg/_copy.py
psycopg/psycopg/_copy_async.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_connection.py
tests/test_connection_async.py

index aede7e95ccb978d545ab332f89f826511904b81b..3c2ecf16e95eed6ae1e8f54d55233720697629c4 100644 (file)
@@ -293,16 +293,9 @@ class BaseConnection(Generic[Row]):
 
     def cancel(self) -> None:
         """Cancel the current operation on the connection."""
-        if not self._should_cancel():
-            return
-
-        # Don't fail cancelling (which might happen on connection closing) to
-        # avoid clobbering eventual exceptions with ours, which is less important.
-        try:
+        if self._should_cancel():
             c = self.pgconn.get_cancel()
             c.cancel()
-        except Exception as ex:
-            logger.warning("couldn't try to cancel query: %s", ex)
 
     def _should_cancel(self) -> bool:
         """Check whether the current command should actually be cancelled when
index 844b74de56763dc3eadfd2d93d9c02c6f260405b..d4db84a2d94fc1155d3c84bf3cf88d78841b5fcd 100644 (file)
@@ -160,7 +160,7 @@ class Copy(BaseCopy["Connection[Any]"]):
             # (which might or might not have been already transferred entirely to
             # the client, so we won't necessary see the exception associated with
             # canceling).
-            self.connection.cancel_safe()
+            self.connection._try_cancel(timeout=5.0)
             self.connection.wait(self._end_copy_out_gen())
 
 
index 16bc714be6522ac4232bc22cf76e8633604aa37c..a371f185dc1eae44f7c12c091ac36c78db24b8ca 100644 (file)
@@ -159,7 +159,7 @@ class AsyncCopy(BaseCopy["AsyncConnection[Any]"]):
             # (which might or might not have been already transferred entirely to
             # the client, so we won't necessary see the exception associated with
             # canceling).
-            await self.connection.cancel_safe()
+            await self.connection._try_cancel(timeout=5.0)
             await self.connection.wait(self._end_copy_out_gen())
 
 
index a4680d33ddaddeaff122615eb823ab30019d9ed2..ad233c3754f866812a884f658052a09838b2bca9 100644 (file)
@@ -282,15 +282,18 @@ class Connection(BaseConnection[Row]):
             return
 
         if capabilities.has_cancel_safe():
-            try:
-                waiting.wait_conn(
-                    self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
-                )
-            except Exception as ex:
-                logger.warning("couldn't try to cancel query: %s", ex)
+            waiting.wait_conn(
+                self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
+            )
         else:
             self.cancel()
 
+    def _try_cancel(self, *, timeout: float = 30.0) -> None:
+        try:
+            self.cancel_safe(timeout=timeout)
+        except Exception as ex:
+            logger.warning("query cancellation failed: %s", ex)
+
     @contextmanager
     def transaction(
         self, savepoint_name: Optional[str] = None, force_rollback: bool = False
@@ -396,7 +399,7 @@ class Connection(BaseConnection[Row]):
             if self.pgconn.transaction_status == ACTIVE:
                 # On Ctrl-C, try to cancel the query in the server, otherwise
                 # the connection will remain stuck in ACTIVE state.
-                self.cancel_safe()
+                self._try_cancel(timeout=5.0)
                 try:
                     waiting.wait(gen, self.pgconn.socket, interval=interval)
                 except e.QueryCanceled:
index f718cbe3ed0020aee1d682d9e6a5b7b463ef34a3..78501852271a8aa53e2d422f53e75ab0e1549aa4 100644 (file)
@@ -299,19 +299,21 @@ class AsyncConnection(BaseConnection[Row]):
             return
 
         if capabilities.has_cancel_safe():
-            try:
-                await waiting.wait_conn_async(
-                    self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
-                )
-            except Exception as ex:
-                logger.warning("couldn't try to cancel query: %s", ex)
-
+            await waiting.wait_conn_async(
+                self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
+            )
         else:
             if True:  # ASYNC
                 await to_thread(self.cancel)
             else:
                 self.cancel()
 
+    async def _try_cancel(self, *, timeout: float = 30.0) -> None:
+        try:
+            await self.cancel_safe(timeout=timeout)
+        except Exception as ex:
+            logger.warning("query cancellation failed: %s", ex)
+
     @asynccontextmanager
     async def transaction(
         self, savepoint_name: Optional[str] = None, force_rollback: bool = False
@@ -419,7 +421,7 @@ class AsyncConnection(BaseConnection[Row]):
             if self.pgconn.transaction_status == ACTIVE:
                 # On Ctrl-C, try to cancel the query in the server, otherwise
                 # the connection will remain stuck in ACTIVE state.
-                await self.cancel_safe()
+                await self._try_cancel(timeout=5.0)
                 try:
                     await waiting.wait_async(gen, self.pgconn.socket, interval=interval)
                 except e.QueryCanceled:
index 099076b516e51fac92e5b1f0b4953eb57fdcfd3c..b49c3e5afdbe4527ba18d3b58248613963e06d28 100644 (file)
@@ -159,7 +159,7 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
                 if self._pgconn.transaction_status == ACTIVE:
                     # Try to cancel the query, then consume the results
                     # already received.
-                    self._conn.cancel_safe()
+                    self._conn._try_cancel(timeout=5.0)
                     try:
                         while self._conn.wait(self._stream_fetchone_gen(first=False)):
                             pass
index 485cf0e9358a7d154dd5ad88e3ffd410835d99b7..46700da526c6f37db66fdd62442d4bad92be65f5 100644 (file)
@@ -164,7 +164,7 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
                 if self._pgconn.transaction_status == ACTIVE:
                     # Try to cancel the query, then consume the results
                     # already received.
-                    await self._conn.cancel_safe()
+                    await self._conn._try_cancel(timeout=5.0)
                     try:
                         while await self._conn.wait(
                             self._stream_fetchone_gen(first=False)
index c9a7c7bfe477f368195e4ba780361d8a06b1fd43..2acf0d747d2a1495b0aa1221fa579c7b78a3908d 100644 (file)
@@ -825,6 +825,25 @@ def test_cancel_safe_closed(conn):
     conn.cancel_safe()
 
 
+@pytest.mark.slow
+@pytest.mark.timing
+def test_cancel_safe_error(conn_cls, proxy, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg")
+    proxy.start()
+    with conn_cls.connect(proxy.client_dsn) as conn:
+        proxy.stop()
+        with pytest.raises(
+            e.OperationalError, match="(Connection refused)|(connect\\(\\) failed)"
+        ) as ex:
+            conn.cancel_safe(timeout=2)
+        assert not caplog.records
+
+        # Note: testing an internal method. It's ok if this behaviour changes
+        conn._try_cancel(timeout=2.0)
+        assert len(caplog.records) == 1
+        caplog.records[0].message == str(ex.value)
+
+
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.libpq(">= 17")
index 7d0932ea222945150aa64cd896c08c963cfbf691..f5c355b2581ee61d3dfcb57b5f2593c070741ffa 100644 (file)
@@ -829,6 +829,25 @@ async def test_cancel_safe_closed(aconn):
     await aconn.cancel_safe()
 
 
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_cancel_safe_error(aconn_cls, proxy, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg")
+    proxy.start()
+    async with await aconn_cls.connect(proxy.client_dsn) as aconn:
+        proxy.stop()
+        with pytest.raises(
+            e.OperationalError, match=r"(Connection refused)|(connect\(\) failed)"
+        ) as ex:
+            await aconn.cancel_safe(timeout=2)
+        assert not caplog.records
+
+        # Note: testing an internal method. It's ok if this behaviour changes
+        await aconn._try_cancel(timeout=2.0)
+        assert len(caplog.records) == 1
+        caplog.records[0].message == str(ex.value)
+
+
 @pytest.mark.slow
 @pytest.mark.timing
 @pytest.mark.libpq(">= 17")