From: Mike Bayer Date: Tue, 12 Nov 2019 15:01:42 +0000 (-0500) Subject: Support schemas for MSSQL drop server default + FK constraint X-Git-Tag: rel_1_3_1~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6e3079526635092c748d33e897bfcdf5482eecc8;p=thirdparty%2Fsqlalchemy%2Falembic.git Support schemas for MSSQL drop server default + FK constraint Fixed bug in MSSQL dialect where the drop constraint execution steps used to remove server default or implicit foreign key constraint failed to take into account the schema name of the target table. Change-Id: Ia95b043ee6289efbb90d6f21392f4ce622748611 Fixes: #621 --- diff --git a/alembic/ddl/mssql.py b/alembic/ddl/mssql.py index de2168bf..3ccbbff0 100644 --- a/alembic/ddl/mssql.py +++ b/alembic/ddl/mssql.py @@ -89,7 +89,10 @@ class MSSQLImpl(DefaultImpl): if existing_server_default is not False or server_default is None: self._exec( _ExecDropConstraint( - table_name, column_name, "sys.default_constraints" + table_name, + column_name, + "sys.default_constraints", + schema, ) ) if server_default is not None: @@ -129,72 +132,83 @@ class MSSQLImpl(DefaultImpl): else: super(MSSQLImpl, self).bulk_insert(table, rows, **kw) - def drop_column(self, table_name, column, **kw): + def drop_column(self, table_name, column, schema=None, **kw): drop_default = kw.pop("mssql_drop_default", False) if drop_default: self._exec( _ExecDropConstraint( - table_name, column, "sys.default_constraints" + table_name, column, "sys.default_constraints", schema ) ) drop_check = kw.pop("mssql_drop_check", False) if drop_check: self._exec( _ExecDropConstraint( - table_name, column, "sys.check_constraints" + table_name, column, "sys.check_constraints", schema ) ) drop_fks = kw.pop("mssql_drop_foreign_key", False) if drop_fks: - self._exec(_ExecDropFKConstraint(table_name, column)) - super(MSSQLImpl, self).drop_column(table_name, column, **kw) + self._exec(_ExecDropFKConstraint(table_name, column, schema)) + super(MSSQLImpl, self).drop_column( + table_name, column, schema=schema, **kw + ) class _ExecDropConstraint(Executable, ClauseElement): - def __init__(self, tname, colname, type_): + def __init__(self, tname, colname, type_, schema): self.tname = tname self.colname = colname self.type_ = type_ + self.schema = schema class _ExecDropFKConstraint(Executable, ClauseElement): - def __init__(self, tname, colname): + def __init__(self, tname, colname, schema): self.tname = tname self.colname = colname + self.schema = schema @compiles(_ExecDropConstraint, "mssql") def _exec_drop_col_constraint(element, compiler, **kw): - tname, colname, type_ = element.tname, element.colname, element.type_ + schema, tname, colname, type_ = ( + element.schema, + element.tname, + element.colname, + element.type_, + ) # from http://www.mssqltips.com/sqlservertip/1425/\ # working-with-default-constraints-in-sql-server/ # TODO: needs table formatting, etc. return """declare @const_name varchar(256) select @const_name = [name] from %(type)s -where parent_object_id = object_id('%(tname)s') +where parent_object_id = object_id('%(schema_dot)s%(tname)s') and col_name(parent_object_id, parent_column_id) = '%(colname)s' exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % { "type": type_, "tname": tname, "colname": colname, - "tname_quoted": format_table_name(compiler, tname, None), + "tname_quoted": format_table_name(compiler, tname, schema), + "schema_dot": schema + "." if schema else "", } @compiles(_ExecDropFKConstraint, "mssql") def _exec_drop_col_fk_constraint(element, compiler, **kw): - tname, colname = element.tname, element.colname + schema, tname, colname = element.schema, element.tname, element.colname return """declare @const_name varchar(256) select @const_name = [name] from - sys.foreign_keys fk join sys.foreign_key_columns fkc - on fk.object_id=fkc.constraint_object_id -where fkc.parent_object_id = object_id('%(tname)s') -and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s' +sys.foreign_keys fk join sys.foreign_key_columns fkc +on fk.object_id=fkc.constraint_object_id +where fkc.parent_object_id = object_id('%(schema_dot)s%(tname)s') +`and col_name(fkc.parent_object_id, fkc.parent_column_id) = '%(colname)s' exec('alter table %(tname_quoted)s drop constraint ' + @const_name)""" % { "tname": tname, "colname": colname, - "tname_quoted": format_table_name(compiler, tname, None), + "tname_quoted": format_table_name(compiler, tname, schema), + "schema_dot": schema + "." if schema else "", } diff --git a/alembic/testing/fixtures.py b/alembic/testing/fixtures.py index b3990a5b..29514556 100644 --- a/alembic/testing/fixtures.py +++ b/alembic/testing/fixtures.py @@ -119,17 +119,20 @@ def op_fixture( buf = buffer_() class ctx(MigrationContext): + def get_buf(self): + return buf + def clear_assertions(self): buf.lines[:] = [] def assert_(self, *sql): # TODO: make this more flexible about # whitespace and such - eq_(buf.lines, list(sql)) + eq_(buf.lines, [re.sub(r"[\n\t]", "", s) for s in sql]) def assert_contains(self, sql): for stmt in buf.lines: - if sql in stmt: + if re.sub(r"[\n\t]", "", sql) in stmt: return else: assert False, "Could not locate fragment %r in %r" % ( diff --git a/docs/build/unreleased/621.rst b/docs/build/unreleased/621.rst new file mode 100644 index 00000000..2221c229 --- /dev/null +++ b/docs/build/unreleased/621.rst @@ -0,0 +1,8 @@ +.. change:: + :tags: bug, mssql + :tickets: 621 + + Fixed bug in MSSQL dialect where the drop constraint execution steps used + to remove server default or implicit foreign key constraint failed to take + into account the schema name of the target table. + diff --git a/tests/test_mssql.py b/tests/test_mssql.py index 94182711..28a0bd98 100644 --- a/tests/test_mssql.py +++ b/tests/test_mssql.py @@ -116,10 +116,29 @@ class OpTest(TestBase): def test_alter_column_drop_default(self): context = op_fixture("mssql") op.alter_column("t", "c", server_default=None) + context.assert_contains( + "declare @const_name varchar(256)select @const_name = [name] " + "from sys.default_constraintswhere parent_object_id = " + "object_id('t')and col_name(parent_object_id, " + "parent_column_id) = 'c'" + ) context.assert_contains( "exec('alter table t drop constraint ' + @const_name)" ) + def test_alter_column_drop_default_w_schema(self): + context = op_fixture("mssql") + op.alter_column("t", "c", server_default=None, schema="xyz") + context.assert_contains( + "declare @const_name varchar(256)select @const_name = [name] " + "from sys.default_constraintswhere parent_object_id = " + "object_id('xyz.t')and col_name(parent_object_id, " + "parent_column_id) = 'c'" + ) + context.assert_contains( + "exec('alter table xyz.t drop constraint ' + @const_name)" + ) + def test_alter_column_dont_drop_default(self): context = op_fixture("mssql") op.alter_column("t", "c", server_default=False) @@ -165,11 +184,37 @@ class OpTest(TestBase): def test_drop_column_w_fk(self): context = op_fixture("mssql") op.drop_column("t1", "c1", mssql_drop_foreign_key=True) + context.assert_contains( + "declare @const_name varchar(256)\n" + "select @const_name = [name] from\n" + "sys.foreign_keys fk join sys.foreign_key_columns fkcon " + "fk.object_id=fkc.constraint_object_id\n" + "where fkc.parent_object_id = object_id('t1')`and " + "col_name(fkc.parent_object_id, fkc.parent_column_id) = 'c1'\n" + "exec('alter table t1 drop constraint ' + @const_name)" + ) context.assert_contains( "exec('alter table t1 drop constraint ' + @const_name)" ) context.assert_contains("ALTER TABLE t1 DROP COLUMN c1") + def test_drop_column_w_fk_schema(self): + context = op_fixture("mssql") + op.drop_column("t1", "c1", schema="xyz", mssql_drop_foreign_key=True) + context.assert_contains( + "declare @const_name varchar(256)\n" + "select @const_name = [name] from\n" + "sys.foreign_keys fk join sys.foreign_key_columns fkcon " + "fk.object_id=fkc.constraint_object_id\n" + "where fkc.parent_object_id = object_id('xyz.t1')`and " + "col_name(fkc.parent_object_id, fkc.parent_column_id) = 'c1'\n" + "exec('alter table xyz.t1 drop constraint ' + @const_name)" + ) + context.assert_contains( + "exec('alter table xyz.t1 drop constraint ' + @const_name)" + ) + context.assert_contains("ALTER TABLE xyz.t1 DROP COLUMN c1") + def test_drop_column_w_fk_in_batch(self): context = op_fixture("mssql") with op.batch_alter_table("t1", schema=None) as batch_op: