From: Denis Laxalde Date: Tue, 29 Mar 2022 06:53:57 +0000 (+0200) Subject: refactor: move final pipeline steps into a communicate() method X-Git-Tag: 3.1~145^2~12 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=72dd88eaa90bc749a3796d4bfc89bf48525c3abc;p=thirdparty%2Fpsycopg.git refactor: move final pipeline steps into a communicate() method --- diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index 99a010339..4a73286e6 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -160,6 +160,24 @@ class Pipeline(BasePipeline): def __init__(self, conn: "Connection[Any]") -> None: super().__init__(conn) + def communicate(self) -> None: + """Sync the pipeline, send any pending command and fetch and process + all available results. + + This is called when exiting the pipeline, but can be used for other + purposes (e.g. in nested pipelines). + """ + with self._conn.lock: + self.sync() + try: + # Send any pending commands (e.g. COMMIT or Sync); + # while processing results, we might get errors... + self._conn.wait(self._communicate_gen()) + finally: + # then fetch all remaining results but without forcing + # flush since we emitted a sync just before. + self._conn.wait(self._fetch_gen(flush=False)) + def __enter__(self) -> "Pipeline": self._enter() return self @@ -171,16 +189,7 @@ class Pipeline(BasePipeline): exc_tb: Optional[TracebackType], ) -> None: try: - with self._conn.lock: - self.sync() - try: - # Send any pending commands (e.g. COMMIT or Sync); - # while processing results, we might get errors... - self._conn.wait(self._communicate_gen()) - finally: - # then fetch all remaining results but without forcing - # flush since we emitted a sync just before. - self._conn.wait(self._fetch_gen(flush=False)) + self.communicate() except Exception as exc2: # Don't clobber an exception raised in the block with this one if exc_val: @@ -200,6 +209,24 @@ class AsyncPipeline(BasePipeline): def __init__(self, conn: "AsyncConnection[Any]") -> None: super().__init__(conn) + async def communicate(self) -> None: + """Sync the pipeline, send any pending command and fetch and process + all available results. + + This is called when exiting the pipeline, but can be used for other + purposes (e.g. in nested pipelines). + """ + async with self._conn.lock: + self.sync() + try: + # Send any pending commands (e.g. COMMIT or Sync); + # while processing results, we might get errors... + await self._conn.wait(self._communicate_gen()) + finally: + # then fetch all remaining results but without forcing + # flush since we emitted a sync just before. + await self._conn.wait(self._fetch_gen(flush=False)) + async def __aenter__(self) -> "AsyncPipeline": self._enter() return self @@ -211,16 +238,7 @@ class AsyncPipeline(BasePipeline): exc_tb: Optional[TracebackType], ) -> None: try: - async with self._conn.lock: - self.sync() - try: - # Send any pending commands (e.g. COMMIT or Sync); - # while processing results, we might get errors... - await self._conn.wait(self._communicate_gen()) - finally: - # then fetch all remaining results but without forcing - # flush since we emitted a sync just before. - await self._conn.wait(self._fetch_gen(flush=False)) + await self.communicate() except Exception as exc2: # Don't clobber an exception raised in the block with this one if exc_val: