From 6ee340a1fb8d28ca91ff5e68b11df22ed1764147 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 6 Sep 2025 03:00:24 +0200 Subject: [PATCH] fix: keep a lock for the entire duration of executemany Before this change we had the lock context inside the pipeline context, because conn.pipeline() and Pipeline.__enter/exit__ take a lock. But this created windows of opportunity for other threads to execute concurrent operations on the connection, resulting in "another command is already in progress" errors. Fix #1130. --- docs/news.rst | 2 ++ psycopg/psycopg/_pipeline.py | 23 +++++++++++++++++++---- psycopg/psycopg/_pipeline_async.py | 22 ++++++++++++++++++---- psycopg/psycopg/connection.py | 21 +++++++++++++++++++++ psycopg/psycopg/connection_async.py | 21 +++++++++++++++++++++ psycopg/psycopg/cursor.py | 27 ++++++++++++++------------- psycopg/psycopg/cursor_async.py | 29 +++++++++++++++-------------- tests/test_cursor_common.py | 17 ++++++++++++++++- tests/test_cursor_common_async.py | 18 +++++++++++++++++- 9 files changed, 143 insertions(+), 37 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index 8b2717b47..2891a6f92 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -15,6 +15,8 @@ Psycopg 3.2.10 (unreleased) - Fix memory leak when lambda/local functions are used as argument for `~.psycopg.types.json.set_json_dumps()`, `~.psycopg.types.json.set_json_loads()` (:ticket:`#1108`). +- Fix coordination of `~Cursor.executemany()` with other concurrent operations + on other cursors (:ticket:`#1130`). - Add support for Python 3.14 (:ticket:`#1053`). - Fix `psycopg_binary.__version__`. 
- Raise a warning if a GSSAPI connection is obtained using the diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index 1276f434a..57c7871da 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -23,27 +23,42 @@ if TYPE_CHECKING: logger = logging.getLogger("psycopg") +class _DummyLock: + + def __enter__(self) -> None: + pass + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + pass + + class Pipeline(BasePipeline): """Handler for (sync) connection in pipeline mode.""" __module__ = "psycopg" _conn: Connection[Any] - def __init__(self, conn: Connection[Any]) -> None: + def __init__(self, conn: Connection[Any], _no_lock: bool = False) -> None: super().__init__(conn) + self._lock = _DummyLock() if _no_lock else conn.lock def sync(self) -> None: """Sync the pipeline, send any pending command and receive and process all available results. """ try: - with self._conn.lock: + with self._lock: self._conn.wait(self._sync_gen()) except e._NO_TRACEBACK as ex: raise ex.with_traceback(None) def __enter__(self) -> Self: - with self._conn.lock: + with self._lock: self._conn.wait(self._enter_gen()) return self @@ -54,7 +69,7 @@ class Pipeline(BasePipeline): exc_tb: TracebackType | None, ) -> None: try: - with self._conn.lock: + with self._lock: self._conn.wait(self._exit_gen()) except Exception as exc2: # Don't clobber an exception raised in the block with this one diff --git a/psycopg/psycopg/_pipeline_async.py b/psycopg/psycopg/_pipeline_async.py index 058526071..1f04765b4 100644 --- a/psycopg/psycopg/_pipeline_async.py +++ b/psycopg/psycopg/_pipeline_async.py @@ -20,27 +20,41 @@ if TYPE_CHECKING: logger = logging.getLogger("psycopg") +class _DummyLock: + async def __aenter__(self) -> None: + pass + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> 
None: + pass + + class AsyncPipeline(BasePipeline): """Handler for (async) connection in pipeline mode.""" __module__ = "psycopg" _conn: AsyncConnection[Any] - def __init__(self, conn: AsyncConnection[Any]) -> None: + def __init__(self, conn: AsyncConnection[Any], _no_lock: bool = False) -> None: super().__init__(conn) + self._lock = _DummyLock() if _no_lock else conn.lock async def sync(self) -> None: """Sync the pipeline, send any pending command and receive and process all available results. """ try: - async with self._conn.lock: + async with self._lock: await self._conn.wait(self._sync_gen()) except e._NO_TRACEBACK as ex: raise ex.with_traceback(None) async def __aenter__(self) -> Self: - async with self._conn.lock: + async with self._lock: await self._conn.wait(self._enter_gen()) return self @@ -51,7 +65,7 @@ class AsyncPipeline(BasePipeline): exc_tb: TracebackType | None, ) -> None: try: - async with self._conn.lock: + async with self._lock: await self._conn.wait(self._exit_gen()) except Exception as exc2: # Don't clobber an exception raised in the block with this one diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index ec149efcc..2a55f0485 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -415,6 +415,27 @@ class Connection(BaseConnection[Row]): assert pipeline is self._pipeline self._pipeline = None + @contextmanager + def _pipeline_nolock(self) -> Iterator[Pipeline]: + """like pipeline() but don't acquire a lock. + + Assume that the caller is holding the lock. + """ + + # Currently only used internally by Cursor.executemany() in a branch + # in which we already established that the connection has no pipeline. + # If this changes we may relax the asserts. + assert not self._pipeline + # WARNING: reference loop, broken ahead. 
+ pipeline = self._pipeline = Pipeline(self, _no_lock=True) + try: + with pipeline: + yield pipeline + finally: + assert pipeline.level == 0 + assert pipeline is self._pipeline + self._pipeline = None + def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV: """ Consume a generator operating on the connection. diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 77c8e1098..cc9bd7247 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -447,6 +447,27 @@ class AsyncConnection(BaseConnection[Row]): assert pipeline is self._pipeline self._pipeline = None + @asynccontextmanager + async def _pipeline_nolock(self) -> AsyncIterator[AsyncPipeline]: + """like pipeline() but don't acquire a lock. + + Assume that the caller is holding the lock. + """ + + # Currently only used internally by Cursor.executemany() in a branch + # in which we already established that the connection has no pipeline. + # If this changes we may relax the asserts. + assert not self._pipeline + # WARNING: reference loop, broken ahead. + pipeline = self._pipeline = AsyncPipeline(self, _no_lock=True) + try: + async with pipeline: + yield pipeline + finally: + assert pipeline.level == 0 + assert pipeline is self._pipeline + self._pipeline = None + async def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV: """ Consume a generator operating on the connection. diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index b5754930e..5b4fa6d7a 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -104,22 +104,23 @@ class Cursor(BaseCursor["Connection[Any]", Row]): Execute the same command with a sequence of input data. """ try: - if Pipeline.is_supported(): - # If there is already a pipeline, ride it, in order to avoid - # sending unnecessary Sync. 
- with self._conn.lock: - if p := self._conn._pipeline: - self._conn.wait( - self._executemany_gen_pipeline(query, params_seq, returning) - ) - # Otherwise, make a new one - if not p: - with self._conn.pipeline(), self._conn.lock: + with self._conn.lock: + if Pipeline.is_supported(): + # If there is already a pipeline, ride it, in order to avoid + # sending unnecessary Sync. + if self._conn._pipeline: self._conn.wait( self._executemany_gen_pipeline(query, params_seq, returning) ) - else: - with self._conn.lock: + else: + # Otherwise, make a new one + with self._conn._pipeline_nolock(): + self._conn.wait( + self._executemany_gen_pipeline( + query, params_seq, returning + ) + ) + else: self._conn.wait( self._executemany_gen_no_pipeline(query, params_seq, returning) ) diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index b714024ee..bc4ecb8d3 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -16,8 +16,8 @@ from .abc import Params, Query from .copy import AsyncCopy, AsyncWriter from .rows import AsyncRowFactory, Row, RowMaker from ._compat import Self -from ._pipeline import Pipeline from ._cursor_base import BaseCursor +from ._pipeline_async import AsyncPipeline if TYPE_CHECKING: from .connection_async import AsyncConnection @@ -104,22 +104,23 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): Execute the same command with a sequence of input data. """ try: - if Pipeline.is_supported(): - # If there is already a pipeline, ride it, in order to avoid - # sending unnecessary Sync. - async with self._conn.lock: - if p := self._conn._pipeline: - await self._conn.wait( - self._executemany_gen_pipeline(query, params_seq, returning) - ) - # Otherwise, make a new one - if not p: - async with self._conn.pipeline(), self._conn.lock: + async with self._conn.lock: + if AsyncPipeline.is_supported(): + # If there is already a pipeline, ride it, in order to avoid + # sending unnecessary Sync. 
+ if self._conn._pipeline: await self._conn.wait( self._executemany_gen_pipeline(query, params_seq, returning) ) - else: - async with self._conn.lock: + # Otherwise, make a new one + else: + async with self._conn._pipeline_nolock(): + await self._conn.wait( + self._executemany_gen_pipeline( + query, params_seq, returning + ) + ) + else: await self._conn.wait( self._executemany_gen_no_pipeline(query, params_seq, returning) ) diff --git a/tests/test_cursor_common.py b/tests/test_cursor_common.py index 74c428448..2809617bb 100644 --- a/tests/test_cursor_common.py +++ b/tests/test_cursor_common.py @@ -18,7 +18,7 @@ from psycopg.adapt import PyFormat from psycopg.types import TypeInfo from .utils import raiseif -from .acompat import closing +from .acompat import closing, gather, spawn from .fix_crdb import crdb_encoding from ._test_cursor import _execmany, execmany, my_row_factory, ph # noqa: F401 @@ -451,6 +451,21 @@ def test_executemany_null_first(conn, fmt_in): ) +@pytest.mark.slow +def test_executemany_lock(conn): + + def do_execmany(): + with conn.cursor() as cur: + cur.executemany(ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)]) + + def do_exec(): + with conn.cursor() as cur: + for i in range(100): + cur.execute("select 1") + + gather(spawn(do_execmany), spawn(do_exec)) + + def test_rowcount(conn): cur = conn.cursor() diff --git a/tests/test_cursor_common_async.py b/tests/test_cursor_common_async.py index 2b0624028..43cbbc679 100644 --- a/tests/test_cursor_common_async.py +++ b/tests/test_cursor_common_async.py @@ -15,7 +15,7 @@ from psycopg.adapt import PyFormat from psycopg.types import TypeInfo from .utils import raiseif -from .acompat import aclosing, alist, anext +from .acompat import aclosing, alist, anext, gather, spawn from .fix_crdb import crdb_encoding from ._test_cursor import _execmany, execmany, my_row_factory, ph # noqa: F401 @@ -453,6 +453,22 @@ async def test_executemany_null_first(aconn, fmt_in): ) +@pytest.mark.slow +async def 
test_executemany_lock(aconn): + async def do_execmany(): + async with aconn.cursor() as cur: + await cur.executemany( + ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)] + ) + + async def do_exec(): + async with aconn.cursor() as cur: + for i in range(100): + await cur.execute("select 1") + + await gather(spawn(do_execmany), spawn(do_exec)) + + async def test_rowcount(aconn): cur = aconn.cursor() -- 2.47.3