This generator will be used for send operations during the execute()
step when the connection is in pipeline mode. It can consume results or
send queries depending on the socket's read/write ready state. Queries
to be sent are held in a queue of Callable[[], None] values, built by
partial application of pgconn.send_query() and similar functions.
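
For illustration, such a queue could be built like this (a minimal
sketch, assuming pgconn is an already connected PGconn; the Command
alias and the Deque import come from the changes below):

    from functools import partial

    from psycopg._compat import Deque
    from psycopg.abc import Command

    commands = Deque[Command]()
    commands.append(partial(pgconn.send_query, b"BEGIN"))
    commands.append(partial(pgconn.send_query_prepared, b"prepare", [b"1", b"2"]))
    commands.append(pgconn.pipeline_sync)
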
We add a couple of unit tests in test_generators.py, along with a test
script adapted from the PostgreSQL sources. The script demonstrates
pipeline mode with the query-send and results-fetch steps interleaved,
without any sync point being emitted. (In this test, this works because
the output buffer gets full.) The test writes data to logs; typically,
we'd get:
enter pipeline
sent BEGIN TRANSACTION
sent DROP TABLE IF EXISTS pq_pipeline_demo
sent CREATE UNLOGGED TABLE pq_pipeline_demo( id serial primary key, itemno integer, int8filler int8)
prepare INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2) as 'prepare'
sent prepared 'prepare' with [b'10000', b'4611686018427387904']
sent prepared 'prepare' with [b'9999', b'4611686018427387904']
sent prepared 'prepare' with [b'9998', b'4611686018427387904']
sent prepared 'prepare' with [b'9997', b'4611686018427387904']
sent prepared 'prepare' with [b'9996', b'4611686018427387904']
sent prepared 'prepare' with [b'9995', b'4611686018427387904']
sent prepared 'prepare' with [b'9994', b'4611686018427387904']
sent prepared 'prepare' with [b'9993', b'4611686018427387904']
sent prepared 'prepare' with [b'9992', b'4611686018427387904']
...
sent prepared 'prepare' with [b'9690', b'4611686018427387904']
sent prepared 'prepare' with [b'9689', b'4611686018427387904']
sent prepared 'prepare' with [b'9688', b'4611686018427387904']
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
sent prepared 'prepare' with [b'9687', b'4611686018427387904']
sent prepared 'prepare' with [b'9686', b'4611686018427387904']
sent prepared 'prepare' with [b'9685', b'4611686018427387904']
sent prepared 'prepare' with [b'9684', b'4611686018427387904']
sent prepared 'prepare' with [b'9683', b'4611686018427387904']
...
sent prepared 'prepare' with [b'2', b'4611686018427387904']
sent prepared 'prepare' with [b'1', b'4611686018427387904']
sent COMMIT
pipeline sync sent
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
got PIPELINE_SYNC results
exit pipeline
We can see that commands are sent until the output buffer is full (the
connection is then read-ready only), then results are fetched until
more commands can be sent, and the cycle repeats.
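
A minimal sketch of the loop driving the generator, assuming pgconn is
a non-blocking connection already in pipeline mode and that commands
and results_queue are queues built as above (the demo script added
below contains the full version):

    from psycopg import waiting
    from psycopg.generators import pipeline_communicate

    while results_queue:
        # Each wait() run sends as many queued commands as the socket
        # accepts and returns the result batches fetched meanwhile.
        fetched = waiting.wait(pipeline_communicate(pgconn, commands), pgconn.socket)
        for results in fetched:
            results_queue.popleft()
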
Query: TypeAlias = Union[str, bytes, "Composable"]
Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
+Command = Callable[[], None]
# TODO: make it recursive when mypy will support it
# DumperKey: TypeAlias = Union[type, Tuple[Union[type, "DumperKey"]]]
from . import pq
from . import errors as e
from .pq import ConnStatus, PollingStatus, ExecStatus
-from .abc import PQGen, PQGenConn
+from .abc import Command, PQGen, PQGenConn
from .pq.abc import PGconn, PGresult
from .waiting import Wait, Ready
+from ._compat import Deque
from ._encodings import pgconn_encoding, conninfo_encoding
logger = logging.getLogger(__name__)
return pgconn.get_result()
+def pipeline_communicate(
+ pgconn: PGconn, commands: Deque[Command]
+) -> PQGen[List[List[PGresult]]]:
+ """Generator to send queries from a connection in pipeline mode while also
+ receiving results.
+
+    Return a list of results, including single PIPELINE_SYNC elements.
+ """
+ results = []
+
+ while True:
+ ready = yield Wait.RW
+
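+        # Socket read-ready: consume input, deliver any notifications,
+        # then drain the results that can be read without blocking.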
+ if ready & Ready.R:
+ pgconn.consume_input()
+ while True:
+ n = pgconn.notifies()
+ if not n:
+ break
+ if pgconn.notify_handler:
+ pgconn.notify_handler(n)
+
+ res: List[PGresult] = []
+ while not pgconn.is_busy():
+ r = pgconn.get_result()
+ if r is None:
+ if not res:
+ break
+ results.append(res)
+ res = []
+ elif r.status == pq.ExecStatus.PIPELINE_SYNC:
+ assert not res
+ results.append([r])
+ else:
+ res.append(r)
+
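+        # Socket write-ready: flush pending output and send the next
+        # queued command; stop once the queue is empty.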
+ if ready & Ready.W:
+ pgconn.flush()
+ if not commands:
+ break
+ commands.popleft()()
+
+ return results
+
+
_copy_statuses = (
ExecStatus.COPY_IN,
ExecStatus.COPY_OUT,
from psycopg import pq
from psycopg import abc
from psycopg.rows import Row, RowMaker
+from psycopg.abc import Command
from psycopg.adapt import AdaptersMap, PyFormat
from psycopg.pq.abc import PGconn, PGresult
from psycopg.connection import BaseConnection
+from psycopg._compat import Deque
class Transformer(abc.AdaptContext):
types: Optional[Tuple[int, ...]]
def send(pgconn: PGconn) -> abc.PQGen[None]: ...
def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
+def pipeline_communicate(
+ pgconn: PGconn, commands: Deque[Command]
+) -> abc.PQGen[List[List[PGresult]]]: ...
# Copy support
def format_row_text(
from psycopg import errors as e
from psycopg.pq import abc, error_message
-from psycopg.abc import PQGen
+from psycopg.abc import Command, PQGen
from psycopg.waiting import Wait, Ready
+from psycopg._compat import Deque
from psycopg._encodings import conninfo_encoding
cdef object WAIT_W = Wait.W
if pgres is NULL:
return None
return pq.PGresult._from_ptr(pgres)
+
+
+def pipeline_communicate(
+    pq.PGconn pgconn, commands: Deque[Command]
+) -> PQGen[List[List[PGresult]]]:
+ """Generator to send queries from a connection in pipeline mode while also
+ receiving results.
+
+    Return a list of results, including single PIPELINE_SYNC elements.
+ """
+ cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
+ cdef object notify_handler = pgconn.notify_handler
+ cdef libpq.PGnotify *notify
+ cdef int cires
+ cdef libpq.PGresult *pgres
+ cdef list res = []
+ cdef list results = []
+ cdef pq.PGresult r
+
+ while True:
+ ready = yield WAIT_RW
+
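+        # Read branch: consume input with the GIL released, then handle
+        # notifications and drain the results already available.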
+        if ready & Ready.R:
+            with nogil:
+                cires = libpq.PQconsumeInput(pgconn_ptr)
+            if 1 != cires:
+                raise e.OperationalError(
+                    f"consuming input failed: {error_message(pgconn)}")
+
+ if notify_handler is not None:
+ while True:
+ pynotify = pgconn.notifies()
+ if pynotify is None:
+ break
+ PyObject_CallFunctionObjArgs(
+ notify_handler, <PyObject *>pynotify, NULL
+ )
+ else:
+ while True:
+ notify = libpq.PQnotifies(pgconn_ptr)
+ if notify is NULL:
+ break
+ libpq.PQfreemem(notify)
+
+            res = []  # declared above as "cdef list res"
+ while not libpq.PQisBusy(pgconn_ptr):
+ pgres = libpq.PQgetResult(pgconn_ptr)
+ if pgres is NULL:
+ if not res:
+ break
+ results.append(res)
+ res = []
+ else:
+ status = libpq.PQresultStatus(pgres)
+ r = pq.PGresult._from_ptr(pgres)
+ if status == libpq.PGRES_PIPELINE_SYNC:
+ assert not res
+ results.append([r])
+ break
+ else:
+ res.append(r)
+
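+        # Write branch: flush pending output and send the next queued
+        # command; exit the loop once the queue is empty.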
+ if ready & Ready.W:
+ libpq.PQflush(pgconn_ptr)
+ if not commands:
+ break
+ commands.popleft()()
+
+ return results
"""Return the number of records in the test table."""
cur = self.conn.execute("select count(*) from test_tpc")
return cur.fetchone()[0]
+
+
+@pytest.fixture(scope="module")
+def generators():
+ """Return the 'generators' module for selected psycopg implementation."""
+ from psycopg import pq
+
+ if pq.__impl__ == "c":
+ from psycopg._cmodule import _psycopg
+
+ return _psycopg
+ else:
+ import psycopg.generators
+
+ return psycopg.generators
--- /dev/null
+"""Pipeline mode demo
+
+This reproduces libpq_pipeline::pipelined_insert PostgreSQL test at
+src/test/modules/libpq_pipeline/libpq_pipeline.c::test_pipelined_insert().
+
+We do not fetch results explicitly (using cursor.fetch*()); this is
+handled by execute() calls when the pgconn socket is read-ready, which
+happens when the output buffer is full.
+"""
+import argparse
+import asyncio
+import logging
+from contextlib import contextmanager
+from functools import partial
+from typing import Any, Iterator, Optional, Sequence, Tuple
+
+from psycopg import AsyncConnection, Connection
+from psycopg import pq, waiting
+from psycopg import errors as e
+from psycopg.abc import Command
+from psycopg.generators import pipeline_communicate
+from psycopg.pq._enums import Format
+from psycopg._compat import Deque
+
+
+class LoggingPGconn:
+ """Wrapper for PGconn that logs fetched results."""
+
+ def __init__(self, pgconn: pq.abc.PGconn, logger: logging.Logger):
+ self._pgconn = pgconn
+ self._logger = logger
+
+ def __getattr__(self, name: str) -> Any:
+ return getattr(self._pgconn, name)
+
+ def send_query(self, command: bytes) -> None:
+ self._pgconn.send_query(command)
+ self._logger.info("sent %s", command.decode())
+
+ def send_query_params(
+ self,
+ command: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_types: Optional[Sequence[int]] = None,
+ param_formats: Optional[Sequence[int]] = None,
+ result_format: int = Format.TEXT,
+ ) -> None:
+ self._pgconn.send_query_params(
+ command, param_values, param_types, param_formats, result_format
+ )
+ self._logger.info("sent %s", command.decode())
+
+ def send_query_prepared(
+ self,
+ name: bytes,
+ param_values: Optional[Sequence[Optional[bytes]]],
+ param_formats: Optional[Sequence[int]] = None,
+ result_format: int = Format.TEXT,
+ ) -> None:
+ self._pgconn.send_query_prepared(
+ name, param_values, param_formats, result_format
+ )
+ self._logger.info("sent prepared '%s' with %s", name.decode(), param_values)
+
+ def send_prepare(
+ self,
+ name: bytes,
+ command: bytes,
+ param_types: Optional[Sequence[int]] = None,
+ ) -> None:
+ self._pgconn.send_prepare(name, command, param_types)
+ self._logger.info("prepare %s as '%s'", command.decode(), name.decode())
+
+ def get_result(self) -> Optional[pq.abc.PGresult]:
+ r = self._pgconn.get_result()
+ if r is not None:
+            self._logger.info("got %s results", pq.ExecStatus(r.status).name)
+ return r
+
+
+@contextmanager
+def prepare_pipeline_demo_pq(
+ pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
+) -> Iterator[Tuple[Deque[Command], Deque[str]]]:
+ """Set up pipeline demo with initial queries and yield commands and
+ results queue for pipeline_communicate().
+ """
+ logger.debug("enter pipeline")
+ pgconn.enter_pipeline_mode()
+
+ setup_queries = [
+ ("begin", "BEGIN TRANSACTION"),
+ ("drop table", "DROP TABLE IF EXISTS pq_pipeline_demo"),
+ (
+ "create table",
+ (
+ "CREATE UNLOGGED TABLE pq_pipeline_demo("
+ " id serial primary key,"
+ " itemno integer,"
+ " int8filler int8"
+ ")"
+ ),
+ ),
+ (
+ "prepare",
+            "INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2)",
+ ),
+ ]
+
+ commands = Deque[Command]()
+ results_queue = Deque[str]()
+
+ for qname, query in setup_queries:
+ if qname == "prepare":
+ pgconn.send_prepare(qname.encode(), query.encode())
+ else:
+ pgconn.send_query(query.encode())
+ results_queue.append(qname)
+
+ committed = False
+ synced = False
+
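+    # Queue all the INSERTs first, then a COMMIT, then a pipeline sync,
+    # mirroring the sequence of the libpq test.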
+ while True:
+ if rows_to_send:
+ params = [f"{rows_to_send}".encode(), f"{1 << 62}".encode()]
+ commands.append(partial(pgconn.send_query_prepared, b"prepare", params))
+ results_queue.append(f"row {rows_to_send}")
+ rows_to_send -= 1
+
+ elif not committed:
+ committed = True
+ commands.append(partial(pgconn.send_query, b"COMMIT"))
+ results_queue.append("commit")
+
+ elif not synced:
+
+ def sync() -> None:
+ pgconn.pipeline_sync()
+ logger.info("pipeline sync sent")
+
+ synced = True
+ commands.append(sync)
+ results_queue.append("sync")
+
+ else:
+ break
+
+ try:
+ yield commands, results_queue
+ finally:
+ logger.debug("exit pipeline")
+ pgconn.exit_pipeline_mode()
+
+
+def pipeline_demo_pq(rows_to_send: int, logger: logging.Logger) -> None:
+ pgconn = LoggingPGconn(Connection.connect().pgconn, logger)
+ with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+ commands,
+ results_queue,
+ ):
+ while results_queue:
+ fetched = waiting.wait(
+ pipeline_communicate(
+ pgconn, # type: ignore[arg-type]
+ commands,
+ ),
+ pgconn.socket,
+ )
+ assert not commands, commands
+ for results in fetched:
+ results_queue.popleft()
+ for r in results:
+ if r.status in (
+ pq.ExecStatus.FATAL_ERROR,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ ):
+ raise e.error_from_result(r)
+
+
+async def pipeline_demo_pq_async(rows_to_send: int, logger: logging.Logger) -> None:
+ pgconn = LoggingPGconn((await AsyncConnection.connect()).pgconn, logger)
+
+ with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+ commands,
+ results_queue,
+ ):
+ while results_queue:
+ fetched = await waiting.wait_async(
+ pipeline_communicate(
+ pgconn, # type: ignore[arg-type]
+ commands,
+ ),
+ pgconn.socket,
+ )
+ assert not commands, commands
+ for results in fetched:
+ results_queue.popleft()
+ for r in results:
+ if r.status in (
+ pq.ExecStatus.FATAL_ERROR,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ ):
+ raise e.error_from_result(r)
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "-n",
+ dest="nrows",
+ metavar="ROWS",
+ default=10_000,
+ type=int,
+ help="number of rows to insert",
+ )
+ parser.add_argument(
+ "--async", dest="async_", action="store_true", help="use async API"
+ )
+ parser.add_argument("-l", "--log", help="log file (stderr by default)")
+ args = parser.parse_args()
+ logger = logging.getLogger("psycopg")
+ logger.setLevel(logging.DEBUG)
+ pipeline_logger = logging.getLogger("pipeline")
+ pipeline_logger.setLevel(logging.DEBUG)
+ if args.log:
+ logger.addHandler(logging.FileHandler(args.log))
+ pipeline_logger.addHandler(logging.FileHandler(args.log))
+ else:
+ logger.addHandler(logging.StreamHandler())
+ pipeline_logger.addHandler(logging.StreamHandler())
+ if args.async_:
+ asyncio.run(pipeline_demo_pq_async(args.nrows, pipeline_logger))
+ else:
+ pipeline_demo_pq(args.nrows, pipeline_logger)
+
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+from collections import deque
+from functools import partial
+from typing import List
+
+import pytest
+
+from psycopg import waiting
+from psycopg import pq
+
+from .utils import check_libpq_version
+
+
+@pytest.fixture
+def pipeline(pgconn):
+ if check_libpq_version(pq.version(), ">= 14"):
+ pytest.skip("require libpq >= 14")
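+    # The generator interleaves sends and receives, which requires a
+    # non-blocking connection.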
+ nb, pgconn.nonblocking = pgconn.nonblocking, True
+ assert pgconn.nonblocking
+ pgconn.enter_pipeline_mode()
+ yield
+ if pgconn.pipeline_status:
+ pgconn.exit_pipeline_mode()
+ pgconn.nonblocking = nb
+
+
+def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses):
+ actual_statuses: List[pq.ExecStatus] = []
+ while len(actual_statuses) != len(expected_statuses):
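+        # While commands remain, pipeline_communicate() both sends them and
+        # fetches available results; afterwards, drain what is left with
+        # fetch_many().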
+ if commands:
+ gen = generators.pipeline_communicate(pgconn, commands)
+ results = waiting.wait(gen, pgconn.socket)
+ for (result,) in results:
+ actual_statuses.append(result.status)
+ else:
+ gen = generators.fetch_many(pgconn)
+ results = waiting.wait(gen, pgconn.socket)
+ for result in results:
+ actual_statuses.append(result.status)
+
+ assert actual_statuses == expected_statuses
+
+
+def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
+ commands = deque(
+ [
+ partial(pgconn.send_query, b"select 1"),
+ pgconn.pipeline_sync,
+ partial(pgconn.send_query, b"select 2"),
+ pgconn.pipeline_sync,
+ ]
+ )
+ expected_statuses = [
+ pq.ExecStatus.TUPLES_OK,
+ pq.ExecStatus.PIPELINE_SYNC,
+ pq.ExecStatus.TUPLES_OK,
+ pq.ExecStatus.PIPELINE_SYNC,
+ ]
+ _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+
+
+def test_pipeline_communicate_no_sync(pgconn, pipeline, generators):
+ numqueries = 10
+ commands = deque(
+ [partial(pgconn.send_query, b"select repeat('xyzxz', 12)")] * numqueries
+ + [pgconn.send_flush_request]
+ )
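+    # Without a sync point, a flush request is needed for the server to
+    # deliver the pending results.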
+ expected_statuses = [pq.ExecStatus.TUPLES_OK] * numqueries
+ _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+
+
+@pytest.fixture
+def pipeline_demo(pgconn):
+ if check_libpq_version(pq.version(), ">= 14"):
+ pytest.skip("require libpq >= 14")
+ assert pgconn.pipeline_status == 0
+ res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ res = pgconn.exec_(
+ b"CREATE UNLOGGED TABLE pg_pipeline(" b" id serial primary key, itemno integer)"
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ yield "pg_pipeline"
+ res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+
+def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators):
+ insert_sql = b"insert into pg_pipeline(itemno) values ($1)"
+ commands = deque(
+ [
+ partial(pgconn.send_query_params, insert_sql, [b"1"]),
+ partial(pgconn.send_query, b"select no_such_function(1)"),
+ partial(pgconn.send_query_params, insert_sql, [b"2"]),
+ pgconn.pipeline_sync,
+ partial(pgconn.send_query_params, insert_sql, [b"3"]),
+ pgconn.pipeline_sync,
+ ]
+ )
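+    # The failing query aborts the rest of its pipeline segment; the
+    # insert sent after the first sync point runs in a new segment.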
+ expected_statuses = [
+ pq.ExecStatus.COMMAND_OK,
+ pq.ExecStatus.FATAL_ERROR,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ pq.ExecStatus.PIPELINE_SYNC,
+ pq.ExecStatus.COMMAND_OK,
+ pq.ExecStatus.PIPELINE_SYNC,
+ ]
+ _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+ pgconn.exit_pipeline_mode()
+ res = pgconn.exec_(b"select itemno from pg_pipeline order by itemno")
+ assert res.ntuples == 1
+ assert res.get_value(0, 0) == b"3"
+
+
+@pytest.fixture
+def pipeline_uniqviol(pgconn):
+ if check_libpq_version(pq.version(), ">= 14"):
+ pytest.skip("require libpq >= 14")
+ assert pgconn.pipeline_status == 0
+ res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline_uniqviol")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ res = pgconn.exec_(
+ b"CREATE UNLOGGED TABLE pg_pipeline_uniqviol("
+ b" id bigint primary key, idata bigint)"
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ res = pgconn.exec_(b"BEGIN")
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ res = pgconn.prepare(
+ b"insertion",
+ b"insert into pg_pipeline_uniqviol values ($1, $2) returning id",
+ )
+ assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+ return "pg_pipeline_uniqviol"
+
+
+def test_pipeline_communicate_uniqviol(pgconn, pipeline_uniqviol, pipeline, generators):
+ commands = deque(
+ [
+ partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
+ partial(pgconn.send_query_prepared, b"insertion", [b"2", b"2"]),
+ partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
+ partial(pgconn.send_query_prepared, b"insertion", [b"3", b"2"]),
+ partial(pgconn.send_query_prepared, b"insertion", [b"4", b"2"]),
+ partial(pgconn.send_query, b"commit"),
+ ]
+ )
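+    # The unique violation on the third insert aborts everything that
+    # follows in the pipeline, including the commit.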
+ expected_statuses = [
+ pq.ExecStatus.TUPLES_OK,
+ pq.ExecStatus.TUPLES_OK,
+ pq.ExecStatus.FATAL_ERROR,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ pq.ExecStatus.PIPELINE_ABORTED,
+ ]
+ _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)