]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat: add libpq interface for retrieving results in chunks 793/head
authorDenis Laxalde <denis.laxalde@dalibo.com>
Wed, 17 Apr 2024 08:35:45 +0000 (10:35 +0200)
committerDenis Laxalde <denis.laxalde@dalibo.com>
Wed, 17 Apr 2024 10:13:53 +0000 (12:13 +0200)
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=4643a2b265e967cc5f13ffa0c7c6912dbb3466d0

docs/news.rst
psycopg/psycopg/errors.py
psycopg/psycopg/pq/_enums.py
psycopg/psycopg/pq/_pq_ctypes.py
psycopg/psycopg/pq/_pq_ctypes.pyi
psycopg/psycopg/pq/abc.py
psycopg/psycopg/pq/pq_ctypes.py
psycopg_c/psycopg_c/pq/libpq.pxd
psycopg_c/psycopg_c/pq/pgconn.pyx
tests/pq/test_async.py
tests/pq/test_pgconn.py

index af3f2e32cc55256e8589c661da56b7e16ae598ad..3b0c06ffef68a337a75c253d4472d9c8701dc710 100644 (file)
@@ -40,6 +40,8 @@ Psycopg 3.2 (unreleased)
 - Add `~Connection.cancel_safe()` for encrypted and non-blocking cancellation
   If possible, use such method internally upon `KeyboardInterrupt` and `Copy`
   termination (:ticket:`#754`).
+- Add support for libpq function to retrieve results in chunks introduced in
+  libpq v17 (:ticket:`#793`).
 
 .. __: https://numpy.org/doc/stable/reference/arrays.scalars.html#built-in-scalar-types
 
index 6e9334a8c99e3bdcbc069628ceb089b485cf8d9d..15afa06033da8aca31f17786627c3ca09f1574aa 100644 (file)
@@ -171,6 +171,9 @@ class FinishedPGconn:
     def set_single_row_mode(self) -> NoReturn:
         self._raise()
 
+    def set_chunked_rows_mode(self, size: int) -> NoReturn:
+        self._raise()
+
     def cancel_conn(self) -> NoReturn:
         self._raise()
 
index 6b03b0621feb18398f79b001a010814bdbbffc9a..3218fef87594d1f2ec4ce76c0b3237c525ea47f7 100644 (file)
@@ -121,6 +121,11 @@ class ExecStatus(IntEnum):
     status code until the end of the current pipeline, at which point it will
     return PGRES_PIPELINE_SYNC and normal processing can resume.
     """
+    TUPLES_CHUNK = auto()
+    """The PGresult contains several result tuples from the current command.
+
+    This status occurs only when chunked mode has been selected for the query.
+    """
 
 
 class TransactionStatus(IntEnum):
index c423083fba25f2b76ad9cc209ca50d26c0b0e0f7..787ab99992fc6226bbe753a850779ed173b5dcbc 100644 (file)
@@ -554,11 +554,17 @@ PQflush.argtypes = [PGconn_ptr]
 PQflush.restype = c_int
 
 
-# 33.5. Retrieving Query Results Row-by-Row
+# 32.6. Retrieving Query Results in Chunks
 PQsetSingleRowMode = pq.PQsetSingleRowMode
 PQsetSingleRowMode.argtypes = [PGconn_ptr]
 PQsetSingleRowMode.restype = c_int
 
+if libpq_version >= 170000:
+    PQsetChunkedRowsMode = pq.PQsetChunkedRowsMode
+    PQsetChunkedRowsMode.argtypes = [PGconn_ptr, c_int]
+    PQsetChunkedRowsMode.restype = c_int
+else:
+    PQsetChunkedRowsMode = not_supported_before("PQsetChunkedRowsMode", 170000)
 
 # 33.6. Canceling Queries in Progress
 
index ca23070bd4eb930db19807304f97d3d595a1096a..20da60ac04cea32b09c6824ce106b92b2ff4f157 100644 (file)
@@ -210,6 +210,7 @@ def PQsetnonblocking(arg1: Optional[PGconn_struct], arg2: int) -> int: ...
 def PQisnonblocking(arg1: Optional[PGconn_struct]) -> int: ...
 def PQflush(arg1: Optional[PGconn_struct]) -> int: ...
 def PQsetSingleRowMode(arg1: Optional[PGconn_struct]) -> int: ...
+def PQsetChunkedRowsMode(arg1: Optional[PGconn_struct], arg2: int) -> int: ...
 def PQgetCancel(arg1: Optional[PGconn_struct]) -> PGcancel_struct: ...
 def PQfreeCancel(arg1: Optional[PGcancel_struct]) -> None: ...
 def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ...
index 75733bdebccd9203005aa2446f1d3ef6ff5878df..c439c85a14e7461dfe1773f2c0481955e60de870 100644 (file)
@@ -180,6 +180,8 @@ class PGconn(Protocol):
 
     def set_single_row_mode(self) -> None: ...
 
+    def set_chunked_rows_mode(self, size: int) -> None: ...
+
     def cancel_conn(self) -> "PGcancelConn": ...
 
     def get_cancel(self) -> "PGcancel": ...
index ee3b51c19c55eb7ba3ea52b8174e4a2ba63b97e3..fe68fbc53b9936edb1d651ba060a474618036802 100644 (file)
@@ -592,6 +592,10 @@ class PGconn:
         if not impl.PQsetSingleRowMode(self._pgconn_ptr):
             raise e.OperationalError("setting single row mode failed")
 
+    def set_chunked_rows_mode(self, size: int) -> None:
+        if not impl.PQsetChunkedRowsMode(self._pgconn_ptr, size):
+            raise e.OperationalError("setting chunked rows mode failed")
+
     def cancel_conn(self) -> "PGcancelConn":
         """
         Create a connection over which a cancel request can be sent.
