from typing_extensions import TypedDict
import psycopg
-from psycopg import Connection, Notify
+from psycopg import Connection, Notify, errors as e
from psycopg.rows import tuple_row
-from psycopg.errors import UndefinedTable
from psycopg.conninfo import conninfo_to_dict, make_conninfo
from .utils import gc_collect
with psycopg.connect(dsn) as conn:
with conn.cursor() as cur:
- with pytest.raises(UndefinedTable):
+ with pytest.raises(e.UndefinedTable):
cur.execute("select * from textctx")
conn.commit()
+def test_commit_error(conn):
+ conn.execute(
+ """
+ drop table if exists selfref;
+ create table selfref (
+ x serial primary key,
+ y int references selfref (x) deferrable initially deferred)
+ """
+ )
+ conn.commit()
+
+ conn.execute("insert into selfref (y) values (-1)")
+ with pytest.raises(e.ForeignKeyViolation):
+ conn.commit()
+ assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+ cur = conn.execute("select 1")
+ assert cur.fetchone() == (1,)
+
+
def test_rollback(conn):
conn.pgconn.exec_(b"drop table if exists foo")
conn.pgconn.exec_(b"create table foo (id int primary key)")
import weakref
import psycopg
-from psycopg import AsyncConnection, Notify
+from psycopg import AsyncConnection, Notify, errors as e
from psycopg.rows import tuple_row
-from psycopg.errors import UndefinedTable
from psycopg.conninfo import conninfo_to_dict, make_conninfo
from .utils import gc_collect
async with await psycopg.AsyncConnection.connect(dsn) as aconn:
async with aconn.cursor() as cur:
- with pytest.raises(UndefinedTable):
+ with pytest.raises(e.UndefinedTable):
await cur.execute("select * from textctx")
await aconn.commit()
+async def test_commit_error(aconn):
+ await aconn.execute(
+ """
+ drop table if exists selfref;
+ create table selfref (
+ x serial primary key,
+ y int references selfref (x) deferrable initially deferred)
+ """
+ )
+ await aconn.commit()
+
+ await aconn.execute("insert into selfref (y) values (-1)")
+ with pytest.raises(e.ForeignKeyViolation):
+ await aconn.commit()
+ assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
+ cur = await aconn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+
+
async def test_rollback(aconn):
aconn.pgconn.exec_(b"drop table if exists foo")
aconn.pgconn.exec_(b"create table foo (id int primary key)")