]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Raise a ProgrammingError when using cursor.stream() in pipeline mode
authorDenis Laxalde <denis.laxalde@dalibo.com>
Wed, 6 Oct 2021 08:34:16 +0000 (10:34 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:17:57 +0000 (01:17 +0200)
As suggested at [1], stream() and pipeline mode are different use cases.

[1]: https://github.com/psycopg/psycopg/pull/93#discussion_r722666980

psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_pipeline.py
tests/test_pipeline_async.py

index c114eaede803da8f596d3e66e393243909d87e7c..df2a324ddd05743fea533639a4dcd8140abb42d5 100644 (file)
@@ -587,6 +587,9 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         """
         Iterate row-by-row on a result from the database.
         """
+        if self._pgconn.pipeline_status:
+            raise e.ProgrammingError("stream() cannot be used in pipeline mode")
+
         with self._conn.lock:
             self._conn.wait(self._stream_send_gen(query, params, binary=binary))
             first = True
index 3e9050cca7c73e18f15fdacbc7a937235fc0c1fb..b1211e5aacdff827b1a9c72e34a035dde4b0aba8 100644 (file)
@@ -100,6 +100,9 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
         *,
         binary: Optional[bool] = None,
     ) -> AsyncIterator[Row]:
+        if self._pgconn.pipeline_status:
+            raise e.ProgrammingError("stream() cannot be used in pipeline mode")
+
         async with self._conn.lock:
             await self._conn.wait(self._stream_send_gen(query, params, binary=binary))
             first = True
index 0e0aa0d5661d6bb19c5af76810d690eae822b015..7546f5b2b2aed0418619e2642ea023ef8a7e8386 100644 (file)
@@ -1,5 +1,6 @@
 import pytest
 
+import psycopg
 from psycopg import pq
 from psycopg.errors import ProgrammingError
 
@@ -16,3 +17,9 @@ def test_pipeline_status(conn):
                 pass
     assert p.status == pq.PipelineStatus.OFF
     assert not conn._pipeline
+
+
+def test_cursor_stream(conn):
+    with conn.pipeline(), conn.cursor() as cur:
+        with pytest.raises(psycopg.ProgrammingError):
+            cur.stream("select 1").__next__()
index 1b0843fe8c422263b3e10f54dfa453aa5619e5b6..aa9a739e7b9117d944fa411be13d195446c3dbb0 100644 (file)
@@ -1,5 +1,6 @@
 import pytest
 
+import psycopg
 from psycopg import pq
 from psycopg.errors import ProgrammingError
 
@@ -19,3 +20,9 @@ async def test_pipeline_status(aconn):
                 pass
     assert p.status == pq.PipelineStatus.OFF
     assert not aconn._pipeline
+
+
+async def test_cursor_stream(aconn):
+    async with aconn.pipeline(), aconn.cursor() as cur:
+        with pytest.raises(psycopg.ProgrammingError):
+            await cur.stream("select 1").__anext__()