command_queue: Deque[PipelineCommand]
result_queue: Deque[PendingResult]
+ _is_supported: Optional[bool] = None
def __init__(self, conn: "BaseConnection[Any]") -> None:
self._conn = conn
self.command_queue.append(self.pgconn.pipeline_sync)
self.result_queue.append(None)
+ @staticmethod
+ def is_supported() -> bool:
+ """Return `True` if the psycopg libpq wrapper suports pipeline mode."""
+ if BasePipeline._is_supported is None:
+ # Support only depends on the libpq functions available in the pq
+ # wrapper, not on the database version.
+ pq_version = pq.__build_version__ or pq.version()
+ BasePipeline._is_supported = pq_version >= 140000
+ return BasePipeline._is_supported
+
def _enter(self) -> None:
self.pgconn.enter_pipeline_mode()
+import psycopg
from psycopg import pq
assert pq.__build_version__ and pq.__build_version__ >= 70400
else:
assert False, f"unexpected libpq implementation: {pq.__impl__}"
+
+
+def test_pipeline_supported():
+ # Note: This test is here because pipeline tests are skipped on libpq < 14
+ if pq.__impl__ == "python":
+ assert psycopg.Pipeline.is_supported() == (pq.version() >= 140000)
+ else:
+ assert pq.__build_version__ is not None
+ assert psycopg.Pipeline.is_supported() == (pq.__build_version__ >= 140000)