from . import pq
from . import errors as e
-from .pq import ExecStatus
+from .pq import ConnStatus, ExecStatus
from .abc import PipelineCommand, PQGen
from ._compat import Deque, TypeAlias
from ._cmodule import _psycopg
self.pgconn.enter_pipeline_mode()
def _exit(self) -> None:
- self.pgconn.exit_pipeline_mode()
+ if self.pgconn.status != ConnStatus.BAD:
+ self.pgconn.exit_pipeline_mode()
def _communicate_gen(self) -> PQGen[None]:
"""Communicate with pipeline to send commands and possibly fetch
assert p1.status == pq.PipelineStatus.OFF
+def test_pipeline_broken_conn_exit(conn: psycopg.Connection[Any]) -> None:
+ with pytest.raises(e.OperationalError):
+ with conn.pipeline():
+ conn.execute("select 1")
+ conn.close()
+ closed = True
+
+ assert closed
+
+
def test_cursor_stream(conn):
with conn.pipeline(), conn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):
assert p1.status == pq.PipelineStatus.OFF
+async def test_pipeline_broken_conn_exit(aconn: psycopg.AsyncConnection[Any]) -> None:
+ with pytest.raises(e.OperationalError):
+ async with aconn.pipeline():
+ await aconn.execute("select 1")
+ await aconn.close()
+ closed = True
+
+ assert closed
+
+
async def test_cursor_stream(aconn):
async with aconn.pipeline(), aconn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):