From 67ab8f2d6ab02148cad2799fe5b0a35918acf68b Mon Sep 17 00:00:00 2001 From: Denis Laxalde Date: Mon, 8 Apr 2024 15:00:17 +0200 Subject: [PATCH] feat: add a timeout parameter to Connection.cancel_safe() This will only work for PGcancelConn, i.e. libpq >= 17, or thanks to added waiting logic in AsyncConnection's implementation; so we add a note about the limitation in Connection's documentation. --- docs/api/connections.rst | 5 +++++ docs/api/errors.rst | 1 + psycopg/psycopg/_connection_base.py | 4 ++-- psycopg/psycopg/connection.py | 9 +++++++-- psycopg/psycopg/connection_async.py | 7 +++++-- psycopg/psycopg/errors.py | 9 +++++++++ psycopg/psycopg/generators.py | 5 ++++- psycopg_c/psycopg_c/_psycopg.pyi | 4 +++- psycopg_c/psycopg_c/_psycopg/generators.pyx | 9 ++++++++- tests/test_connection.py | 15 +++++++++++++++ tests/test_connection_async.py | 15 +++++++++++++++ 11 files changed, 74 insertions(+), 9 deletions(-) diff --git a/docs/api/connections.rst b/docs/api/connections.rst index 0f9462ce2..afa546cc2 100644 --- a/docs/api/connections.rst +++ b/docs/api/connections.rst @@ -292,6 +292,11 @@ The `!Connection` class You can use the `~Capabilities.has_cancel_safe` capability to check if `!cancel_safe()` will not fall back on the legacy implementation. + .. warning:: + + The `timeout` parameter has no effect for libpq older than version + 17. + .. warning:: This method shouldn't be used as a `~signal.signal` handler. diff --git a/docs/api/errors.rst b/docs/api/errors.rst index 6d3b2aab5..a64cd74f3 100644 --- a/docs/api/errors.rst +++ b/docs/api/errors.rst @@ -85,6 +85,7 @@ In addition to the standard DB-API errors, Psycopg defines a few more specific ones. .. autoexception:: ConnectionTimeout() +.. autoexception:: CancellationTimeout() .. autoexception:: PipelineAborted() diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index 44a7cb4b5..aede7e95c 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -319,10 +319,10 @@ class BaseConnection(Generic[Row]): ) return True - def _cancel_gen(self) -> PQGenConn[None]: + def _cancel_gen(self, *, timeout: float) -> PQGenConn[None]: cancel_conn = self.pgconn.cancel_conn() cancel_conn.start() - yield from generators.cancel(cancel_conn) + yield from generators.cancel(cancel_conn, timeout=timeout) def add_notice_handler(self, callback: NoticeHandler) -> None: """ diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 04e7e73af..9701770d8 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -261,7 +261,7 @@ class Connection(BaseConnection[Row]): with self.lock: self.wait(self._rollback_gen()) - def cancel_safe(self) -> None: + def cancel_safe(self, *, timeout: float = 30.0) -> None: """Cancel the current operation on the connection. This is a non-blocking version of `~Connection.cancel()` which @@ -270,6 +270,9 @@ class Connection(BaseConnection[Row]): If the underlying libpq is older than version 17, the method will fall back to using the same implementation of `!cancel()`. + + :raises ~psycopg.errors.CancellationTimeout: If the cancellation did + not terminate within specified timeout. """ if not self._should_cancel(): return @@ -277,7 +280,9 @@ class Connection(BaseConnection[Row]): # TODO: replace with capabilities.has_safe_cancel after merging #782 if pq.__build_version__ >= 170000: try: - waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL) + waiting.wait_conn( + self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL + ) except Exception as ex: logger.warning("couldn't try to cancel query: %s", ex) else: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index faf3c512b..beb4c8714 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -278,7 +278,7 @@ class AsyncConnection(BaseConnection[Row]): async with self.lock: await self.wait(self._rollback_gen()) - async def cancel_safe(self) -> None: + async def cancel_safe(self, *, timeout: float = 30.0) -> None: """Cancel the current operation on the connection. This is a non-blocking version of `~Connection.cancel()` which @@ -287,6 +287,9 @@ class AsyncConnection(BaseConnection[Row]): If the underlying libpq is older than version 17, the method will fall back to using the same implementation of `!cancel()`. + + :raises ~psycopg.errors.CancellationTimeout: If the cancellation did + not terminate within specified timeout. """ if not self._should_cancel(): return @@ -295,7 +298,7 @@ class AsyncConnection(BaseConnection[Row]): if pq.__build_version__ >= 170000: try: await waiting.wait_conn_async( - self._cancel_gen(), interval=_WAIT_INTERVAL + self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL ) except Exception as ex: logger.warning("couldn't try to cancel query: %s", ex) diff --git a/psycopg/psycopg/errors.py b/psycopg/psycopg/errors.py index 15afa0603..e02fe8556 100644 --- a/psycopg/psycopg/errors.py +++ b/psycopg/psycopg/errors.py @@ -401,6 +401,15 @@ class ConnectionTimeout(OperationalError): """ +class CancellationTimeout(OperationalError): + """ + Exception raised on timeout of connection's + `~psycopg.Connection.cancel_safe()` method. + + Subclass of `~psycopg.OperationalError`. + """ + + class PipelineAborted(OperationalError): """ Raised when a operation fails because the current pipeline is in aborted state. diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py index 0c6098cf6..489c9bb32 100644 --- a/psycopg/psycopg/generators.py +++ b/psycopg/psycopg/generators.py @@ -100,8 +100,11 @@ def _connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[PGconn]: return conn -def _cancel(cancel_conn: PGcancelConn) -> PQGenConn[None]: +def _cancel(cancel_conn: PGcancelConn, *, timeout: float = 0.0) -> PQGenConn[None]: + deadline = monotonic() + timeout if timeout else 0.0 while True: + if deadline and monotonic() > deadline: + raise e.CancellationTimeout("cancellation timeout expired") status = cancel_conn.poll() if status == POLL_OK: break diff --git a/psycopg_c/psycopg_c/_psycopg.pyi b/psycopg_c/psycopg_c/_psycopg.pyi index 388150191..b65300109 100644 --- a/psycopg_c/psycopg_c/_psycopg.pyi +++ b/psycopg_c/psycopg_c/_psycopg.pyi @@ -52,7 +52,9 @@ class Transformer(abc.AdaptContext): # Generators def connect(conninfo: str, *, timeout: float = 0.0) -> abc.PQGenConn[PGconn]: ... -def cancel(cancel_conn: PGcancelConn) -> abc.PQGenConn[None]: ... +def cancel( + cancel_conn: PGcancelConn, *, timeout: float = 0.0 +) -> abc.PQGenConn[None]: ... def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ... def send(pgconn: PGconn) -> abc.PQGen[None]: ... def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ... diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index 7198fddf1..df06b1d1c 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -81,10 +81,17 @@ def connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[abc.PGconn]: return conn -def cancel(pq.PGcancelConn cancel_conn) -> PQGenConn[None]: +def cancel(pq.PGcancelConn cancel_conn, *, timeout: float = 0.0) -> PQGenConn[None]: cdef libpq.PGcancelConn *pgcancelconn_ptr = cancel_conn.pgcancelconn_ptr cdef int status + cdef float deadline = 0.0 + + if timeout: + deadline = monotonic() + timeout + while True: + if deadline and monotonic() > deadline: + raise e.CancellationTimeout("cancellation timeout expired") with nogil: status = libpq.PQcancelPoll(pgcancelconn_ptr) if status == libpq.PGRES_POLLING_OK: diff --git a/tests/test_connection.py b/tests/test_connection.py index b8ac939dd..0f2f768be 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -825,6 +825,21 @@ def test_cancel_safe_closed(conn): conn.cancel_safe() +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.libpq(">= 17") +def test_cancel_safe_timeout(conn_cls, proxy): + proxy.start() + with conn_cls.connect(proxy.client_dsn) as conn: + proxy.stop() + with proxy.deaf_listen(): + t0 = time.time() + with pytest.raises(e.CancellationTimeout, match="timeout expired"): + conn.cancel_safe(timeout=1) + elapsed = time.time() - t0 + assert elapsed == pytest.approx(1.0, abs=0.05) + + def test_resolve_hostaddr_conn(conn_cls, monkeypatch, fake_resolve): got = "" diff --git a/tests/test_connection_async.py b/tests/test_connection_async.py index cb97c0987..cc293babf 100644 --- a/tests/test_connection_async.py +++ b/tests/test_connection_async.py @@ -829,6 +829,21 @@ async def test_cancel_safe_closed(aconn): await aconn.cancel_safe() +@pytest.mark.slow +@pytest.mark.timing +@pytest.mark.libpq(">= 17") +async def test_cancel_safe_timeout(aconn_cls, proxy): + proxy.start() + async with await aconn_cls.connect(proxy.client_dsn) as aconn: + proxy.stop() + with proxy.deaf_listen(): + t0 = time.time() + with pytest.raises(e.CancellationTimeout, match="timeout expired"): + await aconn.cancel_safe(timeout=1) + elapsed = time.time() - t0 + assert elapsed == pytest.approx(1.0, abs=0.05) + + async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch, fake_resolve): got = "" -- 2.47.2