]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Use separate label generator for column_label naming convention
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 14 Oct 2019 00:33:24 +0000 (20:33 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 14 Oct 2019 16:51:14 +0000 (12:51 -0400)
Fixed bug where a table that would have a column label overlap with a plain
column name, such as "foo.id AS foo_id" vs. "foo.foo_id", would prematurely
generate the ``._label`` attribute for a column before this overlap could
be detected due to the use of the ``index=True`` or ``unique=True`` flag on
the column in conjunction with the default naming convention of
``"column_0_label"``.  This would then lead to failures when ``._label``
were used later to generate a bound parameter name, in particular those
used by the ORM when generating the WHERE clause for an UPDATE statement.
The issue has been fixed by using an alternate ``._label`` accessor for DDL
generation that does not affect the state of the :class:`.Column`.   The
accessor also bypasses the key-deduplication step as it is not necessary
for DDL, the naming is now consistently ``"<tablename>_<columnname>"``
without any subsequent numeric symbols when used in DDL.

Fixes: #4911
Change-Id: Iabf5fd3250738d800d6e41a2a3a27a7ce2405e7d

doc/build/changelog/unreleased_13/4911.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/sql/compiler.py
lib/sqlalchemy/sql/elements.py
lib/sqlalchemy/sql/naming.py
lib/sqlalchemy/sql/schema.py
test/sql/test_metadata.py
test/sql/test_update.py

diff --git a/doc/build/changelog/unreleased_13/4911.rst b/doc/build/changelog/unreleased_13/4911.rst
new file mode 100644 (file)
index 0000000..02db4b8
--- /dev/null
@@ -0,0 +1,19 @@
+.. change::
+    :tags: bug, schema
+    :tickets: 4911
+
+    Fixed bug where a table that would have a column label overlap with a plain
+    column name, such as "foo.id AS foo_id" vs. "foo.foo_id", would prematurely
+    generate the ``._label`` attribute for a column before this overlap could
+    be detected due to the use of the ``index=True`` or ``unique=True`` flag on
+    the column in conjunction with the default naming convention of
+    ``"column_0_label"``.  This would then lead to failures when ``._label``
+    were used later to generate a bound parameter name, in particular those
+    used by the ORM when generating the WHERE clause for an UPDATE statement.
+    The issue has been fixed by using an alternate ``._label`` accessor for DDL
+    generation that does not affect the state of the :class:`.Column`.   The
+    accessor also bypasses the key-deduplication step as it is not necessary
+    for DDL, the naming is now consistently ``"<tablename>_<columnname>"``
+    without any subsequent numeric symbols when used in DDL.
+
+
index 6c7895ea13e6f97262adb2997ff8a4dcbc49aa69..2f24bb3f4eb4c6a44dfddf6ef9af529d84ed1c38 100644 (file)
@@ -2092,9 +2092,9 @@ class MSDDLCompiler(compiler.DDLCompiler):
             return ""
         text = ""
         if constraint.name is not None:
-            text += "CONSTRAINT %s " % self.preparer.format_constraint(
-                constraint
-            )
+            formatted_name = self.preparer.format_constraint(constraint)
+            if formatted_name is not None:
+                text += "CONSTRAINT %s " % formatted_name
         text += "UNIQUE "
 
         clustered = constraint.dialect_options["mssql"]["clustered"]
index c6c30629d804287085e8c64536b4dacc95d4cfff..5ecec7d6c208c8cb31418938f7b4df1bea76ecf8 100644 (file)
@@ -3022,6 +3022,10 @@ class DDLCompiler(Compiled):
         text = "CREATE "
         if index.unique:
             text += "UNIQUE "
+        if index.name is None:
+            raise exc.CompileError(
+                "CREATE INDEX requires that the index have a name"
+            )
         text += "INDEX %s ON %s (%s)" % (
             self._prepared_index_name(index, include_schema=include_schema),
             preparer.format_table(
@@ -3038,6 +3042,11 @@ class DDLCompiler(Compiled):
 
     def visit_drop_index(self, drop):
         index = drop.element
+
+        if index.name is None:
+            raise exc.CompileError(
+                "DROP INDEX requires that the index have a name"
+            )
         return "\nDROP INDEX " + self._prepared_index_name(
             index, include_schema=True
         )
@@ -3251,7 +3260,8 @@ class DDLCompiler(Compiled):
         text = ""
         if constraint.name is not None:
             formatted_name = self.preparer.format_constraint(constraint)
-            text += "CONSTRAINT %s " % formatted_name
+            if formatted_name is not None:
+                text += "CONSTRAINT %s " % formatted_name
         text += "UNIQUE (%s)" % (
             ", ".join(self.preparer.quote(c.name) for c in constraint)
         )
index b6462b33472b923b630e976d6901fdb4e0f49197..78c434cff5d965f99fdf05da16f54e84130359ad 100644 (file)
@@ -4263,7 +4263,11 @@ class ColumnClause(roles.LabeledColumnExprRole, Immutable, ColumnElement):
     def _render_label_in_columns_clause(self):
         return self.table is not None
 
-    def _gen_label(self, name):
+    @property
+    def _ddl_label(self):
+        return self._gen_label(self.name, dedupe_on_key=False)
+
+    def _gen_label(self, name, dedupe_on_key=True):
         t = self.table
 
         if self.is_literal:
@@ -4287,21 +4291,22 @@ class ColumnClause(roles.LabeledColumnExprRole, Immutable, ColumnElement):
                 assert not isinstance(label, quoted_name)
                 label = quoted_name(label, t.name.quote)
 
-            # ensure the label name doesn't conflict with that
-            # of an existing column.   note that this implies that any
-            # Column must **not** set up its _label before its parent table
-            # has all of its other Column objects set up.  There are several
-            # tables in the test suite which will fail otherwise; example:
-            # table "owner" has columns "name" and "owner_name".  Therefore
-            # column owner.name cannot use the label "owner_name", it has
-            # to be "owner_name_1".
-            if label in t.c:
-                _label = label
-                counter = 1
-                while _label in t.c:
-                    _label = label + "_" + str(counter)
-                    counter += 1
-                label = _label
+            if dedupe_on_key:
+                # ensure the label name doesn't conflict with that of an
+                # existing column.   note that this implies that any Column
+                # must **not** set up its _label before its parent table has
+                # all of its other Column objects set up.  There are several
+                # tables in the test suite which will fail otherwise; example:
+                # table "owner" has columns "name" and "owner_name".  Therefore
+                # column owner.name cannot use the label "owner_name", it has
+                # to be "owner_name_1".
+                if label in t.c:
+                    _label = label
+                    counter = 1
+                    while _label in t.c:
+                        _label = label + "_" + str(counter)
+                        counter += 1
+                    label = _label
 
             return coercions.expect(roles.TruncatedLabelRole, label)
 
index 68a6190cfc4014bd61c06be03f0606902fba363c..3fca4e35bcb30c6ad496db5d96cf867dd9aa8574 100644 (file)
@@ -67,7 +67,7 @@ class ConventionDict(object):
         return self._column_X(idx).name
 
     def _key_column_X_label(self, idx):
-        return self._column_X(idx)._label
+        return self._column_X(idx)._ddl_label
 
     def _key_referred_table_name(self):
         fk = self.const.elements[0]
index af538af0e71c55264aaf7702657bbd018fea121a..4e8f4a39701c908138e9c58da822fe63d5f18286 100644 (file)
@@ -1434,7 +1434,12 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause):
                     "To create indexes with a specific name, create an "
                     "explicit Index object external to the Table."
                 )
-            Index(None, self, unique=bool(self.unique), _column_flag=True)
+            table.append_constraint(
+                Index(
+                    None, self.key, unique=bool(self.unique), _column_flag=True
+                )
+            )
+
         elif self.unique:
             if isinstance(self.unique, util.string_types):
                 raise exc.ArgumentError(
index ad251d1b8142396714a9e91ec5071df221776d24..e08c35bfb062161fa21971de0fece33c3ffd1246 100644 (file)
@@ -31,7 +31,11 @@ from sqlalchemy import Unicode
 from sqlalchemy import UniqueConstraint
 from sqlalchemy import util
 from sqlalchemy.engine import default
+from sqlalchemy.schema import AddConstraint
+from sqlalchemy.schema import CreateIndex
+from sqlalchemy.schema import DropIndex
 from sqlalchemy.sql import naming
+from sqlalchemy.sql.elements import _NONE_NAME
 from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import AssertsCompiledSQL
@@ -4410,6 +4414,97 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
 
         return u1
 
+    def _colliding_name_fixture(self, naming_convention, id_flags):
+        m1 = MetaData(naming_convention=naming_convention)
+
+        t1 = Table(
+            "foo",
+            m1,
+            Column("id", Integer, **id_flags),
+            Column("foo_id", Integer),
+        )
+        return t1
+
+    def test_colliding_col_label_from_index_flag(self):
+        t1 = self._colliding_name_fixture(
+            {"ix": "ix_%(column_0_label)s"}, {"index": True}
+        )
+
+        idx = list(t1.indexes)[0]
+
+        # name is generated up front.  alembic really prefers this
+        eq_(idx.name, "ix_foo_id")
+        self.assert_compile(
+            CreateIndex(idx), "CREATE INDEX ix_foo_id ON foo (id)"
+        )
+
+    def test_colliding_col_label_from_unique_flag(self):
+        t1 = self._colliding_name_fixture(
+            {"uq": "uq_%(column_0_label)s"}, {"unique": True}
+        )
+
+        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
+        uq = const[0]
+
+        # name is generated up front.  alembic really prefers this
+        eq_(uq.name, "uq_foo_id")
+
+        self.assert_compile(
+            AddConstraint(uq),
+            "ALTER TABLE foo ADD CONSTRAINT uq_foo_id UNIQUE (id)",
+        )
+
+    def test_colliding_col_label_from_index_obj(self):
+        t1 = self._colliding_name_fixture({"ix": "ix_%(column_0_label)s"}, {})
+
+        idx = Index(None, t1.c.id)
+        is_(idx, list(t1.indexes)[0])
+        eq_(idx.name, "ix_foo_id")
+        self.assert_compile(
+            CreateIndex(idx), "CREATE INDEX ix_foo_id ON foo (id)"
+        )
+
+    def test_colliding_col_label_from_unique_obj(self):
+        t1 = self._colliding_name_fixture({"uq": "uq_%(column_0_label)s"}, {})
+        uq = UniqueConstraint(t1.c.id)
+        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
+        is_(const[0], uq)
+        eq_(const[0].name, "uq_foo_id")
+        self.assert_compile(
+            AddConstraint(const[0]),
+            "ALTER TABLE foo ADD CONSTRAINT uq_foo_id UNIQUE (id)",
+        )
+
+    def test_colliding_col_label_from_index_flag_no_conv(self):
+        t1 = self._colliding_name_fixture({"ck": "foo"}, {"index": True})
+
+        idx = list(t1.indexes)[0]
+
+        # this behavior needs to fail, as of #4911 since we are testing it,
+        # ensure it raises a CompileError.  In #4289 we may want to revisit
+        # this in some way, most likely specifically to Postgresql only.
+        assert_raises_message(
+            exc.CompileError,
+            "CREATE INDEX requires that the index have a name",
+            CreateIndex(idx).compile,
+        )
+
+        assert_raises_message(
+            exc.CompileError,
+            "DROP INDEX requires that the index have a name",
+            DropIndex(idx).compile,
+        )
+
+    def test_colliding_col_label_from_unique_flag_no_conv(self):
+        t1 = self._colliding_name_fixture({"ck": "foo"}, {"unique": True})
+
+        const = [c for c in t1.constraints if isinstance(c, UniqueConstraint)]
+        is_(const[0].name, None)
+
+        self.assert_compile(
+            AddConstraint(const[0]), "ALTER TABLE foo ADD UNIQUE (id)"
+        )
+
     def test_uq_name(self):
         u1 = self._fixture(
             naming_convention={"uq": "uq_%(table_name)s_%(column_0_name)s"}
@@ -4775,11 +4870,11 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
 
         u1 = Table("user", m1, Column("x", Boolean()))
         # constraint is not hit
-        eq_(
+        is_(
             [c for c in u1.constraints if isinstance(c, CheckConstraint)][
                 0
             ].name,
-            "_unnamed_",
+            _NONE_NAME,
         )
         # but is hit at compile time
         self.assert_compile(
@@ -4841,11 +4936,11 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
 
         u1 = Table("user", m1, Column("x", Boolean()))
         # constraint gets special _defer_none_name
-        eq_(
+        is_(
             [c for c in u1.constraints if isinstance(c, CheckConstraint)][
                 0
             ].name,
-            "_unnamed_",
+            _NONE_NAME,
         )
         # no issue with native boolean
         self.assert_compile(
@@ -4867,11 +4962,11 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
 
         u1 = Table("user", m1, Column("x", Boolean()))
         # constraint gets special _defer_none_name
-        eq_(
+        is_(
             [c for c in u1.constraints if isinstance(c, CheckConstraint)][
                 0
             ].name,
-            "_unnamed_",
+            _NONE_NAME,
         )
 
         self.assert_compile(
@@ -4903,3 +4998,14 @@ class NamingConventionTest(fixtures.TestBase, AssertsCompiledSQL):
 
         eq_(t2a.primary_key.name, t2b.primary_key.name)
         eq_(t2b.primary_key.name, "t2_pk")
+
+    def test_expression_index(self):
+        m = MetaData(naming_convention={"ix": "ix_%(column_0_label)s"})
+        t = Table("t", m, Column("q", Integer), Column("p", Integer))
+        ix = Index(None, t.c.q + 5)
+        t.append_constraint(ix)
+
+        # huh.  pretty cool
+        self.assert_compile(
+            CreateIndex(ix), "CREATE INDEX ix_t_q ON t (q + 5)"
+        )
index 9309ca45a9f6108f8856452ef03af5bd1f7dfbe2..e625b7d9cee5d41a73ab98a8b2a0495dc0abc6ef 100644 (file)
@@ -292,6 +292,26 @@ class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
             "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1",
         )
 
+    def test_labels_no_collision_index(self):
+        """test for [ticket:4911] """
+
+        t = Table(
+            "foo",
+            MetaData(),
+            Column("id", Integer, index=True),
+            Column("foo_id", Integer),
+        )
+
+        self.assert_compile(
+            t.update().where(t.c.id == 5),
+            "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :id_1",
+        )
+
+        self.assert_compile(
+            t.update().where(t.c.id == bindparam(key=t.c.id._label)),
+            "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1",
+        )
+
     def test_inline_defaults(self):
         m = MetaData()
         foo = Table("foo", m, Column("id", Integer))