]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Accommodate column-based naming conventions for pk constraint
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 4 Feb 2021 17:09:54 +0000 (12:09 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 4 Feb 2021 17:13:24 +0000 (12:13 -0500)
Repaired / implemented support for primary key constraint naming
conventions that use column names/keys/etc as part of the convention. In
particular, this includes that the :class:`.PrimaryKeyConstraint` object
that's automatically associated with a :class:`.schema.Table` will update
its name as new primary key :class:`_schema.Column` objects are added to
the table and then to the constraint. Internal failure modes related to
this constraint construction process including no columns present, no name
present or blank name present are now accommodated.

Fixes: #5919
Change-Id: Ic2800b50f4a4cd5978bec48cefea0a2e198e0123
(cherry picked from commit 9660e94e35be438b0d51cd87e6ccb4047a332c15)

doc/build/changelog/unreleased_13/5919.rst [new file with mode: 0644]
lib/sqlalchemy/event/base.py
lib/sqlalchemy/events.py
lib/sqlalchemy/sql/naming.py
lib/sqlalchemy/sql/schema.py
test/sql/test_metadata.py

diff --git a/doc/build/changelog/unreleased_13/5919.rst b/doc/build/changelog/unreleased_13/5919.rst
new file mode 100644 (file)
index 0000000..ddae6c2
--- /dev/null
@@ -0,0 +1,12 @@
+.. change::
+    :tags: bug, schema
+    :tickets: 5919
+
+    Repaired / implemented support for primary key constraint naming
+    conventions that use column names/keys/etc as part of the convention. In
+    particular, this includes that the :class:`.PrimaryKeyConstraint` object
+    that's automatically associated with a :class:`.schema.Table` will update
+    its name as new primary key :class:`_schema.Column` objects are added to
+    the table and then to the constraint. Internal failure modes related to
+    this constraint construction process including no columns present, no name
+    present or blank name present are now accommodated.
index 56d14ce0e8741abda0a332e57df96490215a96c9..05c24730c34c193b72f25efd682d7dedde4b5552 100644 (file)
@@ -29,7 +29,13 @@ _registrars = util.defaultdict(list)
 
 
 def _is_event_name(name):
-    return not name.startswith("_") and name != "dispatch"
+    # _sa_event prefix is special to support internal-only event names.
+    # most event names are just plain method names that aren't
+    # underscored.
+
+    return (
+        not name.startswith("_") and name != "dispatch"
+    ) or name.startswith("_sa_event")
 
 
 class _UnpickleDispatch(object):
index 2a97432ad6db3dda8354505ddb73d87da59e253e..40a6e90ebcbcab0779087bb84c25fd905988fda5 100644 (file)
@@ -216,6 +216,12 @@ class DDLEvents(event.Events):
 
         """
 
+    def _sa_event_column_added_to_pk_constraint(self, const, col):
+        """internal event hook used for primary key naming convention
+        updates.
+
+        """
+
     def column_reflect(self, inspector, table, column_info):
         """Called for each unit of 'column info' retrieved when
         a :class:`_schema.Table` is being reflected.
index ba1eaf9f2202d44f26cec4bd910839ff649058a9..b727b51ce6fd7dcff405abfab4646918c5f1784b 100644 (file)
@@ -39,12 +39,22 @@ class ConventionDict(object):
     def _key_table_name(self):
         return self.table.name
 
-    def _column_X(self, idx):
+    def _column_X(self, idx, attrname):
         if self._is_fk:
-            fk = self.const.elements[idx]
-            return fk.parent
+            try:
+                fk = self.const.elements[idx]
+            except IndexError:
+                return ""
+            else:
+                return getattr(fk.parent, attrname)
         else:
-            return list(self.const.columns)[idx]
+            cols = list(self.const.columns)
+            try:
+                col = cols[idx]
+            except IndexError:
+                return ""
+            else:
+                return getattr(col, attrname)
 
     def _key_constraint_name(self):
         if isinstance(self._const_name, (type(None), _defer_none_name)):
@@ -61,13 +71,13 @@ class ConventionDict(object):
         # note this method was missing before
         # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't
         # working even though documented.
-        return self._column_X(idx).key
+        return self._column_X(idx, "key")
 
     def _key_column_X_name(self, idx):
-        return self._column_X(idx).name
+        return self._column_X(idx, "name")
 
     def _key_column_X_label(self, idx):
-        return self._column_X(idx)._ddl_label
+        return self._column_X(idx, "_ddl_label")
 
     def _key_referred_table_name(self):
         fk = self.const.elements[0]
@@ -161,10 +171,28 @@ def _constraint_name_for_table(const, table):
         return None
 
 
+@event.listens_for(
+    PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint"
+)
+def _column_added_to_pk_constraint(pk_constraint, col):
+    if pk_constraint._implicit_generated:
+        # only operate upon the "implicit" pk constraint for now,
+        # as we have to force the name to None to reset it.  the
+        # "implicit" constraint will only have a naming convention name
+        # if at all.
+        table = pk_constraint.table
+        pk_constraint.name = None
+        newname = _constraint_name_for_table(pk_constraint, table)
+        if newname:
+            pk_constraint.name = newname
+
+
 @event.listens_for(Constraint, "after_parent_attach")
 @event.listens_for(Index, "after_parent_attach")
 def _constraint_name(const, table):
     if isinstance(table, Column):
+        # this path occurs for a CheckConstraint linked to a Column
+
         # for column-attached constraint, set another event
         # to link the column attached to the table as this constraint
         # associated with the table.
@@ -173,10 +201,11 @@ def _constraint_name(const, table):
             "after_parent_attach",
             lambda col, table: _constraint_name(const, table),
         )
