]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fixes issue #8288
authorJohn Lennox <john.lennox@comcast.net>
Fri, 29 Jul 2022 02:18:52 +0000 (22:18 -0400)
committerJohn Lennox <john.lennox@comcast.net>
Fri, 29 Jul 2022 02:18:52 +0000 (22:18 -0400)
Formatting changes

lib/sqlalchemy/dialects/mssql/base.py
test/dialect/mssql/test_reflection.py

index 3fabc054439cf0bdf4b3c22bbd538f9eea4024e1..628f388cf0679aff4356711c36d28241eb681cea 100644 (file)
@@ -3450,8 +3450,12 @@ class MSDialect(default.DefaultDialect):
         # Primary key constraints
         s = (
             sql.select(
-                C.c.column_name, TC.c.constraint_type, C.c.constraint_name,
-                text("objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered") 
+                C.c.column_name,
+                TC.c.constraint_type,
+                C.c.constraint_name,
+                text(
+                    "objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered"
+                ),
             )
             .where(
                 sql.and_(
@@ -3470,14 +3474,14 @@ class MSDialect(default.DefaultDialect):
                 pkeys.append(row["COLUMN_NAME"])
                 if constraint_name is None:
                     constraint_name = row[C.c.constraint_name.name]
-            
+
         if pkeys:
             pkinfo = {"constrained_columns": pkeys, "name": constraint_name}
             # issue #8288 - add mssql_clustered value
-            pkinfo.setdefault("dialect_options", {})[
-                "mssql_clustered"
-            ] = row["is_clustered"]
-            
+            pkinfo.setdefault("dialect_options", {})["mssql_clustered"] = row[
+                "is_clustered"
+            ]
+
             return pkinfo
         else:
             return self._default_or_error(
index 890e187e2ea1266064da2c458d76d8f9fcc1869b..973acf19fd995259578fadbcb19f06a5d82d0944 100644 (file)
@@ -569,7 +569,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
         t2 = Table("t", MetaData(), autoload_with=connection)
         idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
         self.assert_compile(
-            CreateIndex(idx), "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')"
+            CreateIndex(idx),
+            "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')",
         )
 
     def test_index_reflection_clustered(self, metadata, connection):
@@ -603,7 +604,9 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
             CreateIndex(idx), "CREATE CLUSTERED INDEX idx_x ON t (x)"
         )
 
-    def test_index_reflection_filtered_and_clustered(self, metadata, connection):
+    def test_index_reflection_filtered_and_clustered(
+        self, metadata, connection
+    ):
         """
         table with one filtered index and one clustered index so each index will have different
         dialect_options keys
@@ -631,23 +634,29 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
         for ix in ind:
             if "dialect_options" in ix:
                 if "mssql_where" in ix["dialect_options"]:
-                    filtered_indexes.append(ix["dialect_options"]["mssql_where"])
+                    filtered_indexes.append(
+                        ix["dialect_options"]["mssql_where"]
+                    )
 
         eq_(sorted(filtered_indexes), ["([y]>=(5))"])
 
         t2 = Table("t", MetaData(), autoload_with=connection)
-        clustered_idx = list(sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name))[0]
-        filtered_idx = list(sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name))[1]
+        clustered_idx = list(
+            sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name)
+        )[0]
+        filtered_idx = list(
+            sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name)
+        )[1]
 
         self.assert_compile(
             CreateIndex(clustered_idx), "CREATE CLUSTERED INDEX idx_x ON t (x)"
         )
 
         self.assert_compile(
-            CreateIndex(filtered_idx), "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))"
+            CreateIndex(filtered_idx),
+            "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))",
         )
 
-
     def test_index_reflection_nonclustered(self, metadata, connection):
         """
         one index created by specifying mssql_clustered=False
@@ -691,13 +700,14 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
             Column("y", types.Integer),
         )
         PrimaryKeyConstraint(t1.c.id, name="pk_t")
-        
+
         metadata.create_all(connection)
-        pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None)
+        pk_reflect = testing.db.dialect.get_pk_constraint(
+            connection, "t", None
+        )
 
         assert pk_reflect["dialect_options"]["mssql_clustered"] == True
 
-
     def test_primary_key_reflection_nonclustered(self, metadata, connection):
         """
         Nonclustered primary key should include mssql_clustered=False
@@ -710,14 +720,15 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
             Column("x", types.String(20)),
             Column("y", types.Integer),
         )
-        PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False )
-        
+        PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False)
+
         metadata.create_all(connection)
-        pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None)
+        pk_reflect = testing.db.dialect.get_pk_constraint(
+            connection, "t", None
+        )
 
         assert pk_reflect["dialect_options"]["mssql_clustered"] == False
 
-
     def test_max_ident_in_varchar_not_present(self, metadata, connection):
         """test [ticket:3504].