--- /dev/null
+.. change::
+ :tags: performance, orm
+ :tickets: 13363
+
+ ORM result row fetching now processes rows as plain tuples rather than
+ constructing :class:`.Row` objects, as ORM loaders use position-based
+ access and do not require the :class:`.Row` interface. :class:`.Row`
+ construction is still used when engine-level debug logging is enabled so
+ that individual rows can be logged. Benchmarks show a 3-16% improvement in
+ ORM entity load times depending on query shape. Pull request courtesy
+ Oliver Parker.
@HasMemoized_ro_memoized_attribute
def _row_getter(
self,
- ) -> tuple[Callable[..., _R] | None, Callable[..., Sequence[_R]] | None]:
+ ) -> tuple[
+ Callable[..., _R] | None,
+ Callable[..., Sequence[_R]] | None,
+ Callable[..., Sequence[Any]] | None,
+ ]:
+ """Return a three-tuple of callables
+ ``(single_row, many_rows, interim_rows)``.
+
+ ``single_row`` and ``many_rows`` produce :class:`.Row` objects for
+ one input row or a sequence of them. ``interim_rows`` produces
+ rows in "interim" form for a sequence of input rows: plain tuples
+ with result processors and tuple filters applied but no
+ :class:`.Row` construction, falling back to ``many_rows`` when
+ Row objects are required (row logging, scalar sources).
+
+ """
real_result = self if self._real_result is None else self._real_result
metadata = self._metadata
if real_result._source_supports_scalars:
if not self._generate_rows:
- return None, None
+ return None, None, None
else:
flag = _FLAG_SCALAR_TO_TUPLE
elif tuple_filters is not None:
_Row(metadata, None, key_to_index, row) for row in rows
]
- return single_row_simple, many_rows_simple # type: ignore[return-value] # noqa: E501
+ def interim_rows_simple(rows: Sequence[Any], /) -> Sequence[Any]:
+ # interim rows must be tuples so that position-based
+ # getters and slices apply; most DBAPIs return tuples
+ # already
+ if rows and type(rows[0]) is not tuple:
+ return [tuple(row) for row in rows]
+ return rows
+
+ return single_row_simple, many_rows_simple, interim_rows_simple # type: ignore[return-value] # noqa: E501
first_row: cython.bint = True
def many_rows(rows: Sequence[Any], /) -> list[Any]:
return [single_row(row) for row in rows]
- return single_row, many_rows # type: ignore[return-value]
+ if flag == _FLAG_SCALAR_TO_TUPLE or has_log_row:
+ # scalar-source rows have no tuple form; row logging requires
+ # the Row objects themselves
+ interim_rows = many_rows
+ else:
+
+ def single_interim_row(input_row: Sequence[Any], /) -> Any:
+ if flag == _FLAG_TUPLE_FILTER:
+ input_row = tuple_filters(input_row)
+ if proc_size != 0:
+ input_row = _apply_processors(
+ processors, proc_size, proc_valid, input_row
+ )
+ elif type(input_row) is not tuple:
+ input_row = tuple(input_row)
+ return input_row
+
+ if cython.compiled:
+
+ def interim_rows(rows: Sequence[Any], /) -> Sequence[Any]:
+ size: cython.Py_hash_t = len(rows)
+ i: cython.Py_ssize_t
+ result: list = PyList_New(size)
+ for i in range(size):
+ row: object = single_interim_row(rows[i])
+ Py_INCREF(row)
+ PyList_SET_ITEM(result, i, row)
+ return result
+
+ else:
+
+ def interim_rows(rows: Sequence[Any], /) -> Sequence[Any]:
+ return [single_interim_row(row) for row in rows]
+
+ return single_row, many_rows, interim_rows # type: ignore[return-value] # noqa: E501
@HasMemoized_ro_memoized_attribute
def _iterator_getter(self) -> Callable[[], Iterator[_R]]:
return iterrows
- def _raw_all_rows(self) -> Sequence[_R]:
- make_rows = self._row_getter[1]
- assert make_rows is not None
- return make_rows(self._fetchall_impl())
+ def _raw_all_tuples(self) -> Sequence[tuple[Any, ...]]:
+ """Return all remaining rows as plain processed tuples, as produced
+ by the ``interim_rows`` element of :attr:`._row_getter`;
+ :class:`.Row` objects are produced instead where required (row
+ logging, scalar sources).
+
+ Used by ORM loading, whose row getters are position-based and
+ accept any tuple-like row.
+
+ Uniquing and post-creational filters are not applied here, so
+ may not be present on the :class:`.Result` when this method is
+ used.
+
+ """
+ assert self._unique_filter_state is None
+ assert self._post_creational_filter is None
+ interim_rows = self._row_getter[2]
+ assert interim_rows is not None
+ rows = self._fetchall_impl()
+ return interim_rows(rows)
def _allrows(self) -> Sequence[_R]:
post_creational_filter = self._post_creational_filter
if not fetch:
break
else:
- fetch = cursor._raw_all_rows()
+ fetch = cursor._raw_all_tuples()
if single_entity:
proc = process[0]
execution_options=execution_options,
)
if result.context is not None and result.context.requires_uniquing:
- result = result.unique()
- data = {k: v for k, v in result}
+ rows = result.unique()
+ else:
+ # consume the result as plain tuples, skipping per-row
+ # Row construction
+ rows = result._raw_all_tuples()
+ data = {k: v for k, v in rows}
for key in chunk:
# for a real foreign key and no concurrent changes to the
execution_options=execution_options,
)
if result.context is not None and result.context.requires_uniquing:
- result = result.unique()
+ rows = result.unique()
+ else:
+ # consume the result as plain tuples, skipping per-row
+ # Row construction
+ rows = result._raw_all_tuples()
data = collections.defaultdict(list)
- for k, v in itertools.groupby(result, lambda x: x[0]):
+ for k, v in itertools.groupby(rows, lambda x: x[0]):
data[k].extend(vv[1] for vv in v)
for key, state, state_dict, overwrite in chunk:
from collections.abc import Callable
from collections.abc import Sequence
+import logging
import sqlalchemy as sa
from .. import assertions
return run_test
+ @config.fixture()
+ def debug_logging_engine(self, testing_engine):
+ log = logging.getLogger("sqlalchemy.engine")
+ existing_level = log.level
+
+ buf = logging.handlers.BufferingHandler(100)
+ log.addHandler(buf)
+
+ def get_testing_engine(echo=None, log_level=None):
+ options = {"sqlite_share_pool": True}
+ if echo is not None:
+ options["echo"] = echo
+ if log_level:
+ log.setLevel(logging.DEBUG)
+ return testing_engine(options=options)
+
+ try:
+ yield get_testing_engine, buf
+ finally:
+ log.setLevel(existing_level)
+ log.removeHandler(buf)
+
_connection_fixture_connection = None
],
)
- def setup_test(self):
- self.buf = logging.handlers.BufferingHandler(100)
- logging.getLogger("sqlalchemy.engine").addHandler(self.buf)
-
- def teardown_test(self):
- logging.getLogger("sqlalchemy.engine").removeHandler(self.buf)
-
@testing.fixture(params=["echo_debug", "plain_logging"])
- def debug_engine(self, testing_engine, request):
+ def debug_engine(self, debug_logging_engine, request):
+ testing_engine, buf = debug_logging_engine
if request.param == "echo_debug":
- yield testing_engine(
- options={"echo": "debug", "sqlite_share_pool": True}
- )
+ yield testing_engine(echo="debug"), buf
elif request.param == "plain_logging":
- log = logging.getLogger("sqlalchemy.engine")
- existing_level = log.level
- log.setLevel(logging.DEBUG)
- try:
- yield testing_engine(options={"sqlite_share_pool": True})
- finally:
- log.setLevel(existing_level)
-
- def _get_row_messages(self):
+ yield testing_engine(log_level=logging.DEBUG), buf
+
+ def _get_row_messages(self, buf):
return [
rec.getMessage()
- for rec in self.buf.buffer
+ for rec in buf.buffer
if rec.getMessage().startswith("Row ")
]
("fetchmany", lambda result: result.fetchmany(2), 2),
("scalar", lambda result: result.scalar(), 1),
("partitions", lambda result: list(result.partitions(2)), 3),
- ("_raw_all_rows", lambda result: result._raw_all_rows(), 3),
+ ("_raw_all_tuples", lambda result: result._raw_all_tuples(), 3),
id_="iaa",
argnames="consume,expected_rows",
)
def test_row_logging(self, debug_engine, consume, expected_rows):
t = self.tables.data
- self.buf.flush()
- with debug_engine.connect() as conn:
+ engine, buf = debug_engine
+ with engine.connect() as conn:
result = conn.execute(select(t).order_by(t.c.id))
consume(result)
eq_(
- self._get_row_messages(),
+ self._get_row_messages(buf),
["Row (%d, 'v%d')" % (i, i) for i in range(1, expected_rows + 1)],
)
@testing.combinations(
("echo_false", False),
("echo_true", True),
+ ("echo_debug", "debug"),
id_="ia",
argnames="echo",
)
- def test_no_row_logging(self, testing_engine, echo):
+ def test_row_logging_flag(self, debug_logging_engine, echo):
t = self.tables.data
- eng = testing_engine(options={"echo": echo, "sqlite_share_pool": True})
- self.buf.flush()
+
+ testing_engine, buf = debug_logging_engine
+ eng = testing_engine(echo=echo)
with eng.connect() as conn:
result = conn.execute(select(t).order_by(t.c.id))
result.all()
- eq_(self._get_row_messages(), [])
+ if echo == "debug":
+ eq_(
+ self._get_row_messages(buf),
+ ["Row (%d, 'v%d')" % (i, i) for i in range(1, 4)],
+ )
+ else:
+ eq_(self._get_row_messages(buf), [])
from sqlalchemy import delete
from sqlalchemy import exc
from sqlalchemy import insert
+from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import select
+from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
+from sqlalchemy import TypeDecorator
from sqlalchemy import update
from sqlalchemy.orm import loading
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import Session
+from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import eq_
from sqlalchemy.testing.assertions import expect_raises_message
+from sqlalchemy.testing.assertions import is_
from sqlalchemy.testing.fixtures import fixture_session
+from sqlalchemy.testing.schema import Column
from . import _fixtures
# class GetFromIdentityTest(_fixtures.FixtureTest):
it = list(it())
eq_([(x.id, y) for x, y in it], [(7, 7), (8, 8), (9, 9)])
eq_(list(it[0]._mapping.keys()), ["User", "id"])
+
+
+class InterimRowsLoadTest(fixtures.DeclarativeMappedTest):
+ """ORM loading fetches rows as plain processed tuples via
+ Result._raw_all_tuples(); test that result processors apply and
+ that engine-level row logging, which requires Row objects, still
+ loads correctly via its fallback."""
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class UpperString(TypeDecorator):
+ impl = String(50)
+ cache_ok = True
+
+ def process_result_value(self, value, dialect):
+ return value.upper() if value is not None else None
+
+ class A(Base):
+ __tablename__ = "interim_a"
+ id = Column(Integer, primary_key=True)
+ data = Column(UpperString())
+
+ @classmethod
+ def insert_data(cls, connection):
+ A = cls.classes.A
+ s = Session(connection)
+ s.add_all([A(id=1, data="one"), A(id=2, data=None)])
+ s.commit()
+
+ def _assert_load(self, connection):
+ A = self.classes.A
+ s = fixture_session(bind=connection)
+ a1, a2 = s.query(A).order_by(A.id).all()
+ eq_(a1.data, "ONE")
+ is_(a2.data, None)
+
+ def test_result_processors_applied(self, connection):
+ self._assert_load(connection)
+
+ def test_row_logging_fallback(self, debug_logging_engine):
+ """with debug-level logging, the ORM row fetch falls back to
+ constructing Row objects, and each row is logged"""
+
+ testing_engine, buf = debug_logging_engine
+
+ engine = testing_engine(echo="debug")
+
+ with engine.connect() as conn:
+ self._assert_load(conn)
+
+ row_messages = [
+ rec.getMessage()
+ for rec in buf.buffer
+ if rec.getMessage().startswith("Row ")
+ ]
+ eq_(
+ row_messages,
+ ["Row (1, 'ONE')", "Row (2, None)"],
+ )
setattr(cls, name + "_iter", test_case(go_iter, number=number))
+ def go_raw_all_tuples(self):
+ result = self.impl(*init_args())
+ result._raw_all_tuples()
+
+ setattr(
+ cls,
+ name + "_raw_all_tuples",
+ test_case(go_raw_all_tuples, number=number),
+ )
+
def go_iter_uq(self):
result = self.impl(*init_args()).unique()
for _ in result:
# TEST: test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 13312
-test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 25384
+test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 16387
test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 13347
-test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 25386
+test.aaa_profiling.test_orm.DeferOptionsTest.test_baseline x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 16389
# TEST: test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 19455
-test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 25491
+test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 22494
test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 19454
-test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 25493
+test.aaa_profiling.test_orm.DeferOptionsTest.test_defer_many_cols x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 22496
# TEST: test.aaa_profiling.test_orm.JoinConditionTest.test_a_to_b_aliased
# TEST: test.aaa_profiling.test_orm.JoinedEagerLoadTest.test_fetch_results_integrated
test.aaa_profiling.test_orm.JoinedEagerLoadTest.test_fetch_results_integrated x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 25393,982,92557
-test.aaa_profiling.test_orm.JoinedEagerLoadTest.test_fetch_results_integrated x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 26572,1222,116557
+test.aaa_profiling.test_orm.JoinedEagerLoadTest.test_fetch_results_integrated x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 26572,1104,105747
test.aaa_profiling.test_orm.JoinedEagerLoadTest.test_fetch_results_integrated x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 25362,987,93057
-test.aaa_profiling.test_orm.JoinedEagerLoadTest.test_fetch_results_integrated x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 26385,1223,116657
+test.aaa_profiling.test_orm.JoinedEagerLoadTest.test_fetch_results_integrated x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 26385,1102,105547
# TEST: test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity
# TEST: test.aaa_profiling.test_orm.QueryTest.test_query_cols
test.aaa_profiling.test_orm.QueryTest.test_query_cols x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_cextensions 6502
-test.aaa_profiling.test_orm.QueryTest.test_query_cols x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 7562
+test.aaa_profiling.test_orm.QueryTest.test_query_cols x86_64_linux_cpython_3.13_sqlite_pysqlite_dbapiunicode_nocextensions 7075
test.aaa_profiling.test_orm.QueryTest.test_query_cols x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_cextensions 6512
-test.aaa_profiling.test_orm.QueryTest.test_query_cols x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 7572
+test.aaa_profiling.test_orm.QueryTest.test_query_cols x86_64_linux_cpython_3.14_sqlite_pysqlite_dbapiunicode_nocextensions 7085
# TEST: test.aaa_profiling.test_orm.SelectInEagerLoadTest.test_round_trip_results
from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.engine import default
from sqlalchemy.engine import Row
+from sqlalchemy.engine.result import IteratorResult
from sqlalchemy.engine.result import SimpleResultMetaData
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import ColumnElement
start += 20
assert result._soft_closed
+
+
+class AllTuplesTest(fixtures.TablesTest):
+ """test Result._raw_all_tuples(), which ORM loading uses to fetch
+ processed rows without constructing Row objects."""
+
+ @classmethod
+ def define_tables(cls, metadata):
+ class UpperString(TypeDecorator):
+ impl = String(50)
+ cache_ok = True
+
+ def process_result_value(self, value, dialect):
+ return value.upper() if value is not None else None
+
+ Table(
+ "interim",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("plain", String(50)),
+ Column("upper", UpperString()),
+ )
+
+ @classmethod
+ def insert_data(cls, connection):
+ connection.execute(
+ cls.tables.interim.insert(),
+ [
+ {"id": 1, "plain": "p1", "upper": "u1"},
+ {"id": 2, "plain": "p2", "upper": None},
+ ],
+ )
+
+ def test_processors_applied(self, connection):
+ """rows are plain tuples with result processors applied"""
+ t = self.tables.interim
+ result = connection.execute(select(t).order_by(t.c.id))
+ rows = result._raw_all_tuples()
+ eq_(list(rows), [(1, "p1", "U1"), (2, "p2", None)])
+ for r in rows:
+ is_(type(r), tuple)
+
+ def test_no_processors(self, connection):
+ """rows with no result processors in play are plain tuples"""
+ t = self.tables.interim
+ result = connection.execute(select(t.c.id, t.c.plain).order_by(t.c.id))
+ rows = result._raw_all_tuples()
+ eq_(list(rows), [(1, "p1"), (2, "p2")])
+ for r in rows:
+ is_(type(r), tuple)
+
+ def test_empty_result(self, connection):
+ t = self.tables.interim
+ result = connection.execute(select(t).where(t.c.id == -1))
+ eq_(list(result._raw_all_tuples()), [])
+
+ def test_rejects_unique_filter(self, connection):
+ """_raw_all_tuples() does not apply uniquing, so asserts it's not
+ present"""
+ t = self.tables.interim
+ result = connection.execute(select(t).order_by(t.c.id))
+ with expect_raises(AssertionError):
+ result.unique()._raw_all_tuples()
+ result.close()
+
+ def test_rejects_post_creational_filter(self, connection):
+ """_raw_all_tuples() does not apply post-creational filters such as
+ the one used by mappings(), so asserts it's not present"""
+ t = self.tables.interim
+ result = connection.execute(select(t).order_by(t.c.id))
+ with expect_raises(AssertionError):
+ result.mappings()._raw_all_tuples()
+ result.close()
+
+ def test_rejects_scalar_source(self):
+ """_raw_all_tuples() asserts that interim_rows is not None, which
+ excludes results whose source delivers scalars without row
+ construction"""
+ result = IteratorResult(
+ SimpleResultMetaData(["a"]),
+ iter([(1,), (2,)]),
+ _source_supports_scalars=True,
+ )
+ result._generate_rows = False
+ with expect_raises(AssertionError):
+ result._raw_all_tuples()
+
+ def test_row_logging_falls_back_to_rows(self, debug_logging_engine):
+ """with debug-level engine logging established, Row objects are
+ built so that each row can be logged"""
+ t = self.tables.interim
+ testing_engine, buf = debug_logging_engine
+
+ engine = testing_engine(echo="debug")
+ with engine.connect() as conn:
+ result = conn.execute(select(t).order_by(t.c.id))
+ rows = result._raw_all_tuples()
+
+ eq_(len(rows), 2)
+ for r in rows:
+ is_true(isinstance(r, Row))
+ eq_(tuple(rows[0]), (1, "p1", "U1"))
+
+ row_messages = [
+ rec.getMessage()
+ for rec in buf.buffer
+ if rec.getMessage().startswith("Row ")
+ ]
+ eq_(
+ row_messages,
+ ["Row (1, 'p1', 'U1')", "Row (2, 'p2', None)"],
+ )