]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add pipeline_communicate() generator
authorDenis Laxalde <denis.laxalde@dalibo.com>
Mon, 11 Oct 2021 09:35:06 +0000 (11:35 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:17:57 +0000 (01:17 +0200)
This generator will be used as the send operations during execute()
step when the connection is in pipeline mode. It can consume results or
send queries depending on socket read/write ready state. Queries to be
sent are in a queue of Callable[[], None] which are built from partial
application of pgconn.send_query() and similar functions.

We add a couple of unit tests, in test_generators.py along with a test
script is taken from PostgreSQL sources. It demonstrates the use of
pipeline mode where query-send and results-fetch steps are interleaved
without any sync point emitted. (In this test, this works because the
output buffer gets full.) The test writes data to logs. Typically, we'd
get:

    enter pipeline
    sent BEGIN TRANSACTION
    sent DROP TABLE IF EXISTS pq_pipeline_demo
    sent CREATE UNLOGGED TABLE pq_pipeline_demo( id serial primary key, itemno integer, int8filler int8)
    prepare INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2) as 'prepare'
    sent prepared 'prepare' with [b'10000', b'4611686018427387904']
    sent prepared 'prepare' with [b'9999', b'4611686018427387904']
    sent prepared 'prepare' with [b'9998', b'4611686018427387904']
    sent prepared 'prepare' with [b'9997', b'4611686018427387904']
    sent prepared 'prepare' with [b'9996', b'4611686018427387904']
    sent prepared 'prepare' with [b'9995', b'4611686018427387904']
    sent prepared 'prepare' with [b'9994', b'4611686018427387904']
    sent prepared 'prepare' with [b'9993', b'4611686018427387904']
    sent prepared 'prepare' with [b'9992', b'4611686018427387904']
    ...
    sent prepared 'prepare' with [b'9690', b'4611686018427387904']
    sent prepared 'prepare' with [b'9689', b'4611686018427387904']
    sent prepared 'prepare' with [b'9688', b'4611686018427387904']
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    ...
    got COMMAND_OK results
    got COMMAND_OK results
    sent prepared 'prepare' with [b'9687', b'4611686018427387904']
    sent prepared 'prepare' with [b'9686', b'4611686018427387904']
    sent prepared 'prepare' with [b'9685', b'4611686018427387904']
    sent prepared 'prepare' with [b'9684', b'4611686018427387904']
    sent prepared 'prepare' with [b'9683', b'4611686018427387904']
    ...
    sent prepared 'prepare' with [b'2', b'4611686018427387904']
    sent prepared 'prepare' with [b'1', b'4611686018427387904']
    sent COMMIT
    pipeline sync sent
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    got COMMAND_OK results
    ...
    got COMMAND_OK results
    got COMMAND_OK results
    got PIPELINE_SYNC results
    exit pipeline

We can see that commands are sent, until the output buffer is full (the
connection is then Read-ready only), then results are fetched, until
more commands can be sent, and the cycle repeat.

psycopg/psycopg/abc.py
psycopg/psycopg/generators.py
psycopg_c/psycopg_c/_psycopg.pyi
psycopg_c/psycopg_c/_psycopg/generators.pyx
tests/fix_psycopg.py
tests/scripts/pipeline-demo.py [new file with mode: 0644]
tests/test_generators.py [new file with mode: 0644]

index 227287a0583709ceffd3e49fe388d370080b802b..42a2614b4293cc95fd81fa53befbd849e510b07e 100644 (file)
@@ -26,6 +26,7 @@ Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
 Query: TypeAlias = Union[str, bytes, "Composable"]
 Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
 ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
+Command = Callable[[], None]
 
 # TODO: make it recursive when mypy will support it
 # DumperKey: TypeAlias = Union[type, Tuple[Union[type, "DumperKey"]]]
index 5f8f967fb3bc4e36607775b8430a78de5b788c2b..76583bbf367d9e225f288b790a0a6217d59b1d7c 100644 (file)
@@ -21,9 +21,10 @@ from typing import List, Optional, Union
 from . import pq
 from . import errors as e
 from .pq import ConnStatus, PollingStatus, ExecStatus
