.. automethod:: fileno
+ .. automethod:: pipeline
+
.. _tpc-methods:
.. automethod:: set_read_only
.. automethod:: set_deferrable
+ .. automethod:: pipeline
+
+ .. note::
+
+ It must be called as::
+
+ async with conn.pipeline():
+ ...
+
.. automethod:: tpc_prepare
.. automethod:: tpc_commit
.. automethod:: tpc_rollback
from .errors import InternalError, ProgrammingError, NotSupportedError
from ._column import Column
from .conninfo import ConnectionInfo
-from .connection import BaseConnection, Connection, Notify
+from .connection import BaseConnection, Connection, Notify, Pipeline
from .transaction import Rollback, Transaction, AsyncTransaction
from .cursor_async import AsyncCursor
from .server_cursor import AsyncServerCursor, ServerCursor
-from .connection_async import AsyncConnection
+from .connection_async import AsyncConnection, AsyncPipeline
from . import dbapi20
from .dbapi20 import BINARY, DATETIME, NUMBER, ROWID, STRING
"AsyncConnection",
"AsyncCopy",
"AsyncCursor",
+ "AsyncPipeline",
"AsyncServerCursor",
"AsyncTransaction",
"BaseConnection",
"Cursor",
"IsolationLevel",
"Notify",
+ "Pipeline",
"Rollback",
"ServerCursor",
"Transaction",
NotifyHandler: TypeAlias = Callable[[Notify], None]
+class BasePipeline:
+ def __init__(self, pgconn: "PGconn") -> None:
+ self.pgconn = pgconn
+
+ @property
+ def status(self) -> pq.PipelineStatus:
+ return pq.PipelineStatus(self.pgconn.pipeline_status)
+
+ def _enter(self) -> None:
+ self.pgconn.enter_pipeline_mode()
+
+ def _exit(self) -> None:
+ self.pgconn.exit_pipeline_mode()
+
+
+class Pipeline(BasePipeline):
+ """Handler for connection in pipeline mode."""
+
+ def __enter__(self) -> "Pipeline":
+ self._enter()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self._exit()
+
+
class BaseConnection(Generic[Row]):
"""
Base class for different types of connections.
# apart a connection in the pool too (when _pool = None)
self._pool: Optional["BasePool[Any]"]
+ self._pipeline: Optional[BasePipeline] = None
+
# Time after which the connection should be closed
self._expire_at: float
server_cursor_factory: Type[ServerCursor[Row]]
row_factory: RowFactory[Row]
+ _pipeline: Optional[Pipeline]
+
def __init__(self, pgconn: "PGconn", row_factory: Optional[RowFactory[Row]] = None):
super().__init__(pgconn)
self.row_factory = row_factory or cast(RowFactory[Row], tuple_row)
n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
yield n
+ @contextmanager
+ def pipeline(self) -> Iterator[None]:
+ """Context manager to switch the connection into pipeline mode."""
+ if self._pipeline is not None:
+ raise e.ProgrammingError("already in pipeline mode")
+
+ pipeline = self._pipeline = Pipeline(self.pgconn)
+ try:
+ with pipeline:
+ yield
+ finally:
+ assert pipeline.status == pq.PipelineStatus.OFF, pipeline.status
+ self._pipeline = None
+
def wait(self, gen: PQGen[RV], timeout: Optional[float] = 0.1) -> RV:
"""
Consume a generator operating on the connection.
from . import errors as e
from . import waiting
-from .pq import Format, TransactionStatus
+from .pq import Format, PipelineStatus, TransactionStatus
from .abc import AdaptContext, Params, PQGen, PQGenConn, Query, RV
from ._tpc import Xid
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
from ._enums import IsolationLevel
from .conninfo import make_conninfo, conninfo_to_dict
from ._encodings import pgconn_encoding
-from .connection import BaseConnection, CursorRow, Notify
+from .connection import BaseConnection, BasePipeline, CursorRow, Notify
from .generators import notifies
from .transaction import AsyncTransaction
from .cursor_async import AsyncCursor
logger = logging.getLogger("psycopg")
+class AsyncPipeline(BasePipeline):
+ """Handler for async connection in pipeline mode."""
+
+ async def __aenter__(self) -> "AsyncPipeline":
+ self._enter()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self._exit()
+
+
class AsyncConnection(BaseConnection[Row]):
"""
Asynchronous wrapper for a connection to the database.
server_cursor_factory: Type[AsyncServerCursor[Row]]
row_factory: AsyncRowFactory[Row]
+ _pipeline: Optional[AsyncPipeline]
+
def __init__(
self,
pgconn: "PGconn",
n = Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
yield n
+ @asynccontextmanager
+ async def pipeline(self) -> AsyncIterator[None]:
+ """Context manager to switch the connection into pipeline mode."""
+ if self._pipeline is not None:
+ raise e.ProgrammingError("already in pipeline mode")
+
+ pipeline = self._pipeline = AsyncPipeline(self.pgconn)
+ try:
+ async with pipeline:
+ yield
+ finally:
+ assert pipeline.status == PipelineStatus.OFF, pipeline.status
+ self._pipeline = None
+
async def wait(self, gen: PQGen[RV]) -> RV:
try:
return await waiting.wait_async(gen, self.pgconn.socket)
--- /dev/null
+import pytest
+
+from psycopg import pq
+from psycopg.errors import ProgrammingError
+
+pytestmark = pytest.mark.libpq(">= 14")
+
+
+def test_pipeline_status(conn):
+ with conn.pipeline():
+ p = conn._pipeline
+ assert p is not None
+ assert p.status == pq.PipelineStatus.ON
+ with pytest.raises(ProgrammingError):
+ with conn.pipeline():
+ pass
+ assert p.status == pq.PipelineStatus.OFF
+ assert not conn._pipeline
--- /dev/null
+import pytest
+
+from psycopg import pq
+from psycopg.errors import ProgrammingError
+
+pytestmark = [
+ pytest.mark.libpq(">= 14"),
+ pytest.mark.asyncio,
+]
+
+
+async def test_pipeline_status(aconn):
+ async with aconn.pipeline():
+ p = aconn._pipeline
+ assert p is not None
+ assert p.status == pq.PipelineStatus.ON
+ with pytest.raises(ProgrammingError):
+ async with aconn.pipeline():
+ pass
+ assert p.status == pq.PipelineStatus.OFF
+ assert not aconn._pipeline