if fmt:
message = fmt(message)
- self.engine.logger.info(message, *arg, **kw)
+ self.engine.logger.info(message, *arg, stacklevel=2, **kw)
def _log_debug(self, message, *arg, **kw):
fmt = self._message_formatter
if fmt:
message = fmt(message)
- self.engine.logger.debug(message, *arg, **kw)
+ self.engine.logger.debug(message, *arg, stacklevel=2, **kw)
@property
def _schema_translate_map(self):
self.log(logging.CRITICAL, msg, *args, **kwargs)
- def log(self, level: int, msg: str, *args: Any, **kwargs: Any) -> None:
+ def log(
+ self,
+ level: int,
+ msg: str,
+ *args: Any,
+ stacklevel: int = 1,
+ **kwargs: Any,
+ ) -> None:
"""Delegate a log call to the underlying logger.
The level here is determined by the echo
selected_level = self.logger.getEffectiveLevel()
if level >= selected_level:
- self.logger._log(level, msg, args, **kwargs)
+ self.logger._log(
+ level, msg, args, stacklevel=stacklevel + 1, **kwargs
+ )
def isEnabledFor(self, level: int) -> bool:
"""Is this logger enabled for level 'level'?"""
]
)
+ def test_log_messages_have_correct_metadata(self, logging_engine):
+ buf = logging.handlers.BufferingHandler(100)
+ log = logging.getLogger("sqlalchemy.engine")
+ try:
+ log.addHandler(buf)
+
+ with logging_engine.connect().execution_options(
+ isolation_level="AUTOCOMMIT"
+ ) as conn:
+ conn.begin()
+ conn.rollback()
+ finally:
+ log.removeHandler(buf)
+
+ assert len(buf.buffer) >= 2
+
+ # log messages must originate from functions called 'begin'/'rollback'
+ logging_functions = {rec.funcName for rec in buf.buffer}
+ assert any(
+ "begin" in fn for fn in logging_functions
+ ), logging_functions
+ assert any(
+ "rollback" in fn for fn in logging_functions
+ ), logging_functions
+
+ # log messages must originate from different lines
+ log_lines = {rec.lineno for rec in buf.buffer}
+ assert len(log_lines) > 1, log_lines
+ buf.flush()
+
class LoggingTokenTest(fixtures.TestBase):
def setup_test(self):