From: Denis Laxalde Date: Wed, 17 Apr 2024 08:35:45 +0000 (+0200) Subject: feat: add libpq interface for retrieving results in chunks X-Git-Tag: 3.2.0~42 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fpull%2F793%2Fhead;p=thirdparty%2Fpsycopg.git feat: add libpq interface for retrieving results in chunks https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=4643a2b265e967cc5f13ffa0c7c6912dbb3466d0 --- diff --git a/docs/news.rst b/docs/news.rst index af3f2e32c..3b0c06ffe 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -40,6 +40,8 @@ Psycopg 3.2 (unreleased) - Add `~Connection.cancel_safe()` for encrypted and non-blocking cancellation If possible, use such method internally upon `KeyboardInterrupt` and `Copy` termination (:ticket:`#754`). +- Add support for libpq function to retrieve results in chunks introduced in + libpq v17 (:ticket:`#793`). .. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types diff --git a/psycopg/psycopg/errors.py b/psycopg/psycopg/errors.py index 6e9334a8c..15afa0603 100644 --- a/psycopg/psycopg/errors.py +++ b/psycopg/psycopg/errors.py @@ -171,6 +171,9 @@ class FinishedPGconn: def set_single_row_mode(self) -> NoReturn: self._raise() + def set_chunked_rows_mode(self, size: int) -> NoReturn: + self._raise() + def cancel_conn(self) -> NoReturn: self._raise() diff --git a/psycopg/psycopg/pq/_enums.py b/psycopg/psycopg/pq/_enums.py index 6b03b0621..3218fef87 100644 --- a/psycopg/psycopg/pq/_enums.py +++ b/psycopg/psycopg/pq/_enums.py @@ -121,6 +121,11 @@ class ExecStatus(IntEnum): status code until the end of the current pipeline, at which point it will return PGRES_PIPELINE_SYNC and normal processing can resume. """ + TUPLES_CHUNK = auto() + """The PGresult contains several result tuples from the current command. + + This status occurs only when chunked mode has been selected for the query. + """ class TransactionStatus(IntEnum): diff --git a/psycopg/psycopg/pq/_pq_ctypes.py b/psycopg/psycopg/pq/_pq_ctypes.py index c423083fb..787ab9999 100644 --- a/psycopg/psycopg/pq/_pq_ctypes.py +++ b/psycopg/psycopg/pq/_pq_ctypes.py @@ -554,11 +554,17 @@ PQflush.argtypes = [PGconn_ptr] PQflush.restype = c_int -# 33.5. Retrieving Query Results Row-by-Row +# 32.6. Retrieving Query Results in Chunks PQsetSingleRowMode = pq.PQsetSingleRowMode PQsetSingleRowMode.argtypes = [PGconn_ptr] PQsetSingleRowMode.restype = c_int +if libpq_version >= 170000: + PQsetChunkedRowsMode = pq.PQsetChunkedRowsMode + PQsetChunkedRowsMode.argtypes = [PGconn_ptr, c_int] + PQsetChunkedRowsMode.restype = c_int +else: + PQsetChunkedRowsMode = not_supported_before("PQsetChunkedRowsMode", 170000) # 33.6. Canceling Queries in Progress diff --git a/psycopg/psycopg/pq/_pq_ctypes.pyi b/psycopg/psycopg/pq/_pq_ctypes.pyi index ca23070bd..20da60ac0 100644 --- a/psycopg/psycopg/pq/_pq_ctypes.pyi +++ b/psycopg/psycopg/pq/_pq_ctypes.pyi @@ -210,6 +210,7 @@ def PQsetnonblocking(arg1: Optional[PGconn_struct], arg2: int) -> int: ... def PQisnonblocking(arg1: Optional[PGconn_struct]) -> int: ... def PQflush(arg1: Optional[PGconn_struct]) -> int: ... def PQsetSingleRowMode(arg1: Optional[PGconn_struct]) -> int: ... +def PQsetChunkedRowsMode(arg1: Optional[PGconn_struct], arg2: int) -> int: ... def PQgetCancel(arg1: Optional[PGconn_struct]) -> PGcancel_struct: ... def PQfreeCancel(arg1: Optional[PGcancel_struct]) -> None: ... def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ... diff --git a/psycopg/psycopg/pq/abc.py b/psycopg/psycopg/pq/abc.py index 75733bdeb..c439c85a1 100644 --- a/psycopg/psycopg/pq/abc.py +++ b/psycopg/psycopg/pq/abc.py @@ -180,6 +180,8 @@ class PGconn(Protocol): def set_single_row_mode(self) -> None: ... + def set_chunked_rows_mode(self, size: int) -> None: ... + def cancel_conn(self) -> "PGcancelConn": ... def get_cancel(self) -> "PGcancel": ... diff --git a/psycopg/psycopg/pq/pq_ctypes.py b/psycopg/psycopg/pq/pq_ctypes.py index ee3b51c19..fe68fbc53 100644 --- a/psycopg/psycopg/pq/pq_ctypes.py +++ b/psycopg/psycopg/pq/pq_ctypes.py @@ -592,6 +592,10 @@ class PGconn: if not impl.PQsetSingleRowMode(self._pgconn_ptr): raise e.OperationalError("setting single row mode failed") + def set_chunked_rows_mode(self, size: int) -> None: + if not impl.PQsetChunkedRowsMode(self._pgconn_ptr, size): + raise e.OperationalError("setting chunked rows mode failed") + def cancel_conn(self) -> "PGcancelConn": """ Create a connection over which a cancel request can be sent. diff --git a/psycopg_c/psycopg_c/pq/libpq.pxd b/psycopg_c/psycopg_c/pq/libpq.pxd index 04c3ca533..ec5e5f4c4 100644 --- a/psycopg_c/psycopg_c/pq/libpq.pxd +++ b/psycopg_c/psycopg_c/pq/libpq.pxd @@ -108,6 +108,7 @@ cdef extern from "libpq-fe.h": PGRES_SINGLE_TUPLE PGRES_PIPELINE_SYNC PGRES_PIPELINE_ABORT + PGRES_TUPLES_CHUNK # 33.1. Database Connection Control Functions PGconn *PQconnectdb(const char *conninfo) @@ -251,8 +252,9 @@ cdef extern from "libpq-fe.h": int PQisnonblocking(const PGconn *conn) int PQflush(PGconn *conn) nogil - # 33.5. Retrieving Query Results Row-by-Row + # 32.6. Retrieving Query Results in Chunks int PQsetSingleRowMode(PGconn *conn) + int PQsetChunkedRowsMode(PGconn *conn, int chunkSize) # 34.7. Canceling Queries in Progress PGcancelConn *PQcancelCreate(PGconn *conn) @@ -353,5 +355,6 @@ typedef struct pg_cancel_conn PGcancelConn; #define PQcancelErrorMessage(cancelConn) NULL #define PQcancelReset(cancelConn) 0 #define PQcancelFinish(cancelConn) 0 +#define PQsetChunkedRowsMode(conn, chunkSize) 0 #endif """ diff --git a/psycopg_c/psycopg_c/pq/pgconn.pyx b/psycopg_c/psycopg_c/pq/pgconn.pyx index 2f9510328..9f84a4b34 100644 --- a/psycopg_c/psycopg_c/pq/pgconn.pyx +++ b/psycopg_c/psycopg_c/pq/pgconn.pyx @@ -499,6 +499,10 @@ cdef class PGconn: if not libpq.PQsetSingleRowMode(self._pgconn_ptr): raise e.OperationalError("setting single row mode failed") + def set_chunked_rows_mode(self, size: int) -> None: + if not libpq.PQsetChunkedRowsMode(self._pgconn_ptr, size): + raise e.OperationalError("setting chunked rows mode failed") + def cancel_conn(self) -> PGcancelConn: _check_supported("PQcancelCreate", 170000) cdef libpq.PGcancelConn *ptr = libpq.PQcancelCreate(self._pgconn_ptr) diff --git a/tests/pq/test_async.py b/tests/pq/test_async.py index e529eb743..6fd50e67f 100644 --- a/tests/pq/test_async.py +++ b/tests/pq/test_async.py @@ -106,6 +106,34 @@ def test_single_row_mode(pgconn): assert res.ntuples == 0 +@pytest.mark.libpq(">= 17") +def test_chunked_rows_mode(pgconn): + pgconn.send_query(b"select generate_series(1,7)") + pgconn.set_chunked_rows_mode(3) + + results = execute_wait(pgconn) + assert len(results) == 4 + + res = results[0] + assert res.status == pq.ExecStatus.TUPLES_CHUNK + assert res.ntuples == 3 + assert [res.get_value(i, 0) for i in range(3)] == [b"1", b"2", b"3"] + + res = results[1] + assert res.status == pq.ExecStatus.TUPLES_CHUNK + assert res.ntuples == 3 + assert [res.get_value(i, 0) for i in range(3)] == [b"4", b"5", b"6"] + + res = results[2] + assert res.status == pq.ExecStatus.TUPLES_CHUNK + assert res.ntuples == 1 + assert res.get_value(0, 0) == b"7" + + res = results[3] + assert res.status == pq.ExecStatus.TUPLES_OK + assert res.ntuples == 0 + + def test_send_query_params(pgconn): pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"]) (res,) = execute_wait(pgconn) diff --git a/tests/pq/test_pgconn.py b/tests/pq/test_pgconn.py index de5271449..f1010b1ec 100644 --- a/tests/pq/test_pgconn.py +++ b/tests/pq/test_pgconn.py @@ -387,6 +387,15 @@ def test_set_single_row_mode(pgconn): pgconn.set_single_row_mode() +@pytest.mark.libpq(">= 17") +def test_set_chunked_rows_mode(pgconn): + with pytest.raises(psycopg.OperationalError): + pgconn.set_chunked_rows_mode(42) + + pgconn.send_query(b"select 1") + pgconn.set_chunked_rows_mode(42) + + @contextlib.contextmanager def cancellable_query(pgconn: PGconn) -> Iterator[None]: dsn = b" ".join(b"%s='%s'" % (i.keyword, i.val) for i in pgconn.info if i.val)