with conn.pipeline() as p:
cur.execute("select 1")
- # PQsendQuery[BEGIN], PQsendQuery
- assert len(p.result_queue) == 2
+ assert len(p.result_queue) == 1
assert cur.fetchone() == (1,)
c1.execute("select 1")
c2.execute("select 2")
- # PQsendQuery[BEGIN], PQsendQuery(2)
- assert len(p.result_queue) == 3
+ assert len(p.result_queue) == 2
(r1,) = c1.fetchone()
assert r1 == 1
assert not notices
+def test_transaction_state_implicit_begin(conn, trace):
+ # Regression test to ensure that the transaction state is correct after
+ # the implicit BEGIN statement (in non-autocommit mode).
+ notices = []
+ conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+ t = trace.trace(conn)
+ with conn.pipeline():
+ conn.execute("select 'x'").fetchone()
+ conn.execute("select 'y'")
+ assert not notices
+ assert [
+ e.content[0] for e in t if e.type == "Parse" and b"BEGIN" in e.content[0]
+ ] == [b' "" "BEGIN" 0']
+
+
def test_concurrency(conn):
with conn.transaction():
conn.execute("drop table if exists pipeline_concurrency")
async with aconn.pipeline() as p:
await cur.execute("select 1")
- # PQsendQuery[BEGIN], PQsendQuery
- assert len(p.result_queue) == 2
+ assert len(p.result_queue) == 1
assert await cur.fetchone() == (1,)
await c1.execute("select 1")
await c2.execute("select 2")
- # PQsendQuery[BEGIN], PQsendQuery(2)
- assert len(p.result_queue) == 3
+ assert len(p.result_queue) == 2
(r1,) = await c1.fetchone()
assert r1 == 1
assert not notices
+async def test_transaction_state_implicit_begin(aconn, trace):
+ # Regression test to ensure that the transaction state is correct after
+ # the implicit BEGIN statement (in non-autocommit mode).
+ notices = []
+ aconn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
+ t = trace.trace(aconn)
+ async with aconn.pipeline():
+ await (await aconn.execute("select 'x'")).fetchone()
+ await aconn.execute("select 'y'")
+ assert not notices
+ assert [
+ e.content[0] for e in t if e.type == "Parse" and b"BEGIN" in e.content[0]
+ ] == [b' "" "BEGIN" 0']
+
+
async def test_concurrency(aconn):
async with aconn.transaction():
await aconn.execute("drop table if exists pipeline_concurrency")