--- /dev/null
+.. change::
+ :tags: bug, sql
+ :tickets: 5754
+
+ Deprecation warnings are emitted under "SQLALCHEMY_WARN_20" mode when
+ passing a plain string to :meth:`_orm.Session.execute`.
+
_coerce_consts = True
def _text_coercion(self, element, argname=None):
+ # see #5754 for why we can't easily deprecate this coercion.
+ # essentially expressions like postgresql_where would have to be
+ # text() as they come back from reflection and we don't want to
+ # have text() elements wired into the inspection dictionaries.
return elements.TextClause(element)
class CoerceTextStatementImpl(_CoerceLiterals, RoleImpl):
__slots__ = ()
- def _dont_literal_coercion(self, element, **kw):
- if callable(element) and hasattr(element, "__code__"):
- return lambdas.StatementLambdaElement(
- element,
- self._role_class,
- additional_cache_criteria=kw.get(
- "additional_cache_criteria", ()
- ),
- tracked=kw["tra"],
- )
- else:
- return super(CoerceTextStatementImpl, self)._literal_coercion(
- element, **kw
- )
-
def _implicit_coercions(
self, original_element, resolved, argname=None, **kw
):
)
def _text_coercion(self, element, argname=None):
- # TODO: this should emit deprecation warning,
- # see deprecation warning in engine/base.py execute()
+ util.warn_deprecated_20(
+ "Using plain strings to indicate SQL statements without using "
+ "the text() construct is "
+ "deprecated and will be removed in version 2.0. Ensure plain "
+ "SQL statements are passed using the text() construct."
+ )
return elements.TextClause(element)
idx3 = Index(
"test_idx2",
tbl.c.data,
- postgresql_where="data > 'a' AND data < 'b''s'",
+ postgresql_where=text("data > 'a' AND data < 'b''s'"),
)
self.assert_compile(
schema.CreateIndex(idx3),
"""
from sqlalchemy import testing
+from sqlalchemy import text
from sqlalchemy.orm import attributes
from sqlalchemy.orm import backref
from sqlalchemy.orm import mapper
eq_(i1.keywords, [k1, k2])
# prove it didn't flush
- eq_(session.scalar("select count(*) from item_keywords"), 1)
+ eq_(session.scalar(text("select count(*) from item_keywords")), 1)
# the pending collection was removed
assert (
):
s1.transaction
+ def test_textual_execute(self, connection):
+ """test that Session.execute() converts to text()"""
+
+ users = self.tables.users
+
+ with Session(bind=connection) as sess:
+ sess.execute(users.insert(), dict(id=7, name="jack"))
+
+ with testing.expect_deprecated_20(
+ "Using plain strings to indicate SQL statements "
+ "without using the text"
+ ):
+ # use :bindparam style
+ eq_(
+ sess.execute(
+ "select * from users where id=:id", {"id": 7}
+ ).fetchall(),
+ [(7, "jack")],
+ )
+
+ with testing.expect_deprecated_20(
+ "Using plain strings to indicate SQL statements "
+ "without using the text"
+ ):
+ # use :bindparam style
+ eq_(sess.scalar("select id from users where id=:id", {"id": 7}), 7)
+
def test_session_str(self):
s1 = Session(testing.db)
str(s1)
p1.place_id
p1.transitions
- sess.execute("delete from place_input", mapper=Place)
+ sess.execute(place_input.delete(), mapper=Place)
p1.place_id = 7
assert_raises_message(
p1.place_id
p1.transitions
- sess.execute("delete from place_input", mapper=Place)
+ sess.execute(place_input.delete(), mapper=Place)
p1.transitions.remove(t1)
assert_raises_message(
orm_exc.StaleDataError,
from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import testing
+from sqlalchemy import text
from sqlalchemy.orm import attributes
from sqlalchemy.orm import backref
from sqlalchemy.orm import close_all_sessions
finally:
seq.drop(connection)
- def test_textual_execute(self, connection):
- """test that Session.execute() converts to text()"""
-
- users = self.tables.users
-
- with Session(bind=connection) as sess:
- sess.execute(users.insert(), dict(id=7, name="jack"))
-
- # use :bindparam style
- eq_(
- sess.execute(
- "select * from users where id=:id", {"id": 7}
- ).fetchall(),
- [(7, "jack")],
- )
-
- # use :bindparam style
- eq_(sess.scalar("select id from users where id=:id", {"id": 7}), 7)
def test_parameter_execute(self):
users = self.tables.users
User, users = self.classes.User, self.tables.users
mapper(User, users)
- try:
- sess = create_session(autocommit=False, autoflush=True)
+ with create_session(autocommit=False, autoflush=True) as sess:
u = User()
u.name = "ed"
sess.add(u)
u2 = sess.query(User).filter_by(name="ed").one()
- assert u2 is u
- assert (
+ is_(u2, u)
+ eq_(
sess.execute(
- "select count(1) from users",
+ text("select count(1) from users"),
bind_arguments=dict(mapper=User),
- ).scalar()
- == 1
+ ).scalar(),
+ 1,
)
- assert (
+ eq_(
testing.db.connect()
.exec_driver_sql("select count(1) from users")
- .scalar()
- == 0
+ .scalar(),
+ 0,
)
sess.commit()
- assert (
+ eq_(
sess.execute(
- "select count(1) from users",
+ text("select count(1) from users"),
bind_arguments=dict(mapper=User),
- ).scalar()
- == 1
+ ).scalar(),
+ 1,
)
- assert (
+ eq_(
testing.db.connect()
.exec_driver_sql("select count(1) from users")
- .scalar()
- == 1
+ .scalar(),
+ 1,
)
- sess.close()
- except Exception:
- sess.rollback()
- raise
@engines.close_open_connections
def test_autoflush_2(self):
raises_("connection", bind_arguments=dict(mapper=user_arg))
- raises_("execute", "SELECT 1", bind_arguments=dict(mapper=user_arg))
+ raises_(
+ "execute", text("SELECT 1"), bind_arguments=dict(mapper=user_arg)
+ )
raises_("get_bind", mapper=user_arg)
- raises_("scalar", "SELECT 1", bind_arguments=dict(mapper=user_arg))
+ raises_(
+ "scalar", text("SELECT 1"), bind_arguments=dict(mapper=user_arg)
+ )
eq_(
watchdog,
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
+from sqlalchemy import text
from sqlalchemy.future import Engine
from sqlalchemy.orm import attributes
from sqlalchemy.orm import create_session
s1 = Session(eng)
- assert_raises_message(Exception, "failure", s1.execute, "select 1")
+ assert_raises_message(
+ Exception, "failure", s1.execute, text("select 1")
+ )
conn, fairy = state[0]
assert not fairy.is_valid
s1 = Session(eng)
s1.begin_nested()
- assert_raises_message(Exception, "failure", s1.execute, "select 1")
+ assert_raises_message(
+ Exception, "failure", s1.execute, text("select 1")
+ )
conn, fairy = state[0]
assert fairy.is_valid
@event.listens_for(sess, "after_commit")
def go(session):
- session.execute("select 1")
+ session.execute(text("select 1"))
assert_raises_message(
sa_exc.InvalidRequestError,
"This session is in 'prepared' state; no further "
"SQL can be emitted within this transaction.",
sess.execute,
- "select 1",
+ text("select 1"),
)
def test_no_sql_during_rollback(self):
@event.listens_for(sess, "after_rollback")
def go(session):
- session.execute("select 1")
+ session.execute(text("select 1"))
assert_raises_message(
sa_exc.InvalidRequestError,
conn = mock.Mock(engine=bind)
bind.connect = mock.Mock(return_value=conn)
sess = Session(bind=bind)
- sess.execute("select 1")
+ sess.execute(text("select 1"))
with expect_warnings(
"Connection is already established for the "
"given bind; execution_options ignored"
)
def test_statement_text_coercion(self):
- is_true(
- expect(
- roles.CoerceTextStatementRole, "select * from table"
- ).compare(text("select * from table"))
- )
+ with testing.expect_deprecated_20(
+ "Using plain strings to indicate SQL statements"
+ ):
+ is_true(
+ expect(
+ roles.CoerceTextStatementRole, "select * from table"
+ ).compare(text("select * from table"))
+ )
def test_select_statement_no_text_coercion(self):
assert_raises_message(