# Copyright (C) 2021 The Psycopg Team
+import logging
from types import TracebackType
from typing import Any, List, Optional, Union, Tuple, Type, TYPE_CHECKING
None, Tuple["BaseCursor[Any, Any]", Optional[Tuple[Key, Prepare, bytes]]]
]
+logger = logging.getLogger("psycopg")
+
class BasePipeline:
with self._conn.lock:
self.sync()
try:
- # Send an pending commands (e.g. COMMIT or Sync);
+ # Send any pending commands (e.g. COMMIT or Sync);
# while processing results, we might get errors...
self._conn.wait(self._communicate_gen())
finally:
# then fetch all remaining results but without forcing
# flush since we emitted a sync just before.
self._conn.wait(self._fetch_gen(flush=False))
+ except Exception as exc2:
+ # Don't clobber an exception raised in the block with this one
+ if exc_val:
+ logger.warning("error ignored exiting %r: %s", self, exc2)
+ else:
+ raise
finally:
self._exit()
async with self._conn.lock:
self.sync()
try:
- # Send an pending commands (e.g. COMMIT or Sync);
+ # Send any pending commands (e.g. COMMIT or Sync);
# while processing results, we might get errors...
await self._conn.wait(self._communicate_gen())
finally:
# then fetch all remaining results but without forcing
# flush since we emitted a sync just before.
await self._conn.wait(self._fetch_gen(flush=False))
+ except Exception as exc2:
+ # Don't clobber an exception raised in the block with this one
+ if exc_val:
+ logger.warning("error ignored exiting %r: %s", self, exc2)
+ else:
+ raise
finally:
self._exit()
+import logging
from typing import Any
import concurrent.futures
assert closed
+def test_pipeline_exit_error_noclobber(conn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+ with pytest.raises(ZeroDivisionError):
+ with conn.pipeline():
+ conn.close()
+ 1 / 0
+
+ assert len(caplog.records) == 1
+
+
def test_cursor_stream(conn):
with conn.pipeline(), conn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):
import asyncio
+import logging
from typing import Any
import pytest
assert closed
+async def test_pipeline_exit_error_noclobber(aconn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+ with pytest.raises(ZeroDivisionError):
+ async with aconn.pipeline():
+ await aconn.close()
+ 1 / 0
+
+ assert len(caplog.records) == 1
+
+
async def test_cursor_stream(aconn):
async with aconn.pipeline(), aconn.cursor() as cur:
with pytest.raises(psycopg.ProgrammingError):