def cancel(self) -> None:
"""Cancel the current operation on the connection."""
- if not self._should_cancel():
- return
-
- # Don't fail cancelling (which might happen on connection closing) to
- # avoid clobbering eventual exceptions with ours, which is less important.
- try:
+ if self._should_cancel():
c = self.pgconn.get_cancel()
c.cancel()
- except Exception as ex:
- logger.warning("couldn't try to cancel query: %s", ex)
def _should_cancel(self) -> bool:
"""Check whether the current command should actually be cancelled when
# (which might or might not have been already transferred entirely to
# the client, so we won't necessary see the exception associated with
# canceling).
- self.connection.cancel_safe()
+ self.connection._try_cancel(timeout=5.0)
self.connection.wait(self._end_copy_out_gen())
# (which might or might not have been already transferred entirely to
# the client, so we won't necessary see the exception associated with
# canceling).
- await self.connection.cancel_safe()
+ await self.connection._try_cancel(timeout=5.0)
await self.connection.wait(self._end_copy_out_gen())
return
if capabilities.has_cancel_safe():
- try:
- waiting.wait_conn(
- self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
- )
- except Exception as ex:
- logger.warning("couldn't try to cancel query: %s", ex)
+ waiting.wait_conn(
+ self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
+ )
else:
self.cancel()
+ def _try_cancel(self, *, timeout: float = 30.0) -> None:
+ try:
+ self.cancel_safe(timeout=timeout)
+ except Exception as ex:
+ logger.warning("query cancellation failed: %s", ex)
+
@contextmanager
def transaction(
self, savepoint_name: Optional[str] = None, force_rollback: bool = False
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
- self.cancel_safe()
+ self._try_cancel(timeout=5.0)
try:
waiting.wait(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
return
if capabilities.has_cancel_safe():
- try:
- await waiting.wait_conn_async(
- self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
- )
- except Exception as ex:
- logger.warning("couldn't try to cancel query: %s", ex)
-
+ await waiting.wait_conn_async(
+ self._cancel_gen(timeout=timeout), interval=_WAIT_INTERVAL
+ )
else:
if True: # ASYNC
await to_thread(self.cancel)
else:
self.cancel()
+ async def _try_cancel(self, *, timeout: float = 30.0) -> None:
+ try:
+ await self.cancel_safe(timeout=timeout)
+ except Exception as ex:
+ logger.warning("query cancellation failed: %s", ex)
+
@asynccontextmanager
async def transaction(
self, savepoint_name: Optional[str] = None, force_rollback: bool = False
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
- await self.cancel_safe()
+ await self._try_cancel(timeout=5.0)
try:
await waiting.wait_async(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
if self._pgconn.transaction_status == ACTIVE:
# Try to cancel the query, then consume the results
# already received.
- self._conn.cancel_safe()
+ self._conn._try_cancel(timeout=5.0)
try:
while self._conn.wait(self._stream_fetchone_gen(first=False)):
pass
if self._pgconn.transaction_status == ACTIVE:
# Try to cancel the query, then consume the results
# already received.
- await self._conn.cancel_safe()
+ await self._conn._try_cancel(timeout=5.0)
try:
while await self._conn.wait(
self._stream_fetchone_gen(first=False)
conn.cancel_safe()
+@pytest.mark.slow
+@pytest.mark.timing
+def test_cancel_safe_error(conn_cls, proxy, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+ proxy.start()
+ with conn_cls.connect(proxy.client_dsn) as conn:
+ proxy.stop()
+ with pytest.raises(
+ e.OperationalError, match="(Connection refused)|(connect\\(\\) failed)"
+ ) as ex:
+ conn.cancel_safe(timeout=2)
+ assert not caplog.records
+
+ # Note: testing an internal method. It's ok if this behaviour changes
+ conn._try_cancel(timeout=2.0)
+ assert len(caplog.records) == 1
+ caplog.records[0].message == str(ex.value)
+
+
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.libpq(">= 17")
await aconn.cancel_safe()
+@pytest.mark.slow
+@pytest.mark.timing
+async def test_cancel_safe_error(aconn_cls, proxy, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+ proxy.start()
+ async with await aconn_cls.connect(proxy.client_dsn) as aconn:
+ proxy.stop()
+ with pytest.raises(
+ e.OperationalError, match=r"(Connection refused)|(connect\(\) failed)"
+ ) as ex:
+ await aconn.cancel_safe(timeout=2)
+ assert not caplog.records
+
+ # Note: testing an internal method. It's ok if this behaviour changes
+ await aconn._try_cancel(timeout=2.0)
+ assert len(caplog.records) == 1
+ caplog.records[0].message == str(ex.value)
+
+
@pytest.mark.slow
@pytest.mark.timing
@pytest.mark.libpq(">= 17")