From: Denis Laxalde Date: Wed, 6 Oct 2021 08:34:16 +0000 (+0200) Subject: Raise a ProgrammingError when using cursor.stream() in pipeline mode X-Git-Tag: 3.1~146^2~19 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d65c48e573c2d86ab217c772904978ad74917a36;p=thirdparty%2Fpsycopg.git Raise a ProgrammingError when using cursor.stream() in pipeline mode As suggested at [1], stream() and pipeline mode are different use cases. [1]: https://github.com/psycopg/psycopg/pull/93#discussion_r722666980 --- diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index c114eaede..df2a324dd 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -587,6 +587,9 @@ class Cursor(BaseCursor["Connection[Any]", Row]): """ Iterate row-by-row on a result from the database. """ + if self._pgconn.pipeline_status: + raise e.ProgrammingError("stream() cannot be used in pipeline mode") + with self._conn.lock: self._conn.wait(self._stream_send_gen(query, params, binary=binary)) first = True diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index 3e9050cca..b1211e5aa 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -100,6 +100,9 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): *, binary: Optional[bool] = None, ) -> AsyncIterator[Row]: + if self._pgconn.pipeline_status: + raise e.ProgrammingError("stream() cannot be used in pipeline mode") + async with self._conn.lock: await self._conn.wait(self._stream_send_gen(query, params, binary=binary)) first = True diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 0e0aa0d56..7546f5b2b 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,5 +1,6 @@ import pytest +import psycopg from psycopg import pq from psycopg.errors import ProgrammingError @@ -16,3 +17,9 @@ def test_pipeline_status(conn): pass assert p.status == pq.PipelineStatus.OFF assert not conn._pipeline + + +def test_cursor_stream(conn): + with conn.pipeline(), conn.cursor() as cur: + with pytest.raises(psycopg.ProgrammingError): + cur.stream("select 1").__next__() diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index 1b0843fe8..aa9a739e7 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -1,5 +1,6 @@ import pytest +import psycopg from psycopg import pq from psycopg.errors import ProgrammingError @@ -19,3 +20,9 @@ async def test_pipeline_status(aconn): pass assert p.status == pq.PipelineStatus.OFF assert not aconn._pipeline + + +async def test_cursor_stream(aconn): + async with aconn.pipeline(), aconn.cursor() as cur: + with pytest.raises(psycopg.ProgrammingError): + await cur.stream("select 1").__anext__()