-from .abc import PQGen, PQGenConn
+from .abc import Command, PQGen, PQGenConn
 from .pq.abc import PGconn, PGresult
 from .waiting import Wait, Ready
+from ._compat import Deque
 from ._encodings import pgconn_encoding, conninfo_encoding
 
 logger = logging.getLogger(__name__)
@@ -153,6 +154,51 @@ def fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
     return pgconn.get_result()
 
 
+def pipeline_communicate(
+    pgconn: PGconn, commands: Deque[Command]
+) -> PQGen[List[List[PGresult]]]:
+    """Generator to send queries from a connection in pipeline mode while also
+    receiving results.
+
+    Return a list results, including single PIPELINE_SYNC elements.
+    """
+    results = []
+
+    while True:
+        ready = yield Wait.RW
+
+        if ready & Ready.R:
+            pgconn.consume_input()
+            while True:
+                n = pgconn.notifies()
+                if not n:
+                    break
+                if pgconn.notify_handler:
+                    pgconn.notify_handler(n)
+
+            res: List[PGresult] = []
+            while not pgconn.is_busy():
+                r = pgconn.get_result()
+                if r is None:
+                    if not res:
+                        break
+                    results.append(res)
+                    res = []
+                elif r.status == pq.ExecStatus.PIPELINE_SYNC:
+                    assert not res
+                    results.append([r])
+                else:
+                    res.append(r)
+
+        if ready & Ready.W:
+            pgconn.flush()
+            if not commands:
+                break
+            commands.popleft()()
+
+    return results
+
+
 _copy_statuses = (
     ExecStatus.COPY_IN,
     ExecStatus.COPY_OUT,
index b992cb5e8a2bc1e8b96729f2d48ae596cf8923e4..f415fda5e6831cd7b211a15dd125360ee69447b0 100644 (file)
@@ -12,9 +12,11 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple
 from psycopg import pq
 from psycopg import abc
 from psycopg.rows import Row, RowMaker
+from psycopg.abc import Command
 from psycopg.adapt import AdaptersMap, PyFormat
 from psycopg.pq.abc import PGconn, PGresult
 from psycopg.connection import BaseConnection
+from psycopg._compat import Deque
 
 class Transformer(abc.AdaptContext):
     types: Optional[Tuple[int, ...]]
@@ -50,6 +52,9 @@ def execute(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
 def send(pgconn: PGconn) -> abc.PQGen[None]: ...
 def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
 def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
+def pipeline_communicate(
+    pgconn: PGconn, commands: Deque[Command]
+) -> abc.PQGen[List[List[PGresult]]]: ...
 
 # Copy support
 def format_row_text(
index 951785e741da747b58cefd4aefb77fdd92f601be..239494974ddf74d8fd74ce2091a9f8b713bbec16 100644 (file)
@@ -11,8 +11,9 @@ from typing import List
 
 from psycopg import errors as e
 from psycopg.pq import abc, error_message
-from psycopg.abc import PQGen
+from psycopg.abc import Command, PQGen
 from psycopg.waiting import Wait, Ready
+from psycopg._compat import Deque
 from psycopg._encodings import conninfo_encoding
 
 cdef object WAIT_W = Wait.W
@@ -187,3 +188,72 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]:
     if pgres is NULL:
         return None
     return pq.PGresult._from_ptr(pgres)
+
+
+def pipeline_communicate(pq.PGconn pgconn, commands: Deque[Command]) -> PQGen[List[List[PGresult]]]:
+    """Generator to send queries from a connection in pipeline mode while also
+    receiving results.
+
+    Return a list results, including single PIPELINE_SYNC elements.
+    """
+    cdef libpq.PGconn *pgconn_ptr = pgconn._pgconn_ptr
+    cdef object notify_handler = pgconn.notify_handler
+    cdef libpq.PGnotify *notify
+    cdef int cires
+    cdef libpq.PGresult *pgres
+    cdef list res = []
+    cdef list results = []
+    cdef pq.PGresult r
+
+    while True:
+        ready = yield WAIT_RW
+
+        if ready & Ready.R:
+            pgconn.consume_input()
+            with nogil:
+                cires = libpq.PQconsumeInput(pgconn_ptr)
+            if 1 != cires:
+                raise e.OperationalError(
+                    f"consuming input failed: {error_message(pgconn)}")
+
+            if notify_handler is not None:
+                while True:
+                    pynotify = pgconn.notifies()
+                    if pynotify is None:
+                        break
+                    PyObject_CallFunctionObjArgs(
+                        notify_handler, <PyObject *>pynotify, NULL
+                    )
+            else:
+                while True:
+                    notify = libpq.PQnotifies(pgconn_ptr)
+                    if notify is NULL:
+                        break
+                    libpq.PQfreemem(notify)
+
+            res: List[PGresult] = []
+            while not libpq.PQisBusy(pgconn_ptr):
+                pgres = libpq.PQgetResult(pgconn_ptr)
+                if pgres is NULL:
+                    if not res:
+                        break
+                    results.append(res)
+                    res = []
+                else:
+                    status = libpq.PQresultStatus(pgres)
+                    r = pq.PGresult._from_ptr(pgres)
+                    if status == libpq.PGRES_PIPELINE_SYNC:
+                        assert not res
+                        results.append([r])
+                        break
+                    else:
+                        res.append(r)
+
+
+        if ready & Ready.W:
+            libpq.PQflush(pgconn_ptr)
+            if not commands:
+                break
+            commands.popleft()()
+
+    return results
index dc703e5df1f5a0bdbddacde59f3e1eb2d22249af..8a4671a9c973d152d84f3efc5aa1cb39956e13d9 100644 (file)
@@ -75,3 +75,18 @@ class Tpc:
         """Return the number of records in the test table."""
         cur = self.conn.execute("select count(*) from test_tpc")
         return cur.fetchone()[0]
+
+
+@pytest.fixture(scope="module")
+def generators():
+    """Return the 'generators' module for selected psycopg implementation."""
+    from psycopg import pq
+
+    if pq.__impl__ == "c":
+        from psycopg._cmodule import _psycopg
+
+        return _psycopg
+    else:
+        import psycopg.generators
+
+        return psycopg.generators
diff --git a/tests/scripts/pipeline-demo.py b/tests/scripts/pipeline-demo.py
new file mode 100644 (file)
index 0000000..b806847
--- /dev/null
@@ -0,0 +1,238 @@
+"""Pipeline mode demo
+
+This reproduces libpq_pipeline::pipelined_insert PostgreSQL test at
+src/test/modules/libpq_pipeline/libpq_pipeline.c::test_pipelined_insert().
+
+We do not fetch results explicitly (using cursor.fetch*()), this is
+handled by execute() calls when pgconn socket is read-ready, which
+happens when the output buffer is full.
+"""
+import argparse
+import asyncio
+import logging
+from contextlib import contextmanager
+from functools import partial
+from typing import Any, Iterator, Optional, Sequence, Tuple
+
+from psycopg import AsyncConnection, Connection
+from psycopg import pq, waiting
+from psycopg import errors as e
+from psycopg.abc import Command
+from psycopg.generators import pipeline_communicate
+from psycopg.pq._enums import Format
+from psycopg._compat import Deque
+
+
+class LoggingPGconn:
+    """Wrapper for PGconn that logs fetched results."""
+
+    def __init__(self, pgconn: pq.abc.PGconn, logger: logging.Logger):
+        self._pgconn = pgconn
+        self._logger = logger
+
+    def __getattr__(self, name: str) -> Any:
+        return getattr(self._pgconn, name)
+
+    def send_query(self, command: bytes) -> None:
+        self._pgconn.send_query(command)
+        self._logger.info("sent %s", command.decode())
+
+    def send_query_params(
+        self,
+        command: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_types: Optional[Sequence[int]] = None,
+        param_formats: Optional[Sequence[int]] = None,
+        result_format: int = Format.TEXT,
+    ) -> None:
+        self._pgconn.send_query_params(
+            command, param_values, param_types, param_formats, result_format
+        )
+        self._logger.info("sent %s", command.decode())
+
+    def send_query_prepared(
+        self,
+        name: bytes,
+        param_values: Optional[Sequence[Optional[bytes]]],
+        param_formats: Optional[Sequence[int]] = None,
+        result_format: int = Format.TEXT,
+    ) -> None:
+        self._pgconn.send_query_prepared(
+            name, param_values, param_formats, result_format
+        )
+        self._logger.info("sent prepared '%s' with %s", name.decode(), param_values)
+
+    def send_prepare(
+        self,
+        name: bytes,
+        command: bytes,
+        param_types: Optional[Sequence[int]] = None,
+    ) -> None:
+        self._pgconn.send_prepare(name, command, param_types)
+        self._logger.info("prepare %s as '%s'", command.decode(), name.decode())
+
+    def get_result(self) -> Optional[pq.abc.PGresult]:
+        r = self._pgconn.get_result()
+        if r is not None:
+            self._logger.info("got %s result", pq.ExecStatus(r.status).name)
+        return r
+
+
+@contextmanager
+def prepare_pipeline_demo_pq(
+    pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
+) -> Iterator[Tuple[Deque[Command], Deque[str]]]:
+    """Set up pipeline demo with initial queries and yield commands and
+    results queue for pipeline_communicate().
+    """
+    logger.debug("enter pipeline")
+    pgconn.enter_pipeline_mode()
+
+    setup_queries = [
+        ("begin", "BEGIN TRANSACTION"),
+        ("drop table", "DROP TABLE IF EXISTS pq_pipeline_demo"),
+        (
+            "create table",
+            (
+                "CREATE UNLOGGED TABLE pq_pipeline_demo("
+                " id serial primary key,"
+                " itemno integer,"
+                " int8filler int8"
+                ")"
+            ),
+        ),
+        (
+            "prepare",
+            ("INSERT INTO pq_pipeline_demo(itemno, int8filler)" " VALUES ($1, $2)"),
+        ),
+    ]
+
+    commands = Deque[Command]()
+    results_queue = Deque[str]()
+
+    for qname, query in setup_queries:
+        if qname == "prepare":
+            pgconn.send_prepare(qname.encode(), query.encode())
+        else:
+            pgconn.send_query(query.encode())
+        results_queue.append(qname)
+
+    committed = False
+    synced = False
+
+    while True:
+        if rows_to_send:
+            params = [f"{rows_to_send}".encode(), f"{1 << 62}".encode()]
+            commands.append(partial(pgconn.send_query_prepared, b"prepare", params))
+            results_queue.append(f"row {rows_to_send}")
+            rows_to_send -= 1
+
+        elif not committed:
+            committed = True
+            commands.append(partial(pgconn.send_query, b"COMMIT"))
+            results_queue.append("commit")
+
+        elif not synced:
+
+            def sync() -> None:
+                pgconn.pipeline_sync()
+                logger.info("pipeline sync sent")
+
+            synced = True
+            commands.append(sync)
+            results_queue.append("sync")
+
+        else:
+            break
+
+    try:
+        yield commands, results_queue
+    finally:
+        logger.debug("exit pipeline")
+        pgconn.exit_pipeline_mode()
+
+
+def pipeline_demo_pq(rows_to_send: int, logger: logging.Logger) -> None:
+    pgconn = LoggingPGconn(Connection.connect().pgconn, logger)
+    with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+        commands,
+        results_queue,
+    ):
+        while results_queue:
+            fetched = waiting.wait(
+                pipeline_communicate(
+                    pgconn,  # type: ignore[arg-type]
+                    commands,
+                ),
+                pgconn.socket,
+            )
+            assert not commands, commands
+            for results in fetched:
+                results_queue.popleft()
+                for r in results:
+                    if r.status in (
+                        pq.ExecStatus.FATAL_ERROR,
+                        pq.ExecStatus.PIPELINE_ABORTED,
+                    ):
+                        raise e.error_from_result(r)
+
+
+async def pipeline_demo_pq_async(rows_to_send: int, logger: logging.Logger) -> None:
+    pgconn = LoggingPGconn((await AsyncConnection.connect()).pgconn, logger)
+
+    with prepare_pipeline_demo_pq(pgconn, rows_to_send, logger) as (
+        commands,
+        results_queue,
+    ):
+        while results_queue:
+            fetched = await waiting.wait_async(
+                pipeline_communicate(
+                    pgconn,  # type: ignore[arg-type]
+                    commands,
+                ),
+                pgconn.socket,
+            )
+            assert not commands, commands
+            for results in fetched:
+                results_queue.popleft()
+                for r in results:
+                    if r.status in (
+                        pq.ExecStatus.FATAL_ERROR,
+                        pq.ExecStatus.PIPELINE_ABORTED,
+                    ):
+                        raise e.error_from_result(r)
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "-n",
+        dest="nrows",
+        metavar="ROWS",
+        default=10_000,
+        type=int,
+        help="number of rows to insert",
+    )
+    parser.add_argument(
+        "--async", dest="async_", action="store_true", help="use async API"
+    )
+    parser.add_argument("-l", "--log", help="log file (stderr by default)")
+    args = parser.parse_args()
+    logger = logging.getLogger("psycopg")
+    logger.setLevel(logging.DEBUG)
+    pipeline_logger = logging.getLogger("pipeline")
+    pipeline_logger.setLevel(logging.DEBUG)
+    if args.log:
+        logger.addHandler(logging.FileHandler(args.log))
+        pipeline_logger.addHandler(logging.FileHandler(args.log))
+    else:
+        logger.addHandler(logging.StreamHandler())
+        pipeline_logger.addHandler(logging.StreamHandler())
+    if args.async_:
+        asyncio.run(pipeline_demo_pq_async(args.nrows, pipeline_logger))
+    else:
+        pipeline_demo_pq(args.nrows, pipeline_logger)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_generators.py b/tests/test_generators.py
new file mode 100644 (file)
index 0000000..c836b42
--- /dev/null
@@ -0,0 +1,155 @@
+from collections import deque
+from functools import partial
+from typing import List
+
+import pytest
+
+from psycopg import waiting
+from psycopg import pq
+
+from .utils import check_libpq_version
+
+
+@pytest.fixture
+def pipeline(pgconn):
+    if check_libpq_version(pq.version(), ">= 14"):
+        pytest.skip("require libpq >= 14")
+    nb, pgconn.nonblocking = pgconn.nonblocking, True
+    assert pgconn.nonblocking
+    pgconn.enter_pipeline_mode()
+    yield
+    if pgconn.pipeline_status:
+        pgconn.exit_pipeline_mode()
+    pgconn.nonblocking = nb
+
+
+def _run_pipeline_communicate(pgconn, generators, commands, expected_statuses):
+    actual_statuses: List[pq.ExecStatus] = []
+    while len(actual_statuses) != len(expected_statuses):
+        if commands:
+            gen = generators.pipeline_communicate(pgconn, commands)
+            results = waiting.wait(gen, pgconn.socket)
+            for (result,) in results:
+                actual_statuses.append(result.status)
+        else:
+            gen = generators.fetch_many(pgconn)
+            results = waiting.wait(gen, pgconn.socket)
+            for result in results:
+                actual_statuses.append(result.status)
+
+    assert actual_statuses == expected_statuses
+
+
+def test_pipeline_communicate_multi_pipeline(pgconn, pipeline, generators):
+    commands = deque(
+        [
+            partial(pgconn.send_query, b"select 1"),
+            pgconn.pipeline_sync,
+            partial(pgconn.send_query, b"select 2"),
+            pgconn.pipeline_sync,
+        ]
+    )
+    expected_statuses = [
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.PIPELINE_SYNC,
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.PIPELINE_SYNC,
+    ]
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+
+
+def test_pipeline_communicate_no_sync(pgconn, pipeline, generators):
+    numqueries = 10
+    commands = deque(
+        [partial(pgconn.send_query, b"select repeat('xyzxz', 12)")] * numqueries
+        + [pgconn.send_flush_request]
+    )
+    expected_statuses = [pq.ExecStatus.TUPLES_OK] * numqueries
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+
+
+@pytest.fixture
+def pipeline_demo(pgconn):
+    if check_libpq_version(pq.version(), ">= 14"):
+        pytest.skip("require libpq >= 14")
+    assert pgconn.pipeline_status == 0
+    res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.exec_(
+        b"CREATE UNLOGGED TABLE pg_pipeline(" b" id serial primary key, itemno integer)"
+    )
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    yield "pg_pipeline"
+    res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+
+
+def test_pipeline_communicate_abort(pgconn, pipeline_demo, pipeline, generators):
+    insert_sql = b"insert into pg_pipeline(itemno) values ($1)"
+    commands = deque(
+        [
+            partial(pgconn.send_query_params, insert_sql, [b"1"]),
+            partial(pgconn.send_query, b"select no_such_function(1)"),
+            partial(pgconn.send_query_params, insert_sql, [b"2"]),
+            pgconn.pipeline_sync,
+            partial(pgconn.send_query_params, insert_sql, [b"3"]),
+            pgconn.pipeline_sync,
+        ]
+    )
+    expected_statuses = [
+        pq.ExecStatus.COMMAND_OK,
+        pq.ExecStatus.FATAL_ERROR,
+        pq.ExecStatus.PIPELINE_ABORTED,
+        pq.ExecStatus.PIPELINE_SYNC,
+        pq.ExecStatus.COMMAND_OK,
+        pq.ExecStatus.PIPELINE_SYNC,
+    ]
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)
+    pgconn.exit_pipeline_mode()
+    res = pgconn.exec_(b"select itemno from pg_pipeline order by itemno")
+    assert res.ntuples == 1
+    assert res.get_value(0, 0) == b"3"
+
+
+@pytest.fixture
+def pipeline_uniqviol(pgconn):
+    if check_libpq_version(pq.version(), ">= 14"):
+        pytest.skip("require libpq >= 14")
+    assert pgconn.pipeline_status == 0
+    res = pgconn.exec_(b"DROP TABLE IF EXISTS pg_pipeline_uniqviol")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.exec_(
+        b"CREATE UNLOGGED TABLE pg_pipeline_uniqviol("
+        b" id bigint primary key, idata bigint)"
+    )
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.exec_(b"BEGIN")
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    res = pgconn.prepare(
+        b"insertion",
+        b"insert into pg_pipeline_uniqviol values ($1, $2) returning id",
+    )
+    assert res.status == pq.ExecStatus.COMMAND_OK, res.error_message
+    return "pg_pipeline_uniqviol"
+
+
+def test_pipeline_communicate_uniqviol(pgconn, pipeline_uniqviol, pipeline, generators):
+    commands = deque(
+        [
+            partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"2", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"1", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"3", b"2"]),
+            partial(pgconn.send_query_prepared, b"insertion", [b"4", b"2"]),
+            partial(pgconn.send_query, b"commit"),
+        ]
+    )
+    expected_statuses = [
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.TUPLES_OK,
+        pq.ExecStatus.FATAL_ERROR,
+        pq.ExecStatus.PIPELINE_ABORTED,
+        pq.ExecStatus.PIPELINE_ABORTED,
+        pq.ExecStatus.PIPELINE_ABORTED,
+    ]
+    _run_pipeline_communicate(pgconn, generators, commands, expected_statuses)