# Primary key constraints
s = (
sql.select(
- C.c.column_name, TC.c.constraint_type, C.c.constraint_name
+ C.c.column_name, TC.c.constraint_type, C.c.constraint_name,
+ text("objectproperty(object_id(c.table_schema+'.'+c.constraint_name), 'CnstIsClustKey') as is_clustered")
)
.where(
sql.and_(
pkeys.append(row["COLUMN_NAME"])
if constraint_name is None:
constraint_name = row[C.c.constraint_name.name]
+
if pkeys:
- return {"constrained_columns": pkeys, "name": constraint_name}
+ pkinfo = {"constrained_columns": pkeys, "name": constraint_name}
+ # default PK behavior is clustered in absence of another clustered index
+ # if the PK is nonclustered, include this in the dialect_options
+ if not row["is_clustered"]:
+ pkinfo.setdefault("dialect_options", {})[
+ "mssql_clustered"
+ ] = False
+
+ return pkinfo
else:
return self._default_or_error(
connection,
CreateIndex(idx), "CREATE INDEX idx_x ON t (x)"
)
+ def test_clustered_primary_key_reflection(self, metadata, connection):
+ """
+ A primary key will be clustered by default if no other clustered index
+ exists. mssql_clustered dialect_options should not be present in this
+ case to allow default behavior to control DDL.
+ """
+ t1 = Table(
+ "t",
+ metadata,
+ Column("id", Integer),
+ Column("x", types.String(20)),
+ Column("y", types.Integer),
+ )
+ PrimaryKeyConstraint(t1.c.id, name="pk_t")
+
+ metadata.create_all(connection)
+ pkconst = testing.db.dialect.get_pk_constraint(connection, "t", None)
+
+ assert "dialect_options" not in pkconst
+
+
+ def test_nonclustered_primary_key_reflection(self, metadata, connection):
+ """
+ Nonclustered primary key should include mssql_clustered=False
+ when reflected back
+ """
+ t1 = Table(
+ "t",
+ metadata,
+ Column("id", Integer),
+ Column("x", types.String(20)),
+ Column("y", types.Integer),
+ )
+ PrimaryKeyConstraint(t1.c.id, name="pk_t", mssql_clustered=False )
+
+ metadata.create_all(connection)
+ pk_reflect = testing.db.dialect.get_pk_constraint(connection, "t", None)
+
+ assert "dialect_options" in pk_reflect
+ assert pk_reflect["dialect_options"]["mssql_clustered"] == False
+
+
def test_max_ident_in_varchar_not_present(self, metadata, connection):
"""test [ticket:3504].