from sqlalchemy import event
from sqlalchemy import inspect
from sqlalchemy import schema as sa_schema
+from sqlalchemy import text
from sqlalchemy import types as sqltypes
from sqlalchemy.util import OrderedSet
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql.elements import quoted_name
+ from sqlalchemy.sql.elements import TextClause
from sqlalchemy.sql.schema import Column
from sqlalchemy.sql.schema import ForeignKeyConstraint
from sqlalchemy.sql.schema import Index
from sqlalchemy.sql.schema import UniqueConstraint
from alembic.autogenerate.api import AutogenContext
+ from alembic.ddl.impl import DefaultImpl
from alembic.operations.ops import AlterColumnOp
from alembic.operations.ops import MigrationScript
from alembic.operations.ops import ModifyTableOps
def _make_index(params: Dict[str, Any], conn_table: Table) -> Optional[Index]:
- exprs = []
- for col_name in params["column_names"]:
+ exprs: list[Union[Column[Any], TextClause]] = []
+ for num, col_name in enumerate(params["column_names"]):
+ item: Union[Column[Any], TextClause]
if col_name is None:
- util.warn(
- "Skipping reflected expression-based "
- f"index {params['name']!r}"
- )
- return None
+ assert "expressions" in params
+ item = text(params["expressions"][num])
else:
item = conn_table.c[col_name]
exprs.append(item)
class _ix_constraint_sig(_constraint_sig):
is_index = True
- def __init__(self, const: Index) -> None:
+ def __init__(self, const: Index, impl: DefaultImpl) -> None:
self.const = const
self.name = const.name
- self.sig = tuple(sorted([col.name for col in const.columns]))
+ self.sig = impl.create_index_sig(const)
self.is_unique = bool(const.unique)
def md_name_to_sql_name(self, context: AutogenContext) -> Optional[str]:
_uq_constraint_sig(uq) for uq in metadata_unique_constraints
}
- metadata_indexes_sig = {_ix_constraint_sig(ix) for ix in metadata_indexes}
+ impl = autogen_context.migration_context.impl
+ metadata_indexes_sig = {
+ _ix_constraint_sig(ix, impl) for ix in metadata_indexes
+ }
conn_unique_constraints = {_uq_constraint_sig(uq) for uq in conn_uniques}
- conn_indexes_sig = {_ix_constraint_sig(ix) for ix in conn_indexes}
+ conn_indexes_sig = {_ix_constraint_sig(ix, impl) for ix in conn_indexes}
# 5. index things by name, for those objects that have names
metadata_names = {
)
if conn_obj.sig != metadata_obj.sig:
msg.append(
- " columns %r to %r" % (conn_obj.sig, metadata_obj.sig)
+ " expression %r to %r" % (conn_obj.sig, metadata_obj.sig)
)
if msg:
bool(diff) or bool(metadata_identity) != bool(inspector_identity),
)
+ def create_index_sig(self, index: Index) -> Tuple[Any, ...]:
+ # order of col matters in an index
+ return tuple(col.name for col in index.columns)
+
+ def _skip_functional_indexes(self, metadata_indexes, conn_indexes):
+ conn_indexes_by_name = {c.name: c for c in conn_indexes}
+
+ for idx in list(metadata_indexes):
+ if idx.name in conn_indexes_by_name:
+ continue
+ iex = sqla_compat.is_expression_index(idx)
+ if iex:
+ util.warn(
+ "autogenerate skipping metadata-specified "
+ "expression-based index "
+ f"{idx.name!r}; dialect {self.__dialect__!r} under "
+ f"SQLAlchemy {sqla_compat.sqlalchemy_version} can't "
+ "reflect these indexes so they can't be compared"
+ )
+ metadata_indexes.discard(idx)
+
def _compare_identity_options(
attributes, metadata_io, inspector_io, default_io
from typing import Union
from sqlalchemy import Column
+from sqlalchemy import Index
from sqlalchemy import literal_column
from sqlalchemy import Numeric
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import INTEGER
from sqlalchemy.schema import CreateIndex
from sqlalchemy.sql.elements import ColumnClause
-from sqlalchemy.sql.elements import UnaryExpression
from sqlalchemy.types import NULLTYPE
from .base import alter_column
metadata_indexes,
):
- conn_indexes_by_name = {c.name: c for c in conn_indexes}
-
doubled_constraints = {
index
for index in conn_indexes
for ix in doubled_constraints:
conn_indexes.remove(ix)
- for idx in list(metadata_indexes):
- if idx.name in conn_indexes_by_name:
- continue
- exprs = idx.expressions
- for expr in exprs:
- while isinstance(expr, UnaryExpression):
- expr = expr.element
- if not isinstance(expr, Column):
- if sqla_compat.sqla_2:
- msg = ""
- else:
- msg = "; not supported by SQLAlchemy reflection"
- util.warn(
- "autogenerate skipping functional index "
- f"{idx.name!r}{msg}"
- )
- metadata_indexes.discard(idx)
+ if not sqla_compat.sqla_2:
+ self._skip_functional_indexes(metadata_indexes, conn_indexes)
+
+ def _cleanup_index_expr(self, index: Index, expr: str) -> str:
+ # start = expr
+ expr = expr.lower()
+ expr = expr.replace('"', "")
+ if index.table is not None:
+ expr = expr.replace(f"{index.table.name.lower()}.", "")
+
+ while expr and expr[0] == "(" and expr[-1] == ")":
+ expr = expr[1:-1]
+ if "::" in expr:
+ # strip :: cast. types can have spaces in them
+ expr = re.sub(r"(::[\w ]+\w)", "", expr)
+
+ # print(f"START: {start} END: {expr}")
+ return expr
+
+ def create_index_sig(self, index: Index) -> Tuple[Any, ...]:
+ if sqla_compat.is_expression_index(index):
+ return tuple(
+ self._cleanup_index_expr(
+ index,
+ e
+ if isinstance(e, str)
+ else e.compile(
+ dialect=self.dialect,
+ compile_kwargs={"literal_binds": True},
+ ).string,
+ )
+ for e in index.expressions
+ )
+ else:
+ return super().create_index_sig(index)
def render_type(
self, type_: TypeEngine, autogen_context: AutogenContext
existing_transfer["expr"], new_type
)
+ def correct_for_autogen_constraints(
+ self,
+ conn_unique_constraints,
+ conn_indexes,
+ metadata_unique_constraints,
+ metadata_indexes,
+ ):
+
+ self._skip_functional_indexes(metadata_indexes, conn_indexes)
+
@compiles(RenameTable, "sqlite")
def visit_rename_table(
"SQLAlchemy 1.x test",
)
+ @property
+ def sqlalchemy_2(self):
+ return exclusions.skip_if(
+ lambda config: not util.sqla_2,
+ "SQLAlchemy 2.x test",
+ )
+
@property
def comments(self):
return exclusions.only_if(
class CompareIndex:
- def __init__(self, index):
+ def __init__(self, index, name_only=False):
self.index = index
+ self.name_only = name_only
def __eq__(self, other):
- return (
- str(schema.CreateIndex(self.index))
- == str(schema.CreateIndex(other))
- and self.index.dialect_kwargs == other.dialect_kwargs
- )
+ if self.name_only:
+ return self.index.name == other.name
+ else:
+ return (
+ str(schema.CreateIndex(self.index))
+ == str(schema.CreateIndex(other))
+ and self.index.dialect_kwargs == other.dialect_kwargs
+ )
def __ne__(self, other):
return not self.__eq__(other)
import types
from typing import Union
+from sqlalchemy.util import inspect_getfullargspec
+
def flag_combinations(*combinations):
"""A facade around @testing.combinations() oriented towards boolean
"""
+ pos_args = inspect_getfullargspec(__fn)[0]
+ pass_pos_args = {arg: kw.pop(arg) for arg in pos_args}
glb = dict(__fn.__globals__)
glb.update(kw)
new_fn = types.FunctionType(__fn.__code__, glb)
- return new_fn()
+ return new_fn(**pass_pos_args)
def metadata_fixture(ddl="function"):
from .sqla_compat import sqla_13
from .sqla_compat import sqla_14
from .sqla_compat import sqla_1x
+from .sqla_compat import sqla_2
if not sqla_13:
from sqlalchemy.schema import ForeignKeyConstraint
from sqlalchemy.sql import visitors
from sqlalchemy.sql.elements import BindParameter
+from sqlalchemy.sql.elements import ColumnClause
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy.sql.elements import TextClause
+from sqlalchemy.sql.elements import UnaryExpression
from sqlalchemy.sql.visitors import traverse
if TYPE_CHECKING:
from sqlalchemy.sql.base import ColumnCollection
from sqlalchemy.sql.compiler import SQLCompiler
from sqlalchemy.sql.dml import Insert
- from sqlalchemy.sql.elements import ColumnClause
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.sql.schema import Constraint
from sqlalchemy.sql.schema import SchemaItem
sqla_13 = _vers >= (1, 3)
sqla_14 = _vers >= (1, 4)
sqla_14_26 = _vers >= (1, 4, 26)
-sqla_2 = _vers >= (1, 5)
+sqla_2 = _vers >= (2,)
+sqlalchemy_version = __version__
if sqla_14:
def _select(*columns, **kw) -> Select: # type: ignore[no-redef]
return sql.select(list(columns), **kw) # type: ignore[call-overload]
+
+
+def is_expression_index(index: Index) -> bool:
+ expr: Any
+ for expr in index.expressions:
+ while isinstance(expr, UnaryExpression):
+ expr = expr.element
+ if not isinstance(expr, ColumnClause) or expr.is_literal:
+ return True
+ return False
--- /dev/null
+.. change::
+ :tags: bug, autogenerate, sqlite
+ :tickets: 1165
+
+ Fixed issue where indexes on SQLite which include SQL expressions would not
+ compare against themselves correctly, generating false positives.
+ SQLAlchemy as of version 2 has no support for reflecting expression based
+ indexes on SQLite; so for now, the behavior is that SQLite expression-based
+ indexes are ignored for autogenerate compare, in the same way that
+ PostgreSQL expression-based indexes were ignored for the time that
+ SQLAlchemy did not support reflection of such indexes (which is now
+ supported in SQLAlchemy 2.0 as well as this release of Alembic).
--- /dev/null
+.. change::
+ :tags: bug, autogenerate
+ :tickets: 1166
+
+ Fixed issue in index detection where autogenerate change detection would
+ consider indexes with the same columns but with different order as equal,
+ while in general they are not equivalent in how a database will use them.
--- /dev/null
+.. change::
+ :tags: usecase, autogenerate, postgresql
+
+ Added support for autogenerate comparison of indexes on PostgreSQL which
+ include SQL expressions; the previous warning that such indexes were
+ skipped is now removed. This functionality requires SQLAlchemy 2.0.
+ For older SQLAlchemy versions, these indexes are still skipped.
[tool:pytest]
-addopts= --tb native -v -r sfxX -p no:warnings -p no:logging --maxfail=25
+addopts= --tb native -v -r sfxX -p no:warnings -p no:logging --maxfail=100
python_files=tests/test_*.py
markers =
backend: tests that should run on all backends; typically dialect-sensitive
)
return imports + version + sqlalchemy
+
+ @property
+ def reflect_indexes_with_expressions(self):
+ sqlalchemy = exclusions.only_if(
+ lambda _: sqla_compat.sqla_2, "sqlalchemy 2 is required"
+ )
+
+ postgresql = exclusions.only_on(["postgresql"])
+
+ return sqlalchemy + postgresql
+
+ @property
+ def indexes_with_expressions(self):
+ return exclusions.only_on(["postgresql", "sqlite>=3.9.0"])
+from contextlib import nullcontext
+import itertools
+
from sqlalchemy import Column
+from sqlalchemy import Float
from sqlalchemy import ForeignKey
from sqlalchemy import ForeignKeyConstraint
+from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import UniqueConstraint
+from sqlalchemy.dialects.postgresql import DOUBLE_PRECISION
from sqlalchemy.sql.expression import column
from sqlalchemy.sql.expression import desc
+from alembic import testing
from alembic.testing import combinations
from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import exclusions
+from alembic.testing import resolve_lambda
from alembic.testing import schemacompare
from alembic.testing import TestBase
from alembic.testing import util
+from alembic.testing.assertions import expect_warnings
from alembic.testing.env import staging_env
from alembic.testing.suite._autogen_fixtures import AutogenFixtureTest
from alembic.util import sqla_compat
diffs = self._fixture(m1, m2)
eq_(diffs, [])
+ def test_column_order_changed(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ old = Index("SomeIndex", "x", "y")
+ Table(
+ "order_change",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ old,
+ )
+
+ new = Index("SomeIndex", "y", "x")
+ Table(
+ "order_change",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ new,
+ )
+ diffs = self._fixture(m1, m2)
+ eq_(
+ diffs,
+ [
+ ("remove_index", schemacompare.CompareIndex(old)),
+ ("add_index", schemacompare.CompareIndex(new)),
+ ],
+ )
+
+
+class AutogenerateExpressionIndexTest(AutogenFixtureTest, TestBase):
+ """tests involving indexes with expression"""
+
+ __requires__ = ("indexes_with_expressions",)
+
+ __backend__ = True
+
+ @property
+ def has_reflection(self):
+ return config.requirements.reflect_indexes_with_expressions.enabled
+
+ def test_expression_indexes_add(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table(
+ "exp_index",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ )
+
+ idx = Index("SomeIndex", "y", func.lower(column("x"))) # noqa
+ Table(
+ "exp_index",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ idx,
+ )
+
+ if self.has_reflection:
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [("add_index", schemacompare.CompareIndex(idx))])
+ else:
+ with expect_warnings(
+ r"autogenerate skipping metadata-specified expression-based "
+ r"index 'SomeIndex'; dialect '.*' under SQLAlchemy .* "
+ r"can't reflect these "
+ r"indexes so they can't be compared",
+ ):
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])
+
+ def _lots_of_indexes(flatten: bool = False):
+ diff_pairs = [
+ (
+ lambda t: Index("SomeIndex", "y", func.lower(t.c.x)),
+ lambda t: Index("SomeIndex", func.lower(t.c.x)),
+ ),
+ (
+ lambda CapT: Index("SomeIndex", "y", func.lower(CapT.c.XCol)),
+ lambda CapT: Index("SomeIndex", func.lower(CapT.c.XCol)),
+ ),
+ (
+ lambda t: Index(
+ "SomeIndex", "y", func.lower(column("x")), _table=t
+ ),
+ lambda t: Index(
+ "SomeIndex", func.lower(column("x")), _table=t
+ ),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.x)),
+ lambda t: Index("SomeIndex", func.lower(t.c.x), t.c.y),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.x)),
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.q)),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.z, func.lower(t.c.x)),
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.x)),
+ ),
+ (
+ lambda t: Index("SomeIndex", func.lower(t.c.x)),
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.x)),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.y, func.upper(t.c.x)),
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.x)),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.y, t.c.ff + 1),
+ lambda t: Index("SomeIndex", t.c.y, t.c.ff + 3),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.y, func.ceil(t.c.ff)),
+ lambda t: Index("SomeIndex", t.c.y, func.floor(t.c.ff)),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.x)),
+ lambda t: Index("SomeIndex", t.c.y, func.lower(t.c.x + t.c.q)),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.y, t.c.z + 3),
+ lambda t: Index("SomeIndex", t.c.y, t.c.z * 3),
+ ),
+ (
+ lambda t: Index("SomeIndex", func.lower(t.c.x), t.c.q + "42"),
+ lambda t: Index("SomeIndex", func.lower(t.c.q), t.c.x + "42"),
+ ),
+ (
+ lambda t: Index("SomeIndex", func.lower(t.c.x), t.c.z + 42),
+ lambda t: Index("SomeIndex", t.c.z + 42, func.lower(t.c.q)),
+ ),
+ (
+ lambda t: Index("SomeIndex", t.c.ff + 42),
+ lambda t: Index("SomeIndex", 42 + t.c.ff),
+ ),
+ ]
+ if flatten:
+ return list(itertools.chain.from_iterable(diff_pairs))
+ else:
+ return diff_pairs
+
+ @testing.fixture
+ def index_changed_tables(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ t_old = Table(
+ "exp_index",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("x", String(100)),
+ Column("y", String(100)),
+ Column("q", String(100)),
+ Column("z", Integer),
+ Column("ff", Float().with_variant(DOUBLE_PRECISION, "postgresql")),
+ )
+
+ t_new = Table(
+ "exp_index",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", String(100)),
+ Column("y", String(100)),
+ Column("q", String(100)),
+ Column("z", Integer),
+ Column("ff", Float().with_variant(DOUBLE_PRECISION, "postgresql")),
+ )
+
+ CapT_old = Table(
+ "CapT table",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("XCol", String(100)),
+ Column("y", String(100)),
+ )
+
+ CapT_new = Table(
+ "CapT table",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("XCol", String(100)),
+ Column("y", String(100)),
+ )
+
+ return (
+ m1,
+ m2,
+ {"t": t_old, "CapT": CapT_old},
+ {"t": t_new, "CapT": CapT_new},
+ )
+
+ @combinations(*_lots_of_indexes(), argnames="old_fn, new_fn")
+ def test_expression_indexes_changed(
+ self, index_changed_tables, old_fn, new_fn
+ ):
+ m1, m2, old_fixture_tables, new_fixture_tables = index_changed_tables
+
+ old, new = resolve_lambda(
+ old_fn, **old_fixture_tables
+ ), resolve_lambda(new_fn, **new_fixture_tables)
+
+ if self.has_reflection:
+ diffs = self._fixture(m1, m2)
+ eq_(
+ diffs,
+ [
+ ("remove_index", schemacompare.CompareIndex(old, True)),
+ ("add_index", schemacompare.CompareIndex(new)),
+ ],
+ )
+ else:
+ with expect_warnings(
+ r"Skipped unsupported reflection of expression-based index "
+ r"SomeIndex",
+ r"autogenerate skipping metadata-specified expression-based "
+ r"index 'SomeIndex'; dialect '.*' under SQLAlchemy .* "
+ r"can't reflect these "
+ r"indexes so they can't be compared",
+ ):
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])
+
+ @combinations(*_lots_of_indexes(flatten=True), argnames="fn")
+ def test_expression_indexes_no_change(self, index_changed_tables, fn):
+ m1, m2, old_fixture_tables, new_fixture_tables = index_changed_tables
+
+ resolve_lambda(fn, **old_fixture_tables)
+ resolve_lambda(fn, **new_fixture_tables)
+
+ if self.has_reflection:
+ ctx = nullcontext()
+ else:
+ ctx = expect_warnings()
+
+ with ctx:
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])
+
+ def test_expression_indexes_remove(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ idx = Index("SomeIndex", "y", func.lower(column("x")))
+ Table(
+ "exp_index",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("x", String(100)),
+ Column("y", Integer),
+ idx,
+ )
+
+ Table(
+ "exp_index",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("x", String(100)),
+ Column("y", Integer),
+ )
+
+ if self.has_reflection:
+ diffs = self._fixture(m1, m2)
+ eq_(
+ diffs,
+ [("remove_index", schemacompare.CompareIndex(idx, True))],
+ )
+ else:
+ with expect_warnings():
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])
+
class NoUqReflectionIndexTest(NoUqReflection, AutogenerateUniqueIndexTest):
__only_on__ = "sqlite"
from alembic.operations import ops
from alembic.script import ScriptDirectory
from alembic.testing import assert_raises_message
-from alembic.testing import assertions
from alembic.testing import combinations
from alembic.testing import config
from alembic.testing import eq_
eq_(diffs[0][0], "remove_constraint")
eq_(diffs[0][1].name, "uq_name")
eq_(len(diffs), 1)
-
- def _functional_index_warn(self):
- return (r"Skip.*refl",)
-
- def test_functional_ix_one(self):
- m1 = MetaData()
- m2 = MetaData()
-
- t1 = Table(
- "foo",
- m1,
- Column("id", Integer, primary_key=True),
- Column("email", String(50)),
- )
- Index("email_idx", func.lower(t1.c.email), unique=True)
-
- t2 = Table(
- "foo",
- m2,
- Column("id", Integer, primary_key=True),
- Column("email", String(50)),
- )
- Index("email_idx", func.lower(t2.c.email), unique=True)
-
- with assertions.expect_warnings(*self._functional_index_warn()):
- diffs = self._fixture(m1, m2)
- eq_(diffs, [])
-
- def test_functional_ix_two(self):
- m1 = MetaData()
- m2 = MetaData()
-
- t1 = Table(
- "foo",
- m1,
- Column("id", Integer, primary_key=True),
- Column("email", String(50)),
- Column("name", String(50)),
- )
- Index(
- "email_idx",
- func.coalesce(t1.c.email, t1.c.name).desc(),
- unique=True,
- )
-
- t2 = Table(
- "foo",
- m2,
- Column("id", Integer, primary_key=True),
- Column("email", String(50)),
- Column("name", String(50)),
- )
- Index(
- "email_idx",
- func.coalesce(t2.c.email, t2.c.name).desc(),
- unique=True,
- )
-
- with assertions.expect_warnings(*self._functional_index_warn()):
- diffs = self._fixture(m1, m2)
- eq_(diffs, [])
from sqlalchemy import DateTime
from sqlalchemy import Float
from sqlalchemy import func
+from sqlalchemy import Index
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import eq_ignore_whitespace
+from alembic.testing.assertions import expect_warnings
from alembic.testing.env import clear_staging_env
from alembic.testing.env import staging_env
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
+from alembic.testing.suite._autogen_fixtures import AutogenFixtureTest
class SQLiteTest(TestBase):
"sa.Column('int_value', sa.Integer(), "
"nullable=True, sqlite_on_conflict_not_null='FAIL')",
)
+
+
+class SQLiteAutogenIndexTest(AutogenFixtureTest, TestBase):
+ __requires__ = ("indexes_with_expressions",)
+ __only_on__ = "sqlite"
+ __backend__ = True
+
+ def _functional_index_warn(self):
+ return (r"Skip.*refl",)
+
+ def test_functional_ix_one(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ t1 = Table(
+ "foo",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("email", String(50)),
+ )
+ Index("email_idx", func.lower(t1.c.email), unique=True)
+
+ t2 = Table(
+ "foo",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("email", String(50)),
+ )
+ Index("email_idx", func.lower(t2.c.email), unique=True)
+
+ with expect_warnings(*self._functional_index_warn()):
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])
+
+ def test_functional_ix_two(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ t1 = Table(
+ "foo",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("email", String(50)),
+ Column("name", String(50)),
+ )
+ Index(
+ "email_idx",
+ func.coalesce(t1.c.email, t1.c.name).desc(),
+ unique=True,
+ )
+
+ t2 = Table(
+ "foo",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("email", String(50)),
+ Column("name", String(50)),
+ )
+ Index(
+ "email_idx",
+ func.coalesce(t2.c.email, t2.c.name).desc(),
+ unique=True,
+ )
+
+ with expect_warnings(*self._functional_index_warn()):
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])