[ticket:2390]
- sql
+ - [bug] Added support for using the .key
+ of a Column as a string identifier in a
+ result set row. The .key is currently
+ listed as an "alternate" name for a column,
+ and is superseded by the name of a column
+ which has that key value as its regular name.
+ For the next major release
+ of SQLAlchemy we may reverse this precedence
+ so that .key takes precedence, but this
+ is not decided on yet. [ticket:2392]
+
- [bug] A significant change to how labeling
is applied to columns in SELECT statements
allows "truncated" labels, that is label names
__visit_name__ = 'label'
__slots__ = 'element', 'name'
- def __init__(self, col, name):
+ def __init__(self, col, name, alt_names=()):
self.element = col
self.name = name
+ self._alt_names = alt_names
@property
def proxy_set(self):
labelname = label.name
if result_map is not None:
- result_map[labelname.lower()] = \
- (label.name, (label, label.element, labelname),\
+ result_map[labelname.lower()] = (
+ label.name,
+ (label, label.element, labelname, ) +
+ label._alt_names,
label.type)
return label.element._compiler_dispatch(self,
name = self._truncated_identifier("colident", name)
if result_map is not None:
- result_map[name.lower()] = (orig_name, (column, name), column.type)
+ result_map[name.lower()] = (orig_name,
+ (column, name, column.key),
+ column.type)
if is_literal:
name = self.escape_literal_column(name)
if isinstance(column, sql._Label):
return column
- elif select is not None and select.use_labels and column._label:
- return _CompileLabel(column, column._label)
+ elif select is not None and \
+ select.use_labels and \
+ column._label:
+ return _CompileLabel(
+ column,
+ column._label,
+ alt_names=(column._key_label, )
+ )
elif \
asfrom and \
not column.is_literal and \
column.table is not None and \
not isinstance(column.table, sql.Select):
- return _CompileLabel(column, sql._as_truncated(column.name))
+ return _CompileLabel(column, sql._as_truncated(column.name),
+ alt_names=(column.key,))
elif not isinstance(column,
(sql._UnaryExpression, sql._TextClause)) \
and (not hasattr(column, 'name') or \
foreign_keys = []
quote = None
_label = None
+ _key_label = None
+ _alt_names = ()
@property
def _select_iterable(self):
def __init__(self, name, element, type_=None):
while isinstance(element, _Label):
element = element.element
- self.name = self.key = self._label = name \
- or _anonymous_label('%%(%d %s)s' % (id(self),
+ if name:
+ self.name = name
+ else:
+ self.name = _anonymous_label('%%(%d %s)s' % (id(self),
getattr(element, 'name', 'anon')))
+ self.key = self._label = self._key_label = self.name
self._element = element
self._type = type_
self.quote = element.quote
return self.name.encode('ascii', 'backslashreplace')
# end Py2K
+ @_memoized_property
+ def _key_label(self):
+ if self.key != self.name:
+ return self._gen_label(self.key)
+ else:
+ return self._label
+
@_memoized_property
def _label(self):
+ return self._gen_label(self.name)
+
+ def _gen_label(self, name):
t = self.table
if self.is_literal:
return None
elif t is not None and t.named_with_column:
if getattr(t, 'schema', None):
label = t.schema.replace('.', '_') + "_" + \
- t.name + "_" + self.name
+ t.name + "_" + name
else:
- label = t.name + "_" + self.name
+ label = t.name + "_" + name
# ensure the label name doesn't conflict with that
# of an existing column
return _as_truncated(label)
else:
- return self.name
+ return name
def label(self, name):
# currently, anonymous labels don't occur for
def _populate_column_collection(self):
for c in self.inner_columns:
if hasattr(c, '_make_proxy'):
- c._make_proxy(self, name=self.use_labels and c._label or None)
+ c._make_proxy(self,
+ name=self.use_labels
+ and c._label or None)
def self_group(self, against=None):
"""return a 'grouping' construct as per the ClauseElement
column('zip')
)
+keyed = Table('keyed', metadata,
+ Column('x', Integer, key='colx'),
+ Column('y', Integer, key='coly'),
+ Column('z', Integer),
+)
+
class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = 'default'
"SELECT sum(lala(mytable.myid)) AS bar FROM mytable"
)
+ # changes with #2397
+ self.assert_compile(
+ select([keyed]),
+ "SELECT keyed.x, keyed.y"
+ ", keyed.z FROM keyed"
+ )
+
+ # changes with #2397
+ self.assert_compile(
+ select([keyed]).apply_labels(),
+ "SELECT keyed.x AS keyed_x, keyed.y AS "
+ "keyed_y, keyed.z AS keyed_z FROM keyed"
+ )
+
def test_paramstyles(self):
stmt = text("select :foo, :bar, :bat from sometable")
)
def test_dupe_columns(self):
- """test that deduping is performed against clause element identity, not rendered result."""
+ """test that deduping is performed against clause
+ element identity, not rendered result."""
self.assert_compile(
select([column('a'), column('a'), column('a')]),
, dialect=default.DefaultDialect()
)
+ # using alternate keys.
+ # this will change with #2397
+ a, b, c = Column('a', Integer, key='b'), \
+ Column('b', Integer), \
+ Column('c', Integer, key='a')
+ self.assert_compile(
+ select([a, b, c, a, b, c]),
+ "SELECT a, b, c"
+ , dialect=default.DefaultDialect()
+ )
+
self.assert_compile(
select([bindparam('a'), bindparam('b'), bindparam('c')]),
"SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3"
s = s.compile(dialect=default.DefaultDialect(paramstyle='qmark'))
eq_(s.positiontup, ['a', 'b', 'c'])
- def test_nested_uselabels(self):
- """test nested anonymous label generation. this
- essentially tests the ANONYMOUS_LABEL regex.
+ def test_nested_label_targeting(self):
+ """test nested anonymous label generation.
"""
-
s1 = table1.select()
s2 = s1.alias()
s3 = select([s2], use_labels=True)
'AS description FROM mytable) AS anon_2) '
'AS anon_1')
+ def test_nested_label_targeting_keyed(self):
+ # this behavior chagnes with #2397
+ s1 = keyed.select()
+ s2 = s1.alias()
+ s3 = select([s2], use_labels=True)
+ self.assert_compile(s3,
+ "SELECT anon_1.x AS anon_1_x, "
+ "anon_1.y AS anon_1_y, "
+ "anon_1.z AS anon_1_z FROM "
+ "(SELECT keyed.x AS x, keyed.y "
+ "AS y, keyed.z AS z FROM keyed) AS anon_1")
+
+ s4 = s3.alias()
+ s5 = select([s4], use_labels=True)
+ self.assert_compile(s5,
+ "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, "
+ "anon_1.anon_2_y AS anon_1_anon_2_y, "
+ "anon_1.anon_2_z AS anon_1_anon_2_z "
+ "FROM (SELECT anon_2.x AS anon_2_x, anon_2.y AS anon_2_y, "
+ "anon_2.z AS anon_2_z FROM "
+ "(SELECT keyed.x AS x, keyed.y AS y, keyed.z "
+ "AS z FROM keyed) AS anon_2) AS anon_1"
+ )
+
def test_dont_overcorrelate(self):
self.assert_compile(select([table1], from_obj=[table1,
table1.select()]),
Column('user_name', VARCHAR(20)),
test_needs_acid=True
)
+
metadata.create_all()
@engines.close_first
)
concat = ("test: " + users.c.user_name).label('thedata')
- print select([concat]).order_by("thedata")
eq_(
select([concat]).order_by("thedata").execute().fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)]
]
)
+class KeyTargetingTest(fixtures.TablesTest):
+ run_inserts = 'once'
+ run_deletes = None
+
+ @classmethod
+ def define_tables(cls, metadata):
+ keyed1 = Table('keyed1', metadata,
+ Column("a", CHAR(2), key="b"),
+ Column("c", CHAR(2), key="q")
+ )
+ keyed2 = Table('keyed2', metadata,
+ Column("a", CHAR(2)),
+ Column("b", CHAR(2)),
+ )
+ keyed3 = Table('keyed3', metadata,
+ Column("a", CHAR(2)),
+ Column("d", CHAR(2)),
+ )
+ keyed4 = Table('keyed4', metadata,
+ Column("b", CHAR(2)),
+ Column("q", CHAR(2)),
+ )
+
+ content = Table('content', metadata,
+ Column('t', String(30), key="type"),
+ )
+ bar = Table('bar', metadata,
+ Column('ctype', String(30), key="content_type")
+ )
+
+ @classmethod
+ def insert_data(cls):
+ cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
+ cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
+ cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
+ cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
+ cls.tables.content.insert().execute(type="t1")
+
+ def test_keyed_accessor_single(self):
+ keyed1 = self.tables.keyed1
+ row = testing.db.execute(keyed1.select()).first()
+
+ eq_(row.b, "a1")
+ eq_(row.q, "c1")
+ eq_(row.a, "a1")
+ eq_(row.c, "c1")
+
+ def test_keyed_accessor_single_labeled(self):
+ keyed1 = self.tables.keyed1
+ row = testing.db.execute(keyed1.select().apply_labels()).first()
+
+ eq_(row.keyed1_b, "a1")
+ eq_(row.keyed1_q, "c1")
+ eq_(row.keyed1_a, "a1")
+ eq_(row.keyed1_c, "c1")
+
+ def test_keyed_accessor_composite_conflict_2(self):
+ keyed1 = self.tables.keyed1
+ keyed2 = self.tables.keyed2
+
+ row = testing.db.execute(select([keyed1, keyed2])).first()
+ # without #2397, row.b is unambiguous
+ eq_(row.b, "b2")
+ # row.a is ambiguous
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambig",
+ getattr, row, "a"
+ )
+
+ @testing.fails_if(lambda: True, "Possible future behavior")
+ def test_keyed_accessor_composite_conflict_2397(self):
+ keyed1 = self.tables.keyed1
+ keyed2 = self.tables.keyed2
+
+ row = testing.db.execute(select([keyed1, keyed2])).first()
+ # with #2397, row.a is unambiguous
+ eq_(row.a, "a2")
+ # row.b is ambiguous
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name 'b'",
+ getattr, row, 'b'
+ )
+
+ def test_keyed_accessor_composite_names_precedent(self):
+ keyed1 = self.tables.keyed1
+ keyed4 = self.tables.keyed4
+
+ row = testing.db.execute(select([keyed1, keyed4])).first()
+ eq_(row.b, "b4")
+ eq_(row.q, "q4")
+ eq_(row.a, "a1")
+ eq_(row.c, "c1")
+
+ def test_keyed_accessor_composite_keys_precedent(self):
+ keyed1 = self.tables.keyed1
+ keyed3 = self.tables.keyed3
+
+ row = testing.db.execute(select([keyed1, keyed3])).first()
+ assert 'b' not in row
+ eq_(row.q, "c1")
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name 'a'",
+ getattr, row, "a"
+ )
+ eq_(row.d, "d3")
+
+ @testing.fails_if(lambda: True, "Possible future behavior")
+ def test_keyed_accessor_composite_2397(self):
+ keyed1 = self.tables.keyed1
+ keyed3 = self.tables.keyed3
+
+ row = testing.db.execute(select([keyed1, keyed3])).first()
+ eq_(row.b, "a1")
+ eq_(row.q, "c1")
+ eq_(row.a, "a3")
+ eq_(row.d, "d3")
+
+ def test_keyed_accessor_composite_labeled(self):
+ keyed1 = self.tables.keyed1
+ keyed2 = self.tables.keyed2
+
+ row = testing.db.execute(select([keyed1, keyed2]).apply_labels()).first()
+ eq_(row.keyed1_b, "a1")
+ eq_(row.keyed1_a, "a1")
+ eq_(row.keyed1_q, "c1")
+ eq_(row.keyed1_c, "c1")
+ eq_(row.keyed2_a, "a2")
+ eq_(row.keyed2_b, "b2")
+ assert_raises(KeyError, lambda: row['keyed2_c'])
+ assert_raises(KeyError, lambda: row['keyed2_q'])
+
+ def test_column_label_overlap_fallback(self):
+ content, bar = self.tables.content, self.tables.bar
+ row = testing.db.execute(select([content.c.type.label("content_type")])).first()
+ assert content.c.type in row
+ assert bar.c.content_type not in row
+ assert sql.column('content_type') in row
+
+ row = testing.db.execute(select([func.now().label("content_type")])).first()
+ assert content.c.type not in row
+ assert bar.c.content_type not in row
+ assert sql.column('content_type') in row
+
+ def test_column_label_overlap_fallback_2(self):
+ # this fails with #2397
+ content, bar = self.tables.content, self.tables.bar
+ row = testing.db.execute(content.select(use_labels=True)).first()
+ assert content.c.type in row
+ assert bar.c.content_type not in row
+ assert sql.column('content_type') not in row
+
+ @testing.fails_if(lambda: True, "Possible future behavior")
+ def test_column_label_overlap_fallback_3(self):
+ # this passes with #2397
+ content, bar = self.tables.content, self.tables.bar
+ row = testing.db.execute(content.select(use_labels=True)).first()
+ assert content.c.type in row
+ assert bar.c.content_type not in row
+ assert sql.column('content_type') in row
class LimitTest(fixtures.TestBase):
table1.drop()
table2.drop()
- def testbasic(self):
+ def test_basic(self):
table1.insert().execute({'lowercase':1,'UPPERCASE':2,'MixedCase':3,'a123':4},
{'lowercase':2,'UPPERCASE':2,'MixedCase':3,'a123':4},
{'lowercase':4,'UPPERCASE':3,'MixedCase':2,'a123':1})
')'
)
- def testreflect(self):
+ def test_reflect(self):
meta2 = MetaData(testing.db)
t2 = Table('WorstCase2', meta2, autoload=True, quote=True)
assert 'MixedCase' in t2.c
- def testlabels(self):
+ def test_labels(self):
table1.insert().execute({'lowercase':1,'UPPERCASE':2,'MixedCase':3,'a123':4},
{'lowercase':2,'UPPERCASE':2,'MixedCase':3,'a123':4},
{'lowercase':4,'UPPERCASE':3,'MixedCase':2,'a123':1})
@testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
@testing.requires.subqueries
- def testlabels(self):
+ def test_labels(self):
"""test the quoting of labels.
if labels arent quoted, a query in postgresql in particular will fail since it produces:
x = table1.select(distinct=True).alias("LaLa").select().scalar()
- def testlabels2(self):
+ def test_labels2(self):
metadata = MetaData()
table = Table("ImATable", metadata,
Column("col1", Integer))
metadata = MetaData()
table = Table("ImATable", metadata,
Column("col1", Integer),
- Column("from", Integer, key="morf"),
+ Column("from", Integer),
Column("louisville", Integer),
Column("order", Integer))
- x = select([table.c.col1, table.c.morf, table.c.louisville, table.c.order])
+ x = select([table.c.col1, table.c['from'], table.c.louisville, table.c.order])
self.assert_compile(x,
'''SELECT "ImATable".col1, "ImATable"."from", "ImATable".louisville, "ImATable"."order" FROM "ImATable"''')
-
+
class PreparerTest(fixtures.TestBase):
"""Test the db-agnostic quoting services of IdentifierPreparer."""
Column('coly', Integer),
)
+keyed = Table('keyed', metadata,
+ Column('x', Integer, key='colx'),
+ Column('y', Integer, key='coly'),
+ Column('z', Integer),
+)
class SelectableTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
__dialect__ = 'default'
assert sel3.corresponding_column(l1) is sel3.c.foo
assert sel3.corresponding_column(l2) is sel3.c.bar
+ def test_keyed_gen(self):
+ s = select([keyed])
+ eq_(s.c.colx.key, 'colx')
+
+ # this would change to 'colx'
+ # with #2397
+ eq_(s.c.colx.name, 'x')
+
+ assert s.corresponding_column(keyed.c.colx) is s.c.colx
+ assert s.corresponding_column(keyed.c.coly) is s.c.coly
+ assert s.corresponding_column(keyed.c.z) is s.c.z
+
+ sel2 = s.alias()
+ assert sel2.corresponding_column(keyed.c.colx) is sel2.c.colx
+ assert sel2.corresponding_column(keyed.c.coly) is sel2.c.coly
+ assert sel2.corresponding_column(keyed.c.z) is sel2.c.z
+
+
def test_distance_on_aliases(self):
a1 = table1.alias('a1')
for s in (select([a1, table1], use_labels=True),