def pipeline(self) -> Iterator[Pipeline]:
"""Switch the connection into pipeline mode."""
with self.lock:
+ self._check_connection_ok()
+
pipeline = self._pipeline
if pipeline is None:
# WARNING: reference loop, broken ahead.
async def pipeline(self) -> AsyncIterator[AsyncPipeline]:
"""Context manager to switch the connection into pipeline mode."""
async with self.lock:
+ self._check_connection_ok()
+
pipeline = self._pipeline
if pipeline is None:
# WARNING: reference loop, broken ahead.
assert "[BAD]" in repr(p)
+def test_connection_closed(conn):
+ conn.close()
+ with pytest.raises(e.OperationalError):
+ with conn.pipeline():
+ pass
+
+
def test_pipeline_status(conn: psycopg.Connection[Any]) -> None:
assert conn._pipeline is None
with conn.pipeline() as p:
assert "[BAD]" in repr(p)
+async def test_connection_closed(aconn):
+ await aconn.close()
+ with pytest.raises(e.OperationalError):
+ async with aconn.pipeline():
+ pass
+
+
async def test_pipeline_status(aconn: psycopg.AsyncConnection[Any]) -> None:
assert aconn._pipeline is None
async with aconn.pipeline() as p: