As suggested at [1], stream() and pipeline mode are different use cases.
[1]: https://github.com/psycopg/psycopg/pull/93#discussion_r722666980
"""
Iterate row-by-row on a result from the database.
"""
+ if self._pgconn.pipeline_status:
+ raise e.ProgrammingError("stream() cannot be used in pipeline mode")
+
with self._conn.lock:
self._conn.wait(self._stream_send_gen(query, params, binary=binary))
first = True
*,
binary: Optional[bool] = None,
) -> AsyncIterator[Row]:
+ if self._pgconn.pipeline_status:
+ raise e.ProgrammingError("stream() cannot be used in pipeline mode")
+
async with self._conn.lock:
await self._conn.wait(self._stream_send_gen(query, params, binary=binary))
first = True
import pytest
+import psycopg
from psycopg import pq
from psycopg.errors import ProgrammingError
pass
assert p.status == pq.PipelineStatus.OFF
assert not conn._pipeline
+
+
+def test_cursor_stream(conn):
+ with conn.pipeline(), conn.cursor() as cur:
+ with pytest.raises(psycopg.ProgrammingError):
+ cur.stream("select 1").__next__()
import pytest
+import psycopg
from psycopg import pq
from psycopg.errors import ProgrammingError
pass
assert p.status == pq.PipelineStatus.OFF
assert not aconn._pipeline
+
+
+async def test_cursor_stream(aconn):
+ async with aconn.pipeline(), aconn.cursor() as cur:
+ with pytest.raises(psycopg.ProgrammingError):
+ await cur.stream("select 1").__anext__()