def _sync_gen(self) -> PQGen[None]:
self._enqueue_sync()
+ yield from self._communicate_gen()
+
+ def _exit_gen(self) -> PQGen[None]:
try:
# Send any pending commands (e.g. COMMIT or Sync);
# while processing results, we might get errors...
- yield from self._communicate_gen()
+ yield from self._sync_gen()
finally:
# then fetch all remaining results but without forcing
# flush since we emitted a sync just before.
super().__init__(conn)
def sync(self) -> None:
- """Sync the pipeline, send any pending command and fetch and process
+ """Sync the pipeline, send any pending command and receive and process
all available results.
-
- This is called when exiting the pipeline, but can be used for other
- purposes (e.g. in nested pipelines).
"""
try:
with self._conn.lock:
exc_tb: Optional[TracebackType],
) -> None:
try:
- self.sync()
+ with self._conn.lock:
+ self._conn.wait(self._exit_gen())
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
if exc_val:
- logger.warning("error ignored syncing %r: %s", self, exc2)
+ logger.warning("error ignored terminating %r: %s", self, exc2)
else:
raise exc2.with_traceback(None)
finally:
exc_tb: Optional[TracebackType],
) -> None:
try:
- await self.sync()
+ async with self._conn.lock:
+ await self._conn.wait(self._exit_gen())
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
if exc_val:
- logger.warning("error ignored syncing %r: %s", self, exc2)
+ logger.warning("error ignored terminating %r: %s", self, exc2)
else:
raise exc2.with_traceback(None)
finally: