From: Mike Bayer Date: Thu, 4 Feb 2021 17:09:54 +0000 (-0500) Subject: Accommodate column-based naming conventions for pk constraint X-Git-Tag: rel_1_3_24~26 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=d997caa15ccf3d7a56d2c02710c6c3a53287282e;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Accommodate column-based naming conventions for pk constraint Repaired / implemented support for primary key constraint naming conventions that use column names/keys/etc as part of the convention. In particular, this includes that the :class:`.PrimaryKeyConstraint` object that's automatically associated with a :class:`.schema.Table` will update its name as new primary key :class:`_schema.Column` objects are added to the table and then to the constraint. Internal failure modes related to this constraint construction process including no columns present, no name present or blank name present are now accommodated. Fixes: #5919 Change-Id: Ic2800b50f4a4cd5978bec48cefea0a2e198e0123 (cherry picked from commit 9660e94e35be438b0d51cd87e6ccb4047a332c15) --- diff --git a/doc/build/changelog/unreleased_13/5919.rst b/doc/build/changelog/unreleased_13/5919.rst new file mode 100644 index 0000000000..ddae6c2b03 --- /dev/null +++ b/doc/build/changelog/unreleased_13/5919.rst @@ -0,0 +1,12 @@ +.. change:: + :tags: bug, schema + :tickets: 5919 + + Repaired / implemented support for primary key constraint naming + conventions that use column names/keys/etc as part of the convention. In + particular, this includes that the :class:`.PrimaryKeyConstraint` object + that's automatically associated with a :class:`.schema.Table` will update + its name as new primary key :class:`_schema.Column` objects are added to + the table and then to the constraint. Internal failure modes related to + this constraint construction process including no columns present, no name + present or blank name present are now accommodated. diff --git a/lib/sqlalchemy/event/base.py b/lib/sqlalchemy/event/base.py index 56d14ce0e8..05c24730c3 100644 --- a/lib/sqlalchemy/event/base.py +++ b/lib/sqlalchemy/event/base.py @@ -29,7 +29,13 @@ _registrars = util.defaultdict(list) def _is_event_name(name): - return not name.startswith("_") and name != "dispatch" + # _sa_event prefix is special to support internal-only event names. + # most event names are just plain method names that aren't + # underscored. + + return ( + not name.startswith("_") and name != "dispatch" + ) or name.startswith("_sa_event") class _UnpickleDispatch(object): diff --git a/lib/sqlalchemy/events.py b/lib/sqlalchemy/events.py index 2a97432ad6..40a6e90ebc 100644 --- a/lib/sqlalchemy/events.py +++ b/lib/sqlalchemy/events.py @@ -216,6 +216,12 @@ class DDLEvents(event.Events): """ + def _sa_event_column_added_to_pk_constraint(self, const, col): + """internal event hook used for primary key naming convention + updates. + + """ + def column_reflect(self, inspector, table, column_info): """Called for each unit of 'column info' retrieved when a :class:`_schema.Table` is being reflected. diff --git a/lib/sqlalchemy/sql/naming.py b/lib/sqlalchemy/sql/naming.py index ba1eaf9f22..b727b51ce6 100644 --- a/lib/sqlalchemy/sql/naming.py +++ b/lib/sqlalchemy/sql/naming.py @@ -39,12 +39,22 @@ class ConventionDict(object): def _key_table_name(self): return self.table.name - def _column_X(self, idx): + def _column_X(self, idx, attrname): if self._is_fk: - fk = self.const.elements[idx] - return fk.parent + try: + fk = self.const.elements[idx] + except IndexError: + return "" + else: + return getattr(fk.parent, attrname) else: - return list(self.const.columns)[idx] + cols = list(self.const.columns) + try: + col = cols[idx] + except IndexError: + return "" + else: + return getattr(col, attrname) def _key_constraint_name(self): if isinstance(self._const_name, (type(None), _defer_none_name)): @@ -61,13 +71,13 @@ class ConventionDict(object): # note this method was missing before # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't # working even though documented. - return self._column_X(idx).key + return self._column_X(idx, "key") def _key_column_X_name(self, idx): - return self._column_X(idx).name + return self._column_X(idx, "name") def _key_column_X_label(self, idx): - return self._column_X(idx)._ddl_label + return self._column_X(idx, "_ddl_label") def _key_referred_table_name(self): fk = self.const.elements[0] @@ -161,10 +171,28 @@ def _constraint_name_for_table(const, table): return None +@event.listens_for( + PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint" +) +def _column_added_to_pk_constraint(pk_constraint, col): + if pk_constraint._implicit_generated: + # only operate upon the "implicit" pk constraint for now, + # as we have to force the name to None to reset it. the + # "implicit" constraint will only have a naming convention name + # if at all. + table = pk_constraint.table + pk_constraint.name = None + newname = _constraint_name_for_table(pk_constraint, table) + if newname: + pk_constraint.name = newname + + @event.listens_for(Constraint, "after_parent_attach") @event.listens_for(Index, "after_parent_attach") def _constraint_name(const, table): if isinstance(table, Column): + # this path occurs for a CheckConstraint linked to a Column + # for column-attached constraint, set another event # to link the column attached to the table as this constraint # associated with the table. @@ -173,10 +201,11 @@ def _constraint_name(const, table): "after_parent_attach", lambda col, table: _constraint_name(const, table), ) + elif isinstance(table, Table): if isinstance(const.name, (conv, _defer_name)): return newname = _constraint_name_for_table(const, table) - if newname is not None: + if newname: const.name = newname diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index 191246f460..e6ab8efb53 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -1665,6 +1665,8 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause): table._columns.replace(self) + self.table = table + if self.primary_key: table.primary_key._replace(self) elif self.key in table.primary_key: @@ -1674,8 +1676,6 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause): % (self.key, table.fullname) ) - self.table = table - if self.index: if isinstance(self.index, util.string_types): raise exc.ArgumentError( @@ -3747,7 +3747,6 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): are already present. """ - # set the primary key flag on new columns. # note any existing PK cols on the table also have their # flag still set. @@ -3763,6 +3762,8 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): PrimaryKeyConstraint._autoincrement_column._reset(self) self.columns.replace(col) + self.dispatch._sa_event_column_added_to_pk_constraint(self, col) + @property def columns_autoinc_first(self): autoinc = self._autoincrement_column diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index cd86d1f1c0..12723a1af1 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -4689,6 +4689,73 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)" ) + @testing.combinations( + ("nopk",), + ("column",), + ("constraint",), + ("explicit_name",), + argnames="pktype", + ) + @testing.combinations( + ("pk_%(table_name)s", "pk_t1"), + ("pk_%(column_0_name)s", "pk_x"), + ("pk_%(column_0_N_name)s", "pk_x_y"), + ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"), + ("%(column_0_name)s", "x"), + ("%(column_0N_name)s", "xy"), + argnames="conv, expected_name", + ) + def test_pk_conventions(self, conv, expected_name, pktype): + m1 = MetaData(naming_convention={"pk": conv}) + + if pktype == "column": + t1 = Table( + "t1", + m1, + Column("x", Integer, primary_key=True), + Column("y", Integer, primary_key=True), + ) + elif pktype == "constraint": + t1 = Table( + "t1", + m1, + Column("x", Integer), + Column("y", Integer), + PrimaryKeyConstraint("x", "y"), + ) + elif pktype == "nopk": + t1 = Table( + "t1", + m1, + Column("x", Integer, nullable=False), + Column("y", Integer, nullable=False), + ) + expected_name = None + elif pktype == "explicit_name": + t1 = Table( + "t1", + m1, + Column("x", Integer, primary_key=True), + Column("y", Integer, primary_key=True), + PrimaryKeyConstraint("x", "y", name="myname"), + ) + expected_name = "myname" + + if expected_name: + eq_(t1.primary_key.name, expected_name) + + if pktype == "nopk": + self.assert_compile( + schema.CreateTable(t1), + "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)", + ) + else: + self.assert_compile( + schema.CreateTable(t1), + "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, " + "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name, + ) + def test_uq_name(self): u1 = self._fixture( naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}