translated_indexes: Optional[List[int]],
safe_for_cache: bool,
keymap_by_result_column_idx: Any,
- ) -> CursorResultMetaData:
+ ) -> Self:
new_obj = self.__class__.__new__(self.__class__)
new_obj._unpickled = unpickled
new_obj._processors = processors
new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
return new_obj
- def _remove_processors(self) -> CursorResultMetaData:
+ def _remove_processors(self) -> Self:
assert not self._tuplefilter
return self._make_new_metadata(
unpickled=self._unpickled,
keymap_by_result_column_idx=self._keymap_by_result_column_idx,
)
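For context on the `Self` annotations introduced throughout this hunk: annotating the return type as `Self` (PEP 673) lets type checkers preserve the subclass type when these methods are called on a `CursorResultMetaData` subclass. A minimal sketch, using hypothetical classes rather than the real ones:

```python
from typing import Self  # Python 3.11+; typing_extensions on older versions


class BaseMetaData:
    def _remove_processors(self) -> Self:
        # returning Self rather than BaseMetaData means a type checker
        # sees the subclass type flow through without per-class overrides
        return self


class SubMetaData(BaseMetaData):
    pass


# inferred as SubMetaData, not BaseMetaData
md: SubMetaData = SubMetaData()._remove_processors()
```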
- def _splice_horizontally(
- self, other: CursorResultMetaData
- ) -> CursorResultMetaData:
+ def _splice_horizontally(self, other: CursorResultMetaData) -> Self:
assert not self._tuplefilter
keymap = dict(self._keymap)
},
)
- def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
+ def _reduce(self, keys: Sequence[_KeyIndexType]) -> Self:
recs = list(self._metadata_for_keys(keys))
indexes = [rec[MD_INDEX] for rec in recs]
keymap_by_result_column_idx=self._keymap_by_result_column_idx,
)
- def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
+ def _adapt_to_context(self, context: ExecutionContext) -> Self:
"""When using a cached Compiled construct that has a _result_map,
for a new statement that used the cached Compiled, we need to ensure
the keymap has the Column objects from our new statement as keys.
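A hedged sketch of the scenario this docstring describes, assuming two structurally identical `Table` objects produce cache-equivalent statements (names here are invented):

```python
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, select

engine = create_engine("sqlite://")


def make_table() -> Table:
    # separate but structurally identical constructs
    return Table("t", MetaData(), Column("x", Integer))


t1, t2 = make_table(), make_table()

with engine.begin() as conn:
    t1.create(conn)
    conn.execute(select(t1.c.x))  # compiles and caches
    result = conn.execute(select(t2.c.x))  # hits the cached Compiled
    # the cached metadata's keymap is keyed by t1's columns; the
    # adaptation step re-keys it so result rows answer to t2.c.x
    result.fetchall()
```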
self,
parent: CursorResult[Unpack[TupleAny]],
cursor_description: _DBAPICursorDescription,
+ *,
+ driver_column_names: bool = False,
):
context = parent.context
self._tuplefilter = None
textual_ordered,
ad_hoc_textual,
loose_column_name_matching,
+ driver_column_names,
)
# processors in key order which are used when building up
for metadata_entry in raw
}
- # update keymap with "translated" names. In SQLAlchemy this is a
- # sqlite only thing, and in fact impacting only extremely old SQLite
- # versions unlikely to be present in modern Python versions.
- # however, the pyhive third party dialect is
- # also using this hook, which means others still might use it as well.
- # I dislike having this awkward hook here but as long as we need
- # to use names in cursor.description in some cases we need to have
- # some hook to accomplish this.
- if not num_ctx_cols and context._translate_colname:
+ # update keymap with "translated" names.
+ # the "translated" name thing has a long history:
+ # 1. originally, it was used to fix an issue in very old SQLite
+ # versions prior to 3.10.0. This code is still there in the
+ # sqlite dialect.
+ # 2. Next, the pyhive third-party dialect started using this hook
+ # to work around a driver-related issue on their end.
+ # 3. Most recently, the "driver_column_names" execution option has
+ # taken advantage of this hook to get raw DBAPI col names in the
+ # result keys without disrupting the usual merge process.
+
+ if driver_column_names or (
+ not num_ctx_cols and context._translate_colname
+ ):
self._keymap.update(
{
metadata_entry[MD_UNTRANSLATED]: self._keymap[
textual_ordered,
ad_hoc_textual,
loose_column_name_matching,
+ driver_column_names,
):
"""Merge a cursor.description with compiled result column information.
and cols_are_ordered
and not textual_ordered
and num_ctx_cols == len(cursor_description)
+ and not driver_column_names
):
self._keys = [elem[0] for elem in result_columns]
# pure positional 1-1 case; doesn't need to read
# most common case for Core and ORM
- # this metadata is safe to cache because we are guaranteed
+ # this metadata is safe to cache
+ # because we are guaranteed
# to have the columns in the same order for new executions
self._safe_for_cache = True
+
return [
(
idx,
if textual_ordered or (
ad_hoc_textual and len(cursor_description) == num_ctx_cols
):
- self._safe_for_cache = True
+ self._safe_for_cache = not driver_column_names
# textual positional case
raw_iterator = self._merge_textual_cols_by_position(
- context, cursor_description, result_columns
+ context,
+ cursor_description,
+ result_columns,
+ driver_column_names,
)
elif num_ctx_cols:
# compiled SQL with a mismatch of description cols
cursor_description,
result_columns,
loose_column_name_matching,
+ driver_column_names,
)
else:
# no compiled SQL, just a raw string, order of columns
# can change for "select *"
self._safe_for_cache = False
raw_iterator = self._merge_cols_by_none(
- context, cursor_description
+ context, cursor_description, driver_column_names
)
return [
) in raw_iterator
]
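For reference, `cursor_description` here is the pep-249 `cursor.description` sequence of 7-tuples; only the first two elements, name and type code, are consumed (as `rec[0]` / `rec[1]` in `_colnames_from_description` below). An illustrative shape, with made-up type codes:

```python
# (name, type_code, display_size, internal_size, precision, scale, null_ok)
cursor_description = [
    ("id", 4, None, None, None, None, 0),
    ("user_name", 253, None, None, None, None, 1),
]

names = [rec[0] for rec in cursor_description]  # basis for result keys
coltypes = [rec[1] for rec in cursor_description]  # driver type codes
```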
- def _colnames_from_description(self, context, cursor_description):
+ def _colnames_from_description(
+ self, context, cursor_description, driver_column_names
+ ):
"""Extract column names and data types from a cursor.description.
Applies unicode decoding, column translation, "normalization",
and case sensitivity rules to the names based on the dialect.
"""
-
dialect = context.dialect
translate_colname = context._translate_colname
normalize_name = (
dialect.normalize_name if dialect.requires_name_normalize else None
)
- untranslated = None
self._keys = []
+ untranslated = None
+
for idx, rec in enumerate(cursor_description):
- colname = rec[0]
+ colname = unnormalized = rec[0]
coltype = rec[1]
if translate_colname:
+ # a None here for "untranslated" means "the dialect did not
+ # change the column name and the untranslated case can be
+ # ignored". otherwise "untranslated" is expected to be the
+ # original, unchanged colname (i.e. equal to "unnormalized")
colname, untranslated = translate_colname(colname)
+ assert untranslated is None or untranslated == unnormalized
+
if normalize_name:
colname = normalize_name(colname)
- self._keys.append(colname)
+ if driver_column_names:
+ self._keys.append(unnormalized)
- yield idx, colname, untranslated, coltype
+ yield idx, colname, unnormalized, coltype
+
+ else:
+ self._keys.append(colname)
+
+ yield idx, colname, untranslated, coltype
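To make the two per-name hooks above concrete: `translate_colname` follows the contract spelled out in the inline comment and assertion (return the possibly rewritten name plus the original, or `None` if unchanged), and `normalize_name` is the dialect hook used when `requires_name_normalize` is set. Hypothetical stand-ins, not the real dialect implementations:

```python
def translate_colname(colname: str):
    # sketch of the sqlite-style hook mentioned in the history note
    # above: very old SQLite could report "tablename.colname"
    if "." in colname:
        return colname.split(".")[-1], colname
    return colname, None


def normalize_name(colname: str) -> str:
    # sketch: fold case-insensitive all-uppercase names to lowercase,
    # as denormalizing backends such as Oracle require
    return colname.lower() if colname.isupper() else colname
```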
def _merge_textual_cols_by_position(
- self, context, cursor_description, result_columns
+ self, context, cursor_description, result_columns, driver_column_names
):
num_ctx_cols = len(result_columns)
colname,
untranslated,
coltype,
- ) in self._colnames_from_description(context, cursor_description):
+ ) in self._colnames_from_description(
+ context, cursor_description, driver_column_names
+ ):
if idx < num_ctx_cols:
ctx_rec = result_columns[idx]
obj = ctx_rec[RM_OBJECTS]
cursor_description,
result_columns,
loose_column_name_matching,
+ driver_column_names,
):
match_map = self._create_description_match_map(
result_columns, loose_column_name_matching
colname,
untranslated,
coltype,
- ) in self._colnames_from_description(context, cursor_description):
+ ) in self._colnames_from_description(
+ context, cursor_description, driver_column_names
+ ):
try:
ctx_rec = match_map[colname]
except KeyError:
] = {}
for ridx, elem in enumerate(result_columns):
key = elem[RM_RENDERED_NAME]
+
if key in d:
# conflicting keyname - just add the column-linked objects
# to the existing record. if there is a duplicate column
)
return d
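A sketch of the duplicate-key case the conflict branch above handles: two different columns that render under the same bare name, so `match_map` ends up with one record whose column-linked objects cover both (table names invented):

```python
from sqlalchemy import column, select, table

a = table("a", column("x"))
b = table("b", column("x"))

# both columns render as "x", so the description match map gets a
# single "x" entry linked to both a.c.x and b.c.x; lookup by either
# Column object still resolves, while the string key is ambiguous
stmt = select(a.c.x, b.c.x)
```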
- def _merge_cols_by_none(self, context, cursor_description):
+ def _merge_cols_by_none(
+ self, context, cursor_description, driver_column_names
+ ):
for (
idx,
colname,
untranslated,
coltype,
- ) in self._colnames_from_description(context, cursor_description):
+ ) in self._colnames_from_description(
+ context, cursor_description, driver_column_names
+ ):
yield (
idx,
None,
self._metadata = self._no_result_metadata
def _init_metadata(self, context, cursor_description):
+ driver_column_names = context.execution_options.get(
+ "driver_column_names", False
+ )
if context.compiled:
compiled = context.compiled
- if compiled._cached_metadata:
+ metadata: CursorResultMetaData
+
+ if driver_column_names:
+ metadata = CursorResultMetaData(
+ self, cursor_description, driver_column_names=True
+ )
+ assert not metadata._safe_for_cache
+ elif compiled._cached_metadata:
metadata = compiled._cached_metadata
else:
metadata = CursorResultMetaData(self, cursor_description)
else:
self._metadata = metadata = CursorResultMetaData(
- self, cursor_description
+ self,
+ cursor_description,
+ driver_column_names=driver_column_names,
)
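Usage of the option wired in above, as the new test below exercises it; a minimal sketch against a hypothetical Oracle URL (a denormalizing backend, so the raw driver casing shows through):

```python
from sqlalchemy import create_engine, text

engine = create_engine("oracle+cx_oracle://scott:tiger@host/db")  # hypothetical

with engine.connect() as conn:
    result = conn.execute(
        text("SELECT 1 AS abc FROM dual"),
        execution_options={"driver_column_names": True},
    )
    # keys come straight from cursor.description, skipping the
    # dialect's normalize_name step; metadata is never cached here
    print(result.one()._mapping)  # e.g. {'ABC': 1}
```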
if self._echo:
context.connection._log_debug(
from ... import DateTime
from ... import func
from ... import Integer
+from ... import quoted_name
from ... import select
from ... import sql
from ... import String
eq_(row.somelabel, datetime.datetime(2006, 5, 12, 12, 0, 0))
+class NameDenormalizeTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ cls.tables.denormalize_table = Table(
+ "denormalize_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("all_lowercase", Integer),
+ Column("ALL_UPPERCASE", Integer),
+ Column("MixedCase", Integer),
+ Column(quoted_name("all_lowercase_quoted", quote=True), Integer),
+ Column(quoted_name("ALL_UPPERCASE_QUOTED", quote=True), Integer),
+ )
+
+ @classmethod
+ def insert_data(cls, connection):
+ connection.execute(
+ cls.tables.denormalize_table.insert(),
+ {
+ "id": 1,
+ "all_lowercase": 5,
+ "ALL_UPPERCASE": 6,
+ "MixedCase": 7,
+ "all_lowercase_quoted": 8,
+ "ALL_UPPERCASE_QUOTED": 9,
+ },
+ )
+
+ def _assert_row_mapping(self, row, mapping, include_cols=None):
+ eq_(row._mapping, mapping)
+
+ for k in mapping:
+ eq_(row._mapping[k], mapping[k])
+ eq_(getattr(row, k), mapping[k])
+
+ for idx, k in enumerate(mapping):
+ eq_(row[idx], mapping[k])
+
+ if include_cols:
+ for col, k in zip(include_cols, mapping):
+ eq_(row._mapping[col], mapping[k])
+
+ @testing.variation(
+ "stmt_type", ["driver_sql", "text_star", "core_select", "text_cols"]
+ )
+ @testing.variation("use_driver_cols", [True, False])
+ def test_cols_driver_cols(self, connection, stmt_type, use_driver_cols):
+ if stmt_type.driver_sql or stmt_type.text_star or stmt_type.text_cols:
+ stmt = select("*").select_from(self.tables.denormalize_table)
+ text_stmt = str(stmt.compile(connection))
+
+ if stmt_type.text_star or stmt_type.text_cols:
+ stmt = text(text_stmt)
+
+ if stmt_type.text_cols:
+ stmt = stmt.columns(*self.tables.denormalize_table.c)
+ elif stmt_type.core_select:
+ stmt = select(self.tables.denormalize_table)
+ else:
+ stmt_type.fail()
+
+ if use_driver_cols:
+ execution_options = {"driver_column_names": True}
+ else:
+ execution_options = {}
+
+ if stmt_type.driver_sql:
+ row = connection.exec_driver_sql(
+ text_stmt, execution_options=execution_options
+ ).one()
+ else:
+ row = connection.execute(
+ stmt,
+ execution_options=execution_options,
+ ).one()
+
+ if (
+ stmt_type.core_select and not use_driver_cols
+ ) or not testing.requires.denormalized_names.enabled:
+ self._assert_row_mapping(
+ row,
+ {
+ "id": 1,
+ "all_lowercase": 5,
+ "ALL_UPPERCASE": 6,
+ "MixedCase": 7,
+ "all_lowercase_quoted": 8,
+ "ALL_UPPERCASE_QUOTED": 9,
+ },
+ )
+
+ if testing.requires.denormalized_names.enabled:
+ # with driver column names, raw cursor.description
+ # is used. this is clearly not useful for non-quoted names.
+ if use_driver_cols:
+ self._assert_row_mapping(
+ row,
+ {
+ "ID": 1,
+ "ALL_LOWERCASE": 5,
+ "ALL_UPPERCASE": 6,
+ "MixedCase": 7,
+ "all_lowercase_quoted": 8,
+ "ALL_UPPERCASE_QUOTED": 9,
+ },
+ )
+ else:
+ if stmt_type.core_select:
+ self._assert_row_mapping(
+ row,
+ {
+ "id": 1,
+ "all_lowercase": 5,
+ "ALL_UPPERCASE": 6,
+ "MixedCase": 7,
+ "all_lowercase_quoted": 8,
+ "ALL_UPPERCASE_QUOTED": 9,
+ },
+ include_cols=self.tables.denormalize_table.c,
+ )
+ else:
+ self._assert_row_mapping(
+ row,
+ {
+ "id": 1,
+ "all_lowercase": 5,
+ "all_uppercase": 6,
+ "MixedCase": 7,
+ "all_lowercase_quoted": 8,
+ "all_uppercase_quoted": 9,
+ },
+ include_cols=(
+ self.tables.denormalize_table.c
+ if stmt_type.text_cols
+ else None
+ ),
+ )
+
+ else:
+ self._assert_row_mapping(
+ row,
+ {
+ "id": 1,
+ "all_lowercase": 5,
+ "ALL_UPPERCASE": 6,
+ "MixedCase": 7,
+ "all_lowercase_quoted": 8,
+ "ALL_UPPERCASE_QUOTED": 9,
+ },
+ include_cols=(
+ self.tables.denormalize_table.c
+ if stmt_type.core_select or stmt_type.text_cols
+ else None
+ ),
+ )
+
+
class PercentSchemaNamesTest(fixtures.TablesTest):
"""tests using percent signs, spaces in table and column names.