index 04c3ca533724acb4130e8cb05e5ac78318b31427..ec5e5f4c444951556b3f93c98ec7885d6dad75f0 100644 (file)
@@ -108,6 +108,7 @@ cdef extern from "libpq-fe.h":
         PGRES_SINGLE_TUPLE
         PGRES_PIPELINE_SYNC
         PGRES_PIPELINE_ABORT
+        PGRES_TUPLES_CHUNK
 
     # 33.1. Database Connection Control Functions
     PGconn *PQconnectdb(const char *conninfo)
@@ -251,8 +252,9 @@ cdef extern from "libpq-fe.h":
     int PQisnonblocking(const PGconn *conn)
     int PQflush(PGconn *conn) nogil
 
-    # 33.5. Retrieving Query Results Row-by-Row
+    # 32.6. Retrieving Query Results in Chunks
     int PQsetSingleRowMode(PGconn *conn)
+    int PQsetChunkedRowsMode(PGconn *conn, int chunkSize)
 
     # 34.7. Canceling Queries in Progress
     PGcancelConn *PQcancelCreate(PGconn *conn)
@@ -353,5 +355,6 @@ typedef struct pg_cancel_conn PGcancelConn;
 #define PQcancelErrorMessage(cancelConn) NULL
 #define PQcancelReset(cancelConn) 0
 #define PQcancelFinish(cancelConn) 0
+#define PQsetChunkedRowsMode(conn, chunkSize) 0
 #endif
 """
index 2f9510328769a3a500a685b1eb85110bf1292404..9f84a4b34715c96d7692b38f0ce0fa251eb91e04 100644 (file)
@@ -499,6 +499,10 @@ cdef class PGconn:
         if not libpq.PQsetSingleRowMode(self._pgconn_ptr):
             raise e.OperationalError("setting single row mode failed")
 
+    def set_chunked_rows_mode(self, size: int) -> None:
+        if not libpq.PQsetChunkedRowsMode(self._pgconn_ptr, size):
+            raise e.OperationalError("setting chunked rows mode failed")
+
     def cancel_conn(self) -> PGcancelConn:
         _check_supported("PQcancelCreate", 170000)
         cdef libpq.PGcancelConn *ptr = libpq.PQcancelCreate(self._pgconn_ptr)
index e529eb743f695eb8650504014eb188c79c43f13e..6fd50e67f98f637c0c09a7277bc5b83a799d6cd8 100644 (file)
@@ -106,6 +106,34 @@ def test_single_row_mode(pgconn):
     assert res.ntuples == 0
 
 
+@pytest.mark.libpq(">= 17")
+def test_chunked_rows_mode(pgconn):
+    pgconn.send_query(b"select generate_series(1,7)")
+    pgconn.set_chunked_rows_mode(3)
+
+    results = execute_wait(pgconn)
+    assert len(results) == 4
+
+    res = results[0]
+    assert res.status == pq.ExecStatus.TUPLES_CHUNK
+    assert res.ntuples == 3
+    assert [res.get_value(i, 0) for i in range(3)] == [b"1", b"2", b"3"]
+
+    res = results[1]
+    assert res.status == pq.ExecStatus.TUPLES_CHUNK
+    assert res.ntuples == 3
+    assert [res.get_value(i, 0) for i in range(3)] == [b"4", b"5", b"6"]
+
+    res = results[2]
+    assert res.status == pq.ExecStatus.TUPLES_CHUNK
+    assert res.ntuples == 1
+    assert res.get_value(0, 0) == b"7"
+
+    res = results[3]
+    assert res.status == pq.ExecStatus.TUPLES_OK
+    assert res.ntuples == 0
+
+
 def test_send_query_params(pgconn):
     pgconn.send_query_params(b"select $1::int + $2", [b"5", b"3"])
     (res,) = execute_wait(pgconn)
index de5271449d92d4042da62771f382afe2e2c688b8..f1010b1ec863f31132c0c85491248d1559e5a49a 100644 (file)
@@ -387,6 +387,15 @@ def test_set_single_row_mode(pgconn):
     pgconn.set_single_row_mode()
 
 
+@pytest.mark.libpq(">= 17")
+def test_set_chunked_rows_mode(pgconn):
+    with pytest.raises(psycopg.OperationalError):
+        pgconn.set_chunked_rows_mode(42)
+
+    pgconn.send_query(b"select 1")
+    pgconn.set_chunked_rows_mode(42)
+
+
 @contextlib.contextmanager
 def cancellable_query(pgconn: PGconn) -> Iterator[None]:
     dsn = b" ".join(b"%s='%s'" % (i.keyword, i.val) for i in pgconn.info if i.val)