assert not recwarn, [str(w.message) for w in recwarn.list]
-def test_context_commit(conn, dsn):
+@pytest.fixture
+def testctx(svcconn):
+ svcconn.execute("create table if not exists testctx (id int primary key)")
+ svcconn.execute("delete from testctx")
+ return None
+
+
+def test_context_commit(testctx, conn, dsn):
with conn:
with conn.cursor() as cur:
- cur.execute("drop table if exists textctx")
- cur.execute("create table textctx ()")
+ cur.execute("insert into testctx values (42)")
assert conn.closed
assert not conn.broken
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
- cur.execute("select * from textctx")
- assert cur.fetchall() == []
+ cur.execute("select * from testctx")
+ assert cur.fetchall() == [(42,)]
-def test_context_rollback(conn, dsn):
- with conn.cursor() as cur:
- cur.execute("drop table if exists textctx")
- conn.commit()
-
+def test_context_rollback(testctx, conn, dsn):
with pytest.raises(ZeroDivisionError):
with conn:
with conn.cursor() as cur:
- cur.execute("create table textctx ()")
+ cur.execute("insert into testctx values (42)")
1 / 0
assert conn.closed
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
- with pytest.raises(e.UndefinedTable):
- cur.execute("select * from textctx")
+ cur.execute("select * from testctx")
+ assert cur.fetchall() == []
def test_context_close(conn):
from .test_cursor import my_row_factory
from .test_connection import tx_params, tx_params_isolation, tx_values_map
from .test_connection import conninfo_params_timeout
+from .test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
from .test_conninfo import fake_resolve # noqa: F401
assert not recwarn, [str(w.message) for w in recwarn.list]
+@pytest.mark.usefixtures("testctx")
async def test_context_commit(aconn, dsn):
async with aconn:
async with aconn.cursor() as cur:
- await cur.execute("drop table if exists textctx")
- await cur.execute("create table textctx ()")
+ await cur.execute("insert into testctx values (42)")
assert aconn.closed
assert not aconn.broken
async with await psycopg.AsyncConnection.connect(dsn) as aconn:
async with aconn.cursor() as cur:
- await cur.execute("select * from textctx")
- assert await cur.fetchall() == []
+ await cur.execute("select * from testctx")
+ assert await cur.fetchall() == [(42,)]
+@pytest.mark.usefixtures("testctx")
async def test_context_rollback(aconn, dsn):
- async with aconn.cursor() as cur:
- await cur.execute("drop table if exists textctx")
- await aconn.commit()
-
with pytest.raises(ZeroDivisionError):
async with aconn:
async with aconn.cursor() as cur:
- await cur.execute("create table textctx ()")
+ await cur.execute("insert into testctx values (42)")
1 / 0
assert aconn.closed
async with await psycopg.AsyncConnection.connect(dsn) as aconn:
async with aconn.cursor() as cur:
- with pytest.raises(e.UndefinedTable):
- await cur.execute("select * from textctx")
+ await cur.execute("select * from testctx")
+ assert await cur.fetchall() == []
async def test_context_close(aconn):