from .misc import ConninfoOption, PGnotify, PGresAttDesc
from .misc import error_message
from ._enums import ConnStatus, DiagnosticField, ExecStatus, Format
-from ._enums import Ping, PollingStatus, TransactionStatus
+from ._enums import Ping, PipelineStatus, PollingStatus, TransactionStatus
logger = logging.getLogger(__name__)
__all__ = (
"ConnStatus",
+ "PipelineStatus",
"PollingStatus",
"TransactionStatus",
"ExecStatus",
query.
"""
+ PIPELINE_SYNC = auto()
+ """
+ The PGresult represents a synchronization point in pipeline mode,
+ requested by PQpipelineSync.
+
+ This status occurs only when pipeline mode has been selected.
+ """
+
+ PIPELINE_ABORTED = auto()
+ """
+ The PGresult represents a pipeline that has received an error from the server.
+
+ PQgetResult must be called repeatedly, and each time it will return this
+ status code until the end of the current pipeline, at which point it will
+ return PGRES_PIPELINE_SYNC and normal processing can resume.
+ """
+
class TransactionStatus(IntEnum):
"""
"""
+class PipelineStatus(IntEnum):
+ """Pipeline mode status of the libpq connection."""
+
+ __module__ = "psycopg.pq"
+
+ OFF = 0
+ """
+ The libpq connection is *not* in pipeline mode.
+ """
+ ON = auto()
+ """
+ The libpq connection is in pipeline mode.
+ """
+ ABORTED = auto()
+ """
+ The libpq connection is in pipeline mode and an error occurred while
+ processing the current pipeline. The aborted flag is cleared when
+ PQgetResult returns a result of type PGRES_PIPELINE_SYNC.
+ """
+
+
class DiagnosticField(IntEnum):
"""
Fields in an error report.
PQsetNoticeReceiver.argtypes = [PGconn_ptr, PQnoticeReceiver, c_void_p]
PQsetNoticeReceiver.restype = PQnoticeReceiver
+# 34.5 Pipeline Mode
+
+_PQpipelineStatus = None
+_PQenterPipelineMode = None
+_PQexitPipelineMode = None
+_PQpipelineSync = None
+_PQsendFlushRequest = None
+
+if libpq_version >= 140000:
+ _PQpipelineStatus = pq.PQpipelineStatus
+ _PQpipelineStatus.argtypes = [PGconn_ptr]
+ _PQpipelineStatus.restype = c_int
+
+ _PQenterPipelineMode = pq.PQenterPipelineMode
+ _PQenterPipelineMode.argtypes = [PGconn_ptr]
+ _PQenterPipelineMode.restype = c_int
+
+ _PQexitPipelineMode = pq.PQexitPipelineMode
+ _PQexitPipelineMode.argtypes = [PGconn_ptr]
+ _PQexitPipelineMode.restype = c_int
+
+ _PQpipelineSync = pq.PQpipelineSync
+ _PQpipelineSync.argtypes = [PGconn_ptr]
+ _PQpipelineSync.restype = c_int
+
+ _PQsendFlushRequest = pq.PQsendFlushRequest
+ _PQsendFlushRequest.argtypes = [PGconn_ptr]
+ _PQsendFlushRequest.restype = c_int
+
+
+def PQpipelineStatus(pgconn: PGconn_struct) -> int:
+ if not _PQpipelineStatus:
+ raise NotSupportedError(
+ f"PQpipelineStatus requires libpq from PostgreSQL 14,"
+ f" {libpq_version} available instead"
+ )
+ return _PQpipelineStatus(pgconn)
+
+
+def PQenterPipelineMode(pgconn: PGconn_struct) -> int:
+ if not _PQenterPipelineMode:
+ raise NotSupportedError(
+ f"PQenterPipelineMode requires libpq from PostgreSQL 14,"
+ f" {libpq_version} available instead"
+ )
+ return _PQenterPipelineMode(pgconn)
+
+
+def PQexitPipelineMode(pgconn: PGconn_struct) -> int:
+ if not _PQexitPipelineMode:
+ raise NotSupportedError(
+ f"PQexitPipelineMode requires libpq from PostgreSQL 14,"
+ f" {libpq_version} available instead"
+ )
+ return _PQexitPipelineMode(pgconn)
+
+
+def PQpipelineSync(pgconn: PGconn_struct) -> int:
+ if not _PQpipelineSync:
+ raise NotSupportedError(
+ f"PQpipelineSync requires libpq from PostgreSQL 14,"
+ f" {libpq_version} available instead"
+ )
+ return _PQpipelineSync(pgconn)
+
+
+def PQsendFlushRequest(pgconn: PGconn_struct) -> int:
+ if not _PQsendFlushRequest:
+ raise NotSupportedError(
+ f"PQsendFlushRequest requires libpq from PostgreSQL 14,"
+ f" {libpq_version} available instead"
+ )
+ return _PQsendFlushRequest(pgconn)
+
# 33.18. SSL Support
arg3: bytes,
arg4: Optional[bytes],
) -> bytes: ...
+def PQpipelineStatus(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQenterPipelineMode(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQexitPipelineMode(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQpipelineSync(pgconn: Optional[PGconn_struct]) -> int: ...
+def PQsendFlushRequest(pgconn: Optional[PGconn_struct]) -> int: ...
# fmt: off
# autogenerated: start
def PQputCopyData(arg1: Optional[PGconn_struct], arg2: bytes, arg3: int) -> int: ...
def PQfreemem(arg1: Any) -> None: ...
def PQmakeEmptyPGresult(arg1: Optional[PGconn_struct], arg2: int) -> PGresult_struct: ...
+def _PQpipelineStatus(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQenterPipelineMode(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQexitPipelineMode(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQpipelineSync(arg1: Optional[PGconn_struct]) -> int: ...
+def _PQsendFlushRequest(arg1: Optional[PGconn_struct]) -> int: ...
def PQinitOpenSSL(arg1: int, arg2: int) -> None: ...
# autogenerated: end
# fmt: on
# Copyright (C) 2020-2021 The Psycopg Team
-from typing import Any, Callable, List, Optional, Sequence, Tuple, Union
-from typing import TYPE_CHECKING
+from typing import Any, Callable, List, Optional, Sequence, Tuple
+from typing import Union, TYPE_CHECKING
-from ._enums import Format
+from ._enums import Format, PipelineStatus
from .._compat import Protocol
if TYPE_CHECKING:
def make_empty_result(self, exec_status: int) -> "PGresult":
...
+ def pipeline_status(self) -> PipelineStatus:
+ ...
+
+ def enter_pipeline_mode(self) -> None:
+ ...
+
+ def exit_pipeline_mode(self) -> None:
+ ...
+
+ def pipeline_sync(self) -> None:
+ ...
+
class PGresult(Protocol):
def clear(self) -> None:
from . import _pq_ctypes as impl
from .misc import PGnotify, ConninfoOption, PGresAttDesc
from .misc import error_message, connection_summary
-from ._enums import Format, ExecStatus
+from ._enums import Format, ExecStatus, PipelineStatus
if TYPE_CHECKING:
from . import abc
raise MemoryError("couldn't allocate empty PGresult")
return PGresult(rv)
+ def pipeline_status(self) -> PipelineStatus:
+ return PipelineStatus(impl.PQpipelineStatus(self._pgconn_ptr))
+
+ def enter_pipeline_mode(self) -> None:
+ """Enter pipeline mode.
+
+ :raises ~e.OperationalError: in case of failure to enter the pipeline
+ mode.
+ """
+ if impl.PQenterPipelineMode(self._pgconn_ptr) != 1:
+ raise e.OperationalError("failed to enter pipeline mode")
+
+ def exit_pipeline_mode(self) -> None:
+ """Exit pipeline mode.
+
+ :raises ~e.OperationalError: in case of failure to exit the pipeline
+ mode.
+ """
+ if impl.PQexitPipelineMode(self._pgconn_ptr) != 1:
+ raise e.OperationalError(error_message(self))
+
+ def pipeline_sync(self) -> None:
+ """Mark a synchronization point in a pipeline.
+
+ :raises ~e.OperationalError: if the connection is not in pipeline mode
+ or if sync failed.
+ """
+ rv = impl.PQpipelineSync(self._pgconn_ptr)
+ if rv == 0:
+ raise e.OperationalError("connection not in pipeline mode")
+ if rv != 1:
+ raise e.OperationalError("failed to sync pipeline")
+
def _call_bytes(
self, func: Callable[[impl.PGconn_struct], Optional[bytes]]
) -> bytes:
# 33.18. SSL Support
void PQinitOpenSSL(int do_ssl, int do_crypto)
+ # 34.5 Pipeline Mode
+
+ ctypedef enum PGpipelineStatus:
+ PQ_PIPELINE_OFF
+ PQ_PIPELINE_ON
+ PQ_PIPELINE_ABORTED
+
+ PGpipelineStatus PQpipelineStatus(const PGconn *conn)
+ int PQenterPipelineMode(PGconn *conn)
+ int PQexitPipelineMode(PGconn *conn)
+ int PQpipelineSync(PGconn *conn)
+ int PQsendFlushRequest(PGconn *conn)
cdef extern from *:
"""
#if PG_VERSION_NUM < 120000
#define PQhostaddr(conn) NULL
#endif
+
+#if PG_VERSION_NUM < 140000
+typedef enum {
+ PQ_PIPELINE_OFF,
+ PQ_PIPELINE_ON,
+ PQ_PIPELINE_ABORTED
+} PGpipelineStatus;
+#define PQpipelineStatus(conn) NULL
+#define PQenterPipelineMode(conn) NULL
+#define PQexitPipelineMode(conn) NULL
+#define PQpipelineSync(conn) NULL
+#define PQsendFlushRequest(conn) NULL
+#endif
"""
import ctypes
import logging
+from typing import Iterator
-from psycopg.pq import Format as PqFormat
+from psycopg.pq import Format as PqFormat, PipelineStatus
from psycopg.pq.misc import PGnotify, connection_summary
from psycopg_c.pq cimport PQBuffer
raise MemoryError("couldn't allocate empty PGresult")
return PGresult._from_ptr(rv)
+ def pipeline_status(self) -> PipelineStatus:
+ """Return the current pipeline mode status."""
+ if libpq.PG_VERSION_NUM < 140000:
+ raise e.NotSupportedError(
+ f"PQpipelineStatus requires libpq from PostgreSQL 14,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ cdef int status = libpq.PQpipelineStatus(self._pgconn_ptr)
+ return PipelineStatus(status)
+
+ def enter_pipeline_mode(self) -> None:
+ """Enter pipeline mode.
+
+ :raises ~e.OperationalError: in case of failure to enter the pipeline
+ mode.
+ """
+ if libpq.PG_VERSION_NUM < 140000:
+ raise e.NotSupportedError(
+ f"PQenterPipelineMode requires libpq from PostgreSQL 14,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ if libpq.PQenterPipelineMode(self._pgconn_ptr) != 1:
+ raise e.OperationalError("failed to enter pipeline mode")
+
+ def exit_pipeline_mode(self) -> None:
+ """Exit pipeline mode.
+
+ :raises ~e.OperationalError: in case of failure to exit the pipeline
+ mode.
+ """
+ if libpq.PG_VERSION_NUM < 140000:
+ raise e.NotSupportedError(
+ f"PQexitPipelineMode requires libpq from PostgreSQL 14,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ if libpq.PQexitPipelineMode(self._pgconn_ptr) != 1:
+ raise e.OperationalError(error_message(self))
+
+ def pipeline_sync(self) -> None:
+ """Mark a synchronization point in a pipeline.
+
+ :raises ~e.OperationalError: if the connection is not in pipeline mode
+ or if sync failed.
+ """
+ if libpq.PG_VERSION_NUM < 140000:
+ raise e.NotSupportedError(
+ f"PQpipelineSync requires libpq from PostgreSQL 14,"
+ f" {libpq.PG_VERSION_NUM} available instead"
+ )
+ rv = libpq.PQpipelineSync(self._pgconn_ptr)
+ if rv == 0:
+ raise e.OperationalError("connection not in pipeline mode")
+ if rv != 1:
+ raise e.OperationalError("failed to sync pipeline")
+
cdef int _ensure_pgconn(PGconn pgconn) except 0:
if pgconn._pgconn_ptr is not NULL:
--- /dev/null
+import pytest
+
+import psycopg
+from psycopg import pq
+
+
+@pytest.mark.libpq("< 14")
+def test_unsupported(pgconn):
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.enter_pipeline_mode()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.exit_pipeline_mode()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.pipeline_status()
+ with pytest.raises(psycopg.NotSupportedError):
+ pgconn.pipeline_sync()
+
+
+@pytest.mark.libpq(">= 14")
+def test_work_in_progress(pgconn):
+ assert not pgconn.nonblocking
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"])
+ with pytest.raises(
+ psycopg.OperationalError, match="cannot exit pipeline mode"
+ ):
+ pgconn.exit_pipeline_mode()
+
+
+@pytest.mark.libpq(">= 14")
+def test_multi_pipelines(pgconn):
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"select $1", [b"1"])
+ pgconn.pipeline_sync()
+ pgconn.send_query_params(b"select $1", [b"2"])
+ pgconn.pipeline_sync()
+
+ # result from first query
+ result1 = pgconn.get_result()
+ assert result1 is not None
+ assert result1.status == pq.ExecStatus.TUPLES_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # first sync result
+ sync_result = pgconn.get_result()
+ assert sync_result is not None
+ assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # result from second query
+ result2 = pgconn.get_result()
+ assert result2 is not None
+ assert result2.status == pq.ExecStatus.TUPLES_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # second sync result
+ sync_result = pgconn.get_result()
+ assert sync_result is not None
+ assert sync_result.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # pipeline still ON
+ assert pgconn.pipeline_status() == pq.PipelineStatus.ON
+
+ pgconn.exit_pipeline_mode()
+
+ assert pgconn.pipeline_status() == pq.PipelineStatus.OFF
+
+ assert result1.get_value(0, 0) == b"1"
+ assert result2.get_value(0, 0) == b"2"
+
+
+@pytest.fixture
+def table(pgconn):
+ tablename = "pipeline"
+ pgconn.exec_(f"create table {tablename} (s text)".encode("ascii"))
+ yield tablename
+ pgconn.exec_(f"drop table if exists {tablename}".encode("ascii"))
+
+
+@pytest.mark.libpq(">= 14")
+def test_pipeline_abort(pgconn, table):
+ pgconn.enter_pipeline_mode()
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"1"])
+ pgconn.send_query_params(b"select no_such_function($1)", [b"1"])
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"2"])
+ pgconn.pipeline_sync()
+ pgconn.send_query_params(b"insert into pipeline values ($1)", [b"3"])
+ pgconn.pipeline_sync()
+
+ # result from first INSERT
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.COMMAND_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # error result from second query (SELECT)
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.FATAL_ERROR
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # pipeline should be aborted, due to previous error
+ assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED
+
+ # result from second INSERT, aborted due to previous error
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_ABORTED
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # pipeline is still aborted
+ assert pgconn.pipeline_status() == pq.PipelineStatus.ABORTED
+
+ # sync result
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # aborted flag is clear, pipeline is on again
+ assert pgconn.pipeline_status() == pq.PipelineStatus.ON
+
+ # result from the third INSERT
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.COMMAND_OK
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ # second sync result
+ r = pgconn.get_result()
+ assert r is not None
+ assert r.status == pq.ExecStatus.PIPELINE_SYNC
+
+ # NULL signals end of result
+ assert pgconn.get_result() is None
+
+ pgconn.exit_pipeline_mode()