From 193e9d923769ada89fe568c9ebd1d0b935b9b622 Mon Sep 17 00:00:00 2001 From: Kamil Monicz Date: Fri, 14 Nov 2025 06:09:07 +0100 Subject: [PATCH] fix: keep the lock for the entire duration of copy Avoid errors in concurrent operations on the same connection, from different cursors. Close #1210 Fix AsyncCursor.copy concurrency Apply linter diff Add test_copy_concurrency and update news Update docs/news.rst Co-authored-by: Daniele Varrazzo Update tests/test_copy_async.py Co-authored-by: Daniele Varrazzo Update tests/test_copy_async.py Co-authored-by: Daniele Varrazzo Update tests/test_copy_async.py Co-authored-by: Daniele Varrazzo Simplify execution_log assertion Revert unnecessary imports --- docs/news.rst | 7 +++ psycopg/psycopg/cursor.py | 4 +- psycopg/psycopg/cursor_async.py | 4 +- tests/test_copy.py | 100 +++++++++++++++++++++++++++++++ tests/test_copy_async.py | 101 +++++++++++++++++++++++++++++++- 5 files changed, 211 insertions(+), 5 deletions(-) diff --git a/docs/news.rst b/docs/news.rst index 4ad615743..47aca8c9f 100644 --- a/docs/news.rst +++ b/docs/news.rst @@ -46,6 +46,13 @@ Psycopg 3.3.0 (unreleased) - Drop support for Python 3.8 (:ticket:`#976`) and 3.9 (:ticket:`#1056`). +Psycopg 3.2.13 (unreleased) +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- Fix `Cursor.copy()` and `AsyncCursor.copy()` to hold the connection lock + for the entire operation, preventing concurrent access issues (:ticket:`#1210`). 
+ + Current release --------------- diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index fd5600bad..a3383dbae 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -298,8 +298,8 @@ class Cursor(BaseCursor["Connection[Any]", Row]): with self._conn.lock: self._conn.wait(self._start_copy_gen(statement, params)) - with Copy(self, writer=writer) as copy: - yield copy + with Copy(self, writer=writer) as copy: + yield copy except e._NO_TRACEBACK as ex: raise ex.with_traceback(None) diff --git a/psycopg/psycopg/cursor_async.py b/psycopg/psycopg/cursor_async.py index ae7d6cec3..22dd94583 100644 --- a/psycopg/psycopg/cursor_async.py +++ b/psycopg/psycopg/cursor_async.py @@ -300,8 +300,8 @@ class AsyncCursor(BaseCursor["AsyncConnection[Any]", Row]): async with self._conn.lock: await self._conn.wait(self._start_copy_gen(statement, params)) - async with AsyncCopy(self, writer=writer) as copy: - yield copy + async with AsyncCopy(self, writer=writer) as copy: + yield copy except e._NO_TRACEBACK as ex: raise ex.with_traceback(None) diff --git a/tests/test_copy.py b/tests/test_copy.py index 18975c99a..4a983e1f3 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -20,6 +20,7 @@ from psycopg.types.hstore import register_hstore from psycopg.types.numeric import Int4 from .utils import eur +from .acompat import Event, gather, spawn from ._test_copy import sample_binary # noqa: F401 from ._test_copy import FileWriter, ensure_table, py_to_raw, sample_binary_rows from ._test_copy import sample_records, sample_tabledef, sample_text, sample_values @@ -952,6 +953,105 @@ def test_copy_table_across(conn_cls, dsn, faker, mode): faker.assert_record(got, want) +def test_copy_concurrency(conn): + """ + Test that copy operations hold the connection lock for the entire operation. 
+ + This test verifies the fix for the concurrency issue where Cursor.copy() + was not holding the connection lock throughout the copy context, allowing + concurrent operations to interfere. + """ + conn.execute("create temp table copy_concurrency_test (id int, data text)") + + # Events to coordinate execution between copy task and workers + copy_entered = Event() + wrote_first = Event() + wrote_second = Event() + can_proceed = Event() + + # Track execution order to verify workers run after copy completes + execution_log = [] + + def copy_task(): + """Copy task that writes two rows with controlled pauses.""" + cur = conn.cursor() + with cur.copy("copy copy_concurrency_test from stdin") as copy: + # Pause after entering copy context + execution_log.append("entered_copy") + copy_entered.set() + can_proceed.wait() + + # Write first row and pause + copy.write_row((1, "first")) + execution_log.append("wrote_row_1") + wrote_first.set() + can_proceed.wait() + + # Write second row and pause + copy.write_row((2, "second")) + execution_log.append("wrote_row_2") + wrote_second.set() + can_proceed.wait() + + # Copy context exited, lock should now be released + execution_log.append("exited_copy") + + def worker_task(): + """ + Worker that attempts to execute a query on a different cursor. + Should block until copy completes due to connection lock. 
+ """ + # Try to execute on another cursor - this should block until copy exits + worker_cur = conn.cursor() + worker_cur.execute("select 1") + execution_log.append("worker_completed") + + # Start the copy task + t_copy = spawn(copy_task) + + # Wait for copy to enter, then spawn first worker + copy_entered.wait() + t_worker1 = spawn(worker_task) + + # Allow copy to proceed to write first row + can_proceed.set() + can_proceed.clear() + wrote_first.wait() + + # Spawn second worker after first row + t_worker2 = spawn(worker_task) + + # Allow copy to proceed to write second row + can_proceed.set() + can_proceed.clear() + wrote_second.wait() + + # Spawn third worker after second row + t_worker3 = spawn(worker_task) + + # Allow copy to exit + can_proceed.set() + + # Wait for all tasks to complete + gather(t_copy, t_worker1, t_worker2, t_worker3) + + # Verify the data was written correctly + cur = conn.execute("select * from copy_concurrency_test order by id") + rows = cur.fetchall() + assert rows == [(1, "first"), (2, "second")] + + # Verify that all workers completed AFTER copy exited + assert execution_log == [ + "entered_copy", + "wrote_row_1", + "wrote_row_2", + "exited_copy", + "worker_completed", + "worker_completed", + "worker_completed", + ] + + class DataGenerator: def __init__(self, conn, nrecs, srec, offset=0, block_size=8192): diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py index df3a7e6b7..c28401809 100644 --- a/tests/test_copy_async.py +++ b/tests/test_copy_async.py @@ -17,7 +17,7 @@ from psycopg.types.hstore import register_hstore from psycopg.types.numeric import Int4 from .utils import eur -from .acompat import alist +from .acompat import AEvent, alist, gather, spawn from ._test_copy import sample_binary # noqa: F401 from ._test_copy import AsyncFileWriter, ensure_table_async, py_to_raw from ._test_copy import sample_binary_rows, sample_records, sample_tabledef @@ -972,6 +972,105 @@ async def test_copy_table_across(aconn_cls, dsn, faker, 
mode): faker.assert_record(got, want) +async def test_copy_concurrency(aconn): + """ + Test that copy operations hold the connection lock for the entire operation. + + This test verifies the fix for the concurrency issue where AsyncCursor.copy() + was not holding the connection lock throughout the copy context, allowing + concurrent operations to interfere. + """ + await aconn.execute("create temp table copy_concurrency_test (id int, data text)") + + # Events to coordinate execution between copy task and workers + copy_entered = AEvent() + wrote_first = AEvent() + wrote_second = AEvent() + can_proceed = AEvent() + + # Track execution order to verify workers run after copy completes + execution_log = [] + + async def copy_task(): + """Copy task that writes two rows with controlled pauses.""" + cur = aconn.cursor() + async with cur.copy("copy copy_concurrency_test from stdin") as copy: + # Pause after entering copy context + execution_log.append("entered_copy") + copy_entered.set() + await can_proceed.wait() + + # Write first row and pause + await copy.write_row((1, "first")) + execution_log.append("wrote_row_1") + wrote_first.set() + await can_proceed.wait() + + # Write second row and pause + await copy.write_row((2, "second")) + execution_log.append("wrote_row_2") + wrote_second.set() + await can_proceed.wait() + + # Copy context exited, lock should now be released + execution_log.append("exited_copy") + + async def worker_task(): + """ + Worker that attempts to execute a query on a different cursor. + Should block until copy completes due to connection lock. 
+ """ + # Try to execute on another cursor - this should block until copy exits + worker_cur = aconn.cursor() + await worker_cur.execute("select 1") + execution_log.append("worker_completed") + + # Start the copy task + t_copy = spawn(copy_task) + + # Wait for copy to enter, then spawn first worker + await copy_entered.wait() + t_worker1 = spawn(worker_task) + + # Allow copy to proceed to write first row + can_proceed.set() + can_proceed.clear() + await wrote_first.wait() + + # Spawn second worker after first row + t_worker2 = spawn(worker_task) + + # Allow copy to proceed to write second row + can_proceed.set() + can_proceed.clear() + await wrote_second.wait() + + # Spawn third worker after second row + t_worker3 = spawn(worker_task) + + # Allow copy to exit + can_proceed.set() + + # Wait for all tasks to complete + await gather(t_copy, t_worker1, t_worker2, t_worker3) + + # Verify the data was written correctly + cur = await aconn.execute("select * from copy_concurrency_test order by id") + rows = await cur.fetchall() + assert rows == [(1, "first"), (2, "second")] + + # Verify that all workers completed AFTER copy exited + assert execution_log == [ + "entered_copy", + "wrote_row_1", + "wrote_row_2", + "exited_copy", + "worker_completed", + "worker_completed", + "worker_completed", + ] + + class DataGenerator: def __init__(self, conn, nrecs, srec, offset=0, block_size=8192): self.conn = conn -- 2.47.3