and scoped_session() has been renamed to
"expire_on_commit". It does not affect
the expiration behavior of rollback().
+
+ - fixed endless loop bug which could occur
+ within a mapper's deferred load of
+ inherited attributes.
- a legacy-support flag
"_enable_transation_accounting" flag added
Additionally fixed bugs involving order_by being
specified as a class-bound attribute in conjunction
with eager loading.
-
+
+ - declarative initialization of Columns adjusted so that
+ non-renamed columns initialize in the same way as a non
+ declarative mapper. This allows an inheriting mapper
+ to set up its same-named "id" columns in particular
+ such that the parent "id" column is favored over the child
+ column, reducing database round trips when this value
+ is requested.
+
- mysql
- Quoting of MSEnum values for use in CREATE TABLE is now
optional & will be quoted on demand as required. (Quoting was
elif isinstance(c, Column):
_undefer_column_name(key, c)
cols.append(c)
+ # if the column is the same name as the key,
+ # remove it from the explicit properties dict.
+ # the normal rules for assigning column-based properties
+ # will take over, including precedence of columns
+ # in multi-column ColumnProperties.
+ if key == c.key:
+ del our_stuff[key]
cls.__table__ = table = Table(tablename, cls.metadata,
*(tuple(cols) + tuple(args)), **table_kw)
else:
def set(self, state, value, initiator):
raise NotImplementedError()
- def get_committed_value(self, state):
+ def get_committed_value(self, state, passive=False):
"""return the unchanged value of this attribute"""
if self.key in state.committed_state:
else:
return state.committed_state.get(self.key)
else:
- return self.get(state)
+ return self.get(state, passive=passive)
def set_committed_value(self, state, value):
"""set an attribute value on the given instance and 'commit' it."""
state = attributes.instance_state(obj)
return self._get_committed_state_attr_by_column(state, column)
- def _get_committed_state_attr_by_column(self, state, column):
- return self._get_col_to_prop(column).getcommitted(state, column)
+ def _get_committed_state_attr_by_column(self, state, column, passive=False):
+ return self._get_col_to_prop(column).getcommitted(state, column, passive=passive)
def _save_obj(self, states, uowtransaction, postupdate=False, post_update_cols=None, single=False):
"""Issue ``INSERT`` and/or ``UPDATE`` statements for a list of objects.
identitykey = self._identity_key_from_state(refresh_state)
else:
identitykey = identity_key(row)
-
+
if identitykey in session_identity_map:
instance = session_identity_map[identitykey]
state = attributes.instance_state(instance)
if self.base_mapper.local_table in tables:
return None
+ class ColumnsNotAvailable(Exception):
+ pass
+
def visit_binary(binary):
leftcol = binary.left
rightcol = binary.right
return
if leftcol.table not in tables:
- binary.left = sql.bindparam(None, self._get_committed_state_attr_by_column(state, leftcol), type_=binary.right.type)
+ leftval = self._get_committed_state_attr_by_column(state, leftcol, passive=True)
+ if leftval is attributes.PASSIVE_NORESULT:
+ raise ColumnsNotAvailable()
+ binary.left = sql.bindparam(None, leftval, type_=binary.right.type)
elif rightcol.table not in tables:
- binary.right = sql.bindparam(None, self._get_committed_state_attr_by_column(state, rightcol), type_=binary.right.type)
+ rightval = self._get_committed_state_attr_by_column(state, rightcol, passive=True)
+ if rightval is attributes.PASSIVE_NORESULT:
+ raise ColumnsNotAvailable()
+ binary.right = sql.bindparam(None, rightval, type_=binary.right.type)
allconds = []
- start = False
- for mapper in reversed(list(self.iterate_to_root())):
- if mapper.local_table in tables:
- start = True
- if start and not mapper.single:
- allconds.append(visitors.cloned_traverse(mapper.inherit_condition, {}, {'binary':visit_binary}))
-
+ try:
+ start = False
+ for mapper in reversed(list(self.iterate_to_root())):
+ if mapper.local_table in tables:
+ start = True
+ if start and not mapper.single:
+ allconds.append(visitors.cloned_traverse(mapper.inherit_condition, {}, {'binary':visit_binary}))
+ except ColumnsNotAvailable:
+ return None
+
cond = sql.and_(*allconds)
return sql.select(tables, cond, use_labels=True)
raise sa_exc.UnboundExecutionError("Instance %s is not bound to a Session; attribute refresh operation cannot proceed" % (state_str(state)))
has_key = _state_has_identity(state)
-
+
result = False
if mapper.inherits and not mapper.concrete:
statement = mapper._optimized_get_statement(state, attribute_names)
def getattr(self, state, column):
return state.get_impl(self.key).get(state)
- def getcommitted(self, state, column):
- return state.get_impl(self.key).get_committed_value(state)
+ def getcommitted(self, state, column, passive=False):
+ return state.get_impl(self.key).get_committed_value(state, passive=passive)
def setattr(self, state, value, column):
state.get_impl(self.key).set(state, value, None)
obj = state.get_impl(self.key).get(state)
return self.get_col_value(column, obj)
- def getcommitted(self, state, column):
- obj = state.get_impl(self.key).get_committed_value(state)
+ def getcommitted(self, state, column, passive=False):
+ obj = state.get_impl(self.key).get_committed_value(state, passive=passive)
return self.get_col_value(column, obj)
def setattr(self, state, value, column):
any(Engineer.primary_language == 'cobol')).first()),
c2)
+ # ensure that the Manager mapper was compiled
+ # with the Person id column as higher priority.
+ # this ensures that "id" will get loaded from the Person row
+ # and not the possibly non-present Manager row
+ assert Manager.id.property.columns == [Person.__table__.c.id, Manager.__table__.c.id]
+
+ # assert that the "id" column is available without a second load.
+ # this would be the symptom of the previous step not being correct.
+ sess.clear()
+ def go():
+ assert sess.query(Manager).filter(Manager.name=='dogbert').one().id
+ self.assert_sql_count(testing.db, go, 1)
+ sess.clear()
+ def go():
+ assert sess.query(Person).filter(Manager.name=='dogbert').one().id
+ self.assert_sql_count(testing.db, go, 1)
+
def test_inheritance_with_undefined_relation(self):
class Parent(Base):
__tablename__ = 'parent'
assert sess.query(Base).get(b1.base_id).data == "this is base"
assert sess.query(Sub).get(s1.base_id).data == "this is base"
+class OptimizedLoadTest(ORMTest):
+ """test that the 'optimized load' routine doesn't crash when
+ a column in the join condition is not available.
+
+ """
+ def define_tables(self, metadata):
+ global base, sub
+ base = Table('base', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ Column('type', String(50))
+ )
+ sub = Table('sub', metadata,
+ Column('id', Integer, ForeignKey('base.id'), primary_key=True),
+ Column('sub', String(50))
+ )
+
+ def test_optimized_passes(self):
+ class Base(object):
+ pass
+ class Sub(Base):
+ pass
+
+ mapper(Base, base, polymorphic_on=base.c.type, polymorphic_identity='base')
+
+ # redefine Sub's "id" to favor the "id" col in the subtable.
+ # "id" is also part of the primary join condition
+ mapper(Sub, sub, inherits=Base, polymorphic_identity='sub', properties={'id':sub.c.id})
+ sess = create_session()
+ s1 = Sub()
+ s1.data = 's1data'
+ s1.sub = 's1sub'
+ sess.save(s1)
+ sess.flush()
+ sess.clear()
+
+ # load s1 via Base. s1.id won't populate since it's relative to
+ # the "sub" table. The optimized load kicks in and tries to
+ # generate on the primary join, but cannot since "id" is itself unloaded.
+ # the optimized load needs to return "None" so regular full-row loading proceeds
+ s1 = sess.query(Base).get(s1.id)
+ assert s1.sub == 's1sub'
class DeleteOrphanTest(ORMTest):
def define_tables(self, metadata):
session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first(),
dilbert
)
+
+ assert session.query(Person).filter(getattr(Person, person_attribute_name)=='dilbert').first().person_id
+
self.assertEquals(
session.query(Engineer).filter(getattr(Person, person_attribute_name)=='dilbert').first(),
dilbert
# test that the splicing of the join works here, doesnt break in the middle of "parent join child1"
self.assert_compile(q.limit(1).with_labels().statement,
- "SELECT anon_1.child1_id AS anon_1_child1_id, anon_1.parent_id AS anon_1_parent_id, "\
- "anon_1.parent_cls AS anon_1_parent_cls, anon_2.child2_id AS anon_2_child2_id, anon_2.parent_id AS anon_2_parent_id, "\
- "anon_2.parent_cls AS anon_2_parent_cls FROM (SELECT child1.id AS child1_id, parent.id AS parent_id, "\
- "parent.cls AS parent_cls FROM parent JOIN child1 ON parent.id = child1.id "\
- "LIMIT 1) AS anon_1 LEFT OUTER JOIN secondary AS secondary_1 ON anon_1.parent_id = secondary_1.right_id LEFT OUTER JOIN "\
- "(SELECT parent.id AS parent_id, parent.cls AS parent_cls, child2.id AS child2_id FROM parent JOIN child2 ON parent.id = child2.id) "\
+ "SELECT anon_1.parent_id AS anon_1_parent_id, anon_1.child1_id AS anon_1_child1_id, "\
+ "anon_1.parent_cls AS anon_1_parent_cls, anon_2.parent_id AS anon_2_parent_id, "\
+ "anon_2.child2_id AS anon_2_child2_id, anon_2.parent_cls AS anon_2_parent_cls FROM "\
+ "(SELECT parent.id AS parent_id, child1.id AS child1_id, parent.cls AS parent_cls FROM parent "\
+ "JOIN child1 ON parent.id = child1.id LIMIT 1) AS anon_1 LEFT OUTER JOIN secondary AS secondary_1 "\
+ "ON anon_1.parent_id = secondary_1.right_id LEFT OUTER JOIN (SELECT parent.id AS parent_id, "\
+ "parent.cls AS parent_cls, child2.id AS child2_id FROM parent JOIN child2 ON parent.id = child2.id) "\
"AS anon_2 ON anon_2.parent_id = secondary_1.left_id"
, dialect=default.DefaultDialect())