from ._preparing import Key, Prepare
if TYPE_CHECKING:
- from .pq.abc import PGconn, PGresult
+ from .pq.abc import PGresult
from .cursor import BaseCursor
+ from .connection import BaseConnection, Connection
+ from .connection_async import AsyncConnection
if _psycopg:
pipeline_communicate = _psycopg.pipeline_communicate
class BasePipeline:
- def __init__(self, pgconn: "PGconn") -> None:
- self.pgconn = pgconn
+
+ command_queue: Deque[PipelineCommand]
+ result_queue: Deque[PendingResult]
+
+ def __init__(self, conn: "BaseConnection[Any]") -> None:
+ self._conn = conn
+ self.pgconn = conn.pgconn
self.command_queue = Deque[PipelineCommand]()
self.result_queue = Deque[PendingResult]()
class Pipeline(BasePipeline):
"""Handler for connection in pipeline mode."""
+ _conn: "Connection[Any]"
+
+ def __init__(self, conn: "Connection[Any]") -> None:
+ super().__init__(conn)
+
def __enter__(self) -> "Pipeline":
self._enter()
return self
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
- self._exit()
+ try:
+ with self._conn.lock:
+ self.sync()
+ try:
+ # Send an pending commands (e.g. COMMIT or Sync);
+ # while processing results, we might get errors...
+ self._conn.wait(self._communicate_gen())
+ finally:
+ # then fetch all remaining results but without forcing
+ # flush since we emitted a sync just before.
+ self._conn.wait(self._fetch_gen(flush=False))
+ finally:
+ self._exit()
class AsyncPipeline(BasePipeline):
"""Handler for async connection in pipeline mode."""
+ _conn: "AsyncConnection[Any]"
+
+ def __init__(self, conn: "AsyncConnection[Any]") -> None:
+ super().__init__(conn)
+
async def __aenter__(self) -> "AsyncPipeline":
self._enter()
return self
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
- self._exit()
+ try:
+ async with self._conn.lock:
+ self.sync()
+ try:
+ # Send an pending commands (e.g. COMMIT or Sync);
+ # while processing results, we might get errors...
+ await self._conn.wait(self._communicate_gen())
+ finally:
+ # then fetch all remaining results but without forcing
+ # flush since we emitted a sync just before.
+ await self._conn.wait(self._fetch_gen(flush=False))
+ finally:
+ self._exit()
with self.lock:
if self._pipeline is None:
# We must enter pipeline mode: create a new one
- pipeline = self._pipeline = Pipeline(self.pgconn)
+ # WARNING: reference loop, broken ahead.
+ pipeline = self._pipeline = Pipeline(self)
else:
# we are already in pipeline mode: bail out as soon as we
# leave the lock block.
try:
with pipeline:
- try:
- yield pipeline
- finally:
- with self.lock:
- pipeline.sync()
- try:
- # Send an pending commands (e.g. COMMIT or Sync);
- # while processing results, we might get errors...
- self.wait(pipeline._communicate_gen())
- finally:
- # then fetch all remaining results but without forcing
- # flush since we emitted a sync just before.
- self.wait(pipeline._fetch_gen(flush=False))
+ yield pipeline
finally:
assert pipeline.status == pq.PipelineStatus.OFF, pipeline.status
self._pipeline = None
async with self.lock:
if self._pipeline is None:
# We must enter pipeline mode: create a new one
- pipeline = self._pipeline = AsyncPipeline(self.pgconn)
+ # WARNING: reference loop, broken ahead.
+ pipeline = self._pipeline = AsyncPipeline(self)
else:
# we are already in pipeline mode: bail out as soon as we
# leave the lock block.
try:
async with pipeline:
- try:
- yield pipeline
- finally:
- async with self.lock:
- pipeline.sync()
- try:
- # Send an pending commands (e.g. COMMIT or Sync);
- # while processing results, we might get errors...
- await self.wait(pipeline._communicate_gen())
- finally:
- # then fetch all remaining results but without forcing
- # flush since we emitted a sync just before.
- await self.wait(pipeline._fetch_gen(flush=False))
+ yield pipeline
finally:
assert pipeline.status == PipelineStatus.OFF, pipeline.status
self._pipeline = None