From: Denis Laxalde Date: Wed, 25 May 2022 06:51:05 +0000 (+0200) Subject: fix: sync nested pipeline with pending commands upon enter X-Git-Tag: 3.1~70^2~1 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=459ddafc0cafaa73519c4dcd519147c0f0cd5a81;p=thirdparty%2Fpsycopg.git fix: sync nested pipeline with pending commands upon enter When entering a nested pipeline and the outer one has pending commands, we now sync the pipeline. This is probably less surprising at it makes the implicit transaction from a nested pipeline isolated from the outer one. With this, the explicit Sync when entering a transaction is no longer needed. Fix #309. --- diff --git a/docs/advanced/pipeline.rst b/docs/advanced/pipeline.rst index 12a7eb187..21f27b5b6 100644 --- a/docs/advanced/pipeline.rst +++ b/docs/advanced/pipeline.rst @@ -239,6 +239,7 @@ point is established by Psycopg: - using the `Pipeline.sync()` method; - on `Connection.commit()` or `~Connection.rollback()`; - at the end of a `!Pipeline` block; +- possibly when opening a nested `!Pipeline` block; - using a fetch method such as `Cursor.fetchone()` (which only flushes the query but doesn't issue a Sync and doesn't reset a pipeline state error). diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index e11c7b423..787184860 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -77,9 +77,11 @@ class BasePipeline: BasePipeline._is_supported = pq_version >= 140000 return BasePipeline._is_supported - def _enter(self) -> None: + def _enter_gen(self) -> PQGen[None]: if self.level == 0: self.pgconn.enter_pipeline_mode() + elif self.command_queue: + yield from self._sync_gen() self.level += 1 def _exit(self) -> None: @@ -191,7 +193,8 @@ class Pipeline(BasePipeline): raise ex.with_traceback(None) def __enter__(self) -> "Pipeline": - self._enter() + with self._conn.lock: + self._conn.wait(self._enter_gen()) return self def __exit__( @@ -239,7 +242,8 @@ class AsyncPipeline(BasePipeline): raise ex.with_traceback(None) async def __aenter__(self) -> "AsyncPipeline": - self._enter() + async with self._conn.lock: + await self._conn.wait(self._enter_gen()) return self async def __aexit__( diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index 4c610bf3f..58cb63c6a 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -903,7 +903,6 @@ class Connection(BaseConnection[Row]): """ tx = Transaction(self, savepoint_name, force_rollback) if self._pipeline: - self._pipeline.sync() with self.pipeline(), tx, self.pipeline(): yield tx else: diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index ae8715da1..3a2bc91af 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -293,7 +293,6 @@ class AsyncConnection(BaseConnection[Row]): """ tx = AsyncTransaction(self, savepoint_name, force_rollback) if self._pipeline: - await self._pipeline.sync() async with self.pipeline(), tx, self.pipeline(): yield tx else: diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 03a299d36..841379bb9 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -402,6 +402,9 @@ def test_auto_prepare(conn): def test_transaction(conn): + notices = [] + conn.add_notice_handler(lambda diag: notices.append(diag.message_primary)) + with conn.pipeline(): with conn.transaction(): cur = conn.execute("select 'tx'") @@ -416,6 +419,8 @@ def test_transaction(conn): (r,) = cur.fetchone() assert r == "rb" + assert not notices + def test_transaction_nested(conn): with conn.pipeline(): diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index 70468688c..c170dfa23 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -402,6 +402,9 @@ async def test_auto_prepare(aconn): async def test_transaction(aconn): + notices = [] + aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary)) + async with aconn.pipeline(): async with aconn.transaction(): cur = await aconn.execute("select 'tx'") @@ -416,6 +419,8 @@ async def test_transaction(aconn): (r,) = await cur.fetchone() assert r == "rb" + assert not notices + async def test_transaction_nested(aconn): async with aconn.pipeline():