+++ /dev/null
-.. change::
- :tags: bug, orm
- :tickets: 5050
-
- Fixed a reference cycle which could impact the GC behavior of the
- :class:`.WeakSequence` object, currently used within one place in certain
- mapper configurations. The issue only affects configuration-time
- structures. Pull request courtesy Carson Ip.
-
--- /dev/null
+.. change::
+ :tags: bug, orm, engine
+ :tickets: 5056, 5050
+
+ Added test support and repaired a wide variety of unnecessary reference
+ cycles created for short-lived objects, mostly in the area of ORM queries.
+ Thanks much to Carson Ip for the help on this.
+
def _get_context_loader(self, context, path):
load = None
- # use EntityRegistry.__getitem__()->PropRegistry here so
- # that the path is stated in terms of our base
- search_path = dict.__getitem__(path, self)
+ search_path = path[self]
# search among: exact match, "attr.*", "default" strategy
# if any.
if self.base_mapper.local_table in tables:
return None
- class ColumnsNotAvailable(Exception):
- pass
-
def visit_binary(binary):
leftcol = binary.left
rightcol = binary.right
passive=attributes.PASSIVE_NO_INITIALIZE,
)
if leftval in orm_util._none_set:
- raise ColumnsNotAvailable()
+ raise _OptGetColumnsNotAvailable()
binary.left = sql.bindparam(
None, leftval, type_=binary.right.type
)
passive=attributes.PASSIVE_NO_INITIALIZE,
)
if rightval in orm_util._none_set:
- raise ColumnsNotAvailable()
+ raise _OptGetColumnsNotAvailable()
binary.right = sql.bindparam(
None, rightval, type_=binary.right.type
)
{"binary": visit_binary},
)
)
- except ColumnsNotAvailable:
+ except _OptGetColumnsNotAvailable:
return None
cond = sql.and_(*allconds)
return result
+class _OptGetColumnsNotAvailable(Exception):
+ pass
+
+
def configure_mappers():
"""Initialize the inter-mapper relationships of all mappers that
have been constructed thus far.
"""
+ __slots__ = ()
+
is_token = False
is_root = False
@classmethod
def per_mapper(cls, mapper):
- return EntityRegistry(cls.root, mapper)
+ if mapper.is_mapper:
+ return CachingEntityRegistry(cls.root, mapper)
+ else:
+ return SlotsEntityRegistry(cls.root, mapper)
@classmethod
def coerce(cls, raw):
class TokenRegistry(PathRegistry):
+ __slots__ = ("token", "parent", "path", "natural_path")
+
def __init__(self, parent, token):
self.token = token
self.parent = parent
self._wildcard_path_loader_key = (
"loader",
- self.parent.path + self.prop._wildcard_token,
+ parent.path + self.prop._wildcard_token,
)
self._default_path_loader_key = self.prop._default_path_loader_key
self._loader_key = ("loader", self.path)
if isinstance(entity, (int, slice)):
return self.path[entity]
else:
- return EntityRegistry(self, entity)
+ return SlotsEntityRegistry(self, entity)
-class EntityRegistry(PathRegistry, dict):
- is_aliased_class = False
+class AbstractEntityRegistry(PathRegistry):
+ __slots__ = ()
+
has_entity = True
def __init__(self, parent, entity):
# are usually not present in mappings. So here we track both the
# "enhanced" path in self.path and the "natural" path that doesn't
# include those objects so these two traversals can be matched up.
+
if parent.path and self.is_aliased_class:
+ # this is an infrequent code path used only for loader strategies
+ # that also make use of of_type().
if entity.mapper.isa(parent.natural_path[-1].entity):
self.natural_path = parent.natural_path + (entity.mapper,)
else:
)
else:
self.natural_path = self.path
- self.entity_path = self
+
+ @property
+ def entity_path(self):
+ return self
@property
def mapper(self):
__nonzero__ = __bool__
+ def __getitem__(self, entity):
+ if isinstance(entity, (int, slice)):
+ return self.path[entity]
+ else:
+ return PropRegistry(self, entity)
+
+
+class SlotsEntityRegistry(AbstractEntityRegistry):
+ # for aliased class, return lightweight, no-cycles created
+ # version
+
+ __slots__ = (
+ "key",
+ "parent",
+ "is_aliased_class",
+ "entity",
+ "path",
+ "natural_path",
+ )
+
+
+class CachingEntityRegistry(AbstractEntityRegistry, dict):
+ # for long lived mapper, return dict based caching
+ # version that creates reference cycles
+
def __getitem__(self, entity):
if isinstance(entity, (int, slice)):
return self.path[entity]
def __missing__(self, key):
self[key] = item = PropRegistry(self, key)
+
return item
while "prev" in jp:
f, prev = jp["prev"]
prev = prev.copy()
- prev[f] = jp
+ prev[f] = jp.copy()
jp["prev"] = (f, prev)
jp = prev
self._joinpath = jp
if element is not None:
element = clone(element)
+ clone = None # remove gc cycles
return element
# build up a path indicating the path from the leftmost
# entity to the thing we're subquery loading.
- with_poly_info = path.get(
+ with_poly_entity = path.get(
context.attributes, "path_with_polymorphic", None
)
- if with_poly_info is not None:
- effective_entity = with_poly_info.entity
+ if with_poly_entity is not None:
+ effective_entity = with_poly_entity
else:
effective_entity = self.entity
chained_from_outerjoin,
)
- with_poly_info = path.get(
+ with_poly_entity = path.get(
context.attributes, "path_with_polymorphic", None
)
- if with_poly_info is not None:
- with_polymorphic = with_poly_info.with_polymorphic_mappers
+ if with_poly_entity is not None:
+ with_polymorphic = inspect(
+ with_poly_entity
+ ).with_polymorphic_mappers
else:
with_polymorphic = None
chained_from_outerjoin=chained_from_outerjoin,
)
- if with_poly_info is not None and None in set(
+ if with_poly_entity is not None and None in set(
context.secondary_columns
):
raise sa_exc.InvalidRequestError(
# otherwise figure it out.
alias = loadopt.local_opts["eager_from_alias"]
-
root_mapper, prop = path[-2:]
if alias is not None:
)
else:
if path.contains(context.attributes, "path_with_polymorphic"):
- with_poly_info = path.get(
+ with_poly_entity = path.get(
context.attributes, "path_with_polymorphic"
)
adapter = orm_util.ORMAdapter(
- with_poly_info.entity,
+ with_poly_entity,
equivalents=prop.mapper._equivalent_columns,
)
else:
parentmapper,
chained_from_outerjoin,
):
- with_poly_info = path.get(
+ with_poly_entity = path.get(
context.attributes, "path_with_polymorphic", None
)
- if with_poly_info:
- to_adapt = with_poly_info.entity
+ if with_poly_entity:
+ to_adapt = with_poly_entity
else:
to_adapt = self._gen_pooled_aliased_class(context)
# build up a path indicating the path from the leftmost
# entity to the thing we're subquery loading.
- with_poly_info = path_w_prop.get(
+ with_poly_entity = path_w_prop.get(
context.attributes, "path_with_polymorphic", None
)
- if with_poly_info is not None:
- effective_entity = with_poly_info.entity
+ if with_poly_entity is not None:
+ effective_entity = with_poly_entity
else:
effective_entity = self.entity
# Load objects
self.context = util.OrderedDict()
self.local_opts = {}
- self._of_type = None
self.is_class_strategy = False
@classmethod
is_class_strategy = False
strategy = None
propagate_to_loaders = False
+ _of_type = None
def process_query(self, query):
self._process(query, True)
ext_info.mapper,
aliased=True,
_use_mapper_path=True,
- _existing_alias=existing,
+ _existing_alias=inspect(existing)
+ if existing is not None
+ else None,
)
+
ext_info = inspect(ac)
path.entity_path[prop].set(
- self.context, "path_with_polymorphic", ext_info
+ self.context, "path_with_polymorphic", ac
)
path = path[prop][ext_info]
self, attr, strategy, propagate_to_loaders=True
):
strategy = self._coerce_strat(strategy)
- self.is_class_strategy = False
+
self.propagate_to_loaders = propagate_to_loaders
- # if the path is a wildcard, this will set propagate_to_loaders=False
- self._generate_path(self.path, attr, strategy, "relationship")
- self.strategy = strategy
- if strategy is not None:
- self._set_path_strategy()
+ cloned = self._clone_for_bind_strategy(attr, strategy, "relationship")
+ self.path = cloned.path
+ self._of_type = cloned._of_type
+ cloned.is_class_strategy = self.is_class_strategy = False
+ self.propagate_to_loaders = cloned.propagate_to_loaders
@_generative
def set_column_strategy(self, attrs, strategy, opts=None, opts_only=False):
self.is_class_strategy = False
for attr in attrs:
- cloned = self._generate()
- cloned.strategy = strategy
- cloned._generate_path(self.path, attr, strategy, "column")
+ cloned = self._clone_for_bind_strategy(
+ attr, strategy, "column", opts_only=opts_only, opts=opts
+ )
cloned.propagate_to_loaders = True
- if opts:
- cloned.local_opts.update(opts)
- if opts_only:
- cloned.is_opts_only = True
- cloned._set_path_strategy()
- self.is_class_strategy = False
@_generative
def set_generic_strategy(self, attrs, strategy):
strategy = self._coerce_strat(strategy)
for attr in attrs:
- path = self._generate_path(self.path, attr, strategy, None)
- cloned = self._generate()
- cloned.strategy = strategy
- cloned.path = path
+ cloned = self._clone_for_bind_strategy(attr, strategy, None)
cloned.propagate_to_loaders = True
- cloned._set_path_strategy()
@_generative
def set_class_strategy(self, strategy, opts):
strategy = self._coerce_strat(strategy)
- cloned = self._generate()
+ cloned = self._clone_for_bind_strategy(None, strategy, None)
cloned.is_class_strategy = True
- path = cloned._generate_path(self.path, None, strategy, None)
- cloned.strategy = strategy
- cloned.path = path
cloned.propagate_to_loaders = True
- cloned._set_path_strategy()
cloned.local_opts.update(opts)
+ def _clone_for_bind_strategy(
+ self, attr, strategy, wildcard_key, opts_only=False, opts=None
+ ):
+ """Create an anonymous clone of the Load/_UnboundLoad that is suitable
+ to be placed in the context / _to_bind collection of this Load
+ object. The clone will then lose references to context/_to_bind
+ in order to not create reference cycles.
+
+ """
+ cloned = self._generate()
+ cloned._generate_path(self.path, attr, strategy, wildcard_key)
+ cloned.strategy = strategy
+
+ cloned.local_opts = self.local_opts
+ if opts:
+ cloned.local_opts.update(opts)
+ if opts_only:
+ cloned.is_opts_only = True
+
+ if strategy or cloned.is_opts_only:
+ cloned._set_path_strategy()
+ return cloned
+
def _set_for_path(self, context, path, replace=True, merge_opts=False):
if merge_opts or not replace:
existing = path.get(self.context, "loader")
merge_opts=self.is_opts_only,
)
+ # remove cycles; _set_path_strategy is always invoked on an
+ # anonymous clone of the Load / UnboundLoad object since #5056
+ self.context = None
+
def __getstate__(self):
d = self.__dict__.copy()
- d["context"] = PathRegistry.serialize_context_dict(
- d["context"], ("loader",)
- )
+ if d["context"] is not None:
+ d["context"] = PathRegistry.serialize_context_dict(
+ d["context"], ("loader",)
+ )
d["path"] = self.path.serialize()
return d
def __setstate__(self, state):
self.__dict__.update(state)
self.path = PathRegistry.deserialize(self.path)
- self.context = PathRegistry.deserialize_context_dict(self.context)
+ if self.context is not None:
+ self.context = PathRegistry.deserialize_context_dict(self.context)
def _chop_path(self, to_chop, path):
i = -1
def _set_path_strategy(self):
self._to_bind.append(self)
- def _apply_to_parent(self, parent, applied, bound):
+ # remove cycles; _set_path_strategy is always invoked on an
+ # anonymous clone of the Load / UnboundLoad object since #5056
+ self._to_bind = None
+
+ def _apply_to_parent(self, parent, applied, bound, to_bind=None):
if self in applied:
return applied[self]
+ if to_bind is None:
+ to_bind = self._to_bind
+
cloned = self._generate()
applied[self] = cloned
assert cloned.is_opts_only == self.is_opts_only
new_to_bind = {
- elem._apply_to_parent(parent, applied, bound)
- for elem in self._to_bind
+ elem._apply_to_parent(parent, applied, bound, to_bind)
+ for elem in to_bind
}
cloned._to_bind = parent._to_bind
cloned._to_bind.extend(new_to_bind)
all_tokens = [token for key in keys for token in _split_key(key)]
for token in all_tokens[0:-1]:
+ # set _is_chain_link first so that clones of the
+ # object also inherit this flag
+ opt._is_chain_link = True
if chained:
opt = meth(opt, token, **kw)
else:
opt = opt.defaultload(token)
- opt._is_chain_link = True
opt = meth(opt, all_tokens[-1], **kw)
opt._is_chain_link = False
import re
import types
+import weakref
from . import attributes # noqa
from .base import _class_to_mapper # noqa
adapt_on_names,
represents_outer_join,
):
- self.entity = entity
+ self._weak_entity = weakref.ref(entity)
self.mapper = mapper
self.selectable = (
self.persist_selectable
) = self.local_table = selectable
self.name = name
self.polymorphic_on = polymorphic_on
- self._base_alias = _base_alias or self
+ self._base_alias = weakref.ref(_base_alias or self)
self._use_mapper_path = _use_mapper_path
self.represents_outer_join = represents_outer_join
self._adapt_on_names = adapt_on_names
self._target = mapper.class_
+ @property
+ def entity(self):
+ return self._weak_entity()
+
is_aliased_class = True
"always returns True"
:class:`.AliasedInsp`."""
return self.mapper.class_
- @util.memoized_property
+ @property
def _path_registry(self):
if self._use_mapper_path:
return self.mapper._path_registry
"adapt_on_names": self._adapt_on_names,
"with_polymorphic_mappers": self.with_polymorphic_mappers,
"with_polymorphic_discriminator": self.polymorphic_on,
- "base_alias": self._base_alias,
+ "base_alias": self._base_alias(),
"use_mapper_path": self._use_mapper_path,
"represents_outer_join": self.represents_outer_join,
}
"""
if entity.is_aliased_class:
if given.is_aliased_class:
- if entity._base_alias is given._base_alias:
+ if entity._base_alias() is given._base_alias():
return True
return False
elif given.is_aliased_class:
if element is not None:
element = clone(element)
+ clone = None # remove gc cycles
return element
if element is not None:
element = clone(element)
+ clone = None # remove gc cycles
return element
return self
+class prefix_anon_map(dict):
+ """A map that creates new keys for missing key access.
+ Considers keys of the form "<ident> <name>" to produce
+ new symbols "<name>_<index>", where "index" is an incrementing integer
+ corresponding to <name>.
+ Inlines the approach taken by :class:`sqlalchemy.util.PopulateDict` which
+ is otherwise usually used for this type of operation.
+ """
+
+ def __missing__(self, key):
+ (ident, derived) = key.split(" ", 1)
+ anonymous_counter = self.get(derived, 1)
+ self[derived] = anonymous_counter + 1
+ value = derived + "_" + str(anonymous_counter)
+ self[key] = value
+ return value
+
+
class SQLCompiler(Compiled):
"""Default implementation of :class:`.Compiled`.
# a map which tracks "anonymous" identifiers that are created on
# the fly here
- self.anon_map = util.PopulateDict(self._process_anon)
+ self.anon_map = prefix_anon_map()
# a map which tracks "truncated" names based on
# dialect.label_length or dialect.max_identifier_length
def _anonymize(self, name):
return name % self.anon_map
- def _process_anon(self, key):
- (ident, derived) = key.split(" ", 1)
- anonymous_counter = self.anon_map.get(derived, 1)
- self.anon_map[derived] = anonymous_counter + 1
- return derived + "_" + str(anonymous_counter)
-
def bindparam_string(
self, name, positional_names=None, expanding=False, **kw
):
"""
s = util.column_set()
f = self
+
+ # note this creates a cycle, asserted in test_memusage. however,
+ # turning this into a plain @property adds tends of thousands of method
+ # calls to Core / ORM performance tests, so the small overhead
+ # introduced by the relatively small amount of short term cycles
+ # produced here is preferable
while f is not None:
s.add(f)
f = f._is_clone_of
# RelationshipProperty.Comparator._criterion_exists() does
# this). Also keep _correlate liberally open with its previous
# contents, as this set is used for matching, not rendering.
- self._correlate = set(clone(f) for f in self._correlate).union(
+ self._correlate = set(clone(f, **kw) for f in self._correlate).union(
self._correlate
)
# unusual case but same idea applies
if self._correlate_except:
self._correlate_except = set(
- clone(f) for f in self._correlate_except
+ clone(f, **kw) for f in self._correlate_except
).union(self._correlate_except)
# 4. clone other things. The difficulty here is that Column
yield e
list(visit(expr))
+ visit = None # remove gc cycles
def find_tables(
anonymize_labels=anonymize_labels,
)
- self.columns = util.populate_column_dict(self._locate_col)
+ self.columns = util.WeakPopulateDict(self._locate_col)
if self.include_fn or self.exclude_fn:
self.columns = self._IncludeExcludeMapping(self, self.columns)
self.adapt_required = adapt_required
ac = self.__class__.__new__(self.__class__)
ac.__dict__.update(self.__dict__)
ac._wrap = adapter
- ac.columns = util.populate_column_dict(ac._locate_col)
+ ac.columns = util.WeakPopulateDict(ac._locate_col)
if ac.include_fn or ac.exclude_fn:
ac.columns = self._IncludeExcludeMapping(ac, ac.columns)
def __setstate__(self, state):
self.__dict__.update(state)
- self.columns = util.PopulateDict(self._locate_col)
+ self.columns = util.WeakPopulateDict(self._locate_col)
cloned = {}
stop_on = set(opts.get("stop_on", []))
- def clone(elem):
+ def clone(elem, **kw):
if elem in stop_on:
return elem
else:
if id(elem) not in cloned:
cloned[id(elem)] = newelem = elem._clone()
- newelem._copy_internals(clone=clone)
+ newelem._copy_internals(clone=clone, **kw)
meth = visitors.get(newelem.__visit_name__, None)
if meth:
meth(newelem)
if obj is not None:
obj = clone(obj)
+
+ clone = None # remove gc cycles
+
return obj
if obj is not None:
obj = clone(obj, **opts)
+ clone = None # remove gc cycles
return obj
from ._collections import OrderedIdentitySet # noqa
from ._collections import OrderedProperties # noqa
from ._collections import OrderedSet # noqa
-from ._collections import populate_column_dict # noqa
from ._collections import PopulateDict # noqa
from ._collections import Properties # noqa
from ._collections import ScopedRegistry # noqa
from ._collections import unique_list # noqa
from ._collections import UniqueAppender # noqa
from ._collections import update_copy # noqa
+from ._collections import WeakPopulateDict # noqa
from ._collections import WeakSequence # noqa
from .compat import b # noqa
from .compat import b64decode # noqa
class WeakSequence(object):
def __init__(self, __elements=()):
+ # adapted from weakref.WeakKeyDictionary, prevent reference
+ # cycles in the collection itself
def _remove(item, selfref=weakref.ref(self)):
self = selfref()
if self is not None:
self._storage.remove(item)
+
self._remove = _remove
self._storage = [
weakref.ref(element, _remove) for element in __elements
return val
+class WeakPopulateDict(dict):
+ """Like PopulateDict, but assumes a self + a method and does not create
+ a reference cycle.
+
+ """
+
+ def __init__(self, creator_method):
+ self.creator = creator_method.__func__
+ weakself = creator_method.__self__
+ self.weakself = weakref.ref(weakself)
+
+ def __missing__(self, key):
+ self[key] = val = self.creator(self.weakself(), key)
+ return val
+
+
# Define collections that are capable of storing
# ColumnElement objects as hashable keys/elements.
# At this point, these are mostly historical, things
column_set = set
column_dict = dict
ordered_column_set = OrderedSet
-populate_column_dict = PopulateDict
_getters = PopulateDict(operator.itemgetter)
import sqlalchemy as sa
from sqlalchemy import ForeignKey
+from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import util
from sqlalchemy.orm import aliased
from sqlalchemy.orm import clear_mappers
+from sqlalchemy.orm import configure_mappers
from sqlalchemy.orm import create_session
+from sqlalchemy.orm import join as orm_join
+from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import Load
from sqlalchemy.orm import mapper
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import selectinload
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import subqueryload
from sqlalchemy.processors import to_decimal_processor_factory
from sqlalchemy.processors import to_unicode_processor_factory
from sqlalchemy.sql import column
+from sqlalchemy.sql import util as sql_util
+from sqlalchemy.sql.visitors import cloned_traverse
+from sqlalchemy.sql.visitors import replacement_traverse
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.testing.util import gc_collect
+from ..orm import _fixtures
class A(fixtures.ComparableEntity):
pass
+def assert_cycles(expected=0):
+ def decorate(fn):
+ def go():
+ fn() # warmup, configure mappers, caches, etc.
+
+ gc_collect()
+ gc_collect()
+ gc_collect() # multiple calls seem to matter
+
+ # gc.set_debug(gc.DEBUG_COLLECTABLE)
+ try:
+ return fn() # run for real
+ finally:
+ unreachable = gc_collect()
+ assert unreachable <= expected
+ gc_collect()
+
+ return go
+
+ return decorate
+
+
def profile_memory(
maxtimes=250, assert_no_sessions=True, get_num_objects=None
):
go()
finally:
metadata.drop_all()
+
+
+class CycleTest(_fixtures.FixtureTest):
+ __tags__ = ("memory_intensive",)
+ __requires__ = ("cpython",)
+
+ run_setup_mappers = "once"
+ run_inserts = "once"
+ run_deletes = None
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+ def test_query(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ return s.query(User).all()
+
+ go()
+
+ def test_query_alias(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ u1 = aliased(User)
+
+ @assert_cycles()
+ def go():
+ s.query(u1).all()
+
+ go()
+
+ def test_entity_path_w_aliased(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ @assert_cycles()
+ def go():
+ u1 = aliased(User)
+ inspect(u1)._path_registry[User.addresses.property]
+
+ go()
+
+ def test_orm_objects_from_query(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ def generate():
+ objects = s.query(User).filter(User.id == 7).all()
+ gc_collect()
+ return objects
+
+ @assert_cycles()
+ def go():
+ generate()
+
+ go()
+
+ def test_orm_objects_from_query_w_selectinload(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ def generate():
+ objects = s.query(User).options(selectinload(User.addresses)).all()
+ gc_collect()
+ return objects
+
+ @assert_cycles()
+ def go():
+ generate()
+
+ go()
+
+ def test_selectinload_option_unbound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ selectinload(User.addresses)
+
+ go()
+
+ def test_selectinload_option_bound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ Load(User).selectinload(User.addresses)
+
+ go()
+
+ def test_orm_path(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ inspect(User)._path_registry[User.addresses.property][
+ inspect(Address)
+ ]
+
+ go()
+
+ def test_joinedload_option_unbound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ joinedload(User.addresses)
+
+ go()
+
+ def test_joinedload_option_bound(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ Load(User).joinedload(User.addresses)
+
+ go()
+
+ def test_orm_objects_from_query_w_joinedload(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ def generate():
+ objects = s.query(User).options(joinedload(User.addresses)).all()
+ gc_collect()
+ return objects
+
+ @assert_cycles()
+ def go():
+ generate()
+
+ go()
+
+ def test_query_filtered(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ return s.query(User).filter(User.id == 7).all()
+
+ go()
+
+ def test_query_joins(self):
+ User, Address = self.classes("User", "Address")
+
+ s = Session()
+
+ # cycles here are due to ClauseElement._cloned_set
+ @assert_cycles(3)
+ def go():
+ s.query(User).join(User.addresses).all()
+
+ go()
+
+ def test_plain_join(self):
+ users, addresses = self.tables("users", "addresses")
+
+ @assert_cycles()
+ def go():
+ str(users.join(addresses))
+
+ go()
+
+ def test_plain_join_select(self):
+ users, addresses = self.tables("users", "addresses")
+
+ # cycles here are due to ClauseElement._cloned_set
+ @assert_cycles(6)
+ def go():
+ s = select([users]).select_from(users.join(addresses))
+ s._froms
+
+ go()
+
+ def test_orm_join(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ str(orm_join(User, Address, User.addresses))
+
+ go()
+
+ def test_join_via_query_relationship(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ s.query(User).join(User.addresses)
+
+ go()
+
+ def test_join_via_query_to_entity(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ @assert_cycles()
+ def go():
+ s.query(User).join(Address)
+
+ go()
+
+ def test_core_select(self):
+ User, Address = self.classes("User", "Address")
+ configure_mappers()
+
+ s = Session()
+
+ stmt = s.query(User).join(User.addresses).statement
+
+ @assert_cycles()
+ def go():
+ s.execute(stmt)
+
+ go()
+
+ def test_adapt_statement_replacement_traversal(self):
+ User, Address = self.classes("User", "Address")
+
+ statement = select([User]).select_from(
+ orm_join(User, Address, User.addresses)
+ )
+
+ @assert_cycles()
+ def go():
+ replacement_traverse(statement, {}, lambda x: None)
+
+ go()
+
+ def test_adapt_statement_cloned_traversal(self):
+ User, Address = self.classes("User", "Address")
+
+ statement = select([User]).select_from(
+ orm_join(User, Address, User.addresses)
+ )
+
+ @assert_cycles()
+ def go():
+ cloned_traverse(statement, {}, {})
+
+ go()
+
+ def test_column_adapter_lookup(self):
+ User, Address = self.classes("User", "Address")
+
+ u1 = aliased(User)
+
+ @assert_cycles()
+ def go():
+ adapter = sql_util.ColumnAdapter(inspect(u1).selectable)
+ adapter.columns[User.id]
+
+ go()
+
+ def test_orm_aliased(self):
+ User, Address = self.classes("User", "Address")
+
+ @assert_cycles()
+ def go():
+ u1 = aliased(User)
+ inspect(u1)
+
+ go()
+
+ @testing.fails
+ def test_the_counter(self):
+ @assert_cycles()
+ def go():
+ x = []
+ x.append(x)
+
+ go()
+
+ def test_weak_sequence(self):
+ class Foo(object):
+ pass
+
+ f = Foo()
+
+ @assert_cycles()
+ def go():
+ util.WeakSequence([f])
+
+ go()
+
+ @testing.provide_metadata
+ def test_optimized_get(self):
+
+ from sqlalchemy.ext.declarative import declarative_base
+
+ Base = declarative_base(metadata=self.metadata)
+
+ class Employee(Base):
+ __tablename__ = "employee"
+ id = Column(
+ Integer, primary_key=True, test_needs_autoincrement=True
+ )
+ type = Column(String(10))
+ __mapper_args__ = {"polymorphic_on": type}
+
+ class Engineer(Employee):
+ __tablename__ = " engineer"
+ id = Column(ForeignKey("employee.id"), primary_key=True)
+
+ engineer_name = Column(String(50))
+ __mapper_args__ = {"polymorphic_identity": "engineer"}
+
+ Base.metadata.create_all(testing.db)
+
+ s = Session(testing.db)
+ s.add(Engineer(engineer_name="wally"))
+ s.commit()
+ s.close()
+
+ @assert_cycles()
+ def go():
+ e1 = s.query(Employee).first()
+ e1.engineer_name
+
+ go()
+
+ def test_visit_binary_product(self):
+ a, b, q, e, f, j, r = [column(chr_) for chr_ in "abqefjr"]
+
+ from sqlalchemy import and_, func
+ from sqlalchemy.sql.util import visit_binary_product
+
+ expr = and_((a + b) == q + func.sum(e + f), j == r)
+
+ def visit(expr, left, right):
+ pass
+
+ @assert_cycles()
+ def go():
+ visit_binary_product(visit, expr)
+
+ go()
eq_(len(w), 2)
eq_(len(w._storage), 2)
- @testing.requires.predictable_gc
- def test_cleanout_container(self):
- import weakref
-
- class Foo(object):
- pass
-
- f = Foo()
- w = WeakSequence([f])
- w_wref = weakref.ref(w)
- del w
- eq_(w_wref(), None)
-
class OrderedDictTest(fixtures.TestBase):
def test_odict(self):
l1 = Load(User)
l2 = l1.joinedload("addresses")
- eq_(l1.context, {("loader", self._make_path([User, "addresses"])): l2})
+ to_bind = l2.context.values()[0]
+ eq_(
+ l1.context,
+ {("loader", self._make_path([User, "addresses"])): to_bind},
+ )
def test_set_strat_col(self):
User = self.classes.User
User = self.classes.User
opt = self._option_fixture(User.addresses)
+ to_bind = list(opt._to_bind)
eq_(
opt.__getstate__(),
{
"is_class_strategy": False,
"path": [(User, "addresses", None)],
"propagate_to_loaders": True,
- "_to_bind": [opt],
- "strategy": (("lazy", "joined"),),
+ "_of_type": None,
+ "_to_bind": to_bind,
},
)
def test_modern_opt_setstate(self):
User = self.classes.User
+ inner_opt = strategy_options._UnboundLoad.__new__(
+ strategy_options._UnboundLoad
+ )
+ inner_state = {
+ "_is_chain_link": False,
+ "local_opts": {},
+ "is_class_strategy": False,
+ "path": [(User, "addresses", None)],
+ "propagate_to_loaders": True,
+ "_to_bind": None,
+ "strategy": (("lazy", "joined"),),
+ }
+ inner_opt.__setstate__(inner_state)
+
opt = strategy_options._UnboundLoad.__new__(
strategy_options._UnboundLoad
)
"is_class_strategy": False,
"path": [(User, "addresses", None)],
"propagate_to_loaders": True,
- "_to_bind": [opt],
- "strategy": (("lazy", "joined"),),
+ "_to_bind": [inner_opt],
}
opt.__setstate__(state)
p_poly = with_polymorphic(Person, [Engineer])
e_poly = inspect(p_poly.Engineer)
- p_poly = inspect(p_poly)
+ p_poly_insp = inspect(p_poly)
- p1 = PathRegistry.coerce((p_poly, emapper.attrs.machines))
+ p1 = PathRegistry.coerce((p_poly_insp, emapper.attrs.machines))
# polymorphic AliasedClass - the path uses _entity_for_mapper()
# to get the most specific sub-entity
return True
return False
- def _copy_internals(self, clone=_clone):
- self.items = [clone(i) for i in self.items]
+ def _copy_internals(self, clone=_clone, **kw):
+ self.items = [clone(i, **kw) for i in self.items]
def get_children(self, **kwargs):
return self.items