for marker in markers:
config.addinivalue_line("markers", marker)
+ config.addinivalue_line(
+ "markers",
+ "pipeline: the test runs with connection in pipeline mode",
+ )
+
def pytest_addoption(parser):
parser.addoption(
import psycopg
from psycopg import pq
-from .utils import check_server_version
+from .utils import check_libpq_version, check_server_version
def pytest_addoption(parser):
]
+def pytest_collection_modifyitems(items):
+ for item in items:
+ for name in item.fixturenames:
+ if name in ("pipeline", "apipeline"):
+ item.add_marker(pytest.mark.pipeline)
+ break
+
+
def pytest_configure(config):
# register pg marker
config.addinivalue_line(
conn.close()
+@pytest.fixture(params=[True, False], ids=["pipeline=on", "pipeline=off"])
+def pipeline(request, conn):
+ if request.param:
+ msg = check_libpq_version(pq.version(), ">= 14")
+ if msg:
+ pytest.skip(msg)
+ with conn.pipeline() as p:
+ yield p
+ return
+ else:
+ yield None
+
+
@pytest.fixture
async def aconn(dsn, request, tracefile):
"""Return an `AsyncConnection` connected to the ``--test-dsn`` database."""
await conn.close()
+@pytest.fixture(params=[True, False], ids=["pipeline=on", "pipeline=off"])
+async def apipeline(request, aconn):
+ if request.param:
+ msg = check_libpq_version(pq.version(), ">= 14")
+ if msg:
+ pytest.skip(msg)
+ async with aconn.pipeline() as p:
+ yield p
+ return
+ else:
+ yield None
+
+
@pytest.fixture(scope="session")
def svcconn(dsn):
"""