def status(self) -> pq.PipelineStatus:
return pq.PipelineStatus(self.pgconn.pipeline_status)
- def sync(self) -> None:
- """Enqueue a PQpipelineSync() command."""
- self.command_queue.append(self.pgconn.pipeline_sync)
- self.result_queue.append(None)
-
@staticmethod
def is_supported() -> bool:
"""Return `True` if the psycopg libpq wrapper suports pipeline mode."""
if self.pgconn.status != ConnStatus.BAD:
self.pgconn.exit_pipeline_mode()
+ def _sync_gen(self) -> PQGen[None]:
+ self._enqueue_sync()
+ try:
+ # Send any pending commands (e.g. COMMIT or Sync);
+ # while processing results, we might get errors...
+ yield from self._communicate_gen()
+ finally:
+ # then fetch all remaining results, but without forcing a
+ # flush, since we emitted a sync just before.
+ yield from self._fetch_gen(flush=False)
+
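# Illustrative sketch (not part of this patch): _sync_gen() relies on two
# generator features visible above -- "yield from" delegation and a
# try/finally that guarantees the draining step runs even if sending fails.
# The toy generators below (hypothetical names) reproduce that control flow:
#
#   def toy_send():                  # stands in for _communicate_gen()
#       yield "flush"
#
#   def toy_drain():                 # stands in for _fetch_gen(flush=False)
#       yield "fetch"
#
#   def toy_sync():
#       try:
#           yield from toy_send()
#       finally:
#           yield from toy_drain()   # runs even if toy_send() raised
#
#   list(toy_sync())                 # -> ["flush", "fetch"]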
def _communicate_gen(self) -> PQGen[None]:
"""Communicate with pipeline to send commands and possibly fetch
results, which are then processed.
return
if flush:
- yield from self._flush_gen()
+ self.pgconn.send_flush_request()
+ yield from send(self.pgconn)
to_process = []
while self.result_queue:
for queued, results in to_process:
self._process_results(queued, results)
- def _flush_gen(self) -> PQGen[None]:
- self.pgconn.send_flush_request()
- yield from send(self.pgconn)
-
def _process_results(
self, queued: PendingResult, results: List["PGresult"]
) -> None:
# Update the prepare state of the query.
cursor._conn._prepared.validate(key, prep, name, results)
+ def _enqueue_sync(self) -> None:
+ """Enqueue a PQpipelineSync() command."""
+ self.command_queue.append(self.pgconn.pipeline_sync)
+ self.result_queue.append(None)
+
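# Illustrative note (not part of this patch): after _enqueue_sync() the two
# queues hold a callable and a placeholder, conceptually:
#
#   pipeline._enqueue_sync()
#   pipeline.command_queue[-1]   # bound pgconn.pipeline_sync, invoked later
#   pipeline.result_queue[-1]    # None: no PendingResult to process for a Sync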
class Pipeline(BasePipeline):
"""Handler for connection in pipeline mode."""
def __init__(self, conn: "Connection[Any]") -> None:
super().__init__(conn)
- def communicate(self) -> None:
+ def sync(self) -> None:
"""Sync the pipeline, send any pending command and fetch and process
all available results.
purposes (e.g. in nested pipelines).
"""
with self._conn.lock:
- self.sync()
- try:
- # Send any pending commands (e.g. COMMIT or Sync);
- # while processing results, we might get errors...
- self._conn.wait(self._communicate_gen())
- finally:
- # then fetch all remaining results but without forcing
- # flush since we emitted a sync just before.
- self._conn.wait(self._fetch_gen(flush=False))
+ self._conn.wait(self._sync_gen())
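# Illustrative usage (assumption: "conn" is an open psycopg Connection and
# Connection.pipeline() returns this Pipeline handler, as in psycopg 3):
#
#   with conn.pipeline() as p:
#       conn.execute("insert into t (x) values (1)")
#       conn.execute("insert into t (x) values (2)")
#       p.sync()   # flush both commands and process their results now,
#                  # instead of waiting for the pipeline block to exit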
def __enter__(self) -> "Pipeline":
self._enter()
exc_tb: Optional[TracebackType],
) -> None:
try:
- self.communicate()
+ self.sync()
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
if exc_val:
def __init__(self, conn: "AsyncConnection[Any]") -> None:
super().__init__(conn)
- async def communicate(self) -> None:
+ async def sync(self) -> None:
"""Sync the pipeline, send any pending command and fetch and process
all available results.
purposes (e.g. in nested pipelines).
"""
async with self._conn.lock:
- self.sync()
- try:
- # Send any pending commands (e.g. COMMIT or Sync);
- # while processing results, we might get errors...
- await self._conn.wait(self._communicate_gen())
- finally:
- # then fetch all remaining results but without forcing
- # flush since we emitted a sync just before.
- await self._conn.wait(self._fetch_gen(flush=False))
+ await self._conn.wait(self._sync_gen())
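# Illustrative async usage (assumption: "aconn" is an open psycopg
# AsyncConnection exposing AsyncConnection.pipeline(), mirroring the sync API):
#
#   async with aconn.pipeline() as p:
#       await aconn.execute("insert into t (x) values (1)")
#       await p.sync()   # same as Pipeline.sync(), but awaitable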
async def __aenter__(self) -> "AsyncPipeline":
self._enter()
exc_tb: Optional[TracebackType],
) -> None:
try:
- await self.communicate()
+ await self.sync()
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
if exc_val: