]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor: Pipeline classes moved to their module
authorDenis Laxalde <denis@laxalde.org>
Sun, 27 Mar 2022 10:41:56 +0000 (12:41 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:17:57 +0000 (01:17 +0200)
psycopg/psycopg/__init__.py
psycopg/psycopg/_pipeline.py [new file with mode: 0644]
psycopg/psycopg/abc.py
psycopg/psycopg/connection.py
psycopg/psycopg/connection_async.py
psycopg/psycopg/generators.py
psycopg_c/psycopg_c/_psycopg.pyi
psycopg_c/psycopg_c/_psycopg/generators.pyx
tests/scripts/pipeline-demo.py

index 4879bcdf6cfbbb760aef4750794d6f0fab8604ee..c86d268226c347d0ba40a8f792e4a589509cfb6f 100644 (file)
@@ -18,11 +18,12 @@ from .errors import DataError, OperationalError, IntegrityError
 from .errors import InternalError, ProgrammingError, NotSupportedError
 from ._column import Column
 from .conninfo import ConnectionInfo
-from .connection import BaseConnection, Connection, Notify, Pipeline
+from ._pipeline import Pipeline, AsyncPipeline
+from .connection import BaseConnection, Connection, Notify
 from .transaction import Rollback, Transaction, AsyncTransaction
 from .cursor_async import AsyncCursor
 from .server_cursor import AsyncServerCursor, ServerCursor
-from .connection_async import AsyncConnection, AsyncPipeline
+from .connection_async import AsyncConnection
 
 from . import dbapi20
 from .dbapi20 import BINARY, DATETIME, NUMBER, ROWID, STRING
diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py
new file mode 100644 (file)
index 0000000..b2b32f0
--- /dev/null
@@ -0,0 +1,158 @@
+"""
+commands pipeline management
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from types import TracebackType
+from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
+
+from . import pq
+from . import errors as e
+from .pq import ExecStatus
+from .abc import PipelineCommand, PQGen
+from ._compat import Deque, TypeAlias
+from ._cmodule import _psycopg
+from ._encodings import pgconn_encoding
+from ._preparing import Key, Prepare
+
+if TYPE_CHECKING:
+    from .pq.abc import PGconn, PGresult
+    from .cursor import BaseCursor
+
+if _psycopg:
+    pipeline_communicate = _psycopg.pipeline_communicate
+    fetch_many = _psycopg.fetch_many
+    send = _psycopg.send
+
+else:
+    from . import generators
+
+    pipeline_communicate = generators.pipeline_communicate
+    fetch_many = generators.fetch_many
+    send = generators.send
+
+PendingResult: TypeAlias = Union[
+    None, Tuple["BaseCursor[Any, Any]", Optional[Tuple[Key, Prepare, bytes]]]
+]
+
+
+class BasePipeline:
+    def __init__(self, pgconn: "PGconn") -> None:
+        self.pgconn = pgconn
+        self.command_queue = Deque[PipelineCommand]()
+        self.result_queue = Deque[PendingResult]()
+
+    @property
+    def status(self) -> pq.PipelineStatus:
+        return pq.PipelineStatus(self.pgconn.pipeline_status)
+
+    def sync(self) -> None:
+        """Enqueue a PQpipelineSync() command."""
+        self.command_queue.append(self.pgconn.pipeline_sync)
+        self.result_queue.append(None)
+
+    def _enter(self) -> None:
+        self.pgconn.enter_pipeline_mode()
+
+    def _exit(self) -> None:
+        self.pgconn.exit_pipeline_mode()
+
+    def _communicate_gen(self) -> PQGen[None]:
+        """Communicate with pipeline to send commands and possibly fetch
+        results, which are then processed.
+        """
+        fetched = yield from pipeline_communicate(self.pgconn, self.command_queue)
+        to_process = [(self.result_queue.popleft(), results) for results in fetched]
+        for queued, results in to_process:
+            self._process_results(queued, results)
+
+    def _fetch_gen(self, *, flush: bool) -> PQGen[None]:
+        """Fetch available results from the connection and process them with
+        pipeline queued items.
+
+        If 'flush' is True, a PQsendFlushRequest() is issued in order to make
+        sure results can be fetched. Otherwise, the caller may emit a
+        PQpipelineSync() call to ensure the output buffer gets flushed before
+        fetching.
+        """
+        if not self.result_queue:
+            return
+
+        if flush:
+            self.pgconn.send_flush_request()
+            yield from send(self.pgconn)
+
+        to_process = []
+        while self.result_queue:
+            results = yield from fetch_many(self.pgconn)
+            if not results:
+                # No more results to fetch, but there may still be pending
+                # commands.
+                break
+            queued = self.result_queue.popleft()
+            to_process.append((queued, results))
+
+        for queued, results in to_process:
+            self._process_results(queued, results)
+
+    def _process_results(
+        self, queued: PendingResult, results: List["PGresult"]
+    ) -> None:
+        """Process a results set fetched from the current pipeline.
+
+        This matchs 'results' with its respective element in the pipeline
+        queue. For commands (None value in the pipeline queue), results are
+        checked directly. For prepare statement creation requests, update the
+        cache. Otherwise, results are attached to their respective cursor.
+        """
+        if queued is None:
+            (result,) = results
+            if result.status == ExecStatus.FATAL_ERROR:
+                raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
+            elif result.status == ExecStatus.PIPELINE_ABORTED:
+                raise e.OperationalError("pipeline aborted")
+        else:
+            cursor, prepinfo = queued
+            cursor._check_results(results)
+            if not cursor._results:
+                cursor._results = results
+                cursor._set_current_result(0)
+            else:
+                cursor._results.extend(results)
+            if prepinfo:
+                key, prep, name = prepinfo
+                # Update the prepare state of the query.
+                cursor._conn._prepared.validate(key, prep, name, results)
+
+
+class Pipeline(BasePipeline):
+    """Handler for connection in pipeline mode."""
+
+    def __enter__(self) -> "Pipeline":
+        self._enter()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> None:
+        self._exit()
+
+
+class AsyncPipeline(BasePipeline):
+    """Handler for async connection in pipeline mode."""
+
+    async def __aenter__(self) -> "AsyncPipeline":
+        self._enter()
+        return self
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> None:
+        self._exit()
index 42a2614b4293cc95fd81fa53befbd849e510b07e..d23873fe56e25edfe5102f5d0d93af20c4b5ddee 100644 (file)
@@ -26,7 +26,7 @@ Buffer: TypeAlias = Union[bytes, bytearray, memoryview]
 Query: TypeAlias = Union[str, bytes, "Composable"]
 Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
 ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
-Command = Callable[[], None]
+PipelineCommand: TypeAlias = Callable[[], None]
 
 # TODO: make it recursive when mypy will support it
 # DumperKey: TypeAlias = Union[type, Tuple[Union[type, "DumperKey"]]]
index c3b1c44c14700bcd620e82a72f5beb4a365bb66d..dc440967b18dc2bd38766db3299c2820a17601df 100644 (file)
@@ -20,7 +20,7 @@ from . import errors as e
 from . import waiting
 from . import postgres
 from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
-from .abc import AdaptContext, Command, ConnectionType, Params, Query, RV
+from .abc import AdaptContext, ConnectionType, Params, Query, RV
 from .abc import PQGen, PQGenConn
 from .sql import Composable, SQL
 from ._tpc import Xid
@@ -28,25 +28,22 @@ from .rows import Row, RowFactory, tuple_row, TupleRow, args_row
 from .adapt import AdaptersMap
 from ._enums import IsolationLevel
 from .cursor import Cursor
-from ._compat import Deque, TypeAlias
+from ._compat import TypeAlias
 from ._cmodule import _psycopg
 from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
+from ._pipeline import BasePipeline, Pipeline
 from .generators import notifies
 from ._encodings import pgconn_encoding
-from ._preparing import Key, Prepare, PrepareManager
+from ._preparing import PrepareManager
 from .transaction import Transaction
 from .server_cursor import ServerCursor
 
 if TYPE_CHECKING:
     from .pq.abc import PGconn, PGresult
-    from .cursor import BaseCursor
     from psycopg_pool.base import BasePool
 
 logger = logging.getLogger("psycopg")
 
-connect: Callable[[str], PQGenConn["PGconn"]]
-execute: Callable[["PGconn"], PQGen[List["PGresult"]]]
-
 # Row Type variable for Cursor (when it needs to be distinguished from the
 # connection's one)
 CursorRow = TypeVar("CursorRow")
@@ -54,18 +51,12 @@ CursorRow = TypeVar("CursorRow")
 if _psycopg:
     connect = _psycopg.connect
     execute = _psycopg.execute
-    fetch_many = _psycopg.fetch_many
-    pipeline_communicate = _psycopg.pipeline_communicate
-    send = _psycopg.send
 
 else:
     from . import generators
 
     connect = generators.connect
     execute = generators.execute
-    fetch_many = generators.fetch_many
-    pipeline_communicate = generators.pipeline_communicate
-    send = generators.send
 
 
 class Notify(NamedTuple):
@@ -86,119 +77,6 @@ Notify.__module__ = "psycopg"
 NoticeHandler: TypeAlias = Callable[[e.Diagnostic], None]
 NotifyHandler: TypeAlias = Callable[[Notify], None]
 
-PipelinePendingResult = Union[
-    None,
-    Tuple[
-        "BaseCursor[Any, Any]",
-        Optional[Tuple[Key, Prepare, bytes]],
-    ],
-]
-
-
-class BasePipeline:
-    def __init__(self, pgconn: "PGconn") -> None:
-        self.pgconn = pgconn
-        self.command_queue = Deque[Command]()
-        self.result_queue = Deque[PipelinePendingResult]()
-
-    @property
-    def status(self) -> pq.PipelineStatus:
-        return pq.PipelineStatus(self.pgconn.pipeline_status)
-
-    def sync(self) -> None:
-        """Enqueue a PQpipelineSync() command."""
-        self.command_queue.append(self.pgconn.pipeline_sync)
-        self.result_queue.append(None)
-
-    def _enter(self) -> None:
-        self.pgconn.enter_pipeline_mode()
-
-    def _exit(self) -> None:
-        self.pgconn.exit_pipeline_mode()
-
-    def _communicate_gen(self) -> PQGen[None]:
-        """Communicate with pipeline to send commands and possibly fetch
-        results, which are then processed.
-        """
-        fetched = yield from pipeline_communicate(self.pgconn, self.command_queue)
-        to_process = [(self.result_queue.popleft(), results) for results in fetched]
-        for queued, results in to_process:
-            self._process_results(queued, results)
-
-    def _fetch_gen(self, *, flush: bool) -> PQGen[None]:
-        """Fetch available results from the connection and process them with
-        pipeline queued items.
-
-        If 'flush' is True, a PQsendFlushRequest() is issued in order to make
-        sure results can be fetched. Otherwise, the caller may emit a
-        PQpipelineSync() call to ensure the output buffer gets flushed before
-        fetching.
-        """
-        if not self.result_queue:
-            return
-
-        if flush:
-            self.pgconn.send_flush_request()
-            yield from send(self.pgconn)
-
-        to_process = []
-        while self.result_queue:
-            results = yield from fetch_many(self.pgconn)
-            if not results:
-                # No more results to fetch, but there may still be pending
-                # commands.
-                break
-            queued = self.result_queue.popleft()
-            to_process.append((queued, results))
-
-        for queued, results in to_process:
-            self._process_results(queued, results)
-
-    def _process_results(
-        self, queued: PipelinePendingResult, results: List["PGresult"]
-    ) -> None:
-        """Process a results set fetched from the current pipeline.
-
-        This matchs 'results' with its respective element in the pipeline
-        queue. For commands (None value in the pipeline queue), results are
-        checked directly. For prepare statement creation requests, update the
-        cache. Otherwise, results are attached to their respective cursor.
-        """
-        if queued is None:
-            (result,) = results
-            if result.status == ExecStatus.FATAL_ERROR:
-                raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
-            elif result.status == ExecStatus.PIPELINE_ABORTED:
-                raise e.OperationalError("pipeline aborted")
-        else:
-            cursor, prepinfo = queued
-            cursor._check_results(results)
-            if not cursor._results:
-                cursor._results = results
-                cursor._set_current_result(0)
-            else:
-                cursor._results.extend(results)
-            if prepinfo:
-                key, prep, name = prepinfo
-                # Update the prepare state of the query.
-                cursor._conn._prepared.validate(key, prep, name, results)
-
-
-class Pipeline(BasePipeline):
-    """Handler for connection in pipeline mode."""
-
-    def __enter__(self) -> "Pipeline":
-        self._enter()
-        return self
-
-    def __exit__(
-        self,
-        exc_type: Optional[Type[BaseException]],
-        exc_val: Optional[BaseException],
-        exc_tb: Optional[TracebackType],
-    ) -> None:
-        self._exit()
-
 
 class BaseConnection(Generic[Row]):
     """
index 5208c8b245c1457f0db4a38ab5b955e15ae0f272..a5129148384916adfdff17da79a7e90e2c7911bc 100644 (file)
@@ -21,8 +21,9 @@ from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
 from .adapt import AdaptersMap
 from ._enums import IsolationLevel
 from .conninfo import make_conninfo, conninfo_to_dict
+from ._pipeline import AsyncPipeline
 from ._encodings import pgconn_encoding
-from .connection import BaseConnection, BasePipeline, CursorRow, Notify
+from .connection import BaseConnection, CursorRow, Notify
 from .generators import notifies
 from .transaction import AsyncTransaction
 from .cursor_async import AsyncCursor
@@ -35,22 +36,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger("psycopg")
 
 
-class AsyncPipeline(BasePipeline):
-    """Handler for async connection in pipeline mode."""
-
-    async def __aenter__(self) -> "AsyncPipeline":
-        self._enter()
-        return self
-
-    async def __aexit__(
-        self,
-        exc_type: Optional[Type[BaseException]],
-        exc_val: Optional[BaseException],
-        exc_tb: Optional[TracebackType],
-    ) -> None:
-        self._exit()
-
-
 class AsyncConnection(BaseConnection[Row]):
     """
     Asynchronous wrapper for a connection to the database.
@@ -62,7 +47,7 @@ class AsyncConnection(BaseConnection[Row]):
     server_cursor_factory: Type[AsyncServerCursor[Row]]
     row_factory: AsyncRowFactory[Row]
 
-    _pipeline: Optional[AsyncPipeline]
+    _pipeline: "Optional[AsyncPipeline]"
 
     def __init__(
         self,
index 2a4aeaf5df2d3a024f4bff689b151a6ad9678cf3..4dd2d72eceecda01bb4ad91335086ba925677d32 100644 (file)
@@ -21,7 +21,7 @@ from typing import List, Optional, Union
 from . import pq
 from . import errors as e
 from .pq import ConnStatus, PollingStatus, ExecStatus
-from .abc import Command, PQGen, PQGenConn
+from .abc import PipelineCommand, PQGen, PQGenConn
 from .pq.abc import PGconn, PGresult
 from .waiting import Wait, Ready
 from ._compat import Deque
@@ -161,7 +161,7 @@ def fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]:
 
 
 def pipeline_communicate(
-    pgconn: PGconn, commands: Deque[Command]
+    pgconn: PGconn, commands: Deque[PipelineCommand]
 ) -> PQGen[List[List[PGresult]]]:
     """Generator to send queries from a connection in pipeline mode while also
     receiving results.
index f415fda5e6831cd7b211a15dd125360ee69447b0..6668e337f4b8396e9217cc873e4e7688b7a6ce0f 100644 (file)
@@ -12,7 +12,7 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple
 from psycopg import pq
 from psycopg import abc
 from psycopg.rows import Row, RowMaker
-from psycopg.abc import Command
+from psycopg.abc import PipelineCommand
 from psycopg.adapt import AdaptersMap, PyFormat
 from psycopg.pq.abc import PGconn, PGresult
 from psycopg.connection import BaseConnection
@@ -53,7 +53,7 @@ def send(pgconn: PGconn) -> abc.PQGen[None]: ...
 def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
 def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
 def pipeline_communicate(
-    pgconn: PGconn, commands: Deque[Command]
+    pgconn: PGconn, commands: Deque[PipelineCommand]
 ) -> abc.PQGen[List[List[PGresult]]]: ...
 
 # Copy support
index 86461024e30f784bc5d68431702a899aa261b635..8d623a4e18c81f0d3d365244aaa7078294705b72 100644 (file)
@@ -11,7 +11,7 @@ from typing import List
 
 from psycopg import errors as e
 from psycopg.pq import abc, error_message
-from psycopg.abc import Command, PQGen
+from psycopg.abc import PipelineCommand, PQGen
 from psycopg.waiting import Wait, Ready
 from psycopg._compat import Deque
 from psycopg._encodings import conninfo_encoding
@@ -196,7 +196,9 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]:
     return pq.PGresult._from_ptr(pgres)
 
 
