From 6ba95ce8f57462412e9c94162c555e3cbdb08cca Mon Sep 17 00:00:00 2001
From: Denis Laxalde
Date: Mon, 11 Oct 2021 11:35:06 +0200
Subject: [PATCH] Add pipeline_communicate() generator

This generator will be used to perform the send operations of the
execute() step when the connection is in pipeline mode. It can consume
results or send queries depending on the socket's read/write ready
state. Queries to be sent are kept in a queue of Callable[[], None]
items, built by partial application of pgconn.send_query() and similar
functions.

We add a couple of unit tests in test_generators.py, along with a test
script taken from the PostgreSQL sources. The script demonstrates the
use of pipeline mode with query-send and results-fetch steps
interleaved, without any sync point being emitted. (In this test, this
works because the output buffer gets full.) The script writes its
progress to logs; typically, we'd get:

    enter pipeline
    sent BEGIN TRANSACTION
    sent DROP TABLE IF EXISTS pq_pipeline_demo
    sent CREATE UNLOGGED TABLE pq_pipeline_demo( id serial primary key, itemno integer, int8filler int8)
    prepare INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2) as 'prepare'
    sent prepared 'prepare' with [b'10000', b'4611686018427387904']
    sent prepared 'prepare' with [b'9999', b'4611686018427387904']
    sent prepared 'prepare' with [b'9998', b'4611686018427387904']
    sent prepared 'prepare' with [b'9997', b'4611686018427387904']
    sent prepared 'prepare' with [b'9996', b'4611686018427387904']
    sent prepared 'prepare' with [b'9995', b'4611686018427387904']
    sent prepared 'prepare' with [b'9994', b'4611686018427387904']
    sent prepared 'prepare' with [b'9993', b'4611686018427387904']
    sent prepared 'prepare' with [b'9992', b'4611686018427387904']
    ...
    sent prepared 'prepare' with [b'9690', b'4611686018427387904']
    sent prepared 'prepare' with [b'9689', b'4611686018427387904']
    sent prepared 'prepare' with [b'9688', b'4611686018427387904']
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    ...
    got COMMAND_OK results
    got COMMAND_OK results
    sent prepared 'prepare' with [b'9687', b'4611686018427387904']
    sent prepared 'prepare' with [b'9686', b'4611686018427387904']
    sent prepared 'prepare' with [b'9685', b'4611686018427387904']
    sent prepared 'prepare' with [b'9684', b'4611686018427387904']
    sent prepared 'prepare' with [b'9683', b'4611686018427387904']
    ...
    sent prepared 'prepare' with [b'2', b'4611686018427387904']
    sent prepared 'prepare' with [b'1', b'4611686018427387904']
    sent COMMIT
    pipeline sync sent
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    ...
    got COMMAND_OK results
    got COMMAND_OK results
    got PIPELINE_SYNC results
    exit pipeline

We can see that commands are sent until the output buffer is full (the
connection is then read-ready only), then results are fetched until
more commands can be sent, and the cycle repeats.
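As an illustration, here is a minimal, hypothetical driver sketch (not
part of this patch); it assumes a libpq >= 14 client and uses only
names introduced by the diff below:

    from collections import deque
    from functools import partial

    from psycopg import Connection, pq, waiting
    from psycopg.generators import pipeline_communicate

    pgconn = Connection.connect().pgconn
    pgconn.nonblocking = True
    pgconn.enter_pipeline_mode()

    # Commands are zero-argument callables, built by partial application
    # of pgconn.send_*() functions; pipeline_sync already is one.
    commands = deque(
        [
            partial(pgconn.send_query, b"select 1"),
            pgconn.pipeline_sync,
        ]
    )

    # The generator returns once every command has been sent; results
    # fetched along the way are returned, a PIPELINE_SYNC as a
    # single-element list.  Depending on buffering, remaining results
    # may still need fetching afterwards (e.g. with fetch_many()).
    for results in waiting.wait(pipeline_communicate(pgconn, commands), pgconn.socket):
        for r in results:
            print(pq.ExecStatus(r.status).name)

    pgconn.exit_pipeline_mode()

Deferring the send_*() calls behind callables lets the generator apply
back-pressure: a command is popped and invoked only when the socket is
write-ready and the output buffer could be flushed.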
---
 psycopg/psycopg/abc.py                      |   1 +
 psycopg/psycopg/generators.py               |  48 +++-
 psycopg_c/psycopg_c/_psycopg.pyi            |   5 +
 psycopg_c/psycopg_c/_psycopg/generators.pyx |  72 +++++-
 tests/fix_psycopg.py                        |  15 ++
 tests/scripts/pipeline-demo.py              | 238 ++++++++++++++++++++
 tests/test_generators.py                    | 155 +++++++++++++
 7 files changed, 532 insertions(+), 2 deletions(-)
 create mode 100644 tests/scripts/pipeline-demo.py
 create mode 100644 tests/test_generators.py

diff --git a/psycopg/psycopg/abc.py b/psycopg/psycopg/abc.py
index 227287a05..42a2614b4 100644
--- a/psycopg/psycopg/abc.py
+++ b/psycopg/psycopg/abc.py
@@ -26,6 +26,7 @@ Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
 Query: TypeAlias = Union[str, bytes, "Composable"]
 Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
 ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
+Command = Callable[[], None]
 
 # TODO: make it recursive when mypy will support it
 # DumperKey: TypeAlias = Union[type, Tuple[Union[type, "DumperKey"]]]
diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py
index 5f8f967fb..76583bbf3 100644
--- a/psycopg/psycopg/generators.py
+++ b/psycopg/psycopg/generators.py
@@ -21,9 +21,10 @@ from typing import List, Optional, Union
 from . import pq
 from . import errors as e
 from .pq import ConnStatus, PollingStatus, ExecStatus
-from .abc import PQGen, PQGenConn
+from .abc import Command, PQGen, PQGenConn
 from .pq.abc import PGconn, PGresult
 from .waiting import Wait, Ready
+from ._compat import Deque
 from ._encodings import pgconn_encoding, conninfo_encoding
 
 logger = logging.getLogger(__name__)
@@ -153,6 +154,51 @@ def fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
     return pgconn.get_result()
 
 
+def pipeline_communicate(
+    pgconn: PGconn, commands: Deque[Command]
+) -> PQGen[List[List[PGresult]]]:
+    """Generator to send queries from a connection in pipeline mode while also
+    receiving results.
+
+    Return a list of results, including single PIPELINE_SYNC elements.
+    """
+    results = []
+
+    while True:
+        ready = yield Wait.RW
+
+        if ready & Ready.R:
+            pgconn.consume_input()
+            while True:
+                n = pgconn.notifies()
+                if not n:
+                    break
+                if pgconn.notify_handler:
+                    pgconn.notify_handler(n)
+
+            res: List[PGresult] = []
+            while not pgconn.is_busy():
+                r = pgconn.get_result()
+                if r is None:
+                    if not res:
+                        break
+                    results.append(res)
+                    res = []
+                elif r.status == pq.ExecStatus.PIPELINE_SYNC:
+                    assert not res
+                    results.append([r])
+                else:
+                    res.append(r)
+
+        if ready & Ready.W:
+            pgconn.flush()
+            if not commands:
+                break
+            commands.popleft()()
+
+    return results
+
+
 _copy_statuses = (
     ExecStatus.COPY_IN,
     ExecStatus.COPY_OUT,
diff --git a/psycopg_c/psycopg_c/_psycopg.pyi b/psycopg_c/psycopg_c/_psycopg.pyi
index b992cb5e8..f415fda5e 100644
--- a/psycopg_c/psycopg_c/_psycopg.pyi
+++ b/psycopg_c/psycopg_c/_psycopg.pyi
@@ -12,9 +12,11 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple
 from psycopg import pq
 from psycopg import abc
 from psycopg.rows import Row, RowMaker
+from psycopg.abc import Command
 from psycopg.adapt import AdaptersMap, PyFormat
 from psycopg.pq.abc import PGconn, PGresult
 from psycopg.connection import BaseConnection
+from psycopg._compat import Deque
 
 class Transformer(abc.AdaptContext):
     types: Optional[Tuple[int, ...]]
@@ -50,6 +52,9 @@ def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
 def send(pgconn: PGconn) -> abc.PQGen[None]: ...
 def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
 def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
+def pipeline_communicate(
+    pgconn: PGconn, commands: Deque[Command]
+) -> abc.PQGen[List[List[PGresult]]]: ...
 
 # Copy support
 def format_row_text(
diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx
index 951785e74..239494974 100644
--- a/psycopg_c/psycopg_c/_psycopg/generators.pyx
+++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx
@@ -11,8 +11,9 @@ from typing import List
 
 from psycopg import errors as e
 from psycopg.pq import abc, error_message
-from psycopg.abc import PQGen
+from psycopg.abc import Command, PQGen
 from psycopg.waiting import Wait, Ready
+from psycopg._compat import Deque
 from psycopg._encodings import conninfo_encoding
 
 cdef object WAIT_W = Wait.W
@@ -187,3 +188,72 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]:
     if pgres is NULL:
         return None
     return pq.PGresult._from_ptr(pgres)
+
+
+def pipeline_communicate(pq.PGconn pgconn, commands: Deque[Command]) -> PQGen[List[List[PGresult]]]:
+    """Generator to send queries from a connection in pipeline mode while also
+    receiving results.
+
+    Return a list of results, including single PIPELINE_SYNC elements.
+    """
+    cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
+    cdef object notify_handler = pgconn.notify_handler
+    cdef libpq.PGnotify *notify
+    cdef int cires
+    cdef libpq.PGresult *pgres
+    cdef list res = []
+    cdef list results = []
+    cdef pq.PGresult r
+
+    while True:
+        ready = yield WAIT_RW
+
+        if ready & Ready.R:
+            pgconn.consume_input()
+            with nogil:
+                cires = libpq.PQconsumeInput(pgconn_ptr)
+            if 1 != cires:
+                raise e.OperationalError(
+                    f"consuming input failed: {error_message(pgconn)}")
+
+            if notify_handler is not None:
+                while True:
+                    pynotify = pgconn.notifies()
+                    if pynotify is None:
+                        break
+                    PyObject_CallFunctionObjArgs(
+                        notify_handler, <PyObject *>pynotify, NULL
+                    )
+            else:
+                while True:
+                    notify = libpq.PQnotifies(pgconn_ptr)
+                    if notify is NULL:
+                        break
+                    libpq.PQfreemem(notify)
+
+            res = []
+            while not libpq.PQisBusy(pgconn_ptr):
+                pgres = libpq.PQgetResult(pgconn_ptr)
+                if pgres is NULL:
+                    if not res:
+                        break
+                    results.append(res)
+                    res = []
+                else:
+                    status = libpq.PQresultStatus(pgres)
+                    r = pq.PGresult._from_ptr(pgres)
+                    if status == libpq.PGRES_PIPELINE_SYNC:
+                        assert not res
+                        results.append([r])
+                        break
+                    else:
+                        res.append(r)
+
+
+        if ready & Ready.W:
+            libpq.PQflush(pgconn_ptr)
+            if not commands:
+                break
+            commands.popleft()()
+
+    return results
diff --git a/tests/fix_psycopg.py b/tests/fix_psycopg.py
index dc703e5df..8a4671a9c 100644
--- a/tests/fix_psycopg.py
+++ b/tests/fix_psycopg.py
@@ -75,3 +75,18 @@ class Tpc:
         """Return the number of records in the test table."""
         cur = self.conn.execute("select count(*) from test_tpc")
         return cur.fetchone()[0]
+
+
+@pytest.fixture(scope="module")
+def generators():
+    """Return the 'generators' module for selected psycopg implementation."""
+    from psycopg import pq
+
+    if pq.__impl__ == "c":
+        from psycopg._cmodule import _psycopg
+
+        return _psycopg
+    else:
+        import psycopg.generators
+
+        return psycopg.generators
diff --git a/tests/scripts/pipeline-demo.py b/tests/scripts/pipeline-demo.py
new file mode 100644
index 000000000..b8068476b
--- /dev/null
+++ b/tests/scripts/pipeline-demo.py
@@ -0,0 +1,238 @@
+"""Pipeline mode demo
+
+This reproduces libpq_pipeline::pipelined_insert PostgreSQL test at
+src/test/modules/libpq_pipeline/libpq_pipeline.c::test_pipelined_insert().
+
+We do not fetch results explicitly (using cursor.fetch*()); this is
+handled by execute() calls when the pgconn socket is read-ready, which
+happens when the output buffer is full.
+"""
+import argparse
+import asyncio
+import logging
+from contextlib import contextmanager
+from functools import partial
+from typing import Any, Iterator, Optional, Sequence, Tuple
+
+from psycopg import AsyncConnection, Connection
+from psycopg import pq, waiting
+from psycopg import errors as e
+from psycopg.abc import Command
+from psycopg.generators import pipeline_communicate
+from psycopg.pq._enums import Format
+from psycopg._compat import Deque
+
+
+class LoggingPGconn:
+    """Wrapper for PGconn that logs sent queries and fetched results."""
+
+    def __init__(self, pgconn: pq.abc.PGconn, logger: logging.Logger):
+        self._pgconn = pgconn
+        self._logger = logger
+
+    def __getattr__(self, name: str) -> Any:
+        return getattr(self._pgconn, name)
+
+    def send_query(self, command: bytes) -> None:
+        self._pgconn.send_query(command)
+        self._logger.info("sent %s", command.decode())
+
+    def send_query_params(
+        self,
+        command: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_types: Optional[Sequence[int]] = None,
+        param_formats: Optional[Sequence[int]] = None,
+        result_format: int = Format.TEXT,
+    ) -> None:
+        self._pgconn.send_query_params(
+            command, param_values, param_types, param_formats, result_format
+        )
+        self._logger.info("sent %s", command.decode())
+
+    def send_query_prepared(
+        self,
+        name: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_formats: Optional[Sequence[int]] = None,
+        result_format: int = Format.TEXT,
+    ) -> None:
+        self._pgconn.send_query_prepared(
+            name, param_values, param_formats, result_format
+        )
+        self._logger.info("sent prepared '%s' with %s", name.decode(), param_values)
+
+    def send_prepare(
+        self,
+        name: bytes,
+        command: bytes,
+        param_types: Optional[Sequence[int]] = None,
+    ) -> None:
+        self._pgconn.send_prepare(name, command, param_types)
+        self._logger.info("prepare %s as '%s'", command.decode(), name.decode())
+
+    def get_result(self) -> Optional[pq.abc.PGresult]:
+        r = self._pgconn.get_result()
+        if r is not None:
+            self._logger.info("got %s result", pq.ExecStatus(r.status).name)
+        return r
+
+
+@contextmanager
+def prepare_pipeline_demo_pq(
+    pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
+) -> Iterator[Tuple[Deque[Command], Deque[str]]]:
+    """Set up pipeline demo with initial queries and yield commands and
+    results queue for pipeline_communicate().
+ """ + logger.debug("enter pipeline") + pgconn.enter_pipeline_mode() + + setup_queries = [ + ("begin", "BEGIN TRANSACTION"), + ("drop table", "DROP TABLE IF EXISTS pq_pipeline_demo"), + ( + "create table", + ( + "CREATE UNLOGGED TABLE pq_pipeline_demo(" + " id serial primary key," + " itemno integer," + " int8filler int8" + ")" + ), + ), + ( + "prepare", + ("INSERT INTO pq_pipeline_demo(itemno, int8filler)" " VALUES ($1, $2)"), + ), + ] + + commands = Deque[Command]() + results_queue = Deque[str]() + + for qname, query in setup_queries: + if qname == "prepare": + pgconn.send_prepare(qname.encode(), query.encode()) + else: + pgconn.send_query(query.encode()) + results_queue.append(qname) + + committed = False + synced = False + + while True: + if rows_to_send: + params = [f"{rows_to_send}".encode(), f"{1 << 62}".encode()] + commands.append(partial(pgconn.send_query_prepared, b"prepare", params)) + results_queue.append(f"row {rows_to_send}") + rows_to_send -= 1 + + elif not committed: + committed = True + commands.append(partial(pgconn.send_query, b"COMMIT")) + results_queue.append("commit") + + elif not synced: + + def sync() -> None: + pgconn.pipeline_sync() + logger.info("pipeline sync sent") + + synced = True + commands.append(sync) + results_queue.append("sync") + + else: + break + + try: + yield commands, results_queue + finally: + logger.debug("exit pipeline") + pgconn.exit_pipeline_mode() + + +def pipeline_demo_pq(rows_to_send: int, logger: logging.Logger) -> None: + pgconn = LoggingPGconn(Connection.connect().pgconn, logger) + with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as ( + commands, + results_queue, + ): + while results_queue: + fetched = waiting.wait( + pipeline_communicate( + pgconn, # type: ignore[arg-type] + commands, + ), + pgconn.socket, + ) + assert not commands, commands + for results in fetched: + results_queue.popleft() + for r in results: + if r.status in ( + pq.ExecStatus.FATAL_ERROR, + pq.ExecStatus.PIPELINE_ABORTED, + ): + raise e.error_from_result(r) + + +async def pipeline_demo_pq_async(rows_to_send: int, logger: logging.Logger) -> None: + pgconn = LoggingPGconn((await AsyncConnection.connect()).pgconn, logger) + + with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as ( + commands, + results_queue, + ): + while results_queue: + fetched = await waiting.wait_async( + pipeline_communicate( + pgconn, # type: ignore[arg-type] + commands, + ), + pgconn.socket, + ) + assert not commands, commands + for results in fetched: + results_queue.popleft() + for r in results: + if r.status in ( + pq.ExecStatus.FATAL_ERROR, + pq.ExecStatus.PIPELINE_ABORTED, + ): + raise e.error_from_result(r) + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument( + "-n", + dest="nrows", + metavar="ROWS", + default=10_000, + type=int, + help="number of rows to insert", + ) + parser.add_argument( + "--async", dest="async_", action="store_true", help="use async API" + ) + parser.add_argument("-l", "--log", help="log file (stderr by default)") + args = parser.parse_args() + logger = logging.getLogger("psycopg") + logger.setLevel(logging.DEBUG) + pipeline_logger = logging.getLogger("pipeline") + pipeline_logger.setLevel(logging.DEBUG) + if args.log: + logger.addHandler(logging.FileHandler(args.log)) + pipeline_logger.addHandler(logging.FileHandler(args.log)) + else: + logger.addHandler(logging.StreamHandler()) + pipeline_logger.addHandler(logging.StreamHandler()) + if args.async_: + asyncio.run(pipeline_demo_pq_async(args.nrows, pipeline_logger)) 
+    else:
+        pipeline_demo_pq(args.nrows, pipeline_logger)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_generators.py b/tests/test_generators.py
new file mode 100644
index 000000000..c836b428c
--- /dev/null
+++ b/tests/test_generators.py
@@ -0,0 +1,155 @@
+from collections import deque
+from functools import partial
+from typing import List
+
+import pytest
+
+from psycopg import waiting
+from psycopg import pq
+
+from .utils import check_libpq_version
+
+
+@pytest.fixture
+def pipeline(pgconn):
+    if check_libpq_version(pq.version(), ">= 14"):
+        pytest.skip("require libpq >= 14")
+    nb, pgconn.nonblocking = pgconn.nonblocking, True
+    assert pgconn.nonblocking
+    pgconn.enter_pipeline_mode()
+    yield
+    if pgconn.pipeline_status:
+        pgconn.exit_pipeline_mode()
+    pgconn.nonblocking = nb
+
+
+def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses):
+    actual_statuses: List[pq.ExecStatus] = []
+    while len(actual_statuses) != len(expected_statuses):
+        if commands:
+            gen = generators.pipeline_communicate(pgconn, commands)
+            results = waiting.wait(gen, pgconn.socket)
+            for (result,) in results:
+                actual_statuses.append(result.status)
+        else:
+            gen = generators.fetch_many(pgconn)
+            results = waiting.wait(gen, pgconn.socket)
+            for result in results:
+                actual_statuses.append(result.status)
+
+    assert actual_statuses == expected_statuses
+
+
+def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
+    commands = deque(
+        [
+            partial(pgconn.send_query, b"select 1"),
+            pgconn.pipeline_sync,
+            partial(pgconn.send_query, b"select 2"),
+            pgconn.pipeline_sync,
+        ]
+    )
+    expected_statuses = [
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.PIPELINE_SYNC,
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.PIPELINE_SYNC,
+    ]
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+
+
+def test_pipeline_communicate_no_sync(pgconn, pipeline, generators):
+    numqueries = 10
+    commands = deque(
+        [partial(pgconn.send_query, b"select repeat('xyzxz', 12)")] * numqueries
+        + [pgconn.send_flush_request]
+    )
+    expected_statuses = [pq.ExecStatus.TUPLES_OK] * numqueries
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+
+
+@pytest.fixture
+def pipeline_demo(pgconn):
+    if check_libpq_version(pq.version(), ">= 14"):
+        pytest.skip("require libpq >= 14")
+    assert pgconn.pipeline_status == 0
+    res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.exec_(
+        b"CREATE UNLOGGED TABLE pg_pipeline(" b" id serial primary key, itemno integer)"
+    )
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    yield "pg_pipeline"
+    res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+
+def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators):
+    insert_sql = b"insert into pg_pipeline(itemno) values ($1)"
+    commands = deque(
+        [
+            partial(pgconn.send_query_params, insert_sql, [b"1"]),
+            partial(pgconn.send_query, b"select no_such_function(1)"),
+            partial(pgconn.send_query_params, insert_sql, [b"2"]),
+            pgconn.pipeline_sync,
+            partial(pgconn.send_query_params, insert_sql, [b"3"]),
+            pgconn.pipeline_sync,
+        ]
+    )
+    expected_statuses = [
+        pq.ExecStatus.COMMAND_OK,
+        pq.ExecStatus.FATAL_ERROR,
+        pq.ExecStatus.PIPELINE_ABORTED,
+        pq.ExecStatus.PIPELINE_SYNC,
+        pq.ExecStatus.COMMAND_OK,
+        pq.ExecStatus.PIPELINE_SYNC,
+    ]
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+    pgconn.exit_pipeline_mode()
+    res = pgconn.exec_(b"select itemno from pg_pipeline order by itemno")
+    assert res.ntuples == 1
+    assert res.get_value(0, 0) == b"3"
+
+
+@pytest.fixture
+def pipeline_uniqviol(pgconn):
+    if check_libpq_version(pq.version(), ">= 14"):
+        pytest.skip("require libpq >= 14")
+    assert pgconn.pipeline_status == 0
+    res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline_uniqviol")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.exec_(
+        b"CREATE UNLOGGED TABLE pg_pipeline_uniqviol("
+        b" id bigint primary key, idata bigint)"
+    )
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.exec_(b"BEGIN")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.prepare(
+        b"insertion",
+        b"insert into pg_pipeline_uniqviol values ($1, $2) returning id",
+    )
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    return "pg_pipeline_uniqviol"
+
+
+def test_pipeline_communicate_uniqviol(pgconn, pipeline_uniqviol, pipeline, generators):
+    commands = deque(
+        [
+            partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"2", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"3", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"4", b"2"]),
+            partial(pgconn.send_query, b"commit"),
+        ]
+    )
+    expected_statuses = [
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.FATAL_ERROR,
+        pq.ExecStatus.PIPELINE_ABORTED,
+        pq.ExecStatus.PIPELINE_ABORTED,
+        pq.ExecStatus.PIPELINE_ABORTED,
+    ]
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
-- 
2.47.2