return self._scalar_get(getattr(obj, self.target_collection))
else:
try:
- return getattr(obj, self.key)
+ # If the owning instance is reborn (orm session resurrect,
+ # etc.), refresh the proxy cache.
+ creator_id, proxy = getattr(obj, self.key)
+ if id(obj) == creator_id:
+ return proxy
except AttributeError:
- proxy = self._new(self._lazy_collection(weakref.ref(obj)))
- setattr(obj, self.key, proxy)
- return proxy
+ pass
+ proxy = self._new(self._lazy_collection(weakref.ref(obj)))
+ setattr(obj, self.key, (id(obj), proxy))
+ return proxy
def __set__(self, obj, values):
if self.scalar is None:
import testenv; testenv.configure_for_tests()
-
+import gc
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.orm.collections import collection
self.assert_(p._children is not None)
+class ReconstitutionTest(TestBase):
+ def setUp(self):
+ metadata = MetaData(testing.db)
+ parents = Table('parents', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(30)))
+ children = Table('children', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('parent_id', Integer, ForeignKey('parents.id')),
+ Column('name', String(30)))
+ metadata.create_all()
+ parents.insert().execute(name='p1')
+
+ class Parent(object):
+ kids = association_proxy('children', 'name')
+ def __init__(self, name):
+ self.name = name
+
+ class Child(object):
+ def __init__(self, name):
+ self.name = name
+
+ mapper(Parent, parents, properties=dict(children=relation(Child)))
+ mapper(Child, children)
+
+ self.metadata = metadata
+ self.Parent = Parent
+
+ def tearDown(self):
+ self.metadata.drop_all()
+
+ def test_weak_identity_map(self):
+ session = create_session(weak_identity_map=True)
+
+ def add_child(parent_name, child_name):
+ parent = (session.query(self.Parent).
+ filter_by(name=parent_name)).one()
+ parent.kids.append(child_name)
+
+
+ add_child('p1', 'c1')
+ gc.collect()
+ add_child('p1', 'c2')
+
+ session.flush()
+ p = session.query(self.Parent).filter_by(name='p1').one()
+ assert set(p.kids) == set(['c1', 'c2']), p.kids
+
if __name__ == "__main__":
testenv.main()