.. changelog::
:version: 0.8.1
+ .. change::
+ :tags: bug, core
+ :tickets: 2702
+
+ A major fix to the way in which a select() object produces
+ labeled columns when apply_labels() is used; this mode
+ produces a SELECT where each column is labeled as in
+ <tablename>_<columnname>, to remove column name collisions
+ for a multiple table select. The fix is that if two labels
+ collide when combined with the table name, i.e.
+ "foo.bar_id" and "foo_bar.id", anonymous aliasing will be
+ applied to one of the dupes. This allows the ORM to handle
+ both columns independently; previously, 0.7
+ would in some cases silently emit a second SELECT for the
+ column that was "duped", and in 0.8 an ambiguous column error
+ would be emitted. The "keys" applied to the .c. collection
+ of the select() will also be deduped, so that the "column
+ being replaced" warning will no longer emit for any select()
+ that specifies use_labels, though the dupe key will be given
+ an anonymous label which isn't generally user-friendly.
+
.. change::
:tags: bug, orm, declarative
:tickets: 2656
# this check isn't currently available if the row
# was unpickled.
if result is not None and \
- result[1] is not None:
+ result[1] is not None:
for obj in result[1]:
if key._compare_name_for_result(obj):
break
else:
self.result_map[keyname] = name, objects, type_
- def _label_select_column(self, select, column, populate_result_map,
+ def _label_select_column(self, select, column,
+ populate_result_map,
asfrom, column_clause_args,
+ name=None,
within_columns_clause=True):
"""produce labeled columns present in a select()."""
if column.type._has_column_expression and \
- populate_result_map:
+ populate_result_map:
col_expr = column.type.column_expression(column)
add_to_result_map = lambda keyname, name, objects, type_: \
self._add_to_result_map(
else:
result_expr = col_expr
- elif select is not None and \
- select.use_labels and \
- column._label:
+ elif select is not None and name:
result_expr = _CompileLabel(
col_expr,
- column._label,
- alt_names=(column._key_label, )
+ name,
+ alt_names=(column._key_label,)
)
elif \
isinstance(column, sql.ColumnClause) and \
not column.is_literal and \
column.table is not None and \
- not isinstance(column.table, sql.Select):
+ not isinstance(column.table, sql.Select):
result_expr = _CompileLabel(col_expr,
sql._as_truncated(column.name),
alt_names=(column.key,))
# correlate_froms.union(existingfroms)
populate_result_map = force_result_map or (
- compound_index == 0 and (
- not entry or \
- entry.get('iswrapper', False)
- )
- )
+ compound_index == 0 and (
+ not entry or \
+ entry.get('iswrapper', False)
+ )
+ )
self.stack.append({'from': correlate_froms,
'iswrapper': iswrapper})
# the actual list of columns to print in the SELECT column list.
inner_columns = [
c for c in [
- self._label_select_column(select, column,
+ self._label_select_column(select,
+ column,
populate_result_map, asfrom,
- column_clause_args)
- for column in util.unique_list(select.inner_columns)
+ column_clause_args,
+ name=name)
+ for name, column in select._columns_plus_names
]
if c is not None
]
fromclause = _interpret_as_from(fromclause)
self._from_obj = self._from_obj.union([fromclause])
+
+ @_memoized_property
+ def _columns_plus_names(self):
+ if self.use_labels:
+ names = set()
+ def name_for_col(c):
+ if c._label is None:
+ return (None, c)
+ name = c._label
+ if name in names:
+ name = c.anon_label
+ else:
+ names.add(name)
+ return name, c
+
+ return [
+ name_for_col(c)
+ for c in util.unique_list(_select_iterables(self._raw_columns))
+ ]
+ else:
+ return [
+ (None, c)
+ for c in util.unique_list(_select_iterables(self._raw_columns))
+ ]
+
def _populate_column_collection(self):
- for c in self.inner_columns:
- if hasattr(c, '_make_proxy'):
- c._make_proxy(self,
- name=c._label if self.use_labels else None,
- key=c._key_label if self.use_labels else None,
- name_is_truncatable=True)
+ for name, c in self._columns_plus_names:
+ if not hasattr(c, '_make_proxy'):
+ continue
+ if name is None:
+ key = None
+ elif self.use_labels:
+ key = c._key_label
+ if key is not None and key in self.c:
+ key = c.anon_label
+ else:
+ key = None
+
+ c._make_proxy(self, key=key,
+ name=name,
+ name_is_truncatable=True)
def _refresh_for_new_column(self, column):
for fromclause in self._froms:
ordered_column_set = OrderedSet
populate_column_dict = PopulateDict
-
def unique_list(seq, hashfunc=None):
seen = {}
if not hashfunc:
def go():
s = select([t1], t1.c.c2 == t2.c.c1)
s.compile(dialect=self.dialect)
+ go()
+
+ def test_select_labels(self):
+ # give some of the cached type values
+ # a chance to warm up
+ s = select([t1], t1.c.c2 == t2.c.c1).apply_labels()
+ s.compile(dialect=self.dialect)
+
+ @profiling.function_call_count()
+ def go():
+ s = select([t1], t1.c.c2 == t2.c.c1).apply_labels()
+ s.compile(dialect=self.dialect)
go()
\ No newline at end of file
filter(Sub1.id==1).one(),
b1
)
+
+class LabelCollideTest(fixtures.MappedTest):
+ """Test handling for a label collision. This collision
+ is handled by core, see ticket:2702 as well as
+ test/sql/test_selectable->WithLabelsTest. here we want
+ to make sure the end result is as we expect.
+
+ """
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('foo', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('bar_id', Integer)
+ )
+ Table('foo_bar', metadata,
+ Column('id', Integer, primary_key=True),
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class Foo(cls.Basic):
+ pass
+ class Bar(cls.Basic):
+ pass
+
+ @classmethod
+ def setup_mappers(cls):
+ mapper(cls.classes.Foo, cls.tables.foo)
+ mapper(cls.classes.Bar, cls.tables.foo_bar)
+
+ @classmethod
+ def insert_data(cls):
+ s = Session()
+ s.add_all([
+ cls.classes.Foo(id=1, bar_id=2),
+ cls.classes.Bar(id=3)
+ ])
+ s.commit()
+
+ def test_overlap_plain(self):
+ s = Session()
+ row = s.query(self.classes.Foo, self.classes.Bar).all()[0]
+ def go():
+ eq_(row.Foo.id, 1)
+ eq_(row.Foo.bar_id, 2)
+ eq_(row.Bar.id, 3)
+ # all three columns are loaded independently without
+ # overlap, no additional SQL to load all attributes
+ self.assert_sql_count(testing.db, go, 0)
+
+ def test_overlap_subquery(self):
+ s = Session()
+ row = s.query(self.classes.Foo, self.classes.Bar).from_self().all()[0]
+ def go():
+ eq_(row.Foo.id, 1)
+ eq_(row.Foo.bar_id, 2)
+ eq_(row.Bar.id, 3)
+ # all three columns are loaded independently without
+ # overlap, no additional SQL to load all attributes
+ self.assert_sql_count(testing.db, go, 0)
\ No newline at end of file
test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_cextensions 135
test.aaa_profiling.test_compiler.CompileTest.test_select 2.7_sqlite_pysqlite_nocextensions 135
+# TEST: test.aaa_profiling.test_compiler.CompileTest.test_select_labels
+
+test.aaa_profiling.test_compiler.CompileTest.test_select_labels 2.7_sqlite_pysqlite_nocextensions 177
+
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_update
test.aaa_profiling.test_compiler.CompileTest.test_update 2.5_sqlite_pysqlite_nocextensions 65
lambda: row[u2.c.user_id]
)
+ def test_ambiguous_column_contains(self):
+ # ticket 2702. in 0.7 we'd get True, False.
+ # in 0.8, both columns are present so it's True;
+ # but when they're fetched you'll get the ambiguous error.
+ users.insert().execute(user_id=1, user_name='john')
+ result = select([
+ users.c.user_id,
+ addresses.c.user_id]).\
+ select_from(users.outerjoin(addresses)).execute()
+ row = result.first()
+
+ eq_(
+ set([users.c.user_id in row, addresses.c.user_id in row]),
+ set([True])
+ )
+
def test_ambiguous_column_by_col_plus_label(self):
users.insert().execute(user_id=1, user_name='john')
result = select([users.c.user_id,
comp2 = c2.comparator
assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"}
+
+class WithLabelsTest(fixtures.TestBase):
+ def _assert_labels_warning(self, s):
+ assert_raises_message(
+ exc.SAWarning,
+ "replaced by another column with the same key",
+ lambda: s.c
+ )
+
+ def _assert_result_keys(self, s, keys):
+ compiled = s.compile()
+ eq_(set(compiled.result_map), set(keys))
+
+ def _assert_subq_result_keys(self, s, keys):
+ compiled = s.select().compile()
+ eq_(set(compiled.result_map), set(keys))
+
+ def _names_overlap(self):
+ m = MetaData()
+ t1 = Table('t1', m, Column('x', Integer))
+ t2 = Table('t2', m, Column('x', Integer))
+ return select([t1, t2])
+
+ def test_names_overlap_nolabel(self):
+ sel = self._names_overlap()
+ self._assert_labels_warning(sel)
+ self._assert_result_keys(sel, ['x'])
+
+ def test_names_overlap_label(self):
+ sel = self._names_overlap().apply_labels()
+ eq_(
+ sel.c.keys(),
+ ['t1_x', 't2_x']
+ )
+ self._assert_result_keys(sel, ['t1_x', 't2_x'])
+
+ def _names_overlap_keys_dont(self):
+ m = MetaData()
+ t1 = Table('t1', m, Column('x', Integer, key='a'))
+ t2 = Table('t2', m, Column('x', Integer, key='b'))
+ return select([t1, t2])
+
+ def test_names_overlap_keys_dont_nolabel(self):
+ sel = self._names_overlap_keys_dont()
+ eq_(
+ sel.c.keys(),
+ ['a', 'b']
+ )
+ self._assert_result_keys(sel, ['x'])
+
+ def test_names_overlap_keys_dont_label(self):
+ sel = self._names_overlap_keys_dont().apply_labels()
+ eq_(
+ sel.c.keys(),
+ ['t1_a', 't2_b']
+ )
+ self._assert_result_keys(sel, ['t1_x', 't2_x'])
+
+ def _labels_overlap(self):
+ m = MetaData()
+ t1 = Table('t', m, Column('x_id', Integer))
+ t2 = Table('t_x', m, Column('id', Integer))
+ return select([t1, t2])
+
+ def test_labels_overlap_nolabel(self):
+ sel = self._labels_overlap()
+ eq_(
+ sel.c.keys(),
+ ['x_id', 'id']
+ )
+ self._assert_result_keys(sel, ['x_id', 'id'])
+
+ def test_labels_overlap_label(self):
+ sel = self._labels_overlap().apply_labels()
+ t2 = sel.froms[1]
+ eq_(
+ sel.c.keys(),
+ ['t_x_id', t2.c.id.anon_label]
+ )
+ self._assert_result_keys(sel, ['t_x_id', 'id_1'])
+ self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
+
+ def _labels_overlap_keylabels_dont(self):
+ m = MetaData()
+ t1 = Table('t', m, Column('x_id', Integer, key='a'))
+ t2 = Table('t_x', m, Column('id', Integer, key='b'))
+ return select([t1, t2])
+
+ def test_labels_overlap_keylabels_dont_nolabel(self):
+ sel = self._labels_overlap_keylabels_dont()
+ eq_(sel.c.keys(), ['a', 'b'])
+ self._assert_result_keys(sel, ['x_id', 'id'])
+
+ def test_labels_overlap_keylabels_dont_label(self):
+ sel = self._labels_overlap_keylabels_dont().apply_labels()
+ eq_(sel.c.keys(), ['t_a', 't_x_b'])
+ self._assert_result_keys(sel, ['t_x_id', 'id_1'])
+
+ def _keylabels_overlap_labels_dont(self):
+ m = MetaData()
+ t1 = Table('t', m, Column('a', Integer, key='x_id'))
+ t2 = Table('t_x', m, Column('b', Integer, key='id'))
+ return select([t1, t2])
+
+ def test_keylabels_overlap_labels_dont_nolabel(self):
+ sel = self._keylabels_overlap_labels_dont()
+ eq_(sel.c.keys(), ['x_id', 'id'])
+ self._assert_result_keys(sel, ['a', 'b'])
+
+ def test_keylabels_overlap_labels_dont_label(self):
+ sel = self._keylabels_overlap_labels_dont().apply_labels()
+ t2 = sel.froms[1]
+ eq_(sel.c.keys(), ['t_x_id', t2.c.id.anon_label])
+ self._assert_result_keys(sel, ['t_a', 't_x_b'])
+ self._assert_subq_result_keys(sel, ['t_a', 't_x_b'])
+
+ def _keylabels_overlap_labels_overlap(self):
+ m = MetaData()
+ t1 = Table('t', m, Column('x_id', Integer, key='x_a'))
+ t2 = Table('t_x', m, Column('id', Integer, key='a'))
+ return select([t1, t2])
+
+ def test_keylabels_overlap_labels_overlap_nolabel(self):
+ sel = self._keylabels_overlap_labels_overlap()
+ eq_(sel.c.keys(), ['x_a', 'a'])
+ self._assert_result_keys(sel, ['x_id', 'id'])
+ self._assert_subq_result_keys(sel, ['x_id', 'id'])
+
+ def test_keylabels_overlap_labels_overlap_label(self):
+ sel = self._keylabels_overlap_labels_overlap().apply_labels()
+ t2 = sel.froms[1]
+ eq_(sel.c.keys(), ['t_x_a', t2.c.a.anon_label])
+ self._assert_result_keys(sel, ['t_x_id', 'id_1'])
+ self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
+
+ def _keys_overlap_names_dont(self):
+ m = MetaData()
+ t1 = Table('t1', m, Column('a', Integer, key='x'))
+ t2 = Table('t2', m, Column('b', Integer, key='x'))
+ return select([t1, t2])
+
+ def test_keys_overlap_names_dont_nolabel(self):
+ sel = self._keys_overlap_names_dont()
+ self._assert_labels_warning(sel)
+ self._assert_result_keys(sel, ['a', 'b'])
+
+ def test_keys_overlap_names_dont_label(self):
+ sel = self._keys_overlap_names_dont().apply_labels()
+ eq_(
+ sel.c.keys(),
+ ['t1_x', 't2_x']
+ )
+ self._assert_result_keys(sel, ['t1_a', 't2_b'])