def test_cannot_insert_multiple_commands(conn):
- with pytest.raises(psycopg.errors.SyntaxError) as cm:
+ with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
with conn.pipeline():
conn.execute("select 1; select 2")
- assert cm.value.sqlstate == "42601"
def test_copy(conn):
assert cur2.fetchone() == (2,)
+@pytest.mark.crdb("skip", reason="deferrable")
def test_error_on_commit(conn):
conn.execute(
"""
)
with conn.pipeline(), conn.cursor() as cur:
cur.executemany(
- "insert into execmanypipeline(num) values (%s) returning id",
+ "insert into execmanypipeline(num) values (%s) returning num",
[(10,), (20,)],
returning=True,
)
assert cur.rowcount == 2
- assert cur.fetchone() == (1,)
+ assert cur.fetchone() == (10,)
assert cur.nextset()
- assert cur.fetchone() == (2,)
+ assert cur.fetchone() == (20,)
assert cur.nextset() is None
assert cur.nextset() is None
+@pytest.mark.crdb("skip", reason="temp tables")
def test_executemany_trace(conn, trace):
conn.autocommit = True
cur = conn.cursor()
assert len([i for i in items if i.type == "Sync"]) == 1
+@pytest.mark.crdb("skip", reason="temp tables")
def test_executemany_trace_returning(conn, trace):
conn.autocommit = True
cur = conn.cursor()
conn.autocommit = True
with conn.pipeline():
c1 = conn.execute("select %s::int", [10], prepare=True)
- c2 = conn.execute("select count(*) from pg_prepared_statements")
+ c2 = conn.execute(
+ "select count(*) from pg_prepared_statements where name != ''"
+ )
(r,) = c1.fetchone()
assert r == 10
conn.prepared_threshold = 5
with conn.pipeline():
cursors = [
- conn.execute("select count(*) from pg_prepared_statements")
+ conn.execute("select count(*) from pg_prepared_statements where name != ''")
for i in range(10)
]
def test_outer_transaction(conn):
+ with conn.transaction():
+ conn.execute("drop table if exists outertx")
with conn.transaction():
with conn.pipeline():
- conn.execute("drop table if exists outertx")
conn.execute("create table outertx as (select 1)")
cur = conn.execute("select * from outertx")
(r,) = cur.fetchone()
def test_concurrency(conn):
with conn.transaction():
conn.execute("drop table if exists pipeline_concurrency")
+ conn.execute("drop table if exists accessed")
+ with conn.transaction():
conn.execute(
"create unlogged table pipeline_concurrency ("
" id serial primary key,"
" value integer"
")"
)
- conn.execute("drop table if exists accessed")
conn.execute("create unlogged table accessed as (select now() as value)")
def update(value):
cur = conn.execute(
- "insert into pipeline_concurrency(value) values (%s) returning id",
+ "insert into pipeline_concurrency(value) values (%s) returning value",
(value,),
)
conn.execute("update accessed set value = now()")
async def test_cannot_insert_multiple_commands(aconn):
- with pytest.raises(psycopg.errors.SyntaxError) as cm:
+ with pytest.raises((e.SyntaxError, e.InvalidPreparedStatementDefinition)):
async with aconn.pipeline():
await aconn.execute("select 1; select 2")
- assert cm.value.sqlstate == "42601"
async def test_copy(aconn):
assert await cur2.fetchone() == (2,)
+@pytest.mark.crdb("skip", reason="deferrable")
async def test_error_on_commit(aconn):
await aconn.execute(
"""
)
async with aconn.pipeline(), aconn.cursor() as cur:
await cur.executemany(
- "insert into execmanypipeline(num) values (%s) returning id",
+ "insert into execmanypipeline(num) values (%s) returning num",
[(10,), (20,)],
returning=True,
)
assert cur.rowcount == 2
- assert (await cur.fetchone()) == (1,)
+ assert (await cur.fetchone()) == (10,)
assert cur.nextset()
- assert (await cur.fetchone()) == (2,)
+ assert (await cur.fetchone()) == (20,)
assert cur.nextset() is None
assert cur.nextset() is None
+@pytest.mark.crdb("skip", reason="temp tables")
async def test_executemany_trace(aconn, trace):
await aconn.set_autocommit(True)
cur = aconn.cursor()
assert len([i for i in items if i.type == "Sync"]) == 1
+@pytest.mark.crdb("skip", reason="temp tables")
async def test_executemany_trace_returning(aconn, trace):
await aconn.set_autocommit(True)
cur = aconn.cursor()
await aconn.set_autocommit(True)
async with aconn.pipeline():
c1 = await aconn.execute("select %s::int", [10], prepare=True)
- c2 = await aconn.execute("select count(*) from pg_prepared_statements")
+ c2 = await aconn.execute(
+ "select count(*) from pg_prepared_statements where name != ''"
+ )
(r,) = await c1.fetchone()
assert r == 10
aconn.prepared_threshold = 5
async with aconn.pipeline():
cursors = [
- await aconn.execute("select count(*) from pg_prepared_statements")
+ await aconn.execute(
+ "select count(*) from pg_prepared_statements where name != ''"
+ )
for i in range(10)
]
async def test_outer_transaction(aconn):
+ async with aconn.transaction():
+ await aconn.execute("drop table if exists outertx")
async with aconn.transaction():
async with aconn.pipeline():
- await aconn.execute("drop table if exists outertx")
await aconn.execute("create table outertx as (select 1)")
cur = await aconn.execute("select * from outertx")
(r,) = await cur.fetchone()
async def test_concurrency(aconn):
async with aconn.transaction():
await aconn.execute("drop table if exists pipeline_concurrency")
+ await aconn.execute("drop table if exists accessed")
+ async with aconn.transaction():
await aconn.execute(
"create unlogged table pipeline_concurrency ("
" id serial primary key,"
" value integer"
")"
)
- await aconn.execute("drop table if exists accessed")
await aconn.execute("create unlogged table accessed as (select now() as value)")
async def update(value):
cur = await aconn.execute(
- "insert into pipeline_concurrency(value) values (%s) returning id",
+ "insert into pipeline_concurrency(value) values (%s) returning value",
(value,),
)
await aconn.execute("update accessed set value = now()")