fix: keep a lock for the entire duration of executemany
author Daniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Sep 2025 01:00:24 +0000 (03:00 +0200)
committer Daniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Sep 2025 20:14:20 +0000 (22:14 +0200)
Before this change we had the lock context inside the pipeline context,
because conn.pipeline() and Pipeline.__enter/exit__ take a lock. But
this created windows of opportunity for other threads to execute
concurrent operations on the connection, resulting in "another command
is already in progress" errors.

Fix #1130.
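
A condensed sketch of the race, reconstructed from the cursor.py change
below (not the literal source; `...` stands for the executemany generator):

    # Before: two critical sections with a gap between them.
    with conn.lock:
        p = conn._pipeline          # check for a pipeline under the lock
    if not p:                       # lock released: another thread can
                                    # start a command right here
        with conn.pipeline():       # re-takes and releases conn.lock
            with conn.lock:
                conn.wait(...)      # may now collide with the other
                                    # thread's command in progress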

docs/news.rst
psycopg/psycopg/_pipeline.py
psycopg/psycopg/_pipeline_async.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_cursor_common.py
tests/test_cursor_common_async.py

docs/news.rst
index 4e348611af09c628553c7d4a11b7643558571f40..bacf4fc0183a3ad2eafa39735eeb6ca040e3ce83 100644 (file)
@@ -39,6 +39,8 @@ Psycopg 3.2.10 (unreleased)
 - Fix memory leak when lambda/local functions are used as argument for
   `~.psycopg.types.json.set_json_dumps()`, `~.psycopg.types.json.set_json_loads()`
   (:ticket:`#1108`).
+- Fix coordination of `~Cursor.executemany()` with concurrent operations
+  on other cursors (:ticket:`#1130`).
 - Add support for Python 3.14 (:ticket:`#1053`).
 - Fix `psycopg_binary.__version__`.
 
psycopg/psycopg/_pipeline.py
index 1276f434aea4d0c6ffcdb6c33dcf2001209b98e6..57c7871da4ae22ccbf02576cef1fb21ca9f28e14 100644 (file)
@@ -23,27 +23,42 @@ if TYPE_CHECKING:
 logger = logging.getLogger("psycopg")
 
 
+class _DummyLock:
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        pass
+
+
 class Pipeline(BasePipeline):
     """Handler for (sync) connection in pipeline mode."""
 
     __module__ = "psycopg"
     _conn: Connection[Any]
 
-    def __init__(self, conn: Connection[Any]) -> None:
+    def __init__(self, conn: Connection[Any], _no_lock: bool = False) -> None:
         super().__init__(conn)
+        self._lock = _DummyLock() if _no_lock else conn.lock
 
     def sync(self) -> None:
         """Sync the pipeline, send any pending command and receive and process
         all available results.
         """
         try:
-            with self._conn.lock:
+            with self._lock:
                 self._conn.wait(self._sync_gen())
         except e._NO_TRACEBACK as ex:
             raise ex.with_traceback(None)
 
     def __enter__(self) -> Self:
-        with self._conn.lock:
+        with self._lock:
             self._conn.wait(self._enter_gen())
         return self
 
@@ -54,7 +69,7 @@ class Pipeline(BasePipeline):
         exc_tb: TracebackType | None,
     ) -> None:
         try:
-            with self._conn.lock:
+            with self._lock:
                 self._conn.wait(self._exit_gen())
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
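
`_DummyLock` (and its async twin in _pipeline_async.py below) is a
null-object lock: the Pipeline methods always write `with self._lock:`,
and the constructor decides whether that is the real `conn.lock` or a
no-op. A minimal standalone illustration of the pattern (hypothetical
names, not psycopg code):

    import threading

    class _NoOpLock:
        # Null-object lock: a context manager that does nothing.
        def __enter__(self) -> None:
            pass

        def __exit__(self, *exc) -> None:
            pass

    class Job:
        def __init__(self, lock) -> None:
            # Real lock when the job synchronizes itself,
            # no-op when the caller already holds one.
            self._lock = lock

        def run(self) -> None:
            with self._lock:  # identical code path either way
                print("critical section")

    Job(threading.Lock()).run()  # owns its synchronization
    Job(_NoOpLock()).run()       # caller is assumed to hold a lock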
psycopg/psycopg/_pipeline_async.py
index 058526071478fe80fbbb8f080fe5390fbd162ae5..1f04765b4581ac9910a8eee26b9c9757e0d6fbfb 100644 (file)
@@ -20,27 +20,41 @@ if TYPE_CHECKING:
 logger = logging.getLogger("psycopg")
 
 
+class _DummyLock:
+    async def __aenter__(self) -> None:
+        pass
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        pass
+
+
 class AsyncPipeline(BasePipeline):
     """Handler for (async) connection in pipeline mode."""
 
     __module__ = "psycopg"
     _conn: AsyncConnection[Any]
 
-    def __init__(self, conn: AsyncConnection[Any]) -> None:
+    def __init__(self, conn: AsyncConnection[Any], _no_lock: bool = False) -> None:
         super().__init__(conn)
+        self._lock = _DummyLock() if _no_lock else conn.lock
 
     async def sync(self) -> None:
         """Sync the pipeline, send any pending command and receive and process
         all available results.
         """
         try:
-            async with self._conn.lock:
+            async with self._lock:
                 await self._conn.wait(self._sync_gen())
         except e._NO_TRACEBACK as ex:
             raise ex.with_traceback(None)
 
     async def __aenter__(self) -> Self:
-        async with self._conn.lock:
+        async with self._lock:
             await self._conn.wait(self._enter_gen())
         return self
 
@@ -51,7 +65,7 @@ class AsyncPipeline(BasePipeline):
         exc_tb: TracebackType | None,
     ) -> None:
         try:
-            async with self._conn.lock:
+            async with self._lock:
                 await self._conn.wait(self._exit_gen())
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
psycopg/psycopg/connection.py
index 338665276fa3a8154fb5e7f444b27afc8f267815..745715673168bfb790ffe8112a44163b7b9b6807 100644 (file)
@@ -404,6 +404,27 @@ class Connection(BaseConnection[Row]):
                     assert pipeline is self._pipeline
                     self._pipeline = None
 
+    @contextmanager
+    def _pipeline_nolock(self) -> Iterator[Pipeline]:
+        """like pipeline() but don't acquire a lock.
+
+        Assume that the caller is holding the lock.
+        """
+
+        # Currently only used internally by Cursor.executemany() in a branch
+        # in which we already established that the connection has no pipeline.
+        # If this changes we may relax the asserts.
+        assert not self._pipeline
+        # WARNING: reference loop, broken ahead.
+        pipeline = self._pipeline = Pipeline(self, _no_lock=True)
+        try:
+            with pipeline:
+                yield pipeline
+        finally:
+            assert pipeline.level == 0
+            assert pipeline is self._pipeline
+            self._pipeline = None
+
     def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV:
         """
         Consume a generator operating on the connection.
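
`_pipeline_nolock()` inverts the contract of `pipeline()`: the caller must
already hold `conn.lock`, and the Pipeline created with `_no_lock=True`
goes through the `_DummyLock`, so the lock is neither re-acquired (which
would deadlock, assuming the lock is non-reentrant; an `asyncio.Lock`
certainly is not) nor released mid-operation. The intended call shape,
mirroring the cursor.py change further down (`gen` is a placeholder):

    with conn.lock:                    # taken once, held for the whole call
        with conn._pipeline_nolock():  # must not touch conn.lock again
            conn.wait(gen)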
psycopg/psycopg/connection_async.py
index 408f3827fb34c80d6de1f984a8399ded7dc279ff..037750efca2e819d63134e40b793c56904227d69 100644 (file)
@@ -427,6 +427,27 @@ class AsyncConnection(BaseConnection[Row]):
                     assert pipeline is self._pipeline
                     self._pipeline = None
 
+    @asynccontextmanager
+    async def _pipeline_nolock(self) -> AsyncIterator[AsyncPipeline]:
+        """like pipeline() but don't acquire a lock.
+
+        Assume that the caller is holding the lock.
+        """
+
+        # Currently only used internally by Cursor.executemany() in a branch
+        # in which we already established that the connection has no pipeline.
+        # If this changes we may relax the asserts.
+        assert not self._pipeline
+        # WARNING: reference loop, broken ahead.
+        pipeline = self._pipeline = AsyncPipeline(self, _no_lock=True)
+        try:
+            async with pipeline:
+                yield pipeline
+        finally:
+            assert pipeline.level == 0
+            assert pipeline is self._pipeline
+            self._pipeline = None
+
     async def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV:
         """
         Consume a generator operating on the connection.
psycopg/psycopg/cursor.py
index b5b127a6ba45ff35453e348f5aaf6aa0d7ce457a..02a4bbbe54ee9e6a58a876cffdb4ac16da53493a 100644 (file)
@@ -105,22 +105,23 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         Execute the same command with a sequence of input data.
         """
         try:
-            if Pipeline.is_supported():
-                # If there is already a pipeline, ride it, in order to avoid
-                # sending unnecessary Sync.
-                with self._conn.lock:
-                    if p := self._conn._pipeline:
-                        self._conn.wait(
-                            self._executemany_gen_pipeline(query, params_seq, returning)
-                        )
-                # Otherwise, make a new one
-                if not p:
-                    with self._conn.pipeline(), self._conn.lock:
+            with self._conn.lock:
+                if Pipeline.is_supported():
+                    # If there is already a pipeline, ride it, in order to avoid
+                    # sending unnecessary Sync.
+                    if self._conn._pipeline:
                         self._conn.wait(
                             self._executemany_gen_pipeline(query, params_seq, returning)
                         )
-            else:
-                with self._conn.lock:
+                    else:
+                        # Otherwise, make a new one
+                        with self._conn._pipeline_nolock():
+                            self._conn.wait(
+                                self._executemany_gen_pipeline(
+                                    query, params_seq, returning
+                                )
+                            )
+                else:
                     self._conn.wait(
                         self._executemany_gen_no_pipeline(query, params_seq, returning)
                     )
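
With this rewrite, all three branches of executemany() run inside a
single conn.lock acquisition; condensed (generator names abbreviated):

    with conn.lock:
        if Pipeline.is_supported():
            if conn._pipeline:                 # ride the existing pipeline
                conn.wait(gen_pipeline)
            else:                              # open a throwaway pipeline
                with conn._pipeline_nolock():  # lock already held above
                    conn.wait(gen_pipeline)
        else:
            conn.wait(gen_no_pipeline)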
psycopg/psycopg/cursor_async.py
index d237ab17b82559c0c59ac90be6c610cfd51bf164..06c715858d497ea50850c6e08ed5200955e21698 100644 (file)
@@ -17,8 +17,8 @@ from .abc import Params, Query
 from .copy import AsyncCopy, AsyncWriter
 from .rows import AsyncRowFactory, Row, RowMaker
 from ._compat import Self
-from ._pipeline import Pipeline
 from ._cursor_base import BaseCursor
+from ._pipeline_async import AsyncPipeline
 
 if TYPE_CHECKING:
     from .connection_async import AsyncConnection
@@ -105,22 +105,23 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
         Execute the same command with a sequence of input data.
         """
         try:
-            if Pipeline.is_supported():
-                # If there is already a pipeline, ride it, in order to avoid
-                # sending unnecessary Sync.
-                async with self._conn.lock:
-                    if p := self._conn._pipeline:
-                        await self._conn.wait(
-                            self._executemany_gen_pipeline(query, params_seq, returning)
-                        )
-                # Otherwise, make a new one
-                if not p:
-                    async with self._conn.pipeline(), self._conn.lock:
+            async with self._conn.lock:
+                if AsyncPipeline.is_supported():
+                    # If there is already a pipeline, ride it, in order to avoid
+                    # sending unnecessary Sync.
+                    if self._conn._pipeline:
                         await self._conn.wait(
                             self._executemany_gen_pipeline(query, params_seq, returning)
                         )
-            else:
-                async with self._conn.lock:
+                    # Otherwise, make a new one
+                    else:
+                        async with self._conn._pipeline_nolock():
+                            await self._conn.wait(
+                                self._executemany_gen_pipeline(
+                                    query, params_seq, returning
+                                )
+                            )
+                else:
                     await self._conn.wait(
                         self._executemany_gen_no_pipeline(query, params_seq, returning)
                     )
tests/test_cursor_common.py
index c5e07297acc2893063a3cb27031459df906f9527..56db3b0814a658a7ed640986d6f024ee88856f5d 100644 (file)
@@ -20,6 +20,7 @@ from psycopg.types import TypeInfo
 
 from . import _test_cursor
 from .utils import raiseif
+from .acompat import gather, spawn
 from .fix_crdb import crdb_encoding
 from ._test_cursor import my_row_factory, ph
 
@@ -461,6 +462,21 @@ def test_executemany_null_first(conn, fmt_in):
         )
 
 
+@pytest.mark.slow
+def test_executemany_lock(conn):
+
+    def do_execmany():
+        with conn.cursor() as cur:
+            cur.executemany(ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)])
+
+    def do_exec():
+        with conn.cursor() as cur:
+            for i in range(100):
+                cur.execute("select 1")
+
+    gather(spawn(do_execmany), spawn(do_exec))
+
+
 def test_rowcount(conn):
     cur = conn.cursor()
 
tests/test_cursor_common_async.py
index 600f254a94ce7775c35d8661619fea77c2848826..3b7b50194d56849186899a9e381e3ca9eb711cb1 100644 (file)
@@ -17,7 +17,7 @@ from psycopg.types import TypeInfo
 
 from . import _test_cursor
 from .utils import raiseif
-from .acompat import alist
+from .acompat import alist, gather, spawn
 from .fix_crdb import crdb_encoding
 from ._test_cursor import my_row_factory, ph
 
@@ -464,6 +464,22 @@ async def test_executemany_null_first(aconn, fmt_in):
         )
 
 
+@pytest.mark.slow
+async def test_executemany_lock(aconn):
+    async def do_execmany():
+        async with aconn.cursor() as cur:
+            await cur.executemany(
+                ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)]
+            )
+
+    async def do_exec():
+        async with aconn.cursor() as cur:
+            for i in range(100):
+                await cur.execute("select 1")
+
+    await gather(spawn(do_execmany), spawn(do_exec))
+
+
 async def test_rowcount(aconn):
     cur = aconn.cursor()