+from typing import Any
import concurrent.futures
import pytest
pytestmark = pytest.mark.libpq(">= 14")
-def test_pipeline_status(conn):
+def test_pipeline_status(conn: psycopg.Connection[Any]) -> None:
assert conn._pipeline is None
- with conn.pipeline():
- p = conn._pipeline
- assert p is not None
+ with conn.pipeline() as p:
+ assert conn._pipeline is p
assert p.status == pq.PipelineStatus.ON
assert p.status == pq.PipelineStatus.OFF
assert not conn._pipeline
-def test_pipeline_reenter(conn):
- with conn.pipeline():
- p = conn._pipeline
- with conn.pipeline():
- assert conn._pipeline is p
- assert p.status == pq.PipelineStatus.ON
- assert conn._pipeline is p
- assert p.status == pq.PipelineStatus.ON
+def test_pipeline_reenter(conn: psycopg.Connection[Any]) -> None:
+ with conn.pipeline() as p1:
+ with conn.pipeline() as p2:
+ assert p2 is p1
+ assert p1.status == pq.PipelineStatus.ON
+ assert p2 is p1
+ assert p2.status == pq.PipelineStatus.ON
assert conn._pipeline is None
- assert p.status == pq.PipelineStatus.OFF
+ assert p1.status == pq.PipelineStatus.OFF
def test_cursor_stream(conn):
def test_pipeline_processed_at_exit(conn):
with conn.cursor() as cur:
- with conn.pipeline():
+ with conn.pipeline() as p:
cur.execute("select 1")
# PQsendQuery[BEGIN], PQsendQuery
- assert len(conn._pipeline.result_queue) == 2
+ assert len(p.result_queue) == 2
assert cur.fetchone() == (1,)
def test_pipeline(conn):
- with conn.pipeline():
+ with conn.pipeline() as p:
c1 = conn.cursor()
c2 = conn.cursor()
c1.execute("select 1")
c2.execute("select 2")
# PQsendQuery[BEGIN], PQsendQuery(2)
- assert len(conn._pipeline.result_queue) == 3
+ assert len(p.result_queue) == 3
(r1,) = c1.fetchone()
assert r1 == 1
def test_pipeline_aborted(conn):
conn.autocommit = True
- with conn.pipeline():
+ with conn.pipeline() as p:
c1 = conn.execute("select 1")
with pytest.raises(e.UndefinedTable):
conn.execute("select * from doesnotexist").fetchone()
with pytest.raises(e.OperationalError, match="pipeline aborted"):
conn.execute("select 'aborted'").fetchone()
# Sync restore the connection in usable state.
- conn._pipeline.sync()
+ p.sync()
c2 = conn.execute("select 2")
(r,) = c1.fetchone()
import asyncio
+from typing import Any
import pytest
]
-async def test_pipeline_status(aconn):
+async def test_pipeline_status(aconn: psycopg.AsyncConnection[Any]) -> None:
assert aconn._pipeline is None
- async with aconn.pipeline():
- p = aconn._pipeline
- assert p is not None
+ async with aconn.pipeline() as p:
+ assert aconn._pipeline is p
assert p.status == pq.PipelineStatus.ON
assert p.status == pq.PipelineStatus.OFF
assert not aconn._pipeline
-async def test_pipeline_reenter(aconn):
- async with aconn.pipeline():
- p = aconn._pipeline
- async with aconn.pipeline():
- assert aconn._pipeline is p
- assert p.status == pq.PipelineStatus.ON
- assert aconn._pipeline is p
- assert p.status == pq.PipelineStatus.ON
+async def test_pipeline_reenter(aconn: psycopg.AsyncConnection[Any]) -> None:
+ async with aconn.pipeline() as p1:
+ async with aconn.pipeline() as p2:
+ assert p2 is p1
+ assert p1.status == pq.PipelineStatus.ON
+ assert p2 is p1
+ assert p2.status == pq.PipelineStatus.ON
assert aconn._pipeline is None
- assert p.status == pq.PipelineStatus.OFF
+ assert p1.status == pq.PipelineStatus.OFF
async def test_cursor_stream(aconn):
async def test_pipeline_processed_at_exit(aconn):
async with aconn.cursor() as cur:
- async with aconn.pipeline():
+ async with aconn.pipeline() as p:
await cur.execute("select 1")
# PQsendQuery[BEGIN], PQsendQuery
- assert len(aconn._pipeline.result_queue) == 2
+ assert len(p.result_queue) == 2
assert await cur.fetchone() == (1,)
async def test_pipeline(aconn):
- async with aconn.pipeline():
+ async with aconn.pipeline() as p:
c1 = aconn.cursor()
c2 = aconn.cursor()
await c1.execute("select 1")
await c2.execute("select 2")
# PQsendQuery[BEGIN], PQsendQuery(2)
- assert len(aconn._pipeline.result_queue) == 3
+ assert len(p.result_queue) == 3
(r1,) = await c1.fetchone()
assert r1 == 1
async def test_pipeline_aborted(aconn):
await aconn.set_autocommit(True)
- async with aconn.pipeline():
+ async with aconn.pipeline() as p:
c1 = await aconn.execute("select 1")
with pytest.raises(e.UndefinedTable):
await (await aconn.execute("select * from doesnotexist")).fetchone()
with pytest.raises(e.OperationalError, match="pipeline aborted"):
await (await aconn.execute("select 'aborted'")).fetchone()
# Sync restore the connection in usable state.
- aconn._pipeline.sync()
+ p.sync()
c2 = await aconn.execute("select 2")
(r,) = await c1.fetchone()