From: John Lennox Date: Fri, 29 Jul 2022 02:18:52 +0000 (-0400) Subject: Fixes issue #8288 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=f78fe15c89b72bb5f1df856564625535de51800f;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Fixes issue #8288 Formatting changes --- diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index 3fabc05443..628f388cf0 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -3450,8 +3450,12 @@ class MSDialect(default.DefaultDialect): # Primary key constraints s = ( sql.select( - C.c.column_name, TC.c.constraint_type, C.c.constraint_name, - text("objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered") + C.c.column_name, + TC.c.constraint_type, + C.c.constraint_name, + text( + "objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered" + ), ) .where( sql.and_( @@ -3470,14 +3474,14 @@ class MSDialect(default.DefaultDialect): pkeys.append(row["COLUMN_NAME"]) if constraint_name is None: constraint_name = row[C.c.constraint_name.name] - + if pkeys: pkinfo = {"constrained_columns": pkeys, "name": constraint_name} # issue #8288 - add mssql_clustered value - pkinfo.setdefault("dialect_options", {})[ - "mssql_clustered" - ] = row["is_clustered"] - + pkinfo.setdefault("dialect_options", {})["mssql_clustered"] = row[ + "is_clustered" + ] + return pkinfo else: return self._default_or_error( diff --git a/test/dialect/mssql/test_reflection.py b/test/dialect/mssql/test_reflection.py index 890e187e2e..973acf19fd 100644 --- a/test/dialect/mssql/test_reflection.py +++ b/test/dialect/mssql/test_reflection.py @@ -569,7 +569,8 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): t2 = Table("t", MetaData(), autoload_with=connection) idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0] self.assert_compile( - CreateIndex(idx), "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')" + CreateIndex(idx), + "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')", ) def test_index_reflection_clustered(self, metadata, connection): @@ -603,7 +604,9 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): CreateIndex(idx), "CREATE CLUSTERED INDEX idx_x ON t (x)" ) - def test_index_reflection_filtered_and_clustered(self, metadata, connection): + def test_index_reflection_filtered_and_clustered( + self, metadata, connection + ): """ table with one filtered index and one clustered index so each index will have different dialect_options keys @@ -631,23 +634,29 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): for ix in ind: if "dialect_options" in ix: if "mssql_where" in ix["dialect_options"]: - filtered_indexes.append(ix["dialect_options"]["mssql_where"]) + filtered_indexes.append( + ix["dialect_options"]["mssql_where"] + ) eq_(sorted(filtered_indexes), ["([y]>=(5))"]) t2 = Table("t", MetaData(), autoload_with=connection) - clustered_idx = list(sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name))[0] - filtered_idx = list(sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name))[1] + clustered_idx = list( + sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name) + )[0] + filtered_idx = list( + sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name) + )[1] self.assert_compile( CreateIndex(clustered_idx), "CREATE CLUSTERED INDEX idx_x ON t (x)" ) self.assert_compile( - CreateIndex(filtered_idx), "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))" + CreateIndex(filtered_idx), + "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))", ) - def test_index_reflection_nonclustered(self, metadata, connection): """ one index created by specifying mssql_clustered=False @@ -691,13 +700,14 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): Column("y", types.Integer), ) PrimaryKeyConstraint(t1.c.id, name="pk_t") - + metadata.create_all(connection) - pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None) + pk_reflect = testing.db.dialect.get_pk_constraint( + connection, "t", None + ) assert pk_reflect["dialect_options"]["mssql_clustered"] == True - def test_primary_key_reflection_nonclustered(self, metadata, connection): """ Nonclustered primary key should include mssql_clustered=False @@ -710,14 +720,15 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL): Column("x", types.String(20)), Column("y", types.Integer), ) - PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False ) - + PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False) + metadata.create_all(connection) - pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None) + pk_reflect = testing.db.dialect.get_pk_constraint( + connection, "t", None + ) assert pk_reflect["dialect_options"]["mssql_clustered"] == False - def test_max_ident_in_varchar_not_present(self, metadata, connection): """test [ticket:3504].