]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add libpq interface for pipeline mode
authorDenis Laxalde <denis.laxalde@dalibo.com>
Tue, 22 Jun 2021 14:07:47 +0000 (16:07 +0200)
committerDenis Laxalde <denis.laxalde@dalibo.com>
Fri, 1 Oct 2021 14:32:09 +0000 (16:32 +0200)
Bindings are conditional so that compilation works with libpq version
older than 14.

Add a couple of tests inspired by PostgreSQL's tests.

psycopg/psycopg/pq/__init__.py
psycopg/psycopg/pq/_enums.py
psycopg/psycopg/pq/_pq_ctypes.py
psycopg/psycopg/pq/_pq_ctypes.pyi
psycopg/psycopg/pq/abc.py
psycopg/psycopg/pq/pq_ctypes.py
psycopg_c/psycopg_c/pq/libpq.pxd
psycopg_c/psycopg_c/pq/pgconn.pyx
tests/pq/test_pipeline.py [new file with mode: 0644]

index 5b46c36dff5a376e410f3593c4566a0f2f91d95a..bc1c70dfe9a3218bfa21ee3a5c63a5b4bd576025 100644 (file)
@@ -17,7 +17,7 @@ from . import abc
 from .misc import ConninfoOption, PGnotify, PGresAttDesc
 from .misc import error_message
 from ._enums import ConnStatus, DiagnosticField, ExecStatus, Format
-from ._enums import Ping, PollingStatus, TransactionStatus
+from ._enums import Ping, PipelineStatus, PollingStatus, TransactionStatus
 
 logger = logging.getLogger(__name__)
 
