--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 9427
+
+ Fixed issue where :meth:`.Result.freeze` would lose track of ambiguous
+ column names present in the original :class:`.CursorResult`, causing
+ key-based access on the thawed result to silently return a value instead of
+ raising :class:`.InvalidRequestError`. The
+ :class:`.SimpleResultMetaData` now accepts and propagates ambiguous key
+ information so that frozen, thawed, and pickled results raise consistently
+ for duplicate column names. Pull request courtesy Saurabh Kohli.
return key in self._keymap
def _for_freeze(self) -> ResultMetaData:
+ ambiguous = {
+ rec[MD_LOOKUP_KEY]
+ for rec in self._keymap.values()
+ if rec[MD_INDEX] is None
+ }
return SimpleResultMetaData(
self._keys,
extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
+ _ambiguous_keys=frozenset(ambiguous) if ambiguous else None,
)
def _make_new_metadata(
"_translated_indexes",
"_create_unique_filters",
"_key_to_index",
+ "_ambiguous_keys",
)
_keys: Sequence[str]
Sequence[Optional[Callable[[Any], Any]]],
]
] = None,
+ _ambiguous_keys: Optional[frozenset[str]] = None,
):
self._keys = list(keys)
self._tuplefilter = _tuplefilter
self._keymap = {key: rec for keys, rec in recs_names for key in keys}
+ if _ambiguous_keys:
+ for name in _ambiguous_keys.intersection(self._keymap):
+ rec = self._keymap[name]
+ self._keymap[name] = (None,) + rec[1:]
+
self._processors = _processors
+ self._ambiguous_keys = _ambiguous_keys
+
self._key_to_index = self._make_key_to_index(self._keymap, 0)
def _has_key(self, key: object) -> bool:
self._keys,
extra=[self._keymap[key][2] for key in self._keys],
_create_unique_filters=create_unique_filters,
+ _ambiguous_keys=self._ambiguous_keys,
)
def __getstate__(self) -> Dict[str, Any]:
return {
"_keys": self._keys,
"_translated_indexes": self._translated_indexes,
+ "_ambiguous_keys": self._ambiguous_keys,
}
def __setstate__(self, state: Dict[str, Any]) -> None:
state["_keys"],
_translated_indexes=_translated_indexes,
_tuplefilter=_tuplefilter,
+ _ambiguous_keys=state.get("_ambiguous_keys"),
)
def _index_for_key(self, key: Any, raiseerr: bool = True) -> int:
except KeyError as ke:
rec = self._key_fallback(key, ke, raiseerr)
+ if rec[0] is None:
+ self._raise_for_ambiguous_column_name(rec)
return rec[0] # type: ignore[no-any-return]
+ def _raise_for_ambiguous_column_name(
+ self, rec: _KeyMapRecType
+ ) -> NoReturn:
+ raise exc.InvalidRequestError(
+ "Ambiguous column name '%s' in "
+ "result set column descriptions" % rec[1]
+ )
+
def _indexes_for_keys(self, keys: Sequence[Any]) -> Sequence[int]:
+ # only used by the ORM with Column objects; does not need
+ # ambiguous column name support
return [self._keymap[key][0] for key in keys]
def _metadata_for_keys(
except KeyError as ke:
rec = self._key_fallback(key, ke, True)
+ if rec[0] is None:
+ self._raise_for_ambiguous_column_name(rec)
+
yield rec
def _reduce(self, keys: Sequence[Any]) -> ResultMetaData:
r2 = frozen().scalars(1).unique()
eq_(r2.fetchall(), [1, 3])
+ def test_ambiguous_key_raises(self):
+ meta = result.SimpleResultMetaData(
+ ["x", "x"], _ambiguous_keys=frozenset(["x"])
+ )
+ res_obj = result.IteratorResult(meta, iter([(4, 2)]))
+ row = res_obj.fetchone()
+
+ eq_(row[0], 4)
+ eq_(row[1], 2)
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name 'x'",
+ lambda: row._mapping["x"],
+ )
+
class MergeResultTest(fixtures.TestBase):
@testing.fixture
)
@testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column(self, connection):
+ @testing.variation(
+ "scenario", ["plain", "pickled", "frozen", "frozen_pickled"]
+ )
+ def test_ambiguous_column(self, connection, scenario):
users = self.tables.users
addresses = self.tables.addresses
.select()
.set_label_style(LABEL_STYLE_NONE)
)
+
+ if scenario.frozen or scenario.frozen_pickled:
+ frozen = result.freeze()
+ result = frozen()
+
r = result.first()
+ if scenario.pickled or scenario.frozen_pickled:
+ r = pickle.loads(pickle.dumps(r))
+
assert_raises_message(
exc.InvalidRequestError,
"Ambiguous column name",
lambda: r._mapping["user_id"],
)
+ @testing.requires.duplicate_names_in_cursor_description
+ def test_ambiguous_column_getter(self, connection):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
+ result = connection.execute(
+ users.outerjoin(addresses)
+ .select()
+ .set_label_style(LABEL_STYLE_NONE)
+ )
+ r = result.first()
+
assert_raises_message(
exc.InvalidRequestError,
"Ambiguous column name",
eq_(r._mapping[users.c.user_id], 1)
eq_(r._mapping[addresses.c.user_id], None)
- # try to trick it - fake_table isn't in the result!
- # we get the correct error
+ @testing.requires.duplicate_names_in_cursor_description
+ def test_ambiguous_column_fake_table(self, connection):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
+ result = connection.execute(
+ users.outerjoin(addresses)
+ .select()
+ .set_label_style(LABEL_STYLE_NONE)
+ )
+ r = result.first()
+
fake_table = Table("fake", MetaData(), Column("user_id", Integer))
assert_raises_message(
exc.InvalidRequestError,
lambda: r._mapping[fake_table.c.user_id],
)
- r = pickle.loads(pickle.dumps(r))
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r._mapping["user_id"],
- )
-
@testing.requires.duplicate_names_in_cursor_description
def test_ambiguous_column_by_col(self, connection):
users = self.tables.users