From: Daniele Varrazzo Date: Sun, 27 Mar 2022 19:47:11 +0000 (+0200) Subject: fix: don't clobber exceptions exiting the pipeline block X-Git-Tag: 3.1~146^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3b1db4900dd4415ba5b42ef002c70d7a42313a9b;p=thirdparty%2Fpsycopg.git fix: don't clobber exceptions exiting the pipeline block Problem found thanks to random failures in unrelated tests, e.g. https://github.com/psycopg/psycopg/actions/runs/2048536089 --- diff --git a/psycopg/psycopg/_pipeline.py b/psycopg/psycopg/_pipeline.py index edc1485a8..26923682b 100644 --- a/psycopg/psycopg/_pipeline.py +++ b/psycopg/psycopg/_pipeline.py @@ -4,6 +4,7 @@ commands pipeline management # Copyright (C) 2021 The Psycopg Team +import logging from types import TracebackType from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING @@ -38,6 +39,8 @@ PendingResult: TypeAlias = Union[ None, Tuple["BaseCursor[Any, Any]", Optional[Tuple[Key, Prepare, bytes]]] ] +logger = logging.getLogger("psycopg") + class BasePipeline: @@ -168,13 +171,19 @@ class Pipeline(BasePipeline): with self._conn.lock: self.sync() try: - # Send an pending commands (e.g. COMMIT or Sync); + # Send any pending commands (e.g. COMMIT or Sync); # while processing results, we might get errors... self._conn.wait(self._communicate_gen()) finally: # then fetch all remaining results but without forcing # flush since we emitted a sync just before. self._conn.wait(self._fetch_gen(flush=False)) + except Exception as exc2: + # Don't clobber an exception raised in the block with this one + if exc_val: + logger.warning("error ignored exiting %r: %s", self, exc2) + else: + raise finally: self._exit() @@ -202,12 +211,18 @@ class AsyncPipeline(BasePipeline): async with self._conn.lock: self.sync() try: - # Send an pending commands (e.g. COMMIT or Sync); + # Send any pending commands (e.g. COMMIT or Sync); # while processing results, we might get errors... await self._conn.wait(self._communicate_gen()) finally: # then fetch all remaining results but without forcing # flush since we emitted a sync just before. await self._conn.wait(self._fetch_gen(flush=False)) + except Exception as exc2: + # Don't clobber an exception raised in the block with this one + if exc_val: + logger.warning("error ignored exiting %r: %s", self, exc2) + else: + raise finally: self._exit() diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index cfbfbad11..fde3c99f1 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,3 +1,4 @@ +import logging from typing import Any import concurrent.futures @@ -49,6 +50,16 @@ def test_pipeline_broken_conn_exit(conn: psycopg.Connection[Any]) -> None: assert closed +def test_pipeline_exit_error_noclobber(conn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg") + with pytest.raises(ZeroDivisionError): + with conn.pipeline(): + conn.close() + 1 / 0 + + assert len(caplog.records) == 1 + + def test_cursor_stream(conn): with conn.pipeline(), conn.cursor() as cur: with pytest.raises(psycopg.ProgrammingError): diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py index 67265c7ee..c7ce2df94 100644 --- a/tests/test_pipeline_async.py +++ b/tests/test_pipeline_async.py @@ -1,4 +1,5 @@ import asyncio +import logging from typing import Any import pytest @@ -52,6 +53,16 @@ async def test_pipeline_broken_conn_exit(aconn: psycopg.AsyncConnection[Any]) -> assert closed +async def test_pipeline_exit_error_noclobber(aconn, caplog): + caplog.set_level(logging.WARNING, logger="psycopg") + with pytest.raises(ZeroDivisionError): + async with aconn.pipeline(): + await aconn.close() + 1 / 0 + + assert len(caplog.records) == 1 + + async def test_cursor_stream(aconn): async with aconn.pipeline(), aconn.cursor() as cur: with pytest.raises(psycopg.ProgrammingError):