conn_names = {
c.name: c
for c in conn_unique_constraints.union(conn_indexes_sig)
- if sqla_compat.constraint_name_defined(c.name)
+ if sqla_compat.constraint_name_string(c.name)
}
doubled_constraints = {
from ..util.sqla_compat import _resolve_for_variant
from ..util.sqla_compat import _select
from ..util.sqla_compat import constraint_name_defined
+from ..util.sqla_compat import constraint_name_string
if TYPE_CHECKING:
from typing import Literal
# because
# we have no way to determine _is_type_bound() for these.
pass
- elif constraint_name_defined(const.name):
+ elif constraint_name_string(const.name):
self.named_constraints[const.name] = const
else:
self.unnamed_constraints.append(const)
if self.table.primary_key in self.unnamed_constraints:
self.unnamed_constraints.remove(self.table.primary_key)
- self.named_constraints[const.name] = const
+ if constraint_name_string(const.name):
+ self.named_constraints[const.name] = const
+ else:
+ self.unnamed_constraints.append(const)
def drop_constraint(self, const: Constraint) -> None:
if not const.name:
for col_const in list(self.columns[col.name].constraints):
if col_const.name == const.name:
self.columns[col.name].constraints.remove(col_const)
- else:
- assert constraint_name_defined(const.name)
+ elif constraint_name_string(const.name):
const = self.named_constraints.pop(const.name)
+ elif const in self.unnamed_constraints:
+ self.unnamed_constraints.remove(const)
+
except KeyError:
if _is_type_bound(const):
# type-bound constraints are only included in the new
def __init__(
self,
- constraint_name: Optional[str],
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
table_name: str,
type_: Optional[str] = None,
schema: Optional[str] = None,
def __init__(
self,
- constraint_name: Optional[str],
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
table_name: str,
columns: Sequence[str],
schema: Optional[str] = None,
def __init__(
self,
- constraint_name: Optional[str],
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
table_name: str,
columns: Sequence[str],
schema: Optional[str] = None,
def __init__(
self,
- constraint_name: Optional[str],
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
source_table: str,
referent_table: str,
local_cols: List[str],
def __init__(
self,
- constraint_name: Optional[str],
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
table_name: str,
condition: Union[str, TextClause, ColumnElement[Any]],
schema: Optional[str] = None,
def primary_key_constraint(
self,
- name: Optional[str],
+ name: Optional[sqla_compat._ConstraintNameDefined],
table_name: str,
cols: Sequence[str],
schema: Optional[str] = None,
m = self.metadata()
columns = [sa_schema.Column(n, NULLTYPE) for n in cols]
t = sa_schema.Table(table_name, m, *columns, schema=schema)
+ # SQLAlchemy primary key constraint name arg is wrongly typed on
+ # the SQLAlchemy side through 2.0.5 at least
p = sa_schema.PrimaryKeyConstraint(
- *[t.c[n] for n in cols], name=name, **dialect_kw
+ *[t.c[n] for n in cols], name=name, **dialect_kw # type: ignore
)
return p
def foreign_key_constraint(
self,
- name: Optional[str],
+ name: Optional[sqla_compat._ConstraintNameDefined],
source: str,
referent: str,
local_cols: List[str],
def unique_constraint(
self,
- name: Optional[str],
+ name: Optional[sqla_compat._ConstraintNameDefined],
source: str,
local_cols: Sequence[str],
schema: Optional[str] = None,
def check_constraint(
self,
- name: Optional[str],
+ name: Optional[sqla_compat._ConstraintNameDefined],
source: str,
condition: Union[str, TextClause, ColumnElement[Any]],
schema: Optional[str] = None,
def generic_constraint(
self,
- name: Optional[str],
+ name: Optional[sqla_compat._ConstraintNameDefined],
table_name: str,
type_: Optional[str],
schema: Optional[str] = None,
from sqlalchemy.sql.elements import quoted_name
from sqlalchemy.sql.elements import TextClause
from sqlalchemy.sql.elements import UnaryExpression
+from sqlalchemy.sql.naming import _NONE_NAME as _NONE_NAME
from sqlalchemy.sql.visitors import traverse
from typing_extensions import TypeGuard
else:
from sqlalchemy.util import symbol as _NoneName # type: ignore[assignment]
+
_ConstraintName = Union[None, str, _NoneName]
+_ConstraintNameDefined = Union[str, _NoneName]
+
+
+def constraint_name_defined(
+ name: _ConstraintName,
+) -> TypeGuard[_ConstraintNameDefined]:
+ return name is _NONE_NAME or isinstance(name, (str, _NoneName))
-def constraint_name_defined(name: _ConstraintName) -> TypeGuard[str]:
+
+def constraint_name_string(
+ name: _ConstraintName,
+) -> TypeGuard[str]:
return isinstance(name, str)
-def constraint_name_or_none(name: _ConstraintName) -> Optional[str]:
- return name if constraint_name_defined(name) else None
+def constraint_name_or_none(
+ name: _ConstraintName,
+) -> Optional[str]:
+ return name if constraint_name_string(name) else None
AUTOINCREMENT_DEFAULT = "auto"
--- /dev/null
+.. change::
+ :tags: bug, batch, regression
+ :tickets: 1195
+
+ Fixed regression for 1.10.0 where :class:`.Constraint` objects were
+ suddenly required to have non-None name fields when using batch mode, which
+ was not previously a requirement.
from alembic.testing.fixtures import op_fixture
from alembic.util import CommandError
from alembic.util import exc as alembic_exc
+from alembic.util.sqla_compat import _NONE_NAME
from alembic.util.sqla_compat import _safe_commit_connection_transaction
from alembic.util.sqla_compat import _select
from alembic.util.sqla_compat import has_computed
ddl_not_contains="CONSTRAINT uq1 UNIQUE",
)
+ def test_add_ck_unnamed(self):
+ """test for #1195"""
+ impl = self._simple_fixture()
+ ck = self.op.schema_obj.check_constraint(_NONE_NAME, "tname", "y > 5")
+
+ impl.add_constraint(ck)
+ self._assert_impl(
+ impl,
+ colnames=["id", "x", "y"],
+ ddl_contains="CHECK (y > 5)",
+ )
+
def test_add_ck(self):
impl = self._simple_fixture()
ck = self.op.schema_obj.check_constraint("ck1", "tname", "y > 5")
t = Table("hasbool", self.metadata, Column("x", Integer))
t.create(self.conn)
+ def test_add_constraint_type(self):
+ """test for #1195."""
+
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.add_column(Column("q", Boolean(create_constraint=True)))
+ insp = inspect(self.conn)
+
+ assert {
+ c["type"]._type_affinity
+ for c in insp.get_columns("foo")
+ if c["name"] == "q"
+ }.intersection([Boolean, Integer])
+
def test_change_type_boolean_to_int(self):
self._boolean_fixture()
with self.op.batch_alter_table("hasbool") as batch_op: