For libpq < 14.0, always return 0 (PQ_PIPELINE_OFF).
"""
if libpq.PG_VERSION_NUM < 140000:
- return 0
+ return libpq.PQ_PIPELINE_OFF
cdef int status = libpq.PQpipelineStatus(self._pgconn_ptr)
return status
@pytest.mark.libpq(">= 14")
def test_work_in_progress(pgconn):
assert not pgconn.nonblocking
- assert not pgconn.pipeline_status
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
pgconn.enter_pipeline_mode()
pgconn.send_query_params(b"select $1", [b"1"])
with pytest.raises(
@pytest.mark.libpq(">= 14")
def test_multi_pipelines(pgconn):
- assert not pgconn.pipeline_status
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
pgconn.enter_pipeline_mode()
pgconn.send_query_params(b"select $1", [b"1"])
pgconn.pipeline_sync()
@pytest.mark.libpq(">= 14")
def test_pipeline_abort(pgconn, table):
- assert not pgconn.pipeline_status
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
pgconn.enter_pipeline_mode()
pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"])
pgconn.send_query_params(b"select no_such_function($1)", [b"1"])