From: Denis Laxalde Date: Tue, 29 Mar 2022 07:07:44 +0000 (+0200) Subject: fix: sync and fetch nested pipelines when they exit X-Git-Tag: 3.1~145^2~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b9d1366a4428f88a84902ce6113c9c9eee719b09;p=thirdparty%2Fpsycopg.git fix: sync and fetch nested pipelines when they exit When running a nested pipeline, typically in executemany(), we now call Pipeline.communicate() at exit similarly to a single (unnested) pipeline but still keep the surrounding pipeline open (in contrast with unnested one). This resolves the issue around rowcount with returning executemany(). Accordingly, we check that rowcount is correct in test_executemany() pipeline tests and drop the previous xfailed test. --- diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 5826b4beb..68e8306a6 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -882,7 +882,10 @@ class Connection(BaseConnection[Row]): if not pipeline: # No-op re-entered inner pipeline block. - yield self._pipeline + try: + yield self._pipeline + finally: + self._pipeline.communicate() return try: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 246dd3fc8..d0cbfea10 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -310,7 +310,10 @@ class AsyncConnection(BaseConnection[Row]): if not pipeline: # No-op re-entered inner pipeline block. - yield self._pipeline + try: + yield self._pipeline + finally: + await self._pipeline.communicate() return try: diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index cef1104de..98441c9ee 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -514,9 +514,6 @@ class BaseCursor(Generic[ConnectionType, Row]): if self._execmany_returning is None: # Received from execute() - # TODO: bug we also end up here on executemany() if run from inside - # a pipeline block. This causes a wrong rowcount. As it isn't so - # serious, currently leaving it this way. self._results.extend(results) if first_batch: self._set_current_result(0) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index c5e3a37cd..97d58dbd4 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -169,31 +169,13 @@ def test_executemany(conn): [(10,), (20,)], returning=True, ) + assert cur.rowcount == 2 assert cur.fetchone() == (1,) assert cur.nextset() assert cur.fetchone() == (2,) assert cur.nextset() is None -@pytest.mark.xfail -def test_executemany_rowcount(conn): - conn.autocommit = True - conn.execute( - "create temp table test_executemany_rowcount (" - " id serial primary key, num integer)" - ) - with conn.pipeline(), conn.cursor() as cur: - cur.executemany( - "insert into test_executemany_rowcount (num) values (%s) returning id", - [(10,), (20,)], - ) - - # TODO: failing. It is caused by reentering the pipeline mode in - # executemany(). Leaving it here to monitor how it changes. The snag is - # in Cursor._set_results_from_pipeline() - assert cur.rowcount == 2 - - def test_prepared(conn): conn.autocommit = True with conn.pipeline(): diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index ba4065821..d7baef894 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -172,6 +172,7 @@ async def test_executemany(aconn): [(10,), (20,)], returning=True, ) + assert cur.rowcount == 2 assert (await cur.fetchone()) == (1,) assert cur.nextset() assert (await cur.fetchone()) == (2,)