From: Denis Laxalde Date: Sun, 27 Mar 2022 10:41:56 +0000 (+0200) Subject: refactor: Pipeline classes moved to their module X-Git-Tag: 3.1~146^2~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c5ceb1000b3ec19d8d82619edaaa00c2d9a3fbe2;p=thirdparty%2Fpsycopg.git refactor: Pipeline classes moved to their module --- diff --git a/psycopg/psycopg/__init__.py b/psycopg/psycopg/__init__.py index 4879bcdf6..c86d26822 100644 --- a/psycopg/psycopg/__init__.py +++ b/psycopg/psycopg/__init__.py @@ -18,11 +18,12 @@ from .errors import DataError, OperationalError, IntegrityError from .errors import InternalError, ProgrammingError, NotSupportedError from ._column import Column from .conninfo import ConnectionInfo -from .connection import BaseConnection, Connection, Notify, Pipeline +from ._pipeline import Pipeline, AsyncPipeline +from .connection import BaseConnection, Connection, Notify from .transaction import Rollback, Transaction, AsyncTransaction from .cursor_async import AsyncCursor from .server_cursor import AsyncServerCursor, ServerCursor -from .connection_async import AsyncConnection, AsyncPipeline +from .connection_async import AsyncConnection from . import dbapi20 from .dbapi20 import BINARY, DATETIME, NUMBER, ROWID, STRING diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py new file mode 100644 index 000000000..b2b32f0bd --- /dev/null +++ b/psycopg/psycopg/_pipeline.py @@ -0,0 +1,158 @@ +""" +commands pipeline management +""" + +# Copyright (C) 2021 The Psycopg Team + +from types import TracebackType +from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING + +from . import pq +from . import errors as e +from .pq import ExecStatus +from .abc import PipelineCommand, PQGen +from ._compat import Deque, TypeAlias +from ._cmodule import _psycopg +from ._encodings import pgconn_encoding +from ._preparing import Key, Prepare + +if TYPE_CHECKING: + from .pq.abc import PGconn, PGresult + from .cursor import BaseCursor + +if _psycopg: + pipeline_communicate = _psycopg.pipeline_communicate + fetch_many = _psycopg.fetch_many + send = _psycopg.send + +else: + from . import generators + + pipeline_communicate = generators.pipeline_communicate + fetch_many = generators.fetch_many + send = generators.send + +PendingResult: TypeAlias = Union[ + None, Tuple["BaseCursor[Any, Any]", Optional[Tuple[Key, Prepare, bytes]]] +] + + +class BasePipeline: + def __init__(self, pgconn: "PGconn") -> None: + self.pgconn = pgconn + self.command_queue = Deque[PipelineCommand]() + self.result_queue = Deque[PendingResult]() + + @property + def status(self) -> pq.PipelineStatus: + return pq.PipelineStatus(self.pgconn.pipeline_status) + + def sync(self) -> None: + """Enqueue a PQpipelineSync() command.""" + self.command_queue.append(self.pgconn.pipeline_sync) + self.result_queue.append(None) + + def _enter(self) -> None: + self.pgconn.enter_pipeline_mode() + + def _exit(self) -> None: + self.pgconn.exit_pipeline_mode() + + def _communicate_gen(self) -> PQGen[None]: + """Communicate with pipeline to send commands and possibly fetch + results, which are then processed. + """ + fetched = yield from pipeline_communicate(self.pgconn, self.command_queue) + to_process = [(self.result_queue.popleft(), results) for results in fetched] + for queued, results in to_process: + self._process_results(queued, results) + + def _fetch_gen(self, *, flush: bool) -> PQGen[None]: + """Fetch available results from the connection and process them with + pipeline queued items. + + If 'flush' is True, a PQsendFlushRequest() is issued in order to make + sure results can be fetched. Otherwise, the caller may emit a + PQpipelineSync() call to ensure the output buffer gets flushed before + fetching. + """ + if not self.result_queue: + return + + if flush: + self.pgconn.send_flush_request() + yield from send(self.pgconn) + + to_process = [] + while self.result_queue: + results = yield from fetch_many(self.pgconn) + if not results: + # No more results to fetch, but there may still be pending + # commands. + break + queued = self.result_queue.popleft() + to_process.append((queued, results)) + + for queued, results in to_process: + self._process_results(queued, results) + + def _process_results( + self, queued: PendingResult, results: List["PGresult"] + ) -> None: + """Process a results set fetched from the current pipeline. + + This matchs 'results' with its respective element in the pipeline + queue. For commands (None value in the pipeline queue), results are + checked directly. For prepare statement creation requests, update the + cache. Otherwise, results are attached to their respective cursor. + """ + if queued is None: + (result,) = results + if result.status == ExecStatus.FATAL_ERROR: + raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn)) + elif result.status == ExecStatus.PIPELINE_ABORTED: + raise e.OperationalError("pipeline aborted") + else: + cursor, prepinfo = queued + cursor._check_results(results) + if not cursor._results: + cursor._results = results + cursor._set_current_result(0) + else: + cursor._results.extend(results) + if prepinfo: + key, prep, name = prepinfo + # Update the prepare state of the query. + cursor._conn._prepared.validate(key, prep, name, results) + + +class Pipeline(BasePipeline): + """Handler for connection in pipeline mode.""" + + def __enter__(self) -> "Pipeline": + self._enter() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self._exit() + + +class AsyncPipeline(BasePipeline): + """Handler for async connection in pipeline mode.""" + + async def __aenter__(self) -> "AsyncPipeline": + self._enter() + return self + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self._exit() diff --git a/psycopg/psycopg/abc.py b/psycopg/psycopg/abc.py index 42a2614b4..d23873fe5 100644 --- a/psycopg/psycopg/abc.py +++ b/psycopg/psycopg/abc.py @@ -26,7 +26,7 @@ Buffer: TypeAlias = Union[bytes, bytearray, memoryview] Query: TypeAlias = Union[str, bytes, "Composable"] Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]] ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]") -Command = Callable[[], None] +PipelineCommand: TypeAlias = Callable[[], None] # TODO: make it recursive when mypy will support it # DumperKey: TypeAlias = Union[type, Tuple[Union[type, "DumperKey"]]] diff --git a/psycopg/psycopg/connection.py b/psycopg/psycopg/connection.py index c3b1c44c1..dc440967b 100644 --- a/psycopg/psycopg/connection.py +++ b/psycopg/psycopg/connection.py @@ -20,7 +20,7 @@ from . import errors as e from . import waiting from . import postgres from .pq import ConnStatus, ExecStatus, TransactionStatus, Format -from .abc import AdaptContext, Command, ConnectionType, Params, Query, RV +from .abc import AdaptContext, ConnectionType, Params, Query, RV from .abc import PQGen, PQGenConn from .sql import Composable, SQL from ._tpc import Xid @@ -28,25 +28,22 @@ from .rows import Row, RowFactory, tuple_row, TupleRow, args_row from .adapt import AdaptersMap from ._enums import IsolationLevel from .cursor import Cursor -from ._compat import Deque, TypeAlias +from ._compat import TypeAlias from ._cmodule import _psycopg from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo +from ._pipeline import BasePipeline, Pipeline from .generators import notifies from ._encodings import pgconn_encoding -from ._preparing import Key, Prepare, PrepareManager +from ._preparing import PrepareManager from .transaction import Transaction from .server_cursor import ServerCursor if TYPE_CHECKING: from .pq.abc import PGconn, PGresult - from .cursor import BaseCursor from psycopg_pool.base import BasePool logger = logging.getLogger("psycopg") -connect: Callable[[str], PQGenConn["PGconn"]] -execute: Callable[["PGconn"], PQGen[List["PGresult"]]] - # Row Type variable for Cursor (when it needs to be distinguished from the # connection's one) CursorRow = TypeVar("CursorRow") @@ -54,18 +51,12 @@ CursorRow = TypeVar("CursorRow") if _psycopg: connect = _psycopg.connect execute = _psycopg.execute - fetch_many = _psycopg.fetch_many - pipeline_communicate = _psycopg.pipeline_communicate - send = _psycopg.send else: from . import generators connect = generators.connect execute = generators.execute - fetch_many = generators.fetch_many - pipeline_communicate = generators.pipeline_communicate - send = generators.send class Notify(NamedTuple): @@ -86,119 +77,6 @@ Notify.__module__ = "psycopg" NoticeHandler: TypeAlias = Callable[[e.Diagnostic], None] NotifyHandler: TypeAlias = Callable[[Notify], None] -PipelinePendingResult = Union[ - None, - Tuple[ - "BaseCursor[Any, Any]", - Optional[Tuple[Key, Prepare, bytes]], - ], -] - - -class BasePipeline: - def __init__(self, pgconn: "PGconn") -> None: - self.pgconn = pgconn - self.command_queue = Deque[Command]() - self.result_queue = Deque[PipelinePendingResult]() - - @property - def status(self) -> pq.PipelineStatus: - return pq.PipelineStatus(self.pgconn.pipeline_status) - - def sync(self) -> None: - """Enqueue a PQpipelineSync() command.""" - self.command_queue.append(self.pgconn.pipeline_sync) - self.result_queue.append(None) - - def _enter(self) -> None: - self.pgconn.enter_pipeline_mode() - - def _exit(self) -> None: - self.pgconn.exit_pipeline_mode() - - def _communicate_gen(self) -> PQGen[None]: - """Communicate with pipeline to send commands and possibly fetch - results, which are then processed. - """ - fetched = yield from pipeline_communicate(self.pgconn, self.command_queue) - to_process = [(self.result_queue.popleft(), results) for results in fetched] - for queued, results in to_process: - self._process_results(queued, results) - - def _fetch_gen(self, *, flush: bool) -> PQGen[None]: - """Fetch available results from the connection and process them with - pipeline queued items. - - If 'flush' is True, a PQsendFlushRequest() is issued in order to make - sure results can be fetched. Otherwise, the caller may emit a - PQpipelineSync() call to ensure the output buffer gets flushed before - fetching. - """ - if not self.result_queue: - return - - if flush: - self.pgconn.send_flush_request() - yield from send(self.pgconn) - - to_process = [] - while self.result_queue: - results = yield from fetch_many(self.pgconn) - if not results: - # No more results to fetch, but there may still be pending - # commands. - break - queued = self.result_queue.popleft() - to_process.append((queued, results)) - - for queued, results in to_process: - self._process_results(queued, results) - - def _process_results( - self, queued: PipelinePendingResult, results: List["PGresult"] - ) -> None: - """Process a results set fetched from the current pipeline. - - This matchs 'results' with its respective element in the pipeline - queue. For commands (None value in the pipeline queue), results are - checked directly. For prepare statement creation requests, update the - cache. Otherwise, results are attached to their respective cursor. - """ - if queued is None: - (result,) = results - if result.status == ExecStatus.FATAL_ERROR: - raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn)) - elif result.status == ExecStatus.PIPELINE_ABORTED: - raise e.OperationalError("pipeline aborted") - else: - cursor, prepinfo = queued - cursor._check_results(results) - if not cursor._results: - cursor._results = results - cursor._set_current_result(0) - else: - cursor._results.extend(results) - if prepinfo: - key, prep, name = prepinfo - # Update the prepare state of the query. - cursor._conn._prepared.validate(key, prep, name, results) - - -class Pipeline(BasePipeline): - """Handler for connection in pipeline mode.""" - - def __enter__(self) -> "Pipeline": - self._enter() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self._exit() - class BaseConnection(Generic[Row]): """ diff --git a/psycopg/psycopg/connection_async.py b/psycopg/psycopg/connection_async.py index 5208c8b24..a51291483 100644 --- a/psycopg/psycopg/connection_async.py +++ b/psycopg/psycopg/connection_async.py @@ -21,8 +21,9 @@ from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row from .adapt import AdaptersMap from ._enums import IsolationLevel from .conninfo import make_conninfo, conninfo_to_dict +from ._pipeline import AsyncPipeline from ._encodings import pgconn_encoding -from .connection import BaseConnection, BasePipeline, CursorRow, Notify +from .connection import BaseConnection, CursorRow, Notify from .generators import notifies from .transaction import AsyncTransaction from .cursor_async import AsyncCursor @@ -35,22 +36,6 @@ if TYPE_CHECKING: logger = logging.getLogger("psycopg") -class AsyncPipeline(BasePipeline): - """Handler for async connection in pipeline mode.""" - - async def __aenter__(self) -> "AsyncPipeline": - self._enter() - return self - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self._exit() - - class AsyncConnection(BaseConnection[Row]): """ Asynchronous wrapper for a connection to the database. @@ -62,7 +47,7 @@ class AsyncConnection(BaseConnection[Row]): server_cursor_factory: Type[AsyncServerCursor[Row]] row_factory: AsyncRowFactory[Row] - _pipeline: Optional[AsyncPipeline] + _pipeline: "Optional[AsyncPipeline]" def __init__( self, diff --git a/psycopg/psycopg/generators.py b/psycopg/psycopg/generators.py index 2a4aeaf5d..4dd2d72ec 100644 --- a/psycopg/psycopg/generators.py +++ b/psycopg/psycopg/generators.py @@ -21,7 +21,7 @@ from typing import List, Optional, Union from . import pq from . import errors as e from .pq import ConnStatus, PollingStatus, ExecStatus -from .abc import Command, PQGen, PQGenConn +from .abc import PipelineCommand, PQGen, PQGenConn from .pq.abc import PGconn, PGresult from .waiting import Wait, Ready from ._compat import Deque @@ -161,7 +161,7 @@ def fetch(pgconn: PGconn) -> PQGen[Optional[PGresult]]: def pipeline_communicate( - pgconn: PGconn, commands: Deque[Command] + pgconn: PGconn, commands: Deque[PipelineCommand] ) -> PQGen[List[List[PGresult]]]: """Generator to send queries from a connection in pipeline mode while also receiving results. diff --git a/psycopg_c/psycopg_c/_psycopg.pyi b/psycopg_c/psycopg_c/_psycopg.pyi index f415fda5e..6668e337f 100644 --- a/psycopg_c/psycopg_c/_psycopg.pyi +++ b/psycopg_c/psycopg_c/_psycopg.pyi @@ -12,7 +12,7 @@ from typing import Any, Iterable, List, Optional, Sequence, Tuple from psycopg import pq from psycopg import abc from psycopg.rows import Row, RowMaker -from psycopg.abc import Command +from psycopg.abc import PipelineCommand from psycopg.adapt import AdaptersMap, PyFormat from psycopg.pq.abc import PGconn, PGresult from psycopg.connection import BaseConnection @@ -53,7 +53,7 @@ def send(pgconn: PGconn) -> abc.PQGen[None]: ... def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ... def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ... def pipeline_communicate( - pgconn: PGconn, commands: Deque[Command] + pgconn: PGconn, commands: Deque[PipelineCommand] ) -> abc.PQGen[List[List[PGresult]]]: ... # Copy support diff --git a/psycopg_c/psycopg_c/_psycopg/generators.pyx b/psycopg_c/psycopg_c/_psycopg/generators.pyx index 86461024e..8d623a4e1 100644 --- a/psycopg_c/psycopg_c/_psycopg/generators.pyx +++ b/psycopg_c/psycopg_c/_psycopg/generators.pyx @@ -11,7 +11,7 @@ from typing import List from psycopg import errors as e from psycopg.pq import abc, error_message -from psycopg.abc import Command, PQGen +from psycopg.abc import PipelineCommand, PQGen from psycopg.waiting import Wait, Ready from psycopg._compat import Deque from psycopg._encodings import conninfo_encoding @@ -196,7 +196,9 @@ def fetch(pq.PGconn pgconn) -> PQGen[Optional[PGresult]]: return pq.PGresult._from_ptr(pgres) -def pipeline_communicate(pq.PGconn pgconn, commands: Deque[Command]) -> PQGen[List[List[PGresult]]]: +def pipeline_communicate( + pq.PGconn pgconn, commands: Deque[PipelineCommand] +) -> PQGen[List[List[PGresult]]]: """Generator to send queries from a connection in pipeline mode while also receiving results. diff --git a/tests/scripts/pipeline-demo.py b/tests/scripts/pipeline-demo.py index e7719e9bc..e3cac248d 100644 --- a/tests/scripts/pipeline-demo.py +++ b/tests/scripts/pipeline-demo.py @@ -17,7 +17,7 @@ from typing import Any, Iterator, Optional, Sequence, Tuple from psycopg import AsyncConnection, Connection from psycopg import pq, waiting from psycopg import errors as e -from psycopg.abc import Command +from psycopg.abc import PipelineCommand from psycopg.generators import pipeline_communicate from psycopg.pq._enums import Format from psycopg._compat import Deque @@ -94,7 +94,7 @@ class LoggingPGconn: @contextmanager def prepare_pipeline_demo_pq( pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger -) -> Iterator[Tuple[Deque[Command], Deque[str]]]: +) -> Iterator[Tuple[Deque[PipelineCommand], Deque[str]]]: """Set up pipeline demo with initial queries and yield commands and results queue for pipeline_communicate(). """ @@ -120,7 +120,7 @@ def prepare_pipeline_demo_pq( ), ] - commands = Deque[Command]() + commands = Deque[PipelineCommand]() results_queue = Deque[str]() for qname, query in setup_queries: