def deprecated(
- version, message=None, add_deprecation_to_docstring=True, warning=None
+ version,
+ message=None,
+ add_deprecation_to_docstring=True,
+ warning=None,
+ enable_warnings=True,
):
"""Decorates a function and issues a deprecation warning on use.
def decorate(fn):
return _decorate_with_warning(
- fn, warning, message % dict(func=fn.__name__), version, header
+ fn,
+ warning,
+ message % dict(func=fn.__name__),
+ version,
+ header,
+ enable_warnings=enable_warnings,
)
return decorate
)
)
+ if ":attr:" in api_name:
+ attribute_ok = kw.pop("warn_on_attribute_access", False)
+ if not attribute_ok:
+ assert kw.get("enable_warnings") is False, (
+ "attribute %s will emit a warning on read access. "
+ "If you *really* want this, "
+ "add warn_on_attribute_access=True. Otherwise please add "
+ "enable_warnings=False." % api_name
+ )
+
if alternative:
message += " " + alternative
def _decorate_with_warning(
- func, wtype, message, version, docstring_header=None
+ func, wtype, message, version, docstring_header=None, enable_warnings=True
):
"""Wrap a function with a warnings.warn and augmented docstring."""
@decorator
def warned(fn, *args, **kwargs):
- skip_warning = kwargs.pop("_sa_skip_warning", False)
+ skip_warning = not enable_warnings or kwargs.pop(
+ "_sa_skip_warning", False
+ )
if not skip_warning:
_warn_with_version(message, version, wtype, stacklevel=3)
return fn(*args, **kwargs)
mapper(User, users)
s = create_session(bind=c)
s.begin()
- tran = s.transaction
+ tran = s._legacy_transaction()
s.add(User(name="first"))
s.flush()
c.exec_driver_sql("select * from users")
u = User(name="third")
s.add(u)
s.flush()
- assert s.transaction is tran
+ assert s._legacy_transaction() is tran
tran.close()
finally:
c.close()
sess.add(User(name="u2"))
t2.commit()
- assert sess.transaction is t1
+ assert sess._legacy_transaction() is t1
sess.close()
mapper(User, users)
session = create_session(autocommit=False)
session.add(User(name="ed"))
- session.transaction.commit()
+ session._legacy_transaction().commit()
- is_not(session.transaction, None)
+ is_not(session._legacy_transaction(), None)
def test_no_autocommit_with_explicit_commit_future(self):
User, users = self.classes.User, self.tables.users
mapper(User, users)
session = create_session(testing.db, autocommit=False, future=True)
session.add(User(name="ed"))
- session.transaction.commit()
+ session._legacy_transaction().commit()
# new in 1.4
- is_(session.transaction, None)
+ is_(session._legacy_transaction(), None)
@testing.requires.python2
@testing.requires.savepoints_w_release
sess.add(User(name="u2"))
t2.commit()
- assert sess.transaction is t1
+ assert sess._legacy_transaction() is t1
sess.close()
try:
with subtransaction_recipe(sess):
- assert sess.transaction
+ assert sess._legacy_transaction()
raise Exception("force rollback")
except:
pass
nested_trans = trans._connections[self.bind][1]
nested_trans._do_commit()
- is_(s.transaction, trans)
+ is_(s._legacy_transaction(), trans)
with expect_warnings("nested transaction already deassociated"):
# this previously would raise
assert u1 not in s.new
is_(trans._state, _session.CLOSED)
- is_not(s.transaction, trans)
- is_(s.transaction._state, _session.ACTIVE)
+ is_not(s._legacy_transaction(), trans)
+ is_(s._legacy_transaction()._state, _session.ACTIVE)
- is_(s.transaction.nested, False)
+ is_(s._legacy_transaction().nested, False)
- is_(s.transaction._parent, None)
+ is_(s._legacy_transaction()._parent, None)
class AccountingFlagsTest(_LocalFixture):
def test_explicit_begin(self):
s1 = Session(testing.db)
with s1.begin() as trans:
- is_(trans, s1.transaction)
+ is_(trans, s1._legacy_transaction())
s1.connection()
is_(s1._transaction, None)
# rolls back the whole transaction
s1.rollback()
- is_(s1.transaction, None)
+ is_(s1._legacy_transaction(), None)
eq_(s1.connection().scalar(select(func.count()).select_from(users)), 0)
s1.commit()
- is_(s1.transaction, None)
+ is_(s1._legacy_transaction(), None)
@testing.requires.savepoints
def test_old_rollback_is_local(self):
# rolls back only the savepoint
s1.rollback()
- is_(s1.transaction, t1)
+ is_(s1._legacy_transaction(), t1)
eq_(s1.connection().scalar(select(func.count()).select_from(users)), 1)
s1.commit()
eq_(s1.connection().scalar(select(func.count()).select_from(users)), 1)
- is_not(s1.transaction, None)
+ is_not(s1._legacy_transaction(), None)
def test_session_as_ctx_manager_one(self):
users = self.tables.users
with Session(testing.db) as sess:
- is_not(sess.transaction, None)
+ is_not(sess._legacy_transaction(), None)
sess.connection().execute(
users.insert().values(id=1, name="user1")
sess.connection().execute(users.select()).all(), [(1, "user1")]
)
- is_not(sess.transaction, None)
+ is_not(sess._legacy_transaction(), None)
- is_not(sess.transaction, None)
+ is_not(sess._legacy_transaction(), None)
# did not commit
eq_(sess.connection().execute(users.select()).all(), [])
users = self.tables.users
with Session(testing.db, future=True) as sess:
- is_(sess.transaction, None)
+ is_(sess._legacy_transaction(), None)
sess.connection().execute(
users.insert().values(id=1, name="user1")
sess.connection().execute(users.select()).all(), [(1, "user1")]
)
- is_not(sess.transaction, None)
+ is_not(sess._legacy_transaction(), None)
- is_(sess.transaction, None)
+ is_(sess._legacy_transaction(), None)
# did not commit
eq_(sess.connection().execute(users.select()).all(), [])
try:
with Session(testing.db) as sess:
- is_not(sess.transaction, None)
+ is_not(sess._legacy_transaction(), None)
sess.connection().execute(
users.insert().values(id=1, name="user1")
raise Exception("force rollback")
except:
pass
- is_not(sess.transaction, None)
+ is_not(sess._legacy_transaction(), None)
def test_session_as_ctx_manager_two_future(self):
users = self.tables.users
try:
with Session(testing.db, future=True) as sess:
- is_(sess.transaction, None)
+ is_(sess._legacy_transaction(), None)
sess.connection().execute(
users.insert().values(id=1, name="user1")
raise Exception("force rollback")
except:
pass
- is_(sess.transaction, None)
+ is_(sess._legacy_transaction(), None)
def test_begin_context_manager(self):
users = self.tables.users
assert "t" not in inspect(testing.db).get_table_names()
+ def test_bind_arg_text(self):
+ with testing.expect_deprecated_20(
+ "The text.bind argument is deprecated and will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ t1 = text("ASdf", bind=testing.db)
+
+ # no warnings emitted
+ is_(t1.bind, testing.db)
+ eq_(str(t1), "ASdf")
+
+ def test_bind_arg_function(self):
+ with testing.expect_deprecated_20(
+ "The text.bind argument is deprecated and will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ f1 = func.foobar(bind=testing.db)
+
+ # no warnings emitted
+ is_(f1.bind, testing.db)
+ eq_(str(f1), "foobar()")
+
+ def test_bind_arg_select(self):
+ with testing.expect_deprecated_20(
+ "The select.bind argument is deprecated and will be "
+ "removed in SQLAlchemy 2.0."
+ ):
+ s1 = select([column("q")], bind=testing.db)
+
+ # no warnings emitted
+ is_(s1.bind, testing.db)
+ eq_(str(s1), "SELECT q")
+
+ def test_bind_attr_join_no_warning(self):
+ t1 = table("t1", column("a"))
+ t2 = table("t2", column("b"))
+ j1 = join(t1, t2, t1.c.a == t2.c.b)
+
+ # no warnings emitted
+ is_(j1.bind, None)
+ eq_(str(j1), "t1 JOIN t2 ON t1.a = t2.b")
+
class DeprecationWarningsTest(fixtures.TestBase, AssertsCompiledSQL):
__backend__ = True