+
     elif isinstance(table, Table):
         if isinstance(const.name, (conv, _defer_name)):
             return
 
         newname = _constraint_name_for_table(const, table)
-        if newname is not None:
+        if newname:
             const.name = newname
index 191246f4606ffe58ea7665dc337198a0eba98857..e6ab8efb53671fe7f773930fa57c6477a9a5e6bf 100644 (file)
@@ -1665,6 +1665,8 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause):
 
         table._columns.replace(self)
 
+        self.table = table
+
         if self.primary_key:
             table.primary_key._replace(self)
         elif self.key in table.primary_key:
@@ -1674,8 +1676,6 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause):
                 % (self.key, table.fullname)
             )
 
-        self.table = table
-
         if self.index:
             if isinstance(self.index, util.string_types):
                 raise exc.ArgumentError(
@@ -3747,7 +3747,6 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
         are already present.
 
         """
-
         # set the primary key flag on new columns.
         # note any existing PK cols on the table also have their
         # flag still set.
@@ -3763,6 +3762,8 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
         PrimaryKeyConstraint._autoincrement_column._reset(self)
         self.columns.replace(col)
 
+        self.dispatch._sa_event_column_added_to_pk_constraint(self, col)
+
     @property
     def columns_autoinc_first(self):
         autoinc = self._autoincrement_column
index cd86d1f1c0cf7581d33e2e19ba90e79dc93ac92d..12723a1af1c00affe121535ae8d74d670edb3edf 100644 (file)
@@ -4689,6 +4689,73 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
             AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)"
         )
 
+    @testing.combinations(
+        ("nopk",),
+        ("column",),
+        ("constraint",),
+        ("explicit_name",),
+        argnames="pktype",
+    )
+    @testing.combinations(
+        ("pk_%(table_name)s", "pk_t1"),
+        ("pk_%(column_0_name)s", "pk_x"),
+        ("pk_%(column_0_N_name)s", "pk_x_y"),
+        ("pk_%(column_0_N_label)s", "pk_t1_x_t1_y"),
+        ("%(column_0_name)s", "x"),
+        ("%(column_0N_name)s", "xy"),
+        argnames="conv, expected_name",
+    )
+    def test_pk_conventions(self, conv, expected_name, pktype):
+        m1 = MetaData(naming_convention={"pk": conv})
+
+        if pktype == "column":
+            t1 = Table(
+                "t1",
+                m1,
+                Column("x", Integer, primary_key=True),
+                Column("y", Integer, primary_key=True),
+            )
+        elif pktype == "constraint":
+            t1 = Table(
+                "t1",
+                m1,
+                Column("x", Integer),
+                Column("y", Integer),
+                PrimaryKeyConstraint("x", "y"),
+            )
+        elif pktype == "nopk":
+            t1 = Table(
+                "t1",
+                m1,
+                Column("x", Integer, nullable=False),
+                Column("y", Integer, nullable=False),
+            )
+            expected_name = None
+        elif pktype == "explicit_name":
+            t1 = Table(
+                "t1",
+                m1,
+                Column("x", Integer, primary_key=True),
+                Column("y", Integer, primary_key=True),
+                PrimaryKeyConstraint("x", "y", name="myname"),
+            )
+            expected_name = "myname"
+
+        if expected_name:
+            eq_(t1.primary_key.name, expected_name)
+
+        if pktype == "nopk":
+            self.assert_compile(
+                schema.CreateTable(t1),
+                "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL)",
+            )
+        else:
+            self.assert_compile(
+                schema.CreateTable(t1),
+                "CREATE TABLE t1 (x INTEGER NOT NULL, y INTEGER NOT NULL, "
+                "CONSTRAINT %s PRIMARY KEY (x, y))" % expected_name,
+            )
+
     def test_uq_name(self):
         u1 = self._fixture(
             naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}