--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 8168
+
+ Improved a fix first made in version 1.4 for :ticket:`8456` which scaled
+ back the usage of internal "polymorphic adapters", that are used to render
+ ORM queries when the :paramref:`_orm.Mapper.with_polymorphic` parameter is
+ used. These adapters, which are very complex and error prone, are now used
+ only in those cases where an explicit user-supplied subquery is used for
+ :paramref:`_orm.Mapper.with_polymorphic`, which includes only the use case
+ of concrete inheritance mappings that use the
+ :func:`_orm.polymorphic_union` helper, as well as the legacy use case of
+ using an aliased subquery for joined inheritance mappings, which is not
+ needed in modern use.
+
+ For the most common case of joined inheritance mappings that use the
+ built-in polymorphic loading scheme, which includes those which make use of
+ the :paramref:`_orm.Mapper.polymorphic_load` parameter set to ``inline``,
+ polymorphic adapters are now no longer used. This has both a positive
+ performance impact on the construction of queries as well as a
+ substantial simplification of the internal query rendering process.
+
+ The specific issue targeted was to allow a :func:`_orm.column_property`
+ to refer to joined-inheritance classes within a scalar subquery, which now
+ works as intuitively as is feasible.
+
+
from .path_registry import PathRegistry
from .util import _entity_corresponds_to
from .util import _ORMJoin
+from .util import _TraceAdaptRole
from .util import AliasedClass
from .util import Bundle
from .util import ORMAdapter
+from .util import ORMStatementAdapter
from .. import exc as sa_exc
from .. import future
from .. import inspect
def _create_with_polymorphic_adapter(self, ext_info, selectable):
"""given MapperEntity or ORMColumnEntity, setup polymorphic loading
- if appropriate
+ if called for by the Mapper.
+
+ As of #8168 in 2.0.0b5, polymorphic adapters, which greatly increase
+ the complexity of the query creation process, are not used at all
+ except in the quasi-legacy cases of with_polymorphic referring to an
+ alias and/or subquery. This would apply to concrete polymorphic
+ loading, and joined inheritance where a subquery is
+ passed to with_polymorphic (which is completely unnecessary in modern
+ use).
"""
if (
self._mapper_loads_polymorphically_with(
mp,
ORMAdapter(
- mp, mp._equivalent_columns, selectable=selectable
+ _TraceAdaptRole.WITH_POLYMORPHIC_ADAPTER,
+ mp,
+ equivalents=mp._equivalent_columns,
+ selectable=selectable,
),
)
)
== "orm"
)
- self._from_obj_alias = sql.util.ColumnAdapter(
- self.statement, adapt_on_names=not statement_is_orm
+
+ self._from_obj_alias = ORMStatementAdapter(
+ _TraceAdaptRole.ADAPT_FROM_STATEMENT,
+ self.statement,
+ adapt_on_names=not statement_is_orm,
)
return self
self._join_entities = ()
if self.compile_options._set_base_alias:
+ # legacy Query only
self._set_select_from_alias()
for memoized_entities in query._memoized_select_entities:
return stmt
def _set_select_from_alias(self):
+ """used only for legacy Query cases"""
query = self.select_statement # query
self._from_obj_alias = adapter
def _get_select_from_alias_from_obj(self, from_obj):
+ """used only for legacy Query cases"""
+
info = from_obj
if "parententity" in info._annotations:
elif isinstance(info.selectable, sql.selectable.AliasedReturnsRows):
equivs = self._all_equivs()
- return sql_util.ColumnAdapter(info, equivs)
+ assert info is info.selectable
+ return ORMStatementAdapter(
+ _TraceAdaptRole.LEGACY_SELECT_FROM_ALIAS,
+ info.selectable,
+ equivalents=equivs,
+ )
else:
return None
equivs = self._all_equivs()
- self.compound_eager_adapter = sql_util.ColumnAdapter(inner, equivs)
+ self.compound_eager_adapter = ORMStatementAdapter(
+ _TraceAdaptRole.COMPOUND_EAGER_STATEMENT, inner, equivalents=equivs
+ )
statement = future.select(
*([inner] + self.secondary_columns) # use_labels=self.labels
)
if need_adapter:
+ # if need_adapter is True, we are in a deprecated case and
+ # a warning has been emitted.
assert right_mapper
adapter = ORMAdapter(
- inspect(right), equivalents=right_mapper._equivalent_columns
+ _TraceAdaptRole.DEPRECATED_JOIN_ADAPT_RIGHT_SIDE,
+ inspect(right),
+ equivalents=right_mapper._equivalent_columns,
)
# if an alias() on the right side was generated,
elif (
not r_info.is_clause_element
and not right_is_aliased
- and right_mapper.with_polymorphic
- and isinstance(
- right_mapper._with_polymorphic_selectable,
- expression.AliasedReturnsRows,
- )
+ and right_mapper._has_aliased_polymorphic_fromclause
):
# for the case where the target mapper has a with_polymorphic
# set up, ensure an adapter is set up for criteria that works
# and similar
self._mapper_loads_polymorphically_with(
right_mapper,
- sql_util.ColumnAdapter(
- right_mapper.selectable,
- right_mapper._equivalent_columns,
+ ORMAdapter(
+ _TraceAdaptRole.WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN,
+ right_mapper,
+ selectable=right_mapper.selectable,
+ equivalents=right_mapper._equivalent_columns,
),
)
# if the onclause is a ClauseElement, adapt it with any
from .properties import ColumnProperty
from .relationships import RelationshipProperty
from .state import InstanceState
+ from .util import ORMAdapter
from ..engine import Row
from ..engine import RowMapping
from ..sql._typing import _ColumnExpressionArgument
from ..sql.elements import ColumnElement
from ..sql.schema import Column
from ..sql.selectable import FromClause
- from ..sql.util import ColumnAdapter
from ..util import OrderedSet
else:
return None
+ @HasMemoized.memoized_attribute
+ def _has_aliased_polymorphic_fromclause(self):
+ """return True if with_polymorphic[1] is an aliased fromclause,
+ like a subquery.
+
+ As of #8168, polymorphic adaption with ORMAdapter is used only
+ if this is present.
+
+ """
+ return self.with_polymorphic and isinstance(
+ self.with_polymorphic[1],
+ expression.AliasedReturnsRows,
+ )
+
@HasMemoized.memoized_attribute
def _should_select_with_poly_adapter(self):
"""determine if _MapperEntity or _ORMColumnEntity will need to use
# polymorphic selectable, *or* if the base mapper has either of those,
# we turn on the adaption thing. if not, we do *no* adaption.
#
+ # (UPDATE for #8168: the above comment was not accurate, as we were
+ # still saying "do polymorphic" if we were using an auto-generated
+ # flattened JOIN for with_polymorphic.)
+ #
# this splits the behavior among the "regular" joined inheritance
# and single inheritance mappers, vs. the "weird / difficult"
# concrete and joined inh mappings that use a with_polymorphic of
# these tests actually adapt the polymorphic selectable (like, the
# UNION or the SELECT subquery with JOIN in it) to be just the simple
# subclass table. Hence even if we are a "plain" inheriting mapper
- # but our base has a wpoly on it, we turn on adaption.
+ # but our base has a wpoly on it, we turn on adaption. This is a
+ # legacy case we should probably disable.
+ #
+ #
+ # UPDATE: simplified way more as of #8168. polymorphic adaption
+ # is turned off even if with_polymorphic is set, as long as there
+ # is no user-defined aliased selectable / subquery configured.
+ # this scales back the use of polymorphic adaption in practice
+ # to basically no cases except for concrete inheritance with a
+ # polymorphic base class.
+ #
return (
- self.with_polymorphic
+ self._has_aliased_polymorphic_fromclause
or self._requires_row_aliasing
- or self.base_mapper.with_polymorphic
+ or (self.base_mapper._has_aliased_polymorphic_fromclause)
or self.base_mapper._requires_row_aliasing
)
]
@HasMemoized.memoized_attribute
- def _polymorphic_adapter(self) -> Optional[sql_util.ColumnAdapter]:
- if self.with_polymorphic:
- return sql_util.ColumnAdapter(
- self.selectable, equivalents=self._equivalent_columns
+ def _polymorphic_adapter(self) -> Optional[orm_util.ORMAdapter]:
+ if self._has_aliased_polymorphic_fromclause:
+ return orm_util.ORMAdapter(
+ orm_util._TraceAdaptRole.MAPPER_POLYMORPHIC_ADAPTER,
+ self,
+ selectable=self.selectable,
+ equivalents=self._equivalent_columns,
+ limit_on_entity=False,
)
else:
return None
self,
row: Optional[Union[Row[Any], RowMapping]],
identity_token: Optional[Any] = None,
- adapter: Optional[ColumnAdapter] = None,
+ adapter: Optional[ORMAdapter] = None,
) -> _IdentityKeyType[_O]:
"""Return an identity-map key for use in storing/retrieving an
item from the identity map.
if alias is not None:
if isinstance(alias, str):
alias = prop.target.alias(alias)
- adapter = sql_util.ColumnAdapter(
- alias, equivalents=prop.mapper._equivalent_columns
+ adapter = orm_util.ORMAdapter(
+ orm_util._TraceAdaptRole.JOINEDLOAD_USER_DEFINED_ALIAS,
+ prop.mapper,
+ selectable=alias,
+ equivalents=prop.mapper._equivalent_columns,
+ limit_on_entity=False,
)
else:
if path.contains(
compile_state.attributes, "path_with_polymorphic"
)
adapter = orm_util.ORMAdapter(
+ orm_util._TraceAdaptRole.JOINEDLOAD_PATH_WITH_POLYMORPHIC,
with_poly_entity,
equivalents=prop.mapper._equivalent_columns,
)
to_adapt = self._gen_pooled_aliased_class(compile_state)
to_adapt_insp = inspect(to_adapt)
+
clauses = to_adapt_insp._memo(
("joinedloader_ormadapter", self),
orm_util.ORMAdapter,
+ orm_util._TraceAdaptRole.JOINEDLOAD_MEMOIZED_ADAPTER,
to_adapt_insp,
equivalents=self.mapper._equivalent_columns,
adapt_required=True,
from __future__ import annotations
+import enum
import re
import types
import typing
+from typing import AbstractSet
from typing import Any
from typing import Callable
from typing import cast
raise sa_exc.ArgumentError("class or instance is required")
+class _TraceAdaptRole(enum.Enum):
+ """Enumeration of all the use cases for ORMAdapter.
+
+ ORMAdapter remains one of the most complicated aspects of the ORM, as it is
+ used for in-place adaption of column expressions to be applied to a SELECT,
+ replacing :class:`.Table` and other objects that are mapped to classes with
+ aliases of those tables in the case of joined eager loading, or in the case
+ of polymorphic loading as used with concrete mappings or other custom "with
+ polymorphic" parameters, with whole user-defined subqueries. The
+ enumerations provide an overview of all the use cases used by ORMAdapter, a
+ layer of formality as to the introduction of new ORMAdapter use cases (of
+ which none are anticipated), as well as a means to trace the origins of a
+ particular ORMAdapter within runtime debugging.
+
+ SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
+ open-ended statement adaption, including the ``Query.with_polymorphic()``
+ method and the ``Query.select_from_entity()`` methods, favoring
+ user-explicit aliasing schemes using the ``aliased()`` and
+ ``with_polymorphic()`` standalone constructs; these still use adaption,
+ however the adaption is applied in a narrower scope.
+
+ """
+
+ # aliased() use that is used to adapt individual attributes at query
+ # construction time
+ ALIASED_INSP = enum.auto()
+
+ # joinedload cases; typically adapt an ON clause of a relationship
+ # join
+ JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
+ JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
+ JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
+
+ # polymorphic cases - these are complex ones that replace FROM
+ # clauses, replacing tables with subqueries
+ MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
+ WITH_POLYMORPHIC_ADAPTER = enum.auto()
+ WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
+ DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
+
+ # the from_statement() case, used only to adapt individual attributes
+ # from a given statement to local ORM attributes at result fetching
+ # time. assigned to ORMCompileState._from_obj_alias
+ ADAPT_FROM_STATEMENT = enum.auto()
+
+ # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
+ # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
+ # joinedloads are then placed on the outside.
+ # assigned to ORMCompileState.compound_eager_adapter
+ COMPOUND_EAGER_STATEMENT = enum.auto()
+
+ # the legacy Query._set_select_from() case.
+ # this is needed for Query's set operations (i.e. UNION, etc. )
+ # as well as "legacy from_self()", which while removed from 2.0 as
+ # public API, is used for the Query.count() method. this one
+ # still does full statement traversal
+ # assigned to ORMCompileState._from_obj_alias
+ LEGACY_SELECT_FROM_ALIAS = enum.auto()
+
+
+class ORMStatementAdapter(sql_util.ColumnAdapter):
+ """ColumnAdapter which includes a role attribute."""
+
+ __slots__ = ("role",)
+
+ def __init__(
+ self,
+ role: _TraceAdaptRole,
+ selectable: Selectable,
+ *,
+ equivalents: Optional[_EquivalentColumnMap] = None,
+ adapt_required: bool = False,
+ allow_label_resolve: bool = True,
+ anonymize_labels: bool = False,
+ adapt_on_names: bool = False,
+ adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
+ ):
+ self.role = role
+ super().__init__(
+ selectable,
+ equivalents=equivalents,
+ adapt_required=adapt_required,
+ allow_label_resolve=allow_label_resolve,
+ anonymize_labels=anonymize_labels,
+ adapt_on_names=adapt_on_names,
+ adapt_from_selectables=adapt_from_selectables,
+ )
+
+
class ORMAdapter(sql_util.ColumnAdapter):
"""ColumnAdapter subclass which excludes adaptation of entities from
non-matching mappers.
"""
+ __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
+
is_aliased_class: bool
aliased_insp: Optional[AliasedInsp[Any]]
def __init__(
self,
+ role: _TraceAdaptRole,
entity: _InternalEntityType[Any],
+ *,
equivalents: Optional[_EquivalentColumnMap] = None,
adapt_required: bool = False,
allow_label_resolve: bool = True,
anonymize_labels: bool = False,
selectable: Optional[Selectable] = None,
+ limit_on_entity: bool = True,
+ adapt_on_names: bool = False,
+ adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
):
-
+ self.role = role
self.mapper = entity.mapper
if selectable is None:
selectable = entity.selectable
self.is_aliased_class = False
self.aliased_insp = None
- sql_util.ColumnAdapter.__init__(
- self,
+ super().__init__(
selectable,
equivalents,
adapt_required=adapt_required,
allow_label_resolve=allow_label_resolve,
anonymize_labels=anonymize_labels,
- include_fn=self._include_fn,
+ include_fn=self._include_fn if limit_on_entity else None,
+ adapt_on_names=adapt_on_names,
+ adapt_from_selectables=adapt_from_selectables,
)
def _include_fn(self, elem):
- # TODO: we still have cases where we should return False here
- # yet we are not able to reliably detect without false positives.
- # see issue #8168
-
entity = elem._annotations.get("parentmapper", None)
return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
mapper: Mapper[_O]
selectable: FromClause
- _adapter: sql_util.ColumnAdapter
+ _adapter: ORMAdapter
with_polymorphic_mappers: Sequence[Mapper[Any]]
_with_polymorphic_entities: Sequence[AliasedInsp[Any]]
self._is_with_polymorphic = False
self.with_polymorphic_mappers = [mapper]
- self._adapter = sql_util.ColumnAdapter(
- selectable,
+ self._adapter = ORMAdapter(
+ _TraceAdaptRole.ALIASED_INSP,
+ mapper,
+ selectable=selectable,
equivalents=mapper._equivalent_columns,
adapt_on_names=adapt_on_names,
anonymize_labels=True,
for m in self.with_polymorphic_mappers
if not adapt_on_names
},
+ limit_on_entity=False,
)
if nest_adapters:
from __future__ import annotations
from collections import deque
+import copy
from itertools import chain
import typing
from typing import AbstractSet
"""
+ __slots__ = (
+ "__traverse_options__",
+ "selectable",
+ "include_fn",
+ "exclude_fn",
+ "equivalents",
+ "adapt_on_names",
+ "adapt_from_selectables",
+ )
+
def __init__(
self,
selectable: Selectable,
# TODO: cython candidate
+ if self.include_fn and not self.include_fn(col): # type: ignore
+ return None
+ elif self.exclude_fn and self.exclude_fn(col): # type: ignore
+ return None
+
if isinstance(col, FromClause) and not isinstance(
col, functions.FunctionElement
):
# however the logic to check this moved here as of #7154 so that
# it is made specific to SQL rewriting and not all column
# correspondence
+
return None
if "adapt_column" in col._annotations:
if TYPE_CHECKING:
assert isinstance(col, KeyedColumnElement)
- if self.include_fn and not self.include_fn(col):
- return None
- elif self.exclude_fn and self.exclude_fn(col):
- return None
- else:
- return self._corresponding_column( # type: ignore
- col, require_embedded=True
- )
+ return self._corresponding_column( # type: ignore
+ col, require_embedded=True
+ )
class _ColumnLookup(Protocol):
"""
+ __slots__ = (
+ "columns",
+ "adapt_required",
+ "allow_label_resolve",
+ "_wrap",
+ "__weakref__",
+ )
+
columns: _ColumnLookup
def __init__(
anonymize_labels: bool = False,
adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
):
- ClauseAdapter.__init__(
- self,
+ super().__init__(
selectable,
equivalents,
include_fn=include_fn,
return self.columns[key]
def wrap(self, adapter):
- ac = self.__class__.__new__(self.__class__)
- ac.__dict__.update(self.__dict__)
+ ac = copy.copy(self)
ac._wrap = adapter
ac.columns = util.WeakPopulateDict(ac._locate_col) # type: ignore
if ac.include_fn or ac.exclude_fn:
return c
- def __getstate__(self):
- d = self.__dict__.copy()
- del d["columns"]
- return d
-
- def __setstate__(self, state):
- self.__dict__.update(state)
- self.columns = util.WeakPopulateDict(self._locate_col) # type: ignore
-
def _offset_or_limit_clause(
element: Union[int, _ColumnExpressionArgument[int]],
_ExtT = TypeVar("_ExtT", bound="ExternalTraversal")
-class ExternalTraversal:
+class ExternalTraversal(util.MemoizedSlots):
"""Base class for visitor objects which can traverse externally using
the :func:`.visitors.traverse` function.
"""
+ __slots__ = ("_visitor_dict", "_next")
+
__traverse_options__: Dict[str, Any] = {}
_next: Optional[ExternalTraversal]
return traverse(obj, self.__traverse_options__, self._visitor_dict)
- @util.memoized_property
- def _visitor_dict(self) -> Dict[str, _TraverseCallableType[Any]]:
+ def _memoized_attr__visitor_dict(
+ self,
+ ) -> Dict[str, _TraverseCallableType[Any]]:
visitors = {}
for name in dir(self):
"""
+ __slots__ = ()
+
def copy_and_process(
self, list_: List[ExternallyTraversible]
) -> List[ExternallyTraversible]:
"""
+ __slots__ = ()
+
def replace(
self, elem: ExternallyTraversible
) -> Optional[ExternallyTraversible]:
from sqlalchemy.testing import config
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
-from sqlalchemy.testing import skip_test
from sqlalchemy.testing.fixtures import ComparableEntity
from sqlalchemy.testing.fixtures import fixture_session
from sqlalchemy.testing.provision import normalize_sequence
def mapping(self, decl_base):
Base = decl_base
- def go(scenario, use_poly):
+ def go(scenario, use_poly, use_poly_on_retailer):
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
.scalar_subquery()
)
- __mapper_args__ = {"polymorphic_identity": "retailer"}
+ __mapper_args__ = {
+ "polymorphic_identity": "retailer",
+ "polymorphic_load": "inline"
+ if use_poly_on_retailer
+ else None,
+ }
return Customer, Store, Retailer
@testing.variation("scenario", ["mapped_cls", "table", "table_alias"])
@testing.variation("use_poly", [True, False])
- def test_select_attr_only(self, scenario, use_poly, mapping):
- Customer, Store, Retailer = mapping(scenario, use_poly)
+ @testing.variation("use_poly_on_retailer", [True, False])
+ def test_select_attr_only(
+ self, scenario, use_poly, use_poly_on_retailer, mapping
+ ):
+ Customer, Store, Retailer = mapping(
+ scenario, use_poly, use_poly_on_retailer
+ )
if scenario.mapped_cls:
self.assert_compile(
@testing.variation("scenario", ["mapped_cls", "table", "table_alias"])
@testing.variation("use_poly", [True, False])
- def test_select_cls(self, scenario, mapping, use_poly):
- Customer, Store, Retailer = mapping(scenario, use_poly)
+ @testing.variation("use_poly_on_retailer", [True, False])
+ def test_select_cls(
+ self, scenario, mapping, use_poly, use_poly_on_retailer
+ ):
+ Customer, Store, Retailer = mapping(
+ scenario, use_poly, use_poly_on_retailer
+ )
if scenario.mapped_cls:
- # breaks for use_poly, but this is not totally unexpected
- if use_poly:
- skip_test("Case not working yet")
self.assert_compile(
select(Retailer),
"SELECT (SELECT count(store.id) AS count_1 FROM customer "
"FROM customer JOIN retailer ON customer.id = retailer.id",
)
elif scenario.table:
- # TODO: breaks for use_poly, and this should not happen.
- # selecting from the Table should be honoring that
- if use_poly:
- skip_test("Case not working yet")
self.assert_compile(
select(Retailer),
"SELECT (SELECT count(store.id) AS count_1 FROM store "
--- /dev/null
+"""
+Debug ORMAdapter calls within ORM runs.
+
+Demos::
+
+ python tools/trace_orm_adapter.py -m pytest \
+ test/orm/inheritance/test_polymorphic_rel.py::PolymorphicAliasedJoinsTest::test_primary_eager_aliasing_joinedload
+
+ python tools/trace_orm_adapter.py -m pytest \
+ test/orm/test_eager_relations.py::LazyLoadOptSpecificityTest::test_pathed_joinedload_aliased_abs_bcs
+
+ python tools/trace_orm_adapter.py my_test_script.py
+
+
+The above two tests should spit out a ton of debug output. If a test or program
+has no debug output at all, that's a good thing! it means ORMAdapter isn't
+used for that case.
+
+You can then set a breakpoint at the end of any adapt step:
+
+ python tools/trace_orm_adapter.py -d 10 -m pytest -s \
+ test/orm/test_eager_relations.py::LazyLoadOptSpecificityTest::test_pathed_joinedload_aliased_abs_bcs
+
+
+""" # noqa: E501
+
+from __future__ import annotations
+
+import argparse
+import contextlib
+import contextvars
+import sys
+from typing import TYPE_CHECKING
+
+from sqlalchemy.orm import util
+
+
+if TYPE_CHECKING:
+ from typing import Any
+ from typing import List
+ from typing import Optional
+
+ from sqlalchemy.sql.elements import ColumnElement
+
+
+class _ORMAdapterTrace:
+ def _locate_col(
+ self, col: ColumnElement[Any]
+ ) -> Optional[ColumnElement[Any]]:
+ with self._tracer("_locate_col") as tracer:
+ return tracer(super()._locate_col, col)
+
+ def replace(self, col, _include_singleton_constants: bool = False):
+ with self._tracer("replace") as tracer:
+ return tracer(super().replace, col)
+
+ _orm_adapter_trace_context = contextvars.ContextVar("_tracer")
+
+ @contextlib.contextmanager
+ def _tracer(self, meth):
+ adapter = self
+ ctx = self._orm_adapter_trace_context.get(
+ {"stack": [], "last_depth": 0, "line_no": 0}
+ )
+ self._orm_adapter_trace_context.set(ctx)
+
+ stack: List[Any] = ctx["stack"] # type: ignore
+ last_depth = len(stack)
+ line_no: int = ctx["line_no"] # type: ignore
+ ctx["last_depth"] = last_depth
+ stack.append((adapter, meth))
+ indent = " " * last_depth
+
+ if hasattr(adapter, "mapper"):
+ adapter_desc = (
+ f"{adapter.__class__.__name__}"
+ f"({adapter.role.name}, mapper={adapter.mapper})"
+ )
+ else:
+ adapter_desc = f"{adapter.__class__.__name__}({adapter.role.name})"
+
+ def tracer_fn(fn, arg):
+ nonlocal line_no
+
+ line_no += 1
+
+ print(f"{indent} {line_no} {adapter_desc}", file=REAL_STDOUT)
+ sub_indent = " " * len(f"{line_no} ")
+
+ print(
+ f"{indent}{sub_indent} -> "
+ f"{meth} {_orm_adapter_trace_print(arg)}",
+ file=REAL_STDOUT,
+ )
+ ctx["line_no"] = line_no
+ ret = fn(arg)
+
+ if DEBUG_ADAPT_STEP == line_no:
+ breakpoint()
+
+ if ret is arg:
+ print(f"{indent} {line_no} <- same object", file=REAL_STDOUT)
+ else:
+ print(
+ f"{indent} {line_no} <- {_orm_adapter_trace_print(ret)}",
+ file=REAL_STDOUT,
+ )
+
+ if last_depth == 0:
+ print("", file=REAL_STDOUT)
+ return ret
+
+ try:
+ yield tracer_fn
+ finally:
+ stack.pop(-1)
+
+
+util.ORMAdapter.__bases__ = (_ORMAdapterTrace,) + util.ORMAdapter.__bases__
+util.ORMStatementAdapter.__bases__ = (
+ _ORMAdapterTrace,
+) + util.ORMStatementAdapter.__bases__
+
+
+def _orm_adapter_trace_print(obj):
+ if obj is None:
+ return "None"
+
+ t_print = _orm_adapter_trace_printers.get(obj.__visit_name__, None)
+ if t_print:
+ return t_print(obj)
+ else:
+ return f"{obj!r}"
+
+
+_orm_adapter_trace_printers = {
+ "table": lambda t: (
+ f'Table("{t.name}", '
+ f"entity={t._annotations.get('parentmapper', None)})"
+ ),
+ "column": lambda c: (
+ f'Column("{c.name}", {_orm_adapter_trace_print(c.table)} '
+ f"entity={c._annotations.get('parentmapper', None)})"
+ ),
+ "join": lambda j: (
+ f"{j.__class__.__name__}({_orm_adapter_trace_print(j.left)}, "
+ f"{_orm_adapter_trace_print(j.right)})"
+ ),
+ "label": lambda l: f"Label({_orm_adapter_trace_print(l.element)})",
+}
+
+DEBUG_ADAPT_STEP = None
+REAL_STDOUT = sys.__stdout__
+
+
+def main():
+ global DEBUG_ADAPT_STEP
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "-d", "--debug", type=int, help="breakpoint at this adaptation step"
+ )
+ parser.add_argument(
+ "-m",
+ "--module",
+ type=str,
+ help="import module name instead of running a script",
+ )
+ parser.add_argument(
+ "args", metavar="N", type=str, nargs="*", help="additional arguments"
+ )
+
+ argparse_args = []
+ sys_argv = list(sys.argv)
+
+ progname = sys_argv.pop(0)
+
+ # this is a little crazy, works at the moment for:
+ # module w args:
+ # python tools/trace_orm_adapter.py -m pytest test/orm/test_query.py -s
+ # script:
+ # python tools/trace_orm_adapter.py test3.py
+ has_module = False
+ while sys_argv:
+ arg = sys_argv.pop(0)
+ if arg in ("-m", "--module", "-d", "--debug"):
+ argparse_args.append(arg)
+ argparse_args.append(sys_argv.pop(0))
+ has_module = arg in ("-m", "--module")
+ else:
+ if not has_module:
+ argparse_args.append(arg)
+ else:
+ sys_argv.insert(0, arg)
+ break
+
+ options = parser.parse_args(argparse_args)
+ sys.argv = ["program.py"] + sys_argv
+
+ if options.module == "pytest":
+ sys.argv.extend(["--capture", "sys"])
+
+ import runpy
+
+ if options.debug:
+ DEBUG_ADAPT_STEP = options.debug
+
+ if options.module:
+ runpy.run_module(options.module, run_name="__main__")
+ else:
+ progname = options.args[0]
+
+ runpy.run_path(progname)
+
+
+if __name__ == "__main__":
+ main()