--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 8536
+
+ Fixed issue where mixing "*" with additional explicitly-named column
+ expressions within the columns clause of a :func:`_sql.select` construct
+ would cause result-column targeting to sometimes consider the label name or
+ other non-repeated names to be an ambiguous target.
result_columns,
cols_are_ordered,
textual_ordered,
+ ad_hoc_textual,
loose_column_name_matching,
) = context.result_column_struct
num_ctx_cols = len(result_columns)
cols_are_ordered
) = (
num_ctx_cols
+ ) = (
+ ad_hoc_textual
) = loose_column_name_matching = textual_ordered = False
# merge cursor.description with the column info
num_ctx_cols,
cols_are_ordered,
textual_ordered,
+ ad_hoc_textual,
loose_column_name_matching,
)
}
if len(by_key) != num_ctx_cols:
- # if by-primary-string dictionary smaller (or bigger?!) than
- # number of columns, assume we have dupes, rewrite
+ # if by-primary-string dictionary smaller than
+ # number of columns, assume we have dupes; (this check
+ # is also in place if string dictionary is bigger, as
+ # can occur when '*' was used as one of the compiled columns,
+ # which may or may not be suggestive of dupes), rewrite
# dupe records with "None" for index which results in
# ambiguous column exception when accessed.
#
num_ctx_cols,
cols_are_ordered,
textual_ordered,
+ ad_hoc_textual,
loose_column_name_matching,
):
"""Merge a cursor.description with compiled result column information.
# name-based or text-positional cases, where we need
# to read cursor.description names
- if textual_ordered:
+ if textual_ordered or (
+ ad_hoc_textual and len(cursor_description) == num_ctx_cols
+ ):
self._safe_for_cache = True
# textual positional case
raw_iterator = self._merge_textual_cols_by_position(
executemany = False
compiled: Optional[Compiled] = None
result_column_struct: Optional[
- Tuple[List[ResultColumnsEntry], bool, bool, bool]
+ Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
] = None
returned_default_rows: Optional[Sequence[Row[Any]]] = None
compiled._result_columns,
compiled._ordered_columns,
compiled._textual_ordered_columns,
+ compiled._ad_hoc_textual,
compiled._loose_column_name_matching,
)
_textual_ordered_columns: bool = False
"""tell the result object that the column names as rendered are important,
but they are also "ordered" vs. what is in the compiled object here.
+
+ As of 1.4.42 this condition is only present when the statement is a
+ TextualSelect, e.g. text("....").columns(...), where it is required
+ that the columns are considered positionally and not by name.
+
+ """
+
+ _ad_hoc_textual: bool = False
+ """tell the result that we encountered text() or '*' constructs in the
+ middle of the result columns, but we also have compiled columns, so
+ if the number of columns in cursor.description does not match how many
+ expressions we have, that means we can't rely on positional at all and
+ should match on name.
+
"""
_ordered_columns: bool = True
) -> None:
if keyname is None or keyname == "*":
self._ordered_columns = False
- self._textual_ordered_columns = True
+ self._ad_hoc_textual = True
if type_._is_tuple_type:
raise exc.CompileError(
"Most backends don't support SELECTing "
return skip_if(["+pyodbc"], "no driver support")
+ @property
+ def select_star_mixed(self):
+ r"""target supports expressions like "SELECT x, y, \*, z FROM table"
+
+ apparently MySQL / MariaDB, Oracle doesn't handle this.
+
+ We only need a few backends so just cover SQLite / PG
+
+ """
+ return only_on(["sqlite", "postgresql"])
+
@property
def independent_connections(self):
"""
set([True]),
)
+ @testing.combinations(
+ (("name_label", "*"), False),
+ (("*", "name_label"), False),
+ (("user_id", "name_label", "user_name"), False),
+ (("user_id", "name_label", "*", "user_name"), True),
+ argnames="cols,other_cols_are_ambiguous",
+ )
+ @testing.requires.select_star_mixed
+ def test_label_against_star(
+ self, connection, cols, other_cols_are_ambiguous
+ ):
+ """test #8536"""
+ users = self.tables.users
+
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
+
+ stmt = select(
+ *[
+ text("*")
+ if colname == "*"
+ else users.c.user_name.label("name_label")
+ if colname == "name_label"
+ else users.c[colname]
+ for colname in cols
+ ]
+ )
+
+ row = connection.execute(stmt).first()
+
+ eq_(row._mapping["name_label"], "john")
+
+ if other_cols_are_ambiguous:
+ with expect_raises_message(
+ exc.InvalidRequestError, "Ambiguous column name"
+ ):
+ row._mapping["user_id"]
+ with expect_raises_message(
+ exc.InvalidRequestError, "Ambiguous column name"
+ ):
+ row._mapping["user_name"]
+ else:
+ eq_(row._mapping["user_id"], 1)
+ eq_(row._mapping["user_name"], "john")
+
def test_loose_matching_one(self, connection):
users = self.tables.users
addresses = self.tables.addresses