# )
# assert not cur.fetchone()
assert t - t0 < 2
+
+
+@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
+def test_transaction_concurrency(conn, what):
+ conn.autocommit = True
+
+ evs = [threading.Event() for i in range(3)]
+
+ def worker(unlock, wait_on):
+ with pytest.raises(e.ProgrammingError) as ex:
+ with conn.transaction():
+ unlock.set()
+ wait_on.wait()
+ conn.execute("select 1")
+
+ if what == "error":
+ 1 / 0
+ elif what == "rollback":
+ raise psycopg.Rollback()
+ else:
+ assert what == "commit"
+
+ if what == "error":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, ZeroDivisionError)
+ elif what == "rollback":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, psycopg.Rollback)
+ else:
+ assert "transaction commit" in str(ex.value)
+
+ # Start a first transaction in a thread
+ t1 = threading.Thread(target=worker, kwargs={"unlock": evs[0], "wait_on": evs[1]})
+ t1.start()
+ evs[0].wait()
+
+ # Start a nested transaction in a thread
+ t2 = threading.Thread(target=worker, kwargs={"unlock": evs[1], "wait_on": evs[2]})
+ t2.start()
+
+ # Terminate the first transaction before the second does
+ t1.join()
+ evs[2].set()
+ t2.join()
assert t - t0 < 2
await asyncio.wait_for(test(), 5.0)
+
+
+@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
+async def test_transaction_concurrency(aconn, what):
+ await aconn.set_autocommit(True)
+
+ evs = [asyncio.Event() for i in range(3)]
+
+ async def worker(unlock, wait_on):
+ with pytest.raises(e.ProgrammingError) as ex:
+ async with aconn.transaction():
+ unlock.set()
+ await wait_on.wait()
+ await aconn.execute("select 1")
+
+ if what == "error":
+ 1 / 0
+ elif what == "rollback":
+ raise psycopg.Rollback()
+ else:
+ assert what == "commit"
+
+ if what == "error":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, ZeroDivisionError)
+ elif what == "rollback":
+ assert "transaction rollback" in str(ex.value)
+ assert isinstance(ex.value.__context__, psycopg.Rollback)
+ else:
+ assert "transaction commit" in str(ex.value)
+
+ # Start a first transaction in a task
+ t1 = create_task(worker(unlock=evs[0], wait_on=evs[1]))
+ await evs[0].wait()
+
+ # Start a nested transaction in a task
+ t2 = create_task(worker(unlock=evs[1], wait_on=evs[2]))
+
+ # Terminate the first transaction before the second does
+ await asyncio.gather(t1)
+ evs[2].set()
+ await asyncio.gather(t2)
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_tpc_async.py'
+# DO NOT CHANGE! Change the original file instead.
import pytest
import psycopg
def test_tpc_disabled(conn, pipeline):
- val = int(conn.execute("show max_prepared_transactions").fetchone()[0])
+ cur = conn.execute("show max_prepared_transactions")
+ val = int(cur.fetchone()[0])
if val:
pytest.skip("prepared transactions enabled")
def test_recovered_xids(self, conn, tpc):
# insert a few test xns
+
conn.autocommit = True
cur = conn.cursor()
cur.execute("begin; prepare transaction '1-foo'")
cur = conn.cursor()
cur.execute(
- "select gid from pg_prepared_xacts where database = %s",
- (conn.info.dbname,),
+ "select gid from pg_prepared_xacts where database = %s", (conn.info.dbname,)
)
assert "42_Z3RyaWQ=_YnF1YWw=" == cur.fetchone()[0]
@pytest.mark.parametrize(
"fid, gtrid, bqual",
- [
- (0, "", ""),
- (42, "gtrid", "bqual"),
- (0x7FFFFFFF, "x" * 64, "y" * 64),
- ],
+ [(0, "", ""), (42, "gtrid", "bqual"), (2147483647, "x" * 64, "y" * 64)],
)
def test_xid_roundtrip(self, conn_cls, conn, dsn, tpc, fid, gtrid, bqual):
xid = conn.xid(fid, gtrid, bqual)
with conn_cls.connect(dsn) as conn:
xids = [x for x in conn.tpc_recover() if x.database == conn.info.dbname]
-
assert len(xids) == 1
xid = xids[0]
conn.tpc_rollback(xid)
assert xid.gtrid == gtrid
assert xid.bqual == bqual
- @pytest.mark.parametrize(
- "tid",
- [
- "",
- "hello, world!",
- "x" * 199, # PostgreSQL's limit in transaction id length
- ],
- )
+ # 199 is PostgreSQL's limit in transaction id length
+
+ @pytest.mark.parametrize("tid", ["", "hello, world!", "x" * 199])
def test_unparsed_roundtrip(self, conn_cls, conn, dsn, tpc, tid):
conn.tpc_begin(tid)
conn.tpc_prepare()
with conn_cls.connect(dsn) as conn:
xids = [x for x in conn.tpc_recover() if x.database == conn.info.dbname]
-
assert len(xids) == 1
xid = xids[0]
conn.tpc_rollback(xid)
with conn_cls.connect(dsn) as conn:
xid = [x for x in conn.tpc_recover() if x.database == conn.info.dbname][0]
+
assert 10 == xid.format_id
assert "uni" == xid.gtrid
assert "code" == xid.bqual
assert xid.format_id is None
assert xid.gtrid == "dict-connection"
assert xid.bqual is None
-
-
-class TestXidObject:
- def test_xid_construction(self):
- x1 = psycopg.Xid(74, "foo", "bar")
- 74 == x1.format_id
- "foo" == x1.gtrid
- "bar" == x1.bqual
-
- def test_xid_from_string(self):
- x2 = psycopg.Xid.from_string("42_Z3RyaWQ=_YnF1YWw=")
- 42 == x2.format_id
- "gtrid" == x2.gtrid
- "bqual" == x2.bqual
-
- x3 = psycopg.Xid.from_string("99_xxx_yyy")
- None is x3.format_id
- "99_xxx_yyy" == x3.gtrid
- None is x3.bqual
-
- def test_xid_to_string(self):
- x1 = psycopg.Xid.from_string("42_Z3RyaWQ=_YnF1YWw=")
- str(x1) == "42_Z3RyaWQ=_YnF1YWw="
-
- x2 = psycopg.Xid.from_string("99_xxx_yyy")
- str(x2) == "99_xxx_yyy"
import psycopg
from psycopg.pq import TransactionStatus
-pytestmark = [
- pytest.mark.crdb_skip("2-phase commit"),
-]
+pytestmark = pytest.mark.crdb_skip("2-phase commit")
async def test_tpc_disabled(aconn, apipeline):
assert xid.gtrid == gtrid
assert xid.bqual == bqual
- @pytest.mark.parametrize(
- "tid",
- [
- "",
- "hello, world!",
- "x" * 199, # PostgreSQL's limit in transaction id length
- ],
- )
+ # 199 is PostgreSQL's limit in transaction id length
+ @pytest.mark.parametrize("tid", ["", "hello, world!", "x" * 199])
async def test_unparsed_roundtrip(self, aconn_cls, aconn, dsn, tpc, tid):
await aconn.tpc_begin(tid)
await aconn.tpc_prepare()
+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_transaction_async.py'
+# DO NOT CHANGE! Change the original file instead.
import logging
-from threading import Thread, Event
import pytest
@pytest.fixture
-def conn(conn, pipeline):
+def conn(conn, pipeline, anyio_backend):
return conn
@pytest.mark.crdb_skip("pg_terminate_backend")
def test_context_inerror_rollback_no_clobber(conn_cls, conn, pipeline, dsn, caplog):
if pipeline:
- # Only 'conn' is possibly in pipeline mode, but the transaction and
+ # Only 'aconn' is possibly in pipeline mode, but the transaction and
# checks are on 'conn2'.
pytest.skip("not applicable")
caplog.set_level(logging.WARNING, logger="psycopg")
with conn2.transaction():
conn2.execute("select 1")
conn.execute(
- "select pg_terminate_backend(%s::int)",
- [conn2.pgconn.backend_pid],
+ "select pg_terminate_backend(%s::int)", [conn2.pgconn.backend_pid]
)
1 / 0
or the autocommit setting on the connection, as this would interfere
with the transaction scope being managed by the Transaction block.
"""
+
conn.autocommit = False
conn.commit()
conn.rollback()
"""
Connection.autocommit is unchanged both during and after Transaction block.
"""
+
conn.autocommit = autocommit
with conn.transaction():
assert conn.autocommit is autocommit
Outcome:
* Changes made within Transaction context are committed
"""
+
conn.autocommit = False
assert not in_transaction(conn)
with conn.transaction():
Outcome:
* Changes made within Transaction context are discarded
"""
+
conn.autocommit = False
assert not in_transaction(conn)
with pytest.raises(ExpectedException):
* Outer transaction is left running, and no changes are visible to an
outside observer from another connection.
"""
+
conn.autocommit = False
insert_row(conn, "prior")
if pipeline:
* Outer transaction is left running, and no changes are visible to an
outside observer from another connection.
"""
+
conn.autocommit = False
insert_row(conn, "prior")
if pipeline:
with conn.transaction() as tx:
assert commands.popall() == ['SAVEPOINT "_pg3_1"']
assert tx.savepoint_name == "_pg3_1"
+
assert commands.popall() == ['RELEASE "_pg3_1"']
conn.rollback()
assert commands.popall() == ["ROLLBACK"]
assert commands.popall() == ['SAVEPOINT "_pg3_2"']
assert tx.savepoint_name == "_pg3_2"
raise ExpectedException
- assert commands.popall() == [
- 'ROLLBACK TO "_pg3_2"',
- 'RELEASE "_pg3_2"',
- ]
+ assert commands.popall() == ['ROLLBACK TO "_pg3_2"', 'RELEASE "_pg3_2"']
assert commands.popall() == ["COMMIT"]
def test_str(conn, pipeline):
with conn.transaction() as tx:
if pipeline:
+ assert "[INTRANS]" not in str(tx)
+ pipeline.sync()
assert "[INTRANS, pipeline=ON]" in str(tx)
else:
assert "[INTRANS]" in str(tx)
with pytest.raises(e.ProgrammingError):
t2.__exit__(*get_exc_info(exit_error))
-
-
-@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
-def test_concurrency(conn, what):
- conn.autocommit = True
-
- evs = [Event() for i in range(3)]
-
- def worker(unlock, wait_on):
- with pytest.raises(e.ProgrammingError) as ex:
- with conn.transaction():
- unlock.set()
- wait_on.wait()
- conn.execute("select 1")
-
- if what == "error":
- 1 / 0
- elif what == "rollback":
- raise Rollback()
- else:
- assert what == "commit"
-
- if what == "error":
- assert "transaction rollback" in str(ex.value)
- assert isinstance(ex.value.__context__, ZeroDivisionError)
- elif what == "rollback":
- assert "transaction rollback" in str(ex.value)
- assert isinstance(ex.value.__context__, Rollback)
- else:
- assert "transaction commit" in str(ex.value)
-
- # Start a first transaction in a thread
- t1 = Thread(target=worker, kwargs={"unlock": evs[0], "wait_on": evs[1]})
- t1.start()
- evs[0].wait()
-
- # Start a nested transaction in a thread
- t2 = Thread(target=worker, kwargs={"unlock": evs[1], "wait_on": evs[2]})
- t2.start()
-
- # Terminate the first transaction before the second does
- t1.join()
- evs[2].set()
- t2.join()
-import asyncio
import logging
import pytest
...and exiting the context successfully will "commit" the same.
"""
- commands = acommands
-
# Case 1
# Using Transaction explicitly because conn.transaction() enters the contetx
+ assert not acommands
async with aconn.transaction() as tx:
- assert commands.popall() == ["BEGIN"]
+ assert acommands.popall() == ["BEGIN"]
assert not tx.savepoint_name
- assert commands.popall() == ["COMMIT"]
+ assert acommands.popall() == ["COMMIT"]
# Case 1 (with a transaction already started)
await aconn.cursor().execute("select 1")
- assert commands.popall() == ["BEGIN"]
+ assert acommands.popall() == ["BEGIN"]
async with aconn.transaction() as tx:
- assert commands.popall() == ['SAVEPOINT "_pg3_1"']
+ assert acommands.popall() == ['SAVEPOINT "_pg3_1"']
assert tx.savepoint_name == "_pg3_1"
- assert commands.popall() == ['RELEASE "_pg3_1"']
+ assert acommands.popall() == ['RELEASE "_pg3_1"']
await aconn.rollback()
- assert commands.popall() == ["ROLLBACK"]
+ assert acommands.popall() == ["ROLLBACK"]
# Case 2
async with aconn.transaction(savepoint_name="foo") as tx:
- assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
+ assert acommands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
assert tx.savepoint_name == "foo"
- assert commands.popall() == ["COMMIT"]
+ assert acommands.popall() == ["COMMIT"]
# Case 3 (with savepoint name provided)
async with aconn.transaction():
- assert commands.popall() == ["BEGIN"]
+ assert acommands.popall() == ["BEGIN"]
async with aconn.transaction(savepoint_name="bar") as tx:
- assert commands.popall() == ['SAVEPOINT "bar"']
+ assert acommands.popall() == ['SAVEPOINT "bar"']
assert tx.savepoint_name == "bar"
- assert commands.popall() == ['RELEASE "bar"']
- assert commands.popall() == ["COMMIT"]
+ assert acommands.popall() == ['RELEASE "bar"']
+ assert acommands.popall() == ["COMMIT"]
# Case 3 (with savepoint name auto-generated)
async with aconn.transaction():
- assert commands.popall() == ["BEGIN"]
+ assert acommands.popall() == ["BEGIN"]
async with aconn.transaction() as tx:
- assert commands.popall() == ['SAVEPOINT "_pg3_2"']
+ assert acommands.popall() == ['SAVEPOINT "_pg3_2"']
assert tx.savepoint_name == "_pg3_2"
- assert commands.popall() == ['RELEASE "_pg3_2"']
- assert commands.popall() == ["COMMIT"]
+ assert acommands.popall() == ['RELEASE "_pg3_2"']
+ assert acommands.popall() == ["COMMIT"]
async def test_named_savepoints_exception_exit(aconn, acommands):
exception, whatever transaction and/or savepoint was started on enter will
be rolled-back as appropriate.
"""
- commands = acommands
-
# Case 1
with pytest.raises(ExpectedException):
async with aconn.transaction() as tx:
- assert commands.popall() == ["BEGIN"]
+ assert acommands.popall() == ["BEGIN"]
assert not tx.savepoint_name
raise ExpectedException
- assert commands.popall() == ["ROLLBACK"]
+ assert acommands.popall() == ["ROLLBACK"]
# Case 2
with pytest.raises(ExpectedException):
async with aconn.transaction(savepoint_name="foo") as tx:
- assert commands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
+ assert acommands.popall() == ["BEGIN", 'SAVEPOINT "foo"']
assert tx.savepoint_name == "foo"
raise ExpectedException
- assert commands.popall() == ["ROLLBACK"]
+ assert acommands.popall() == ["ROLLBACK"]
# Case 3 (with savepoint name provided)
async with aconn.transaction():
- assert commands.popall() == ["BEGIN"]
+ assert acommands.popall() == ["BEGIN"]
with pytest.raises(ExpectedException):
async with aconn.transaction(savepoint_name="bar") as tx:
- assert commands.popall() == ['SAVEPOINT "bar"']
+ assert acommands.popall() == ['SAVEPOINT "bar"']
assert tx.savepoint_name == "bar"
raise ExpectedException
- assert commands.popall() == ['ROLLBACK TO "bar"', 'RELEASE "bar"']
- assert commands.popall() == ["COMMIT"]
+ assert acommands.popall() == ['ROLLBACK TO "bar"', 'RELEASE "bar"']
+ assert acommands.popall() == ["COMMIT"]
# Case 3 (with savepoint name auto-generated)
async with aconn.transaction():
- assert commands.popall() == ["BEGIN"]
+ assert acommands.popall() == ["BEGIN"]
with pytest.raises(ExpectedException):
async with aconn.transaction() as tx:
- assert commands.popall() == ['SAVEPOINT "_pg3_2"']
+ assert acommands.popall() == ['SAVEPOINT "_pg3_2"']
assert tx.savepoint_name == "_pg3_2"
raise ExpectedException
- assert commands.popall() == [
+ assert acommands.popall() == [
'ROLLBACK TO "_pg3_2"',
'RELEASE "_pg3_2"',
]
- assert commands.popall() == ["COMMIT"]
+ assert acommands.popall() == ["COMMIT"]
async def test_named_savepoints_with_repeated_names_works(aconn):
with pytest.raises(e.ProgrammingError):
await t2.__aexit__(*get_exc_info(exit_error))
-
-
-@pytest.mark.parametrize("what", ["commit", "rollback", "error"])
-async def test_concurrency(aconn, what):
- await aconn.set_autocommit(True)
-
- evs = [asyncio.Event() for i in range(3)]
-
- async def worker(unlock, wait_on):
- with pytest.raises(e.ProgrammingError) as ex:
- async with aconn.transaction():
- unlock.set()
- await wait_on.wait()
- await aconn.execute("select 1")
-
- if what == "error":
- 1 / 0
- elif what == "rollback":
- raise Rollback()
- else:
- assert what == "commit"
-
- if what == "error":
- assert "transaction rollback" in str(ex.value)
- assert isinstance(ex.value.__context__, ZeroDivisionError)
- elif what == "rollback":
- assert "transaction rollback" in str(ex.value)
- assert isinstance(ex.value.__context__, Rollback)
- else:
- assert "transaction commit" in str(ex.value)
-
- # Start a first transaction in a task
- t1 = asyncio.create_task(worker(unlock=evs[0], wait_on=evs[1]))
- await evs[0].wait()
-
- # Start a nested transaction in a task
- t2 = asyncio.create_task(worker(unlock=evs[1], wait_on=evs[2]))
-
- # Terminate the first transaction before the second does
- await asyncio.gather(t1)
- evs[2].set()
- await asyncio.gather(t2)
--- /dev/null
+import psycopg
+
+
+class TestXidObject:
+ def test_xid_construction(self):
+ x1 = psycopg.Xid(74, "foo", "bar")
+ 74 == x1.format_id
+ "foo" == x1.gtrid
+ "bar" == x1.bqual
+
+ def test_xid_from_string(self):
+ x2 = psycopg.Xid.from_string("42_Z3RyaWQ=_YnF1YWw=")
+ 42 == x2.format_id
+ "gtrid" == x2.gtrid
+ "bqual" == x2.bqual
+
+ x3 = psycopg.Xid.from_string("99_xxx_yyy")
+ None is x3.format_id
+ "99_xxx_yyy" == x3.gtrid
+ None is x3.bqual
+
+ def test_xid_to_string(self):
+ x1 = psycopg.Xid.from_string("42_Z3RyaWQ=_YnF1YWw=")
+ str(x1) == "42_Z3RyaWQ=_YnF1YWw="
+
+ x2 = psycopg.Xid.from_string("99_xxx_yyy")
+ str(x2) == "99_xxx_yyy"
"AsyncQueuedLibpqWriter": "QueuedLibpqWriter",
"AsyncRawCursor": "RawCursor",
"AsyncServerCursor": "ServerCursor",
+ "__aenter__": "__enter__",
+ "__aexit__": "__exit__",
"aclose": "close",
"aclosing": "closing",
"acommands": "commands",
"aconn_set": "conn_set",
"alist": "list",
"anext": "next",
+ "apipeline": "pipeline",
"ensure_table_async": "ensure_table",
"find_insert_problem_async": "find_insert_problem",
}
tests/test_pipeline_async.py \
tests/test_prepared_async.py \
tests/test_raw_cursor_async.py \
- tests/test_server_cursor_async.py
+ tests/test_server_cursor_async.py \
+ tests/test_tpc_async.py \
+ tests/test_transaction_async.py
do
sync=${async/_async/}
echo "converting '${async}' -> '${sync}'" >&2