- Drop support for Python 3.8 (:ticket:`#976`) and 3.9 (:ticket:`#1056`).
+Psycopg 3.2.13 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Fix `Cursor.copy()` and `AsyncCursor.copy()` to hold the connection lock
+ for the entire operation, preventing concurrent access issues (:ticket:`#1210`).
+
+
Current release
---------------
with self._conn.lock:
self._conn.wait(self._start_copy_gen(statement, params))
- with Copy(self, writer=writer) as copy:
- yield copy
+ with Copy(self, writer=writer) as copy:
+ yield copy
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
async with self._conn.lock:
await self._conn.wait(self._start_copy_gen(statement, params))
- async with AsyncCopy(self, writer=writer) as copy:
- yield copy
+ async with AsyncCopy(self, writer=writer) as copy:
+ yield copy
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
from psycopg.types.numeric import Int4
from .utils import eur
+from .acompat import Event, gather, spawn
from ._test_copy import sample_binary # noqa: F401
from ._test_copy import FileWriter, ensure_table, py_to_raw, sample_binary_rows
from ._test_copy import sample_records, sample_tabledef, sample_text, sample_values
faker.assert_record(got, want)
+def test_copy_concurrency(conn):
+ """
+ Test that copy operations hold the connection lock for the entire operation.
+
+ This test verifies the fix for the concurrency issue where Cursor.copy()
+ was not holding the connection lock throughout the copy context, allowing
+ concurrent operations to interfere.
+ """
+ conn.execute("create temp table copy_concurrency_test (id int, data text)")
+
+ # Events to coordinate execution between copy task and workers
+ copy_entered = Event()
+ wrote_first = Event()
+ wrote_second = Event()
+ can_proceed = Event()
+
+ # Track execution order to verify workers run after copy completes
+ execution_log = []
+
+ def copy_task():
+ """Copy task that writes two rows with controlled pauses."""
+ cur = conn.cursor()
+ with cur.copy("copy copy_concurrency_test from stdin") as copy:
+ # Pause after entering copy context
+ execution_log.append("entered_copy")
+ copy_entered.set()
+ can_proceed.wait()
+
+ # Write first row and pause
+ copy.write_row((1, "first"))
+ execution_log.append("wrote_row_1")
+ wrote_first.set()
+ can_proceed.wait()
+
+ # Write second row and pause
+ copy.write_row((2, "second"))
+ execution_log.append("wrote_row_2")
+ wrote_second.set()
+ can_proceed.wait()
+
+ # Copy context exited, lock should now be released
+ execution_log.append("exited_copy")
+
+ def worker_task():
+ """
+ Worker that attempts to execute a query on a different cursor.
+ Should block until copy completes due to connection lock.
+ """
+ # Try to execute on another cursor - this should block until copy exits
+ worker_cur = conn.cursor()
+ worker_cur.execute("select 1")
+ execution_log.append("worker_completed")
+
+ # Start the copy task
+ t_copy = spawn(copy_task)
+
+ # Wait for copy to enter, then spawn first worker
+ copy_entered.wait()
+ t_worker1 = spawn(worker_task)
+
+ # Allow copy to proceed to write first row
+ can_proceed.set()
+ can_proceed.clear()
+ wrote_first.wait()
+
+ # Spawn second worker after first row
+ t_worker2 = spawn(worker_task)
+
+ # Allow copy to proceed to write second row
+ can_proceed.set()
+ can_proceed.clear()
+ wrote_second.wait()
+
+ # Spawn third worker after second row
+ t_worker3 = spawn(worker_task)
+
+ # Allow copy to exit
+ can_proceed.set()
+
+ # Wait for all tasks to complete
+ gather(t_copy, t_worker1, t_worker2, t_worker3)
+
+ # Verify the data was written correctly
+ cur = conn.execute("select * from copy_concurrency_test order by id")
+ rows = cur.fetchall()
+ assert rows == [(1, "first"), (2, "second")]
+
+ # Verify that all workers completed AFTER copy exited
+ assert execution_log == [
+ "entered_copy",
+ "wrote_row_1",
+ "wrote_row_2",
+ "exited_copy",
+ "worker_completed",
+ "worker_completed",
+ "worker_completed",
+ ]
+
+
class DataGenerator:
def __init__(self, conn, nrecs, srec, offset=0, block_size=8192):
from psycopg.types.numeric import Int4
from .utils import eur
-from .acompat import alist
+from .acompat import AEvent, alist, gather, spawn
from ._test_copy import sample_binary # noqa: F401
from ._test_copy import AsyncFileWriter, ensure_table_async, py_to_raw
from ._test_copy import sample_binary_rows, sample_records, sample_tabledef
faker.assert_record(got, want)
+async def test_copy_concurrency(aconn):
+ """
+ Test that copy operations hold the connection lock for the entire operation.
+
+ This test verifies the fix for the concurrency issue where AsyncCursor.copy()
+ was not holding the connection lock throughout the copy context, allowing
+ concurrent operations to interfere.
+ """
+ await aconn.execute("create temp table copy_concurrency_test (id int, data text)")
+
+ # Events to coordinate execution between copy task and workers
+ copy_entered = AEvent()
+ wrote_first = AEvent()
+ wrote_second = AEvent()
+ can_proceed = AEvent()
+
+ # Track execution order to verify workers run after copy completes
+ execution_log = []
+
+ async def copy_task():
+ """Copy task that writes two rows with controlled pauses."""
+ cur = aconn.cursor()
+ async with cur.copy("copy copy_concurrency_test from stdin") as copy:
+ # Pause after entering copy context
+ execution_log.append("entered_copy")
+ copy_entered.set()
+ await can_proceed.wait()
+
+ # Write first row and pause
+ await copy.write_row((1, "first"))
+ execution_log.append("wrote_row_1")
+ wrote_first.set()
+ await can_proceed.wait()
+
+ # Write second row and pause
+ await copy.write_row((2, "second"))
+ execution_log.append("wrote_row_2")
+ wrote_second.set()
+ await can_proceed.wait()
+
+ # Copy context exited, lock should now be released
+ execution_log.append("exited_copy")
+
+ async def worker_task():
+ """
+ Worker that attempts to execute a query on a different cursor.
+ Should block until copy completes due to connection lock.
+ """
+ # Try to execute on another cursor - this should block until copy exits
+ worker_cur = aconn.cursor()
+ await worker_cur.execute("select 1")
+ execution_log.append("worker_completed")
+
+ # Start the copy task
+ t_copy = spawn(copy_task)
+
+ # Wait for copy to enter, then spawn first worker
+ await copy_entered.wait()
+ t_worker1 = spawn(worker_task)
+
+ # Allow copy to proceed to write first row
+ can_proceed.set()
+ can_proceed.clear()
+ await wrote_first.wait()
+
+ # Spawn second worker after first row
+ t_worker2 = spawn(worker_task)
+
+ # Allow copy to proceed to write second row
+ can_proceed.set()
+ can_proceed.clear()
+ await wrote_second.wait()
+
+ # Spawn third worker after second row
+ t_worker3 = spawn(worker_task)
+
+ # Allow copy to exit
+ can_proceed.set()
+
+ # Wait for all tasks to complete
+ await gather(t_copy, t_worker1, t_worker2, t_worker3)
+
+ # Verify the data was written correctly
+ cur = await aconn.execute("select * from copy_concurrency_test order by id")
+ rows = await cur.fetchall()
+ assert rows == [(1, "first"), (2, "second")]
+
+ # Verify that all workers completed AFTER copy exited
+ assert execution_log == [
+ "entered_copy",
+ "wrote_row_1",
+ "wrote_row_2",
+ "exited_copy",
+ "worker_completed",
+ "worker_completed",
+ "worker_completed",
+ ]
+
+
class DataGenerator:
def __init__(self, conn, nrecs, srec, offset=0, block_size=8192):
self.conn = conn