raise e.ProgrammingError(
"rollback() cannot be used during a two-phase transaction"
)
+
+ # Get out of a "pipeline aborted" state
+ if self._pipeline and self.pgconn.pipeline_status == pq.PipelineStatus.ABORTED:
+ yield from self._pipeline._sync_gen()
+
if self.pgconn.transaction_status == TransactionStatus.IDLE:
return
for cmd in self._prepared.get_maintenance_commands():
yield from self._exec_command(cmd)
+ if self._pipeline:
+ yield from self._pipeline._sync_gen()
+
def xid(self, format_id: int, gtrid: str, bqual: str) -> Xid:
"""
Returns a `Xid` to pass to the `!tpc_*()` methods of this connection.
force_rollback: bool = False,
):
self._conn = connection
+ self.pgconn = self._conn.pgconn
self._savepoint_name = savepoint_name or ""
self.force_rollback = force_rollback
self._entered = self._exited = False
def __repr__(self) -> str:
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
- info = pq.misc.connection_summary(self._conn.pgconn)
+ info = pq.misc.connection_summary(self.pgconn)
if not self._entered:
status = "inactive"
elif not self._exited:
if ex:
raise ex
+ # Get out of a "pipeline aborted" state
+ if (
+ self._conn._pipeline
+ and self.pgconn.pipeline_status == pq.PipelineStatus.ABORTED
+ ):
+ yield from self._conn._pipeline._sync_gen()
+
for command in self._get_rollback_commands():
yield from self._conn._exec_command(command)
Also set the internal state of the object and verify consistency.
"""
self._outer_transaction = (
- self._conn.pgconn.transaction_status == TransactionStatus.IDLE
+ self.pgconn.transaction_status == TransactionStatus.IDLE
)
if self._outer_transaction:
# outer transaction: if no name it's only a begin, else
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
- if self._conn.pgconn.status == ConnStatus.OK:
+ if self.pgconn.status == ConnStatus.OK:
with self._conn.lock:
return self._conn.wait(self._exit_gen(exc_type, exc_val, exc_tb))
else:
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> bool:
- if self._conn.pgconn.status == ConnStatus.OK:
+ if self.pgconn.status == ConnStatus.OK:
async with self._conn.lock:
return await self._conn.wait(self._exit_gen(exc_type, exc_val, exc_tb))
else:
conn.execute("create table voila ()")
+def test_rollback_explicit(conn):
+ conn.autocommit = True
+ with conn.pipeline():
+ with pytest.raises(e.DivisionByZero):
+ cur = conn.execute("select 1 / %s", [0])
+ cur.fetchone()
+ conn.rollback()
+ conn.execute("select 1")
+
+
+def test_rollback_transaction(conn):
+ conn.autocommit = True
+ with pytest.raises(e.DivisionByZero):
+ with conn.pipeline():
+ with conn.transaction():
+ cur = conn.execute("select 1 / %s", [0])
+ cur.fetchone()
+ conn.execute("select 1")
+
+
def test_concurrency(conn):
with conn.transaction():
conn.execute("drop table if exists pipeline_concurrency")
await aconn.execute("create table voila ()")
+async def test_rollback_explicit(aconn):
+ await aconn.set_autocommit(True)
+ async with aconn.pipeline():
+ with pytest.raises(e.DivisionByZero):
+ cur = await aconn.execute("select 1 / %s", [0])
+ await cur.fetchone()
+ await aconn.rollback()
+ await aconn.execute("select 1")
+
+
+async def test_rollback_transaction(aconn):
+ await aconn.set_autocommit(True)
+ with pytest.raises(e.DivisionByZero):
+ async with aconn.pipeline():
+ async with aconn.transaction():
+ cur = await aconn.execute("select 1 / %s", [0])
+ await cur.fetchone()
+ await aconn.execute("select 1")
+
+
async def test_concurrency(aconn):
async with aconn.transaction():
await aconn.execute("drop table if exists pipeline_concurrency")