from .errors import InternalError, ProgrammingError, NotSupportedError
from ._column import Column
from .conninfo import ConnectionInfo
-from .connection import BaseConnection, Connection, Notify, Pipeline
+from ._pipeline import Pipeline, AsyncPipeline
+from .connection import BaseConnection, Connection, Notify
from .transaction import Rollback, Transaction, AsyncTransaction
from .cursor_async import AsyncCursor
from .server_cursor import AsyncServerCursor, ServerCursor
-from .connection_async import AsyncConnection, AsyncPipeline
+from .connection_async import AsyncConnection
from . import dbapi20
from .dbapi20 import BINARY, DATETIME, NUMBER, ROWID, STRING
--- /dev/null
+"""
+commands pipeline management
+"""
+
+# Copyright (C) 2021 The Psycopg Team
+
+from types import TracebackType
+from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
+
+from . import pq
+from . import errors as e
+from .pq import ExecStatus
+from .abc import PipelineCommand, PQGen
+from ._compat import Deque, TypeAlias
+from ._cmodule import _psycopg
+from ._encodings import pgconn_encoding
+from ._preparing import Key, Prepare
+
+if TYPE_CHECKING:
+ from .pq.abc import PGconn, PGresult
+ from .cursor import BaseCursor
+
+if _psycopg:
+ pipeline_communicate = _psycopg.pipeline_communicate
+ fetch_many = _psycopg.fetch_many
+ send = _psycopg.send
+
+else:
+ from . import generators
+
+ pipeline_communicate = generators.pipeline_communicate
+ fetch_many = generators.fetch_many
+ send = generators.send
+
+PendingResult: TypeAlias = Union[
+ None, Tuple["BaseCursor[Any, Any]", Optional[Tuple[Key, Prepare, bytes]]]
+]
+
+
+class BasePipeline:
+ def __init__(self, pgconn: "PGconn") -> None:
+ self.pgconn = pgconn
+ self.command_queue = Deque[PipelineCommand]()
+ self.result_queue = Deque[PendingResult]()
+
+ @property
+ def status(self) -> pq.PipelineStatus:
+ return pq.PipelineStatus(self.pgconn.pipeline_status)
+
+ def sync(self) -> None:
+ """Enqueue a PQpipelineSync() command."""
+ self.command_queue.append(self.pgconn.pipeline_sync)
+ self.result_queue.append(None)
+
+ def _enter(self) -> None:
+ self.pgconn.enter_pipeline_mode()
+
+ def _exit(self) -> None:
+ self.pgconn.exit_pipeline_mode()
+
+ def _communicate_gen(self) -> PQGen[None]:
+ """Communicate with pipeline to send commands and possibly fetch
+ results, which are then processed.
+ """
+ fetched = yield from pipeline_communicate(self.pgconn, self.command_queue)
+ to_process = [(self.result_queue.popleft(), results) for results in fetched]
+ for queued, results in to_process:
+ self._process_results(queued, results)
+
+ def _fetch_gen(self, *, flush: bool) -> PQGen[None]:
+ """Fetch available results from the connection and process them with
+ pipeline queued items.
+
+ If 'flush' is True, a PQsendFlushRequest() is issued in order to make
+ sure results can be fetched. Otherwise, the caller may emit a
+ PQpipelineSync() call to ensure the output buffer gets flushed before
+ fetching.
+ """
+ if not self.result_queue:
+ return
+
+ if flush:
+ self.pgconn.send_flush_request()
+ yield from send(self.pgconn)
+
+ to_process = []
+ while self.result_queue:
+ results = yield from fetch_many(self.pgconn)
+ if not results:
+ # No more results to fetch, but there may still be pending
+ # commands.
+ break
+ queued = self.result_queue.popleft()
+ to_process.append((queued, results))
+
+ for queued, results in to_process:
+ self._process_results(queued, results)
+
+ def _process_results(
+ self, queued: PendingResult, results: List["PGresult"]
+ ) -> None:
+ """Process a results set fetched from the current pipeline.
+
+ This matchs 'results' with its respective element in the pipeline
+ queue. For commands (None value in the pipeline queue), results are
+ checked directly. For prepare statement creation requests, update the
+ cache. Otherwise, results are attached to their respective cursor.
+ """
+ if queued is None:
+ (result,) = results
+ if result.status == ExecStatus.FATAL_ERROR:
+ raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
+ elif result.status == ExecStatus.PIPELINE_ABORTED:
+ raise e.OperationalError("pipeline aborted")
+ else:
+ cursor, prepinfo = queued
+ cursor._check_results(results)
+ if not cursor._results:
+ cursor._results = results
+ cursor._set_current_result(0)
+ else:
+ cursor._results.extend(results)
+ if prepinfo:
+ key, prep, name = prepinfo
+ # Update the prepare state of the query.
+ cursor._conn._prepared.validate(key, prep, name, results)
+
+
+class Pipeline(BasePipeline):
+ """Handler for connection in pipeline mode."""
+
+ def __enter__(self) -> "Pipeline":
+ self._enter()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self._exit()
+
+
+class AsyncPipeline(BasePipeline):
+ """Handler for async connection in pipeline mode."""
+
+ async def __aenter__(self) -> "AsyncPipeline":
+ self._enter()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> None:
+ self._exit()
Query: TypeAlias = Union[str, bytes, "Composable"]
Params: TypeAlias = Union[Sequence[Any], Mapping[str, Any]]
ConnectionType = TypeVar("ConnectionType", bound="BaseConnection[Any]")
-Command = Callable[[], None]
+PipelineCommand: TypeAlias = Callable[[], None]
# TODO: make it recursive when mypy will support it
# DumperKey: TypeAlias = Union[type, Tuple[Union[type, "DumperKey"]]]
from . import waiting
from . import postgres
from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
-from .abc import AdaptContext, Command, ConnectionType, Params, Query, RV
+from .abc import AdaptContext, ConnectionType, Params, Query, RV
from .abc import PQGen, PQGenConn
from .sql import Composable, SQL
from ._tpc import Xid
from .adapt import AdaptersMap
from ._enums import IsolationLevel
from .cursor import Cursor
-from ._compat import Deque, TypeAlias
+from ._compat import TypeAlias
from ._cmodule import _psycopg
from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
+from ._pipeline import BasePipeline, Pipeline
from .generators import notifies
from ._encodings import pgconn_encoding
-from ._preparing import Key, Prepare, PrepareManager
+from ._preparing import PrepareManager
from .transaction import Transaction
from .server_cursor import ServerCursor
if TYPE_CHECKING:
from .pq.abc import PGconn, PGresult
- from .cursor import BaseCursor
from psycopg_pool.base import BasePool
logger = logging.getLogger("psycopg")
-connect: Callable[[str], PQGenConn["PGconn"]]
-execute: Callable[["PGconn"], PQGen[List["PGresult"]]]
-
# Row Type variable for Cursor (when it needs to be distinguished from the
# connection's one)
CursorRow = TypeVar("CursorRow")
if _psycopg:
connect = _psycopg.connect
execute = _psycopg.execute
- fetch_many = _psycopg.fetch_many
- pipeline_communicate = _psycopg.pipeline_communicate
- send = _psycopg.send
else:
from . import generators
connect = generators.connect
execute = generators.execute
- fetch_many = generators.fetch_many
- pipeline_communicate = generators.pipeline_communicate
- send = generators.send
class Notify(NamedTuple):
NoticeHandler: TypeAlias = Callable[[e.Diagnostic], None]
NotifyHandler: TypeAlias = Callable[[Notify], None]
-PipelinePendingResult = Union[
- None,
- Tuple[
- "BaseCursor[Any, Any]",
- Optional[Tuple[Key, Prepare, bytes]],
- ],
-]
-
-
-class BasePipeline:
- def __init__(self, pgconn: "PGconn") -> None:
- self.pgconn = pgconn
- self.command_queue = Deque[Command]()
- self.result_queue = Deque[PipelinePendingResult]()
-
- @property
- def status(self) -> pq.PipelineStatus:
- return pq.PipelineStatus(self.pgconn.pipeline_status)
-
- def sync(self) -> None:
- """Enqueue a PQpipelineSync() command."""
- self.command_queue.append(self.pgconn.pipeline_sync)
- self.result_queue.append(None)
-
- def _enter(self) -> None:
- self.pgconn.enter_pipeline_mode()
-
- def _exit(self) -> None:
- self.pgconn.exit_pipeline_mode()
-
- def _communicate_gen(self) -> PQGen[None]:
- """Communicate with pipeline to send commands and possibly fetch
- results, which are then processed.
- """
- fetched = yield from pipeline_communicate(self.pgconn, self.command_queue)
- to_process = [(self.result_queue.popleft(), results) for results in fetched]
- for queued, results in to_process:
- self._process_results(queued, results)
-
- def _fetch_gen(self, *, flush: bool) -> PQGen[None]:
- """Fetch available results from the connection and process them with
- pipeline queued items.
-
- If 'flush' is True, a PQsendFlushRequest() is issued in order to make
- sure results can be fetched. Otherwise, the caller may emit a
- PQpipelineSync() call to ensure the output buffer gets flushed before
- fetching.
- """
- if not self.result_queue:
- return
-
- if flush:
- self.pgconn.send_flush_request()
- yield from send(self.pgconn)
-
- to_process = []
- while self.result_queue:
- results = yield from fetch_many(self.pgconn)
- if not results:
- # No more results to fetch, but there may still be pending
- # commands.
- break
- queued = self.result_queue.popleft()
- to_process.append((queued, results))
-
- for queued, results in to_process:
- self._process_results(queued, results)
-
- def _process_results(
- self, queued: PipelinePendingResult, results: List["PGresult"]
- ) -> None:
- """Process a results set fetched from the current pipeline.
-
- This matchs 'results' with its respective element in the pipeline
- queue. For commands (None value in the pipeline queue), results are
- checked directly. For prepare statement creation requests, update the
- cache. Otherwise, results are attached to their respective cursor.
- """
- if queued is None:
- (result,) = results
- if result.status == ExecStatus.FATAL_ERROR:
- raise e.error_from_result(result, encoding=pgconn_encoding(self.pgconn))
- elif result.status == ExecStatus.PIPELINE_ABORTED:
- raise e.OperationalError("pipeline aborted")
- else:
- cursor, prepinfo = queued
- cursor._check_results(results)
- if not cursor._results:
- cursor._results = results
- cursor._set_current_result(0)
- else:
- cursor._results.extend(results)
- if prepinfo:
- key, prep, name = prepinfo
- # Update the prepare state of the query.
- cursor._conn._prepared.validate(key, prep, name, results)
-
-
-class Pipeline(BasePipeline):
- """Handler for connection in pipeline mode."""
-
- def __enter__(self) -> "Pipeline":
- self._enter()
- return self
-
- def __exit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- self._exit()
-
class BaseConnection(Generic[Row]):
"""
from .adapt import AdaptersMap
from ._enums import IsolationLevel
from .conninfo import make_conninfo, conninfo_to_dict
+from ._pipeline import AsyncPipeline
from ._encodings import pgconn_encoding
-from .connection import BaseConnection, BasePipeline, CursorRow, Notify
+from .connection import BaseConnection, CursorRow, Notify
from .generators import notifies
from .transaction import AsyncTransaction
from .cursor_async import AsyncCursor
logger = logging.getLogger("psycopg")
-class AsyncPipeline(BasePipeline):
- """Handler for async connection in pipeline mode."""
-
- async def __aenter__(self) -> "AsyncPipeline":
- self._enter()
- return self
-
- async def __aexit__(
- self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType],
- ) -> None:
- self._exit()
-
-
class AsyncConnection(BaseConnection[Row]):
"""
Asynchronous wrapper for a connection to the database.
server_cursor_factory: Type[AsyncServerCursor[Row]]
row_factory: AsyncRowFactory[Row]
- _pipeline: Optional[AsyncPipeline]
+ _pipeline: "Optional[AsyncPipeline]"
def __init__(
self,
from . import pq
from . import errors as e
from .pq import ConnStatus, PollingStatus, ExecStatus
-from .abc import Command, PQGen, PQGenConn
+from .abc import PipelineCommand, PQGen, PQGenConn
from .pq.abc import PGconn, PGresult
from .waiting import Wait, Ready
from ._compat import Deque
def pipeline_communicate(
- pgconn: PGconn, commands: Deque[Command]
+ pgconn: PGconn, commands: Deque[PipelineCommand]
) -> PQGen[List[List[PGresult]]]:
"""Generator to send queries from a connection in pipeline mode while also
receiving results.
from psycopg import pq
from psycopg import abc
from psycopg.rows import Row, RowMaker
-from psycopg.abc import Command
+from psycopg.abc import PipelineCommand
from psycopg.adapt import AdaptersMap, PyFormat
from psycopg.pq.abc import PGconn, PGresult
from psycopg.connection import BaseConnection
def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
def pipeline_communicate(
- pgconn: PGconn, commands: Deque[Command]
+ pgconn: PGconn, commands: Deque[PipelineCommand]
) -> abc.PQGen[List[List[PGresult]]]: ...
# Copy support
from psycopg import errors as e
from psycopg.pq import abc, error_message
-from psycopg.abc import Command, PQGen
+from psycopg.abc import PipelineCommand, PQGen
from psycopg.waiting import Wait, Ready
from psycopg._compat import Deque
from psycopg._encodings import conninfo_encoding
return pq.PGresult._from_ptr(pgres)
-def pipeline_communicate(pq.PGconn pgconn, commands: Deque[Command]) -> PQGen[List[List[PGresult]]]:
+def pipeline_communicate(
+ pq.PGconn pgconn, commands: Deque[PipelineCommand]
+) -> PQGen[List[List[PGresult]]]:
"""Generator to send queries from a connection in pipeline mode while also
receiving results.
from psycopg import AsyncConnection, Connection
from psycopg import pq, waiting
from psycopg import errors as e
-from psycopg.abc import Command
+from psycopg.abc import PipelineCommand
from psycopg.generators import pipeline_communicate
from psycopg.pq._enums import Format
from psycopg._compat import Deque
@contextmanager
def prepare_pipeline_demo_pq(
pgconn: LoggingPGconn, rows_to_send: int, logger: logging.Logger
-) -> Iterator[Tuple[Deque[Command], Deque[str]]]:
+) -> Iterator[Tuple[Deque[PipelineCommand], Deque[str]]]:
"""Set up pipeline demo with initial queries and yield commands and
results queue for pipeline_communicate().
"""
),
]
- commands = Deque[Command]()
+ commands = Deque[PipelineCommand]()
results_queue = Deque[str]()
for qname, query in setup_queries: