]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
Support schemas for MSSQL drop server default + FK constraint
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 12 Nov 2019 15:01:42 +0000 (10:01 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 12 Nov 2019 15:16:29 +0000 (10:16 -0500)
Fixed bug in MSSQL dialect where the drop constraint execution steps used
to remove server default or implicit foreign key constraint failed to take
into account the schema name of the target table.

Change-Id: Ia95b043ee6289efbb90d6f21392f4ce622748611
Fixes: #621
alembic/ddl/mssql.py
alembic/testing/fixtures.py
docs/build/unreleased/621.rst [new file with mode: 0644]
tests/test_mssql.py

index de2168bf6f59ed11f8b9f9dc37720961b9ff6ea7..3ccbbff06579862477376f58df2d4a6d0e46646d 100644 (file)
@@ -89,7 +89,10 @@ class MSSQLImpl(DefaultImpl):
             if existing_server_default is not False or server_default is None:
                 self._exec(
                     _ExecDropConstraint(
-                        table_name, column_name, "sys.default_constraints"
+                        table_name,
+                        column_name,
+                        "sys.default_constraints",
+                        schema,
                     )
                 )
             if server_default is not None:
@@ -129,72 +132,83 @@ class MSSQLImpl(DefaultImpl):
         else:
             super(MSSQLImpl, self).bulk_insert(table, rows, **kw)
 
-    def drop_column(self, table_name, column, **kw):
+    def drop_column(self, table_name, column, schema=None, **kw):
         drop_default = kw.pop("mssql_drop_default", False)
         if drop_default:
             self._exec(
                 _ExecDropConstraint(
-                    table_name, column, "sys.default_constraints"
+                    table_name, column, "sys.default_constraints", schema
                 )
             )
         drop_check = kw.pop("mssql_drop_check", False)
         if drop_check:
             self._exec(
                 _ExecDropConstraint(
-                    table_name, column, "sys.check_constraints"
+                    table_name, column, "sys.check_constraints", schema
                 )
             )
         drop_fks = kw.pop("mssql_drop_foreign_key", False)
         if drop_fks:
-            self._exec(_ExecDropFKConstraint(table_name, column))
-        super(MSSQLImpl, self).drop_column(table_name, column, **kw)
+            self._exec(_ExecDropFKConstraint(table_name, column, schema))
+        super(MSSQLImpl, self).drop_column(
+            table_name, column, schema=schema, **kw
+        )
 
 
 class _ExecDropConstraint(Executable, ClauseElement):
-    def __init__(self, tname, colname, type_):
+    def __init__(self, tname, colname, type_, schema):
         self.tname = tname
         self.colname = colname
         self.type_ = type_
+        self.schema = schema
 
 
 class _ExecDropFKConstraint(Executable, ClauseElement):
-    def __init__(self, tname, colname):
+    def __init__(self, tname, colname, schema):
         self.tname = tname
         self.colname = colname
+        self.schema = schema
 
 
 @compiles(_ExecDropConstraint, "mssql")
 def _exec_drop_col_constraint(element, compiler, **kw):
-    tname, colname, type_ = element.tname, element.colname, element.type_
+    schema, tname, colname, type_ = (
+        element.schema,
+        element.tname,
+        element.colname,
+        element.type_,
+    )
     # from http://www.mssqltips.com/sqlservertip/1425/\
     # working-with-default-constraints-in-sql-server/
     # TODO: needs table formatting, etc.
     return """declare @const_name varchar(256)
 select @const_name = [name] from %(type)s
-where parent_object_id = object_id('%(tname)s')
+where parent_object_id = object_id('%(schema_dot)s%(tname)s')
 and col_name(parent_object_id, parent_column_id) = '%(colname)s'
 exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
         "type": type_,
         "tname": tname,
         "colname": colname,
-        "tname_quoted": format_table_name(compiler, tname, None),
+        "tname_quoted": format_table_name(compiler, tname, schema),
+        "schema_dot": schema + "." if schema else "",
     }
 
 
 @compiles(_ExecDropFKConstraint, "mssql")
 def _exec_drop_col_fk_constraint(element, compiler, **kw):
-    tname, colname = element.tname, element.colname
+    schema, tname, colname = element.schema, element.tname, element.colname
 
     return """declare @const_name varchar(256)
 select @const_name = [name] from
-    sys.foreign_keys fk join sys.foreign_key_columns fkc
-    on fk.object_id=fkc.constraint_object_id
-where fkc.parent_object_id = object_id('%(tname)s')
-and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s'
+sys.foreign_keys fk join sys.foreign_key_columns fkc
+on fk.object_id=fkc.constraint_object_id
+where fkc.parent_object_id = object_id('%(schema_dot)s%(tname)s')
+`and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s'
 exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % {
         "tname": tname,
         "colname": colname,
-        "tname_quoted": format_table_name(compiler, tname, None),
+        "tname_quoted": format_table_name(compiler, tname, schema),
+        "schema_dot": schema + "." if schema else "",
     }
 
 
index b3990a5b03c363a0fb07cc5bfc7fcf134ddbfab7..29514556009405e83fd6608a43d75a7b0579fc66 100644 (file)
@@ -119,17 +119,20 @@ def op_fixture(
     buf = buffer_()
 
     class ctx(MigrationContext):
+        def get_buf(self):
+            return buf
+
         def clear_assertions(self):
             buf.lines[:] = []
 
         def assert_(self, *sql):
             # TODO: make this more flexible about
             # whitespace and such
-            eq_(buf.lines, list(sql))
+            eq_(buf.lines, [re.sub(r"[\n\t]", "", s) for s in sql])
 
         def assert_contains(self, sql):
             for stmt in buf.lines:
-                if sql in stmt:
+                if re.sub(r"[\n\t]", "", sql) in stmt:
                     return
             else:
                 assert False, "Could not locate fragment %r in %r" % (
diff --git a/docs/build/unreleased/621.rst b/docs/build/unreleased/621.rst
new file mode 100644 (file)
index 0000000..2221c22
--- /dev/null
@@ -0,0 +1,8 @@
+.. change::
+    :tags: bug, mssql
+    :tickets: 621
+
+    Fixed bug in MSSQL dialect where the drop constraint execution steps used
+    to remove server default or implicit foreign key constraint failed to take
+    into account the schema name of the target table.
+
index 94182711a6e9b09cfe21169d97c026058e43d38c..28a0bd986d6346a86da9b43a8f890d055708686a 100644 (file)
@@ -116,10 +116,29 @@ class OpTest(TestBase):
     def test_alter_column_drop_default(self):
         context = op_fixture("mssql")
         op.alter_column("t", "c", server_default=None)
+        context.assert_contains(
+            "declare @const_name varchar(256)select @const_name = [name] "
+            "from sys.default_constraintswhere parent_object_id = "
+            "object_id('t')and col_name(parent_object_id, "
+            "parent_column_id) = 'c'"
+        )
         context.assert_contains(
             "exec('alter table t drop constraint ' + @const_name)"
         )
 
+    def test_alter_column_drop_default_w_schema(self):
+        context = op_fixture("mssql")
+        op.alter_column("t", "c", server_default=None, schema="xyz")
+        context.assert_contains(
+            "declare @const_name varchar(256)select @const_name = [name] "
+            "from sys.default_constraintswhere parent_object_id = "
+            "object_id('xyz.t')and col_name(parent_object_id, "
+            "parent_column_id) = 'c'"
+        )
+        context.assert_contains(
+            "exec('alter table xyz.t drop constraint ' + @const_name)"
+        )
+
     def test_alter_column_dont_drop_default(self):
         context = op_fixture("mssql")
         op.alter_column("t", "c", server_default=False)
@@ -165,11 +184,37 @@ class OpTest(TestBase):
     def test_drop_column_w_fk(self):
         context = op_fixture("mssql")
         op.drop_column("t1", "c1", mssql_drop_foreign_key=True)
+        context.assert_contains(
+            "declare @const_name varchar(256)\n"
+            "select @const_name = [name] from\n"
+            "sys.foreign_keys fk join sys.foreign_key_columns fkcon "
+            "fk.object_id=fkc.constraint_object_id\n"
+            "where fkc.parent_object_id = object_id('t1')`and "
+            "col_name(fkc.parent_object_id, fkc.parent_column_id) = 'c1'\n"
+            "exec('alter table t1 drop constraint ' + @const_name)"
+        )
         context.assert_contains(
             "exec('alter table t1 drop constraint ' + @const_name)"
         )
         context.assert_contains("ALTER TABLE t1 DROP COLUMN c1")
 
+    def test_drop_column_w_fk_schema(self):
+        context = op_fixture("mssql")
+        op.drop_column("t1", "c1", schema="xyz", mssql_drop_foreign_key=True)
+        context.assert_contains(
+            "declare @const_name varchar(256)\n"
+            "select @const_name = [name] from\n"
+            "sys.foreign_keys fk join sys.foreign_key_columns fkcon "
+            "fk.object_id=fkc.constraint_object_id\n"
+            "where fkc.parent_object_id = object_id('xyz.t1')`and "
+            "col_name(fkc.parent_object_id, fkc.parent_column_id) = 'c1'\n"
+            "exec('alter table xyz.t1 drop constraint ' + @const_name)"
+        )
+        context.assert_contains(
+            "exec('alter table xyz.t1 drop constraint ' + @const_name)"
+        )
+        context.assert_contains("ALTER TABLE xyz.t1 DROP COLUMN c1")
+
     def test_drop_column_w_fk_in_batch(self):
         context = op_fixture("mssql")
         with op.batch_alter_table("t1", schema=None) as batch_op: