from __future__ import annotations
+import operator
from typing import Any
from typing import Callable
from typing import Dict
from typing import Generic
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
+from typing import Union
from . import base
from .collections import collection
from ..sql import coercions
from ..sql import expression
from ..sql import roles
+from ..util.typing import Literal
if TYPE_CHECKING:
- from typing import List
- from typing import Optional
- from typing import Sequence
- from typing import Tuple
- from typing import Union
from . import AttributeEventToken
from . import Mapper
if self.composite:
return tuple(key)
else:
- return key[0]
+ obj = key[0]
+ if obj is None:
+ return _UNMAPPED_AMBIGUOUS_NONE
+ else:
+ return obj
class _SerializableColumnGetterV2(_PlainColumnGetter[_KT]):
.. versionadded:: 2.0 an error is raised by default if the attribute
being used for the dictionary key is determined that it was never
populated with any value. The
- :paramref:`.column_keyed_dict.ignore_unpopulated_attribute`
+ :paramref:`_orm.column_keyed_dict.ignore_unpopulated_attribute`
parameter may be set which will instead indicate that this condition
should be ignored, and the append operation silently skipped.
This is in contrast to the behavior of the 1.x series which would
)
+_UNMAPPED_AMBIGUOUS_NONE = object()
+
+
class _AttrGetter:
- __slots__ = ("attr_name",)
+ __slots__ = ("attr_name", "getter")
def __init__(self, attr_name: str):
self.attr_name = attr_name
+ self.getter = operator.attrgetter(attr_name)
def __call__(self, mapped_object: Any) -> Any:
- dict_ = base.instance_dict(mapped_object)
- return dict_.get(self.attr_name, base.NO_VALUE)
+ obj = self.getter(mapped_object)
+ if obj is None:
+ state = base.instance_state(mapped_object)
+ mp = state.mapper
+ if self.attr_name in mp.attrs:
+ dict_ = state.dict
+ obj = dict_.get(self.attr_name, base.NO_VALUE)
+ if obj is None:
+ return _UNMAPPED_AMBIGUOUS_NONE
+ else:
+ return _UNMAPPED_AMBIGUOUS_NONE
+
+ return obj
def __reduce__(self) -> Tuple[Type[_AttrGetter], Tuple[str]]:
return _AttrGetter, (self.attr_name,)
.. versionadded:: 2.0 an error is raised by default if the attribute
being used for the dictionary key is determined that it was never
populated with any value. The
- :paramref:`.attribute_keyed_dict.ignore_unpopulated_attribute`
+ :paramref:`_orm.attribute_keyed_dict.ignore_unpopulated_attribute`
parameter may be set which will instead indicate that this condition
should be ignored, and the append operation silently skipped.
This is in contrast to the behavior of the 1.x series which would
being used for the dictionary key returns
:attr:`.LoaderCallableStatus.NO_VALUE`, which in an ORM attribute
context indicates an attribute that was never populated with any value.
- The :paramref:`.mapped_collection.ignore_unpopulated_attribute`
+ The :paramref:`_orm.mapped_collection.ignore_unpopulated_attribute`
parameter may be set which will instead indicate that this condition
should be ignored, and the append operation silently skipped. This is
in contrast to the behavior of the 1.x series which would erroneously
def __init__(
self,
keyfunc: _F,
- *,
+ *dict_args: Any,
ignore_unpopulated_attribute: bool = False,
) -> None:
"""Create a new collection with keying provided by keyfunc.
"""
self.keyfunc = keyfunc
self.ignore_unpopulated_attribute = ignore_unpopulated_attribute
+ super().__init__(*dict_args)
@classmethod
def _unreduce(
]:
return (KeyFuncDict._unreduce, (self.keyfunc, dict(self)))
+ @util.preload_module("sqlalchemy.orm.attributes")
def _raise_for_unpopulated(
- self, value: _KT, initiator: Optional[AttributeEventToken]
+ self,
+ value: _KT,
+ initiator: Union[AttributeEventToken, Literal[None, False]] = None,
+ *,
+ warn_only: bool,
) -> None:
mapper = base.instance_state(value).mapper
- if initiator is None:
+ attributes = util.preloaded.orm_attributes
+
+ if not isinstance(initiator, attributes.AttributeEventToken):
relationship = "unknown relationship"
- else:
+ elif initiator.key in mapper.attrs:
relationship = f"{mapper.attrs[initiator.key]}"
-
- raise sa_exc.InvalidRequestError(
- f"In event triggered from population of attribute {relationship} "
- "(likely from a backref), "
- f"can't populate value in KeyFuncDict; "
- "dictionary key "
- f"derived from {base.instance_str(value)} is not "
- f"populated. Ensure appropriate state is set up on "
- f"the {base.instance_str(value)} object "
- f"before assigning to the {relationship} attribute. "
- f"To skip this assignment entirely, "
- f'Set the "ignore_unpopulated_attribute=True" '
- f"parameter on the mapped collection factory."
- )
+ else:
+ relationship = initiator.key
+
+ if warn_only:
+ util.warn(
+ f"Attribute keyed dictionary value for "
+ f"attribute '{relationship}' was None; this will raise "
+ "in a future release. "
+ f"To skip this assignment entirely, "
+ f'Set the "ignore_unpopulated_attribute=True" '
+ f"parameter on the mapped collection factory."
+ )
+ else:
+ raise sa_exc.InvalidRequestError(
+ "In event triggered from population of "
+ f"attribute '{relationship}' "
+ "(potentially from a backref), "
+ f"can't populate value in KeyFuncDict; "
+ "dictionary key "
+ f"derived from {base.instance_str(value)} is not "
+ f"populated. Ensure appropriate state is set up on "
+ f"the {base.instance_str(value)} object "
+ f"before assigning to the {relationship} attribute. "
+ f"To skip this assignment entirely, "
+ f'Set the "ignore_unpopulated_attribute=True" '
+ f"parameter on the mapped collection factory."
+ )
@collection.appender # type: ignore[misc]
@collection.internally_instrumented # type: ignore[misc]
def set(
self,
value: _KT,
- _sa_initiator: Optional[AttributeEventToken] = None,
+ _sa_initiator: Union[AttributeEventToken, Literal[None, False]] = None,
) -> None:
"""Add an item by value, consulting the keyfunc for the key."""
if key is base.NO_VALUE:
if not self.ignore_unpopulated_attribute:
- self._raise_for_unpopulated(value, _sa_initiator)
+ self._raise_for_unpopulated(
+ value, _sa_initiator, warn_only=False
+ )
+ else:
+ return
+ elif key is _UNMAPPED_AMBIGUOUS_NONE:
+ if not self.ignore_unpopulated_attribute:
+ self._raise_for_unpopulated(
+ value, _sa_initiator, warn_only=True
+ )
+ key = None
else:
return
def remove(
self,
value: _KT,
- _sa_initiator: Optional[AttributeEventToken] = None,
+ _sa_initiator: Union[AttributeEventToken, Literal[None, False]] = None,
) -> None:
"""Remove an item by value, consulting the keyfunc for the key."""
if key is base.NO_VALUE:
if not self.ignore_unpopulated_attribute:
- self._raise_for_unpopulated(value, _sa_initiator)
+ self._raise_for_unpopulated(
+ value, _sa_initiator, warn_only=False
+ )
return
+ elif key is _UNMAPPED_AMBIGUOUS_NONE:
+ if not self.ignore_unpopulated_attribute:
+ self._raise_for_unpopulated(
+ value, _sa_initiator, warn_only=True
+ )
+ key = None
+ else:
+ return
# Let self[key] raise if key is not in this collection
# testlib.pragma exempt:__ne__
keyfunc: _F, ignore_unpopulated_attribute: bool
) -> Type[KeyFuncDict[_KT, _KT]]:
class _MKeyfuncMapped(KeyFuncDict[_KT, _KT]):
- def __init__(self) -> None:
+ def __init__(self, *dict_args: Any) -> None:
super().__init__(
keyfunc,
+ *dict_args,
ignore_unpopulated_attribute=ignore_unpopulated_attribute,
)
+from __future__ import annotations
+
import contextlib
+import dataclasses
from functools import reduce
from operator import and_
+from typing import Any
+from typing import List
+from typing import MutableMapping
+from typing import MutableSet
from sqlalchemy import event
from sqlalchemy import exc as sa_exc
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import util
+from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import attributes
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import instrumentation
from sqlalchemy.orm import relationship
+from sqlalchemy.orm import synonym
import sqlalchemy.orm.collections as collections
from sqlalchemy.orm.collections import collection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises_message
+from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
def __repr__(self):
return str((id(self), self.a, self.b, self.c))
+ class SimpleComparableEntity:
+ def __init__(self, a=None, b=None):
+ self.a = a
+ self.b = b
+
+ def __hash__(self):
+ return hash(self.a) + hash(self.b)
+
+ def __eq__(self, other):
+ return other.a == self.a and other.b == self.b
+
+ def __repr__(self):
+ return str((id(self), self.a, self.b, self.c))
+
@classmethod
def setup_test_class(cls):
instrumentation.register_class(cls.Entity)
self.assert_(e6 not in canary.removed)
self.assert_(e7 not in canary.removed)
+ def _test_list_dataclasses(self, typecallable):
+
+ creator = self.SimpleComparableEntity
+
+ @dataclasses.dataclass
+ class Foo:
+ attr: List[Any] = dataclasses.field(default_factory=list)
+
+ canary = Canary()
+ instrumentation.register_class(Foo)
+ d = _register_attribute(
+ Foo,
+ "attr",
+ uselist=True,
+ typecallable=typecallable,
+ useobject=True,
+ )
+ canary.listen(d)
+
+ obj = Foo()
+ direct = obj.attr
+
+ e1 = creator(a=1, b=2)
+ collections.collection_adapter(direct).append_with_event(e1)
+
+ like_me = typecallable()
+ like_me.append(e1)
+
+ eq_(dataclasses.asdict(obj), {"attr": like_me})
+
def test_list(self):
self._test_adapter(list)
self._test_list(list)
self._test_list_bulk(list)
+ self._test_list_dataclasses(list)
def test_list_setitem_with_slices(self):
self._test_adapter(MyList)
self._test_list(MyList)
self._test_list_bulk(MyList)
+ self._test_list_dataclasses(MyList)
self.assert_(getattr(MyList, "_sa_instrumented") == id(MyList))
def test_list_duck(self):
self.assert_(e4 not in canary.data)
self.assert_(e3 in canary.data)
+ def _test_set_dataclasses(self, typecallable):
+
+ creator = self.SimpleComparableEntity
+
+ @dataclasses.dataclass
+ class Foo:
+ attr: MutableSet[Any] = dataclasses.field(default_factory=set)
+
+ canary = Canary()
+ instrumentation.register_class(Foo)
+ d = _register_attribute(
+ Foo,
+ "attr",
+ uselist=True,
+ typecallable=typecallable,
+ useobject=True,
+ )
+ canary.listen(d)
+
+ obj = Foo()
+ direct = obj.attr
+
+ e1 = creator(a=1, b=2)
+ collections.collection_adapter(direct).append_with_event(e1)
+
+ like_me = typecallable()
+ like_me.add(e1)
+
+ eq_(dataclasses.asdict(obj), {"attr": like_me})
+
def test_set(self):
self._test_adapter(set)
self._test_set(set)
self._test_set_bulk(set)
self._test_set_wo_mutation(set)
+ self._test_set_dataclasses(set)
def test_set_subclass(self):
class MySet(set):
self._test_adapter(MySet)
self._test_set(MySet)
self._test_set_bulk(MySet)
+ self._test_set_dataclasses(MySet)
self.assert_(getattr(MySet, "_sa_instrumented") == id(MySet))
def test_set_duck(self):
self._test_adapter(SetLike)
self._test_set(SetLike)
self._test_set_bulk(SetLike)
+ self._test_set_dataclasses(SetLike)
self.assert_(getattr(SetLike, "_sa_instrumented") == id(SetLike))
def test_set_emulates(self):
self._test_adapter(SetIsh)
self._test_set(SetIsh)
self._test_set_bulk(SetIsh)
+ self._test_set_dataclasses(SetIsh)
self.assert_(getattr(SetIsh, "_sa_instrumented") == id(SetIsh))
def _test_dict_wo_mutation(self, typecallable, creator=None):
dict,
)
+ def _test_dict_dataclasses(self, typecallable):
+
+ creator = self.SimpleComparableEntity
+
+ @dataclasses.dataclass
+ class Foo:
+ attr: MutableMapping[Any, Any] = dataclasses.field(
+ default_factory=dict
+ )
+
+ canary = Canary()
+ instrumentation.register_class(Foo)
+ d = _register_attribute(
+ Foo,
+ "attr",
+ uselist=True,
+ typecallable=typecallable,
+ useobject=True,
+ )
+ canary.listen(d)
+
+ obj = Foo()
+ direct = obj.attr
+
+ e1 = creator(a=1, b=2)
+ collections.collection_adapter(direct).append_with_event(e1)
+
+ like_me = typecallable()
+ like_me.set(e1)
+
+ eq_(dataclasses.asdict(obj), {"attr": like_me})
+
def test_dict_subclass(self):
class MyDict(dict):
@collection.appender
self._test_dict(MyDict)
self._test_dict_bulk(MyDict)
self._test_dict_wo_mutation(MyDict)
+ self._test_dict_dataclasses(MyDict)
self.assert_(getattr(MyDict, "_sa_instrumented") == id(MyDict))
def test_dict_subclass2(self):
class MyEasyDict(collections.KeyFuncDict):
- def __init__(self):
- super().__init__(lambda e: e.a)
+ def __init__(self, *args):
+ super().__init__(lambda e: e.a, *args)
self._test_adapter(
MyEasyDict, self.dictable_entity, to_set=lambda c: set(c.values())
self._test_dict(MyEasyDict)
self._test_dict_bulk(MyEasyDict)
self._test_dict_wo_mutation(MyEasyDict)
+ self._test_dict_dataclasses(MyEasyDict)
self.assert_(getattr(MyEasyDict, "_sa_instrumented") == id(MyEasyDict))
def test_dict_subclass3(self, ordered_dict_mro):
class MyOrdered(ordered_dict_mro):
- def __init__(self):
- collections.KeyFuncDict.__init__(self, lambda e: e.a)
+ def __init__(self, *dict_args):
+ collections.KeyFuncDict.__init__(
+ self, lambda e: e.a, *dict_args
+ )
util.OrderedDict.__init__(self)
self._test_adapter(
self._test_dict(MyOrdered)
self._test_dict_bulk(MyOrdered)
self._test_dict_wo_mutation(MyOrdered)
+ self._test_dict_dataclasses(MyOrdered)
self.assert_(getattr(MyOrdered, "_sa_instrumented") == id(MyOrdered))
def test_dict_duck(self):
class DictLike:
- def __init__(self):
- self.data = dict()
+ def __init__(self, *args):
+ self.data = dict(*args)
@collection.appender
@collection.replaces(1)
self._test_dict(DictLike)
self._test_dict_bulk(DictLike)
self._test_dict_wo_mutation(DictLike)
+ self._test_dict_dataclasses(DictLike)
self.assert_(getattr(DictLike, "_sa_instrumented") == id(DictLike))
def test_dict_emulates(self):
class DictIsh:
__emulates__ = dict
- def __init__(self):
- self.data = dict()
+ def __init__(self, *args):
+ self.data = dict(*args)
@collection.appender
@collection.replaces(1)
self._test_dict(DictIsh)
self._test_dict_bulk(DictIsh)
self._test_dict_wo_mutation(DictIsh)
+ self._test_dict_dataclasses(DictIsh)
self.assert_(getattr(DictIsh, "_sa_instrumented") == id(DictIsh))
def _test_object(self, typecallable, creator=None):
Column("c", String(128)),
)
+ Table(
+ "dcparents",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("label", String(128)),
+ )
+ Table(
+ "dcchildren",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column(
+ "parent_id",
+ Integer,
+ ForeignKey("dcparents.id"),
+ nullable=False,
+ ),
+ Column("a", String(128)),
+ Column("b", String(128)),
+ )
+
@classmethod
def setup_classes(cls):
class Parent(cls.Basic):
len(list(collections.collection_adapter(p.children))) == 0
)
+ def _test_scalar_dataclass_mapped(self, collection_class):
+ dcparents, dcchildren = self.tables("dcparents", "dcchildren")
+
+ @dataclasses.dataclass
+ class DCParent:
+ children: MutableMapping[Any, Any] = dataclasses.field(
+ default_factory=dict
+ )
+
+ @dataclasses.dataclass
+ class DCChild:
+ a: str
+ b: str
+
+ self.mapper_registry.map_imperatively(DCChild, dcchildren)
+ self.mapper_registry.map_imperatively(
+ DCParent,
+ dcparents,
+ properties={
+ "children": relationship(
+ DCChild,
+ collection_class=collection_class,
+ cascade="all, delete-orphan",
+ )
+ },
+ )
+
+ p = DCParent()
+ p.children["foo"] = DCChild("foo", "value")
+ p.children["bar"] = DCChild("bar", "value")
+
+ eq_(
+ dataclasses.asdict(p),
+ {
+ "children": {
+ "foo": {"a": "foo", "b": "value"},
+ "bar": {"a": "bar", "b": "value"},
+ }
+ },
+ )
+
def _test_composite_mapped(self, collection_class):
parents, children, Parent, Child = (
self.tables.parents,
def test_mapped_collection(self):
collection_class = collections.keyfunc_mapping(lambda c: c.a)
self._test_scalar_mapped(collection_class)
+ self._test_scalar_dataclass_mapped(collection_class)
def test_mapped_collection2(self):
collection_class = collections.keyfunc_mapping(lambda c: (c.a, c.b))
def test_attr_mapped_collection(self):
collection_class = collections.attribute_keyed_dict("a")
self._test_scalar_mapped(collection_class)
+ self._test_scalar_dataclass_mapped(collection_class)
def test_declarative_column_mapped(self):
"""test that uncompiled attribute usage works with
collection_class = collections.column_keyed_dict(children.c.a)
self._test_scalar_mapped(collection_class)
+ self._test_scalar_dataclass_mapped(collection_class)
def test_column_mapped_collection2(self):
children = self.tables.children
def test_mixin(self, ordered_dict_mro):
class Ordered(ordered_dict_mro):
- def __init__(self):
- collections.KeyFuncDict.__init__(self, lambda v: v.a)
+ def __init__(self, *args):
+ collections.KeyFuncDict.__init__(self, lambda v: v.a, *args)
util.OrderedDict.__init__(self)
collection_class = Ordered
self._test_scalar_mapped(collection_class)
+ self._test_scalar_dataclass_mapped(collection_class)
def test_mixin2(self, ordered_dict_mro):
class Ordered2(ordered_dict_mro):
- def __init__(self, keyfunc):
- collections.KeyFuncDict.__init__(self, keyfunc)
+ def __init__(self, keyfunc, *args):
+ collections.KeyFuncDict.__init__(self, keyfunc, *args)
util.OrderedDict.__init__(self)
- def collection_class():
- return Ordered2(lambda v: (v.a, v.b))
+ def collection_class(*args):
+ return Ordered2(lambda v: (v.a, v.b), *args)
self._test_composite_mapped(collection_class)
assert not adapter._referenced_by_owner
+class AttrKeyedDictKeysTest(fixtures.TestBase):
+ """tests for #9424, regression when populating attr_keyed_dict from
+ a non-ORM-mapped attribute.
+
+ """
+
+ @testing.variation_fixture(
+ "type_", ["plain", "synonym", "property", "assoc_proxy"]
+ )
+ def key_fixture(self, request, decl_base):
+ type_ = request.param
+
+ def go(ignore_unpopulated=False):
+ class A(decl_base):
+ __tablename__ = "a"
+
+ id = Column(Integer, primary_key=True)
+ bs = relationship(
+ "B",
+ collection_class=collections.attribute_keyed_dict(
+ "attrkey",
+ ignore_unpopulated_attribute=ignore_unpopulated,
+ ),
+ back_populates="a",
+ )
+
+ class B(decl_base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey("a.id"))
+ c_id = Column(ForeignKey("c.id"))
+ c = relationship("C")
+ a = relationship("A", back_populates="bs")
+
+ if type_.plain:
+ attrkey = Column(String(30))
+ elif type_.synonym:
+ data = Column(String(30))
+ attrkey = synonym("data")
+ elif type_.property:
+
+ @property
+ def attrkey(self):
+ return self.c.name
+
+ elif type_.assoc_proxy:
+ attrkey = association_proxy("c", "name")
+ else:
+ type_.fail()
+
+ class C(decl_base):
+ __tablename__ = "c"
+ id = Column(Integer, primary_key=True)
+ name = Column(String(30))
+
+ decl_base.metadata.create_all(testing.db)
+ return type_, A, B, C
+
+ return go
+
+ def test_attr_dict_keys_persist(self, key_fixture):
+ type_, A, B, C = key_fixture()
+
+ if type_.plain or type_.synonym:
+ b1 = B(attrkey="k1", c=C(name="k1"))
+ b2 = B(attrkey="k2", c=C(name="k2"))
+ else:
+ b1 = B(c=C(name="k1"))
+ b2 = B(c=C(name="k2"))
+
+ sess = fixture_session()
+ a1 = A(bs={"k1": b1, "k2": b2})
+
+ sess.add(a1)
+ sess.commit()
+
+ eq_(a1.bs, {"k1": b1, "k2": b2})
+
+ @testing.variation("ignore_unpopulated", [True, False])
+ def test_attr_dict_keys_none(self, key_fixture, ignore_unpopulated):
+ type_, A, B, C = key_fixture(
+ ignore_unpopulated=bool(ignore_unpopulated)
+ )
+
+ b1 = B(c=C(name=None))
+ b2 = B(c=C(name=None))
+
+ sess = fixture_session()
+
+ if ignore_unpopulated:
+ a1 = A(bs={"k1": b1, "k2": b2})
+ else:
+ if type_.plain or type_.synonym:
+ with expect_raises_message(
+ sa_exc.InvalidRequestError,
+ "In event triggered from population of attribute 'bs'",
+ ):
+ A(bs={"k1": b1, "k2": b2})
+ return
+
+ with expect_warnings(
+ "Attribute keyed dictionary value for attribute "
+ "'bs' was None;",
+ "Attribute keyed dictionary value for attribute "
+ "'B.a' was None;",
+ raise_on_any_unexpected=True,
+ ):
+ a1 = A(bs={"k1": b1, "k2": b2})
+ sess.add(a1)
+ sess.commit()
+
+ if ignore_unpopulated:
+ eq_(a1.bs, {})
+ else:
+ # not totally ideal but this is very edge case. usually
+ # the attributes for the DB were populated in some other way
+ # so when loading, there would be no issue
+ with expect_warnings(
+ "Attribute keyed dictionary value for attribute "
+ "'unknown relationship' was None;",
+ raise_on_any_unexpected=True,
+ ):
+ eq_(a1.bs, {None: b2})
+
+
class UnpopulatedAttrTest(fixtures.TestBase):
def _fixture(self, decl_base, collection_fn, ignore_unpopulated):
class B(decl_base):
__tablename__ = "b"
id = Column(Integer, primary_key=True)
- data = Column(String)
+ data = Column(String(30))
a_id = Column(ForeignKey("a.id"))
if collection_fn is collections.attribute_keyed_dict:
else:
with expect_raises_message(
sa_exc.InvalidRequestError,
- "In event triggered from population of attribute B.a",
+ "In event triggered from population of attribute 'B.a'",
):
a1.bs["bar"] = B(a=a1)
else:
with expect_raises_message(
sa_exc.InvalidRequestError,
- "In event triggered from population of attribute B.a",
+ "In event triggered from population of attribute 'B.a'",
):
b1.a = None
+
+ @testing.combinations(
+ collections.attribute_keyed_dict,
+ collections.column_keyed_dict,
+ argnames="collection_fn",
+ )
+ @testing.variation("ignore_unpopulated", [True, False])
+ @testing.variation("attr_is_actually_none", [True, False])
+ def test_what_about_lazy_loading(
+ self,
+ decl_base,
+ collection_fn,
+ ignore_unpopulated,
+ attr_is_actually_none,
+ ):
+ """test additional use case that wasn't considered for #8372"""
+ A, B = self._fixture(
+ decl_base, collection_fn, bool(ignore_unpopulated)
+ )
+
+ decl_base.metadata.create_all(testing.db)
+
+ sess = fixture_session()
+
+ a1 = A()
+
+ if attr_is_actually_none:
+ b1 = B()
+ else:
+ b1 = B(data="bar")
+
+ sess.add_all([a1, b1])
+ sess.commit()
+
+ # load empty a1.bs so that backref populates it
+ a1.bs
+
+ # b1.data not loaded
+ assert "data" not in b1.__dict__
+
+ # a1.bs is present, will need to be populated
+ assert "bs" in a1.__dict__
+
+ if attr_is_actually_none and not ignore_unpopulated:
+ with expect_warnings(
+ "Attribute keyed dictionary value for attribute "
+ "'B.a' was None;",
+ raise_on_any_unexpected=True,
+ ):
+ b1.a = a1
+ else:
+ b1.a = a1
+
+ # it loaded
+ assert "data" in b1.__dict__
+
+ if attr_is_actually_none:
+ if ignore_unpopulated:
+ eq_(a1.bs, {})
+ else:
+ eq_(a1.bs, {None: b1})
+ else:
+ eq_(a1.bs, {"bar": b1})