From e8b07574659417ecf2ff92a26ef79fa681125009 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 6 Sep 2025 01:48:48 +0200 Subject: [PATCH] refactor: separate sync/async pipeline in their own modules --- psycopg/psycopg/__init__.py | 3 +- psycopg/psycopg/_connection_base.py | 2 +- psycopg/psycopg/_pipeline.py | 209 +--------------------------- psycopg/psycopg/_pipeline_async.py | 60 ++++++++ psycopg/psycopg/_pipeline_base.py | 181 ++++++++++++++++++++++++ psycopg/psycopg/connection_async.py | 2 +- tools/async_to_sync.py | 1 + 7 files changed, 248 insertions(+), 210 deletions(-) create mode 100644 psycopg/psycopg/_pipeline_async.py create mode 100644 psycopg/psycopg/_pipeline_base.py diff --git a/psycopg/psycopg/__init__.py b/psycopg/psycopg/__init__.py index 10f6f59e9..89165a859 100644 --- a/psycopg/psycopg/__init__.py +++ b/psycopg/psycopg/__init__.py @@ -19,7 +19,7 @@ from ._column import Column from .dbapi20 import BINARY, DATETIME, NUMBER, ROWID, STRING, Binary, Date from .dbapi20 import DateFromTicks, Time, TimeFromTicks, Timestamp, TimestampFromTicks from .version import __version__ as __version__ # noqa: F401 -from ._pipeline import AsyncPipeline, Pipeline +from ._pipeline import Pipeline from .connection import Connection from .raw_cursor import AsyncRawCursor, AsyncRawServerCursor, RawCursor, RawServerCursor from .transaction import AsyncTransaction, Rollback, Transaction @@ -27,6 +27,7 @@ from .cursor_async import AsyncCursor from ._capabilities import Capabilities, capabilities from .client_cursor import AsyncClientCursor, ClientCursor from ._server_cursor import ServerCursor +from ._pipeline_async import AsyncPipeline from ._connection_base import BaseConnection, Notify from ._connection_info import ConnectionInfo from .connection_async import AsyncConnection diff --git a/psycopg/psycopg/_connection_base.py b/psycopg/psycopg/_connection_base.py index 1a2b777af..47409e2bb 100644 --- a/psycopg/psycopg/_connection_base.py +++ b/psycopg/psycopg/_connection_base.py @@ -25,9 +25,9 @@ from .adapt import AdaptersMap from ._enums import IsolationLevel from ._compat import LiteralString, Self, TypeVar from .pq.misc import connection_summary -from ._pipeline import BasePipeline from ._preparing import PrepareManager from ._capabilities import capabilities +from ._pipeline_base import BasePipeline from ._connection_info import ConnectionInfo if TYPE_CHECKING: diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index dcc225c9f..a32064af7 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -8,183 +8,18 @@ from __future__ import annotations import logging from types import TracebackType -from typing import TYPE_CHECKING, Any, TypeAlias -from collections import deque +from typing import TYPE_CHECKING, Any from . import errors as e -from . import pq -from .abc import PipelineCommand, PQGen from ._compat import Self -from .pq.misc import connection_summary -from .generators import fetch_many, pipeline_communicate, send -from ._capabilities import capabilities +from ._pipeline_base import BasePipeline if TYPE_CHECKING: - from .pq.abc import PGresult - from ._preparing import Key, Prepare # noqa: F401 from .connection import Connection - from ._cursor_base import BaseCursor # noqa: F401 - from ._connection_base import BaseConnection - from .connection_async import AsyncConnection - - -PendingResult: TypeAlias = ( - "tuple[BaseCursor[Any, Any], tuple[Key, Prepare, bytes] | None] | None" -) - -FATAL_ERROR = pq.ExecStatus.FATAL_ERROR -PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED -BAD = pq.ConnStatus.BAD - -ACTIVE = pq.TransactionStatus.ACTIVE logger = logging.getLogger("psycopg") -class BasePipeline: - command_queue: deque[PipelineCommand] - result_queue: deque[PendingResult] - - def __init__(self, conn: BaseConnection[Any]) -> None: - self._conn = conn - self.pgconn = conn.pgconn - self.command_queue = deque[PipelineCommand]() - self.result_queue = deque[PendingResult]() - self.level = 0 - - def __repr__(self) -> str: - cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}" - info = connection_summary(self._conn.pgconn) - return f"<{cls} {info} at 0x{id(self):x}>" - - @property - def status(self) -> pq.PipelineStatus: - return pq.PipelineStatus(self.pgconn.pipeline_status) - - @classmethod - def is_supported(cls) -> bool: - """Return `!True` if the psycopg libpq wrapper supports pipeline mode.""" - return capabilities.has_pipeline() - - def _enter_gen(self) -> PQGen[None]: - capabilities.has_pipeline(check=True) - if self.level == 0: - self.pgconn.enter_pipeline_mode() - elif self.command_queue or self.pgconn.transaction_status == ACTIVE: - # Nested pipeline case. - # Transaction might be ACTIVE when the pipeline uses an "implicit - # transaction", typically in autocommit mode. But when entering a - # Psycopg transaction(), we expect the IDLE state. By sync()-ing, - # we make sure all previous commands are completed and the - # transaction gets back to IDLE. - yield from self._sync_gen() - self.level += 1 - - def _exit(self, exc: BaseException | None) -> None: - self.level -= 1 - if self.level == 0 and self.pgconn.status != BAD: - try: - self.pgconn.exit_pipeline_mode() - except e.OperationalError as exc2: - # Notice that this error might be pretty irrecoverable. It - # happens on COPY, for instance: even if sync succeeds, exiting - # fails with "cannot exit pipeline mode with uncollected results" - if exc: - logger.warning("error ignored exiting %r: %s", self, exc2) - else: - raise exc2.with_traceback(None) - - def _sync_gen(self) -> PQGen[None]: - self._enqueue_sync() - yield from self._communicate_gen() - yield from self._fetch_gen(flush=False) - - def _exit_gen(self) -> PQGen[None]: - """ - Exit current pipeline by sending a Sync and fetch back all remaining results. - """ - try: - self._enqueue_sync() - yield from self._communicate_gen() - finally: - yield from self._fetch_gen(flush=True) - - def _communicate_gen(self) -> PQGen[None]: - """Communicate with pipeline to send commands and possibly fetch - results, which are then processed. - """ - fetched = yield from pipeline_communicate(self.pgconn, self.command_queue) - exception = None - for results in fetched: - queued = self.result_queue.popleft() - try: - self._process_results(queued, results) - except e.Error as exc: - if exception is None: - exception = exc - if exception is not None: - raise exception - - def _fetch_gen(self, *, flush: bool) -> PQGen[None]: - """Fetch available results from the connection and process them with - pipeline queued items. - - If 'flush' is True, a PQsendFlushRequest() is issued in order to make - sure results can be fetched. Otherwise, the caller may emit a - PQpipelineSync() call to ensure the output buffer gets flushed before - fetching. - """ - if not self.result_queue: - return - - if flush: - self.pgconn.send_flush_request() - yield from send(self.pgconn) - - exception = None - while self.result_queue: - if not (results := (yield from fetch_many(self.pgconn))): - # No more results to fetch, but there may still be pending - # commands. - break - queued = self.result_queue.popleft() - try: - self._process_results(queued, results) - except e.Error as exc: - if exception is None: - exception = exc - if exception is not None: - raise exception - - def _process_results(self, queued: PendingResult, results: list[PGresult]) -> None: - """Process a results set fetched from the current pipeline. - - This matches 'results' with its respective element in the pipeline - queue. For commands (None value in the pipeline queue), results are - checked directly. For prepare statement creation requests, update the - cache. Otherwise, results are attached to their respective cursor. - """ - if queued is None: - (result,) = results - if result.status == FATAL_ERROR: - raise e.error_from_result(result, encoding=self.pgconn._encoding) - elif result.status == PIPELINE_ABORTED: - raise e.PipelineAborted("pipeline aborted") - else: - cursor, prepinfo = queued - if prepinfo: - key, prep, name = prepinfo - # Update the prepare state of the query. - cursor._conn._prepared.validate(key, prep, name, results) - cursor._check_results(results) - cursor._set_results(results) - - def _enqueue_sync(self) -> None: - """Enqueue a PQpipelineSync() command.""" - self.command_queue.append(self.pgconn.pipeline_sync) - self.result_queue.append(None) - - class Pipeline(BasePipeline): """Handler for connection in pipeline mode.""" @@ -226,43 +61,3 @@ class Pipeline(BasePipeline): raise exc2.with_traceback(None) finally: self._exit(exc_val) - - -class AsyncPipeline(BasePipeline): - """Handler for async connection in pipeline mode.""" - - __module__ = "psycopg" - _conn: AsyncConnection[Any] - - def __init__(self, conn: AsyncConnection[Any]) -> None: - super().__init__(conn) - - async def sync(self) -> None: - try: - async with self._conn.lock: - await self._conn.wait(self._sync_gen()) - except e._NO_TRACEBACK as ex: - raise ex.with_traceback(None) - - async def __aenter__(self) -> Self: - async with self._conn.lock: - await self._conn.wait(self._enter_gen()) - return self - - async def __aexit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> None: - try: - async with self._conn.lock: - await self._conn.wait(self._exit_gen()) - except Exception as exc2: - # Don't clobber an exception raised in the block with this one - if exc_val: - logger.warning("error ignored terminating %r: %s", self, exc2) - else: - raise exc2.with_traceback(None) - finally: - self._exit(exc_val) diff --git a/psycopg/psycopg/_pipeline_async.py b/psycopg/psycopg/_pipeline_async.py new file mode 100644 index 000000000..dd4deca65 --- /dev/null +++ b/psycopg/psycopg/_pipeline_async.py @@ -0,0 +1,60 @@ +""" +commands pipeline management +""" + +# Copyright (C) 2021 The Psycopg Team + +from __future__ import annotations + +import logging +from types import TracebackType +from typing import TYPE_CHECKING, Any + +from . import errors as e +from ._compat import Self +from ._pipeline_base import BasePipeline + +if TYPE_CHECKING: + from .connection_async import AsyncConnection + +logger = logging.getLogger("psycopg") + + +class AsyncPipeline(BasePipeline): + """Handler for async connection in pipeline mode.""" + + __module__ = "psycopg" + _conn: AsyncConnection[Any] + + def __init__(self, conn: AsyncConnection[Any]) -> None: + super().__init__(conn) + + async def sync(self) -> None: + try: + async with self._conn.lock: + await self._conn.wait(self._sync_gen()) + except e._NO_TRACEBACK as ex: + raise ex.with_traceback(None) + + async def __aenter__(self) -> Self: + async with self._conn.lock: + await self._conn.wait(self._enter_gen()) + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + try: + async with self._conn.lock: + await self._conn.wait(self._exit_gen()) + except Exception as exc2: + # Don't clobber an exception raised in the block with this one + if exc_val: + logger.warning("error ignored terminating %r: %s", self, exc2) + else: + raise exc2.with_traceback(None) + finally: + self._exit(exc_val) diff --git a/psycopg/psycopg/_pipeline_base.py b/psycopg/psycopg/_pipeline_base.py new file mode 100644 index 000000000..3a9a9da00 --- /dev/null +++ b/psycopg/psycopg/_pipeline_base.py @@ -0,0 +1,181 @@ +""" +commands pipeline management +""" + +# Copyright (C) 2021 The Psycopg Team + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any, TypeAlias +from collections import deque + +from . import errors as e +from . import pq +from .abc import PipelineCommand, PQGen +from .pq.misc import connection_summary +from .generators import fetch_many, pipeline_communicate, send +from ._capabilities import capabilities + +if TYPE_CHECKING: + from .pq.abc import PGresult + from ._preparing import Key, Prepare # noqa: F401 + from ._cursor_base import BaseCursor # noqa: F401 + from ._connection_base import BaseConnection + + +PendingResult: TypeAlias = ( + "tuple[BaseCursor[Any, Any], tuple[Key, Prepare, bytes] | None] | None" +) + +FATAL_ERROR = pq.ExecStatus.FATAL_ERROR +PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED +BAD = pq.ConnStatus.BAD + +ACTIVE = pq.TransactionStatus.ACTIVE + +logger = logging.getLogger("psycopg") + + +class BasePipeline: + command_queue: deque[PipelineCommand] + result_queue: deque[PendingResult] + + def __init__(self, conn: BaseConnection[Any]) -> None: + self._conn = conn + self.pgconn = conn.pgconn + self.command_queue = deque[PipelineCommand]() + self.result_queue = deque[PendingResult]() + self.level = 0 + + def __repr__(self) -> str: + cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}" + info = connection_summary(self._conn.pgconn) + return f"<{cls} {info} at 0x{id(self):x}>" + + @property + def status(self) -> pq.PipelineStatus: + return pq.PipelineStatus(self.pgconn.pipeline_status) + + @classmethod + def is_supported(cls) -> bool: + """Return `!True` if the psycopg libpq wrapper supports pipeline mode.""" + return capabilities.has_pipeline() + + def _enter_gen(self) -> PQGen[None]: + capabilities.has_pipeline(check=True) + if self.level == 0: + self.pgconn.enter_pipeline_mode() + elif self.command_queue or self.pgconn.transaction_status == ACTIVE: + # Nested pipeline case. + # Transaction might be ACTIVE when the pipeline uses an "implicit + # transaction", typically in autocommit mode. But when entering a + # Psycopg transaction(), we expect the IDLE state. By sync()-ing, + # we make sure all previous commands are completed and the + # transaction gets back to IDLE. + yield from self._sync_gen() + self.level += 1 + + def _exit(self, exc: BaseException | None) -> None: + self.level -= 1 + if self.level == 0 and self.pgconn.status != BAD: + try: + self.pgconn.exit_pipeline_mode() + except e.OperationalError as exc2: + # Notice that this error might be pretty irrecoverable. It + # happens on COPY, for instance: even if sync succeeds, exiting + # fails with "cannot exit pipeline mode with uncollected results" + if exc: + logger.warning("error ignored exiting %r: %s", self, exc2) + else: + raise exc2.with_traceback(None) + + def _sync_gen(self) -> PQGen[None]: + self._enqueue_sync() + yield from self._communicate_gen() + yield from self._fetch_gen(flush=False) + + def _exit_gen(self) -> PQGen[None]: + """ + Exit current pipeline by sending a Sync and fetch back all remaining results. + """ + try: + self._enqueue_sync() + yield from self._communicate_gen() + finally: + yield from self._fetch_gen(flush=True) + + def _communicate_gen(self) -> PQGen[None]: + """Communicate with pipeline to send commands and possibly fetch + results, which are then processed. + """ + fetched = yield from pipeline_communicate(self.pgconn, self.command_queue) + exception = None + for results in fetched: + queued = self.result_queue.popleft() + try: + self._process_results(queued, results) + except e.Error as exc: + if exception is None: + exception = exc + if exception is not None: + raise exception + + def _fetch_gen(self, *, flush: bool) -> PQGen[None]: + """Fetch available results from the connection and process them with + pipeline queued items. + + If 'flush' is True, a PQsendFlushRequest() is issued in order to make + sure results can be fetched. Otherwise, the caller may emit a + PQpipelineSync() call to ensure the output buffer gets flushed before + fetching. + """ + if not self.result_queue: + return + + if flush: + self.pgconn.send_flush_request() + yield from send(self.pgconn) + + exception = None + while self.result_queue: + if not (results := (yield from fetch_many(self.pgconn))): + # No more results to fetch, but there may still be pending + # commands. + break + queued = self.result_queue.popleft() + try: + self._process_results(queued, results) + except e.Error as exc: + if exception is None: + exception = exc + if exception is not None: + raise exception + + def _process_results(self, queued: PendingResult, results: list[PGresult]) -> None: + """Process a results set fetched from the current pipeline. + + This matches 'results' with its respective element in the pipeline + queue. For commands (None value in the pipeline queue), results are + checked directly. For prepare statement creation requests, update the + cache. Otherwise, results are attached to their respective cursor. + """ + if queued is None: + (result,) = results + if result.status == FATAL_ERROR: + raise e.error_from_result(result, encoding=self.pgconn._encoding) + elif result.status == PIPELINE_ABORTED: + raise e.PipelineAborted("pipeline aborted") + else: + cursor, prepinfo = queued + if prepinfo: + key, prep, name = prepinfo + # Update the prepare state of the query. + cursor._conn._prepared.validate(key, prep, name, results) + cursor._check_results(results) + cursor._set_results(results) + + def _enqueue_sync(self) -> None: + """Enqueue a PQpipelineSync() command.""" + self.command_queue.append(self.pgconn.pipeline_sync) + self.result_queue.append(None) diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index d78775093..408f3827f 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -24,11 +24,11 @@ from ._compat import Self from ._acompat import ALock from .conninfo import conninfo_attempts_async, conninfo_to_dict, make_conninfo from .conninfo import timeout_from_conninfo -from ._pipeline import AsyncPipeline from .generators import notifies from .transaction import AsyncTransaction from .cursor_async import AsyncCursor from ._capabilities import capabilities +from ._pipeline_async import AsyncPipeline from ._connection_base import BaseConnection, CursorRow, Notify from ._server_cursor_async import AsyncServerCursor diff --git a/tools/async_to_sync.py b/tools/async_to_sync.py index 888417abe..6b3012df2 100755 --- a/tools/async_to_sync.py +++ b/tools/async_to_sync.py @@ -312,6 +312,7 @@ class RenameAsyncToSync(ast.NodeTransformer): # type: ignore "__aiter__": "__iter__", "__anext__": "__next__", "_copy_async": "_copy", + "_pipeline_async": "_pipeline", "_server_cursor_async": "_server_cursor", "aclose": "close", "aclosing": "closing", -- 2.47.3