]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
Escape sql server constriant names
authorCaselIT <cfederico87@gmail.com>
Thu, 23 Feb 2023 20:45:11 +0000 (21:45 +0100)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 28 Feb 2023 18:24:23 +0000 (13:24 -0500)
Properly escape constraint name on SQL Server when dropping
a column while specifying ``mssql_drop_default=True`` or
``mssql_drop_check=True`` or ``mssql_drop_foreign_key=True``.

Fixes: #1187
Change-Id: I060442bc63c4e53f64724985e20e6e15e4335b6b

alembic/ddl/mssql.py
docs/build/unreleased/1187.rst [new file with mode: 0644]
tests/test_mssql.py

index bdf215d8ee7fbb82996d6056472c4cc845de5dbe..ebf4db19afae99e5cd68e453f816b5ab8f66ad63 100644 (file)
@@ -306,9 +306,8 @@ def _exec_drop_col_constraint(
     )
     # from http://www.mssqltips.com/sqlservertip/1425/\
     # working-with-default-constraints-in-sql-server/
-    # TODO: needs table formatting, etc.
     return """declare @const_name varchar(256)
-select @const_name = [name] from %(type)s
+select @const_name = QUOTENAME([name]) from %(type)s
 where parent_object_id = object_id('%(schema_dot)s%(tname)s')
 and col_name(parent_object_id, parent_column_id) = '%(colname)s'
 exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
@@ -327,7 +326,7 @@ def _exec_drop_col_fk_constraint(
     schema, tname, colname = element.schema, element.tname, element.colname
 
     return """declare @const_name varchar(256)
-select @const_name = [name] from
+select @const_name = QUOTENAME([name]) from
 sys.foreign_keys fk join sys.foreign_key_columns fkc
 on fk.object_id=fkc.constraint_object_id
 where fkc.parent_object_id = object_id('%(schema_dot)s%(tname)s')
diff --git a/docs/build/unreleased/1187.rst b/docs/build/unreleased/1187.rst
new file mode 100644 (file)
index 0000000..0d9c7dd
--- /dev/null
@@ -0,0 +1,8 @@
+.. change::
+    :tags: bug, mssql
+    :tickets: 1187
+
+    Properly escape constraint name on SQL Server when dropping
+    a column while specifying ``mssql_drop_default=True`` or
+    ``mssql_drop_check=True`` or ``mssql_drop_foreign_key=True``.
+
index 4bb618cd4b8917f104ea159a7734966a73c75b47..640e1685b534a33454b5b875ca2c1dfa0d17f06c 100644 (file)
@@ -4,10 +4,16 @@ from __future__ import annotations
 from typing import Any
 from typing import Dict
 
+from sqlalchemy import CheckConstraint
 from sqlalchemy import Column
 from sqlalchemy import exc
+from sqlalchemy import ForeignKey
+from sqlalchemy import inspect
 from sqlalchemy import Integer
+from sqlalchemy import MetaData
 from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import text
 
 from alembic import command
 from alembic import op
@@ -17,6 +23,7 @@ from alembic.testing import combinations
 from alembic.testing import config
 from alembic.testing import eq_
 from alembic.testing import expect_warnings
+from alembic.testing import fixture
 from alembic.testing.env import _no_sql_testing_config
 from alembic.testing.env import clear_staging_env
 from alembic.testing.env import staging_env
@@ -181,9 +188,10 @@ class OpTest(TestBase):
             server_default=None,
         )
         context.assert_contains(
-            "declare @const_name varchar(256)select @const_name = [name] "
-            "from sys.default_constraintswhere parent_object_id = "
-            "object_id('t')and col_name(parent_object_id, "
+            "declare @const_name varchar(256)\n"
+            "select @const_name = QUOTENAME([name]) "
+            "from sys.default_constraints\nwhere parent_object_id = "
+            "object_id('t')\nand col_name(parent_object_id, "
             "parent_column_id) = 'c'"
         )
         context.assert_contains(
@@ -199,9 +207,10 @@ class OpTest(TestBase):
             schema="xyz",
         )
         context.assert_contains(
-            "declare @const_name varchar(256)select @const_name = [name] "
-            "from sys.default_constraintswhere parent_object_id = "
-            "object_id('xyz.t')and col_name(parent_object_id, "
+            "declare @const_name varchar(256)\n"
+            "select @const_name = QUOTENAME([name]) from "
+            "sys.default_constraints\nwhere parent_object_id = "
+            "object_id('xyz.t')\nand col_name(parent_object_id, "
             "parent_column_id) = 'c'"
         )
         context.assert_contains(
@@ -259,7 +268,7 @@ class OpTest(TestBase):
         op.drop_column("t1", "c1", mssql_drop_foreign_key=True)
         context.assert_contains(
             "declare @const_name varchar(256)\n"
-            "select @const_name = [name] from\n"
+            "select @const_name = QUOTENAME([name]) from\n"
             "sys.foreign_keys fk join sys.foreign_key_columns fkcon "
             "fk.object_id=fkc.constraint_object_id\n"
             "where fkc.parent_object_id = object_id('t1')\nand "
@@ -276,7 +285,7 @@ class OpTest(TestBase):
         op.drop_column("t1", "c1", schema="xyz", mssql_drop_foreign_key=True)
         context.assert_contains(
             "declare @const_name varchar(256)\n"
-            "select @const_name = [name] from\n"
+            "select @const_name = QUOTENAME([name]) from\n"
             "sys.foreign_keys fk join sys.foreign_key_columns fkcon "
             "fk.object_id=fkc.constraint_object_id\n"
             "where fkc.parent_object_id = object_id('xyz.t1')\nand "
@@ -516,3 +525,51 @@ class OpTest(TestBase):
             server_default=sd(),
             existing_server_default=esd(),
         )
+
+
+class RoundTripTest(TestBase):
+    __backend__ = True
+    __only_on__ = "mssql"
+
+    @fixture
+    def tables(self, connection):
+        self.meta = MetaData()
+        self.tbl_other = Table(
+            "other", self.meta, Column("oid", Integer, primary_key=True)
+        )
+        self.tbl = Table(
+            "round_trip_table",
+            self.meta,
+            Column("id", Integer, primary_key=True),
+            Column(
+                "oid_fk", ForeignKey("other.oid", name="require quote charß!")
+            ),
+            Column("with_check", Integer),
+            CheckConstraint("with_check > 0", name="chéck çst"),
+        )
+        self.meta.create_all(connection)
+        yield
+        self.meta.drop_all(connection)
+
+    def test_drop_col_with_fk(self, ops_context, connection, tables):
+        ops_context.drop_column(
+            "round_trip_table", "oid_fk", mssql_drop_foreign_key=True
+        )
+        insp = inspect(connection)
+        eq_(insp.get_foreign_keys("round_trip_table"), [])
+
+    def test_drop_col_with_check(self, ops_context, connection, tables):
+        ops_context.drop_column(
+            "round_trip_table", "with_check", mssql_drop_check=True
+        )
+        # No get_check_constraints on mssql
+        val = connection.execute(
+            text(
+                "select name from sys.check_constraints where "
+                "parent_object_id=object_id('round_trip_table')"
+            )
+        ).scalar()
+        eq_(val, None)
+
+    # don't know if a default constraint can be explicitly named, but
+    # the path is the same as the check constraint, so it should be good