]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat: add a timeout parameter to Connection.cancel_safe()
authorDenis Laxalde <denis.laxalde@dalibo.com>
Mon, 8 Apr 2024 13:00:17 +0000 (15:00 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 17 Apr 2024 21:51:20 +0000 (23:51 +0200)
This will only work for PGcancelConn, i.e. libpq >= 17, or thanks to
added waiting logic in AsyncConnection's implementation; so we add a
note about the limitation in Connection's documentation.

docs/api/connections.rst
docs/api/errors.rst
psycopg/psycopg/_connection_base.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/errors.py
psycopg/psycopg/generators.py
psycopg_c/psycopg_c/_psycopg.pyi
psycopg_c/psycopg_c/_psycopg/generators.pyx
tests/test_connection.py
tests/test_connection_async.py

index 0f9462ce20aefea6fdfca5cff491e91c62cb825e..afa546cc2fc915ba4637c885088bebba05665969 100644 (file)
@@ -292,6 +292,11 @@ The `!Connection` class
             You can use the `~Capabilities.has_cancel_safe` capability to check
             if `!cancel_safe()` will not fall back on the legacy implementation.
 
+        .. warning::
+
+            The `timeout` parameter has no effect for libpq older than version
+            17.
+
         .. warning::
 
             This method shouldn't be used as a `~signal.signal` handler.
index 6d3b2aab5496fae3fa1eb86567952e4ce385fba6..a64cd74f322f6bd20ef035eb618e4f2d06635a1c 100644 (file)
@@ -85,6 +85,7 @@ In addition to the standard DB-API errors, Psycopg defines a few more specific
 ones.
 
 .. autoexception:: ConnectionTimeout()
+.. autoexception:: CancellationTimeout()
 .. autoexception:: PipelineAborted()
 
 
index 44a7cb4b596e0c01e65ee7ad4868d3d36891527b..aede7e95ccb978d545ab332f89f826511904b81b 100644 (file)
@@ -319,10 +319,10 @@ class BaseConnection(Generic[Row]):
             )
         return True
 
-    def _cancel_gen(self) -> PQGenConn[None]:
+    def _cancel_gen(self, *, timeout: float) -> PQGenConn[None]:
         cancel_conn = self.pgconn.cancel_conn()
         cancel_conn.start()
-        yield from generators.cancel(cancel_conn)
+        yield from generators.cancel(cancel_conn, timeout=timeout)
 
     def add_notice_handler(self, callback: NoticeHandler) -> None:
         """
index 04e7e73af77454d664c8d1d54cf49d42456e2d3d..9701770d8d800ec13d7e78de997e54007a713a4c 100644 (file)
@@ -261,7 +261,7 @@ class Connection(BaseConnection[Row]):
         with self.lock:
             self.wait(self._rollback_gen())
 
-    def cancel_safe(self) -> None:
+    def cancel_safe(self, *, timeout: float = 30.0) -> None:
         """Cancel the current operation on the connection.
 
         This is a non-blocking version of `~Connection.cancel()` which
@@ -270,6 +270,9 @@ class Connection(BaseConnection[Row]):
 
         If the underlying libpq is older than version 17, the method will fall
         back to using the same implementation of `!cancel()`.
+
+        :raises ~psycopg.errors.CancellationTimeout: If the cancellation did
+            not terminate within specified timeout.
         """
         if not self._should_cancel():
             return
@@ -277,7 +280,9 @@ class Connection(BaseConnection[Row]):
         # TODO: replace with capabilities.has_safe_cancel after merging #782
         if pq.__build_version__ >= 170000:
             try:
-                waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL)
+                waiting.wait_conn(
+                    self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
+                )
             except Exception as ex:
                 logger.warning("couldn't try to cancel query: %s", ex)
         else:
index faf3c512bef56493cbf9aabd8c61e20fdfa93625..beb4c8714dfc1db8b46d3bfb2e8ad166d7b3e5ad 100644 (file)
@@ -278,7 +278,7 @@ class AsyncConnection(BaseConnection[Row]):
         async with self.lock:
             await self.wait(self._rollback_gen())
 
-    async def cancel_safe(self) -> None:
+    async def cancel_safe(self, *, timeout: float = 30.0) -> None:
         """Cancel the current operation on the connection.
 
         This is a non-blocking version of `~Connection.cancel()` which
@@ -287,6 +287,9 @@ class AsyncConnection(BaseConnection[Row]):
 
         If the underlying libpq is older than version 17, the method will fall
         back to using the same implementation of `!cancel()`.
+
+        :raises ~psycopg.errors.CancellationTimeout: If the cancellation did
+            not terminate within specified timeout.
         """
         if not self._should_cancel():
             return
@@ -295,7 +298,7 @@ class AsyncConnection(BaseConnection[Row]):
         if pq.__build_version__ >= 170000:
             try:
                 await waiting.wait_conn_async(
-                    self._cancel_gen(), interval=_WAIT_INTERVAL
+                    self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
                 )
             except Exception as ex:
                 logger.warning("couldn't try to cancel query: %s", ex)
index 15afa06033da8aca31f17786627c3ca09f1574aa..e02fe8556288005be799caf87057a5829f9df5b4 100644 (file)
@@ -401,6 +401,15 @@ class ConnectionTimeout(OperationalError):
     """
 
 
