- Fix memory leak when lambda/local functions are used as argument for
`~.psycopg.types.json.set_json_dumps()`, `~.psycopg.types.json.set_json_loads()`
(:ticket:`#1108`).
+- Fix coordination of `~Cursor.executemany()` with other concurrent operations
+ on other cursors (:ticket:`#1130`).
- Add support for Python 3.14 (:ticket:`#1053`).
- Fix `psycopg_binary.__version__`.
- Raise a warning if a GSSAPI connection is obtained using the
logger = logging.getLogger("psycopg")
+class _DummyLock:
+
+ def __enter__(self) -> None:
+ pass
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ pass
+
+
class Pipeline(BasePipeline):
"""Handler for (sync) connection in pipeline mode."""
__module__ = "psycopg"
_conn: Connection[Any]
- def __init__(self, conn: Connection[Any]) -> None:
+ def __init__(self, conn: Connection[Any], _no_lock: bool = False) -> None:
super().__init__(conn)
+ self._lock = _DummyLock() if _no_lock else conn.lock
def sync(self) -> None:
"""Sync the pipeline, send any pending command and receive and process
all available results.
"""
try:
- with self._conn.lock:
+ with self._lock:
self._conn.wait(self._sync_gen())
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
def __enter__(self) -> Self:
- with self._conn.lock:
+ with self._lock:
self._conn.wait(self._enter_gen())
return self
exc_tb: TracebackType | None,
) -> None:
try:
- with self._conn.lock:
+ with self._lock:
self._conn.wait(self._exit_gen())
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
logger = logging.getLogger("psycopg")
+class _DummyLock:
+ async def __aenter__(self) -> None:
+ pass
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ pass
+
+
class AsyncPipeline(BasePipeline):
"""Handler for (async) connection in pipeline mode."""
__module__ = "psycopg"
_conn: AsyncConnection[Any]
- def __init__(self, conn: AsyncConnection[Any]) -> None:
+ def __init__(self, conn: AsyncConnection[Any], _no_lock: bool = False) -> None:
super().__init__(conn)
+ self._lock = _DummyLock() if _no_lock else conn.lock
async def sync(self) -> None:
"""Sync the pipeline, send any pending command and receive and process
all available results.
"""
try:
- async with self._conn.lock:
+ async with self._lock:
await self._conn.wait(self._sync_gen())
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
async def __aenter__(self) -> Self:
- async with self._conn.lock:
+ async with self._lock:
await self._conn.wait(self._enter_gen())
return self
exc_tb: TracebackType | None,
) -> None:
try:
- async with self._conn.lock:
+ async with self._lock:
await self._conn.wait(self._exit_gen())
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
assert pipeline is self._pipeline
self._pipeline = None
+ @contextmanager
+ def _pipeline_nolock(self) -> Iterator[Pipeline]:
+ """like pipeline() but don't acquire a lock.
+
+ Assume that the caller is holding the lock.
+ """
+
+ # Currently only used internally by Cursor.executemany() in a branch
+ # in which we already established that the connection has no pipeline.
+ # If this changes we may relax the asserts.
+ assert not self._pipeline
+ # WARNING: reference loop, broken ahead.
+ pipeline = self._pipeline = Pipeline(self, _no_lock=True)
+ try:
+ with pipeline:
+ yield pipeline
+ finally:
+ assert pipeline.level == 0
+ assert pipeline is self._pipeline
+ self._pipeline = None
+
def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV:
"""
Consume a generator operating on the connection.
assert pipeline is self._pipeline
self._pipeline = None
+ @asynccontextmanager
+ async def _pipeline_nolock(self) -> AsyncIterator[AsyncPipeline]:
+ """like pipeline() but don't acquire a lock.
+
+ Assume that the caller is holding the lock.
+ """
+
+ # Currently only used internally by Cursor.executemany() in a branch
+ # in which we already established that the connection has no pipeline.
+ # If this changes we may relax the asserts.
+ assert not self._pipeline
+ # WARNING: reference loop, broken ahead.
+ pipeline = self._pipeline = AsyncPipeline(self, _no_lock=True)
+ try:
+ async with pipeline:
+ yield pipeline
+ finally:
+ assert pipeline.level == 0
+ assert pipeline is self._pipeline
+ self._pipeline = None
+
async def wait(self, gen: PQGen[RV], interval: float | None = _WAIT_INTERVAL) -> RV:
"""
Consume a generator operating on the connection.
Execute the same command with a sequence of input data.
"""
try:
- if Pipeline.is_supported():
- # If there is already a pipeline, ride it, in order to avoid
- # sending unnecessary Sync.
- with self._conn.lock:
- if p := self._conn._pipeline:
- self._conn.wait(
- self._executemany_gen_pipeline(query, params_seq, returning)
- )
- # Otherwise, make a new one
- if not p:
- with self._conn.pipeline(), self._conn.lock:
+ with self._conn.lock:
+ if Pipeline.is_supported():
+ # If there is already a pipeline, ride it, in order to avoid
+ # sending unnecessary Sync.
+ if self._conn._pipeline:
self._conn.wait(
self._executemany_gen_pipeline(query, params_seq, returning)
)
- else:
- with self._conn.lock:
+ else:
+ # Otherwise, make a new one
+ with self._conn._pipeline_nolock():
+ self._conn.wait(
+ self._executemany_gen_pipeline(
+ query, params_seq, returning
+ )
+ )
+ else:
self._conn.wait(
self._executemany_gen_no_pipeline(query, params_seq, returning)
)
from .copy import AsyncCopy, AsyncWriter
from .rows import AsyncRowFactory, Row, RowMaker
from ._compat import Self
-from ._pipeline import Pipeline
from ._cursor_base import BaseCursor
+from ._pipeline_async import AsyncPipeline
if TYPE_CHECKING:
from .connection_async import AsyncConnection
Execute the same command with a sequence of input data.
"""
try:
- if Pipeline.is_supported():
- # If there is already a pipeline, ride it, in order to avoid
- # sending unnecessary Sync.
- async with self._conn.lock:
- if p := self._conn._pipeline:
- await self._conn.wait(
- self._executemany_gen_pipeline(query, params_seq, returning)
- )
- # Otherwise, make a new one
- if not p:
- async with self._conn.pipeline(), self._conn.lock:
+ async with self._conn.lock:
+ if AsyncPipeline.is_supported():
+ # If there is already a pipeline, ride it, in order to avoid
+ # sending unnecessary Sync.
+ if self._conn._pipeline:
await self._conn.wait(
self._executemany_gen_pipeline(query, params_seq, returning)
)
- else:
- async with self._conn.lock:
+ # Otherwise, make a new one
+ else:
+ async with self._conn._pipeline_nolock():
+ await self._conn.wait(
+ self._executemany_gen_pipeline(
+ query, params_seq, returning
+ )
+ )
+ else:
await self._conn.wait(
self._executemany_gen_no_pipeline(query, params_seq, returning)
)
from psycopg.types import TypeInfo
from .utils import raiseif
-from .acompat import closing
+from .acompat import closing, gather, spawn
from .fix_crdb import crdb_encoding
from ._test_cursor import _execmany, execmany, my_row_factory, ph # noqa: F401
)
+@pytest.mark.slow
+def test_executemany_lock(conn):
+
+ def do_execmany():
+ with conn.cursor() as cur:
+ cur.executemany(ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)])
+
+ def do_exec():
+ with conn.cursor() as cur:
+ for i in range(100):
+ cur.execute("select 1")
+
+ gather(spawn(do_execmany), spawn(do_exec))
+
+
def test_rowcount(conn):
cur = conn.cursor()
from psycopg.types import TypeInfo
from .utils import raiseif
-from .acompat import aclosing, alist, anext
+from .acompat import aclosing, alist, anext, gather, spawn
from .fix_crdb import crdb_encoding
from ._test_cursor import _execmany, execmany, my_row_factory, ph # noqa: F401
)
+@pytest.mark.slow
+async def test_executemany_lock(aconn):
+ async def do_execmany():
+ async with aconn.cursor() as cur:
+ await cur.executemany(
+ ph(cur, "select pg_sleep(%s)"), [(0.1,) for _ in range(10)]
+ )
+
+ async def do_exec():
+ async with aconn.cursor() as cur:
+ for i in range(100):
+ await cur.execute("select 1")
+
+ await gather(spawn(do_execmany), spawn(do_exec))
+
+
async def test_rowcount(aconn):
cur = aconn.cursor()