if 'populate_instance' not in extension.methods or extension.populate_instance(self, context, row, instance, only_load_props=only_load_props, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
self.populate_instance(context, instance, row, only_load_props=only_load_props, instancekey=identitykey, isnew=isnew)
+
+ else:
+ attrs = getattr(state, 'expired_attributes', None)
+ # populate attributes on non-loading instances which have been expired
+ # TODO: also support deferred attributes here [ticket:870]
+ if attrs:
+ if state in context.partials:
+ isnew = False
+ attrs = context.partials[state]
+ else:
+ isnew = True
+ attrs = state.expired_attributes.intersection(state.unmodified)
+ context.partials[state] = attrs #<-- allow query.instances to commit the subset of attrs
-# NOTYET: populate attributes on non-loading instances which have been expired, deferred, etc.
-# elif getattr(state, 'expired_attributes', None): # TODO: base off total set of unloaded attributes, not just exp
-# attrs = state.expired_attributes.intersection(state.unmodified)
-# if 'populate_instance' not in extension.methods or extension.populate_instance(self, context, row, instance, only_load_props=attrs, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
-# self.populate_instance(context, instance, row, only_load_props=attrs, instancekey=identitykey, isnew=isnew)
-# context.partials.add((state, attrs)) <-- allow query.instances to commit the subset of attrs
+ if 'populate_instance' not in extension.methods or extension.populate_instance(self, context, row, instance, only_load_props=attrs, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE:
+ self.populate_instance(context, instance, row, only_load_props=attrs, instancekey=identitykey, isnew=isnew)
if result is not None and ('append_result' not in extension.methods or extension.append_result(self, context, row, instance, result, instancekey=identitykey, isnew=isnew) is EXT_CONTINUE):
result.append(instance)
if self.non_primary:
selectcontext.attributes[('populating_mapper', instance._state)] = self
- def _post_instance(self, selectcontext, state):
+ def _post_instance(self, selectcontext, state, **kwargs):
post_processors = selectcontext.attributes[('post_processors', self, None)]
for p in post_processors:
- p(state.obj())
+ p(state.obj(), **kwargs)
def _get_poly_select_loader(self, selectcontext, row):
"""set up attribute loaders for 'select' and 'deferred' polymorphic loading.
identitykey = self.identity_key_from_instance(instance)
+ only_load_props = flags.get('only_load_props', None)
+
params = {}
for c, bind in param_names:
params[bind] = self._get_attr_by_column(instance, c)
row = selectcontext.session.connection(self).execute(statement, params).fetchone()
- self.populate_instance(selectcontext, instance, row, isnew=False, instancekey=identitykey, ispostselect=True)
+ self.populate_instance(selectcontext, instance, row, isnew=False, instancekey=identitykey, ispostselect=True, only_load_props=only_load_props)
return post_execute
elif hosted_mapper.polymorphic_fetch == 'deferred':
from sqlalchemy.orm.strategies import DeferredColumnLoader
props = [prop for prop in [self._get_col_to_prop(col) for col in statement.inner_columns] if prop.key not in instance.__dict__]
keys = [p.key for p in props]
+
+ only_load_props = flags.get('only_load_props', None)
+ if only_load_props:
+ keys = util.Set(keys).difference(only_load_props)
+ props = [p for p in props if p.key in only_load_props]
+
for prop in props:
strategy = prop._get_strategy(DeferredColumnLoader)
instance._state.set_callable(prop.key, strategy.setup_loader(instance, props=keys, create_statement=create_statement))
from sqlalchemy.orm import *
from testlib import *
from testlib.fixtures import *
+import gc
class ExpireTest(FixtureTest):
keep_mappers = False
# object isnt refreshed yet, using dict to bypass trigger
assert u.__dict__.get('name') != 'jack'
- if False:
- # NOTYET: need to implement unconditional population
- # of expired attriutes in mapper._instances()
- sess.query(User).all()
- # test that it refreshed
- assert u.__dict__['name'] == 'jack'
+ sess.query(User).all()
+ # test that it refreshed
+ assert u.__dict__['name'] == 'jack'
- def go():
- assert u.name == 'jack'
- self.assert_sql_count(testing.db, go, 0)
+ def go():
+ assert u.name == 'jack'
+ self.assert_sql_count(testing.db, go, 0)
def test_expire_doesntload_on_set(self):
mapper(User, users)
assert o.isopen == 1
assert o.description == 'some new description'
- if False:
- # NOTYET: need to implement unconditional population
- # of expired attriutes in mapper._instances()
- sess.expire(o, ['isopen', 'description'])
- sess.query(Order).all()
- del o.isopen
- def go():
- assert o.isopen is None
- self.assert_sql_count(testing.db, go, 0)
+ sess.expire(o, ['isopen', 'description'])
+ sess.query(Order).all()
+ del o.isopen
+ def go():
+ assert o.isopen is None
+ self.assert_sql_count(testing.db, go, 0)
+ o.isopen=14
+ sess.expire(o)
+ o.description = 'another new description'
+ sess.query(Order).all()
+ assert o.isopen == 1
+ assert o.description == 'another new description'
+
+
def test_expire_committed(self):
"""test that the committed state of the attribute receives the most recent DB data"""
mapper(Order, orders)
assert u.addresses[0].email_address == 'jack@bean.com'
assert u.name == 'jack'
# two loads, since relation() + scalar are
- # separate right now
+ # separate right now on per-attribute load
self.assert_sql_count(testing.db, go, 2)
assert 'name' in u.__dict__
assert 'addresses' in u.__dict__
assert 'name' not in u.__dict__
assert 'addresses' not in u.__dict__
+ def go():
+ sess.query(User).filter_by(id=7).one()
+ assert u.addresses[0].email_address == 'jack@bean.com'
+ assert u.name == 'jack'
+ # one load, since relation() + scalar are
+ # together when eager load used with Query
+ self.assert_sql_count(testing.db, go, 1)
+
+ def test_relation_changes_preserved(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user', lazy=False),
+ })
+ mapper(Address, addresses)
+ sess = create_session()
+ u = sess.query(User).get(8)
+ sess.expire(u, ['name', 'addresses'])
+ u.addresses
+ assert 'name' not in u.__dict__
+ del u.addresses[1]
+ u.name
+ assert 'name' in u.__dict__
+ assert len(u.addresses) == 2
+
+ def test_eagerload_props_dontload(self):
+ # relations currently have to load separately from scalar instances. the use case is:
+ # expire "addresses". then access it. lazy load fires off to load "addresses", but needs
+ # foreign key or primary key attributes in order to lazy load; hits those attributes,
+ # such as below it hits "u.id". "u.id" triggers full unexpire operation, eagerloads
+ # addresses since lazy=False. this is all wihtin lazy load which fires unconditionally;
+ # so an unnecessary eagerload (or lazyload) was issued. would prefer not to complicate
+ # lazyloading to "figure out" that the operation should be aborted right now.
+
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user', lazy=False),
+ })
+ mapper(Address, addresses)
+ sess = create_session()
+ u = sess.query(User).get(8)
+ sess.expire(u)
+ u.id
+ assert 'addresses' not in u.__dict__
+ u.addresses
+ assert 'addresses' in u.__dict__
+
def test_expire_synonym(self):
mapper(User, users, properties={
'uname':synonym('name')
# doing it that way right now
#self.assert_sql_count(testing.db, go, 0)
+ def test_relations_load_on_query(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user'),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u = sess.query(User).get(8)
+ assert 'name' in u.__dict__
+ u.addresses
+ assert 'addresses' in u.__dict__
+
+ sess.expire(u, ['name', 'addresses'])
+ assert 'name' not in u.__dict__
+ assert 'addresses' not in u.__dict__
+ sess.query(User).options(eagerload('addresses')).filter_by(id=8).all()
+ assert 'name' in u.__dict__
+ assert 'addresses' in u.__dict__
+
def test_partial_expire_deferred(self):
mapper(Order, orders, properties={
'description':deferred(orders.c.description)
assert o.description == 'order 3'
assert o.isopen == 1
self.assert_sql_count(testing.db, go, 1)
+
+ def test_eagerload_query_refreshes(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user', lazy=False),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ u = sess.query(User).get(8)
+ assert len(u.addresses) == 3
+ sess.expire(u)
+ assert 'addresses' not in u.__dict__
+ print "-------------------------------------------"
+ sess.query(User).filter_by(id=8).all()
+ assert 'addresses' in u.__dict__
+ assert len(u.addresses) == 3
+
+ def test_expire_all(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref='user', lazy=False),
+ })
+ mapper(Address, addresses)
+
+ sess = create_session()
+ userlist = sess.query(User).all()
+ assert fixtures.user_address_result == userlist
+ assert len(list(sess)) == 9
+ sess.expire_all()
+ gc.collect()
+ assert len(list(sess)) == 4 # since addresses were gc'ed
+
+ userlist = sess.query(User).all()
+ u = userlist[1]
+ assert fixtures.user_address_result == userlist
+ assert len(list(sess)) == 9
+
+class PolymorphicExpireTest(ORMTest):
+ keep_data = True
+
+ def define_tables(self, metadata):
+ global people, engineers, Person, Engineer
+
+ people = Table('people', metadata,
+ Column('person_id', Integer, Sequence('person_id_seq', optional=True), primary_key=True),
+ Column('name', String(50)),
+ Column('type', String(30)))
+
+ engineers = Table('engineers', metadata,
+ Column('person_id', Integer, ForeignKey('people.person_id'), primary_key=True),
+ Column('status', String(30)),
+ )
+
+ class Person(Base):
+ pass
+ class Engineer(Person):
+ pass
+
+ def insert_data(self):
+ people.insert().execute(
+ {'person_id':1, 'name':'person1', 'type':'person'},
+ {'person_id':2, 'name':'engineer1', 'type':'engineer'},
+ {'person_id':3, 'name':'engineer2', 'type':'engineer'},
+ )
+ engineers.insert().execute(
+ {'person_id':2, 'status':'new engineer'},
+ {'person_id':3, 'status':'old engineer'},
+ )
+
+ def test_poly_select(self):
+ mapper(Person, people, polymorphic_on=people.c.type, polymorphic_identity='person')
+ mapper(Engineer, engineers, inherits=Person, polymorphic_identity='engineer')
+
+ sess = create_session()
+ [p1, e1, e2] = sess.query(Person).order_by(people.c.person_id).all()
+
+ sess.expire(p1)
+ sess.expire(e1, ['status'])
+ sess.expire(e2)
+
+ for p in [p1, e2]:
+ assert 'name' not in p.__dict__
+
+ assert 'name' in e1.__dict__
+ assert 'status' not in e2.__dict__
+ assert 'status' not in e1.__dict__
+
+ e1.name = 'new engineer name'
+
+ def go():
+ sess.query(Person).all()
+ self.assert_sql_count(testing.db, go, 3)
+
+ for p in [p1, e1, e2]:
+ assert 'name' in p.__dict__
+
+ assert 'status' in e2.__dict__
+ assert 'status' in e1.__dict__
+ def go():
+ assert e1.name == 'new engineer name'
+ assert e2.name == 'engineer2'
+ assert e1.status == 'new engineer'
+ self.assert_sql_count(testing.db, go, 0)
+ self.assertEquals(Engineer.name.get_history(e1), (['new engineer name'], [], ['engineer1']))
+
+ def test_poly_deferred(self):
+ mapper(Person, people, polymorphic_on=people.c.type, polymorphic_identity='person', polymorphic_fetch='deferred')
+ mapper(Engineer, engineers, inherits=Person, polymorphic_identity='engineer')
+
+ sess = create_session()
+ [p1, e1, e2] = sess.query(Person).order_by(people.c.person_id).all()
+
+ sess.expire(p1)
+ sess.expire(e1, ['status'])
+ sess.expire(e2)
+
+ for p in [p1, e2]:
+ assert 'name' not in p.__dict__
+
+ assert 'name' in e1.__dict__
+ assert 'status' not in e2.__dict__
+ assert 'status' not in e1.__dict__
+
+ e1.name = 'new engineer name'
+
+ def go():
+ sess.query(Person).all()
+ self.assert_sql_count(testing.db, go, 1)
+
+ for p in [p1, e1, e2]:
+ assert 'name' in p.__dict__
+ assert 'status' not in e2.__dict__
+ assert 'status' not in e1.__dict__
+ def go():
+ assert e1.name == 'new engineer name'
+ assert e2.name == 'engineer2'
+ assert e1.status == 'new engineer'
+ assert e2.status == 'old engineer'
+ self.assert_sql_count(testing.db, go, 2)
+ self.assertEquals(Engineer.name.get_history(e1), (['new engineer name'], [], ['engineer1']))
+
+
class RefreshTest(FixtureTest):
keep_mappers = False
refresh_data = True