- Fix handling of certain invalid time zones causing problems on Windows
(:ticket:`#371`).
- Fix segfault occurring when a loader fails initialization (:ticket:`#372`).
+- Fix invalid SAVEPOINT issued when entering `Connection.transaction()` within
+ a pipeline using an implicit transaction (:ticket:`#374`).
Current release
PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
BAD = pq.ConnStatus.BAD
+ACTIVE = pq.TransactionStatus.ACTIVE
+
logger = logging.getLogger("psycopg")
)
if self.level == 0:
self.pgconn.enter_pipeline_mode()
- elif self.command_queue:
+ elif self.command_queue or self.pgconn.transaction_status == ACTIVE:
+ # Nested pipeline case.
+ # Transaction might be ACTIVE when the pipeline uses an "implicit
+ # transaction", typically in autocommit mode. But when entering a
+ # Psycopg transaction(), we expect the IDLE state. By sync()-ing,
+ # we make sure all previous commands are completed and the
+ # transaction gets back to IDLE.
yield from self._sync_gen()
self.level += 1
assert cur2.fetchone() == (2,)
+def test_implicit_transaction(conn):
+ conn.autocommit = True
+ with conn.pipeline():
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
+ conn.execute("select 'before'")
+ # Transaction is ACTIVE because previous command is not completed
+ # since we have not fetched its results.
+ assert conn.pgconn.transaction_status == pq.TransactionStatus.ACTIVE
+ # Upon entering the nested pipeline through "with transaction():", a
+ # sync() is emitted to restore the transaction state to IDLE, as
+ # expected to emit a BEGIN.
+ with conn.transaction():
+ conn.execute("select 'tx'")
+ cur = conn.execute("select 'after'")
+ assert cur.fetchone() == ("after",)
+
+
@pytest.mark.crdb_skip("deferrable")
def test_error_on_commit(conn):
conn.execute(
assert await cur2.fetchone() == (2,)
+async def test_implicit_transaction(aconn):
+ await aconn.set_autocommit(True)
+ async with aconn.pipeline():
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.IDLE
+ await aconn.execute("select 'before'")
+ # Transaction is ACTIVE because previous command is not completed
+ # since we have not fetched its results.
+ assert aconn.pgconn.transaction_status == pq.TransactionStatus.ACTIVE
+ # Upon entering the nested pipeline through "with transaction():", a
+ # sync() is emitted to restore the transaction state to IDLE, as
+ # expected to emit a BEGIN.
+ async with aconn.transaction():
+ await aconn.execute("select 'tx'")
+ cur = await aconn.execute("select 'after'")
+ assert await cur.fetchone() == ("after",)
+
+
@pytest.mark.crdb_skip("deferrable")
async def test_error_on_commit(aconn):
await aconn.execute(