--- /dev/null
+.. change::
+ :tags: engine, bug, regression
+ :tickets: 6427
+
+ Established a deprecation path for calling upon the
+ :meth:`_cursor.CursorResult.keys` method for a statement that returns no
+ rows to provide support for legacy patterns used by the "records" package
+ as well as any other non-migrated applications. Previously, this would
+ raise :class:`.ResourceClosedException` unconditionally in the same way as
+ it does when attempting to fetch rows. While this is the correct behavior
+ going forward, the :class:`_cursor.LegacyCursorResult` object will now in
+ this case return an empty list for ``.keys()`` as it did in 1.3, while also
+ emitting a 2.0 deprecation warning. The :class:`_cursor.CursorResult`, used
+ when using a 2.0-style "future" engine, will continue to raise as it does
+ now.
self._we_dont_return_rows()
+class _LegacyNoResultMetaData(_NoResultMetaData):
+ @property
+ def keys(self):
+ util.warn_deprecated_20(
+ "Calling the .keys() method on a result set that does not return "
+ "rows is deprecated and will raise ResourceClosedError in "
+ "SQLAlchemy 2.0.",
+ )
+ return []
+
+
_NO_RESULT_METADATA = _NoResultMetaData()
+_LEGACY_NO_RESULT_METADATA = _LegacyNoResultMetaData()
class BaseCursorResult(object):
self._set_memoized_attribute("_row_getter", make_row)
else:
- self._metadata = _NO_RESULT_METADATA
+ self._metadata = self._no_result_metadata
def _init_metadata(self, context, cursor_description):
_cursor_metadata = CursorResultMetaData
_cursor_strategy_cls = CursorFetchStrategy
+ _no_result_metadata = _NO_RESULT_METADATA
def _fetchiter_impl(self):
fetchone = self.cursor_strategy.fetchone
_cursor_metadata = LegacyCursorResultMetaData
_cursor_strategy_cls = CursorFetchStrategy
+ _no_result_metadata = _LEGACY_NO_RESULT_METADATA
+
def close(self):
"""Close this :class:`_engine.LegacyCursorResult`.
"This result object does not return rows.",
result.fetchone,
)
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows.",
- result.keys,
- )
+
+ with testing.expect_deprecated_20(
+ r"Calling the .keys\(\) method on a result set that does not "
+ ):
+ eq_(result.keys(), [])
class CursorResultTest(fixtures.TablesTest):
):
eq_(r._mapping[users.c.user_name], "jack")
+ def test_keys_no_rows(self, connection):
+
+ for i in range(2):
+ r = connection.execute(
+ text("update users set user_name='new' where user_id=10")
+ )
+
+ with testing.expect_deprecated(
+ r"Calling the .keys\(\) method on a result set that does not "
+ r"return rows is deprecated and will raise "
+ r"ResourceClosedError in SQLAlchemy 2.0."
+ ):
+ list_ = r.keys()
+ eq_(list_, [])
+ list_.append("Don't cache me")
+
def test_column_accessor_basic_text(self, connection):
users = self.tables.users
lambda: r._mapping["foo"],
)
- def test_graceful_fetch_on_non_rows(self):
+ @testing.combinations(
+ (True,),
+ (False,),
+ argnames="future",
+ )
+ def test_graceful_fetch_on_non_rows(self, future):
"""test that calling fetchone() etc. on a result that doesn't
return rows fails gracefully.
users = self.tables.users
conn = testing.db.connect()
+ if future:
+ conn = conn.execution_options(future_result=True)
+ keys_lambda = lambda r: r.keys() # noqa: E731
+
for meth in [
lambda r: r.fetchone(),
lambda r: r.fetchall(),
lambda r: r.scalar(),
lambda r: r.fetchmany(),
lambda r: r._getter("user"),
- lambda r: r.keys(),
+ keys_lambda,
lambda r: r.columns("user"),
lambda r: r.cursor_strategy.fetchone(r, r.cursor),
]:
trans = conn.begin()
result = conn.execute(users.insert(), dict(user_id=1))
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows. "
- "It has been closed automatically.",
- meth,
- result,
- )
+
+ if not future and meth is keys_lambda:
+ with testing.expect_deprecated(
+ r"Calling the .keys\(\) method on a result set that does "
+ r"not return rows is deprecated"
+ ):
+ eq_(meth(result), [])
+ else:
+ assert_raises_message(
+ exc.ResourceClosedError,
+ "This result object does not return rows. "
+ "It has been closed automatically.",
+ meth,
+ result,
+ )
trans.rollback()
def test_fetchone_til_end(self, connection):