When entering a transaction on a connection in pipeline mode, we open an
inner pipeline to ensure that a Sync is emitted at the end of
transaction thus restoring the connection in its expected state (i.e.
the same as in non-pipeline mode).
block even if there were no error (e.g. to try a no-op process).
:rtype: Transaction
"""
- with Transaction(self, savepoint_name, force_rollback) as tx:
- yield tx
+ tx = Transaction(self, savepoint_name, force_rollback)
+ if self._pipeline:
+ with tx, self.pipeline():
+ yield tx
+ else:
+ with tx:
+ yield tx
def notifies(self) -> Generator[Notify, None, None]:
"""
:rtype: AsyncTransaction
"""
tx = AsyncTransaction(self, savepoint_name, force_rollback)
- async with tx:
- yield tx
+ if self._pipeline:
+ async with tx, self.pipeline():
+ yield tx
+ else:
+ async with tx:
+ yield tx
async def notifies(self) -> AsyncGenerator[Notify, None]:
while 1:
with conn.transaction():
conn.execute("select 1 from nosuchtable")
here = True
- conn.rollback() # TODO: inconsistent with non-pipeline.
cur1 = conn.execute("select 1")
assert here
cur2 = conn.execute("select 2")
async with aconn.transaction():
await aconn.execute("select 1 from nosuchtable")
here = True
- await aconn.rollback() # TODO: inconsistent with non-pipeline.
cur1 = await aconn.execute("select 1")
assert here
cur2 = await aconn.execute("select 2")