+
+
+class RowLoggingTest(fixtures.TablesTest):
+ __only_on__ = "sqlite+pysqlite"
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "data",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("val", String(50)),
+ )
+
+ @classmethod
+ def insert_data(cls, connection):
+ connection.execute(
+ cls.tables.data.insert(),
+ [
+ {"id": 1, "val": "v1"},
+ {"id": 2, "val": "v2"},
+ {"id": 3, "val": "v3"},
+ ],
+ )
+
+ def setup_test(self):
+ self.buf = logging.handlers.BufferingHandler(100)
+ logging.getLogger("sqlalchemy.engine").addHandler(self.buf)
+
+ def teardown_test(self):
+ logging.getLogger("sqlalchemy.engine").removeHandler(self.buf)
+
+ @testing.fixture(params=["echo_debug", "plain_logging"])
+ def debug_engine(self, testing_engine, request):
+ if request.param == "echo_debug":
+ yield testing_engine(
+ options={"echo": "debug", "sqlite_share_pool": True}
+ )
+ elif request.param == "plain_logging":
+ log = logging.getLogger("sqlalchemy.engine")
+ existing_level = log.level
+ log.setLevel(logging.DEBUG)
+ try:
+ yield testing_engine(options={"sqlite_share_pool": True})
+ finally:
+ log.setLevel(existing_level)
+
+ def _get_row_messages(self):
+ return [
+ rec.getMessage()
+ for rec in self.buf.buffer
+ if rec.getMessage().startswith("Row ")
+ ]
+
+ @testing.combinations(
+ ("all", lambda result: result.all(), 3),
+ ("first", lambda result: result.first(), 1),
+ ("fetchone", lambda result: result.fetchone(), 1),
+ ("fetchmany", lambda result: result.fetchmany(2), 2),
+ ("scalar", lambda result: result.scalar(), 1),
+ ("partitions", lambda result: list(result.partitions(2)), 3),
+ ("_raw_all_rows", lambda result: result._raw_all_rows(), 3),
+ id_="iaa",
+ argnames="consume,expected_rows",
+ )
+ def test_row_logging(self, debug_engine, consume, expected_rows):
+ t = self.tables.data
+ self.buf.flush()
+
+ with debug_engine.connect() as conn:
+ result = conn.execute(select(t).order_by(t.c.id))
+ consume(result)
+
+ eq_(
+ self._get_row_messages(),
+ ["Row (%d, 'v%d')" % (i, i) for i in range(1, expected_rows + 1)],
+ )
+
+ @testing.combinations(
+ ("echo_false", False),
+ ("echo_true", True),
+ id_="ia",
+ argnames="echo",
+ )
+ def test_no_row_logging(self, testing_engine, echo):
+ t = self.tables.data
+ eng = testing_engine(options={"echo": echo, "sqlite_share_pool": True})
+ self.buf.flush()
+
+ with eng.connect() as conn:
+ result = conn.execute(select(t).order_by(t.c.id))
+ result.all()
+
+ eq_(self._get_row_messages(), [])