]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fixes issue #8288
authorJohn Lennox <john.lennox@comcast.net>
Thu, 28 Jul 2022 01:51:04 +0000 (21:51 -0400)
committerJohn Lennox <john.lennox@comcast.net>
Thu, 28 Jul 2022 01:51:04 +0000 (21:51 -0400)
SQL Server: include mssql_clustered dialect_options when reflecting
primary key constraint via get_pk_constraint()

lib/sqlalchemy/dialects/mssql/base.py
test/dialect/mssql/test_reflection.py

index a600e3ce624b75ae97c8bb7f77890b323afebd58..01a284d8bb4cf85348fc72a7154ae0ffe403c98a 100644 (file)
@@ -3451,7 +3451,8 @@ class MSDialect(default.DefaultDialect):
         # Primary key constraints
         s = (
             sql.select(
-                C.c.column_name, TC.c.constraint_type, C.c.constraint_name
+                C.c.column_name, TC.c.constraint_type, C.c.constraint_name,
+                text("objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered") 
             )
             .where(
                 sql.and_(
@@ -3470,8 +3471,17 @@ class MSDialect(default.DefaultDialect):
                 pkeys.append(row["COLUMN_NAME"])
                 if constraint_name is None:
                     constraint_name = row[C.c.constraint_name.name]
+            
         if pkeys:
-            return {"constrained_columns": pkeys, "name": constraint_name}
+            pkinfo = {"constrained_columns": pkeys, "name": constraint_name}
+            # default PK behavior is clustered in absence of another clustered index
+            # if the PK is nonclustered, include this in the dialect_options
+            if not row["is_clustered"]:
+                pkinfo.setdefault("dialect_options", {})[
+                    "mssql_clustered"
+                ] = False
+            
+            return pkinfo
         else:
             return self._default_or_error(
                 connection,
index 9c93b8331e8fe15c91098e0cd24290f8d4682a39..e678ffe4d3fd091719d3e1cb5f5076cffb7bd6b4 100644 (file)
@@ -682,6 +682,48 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
             CreateIndex(idx), "CREATE INDEX idx_x ON t (x)"
         )
 
+    def test_clustered_primary_key_reflection(self, metadata, connection):
+        """
+        A primary key will be clustered by default if no other clustered index
+        exists. mssql_clustered dialect_options should not be present in this
+        case to allow default behavior to control DDL.
+        """
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        PrimaryKeyConstraint(t1.c.id, name="pk_t")
+        
+        metadata.create_all(connection)
+        pkconst = testing.db.dialect.get_pk_constraint(connection, "t", None)
+
+        assert "dialect_options" not in pkconst
+
+
+    def test_nonclustered_primary_key_reflection(self, metadata, connection):
+        """
+        Nonclustered primary key should include mssql_clustered=False
+        when reflected back
+        """
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False )
+        
+        metadata.create_all(connection)
+        pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None)
+
+        assert "dialect_options" in pk_reflect
+        assert pk_reflect["dialect_options"]["mssql_clustered"] == False
+
+
     def test_max_ident_in_varchar_not_present(self, metadata, connection):
         """test [ticket:3504].