]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat: add libpq interface for encrypted and non-blocking cancellation
authorDenis Laxalde <denis.laxalde@dalibo.com>
Thu, 23 Mar 2023 12:51:16 +0000 (13:51 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 5 Apr 2024 18:35:16 +0000 (18:35 +0000)
Encrypted and non-blocking cancellation should be part PostgreSQL 17:
- https://commitfest.postgresql.org/37/3511/
- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=61461a300c1cb5d53955ecd792ad0ce75a104736

We here add the interface for libpq functions for this feature.

An extra test is introduced, it closely reproduces
src/test/modules/libpq_pipeline/libpq_pipeline::test_cancel() in
PostgreSQL test suite.

The error_message property of PGcancelConn is directly an str and
PGcancelConn has no encoding specified and all messages come from the
client (libpq).

14 files changed:
docs/api/pq.rst
docs/news.rst
psycopg/psycopg/errors.py
psycopg/psycopg/pq/__init__.py
psycopg/psycopg/pq/_enums.py
psycopg/psycopg/pq/_pq_ctypes.py
psycopg/psycopg/pq/_pq_ctypes.pyi
psycopg/psycopg/pq/abc.py
psycopg/psycopg/pq/pq_ctypes.py
psycopg_c/psycopg_c/pq.pxd
psycopg_c/psycopg_c/pq/libpq.pxd
psycopg_c/psycopg_c/pq/pgcancel.pyx
psycopg_c/psycopg_c/pq/pgconn.pyx
tests/pq/test_pgconn.py

index 3d9c033fc0d36a13b0a01ca29b3d9a32c3249b97..bbc34f704f66c86a5f238dc2fb75944e2b85bb9e 100644 (file)
@@ -149,7 +149,7 @@ Enumerations
     after a connection has been established. Other statuses might only be seen
     during the connection phase and are considered internal.
 
-    .. seealso:: :pq:`PQstatus()` returns this value.
+    .. seealso:: :pq:`PQstatus()` and `PQcancelStatus()` return this value.
 
 
 .. autoclass:: PollingStatus
index cdd13d064d11b7c186e1b780dcb58b0ad17a533a..59c326cd8956a52638c4a90e05a3741f31d54892 100644 (file)
@@ -24,6 +24,8 @@ Psycopg 3.2 (unreleased)
   transaction control methods available on the async connections.
 - Add support for libpq functions to close prepared statements and portals
   introduced in libpq v17 (:ticket:`#603`).
+- Add support for libpq encrypted and non-blocking query cancellation
+  functions introduced in libpq v17 (:ticket:`#754`).
 - The `!context` parameter of `sql` objects `~sql.Composable.as_string()` and
   `~sql.Composable.as_bytes()` methods is now optional (:ticket:`#716`).
 - Disable receiving more than one result on the same cursor in pipeline mode,
index af57a95f5ec3861bdd4699a0aa8628958f1420a6..6e9334a8c99e3bdcbc069628ceb089b485cf8d9d 100644 (file)
@@ -171,6 +171,9 @@ class FinishedPGconn:
     def set_single_row_mode(self) -> NoReturn:
         self._raise()
 
+    def cancel_conn(self) -> NoReturn:
+        self._raise()
+
     def get_cancel(self) -> NoReturn:
         self._raise()
 
index 0048ebbf1a451cfc9779225d12201b553549783d..4d819ff947b7501f577b1dc07e9c00040e039750 100644 (file)
@@ -43,6 +43,7 @@ PGresult: Type[abc.PGresult]
 Conninfo: Type[abc.Conninfo]
 Escaping: Type[abc.Escaping]
 PGcancel: Type[abc.PGcancel]
+PGcancelConn: Type[abc.PGcancelConn]
 
 
 def import_from_libpq() -> None:
@@ -54,7 +55,7 @@ def import_from_libpq() -> None:
     """
     # import these names into the module on success as side effect
     global __impl__, version, __build_version__
-    global PGconn, PGresult, Conninfo, Escaping, PGcancel
+    global PGconn, PGresult, Conninfo, Escaping, PGcancel, PGcancelConn
 
     impl = os.environ.get("PSYCOPG_IMPL", "").lower()
     module = None
@@ -98,6 +99,7 @@ def import_from_libpq() -> None:
         Conninfo = module.Conninfo
         Escaping = module.Escaping
         PGcancel = module.PGcancel
+        PGcancelConn = module.PGcancelConn
         __build_version__ = module.__build_version__
     elif impl:
         raise ImportError(f"requested psycopg implementation '{impl}' unknown")
index e0d4018c3f9522111989ca398967e9a298631530..1f6fcd48c1021da20852d19f5f0f1fb179827cae 100644 (file)
@@ -31,6 +31,7 @@ class ConnStatus(IntEnum):
     GSS_STARTUP = auto()
     CHECK_TARGET = auto()
     CHECK_STANDBY = auto()
+    ALLOCATED = auto()  # Only for cancel connections.
 
 
 class PollingStatus(IntEnum):
index 2c299772019b3b81828777c4045e94716b633b85..1f7f482a0f0d9b58c15f5eacbcc513013a008870 100644 (file)
@@ -83,6 +83,10 @@ class PGnotify_struct(Structure):
     ]
 
 
+class PGcancelConn_struct(Structure):
+    _fields_: List[Tuple[str, type]] = []
+
+
 class PGcancel_struct(Structure):
     _fields_: List[Tuple[str, type]] = []
 
@@ -103,6 +107,7 @@ PGconn_ptr = POINTER(PGconn_struct)
 PGresult_ptr = POINTER(PGresult_struct)
 PQconninfoOption_ptr = POINTER(PQconninfoOption_struct)
 PGnotify_ptr = POINTER(PGnotify_struct)
+PGcancelConn_ptr = POINTER(PGcancelConn_struct)
 PGcancel_ptr = POINTER(PGcancel_struct)
 PGresAttDesc_ptr = POINTER(PGresAttDesc_struct)
 
@@ -557,6 +562,55 @@ PQsetSingleRowMode.restype = c_int
 
 # 33.6. Canceling Queries in Progress
 
+if libpq_version >= 170000:
+    PQcancelCreate = pq.PQcancelCreate
+    PQcancelCreate.argtypes = [PGconn_ptr]
+    PQcancelCreate.restype = PGcancelConn_ptr
+
+    PQcancelStart = pq.PQcancelStart
+    PQcancelStart.argtypes = [PGcancelConn_ptr]
+    PQcancelStart.restype = c_int
+
+    PQcancelBlocking = pq.PQcancelBlocking
+    PQcancelBlocking.argtypes = [PGcancelConn_ptr]
+    PQcancelBlocking.restype = c_int
+
+    PQcancelPoll = pq.PQcancelPoll
+    PQcancelPoll.argtypes = [PGcancelConn_ptr]
+    PQcancelPoll.restype = c_int
+
+    PQcancelStatus = pq.PQcancelStatus
+    PQcancelStatus.argtypes = [PGcancelConn_ptr]
+    PQcancelStatus.restype = c_int
+
+    PQcancelSocket = pq.PQcancelSocket
+    PQcancelSocket.argtypes = [PGcancelConn_ptr]
+    PQcancelSocket.restype = c_int
+
+    PQcancelErrorMessage = pq.PQcancelErrorMessage
+    PQcancelErrorMessage.argtypes = [PGcancelConn_ptr]
+    PQcancelErrorMessage.restype = c_char_p
+
+    PQcancelReset = pq.PQcancelReset
+    PQcancelReset.argtypes = [PGcancelConn_ptr]
+    PQcancelReset.restype = None
+
+    PQcancelFinish = pq.PQcancelFinish
+    PQcancelFinish.argtypes = [PGcancelConn_ptr]
+    PQcancelFinish.restype = None
+
+else:
+    PQcancelCreate = not_supported_before("PQcancelCreate", 17)
+    PQcancelStart = not_supported_before("PQcancelStart", 17)
+    PQcancelBlocking = not_supported_before("PQcancelBlocking", 17)
+    PQcancelPoll = not_supported_before("PQcancelPoll", 17)
+    PQcancelStatus = not_supported_before("PQcancelStatus", 17)
+    PQcancelSocket = not_supported_before("PQcancelSocket", 17)
+    PQcancelErrorMessage = not_supported_before("PQcancelErrorMessage", 17)
+    PQcancelReset = not_supported_before("PQcancelReset", 17)
+    PQcancelFinish = not_supported_before("PQcancelFinish", 17)
+
+
 PQgetCancel = pq.PQgetCancel
 PQgetCancel.argtypes = [PGconn_ptr]
 PQgetCancel.restype = PGcancel_ptr
@@ -703,6 +757,7 @@ def generate_stub() -> None:
         elif t.__name__ in (
             "LP_PGconn_struct",
             "LP_PGresult_struct",
+            "LP_PGcancelConn_struct",
             "LP_PGcancel_struct",
         ):
             if narg is not None:
index 4a03767b5f5c80f3fb09c9f79ec378cf1b3976be..ca23070bd4eb930db19807304f97d3d595a1096a 100644 (file)
@@ -16,6 +16,7 @@ Oid = c_uint
 
 class PGconn_struct: ...
 class PGresult_struct: ...
+class PGcancelConn_struct: ...
 class PGcancel_struct: ...
 
 class PQconninfoOption_struct:
@@ -88,6 +89,15 @@ def PQsendQueryPrepared(
     arg6: Optional[Array[c_int]],
     arg7: int,
 ) -> int: ...
+def PQcancelCreate(arg1: Optional[PGconn_struct]) -> PGcancelConn_struct: ...
+def PQcancelBlocking(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelStart(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelPoll(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelStatus(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelSocket(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelErrorMessage(arg1: Optional[PGcancelConn_struct]) -> bytes: ...
+def PQcancelReset(arg1: Optional[PGcancelConn_struct]) -> None: ...
+def PQcancelFinish(arg1: Optional[PGcancelConn_struct]) -> None: ...
 def PQcancel(arg1: Optional[PGcancel_struct], arg2: c_char_p, arg3: int) -> int: ...
 def PQsetNoticeReceiver(
     arg1: PGconn_struct, arg2: Callable[[Any], PGresult_struct], arg3: Any
index 13a077211204a396eb144eb11ad3a00bd05b5f86..75733bdebccd9203005aa2446f1d3ef6ff5878df 100644 (file)
@@ -180,6 +180,8 @@ class PGconn(Protocol):
 
     def set_single_row_mode(self) -> None: ...
 
+    def cancel_conn(self) -> "PGcancelConn": ...
+
     def get_cancel(self) -> "PGcancel": ...
 
     def notifies(self) -> Optional["PGnotify"]: ...
@@ -267,6 +269,27 @@ class PGresult(Protocol):
     def set_attributes(self, descriptions: List["PGresAttDesc"]) -> None: ...
 
 
+class PGcancelConn(Protocol):
+    def start(self) -> None: ...
+
+    def blocking(self) -> None: ...
+
+    def poll(self) -> int: ...
+
+    @property
+    def status(self) -> int: ...
+
+    @property
+    def socket(self) -> int: ...
+
+    @property
+    def error_message(self) -> str: ...
+
+    def reset(self) -> None: ...
+
+    def finish(self) -> None: ...
+
+
 class PGcancel(Protocol):
     def free(self) -> None: ...
 
index f04a803679fcabbd6ba8395b0c1a4c974533a640..a2fabbbab21712c6df0908e719d5d8b3431f64ed 100644 (file)
@@ -25,7 +25,7 @@ from .misc import error_message, connection_summary
 from ._enums import Format, ExecStatus, Trace
 
 # Imported locally to call them from __del__ methods
-from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQstatus
+from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQcancelFinish, PQstatus
 
 if TYPE_CHECKING:
     from . import abc
@@ -592,6 +592,17 @@ class PGconn:
         if not impl.PQsetSingleRowMode(self._pgconn_ptr):
             raise e.OperationalError("setting single row mode failed")
 
+    def cancel_conn(self) -> "PGcancelConn":
+        """
+        Create a connection over which a cancel request can be sent.
+
+        See :pq:`PQcancelCreate` for details.
+        """
+        rv = impl.PQcancelCreate(self._pgconn_ptr)
+        if not rv:
+            raise e.OperationalError("couldn't create cancelConn object")
+        return PGcancelConn(rv)
+
     def get_cancel(self) -> "PGcancel":
         """
         Create an object with the information needed to cancel a command.
@@ -895,6 +906,77 @@ class PGresult:
             raise e.OperationalError("PQsetResultAttrs failed")
 
 
+class PGcancelConn:
+    """
+    Token to handle non-blocking cancellation requests.
+
+    Created by `PGconn.cancel_conn()`.
+    """
+
+    __slots__ = ("pgcancelconn_ptr",)
+
+    def __init__(self, pgcancelconn_ptr: impl.PGcancelConn_struct):
+        self.pgcancelconn_ptr: Optional[impl.PGcancelConn_struct] = pgcancelconn_ptr
+
+    def __del__(self) -> None:
+        self.finish()
+
+    def start(self) -> None:
+        """Requests that the server abandons processing of the current command
+        in a non-blocking manner.
+
+        See :pq:`PQcancelStart` for details.
+        """
+        if not impl.PQcancelStart(self.pgcancelconn_ptr):
+            raise e.OperationalError(
+                f"couldn't start cancellation: {self.error_message}"
+            )
+
+    def blocking(self) -> None:
+        """Requests that the server abandons processing of the current command
+        in a blocking manner.
+
+        See :pq:`PQcancelBlocking` for details.
+        """
+        if not impl.PQcancelBlocking(self.pgcancelconn_ptr):
+            raise e.OperationalError(
+                f"couldn't start cancellation: {self.error_message}"
+            )
+
+    def poll(self) -> int:
+        return impl.PQcancelPoll(self.pgcancelconn_ptr)
+
+    @property
+    def status(self) -> int:
+        return impl.PQcancelStatus(self.pgcancelconn_ptr)
+
+    @property
+    def socket(self) -> int:
+        rv = impl.PQcancelSocket(self.pgcancelconn_ptr)
+        if rv == -1:
+            raise e.OperationalError("cancel connection not opened")
+        return rv
+
+    @property
+    def error_message(self) -> str:
+        return impl.PQcancelErrorMessage(self.pgcancelconn_ptr).decode()
+
+    def reset(self) -> None:
+        impl.PQcancelReset(self.pgcancelconn_ptr)
+
+    def finish(self) -> None:
+        """
+        Free the data structure created by `PQcancelCreate()`.
+
+        Automatically invoked by `!__del__()`.
+
+        See :pq:`PQcancelFinish()` for details.
+        """
+        self.pgcancelconn_ptr, p = None, self.pgcancelconn_ptr
+        if p:
+            PQcancelFinish(p)
+
+
 class PGcancel:
     """
     Token to cancel the current operation on a connection.
index 57825dd3eb08c4da6a681ac0b2688f0beabe6701..8a2dbd6343c11db141bbc0efa58c28a2ecb29fc5 100644 (file)
@@ -38,6 +38,13 @@ cdef class PGresult:
     cdef PGresult _from_ptr(libpq.PGresult *ptr)
 
 
+cdef class PGcancelConn:
+    cdef libpq.PGcancelConn* pgcancelconn_ptr
+
+    @staticmethod
+    cdef PGcancelConn _from_ptr(libpq.PGcancelConn *ptr)
+
+
 cdef class PGcancel:
     cdef libpq.PGcancel* pgcancel_ptr
 
index 5972ae3030acaaa302866bc7345d392e826d7df5..04c3ca533724acb4130e8cb05e5ac78318b31427 100644 (file)
@@ -40,6 +40,9 @@ cdef extern from "libpq-fe.h":
         int     be_pid
         char   *extra
 
+    ctypedef struct PGcancelConn:
+        pass
+
     ctypedef struct PGcancel:
         pass
 
@@ -83,6 +86,7 @@ cdef extern from "libpq-fe.h":
         CONNECTION_GSS_STARTUP
         CONNECTION_CHECK_TARGET
         CONNECTION_CHECK_STANDBY
+        CONNECTION_ALLOCATED
 
     ctypedef enum PGTransactionStatusType:
         PQTRANS_IDLE
@@ -250,7 +254,16 @@ cdef extern from "libpq-fe.h":
     # 33.5. Retrieving Query Results Row-by-Row
     int PQsetSingleRowMode(PGconn *conn)
 
-    # 33.6. Canceling Queries in Progress
+    # 34.7. Canceling Queries in Progress
+    PGcancelConn *PQcancelCreate(PGconn *conn)
+    int PQcancelStart(PGcancelConn *cancelConn)
+    int PQcancelBlocking(PGcancelConn *cancelConn)
+    PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn) nogil
+    ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
+    int PQcancelSocket(PGcancelConn *cancelConn)
+    char *PQcancelErrorMessage(const PGcancelConn *cancelConn)
+    void PQcancelReset(PGcancelConn *cancelConn)
+    void PQcancelFinish(PGcancelConn *cancelConn)
     PGcancel *PQgetCancel(PGconn *conn)
     void PQfreeCancel(PGcancel *cancel)
     int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
@@ -326,9 +339,19 @@ typedef enum {
 #endif
 
 #if PG_VERSION_NUM < 170000
+typedef struct pg_cancel_conn PGcancelConn;
 #define PQclosePrepared(conn, name) NULL
 #define PQclosePortal(conn, name) NULL
 #define PQsendClosePrepared(conn, name) 0
 #define PQsendClosePortal(conn, name) 0
+#define PQcancelCreate(conn) NULL
+#define PQcancelStart(cancelConn) 0
+#define PQcancelBlocking(cancelConn) 0
+#define PQcancelPoll(cancelConn) CONNECTION_OK
+#define PQcancelStatus(cancelConn) 0
+#define PQcancelSocket(cancelConn) -1
+#define PQcancelErrorMessage(cancelConn) NULL
+#define PQcancelReset(cancelConn) 0
+#define PQcancelFinish(cancelConn) 0
 #endif
 """
index b7cbb70ef1802cda2d684baf087264ff7b18629e..2922fd765827aa29aea735380090d9ea68ce7fd3 100644 (file)
@@ -5,6 +5,68 @@ psycopg_c.pq.PGcancel object implementation.
 # Copyright (C) 2020 The Psycopg Team
 
 
+cdef class PGcancelConn:
+    def __cinit__(self):
+        self.pgcancelconn_ptr = NULL
+
+    @staticmethod
+    cdef PGcancelConn _from_ptr(libpq.PGcancelConn *ptr):
+        cdef PGcancelConn rv = PGcancelConn.__new__(PGcancelConn)
+        rv.pgcancelconn_ptr = ptr
+        return rv
+
+    def __dealloc__(self) -> None:
+        self.finish()
+
+    def start(self) -> None:
+        """Requests that the server abandons processing of the current command
+        in a non-blocking manner.
+
+        See :pq:`PQcancelStart` for details.
+        """
+        if not libpq.PQcancelStart(self.pgcancelconn_ptr):
+            raise e.OperationalError(
+                f"couldn't send cancellation: {self.error_message}"
+            )
+
+    def blocking(self) -> None:
+        """Requests that the server abandons processing of the current command
+        in a blocking manner.
+
+        See :pq:`PQcancelBlocking` for details.
+        """
+        if not libpq.PQcancelBlocking(self.pgcancelconn_ptr):
+            raise e.OperationalError(
+                f"couldn't send cancellation: {self.error_message}"
+            )
+
+    def poll(self) -> int:
+        return libpq.PQcancelPoll(self.pgcancelconn_ptr)
+
+    @property
+    def status(self) -> int:
+        return libpq.PQcancelStatus(self.pgcancelconn_ptr)
+
+    @property
+    def socket(self) -> int:
+        rv = libpq.PQcancelSocket(self.pgcancelconn_ptr)
+        if rv == -1:
+            raise e.OperationalError("cancel connection not opened")
+        return rv
+
+    @property
+    def error_message(self) -> str:
+        return libpq.PQcancelErrorMessage(self.pgcancelconn_ptr).decode()
+
+    def reset(self) -> None:
+        libpq.PQcancelReset(self.pgcancelconn_ptr)
+
+    def finish(self) -> None:
+        if self.pgcancelconn_ptr is not NULL:
+            libpq.PQcancelFinish(self.pgcancelconn_ptr)
+            self.pgcancelconn_ptr = NULL
+
+
 cdef class PGcancel:
     def __cinit__(self):
         self.pgcancel_ptr = NULL
index 540ec917b6a1005b313b9378f94381cb7b021264..798c68d0d7c9e719bf325d36e33d965e58b609d0 100644 (file)
@@ -498,6 +498,13 @@ cdef class PGconn:
         if not libpq.PQsetSingleRowMode(self._pgconn_ptr):
             raise e.OperationalError("setting single row mode failed")
 
+    def cancel_conn(self) -> PGcancelConn:
+        _check_supported("PQcancelCreate", 170000)
+        cdef libpq.PGcancelConn *ptr = libpq.PQcancelCreate(self._pgconn_ptr)
+        if not ptr:
+            raise e.OperationalError("couldn't create cancelConn object")
+        return PGcancelConn._from_ptr(ptr)
+
     def get_cancel(self) -> PGcancel:
         cdef libpq.PGcancel *ptr = libpq.PQgetCancel(self._pgconn_ptr)
         if not ptr:
index 7064a5f59f827f98ff447babf91bd5c34897b216..50877929fa0a7f478e73d6ab6e18d43b63ec020f 100644 (file)
@@ -1,22 +1,26 @@
 from __future__ import annotations
 
+import contextlib
 import os
 import sys
+import time
 import ctypes
 import logging
 import weakref
+from functools import partial
 from select import select
+from typing import Iterator
 
 import pytest
 
 import psycopg
 from psycopg import pq
-from psycopg.pq.abc import PGconn
+from psycopg.pq.abc import PGcancelConn, PGconn
 import psycopg.generators
 
 
 def wait(
-    conn: PGconn,
+    conn: PGconn | PGcancelConn,
     poll_method: str = "connect_poll",
     return_on: pq.PollingStatus = pq.PollingStatus.OK,
     timeout: int | None = None,
@@ -381,6 +385,87 @@ def test_set_single_row_mode(pgconn):
     pgconn.set_single_row_mode()
 
 
+@contextlib.contextmanager
+def cancellable_query(pgconn: PGconn, monitor_conn: PGconn) -> Iterator[None]:
+    pgconn.send_query_params(b"SELECT pg_sleep($1)", [b"180"])
+    while True:
+        r = monitor_conn.exec_(
+            b"SELECT count(*) FROM pg_stat_activity"
+            b" WHERE query = 'SELECT pg_sleep($1)'"
+            b" AND state = 'active'"
+        )
+        assert r.status == pq.ExecStatus.TUPLES_OK
+        if r.get_value(0, 0) != b"0":
+            break
+
+        time.sleep(0.01)
+
+    yield None
+
+    res = pgconn.get_result()
+    assert res is not None
+    assert res.status == pq.ExecStatus.FATAL_ERROR
+    assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"57014"
+    while pgconn.is_busy():
+        pgconn.consume_input()
+
+
+@pytest.mark.libpq(">= 17")
+def test_cancel_nonblocking(dsn):
+    # mimic test_cancel() from src/test/modules/libpq_pipeline/libpq_pipeline.c
+    def connect() -> PGconn:
+        conn = pq.PGconn.connect(dsn.encode())
+        if conn.status != pq.ConnStatus.OK:
+            pytest.fail(
+                f"bad connection: {conn.error_message.decode('utf8', 'replace')}"
+            )
+        return conn
+
+    conn, monitor_conn = connect(), connect()
+    conn.nonblocking = 1
+
+    # test PQcancel
+    with cancellable_query(conn, monitor_conn):
+        cancel = conn.get_cancel()
+        cancel.cancel()
+
+    # PGcancel object can be reused for the next query
+    with cancellable_query(conn, monitor_conn):
+        cancel.cancel()
+
+    del cancel
+
+    # test PQcancelBlocking
+    with cancellable_query(conn, monitor_conn):
+        cancel_conn = conn.cancel_conn()
+        assert cancel_conn.status == pq.ConnStatus.ALLOCATED
+        cancel_conn.blocking()
+        assert cancel_conn.status == pq.ConnStatus.OK  # type: ignore[comparison-overlap] # noqa: E501
+    cancel_conn.finish()
+    del cancel_conn
+
+    wait_cancel = partial(wait, poll_method="poll", timeout=3)
+
+    # test PQcancelCreate and then polling with PQcancelPoll
+    with cancellable_query(conn, monitor_conn):
+        cancel_conn = conn.cancel_conn()
+        assert cancel_conn.status == pq.ConnStatus.ALLOCATED
+        cancel_conn.start()
+        assert cancel_conn.status == pq.ConnStatus.STARTED
+        wait_cancel(cancel_conn)
+        assert cancel_conn.status == pq.ConnStatus.OK
+
+    # test PQcancelReset works on the cancel connection and it can be reused
+    # after
+    cancel_conn.reset()
+    with cancellable_query(conn, monitor_conn):
+        cancel_conn.start()
+        wait_cancel(cancel_conn)
+        assert cancel_conn.status == pq.ConnStatus.OK
+
+    cancel_conn.finish()
+
+
 def test_cancel(pgconn):
     cancel = pgconn.get_cancel()
     cancel.cancel()