def __init__(self, conn: "Connection[Any]") -> None:
super().__init__(conn)
+ def communicate(self) -> None:
+ """Sync the pipeline, send any pending command and fetch and process
+ all available results.
+
+ This is called when exiting the pipeline, but can be used for other
+ purposes (e.g. in nested pipelines).
+ """
+ with self._conn.lock:
+ self.sync()
+ try:
+ # Send any pending commands (e.g. COMMIT or Sync);
+ # while processing results, we might get errors...
+ self._conn.wait(self._communicate_gen())
+ finally:
+ # then fetch all remaining results but without forcing
+ # flush since we emitted a sync just before.
+ self._conn.wait(self._fetch_gen(flush=False))
+
def __enter__(self) -> "Pipeline":
self._enter()
return self
exc_tb: Optional[TracebackType],
) -> None:
try:
- with self._conn.lock:
- self.sync()
- try:
- # Send any pending commands (e.g. COMMIT or Sync);
- # while processing results, we might get errors...
- self._conn.wait(self._communicate_gen())
- finally:
- # then fetch all remaining results but without forcing
- # flush since we emitted a sync just before.
- self._conn.wait(self._fetch_gen(flush=False))
+ self.communicate()
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
if exc_val:
def __init__(self, conn: "AsyncConnection[Any]") -> None:
super().__init__(conn)
+ async def communicate(self) -> None:
+ """Sync the pipeline, send any pending command and fetch and process
+ all available results.
+
+ This is called when exiting the pipeline, but can be used for other
+ purposes (e.g. in nested pipelines).
+ """
+ async with self._conn.lock:
+ self.sync()
+ try:
+ # Send any pending commands (e.g. COMMIT or Sync);
+ # while processing results, we might get errors...
+ await self._conn.wait(self._communicate_gen())
+ finally:
+ # then fetch all remaining results but without forcing
+ # flush since we emitted a sync just before.
+ await self._conn.wait(self._fetch_gen(flush=False))
+
async def __aenter__(self) -> "AsyncPipeline":
self._enter()
return self
exc_tb: Optional[TracebackType],
) -> None:
try:
- async with self._conn.lock:
- self.sync()
- try:
- # Send any pending commands (e.g. COMMIT or Sync);
- # while processing results, we might get errors...
- await self._conn.wait(self._communicate_gen())
- finally:
- # then fetch all remaining results but without forcing
- # flush since we emitted a sync just before.
- await self._conn.wait(self._fetch_gen(flush=False))
+ await self.communicate()
except Exception as exc2:
# Don't clobber an exception raised in the block with this one
if exc_val: