from typing import Any, Callable, List, Optional, Sequence, Tuple
from typing import Union, TYPE_CHECKING
-from ._enums import Format, PipelineStatus
+from ._enums import Format
from .._compat import Protocol
if TYPE_CHECKING:
def make_empty_result(self, exec_status: int) -> "PGresult":
...
- def pipeline_status(self) -> PipelineStatus:
+ @property
+ def pipeline_status(self) -> int:
...
def enter_pipeline_mode(self) -> None:
from . import _pq_ctypes as impl
from .misc import PGnotify, ConninfoOption, PGresAttDesc
from .misc import error_message, connection_summary
-from ._enums import Format, ExecStatus, PipelineStatus
+from ._enums import Format, ExecStatus
if TYPE_CHECKING:
from . import abc
raise MemoryError("couldn't allocate empty PGresult")
return PGresult(rv)
- def pipeline_status(self) -> PipelineStatus:
- return PipelineStatus(impl.PQpipelineStatus(self._pgconn_ptr))
+ @property
+ def pipeline_status(self) -> int:
+ if version() < 140000:
+ return 0
+ return impl.PQpipelineStatus(self._pgconn_ptr)
def enter_pipeline_mode(self) -> None:
"""Enter pipeline mode.
raise MemoryError("couldn't allocate empty PGresult")
return PGresult._from_ptr(rv)
- def pipeline_status(self) -> PipelineStatus:
- """Return the current pipeline mode status."""
+ @property
+ def pipeline_status(self) -> int:
+ """The current pipeline mode status.
+
+ For libpq < 14.0, always return 0 (PQ_PIPELINE_OFF).
+ """
if libpq.PG_VERSION_NUM < 140000:
- raise e.NotSupportedError(
- f"PQpipelineStatus requires libpq from PostgreSQL 14,"
- f" {libpq.PG_VERSION_NUM} available instead"
- )
+ return 0
cdef int status = libpq.PQpipelineStatus(self._pgconn_ptr)
- return PipelineStatus(status)
+ return status
def enter_pipeline_mode(self) -> None:
"""Enter pipeline mode.
@pytest.mark.libpq("< 14")
-def test_unsupported(pgconn):
+def test_old_libpq(pgconn):
+ assert pgconn.pipeline_status == 0
with pytest.raises(psycopg.NotSupportedError):
pgconn.enter_pipeline_mode()
with pytest.raises(psycopg.NotSupportedError):
pgconn.exit_pipeline_mode()
- with pytest.raises(psycopg.NotSupportedError):
- pgconn.pipeline_status()
with pytest.raises(psycopg.NotSupportedError):
pgconn.pipeline_sync()
@pytest.mark.libpq(">= 14")
def test_work_in_progress(pgconn):
assert not pgconn.nonblocking
+ assert not pgconn.pipeline_status
pgconn.enter_pipeline_mode()
pgconn.send_query_params(b"select $1", [b"1"])
with pytest.raises(
@pytest.mark.libpq(">= 14")
def test_multi_pipelines(pgconn):
+ assert not pgconn.pipeline_status
pgconn.enter_pipeline_mode()
pgconn.send_query_params(b"select $1", [b"1"])
pgconn.pipeline_sync()
assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
# pipeline still ON
- assert pgconn.pipeline_status() == pq.PipelineStatus.ON
+ assert pgconn.pipeline_status == pq.PipelineStatus.ON
pgconn.exit_pipeline_mode()
- assert pgconn.pipeline_status() == pq.PipelineStatus.OFF
+ assert pgconn.pipeline_status == pq.PipelineStatus.OFF
assert result1.get_value(0, 0) == b"1"
assert result2.get_value(0, 0) == b"2"
@pytest.mark.libpq(">= 14")
def test_pipeline_abort(pgconn, table):
+ assert not pgconn.pipeline_status
pgconn.enter_pipeline_mode()
pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"])
pgconn.send_query_params(b"select no_such_function($1)", [b"1"])
assert pgconn.get_result() is None
# pipeline should be aborted, due to previous error
- assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED
+ assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED
# result from second INSERT, aborted due to previous error
r = pgconn.get_result()
assert pgconn.get_result() is None
# pipeline is still aborted
- assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED
+ assert pgconn.pipeline_status == pq.PipelineStatus.ABORTED
# sync result
r = pgconn.get_result()
assert r.status == pq.ExecStatus.PIPELINE_SYNC
# aborted flag is clear, pipeline is on again
- assert pgconn.pipeline_status() == pq.PipelineStatus.ON
+ assert pgconn.pipeline_status == pq.PipelineStatus.ON
# result from the third INSERT
r = pgconn.get_result()