yield from self._fetch_gen(flush=False)
def _exit_gen(self) -> PQGen[None]:
- """Exit current pipeline by sending a Sync and, unless within a nested
- pipeline, also fetch back all remaining results.
+ """
+ Exit current pipeline by sending a Sync and fetch back all remaining results.
"""
try:
self._enqueue_sync()
yield from self._communicate_gen()
finally:
- if self.level == 1:
- # No need to force flush since we emitted a sync just before.
- yield from self._fetch_gen(flush=False)
+ # No need to force flush since we emitted a sync just before.
+ yield from self._fetch_gen(flush=False)
def _communicate_gen(self) -> PQGen[None]:
"""Communicate with pipeline to send commands and possibly fetch
raise TypeError("transaction blocks can be used only once")
self._entered = True
- # If we are in pipeline mode and connection idle, we might be in a
- # nested statement but we haven't received yet the result to go
- # INTRANS. This would make _push_savepoint() fail. If so, synchronize
- # with the server.
- if (
- self._conn._pipeline
- and self.pgconn.transaction_status == TransactionStatus.IDLE
- ):
- yield from self._conn._pipeline._sync_gen()
-
self._push_savepoint()
for command in self._get_enter_commands():
yield from self._conn._exec_command(command)
for command in self._get_commit_commands():
yield from self._conn._exec_command(command)
- if self._conn._pipeline:
- yield from self._conn._pipeline._sync_gen()
-
def _rollback_gen(self, exc_val: Optional[BaseException]) -> PQGen[bool]:
if isinstance(exc_val, Rollback):
logger.debug(f"{self._conn}: Explicit rollback from: ", exc_info=True)
if ex:
raise ex
- # Get out of a "pipeline aborted" state
- if (
- self._conn._pipeline
- and self.pgconn.pipeline_status == pq.PipelineStatus.ABORTED
- ):
- yield from self._conn._pipeline._sync_gen()
-
for command in self._get_rollback_commands():
yield from self._conn._exec_command(command)