]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
add row logging tests main
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 12 Jun 2026 17:33:05 +0000 (13:33 -0400)
committerMichael Bayer <mike_mp@zzzcomputing.com>
Fri, 12 Jun 2026 18:14:54 +0000 (18:14 +0000)
ensure all the main result methods as well as the ORM-used
_raw_all_rows() perform equivalent row logging.

Change-Id: Iee0dc3c75a9c71f6d1a73fa058c5edf0b16348ec

test/engine/test_logging.py

index 34e8dcc198d88220482662e6cbc67b6c38a1c54a..5ba30927875c203ae7357b1ab09f66dacbf10bf7 100644 (file)
@@ -1136,3 +1136,97 @@ class EchoTest(fixtures.TestBase):
 
         assert self.buf.buffer[5].getMessage().startswith("SELECT 6")
         assert len(self.buf.buffer) == 8
+
+
+class RowLoggingTest(fixtures.TablesTest):
+    __only_on__ = "sqlite+pysqlite"
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "data",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("val", String(50)),
+        )
+
+    @classmethod
+    def insert_data(cls, connection):
+        connection.execute(
+            cls.tables.data.insert(),
+            [
+                {"id": 1, "val": "v1"},
+                {"id": 2, "val": "v2"},
+                {"id": 3, "val": "v3"},
+            ],
+        )
+
+    def setup_test(self):
+        self.buf = logging.handlers.BufferingHandler(100)
+        logging.getLogger("sqlalchemy.engine").addHandler(self.buf)
+
+    def teardown_test(self):
+        logging.getLogger("sqlalchemy.engine").removeHandler(self.buf)
+
+    @testing.fixture(params=["echo_debug", "plain_logging"])
+    def debug_engine(self, testing_engine, request):
+        if request.param == "echo_debug":
+            yield testing_engine(
+                options={"echo": "debug", "sqlite_share_pool": True}
+            )
+        elif request.param == "plain_logging":
+            log = logging.getLogger("sqlalchemy.engine")
+            existing_level = log.level
+            log.setLevel(logging.DEBUG)
+            try:
+                yield testing_engine(options={"sqlite_share_pool": True})
+            finally:
+                log.setLevel(existing_level)
+
+    def _get_row_messages(self):
+        return [
+            rec.getMessage()
+            for rec in self.buf.buffer
+            if rec.getMessage().startswith("Row ")
+        ]
+
+    @testing.combinations(
+        ("all", lambda result: result.all(), 3),
+        ("first", lambda result: result.first(), 1),
+        ("fetchone", lambda result: result.fetchone(), 1),
+        ("fetchmany", lambda result: result.fetchmany(2), 2),
+        ("scalar", lambda result: result.scalar(), 1),
+        ("partitions", lambda result: list(result.partitions(2)), 3),
+        ("_raw_all_rows", lambda result: result._raw_all_rows(), 3),
+        id_="iaa",
+        argnames="consume,expected_rows",
+    )
+    def test_row_logging(self, debug_engine, consume, expected_rows):
+        t = self.tables.data
+        self.buf.flush()
+
+        with debug_engine.connect() as conn:
+            result = conn.execute(select(t).order_by(t.c.id))
+            consume(result)
+
+        eq_(
+            self._get_row_messages(),
+            ["Row (%d, 'v%d')" % (i, i) for i in range(1, expected_rows + 1)],
+        )
+
+    @testing.combinations(
+        ("echo_false", False),
+        ("echo_true", True),
+        id_="ia",
+        argnames="echo",
+    )
+    def test_no_row_logging(self, testing_engine, echo):
+        t = self.tables.data
+        eng = testing_engine(options={"echo": echo, "sqlite_share_pool": True})
+        self.buf.flush()
+
+        with eng.connect() as conn:
+            result = conn.execute(select(t).order_by(t.c.id))
+            result.all()
+
+        eq_(self._get_row_messages(), [])