git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: separate sync/async pipeline in their own modules
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 5 Sep 2025 23:48:48 +0000 (01:48 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 6 Sep 2025 17:10:40 +0000 (19:10 +0200)
psycopg/psycopg/__init__.py
psycopg/psycopg/_connection_base.py
psycopg/psycopg/_pipeline.py
psycopg/psycopg/_pipeline_async.py [new file with mode: 0644]
psycopg/psycopg/_pipeline_base.py [new file with mode: 0644]
psycopg/psycopg/connection_async.py
tools/async_to_sync.py

index 10f6f59e96c53d35e7865b3566d537ff291c58e8..89165a859a18a98f5c69d7b4a756b445e5ac693f 100644 (file)
@@ -19,7 +19,7 @@ from ._column import Column
 from .dbapi20 import BINARY, DATETIME, NUMBER, ROWID, STRING, Binary, Date
 from .dbapi20 import DateFromTicks, Time, TimeFromTicks, Timestamp, TimestampFromTicks
 from .version import __version__ as __version__  # noqa: F401
-from ._pipeline import AsyncPipeline, Pipeline
+from ._pipeline import Pipeline
 from .connection import Connection
 from .raw_cursor import AsyncRawCursor, AsyncRawServerCursor, RawCursor, RawServerCursor
 from .transaction import AsyncTransaction, Rollback, Transaction
@@ -27,6 +27,7 @@ from .cursor_async import AsyncCursor
 from ._capabilities import Capabilities, capabilities
 from .client_cursor import AsyncClientCursor, ClientCursor
 from ._server_cursor import ServerCursor
+from ._pipeline_async import AsyncPipeline
 from ._connection_base import BaseConnection, Notify
 from ._connection_info import ConnectionInfo
 from .connection_async import AsyncConnection
index b19d60bb41954b9128706ed4a0396d69c85f0211..c65e948abd4a632b899cf830dfdc75ec86406a8f 100644 (file)
@@ -23,9 +23,9 @@ from .adapt import AdaptersMap
 from ._enums import IsolationLevel
 from ._compat import Deque, LiteralString, Self, TypeAlias, TypeVar
 from .pq.misc import connection_summary
-from ._pipeline import BasePipeline
 from ._preparing import PrepareManager
 from ._capabilities import capabilities
+from ._pipeline_base import BasePipeline
 from ._connection_info import ConnectionInfo
 
 if TYPE_CHECKING:
index 8ae21b66fab69d0eca8ef22fcbe23c030bedf389..a32064af73c0240d0d50172f83667aa5eca57c55 100644 (file)
@@ -11,179 +11,15 @@ from types import TracebackType
 from typing import TYPE_CHECKING, Any
 
 from . import errors as e
-from . import pq
-from .abc import PipelineCommand, PQGen
-from ._compat import Deque, Self, TypeAlias
-from .pq.misc import connection_summary
-from .generators import fetch_many, pipeline_communicate, send
-from ._capabilities import capabilities
+from ._compat import Self
+from ._pipeline_base import BasePipeline
 
 if TYPE_CHECKING:
-    from .pq.abc import PGresult
-    from ._preparing import Key, Prepare  # noqa: F401
     from .connection import Connection
-    from ._cursor_base import BaseCursor  # noqa: F401
-    from ._connection_base import BaseConnection
-    from .connection_async import AsyncConnection
-
-
-PendingResult: TypeAlias = (
-    "tuple[BaseCursor[Any, Any], tuple[Key, Prepare, bytes] | None] | None"
-)
-
-FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
-PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
-BAD = pq.ConnStatus.BAD
-
-ACTIVE = pq.TransactionStatus.ACTIVE
 
 logger = logging.getLogger("psycopg")
 
 
-class BasePipeline:
-    command_queue: Deque[PipelineCommand]
-    result_queue: Deque[PendingResult]
-
-    def __init__(self, conn: BaseConnection[Any]) -> None:
-        self._conn = conn
-        self.pgconn = conn.pgconn
-        self.command_queue = Deque[PipelineCommand]()
-        self.result_queue = Deque[PendingResult]()
-        self.level = 0
-
-    def __repr__(self) -> str:
-        cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
-        info = connection_summary(self._conn.pgconn)
-        return f"<{cls} {info} at 0x{id(self):x}>"
-
-    @property
-    def status(self) -> pq.PipelineStatus:
-        return pq.PipelineStatus(self.pgconn.pipeline_status)
-
-    @classmethod
-    def is_supported(cls) -> bool:
-        """Return `!True` if the psycopg libpq wrapper supports pipeline mode."""
-        return capabilities.has_pipeline()
-
-    def _enter_gen(self) -> PQGen[None]:
-        capabilities.has_pipeline(check=True)
-        if self.level == 0:
-            self.pgconn.enter_pipeline_mode()
-        elif self.command_queue or self.pgconn.transaction_status == ACTIVE:
-            # Nested pipeline case.
-            #  Transaction might be ACTIVE when the pipeline uses an "implicit
-            #  transaction", typically in autocommit mode. But when entering a
-            #  Psycopg transaction(), we expect the IDLE state. By sync()-ing,
-            #  we make sure all previous commands are completed and the
-            #  transaction gets back to IDLE.
-            yield from self._sync_gen()
-        self.level += 1
-
-    def _exit(self, exc: BaseException | None) -> None:
-        self.level -= 1
-        if self.level == 0 and self.pgconn.status != BAD:
-            try:
-                self.pgconn.exit_pipeline_mode()
-            except e.OperationalError as exc2:
-                # Notice that this error might be pretty irrecoverable. It
-                # happens on COPY, for instance: even if sync succeeds, exiting
-                # fails with "cannot exit pipeline mode with uncollected results"
-                if exc:
-                    logger.warning("error ignored exiting %r: %s", self, exc2)
-                else:
-                    raise exc2.with_traceback(None)
-
-    def _sync_gen(self) -> PQGen[None]:
-        self._enqueue_sync()
-        yield from self._communicate_gen()
-        yield from self._fetch_gen(flush=False)
-
-    def _exit_gen(self) -> PQGen[None]:
-        """
-        Exit current pipeline by sending a Sync and fetch back all remaining results.
-        """
-        try:
-            self._enqueue_sync()
-            yield from self._communicate_gen()
-        finally:
-            yield from self._fetch_gen(flush=True)
-
-    def _communicate_gen(self) -> PQGen[None]:
-        """Communicate with pipeline to send commands and possibly fetch
-        results, which are then processed.
-        """
-        fetched = yield from pipeline_communicate(self.pgconn, self.command_queue)
-        exception = None
-        for results in fetched:
-            queued = self.result_queue.popleft()
-            try:
-                self._process_results(queued, results)
-            except e.Error as exc:
-                if exception is None:
-                    exception = exc
-        if exception is not None:
-            raise exception
-
-    def _fetch_gen(self, *, flush: bool) -> PQGen[None]:
-        """Fetch available results from the connection and process them with
-        pipeline queued items.
-
-        If 'flush' is True, a PQsendFlushRequest() is issued in order to make
-        sure results can be fetched. Otherwise, the caller may emit a
-        PQpipelineSync() call to ensure the output buffer gets flushed before
-        fetching.
-        """
-        if not self.result_queue:
-            return
-
-        if flush:
-            self.pgconn.send_flush_request()
-            yield from send(self.pgconn)
-
-        exception = None
-        while self.result_queue:
-            if not (results := (yield from fetch_many(self.pgconn))):
-                # No more results to fetch, but there may still be pending
-                # commands.
-                break
-            queued = self.result_queue.popleft()
-            try:
-                self._process_results(queued, results)
-            except e.Error as exc:
-                if exception is None:
-                    exception = exc
-        if exception is not None:
-            raise exception
-
-    def _process_results(self, queued: PendingResult, results: list[PGresult]) -> None:
-        """Process a results set fetched from the current pipeline.
-
-        This matches 'results' with its respective element in the pipeline
-        queue. For commands (None value in the pipeline queue), results are
-        checked directly. For prepare statement creation requests, update the
-        cache. Otherwise, results are attached to their respective cursor.
-        """
-        if queued is None:
-            (result,) = results
-            if result.status == FATAL_ERROR:
-                raise e.error_from_result(result, encoding=self.pgconn._encoding)
-            elif result.status == PIPELINE_ABORTED:
-                raise e.PipelineAborted("pipeline aborted")
-        else:
-            cursor, prepinfo = queued
-            if prepinfo:
-                key, prep, name = prepinfo
-                # Update the prepare state of the query.
-                cursor._conn._prepared.validate(key, prep, name, results)
-            cursor._check_results(results)
-            cursor._set_results(results)
-
-    def _enqueue_sync(self) -> None:
-        """Enqueue a PQpipelineSync() command."""
-        self.command_queue.append(self.pgconn.pipeline_sync)
-        self.result_queue.append(None)
-
-
 class Pipeline(BasePipeline):
     """Handler for connection in pipeline mode."""
 
@@ -225,43 +61,3 @@ class Pipeline(BasePipeline):
                 raise exc2.with_traceback(None)
         finally:
             self._exit(exc_val)
-
-
-class AsyncPipeline(BasePipeline):
-    """Handler for async connection in pipeline mode."""
-
-    __module__ = "psycopg"
-    _conn: AsyncConnection[Any]
-
-    def __init__(self, conn: AsyncConnection[Any]) -> None:
-        super().__init__(conn)
-
-    async def sync(self) -> None:
-        try:
-            async with self._conn.lock:
-                await self._conn.wait(self._sync_gen())
-        except e._NO_TRACEBACK as ex:
-            raise ex.with_traceback(None)
-
-    async def __aenter__(self) -> Self:
-        async with self._conn.lock:
-            await self._conn.wait(self._enter_gen())
-        return self
-
-    async def __aexit__(
-        self,
-        exc_type: type[BaseException] | None,
-        exc_val: BaseException | None,
-        exc_tb: TracebackType | None,
-    ) -> None:
-        try:
-            async with self._conn.lock:
-                await self._conn.wait(self._exit_gen())
-        except Exception as exc2:
-            # Don't clobber an exception raised in the block with this one
-            if exc_val:
-                logger.warning("error ignored terminating %r: %s", self, exc2)
-            else:
-                raise exc2.with_traceback(None)
-        finally:
-            self._exit(exc_val)
diff --git a/psycopg/psycopg/_pipeline_async.py b/psycopg/psycopg/_pipeline_async.py
new file mode 100644 (file)
index 0000000..dd4deca
--- /dev/null
@@ -0,0 +1,60 @@
+"""
+commands pipeline management
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from __future__ import annotations
+
+import logging
+from types import TracebackType
+from typing import TYPE_CHECKING, Any
+
+from . import errors as e
+from ._compat import Self
+from ._pipeline_base import BasePipeline
+
+if TYPE_CHECKING:
+    from .connection_async import AsyncConnection
+
+logger = logging.getLogger("psycopg")
+
+
+class AsyncPipeline(BasePipeline):
+    """Handler for async connection in pipeline mode."""
+
+    __module__ = "psycopg"
+    _conn: AsyncConnection[Any]
+
+    def __init__(self, conn: AsyncConnection[Any]) -> None:
+        super().__init__(conn)
+
+    async def sync(self) -> None:
+        try:
+            async with self._conn.lock:
+                await self._conn.wait(self._sync_gen())
+        except e._NO_TRACEBACK as ex:
+            raise ex.with_traceback(None)
+
+    async def __aenter__(self) -> Self:
+        async with self._conn.lock:
+            await self._conn.wait(self._enter_gen())
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        try:
+            async with self._conn.lock:
+                await self._conn.wait(self._exit_gen())
+        except Exception as exc2:
+            # Don't clobber an exception raised in the block with this one
+            if exc_val:
+                logger.warning("error ignored terminating %r: %s", self, exc2)
+            else:
+                raise exc2.with_traceback(None)
+        finally:
+            self._exit(exc_val)
diff --git a/psycopg/psycopg/_pipeline_base.py b/psycopg/psycopg/_pipeline_base.py
new file mode 100644 (file)
index 0000000..99dc104
--- /dev/null
@@ -0,0 +1,181 @@
+"""
+commands pipeline management
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Any
+
+from . import errors as e
+from . import pq
+from .abc import PipelineCommand, PQGen
+from ._compat import Deque, TypeAlias
+from .pq.misc import connection_summary
+from .generators import fetch_many, pipeline_communicate, send
+from ._capabilities import capabilities
+
+if TYPE_CHECKING:
+    from .pq.abc import PGresult
+    from ._preparing import Key, Prepare  # noqa: F401
+    from ._cursor_base import BaseCursor  # noqa: F401
+    from ._connection_base import BaseConnection
+
+
+PendingResult: TypeAlias = (
+    "tuple[BaseCursor[Any, Any], tuple[Key, Prepare, bytes] | None] | None"
+)
+
+FATAL_ERROR = pq.ExecStatus.FATAL_ERROR
+PIPELINE_ABORTED = pq.ExecStatus.PIPELINE_ABORTED
+BAD = pq.ConnStatus.BAD
+
+ACTIVE = pq.TransactionStatus.ACTIVE
+
+logger = logging.getLogger("psycopg")
+
+
+class BasePipeline:
+    command_queue: Deque[PipelineCommand]
+    result_queue: Deque[PendingResult]
+
+    def __init__(self, conn: BaseConnection[Any]) -> None:
+        self._conn = conn
+        self.pgconn = conn.pgconn
+        self.command_queue = Deque[PipelineCommand]()
+        self.result_queue = Deque[PendingResult]()
+        self.level = 0
+
+    def __repr__(self) -> str:
+        cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+        info = connection_summary(self._conn.pgconn)
+        return f"<{cls} {info} at 0x{id(self):x}>"
+
+    @property
+    def status(self) -> pq.PipelineStatus:
+        return pq.PipelineStatus(self.pgconn.pipeline_status)
+
+    @classmethod
+    def is_supported(cls) -> bool:
+        """Return `!True` if the psycopg libpq wrapper supports pipeline mode."""
+        return capabilities.has_pipeline()
+
+    def _enter_gen(self) -> PQGen[None]:
+        capabilities.has_pipeline(check=True)
+        if self.level == 0:
+            self.pgconn.enter_pipeline_mode()
+        elif self.command_queue or self.pgconn.transaction_status == ACTIVE:
+            # Nested pipeline case.
+            #  Transaction might be ACTIVE when the pipeline uses an "implicit
+            #  transaction", typically in autocommit mode. But when entering a
+            #  Psycopg transaction(), we expect the IDLE state. By sync()-ing,
+            #  we make sure all previous commands are completed and the
+            #  transaction gets back to IDLE.
+            yield from self._sync_gen()
+        self.level += 1
+
+    def _exit(self, exc: BaseException | None) -> None:
+        self.level -= 1
+        if self.level == 0 and self.pgconn.status != BAD:
+            try:
+                self.pgconn.exit_pipeline_mode()
+            except e.OperationalError as exc2:
+                # Notice that this error might be pretty irrecoverable. It
+                # happens on COPY, for instance: even if sync succeeds, exiting
+                # fails with "cannot exit pipeline mode with uncollected results"
+                if exc:
+                    logger.warning("error ignored exiting %r: %s", self, exc2)
+                else:
+                    raise exc2.with_traceback(None)
+
+    def _sync_gen(self) -> PQGen[None]:
+        self._enqueue_sync()
+        yield from self._communicate_gen()
+        yield from self._fetch_gen(flush=False)
+
+    def _exit_gen(self) -> PQGen[None]:
+        """
+        Exit current pipeline by sending a Sync and fetch back all remaining results.
+        """
+        try:
+            self._enqueue_sync()
+            yield from self._communicate_gen()
+        finally:
+            yield from self._fetch_gen(flush=True)
+
+    def _communicate_gen(self) -> PQGen[None]:
+        """Communicate with pipeline to send commands and possibly fetch
+        results, which are then processed.
+        """
+        fetched = yield from pipeline_communicate(self.pgconn, self.command_queue)
+        exception = None
+        for results in fetched:
+            queued = self.result_queue.popleft()
+            try:
+                self._process_results(queued, results)
+            except e.Error as exc:
+                if exception is None:
+                    exception = exc
+        if exception is not None:
+            raise exception
+
+    def _fetch_gen(self, *, flush: bool) -> PQGen[None]:
+        """Fetch available results from the connection and process them with
+        pipeline queued items.
+
+        If 'flush' is True, a PQsendFlushRequest() is issued in order to make
+        sure results can be fetched. Otherwise, the caller may emit a
+        PQpipelineSync() call to ensure the output buffer gets flushed before
+        fetching.
+        """
+        if not self.result_queue:
+            return
+
+        if flush:
+            self.pgconn.send_flush_request()
+            yield from send(self.pgconn)
+
+        exception = None
+        while self.result_queue:
+            if not (results := (yield from fetch_many(self.pgconn))):
+                # No more results to fetch, but there may still be pending
+                # commands.
+                break
+            queued = self.result_queue.popleft()
+            try:
+                self._process_results(queued, results)
+            except e.Error as exc:
+                if exception is None:
+                    exception = exc
+        if exception is not None:
+            raise exception
+
+    def _process_results(self, queued: PendingResult, results: list[PGresult]) -> None:
+        """Process a results set fetched from the current pipeline.
+
+        This matches 'results' with its respective element in the pipeline
+        queue. For commands (None value in the pipeline queue), results are
+        checked directly. For prepare statement creation requests, update the
+        cache. Otherwise, results are attached to their respective cursor.
+        """
+        if queued is None:
+            (result,) = results
+            if result.status == FATAL_ERROR:
+                raise e.error_from_result(result, encoding=self.pgconn._encoding)
+            elif result.status == PIPELINE_ABORTED:
+                raise e.PipelineAborted("pipeline aborted")
+        else:
+            cursor, prepinfo = queued
+            if prepinfo:
+                key, prep, name = prepinfo
+                # Update the prepare state of the query.
+                cursor._conn._prepared.validate(key, prep, name, results)
+            cursor._check_results(results)
+            cursor._set_results(results)
+
+    def _enqueue_sync(self) -> None:
+        """Enqueue a PQpipelineSync() command."""
+        self.command_queue.append(self.pgconn.pipeline_sync)
+        self.result_queue.append(None)
index 43dc8c0e86471dcce5436965aa911b4947e064a6..77c8e1098b0a5eb6fa4f476534225f09206c45af 100644 (file)
@@ -23,12 +23,12 @@ from ._enums import IsolationLevel
 from ._compat import Self
 from .conninfo import conninfo_attempts_async, conninfo_to_dict, make_conninfo
 from .conninfo import timeout_from_conninfo
-from ._pipeline import AsyncPipeline
 from .generators import notifies
 from .transaction import AsyncTransaction
 from .cursor_async import AsyncCursor
 from ._capabilities import capabilities
 from ._conninfo_utils import gssapi_requested
+from ._pipeline_async import AsyncPipeline
 from ._connection_base import BaseConnection, CursorRow, Notify
 from ._server_cursor_async import AsyncServerCursor
 
index 24d8918cc3b62895036547f6ad971f6e07e79b69..d0895d9c4e32bef907665e68016a1d1cbe2d4911 100755 (executable)
@@ -300,6 +300,7 @@ class RenameAsyncToSync(ast.NodeTransformer):  # type: ignore
         "__aexit__": "__exit__",
         "__aiter__": "__iter__",
         "_copy_async": "_copy",
+        "_pipeline_async": "_pipeline",
         "_server_cursor_async": "_server_cursor",
         "aclose": "close",
         "aclosing": "closing",