import psycopg
from psycopg import pq
-from psycopg.errors import (
- OperationalError,
- ProgrammingError,
- UndefinedColumn,
- UndefinedTable,
-)
+from psycopg import errors as e
pytestmark = pytest.mark.libpq(">= 14")
def test_pipeline_status(conn):
+ assert conn._pipeline is None
with conn.pipeline():
p = conn._pipeline
assert p is not None
assert p.status == pq.PipelineStatus.ON
- with pytest.raises(ProgrammingError):
- with conn.pipeline():
- pass
assert p.status == pq.PipelineStatus.OFF
assert not conn._pipeline
+def test_pipeline_reenter(conn):
+ with conn.pipeline():
+ p = conn._pipeline
+ with conn.pipeline():
+ assert conn._pipeline is p
+ assert p.status == pq.PipelineStatus.ON
+ assert conn._pipeline is p
+ assert p.status == pq.PipelineStatus.ON
+ assert conn._pipeline is None
+ assert p.status == pq.PipelineStatus.OFF
+
+
def test_cursor_stream(conn):
with conn.pipeline(), conn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):
def test_pipeline_errors_processed_at_exit(conn):
conn.autocommit = True
- with pytest.raises((OperationalError, UndefinedTable)):
+ with pytest.raises((e.OperationalError, e.UndefinedTable)):
with conn.pipeline():
conn.execute("select * from nosuchtable")
conn.execute("create table voila ()")
conn.autocommit = True
with conn.pipeline():
c1 = conn.execute("select 1")
- with pytest.raises(UndefinedTable):
+ with pytest.raises(e.UndefinedTable):
conn.execute("select * from doesnotexist").fetchone()
- with pytest.raises(OperationalError, match="pipeline aborted"):
+ with pytest.raises(e.OperationalError, match="pipeline aborted"):
conn.execute("select 'aborted'").fetchone()
# Sync restore the connection in usable state.
conn._pipeline.sync()
def test_pipeline_commit_aborted(conn):
- with pytest.raises((UndefinedColumn, OperationalError)):
+ with pytest.raises((e.UndefinedColumn, e.OperationalError)):
with conn.pipeline():
conn.execute("select error")
conn.execute("create table voila ()")
def test_outer_transaction_error(conn):
with conn.transaction():
- with pytest.raises((UndefinedColumn, OperationalError)):
+ with pytest.raises((e.UndefinedColumn, e.OperationalError)):
with conn.pipeline():
conn.execute("select error")
conn.execute("create table voila ()")
import psycopg
from psycopg import pq
-from psycopg.errors import (
- OperationalError,
- ProgrammingError,
- UndefinedColumn,
- UndefinedTable,
-)
+from psycopg import errors as e
pytestmark = [
pytest.mark.libpq(">= 14"),
async def test_pipeline_status(aconn):
+ assert aconn._pipeline is None
async with aconn.pipeline():
p = aconn._pipeline
assert p is not None
assert p.status == pq.PipelineStatus.ON
- with pytest.raises(ProgrammingError):
- async with aconn.pipeline():
- pass
assert p.status == pq.PipelineStatus.OFF
assert not aconn._pipeline
+async def test_pipeline_reenter(aconn):
+ async with aconn.pipeline():
+ p = aconn._pipeline
+ async with aconn.pipeline():
+ assert aconn._pipeline is p
+ assert p.status == pq.PipelineStatus.ON
+ assert aconn._pipeline is p
+ assert p.status == pq.PipelineStatus.ON
+ assert aconn._pipeline is None
+ assert p.status == pq.PipelineStatus.OFF
+
+
async def test_cursor_stream(aconn):
async with aconn.pipeline(), aconn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):
async def test_pipeline_errors_processed_at_exit(aconn):
await aconn.set_autocommit(True)
- with pytest.raises((OperationalError, UndefinedTable)):
+ with pytest.raises((e.OperationalError, e.UndefinedTable)):
async with aconn.pipeline():
await aconn.execute("select * from nosuchtable")
await aconn.execute("create table voila ()")
await aconn.set_autocommit(True)
async with aconn.pipeline():
c1 = await aconn.execute("select 1")
- with pytest.raises(UndefinedTable):
+ with pytest.raises(e.UndefinedTable):
await (await aconn.execute("select * from doesnotexist")).fetchone()
- with pytest.raises(OperationalError, match="pipeline aborted"):
+ with pytest.raises(e.OperationalError, match="pipeline aborted"):
await (await aconn.execute("select 'aborted'")).fetchone()
# Sync restore the connection in usable state.
aconn._pipeline.sync()
async def test_pipeline_commit_aborted(aconn):
- with pytest.raises((UndefinedColumn, OperationalError)):
+ with pytest.raises((e.UndefinedColumn, e.OperationalError)):
async with aconn.pipeline():
await aconn.execute("select error")
await aconn.execute("create table voila ()")
async def test_outer_transaction_error(aconn):
async with aconn.transaction():
- with pytest.raises((UndefinedColumn, OperationalError)):
+ with pytest.raises((e.UndefinedColumn, e.OperationalError)):
async with aconn.pipeline():
await aconn.execute("select error")
await aconn.execute("create table voila ()")