- Add `~Connection.cancel_safe()` for encrypted and non-blocking cancellation
If possible, use such method internally upon `KeyboardInterrupt` and `Copy`
termination (:ticket:`#754`).
+- Add support for libpq function to retrieve results in chunks introduced in
+ libpq v17 (:ticket:`#793`).
.. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
def set_single_row_mode(self) -> NoReturn:
self._raise()
+ def set_chunked_rows_mode(self, size: int) -> NoReturn:
+ self._raise()
+
def cancel_conn(self) -> NoReturn:
self._raise()
status code until the end of the current pipeline, at which point it will
return PGRES_PIPELINE_SYNC and normal processing can resume.
"""
+ TUPLES_CHUNK = auto()
+ """The PGresult contains several result tuples from the current command.
+
+ This status occurs only when chunked mode has been selected for the query.
+ """
class TransactionStatus(IntEnum):
PQflush.restype = c_int
-# 33.5. Retrieving Query Results Row-by-Row
+# 32.6. Retrieving Query Results in Chunks
PQsetSingleRowMode = pq.PQsetSingleRowMode
PQsetSingleRowMode.argtypes = [PGconn_ptr]
PQsetSingleRowMode.restype = c_int
+if libpq_version >= 170000:
+ PQsetChunkedRowsMode = pq.PQsetChunkedRowsMode
+ PQsetChunkedRowsMode.argtypes = [PGconn_ptr, c_int]
+ PQsetChunkedRowsMode.restype = c_int
+else:
+ PQsetChunkedRowsMode = not_supported_before("PQsetChunkedRowsMode", 170000)
# 33.6. Canceling Queries in Progress
def PQisnonblocking(arg1: Optional[PGconn_struct]) -> int: ...
def PQflush(arg1: Optional[PGconn_struct]) -> int: ...
def PQsetSingleRowMode(arg1: Optional[PGconn_struct]) -> int: ...
+def PQsetChunkedRowsMode(arg1: Optional[PGconn_struct], arg2: int) -> int: ...
def PQgetCancel(arg1: Optional[PGconn_struct]) -> PGcancel_struct: ...
def PQfreeCancel(arg1: Optional[PGcancel_struct]) -> None: ...
def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ...
def set_single_row_mode(self) -> None: ...
+ def set_chunked_rows_mode(self, size: int) -> None: ...
+
def cancel_conn(self) -> "PGcancelConn": ...
def get_cancel(self) -> "PGcancel": ...
if not impl.PQsetSingleRowMode(self._pgconn_ptr):
raise e.OperationalError("setting single row mode failed")
+ def set_chunked_rows_mode(self, size: int) -> None:
+ if not impl.PQsetChunkedRowsMode(self._pgconn_ptr, size):
+ raise e.OperationalError("setting chunked rows mode failed")
+
def cancel_conn(self) -> "PGcancelConn":
"""
Create a connection over which a cancel request can be sent.
PGRES_SINGLE_TUPLE
PGRES_PIPELINE_SYNC
PGRES_PIPELINE_ABORT
+ PGRES_TUPLES_CHUNK
# 33.1. Database Connection Control Functions
PGconn *PQconnectdb(const char *conninfo)
int PQisnonblocking(const PGconn *conn)
int PQflush(PGconn *conn) nogil
- # 33.5. Retrieving Query Results Row-by-Row
+ # 32.6. Retrieving Query Results in Chunks
int PQsetSingleRowMode(PGconn *conn)
+ int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
# 34.7. Canceling Queries in Progress
PGcancelConn *PQcancelCreate(PGconn *conn)
#define PQcancelErrorMessage(cancelConn) NULL
#define PQcancelReset(cancelConn) 0
#define PQcancelFinish(cancelConn) 0
+#define PQsetChunkedRowsMode(conn, chunkSize) 0
#endif
"""
if not libpq.PQsetSingleRowMode(self._pgconn_ptr):
raise e.OperationalError("setting single row mode failed")
+ def set_chunked_rows_mode(self, size: int) -> None:
+ if not libpq.PQsetChunkedRowsMode(self._pgconn_ptr, size):
+ raise e.OperationalError("setting chunked rows mode failed")
+
def cancel_conn(self) -> PGcancelConn:
_check_supported("PQcancelCreate", 170000)
cdef libpq.PGcancelConn *ptr = libpq.PQcancelCreate(self._pgconn_ptr)
assert res.ntuples == 0
+@pytest.mark.libpq(">= 17")
+def test_chunked_rows_mode(pgconn):
+ pgconn.send_query(b"select generate_series(1,7)")
+ pgconn.set_chunked_rows_mode(3)
+
+ results = execute_wait(pgconn)
+ assert len(results) == 4
+
+ res = results[0]
+ assert res.status == pq.ExecStatus.TUPLES_CHUNK
+ assert res.ntuples == 3
+ assert [res.get_value(i, 0) for i in range(3)] == [b"1", b"2", b"3"]
+
+ res = results[1]
+ assert res.status == pq.ExecStatus.TUPLES_CHUNK
+ assert res.ntuples == 3
+ assert [res.get_value(i, 0) for i in range(3)] == [b"4", b"5", b"6"]
+
+ res = results[2]
+ assert res.status == pq.ExecStatus.TUPLES_CHUNK
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"7"
+
+ res = results[3]
+ assert res.status == pq.ExecStatus.TUPLES_OK
+ assert res.ntuples == 0
+
+
def test_send_query_params(pgconn):
pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
(res,) = execute_wait(pgconn)
pgconn.set_single_row_mode()
+@pytest.mark.libpq(">= 17")
+def test_set_chunked_rows_mode(pgconn):
+ with pytest.raises(psycopg.OperationalError):
+ pgconn.set_chunked_rows_mode(42)
+
+ pgconn.send_query(b"select 1")
+ pgconn.set_chunked_rows_mode(42)
+
+
@contextlib.contextmanager
def cancellable_query(pgconn: PGconn) -> Iterator[None]:
dsn = b" ".join(b"%s='%s'" % (i.keyword, i.val) for i in pgconn.info if i.val)