--- /dev/null
+.. change::
+ :tags: changed, engine
+ :tickets: 4755
+
+ Deprecated the remaining engine-level introspection and utility methods,
+ including :meth:`.Engine.run_callable`, :meth:`.Engine.transaction`,
+ :meth:`.Engine.table_names`, and :meth:`.Engine.has_table`. The utility
+ methods are superseded by modern context-manager patterns, and the table
+ introspection tasks are handled by the :class:`.Inspector` object.
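+
+ A rough sketch of the replacement patterns, using the placeholder names
+ ``some_engine`` and ``some_table``::
+
+     from sqlalchemy import inspect
+
+     # table introspection now goes through the Inspector
+     insp = inspect(some_engine)
+     insp.has_table("some_table")
+     insp.get_table_names()
+
+     # ad-hoc transactional execution uses the context manager form
+     with some_engine.begin() as conn:
+         conn.execute(some_table.insert(), {"data": "value"})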
+
+.. change::
+ :tags: changed, engine
+ :tickets: 4755
+
+ The internal dialect method ``Dialect.reflecttable`` has been removed. A
+ review of third-party dialects found none making use of this method, which
+ was already documented as not intended for use by external dialects. The
+ private ``Engine._run_visitor`` method has also been removed.
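+
+ Table reflection continues to flow through the :class:`.Inspector`; a
+ rough sketch, with ``some_engine`` as a placeholder::
+
+     from sqlalchemy import MetaData, Table, inspect
+
+     metadata = MetaData()
+     some_table = Table("some_table", metadata)
+
+     # the Inspector reflects columns and constraints into the Table
+     insp = inspect(some_engine)
+     insp.reflecttable(some_table, include_columns=None)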
+
+
+.. change::
+ :tags: changed, engine
+ :tickets: 4755
+
+ The long-deprecated ``Inspector.get_table_names.order_by`` parameter has
+ been removed.
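+
+ Code that relied on ``order_by="foreign_key"`` can use
+ :meth:`.Inspector.get_sorted_table_and_fkc_names` instead; a rough sketch
+ with ``some_engine`` as a placeholder::
+
+     from sqlalchemy import inspect
+
+     insp = inspect(some_engine)
+     for table_name, fk_constraints in insp.get_sorted_table_and_fkc_names():
+         print(table_name, fk_constraints)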
+
+.. change::
+ :tags: feature, engine
+ :tickets: 4755
+
+ The :paramref:`.Table.autoload_with` parameter now accepts an
+ :class:`.Inspector` object directly, as well as any :class:`.Engine` or
+ :class:`.Connection`, as was the case previously.
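+
+ A rough sketch, with ``some_engine`` as a placeholder::
+
+     from sqlalchemy import MetaData, Table, inspect
+
+     insp = inspect(some_engine)
+     metadata = MetaData()
+
+     # reflect against the Inspector directly
+     user_table = Table("user", metadata, autoload_with=insp)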
+
from .interfaces import ExceptionContext
from .util import _distill_params
from .. import exc
+from .. import inspection
from .. import interfaces
from .. import log
from .. import util
else:
util.reraise(*exc_info)
+ def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ """run a DDL visitor.
+
+ This method exists only so that the MockConnection can alter the
+ options given to the visitor, such that "checkfirst" is skipped.
+
+ """
+ visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
+
+ @util.deprecated(
+ "1.4",
+ "The :meth:`.Connection.transaction` method is deprecated and will be "
+ "removed in a future release. Use the :meth:`.Engine.begin` "
+ "context manager instead.",
+ )
def transaction(self, callable_, *args, **kwargs):
r"""Execute the given function within a transaction boundary.
"""
+ kwargs["_sa_skip_warning"] = True
trans = self.begin()
try:
ret = self.run_callable(callable_, *args, **kwargs)
with util.safe_reraise():
trans.rollback()
+ @util.deprecated(
+ "1.4",
+ "The :meth:`.Connection.run_callable` method is deprecated and will "
+ "be removed in a future release. Use a context manager instead.",
+ )
def run_callable(self, callable_, *args, **kwargs):
r"""Given a callable object or function, execute it, passing
a :class:`.Connection` as the first argument.
"""
return callable_(self, *args, **kwargs)
- def _run_visitor(self, visitorcallable, element, **kwargs):
- visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
-
class ExceptionContextImpl(ExceptionContext):
"""Implement the :class:`.ExceptionContext` interface."""
else:
yield connection
- def _run_visitor(
- self, visitorcallable, element, connection=None, **kwargs
- ):
- with self._optional_conn_ctx_manager(connection) as conn:
- conn._run_visitor(visitorcallable, element, **kwargs)
-
class _trans_ctx(object):
def __init__(self, conn, transaction, close_with_result):
self.conn = conn
conn.close()
return Engine._trans_ctx(conn, trans, close_with_result)
+ @util.deprecated(
+ "1.4",
+ "The :meth:`.Engine.transaction` method is deprecated and will be "
+ "removed in a future release. Use the :meth:`.Engine.begin` context "
+ "manager instead.",
+ )
def transaction(self, callable_, *args, **kwargs):
r"""Execute the given function within a transaction boundary.
:meth:`.Engine.transaction`
"""
-
+ kwargs["_sa_skip_warning"] = True
with self.connect() as conn:
return conn.transaction(callable_, *args, **kwargs)
+ @util.deprecated(
+ "1.4",
+ "The :meth:`.Engine.run_callable` method is deprecated and will be "
+ "removed in a future release. Use the :meth:`.Engine.connect` "
+ "context manager instead.",
+ )
def run_callable(self, callable_, *args, **kwargs):
r"""Given a callable object or function, execute it, passing
a :class:`.Connection` as the first argument.
which one is being dealt with.
"""
+ kwargs["_sa_skip_warning"] = True
with self.connect() as conn:
return conn.run_callable(callable_, *args, **kwargs)
+ def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ with self.connect() as conn:
+ conn._run_ddl_visitor(visitorcallable, element, **kwargs)
+
def execute(self, statement, *multiparams, **params):
"""Executes the given construct and returns a :class:`.ResultProxy`.
return self._connection_cls(self, close_with_result=close_with_result)
+ @util.deprecated(
+ "1.4",
+ "The :meth:`.Engine.table_names` method is deprecated and will be "
+ "removed in a future release. Please refer to "
+ ":meth:`.Inspector.get_table_names`.",
+ )
def table_names(self, schema=None, connection=None):
"""Return a list of all table names available in the database.
:param connection: Optional, use a specified connection.
"""
-
with self._optional_conn_ctx_manager(connection) as conn:
- if not schema:
- schema = self.dialect.default_schema_name
- return self.dialect.get_table_names(conn, schema)
-
+ insp = inspection.inspect(conn)
+ return insp.get_table_names(schema)
+
+ @util.deprecated(
+ "1.4",
+ "The :meth:`.Engine.has_table` method is deprecated and will be "
+ "removed in a future release. Please refer to "
+ ":meth:`.Inspector.has_table`.",
+ )
def has_table(self, table_name, schema=None):
"""Return True if the given backend has a table of the given name.
with a schema identifier.
"""
- return self.run_callable(self.dialect.has_table, table_name, schema)
+ with self._optional_conn_ctx_manager(None) as conn:
+ insp = inspection.inspect(conn)
+ return insp.has_table(table_name, schema=schema)
def _wrap_pool_connect(self, fn, connection):
dialect = self.dialect
import weakref
from . import interfaces
-from . import reflection
from . import result
from .. import event
from .. import exc
"""
return sqltypes.adapt_type(typeobj, self.colspecs)
- def reflecttable(
- self,
- connection,
- table,
- include_columns,
- exclude_columns,
- resolve_fks,
- **opts
- ):
- insp = reflection.Inspector.from_engine(connection)
- return insp.reflecttable(
- table, include_columns, exclude_columns, resolve_fks, **opts
- )
-
def get_pk_constraint(self, conn, table_name, schema=None, **kw):
"""Compatibility method, adapts the result of get_primary_keys()
for those dialects which don't implement get_pk_constraint().
pass
- def reflecttable(
- self, connection, table, include_columns, exclude_columns, resolve_fks
- ):
- """Load table description from the database.
-
- Given a :class:`.Connection` and a
- :class:`~sqlalchemy.schema.Table` object, reflect its columns and
- properties from the database.
-
- The implementation of this method is provided by
- :meth:`.DefaultDialect.reflecttable`, which makes use of
- :class:`.Inspector` to retrieve column information.
-
- Dialects should **not** seek to implement this method, and should
- instead implement individual schema inspection operations such as
- :meth:`.Dialect.get_columns`, :meth:`.Dialect.get_pk_constraint`,
- etc.
-
- """
-
- raise NotImplementedError()
-
def get_columns(self, connection, table_name, schema=None, **kw):
"""Return information about columns in `table_name`.
"""
raise NotImplementedError()
- def has_table(self, connection, table_name, schema=None):
+ def has_table(self, connection, table_name, schema=None, **kw):
"""Check the existence of a particular table in the database.
Given a :class:`.Connection` object and a string
raise NotImplementedError()
- def has_sequence(self, connection, sequence_name, schema=None):
+ def has_sequence(self, connection, sequence_name, schema=None, **kw):
"""Check the existence of a particular sequence in the database.
Given a :class:`.Connection` object and a string
ddl.SchemaDropper(self.dialect, self, **kwargs).traverse_single(entity)
- def _run_visitor(
+ def _run_ddl_visitor(
self, visitorcallable, element, connection=None, **kwargs
):
kwargs["checkfirst"] = False
return ret
+@inspection._self_inspects
class Inspector(object):
"""Performs database schema inspection.
)
return []
- @util.deprecated_params(
- order_by=(
- "1.0",
- "The :paramref:`get_table_names.order_by` parameter is deprecated "
- "and will be removed in a future release. Please refer to "
- ":meth:`.Inspector.get_sorted_table_and_fkc_names` for a "
- "more comprehensive solution to resolving foreign key cycles "
- "between tables.",
- )
- )
- def get_table_names(self, schema=None, order_by=None):
+ def get_table_names(self, schema=None):
"""Return all table names in referred to within a particular schema.
The names are expected to be real tables only, not views.
"""
- if hasattr(self.dialect, "get_table_names"):
- tnames = self.dialect.get_table_names(
- self.bind, schema, info_cache=self.info_cache
- )
- else:
- tnames = self.engine.table_names(schema)
- if order_by == "foreign_key":
- tuples = []
- for tname in tnames:
- for fkey in self.get_foreign_keys(tname, schema):
- if tname != fkey["referred_table"]:
- tuples.append((fkey["referred_table"], tname))
- tnames = list(topological.sort(tuples, tnames))
- return tnames
+ return self.dialect.get_table_names(
+ self.bind, schema, info_cache=self.info_cache
+ )
+
+ def has_table(self, table_name, schema=None):
+ """Return True if the backend has a table of the given name.
+
+ .. versionadded:: 1.4
+
+ """
+ # TODO: info_cache?
+ return self.dialect.has_table(self.bind, table_name, schema)
def get_sorted_table_and_fkc_names(self, schema=None):
"""Return dependency-sorted table and foreign key constraint names in
:paramref:`.Table.extend_existing`
- :param autoload_with: An :class:`.Engine` or :class:`.Connection` object
- with which this :class:`.Table` object will be reflected; when
- set to a non-None value, it implies that :paramref:`.Table.autoload`
- is ``True``. If left unset, but :paramref:`.Table.autoload` is
- explicitly set to ``True``, an autoload operation will attempt to
- proceed by locating an :class:`.Engine` or :class:`.Connection` bound
- to the underlying :class:`.MetaData` object.
+ :param autoload_with: An :class:`.Engine` or :class:`.Connection` object,
+ or an :class:`.Inspector` object as returned by :func:`.inspect`
+ against one, with which this :class:`.Table` object will be reflected.
+ When set to a non-None value, it implies that
+ :paramref:`.Table.autoload` is ``True``. If left unset, but
+ :paramref:`.Table.autoload` is explicitly set to ``True``, an autoload
+ operation will attempt to proceed by locating an :class:`.Engine` or
+ :class:`.Connection` bound to the underlying :class:`.MetaData` object.
.. seealso::
resolve_fks=True,
_extend_on=None,
):
-
- if autoload_with:
- autoload_with.run_callable(
- autoload_with.dialect.reflecttable,
- self,
- include_columns,
- exclude_columns,
- resolve_fks,
- _extend_on=_extend_on,
- )
- else:
- bind = _bind_or_error(
+ if autoload_with is None:
+ autoload_with = _bind_or_error(
metadata,
msg="No engine is bound to this Table's MetaData. "
"Pass an engine to the Table via "
- "autoload_with=<someengine>, "
- "or associate the MetaData with an engine via "
- "metadata.bind=<someengine>",
- )
- bind.run_callable(
- bind.dialect.reflecttable,
- self,
- include_columns,
- exclude_columns,
- resolve_fks,
- _extend_on=_extend_on,
+ "autoload_with=<someengine_or_connection>",
)
+ insp = inspection.inspect(autoload_with)
+ insp.reflecttable(
+ self,
+ include_columns,
+ exclude_columns,
+ resolve_fks,
+ _extend_on=_extend_on,
+ )
+
@property
def _sorted_constraints(self):
"""Return the set of constraints as a list, sorted by creation
else:
return []
+ @util.deprecated(
+ "1.4",
+ "The :meth:`.Table.exists` method is deprecated and will be "
+ "removed in a future release. Please refer to "
+ ":meth:`.Inspector.has_table`.",
+ )
def exists(self, bind=None):
"""Return True if this table exists."""
if bind is None:
bind = _bind_or_error(self)
- return bind.run_callable(
- bind.dialect.has_table, self.name, schema=self.schema
- )
+ insp = inspection.inspect(bind)
+ return insp.has_table(self.name, schema=self.schema)
def create(self, bind=None, checkfirst=False):
"""Issue a ``CREATE`` statement for this
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
+ bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
def drop(self, bind=None, checkfirst=False):
"""Issue a ``DROP`` statement for this
"""
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
+ bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
def tometadata(
self,
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
+ bind._run_ddl_visitor(ddl.SchemaGenerator, self, checkfirst=checkfirst)
def drop(self, bind=None, checkfirst=True):
"""Drops this sequence from the database."""
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
+ bind._run_ddl_visitor(ddl.SchemaDropper, self, checkfirst=checkfirst)
def _not_a_column_expr(self):
raise exc.InvalidRequestError(
"""
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(ddl.SchemaGenerator, self)
+ bind._run_ddl_visitor(ddl.SchemaGenerator, self)
return self
def drop(self, bind=None):
"""
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(ddl.SchemaDropper, self)
+ bind._run_ddl_visitor(ddl.SchemaDropper, self)
def __repr__(self):
return "Index(%s)" % (
bind = _bind_or_error(self)
with bind.connect() as conn:
+ insp = inspection.inspect(conn)
reflect_opts = {
- "autoload": True,
- "autoload_with": conn,
+ "autoload_with": insp,
"extend_existing": extend_existing,
"autoload_replace": autoload_replace,
"resolve_fks": resolve_fks,
if schema is not None:
reflect_opts["schema"] = schema
- available = util.OrderedSet(
- bind.engine.table_names(schema, connection=conn)
- )
+ available = util.OrderedSet(insp.get_table_names(schema))
if views:
- available.update(bind.dialect.get_view_names(conn, schema))
+ available.update(insp.get_view_names(schema))
if schema is not None:
available_w_schema = util.OrderedSet(
"""
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(
+ bind._run_ddl_visitor(
ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables
)
"""
if bind is None:
bind = _bind_or_error(self)
- bind._run_visitor(
+ bind._run_ddl_visitor(
ddl.SchemaDropper, self, checkfirst=checkfirst, tables=tables
)
+@util.deprecated_cls(
+ "1.4",
+ ":class:`.ThreadLocalMetaData` is deprecated and will be removed "
+ "in a future release.",
+ constructor="__init__",
+)
class ThreadLocalMetaData(MetaData):
"""A MetaData variant that presents a different ``bind`` in every thread.
@decorator
def warned(fn, *args, **kwargs):
- warnings.warn(message, wtype, stacklevel=3)
+ skip_warning = kwargs.pop("_sa_skip_warning", False)
+ if not skip_warning:
+ warnings.warn(message, wtype, stacklevel=3)
return fn(*args, **kwargs)
doc = func.__doc__ is not None and func.__doc__ or ""
],
)
- assert testing.db.has_table("bar", schema=referred_schema)
+ assert inspect(testing.db).has_table("bar", schema=referred_schema)
m2 = MetaData()
Table(
@testing.provide_metadata
def test_has_temporary_table(self):
- assert not testing.db.has_table("some_temp_table")
+ assert not inspect(testing.db).has_table("some_temp_table")
user_tmp = Table(
"some_temp_table",
self.metadata,
prefixes=["TEMPORARY"],
)
user_tmp.create(testing.db)
- assert testing.db.has_table("some_temp_table")
+ assert inspect(testing.db).has_table("some_temp_table")
@testing.provide_metadata
def test_cross_schema_reflection_one(self):
A_table = Table("A", metadata, Column("x", Integer))
a_table.create()
- assert testing.db.has_table("a")
- assert not testing.db.has_table("A")
+ assert inspect(testing.db).has_table("a")
+ assert not inspect(testing.db).has_table("A")
A_table.create(checkfirst=True)
- assert testing.db.has_table("A")
+ assert inspect(testing.db).has_table("A")
def test_uppercase_lowercase_sequence(self):
import sqlalchemy as sa
from sqlalchemy import engine
from sqlalchemy import exc
+from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import testing
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_false
+from sqlalchemy.testing import is_true
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
for bind in (testing.db, testing.db.connect()):
for args in [([], {"bind": bind}), ([bind], {})]:
metadata.create_all(*args[0], **args[1])
- assert table.exists(*args[0], **args[1])
+ is_true(inspect(bind).has_table(table.name))
metadata.drop_all(*args[0], **args[1])
table.create(*args[0], **args[1])
table.drop(*args[0], **args[1])
- assert not table.exists(*args[0], **args[1])
+ is_false(inspect(bind).has_table(table.name))
def test_create_drop_err_metadata(self):
metadata = MetaData()
metadata = MetaData()
table = Table("test_table", metadata, Column("foo", Integer))
- for meth in [table.exists, table.create, table.drop]:
+ for meth in [table.create, table.drop]:
assert_raises_message(
exc.UnboundExecutionError,
(
)
assert metadata.bind is table.bind is bind
metadata.create_all()
- assert table.exists()
+ is_true(inspect(bind).has_table(table.name))
metadata.drop_all()
table.create()
table.drop()
- assert not table.exists()
+ is_false(inspect(bind).has_table(table.name))
finally:
if isinstance(bind, engine.Connection):
bind.close()
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
-from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.interfaces import ConnectionProxy
+from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_false
+from sqlalchemy.testing import is_true
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
)
-class TableNamesOrderByTest(fixtures.TestBase):
- @testing.provide_metadata
- def test_order_by_foreign_key(self):
- Table(
- "t1",
- self.metadata,
- Column("id", Integer, primary_key=True),
- test_needs_acid=True,
- )
- Table(
- "t2",
- self.metadata,
- Column("id", Integer, primary_key=True),
- Column("t1id", Integer, ForeignKey("t1.id")),
- test_needs_acid=True,
- )
- Table(
- "t3",
- self.metadata,
- Column("id", Integer, primary_key=True),
- Column("t2id", Integer, ForeignKey("t2.id")),
- test_needs_acid=True,
- )
- self.metadata.create_all()
- insp = inspect(testing.db)
- with testing.expect_deprecated(
- "The get_table_names.order_by parameter is deprecated "
- ):
- tnames = insp.get_table_names(order_by="foreign_key")
- eq_(tnames, ["t1", "t2", "t3"])
-
-
def _proxy_execute_deprecated():
return (
testing.expect_deprecated("ConnectionProxy.execute is deprecated."),
)
+class TransactionTest(fixtures.TestBase):
+ __backend__ = True
+
+ @classmethod
+ def setup_class(cls):
+ metadata = MetaData()
+ cls.users = Table(
+ "query_users",
+ metadata,
+ Column("user_id", Integer, primary_key=True),
+ Column("user_name", String(20)),
+ test_needs_acid=True,
+ )
+ cls.users.create(testing.db)
+
+ def teardown(self):
+ testing.db.execute(self.users.delete()).close()
+
+ @classmethod
+ def teardown_class(cls):
+ cls.users.drop(testing.db)
+
+ def test_transaction_container(self):
+ users = self.users
+
+ def go(conn, table, data):
+ for d in data:
+ conn.execute(table.insert(), d)
+
+ with testing.expect_deprecated(
+ r"The Engine.transaction\(\) method is deprecated"
+ ):
+ testing.db.transaction(
+ go, users, [dict(user_id=1, user_name="user1")]
+ )
+
+ with testing.db.connect() as conn:
+ eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
+ with testing.expect_deprecated(
+ r"The Engine.transaction\(\) method is deprecated"
+ ):
+ assert_raises(
+ tsa.exc.DBAPIError,
+ testing.db.transaction,
+ go,
+ users,
+ [
+ {"user_id": 2, "user_name": "user2"},
+ {"user_id": 1, "user_name": "user3"},
+ ],
+ )
+ with testing.db.connect() as conn:
+ eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
+
+
class ProxyConnectionTest(fixtures.TestBase):
"""These are the same tests as EngineEventsTest, except using
]
conn1.close()
conn2.close()
+
+
+class DeprecatedEngineFeatureTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ cls.table = Table(
+ "exec_test",
+ metadata,
+ Column("a", Integer),
+ Column("b", Integer),
+ test_needs_acid=True,
+ )
+
+ def _trans_fn(self, is_transaction=False):
+ def go(conn, x, value=None):
+ if is_transaction:
+ conn = conn.connection
+ conn.execute(self.table.insert().values(a=x, b=value))
+
+ return go
+
+ def _trans_rollback_fn(self, is_transaction=False):
+ def go(conn, x, value=None):
+ if is_transaction:
+ conn = conn.connection
+ conn.execute(self.table.insert().values(a=x, b=value))
+ raise SomeException("breakage")
+
+ return go
+
+ def _assert_no_data(self):
+ eq_(
+ testing.db.scalar(
+ select([func.count("*")]).select_from(self.table)
+ ),
+ 0,
+ )
+
+ def _assert_fn(self, x, value=None):
+ eq_(testing.db.execute(self.table.select()).fetchall(), [(x, value)])
+
+ def test_transaction_engine_fn_commit(self):
+ fn = self._trans_fn()
+ with testing.expect_deprecated(r"The Engine.transaction\(\) method"):
+ testing.db.transaction(fn, 5, value=8)
+ self._assert_fn(5, value=8)
+
+ def test_transaction_engine_fn_rollback(self):
+ fn = self._trans_rollback_fn()
+ with testing.expect_deprecated(
+ r"The Engine.transaction\(\) method is deprecated"
+ ):
+ assert_raises_message(
+ Exception, "breakage", testing.db.transaction, fn, 5, value=8
+ )
+ self._assert_no_data()
+
+ def test_transaction_connection_fn_commit(self):
+ fn = self._trans_fn()
+ with testing.db.connect() as conn:
+ with testing.expect_deprecated(
+ r"The Connection.transaction\(\) method is deprecated"
+ ):
+ conn.transaction(fn, 5, value=8)
+ self._assert_fn(5, value=8)
+
+ def test_transaction_connection_fn_rollback(self):
+ fn = self._trans_rollback_fn()
+ with testing.db.connect() as conn:
+ with testing.expect_deprecated(
+     r"The Connection.transaction\(\) method is deprecated"
+ ):
+ assert_raises(Exception, conn.transaction, fn, 5, value=8)
+ self._assert_no_data()
+
+
+class DeprecatedReflectionTest(fixtures.TablesTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "user",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50)),
+ )
+ Table(
+ "address",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("user_id", ForeignKey("user.id")),
+ Column("email", String(50)),
+ )
+
+ def test_exists(self):
+ dont_exist = Table("dont_exist", MetaData())
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated"
+ ):
+ is_false(dont_exist.exists(testing.db))
+
+ user = self.tables.user
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated"
+ ):
+ is_true(user.exists(testing.db))
+
+ def test_create_drop_explicit(self):
+ metadata = MetaData()
+ table = Table("test_table", metadata, Column("foo", Integer))
+ for bind in (testing.db, testing.db.connect()):
+ for args in [([], {"bind": bind}), ([bind], {})]:
+ metadata.create_all(*args[0], **args[1])
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated"
+ ):
+ assert table.exists(*args[0], **args[1])
+ metadata.drop_all(*args[0], **args[1])
+ table.create(*args[0], **args[1])
+ table.drop(*args[0], **args[1])
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated"
+ ):
+ assert not table.exists(*args[0], **args[1])
+
+ def test_create_drop_err_table(self):
+ metadata = MetaData()
+ table = Table("test_table", metadata, Column("foo", Integer))
+
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated"
+ ):
+ assert_raises_message(
+ tsa.exc.UnboundExecutionError,
+ (
+ "Table object 'test_table' is not bound to an Engine or "
+ "Connection."
+ ),
+ table.exists,
+ )
+
+ def test_engine_has_table(self):
+ with testing.expect_deprecated(
+ r"The Engine.has_table\(\) method is deprecated"
+ ):
+ is_false(testing.db.has_table("dont_exist"))
+
+ with testing.expect_deprecated(
+ r"The Engine.has_table\(\) method is deprecated"
+ ):
+ is_true(testing.db.has_table("user"))
+
+ def test_engine_table_names(self):
+ metadata = self.metadata
+
+ with testing.expect_deprecated(
+ r"The Engine.table_names\(\) method is deprecated"
+ ):
+ table_names = testing.db.table_names()
+ is_true(set(table_names).issuperset(metadata.tables))
from sqlalchemy import create_mock_engine
from sqlalchemy import event
from sqlalchemy import func
+from sqlalchemy import inspect
from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import LargeBinary
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_not_
+from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertsql import CompiledSQL
from sqlalchemy.testing.engines import testing_engine
# autocommit is off
self._assert_no_data()
- def test_transaction_engine_fn_commit(self):
- fn = self._trans_fn()
- testing.db.transaction(fn, 5, value=8)
- self._assert_fn(5, value=8)
-
- def test_transaction_engine_fn_rollback(self):
- fn = self._trans_rollback_fn()
- assert_raises_message(
- Exception, "breakage", testing.db.transaction, fn, 5, value=8
- )
- self._assert_no_data()
-
- def test_transaction_connection_fn_commit(self):
- fn = self._trans_fn()
- with testing.db.connect() as conn:
- conn.transaction(fn, 5, value=8)
- self._assert_fn(5, value=8)
-
- def test_transaction_connection_fn_rollback(self):
- fn = self._trans_rollback_fn()
- with testing.db.connect() as conn:
- assert_raises(Exception, conn.transaction, fn, 5, value=8)
- self._assert_no_data()
-
class CompiledCacheTest(fixtures.TestBase):
__backend__ = True
) as conn:
metadata.create_all(conn)
- assert config.db.has_table("t1", schema=config.test_schema)
- assert config.db.has_table("t2", schema=config.test_schema)
- assert config.db.has_table("t3", schema=None)
+ insp = inspect(config.db)
+ is_true(insp.has_table("t1", schema=config.test_schema))
+ is_true(insp.has_table("t2", schema=config.test_schema))
+ is_true(insp.has_table("t3", schema=None))
with config.db.connect().execution_options(
schema_translate_map=map_
) as conn:
metadata.drop_all(conn)
- assert not config.db.has_table("t1", schema=config.test_schema)
- assert not config.db.has_table("t2", schema=config.test_schema)
- assert not config.db.has_table("t3", schema=None)
+ insp = inspect(config.db)
+ is_false(insp.has_table("t1", schema=config.test_schema))
+ is_false(insp.has_table("t2", schema=config.test_schema))
+ is_false(insp.has_table("t3", schema=None))
@testing.provide_metadata
def test_crud(self):
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
+from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import not_in_
@testing.provide_metadata
def test_reflect_all(self):
- existing = testing.db.table_names()
+ existing = inspect(testing.db).get_table_names()
names = ["rt_%s" % name for name in ("a", "b", "c", "d", "e")]
nameset = set(names)
m = MetaData()
- reflecttable = testing.db.dialect.reflecttable
+ inspector = sa.engine.reflection.Inspector
+ reflecttable = inspector.reflecttable
- def patched(conn, table, *arg, **kw):
+ def patched(self, table, *arg, **kw):
if table.name == "rt_c":
raise sa.exc.UnreflectableTableError("Can't reflect rt_c")
else:
- return reflecttable(conn, table, *arg, **kw)
+ return reflecttable(self, table, *arg, **kw)
- with mock.patch.object(testing.db.dialect, "reflecttable", patched):
+ with mock.patch.object(inspector, "reflecttable", patched):
with expect_warnings("Skipping table rt_c: Can't reflect rt_c"):
m.reflect(bind=testing.db)
_drop_views(metadata.bind)
-class CreateDropTest(fixtures.TestBase):
+class CreateDropTest(fixtures.TablesTest):
__backend__ = True
+ run_create_tables = None
+
@classmethod
- def setup_class(cls):
- global metadata, users
- metadata = MetaData()
- users = Table(
+ def define_tables(cls, metadata):
+ Table(
"users",
metadata,
Column(
sa.Sequence("address_id_seq", optional=True),
primary_key=True,
),
- Column("user_id", sa.Integer, sa.ForeignKey(users.c.user_id)),
+ Column("user_id", sa.Integer, sa.ForeignKey("users.user_id")),
Column("email_address", sa.String(40)),
)
sa.Sequence("order_id_seq", optional=True),
primary_key=True,
),
- Column("user_id", sa.Integer, sa.ForeignKey(users.c.user_id)),
+ Column("user_id", sa.Integer, sa.ForeignKey("users.user_id")),
Column("description", sa.String(50)),
Column("isopen", sa.Integer),
)
Column("item_name", sa.VARCHAR(50)),
)
+ def teardown(self):
+ self.metadata.drop_all(testing.db)
+
def test_sorter(self):
- tables = metadata.sorted_tables
+ tables = self.metadata.sorted_tables
table_names = [t.name for t in tables]
ua = [n for n in table_names if n in ("users", "email_addresses")]
oi = [n for n in table_names if n in ("orders", "items")]
eq_(oi, ["orders", "items"])
def test_checkfirst(self):
- try:
- assert not users.exists(testing.db)
- users.create(bind=testing.db)
- assert users.exists(testing.db)
- users.create(bind=testing.db, checkfirst=True)
- users.drop(bind=testing.db)
- users.drop(bind=testing.db, checkfirst=True)
- assert not users.exists(bind=testing.db)
- users.create(bind=testing.db, checkfirst=True)
- users.drop(bind=testing.db)
- finally:
- metadata.drop_all(bind=testing.db)
+ insp = inspect(testing.db)
+ users = self.tables.users
+
+ is_false(insp.has_table("users"))
+ users.create(bind=testing.db)
+ is_true(insp.has_table("users"))
+ users.create(bind=testing.db, checkfirst=True)
+ users.drop(bind=testing.db)
+ users.drop(bind=testing.db, checkfirst=True)
+ is_false(insp.has_table("users"))
+ users.create(bind=testing.db, checkfirst=True)
+ users.drop(bind=testing.db)
def test_createdrop(self):
+ insp = inspect(testing.db)
+ metadata = self.metadata
metadata.create_all(bind=testing.db)
- eq_(testing.db.has_table("items"), True)
- eq_(testing.db.has_table("email_addresses"), True)
+ is_true(insp.has_table("items"))
+ is_true(insp.has_table("email_addresses"))
metadata.create_all(bind=testing.db)
- eq_(testing.db.has_table("items"), True)
+ is_true(insp.has_table("items"))
metadata.drop_all(bind=testing.db)
- eq_(testing.db.has_table("items"), False)
- eq_(testing.db.has_table("email_addresses"), False)
+ is_false(insp.has_table("items"))
+ is_false(insp.has_table("email_addresses"))
metadata.drop_all(bind=testing.db)
- eq_(testing.db.has_table("items"), False)
+ is_false(insp.has_table("items"))
def test_tablenames(self):
+ metadata = self.metadata
metadata.create_all(bind=testing.db)
+ insp = inspect(testing.db)
- # we only check to see if all the explicitly created tables are
- # there, rather than assertEqual -- the test db could have
- # "extra" tables if there is a misconfigured template. (*cough*
- # tsearch2 w/ the pg windows installer.)
-
- self.assert_(not set(metadata.tables) - set(testing.db.table_names()))
- metadata.drop_all(bind=testing.db)
+ # ensure all tables we created are in the list.
+ is_true(set(insp.get_table_names()).issuperset(metadata.tables))
class SchemaManipulationTest(fixtures.TestBase):
@testing.requires.unicode_connections
def test_has_table(self):
+ insp = inspect(testing.db)
for tname, cname, ixname in self.names:
- assert testing.db.has_table(tname), "Can't detect name %s" % tname
+ assert insp.has_table(tname), "Can't detect name %s" % tname
@testing.requires.unicode_connections
def test_basic(self):
bind = testing.db
names = set([rec[0] for rec in self.names])
- reflected = set(bind.table_names())
+ reflected = set(inspect(bind).get_table_names())
# Jython 2.5 on Java 5 lacks unicodedata.normalize
@testing.fails_if(testing.requires._has_mysql_on_windows)
def test_table_names(self):
- x = testing.db.run_callable(testing.db.dialect.get_table_names)
+ x = inspect(testing.db).get_table_names()
assert set(["SomeTable", "SomeOtherTable"]).issubset(x)
def test_reflect_exact_name(self):
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import VARCHAR
-from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
assert len(result.fetchall()) == 0
connection.close()
- def test_transaction_container(self):
- def go(conn, table, data):
- for d in data:
- conn.execute(table.insert(), d)
-
- testing.db.transaction(go, users, [dict(user_id=1, user_name="user1")])
- eq_(testing.db.execute(users.select()).fetchall(), [(1, "user1")])
- assert_raises(
- exc.DBAPIError,
- testing.db.transaction,
- go,
- users,
- [
- {"user_id": 2, "user_name": "user2"},
- {"user_id": 1, "user_name": "user3"},
- ],
- )
- eq_(testing.db.execute(users.select()).fetchall(), [(1, "user1")])
-
def test_nested_rollback(self):
connection = testing.db.connect()
try:
class AttachedFileShardTest(ShardTest, fixtures.TestBase):
+ """Use modern schema conventions along with SQLite ATTACH."""
+
+ schema = "changeme"
+
+ def _init_dbs(self):
+ e = testing_engine("sqlite://")
+ with e.connect() as conn:
+ for i in range(1, 5):
+ conn.execute(
+ 'ATTACH DATABASE "shard%s_%s.db" AS shard%s'
+ % (i, provision.FOLLOWER_IDENT, i)
+ )
+
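+ # the four shards share one engine and connection pool; each variant
+ # translates the "changeme" schema to a distinct attached database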
+ db1 = e.execution_options(schema_translate_map={"changeme": "shard1"})
+ db2 = e.execution_options(schema_translate_map={"changeme": "shard2"})
+ db3 = e.execution_options(schema_translate_map={"changeme": "shard3"})
+ db4 = e.execution_options(schema_translate_map={"changeme": "shard4"})
+
+ self.engine = e
+ return db1, db2, db3, db4
+
+ def teardown(self):
+ clear_mappers()
+
+ self.engine.connect().invalidate()
+ for i in range(1, 5):
+ os.remove("shard%d_%s.db" % (i, provision.FOLLOWER_IDENT))
+
+
+class TableNameConventionShardTest(ShardTest, fixtures.TestBase):
+ """This fixture uses a single SQLite database along with a table naming
+ convention to achieve sharding. Event hooks are used to rewrite SQL
+ statements.
+
+ This used to be called "AttachedFileShardTest" but I didn't see any
+ ATTACH going on.
+
+ The approach taken by this test is awkward and I wouldn't recommend using
+ this pattern in a real situation. I'm not sure of the history of this test
+ but it likely predates when we knew how to use real ATTACH in SQLite.
+
+ """
+
schema = "changeme"
def _init_dbs(self):
)
insp = inspect(testing.db)
- assert testing.db.has_table(t1.name)
+ assert insp.has_table(t1.name)
eq_([c["name"] for c in insp.get_columns(t1.name)], ["id"])
- assert testing.db.has_table(t2.name)
+ assert insp.has_table(t2.name)
eq_([c["name"] for c in insp.get_columns(t2.name)], ["id"])
- assert testing.db.has_table(t3.name)
+ assert insp.has_table(t3.name)
eq_([c["name"] for c in insp.get_columns(t3.name)], ["id"])
- assert testing.db.has_table(t4.name)
+ assert insp.has_table(t4.name)
eq_([c["name"] for c in insp.get_columns(t4.name)], ["id"])
def test_basic(self):