From bfe559a7e4d69e5699c390ac9cafd2a5a2d38078 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 12 Jun 2026 13:33:05 -0400 Subject: [PATCH] add row logging tests ensure all the main result methods as well as the ORM-used _raw_all_rows() perform equivalent row logging. Change-Id: Iee0dc3c75a9c71f6d1a73fa058c5edf0b16348ec --- test/engine/test_logging.py | 94 +++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index 34e8dcc198..5ba3092787 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -1136,3 +1136,97 @@ class EchoTest(fixtures.TestBase): assert self.buf.buffer[5].getMessage().startswith("SELECT 6") assert len(self.buf.buffer) == 8 + + +class RowLoggingTest(fixtures.TablesTest): + __only_on__ = "sqlite+pysqlite" + + @classmethod + def define_tables(cls, metadata): + Table( + "data", + metadata, + Column("id", Integer, primary_key=True), + Column("val", String(50)), + ) + + @classmethod + def insert_data(cls, connection): + connection.execute( + cls.tables.data.insert(), + [ + {"id": 1, "val": "v1"}, + {"id": 2, "val": "v2"}, + {"id": 3, "val": "v3"}, + ], + ) + + def setup_test(self): + self.buf = logging.handlers.BufferingHandler(100) + logging.getLogger("sqlalchemy.engine").addHandler(self.buf) + + def teardown_test(self): + logging.getLogger("sqlalchemy.engine").removeHandler(self.buf) + + @testing.fixture(params=["echo_debug", "plain_logging"]) + def debug_engine(self, testing_engine, request): + if request.param == "echo_debug": + yield testing_engine( + options={"echo": "debug", "sqlite_share_pool": True} + ) + elif request.param == "plain_logging": + log = logging.getLogger("sqlalchemy.engine") + existing_level = log.level + log.setLevel(logging.DEBUG) + try: + yield testing_engine(options={"sqlite_share_pool": True}) + finally: + log.setLevel(existing_level) + + def _get_row_messages(self): + return [ + rec.getMessage() + for rec in self.buf.buffer + if rec.getMessage().startswith("Row ") + ] + + @testing.combinations( + ("all", lambda result: result.all(), 3), + ("first", lambda result: result.first(), 1), + ("fetchone", lambda result: result.fetchone(), 1), + ("fetchmany", lambda result: result.fetchmany(2), 2), + ("scalar", lambda result: result.scalar(), 1), + ("partitions", lambda result: list(result.partitions(2)), 3), + ("_raw_all_rows", lambda result: result._raw_all_rows(), 3), + id_="iaa", + argnames="consume,expected_rows", + ) + def test_row_logging(self, debug_engine, consume, expected_rows): + t = self.tables.data + self.buf.flush() + + with debug_engine.connect() as conn: + result = conn.execute(select(t).order_by(t.c.id)) + consume(result) + + eq_( + self._get_row_messages(), + ["Row (%d, 'v%d')" % (i, i) for i in range(1, expected_rows + 1)], + ) + + @testing.combinations( + ("echo_false", False), + ("echo_true", True), + id_="ia", + argnames="echo", + ) + def test_no_row_logging(self, testing_engine, echo): + t = self.tables.data + eng = testing_engine(options={"echo": echo, "sqlite_share_pool": True}) + self.buf.flush() + + with eng.connect() as conn: + result = conn.execute(select(t).order_by(t.c.id)) + result.all() + + eq_(self._get_row_messages(), []) -- 2.47.3