From b79ebc37cb34836b45e086195b0e16c1b55a9f66 Mon Sep 17 00:00:00 2001 From: Denis Laxalde Date: Mon, 11 Oct 2021 14:18:46 +0200 Subject: [PATCH] Add PGconn.send_flush_request() This function is associated with the pipeline mode. --- psycopg/psycopg/pq/abc.py | 3 +++ psycopg/psycopg/pq/pq_ctypes.py | 10 ++++++++++ psycopg_c/psycopg_c/pq/pgconn.pyx | 14 ++++++++++++++ tests/pq/test_pipeline.py | 14 ++++++++++++++ 4 files changed, 41 insertions(+) diff --git a/psycopg/psycopg/pq/abc.py b/psycopg/psycopg/pq/abc.py index ae2ad8914..63bb30a71 100644 --- a/psycopg/psycopg/pq/abc.py +++ b/psycopg/psycopg/pq/abc.py @@ -255,6 +255,9 @@ class PGconn(Protocol): def pipeline_sync(self) -> None: ... + def send_flush_request(self) -> None: + ... + class PGresult(Protocol): def clear(self) -> None: diff --git a/psycopg/psycopg/pq/pq_ctypes.py b/psycopg/psycopg/pq/pq_ctypes.py index 6c7b197ed..42b86a553 100644 --- a/psycopg/psycopg/pq/pq_ctypes.py +++ b/psycopg/psycopg/pq/pq_ctypes.py @@ -665,6 +665,16 @@ class PGconn: if rv != 1: raise e.OperationalError("failed to sync pipeline") + def send_flush_request(self) -> None: + """Sends a request for the server to flush its output buffer. + + :raises ~e.OperationalError: if the flush request failed. + """ + if impl.PQsendFlushRequest(self._pgconn_ptr) == 0: + raise e.OperationalError( + f"flush request failed: {error_message(self)}" + ) + def _call_bytes( self, func: Callable[[impl.PGconn_struct], Optional[bytes]] ) -> bytes: diff --git a/psycopg_c/psycopg_c/pq/pgconn.pyx b/psycopg_c/psycopg_c/pq/pgconn.pyx index 6c9039d84..9d13eb3e7 100644 --- a/psycopg_c/psycopg_c/pq/pgconn.pyx +++ b/psycopg_c/psycopg_c/pq/pgconn.pyx @@ -592,6 +592,20 @@ cdef class PGconn: if rv != 1: raise e.OperationalError("failed to sync pipeline") + def send_flush_request(self) -> None: + """Sends a request for the server to flush its output buffer. + + :raises ~e.OperationalError: if the flush request failed. + """ + if libpq.PG_VERSION_NUM < 140000: + raise e.NotSupportedError( + f"PQsendFlushRequest requires libpq from PostgreSQL 14," + f" {libpq.PG_VERSION_NUM} available instead" + ) + cdef int rv = libpq.PQsendFlushRequest(self._pgconn_ptr) + if rv == 0: + raise e.OperationalError(f"flush request failed: {error_message(self)}") + cdef int _ensure_pgconn(PGconn pgconn) except 0: if pgconn._pgconn_ptr is not NULL: diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py index a19333a95..c73f67bcc 100644 --- a/tests/pq/test_pipeline.py +++ b/tests/pq/test_pipeline.py @@ -13,6 +13,8 @@ def test_old_libpq(pgconn): pgconn.exit_pipeline_mode() with pytest.raises(psycopg.NotSupportedError): pgconn.pipeline_sync() + with pytest.raises(psycopg.NotSupportedError): + pgconn.send_flush_request() @pytest.mark.libpq(">= 14") @@ -73,6 +75,18 @@ def test_multi_pipelines(pgconn): assert result2.get_value(0, 0) == b"2" +@pytest.mark.libpq(">= 14") +def test_flush_request(pgconn): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"]) + pgconn.send_flush_request() + r = pgconn.get_result() + assert r.status == pq.ExecStatus.TUPLES_OK + assert r.get_value(0, 0) == b"1" + pgconn.exit_pipeline_mode() + + @pytest.fixture def table(pgconn): tablename = "pipeline" -- 2.47.2