From: Denis Laxalde Date: Mon, 11 Oct 2021 12:18:46 +0000 (+0200) Subject: Add PGconn.send_flush_request() X-Git-Tag: 3.0~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b79ebc37cb34836b45e086195b0e16c1b55a9f66;p=thirdparty%2Fpsycopg.git Add PGconn.send_flush_request() This function is associated with the pipeline mode. --- diff --git a/psycopg/psycopg/pq/abc.py b/psycopg/psycopg/pq/abc.py index ae2ad8914..63bb30a71 100644 --- a/psycopg/psycopg/pq/abc.py +++ b/psycopg/psycopg/pq/abc.py @@ -255,6 +255,9 @@ class PGconn(Protocol): def pipeline_sync(self) -> None: ... + def send_flush_request(self) -> None: + ... + class PGresult(Protocol): def clear(self) -> None: diff --git a/psycopg/psycopg/pq/pq_ctypes.py b/psycopg/psycopg/pq/pq_ctypes.py index 6c7b197ed..42b86a553 100644 --- a/psycopg/psycopg/pq/pq_ctypes.py +++ b/psycopg/psycopg/pq/pq_ctypes.py @@ -665,6 +665,16 @@ class PGconn: if rv != 1: raise e.OperationalError("failed to sync pipeline") + def send_flush_request(self) -> None: + """Sends a request for the server to flush its output buffer. + + :raises ~e.OperationalError: if the flush request failed. + """ + if impl.PQsendFlushRequest(self._pgconn_ptr) == 0: + raise e.OperationalError( + f"flush request failed: {error_message(self)}" + ) + def _call_bytes( self, func: Callable[[impl.PGconn_struct], Optional[bytes]] ) -> bytes: diff --git a/psycopg_c/psycopg_c/pq/pgconn.pyx b/psycopg_c/psycopg_c/pq/pgconn.pyx index 6c9039d84..9d13eb3e7 100644 --- a/psycopg_c/psycopg_c/pq/pgconn.pyx +++ b/psycopg_c/psycopg_c/pq/pgconn.pyx @@ -592,6 +592,20 @@ cdef class PGconn: if rv != 1: raise e.OperationalError("failed to sync pipeline") + def send_flush_request(self) -> None: + """Sends a request for the server to flush its output buffer. + + :raises ~e.OperationalError: if the flush request failed. + """ + if libpq.PG_VERSION_NUM < 140000: + raise e.NotSupportedError( + f"PQsendFlushRequest requires libpq from PostgreSQL 14," + f" {libpq.PG_VERSION_NUM} available instead" + ) + cdef int rv = libpq.PQsendFlushRequest(self._pgconn_ptr) + if rv == 0: + raise e.OperationalError(f"flush request failed: {error_message(self)}") + cdef int _ensure_pgconn(PGconn pgconn) except 0: if pgconn._pgconn_ptr is not NULL: diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py index a19333a95..c73f67bcc 100644 --- a/tests/pq/test_pipeline.py +++ b/tests/pq/test_pipeline.py @@ -13,6 +13,8 @@ def test_old_libpq(pgconn): pgconn.exit_pipeline_mode() with pytest.raises(psycopg.NotSupportedError): pgconn.pipeline_sync() + with pytest.raises(psycopg.NotSupportedError): + pgconn.send_flush_request() @pytest.mark.libpq(">= 14") @@ -73,6 +75,18 @@ def test_multi_pipelines(pgconn): assert result2.get_value(0, 0) == b"2" +@pytest.mark.libpq(">= 14") +def test_flush_request(pgconn): + assert pgconn.pipeline_status == pq.PipelineStatus.OFF + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"]) + pgconn.send_flush_request() + r = pgconn.get_result() + assert r.status == pq.ExecStatus.TUPLES_OK + assert r.get_value(0, 0) == b"1" + pgconn.exit_pipeline_mode() + + @pytest.fixture def table(pgconn): tablename = "pipeline"