From 9fd52e65d4c617657babf8ebbe0bfc4253bc4816 Mon Sep 17 00:00:00 2001 From: John Lennox Date: Mon, 25 Jul 2022 20:47:18 -0400 Subject: [PATCH] Fixes Issue #8288 SQL Server: include index clustering info when retrieving via get_indexes() --- lib/sqlalchemy/dialects/mssql/base.py | 7 ++ test/dialect/mssql/test_reflection.py | 111 +++++++++++++++++++++++++- 2 files changed, 117 insertions(+), 1 deletion(-) diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index e33ae4dbb6..a600e3ce62 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -3143,6 +3143,7 @@ class MSDialect(default.DefaultDialect): rp = connection.execution_options(future_result=True).execute( sql.text( "select ind.index_id, ind.is_unique, ind.name, " + "case when ind.index_id = 1 then cast(1 as bit) else cast(0 as bit) end as is_clustered, " f"{filter_definition} " "from sys.indexes as ind join sys.tables as tab on " "ind.object_id=tab.object_id " @@ -3167,6 +3168,12 @@ class MSDialect(default.DefaultDialect): "include_columns": [], } + # issue #8288 + if row["is_clustered"]: + indexes[row["index_id"]].setdefault("dialect_options", {})[ + "mssql_clustered" + ] = True + if row["filter_definition"] is not None: indexes[row["index_id"]].setdefault("dialect_options", {})[ "mssql_where" diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py index 73fd6bf1ca..9c93b8331e 100644 --- a/test/dialect/mssql/test_reflection.py +++ b/test/dialect/mssql/test_reflection.py @@ -568,11 +568,120 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): t2 = Table("t", MetaData(), autoload_with=connection) idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0] - self.assert_compile( CreateIndex(idx), "CREATE INDEX idx_x ON t (x) WHERE ([x]='test')" ) + def test_index_with_clustered(self, metadata, connection): + """ + when the result of get_indexes() is used build an index it should + include the CLUSTERED keyword when appropriate + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), + ) + Index("idx_x", t1.c.x, mssql_clustered=True) + Index("idx_y", t1.c.y) + metadata.create_all(connection) + ind = testing.db.dialect.get_indexes(connection, "t", None) + + clustered_index = "" + for ix in ind: + if "dialect_options" in ix: + if "mssql_clustered" in ix["dialect_options"] and ix["dialect_options"]["mssql_clustered"]: + clustered_index = ix["name"] + + eq_(clustered_index, "idx_x") + + t2 = Table("t", MetaData(), autoload_with=connection) + idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0] + + self.assert_compile( + CreateIndex(idx), "CREATE CLUSTERED INDEX idx_x ON t (x)" + ) + + def test_indexes_with_filtered_and_clustered(self, metadata, connection): + """ + table with one filtered index and one clustered index so each index will have different + dialect_options keys + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), + ) + Index("idx_x", t1.c.x, mssql_clustered=True) + Index("idx_y", t1.c.y, mssql_where=t1.c.y >= 5) + metadata.create_all(connection) + ind = testing.db.dialect.get_indexes(connection, "t", None) + + clustered_index = "" + for ix in ind: + if "dialect_options" in ix: + if "mssql_clustered" in ix["dialect_options"] and ix["dialect_options"]["mssql_clustered"]: + clustered_index = ix["name"] + + eq_(clustered_index, "idx_x") + + filtered_indexes = [] + for ix in ind: + if "dialect_options" in ix: + if "mssql_where" in ix["dialect_options"]: + filtered_indexes.append(ix["dialect_options"]["mssql_where"]) + + eq_(sorted(filtered_indexes), ["([y]>=(5))"]) + + t2 = Table("t", MetaData(), autoload_with=connection) + clustered_idx = list(sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name))[0] + filtered_idx = list(sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name))[1] + + self.assert_compile( + CreateIndex(clustered_idx), "CREATE CLUSTERED INDEX idx_x ON t (x)" + ) + + self.assert_compile( + CreateIndex(filtered_idx), "CREATE INDEX idx_y ON t (y) WHERE ([y]>=(5))" + ) + + + def test_index_with_explicit_nonclustered(self, metadata, connection): + """ + Resulting CREATE INDEX statement should use the default behavior and omit the + NONCLUSTERED keyword + """ + t1 = Table( + "t", + metadata, + Column("id", Integer), + Column("x", types.String(20)), + Column("y", types.Integer), + ) + Index("idx_x", t1.c.x, mssql_clustered=False) + Index("idx_y", t1.c.y) + metadata.create_all(connection) + ind = testing.db.dialect.get_indexes(connection, "t", None) + + clustered_index = "" + for ix in ind: + if "dialect_options" in ix: + if "mssql_clustered" in ix["dialect_options"] and ix["dialect_options"]["mssql_clustered"]: + clustered_index = ix["name"] + + eq_(clustered_index, "") + + t2 = Table("t", MetaData(), autoload_with=connection) + idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0] + + self.assert_compile( + CreateIndex(idx), "CREATE INDEX idx_x ON t (x)" + ) + def test_max_ident_in_varchar_not_present(self, metadata, connection): """test [ticket:3504]. -- 2.47.3