From: Denis Laxalde Date: Mon, 4 Apr 2022 09:41:06 +0000 (+0200) Subject: fix: do not fetch in Pipeline.sync() X-Git-Tag: 3.1~122^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ec32a8a92e76af9a1de11874cf2e905925316860;p=thirdparty%2Fpsycopg.git fix: do not fetch in Pipeline.sync() During previous refactorings, we made Pipeline.sync() also fetch results from the server. But this somehow breaks the semantics of the synchronization point as defined by Postgres because the user might be interested in emitting Sync message as a way to solely close the current series of queries in the pipeline: i.e., flush queries from client to server and reset the pipeline error state. In this respect, the 'fetch' step should be explicit. BasePipeline._sync_gen() is changed to only emit a Sync and a new _exit_gen() method is introduced doing what _sync_gen() previously did. Accordingly, the warning emitted when calling this _exit_gen() at pipeline exit is adjusted to say "terminating" instead of "syncing". --- diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index 63bf765cb..068018dd2 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -86,10 +86,13 @@ class BasePipeline: def _sync_gen(self) -> PQGen[None]: self._enqueue_sync() + yield from self._communicate_gen() + + def _exit_gen(self) -> PQGen[None]: try: # Send any pending commands (e.g. COMMIT or Sync); # while processing results, we might get errors... - yield from self._communicate_gen() + yield from self._sync_gen() finally: # then fetch all remaining results but without forcing # flush since we emitted a sync just before. @@ -173,11 +176,8 @@ class Pipeline(BasePipeline): super().__init__(conn) def sync(self) -> None: - """Sync the pipeline, send any pending command and fetch and process + """Sync the pipeline, send any pending command and receive and process all available results. - - This is called when exiting the pipeline, but can be used for other - purposes (e.g. in nested pipelines). """ try: with self._conn.lock: @@ -196,11 +196,12 @@ class Pipeline(BasePipeline): exc_tb: Optional[TracebackType], ) -> None: try: - self.sync() + with self._conn.lock: + self._conn.wait(self._exit_gen()) except Exception as exc2: # Don't clobber an exception raised in the block with this one if exc_val: - logger.warning("error ignored syncing %r: %s", self, exc2) + logger.warning("error ignored terminating %r: %s", self, exc2) else: raise exc2.with_traceback(None) finally: @@ -249,11 +250,12 @@ class AsyncPipeline(BasePipeline): exc_tb: Optional[TracebackType], ) -> None: try: - await self.sync() + async with self._conn.lock: + await self._conn.wait(self._exit_gen()) except Exception as exc2: # Don't clobber an exception raised in the block with this one if exc_val: - logger.warning("error ignored syncing %r: %s", self, exc2) + logger.warning("error ignored terminating %r: %s", self, exc2) else: raise exc2.with_traceback(None) finally: diff --git a/psycopg/psycopg/cursor.py b/psycopg/psycopg/cursor.py index 0a534d563..8a473a987 100644 --- a/psycopg/psycopg/cursor.py +++ b/psycopg/psycopg/cursor.py @@ -241,7 +241,7 @@ class BaseCursor(Generic[ConnectionType, Row]): self._last_query = query if returning: - yield from pipeline._sync_gen() + yield from pipeline._exit_gen() for cmd in self._conn._prepared.get_maintenance_commands(): yield from self._conn._exec_command(cmd)