in particular text(bind), DDL.execute().
Change-Id: Ie85ae9f61219182f5649f68e5f52b4923843199c
"_expression": "sqlalchemy.sql.expression",
"_sql": "sqlalchemy.sql.expression",
"_dml": "sqlalchemy.sql.expression",
+ "_ddl": "sqlalchemy.schema",
"_functions": "sqlalchemy.sql.functions",
"_pool": "sqlalchemy.pool",
"_event": "sqlalchemy.event",
:param statement: :class:`_expression.ClauseElement` to be compiled.
- :param bind: Optional Engine or Connection to compile this
- statement against.
-
:param schema_translate_map: dictionary of schema names to be
translated when forming the resultant SQL
self, multiparams, params, execution_options
)
+ @util.deprecated_20(
+ ":meth:`.DDL.execute`",
+ alternative="All statement execution in SQLAlchemy 2.0 is performed "
+ "by the :meth:`_engine.Connection.execute` method of "
+ ":class:`_engine.Connection`, "
+ "or in the ORM by the :meth:`.Session.execute` method of "
+ ":class:`.Session`.",
+ )
def execute(self, bind=None, target=None):
"""Execute this DDL immediately.
@_generative
def execute_if(self, dialect=None, callable_=None, state=None):
r"""Return a callable that will execute this
- DDLElement conditionally.
+ :class:`_ddl.DDLElement` conditionally within an event handler.
Used to provide a wrapper for event listening::
__visit_name__ = "ddl"
+ @util.deprecated_params(
+ bind=(
+ "2.0",
+ "The :paramref:`_ddl.DDL.bind` argument is deprecated and "
+ "will be removed in SQLAlchemy 2.0.",
+ ),
+ )
def __init__(self, statement, context=None, bind=None):
"""Create a DDL statement.
@classmethod
@_document_text_coercion("text", ":func:`.text`", ":paramref:`.text.text`")
+ @util.deprecated_params(
+ bind=(
+ "2.0",
+ "The :paramref:`_sql.text.bind` argument is deprecated and "
+ "will be removed in SQLAlchemy 2.0.",
+ ),
+ )
def _create_text(cls, text, bind=None):
r"""Construct a new :class:`_expression.TextClause` clause,
representing
("type", InternalTraversal.dp_type),
]
+ @util.deprecated_params(
+ bind=(
+ "2.0",
+ "The :paramref:`_sql.text.bind` argument is deprecated and "
+ "will be removed in SQLAlchemy 2.0.",
+ ),
+ )
def __init__(self, name, *clauses, **kw):
"""Construct a :class:`.Function`.
"""
self.packagenames = kw.pop("packagenames", None) or ()
self.name = name
- self._bind = kw.get("bind", None)
+
+ self._bind = self._get_bind(kw)
self.type = sqltypes.to_instance(kw.get("type_", None))
FunctionElement.__init__(self, *clauses, **kw)
+ def _get_bind(self, kw):
+ if "bind" in kw:
+ util.warn_deprecated_20(
+ "The Function.bind argument is deprecated and "
+ "will be removed in SQLAlchemy 2.0.",
+ )
+ return kw["bind"]
+
def _bind_param(self, operator, obj, type_=None):
return BindParameter(
self.name,
]
self._has_args = self._has_args or bool(parsed_args)
self.packagenames = ()
- self._bind = kwargs.get("bind", None)
+ self._bind = self._get_bind(kwargs)
self.clause_expr = ClauseList(
operator=operators.comma_op, group_contents=True, *parsed_args
).self_group()
assert isinstance(
seq, schema.Sequence
), "next_value() accepts a Sequence object as input."
- self._bind = kw.get("bind", None)
+ self._bind = self._get_bind(kw)
self.sequence = seq
def compare(self, other, **kw):
A :class:`.Connectable` used to access the database; if None, uses
the existing bind on this ``MetaData``, if any.
+ .. note:: the "bind" argument will be required in
+ SQLAlchemy 2.0.
+
:param schema:
Optional, query and reflect tables from an alternate schema.
If None, the schema associated with this :class:`_schema.MetaData`
_fetch_clause_options = None
_for_update_arg = None
+ @util.deprecated_params(
+ bind=(
+ "2.0",
+ "The :paramref:`_sql.select.bind` argument is deprecated and "
+ "will be removed in SQLAlchemy 2.0.",
+ ),
+ )
def __init__(
self,
_label_style=LABEL_STYLE_NONE,
r"The MetaData.bind argument is deprecated",
r"The ``bind`` argument for schema methods that invoke SQL ",
r"The Executable.bind attribute is considered legacy ",
+ r"The Function.bind argument",
+ r"The select.bind argument",
#
# result sets
#
+++ /dev/null
-"""tests the "bind" attribute/argument across schema and SQL,
-including the deprecated versions of these arguments"""
-
-import sqlalchemy as sa
-from sqlalchemy import engine
-from sqlalchemy import exc
-from sqlalchemy import inspect
-from sqlalchemy import Integer
-from sqlalchemy import MetaData
-from sqlalchemy import testing
-from sqlalchemy import text
-from sqlalchemy import ThreadLocalMetaData
-from sqlalchemy.testing import assert_raises
-from sqlalchemy.testing import assert_raises_message
-from sqlalchemy.testing import fixtures
-from sqlalchemy.testing import is_false
-from sqlalchemy.testing import is_true
-from sqlalchemy.testing.schema import Column
-from sqlalchemy.testing.schema import Table
-
-
-class BindTest(fixtures.TestBase):
- def test_bind_close_engine(self):
- e = testing.db
- with e.connect() as conn:
- assert not conn.closed
- assert conn.closed
-
- def test_create_drop_explicit(self):
- metadata = MetaData()
- table = Table("test_table", metadata, Column("foo", Integer))
- for bind in (testing.db, testing.db.connect()):
- for args in [([], {"bind": bind}), ([bind], {})]:
- metadata.create_all(*args[0], **args[1])
- is_true(inspect(bind).has_table(table.name))
- metadata.drop_all(*args[0], **args[1])
- table.create(*args[0], **args[1])
- table.drop(*args[0], **args[1])
- is_false(inspect(bind).has_table(table.name))
-
- def test_create_drop_err_metadata(self):
- metadata = MetaData()
- Table("test_table", metadata, Column("foo", Integer))
- for meth in [metadata.create_all, metadata.drop_all]:
- assert_raises_message(
- exc.UnboundExecutionError,
- "MetaData object is not bound to an Engine or Connection.",
- meth,
- )
-
- def test_create_drop_err_table(self):
- metadata = MetaData()
- table = Table("test_table", metadata, Column("foo", Integer))
-
- for meth in [table.create, table.drop]:
- assert_raises_message(
- exc.UnboundExecutionError,
- (
- "Table object 'test_table' is not bound to an Engine or "
- "Connection."
- ),
- meth,
- )
-
- @testing.uses_deprecated()
- def test_create_drop_bound(self):
-
- for meta in (MetaData, ThreadLocalMetaData):
- for bind in (testing.db, testing.db.connect()):
- metadata = meta()
- table = Table("test_table", metadata, Column("foo", Integer))
- metadata.bind = bind
- assert metadata.bind is table.bind is bind
- metadata.create_all()
- assert table.exists()
- metadata.drop_all()
- table.create()
- table.drop()
- assert not table.exists()
-
- metadata = meta()
- table = Table("test_table", metadata, Column("foo", Integer))
-
- metadata.bind = bind
-
- assert metadata.bind is table.bind is bind
- metadata.create_all()
- assert table.exists()
- metadata.drop_all()
- table.create()
- table.drop()
- assert not table.exists()
- if isinstance(bind, engine.Connection):
- bind.close()
-
- def test_create_drop_constructor_bound(self):
- for bind in (testing.db, testing.db.connect()):
- try:
- for args in (([bind], {}), ([], {"bind": bind})):
- metadata = MetaData(*args[0], **args[1])
- table = Table(
- "test_table", metadata, Column("foo", Integer)
- )
- assert metadata.bind is table.bind is bind
- metadata.create_all()
- is_true(inspect(bind).has_table(table.name))
- metadata.drop_all()
- table.create()
- table.drop()
- is_false(inspect(bind).has_table(table.name))
- finally:
- if isinstance(bind, engine.Connection):
- bind.close()
-
- def test_implicit_execution(self):
- metadata = MetaData()
- table = Table(
- "test_table",
- metadata,
- Column("foo", Integer),
- test_needs_acid=True,
- )
- conn = testing.db.connect()
- metadata.create_all(bind=conn)
- try:
- trans = conn.begin()
- metadata.bind = conn
- t = table.insert()
- assert t.bind is conn
- table.insert().execute(foo=5)
- table.insert().execute(foo=6)
- table.insert().execute(foo=7)
- trans.rollback()
- metadata.bind = None
- assert (
- conn.exec_driver_sql(
- "select count(*) from test_table"
- ).scalar()
- == 0
- )
- finally:
- metadata.drop_all(bind=conn)
-
- def test_clauseelement(self):
- metadata = MetaData()
- table = Table("test_table", metadata, Column("foo", Integer))
- metadata.create_all(bind=testing.db)
- try:
- for elem in [
- table.select,
- lambda **kwargs: sa.func.current_timestamp(**kwargs).select(),
- # func.current_timestamp().select,
- lambda **kwargs: text("select * from test_table", **kwargs),
- ]:
- for bind in (testing.db, testing.db.connect()):
- try:
- e = elem(bind=bind)
- assert e.bind is bind
- e.execute().close()
- finally:
- if isinstance(bind, engine.Connection):
- bind.close()
-
- e = elem()
- assert e.bind is None
- assert_raises(exc.UnboundExecutionError, e.execute)
- finally:
- if isinstance(bind, engine.Connection):
- bind.close()
- metadata.drop_all(bind=testing.db)
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.schema import DDL
from sqlalchemy.schema import DropConstraint
-from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
ddl = DDL("SELECT 1")
for spec in (
- (engine.execute, ddl),
- (engine.execute, ddl, table),
(cx.execute, ddl),
(cx.execute, ddl, table),
- (ddl.execute, engine),
- (ddl.execute, engine, table),
- (ddl.execute, cx),
- (ddl.execute, cx, table),
):
fn = spec[0]
arg = spec[1:]
r = fn(*arg)
eq_(list(r), [(1,)])
- for fn, kw in ((ddl.execute, {}), (ddl.execute, dict(target=table))):
- assert_raises(tsa.exc.UnboundExecutionError, fn, **kw)
-
- for bind in engine, cx:
- ddl.bind = bind
- for fn, kw in (
- (ddl.execute, {}),
- (ddl.execute, dict(target=table)),
- ):
- r = fn(**kw)
- eq_(list(r), [(1,)])
-
def test_platform_escape(self):
"""test the escaping of % characters in the DDL construct."""
import re
import sqlalchemy as tsa
+import sqlalchemy as sa
from sqlalchemy import create_engine
+from sqlalchemy import DDL
+from sqlalchemy import engine
from sqlalchemy import event
+from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
+from sqlalchemy import ThreadLocalMetaData
from sqlalchemy import VARCHAR
from sqlalchemy.engine import reflection
from sqlalchemy.engine.base import Connection
with inspector._operation_context() as conn:
is_instance_of(conn, Connection)
+ def test_bind_close_engine(self):
+ e = testing.db
+ with e.connect() as conn:
+ assert not conn.closed
+ assert conn.closed
+
+ def test_bind_create_drop_err_metadata(self):
+ metadata = MetaData()
+ Table("test_table", metadata, Column("foo", Integer))
+ for meth in [metadata.create_all, metadata.drop_all]:
+ assert_raises_message(
+ exc.UnboundExecutionError,
+ "MetaData object is not bound to an Engine or Connection.",
+ meth,
+ )
+
+ def test_bind_create_drop_err_table(self):
+ metadata = MetaData()
+ table = Table("test_table", metadata, Column("foo", Integer))
+
+ for meth in [table.create, table.drop]:
+ assert_raises_message(
+ exc.UnboundExecutionError,
+ (
+ "Table object 'test_table' is not bound to an Engine or "
+ "Connection."
+ ),
+ meth,
+ )
+
+ def test_bind_create_drop_bound(self):
+
+ for meta in (MetaData, ThreadLocalMetaData):
+ for bind in (testing.db, testing.db.connect()):
+ if meta is ThreadLocalMetaData:
+ with testing.expect_deprecated(
+ "ThreadLocalMetaData is deprecated"
+ ):
+ metadata = meta()
+ else:
+ metadata = meta()
+ table = Table("test_table", metadata, Column("foo", Integer))
+ metadata.bind = bind
+ assert metadata.bind is table.bind is bind
+ metadata.create_all()
+
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated and will "
+ "be removed in a future release."
+ ):
+ assert table.exists()
+ metadata.drop_all()
+ table.create()
+ table.drop()
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated and will "
+ "be removed in a future release."
+ ):
+ assert not table.exists()
+
+ if meta is ThreadLocalMetaData:
+ with testing.expect_deprecated(
+ "ThreadLocalMetaData is deprecated"
+ ):
+ metadata = meta()
+ else:
+ metadata = meta()
+
+ table = Table("test_table", metadata, Column("foo", Integer))
+
+ metadata.bind = bind
+
+ assert metadata.bind is table.bind is bind
+ metadata.create_all()
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated and will "
+ "be removed in a future release."
+ ):
+ assert table.exists()
+ metadata.drop_all()
+ table.create()
+ table.drop()
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated and will "
+ "be removed in a future release."
+ ):
+ assert not table.exists()
+ if isinstance(bind, engine.Connection):
+ bind.close()
+
+ def test_bind_create_drop_constructor_bound(self):
+ for bind in (testing.db, testing.db.connect()):
+ try:
+ for args in (([bind], {}), ([], {"bind": bind})):
+ metadata = MetaData(*args[0], **args[1])
+ table = Table(
+ "test_table", metadata, Column("foo", Integer)
+ )
+ assert metadata.bind is table.bind is bind
+ metadata.create_all()
+ is_true(inspect(bind).has_table(table.name))
+ metadata.drop_all()
+ table.create()
+ table.drop()
+ is_false(inspect(bind).has_table(table.name))
+ finally:
+ if isinstance(bind, engine.Connection):
+ bind.close()
+
+ def test_bind_implicit_execution(self):
+ metadata = MetaData()
+ table = Table(
+ "test_table",
+ metadata,
+ Column("foo", Integer),
+ test_needs_acid=True,
+ )
+ conn = testing.db.connect()
+ metadata.create_all(bind=conn)
+ try:
+ trans = conn.begin()
+ metadata.bind = conn
+ t = table.insert()
+ assert t.bind is conn
+ table.insert().execute(foo=5)
+ table.insert().execute(foo=6)
+ table.insert().execute(foo=7)
+ trans.rollback()
+ metadata.bind = None
+ assert (
+ conn.exec_driver_sql(
+ "select count(*) from test_table"
+ ).scalar()
+ == 0
+ )
+ finally:
+ metadata.drop_all(bind=conn)
+
+ def test_bind_clauseelement(self):
+ metadata = MetaData()
+ table = Table("test_table", metadata, Column("foo", Integer))
+ metadata.create_all(bind=testing.db)
+ try:
+ for elem in [
+ table.select,
+ lambda **kwargs: sa.func.current_timestamp(**kwargs).select(),
+ # func.current_timestamp().select,
+ lambda **kwargs: text("select * from test_table", **kwargs),
+ ]:
+ for bind in (testing.db, testing.db.connect()):
+ try:
+ with testing.expect_deprecated_20(
+ "The .*bind argument is deprecated"
+ ):
+ e = elem(bind=bind)
+ assert e.bind is bind
+ e.execute().close()
+ finally:
+ if isinstance(bind, engine.Connection):
+ bind.close()
+
+ e = elem()
+ assert e.bind is None
+ assert_raises(exc.UnboundExecutionError, e.execute)
+ finally:
+ if isinstance(bind, engine.Connection):
+ bind.close()
+ metadata.drop_all(bind=testing.db)
+
def test_inspector_constructor_engine(self):
with testing.expect_deprecated(
r"The __init__\(\) method on Inspector is deprecated and will "
r"\"ConnectionEvents.after_execute\" event listener",
):
e1.execute(select(1))
+
+
+class DDLExecutionTest(fixtures.TestBase):
+ def setup(self):
+ self.engine = engines.mock_engine()
+ self.metadata = MetaData(self.engine)
+ self.users = Table(
+ "users",
+ self.metadata,
+ Column("user_id", Integer, primary_key=True),
+ Column("user_name", String(40)),
+ )
+
+ @testing.requires.sqlite
+ def test_ddl_execute(self):
+ engine = create_engine("sqlite:///")
+ cx = engine.connect()
+ table = self.users
+ ddl = DDL("SELECT 1")
+
+ eng_msg = r"The Engine.execute\(\) method is considered legacy"
+ ddl_msg = r"The DDL.execute\(\) method is considered legacy"
+ for spec in (
+ (engine.execute, ddl, eng_msg),
+ (engine.execute, ddl, table, eng_msg),
+ (ddl.execute, engine, ddl_msg),
+ (ddl.execute, engine, table, ddl_msg),
+ (ddl.execute, cx, ddl_msg),
+ (ddl.execute, cx, table, ddl_msg),
+ ):
+ fn = spec[0]
+ arg = spec[1:-1]
+ warning = spec[-1]
+
+ with testing.expect_deprecated_20(warning):
+ r = fn(*arg)
+ eq_(list(r), [(1,)])
+
+ for fn, kw in ((ddl.execute, {}), (ddl.execute, dict(target=table))):
+ with testing.expect_deprecated_20(ddl_msg):
+ assert_raises(exc.UnboundExecutionError, fn, **kw)
+
+ for bind in engine, cx:
+ ddl.bind = bind
+ for fn, kw in (
+ (ddl.execute, {}),
+ (ddl.execute, dict(target=table)),
+ ):
+ with testing.expect_deprecated_20(ddl_msg):
+ r = fn(**kw)
+ eq_(list(r), [(1,)])
addresses.insert(), address_id=1, user_id=2, address="foo@bar.com"
)
- r = connection.execute(
- text("select * from addresses", bind=testing.db)
- ).first()
+ r = connection.execute(text("select * from addresses")).first()
eq_(r[0:1], (1,))
eq_(r[1:], (2, "foo@bar.com"))
eq_(r[:-1], (1, 2))
addresses.insert(), address_id=1, user_id=2, address="foo@bar.com"
)
- r = connection.execute(
- text("select * from addresses", bind=testing.db)
- )
+ r = connection.execute(text("select * from addresses"))
eq_(
r.mappings().all(),
[{"address_id": 1, "user_id": 2, "address": "foo@bar.com"}],
"from users "
"UNION select users.user_id, "
"users.user_name from users",
- bind=testing.db,
).execution_options(sqlite_raw_colnames=True)
).first()
"from users "
"UNION select users.user_id, "
"users.user_name from users",
- bind=testing.db,
)
).first()
eq_(r._mapping["user_id"], 1)
'select users.user_id AS "users.user_id", '
'users.user_name AS "users.user_name" '
"from users",
- bind=testing.db,
).execution_options(sqlite_raw_colnames=True)
).first()
eq_(r._mapping["users.user_id"], 1)
binary_table.select(order_by=binary_table.c.primary_id),
text(
"select * from binary_table order by binary_table.primary_id",
- bind=testing.db,
).columns(
**{
"pickled": PickleType,