def status(self) -> pq.PipelineStatus:
return pq.PipelineStatus(self.pgconn.pipeline_status)
- @staticmethod
- def is_supported() -> bool:
+ @classmethod
+ def is_supported(cls) -> bool:
"""Return `!True` if the psycopg libpq wrapper supports pipeline mode."""
if BasePipeline._is_supported is None:
- # Support only depends on the libpq functions available in the pq
- # wrapper, not on the database version.
- pq_version = pq.__build_version__ or pq.version()
- # Pipeline support broken in libpq 14.5 (#350)
- BasePipeline._is_supported = pq_version >= 140000 and pq_version != 140005
+ BasePipeline._is_supported = not cls._not_supported_reason()
return BasePipeline._is_supported
+ @classmethod
+ def _not_supported_reason(cls) -> str:
+ """Return the reason why the pipeline mode is not supported.
+
+ Return an empty string if pipeline mode is supported.
+ """
+ # Support only depends on the libpq functions available in the pq
+ # wrapper, not on the database version.
+ if pq.version() < 140000:
+ return (
+ f"libpq too old {pq.version()};"
+ " v14 or greater required for pipeline mode"
+ )
+
+ if pq.__build_version__ < 140000:
+ return (
+ f"libpq too old: module built for {pq.__build_version__};"
+ " v14 or greater required for pipeline mode"
+ )
+
+ # Bug #350
+ if pq.version() == 140005:
+ return f"pipeline mode broken in libpq version {pq.version()}"
+
+ return ""
+
def _enter_gen(self) -> PQGen[None]:
+ if not self.is_supported():
+ raise e.NotSupportedError(
+ f"pipeline mode not supported: {self._not_supported_reason()}"
+ )
if self.level == 0:
self.pgconn.enter_pipeline_mode()
elif self.command_queue:
"timing: the test is timing based and can fail on cheese hardware",
"dns: the test requires dnspython to run",
"postgis: the test requires the PostGIS extension to run",
- "pipeline: the test runs with connection in pipeline mode",
]
for marker in markers:
from psycopg import pq
from psycopg import sql
-from .utils import check_libpq_version, check_postgres_version
+from .utils import check_postgres_version
# Set by warm_up_database() the first time the dsn fixture is used
pg_version: int
break
+def pytest_runtest_setup(item):
+ for m in item.iter_markers(name="pipeline"):
+ if not psycopg.Pipeline.is_supported():
+ pytest.skip(psycopg.Pipeline._not_supported_reason())
+
+
def pytest_configure(config):
# register pg marker
- config.addinivalue_line(
- "markers",
+ markers = [
"pg(version_expr): run the test only with matching server version"
" (e.g. '>= 10', '< 9.6')",
- )
+ "pipeline: the test runs with connection in pipeline mode",
+ ]
+ for marker in markers:
+ config.addinivalue_line("markers", marker)
@pytest.fixture(scope="session")
@pytest.fixture(params=[True, False], ids=["pipeline=on", "pipeline=off"])
def pipeline(request, conn):
if request.param:
- msg = check_libpq_version(pq.version(), ">= 14")
- if msg:
- pytest.skip(msg)
+ if not psycopg.Pipeline.is_supported():
+ pytest.skip(psycopg.Pipeline._not_supported_reason())
with conn.pipeline() as p:
yield p
return
@pytest.fixture(params=[True, False], ids=["pipeline=on", "pipeline=off"])
async def apipeline(request, aconn):
if request.param:
- msg = check_libpq_version(pq.version(), ">= 14")
- if msg:
- pytest.skip(msg)
+ if not psycopg.Pipeline.is_supported():
+ pytest.skip(psycopg.Pipeline._not_supported_reason())
async with aconn.pipeline() as p:
yield p
return
assert not check_libpq_version(got, want)
-def test_pipeline_supported():
- # Note: This test is here because pipeline tests are skipped on libpq < 14
- if pq.__impl__ == "python":
- assert psycopg.Pipeline.is_supported() == (pq.version() >= 140000)
- else:
- assert pq.__build_version__ is not None
- assert psycopg.Pipeline.is_supported() == (pq.__build_version__ >= 140000)
+# Note: These tests are here because test_pipeline.py tests are all skipped
+# when pipeline mode is not supported.
+
+
+@pytest.mark.libpq(">= 14")
+@pytest.mark.libpq("!= 14.5")
+def test_pipeline_supported(conn):
+ assert psycopg.Pipeline.is_supported()
+ assert psycopg.AsyncPipeline.is_supported()
+
+ with conn.pipeline():
+ pass
+
+
+@pytest.mark.libpq("< 14")
+def test_pipeline_not_supported(conn):
+ assert not psycopg.Pipeline.is_supported()
+ assert not psycopg.AsyncPipeline.is_supported()
+
+ with pytest.raises(psycopg.NotSupportedError) as exc:
+ with conn.pipeline():
+ pass
+
+ assert "too old" in str(exc.value)
+
+
+@pytest.mark.libpq("14.5")
+def test_pipeline_not_supported_14_5(conn):
+ # Affected by #350
+ # NOTE: we might support it in binary using a patched libpq version.
+ assert not psycopg.Pipeline.is_supported()
+ assert not psycopg.AsyncPipeline.is_supported()
+
+ with pytest.raises(psycopg.NotSupportedError) as exc:
+ with conn.pipeline():
+ pass
+
+ assert "broken" in str(exc.value)
conn.cursor().mogrify("select %(s)s", {"s": "\u20ac"})
-@pytest.mark.libpq(">= 14")
@pytest.mark.pipeline
def test_message_0x33(conn):
# https://github.com/psycopg/psycopg/issues/314
aconn.cursor().mogrify("select %(s)s", {"s": "\u20ac"})
-@pytest.mark.libpq(">= 14")
@pytest.mark.pipeline
async def test_message_0x33(aconn):
# https://github.com/psycopg/psycopg/issues/314
conn.close()
assert conn.info.transaction_status.name == "UNKNOWN"
- @pytest.mark.libpq(">= 14")
+ @pytest.mark.pipeline
def test_pipeline_status(self, conn):
assert not conn.info.pipeline_status
assert conn.info.pipeline_status.name == "OFF"
import pytest
+import psycopg
from psycopg import waiting
from psycopg import pq
-from .utils import check_libpq_version
-
@pytest.fixture
def pipeline(pgconn):
- if check_libpq_version(pq.version(), ">= 14"):
- pytest.skip("require libpq >= 14")
nb, pgconn.nonblocking = pgconn.nonblocking, True
assert pgconn.nonblocking
pgconn.enter_pipeline_mode()
assert actual_statuses == expected_statuses
+@pytest.mark.pipeline
def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
commands = deque(
[
_run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+@pytest.mark.pipeline
def test_pipeline_communicate_no_sync(pgconn, pipeline, generators):
numqueries = 10
commands = deque(
@pytest.fixture
def pipeline_demo(pgconn):
- if check_libpq_version(pq.version(), ">= 14"):
- pytest.skip("require libpq >= 14")
assert pgconn.pipeline_status == 0
res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline")
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
# TODOCRDB: 1 doesn't get rolled back. Open a ticket?
+@pytest.mark.pipeline
@pytest.mark.crdb("skip", reason="pipeline aborted")
def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators):
insert_sql = b"insert into pg_pipeline(itemno) values ($1)"
@pytest.fixture
def pipeline_uniqviol(pgconn):
- if check_libpq_version(pq.version(), ">= 14"):
- pytest.skip("require libpq >= 14")
+ if not psycopg.Pipeline.is_supported():
+ pytest.skip(psycopg.Pipeline._not_supported_reason())
assert pgconn.pipeline_status == 0
res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline_uniqviol")
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
from psycopg import errors as e
pytestmark = [
- pytest.mark.libpq(">= 14"),
pytest.mark.pipeline,
+ pytest.mark.skipif("not psycopg.Pipeline.is_supported()"),
]
pipeline_aborted = pytest.mark.flakey("the server might get in pipeline aborted")
pytestmark = [
pytest.mark.asyncio,
- pytest.mark.libpq(">= 14"),
pytest.mark.pipeline,
+ pytest.mark.skipif("not psycopg.AsyncPipeline.is_supported()"),
]