From: Daniele Varrazzo Date: Tue, 29 Mar 2022 20:03:49 +0000 (+0200) Subject: fix: restore sending a Sync on block exit but not on executemany end X-Git-Tag: 3.1~145^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f2e0ab1adfb577cb66b5f6ca20b891af9b7faebf;p=thirdparty%2Fpsycopg.git fix: restore sending a Sync on block exit but not on executemany end It was removed a few commits ago, but as Denis suggests, it makes sense to keep it. Reintroduce it (and test for it), but make sure executemany doesn't send extra sync. --- diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 5826b4beb..bf2c584a4 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -882,7 +882,10 @@ class Connection(BaseConnection[Row]): if not pipeline: # No-op re-entered inner pipeline block. - yield self._pipeline + try: + yield self._pipeline + finally: + self._pipeline.sync() return try: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 246dd3fc8..7d4bebfb1 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -310,7 +310,10 @@ class AsyncConnection(BaseConnection[Row]): if not pipeline: # No-op re-entered inner pipeline block. - yield self._pipeline + try: + yield self._pipeline + finally: + await self._pipeline.sync() return try: diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 7433ccb51..4fd756ff9 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -698,10 +698,20 @@ class Cursor(BaseCursor["Connection[Any]", Row]): """ try: if Pipeline.is_supported(): - with self._conn.pipeline(), self._conn.lock: - self._conn.wait( - self._executemany_gen_pipeline(query, params_seq, returning) - ) + # If there is already a pipeline, ride it, in order to avoid + # sending unnecessary Sync. + with self._conn.lock: + p = self._conn._pipeline + if p: + self._conn.wait( + self._executemany_gen_pipeline(query, params_seq, returning) + ) + # Otherwise, make a new one + if not p: + with self._conn.pipeline(), self._conn.lock: + self._conn.wait( + self._executemany_gen_pipeline(query, params_seq, returning) + ) else: with self._conn.lock: self._conn.wait( diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index 7bd8b1dbb..3bf1004d5 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -88,10 +88,20 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): ) -> None: try: if Pipeline.is_supported(): - async with self._conn.pipeline(), self._conn.lock: - await self._conn.wait( - self._executemany_gen_pipeline(query, params_seq, returning) - ) + # If there is already a pipeline, ride it, in order to avoid + # sending unnecessary Sync. + async with self._conn.lock: + p = self._conn._pipeline + if p: + await self._conn.wait( + self._executemany_gen_pipeline(query, params_seq, returning) + ) + # Otherwise, make a new one + if not p: + async with self._conn.pipeline(), self._conn.lock: + await self._conn.wait( + self._executemany_gen_pipeline(query, params_seq, returning) + ) else: await self._conn.wait( self._executemany_gen_no_pipeline(query, params_seq, returning) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 6af8b44d0..52a022ffe 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -62,6 +62,23 @@ def test_pipeline_exit_error_noclobber(conn, caplog): assert len(caplog.records) == 1 +def test_pipeline_exit_sync_trace(conn, trace): + t = trace.trace(conn) + with conn.pipeline(): + pass + conn.close() + assert len([i for i in t if i.type == "Sync"]) == 1 + + +def test_pipeline_nested_sync_trace(conn, trace): + t = trace.trace(conn) + with conn.pipeline(): + with conn.pipeline(): + pass + conn.close() + assert len([i for i in t if i.type == "Sync"]) == 2 + + def test_cursor_stream(conn): with conn.pipeline(), conn.cursor() as cur: with pytest.raises(psycopg.ProgrammingError): diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index b79ab8728..60338062c 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -65,6 +65,23 @@ async def test_pipeline_exit_error_noclobber(aconn, caplog): assert len(caplog.records) == 1 +async def test_pipeline_exit_sync_trace(aconn, trace): + t = trace.trace(aconn) + async with aconn.pipeline(): + pass + await aconn.close() + assert len([i for i in t if i.type == "Sync"]) == 1 + + +async def test_pipeline_nested_sync_trace(aconn, trace): + t = trace.trace(aconn) + async with aconn.pipeline(): + async with aconn.pipeline(): + pass + await aconn.close() + assert len([i for i in t if i.type == "Sync"]) == 2 + + async def test_cursor_stream(aconn): async with aconn.pipeline(), aconn.cursor() as cur: with pytest.raises(psycopg.ProgrammingError):