self.command_queue = Deque[PipelineCommand]()
self.result_queue = Deque[PendingResult]()
+ def __repr__(self) -> str:
+ cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ info = pq.misc.connection_summary(self._conn.pgconn)
+ return f"<{cls} {info} at 0x{id(self):x}>"
+
@property
def status(self) -> pq.PipelineStatus:
return pq.PipelineStatus(self.pgconn.pipeline_status)
class Pipeline(BasePipeline):
"""Handler for connection in pipeline mode."""
+ __module__ = "psycopg"
_conn: "Connection[Any]"
def __init__(self, conn: "Connection[Any]") -> None:
class AsyncPipeline(BasePipeline):
"""Handler for async connection in pipeline mode."""
+ __module__ = "psycopg"
_conn: "AsyncConnection[Any]"
def __init__(self, conn: "AsyncConnection[Any]") -> None:
pytestmark = pytest.mark.libpq(">= 14")
+def test_repr(conn):
+ with conn.pipeline() as p:
+ assert "psycopg.Pipeline" in repr(p)
+ assert "[IDLE]" in repr(p)
+
+ conn.close()
+ assert "[BAD]" in repr(p)
+
+
def test_pipeline_status(conn: psycopg.Connection[Any]) -> None:
assert conn._pipeline is None
with conn.pipeline() as p:
]
+async def test_repr(aconn):
+ async with aconn.pipeline() as p:
+ assert "psycopg.AsyncPipeline" in repr(p)
+ assert "[IDLE]" in repr(p)
+
+ await aconn.close()
+ assert "[BAD]" in repr(p)
+
+
async def test_pipeline_status(aconn: psycopg.AsyncConnection[Any]) -> None:
assert aconn._pipeline is None
async with aconn.pipeline() as p: