This function is associated with the pipeline mode.
def pipeline_sync(self) -> None:
...
+ def send_flush_request(self) -> None:
+ ...
+
class PGresult(Protocol):
def clear(self) -> None:
if rv != 1:
raise e.OperationalError("failed to sync pipeline")
+ def send_flush_request(self) -> None:
+ """Sends a request for the server to flush its output buffer.
+
+ :raises ~e.OperationalError: if the flush request failed.
+ """
+ if impl.PQsendFlushRequest(self._pgconn_ptr) == 0:
+ raise e.OperationalError(
+ f"flush request failed: {error_message(self)}"
+ )
+
def _call_bytes(
self, func: Callable[[impl.PGconn_struct], Optional[bytes]]
) -> bytes:
if rv != 1:
raise e.OperationalError("failed to sync pipeline")
+ def send_flush_request(self) -> None:
+ """Sends a request for the server to flush its output buffer.
+
+ :raises ~e.OperationalError: if the flush request failed.
+ """
+ if libpq.PG_VERSION_NUM < 140000:
+ raise e.NotSupportedError(
+ f"PQsendFlushRequest requires libpq from PostgreSQL 14,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ cdef int rv = libpq.PQsendFlushRequest(self._pgconn_ptr)
+ if rv == 0:
+ raise e.OperationalError(f"flush request failed: {error_message(self)}")
+
cdef int _ensure_pgconn(PGconn pgconn) except 0:
if pgconn._pgconn_ptr is not NULL:
pgconn.exit_pipeline_mode()
with pytest.raises(psycopg.NotSupportedError):
pgconn.pipeline_sync()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.send_flush_request()
@pytest.mark.libpq(">= 14")
assert result2.get_value(0, 0) == b"2"
+@pytest.mark.libpq(">= 14")
+def test_flush_request(pgconn):
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"])
+ pgconn.send_flush_request()
+ r = pgconn.get_result()
+ assert r.status == pq.ExecStatus.TUPLES_OK
+ assert r.get_value(0, 0) == b"1"
+ pgconn.exit_pipeline_mode()
+
+
@pytest.fixture
def table(pgconn):
tablename = "pipeline"