.. rubric:: Methods you can use to do something cool
.. automethod:: cancel
+ .. automethod:: cancel_safe
.. automethod:: notifies
in non-pipeline mode and not totally reliable (:ticket:`#604`).
The `Cursor` now only preserves the results set of the last
`~Cursor.execute()`, consistently with non-pipeline mode.
+- Add `~Connection.cancel_safe()` for encrypted and non-blocking cancellation
+ (:ticket:`#754`).
+- Possibly use non-blocking cancellation upon `KeyboardInterrupt`
+ (:ticket:`#754`).
.. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
def cancel(self) -> None:
"""Cancel the current operation on the connection."""
- # No-op if the connection is closed
+ if self._should_cancel():
+ self._try_cancel(self.pgconn)
+
+ def _should_cancel(self) -> bool:
+ """Check whether the current command should actually be cancelled when
+ invoking cancel*().
+ """
+ # cancel() is a no-op if the connection is closed;
# this allows to use the method as callback handler without caring
# about its life.
if self.closed:
- return
-
+ return False
if self._tpc and self._tpc[1]:
raise e.ProgrammingError(
"cancel() cannot be used with a prepared two-phase transaction"
)
-
- self._try_cancel(self.pgconn)
+ return True
@classmethod
def _try_cancel(cls, pgconn: "PGconn") -> None:
+ """Try to cancel the current command using a PGcancel object,
+ which is deprecated since libpq 17.
+ """
try:
# Can fail if the connection is closed
c = pgconn.get_cancel()
else:
c.cancel()
+ def _cancel_gen(self) -> PQGenConn[None]:
+ try:
+ cancel_conn = self.pgconn.cancel_conn()
+ except e.OperationalError as ex: # if the connection is closed
+ logger.warning("couldn't create a cancel connection: %s", ex)
+ else:
+ cancel_conn.start()
+ yield from generators.cancel(cancel_conn)
+
def add_notice_handler(self, callback: NoticeHandler) -> None:
"""
Register a callable to be invoked when a notice message is received.
with self.lock:
self.wait(self._rollback_gen())
+ def cancel_safe(self) -> None:
+ """Cancel the current operation on the connection.
+
+ This is a non-blocking version of `cancel()` which leverages a more
+ secure and improved cancellation feature of the libpq (available from
+ version 17).
+
+ In contrast with `cancel()`, it is not appropriate for use in a signal
+ handler.
+
+ :raises ~psycopg.NotSupportedError: if the underlying libpq is older
+ than version 17.
+ """
+ if self._should_cancel():
+ waiting.wait_conn(self._cancel_gen(), interval=_WAIT_INTERVAL)
+
@contextmanager
def transaction(
self, savepoint_name: Optional[str] = None, force_rollback: bool = False
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
- self._try_cancel(self.pgconn)
+ try:
+ self.cancel_safe()
+ except e.NotSupportedError:
+ self.cancel()
try:
waiting.wait(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
async with self.lock:
await self.wait(self._rollback_gen())
+ async def cancel_safe(self) -> None:
+ """Cancel the current operation on the connection.
+
+ This is a non-blocking version of `cancel()` which leverages a more
+ secure and improved cancellation feature of the libpq (available from
+ version 17).
+
+ In contrast with `cancel()`, it is not appropriate for use in a signal
+ handler.
+
+ :raises ~psycopg.NotSupportedError: if the underlying libpq is older
+ than version 17.
+ """
+ if self._should_cancel():
+ await waiting.wait_conn_async(self._cancel_gen(), interval=_WAIT_INTERVAL)
+
@asynccontextmanager
async def transaction(
self, savepoint_name: Optional[str] = None, force_rollback: bool = False
if self.pgconn.transaction_status == ACTIVE:
# On Ctrl-C, try to cancel the query in the server, otherwise
# the connection will remain stuck in ACTIVE state.
- self._try_cancel(self.pgconn)
+ try:
+ await self.cancel_safe()
+ except e.NotSupportedError:
+ self.cancel()
try:
await waiting.wait_async(gen, self.pgconn.socket, interval=interval)
except e.QueryCanceled:
if self._pgconn.transaction_status == ACTIVE:
# Try to cancel the query, then consume the results
# already received.
- self._conn.cancel()
+ try:
+ self._conn.cancel_safe()
+ except e.NotSupportedError:
+ self._conn.cancel()
try:
while self._conn.wait(self._stream_fetchone_gen(first=False)):
pass
if self._pgconn.transaction_status == ACTIVE:
# Try to cancel the query, then consume the results
# already received.
- self._conn.cancel()
+ try:
+ await self._conn.cancel_safe()
+ except e.NotSupportedError:
+ self._conn.cancel()
try:
while await self._conn.wait(
self._stream_fetchone_gen(first=False)
async def canceller(aconn, errors):
try:
await asyncio.sleep(0.5)
- aconn.cancel()
+ try:
+ await aconn.cancel_safe()
+ except e.NotSupportedError:
+ aconn.cancel()
except Exception as exc:
errors.append(exc)
conn.cancel()
+def test_cancel_safe_closed(conn):
+ conn.close()
+ conn.cancel_safe()
+
+
def test_resolve_hostaddr_conn(conn_cls, monkeypatch, fake_resolve):
got = ""
aconn.cancel()
+async def test_cancel_safe_closed(aconn):
+ await aconn.close()
+ await aconn.cancel_safe()
+
+
async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch, fake_resolve):
got = ""
conn.tpc_prepare()
with pytest.raises(psycopg.ProgrammingError):
conn.cancel()
+ with pytest.raises(psycopg.ProgrammingError):
+ conn.cancel_safe()
def test_tpc_recover_non_dbapi_connection(self, conn_cls, conn, dsn, tpc):
conn.row_factory = psycopg.rows.dict_row
await aconn.tpc_prepare()
with pytest.raises(psycopg.ProgrammingError):
aconn.cancel()
+ with pytest.raises(psycopg.ProgrammingError):
+ await aconn.cancel_safe()
async def test_tpc_recover_non_dbapi_connection(self, aconn_cls, aconn, dsn, tpc):
aconn.row_factory = psycopg.rows.dict_row