]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix: don't clobber exceptions exiting the pipeline block 175/head
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 27 Mar 2022 19:47:11 +0000 (21:47 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 2 Apr 2022 23:17:57 +0000 (01:17 +0200)
Problem found thanks to random failures in unrelated tests, e.g.
https://github.com/psycopg/psycopg/actions/runs/2048536089

psycopg/psycopg/_pipeline.py
tests/test_pipeline.py
tests/test_pipeline_async.py

index edc1485a8b15cf43e87184ef148fd13b8f0c1c7a..26923682b47aa271bcb3b9d0115678db706c9648 100644 (file)
@@ -4,6 +4,7 @@ commands pipeline management
 
 # Copyright (C) 2021 The Psycopg Team
 
+import logging
 from types import TracebackType
 from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
 
@@ -38,6 +39,8 @@ PendingResult: TypeAlias = Union[
     None, Tuple["BaseCursor[Any, Any]", Optional[Tuple[Key, Prepare, bytes]]]
 ]
 
+logger = logging.getLogger("psycopg")
+
 
 class BasePipeline:
 
@@ -168,13 +171,19 @@ class Pipeline(BasePipeline):
             with self._conn.lock:
                 self.sync()
                 try:
-                    # Send an pending commands (e.g. COMMIT or Sync);
+                    # Send any pending commands (e.g. COMMIT or Sync);
                     # while processing results, we might get errors...
                     self._conn.wait(self._communicate_gen())
                 finally:
                     # then fetch all remaining results but without forcing
                     # flush since we emitted a sync just before.
                     self._conn.wait(self._fetch_gen(flush=False))
+        except Exception as exc2:
+            # Don't clobber an exception raised in the block with this one
+            if exc_val:
+                logger.warning("error ignored exiting %r: %s", self, exc2)
+            else:
+                raise
         finally:
             self._exit()
 
@@ -202,12 +211,18 @@ class AsyncPipeline(BasePipeline):
             async with self._conn.lock:
                 self.sync()
                 try:
-                    # Send an pending commands (e.g. COMMIT or Sync);
+                    # Send any pending commands (e.g. COMMIT or Sync);
                     # while processing results, we might get errors...
                     await self._conn.wait(self._communicate_gen())
                 finally:
                     # then fetch all remaining results but without forcing
                     # flush since we emitted a sync just before.
                     await self._conn.wait(self._fetch_gen(flush=False))
+        except Exception as exc2:
+            # Don't clobber an exception raised in the block with this one
+            if exc_val:
+                logger.warning("error ignored exiting %r: %s", self, exc2)
+            else:
+                raise
         finally:
             self._exit()
index cfbfbad115937b3e0157f3ddf075061eb9b2ab7b..fde3c99f10be9bfb2fb77202fd263e4eeda19b9d 100644 (file)
@@ -1,3 +1,4 @@
+import logging
 from typing import Any
 import concurrent.futures
 
@@ -49,6 +50,16 @@ def test_pipeline_broken_conn_exit(conn: psycopg.Connection[Any]) -> None:
     assert closed
 
 
+def test_pipeline_exit_error_noclobber(conn, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg")
+    with pytest.raises(ZeroDivisionError):
+        with conn.pipeline():
+            conn.close()
+            1 / 0
+
+    assert len(caplog.records) == 1
+
+
 def test_cursor_stream(conn):
     with conn.pipeline(), conn.cursor() as cur:
         with pytest.raises(psycopg.ProgrammingError):
index 67265c7ee794f62207d80335b15debf8edd7d6fd..c7ce2df94783539c617b5f89a4136ef71918e0b1 100644 (file)
@@ -1,4 +1,5 @@
 import asyncio
+import logging
 from typing import Any
 
 import pytest
@@ -52,6 +53,16 @@ async def test_pipeline_broken_conn_exit(aconn: psycopg.AsyncConnection[Any]) ->
     assert closed
 
 
+async def test_pipeline_exit_error_noclobber(aconn, caplog):
+    caplog.set_level(logging.WARNING, logger="psycopg")
+    with pytest.raises(ZeroDivisionError):
+        async with aconn.pipeline():
+            await aconn.close()
+            1 / 0
+
+    assert len(caplog.records) == 1
+
+
 async def test_cursor_stream(aconn):
     async with aconn.pipeline(), aconn.cursor() as cur:
         with pytest.raises(psycopg.ProgrammingError):