- using the `Pipeline.sync()` method;
- on `Connection.commit()` or `~Connection.rollback()`;
- at the end of a `!Pipeline` block;
+- possibly when opening a nested `!Pipeline` block;
- using a fetch method such as `Cursor.fetchone()` (which only flushes the
query but doesn't issue a Sync and doesn't reset a pipeline state error).
BasePipeline._is_supported = pq_version >= 140000
return BasePipeline._is_supported
- def _enter(self) -> None:
+ def _enter_gen(self) -> PQGen[None]:
if self.level == 0:
self.pgconn.enter_pipeline_mode()
+ elif self.command_queue:
+ yield from self._sync_gen()
self.level += 1
def _exit(self) -> None:
raise ex.with_traceback(None)
def __enter__(self) -> "Pipeline":
- self._enter()
+ with self._conn.lock:
+ self._conn.wait(self._enter_gen())
return self
def __exit__(
raise ex.with_traceback(None)
async def __aenter__(self) -> "AsyncPipeline":
- self._enter()
+ async with self._conn.lock:
+ await self._conn.wait(self._enter_gen())
return self
async def __aexit__(
"""
tx = Transaction(self, savepoint_name, force_rollback)
if self._pipeline:
- self._pipeline.sync()
with self.pipeline(), tx, self.pipeline():
yield tx
else:
"""
tx = AsyncTransaction(self, savepoint_name, force_rollback)
if self._pipeline:
- await self._pipeline.sync()
async with self.pipeline(), tx, self.pipeline():
yield tx
else:
def test_transaction(conn):
+ notices = []
+ conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
with conn.pipeline():
with conn.transaction():
cur = conn.execute("select 'tx'")
(r,) = cur.fetchone()
assert r == "rb"
+ assert not notices
+
def test_transaction_nested(conn):
with conn.pipeline():
async def test_transaction(aconn):
+ notices = []
+ aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+
async with aconn.pipeline():
async with aconn.transaction():
cur = await aconn.execute("select 'tx'")
(r,) = await cur.fetchone()
assert r == "rb"
+ assert not notices
+
async def test_transaction_nested(aconn):
async with aconn.pipeline():