From: Denis Laxalde Date: Fri, 24 Mar 2023 13:55:18 +0000 (+0100) Subject: feat: add encrypted and non-blocking cancellation X-Git-Tag: 3.2.0~54^2~5 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=eb35c81c43aa9912b5193585b3d81a5568430b8b;p=thirdparty%2Fpsycopg.git feat: add encrypted and non-blocking cancellation We introduce Connection.cancel_safe() which uses the encrypted and non-blocking libpq cancellation API available with PostgreSQL 17. As a non-blocking entry point, cancel_safe() delegates to a generator function, namely _cancel_gen(). If the libpq version is too old, the method raises a NotSupportedError. CTRL+C handling (in Connection.wait() or Cursor.stream()) also uses the non-blocking cancellation but falls back to old method if the former is not supported. The behavior of cancel() method (either on Connection or AsyncConnection) is kept unchanged to use the old cancellation API. --- diff --git a/docs/api/connections.rst b/docs/api/connections.rst index 11f29a8b9..71f10dbc9 100644 --- a/docs/api/connections.rst +++ b/docs/api/connections.rst @@ -286,6 +286,7 @@ The `!Connection` class .. rubric:: Methods you can use to do something cool .. automethod:: cancel + .. automethod:: cancel_safe .. automethod:: notifies diff --git a/docs/news.rst b/docs/news.rst index 42ca9ecaf..62687abc0 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -35,6 +35,10 @@ Psycopg 3.2 (unreleased) in non-pipeline mode and not totally reliable (:ticket:`#604`). The `Cursor` now only preserves the results set of the last `~Cursor.execute()`, consistently with non-pipeline mode. +- Add `~Connection.cancel_safe()` for encrypted and non-blocking cancellation + (:ticket:`#754`). +- Possibly use non-blocking cancellation upon `KeyboardInterrupt` + (:ticket:`#754`). .. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index 013c0c10c..4fdda67b5 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -293,21 +293,29 @@ class BaseConnection(Generic[Row]): def cancel(self) -> None: """Cancel the current operation on the connection.""" - # No-op if the connection is closed + if self._should_cancel(): + self._try_cancel(self.pgconn) + + def _should_cancel(self) -> bool: + """Check whether the current command should actually be cancelled when + invoking cancel*(). + """ + # cancel() is a no-op if the connection is closed; # this allows to use the method as callback handler without caring # about its life. if self.closed: - return - + return False if self._tpc and self._tpc[1]: raise e.ProgrammingError( "cancel() cannot be used with a prepared two-phase transaction" ) - - self._try_cancel(self.pgconn) + return True @classmethod def _try_cancel(cls, pgconn: "PGconn") -> None: + """Try to cancel the current command using a PGcancel object, + which is deprecated since libpq 17. + """ try: # Can fail if the connection is closed c = pgconn.get_cancel() @@ -316,6 +324,15 @@ class BaseConnection(Generic[Row]): else: c.cancel() + def _cancel_gen(self) -> PQGenConn[None]: + try: + cancel_conn = self.pgconn.cancel_conn() + except e.OperationalError as ex: # if the connection is closed + logger.warning("couldn't create a cancel connection: %s", ex) + else: + cancel_conn.start() + yield from generators.cancel(cancel_conn) + def add_notice_handler(self, callback: NoticeHandler) -> None: """ Register a callable to be invoked when a notice message is received. diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 65c1b1dcd..b5430122d 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -261,6 +261,22 @@ class Connection(BaseConnection[Row]): with self.lock: self.wait(self._rollback_gen()) + def cancel_safe(self) -> None: + """Cancel the current operation on the connection. + + This is a non-blocking version of `cancel()` which leverages a more + secure and improved cancellation feature of the libpq (available from + version 17). + + In contrast with `cancel()`, it is not appropriate for use in a signal + handler. + + :raises ~psycopg.NotSupportedError: if the underlying libpq is older + than version 17. + """ + if self._should_cancel(): + waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL) + @contextmanager def transaction( self, savepoint_name: Optional[str] = None, force_rollback: bool = False @@ -366,7 +382,10 @@ class Connection(BaseConnection[Row]): if self.pgconn.transaction_status == ACTIVE: # On Ctrl-C, try to cancel the query in the server, otherwise # the connection will remain stuck in ACTIVE state. - self._try_cancel(self.pgconn) + try: + self.cancel_safe() + except e.NotSupportedError: + self.cancel() try: waiting.wait(gen, self.pgconn.socket, interval=interval) except e.QueryCanceled: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 4e67c5ef8..3b2022a69 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -277,6 +277,22 @@ class AsyncConnection(BaseConnection[Row]): async with self.lock: await self.wait(self._rollback_gen()) + async def cancel_safe(self) -> None: + """Cancel the current operation on the connection. + + This is a non-blocking version of `cancel()` which leverages a more + secure and improved cancellation feature of the libpq (available from + version 17). + + In contrast with `cancel()`, it is not appropriate for use in a signal + handler. + + :raises ~psycopg.NotSupportedError: if the underlying libpq is older + than version 17. + """ + if self._should_cancel(): + await waiting.wait_conn_async(self._cancel_gen(), interval=_WAIT_INTERVAL) + @asynccontextmanager async def transaction( self, savepoint_name: Optional[str] = None, force_rollback: bool = False @@ -384,7 +400,10 @@ class AsyncConnection(BaseConnection[Row]): if self.pgconn.transaction_status == ACTIVE: # On Ctrl-C, try to cancel the query in the server, otherwise # the connection will remain stuck in ACTIVE state. - self._try_cancel(self.pgconn) + try: + await self.cancel_safe() + except e.NotSupportedError: + self.cancel() try: await waiting.wait_async(gen, self.pgconn.socket, interval=interval) except e.QueryCanceled: diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 6b48929bc..530bbd364 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -159,7 +159,10 @@ class Cursor(BaseCursor["Connection[Any]", Row]): if self._pgconn.transaction_status == ACTIVE: # Try to cancel the query, then consume the results # already received. - self._conn.cancel() + try: + self._conn.cancel_safe() + except e.NotSupportedError: + self._conn.cancel() try: while self._conn.wait(self._stream_fetchone_gen(first=False)): pass diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index 55dc9a5c2..2ea18ee54 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -164,7 +164,10 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): if self._pgconn.transaction_status == ACTIVE: # Try to cancel the query, then consume the results # already received. - self._conn.cancel() + try: + await self._conn.cancel_safe() + except e.NotSupportedError: + self._conn.cancel() try: while await self._conn.wait( self._stream_fetchone_gen(first=False) diff --git a/tests/test_concurrency_async.py b/tests/test_concurrency_async.py index a14fb93e6..98d659d68 100644 --- a/tests/test_concurrency_async.py +++ b/tests/test_concurrency_async.py @@ -61,7 +61,10 @@ async def test_concurrent_execution(aconn_cls, dsn): async def canceller(aconn, errors): try: await asyncio.sleep(0.5) - aconn.cancel() + try: + await aconn.cancel_safe() + except e.NotSupportedError: + aconn.cancel() except Exception as exc: errors.append(exc) diff --git a/tests/test_connection.py b/tests/test_connection.py index eb8541238..e4dd61a98 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -817,6 +817,11 @@ def test_cancel_closed(conn): conn.cancel() +def test_cancel_safe_closed(conn): + conn.close() + conn.cancel_safe() + + def test_resolve_hostaddr_conn(conn_cls, monkeypatch, fake_resolve): got = "" diff --git a/tests/test_connection_async.py b/tests/test_connection_async.py index cd761b330..450492177 100644 --- a/tests/test_connection_async.py +++ b/tests/test_connection_async.py @@ -821,6 +821,11 @@ async def test_cancel_closed(aconn): aconn.cancel() +async def test_cancel_safe_closed(aconn): + await aconn.close() + await aconn.cancel_safe() + + async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch, fake_resolve): got = "" diff --git a/tests/test_tpc.py b/tests/test_tpc.py index 864f8a786..90ba2f09d 100644 --- a/tests/test_tpc.py +++ b/tests/test_tpc.py @@ -277,6 +277,8 @@ class TestTPC: conn.tpc_prepare() with pytest.raises(psycopg.ProgrammingError): conn.cancel() + with pytest.raises(psycopg.ProgrammingError): + conn.cancel_safe() def test_tpc_recover_non_dbapi_connection(self, conn_cls, conn, dsn, tpc): conn.row_factory = psycopg.rows.dict_row diff --git a/tests/test_tpc_async.py b/tests/test_tpc_async.py index 3fa949330..8a448c714 100644 --- a/tests/test_tpc_async.py +++ b/tests/test_tpc_async.py @@ -285,6 +285,8 @@ class TestTPC: await aconn.tpc_prepare() with pytest.raises(psycopg.ProgrammingError): aconn.cancel() + with pytest.raises(psycopg.ProgrammingError): + await aconn.cancel_safe() async def test_tpc_recover_non_dbapi_connection(self, aconn_cls, aconn, dsn, tpc): aconn.row_factory = psycopg.rows.dict_row