+class CancellationTimeout(OperationalError):
+    """
+    Exception raised on timeout of connection's
+    `~psycopg.Connection.cancel_safe()` method.
+
+    Subclass of `~psycopg.OperationalError`.
+    """
+
+
 class PipelineAborted(OperationalError):
     """
     Raised when a operation fails because the current pipeline is in aborted state.
index 0c6098cf63ad12517136000b2760ce5ca7604199..489c9bb3216b93e50a0e284c4259ca5f91f075c8 100644 (file)
@@ -100,8 +100,11 @@ def _connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[PGconn]:
     return conn
 
 
-def _cancel(cancel_conn: PGcancelConn) -> PQGenConn[None]:
+def _cancel(cancel_conn: PGcancelConn, *, timeout: float = 0.0) -> PQGenConn[None]:
+    deadline = monotonic() + timeout if timeout else 0.0
     while True:
+        if deadline and monotonic() > deadline:
+            raise e.CancellationTimeout("cancellation timeout expired")
         status = cancel_conn.poll()
         if status == POLL_OK:
             break
index 3881501918e4f459ba4988639e8a7c10785dae45..b653001096d7616bba6303e2a417305f451cb1f8 100644 (file)
@@ -52,7 +52,9 @@ class Transformer(abc.AdaptContext):
 
 # Generators
 def connect(conninfo: str, *, timeout: float = 0.0) -> abc.PQGenConn[PGconn]: ...
-def cancel(cancel_conn: PGcancelConn) -> abc.PQGenConn[None]: ...
+def cancel(
+    cancel_conn: PGcancelConn, *, timeout: float = 0.0
+) -> abc.PQGenConn[None]: ...
 def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
 def send(pgconn: PGconn) -> abc.PQGen[None]: ...
 def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
index 7198fddf152bbc199ca9b91d8ae5ce6e403f393b..df06b1d1cbe23702700e3b5f822513588cc696a4 100644 (file)
@@ -81,10 +81,17 @@ def connect(conninfo: str, *, timeout: float = 0.0) -> PQGenConn[abc.PGconn]:
     return conn
 
 
-def cancel(pq.PGcancelConn cancel_conn) -> PQGenConn[None]:
+def cancel(pq.PGcancelConn cancel_conn, *, timeout: float = 0.0) -> PQGenConn[None]:
     cdef libpq.PGcancelConn *pgcancelconn_ptr = cancel_conn.pgcancelconn_ptr
     cdef int status
+    cdef float deadline = 0.0
+
+    if timeout:
+        deadline = monotonic() + timeout
+
     while True:
+        if deadline and monotonic() > deadline:
+            raise e.CancellationTimeout("cancellation timeout expired")
         with nogil:
             status = libpq.PQcancelPoll(pgcancelconn_ptr)
         if status == libpq.PGRES_POLLING_OK:
index b8ac939dded897bef8f962984d196f1bc5185cae..0f2f768be252634e3ff803cb3c12f35f5be76e17 100644 (file)
@@ -825,6 +825,21 @@ def test_cancel_safe_closed(conn):
     conn.cancel_safe()
 
 
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.libpq(">= 17")
+def test_cancel_safe_timeout(conn_cls, proxy):
+    proxy.start()
+    with conn_cls.connect(proxy.client_dsn) as conn:
+        proxy.stop()
+        with proxy.deaf_listen():
+            t0 = time.time()
+            with pytest.raises(e.CancellationTimeout, match="timeout expired"):
+                conn.cancel_safe(timeout=1)
+    elapsed = time.time() - t0
+    assert elapsed == pytest.approx(1.0, abs=0.05)
+
+
 def test_resolve_hostaddr_conn(conn_cls, monkeypatch, fake_resolve):
     got = ""
 
index cb97c09874236d5de86a0fdc96fda9bea3758e80..cc293babfed2b37bec835eb654268e90eeb650fa 100644 (file)
@@ -829,6 +829,21 @@ async def test_cancel_safe_closed(aconn):
     await aconn.cancel_safe()
 
 
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.libpq(">= 17")
+async def test_cancel_safe_timeout(aconn_cls, proxy):
+    proxy.start()
+    async with await aconn_cls.connect(proxy.client_dsn) as aconn:
+        proxy.stop()
+        with proxy.deaf_listen():
+            t0 = time.time()
+            with pytest.raises(e.CancellationTimeout, match="timeout expired"):
+                await aconn.cancel_safe(timeout=1)
+    elapsed = time.time() - t0
+    assert elapsed == pytest.approx(1.0, abs=0.05)
+
+
 async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch, fake_resolve):
     got = ""