]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: sync and fetch nested pipelines when they exit
authorDenis Laxalde <denis.laxalde@dalibo.com>
Tue, 29 Mar 2022 07:07:44 +0000 (09:07 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:23:22 +0000 (01:23 +0200)
When running a nested pipeline, typically in executemany(), we now call
Pipeline.communicate() at exit similarly to a single (unnested) pipeline
but still keep the surrounding pipeline open (in contrast with unnested
one).

This resolves the issue around rowcount with returning executemany().
Accordingly, we check that rowcount is correct in test_executemany()
pipeline tests and drop the previous xfailed test.

psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/cursor.py
tests/test_pipeline.py
tests/test_pipeline_async.py

index 5826b4beb82f5969a43ff7f79ad0d7478fe6ba0b..68e8306a6a6b35d0ed579d8e6b288737d12059e5 100644 (file)
@@ -882,7 +882,10 @@ class Connection(BaseConnection[Row]):
 
         if not pipeline:
             # No-op re-entered inner pipeline block.
-            yield self._pipeline
+            try:
+                yield self._pipeline
+            finally:
+                self._pipeline.communicate()
             return
 
         try:
index 246dd3fc837558f3a87ef445802733a9ebd1b57d..d0cbfea1051f6080ac7bf7b649f8a34e5ee8c45d 100644 (file)
@@ -310,7 +310,10 @@ class AsyncConnection(BaseConnection[Row]):
 
         if not pipeline:
             # No-op re-entered inner pipeline block.
-            yield self._pipeline
+            try:
+                yield self._pipeline
+            finally:
+                await self._pipeline.communicate()
             return
 
         try:
index cef1104de8840b9fa2661a2b4727d523023ac3bc..98441c9ee31a021fedc3f45e1cf99c29c93f2700 100644 (file)
@@ -514,9 +514,6 @@ class BaseCursor(Generic[ConnectionType, Row]):
 
         if self._execmany_returning is None:
             # Received from execute()
-            # TODO: bug we also end up here on executemany() if run from inside
-            # a pipeline block. This causes a wrong rowcount. As it isn't so
-            # serious, currently leaving it this way.
             self._results.extend(results)
             if first_batch:
                 self._set_current_result(0)
index c5e3a37cdd4c403a1c8340a551df820511edd21b..97d58dbd4f883440f3bd8ca6918c0fe06b1ef550 100644 (file)
@@ -169,31 +169,13 @@ def test_executemany(conn):
             [(10,), (20,)],
             returning=True,
         )
+        assert cur.rowcount == 2
         assert cur.fetchone() == (1,)
         assert cur.nextset()
         assert cur.fetchone() == (2,)
         assert cur.nextset() is None
 
 
-@pytest.mark.xfail
-def test_executemany_rowcount(conn):
-    conn.autocommit = True
-    conn.execute(
-        "create temp table test_executemany_rowcount ("
-        " id serial primary key, num integer)"
-    )
-    with conn.pipeline(), conn.cursor() as cur:
-        cur.executemany(
-            "insert into test_executemany_rowcount (num) values (%s) returning id",
-            [(10,), (20,)],
-        )
-
-        # TODO: failing. It is caused by reentering the pipeline mode in
-        # executemany(). Leaving it here to monitor how it changes. The snag is
-        # in Cursor._set_results_from_pipeline()
-        assert cur.rowcount == 2
-
-
 def test_prepared(conn):
     conn.autocommit = True
     with conn.pipeline():
index ba4065821603d740ec35acb207046eded990a1bd..d7baef894f81c955bde2cf7bcc11f12d17676d8d 100644 (file)
@@ -172,6 +172,7 @@ async def test_executemany(aconn):
             [(10,), (20,)],
             returning=True,
         )
+        assert cur.rowcount == 2
         assert (await cur.fetchone()) == (1,)
         assert cur.nextset()
         assert (await cur.fetchone()) == (2,)