]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat: add encrypted and non-blocking cancellation
authorDenis Laxalde <denis.laxalde@dalibo.com>
Fri, 24 Mar 2023 13:55:18 +0000 (14:55 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 9 Apr 2024 10:07:43 +0000 (12:07 +0200)
We introduce Connection.cancel_safe() which uses the encrypted and
non-blocking libpq cancellation API available with PostgreSQL 17. As a
non-blocking entry point, cancel_safe() delegates to a generator function,
namely _cancel_gen(). If the libpq version is too old, the method raises
a NotSupportedError.

CTRL+C handling (in Connection.wait() or Cursor.stream()) also uses the
non-blocking cancellation but falls back to old method if the former is
not supported.

The behavior of cancel() method (either on Connection or
AsyncConnection) is kept unchanged to use the old cancellation API.

12 files changed:
docs/api/connections.rst
docs/news.rst
psycopg/psycopg/_connection_base.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_concurrency_async.py
tests/test_connection.py
tests/test_connection_async.py
tests/test_tpc.py
tests/test_tpc_async.py

index 11f29a8b93e4e025d9a69db55fef51d136b5243a..71f10dbc9e2901d5084e3fe0678e27859358fee0 100644 (file)
@@ -286,6 +286,7 @@ The `!Connection` class
     .. rubric:: Methods you can use to do something cool
 
     .. automethod:: cancel
+    .. automethod:: cancel_safe
 
     .. automethod:: notifies
 
index 42ca9ecafb6e8f17cdb085f71065273beb5f52a9..62687abc069f750645c3f8db440d7518b40f23f1 100644 (file)
@@ -35,6 +35,10 @@ Psycopg 3.2 (unreleased)
   in non-pipeline mode and not totally reliable (:ticket:`#604`).
   The `Cursor` now only preserves the results set of the last
   `~Cursor.execute()`, consistently with non-pipeline mode.
+- Add `~Connection.cancel_safe()` for encrypted and non-blocking cancellation
+  (:ticket:`#754`).
+- Possibly use non-blocking cancellation upon `KeyboardInterrupt`
+  (:ticket:`#754`).
 
 .. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
 
index 013c0c10c97a308a68c45814a6179b1b4a399334..4fdda67b563e625226236cf7908b96cde54ca5cf 100644 (file)
@@ -293,21 +293,29 @@ class BaseConnection(Generic[Row]):
 
     def cancel(self) -> None:
         """Cancel the current operation on the connection."""
-        # No-op if the connection is closed
+        if self._should_cancel():
+            self._try_cancel(self.pgconn)
+
+    def _should_cancel(self) -> bool:
+        """Check whether the current command should actually be cancelled when
+        invoking cancel*().
+        """
+        # cancel() is a no-op if the connection is closed;
         # this allows to use the method as callback handler without caring
         # about its life.
         if self.closed:
-            return
-
+            return False
         if self._tpc and self._tpc[1]:
             raise e.ProgrammingError(
                 "cancel() cannot be used with a prepared two-phase transaction"
             )
-
-        self._try_cancel(self.pgconn)
+        return True
 
     @classmethod
     def _try_cancel(cls, pgconn: "PGconn") -> None:
+        """Try to cancel the current command using a PGcancel object,
+        which is deprecated since libpq 17.
+        """
         try:
             # Can fail if the connection is closed
             c = pgconn.get_cancel()
@@ -316,6 +324,15 @@ class BaseConnection(Generic[Row]):
         else:
             c.cancel()
 
+    def _cancel_gen(self) -> PQGenConn[None]:
+        try:
+            cancel_conn = self.pgconn.cancel_conn()
+        except e.OperationalError as ex:  # if the connection is closed
+            logger.warning("couldn't create a cancel connection: %s", ex)
+        else:
+            cancel_conn.start()
+            yield from generators.cancel(cancel_conn)
+
     def add_notice_handler(self, callback: NoticeHandler) -> None:
         """
         Register a callable to be invoked when a notice message is received.
index 65c1b1dcd088bfdf20e882e6eae33d309c0a8255..b5430122d01602c915af1acacb9c3cdff2c0f8e6 100644 (file)
@@ -261,6 +261,22 @@ class Connection(BaseConnection[Row]):
         with self.lock:
             self.wait(self._rollback_gen())
 
+    def cancel_safe(self) -> None:
+        """Cancel the current operation on the connection.
+
+        This is a non-blocking version of `cancel()` which leverages a more
+        secure and improved cancellation feature of the libpq (available from
+        version 17).
+
+        In contrast with `cancel()`, it is not appropriate for use in a signal
+        handler.
+
+        :raises ~psycopg.NotSupportedError: if the underlying libpq is older
+            than version 17.
+        """
+        if self._should_cancel():
+            waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL)
+
     @contextmanager
     def transaction(
         self, savepoint_name: Optional[str] = None, force_rollback: bool = False
@@ -366,7 +382,10 @@ class Connection(BaseConnection[Row]):
             if self.pgconn.transaction_status == ACTIVE:
                 # On Ctrl-C, try to cancel the query in the server, otherwise
                 # the connection will remain stuck in ACTIVE state.
-                self._try_cancel(self.pgconn)
+                try:
+                    self.cancel_safe()
+                except e.NotSupportedError:
+                    self.cancel()
                 try:
                     waiting.wait(gen, self.pgconn.socket, interval=interval)
                 except e.QueryCanceled:
index 4e67c5ef8546ba57924966dd2f4886ec58b5b511..3b2022a69dae68455f254dbd5a36357d71f6ba0c 100644 (file)
@@ -277,6 +277,22 @@ class AsyncConnection(BaseConnection[Row]):
         async with self.lock:
             await self.wait(self._rollback_gen())
 
+    async def cancel_safe(self) -> None:
+        """Cancel the current operation on the connection.
+
+        This is a non-blocking version of `cancel()` which leverages a more
+        secure and improved cancellation feature of the libpq (available from
+        version 17).
+
+        In contrast with `cancel()`, it is not appropriate for use in a signal
+        handler.
+
+        :raises ~psycopg.NotSupportedError: if the underlying libpq is older
+            than version 17.
+        """
+        if self._should_cancel():
+            await waiting.wait_conn_async(self._cancel_gen(), interval=_WAIT_INTERVAL)
+
     @asynccontextmanager
     async def transaction(
         self, savepoint_name: Optional[str] = None, force_rollback: bool = False
@@ -384,7 +400,10 @@ class AsyncConnection(BaseConnection[Row]):
             if self.pgconn.transaction_status == ACTIVE:
                 # On Ctrl-C, try to cancel the query in the server, otherwise
                 # the connection will remain stuck in ACTIVE state.
-                self._try_cancel(self.pgconn)
+                try:
+                    await self.cancel_safe()
+                except e.NotSupportedError:
+                    self.cancel()
                 try:
                     await waiting.wait_async(gen, self.pgconn.socket, interval=interval)
                 except e.QueryCanceled:
index 6b48929bce9348de6c5f557e3179f04eb8eacd44..530bbd364d2e0b10f9319eb0b126e7443017c2bf 100644 (file)
@@ -159,7 +159,10 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
                 if self._pgconn.transaction_status == ACTIVE:
                     # Try to cancel the query, then consume the results
                     # already received.
-                    self._conn.cancel()
+                    try:
+                        self._conn.cancel_safe()
+                    except e.NotSupportedError:
+                        self._conn.cancel()
                     try:
                         while self._conn.wait(self._stream_fetchone_gen(first=False)):
                             pass
index 55dc9a5c2616f29e38aa7ea68e9b5bc09e61cad8..2ea18ee54ac93086153e2728775e702ee22acd6e 100644 (file)
@@ -164,7 +164,10 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
                 if self._pgconn.transaction_status == ACTIVE:
                     # Try to cancel the query, then consume the results
                     # already received.
-                    self._conn.cancel()
+                    try:
+                        await self._conn.cancel_safe()
+                    except e.NotSupportedError:
+                        self._conn.cancel()
                     try:
                         while await self._conn.wait(
                             self._stream_fetchone_gen(first=False)
index a14fb93e6569bbc5a53dc819e897be2a539d287d..98d659d689300726ddfc183e73e0ef5d8453b066 100644 (file)
@@ -61,7 +61,10 @@ async def test_concurrent_execution(aconn_cls, dsn):
 async def canceller(aconn, errors):
     try:
         await asyncio.sleep(0.5)
-        aconn.cancel()
+        try:
+            await aconn.cancel_safe()
+        except e.NotSupportedError:
+            aconn.cancel()
     except Exception as exc:
         errors.append(exc)
 
index eb854123817b33357b11443cff1cf3d6f04583a0..e4dd61a9824ee161362217a3463926e50d12543a 100644 (file)
@@ -817,6 +817,11 @@ def test_cancel_closed(conn):
     conn.cancel()
 
 
+def test_cancel_safe_closed(conn):
+    conn.close()
+    conn.cancel_safe()
+
+
 def test_resolve_hostaddr_conn(conn_cls, monkeypatch, fake_resolve):
     got = ""
 
index cd761b330ede8f3b43f497c287ffcfda87189bc2..450492177d3802105a7db0e93d119ea70d92e2aa 100644 (file)
@@ -821,6 +821,11 @@ async def test_cancel_closed(aconn):
     aconn.cancel()
 
 
+async def test_cancel_safe_closed(aconn):
+    await aconn.close()
+    await aconn.cancel_safe()
+
+
 async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch, fake_resolve):
     got = ""
 
index 864f8a7868e77b3616cd13460a19138b87d1ec69..90ba2f09d2223756d236caf78669a7c19378cb55 100644 (file)
@@ -277,6 +277,8 @@ class TestTPC:
         conn.tpc_prepare()
         with pytest.raises(psycopg.ProgrammingError):
             conn.cancel()
+        with pytest.raises(psycopg.ProgrammingError):
+            conn.cancel_safe()
 
     def test_tpc_recover_non_dbapi_connection(self, conn_cls, conn, dsn, tpc):
         conn.row_factory = psycopg.rows.dict_row
index 3fa9493301bcdca7080b68c64a3e84bf4ed60feb..8a448c7147ed1cab9862b2c61be81bfb5b5dd006 100644 (file)
@@ -285,6 +285,8 @@ class TestTPC:
         await aconn.tpc_prepare()
         with pytest.raises(psycopg.ProgrammingError):
             aconn.cancel()
+        with pytest.raises(psycopg.ProgrammingError):
+            await aconn.cancel_safe()
 
     async def test_tpc_recover_non_dbapi_connection(self, aconn_cls, aconn, dsn, tpc):
         aconn.row_factory = psycopg.rows.dict_row