point is established by Psycopg:
- using the `Pipeline.sync()` method;
-- on `Connection.rollback()`;
+- on `Connection.commit()` or `~Connection.rollback()`;
- at the end of a `!Pipeline` block;
- using a fetch method such as `Cursor.fetchone()` (which only flushes the
query but doesn't issue a Sync and doesn't reset a pipeline state error).
yield from self._exec_command(b"COMMIT")
+ if self._pipeline:
+ yield from self._pipeline._sync_gen()
+
def _rollback_gen(self) -> PQGen[None]:
"""Generator implementing `Connection.rollback()`."""
if self._num_transactions:
p.sync()
+def test_errors_raised_on_commit(conn):
+ with conn.pipeline():
+ conn.execute("select 1 from nosuchtable")
+ with pytest.raises(e.UndefinedTable):
+ conn.commit()
+ conn.rollback()
+ cur1 = conn.execute("select 1")
+ cur2 = conn.execute("select 2")
+
+ assert cur1.fetchone() == (1,)
+ assert cur2.fetchone() == (2,)
+
+
+def test_error_on_commit(conn):
+ conn.execute(
+ """
+ drop table if exists selfref;
+ create table selfref (
+ x serial primary key,
+ y int references selfref (x) deferrable initially deferred)
+ """
+ )
+ conn.commit()
+
+ with conn.pipeline():
+ conn.execute("insert into selfref (y) values (-1)")
+ with pytest.raises(e.ForeignKeyViolation):
+ conn.commit()
+ cur1 = conn.execute("select 1")
+ cur2 = conn.execute("select 2")
+
+ assert cur1.fetchone() == (1,)
+ assert cur2.fetchone() == (2,)
+
+
def test_fetch_no_result(conn):
with conn.pipeline():
cur = conn.cursor()
await p.sync()
+async def test_errors_raised_on_commit(aconn):
+ async with aconn.pipeline():
+ await aconn.execute("select 1 from nosuchtable")
+ with pytest.raises(e.UndefinedTable):
+ await aconn.commit()
+ await aconn.rollback()
+ cur1 = await aconn.execute("select 1")
+ cur2 = await aconn.execute("select 2")
+
+ assert await cur1.fetchone() == (1,)
+ assert await cur2.fetchone() == (2,)
+
+
+async def test_error_on_commit(aconn):
+ await aconn.execute(
+ """
+ drop table if exists selfref;
+ create table selfref (
+ x serial primary key,
+ y int references selfref (x) deferrable initially deferred)
+ """
+ )
+ await aconn.commit()
+
+ async with aconn.pipeline():
+ await aconn.execute("insert into selfref (y) values (-1)")
+ with pytest.raises(e.ForeignKeyViolation):
+ await aconn.commit()
+ cur1 = await aconn.execute("select 1")
+ cur2 = await aconn.execute("select 2")
+
+ assert (await cur1.fetchone()) == (1,)
+ assert (await cur2.fetchone()) == (2,)
+
+
async def test_fetch_no_result(aconn):
async with aconn.pipeline():
cur = aconn.cursor()