From: Jason Kirtland Date: Thu, 3 May 2007 18:22:31 +0000 (+0000) Subject: - Test assoc proxy lazy loads, fixed __set__ race on single scalar assocs X-Git-Tag: rel_0_3_8~58 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=5606c01d391aebca903029c5a8882ea964adc566;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Test assoc proxy lazy loads, fixed __set__ race on single scalar assocs --- diff --git a/lib/sqlalchemy/ext/associationproxy.py b/lib/sqlalchemy/ext/associationproxy.py index 0913d6c488..d86297bc17 100644 --- a/lib/sqlalchemy/ext/associationproxy.py +++ b/lib/sqlalchemy/ext/associationproxy.py @@ -114,13 +114,15 @@ class AssociationProxy(object): return self._get_property().mapper.class_ target_class = property(_target_class) + def _target_is_scalar(self): + return not self._get_property().uselist def __get__(self, obj, class_): if obj is None: self.owning_class = class_ return elif self.scalar is None: - self.scalar = not self._get_property().uselist + self.scalar = self._target_is_scalar() if self.scalar: return getattr(getattr(obj, self.target_collection), self.value_attr) @@ -133,6 +135,9 @@ class AssociationProxy(object): return proxy def __set__(self, obj, values): + if self.scalar is None: + self.scalar = self._target_is_scalar() + if self.scalar: creator = self.creator and self.creator or self.target_class target = getattr(obj, self.target_collection) diff --git a/test/ext/associationproxy.py b/test/ext/associationproxy.py index 9374247884..c4dd78463c 100644 --- a/test/ext/associationproxy.py +++ b/test/ext/associationproxy.py @@ -32,6 +32,7 @@ class ObjectCollection(object): class _CollectionOperations(PersistTest): def setUp(self): collection_class = self.collection_class + lazy = self.lazy if hasattr(self, 'lazy') else False metadata = BoundMetaData(db) @@ -61,7 +62,7 @@ class _CollectionOperations(PersistTest): self.name = name mapper(Parent, parents_table, properties={ - '_children': relation(Child, lazy=False, + '_children': relation(Child, lazy=lazy, collection_class=collection_class)}) mapper(Child, children_table) @@ -172,7 +173,6 @@ class CustomDictTest(DictTest): ch = Child('a', 'regular') p1._children.append(ch) - print repr(p1._children) self.assert_(ch in p1._children.values()) self.assert_(len(p1._children) == 1) @@ -451,7 +451,7 @@ class ScalarTest(PersistTest): mapper(Parent, parents_table, properties={ 'child': relation(Child, lazy=False, - backref='parent', uselist=False)}) + backref='parent', uselist=False)}) mapper(Child, children_table) metadata.create_all() @@ -530,5 +530,117 @@ class ScalarTest(PersistTest): self.assert_(p.bar is None) self.assert_(p.baz == 'xxx') + # Ensure an immediate __set__ works. + p2 = Parent('p2') + p2.bar = 'quux' + + +class LazyLoadTest(PersistTest): + def setUp(self): + metadata = BoundMetaData(db) + + parents_table = Table('Parent', metadata, + Column('id', Integer, primary_key=True), + Column('name', String)) + children_table = Table('Children', metadata, + Column('id', Integer, primary_key=True), + Column('parent_id', Integer, + ForeignKey('Parent.id')), + Column('foo', String), + Column('name', String)) + + class Parent(object): + children = association_proxy('_children', 'name') + + def __init__(self, name): + self.name = name + + class Child(object): + def __init__(self, name): + self.name = name + + + mapper(Child, children_table) + metadata.create_all() + + self.metadata = metadata + self.session = create_session() + self.Parent, self.Child = Parent, Child + self.table = parents_table + + def tearDown(self): + self.metadata.drop_all() + + def roundtrip(self, obj): + self.session.save(obj) + self.session.flush() + id, type_ = obj.id, type(obj) + self.session.clear() + return self.session.query(type_).get(id) + + def test_lazy_list(self): + Parent, Child = self.Parent, self.Child + + mapper(Parent, self.table, properties={ + '_children': relation(Child, lazy=True, + collection_class=list)}) + + p = Parent('p') + p.children = ['a','b','c'] + + p = self.roundtrip(p) + + # Is there a better way to ensure that the association_proxy + # didn't convert a lazy load to an eager load? This does work though. + self.assert_('_children' not in p.__dict__) + self.assert_(len(p._children.data) == 3) + self.assert_('_children' in p.__dict__) + + def test_eager_list(self): + Parent, Child = self.Parent, self.Child + + mapper(Parent, self.table, properties={ + '_children': relation(Child, lazy=False, + collection_class=list)}) + + p = Parent('p') + p.children = ['a','b','c'] + + p = self.roundtrip(p) + + self.assert_('_children' in p.__dict__) + self.assert_(len(p._children.data) == 3) + + def test_lazy_scalar(self): + Parent, Child = self.Parent, self.Child + + mapper(Parent, self.table, properties={ + '_children': relation(Child, lazy=True, uselist=False)}) + + + p = Parent('p') + p.children = 'value' + + p = self.roundtrip(p) + + self.assert_('_children' not in p.__dict__) + self.assert_(p._children is not None) + + def test_eager_scalar(self): + Parent, Child = self.Parent, self.Child + + mapper(Parent, self.table, properties={ + '_children': relation(Child, lazy=False, uselist=False)}) + + + p = Parent('p') + p.children = 'value' + + p = self.roundtrip(p) + + self.assert_('_children' in p.__dict__) + self.assert_(p._children is not None) + + if __name__ == "__main__": testbase.main()