From: Denis Laxalde Date: Tue, 22 Jun 2021 14:07:47 +0000 (+0200) Subject: Add libpq interface for pipeline mode X-Git-Tag: 3.0~40^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=515e246b5f933ba93fdde93a03287c6940dbf85b;p=thirdparty%2Fpsycopg.git Add libpq interface for pipeline mode Bindings are conditional so that compilation works with libpq version older than 14. Add a couple of tests inspired by PostgreSQL's tests. --- diff --git a/psycopg/psycopg/pq/__init__.py b/psycopg/psycopg/pq/__init__.py index 5b46c36df..bc1c70dfe 100644 --- a/psycopg/psycopg/pq/__init__.py +++ b/psycopg/psycopg/pq/__init__.py @@ -17,7 +17,7 @@ from . import abc from .misc import ConninfoOption, PGnotify, PGresAttDesc from .misc import error_message from ._enums import ConnStatus, DiagnosticField, ExecStatus, Format -from ._enums import Ping, PollingStatus, TransactionStatus +from ._enums import Ping, PipelineStatus, PollingStatus, TransactionStatus logger = logging.getLogger(__name__) @@ -115,6 +115,7 @@ import_from_libpq() __all__ = ( "ConnStatus", + "PipelineStatus", "PollingStatus", "TransactionStatus", "ExecStatus", diff --git a/psycopg/psycopg/pq/_enums.py b/psycopg/psycopg/pq/_enums.py index 1ea4e7e01..8eca77b45 100644 --- a/psycopg/psycopg/pq/_enums.py +++ b/psycopg/psycopg/pq/_enums.py @@ -102,6 +102,23 @@ class ExecStatus(IntEnum): query. """ + PIPELINE_SYNC = auto() + """ + The PGresult represents a synchronization point in pipeline mode, + requested by PQpipelineSync. + + This status occurs only when pipeline mode has been selected. + """ + + PIPELINE_ABORTED = auto() + """ + The PGresult represents a pipeline that has received an error from the server. + + PQgetResult must be called repeatedly, and each time it will return this + status code until the end of the current pipeline, at which point it will + return PGRES_PIPELINE_SYNC and normal processing can resume. + """ + class TransactionStatus(IntEnum): """ @@ -152,6 +169,27 @@ class Ping(IntEnum): """ +class PipelineStatus(IntEnum): + """Pipeline mode status of the libpq connection.""" + + __module__ = "psycopg.pq" + + OFF = 0 + """ + The libpq connection is *not* in pipeline mode. + """ + ON = auto() + """ + The libpq connection is in pipeline mode. + """ + ABORTED = auto() + """ + The libpq connection is in pipeline mode and an error occurred while + processing the current pipeline. The aborted flag is cleared when + PQgetResult returns a result of type PGRES_PIPELINE_SYNC. + """ + + class DiagnosticField(IntEnum): """ Fields in an error report. diff --git a/psycopg/psycopg/pq/_pq_ctypes.py b/psycopg/psycopg/pq/_pq_ctypes.py index 79d6d9a59..d7f326ed7 100644 --- a/psycopg/psycopg/pq/_pq_ctypes.py +++ b/psycopg/psycopg/pq/_pq_ctypes.py @@ -597,6 +597,80 @@ PQsetNoticeReceiver = pq.PQsetNoticeReceiver PQsetNoticeReceiver.argtypes = [PGconn_ptr, PQnoticeReceiver, c_void_p] PQsetNoticeReceiver.restype = PQnoticeReceiver +# 34.5 Pipeline Mode + +_PQpipelineStatus = None +_PQenterPipelineMode = None +_PQexitPipelineMode = None +_PQpipelineSync = None +_PQsendFlushRequest = None + +if libpq_version >= 140000: + _PQpipelineStatus = pq.PQpipelineStatus + _PQpipelineStatus.argtypes = [PGconn_ptr] + _PQpipelineStatus.restype = c_int + + _PQenterPipelineMode = pq.PQenterPipelineMode + _PQenterPipelineMode.argtypes = [PGconn_ptr] + _PQenterPipelineMode.restype = c_int + + _PQexitPipelineMode = pq.PQexitPipelineMode + _PQexitPipelineMode.argtypes = [PGconn_ptr] + _PQexitPipelineMode.restype = c_int + + _PQpipelineSync = pq.PQpipelineSync + _PQpipelineSync.argtypes = [PGconn_ptr] + _PQpipelineSync.restype = c_int + + _PQsendFlushRequest = pq.PQsendFlushRequest + _PQsendFlushRequest.argtypes = [PGconn_ptr] + _PQsendFlushRequest.restype = c_int + + +def PQpipelineStatus(pgconn: PGconn_struct) -> int: + if not _PQpipelineStatus: + raise NotSupportedError( + f"PQpipelineStatus requires libpq from PostgreSQL 14," + f" {libpq_version} available instead" + ) + return _PQpipelineStatus(pgconn) + + +def PQenterPipelineMode(pgconn: PGconn_struct) -> int: + if not _PQenterPipelineMode: + raise NotSupportedError( + f"PQenterPipelineMode requires libpq from PostgreSQL 14," + f" {libpq_version} available instead" + ) + return _PQenterPipelineMode(pgconn) + + +def PQexitPipelineMode(pgconn: PGconn_struct) -> int: + if not _PQexitPipelineMode: + raise NotSupportedError( + f"PQexitPipelineMode requires libpq from PostgreSQL 14," + f" {libpq_version} available instead" + ) + return _PQexitPipelineMode(pgconn) + + +def PQpipelineSync(pgconn: PGconn_struct) -> int: + if not _PQpipelineSync: + raise NotSupportedError( + f"PQpipelineSync requires libpq from PostgreSQL 14," + f" {libpq_version} available instead" + ) + return _PQpipelineSync(pgconn) + + +def PQsendFlushRequest(pgconn: PGconn_struct) -> int: + if not _PQsendFlushRequest: + raise NotSupportedError( + f"PQsendFlushRequest requires libpq from PostgreSQL 14," + f" {libpq_version} available instead" + ) + return _PQsendFlushRequest(pgconn) + # 33.18. SSL Support diff --git a/psycopg/psycopg/pq/_pq_ctypes.pyi b/psycopg/psycopg/pq/_pq_ctypes.pyi index e55bfc2e9..c2fe57ca6 100644 --- a/psycopg/psycopg/pq/_pq_ctypes.pyi +++ b/psycopg/psycopg/pq/_pq_ctypes.pyi @@ -116,6 +116,11 @@ def PQencryptPasswordConn( arg3: bytes, arg4: Optional[bytes], ) -> bytes: ... +def PQpipelineStatus(pgconn: Optional[PGconn_struct]) -> int: ... +def PQenterPipelineMode(pgconn: Optional[PGconn_struct]) -> int: ... +def PQexitPipelineMode(pgconn: Optional[PGconn_struct]) -> int: ... +def PQpipelineSync(pgconn: Optional[PGconn_struct]) -> int: ... +def PQsendFlushRequest(pgconn: Optional[PGconn_struct]) -> int: ... # fmt: off # autogenerated: start @@ -194,6 +199,11 @@ def PQfreeCancel(arg1: Optional[PGcancel_struct]) -> None: ... def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ... def PQfreemem(arg1: Any) -> None: ... def PQmakeEmptyPGresult(arg1: Optional[PGconn_struct], arg2: int) -> PGresult_struct: ... +def _PQpipelineStatus(arg1: Optional[PGconn_struct]) -> int: ... +def _PQenterPipelineMode(arg1: Optional[PGconn_struct]) -> int: ... +def _PQexitPipelineMode(arg1: Optional[PGconn_struct]) -> int: ... +def _PQpipelineSync(arg1: Optional[PGconn_struct]) -> int: ... +def _PQsendFlushRequest(arg1: Optional[PGconn_struct]) -> int: ... def PQinitOpenSSL(arg1: int, arg2: int) -> None: ... # autogenerated: end # fmt: on diff --git a/psycopg/psycopg/pq/abc.py b/psycopg/psycopg/pq/abc.py index 1423ed485..a7f5d847c 100644 --- a/psycopg/psycopg/pq/abc.py +++ b/psycopg/psycopg/pq/abc.py @@ -4,10 +4,10 @@ Protocol objects to represent objects exposed by different pq implementations. # Copyright (C) 2020-2021 The Psycopg Team -from typing import Any, Callable, List, Optional, Sequence, Tuple, Union -from typing import TYPE_CHECKING +from typing import Any, Callable, List, Optional, Sequence, Tuple +from typing import Union, TYPE_CHECKING -from ._enums import Format +from ._enums import Format, PipelineStatus from .._compat import Protocol if TYPE_CHECKING: @@ -242,6 +242,18 @@ class PGconn(Protocol): def make_empty_result(self, exec_status: int) -> "PGresult": ... + def pipeline_status(self) -> PipelineStatus: + ... + + def enter_pipeline_mode(self) -> None: + ... + + def exit_pipeline_mode(self) -> None: + ... + + def pipeline_sync(self) -> None: + ... + class PGresult(Protocol): def clear(self) -> None: diff --git a/psycopg/psycopg/pq/pq_ctypes.py b/psycopg/psycopg/pq/pq_ctypes.py index 5c8bb0ba8..0b18f31e7 100644 --- a/psycopg/psycopg/pq/pq_ctypes.py +++ b/psycopg/psycopg/pq/pq_ctypes.py @@ -22,7 +22,7 @@ from .. import errors as e from . import _pq_ctypes as impl from .misc import PGnotify, ConninfoOption, PGresAttDesc from .misc import error_message, connection_summary -from ._enums import Format, ExecStatus +from ._enums import Format, ExecStatus, PipelineStatus if TYPE_CHECKING: from . import abc @@ -629,6 +629,39 @@ class PGconn: raise MemoryError("couldn't allocate empty PGresult") return PGresult(rv) + def pipeline_status(self) -> PipelineStatus: + return PipelineStatus(impl.PQpipelineStatus(self._pgconn_ptr)) + + def enter_pipeline_mode(self) -> None: + """Enter pipeline mode. + + :raises ~e.OperationalError: in case of failure to enter the pipeline + mode. + """ + if impl.PQenterPipelineMode(self._pgconn_ptr) != 1: + raise e.OperationalError("failed to enter pipeline mode") + + def exit_pipeline_mode(self) -> None: + """Exit pipeline mode. + + :raises ~e.OperationalError: in case of failure to exit the pipeline + mode. + """ + if impl.PQexitPipelineMode(self._pgconn_ptr) != 1: + raise e.OperationalError(error_message(self)) + + def pipeline_sync(self) -> None: + """Mark a synchronization point in a pipeline. + + :raises ~e.OperationalError: if the connection is not in pipeline mode + or if sync failed. + """ + rv = impl.PQpipelineSync(self._pgconn_ptr) + if rv == 0: + raise e.OperationalError("connection not in pipeline mode") + if rv != 1: + raise e.OperationalError("failed to sync pipeline") + def _call_bytes( self, func: Callable[[impl.PGconn_struct], Optional[bytes]] ) -> bytes: diff --git a/psycopg_c/psycopg_c/pq/libpq.pxd b/psycopg_c/psycopg_c/pq/libpq.pxd index 40b36e98e..9fed6018b 100644 --- a/psycopg_c/psycopg_c/pq/libpq.pxd +++ b/psycopg_c/psycopg_c/pq/libpq.pxd @@ -267,6 +267,18 @@ cdef extern from "libpq-fe.h": # 33.18. SSL Support void PQinitOpenSSL(int do_ssl, int do_crypto) + # 34.5 Pipeline Mode + + ctypedef enum PGpipelineStatus: + PQ_PIPELINE_OFF + PQ_PIPELINE_ON + PQ_PIPELINE_ABORTED + + PGpipelineStatus PQpipelineStatus(const PGconn *conn) + int PQenterPipelineMode(PGconn *conn) + int PQexitPipelineMode(PGconn *conn) + int PQpipelineSync(PGconn *conn) + int PQsendFlushRequest(PGconn *conn) cdef extern from *: """ @@ -278,4 +290,17 @@ cdef extern from *: #if PG_VERSION_NUM < 120000 #define PQhostaddr(conn) NULL #endif + +#if PG_VERSION_NUM < 140000 +typedef enum { + PQ_PIPELINE_OFF, + PQ_PIPELINE_ON, + PQ_PIPELINE_ABORTED +} PGpipelineStatus; +#define PQpipelineStatus(conn) NULL +#define PQenterPipelineMode(conn) NULL +#define PQexitPipelineMode(conn) NULL +#define PQpipelineSync(conn) NULL +#define PQsendFlushRequest(conn) NULL +#endif """ diff --git a/psycopg_c/psycopg_c/pq/pgconn.pyx b/psycopg_c/psycopg_c/pq/pgconn.pyx index 4062ed8d8..c0bd78253 100644 --- a/psycopg_c/psycopg_c/pq/pgconn.pyx +++ b/psycopg_c/psycopg_c/pq/pgconn.pyx @@ -23,8 +23,9 @@ from cpython.memoryview cimport PyMemoryView_FromObject import ctypes import logging +from typing import Iterator -from psycopg.pq import Format as PqFormat +from psycopg.pq import Format as PqFormat, PipelineStatus from psycopg.pq.misc import PGnotify, connection_summary from psycopg_c.pq cimport PQBuffer @@ -535,6 +536,61 @@ cdef class PGconn: raise MemoryError("couldn't allocate empty PGresult") return PGresult._from_ptr(rv) + def pipeline_status(self) -> PipelineStatus: + """Return the current pipeline mode status.""" + if libpq.PG_VERSION_NUM < 140000: + raise e.NotSupportedError( + f"PQpipelineStatus requires libpq from PostgreSQL 14," + f" {libpq.PG_VERSION_NUM} available instead" + ) + cdef int status = libpq.PQpipelineStatus(self._pgconn_ptr) + return PipelineStatus(status) + + def enter_pipeline_mode(self) -> None: + """Enter pipeline mode. + + :raises ~e.OperationalError: in case of failure to enter the pipeline + mode. + """ + if libpq.PG_VERSION_NUM < 140000: + raise e.NotSupportedError( + f"PQenterPipelineMode requires libpq from PostgreSQL 14," + f" {libpq.PG_VERSION_NUM} available instead" + ) + if libpq.PQenterPipelineMode(self._pgconn_ptr) != 1: + raise e.OperationalError("failed to enter pipeline mode") + + def exit_pipeline_mode(self) -> None: + """Exit pipeline mode. + + :raises ~e.OperationalError: in case of failure to exit the pipeline + mode. + """ + if libpq.PG_VERSION_NUM < 140000: + raise e.NotSupportedError( + f"PQexitPipelineMode requires libpq from PostgreSQL 14," + f" {libpq.PG_VERSION_NUM} available instead" + ) + if libpq.PQexitPipelineMode(self._pgconn_ptr) != 1: + raise e.OperationalError(error_message(self)) + + def pipeline_sync(self) -> None: + """Mark a synchronization point in a pipeline. + + :raises ~e.OperationalError: if the connection is not in pipeline mode + or if sync failed. + """ + if libpq.PG_VERSION_NUM < 140000: + raise e.NotSupportedError( + f"PQpipelineSync requires libpq from PostgreSQL 14," + f" {libpq.PG_VERSION_NUM} available instead" + ) + rv = libpq.PQpipelineSync(self._pgconn_ptr) + if rv == 0: + raise e.OperationalError("connection not in pipeline mode") + if rv != 1: + raise e.OperationalError("failed to sync pipeline") + cdef int _ensure_pgconn(PGconn pgconn) except 0: if pgconn._pgconn_ptr is not NULL: diff --git a/tests/pq/test_pipeline.py b/tests/pq/test_pipeline.py new file mode 100644 index 000000000..5fe879da3 --- /dev/null +++ b/tests/pq/test_pipeline.py @@ -0,0 +1,147 @@ +import pytest + +import psycopg +from psycopg import pq + + +@pytest.mark.libpq("< 14") +def test_unsupported(pgconn): + with pytest.raises(psycopg.NotSupportedError): + pgconn.enter_pipeline_mode() + with pytest.raises(psycopg.NotSupportedError): + pgconn.exit_pipeline_mode() + with pytest.raises(psycopg.NotSupportedError): + pgconn.pipeline_status() + with pytest.raises(psycopg.NotSupportedError): + pgconn.pipeline_sync() + + +@pytest.mark.libpq(">= 14") +def test_work_in_progress(pgconn): + assert not pgconn.nonblocking + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"]) + with pytest.raises( + psycopg.OperationalError, match="cannot exit pipeline mode" + ): + pgconn.exit_pipeline_mode() + + +@pytest.mark.libpq(">= 14") +def test_multi_pipelines(pgconn): + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"select $1", [b"1"]) + pgconn.pipeline_sync() + pgconn.send_query_params(b"select $1", [b"2"]) + pgconn.pipeline_sync() + + # result from first query + result1 = pgconn.get_result() + assert result1 is not None + assert result1.status == pq.ExecStatus.TUPLES_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # first sync result + sync_result = pgconn.get_result() + assert sync_result is not None + assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC + + # result from second query + result2 = pgconn.get_result() + assert result2 is not None + assert result2.status == pq.ExecStatus.TUPLES_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # second sync result + sync_result = pgconn.get_result() + assert sync_result is not None + assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC + + # pipeline still ON + assert pgconn.pipeline_status() == pq.PipelineStatus.ON + + pgconn.exit_pipeline_mode() + + assert pgconn.pipeline_status() == pq.PipelineStatus.OFF + + assert result1.get_value(0, 0) == b"1" + assert result2.get_value(0, 0) == b"2" + + +@pytest.fixture +def table(pgconn): + tablename = "pipeline" + pgconn.exec_(f"create table {tablename} (s text)".encode("ascii")) + yield tablename + pgconn.exec_(f"drop table if exists {tablename}".encode("ascii")) + + +@pytest.mark.libpq(">= 14") +def test_pipeline_abort(pgconn, table): + pgconn.enter_pipeline_mode() + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"]) + pgconn.send_query_params(b"select no_such_function($1)", [b"1"]) + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"]) + pgconn.pipeline_sync() + pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"]) + pgconn.pipeline_sync() + + # result from first INSERT + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.COMMAND_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # error result from second query (SELECT) + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.FATAL_ERROR + + # NULL signals end of result + assert pgconn.get_result() is None + + # pipeline should be aborted, due to previous error + assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED + + # result from second INSERT, aborted due to previous error + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_ABORTED + + # NULL signals end of result + assert pgconn.get_result() is None + + # pipeline is still aborted + assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED + + # sync result + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_SYNC + + # aborted flag is clear, pipeline is on again + assert pgconn.pipeline_status() == pq.PipelineStatus.ON + + # result from the third INSERT + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.COMMAND_OK + + # NULL signals end of result + assert pgconn.get_result() is None + + # second sync result + r = pgconn.get_result() + assert r is not None + assert r.status == pq.ExecStatus.PIPELINE_SYNC + + # NULL signals end of result + assert pgconn.get_result() is None + + pgconn.exit_pipeline_mode()