From: Mike Bayer Date: Thu, 4 Feb 2021 17:09:54 +0000 (-0500) Subject: Accommodate column-based naming conventions for pk constraint X-Git-Tag: rel_1_4_0b3~25 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9660e94e35be438b0d51cd87e6ccb4047a332c15;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Accommodate column-based naming conventions for pk constraint Repaired / implemented support for primary key constraint naming conventions that use column names/keys/etc as part of the convention. In particular, this includes that the :class:`.PrimaryKeyConstraint` object that's automatically associated with a :class:`.schema.Table` will update its name as new primary key :class:`_schema.Column` objects are added to the table and then to the constraint. Internal failure modes related to this constraint construction process including no columns present, no name present or blank name present are now accommodated. Fixes: #5919 Change-Id: Ic2800b50f4a4cd5978bec48cefea0a2e198e0123 --- diff --git a/doc/build/changelog/unreleased_13/5919.rst b/doc/build/changelog/unreleased_13/5919.rst new file mode 100644 index 0000000000..ddae6c2b03 --- /dev/null +++ b/doc/build/changelog/unreleased_13/5919.rst @@ -0,0 +1,12 @@ +.. change:: + :tags: bug, schema + :tickets: 5919 + + Repaired / implemented support for primary key constraint naming + conventions that use column names/keys/etc as part of the convention. In + particular, this includes that the :class:`.PrimaryKeyConstraint` object + that's automatically associated with a :class:`.schema.Table` will update + its name as new primary key :class:`_schema.Column` objects are added to + the table and then to the constraint. Internal failure modes related to + this constraint construction process including no columns present, no name + present or blank name present are now accommodated. diff --git a/lib/sqlalchemy/event/base.py b/lib/sqlalchemy/event/base.py index 181db0cf2a..fb75d9e3ec 100644 --- a/lib/sqlalchemy/event/base.py +++ b/lib/sqlalchemy/event/base.py @@ -29,7 +29,13 @@ _registrars = util.defaultdict(list) def _is_event_name(name): - return not name.startswith("_") and name != "dispatch" + # _sa_event prefix is special to support internal-only event names. + # most event names are just plain method names that aren't + # underscored. + + return ( + not name.startswith("_") and name != "dispatch" + ) or name.startswith("_sa_event") class _UnpickleDispatch(object): diff --git a/lib/sqlalchemy/sql/events.py b/lib/sqlalchemy/sql/events.py index deaa992aff..18c0e8c605 100644 --- a/lib/sqlalchemy/sql/events.py +++ b/lib/sqlalchemy/sql/events.py @@ -209,6 +209,12 @@ class DDLEvents(event.Events): """ + def _sa_event_column_added_to_pk_constraint(self, const, col): + """internal event hook used for primary key naming convention + updates. + + """ + def column_reflect(self, inspector, table, column_info): """Called for each unit of 'column info' retrieved when a :class:`_schema.Table` is being reflected. diff --git a/lib/sqlalchemy/sql/naming.py b/lib/sqlalchemy/sql/naming.py index 130ec6875d..005c41683b 100644 --- a/lib/sqlalchemy/sql/naming.py +++ b/lib/sqlalchemy/sql/naming.py @@ -39,12 +39,22 @@ class ConventionDict(object): def _key_table_name(self): return self.table.name - def _column_X(self, idx): + def _column_X(self, idx, attrname): if self._is_fk: - fk = self.const.elements[idx] - return fk.parent + try: + fk = self.const.elements[idx] + except IndexError: + return "" + else: + return getattr(fk.parent, attrname) else: - return list(self.const.columns)[idx] + cols = list(self.const.columns) + try: + col = cols[idx] + except IndexError: + return "" + else: + return getattr(col, attrname) def _key_constraint_name(self): if isinstance(self._const_name, (type(None), _defer_none_name)): @@ -61,13 +71,13 @@ class ConventionDict(object): # note this method was missing before # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't # working even though documented. - return self._column_X(idx).key + return self._column_X(idx, "key") def _key_column_X_name(self, idx): - return self._column_X(idx).name + return self._column_X(idx, "name") def _key_column_X_label(self, idx): - return self._column_X(idx)._ddl_label + return self._column_X(idx, "_ddl_label") def _key_referred_table_name(self): fk = self.const.elements[0] @@ -161,10 +171,28 @@ def _constraint_name_for_table(const, table): return None +@event.listens_for( + PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint" +) +def _column_added_to_pk_constraint(pk_constraint, col): + if pk_constraint._implicit_generated: + # only operate upon the "implicit" pk constraint for now, + # as we have to force the name to None to reset it. the + # "implicit" constraint will only have a naming convention name + # if at all. + table = pk_constraint.table + pk_constraint.name = None + newname = _constraint_name_for_table(pk_constraint, table) + if newname: + pk_constraint.name = newname + + @event.listens_for(Constraint, "after_parent_attach") @event.listens_for(Index, "after_parent_attach") def _constraint_name(const, table): if isinstance(table, Column): + # this path occurs for a CheckConstraint linked to a Column + # for column-attached constraint, set another event # to link the column attached to the table as this constraint # associated with the table. @@ -173,10 +201,11 @@ def _constraint_name(const, table): "after_parent_attach", lambda col, table: _constraint_name(const, table), ) + elif isinstance(table, Table): if isinstance(const.name, (conv, _defer_name)): return newname = _constraint_name_for_table(const, table) - if newname is not None: + if newname: const.name = newname diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index 34bedbc6a3..127b12e817 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -1777,6 +1777,8 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause): table._columns.replace(self) + self.table = table + if self.primary_key: table.primary_key._replace(self) elif self.key in table.primary_key: @@ -1786,8 +1788,6 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause): % (self.key, table.fullname) ) - self.table = table - if self.index: if isinstance(self.index, util.string_types): raise exc.ArgumentError( @@ -3838,7 +3838,6 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): are already present. """ - # set the primary key flag on new columns. # note any existing PK cols on the table also have their # flag still set. @@ -3854,6 +3853,8 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint): PrimaryKeyConstraint._autoincrement_column._reset(self) self.columns.replace(col) + self.dispatch._sa_event_column_added_to_pk_constraint(self, col) + @property def columns_autoinc_first(self): autoinc = self._autoincrement_column diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 4a592a4763..1c24cc095a 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -4841,6 +4841,73 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL): AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)" ) + @testing.combinations( + ("nopk",), + ("column",), + ("constraint",), + ("explicit_name",), + argnames="pktype", + ) + @testing.combinations( + ("pk_%(table_name)s", "pk_t1"), + ("pk_%(column_0_name)s", "pk_x"), + ("pk_%(column_0_N_name)s", "pk_x_y"), + ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"), + ("%(column_0_name)s", "x"), + ("%(column_0N_name)s", "xy"), + argnames="conv, expected_name", + ) + def test_pk_conventions(self, conv, expected_name, pktype): + m1 = MetaData(naming_convention={"pk": conv}) + + if pktype == "column": + t1 = Table( + "t1", + m1, + Column("x", Integer, primary_key=True), + Column("y", Integer, primary_key=True), + ) + elif pktype == "constraint": + t1 = Table( + "t1", + m1, + Column("x", Integer), + Column("y", Integer), + PrimaryKeyConstraint("x", "y"), + ) + elif pktype == "nopk": + t1 = Table( + "t1", + m1, + Column("x", Integer, nullable=False), + Column("y", Integer, nullable=False), + ) + expected_name = None + elif pktype == "explicit_name": + t1 = Table( + "t1", + m1, + Column("x", Integer, primary_key=True), + Column("y", Integer, primary_key=True), + PrimaryKeyConstraint("x", "y", name="myname"), + ) + expected_name = "myname" + + if expected_name: + eq_(t1.primary_key.name, expected_name) + + if pktype == "nopk": + self.assert_compile( + schema.CreateTable(t1), + "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)", + ) + else: + self.assert_compile( + schema.CreateTable(t1), + "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, " + "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name, + ) + def test_uq_name(self): u1 = self._fixture( naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}