You can use the `~Capabilities.has_cancel_safe` capability to check
whether `!cancel_safe()` will have to fall back on the legacy
implementation (see the example below).
+ .. warning::
+
+ The `timeout` parameter has no effect for libpq older than version
+ 17.
+
.. warning::
This method shouldn't be used as a `~signal.signal` handler.
ones.
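
As an illustration only, here is a minimal sketch of a bounded cancellation.
The connection string and the 5 second budget are placeholders, the query is
assumed to be running in another thread, and the capability check assumes the
`~Capabilities.has_cancel_safe` method mentioned above is available as the
module-level `!psycopg.capabilities` object:

.. code:: python

    import psycopg

    conn = psycopg.connect("dbname=test")
    # ... a query is running on `conn` in another thread ...

    if psycopg.capabilities.has_cancel_safe():
        try:
            # Give the cancel request up to 5 seconds to complete.
            conn.cancel_safe(timeout=5.0)
        except psycopg.errors.CancellationTimeout as ex:
            print(f"cancellation still pending: {ex}")
    else:
        # libpq < 17: cancel_safe() falls back on the cancel()
        # implementation and the timeout has no effect.
        conn.cancel_safe()
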
.. autoexception:: ConnectionTimeout()
+.. autoexception:: CancellationTimeout()
.. autoexception:: PipelineAborted()
)
return True
- def _cancel_gen(self) -> PQGenConn[None]:
+ def _cancel_gen(self, *, timeout: float) -> PQGenConn[None]:
cancel_conn = self.pgconn.cancel_conn()
cancel_conn.start()
- yield from generators.cancel(cancel_conn)
+ yield from generators.cancel(cancel_conn, timeout=timeout)
def add_notice_handler(self, callback: NoticeHandler) -> None:
"""
with self.lock:
self.wait(self._rollback_gen())
- def cancel_safe(self) -> None:
+ def cancel_safe(self, *, timeout: float = 30.0) -> None:
"""Cancel the current operation on the connection.
This is a non-blocking version of `~Connection.cancel()` which
        If the underlying libpq is older than version 17, the method will fall
        back to the same implementation used by `!cancel()`.
+
+        :raises ~psycopg.errors.CancellationTimeout: If the cancellation did
+            not terminate within the specified timeout.
"""
if not self._should_cancel():
return
        # TODO: replace with capabilities.has_cancel_safe after merging #782
if pq.__build_version__ >= 170000:
try:
- waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL)
+ waiting.wait_conn(
+ self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
+ )
+            except e.CancellationTimeout:
+                raise
            except Exception as ex:
                logger.warning("couldn't try to cancel query: %s", ex)
else:
async with self.lock:
await self.wait(self._rollback_gen())
- async def cancel_safe(self) -> None:
+ async def cancel_safe(self, *, timeout: float = 30.0) -> None:
"""Cancel the current operation on the connection.
This is a non-blocking version of `~Connection.cancel()` which
        If the underlying libpq is older than version 17, the method will fall
        back to the same implementation used by `!cancel()`.
+
+        :raises ~psycopg.errors.CancellationTimeout: If the cancellation did
+            not terminate within the specified timeout.
"""
if not self._should_cancel():
return
if pq.__build_version__ >= 170000:
try:
await waiting.wait_conn_async(
- self._cancel_gen(), interval=_WAIT_INTERVAL
+ self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
)
+            except e.CancellationTimeout:
+                raise
            except Exception as ex:
                logger.warning("couldn't try to cancel query: %s", ex)
"""
+class CancellationTimeout(OperationalError):
+ """
+    Exception raised when the connection's
+    `~psycopg.Connection.cancel_safe()` method times out.
+
+ Subclass of `~psycopg.OperationalError`.
+ """
+
+
class PipelineAborted(OperationalError):
"""
    Raised when an operation fails because the current pipeline is in an
    aborted state.
return conn
-def _cancel(cancel_conn: PGcancelConn) -> PQGenConn[None]:
+def _cancel(cancel_conn: PGcancelConn, *, timeout: float = 0.0) -> PQGenConn[None]:
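+    # Absolute deadline on the monotonic clock; a timeout of 0 disables it.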
+ deadline = monotonic() + timeout if timeout else 0.0
while True:
+ if deadline and monotonic() > deadline:
+ raise e.CancellationTimeout("cancellation timeout expired")
status = cancel_conn.poll()
if status == POLL_OK:
break
# Generators
def connect(conninfo: str, *, timeout: float = 0.0) -> abc.PQGenConn[PGconn]: ...
-def cancel(cancel_conn: PGcancelConn) -> abc.PQGenConn[None]: ...
+def cancel(
+ cancel_conn: PGcancelConn, *, timeout: float = 0.0
+) -> abc.PQGenConn[None]: ...
def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
def send(pgconn: PGconn) -> abc.PQGen[None]: ...
def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
return conn
-def cancel(pq.PGcancelConn cancel_conn) -> PQGenConn[None]:
+def cancel(pq.PGcancelConn cancel_conn, *, timeout: float = 0.0) -> PQGenConn[None]:
cdef libpq.PGcancelConn *pgcancelconn_ptr = cancel_conn.pgcancelconn_ptr
cdef int status
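+    # Absolute deadline on the monotonic clock; 0.0 means no timeout.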
+    cdef double deadline = 0.0
+
+ if timeout:
+ deadline = monotonic() + timeout
+
while True:
+ if deadline and monotonic() > deadline:
+ raise e.CancellationTimeout("cancellation timeout expired")
with nogil:
status = libpq.PQcancelPoll(pgcancelconn_ptr)
if status == libpq.PGRES_POLLING_OK:
conn.cancel_safe()
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.libpq(">= 17")
+def test_cancel_safe_timeout(conn_cls, proxy):
+ proxy.start()
+ with conn_cls.connect(proxy.client_dsn) as conn:
+ proxy.stop()
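+        # The proxy is stopped and no longer answers: the cancel request
+        # cannot complete, so cancel_safe() is expected to end with a timeout.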
+ with proxy.deaf_listen():
+ t0 = time.time()
+ with pytest.raises(e.CancellationTimeout, match="timeout expired"):
+ conn.cancel_safe(timeout=1)
+ elapsed = time.time() - t0
+ assert elapsed == pytest.approx(1.0, abs=0.05)
+
+
def test_resolve_hostaddr_conn(conn_cls, monkeypatch, fake_resolve):
got = ""
await aconn.cancel_safe()
+@pytest.mark.slow
+@pytest.mark.timing
+@pytest.mark.libpq(">= 17")
+async def test_cancel_safe_timeout(aconn_cls, proxy):
+ proxy.start()
+ async with await aconn_cls.connect(proxy.client_dsn) as aconn:
+ proxy.stop()
+ with proxy.deaf_listen():
+ t0 = time.time()
+ with pytest.raises(e.CancellationTimeout, match="timeout expired"):
+ await aconn.cancel_safe(timeout=1)
+ elapsed = time.time() - t0
+ assert elapsed == pytest.approx(1.0, abs=0.05)
+
+
async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch, fake_resolve):
got = ""