]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fixes Issue #8288
authorJohn Lennox <john.lennox@comcast.net>
Tue, 26 Jul 2022 00:47:18 +0000 (20:47 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 30 Jul 2022 17:35:02 +0000 (13:35 -0400)
SQL Server: include index clustering info when retrieving via get_indexes()

Fixes issue #8288

SQL Server: include mssql_clustered dialect_options when reflecting
primary key constraint via get_pk_constraint()

Fixes issue #8288

include mssql_clustered value when reflecting
SQL Server index and primary key.

Fixes issue #8288

Formatting changes

Change-Id: I3afe99876fc4c9485bad5ef0bfeb4037a1d6ecf3

lib/sqlalchemy/dialects/mssql/base.py
test/dialect/mssql/test_reflection.py

index e33ae4dbb686ec2de0de0515a7b7b20875d41e60..628f388cf0679aff4356711c36d28241eb681cea 100644 (file)
@@ -3143,6 +3143,7 @@ class MSDialect(default.DefaultDialect):
         rp = connection.execution_options(future_result=True).execute(
             sql.text(
                 "select ind.index_id, ind.is_unique, ind.name, "
+                "case when ind.index_id = 1 then cast(1 as bit) else cast(0 as bit) end as is_clustered, "
                 f"{filter_definition} "
                 "from sys.indexes as ind join sys.tables as tab on "
                 "ind.object_id=tab.object_id "
@@ -3167,6 +3168,11 @@ class MSDialect(default.DefaultDialect):
                 "include_columns": [],
             }
 
+            # issue #8288 - add mssql_clustered value
+            indexes[row["index_id"]].setdefault("dialect_options", {})[
+                "mssql_clustered"
+            ] = row["is_clustered"]
+
             if row["filter_definition"] is not None:
                 indexes[row["index_id"]].setdefault("dialect_options", {})[
                     "mssql_where"
@@ -3444,7 +3450,12 @@ class MSDialect(default.DefaultDialect):
         # Primary key constraints
         s = (
             sql.select(
-                C.c.column_name, TC.c.constraint_type, C.c.constraint_name
+                C.c.column_name,
+                TC.c.constraint_type,
+                C.c.constraint_name,
+                text(
+                    "objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered"
+                ),
             )
             .where(
                 sql.and_(
@@ -3463,8 +3474,15 @@ class MSDialect(default.DefaultDialect):
                 pkeys.append(row["COLUMN_NAME"])
                 if constraint_name is None:
                     constraint_name = row[C.c.constraint_name.name]
+
         if pkeys:
-            return {"constrained_columns": pkeys, "name": constraint_name}
+            pkinfo = {"constrained_columns": pkeys, "name": constraint_name}
+            # issue #8288 - add mssql_clustered value
+            pkinfo.setdefault("dialect_options", {})["mssql_clustered"] = row[
+                "is_clustered"
+            ]
+
+            return pkinfo
         else:
             return self._default_or_error(
                 connection,
index 73fd6bf1caa277368346f8f94f4846350596970c..973acf19fd995259578fadbcb19f06a5d82d0944 100644 (file)
@@ -568,10 +568,166 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
 
         t2 = Table("t", MetaData(), autoload_with=connection)
         idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+        self.assert_compile(
+            CreateIndex(idx),
+            "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')",
+        )
+
+    def test_index_reflection_clustered(self, metadata, connection):
+        """
+        when the result of get_indexes() is used to build an index it should
+        include the CLUSTERED keyword when appropriate
+        """
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        Index("idx_x", t1.c.x, mssql_clustered=True)
+        Index("idx_y", t1.c.y)
+        metadata.create_all(connection)
+        ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+        clustered_index = ""
+        for ix in ind:
+            if ix["dialect_options"]["mssql_clustered"]:
+                clustered_index = ix["name"]
+
+        eq_(clustered_index, "idx_x")
+
+        t2 = Table("t", MetaData(), autoload_with=connection)
+        idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+        self.assert_compile(
+            CreateIndex(idx), "CREATE CLUSTERED INDEX idx_x ON t (x)"
+        )
+
+    def test_index_reflection_filtered_and_clustered(
+        self, metadata, connection
+    ):
+        """
+        table with one filtered index and one clustered index so each index will have different
+        dialect_options keys
+        """
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        Index("idx_x", t1.c.x, mssql_clustered=True)
+        Index("idx_y", t1.c.y, mssql_where=t1.c.y >= 5)
+        metadata.create_all(connection)
+        ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+        clustered_index = ""
+        for ix in ind:
+            if ix["dialect_options"]["mssql_clustered"]:
+                clustered_index = ix["name"]
+
+        eq_(clustered_index, "idx_x")
+
+        filtered_indexes = []
+        for ix in ind:
+            if "dialect_options" in ix:
+                if "mssql_where" in ix["dialect_options"]:
+                    filtered_indexes.append(
+                        ix["dialect_options"]["mssql_where"]
+                    )
+
+        eq_(sorted(filtered_indexes), ["([y]>=(5))"])
+
+        t2 = Table("t", MetaData(), autoload_with=connection)
+        clustered_idx = list(
+            sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name)
+        )[0]
+        filtered_idx = list(
+            sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name)
+        )[1]
+
+        self.assert_compile(
+            CreateIndex(clustered_idx), "CREATE CLUSTERED INDEX idx_x ON t (x)"
+        )
 
         self.assert_compile(
-            CreateIndex(idx), "CREATE INDEX idx_x ON t (x) WHERE ([x]='test')"
+            CreateIndex(filtered_idx),
+            "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))",
+        )
+
+    def test_index_reflection_nonclustered(self, metadata, connection):
+        """
+        one index created by specifying mssql_clustered=False
+        one created without specifying mssql_clustered property so it will
+        use default of NONCLUSTERED.
+        When reflected back mssql_clustered=False should be included in both
+        """
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        Index("idx_x", t1.c.x, mssql_clustered=False)
+        Index("idx_y", t1.c.y)
+        metadata.create_all(connection)
+        ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+        for ix in ind:
+            assert ix["dialect_options"]["mssql_clustered"] == False
+
+        t2 = Table("t", MetaData(), autoload_with=connection)
+        idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+        self.assert_compile(
+            CreateIndex(idx), "CREATE NONCLUSTERED INDEX idx_x ON t (x)"
+        )
+
+    def test_primary_key_reflection_clustered(self, metadata, connection):
+        """
+        A primary key will be clustered by default if no other clustered index
+        exists.
+        When reflected back, mssql_clustered=True should be present.
+        """
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
         )
+        PrimaryKeyConstraint(t1.c.id, name="pk_t")
+
+        metadata.create_all(connection)
+        pk_reflect = testing.db.dialect.get_pk_constraint(
+            connection, "t", None
+        )
+
+        assert pk_reflect["dialect_options"]["mssql_clustered"] == True
+
+    def test_primary_key_reflection_nonclustered(self, metadata, connection):
+        """
+        Nonclustered primary key should include mssql_clustered=False
+        when reflected back
+        """
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False)
+
+        metadata.create_all(connection)
+        pk_reflect = testing.db.dialect.get_pk_constraint(
+            connection, "t", None
+        )
+
+        assert pk_reflect["dialect_options"]["mssql_clustered"] == False
 
     def test_max_ident_in_varchar_not_present(self, metadata, connection):
         """test [ticket:3504].