# Primary key constraints
s = (
sql.select(
- C.c.column_name, TC.c.constraint_type, C.c.constraint_name,
- text("objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered")
+ C.c.column_name,
+ TC.c.constraint_type,
+ C.c.constraint_name,
+ text(
+ "objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered"
+ ),
)
.where(
sql.and_(
pkeys.append(row["COLUMN_NAME"])
if constraint_name is None:
constraint_name = row[C.c.constraint_name.name]
-
+
if pkeys:
pkinfo = {"constrained_columns": pkeys, "name": constraint_name}
# issue #8288 - add mssql_clustered value
- pkinfo.setdefault("dialect_options", {})[
- "mssql_clustered"
- ] = row["is_clustered"]
-
+ pkinfo.setdefault("dialect_options", {})["mssql_clustered"] = row[
+ "is_clustered"
+ ]
+
return pkinfo
else:
return self._default_or_error(
t2 = Table("t", MetaData(), autoload_with=connection)
idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
self.assert_compile(
- CreateIndex(idx), "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')"
+ CreateIndex(idx),
+ "CREATE NONCLUSTERED INDEX idx_x ON t (x) WHERE ([x]='test')",
)
def test_index_reflection_clustered(self, metadata, connection):
CreateIndex(idx), "CREATE CLUSTERED INDEX idx_x ON t (x)"
)
- def test_index_reflection_filtered_and_clustered(self, metadata, connection):
+ def test_index_reflection_filtered_and_clustered(
+ self, metadata, connection
+ ):
"""
table with one filtered index and one clustered index so each index will have different
dialect_options keys
for ix in ind:
if "dialect_options" in ix:
if "mssql_where" in ix["dialect_options"]:
- filtered_indexes.append(ix["dialect_options"]["mssql_where"])
+ filtered_indexes.append(
+ ix["dialect_options"]["mssql_where"]
+ )
eq_(sorted(filtered_indexes), ["([y]>=(5))"])
t2 = Table("t", MetaData(), autoload_with=connection)
- clustered_idx = list(sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name))[0]
- filtered_idx = list(sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name))[1]
+ clustered_idx = list(
+ sorted(t2.indexes, key=lambda clustered_idx: clustered_idx.name)
+ )[0]
+ filtered_idx = list(
+ sorted(t2.indexes, key=lambda filtered_idx: filtered_idx.name)
+ )[1]
self.assert_compile(
CreateIndex(clustered_idx), "CREATE CLUSTERED INDEX idx_x ON t (x)"
)
self.assert_compile(
- CreateIndex(filtered_idx), "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))"
+ CreateIndex(filtered_idx),
+ "CREATE NONCLUSTERED INDEX idx_y ON t (y) WHERE ([y]>=(5))",
)
-
def test_index_reflection_nonclustered(self, metadata, connection):
"""
one index created by specifying mssql_clustered=False
Column("y", types.Integer),
)
PrimaryKeyConstraint(t1.c.id, name="pk_t")
-
+
metadata.create_all(connection)
- pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None)
+ pk_reflect = testing.db.dialect.get_pk_constraint(
+ connection, "t", None
+ )
assert pk_reflect["dialect_options"]["mssql_clustered"] == True
-
def test_primary_key_reflection_nonclustered(self, metadata, connection):
"""
Nonclustered primary key should include mssql_clustered=False
Column("x", types.String(20)),
Column("y", types.Integer),
)
- PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False )
-
+ PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False)
+
metadata.create_all(connection)
- pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None)
+ pk_reflect = testing.db.dialect.get_pk_constraint(
+ connection, "t", None
+ )
assert pk_reflect["dialect_options"]["mssql_clustered"] == False
-
def test_max_ident_in_varchar_not_present(self, metadata, connection):
"""test [ticket:3504].