--- /dev/null
+.. change::
+ :tags: bug, schema
+ :tickets: 5919
+
+ Repaired / implemented support for primary key constraint naming
+ conventions that use column names/keys/etc as part of the convention. In
+ particular, this includes that the :class:`.PrimaryKeyConstraint` object
+ that's automatically associated with a :class:`.schema.Table` will update
+ its name as new primary key :class:`_schema.Column` objects are added to
+ the table and then to the constraint. Internal failure modes related to
+ this constraint construction process including no columns present, no name
+ present or blank name present are now accommodated.
def _is_event_name(name):
- return not name.startswith("_") and name != "dispatch"
+ # _sa_event prefix is special to support internal-only event names.
+ # most event names are just plain method names that aren't
+ # underscored.
+
+ return (
+ not name.startswith("_") and name != "dispatch"
+ ) or name.startswith("_sa_event")
class _UnpickleDispatch(object):
"""
+ def _sa_event_column_added_to_pk_constraint(self, const, col):
+ """internal event hook used for primary key naming convention
+ updates.
+
+ """
+
def column_reflect(self, inspector, table, column_info):
"""Called for each unit of 'column info' retrieved when
a :class:`_schema.Table` is being reflected.
def _key_table_name(self):
return self.table.name
- def _column_X(self, idx):
+ def _column_X(self, idx, attrname):
if self._is_fk:
- fk = self.const.elements[idx]
- return fk.parent
+ try:
+ fk = self.const.elements[idx]
+ except IndexError:
+ return ""
+ else:
+ return getattr(fk.parent, attrname)
else:
- return list(self.const.columns)[idx]
+ cols = list(self.const.columns)
+ try:
+ col = cols[idx]
+ except IndexError:
+ return ""
+ else:
+ return getattr(col, attrname)
def _key_constraint_name(self):
if isinstance(self._const_name, (type(None), _defer_none_name)):
# note this method was missing before
# [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't
# working even though documented.
- return self._column_X(idx).key
+ return self._column_X(idx, "key")
def _key_column_X_name(self, idx):
- return self._column_X(idx).name
+ return self._column_X(idx, "name")
def _key_column_X_label(self, idx):
- return self._column_X(idx)._ddl_label
+ return self._column_X(idx, "_ddl_label")
def _key_referred_table_name(self):
fk = self.const.elements[0]
return None
+@event.listens_for(
+ PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint"
+)
+def _column_added_to_pk_constraint(pk_constraint, col):
+ if pk_constraint._implicit_generated:
+ # only operate upon the "implicit" pk constraint for now,
+ # as we have to force the name to None to reset it. the
+ # "implicit" constraint will only have a naming convention name
+ # if at all.
+ table = pk_constraint.table
+ pk_constraint.name = None
+ newname = _constraint_name_for_table(pk_constraint, table)
+ if newname:
+ pk_constraint.name = newname
+
+
@event.listens_for(Constraint, "after_parent_attach")
@event.listens_for(Index, "after_parent_attach")
def _constraint_name(const, table):
if isinstance(table, Column):
+ # this path occurs for a CheckConstraint linked to a Column
+
# for column-attached constraint, set another event
# to link the column attached to the table as this constraint
# associated with the table.
"after_parent_attach",
lambda col, table: _constraint_name(const, table),
)
+
elif isinstance(table, Table):
if isinstance(const.name, (conv, _defer_name)):
return
newname = _constraint_name_for_table(const, table)
- if newname is not None:
+ if newname:
const.name = newname
table._columns.replace(self)
+ self.table = table
+
if self.primary_key:
table.primary_key._replace(self)
elif self.key in table.primary_key:
% (self.key, table.fullname)
)
- self.table = table
-
if self.index:
if isinstance(self.index, util.string_types):
raise exc.ArgumentError(
are already present.
"""
-
# set the primary key flag on new columns.
# note any existing PK cols on the table also have their
# flag still set.
PrimaryKeyConstraint._autoincrement_column._reset(self)
self.columns.replace(col)
+ self.dispatch._sa_event_column_added_to_pk_constraint(self, col)
+
@property
def columns_autoinc_first(self):
autoinc = self._autoincrement_column
AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)"
)
+ @testing.combinations(
+ ("nopk",),
+ ("column",),
+ ("constraint",),
+ ("explicit_name",),
+ argnames="pktype",
+ )
+ @testing.combinations(
+ ("pk_%(table_name)s", "pk_t1"),
+ ("pk_%(column_0_name)s", "pk_x"),
+ ("pk_%(column_0_N_name)s", "pk_x_y"),
+ ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"),
+ ("%(column_0_name)s", "x"),
+ ("%(column_0N_name)s", "xy"),
+ argnames="conv, expected_name",
+ )
+ def test_pk_conventions(self, conv, expected_name, pktype):
+ m1 = MetaData(naming_convention={"pk": conv})
+
+ if pktype == "column":
+ t1 = Table(
+ "t1",
+ m1,
+ Column("x", Integer, primary_key=True),
+ Column("y", Integer, primary_key=True),
+ )
+ elif pktype == "constraint":
+ t1 = Table(
+ "t1",
+ m1,
+ Column("x", Integer),
+ Column("y", Integer),
+ PrimaryKeyConstraint("x", "y"),
+ )
+ elif pktype == "nopk":
+ t1 = Table(
+ "t1",
+ m1,
+ Column("x", Integer, nullable=False),
+ Column("y", Integer, nullable=False),
+ )
+ expected_name = None
+ elif pktype == "explicit_name":
+ t1 = Table(
+ "t1",
+ m1,
+ Column("x", Integer, primary_key=True),
+ Column("y", Integer, primary_key=True),
+ PrimaryKeyConstraint("x", "y", name="myname"),
+ )
+ expected_name = "myname"
+
+ if expected_name:
+ eq_(t1.primary_key.name, expected_name)
+
+ if pktype == "nopk":
+ self.assert_compile(
+ schema.CreateTable(t1),
+ "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)",
+ )
+ else:
+ self.assert_compile(
+ schema.CreateTable(t1),
+ "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, "
+ "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name,
+ )
+
def test_uq_name(self):
u1 = self._fixture(
naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}