]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add PGconn.send_flush_request()
authorDenis Laxalde <denis.laxalde@dalibo.com>
Mon, 11 Oct 2021 12:18:46 +0000 (14:18 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Mon, 11 Oct 2021 15:08:57 +0000 (17:08 +0200)
This function is associated with the pipeline mode.

psycopg/psycopg/pq/abc.py
psycopg/psycopg/pq/pq_ctypes.py
psycopg_c/psycopg_c/pq/pgconn.pyx
tests/pq/test_pipeline.py

index ae2ad8914803aa04fa43535d8c99a5ffb7049e27..63bb30a71f41a7771f6aab40d614869471aa9c86 100644 (file)
@@ -255,6 +255,9 @@ class PGconn(Protocol):
     def pipeline_sync(self) -> None:
         ...
 
+    def send_flush_request(self) -> None:
+        ...
+
 
 class PGresult(Protocol):
     def clear(self) -> None:
index 6c7b197ede31e2183bc32b3170c6ee9927087c74..42b86a5530bac461ef47aa95c1b9558e2db77d9a 100644 (file)
@@ -665,6 +665,16 @@ class PGconn:
         if rv != 1:
             raise e.OperationalError("failed to sync pipeline")
 
+    def send_flush_request(self) -> None:
+        """Sends a request for the server to flush its output buffer.
+
+        :raises ~e.OperationalError: if the flush request failed.
+        """
+        if impl.PQsendFlushRequest(self._pgconn_ptr) == 0:
+            raise e.OperationalError(
+                f"flush request failed: {error_message(self)}"
+            )
+
     def _call_bytes(
         self, func: Callable[[impl.PGconn_struct], Optional[bytes]]
     ) -> bytes:
index 6c9039d84b57027ce02bc93eb25c225440eeb42a..9d13eb3e787a761d2e21d5032bba3c43b58a8875 100644 (file)
@@ -592,6 +592,20 @@ cdef class PGconn:
         if rv != 1:
             raise e.OperationalError("failed to sync pipeline")
 
+    def send_flush_request(self) -> None:
+        """Sends a request for the server to flush its output buffer.
+
+        :raises ~e.OperationalError: if the flush request failed.
+        """
+        if libpq.PG_VERSION_NUM < 140000:
+            raise e.NotSupportedError(
+                f"PQsendFlushRequest requires libpq from PostgreSQL 14,"
+                f" {libpq.PG_VERSION_NUM} available instead"
+            )
+        cdef int rv = libpq.PQsendFlushRequest(self._pgconn_ptr)
+        if rv == 0:
+            raise e.OperationalError(f"flush request failed: {error_message(self)}")
+
 
 cdef int _ensure_pgconn(PGconn pgconn) except 0:
     if pgconn._pgconn_ptr is not NULL:
index a19333a951e808ee45521fd1d492f3d91f2f6da9..c73f67bcc8baddd04fe0553d4d7670036d6dc1a4 100644 (file)
@@ -13,6 +13,8 @@ def test_old_libpq(pgconn):
         pgconn.exit_pipeline_mode()
     with pytest.raises(psycopg.NotSupportedError):
         pgconn.pipeline_sync()
+    with pytest.raises(psycopg.NotSupportedError):
+        pgconn.send_flush_request()
 
 
 @pytest.mark.libpq(">= 14")
@@ -73,6 +75,18 @@ def test_multi_pipelines(pgconn):
     assert result2.get_value(0, 0) == b"2"
 
 
+@pytest.mark.libpq(">= 14")
+def test_flush_request(pgconn):
+    assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+    pgconn.enter_pipeline_mode()
+    pgconn.send_query_params(b"select $1", [b"1"])
+    pgconn.send_flush_request()
+    r = pgconn.get_result()
+    assert r.status == pq.ExecStatus.TUPLES_OK
+    assert r.get_value(0, 0) == b"1"
+    pgconn.exit_pipeline_mode()
+
+
 @pytest.fixture
 def table(pgconn):
     tablename = "pipeline"