From 9bbf50317bc4ffbed8918b2556c9f091ad4362d5 Mon Sep 17 00:00:00 2001 From: Denis Laxalde Date: Thu, 23 Mar 2023 13:51:16 +0100 Subject: [PATCH] feat: add libpq interface for encrypted and non-blocking cancellation Encrypted and non-blocking cancellation should be part PostgreSQL 17: - https://commitfest.postgresql.org/37/3511/ - https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=61461a300c1cb5d53955ecd792ad0ce75a104736 We here add the interface for libpq functions for this feature. An extra test is introduced, it closely reproduces src/test/modules/libpq_pipeline/libpq_pipeline::test_cancel() in PostgreSQL test suite. The error_message property of PGcancelConn is directly an str and PGcancelConn has no encoding specified and all messages come from the client (libpq). --- docs/api/pq.rst | 2 +- docs/news.rst | 2 + psycopg/psycopg/errors.py | 3 + psycopg/psycopg/pq/__init__.py | 4 +- psycopg/psycopg/pq/_enums.py | 1 + psycopg/psycopg/pq/_pq_ctypes.py | 55 ++++++++++++++++++ psycopg/psycopg/pq/_pq_ctypes.pyi | 10 ++++ psycopg/psycopg/pq/abc.py | 23 ++++++++ psycopg/psycopg/pq/pq_ctypes.py | 84 ++++++++++++++++++++++++++- psycopg_c/psycopg_c/pq.pxd | 7 +++ psycopg_c/psycopg_c/pq/libpq.pxd | 25 +++++++- psycopg_c/psycopg_c/pq/pgcancel.pyx | 62 ++++++++++++++++++++ psycopg_c/psycopg_c/pq/pgconn.pyx | 7 +++ tests/pq/test_pgconn.py | 89 ++++++++++++++++++++++++++++- 14 files changed, 368 insertions(+), 6 deletions(-) diff --git a/docs/api/pq.rst b/docs/api/pq.rst index 3d9c033fc..bbc34f704 100644 --- a/docs/api/pq.rst +++ b/docs/api/pq.rst @@ -149,7 +149,7 @@ Enumerations after a connection has been established. Other statuses might only be seen during the connection phase and are considered internal. - .. seealso:: :pq:`PQstatus()` returns this value. + .. seealso:: :pq:`PQstatus()` and `PQcancelStatus()` return this value. .. autoclass:: PollingStatus diff --git a/docs/news.rst b/docs/news.rst index cdd13d064..59c326cd8 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -24,6 +24,8 @@ Psycopg 3.2 (unreleased) transaction control methods available on the async connections. - Add support for libpq functions to close prepared statements and portals introduced in libpq v17 (:ticket:`#603`). +- Add support for libpq encrypted and non-blocking query cancellation + functions introduced in libpq v17 (:ticket:`#754`). - The `!context` parameter of `sql` objects `~sql.Composable.as_string()` and `~sql.Composable.as_bytes()` methods is now optional (:ticket:`#716`). - Disable receiving more than one result on the same cursor in pipeline mode, diff --git a/psycopg/psycopg/errors.py b/psycopg/psycopg/errors.py index af57a95f5..6e9334a8c 100644 --- a/psycopg/psycopg/errors.py +++ b/psycopg/psycopg/errors.py @@ -171,6 +171,9 @@ class FinishedPGconn: def set_single_row_mode(self) -> NoReturn: self._raise() + def cancel_conn(self) -> NoReturn: + self._raise() + def get_cancel(self) -> NoReturn: self._raise() diff --git a/psycopg/psycopg/pq/__init__.py b/psycopg/psycopg/pq/__init__.py index 0048ebbf1..4d819ff94 100644 --- a/psycopg/psycopg/pq/__init__.py +++ b/psycopg/psycopg/pq/__init__.py @@ -43,6 +43,7 @@ PGresult: Type[abc.PGresult] Conninfo: Type[abc.Conninfo] Escaping: Type[abc.Escaping] PGcancel: Type[abc.PGcancel] +PGcancelConn: Type[abc.PGcancelConn] def import_from_libpq() -> None: @@ -54,7 +55,7 @@ def import_from_libpq() -> None: """ # import these names into the module on success as side effect global __impl__, version, __build_version__ - global PGconn, PGresult, Conninfo, Escaping, PGcancel + global PGconn, PGresult, Conninfo, Escaping, PGcancel, PGcancelConn impl = os.environ.get("PSYCOPG_IMPL", "").lower() module = None @@ -98,6 +99,7 @@ def import_from_libpq() -> None: Conninfo = module.Conninfo Escaping = module.Escaping PGcancel = module.PGcancel + PGcancelConn = module.PGcancelConn __build_version__ = module.__build_version__ elif impl: raise ImportError(f"requested psycopg implementation '{impl}' unknown") diff --git a/psycopg/psycopg/pq/_enums.py b/psycopg/psycopg/pq/_enums.py index e0d4018c3..1f6fcd48c 100644 --- a/psycopg/psycopg/pq/_enums.py +++ b/psycopg/psycopg/pq/_enums.py @@ -31,6 +31,7 @@ class ConnStatus(IntEnum): GSS_STARTUP = auto() CHECK_TARGET = auto() CHECK_STANDBY = auto() + ALLOCATED = auto() # Only for cancel connections. class PollingStatus(IntEnum): diff --git a/psycopg/psycopg/pq/_pq_ctypes.py b/psycopg/psycopg/pq/_pq_ctypes.py index 2c2997720..1f7f482a0 100644 --- a/psycopg/psycopg/pq/_pq_ctypes.py +++ b/psycopg/psycopg/pq/_pq_ctypes.py @@ -83,6 +83,10 @@ class PGnotify_struct(Structure): ] +class PGcancelConn_struct(Structure): + _fields_: List[Tuple[str, type]] = [] + + class PGcancel_struct(Structure): _fields_: List[Tuple[str, type]] = [] @@ -103,6 +107,7 @@ PGconn_ptr = POINTER(PGconn_struct) PGresult_ptr = POINTER(PGresult_struct) PQconninfoOption_ptr = POINTER(PQconninfoOption_struct) PGnotify_ptr = POINTER(PGnotify_struct) +PGcancelConn_ptr = POINTER(PGcancelConn_struct) PGcancel_ptr = POINTER(PGcancel_struct) PGresAttDesc_ptr = POINTER(PGresAttDesc_struct) @@ -557,6 +562,55 @@ PQsetSingleRowMode.restype = c_int # 33.6. Canceling Queries in Progress +if libpq_version >= 170000: + PQcancelCreate = pq.PQcancelCreate + PQcancelCreate.argtypes = [PGconn_ptr] + PQcancelCreate.restype = PGcancelConn_ptr + + PQcancelStart = pq.PQcancelStart + PQcancelStart.argtypes = [PGcancelConn_ptr] + PQcancelStart.restype = c_int + + PQcancelBlocking = pq.PQcancelBlocking + PQcancelBlocking.argtypes = [PGcancelConn_ptr] + PQcancelBlocking.restype = c_int + + PQcancelPoll = pq.PQcancelPoll + PQcancelPoll.argtypes = [PGcancelConn_ptr] + PQcancelPoll.restype = c_int + + PQcancelStatus = pq.PQcancelStatus + PQcancelStatus.argtypes = [PGcancelConn_ptr] + PQcancelStatus.restype = c_int + + PQcancelSocket = pq.PQcancelSocket + PQcancelSocket.argtypes = [PGcancelConn_ptr] + PQcancelSocket.restype = c_int + + PQcancelErrorMessage = pq.PQcancelErrorMessage + PQcancelErrorMessage.argtypes = [PGcancelConn_ptr] + PQcancelErrorMessage.restype = c_char_p + + PQcancelReset = pq.PQcancelReset + PQcancelReset.argtypes = [PGcancelConn_ptr] + PQcancelReset.restype = None + + PQcancelFinish = pq.PQcancelFinish + PQcancelFinish.argtypes = [PGcancelConn_ptr] + PQcancelFinish.restype = None + +else: + PQcancelCreate = not_supported_before("PQcancelCreate", 17) + PQcancelStart = not_supported_before("PQcancelStart", 17) + PQcancelBlocking = not_supported_before("PQcancelBlocking", 17) + PQcancelPoll = not_supported_before("PQcancelPoll", 17) + PQcancelStatus = not_supported_before("PQcancelStatus", 17) + PQcancelSocket = not_supported_before("PQcancelSocket", 17) + PQcancelErrorMessage = not_supported_before("PQcancelErrorMessage", 17) + PQcancelReset = not_supported_before("PQcancelReset", 17) + PQcancelFinish = not_supported_before("PQcancelFinish", 17) + + PQgetCancel = pq.PQgetCancel PQgetCancel.argtypes = [PGconn_ptr] PQgetCancel.restype = PGcancel_ptr @@ -703,6 +757,7 @@ def generate_stub() -> None: elif t.__name__ in ( "LP_PGconn_struct", "LP_PGresult_struct", + "LP_PGcancelConn_struct", "LP_PGcancel_struct", ): if narg is not None: diff --git a/psycopg/psycopg/pq/_pq_ctypes.pyi b/psycopg/psycopg/pq/_pq_ctypes.pyi index 4a03767b5..ca23070bd 100644 --- a/psycopg/psycopg/pq/_pq_ctypes.pyi +++ b/psycopg/psycopg/pq/_pq_ctypes.pyi @@ -16,6 +16,7 @@ Oid = c_uint class PGconn_struct: ... class PGresult_struct: ... +class PGcancelConn_struct: ... class PGcancel_struct: ... class PQconninfoOption_struct: @@ -88,6 +89,15 @@ def PQsendQueryPrepared( arg6: Optional[Array[c_int]], arg7: int, ) -> int: ... +def PQcancelCreate(arg1: Optional[PGconn_struct]) -> PGcancelConn_struct: ... +def PQcancelBlocking(arg1: Optional[PGcancelConn_struct]) -> int: ... +def PQcancelStart(arg1: Optional[PGcancelConn_struct]) -> int: ... +def PQcancelPoll(arg1: Optional[PGcancelConn_struct]) -> int: ... +def PQcancelStatus(arg1: Optional[PGcancelConn_struct]) -> int: ... +def PQcancelSocket(arg1: Optional[PGcancelConn_struct]) -> int: ... +def PQcancelErrorMessage(arg1: Optional[PGcancelConn_struct]) -> bytes: ... +def PQcancelReset(arg1: Optional[PGcancelConn_struct]) -> None: ... +def PQcancelFinish(arg1: Optional[PGcancelConn_struct]) -> None: ... def PQcancel(arg1: Optional[PGcancel_struct], arg2: c_char_p, arg3: int) -> int: ... def PQsetNoticeReceiver( arg1: PGconn_struct, arg2: Callable[[Any], PGresult_struct], arg3: Any diff --git a/psycopg/psycopg/pq/abc.py b/psycopg/psycopg/pq/abc.py index 13a077211..75733bdeb 100644 --- a/psycopg/psycopg/pq/abc.py +++ b/psycopg/psycopg/pq/abc.py @@ -180,6 +180,8 @@ class PGconn(Protocol): def set_single_row_mode(self) -> None: ... + def cancel_conn(self) -> "PGcancelConn": ... + def get_cancel(self) -> "PGcancel": ... def notifies(self) -> Optional["PGnotify"]: ... @@ -267,6 +269,27 @@ class PGresult(Protocol): def set_attributes(self, descriptions: List["PGresAttDesc"]) -> None: ... +class PGcancelConn(Protocol): + def start(self) -> None: ... + + def blocking(self) -> None: ... + + def poll(self) -> int: ... + + @property + def status(self) -> int: ... + + @property + def socket(self) -> int: ... + + @property + def error_message(self) -> str: ... + + def reset(self) -> None: ... + + def finish(self) -> None: ... + + class PGcancel(Protocol): def free(self) -> None: ... diff --git a/psycopg/psycopg/pq/pq_ctypes.py b/psycopg/psycopg/pq/pq_ctypes.py index f04a80367..a2fabbbab 100644 --- a/psycopg/psycopg/pq/pq_ctypes.py +++ b/psycopg/psycopg/pq/pq_ctypes.py @@ -25,7 +25,7 @@ from .misc import error_message, connection_summary from ._enums import Format, ExecStatus, Trace # Imported locally to call them from __del__ methods -from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQstatus +from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQcancelFinish, PQstatus if TYPE_CHECKING: from . import abc @@ -592,6 +592,17 @@ class PGconn: if not impl.PQsetSingleRowMode(self._pgconn_ptr): raise e.OperationalError("setting single row mode failed") + def cancel_conn(self) -> "PGcancelConn": + """ + Create a connection over which a cancel request can be sent. + + See :pq:`PQcancelCreate` for details. + """ + rv = impl.PQcancelCreate(self._pgconn_ptr) + if not rv: + raise e.OperationalError("couldn't create cancelConn object") + return PGcancelConn(rv) + def get_cancel(self) -> "PGcancel": """ Create an object with the information needed to cancel a command. @@ -895,6 +906,77 @@ class PGresult: raise e.OperationalError("PQsetResultAttrs failed") +class PGcancelConn: + """ + Token to handle non-blocking cancellation requests. + + Created by `PGconn.cancel_conn()`. + """ + + __slots__ = ("pgcancelconn_ptr",) + + def __init__(self, pgcancelconn_ptr: impl.PGcancelConn_struct): + self.pgcancelconn_ptr: Optional[impl.PGcancelConn_struct] = pgcancelconn_ptr + + def __del__(self) -> None: + self.finish() + + def start(self) -> None: + """Requests that the server abandons processing of the current command + in a non-blocking manner. + + See :pq:`PQcancelStart` for details. + """ + if not impl.PQcancelStart(self.pgcancelconn_ptr): + raise e.OperationalError( + f"couldn't start cancellation: {self.error_message}" + ) + + def blocking(self) -> None: + """Requests that the server abandons processing of the current command + in a blocking manner. + + See :pq:`PQcancelBlocking` for details. + """ + if not impl.PQcancelBlocking(self.pgcancelconn_ptr): + raise e.OperationalError( + f"couldn't start cancellation: {self.error_message}" + ) + + def poll(self) -> int: + return impl.PQcancelPoll(self.pgcancelconn_ptr) + + @property + def status(self) -> int: + return impl.PQcancelStatus(self.pgcancelconn_ptr) + + @property + def socket(self) -> int: + rv = impl.PQcancelSocket(self.pgcancelconn_ptr) + if rv == -1: + raise e.OperationalError("cancel connection not opened") + return rv + + @property + def error_message(self) -> str: + return impl.PQcancelErrorMessage(self.pgcancelconn_ptr).decode() + + def reset(self) -> None: + impl.PQcancelReset(self.pgcancelconn_ptr) + + def finish(self) -> None: + """ + Free the data structure created by `PQcancelCreate()`. + + Automatically invoked by `!__del__()`. + + See :pq:`PQcancelFinish()` for details. + """ + self.pgcancelconn_ptr, p = None, self.pgcancelconn_ptr + if p: + PQcancelFinish(p) + + class PGcancel: """ Token to cancel the current operation on a connection. diff --git a/psycopg_c/psycopg_c/pq.pxd b/psycopg_c/psycopg_c/pq.pxd index 57825dd3e..8a2dbd634 100644 --- a/psycopg_c/psycopg_c/pq.pxd +++ b/psycopg_c/psycopg_c/pq.pxd @@ -38,6 +38,13 @@ cdef class PGresult: cdef PGresult _from_ptr(libpq.PGresult *ptr) +cdef class PGcancelConn: + cdef libpq.PGcancelConn* pgcancelconn_ptr + + @staticmethod + cdef PGcancelConn _from_ptr(libpq.PGcancelConn *ptr) + + cdef class PGcancel: cdef libpq.PGcancel* pgcancel_ptr diff --git a/psycopg_c/psycopg_c/pq/libpq.pxd b/psycopg_c/psycopg_c/pq/libpq.pxd index 5972ae303..04c3ca533 100644 --- a/psycopg_c/psycopg_c/pq/libpq.pxd +++ b/psycopg_c/psycopg_c/pq/libpq.pxd @@ -40,6 +40,9 @@ cdef extern from "libpq-fe.h": int be_pid char *extra + ctypedef struct PGcancelConn: + pass + ctypedef struct PGcancel: pass @@ -83,6 +86,7 @@ cdef extern from "libpq-fe.h": CONNECTION_GSS_STARTUP CONNECTION_CHECK_TARGET CONNECTION_CHECK_STANDBY + CONNECTION_ALLOCATED ctypedef enum PGTransactionStatusType: PQTRANS_IDLE @@ -250,7 +254,16 @@ cdef extern from "libpq-fe.h": # 33.5. Retrieving Query Results Row-by-Row int PQsetSingleRowMode(PGconn *conn) - # 33.6. Canceling Queries in Progress + # 34.7. Canceling Queries in Progress + PGcancelConn *PQcancelCreate(PGconn *conn) + int PQcancelStart(PGcancelConn *cancelConn) + int PQcancelBlocking(PGcancelConn *cancelConn) + PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn) nogil + ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn) + int PQcancelSocket(PGcancelConn *cancelConn) + char *PQcancelErrorMessage(const PGcancelConn *cancelConn) + void PQcancelReset(PGcancelConn *cancelConn) + void PQcancelFinish(PGcancelConn *cancelConn) PGcancel *PQgetCancel(PGconn *conn) void PQfreeCancel(PGcancel *cancel) int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize) @@ -326,9 +339,19 @@ typedef enum { #endif #if PG_VERSION_NUM < 170000 +typedef struct pg_cancel_conn PGcancelConn; #define PQclosePrepared(conn, name) NULL #define PQclosePortal(conn, name) NULL #define PQsendClosePrepared(conn, name) 0 #define PQsendClosePortal(conn, name) 0 +#define PQcancelCreate(conn) NULL +#define PQcancelStart(cancelConn) 0 +#define PQcancelBlocking(cancelConn) 0 +#define PQcancelPoll(cancelConn) CONNECTION_OK +#define PQcancelStatus(cancelConn) 0 +#define PQcancelSocket(cancelConn) -1 +#define PQcancelErrorMessage(cancelConn) NULL +#define PQcancelReset(cancelConn) 0 +#define PQcancelFinish(cancelConn) 0 #endif """ diff --git a/psycopg_c/psycopg_c/pq/pgcancel.pyx b/psycopg_c/psycopg_c/pq/pgcancel.pyx index b7cbb70ef..2922fd765 100644 --- a/psycopg_c/psycopg_c/pq/pgcancel.pyx +++ b/psycopg_c/psycopg_c/pq/pgcancel.pyx @@ -5,6 +5,68 @@ psycopg_c.pq.PGcancel object implementation. # Copyright (C) 2020 The Psycopg Team +cdef class PGcancelConn: + def __cinit__(self): + self.pgcancelconn_ptr = NULL + + @staticmethod + cdef PGcancelConn _from_ptr(libpq.PGcancelConn *ptr): + cdef PGcancelConn rv = PGcancelConn.__new__(PGcancelConn) + rv.pgcancelconn_ptr = ptr + return rv + + def __dealloc__(self) -> None: + self.finish() + + def start(self) -> None: + """Requests that the server abandons processing of the current command + in a non-blocking manner. + + See :pq:`PQcancelStart` for details. + """ + if not libpq.PQcancelStart(self.pgcancelconn_ptr): + raise e.OperationalError( + f"couldn't send cancellation: {self.error_message}" + ) + + def blocking(self) -> None: + """Requests that the server abandons processing of the current command + in a blocking manner. + + See :pq:`PQcancelBlocking` for details. + """ + if not libpq.PQcancelBlocking(self.pgcancelconn_ptr): + raise e.OperationalError( + f"couldn't send cancellation: {self.error_message}" + ) + + def poll(self) -> int: + return libpq.PQcancelPoll(self.pgcancelconn_ptr) + + @property + def status(self) -> int: + return libpq.PQcancelStatus(self.pgcancelconn_ptr) + + @property + def socket(self) -> int: + rv = libpq.PQcancelSocket(self.pgcancelconn_ptr) + if rv == -1: + raise e.OperationalError("cancel connection not opened") + return rv + + @property + def error_message(self) -> str: + return libpq.PQcancelErrorMessage(self.pgcancelconn_ptr).decode() + + def reset(self) -> None: + libpq.PQcancelReset(self.pgcancelconn_ptr) + + def finish(self) -> None: + if self.pgcancelconn_ptr is not NULL: + libpq.PQcancelFinish(self.pgcancelconn_ptr) + self.pgcancelconn_ptr = NULL + + cdef class PGcancel: def __cinit__(self): self.pgcancel_ptr = NULL diff --git a/psycopg_c/psycopg_c/pq/pgconn.pyx b/psycopg_c/psycopg_c/pq/pgconn.pyx index 540ec917b..798c68d0d 100644 --- a/psycopg_c/psycopg_c/pq/pgconn.pyx +++ b/psycopg_c/psycopg_c/pq/pgconn.pyx @@ -498,6 +498,13 @@ cdef class PGconn: if not libpq.PQsetSingleRowMode(self._pgconn_ptr): raise e.OperationalError("setting single row mode failed") + def cancel_conn(self) -> PGcancelConn: + _check_supported("PQcancelCreate", 170000) + cdef libpq.PGcancelConn *ptr = libpq.PQcancelCreate(self._pgconn_ptr) + if not ptr: + raise e.OperationalError("couldn't create cancelConn object") + return PGcancelConn._from_ptr(ptr) + def get_cancel(self) -> PGcancel: cdef libpq.PGcancel *ptr = libpq.PQgetCancel(self._pgconn_ptr) if not ptr: diff --git a/tests/pq/test_pgconn.py b/tests/pq/test_pgconn.py index 7064a5f59..50877929f 100644 --- a/tests/pq/test_pgconn.py +++ b/tests/pq/test_pgconn.py @@ -1,22 +1,26 @@ from __future__ import annotations +import contextlib import os import sys +import time import ctypes import logging import weakref +from functools import partial from select import select +from typing import Iterator import pytest import psycopg from psycopg import pq -from psycopg.pq.abc import PGconn +from psycopg.pq.abc import PGcancelConn, PGconn import psycopg.generators def wait( - conn: PGconn, + conn: PGconn | PGcancelConn, poll_method: str = "connect_poll", return_on: pq.PollingStatus = pq.PollingStatus.OK, timeout: int | None = None, @@ -381,6 +385,87 @@ def test_set_single_row_mode(pgconn): pgconn.set_single_row_mode() +@contextlib.contextmanager +def cancellable_query(pgconn: PGconn, monitor_conn: PGconn) -> Iterator[None]: + pgconn.send_query_params(b"SELECT pg_sleep($1)", [b"180"]) + while True: + r = monitor_conn.exec_( + b"SELECT count(*) FROM pg_stat_activity" + b" WHERE query = 'SELECT pg_sleep($1)'" + b" AND state = 'active'" + ) + assert r.status == pq.ExecStatus.TUPLES_OK + if r.get_value(0, 0) != b"0": + break + + time.sleep(0.01) + + yield None + + res = pgconn.get_result() + assert res is not None + assert res.status == pq.ExecStatus.FATAL_ERROR + assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"57014" + while pgconn.is_busy(): + pgconn.consume_input() + + +@pytest.mark.libpq(">= 17") +def test_cancel_nonblocking(dsn): + # mimic test_cancel() from src/test/modules/libpq_pipeline/libpq_pipeline.c + def connect() -> PGconn: + conn = pq.PGconn.connect(dsn.encode()) + if conn.status != pq.ConnStatus.OK: + pytest.fail( + f"bad connection: {conn.error_message.decode('utf8', 'replace')}" + ) + return conn + + conn, monitor_conn = connect(), connect() + conn.nonblocking = 1 + + # test PQcancel + with cancellable_query(conn, monitor_conn): + cancel = conn.get_cancel() + cancel.cancel() + + # PGcancel object can be reused for the next query + with cancellable_query(conn, monitor_conn): + cancel.cancel() + + del cancel + + # test PQcancelBlocking + with cancellable_query(conn, monitor_conn): + cancel_conn = conn.cancel_conn() + assert cancel_conn.status == pq.ConnStatus.ALLOCATED + cancel_conn.blocking() + assert cancel_conn.status == pq.ConnStatus.OK # type: ignore[comparison-overlap] # noqa: E501 + cancel_conn.finish() + del cancel_conn + + wait_cancel = partial(wait, poll_method="poll", timeout=3) + + # test PQcancelCreate and then polling with PQcancelPoll + with cancellable_query(conn, monitor_conn): + cancel_conn = conn.cancel_conn() + assert cancel_conn.status == pq.ConnStatus.ALLOCATED + cancel_conn.start() + assert cancel_conn.status == pq.ConnStatus.STARTED + wait_cancel(cancel_conn) + assert cancel_conn.status == pq.ConnStatus.OK + + # test PQcancelReset works on the cancel connection and it can be reused + # after + cancel_conn.reset() + with cancellable_query(conn, monitor_conn): + cancel_conn.start() + wait_cancel(cancel_conn) + assert cancel_conn.status == pq.ConnStatus.OK + + cancel_conn.finish() + + def test_cancel(pgconn): cancel = pgconn.get_cancel() cancel.cancel() -- 2.47.2