if not pipeline:
# No-op re-entered inner pipeline block.
- yield self._pipeline
+ try:
+ yield self._pipeline
+ finally:
+ self._pipeline.sync()
return
try:
if not pipeline:
# No-op re-entered inner pipeline block.
- yield self._pipeline
+ try:
+ yield self._pipeline
+ finally:
+ await self._pipeline.sync()
return
try:
"""
try:
if Pipeline.is_supported():
- with self._conn.pipeline(), self._conn.lock:
- self._conn.wait(
- self._executemany_gen_pipeline(query, params_seq, returning)
- )
+ # If there is already a pipeline, ride it, in order to avoid
+ # sending unnecessary Sync.
+ with self._conn.lock:
+ p = self._conn._pipeline
+ if p:
+ self._conn.wait(
+ self._executemany_gen_pipeline(query, params_seq, returning)
+ )
+ # Otherwise, make a new one
+ if not p:
+ with self._conn.pipeline(), self._conn.lock:
+ self._conn.wait(
+ self._executemany_gen_pipeline(query, params_seq, returning)
+ )
else:
with self._conn.lock:
self._conn.wait(
) -> None:
try:
if Pipeline.is_supported():
- async with self._conn.pipeline(), self._conn.lock:
- await self._conn.wait(
- self._executemany_gen_pipeline(query, params_seq, returning)
- )
+ # If there is already a pipeline, ride it, in order to avoid
+ # sending unnecessary Sync.
+ async with self._conn.lock:
+ p = self._conn._pipeline
+ if p:
+ await self._conn.wait(
+ self._executemany_gen_pipeline(query, params_seq, returning)
+ )
+ # Otherwise, make a new one
+ if not p:
+ async with self._conn.pipeline(), self._conn.lock:
+ await self._conn.wait(
+ self._executemany_gen_pipeline(query, params_seq, returning)
+ )
else:
await self._conn.wait(
self._executemany_gen_no_pipeline(query, params_seq, returning)
assert len(caplog.records) == 1
+def test_pipeline_exit_sync_trace(conn, trace):
+ t = trace.trace(conn)
+ with conn.pipeline():
+ pass
+ conn.close()
+ assert len([i for i in t if i.type == "Sync"]) == 1
+
+
+def test_pipeline_nested_sync_trace(conn, trace):
+ t = trace.trace(conn)
+ with conn.pipeline():
+ with conn.pipeline():
+ pass
+ conn.close()
+ assert len([i for i in t if i.type == "Sync"]) == 2
+
+
def test_cursor_stream(conn):
with conn.pipeline(), conn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):
assert len(caplog.records) == 1
+async def test_pipeline_exit_sync_trace(aconn, trace):
+ t = trace.trace(aconn)
+ async with aconn.pipeline():
+ pass
+ await aconn.close()
+ assert len([i for i in t if i.type == "Sync"]) == 1
+
+
+async def test_pipeline_nested_sync_trace(aconn, trace):
+ t = trace.trace(aconn)
+ async with aconn.pipeline():
+ async with aconn.pipeline():
+ pass
+ await aconn.close()
+ assert len([i for i in t if i.type == "Sync"]) == 2
+
+
async def test_cursor_stream(aconn):
async with aconn.pipeline(), aconn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):