is_index = False
is_unique = True
- def __init__(self, const: UniqueConstraint) -> None:
+ def __init__(self, const: UniqueConstraint, impl: DefaultImpl) -> None:
self.const = const
self.name = const.name
- self.sig = ("UNIQUE_CONSTRAINT",) + tuple(
- sorted([col.name for col in const.columns])
+ self.sig = ("UNIQUE_CONSTRAINT",) + impl.create_unique_constraint_sig(
+ const
)
@property
# 2a. if the dialect dupes unique indexes as unique constraints
# (mysql and oracle), correct for that
+ impl = autogen_context.migration_context.impl
if unique_constraints_duplicate_unique_indexes:
_correct_for_uq_duplicates_uix(
conn_uniques,
metadata_unique_constraints,
metadata_indexes,
autogen_context.dialect,
+ impl,
)
# 3. give the dialect a chance to omit indexes and constraints that
# Index and UniqueConstraint so we can easily work with them
# interchangeably
metadata_unique_constraints_sig = {
- _uq_constraint_sig(uq) for uq in metadata_unique_constraints
+ _uq_constraint_sig(uq, impl) for uq in metadata_unique_constraints
}
- impl = autogen_context.migration_context.impl
metadata_indexes_sig = {
_ix_constraint_sig(ix, impl) for ix in metadata_indexes
}
- conn_unique_constraints = {_uq_constraint_sig(uq) for uq in conn_uniques}
+ conn_unique_constraints = {
+ _uq_constraint_sig(uq, impl) for uq in conn_uniques
+ }
conn_indexes_sig = {_ix_constraint_sig(ix, impl) for ix in conn_indexes}
metadata_unique_constraints,
metadata_indexes,
dialect,
+ impl,
):
# dedupe unique indexes vs. constraints, since MySQL / Oracle
# doesn't really have unique constraints as a separate construct.
}
unnamed_metadata_uqs = {
- _uq_constraint_sig(cons).sig
+ _uq_constraint_sig(cons, impl).sig
for name, cons in metadata_cons_names
if name is None
}
for overlap in uqs_dupe_indexes:
if overlap not in metadata_uq_names:
if (
- _uq_constraint_sig(uqs_dupe_indexes[overlap]).sig
+ _uq_constraint_sig(uqs_dupe_indexes[overlap], impl).sig
not in unnamed_metadata_uqs
):
conn_unique_constraints.discard(uqs_dupe_indexes[overlap])
if TYPE_CHECKING:
from typing import Literal
+ from sqlalchemy.sql.base import DialectKWArgs
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.sql.elements import TextClause
from sqlalchemy.sql.schema import CheckConstraint
return text
+def _render_dialect_kwargs_items(
+ autogen_context: AutogenContext, item: DialectKWArgs
+) -> list[str]:
+ return [
+ f"{key}={_render_potential_expr(val, autogen_context)}"
+ for key, val in item.dialect_kwargs.items()
+ ]
+
+
@renderers.dispatch_for(ops.CreateIndexOp)
def _add_index(autogen_context: AutogenContext, op: ops.CreateIndexOp) -> str:
index = op.to_index()
)
assert index.table is not None
+
+ opts = _render_dialect_kwargs_items(autogen_context, index)
text = tmpl % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
"name": _render_gen_name(autogen_context, index.name),
"schema": (", schema=%r" % _ident(index.table.schema))
if index.table.schema
else "",
- "kwargs": (
- ", "
- + ", ".join(
- [
- "%s=%s"
- % (key, _render_potential_expr(val, autogen_context))
- for key, val in index.kwargs.items()
- ]
- )
- )
- if len(index.kwargs)
- else "",
+ "kwargs": ", " + ", ".join(opts) if opts else "",
}
return text
"%(prefix)sdrop_index(%(name)r, "
"table_name=%(table_name)r%(schema)s%(kwargs)s)"
)
-
+ opts = _render_dialect_kwargs_items(autogen_context, index)
text = tmpl % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
"name": _render_gen_name(autogen_context, op.index_name),
"table_name": _ident(op.table_name),
"schema": ((", schema=%r" % _ident(op.schema)) if op.schema else ""),
- "kwargs": (
- ", "
- + ", ".join(
- [
- "%s=%s"
- % (key, _render_potential_expr(val, autogen_context))
- for key, val in index.kwargs.items()
- ]
- )
- )
- if len(index.kwargs)
- else "",
+ "kwargs": ", " + ", ".join(opts) if opts else "",
}
return text
opts.append(
("name", _render_gen_name(autogen_context, constraint.name))
)
+ dialect_options = _render_dialect_kwargs_items(autogen_context, constraint)
if alter:
args = [repr(_render_gen_name(autogen_context, constraint.name))]
args += [repr(_ident(constraint.table.name))]
args.append(repr([_ident(col.name) for col in constraint.columns]))
args.extend(["%s=%r" % (k, v) for k, v in opts])
+ args.extend(dialect_options)
return "%(prefix)screate_unique_constraint(%(args)s)" % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
"args": ", ".join(args),
else:
args = [repr(_ident(col.name)) for col in constraint.columns]
args.extend(["%s=%r" % (k, v) for k, v in opts])
+ args.extend(dialect_options)
return "%(prefix)sUniqueConstraint(%(args)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
"args": ", ".join(args),
# order of col matters in an index
return tuple(col.name for col in index.columns)
+ def create_unique_constraint_sig(
+ self, const: UniqueConstraint
+ ) -> Tuple[Any, ...]:
+ # order of col does not matters in an unique constraint
+ return tuple(sorted([col.name for col in const.columns]))
+
def _skip_functional_indexes(self, metadata_indexes, conn_indexes):
conn_indexes_by_name = {c.name: c for c in conn_indexes}
from typing import Union
from sqlalchemy import Column
-from sqlalchemy import Index
from sqlalchemy import literal_column
from sqlalchemy import Numeric
from sqlalchemy import text
if TYPE_CHECKING:
from typing import Literal
+ from sqlalchemy import Index
+ from sqlalchemy import UniqueConstraint
from sqlalchemy.dialects.postgresql.array import ARRAY
from sqlalchemy.dialects.postgresql.base import PGDDLCompiler
from sqlalchemy.dialects.postgresql.hstore import HSTORE
break
return to_remove
+ def _dialect_sig(
+ self, item: Union[Index, UniqueConstraint]
+ ) -> Tuple[Any, ...]:
+ if (
+ item.dialect_kwargs.get("postgresql_nulls_not_distinct")
+ is not None
+ ):
+ return (
+ (
+ "nulls_not_distinct",
+ item.dialect_kwargs["postgresql_nulls_not_distinct"],
+ ),
+ )
+ return ()
+
def create_index_sig(self, index: Index) -> Tuple[Any, ...]:
return tuple(
self._cleanup_index_expr(
),
)
for e in index.expressions
- )
+ ) + self._dialect_sig(index)
+
+ def create_unique_constraint_sig(
+ self, const: UniqueConstraint
+ ) -> Tuple[Any, ...]:
+ return tuple(
+ sorted([col.name for col in const.columns])
+ ) + self._dialect_sig(const)
def _compile_element(self, element: ClauseElement) -> str:
return element.compile(
@classmethod
def register_operation(
cls, name: str, sourcename: Optional[str] = None
- ) -> Callable[..., Any]:
+ ) -> Callable[[_T], _T]:
"""Register a new operation for this class.
This method is normally used to add new operations
--- /dev/null
+.. change::
+ :tags: usecase, autogenerate
+ :tickets: 1248
+
+ Added support in autogenerate for NULLS NOT DISTINCT in
+ the PostgreSQL dialect.
from sqlalchemy import exc as sqla_exc
+from sqlalchemy import Index
from sqlalchemy import text
from alembic.testing import exclusions
@property
def indexes_with_expressions(self):
return exclusions.only_on(["postgresql", "sqlite>=3.9.0"])
+
+ @property
+ def nulls_not_distinct_sa(self):
+ def _has_nulls_not_distinct():
+ try:
+ Index("foo", "bar", postgresql_nulls_not_distinct=True)
+ return True
+ except sqla_exc.ArgumentError:
+ return False
+
+ return exclusions.only_if(
+ _has_nulls_not_distinct,
+ "sqlalchemy with nulls not distinct support needed",
+ )
+
+ @property
+ def nulls_not_distinct_db(self):
+ return self.nulls_not_distinct_sa + exclusions.only_on(
+ ["postgresql>=15"]
+ )
"postgresql.JSONB(astext_type=sa.Text())",
)
+ @config.requirements.nulls_not_distinct_sa
+ def test_render_unique_nulls_not_distinct_constraint(self):
+ m = MetaData()
+ t = Table("tbl", m, Column("c", Integer))
+ uc = UniqueConstraint(
+ t.c.c,
+ name="uq_1",
+ deferrable="XYZ",
+ postgresql_nulls_not_distinct=True,
+ )
+ eq_ignore_whitespace(
+ autogenerate.render.render_op_text(
+ self.autogen_context,
+ ops.AddConstraintOp.from_constraint(uc),
+ ),
+ "op.create_unique_constraint('uq_1', 'tbl', ['c'], "
+ "deferrable='XYZ', postgresql_nulls_not_distinct=True)",
+ )
+ eq_ignore_whitespace(
+ autogenerate.render._render_unique_constraint(
+ uc, self.autogen_context, None
+ ),
+ "sa.UniqueConstraint('c', deferrable='XYZ', name='uq_1', "
+ "postgresql_nulls_not_distinct=True)",
+ )
+
+ @config.requirements.nulls_not_distinct_sa
+ def test_render_index_nulls_not_distinct_constraint(self):
+ m = MetaData()
+ t = Table("tbl", m, Column("c", Integer))
+ idx = Index("ix_42", t.c.c, postgresql_nulls_not_distinct=False)
+ eq_ignore_whitespace(
+ autogenerate.render.render_op_text(
+ self.autogen_context, ops.CreateIndexOp.from_index(idx)
+ ),
+ "op.create_index('ix_42', 'tbl', ['c'], unique=False, "
+ "postgresql_nulls_not_distinct=False)",
+ )
+
class PGUniqueIndexAutogenerateTest(AutogenFixtureTest, TestBase):
__only_on__ = "postgresql"
eq_(diffs[0][0], "remove_constraint")
eq_(diffs[0][1].name, "uq_name")
eq_(len(diffs), 1)
+
+
+case = combinations(False, True, None, argnames="case", id_="s")
+name_type = combinations(
+ (
+ "index",
+ lambda value: Index(
+ "nnd_obj", "name", unique=True, postgresql_nulls_not_distinct=value
+ ),
+ ),
+ (
+ "constraint",
+ lambda value: UniqueConstraint(
+ "id", "name", name="nnd_obj", postgresql_nulls_not_distinct=value
+ ),
+ ),
+ argnames="name,type_",
+ id_="sa",
+)
+
+
+class PGNullsNotDistinctAutogenerateTest(AutogenFixtureTest, TestBase):
+ __requires__ = ("nulls_not_distinct_db",)
+ __only_on__ = "postgresql"
+ __backend__ = True
+
+ @case
+ @name_type
+ def test_add(self, case, name, type_):
+ m1 = MetaData()
+ m2 = MetaData()
+ Table(
+ "tbl",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("name", String),
+ )
+ Table(
+ "tbl",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("name", String),
+ type_(case),
+ )
+ diffs = self._fixture(m1, m2)
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], f"add_{name}")
+ added = diffs[0][1]
+ eq_(added.name, "nnd_obj")
+ eq_(added.dialect_kwargs["postgresql_nulls_not_distinct"], case)
+
+ @case
+ @name_type
+ def test_remove(self, case, name, type_):
+ m1 = MetaData()
+ m2 = MetaData()
+ Table(
+ "tbl",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("name", String),
+ type_(case),
+ )
+ Table(
+ "tbl",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("name", String),
+ )
+ diffs = self._fixture(m1, m2)
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], f"remove_{name}")
+ eq_(diffs[0][1].name, "nnd_obj")
+
+ @case
+ @name_type
+ def test_toggle_not_distinct(self, case, name, type_):
+ m1 = MetaData()
+ m2 = MetaData()
+ to = not case
+ Table(
+ "tbl",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("name", String),
+ type_(case),
+ )
+ Table(
+ "tbl",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("name", String),
+ type_(to),
+ )
+ diffs = self._fixture(m1, m2)
+ eq_(len(diffs), 2)
+ eq_(diffs[0][0], f"remove_{name}")
+ eq_(diffs[1][0], f"add_{name}")
+ eq_(diffs[1][1].name, "nnd_obj")
+ eq_(diffs[1][1].dialect_kwargs["postgresql_nulls_not_distinct"], to)