from psycopg import Connection, ProgrammingError, Rollback
+@pytest.fixture
+def conn(conn, pipeline):
+ return conn
+
+
@pytest.fixture(autouse=True)
def create_test_table(svcconn):
"""Creates a table called 'test_table' for use in tests."""
pass
-def test_basic(conn):
+def test_basic(conn, pipeline):
"""Basic use of transaction() to BEGIN and COMMIT a transaction."""
assert not in_transaction(conn)
with conn.transaction():
+ if pipeline:
+ pipeline.sync()
assert in_transaction(conn)
assert not in_transaction(conn)
pass
-def test_begins_on_enter(conn):
+def test_begins_on_enter(conn, pipeline):
"""Transaction does not begin until __enter__() is called."""
tx = conn.transaction()
assert not in_transaction(conn)
with tx:
+ if pipeline:
+ pipeline.sync()
assert in_transaction(conn)
assert not in_transaction(conn)
assert not inserted(conn)
-def test_context_inerror_rollback_no_clobber(conn, dsn, caplog):
+def test_context_inerror_rollback_no_clobber(conn, pipeline, dsn, caplog):
+ if pipeline:
+ # Only 'conn' is possibly in pipeline mode, but the transaction and
+ # checks are on 'conn2'.
+ pytest.skip("not applicable")
caplog.set_level(logging.WARNING, logger="psycopg")
with pytest.raises(ZeroDivisionError):
assert inserted(conn) == {"foo", "baz"}
-def test_prohibits_use_of_commit_rollback_autocommit(conn):
+def test_prohibits_use_of_commit_rollback_autocommit(conn, pipeline):
"""
Within a Transaction block, it is forbidden to touch commit, rollback,
or the autocommit setting on the connection, as this would interfere
with the transaction scope being managed by the Transaction block.
"""
+ if pipeline:
+ # TODO: Fixing Connection._check_intrans() would require calling
+ # conn._pipeline.sync(), which implies turning _check_intrans() into a
+ # generator method.
+ pytest.xfail("Connection._check_intrans() does not account for pipeline mode")
conn.autocommit = False
conn.commit()
conn.rollback()
assert not inserted(svcconn)
-def test_autocommit_off_and_tx_in_progress_successful_exit(conn, svcconn):
+def test_autocommit_off_and_tx_in_progress_successful_exit(conn, pipeline, svcconn):
"""
Scenario:
* Connection has autocommit off but and a transaction is already in
"""
conn.autocommit = False
insert_row(conn, "prior")
+ if pipeline:
+ pipeline.sync()
assert in_transaction(conn)
with conn.transaction():
insert_row(conn, "new")
assert not inserted(svcconn)
-def test_autocommit_off_and_tx_in_progress_exception_exit(conn, svcconn):
+def test_autocommit_off_and_tx_in_progress_exception_exit(conn, pipeline, svcconn):
"""
Scenario:
* Connection has autocommit off but and a transaction is already in
"""
conn.autocommit = False
insert_row(conn, "prior")
+ if pipeline:
+ pipeline.sync()
assert in_transaction(conn)
with pytest.raises(ExpectedException):
with conn.transaction():
assert inserted(svcconn) == {"outer-before", "outer-after"}
-def test_str(conn):
+def test_str(conn, pipeline):
with conn.transaction() as tx:
- assert "[INTRANS]" in str(tx)
+ if pipeline:
+ assert "INTRANS" not in str(tx)
+ pipeline.sync()
+ assert "[INTRANS, pipeline=ON]" in str(tx)
+ else:
+ assert "[INTRANS]" in str(tx)
assert "(active)" in str(tx)
assert "'" not in str(tx)
with conn.transaction("wat") as tx2:
- assert "[INTRANS]" in str(tx2)
+ if pipeline:
+ assert "[INTRANS, pipeline=ON]" in str(tx2)
+ else:
+ assert "[INTRANS]" in str(tx2)
assert "'wat'" in str(tx2)
- assert "[IDLE]" in str(tx)
+ if pipeline:
+ assert "[IDLE, pipeline=ON]" in str(tx)
+ else:
+ assert "[IDLE]" in str(tx)
assert "(terminated)" in str(tx)
with pytest.raises(ZeroDivisionError):
pytestmark = pytest.mark.asyncio
-async def test_basic(aconn):
+@pytest.fixture
+async def aconn(aconn, apipeline):
+ return aconn
+
+
+async def test_basic(aconn, apipeline):
"""Basic use of transaction() to BEGIN and COMMIT a transaction."""
assert not in_transaction(aconn)
async with aconn.transaction():
+ if apipeline:
+ await apipeline.sync()
assert in_transaction(aconn)
assert not in_transaction(aconn)
pass
-async def test_begins_on_enter(aconn):
+async def test_begins_on_enter(aconn, apipeline):
"""Transaction does not begin until __enter__() is called."""
tx = aconn.transaction()
assert not in_transaction(aconn)
async with tx:
+ if apipeline:
+ await apipeline.sync()
assert in_transaction(aconn)
assert not in_transaction(aconn)
assert not await inserted(aconn)
-async def test_context_inerror_rollback_no_clobber(aconn, dsn, caplog):
+async def test_context_inerror_rollback_no_clobber(aconn, apipeline, dsn, caplog):
+ if apipeline:
+ # Only 'aconn' is possibly in pipeline mode, but the transaction and
+ # checks are on 'conn2'.
+ pytest.skip("not applicable")
caplog.set_level(logging.WARNING, logger="psycopg")
with pytest.raises(ZeroDivisionError):
assert await inserted(aconn) == {"foo", "baz"}
-async def test_prohibits_use_of_commit_rollback_autocommit(aconn):
+async def test_prohibits_use_of_commit_rollback_autocommit(aconn, apipeline):
"""
Within a Transaction block, it is forbidden to touch commit, rollback,
or the autocommit setting on the connection, as this would interfere
with the transaction scope being managed by the Transaction block.
"""
+ if apipeline:
+ # TODO: Fixing Connection._check_intrans() would require calling
+ # conn._pipeline.sync(), which implies turning _check_intrans() into a
+ # generator method.
+ pytest.xfail("Connection._check_intrans() does not account for pipeline mode")
await aconn.set_autocommit(False)
await aconn.commit()
await aconn.rollback()
assert not inserted(svcconn)
-async def test_autocommit_off_and_tx_in_progress_successful_exit(aconn, svcconn):
+async def test_autocommit_off_and_tx_in_progress_successful_exit(
+ aconn, apipeline, svcconn
+):
"""
Scenario:
* Connection has autocommit off but and a transaction is already in
"""
await aconn.set_autocommit(False)
await insert_row(aconn, "prior")
+ if apipeline:
+ await apipeline.sync()
assert in_transaction(aconn)
async with aconn.transaction():
await insert_row(aconn, "new")
assert not inserted(svcconn)
-async def test_autocommit_off_and_tx_in_progress_exception_exit(aconn, svcconn):
+async def test_autocommit_off_and_tx_in_progress_exception_exit(
+ aconn, apipeline, svcconn
+):
"""
Scenario:
* Connection has autocommit off but and a transaction is already in
"""
await aconn.set_autocommit(False)
await insert_row(aconn, "prior")
+ if apipeline:
+ await apipeline.sync()
assert in_transaction(aconn)
with pytest.raises(ExpectedException):
async with aconn.transaction():
assert inserted(svcconn) == {"outer-before", "outer-after"}
-async def test_str(aconn):
+async def test_str(aconn, apipeline):
async with aconn.transaction() as tx:
- assert "[INTRANS]" in str(tx)
+ if apipeline:
+ assert "[INTRANS]" not in str(tx)
+ await apipeline.sync()
+ assert "[INTRANS, pipeline=ON]" in str(tx)
+ else:
+ assert "[INTRANS]" in str(tx)
assert "(active)" in str(tx)
assert "'" not in str(tx)
async with aconn.transaction("wat") as tx2:
- assert "[INTRANS]" in str(tx2)
+ if apipeline:
+ assert "[INTRANS, pipeline=ON]" in str(tx2)
+ else:
+ assert "[INTRANS]" in str(tx2)
assert "'wat'" in str(tx2)
- assert "[IDLE]" in str(tx)
+ if apipeline:
+ assert "[IDLE, pipeline=ON]" in str(tx)
+ else:
+ assert "[IDLE]" in str(tx)
assert "(terminated)" in str(tx)
with pytest.raises(ZeroDivisionError):