-def pipeline_communicate(pq.PGconn pgconn, commands: Deque[Command]) -> PQGen[List[List[PGresult]]]:
+def pipeline_communicate(
+    pq.PGconn pgconn, commands: Deque[PipelineCommand]
+) -> PQGen[List[List[PGresult]]]:
     """Generator to send queries from a connection in pipeline mode while also
     receiving results.
 
index e7719e9bc12cb62aed2318c4b98e588789910584..e3cac248d7e186673c4488275d82d561a598c37f 100644 (file)
@@ -17,7 +17,7 @@ from typing import Any, Iterator, Optional, Sequence, Tuple
 from psycopg import AsyncConnection, Connection
 from psycopg import pq, waiting
 from psycopg import errors as e
-from psycopg.abc import Command
+from psycopg.abc import PipelineCommand
 from psycopg.generators import pipeline_communicate
 from psycopg.pq._enums import Format
 from psycopg._compat import Deque
@@ -94,7 +94,7 @@ class LoggingPGconn:
 @contextmanager
 def prepare_pipeline_demo_pq(
     pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
-) -> Iterator[Tuple[Deque[Command], Deque[str]]]:
+) -> Iterator[Tuple[Deque[PipelineCommand], Deque[str]]]:
     """Set up pipeline demo with initial queries and yield commands and
     results queue for pipeline_communicate().
     """
@@ -120,7 +120,7 @@ def prepare_pipeline_demo_pq(
         ),
     ]
 
-    commands = Deque[Command]()
+    commands = Deque[PipelineCommand]()
     results_queue = Deque[str]()
 
     for qname, query in setup_queries: