after a connection has been established. Other statuses might only be seen
during the connection phase and are considered internal.
- .. seealso:: :pq:`PQstatus()` returns this value.
+ .. seealso:: :pq:`PQstatus()` and `PQcancelStatus()` return this value.
.. autoclass:: PollingStatus
transaction control methods available on the async connections.
- Add support for libpq functions to close prepared statements and portals
introduced in libpq v17 (:ticket:`#603`).
+- Add support for libpq encrypted and non-blocking query cancellation
+ functions introduced in libpq v17 (:ticket:`#754`).
- The `!context` parameter of `sql` objects `~sql.Composable.as_string()` and
`~sql.Composable.as_bytes()` methods is now optional (:ticket:`#716`).
- Disable receiving more than one result on the same cursor in pipeline mode,
def set_single_row_mode(self) -> NoReturn:
self._raise()
+ def cancel_conn(self) -> NoReturn:
+ self._raise()
+
def get_cancel(self) -> NoReturn:
self._raise()
Conninfo: Type[abc.Conninfo]
Escaping: Type[abc.Escaping]
PGcancel: Type[abc.PGcancel]
+PGcancelConn: Type[abc.PGcancelConn]
def import_from_libpq() -> None:
"""
# import these names into the module on success as side effect
global __impl__, version, __build_version__
- global PGconn, PGresult, Conninfo, Escaping, PGcancel
+ global PGconn, PGresult, Conninfo, Escaping, PGcancel, PGcancelConn
impl = os.environ.get("PSYCOPG_IMPL", "").lower()
module = None
Conninfo = module.Conninfo
Escaping = module.Escaping
PGcancel = module.PGcancel
+ PGcancelConn = module.PGcancelConn
__build_version__ = module.__build_version__
elif impl:
raise ImportError(f"requested psycopg implementation '{impl}' unknown")
GSS_STARTUP = auto()
CHECK_TARGET = auto()
CHECK_STANDBY = auto()
+ ALLOCATED = auto() # Only for cancel connections.
class PollingStatus(IntEnum):
]
+class PGcancelConn_struct(Structure):
+ _fields_: List[Tuple[str, type]] = []
+
+
class PGcancel_struct(Structure):
_fields_: List[Tuple[str, type]] = []
PGresult_ptr = POINTER(PGresult_struct)
PQconninfoOption_ptr = POINTER(PQconninfoOption_struct)
PGnotify_ptr = POINTER(PGnotify_struct)
+PGcancelConn_ptr = POINTER(PGcancelConn_struct)
PGcancel_ptr = POINTER(PGcancel_struct)
PGresAttDesc_ptr = POINTER(PGresAttDesc_struct)
# 33.6. Canceling Queries in Progress
+if libpq_version >= 170000:
+ PQcancelCreate = pq.PQcancelCreate
+ PQcancelCreate.argtypes = [PGconn_ptr]
+ PQcancelCreate.restype = PGcancelConn_ptr
+
+ PQcancelStart = pq.PQcancelStart
+ PQcancelStart.argtypes = [PGcancelConn_ptr]
+ PQcancelStart.restype = c_int
+
+ PQcancelBlocking = pq.PQcancelBlocking
+ PQcancelBlocking.argtypes = [PGcancelConn_ptr]
+ PQcancelBlocking.restype = c_int
+
+ PQcancelPoll = pq.PQcancelPoll
+ PQcancelPoll.argtypes = [PGcancelConn_ptr]
+ PQcancelPoll.restype = c_int
+
+ PQcancelStatus = pq.PQcancelStatus
+ PQcancelStatus.argtypes = [PGcancelConn_ptr]
+ PQcancelStatus.restype = c_int
+
+ PQcancelSocket = pq.PQcancelSocket
+ PQcancelSocket.argtypes = [PGcancelConn_ptr]
+ PQcancelSocket.restype = c_int
+
+ PQcancelErrorMessage = pq.PQcancelErrorMessage
+ PQcancelErrorMessage.argtypes = [PGcancelConn_ptr]
+ PQcancelErrorMessage.restype = c_char_p
+
+ PQcancelReset = pq.PQcancelReset
+ PQcancelReset.argtypes = [PGcancelConn_ptr]
+ PQcancelReset.restype = None
+
+ PQcancelFinish = pq.PQcancelFinish
+ PQcancelFinish.argtypes = [PGcancelConn_ptr]
+ PQcancelFinish.restype = None
+
+else:
+ PQcancelCreate = not_supported_before("PQcancelCreate", 17)
+ PQcancelStart = not_supported_before("PQcancelStart", 17)
+ PQcancelBlocking = not_supported_before("PQcancelBlocking", 17)
+ PQcancelPoll = not_supported_before("PQcancelPoll", 17)
+ PQcancelStatus = not_supported_before("PQcancelStatus", 17)
+ PQcancelSocket = not_supported_before("PQcancelSocket", 17)
+ PQcancelErrorMessage = not_supported_before("PQcancelErrorMessage", 17)
+ PQcancelReset = not_supported_before("PQcancelReset", 17)
+ PQcancelFinish = not_supported_before("PQcancelFinish", 17)
+
+
PQgetCancel = pq.PQgetCancel
PQgetCancel.argtypes = [PGconn_ptr]
PQgetCancel.restype = PGcancel_ptr
elif t.__name__ in (
"LP_PGconn_struct",
"LP_PGresult_struct",
+ "LP_PGcancelConn_struct",
"LP_PGcancel_struct",
):
if narg is not None:
class PGconn_struct: ...
class PGresult_struct: ...
+class PGcancelConn_struct: ...
class PGcancel_struct: ...
class PQconninfoOption_struct:
arg6: Optional[Array[c_int]],
arg7: int,
) -> int: ...
+def PQcancelCreate(arg1: Optional[PGconn_struct]) -> PGcancelConn_struct: ...
+def PQcancelBlocking(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelStart(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelPoll(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelStatus(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelSocket(arg1: Optional[PGcancelConn_struct]) -> int: ...
+def PQcancelErrorMessage(arg1: Optional[PGcancelConn_struct]) -> bytes: ...
+def PQcancelReset(arg1: Optional[PGcancelConn_struct]) -> None: ...
+def PQcancelFinish(arg1: Optional[PGcancelConn_struct]) -> None: ...
def PQcancel(arg1: Optional[PGcancel_struct], arg2: c_char_p, arg3: int) -> int: ...
def PQsetNoticeReceiver(
arg1: PGconn_struct, arg2: Callable[[Any], PGresult_struct], arg3: Any
def set_single_row_mode(self) -> None: ...
+ def cancel_conn(self) -> "PGcancelConn": ...
+
def get_cancel(self) -> "PGcancel": ...
def notifies(self) -> Optional["PGnotify"]: ...
def set_attributes(self, descriptions: List["PGresAttDesc"]) -> None: ...
+class PGcancelConn(Protocol):
+ def start(self) -> None: ...
+
+ def blocking(self) -> None: ...
+
+ def poll(self) -> int: ...
+
+ @property
+ def status(self) -> int: ...
+
+ @property
+ def socket(self) -> int: ...
+
+ @property
+ def error_message(self) -> str: ...
+
+ def reset(self) -> None: ...
+
+ def finish(self) -> None: ...
+
+
class PGcancel(Protocol):
def free(self) -> None: ...
from ._enums import Format, ExecStatus, Trace
# Imported locally to call them from __del__ methods
-from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQstatus
+from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQcancelFinish, PQstatus
if TYPE_CHECKING:
from . import abc
if not impl.PQsetSingleRowMode(self._pgconn_ptr):
raise e.OperationalError("setting single row mode failed")
+ def cancel_conn(self) -> "PGcancelConn":
+ """
+ Create a connection over which a cancel request can be sent.
+
+ See :pq:`PQcancelCreate` for details.
+ """
+ rv = impl.PQcancelCreate(self._pgconn_ptr)
+ if not rv:
+ raise e.OperationalError("couldn't create cancelConn object")
+ return PGcancelConn(rv)
+
def get_cancel(self) -> "PGcancel":
"""
Create an object with the information needed to cancel a command.
raise e.OperationalError("PQsetResultAttrs failed")
+class PGcancelConn:
+ """
+ Token to handle non-blocking cancellation requests.
+
+ Created by `PGconn.cancel_conn()`.
+ """
+
+ __slots__ = ("pgcancelconn_ptr",)
+
+ def __init__(self, pgcancelconn_ptr: impl.PGcancelConn_struct):
+ self.pgcancelconn_ptr: Optional[impl.PGcancelConn_struct] = pgcancelconn_ptr
+
+ def __del__(self) -> None:
+ self.finish()
+
+ def start(self) -> None:
+ """Requests that the server abandons processing of the current command
+ in a non-blocking manner.
+
+ See :pq:`PQcancelStart` for details.
+ """
+ if not impl.PQcancelStart(self.pgcancelconn_ptr):
+ raise e.OperationalError(
+ f"couldn't start cancellation: {self.error_message}"
+ )
+
+ def blocking(self) -> None:
+ """Requests that the server abandons processing of the current command
+ in a blocking manner.
+
+ See :pq:`PQcancelBlocking` for details.
+ """
+ if not impl.PQcancelBlocking(self.pgcancelconn_ptr):
+ raise e.OperationalError(
+ f"couldn't start cancellation: {self.error_message}"
+ )
+
+ def poll(self) -> int:
+ return impl.PQcancelPoll(self.pgcancelconn_ptr)
+
+ @property
+ def status(self) -> int:
+ return impl.PQcancelStatus(self.pgcancelconn_ptr)
+
+ @property
+ def socket(self) -> int:
+ rv = impl.PQcancelSocket(self.pgcancelconn_ptr)
+ if rv == -1:
+ raise e.OperationalError("cancel connection not opened")
+ return rv
+
+ @property
+ def error_message(self) -> str:
+ return impl.PQcancelErrorMessage(self.pgcancelconn_ptr).decode()
+
+ def reset(self) -> None:
+ impl.PQcancelReset(self.pgcancelconn_ptr)
+
+ def finish(self) -> None:
+ """
+ Free the data structure created by `PQcancelCreate()`.
+
+ Automatically invoked by `!__del__()`.
+
+ See :pq:`PQcancelFinish()` for details.
+ """
+ self.pgcancelconn_ptr, p = None, self.pgcancelconn_ptr
+ if p:
+ PQcancelFinish(p)
+
+
class PGcancel:
"""
Token to cancel the current operation on a connection.
cdef PGresult _from_ptr(libpq.PGresult *ptr)
+cdef class PGcancelConn:
+ cdef libpq.PGcancelConn* pgcancelconn_ptr
+
+ @staticmethod
+ cdef PGcancelConn _from_ptr(libpq.PGcancelConn *ptr)
+
+
cdef class PGcancel:
cdef libpq.PGcancel* pgcancel_ptr
int be_pid
char *extra
+ ctypedef struct PGcancelConn:
+ pass
+
ctypedef struct PGcancel:
pass
CONNECTION_GSS_STARTUP
CONNECTION_CHECK_TARGET
CONNECTION_CHECK_STANDBY
+ CONNECTION_ALLOCATED
ctypedef enum PGTransactionStatusType:
PQTRANS_IDLE
# 33.5. Retrieving Query Results Row-by-Row
int PQsetSingleRowMode(PGconn *conn)
- # 33.6. Canceling Queries in Progress
+ # 34.7. Canceling Queries in Progress
+ PGcancelConn *PQcancelCreate(PGconn *conn)
+ int PQcancelStart(PGcancelConn *cancelConn)
+ int PQcancelBlocking(PGcancelConn *cancelConn)
+ PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn) nogil
+ ConnStatusType PQcancelStatus(const PGcancelConn *cancelConn)
+ int PQcancelSocket(PGcancelConn *cancelConn)
+ char *PQcancelErrorMessage(const PGcancelConn *cancelConn)
+ void PQcancelReset(PGcancelConn *cancelConn)
+ void PQcancelFinish(PGcancelConn *cancelConn)
PGcancel *PQgetCancel(PGconn *conn)
void PQfreeCancel(PGcancel *cancel)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
#endif
#if PG_VERSION_NUM < 170000
+typedef struct pg_cancel_conn PGcancelConn;
#define PQclosePrepared(conn, name) NULL
#define PQclosePortal(conn, name) NULL
#define PQsendClosePrepared(conn, name) 0
#define PQsendClosePortal(conn, name) 0
+#define PQcancelCreate(conn) NULL
+#define PQcancelStart(cancelConn) 0
+#define PQcancelBlocking(cancelConn) 0
+#define PQcancelPoll(cancelConn) CONNECTION_OK
+#define PQcancelStatus(cancelConn) 0
+#define PQcancelSocket(cancelConn) -1
+#define PQcancelErrorMessage(cancelConn) NULL
+#define PQcancelReset(cancelConn) 0
+#define PQcancelFinish(cancelConn) 0
#endif
"""
# Copyright (C) 2020 The Psycopg Team
+cdef class PGcancelConn:
+ def __cinit__(self):
+ self.pgcancelconn_ptr = NULL
+
+ @staticmethod
+ cdef PGcancelConn _from_ptr(libpq.PGcancelConn *ptr):
+ cdef PGcancelConn rv = PGcancelConn.__new__(PGcancelConn)
+ rv.pgcancelconn_ptr = ptr
+ return rv
+
+ def __dealloc__(self) -> None:
+ self.finish()
+
+ def start(self) -> None:
+ """Requests that the server abandons processing of the current command
+ in a non-blocking manner.
+
+ See :pq:`PQcancelStart` for details.
+ """
+ if not libpq.PQcancelStart(self.pgcancelconn_ptr):
+ raise e.OperationalError(
+ f"couldn't send cancellation: {self.error_message}"
+ )
+
+ def blocking(self) -> None:
+ """Requests that the server abandons processing of the current command
+ in a blocking manner.
+
+ See :pq:`PQcancelBlocking` for details.
+ """
+ if not libpq.PQcancelBlocking(self.pgcancelconn_ptr):
+ raise e.OperationalError(
+ f"couldn't send cancellation: {self.error_message}"
+ )
+
+ def poll(self) -> int:
+ return libpq.PQcancelPoll(self.pgcancelconn_ptr)
+
+ @property
+ def status(self) -> int:
+ return libpq.PQcancelStatus(self.pgcancelconn_ptr)
+
+ @property
+ def socket(self) -> int:
+ rv = libpq.PQcancelSocket(self.pgcancelconn_ptr)
+ if rv == -1:
+ raise e.OperationalError("cancel connection not opened")
+ return rv
+
+ @property
+ def error_message(self) -> str:
+ return libpq.PQcancelErrorMessage(self.pgcancelconn_ptr).decode()
+
+ def reset(self) -> None:
+ libpq.PQcancelReset(self.pgcancelconn_ptr)
+
+ def finish(self) -> None:
+ if self.pgcancelconn_ptr is not NULL:
+ libpq.PQcancelFinish(self.pgcancelconn_ptr)
+ self.pgcancelconn_ptr = NULL
+
+
cdef class PGcancel:
def __cinit__(self):
self.pgcancel_ptr = NULL
if not libpq.PQsetSingleRowMode(self._pgconn_ptr):
raise e.OperationalError("setting single row mode failed")
+ def cancel_conn(self) -> PGcancelConn:
+ _check_supported("PQcancelCreate", 170000)
+ cdef libpq.PGcancelConn *ptr = libpq.PQcancelCreate(self._pgconn_ptr)
+ if not ptr:
+ raise e.OperationalError("couldn't create cancelConn object")
+ return PGcancelConn._from_ptr(ptr)
+
def get_cancel(self) -> PGcancel:
cdef libpq.PGcancel *ptr = libpq.PQgetCancel(self._pgconn_ptr)
if not ptr:
from __future__ import annotations
+import contextlib
import os
import sys
+import time
import ctypes
import logging
import weakref
+from functools import partial
from select import select
+from typing import Iterator
import pytest
import psycopg
from psycopg import pq
-from psycopg.pq.abc import PGconn
+from psycopg.pq.abc import PGcancelConn, PGconn
import psycopg.generators
def wait(
- conn: PGconn,
+ conn: PGconn | PGcancelConn,
poll_method: str = "connect_poll",
return_on: pq.PollingStatus = pq.PollingStatus.OK,
timeout: int | None = None,
pgconn.set_single_row_mode()
+@contextlib.contextmanager
+def cancellable_query(pgconn: PGconn, monitor_conn: PGconn) -> Iterator[None]:
+ pgconn.send_query_params(b"SELECT pg_sleep($1)", [b"180"])
+ while True:
+ r = monitor_conn.exec_(
+ b"SELECT count(*) FROM pg_stat_activity"
+ b" WHERE query = 'SELECT pg_sleep($1)'"
+ b" AND state = 'active'"
+ )
+ assert r.status == pq.ExecStatus.TUPLES_OK
+ if r.get_value(0, 0) != b"0":
+ break
+
+ time.sleep(0.01)
+
+ yield None
+
+ res = pgconn.get_result()
+ assert res is not None
+ assert res.status == pq.ExecStatus.FATAL_ERROR
+ assert res.error_field(pq.DiagnosticField.SQLSTATE) == b"57014"
+ while pgconn.is_busy():
+ pgconn.consume_input()
+
+
+@pytest.mark.libpq(">= 17")
+def test_cancel_nonblocking(dsn):
+ # mimic test_cancel() from src/test/modules/libpq_pipeline/libpq_pipeline.c
+ def connect() -> PGconn:
+ conn = pq.PGconn.connect(dsn.encode())
+ if conn.status != pq.ConnStatus.OK:
+ pytest.fail(
+ f"bad connection: {conn.error_message.decode('utf8', 'replace')}"
+ )
+ return conn
+
+ conn, monitor_conn = connect(), connect()
+ conn.nonblocking = 1
+
+ # test PQcancel
+ with cancellable_query(conn, monitor_conn):
+ cancel = conn.get_cancel()
+ cancel.cancel()
+
+ # PGcancel object can be reused for the next query
+ with cancellable_query(conn, monitor_conn):
+ cancel.cancel()
+
+ del cancel
+
+ # test PQcancelBlocking
+ with cancellable_query(conn, monitor_conn):
+ cancel_conn = conn.cancel_conn()
+ assert cancel_conn.status == pq.ConnStatus.ALLOCATED
+ cancel_conn.blocking()
+ assert cancel_conn.status == pq.ConnStatus.OK # type: ignore[comparison-overlap] # noqa: E501
+ cancel_conn.finish()
+ del cancel_conn
+
+ wait_cancel = partial(wait, poll_method="poll", timeout=3)
+
+ # test PQcancelCreate and then polling with PQcancelPoll
+ with cancellable_query(conn, monitor_conn):
+ cancel_conn = conn.cancel_conn()
+ assert cancel_conn.status == pq.ConnStatus.ALLOCATED
+ cancel_conn.start()
+ assert cancel_conn.status == pq.ConnStatus.STARTED
+ wait_cancel(cancel_conn)
+ assert cancel_conn.status == pq.ConnStatus.OK
+
+ # test PQcancelReset works on the cancel connection and it can be reused
+ # after
+ cancel_conn.reset()
+ with cancellable_query(conn, monitor_conn):
+ cancel_conn.start()
+ wait_cancel(cancel_conn)
+ assert cancel_conn.status == pq.ConnStatus.OK
+
+ cancel_conn.finish()
+
+
def test_cancel(pgconn):
cancel = pgconn.get_cancel()
cancel.cancel()