fix: keep the lock for the entire duration of copy
author    Kamil Monicz <kamil@monicz.dev>
          Fri, 14 Nov 2025 05:09:07 +0000 (06:09 +0100)
committer Daniele Varrazzo <daniele.varrazzo@gmail.com>
          Sun, 16 Nov 2025 14:51:14 +0000 (15:51 +0100)
Avoid errors in concurrent operations on the same connection from
different cursors.

Close #1210
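
A minimal sketch of the scenario this addresses, assuming a working
connection (psycopg.connect() with an environment-provided DSN) and an
existing table "t" — both illustrative, not part of the patch. One thread
keeps a COPY open while another runs a query on a second cursor of the
same connection; with the lock held for the whole copy, the second query
waits for the COPY to finish instead of interleaving with it and failing.

    # Hypothetical sketch: two threads sharing one psycopg connection.
    import threading
    import psycopg

    conn = psycopg.connect()  # assumed: DSN taken from the environment

    def do_copy():
        with conn.cursor() as cur:
            with cur.copy("copy t (id) from stdin") as copy:  # table "t" is illustrative
                for i in range(1000):
                    copy.write_row((i,))

    def do_query():
        with conn.cursor() as cur:
            cur.execute("select 1")  # may have to wait until the copy releases the lock

    t1 = threading.Thread(target=do_copy)
    t2 = threading.Thread(target=do_query)
    t1.start(); t2.start()
    t1.join(); t2.join()
    conn.close()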

Fix AsyncCursor.copy concurrency

Apply linter diff

Add test_copy_concurrency and update news

Update docs/news.rst

Co-authored-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Update tests/test_copy_async.py

Co-authored-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Update tests/test_copy_async.py

Co-authored-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Update tests/test_copy_async.py

Co-authored-by: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Simplify execution_log assertion

Revert unnecessary imports

docs/news.rst
psycopg/psycopg/cursor.py
psycopg/psycopg/cursor_async.py
tests/test_copy.py
tests/test_copy_async.py

diff --git a/docs/news.rst b/docs/news.rst
index 4ad61574384decbdf261f80417ae0632a6993301..47aca8c9f24eef2fb735fa78d612d50754f17970 100644 (file)
@@ -46,6 +46,13 @@ Psycopg 3.3.0 (unreleased)
 - Drop support for Python 3.8 (:ticket:`#976`) and 3.9 (:ticket:`#1056`).
 
 
+Psycopg 3.2.13 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Fix `Cursor.copy()` and `AsyncCursor.copy()` to hold the connection lock
+  for the entire operation, preventing concurrent access issues (:ticket:`#1210`).
+
+
 Current release
 ---------------
 
diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py
index fd5600badcf6270628469c9a61f816b3b7e11af5..a3383dbaecdae64b4c2ecd293c7a512192abafeb 100644 (file)
@@ -298,8 +298,8 @@ class Cursor(BaseCursor["Connection[Any]", Row]):
             with self._conn.lock:
                 self._conn.wait(self._start_copy_gen(statement, params))
 
-            with Copy(self, writer=writer) as copy:
-                yield copy
+                with Copy(self, writer=writer) as copy:
+                    yield copy
         except e._NO_TRACEBACK as ex:
             raise ex.with_traceback(None)
 
diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py
index ae7d6cec36c9e70430f0ce819be845b8a7e22260..22dd9458321a097faf72810f951b39297d3eff82 100644 (file)
@@ -300,8 +300,8 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]):
             async with self._conn.lock:
                 await self._conn.wait(self._start_copy_gen(statement, params))
 
-            async with AsyncCopy(self, writer=writer) as copy:
-                yield copy
+                async with AsyncCopy(self, writer=writer) as copy:
+                    yield copy
         except e._NO_TRACEBACK as ex:
             raise ex.with_traceback(None)
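
For the asynchronous side, the same guarantee can be sketched with asyncio
tasks; the DSN and table name are again illustrative assumptions, not part
of the patch. A concurrent execute() on a second cursor of the same
connection awaits the connection lock held by the open AsyncCopy.

    import asyncio
    import psycopg

    async def main():
        aconn = await psycopg.AsyncConnection.connect()  # assumed: env-provided DSN

        async def do_copy():
            async with aconn.cursor() as cur:
                async with cur.copy("copy t (id) from stdin") as copy:  # "t" is illustrative
                    for i in range(1000):
                        await copy.write_row((i,))

        async def do_query():
            async with aconn.cursor() as cur:
                await cur.execute("select 1")  # awaits the lock held by the copy

        await asyncio.gather(do_copy(), do_query())
        await aconn.close()

    asyncio.run(main())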
 
diff --git a/tests/test_copy.py b/tests/test_copy.py
index 18975c99abc45d1cc7d804ab05d69748e808608b..4a983e1f3161f84cb8c5024f55dcc9de3c3b74e1 100644 (file)
@@ -20,6 +20,7 @@ from psycopg.types.hstore import register_hstore
 from psycopg.types.numeric import Int4
 
 from .utils import eur
+from .acompat import Event, gather, spawn
 from ._test_copy import sample_binary  # noqa: F401
 from ._test_copy import FileWriter, ensure_table, py_to_raw, sample_binary_rows
 from ._test_copy import sample_records, sample_tabledef, sample_text, sample_values
@@ -952,6 +953,105 @@ def test_copy_table_across(conn_cls, dsn, faker, mode):
             faker.assert_record(got, want)
 
 
+def test_copy_concurrency(conn):
+    """
+    Test that copy operations hold the connection lock for the entire operation.
+
+    This test verifies the fix for the concurrency issue where Cursor.copy()
+    was not holding the connection lock throughout the copy context, allowing
+    concurrent operations to interfere.
+    """
+    conn.execute("create temp table copy_concurrency_test (id int, data text)")
+
+    # Events to coordinate execution between copy task and workers
+    copy_entered = Event()
+    wrote_first = Event()
+    wrote_second = Event()
+    can_proceed = Event()
+
+    # Track execution order to verify workers run after copy completes
+    execution_log = []
+
+    def copy_task():
+        """Copy task that writes two rows with controlled pauses."""
+        cur = conn.cursor()
+        with cur.copy("copy copy_concurrency_test from stdin") as copy:
+            # Pause after entering copy context
+            execution_log.append("entered_copy")
+            copy_entered.set()
+            can_proceed.wait()
+
+            # Write first row and pause
+            copy.write_row((1, "first"))
+            execution_log.append("wrote_row_1")
+            wrote_first.set()
+            can_proceed.wait()
+
+            # Write second row and pause
+            copy.write_row((2, "second"))
+            execution_log.append("wrote_row_2")
+            wrote_second.set()
+            can_proceed.wait()
+
+        # Copy context exited, lock should now be released
+        execution_log.append("exited_copy")
+
+    def worker_task():
+        """
+        Worker that attempts to execute a query on a different cursor.
+        Should block until copy completes due to connection lock.
+        """
+        # Try to execute on another cursor - this should block until copy exits
+        worker_cur = conn.cursor()
+        worker_cur.execute("select 1")
+        execution_log.append("worker_completed")
+
+    # Start the copy task
+    t_copy = spawn(copy_task)
+
+    # Wait for copy to enter, then spawn first worker
+    copy_entered.wait()
+    t_worker1 = spawn(worker_task)
+
+    # Allow copy to proceed to write first row
+    can_proceed.set()
+    can_proceed.clear()
+    wrote_first.wait()
+
+    # Spawn second worker after first row
+    t_worker2 = spawn(worker_task)
+
+    # Allow copy to proceed to write second row
+    can_proceed.set()
+    can_proceed.clear()
+    wrote_second.wait()
+
+    # Spawn third worker after second row
+    t_worker3 = spawn(worker_task)
+
+    # Allow copy to exit
+    can_proceed.set()
+
+    # Wait for all tasks to complete
+    gather(t_copy, t_worker1, t_worker2, t_worker3)
+
+    # Verify the data was written correctly
+    cur = conn.execute("select * from copy_concurrency_test order by id")
+    rows = cur.fetchall()
+    assert rows == [(1, "first"), (2, "second")]
+
+    # Verify that all workers completed AFTER copy exited
+    assert execution_log == [
+        "entered_copy",
+        "wrote_row_1",
+        "wrote_row_2",
+        "exited_copy",
+        "worker_completed",
+        "worker_completed",
+        "worker_completed",
+    ]
+
+
 class DataGenerator:
 
     def __init__(self, conn, nrecs, srec, offset=0, block_size=8192):
diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py
index df3a7e6b77785300c756c9a62161b8e0a5d9084b..c284018094c07cef79585ef1a5af23b603c0fdf9 100644 (file)
@@ -17,7 +17,7 @@ from psycopg.types.hstore import register_hstore
 from psycopg.types.numeric import Int4
 
 from .utils import eur
-from .acompat import alist
+from .acompat import AEvent, alist, gather, spawn
 from ._test_copy import sample_binary  # noqa: F401
 from ._test_copy import AsyncFileWriter, ensure_table_async, py_to_raw
 from ._test_copy import sample_binary_rows, sample_records, sample_tabledef
@@ -972,6 +972,105 @@ async def test_copy_table_across(aconn_cls, dsn, faker, mode):
             faker.assert_record(got, want)
 
 
+async def test_copy_concurrency(aconn):
+    """
+    Test that copy operations hold the connection lock for the entire operation.
+
+    This test verifies the fix for the concurrency issue where AsyncCursor.copy()
+    was not holding the connection lock throughout the copy context, allowing
+    concurrent operations to interfere.
+    """
+    await aconn.execute("create temp table copy_concurrency_test (id int, data text)")
+
+    # Events to coordinate execution between copy task and workers
+    copy_entered = AEvent()
+    wrote_first = AEvent()
+    wrote_second = AEvent()
+    can_proceed = AEvent()
+
+    # Track execution order to verify workers run after copy completes
+    execution_log = []
+
+    async def copy_task():
+        """Copy task that writes two rows with controlled pauses."""
+        cur = aconn.cursor()
+        async with cur.copy("copy copy_concurrency_test from stdin") as copy:
+            # Pause after entering copy context
+            execution_log.append("entered_copy")
+            copy_entered.set()
+            await can_proceed.wait()
+
+            # Write first row and pause
+            await copy.write_row((1, "first"))
+            execution_log.append("wrote_row_1")
+            wrote_first.set()
+            await can_proceed.wait()
+
+            # Write second row and pause
+            await copy.write_row((2, "second"))
+            execution_log.append("wrote_row_2")
+            wrote_second.set()
+            await can_proceed.wait()
+
+        # Copy context exited, lock should now be released
+        execution_log.append("exited_copy")
+
+    async def worker_task():
+        """
+        Worker that attempts to execute a query on a different cursor.
+        Should block until copy completes due to connection lock.
+        """
+        # Try to execute on another cursor - this should block until copy exits
+        worker_cur = aconn.cursor()
+        await worker_cur.execute("select 1")
+        execution_log.append("worker_completed")
+
+    # Start the copy task
+    t_copy = spawn(copy_task)
+
+    # Wait for copy to enter, then spawn first worker
+    await copy_entered.wait()
+    t_worker1 = spawn(worker_task)
+
+    # Allow copy to proceed to write first row
+    can_proceed.set()
+    can_proceed.clear()
+    await wrote_first.wait()
+
+    # Spawn second worker after first row
+    t_worker2 = spawn(worker_task)
+
+    # Allow copy to proceed to write second row
+    can_proceed.set()
+    can_proceed.clear()
+    await wrote_second.wait()
+
+    # Spawn third worker after second row
+    t_worker3 = spawn(worker_task)
+
+    # Allow copy to exit
+    can_proceed.set()
+
+    # Wait for all tasks to complete
+    await gather(t_copy, t_worker1, t_worker2, t_worker3)
+
+    # Verify the data was written correctly
+    cur = await aconn.execute("select * from copy_concurrency_test order by id")
+    rows = await cur.fetchall()
+    assert rows == [(1, "first"), (2, "second")]
+
+    # Verify that all workers completed AFTER copy exited
+    assert execution_log == [
+        "entered_copy",
+        "wrote_row_1",
+        "wrote_row_2",
+        "exited_copy",
+        "worker_completed",
+        "worker_completed",
+        "worker_completed",
+    ]
+
+
 class DataGenerator:
     def __init__(self, conn, nrecs, srec, offset=0, block_size=8192):
         self.conn = conn