@@ -115,6 +115,7 @@ import_from_libpq()
 
 __all__ = (
     "ConnStatus",
+    "PipelineStatus",
     "PollingStatus",
     "TransactionStatus",
     "ExecStatus",
index 1ea4e7e01016bbb3f443b390b80aa4eafbcf57ff..8eca77b453fa127187691f596030a10ee9f3b659 100644 (file)
@@ -102,6 +102,23 @@ class ExecStatus(IntEnum):
     query.
     """
 
+    PIPELINE_SYNC = auto()
+    """
+    The PGresult represents a synchronization point in pipeline mode,
+    requested by PQpipelineSync.
+
+    This status occurs only when pipeline mode has been selected.
+    """
+
+    PIPELINE_ABORTED = auto()
+    """
+    The PGresult represents a pipeline that has received an error from the server.
+
+    PQgetResult must be called repeatedly, and each time it will return this
+    status code until the end of the current pipeline, at which point it will
+    return PGRES_PIPELINE_SYNC and normal processing can resume.
+    """
+
 
 class TransactionStatus(IntEnum):
     """
@@ -152,6 +169,27 @@ class Ping(IntEnum):
     """
 
 
+class PipelineStatus(IntEnum):
+    """Pipeline mode status of the libpq connection."""
+
+    __module__ = "psycopg.pq"
+
+    OFF = 0
+    """
+    The libpq connection is *not* in pipeline mode.
+    """
+    ON = auto()
+    """
+    The libpq connection is in pipeline mode.
+    """
+    ABORTED = auto()
+    """
+    The libpq connection is in pipeline mode and an error occurred while
+    processing the current pipeline. The aborted flag is cleared when
+    PQgetResult returns a result of type PGRES_PIPELINE_SYNC.
+    """
+
+
 class DiagnosticField(IntEnum):
     """
     Fields in an error report.
index 79d6d9a59698a03d1326ef2f254f610ea5a4c6de..d7f326ed75b659a1b8966ee67b0c3b7a2859ca93 100644 (file)
@@ -597,6 +597,80 @@ PQsetNoticeReceiver = pq.PQsetNoticeReceiver
 PQsetNoticeReceiver.argtypes = [PGconn_ptr, PQnoticeReceiver, c_void_p]
 PQsetNoticeReceiver.restype = PQnoticeReceiver
 
+# 34.5 Pipeline Mode
+
+_PQpipelineStatus = None
+_PQenterPipelineMode = None
+_PQexitPipelineMode = None
+_PQpipelineSync = None
+_PQsendFlushRequest = None
+
+if libpq_version >= 140000:
+    _PQpipelineStatus = pq.PQpipelineStatus
+    _PQpipelineStatus.argtypes = [PGconn_ptr]
+    _PQpipelineStatus.restype = c_int
+
+    _PQenterPipelineMode = pq.PQenterPipelineMode
+    _PQenterPipelineMode.argtypes = [PGconn_ptr]
+    _PQenterPipelineMode.restype = c_int
+
+    _PQexitPipelineMode = pq.PQexitPipelineMode
+    _PQexitPipelineMode.argtypes = [PGconn_ptr]
+    _PQexitPipelineMode.restype = c_int
+
+    _PQpipelineSync = pq.PQpipelineSync
+    _PQpipelineSync.argtypes = [PGconn_ptr]
+    _PQpipelineSync.restype = c_int
+
+    _PQsendFlushRequest = pq.PQsendFlushRequest
+    _PQsendFlushRequest.argtypes = [PGconn_ptr]
+    _PQsendFlushRequest.restype = c_int
+
+
+def PQpipelineStatus(pgconn: PGconn_struct) -> int:
+    if not _PQpipelineStatus:
+        raise NotSupportedError(
+            f"PQpipelineStatus requires libpq from PostgreSQL 14,"
+            f" {libpq_version} available instead"
+        )
+    return _PQpipelineStatus(pgconn)
+
+
+def PQenterPipelineMode(pgconn: PGconn_struct) -> int:
+    if not _PQenterPipelineMode:
+        raise NotSupportedError(
+            f"PQenterPipelineMode requires libpq from PostgreSQL 14,"
+            f" {libpq_version} available instead"
+        )
+    return _PQenterPipelineMode(pgconn)
+
+
+def PQexitPipelineMode(pgconn: PGconn_struct) -> int:
+    if not _PQexitPipelineMode:
+        raise NotSupportedError(
+            f"PQexitPipelineMode requires libpq from PostgreSQL 14,"
+            f" {libpq_version} available instead"
+        )
+    return _PQexitPipelineMode(pgconn)
+
+
+def PQpipelineSync(pgconn: PGconn_struct) -> int:
+    if not _PQpipelineSync:
+        raise NotSupportedError(
+            f"PQpipelineSync requires libpq from PostgreSQL 14,"
+            f" {libpq_version} available instead"
+        )
+    return _PQpipelineSync(pgconn)
+
+
+def PQsendFlushRequest(pgconn: PGconn_struct) -> int:
+    if not _PQsendFlushRequest:
+        raise NotSupportedError(
+            f"PQsendFlushRequest requires libpq from PostgreSQL 14,"
+            f" {libpq_version} available instead"
+        )
+    return _PQsendFlushRequest(pgconn)
+
 
 # 33.18. SSL Support
 
index e55bfc2e96c783d548588f05e00f87ae448bf1f5..c2fe57ca640f09a886c18b2a00fc13651be54b38 100644 (file)
@@ -116,6 +116,11 @@ def PQencryptPasswordConn(
     arg3: bytes,
     arg4: Optional[bytes],
 ) -> bytes: ...
+def PQpipelineStatus(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQenterPipelineMode(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQexitPipelineMode(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQpipelineSync(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQsendFlushRequest(pgconn: Optional[PGconn_struct]) -> int: ...
 
 # fmt: off
 # autogenerated: start
@@ -194,6 +199,11 @@ def PQfreeCancel(arg1: Optional[PGcancel_struct]) -> None: ...
 def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ...
 def PQfreemem(arg1: Any) -> None: ...
 def PQmakeEmptyPGresult(arg1: Optional[PGconn_struct], arg2: int) -> PGresult_struct: ...
+def _PQpipelineStatus(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQenterPipelineMode(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQexitPipelineMode(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQpipelineSync(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQsendFlushRequest(arg1: Optional[PGconn_struct]) -> int: ...
 def PQinitOpenSSL(arg1: int, arg2: int) -> None: ...
 # autogenerated: end
 # fmt: on
index 1423ed48553523a96bc25d298808a3c8e2b33605..a7f5d847c9dc73e31cd74a7e16d41571a7bab595 100644 (file)
@@ -4,10 +4,10 @@ Protocol objects to represent objects exposed by different pq implementations.
 
 # Copyright (C) 2020-2021 The Psycopg Team
 
-from typing import Any, Callable, List, Optional, Sequence, Tuple, Union
-from typing import TYPE_CHECKING
+from typing import Any, Callable, List, Optional, Sequence, Tuple
+from typing import Union, TYPE_CHECKING
 
-from ._enums import Format
+from ._enums import Format, PipelineStatus
 from .._compat import Protocol
 
 if TYPE_CHECKING:
@@ -242,6 +242,18 @@ class PGconn(Protocol):
     def make_empty_result(self, exec_status: int) -> "PGresult":
         ...
 
+    def pipeline_status(self) -> PipelineStatus:
+        ...
+
+    def enter_pipeline_mode(self) -> None:
+        ...
+
+    def exit_pipeline_mode(self) -> None:
+        ...
+
+    def pipeline_sync(self) -> None:
+        ...
+
 
 class PGresult(Protocol):
     def clear(self) -> None:
index 5c8bb0ba80e3aed113e409bcebf9807c34f89d1c..0b18f31e7a049f68564a6465d6a57e41773c46d3 100644 (file)
@@ -22,7 +22,7 @@ from .. import errors as e
 from . import _pq_ctypes as impl
 from .misc import PGnotify, ConninfoOption, PGresAttDesc
 from .misc import error_message, connection_summary
-from ._enums import Format, ExecStatus
+from ._enums import Format, ExecStatus, PipelineStatus
 
 if TYPE_CHECKING:
     from . import abc
@@ -629,6 +629,39 @@ class PGconn:
             raise MemoryError("couldn't allocate empty PGresult")
         return PGresult(rv)
 
+    def pipeline_status(self) -> PipelineStatus:
+        return PipelineStatus(impl.PQpipelineStatus(self._pgconn_ptr))
+
+    def enter_pipeline_mode(self) -> None:
+        """Enter pipeline mode.
+
+        :raises ~e.OperationalError: in case of failure to enter the pipeline
+            mode.
+        """
+        if impl.PQenterPipelineMode(self._pgconn_ptr) != 1:
+            raise e.OperationalError("failed to enter pipeline mode")
+
+    def exit_pipeline_mode(self) -> None:
+        """Exit pipeline mode.
+
+        :raises ~e.OperationalError: in case of failure to exit the pipeline
+            mode.
+        """
+        if impl.PQexitPipelineMode(self._pgconn_ptr) != 1:
+            raise e.OperationalError(error_message(self))
+
+    def pipeline_sync(self) -> None:
+        """Mark a synchronization point in a pipeline.
+
+        :raises ~e.OperationalError: if the connection is not in pipeline mode
+            or if sync failed.
+        """
+        rv = impl.PQpipelineSync(self._pgconn_ptr)
+        if rv == 0:
+            raise e.OperationalError("connection not in pipeline mode")
+        if rv != 1:
+            raise e.OperationalError("failed to sync pipeline")
+
     def _call_bytes(
         self, func: Callable[[impl.PGconn_struct], Optional[bytes]]
     ) -> bytes:
index 40b36e98e9b9d15ba5a78fde5b4c7e58a2179601..9fed6018b48955a477f6aadbd76d3a9d00e4f32e 100644 (file)
@@ -267,6 +267,18 @@ cdef extern from "libpq-fe.h":
     # 33.18. SSL Support
     void PQinitOpenSSL(int do_ssl, int do_crypto)
 
+    # 34.5 Pipeline Mode
+
+    ctypedef enum PGpipelineStatus:
+        PQ_PIPELINE_OFF
+        PQ_PIPELINE_ON
+        PQ_PIPELINE_ABORTED
+
+    PGpipelineStatus PQpipelineStatus(const PGconn *conn)
+    int PQenterPipelineMode(PGconn *conn)
+    int PQexitPipelineMode(PGconn *conn)
+    int PQpipelineSync(PGconn *conn)
+    int PQsendFlushRequest(PGconn *conn)
 
 cdef extern from *:
     """
@@ -278,4 +290,17 @@ cdef extern from *:
 #if PG_VERSION_NUM < 120000
 #define PQhostaddr(conn) NULL
 #endif
+
+#if PG_VERSION_NUM < 140000
+typedef enum {
+    PQ_PIPELINE_OFF,
+    PQ_PIPELINE_ON,
+    PQ_PIPELINE_ABORTED
+} PGpipelineStatus;
+#define PQpipelineStatus(conn) NULL
+#define PQenterPipelineMode(conn) NULL
+#define PQexitPipelineMode(conn) NULL
+#define PQpipelineSync(conn) NULL
+#define PQsendFlushRequest(conn) NULL
+#endif
 """
index 4062ed8d8228caff24377dde37bd9fcc04466b9d..c0bd782532ae268dac95e4f815788f113d1d7b8a 100644 (file)
@@ -23,8 +23,9 @@ from cpython.memoryview cimport PyMemoryView_FromObject
 
 import ctypes
 import logging
+from typing import Iterator
 
-from psycopg.pq import Format as PqFormat
+from psycopg.pq import Format as PqFormat, PipelineStatus
 from psycopg.pq.misc import PGnotify, connection_summary
 from psycopg_c.pq cimport PQBuffer
 
@@ -535,6 +536,61 @@ cdef class PGconn:
             raise MemoryError("couldn't allocate empty PGresult")
         return PGresult._from_ptr(rv)
 
+    def pipeline_status(self) -> PipelineStatus:
+        """Return the current pipeline mode status."""
+        if libpq.PG_VERSION_NUM < 140000:
+            raise e.NotSupportedError(
+                f"PQpipelineStatus requires libpq from PostgreSQL 14,"
+                f" {libpq.PG_VERSION_NUM} available instead"
+            )
+        cdef int status = libpq.PQpipelineStatus(self._pgconn_ptr)
+        return PipelineStatus(status)
+
+    def enter_pipeline_mode(self) -> None:
+        """Enter pipeline mode.
+
+        :raises ~e.OperationalError: in case of failure to enter the pipeline
+            mode.
+        """
+        if libpq.PG_VERSION_NUM < 140000:
+            raise e.NotSupportedError(
+                f"PQenterPipelineMode requires libpq from PostgreSQL 14,"
+                f" {libpq.PG_VERSION_NUM} available instead"
+            )
+        if libpq.PQenterPipelineMode(self._pgconn_ptr) != 1:
+            raise e.OperationalError("failed to enter pipeline mode")
+
+    def exit_pipeline_mode(self) -> None:
+        """Exit pipeline mode.
+
+        :raises ~e.OperationalError: in case of failure to exit the pipeline
+            mode.
+        """
+        if libpq.PG_VERSION_NUM < 140000:
+            raise e.NotSupportedError(
+                f"PQexitPipelineMode requires libpq from PostgreSQL 14,"
+                f" {libpq.PG_VERSION_NUM} available instead"
+            )
+        if libpq.PQexitPipelineMode(self._pgconn_ptr) != 1:
+            raise e.OperationalError(error_message(self))
+
+    def pipeline_sync(self) -> None:
+        """Mark a synchronization point in a pipeline.
+
+        :raises ~e.OperationalError: if the connection is not in pipeline mode
+            or if sync failed.
+        """
+        if libpq.PG_VERSION_NUM < 140000:
+            raise e.NotSupportedError(
+                f"PQpipelineSync requires libpq from PostgreSQL 14,"
+                f" {libpq.PG_VERSION_NUM} available instead"
+            )
+        rv = libpq.PQpipelineSync(self._pgconn_ptr)
+        if rv == 0:
+            raise e.OperationalError("connection not in pipeline mode")
+        if rv != 1:
+            raise e.OperationalError("failed to sync pipeline")
+
 
 cdef int _ensure_pgconn(PGconn pgconn) except 0:
     if pgconn._pgconn_ptr is not NULL:
diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py
new file mode 100644 (file)
index 0000000..5fe879d
--- /dev/null
@@ -0,0 +1,147 @@
+import pytest
+
+import psycopg
+from psycopg import pq
+
+
+@pytest.mark.libpq("< 14")
+def test_unsupported(pgconn):
+    with pytest.raises(psycopg.NotSupportedError):
+        pgconn.enter_pipeline_mode()
+    with pytest.raises(psycopg.NotSupportedError):
+        pgconn.exit_pipeline_mode()
+    with pytest.raises(psycopg.NotSupportedError):
+        pgconn.pipeline_status()
+    with pytest.raises(psycopg.NotSupportedError):
+        pgconn.pipeline_sync()
+
+
+@pytest.mark.libpq(">= 14")
+def test_work_in_progress(pgconn):
+    assert not pgconn.nonblocking
+    pgconn.enter_pipeline_mode()
+    pgconn.send_query_params(b"select $1", [b"1"])
+    with pytest.raises(
+        psycopg.OperationalError, match="cannot exit pipeline mode"
+    ):
+        pgconn.exit_pipeline_mode()
+
+
+@pytest.mark.libpq(">= 14")
+def test_multi_pipelines(pgconn):
+    pgconn.enter_pipeline_mode()
+    pgconn.send_query_params(b"select $1", [b"1"])
+    pgconn.pipeline_sync()
+    pgconn.send_query_params(b"select $1", [b"2"])
+    pgconn.pipeline_sync()
+
+    # result from first query
+    result1 = pgconn.get_result()
+    assert result1 is not None
+    assert result1.status == pq.ExecStatus.TUPLES_OK
+
+    # NULL signals end of result
+    assert pgconn.get_result() is None
+
+    # first sync result
+    sync_result = pgconn.get_result()
+    assert sync_result is not None
+    assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+    # result from second query
+    result2 = pgconn.get_result()
+    assert result2 is not None
+    assert result2.status == pq.ExecStatus.TUPLES_OK
+
+    # NULL signals end of result
+    assert pgconn.get_result() is None
+
+    # second sync result
+    sync_result = pgconn.get_result()
+    assert sync_result is not None
+    assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+    # pipeline still ON
+    assert pgconn.pipeline_status() == pq.PipelineStatus.ON
+
+    pgconn.exit_pipeline_mode()
+
+    assert pgconn.pipeline_status() == pq.PipelineStatus.OFF
+
+    assert result1.get_value(0, 0) == b"1"
+    assert result2.get_value(0, 0) == b"2"
+
+
+@pytest.fixture
+def table(pgconn):
+    tablename = "pipeline"
+    pgconn.exec_(f"create table {tablename} (s text)".encode("ascii"))
+    yield tablename
+    pgconn.exec_(f"drop table if exists {tablename}".encode("ascii"))
+
+
+@pytest.mark.libpq(">= 14")
+def test_pipeline_abort(pgconn, table):
+    pgconn.enter_pipeline_mode()
+    pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"])
+    pgconn.send_query_params(b"select no_such_function($1)", [b"1"])
+    pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"])
+    pgconn.pipeline_sync()
+    pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"])
+    pgconn.pipeline_sync()
+
+    # result from first INSERT
+    r = pgconn.get_result()
+    assert r is not None
+    assert r.status == pq.ExecStatus.COMMAND_OK
+
+    # NULL signals end of result
+    assert pgconn.get_result() is None
+
+    # error result from second query (SELECT)
+    r = pgconn.get_result()
+    assert r is not None
+    assert r.status == pq.ExecStatus.FATAL_ERROR
+
+    # NULL signals end of result
+    assert pgconn.get_result() is None
+
+    # pipeline should be aborted, due to previous error
+    assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED
+
+    # result from second INSERT, aborted due to previous error
+    r = pgconn.get_result()
+    assert r is not None
+    assert r.status == pq.ExecStatus.PIPELINE_ABORTED
+
+    # NULL signals end of result
+    assert pgconn.get_result() is None
+
+    # pipeline is still aborted
+    assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED
+
+    # sync result
+    r = pgconn.get_result()
+    assert r is not None
+    assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+    # aborted flag is clear, pipeline is on again
+    assert pgconn.pipeline_status() == pq.PipelineStatus.ON
+
+    # result from the third INSERT
+    r = pgconn.get_result()
+    assert r is not None
+    assert r.status == pq.ExecStatus.COMMAND_OK
+
+    # NULL signals end of result
+    assert pgconn.get_result() is None
+
+    # second sync result
+    r = pgconn.get_result()
+    assert r is not None
+    assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+    # NULL signals end of result
+    assert pgconn.get_result() is None
+
+    pgconn.exit_pipeline_mode()