from .connection import Connection
from .transaction import Rollback, Transaction, AsyncTransaction
from .cursor_async import AsyncCursor
-from ._capabilities import Capabilities
+from ._capabilities import Capabilities, capabilities
from .server_cursor import AsyncServerCursor, ServerCursor
from .client_cursor import AsyncClientCursor, ClientCursor
from .raw_cursor import AsyncRawCursor, RawCursor
if logger.level == logging.NOTSET:
logger.setLevel(logging.WARNING)
-# A global object to check for capabilities.
-capabilities = Capabilities()
-
# DBAPI compliance
connect = Connection.connect
apilevel = "2.0"
"AsyncServerCursor",
"AsyncTransaction",
"BaseConnection",
+ "Capabilities",
+ "capabilities",
"ClientCursor",
"Column",
"Connection",
return f"the psycopg[binary] package version {version}"
else:
return "system libraries"
+
+
+# The object that will be exposed by the module.
+capabilities = Capabilities()
from ._encodings import pgconn_encoding
from ._preparing import Key, Prepare
from .generators import pipeline_communicate, fetch_many, send
+from ._capabilities import capabilities
if TYPE_CHECKING:
from .pq.abc import PGresult
def is_supported(cls) -> bool:
"""Return `!True` if the psycopg libpq wrapper supports pipeline mode."""
if BasePipeline._is_supported is None:
- BasePipeline._is_supported = not cls._not_supported_reason()
+ BasePipeline._is_supported = capabilities.has_pipeline()
return BasePipeline._is_supported
- @classmethod
- def _not_supported_reason(cls) -> str:
- """Return the reason why the pipeline mode is not supported.
-
- Return an empty string if pipeline mode is supported.
- """
- # Support only depends on the libpq functions available in the pq
- # wrapper, not on the database version.
- if pq.version() < 140000:
- return (
- f"libpq too old {pq.version()};"
- " v14 or greater required for pipeline mode"
- )
-
- if pq.__build_version__ < 140000:
- return (
- f"libpq too old: module built for {pq.__build_version__};"
- " v14 or greater required for pipeline mode"
- )
-
- return ""
-
def _enter_gen(self) -> PQGen[None]:
- if not self.is_supported():
- raise e.NotSupportedError(
- f"pipeline mode not supported: {self._not_supported_reason()}"
- )
+ if not self._is_supported:
+ capabilities.has_pipeline(check=True)
if self.level == 0:
self.pgconn.enter_pipeline_mode()
elif self.command_queue or self.pgconn.transaction_status == ACTIVE:
def pytest_runtest_setup(item):
- for m in item.iter_markers(name="pipeline"):
- if not psycopg.Pipeline.is_supported():
- pytest.skip(psycopg.Pipeline._not_supported_reason())
+ try:
+ psycopg.capabilities.has_pipeline(check=True)
+ except psycopg.NotSupportedError as ex:
+ for m in item.iter_markers(name="pipeline"):
+ pytest.skip(str(ex))
def pytest_configure(config):
@pytest.fixture(params=[True, False], ids=["pipeline=on", "pipeline=off"])
def pipeline(request, conn):
if request.param:
- if not psycopg.Pipeline.is_supported():
- pytest.skip(psycopg.Pipeline._not_supported_reason())
+ try:
+ psycopg.capabilities.has_pipeline(check=True)
+ except psycopg.NotSupportedError as ex:
+ pytest.skip(str(ex))
with conn.pipeline() as p:
yield p
return
@pytest.fixture(params=[True, False], ids=["pipeline=on", "pipeline=off"])
async def apipeline(request, aconn):
if request.param:
- if not psycopg.Pipeline.is_supported():
- pytest.skip(psycopg.Pipeline._not_supported_reason())
+ try:
+ psycopg.capabilities.has_pipeline(check=True)
+ except psycopg.NotSupportedError as ex:
+ pytest.skip(str(ex))
async with aconn.pipeline() as p:
yield p
return
with conn.pipeline():
pass
- assert "too old" in str(exc.value)
+ assert "requires libpq version 14.0 or newer" in str(exc.value)
@pytest.fixture
def pipeline_uniqviol(pgconn):
- if not psycopg.Pipeline.is_supported():
- pytest.skip(psycopg.Pipeline._not_supported_reason())
+ try:
+ psycopg.capabilities.has_pipeline(check=True)
+ except psycopg.NotSupportedError as ex:
+ pytest.skip(str(ex))
assert pgconn.pipeline_status == 0
res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline_uniqviol")
assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message