fix: restore sending a Sync on block exit but not on executemany end
author     Daniele Varrazzo <daniele.varrazzo@gmail.com>
           Tue, 29 Mar 2022 20:03:49 +0000 (22:03 +0200)
committer  Daniele Varrazzo <daniele.varrazzo@gmail.com>
           Sat, 2 Apr 2022 23:23:22 +0000 (01:23 +0200)
It was removed a few commits ago but, as Denis suggests, it makes sense
to keep it. Reintroduce it (and test for it), but make sure executemany
doesn't send an extra Sync.
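
As a minimal sketch of the restored behaviour (not part of the commit; the
connection string and table name are made up for illustration): every
pipeline block, including a re-entered inner one, now sends a Sync on exit,
while executemany() rides an already-open pipeline instead of opening a
nested block of its own.

    import psycopg

    # Hypothetical DSN and table, used only to illustrate the behaviour above.
    with psycopg.connect("dbname=test") as conn:
        with conn.pipeline():
            with conn.pipeline():
                pass
            # exiting the re-entered inner block now sends a Sync
        # exiting the outer block sends a second Sync (see the trace tests below)

        with conn.pipeline():
            with conn.cursor() as cur:
                # executemany() reuses the open pipeline, so it no longer wraps
                # itself in another pipeline block and sends an extra Sync
                cur.executemany("INSERT INTO t (x) VALUES (%s)", [(1,), (2,)])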

psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_pipeline.py
tests/test_pipeline_async.py

diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py
index 5826b4beb82f5969a43ff7f79ad0d7478fe6ba0b..bf2c584a47d7e8d0d4e114db2db7d21108debde4 100644
@@ -882,7 +882,10 @@ class Connection(BaseConnection[Row]):
 
         if not pipeline:
             # No-op re-entered inner pipeline block.
-            yield self._pipeline
+            try:
+                yield self._pipeline
+            finally:
+                self._pipeline.sync()
             return
 
         try:
diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py
index 246dd3fc837558f3a87ef445802733a9ebd1b57d..7d4bebfb1325641ca3b240e5412caba26f605eb4 100644
@@ -310,7 +310,10 @@ class AsyncConnection(BaseConnection[Row]):
 
         if not pipeline:
             # No-op re-entered inner pipeline block.
-            yield self._pipeline
+            try:
+                yield self._pipeline
+            finally:
+                await self._pipeline.sync()
             return
 
         try:
diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py
index 7433ccb51a4e24599932c48aab9deabfd2d4e7b6..4fd756ff9da01d3893253cdf704734a40fecd2ae 100644
@@ -698,10 +698,20 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         """
         try:
             if Pipeline.is_supported():
-                with self._conn.pipeline(), self._conn.lock:
-                    self._conn.wait(
-                        self._executemany_gen_pipeline(query, params_seq, returning)
-                    )
+                # If there is already a pipeline, ride it, in order to avoid
+                # sending unnecessary Sync.
+                with self._conn.lock:
+                    p = self._conn._pipeline
+                    if p:
+                        self._conn.wait(
+                            self._executemany_gen_pipeline(query, params_seq, returning)
+                        )
+                # Otherwise, make a new one
+                if not p:
+                    with self._conn.pipeline(), self._conn.lock:
+                        self._conn.wait(
+                            self._executemany_gen_pipeline(query, params_seq, returning)
+                        )
             else:
                 with self._conn.lock:
                     self._conn.wait(
diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py
index 7bd8b1dbb1941643185ab6c6967a79f05289faa3..3bf1004d52ab5abc6729a76205ff41ce55be94e4 100644
@@ -88,10 +88,20 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
     ) -> None:
         try:
             if Pipeline.is_supported():
-                async with self._conn.pipeline(), self._conn.lock:
-                    await self._conn.wait(
-                        self._executemany_gen_pipeline(query, params_seq, returning)
-                    )
+                # If there is already a pipeline, ride it, in order to avoid
+                # sending unnecessary Sync.
+                async with self._conn.lock:
+                    p = self._conn._pipeline
+                    if p:
+                        await self._conn.wait(
+                            self._executemany_gen_pipeline(query, params_seq, returning)
+                        )
+                # Otherwise, make a new one
+                if not p:
+                    async with self._conn.pipeline(), self._conn.lock:
+                        await self._conn.wait(
+                            self._executemany_gen_pipeline(query, params_seq, returning)
+                        )
             else:
                 await self._conn.wait(
                     self._executemany_gen_no_pipeline(query, params_seq, returning)
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
index 6af8b44d060912610b68c205c394f2d03dd1dcd7..52a022ffec580607a8e990fd9e13ad4740d0a83e 100644
@@ -62,6 +62,23 @@ def test_pipeline_exit_error_noclobber(conn, caplog):
     assert len(caplog.records) == 1
 
 
+def test_pipeline_exit_sync_trace(conn, trace):
+    t = trace.trace(conn)
+    with conn.pipeline():
+        pass
+    conn.close()
+    assert len([i for i in t if i.type == "Sync"]) == 1
+
+
+def test_pipeline_nested_sync_trace(conn, trace):
+    t = trace.trace(conn)
+    with conn.pipeline():
+        with conn.pipeline():
+            pass
+    conn.close()
+    assert len([i for i in t if i.type == "Sync"]) == 2
+
+
 def test_cursor_stream(conn):
     with conn.pipeline(), conn.cursor() as cur:
         with pytest.raises(psycopg.ProgrammingError):
diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py
index b79ab8728c7536197f2a9e60851b773d335483b6..60338062ce8c27c35ddf586ff08b5a8fb8f2634d 100644
@@ -65,6 +65,23 @@ async def test_pipeline_exit_error_noclobber(aconn, caplog):
     assert len(caplog.records) == 1
 
 
+async def test_pipeline_exit_sync_trace(aconn, trace):
+    t = trace.trace(aconn)
+    async with aconn.pipeline():
+        pass
+    await aconn.close()
+    assert len([i for i in t if i.type == "Sync"]) == 1
+
+
+async def test_pipeline_nested_sync_trace(aconn, trace):
+    t = trace.trace(aconn)
+    async with aconn.pipeline():
+        async with aconn.pipeline():
+            pass
+    await aconn.close()
+    assert len([i for i in t if i.type == "Sync"]) == 2
+
+
 async def test_cursor_stream(aconn):
     async with aconn.pipeline(), aconn.cursor() as cur:
         with pytest.raises(psycopg.ProgrammingError):