fix: keep a lock for the entire duration of executemany
author    Daniele Varrazzo <daniele.varrazzo@gmail.com>
          Sat, 6 Sep 2025 01:00:24 +0000 (03:00 +0200)
committer Daniele Varrazzo <daniele.varrazzo@gmail.com>
          Sat, 6 Sep 2025 17:10:40 +0000 (19:10 +0200)
Before this change we had the lock context inside the pipeline context,
because conn.pipeline() and Pipeline.__enter/exit__ take a lock. But
this created windows of opportunity for other threads to execute
concurrent operations on the connection, resulting in "another command
is already in progress" errors.

Fix #1130.
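
In a hedged sketch, the failing scenario looks like this (the DSN is a
placeholder and "%s" is psycopg's default placeholder style; the code
mirrors the regression test added below). Before the fix, executemany()
released conn.lock between riding an existing pipeline and creating a
new one, so a concurrent execute() could slip into that window:

import threading

import psycopg

conn = psycopg.connect("dbname=test")  # placeholder connection string

def do_execmany():
    with conn.cursor() as cur:
        # Before the fix: the lock was dropped between checking for an
        # existing pipeline and opening a new one.
        cur.executemany("select pg_sleep(%s)", [(0.1,) for _ in range(10)])

def do_exec():
    with conn.cursor() as cur:
        for _ in range(100):
            # Could land in executemany()'s unlocked window and fail with
            # "another command is already in progress".
            cur.execute("select 1")

threads = [threading.Thread(target=f) for f in (do_execmany, do_exec)]
for t in threads:
    t.start()
for t in threads:
    t.join()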

docs/news.rst
psycopg/psycopg/_pipeline.py
psycopg/psycopg/_pipeline_async.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_cursor_common.py
tests/test_cursor_common_async.py

diff --git a/docs/news.rst b/docs/news.rst
index 8b2717b47a6e0f4901e63eb1c58221c61f031359..2891a6f92b3bf40cd71fd4c40193651766c888ef 100644
@@ -15,6 +15,8 @@ Psycopg 3.2.10 (unreleased)
 - Fix memory leak when lambda/local functions are used as argument for
   `~.psycopg.types.json.set_json_dumps()`, `~.psycopg.types.json.set_json_loads()`
   (:ticket:`#1108`).
+- Fix coordination of `~Cursor.executemany()` with other concurrent operations
+  on other cursors (:ticket:`#1130`).
 - Add support for Python 3.14 (:ticket:`#1053`).
 - Fix `psycopg_binary.__version__`.
 - Raise a warning if a GSSAPI connection is obtained using the
diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py
index 1276f434aea4d0c6ffcdb6c33dcf2001209b98e6..57c7871da4ae22ccbf02576cef1fb21ca9f28e14 100644
@@ -23,27 +23,42 @@ if TYPE_CHECKING:
 logger = logging.getLogger("psycopg")
 
 
+class _DummyLock:
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        pass
+
+
 class Pipeline(BasePipeline):
     """Handler for (sync) connection in pipeline mode."""
 
     __module__ = "psycopg"
     _conn: Connection[Any]
 
-    def __init__(self, conn: Connection[Any]) -> None:
+    def __init__(self, conn: Connection[Any], _no_lock: bool = False) -> None:
         super().__init__(conn)
+        self._lock = _DummyLock() if _no_lock else conn.lock
 
     def sync(self) -> None:
         """Sync the pipeline, send any pending command and receive and process
         all available results.
         """
         try:
-            with self._conn.lock:
+            with self._lock:
                 self._conn.wait(self._sync_gen())
         except e._NO_TRACEBACK as ex:
             raise ex.with_traceback(None)
 
     def __enter__(self) -> Self:
-        with self._conn.lock:
+        with self._lock:
             self._conn.wait(self._enter_gen())
         return self
 
@@ -54,7 +69,7 @@ class Pipeline(BasePipeline):
         exc_tb: TracebackType | None,
     ) -> None:
         try:
-            with self._conn.lock:
+            with self._lock:
                 self._conn.wait(self._exit_gen())
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
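
A note on the _DummyLock introduced above: it is a no-op context manager
(a null object), so the "with self._lock:" blocks stay identical whether
the pipeline owns the real conn.lock or the caller already holds it. A
minimal standalone sketch of the pattern (names here are illustrative,
not psycopg API):

import threading

class NoOpLock:
    # Null-object stand-in for a lock: enter and exit do nothing.
    def __enter__(self) -> None:
        pass

    def __exit__(self, *exc) -> None:
        pass

def critical(lock) -> None:
    with lock:  # same code path for a real lock or the no-op one
        print("in critical section")

critical(threading.Lock())  # actually acquires and releases the lock
critical(NoOpLock())        # runs unguarded; caller must hold the real lock
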
diff --git a/psycopg/psycopg/_pipeline_async.py b/psycopg/psycopg/_pipeline_async.py
index 058526071478fe80fbbb8f080fe5390fbd162ae5..1f04765b4581ac9910a8eee26b9c9757e0d6fbfb 100644
@@ -20,27 +20,41 @@ if TYPE_CHECKING:
 logger = logging.getLogger("psycopg")
 
 
+class _DummyLock:
+    async def __aenter__(self) -> None:
+        pass
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        pass
+
+
 class AsyncPipeline(BasePipeline):
     """Handler for (async) connection in pipeline mode."""
 
     __module__ = "psycopg"
     _conn: AsyncConnection[Any]
 
-    def __init__(self, conn: AsyncConnection[Any]) -> None:
+    def __init__(self, conn: AsyncConnection[Any], _no_lock: bool = False) -> None:
         super().__init__(conn)
+        self._lock = _DummyLock() if _no_lock else conn.lock
 
     async def sync(self) -> None:
         """Sync the pipeline, send any pending command and receive and process
         all available results.
         """
         try:
-            async with self._conn.lock:
+            async with self._lock:
                 await self._conn.wait(self._sync_gen())
         except e._NO_TRACEBACK as ex:
             raise ex.with_traceback(None)
 
     async def __aenter__(self) -> Self:
-        async with self._conn.lock:
+        async with self._lock:
             await self._conn.wait(self._enter_gen())
         return self
 
@@ -51,7 +65,7 @@ class AsyncPipeline(BasePipeline):
         exc_tb: TracebackType | None,
     ) -> None:
         try:
-            async with self._conn.lock:
+            async with self._lock:
                 await self._conn.wait(self._exit_gen())
         except Exception as exc2:
             # Don't clobber an exception raised in the block with this one
diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py
index ec149efcc7d1ca2717e087435cde0a7039ae1eb9..2a55f0485f77c786a2c50fbdea5acbb20d7638da 100644
@@ -415,6 +415,27 @@ class Connection(BaseConnection[Row]):
                     assert pipeline is self._pipeline
                     self._pipeline = None
 
+    @contextmanager
+    def _pipeline_nolock(self) -> Iterator[Pipeline]:
+        """like pipeline() but don't acquire a lock.
+
+        Assume that the caller is holding the lock.
+        """
+
+        # Currently only used internally by Cursor.executemany() in a branch
+        # in which we already established that the connection has no pipeline.
+        # If this changes we may relax the asserts.
+        assert not self._pipeline
+        # WARNING: reference loop, broken ahead.
+        pipeline = self._pipeline = Pipeline(self, _no_lock=True)
+        try:
+            with pipeline:
+                yield pipeline
+        finally:
+            assert pipeline.level == 0
+            assert pipeline is self._pipeline
+            self._pipeline = None
+
     def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV:
         """
         Consume a generator operating on the connection.
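
The calling convention for _pipeline_nolock() follows from the
executemany() change further down: the caller takes conn.lock first and
only then enters the no-lock pipeline. A sketch of the intended nesting
(private, internal names, shown for illustration only; placeholder DSN):

import psycopg

conn = psycopg.connect("dbname=test")  # placeholder connection string
with conn.lock:                    # caller holds the connection lock
    with conn._pipeline_nolock():  # the pipeline skips re-acquiring it
        ...                        # send queries and wait on results here
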
diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py
index 77c8e1098b0a5eb6fa4f476534225f09206c45af..cc9bd7247013ad5bdbc414309d25b343b172c61e 100644
@@ -447,6 +447,27 @@ class AsyncConnection(BaseConnection[Row]):
                     assert pipeline is self._pipeline
                     self._pipeline = None
 
+    @asynccontextmanager
+    async def _pipeline_nolock(self) -> AsyncIterator[AsyncPipeline]:
+        """like pipeline() but don't acquire a lock.
+
+        Assume that the caller is holding the lock.
+        """
+
+        # Currently only used internally by Cursor.executemany() in a branch
+        # in which we already established that the connection has no pipeline.
+        # If this changes we may relax the asserts.
+        assert not self._pipeline
+        # WARNING: reference loop, broken ahead.
+        pipeline = self._pipeline = AsyncPipeline(self, _no_lock=True)
+        try:
+            async with pipeline:
+                yield pipeline
+        finally:
+            assert pipeline.level == 0
+            assert pipeline is self._pipeline
+            self._pipeline = None
+
     async def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV:
         """
         Consume a generator operating on the connection.
diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py
index b5754930eac01ac66392ed318d6e746b305c05bb..5b4fa6d7a384d073f932f4dc7ddfe683f81d5cf1 100644
@@ -104,22 +104,23 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
         Execute the same command with a sequence of input data.
         """
         try:
-            if Pipeline.is_supported():
-                # If there is already a pipeline, ride it, in order to avoid
-                # sending unnecessary Sync.
-                with self._conn.lock:
-                    if p := self._conn._pipeline:
-                        self._conn.wait(
-                            self._executemany_gen_pipeline(query, params_seq, returning)
-                        )
-                # Otherwise, make a new one
-                if not p:
-                    with self._conn.pipeline(), self._conn.lock:
+            with self._conn.lock:
+                if Pipeline.is_supported():
+                    # If there is already a pipeline, ride it, in order to avoid
+                    # sending unnecessary Sync.
+                    if self._conn._pipeline:
                         self._conn.wait(
                             self._executemany_gen_pipeline(query, params_seq, returning)
                         )
-            else:
-                with self._conn.lock:
+                    else:
+                        # Otherwise, make a new one
+                        with self._conn._pipeline_nolock():
+                            self._conn.wait(
+                                self._executemany_gen_pipeline(
+                                    query, params_seq, returning
+                                )
+                            )
+                else:
                     self._conn.wait(
                         self._executemany_gen_no_pipeline(query, params_seq, returning)
                     )
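
A plausible reason the _no_lock variant is needed at all: the whole
branch above now runs inside a single "with self._conn.lock:" block, and
conn.lock appears to be a plain, non-reentrant lock (the _DummyLock
workaround suggests as much), so calling conn.pipeline(), which takes
the lock again, would deadlock. A small sketch of that failure mode with
a standard-library lock:

import threading

lock = threading.Lock()  # non-reentrant, as conn.lock is assumed to be
with lock:
    # Re-taking the same lock from the same thread never succeeds.
    acquired = lock.acquire(timeout=0.1)
    print("re-acquired:", acquired)  # prints: re-acquired: False
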
diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py
index b714024ee8ad673e4ffc405503684090dba3fd9d..bc4ecb8d3adb993313cd5a2d71f915caebf773db 100644
@@ -16,8 +16,8 @@ from .abc import Params, Query
 from .copy import AsyncCopy, AsyncWriter
 from .rows import AsyncRowFactory, Row, RowMaker
 from ._compat import Self
-from ._pipeline import Pipeline
 from ._cursor_base import BaseCursor
+from ._pipeline_async import AsyncPipeline
 
 if TYPE_CHECKING:
     from .connection_async import AsyncConnection
@@ -104,22 +104,23 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
         Execute the same command with a sequence of input data.
         """
         try:
-            if Pipeline.is_supported():
-                # If there is already a pipeline, ride it, in order to avoid
-                # sending unnecessary Sync.
-                async with self._conn.lock:
-                    if p := self._conn._pipeline:
-                        await self._conn.wait(
-                            self._executemany_gen_pipeline(query, params_seq, returning)
-                        )
-                # Otherwise, make a new one
-                if not p:
-                    async with self._conn.pipeline(), self._conn.lock:
+            async with self._conn.lock:
+                if AsyncPipeline.is_supported():
+                    # If there is already a pipeline, ride it, in order to avoid
+                    # sending unnecessary Sync.
+                    if self._conn._pipeline:
                         await self._conn.wait(
                             self._executemany_gen_pipeline(query, params_seq, returning)
                         )
-            else:
-                async with self._conn.lock:
+                    # Otherwise, make a new one
+                    else:
+                        async with self._conn._pipeline_nolock():
+                            await self._conn.wait(
+                                self._executemany_gen_pipeline(
+                                    query, params_seq, returning
+                                )
+                            )
+                else:
                     await self._conn.wait(
                         self._executemany_gen_no_pipeline(query, params_seq, returning)
                     )
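
Beyond the locking change, note the import fix at the top of this file:
the module previously imported the sync Pipeline where the async class
belongs. For completeness, a hedged asyncio counterpart of the earlier
reproduction sketch (placeholder DSN); with the fix, the two tasks are
serialized by the connection lock instead of colliding:

import asyncio

import psycopg

async def main() -> None:
    # Placeholder DSN; one shared connection used by two concurrent tasks.
    async with await psycopg.AsyncConnection.connect("dbname=test") as aconn:

        async def do_execmany() -> None:
            async with aconn.cursor() as cur:
                await cur.executemany(
                    "select pg_sleep(%s)", [(0.1,) for _ in range(10)]
                )

        async def do_exec() -> None:
            async with aconn.cursor() as cur:
                for _ in range(100):
                    await cur.execute("select 1")

        await asyncio.gather(do_execmany(), do_exec())

asyncio.run(main())
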
diff --git a/tests/test_cursor_common.py b/tests/test_cursor_common.py
index 74c428448b98b7febb5427418863ea5c9ece1010..2809617bb24d71998caeb168e41f7ce475a9666c 100644
@@ -18,7 +18,7 @@ from psycopg.adapt import PyFormat
 from psycopg.types import TypeInfo
 
 from .utils import raiseif
-from .acompat import closing
+from .acompat import closing, gather, spawn
 from .fix_crdb import crdb_encoding
 from ._test_cursor import _execmany, execmany, my_row_factory, ph  # noqa: F401
 
@@ -451,6 +451,21 @@ def test_executemany_null_first(conn, fmt_in):
         )
 
 
+@pytest.mark.slow
+def test_executemany_lock(conn):
+
+    def do_execmany():
+        with conn.cursor() as cur:
+            cur.executemany(ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)])
+
+    def do_exec():
+        with conn.cursor() as cur:
+            for i in range(100):
+                cur.execute("select 1")
+
+    gather(spawn(do_execmany), spawn(do_exec))
+
+
 def test_rowcount(conn):
     cur = conn.cursor()
 
diff --git a/tests/test_cursor_common_async.py b/tests/test_cursor_common_async.py
index 2b06240287d1ce9b7321dca3e32678c79abba8aa..43cbbc679c3372b10ccd77f8e59dd7adecdf6a87 100644
@@ -15,7 +15,7 @@ from psycopg.adapt import PyFormat
 from psycopg.types import TypeInfo
 
 from .utils import raiseif
-from .acompat import aclosing, alist, anext
+from .acompat import aclosing, alist, anext, gather, spawn
 from .fix_crdb import crdb_encoding
 from ._test_cursor import _execmany, execmany, my_row_factory, ph  # noqa: F401
 
@@ -453,6 +453,22 @@ async def test_executemany_null_first(aconn, fmt_in):
         )
 
 
+@pytest.mark.slow
+async def test_executemany_lock(aconn):
+    async def do_execmany():
+        async with aconn.cursor() as cur:
+            await cur.executemany(
+                ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)]
+            )
+
+    async def do_exec():
+        async with aconn.cursor() as cur:
+            for i in range(100):
+                await cur.execute("select 1")
+
+    await gather(spawn(do_execmany), spawn(do_exec))
+
+
 async def test_rowcount(aconn):
     cur = aconn.cursor()