--- /dev/null
+.. change::
+ :tags: usecase, mssql
+ :tickets: 7340
+
+ Added support for creation and reflection of COLUMNSTORE
+ indexes in MSSQL dialect. Can be specified on indexes
+ specifying ``mssql_columnstore=True``.
The MSSQL dialect supports clustered indexes (and primary keys) via the
``mssql_clustered`` option. This option is available to :class:`.Index`,
:class:`.UniqueConstraint`. and :class:`.PrimaryKeyConstraint`.
+For indexes this option can be combined with the ``mssql_columnstore`` one
+to create a clustered columnstore index.
To generate a clustered index::
CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
PRIMARY KEY NONCLUSTERED (x, y))
+Columnstore Index Support
+-------------------------
+
+The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore``
+option. This option is available to :class:`.Index`. It be combined with
+the ``mssql_clustered`` option to create a clustered columnstore index.
+
+To generate a columnstore index::
+
+ Index("my_index", table.c.x, mssql_columnstore=True)
+
+which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``.
+
+To generate a clustered columnstore index provide no columns::
+
+ idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True)
+ # required to associate the index with the table
+ table.append_constraint(idx)
+
+the above renders the index as
+``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``.
+
+.. versionadded:: 2.0.18
+
MSSQL-Specific Index Options
-----------------------------
else:
text += "NONCLUSTERED "
- text += "INDEX %s ON %s (%s)" % (
+ # handle columnstore option (has no negative value)
+ columnstore = index.dialect_options["mssql"]["columnstore"]
+ if columnstore:
+ text += "COLUMNSTORE "
+
+ text += "INDEX %s ON %s" % (
self._prepared_index_name(index, include_schema=include_schema),
preparer.format_table(index.table),
- ", ".join(
+ )
+
+ # in some case mssql allows indexes with no columns defined
+ if len(index.expressions) > 0:
+ text += " (%s)" % ", ".join(
self.sql_compiler.process(
expr, include_table=False, literal_binds=True
)
for expr in index.expressions
- ),
- )
+ )
# handle other included columns
if index.dialect_options["mssql"]["include"]:
construct_arguments = [
(sa_schema.PrimaryKeyConstraint, {"clustered": None}),
(sa_schema.UniqueConstraint, {"clustered": None}),
- (sa_schema.Index, {"clustered": None, "include": None, "where": None}),
+ (
+ sa_schema.Index,
+ {
+ "clustered": None,
+ "include": None,
+ "where": None,
+ "columnstore": None,
+ },
+ ),
(
sa_schema.Column,
{"identity_start": None, "identity_increment": None},
)
rp = connection.execution_options(future_result=True).execute(
sql.text(
- "select ind.index_id, ind.is_unique, ind.name, "
- "case when ind.index_id = 1 "
- "then cast(1 as bit) "
- "else cast(0 as bit) end as is_clustered, "
- f"{filter_definition} "
- "from sys.indexes as ind join sys.tables as tab on "
- "ind.object_id=tab.object_id "
- "join sys.schemas as sch on sch.schema_id=tab.schema_id "
- "where tab.name = :tabname "
- "and sch.name=:schname "
- "and ind.is_primary_key=0 and ind.type != 0 "
- "order by ind.name "
+ f"""
+select
+ ind.index_id,
+ ind.is_unique,
+ ind.name,
+ ind.type,
+ {filter_definition}
+from
+ sys.indexes as ind
+join sys.tables as tab on
+ ind.object_id = tab.object_id
+join sys.schemas as sch on
+ sch.schema_id = tab.schema_id
+where
+ tab.name = :tabname
+ and sch.name = :schname
+ and ind.is_primary_key = 0
+ and ind.type != 0
+order by
+ ind.name
+ """
)
.bindparams(
sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
)
indexes = {}
for row in rp.mappings():
- indexes[row["index_id"]] = {
+ indexes[row["index_id"]] = current = {
"name": row["name"],
"unique": row["is_unique"] == 1,
"column_names": [],
"include_columns": [],
- "dialect_options": {"mssql_clustered": row["is_clustered"]},
+ "dialect_options": {},
}
+ do = current["dialect_options"]
+ index_type = row["type"]
+ if index_type in {1, 2}:
+ do["mssql_clustered"] = index_type == 1
+ if index_type in {5, 6}:
+ do["mssql_clustered"] = index_type == 5
+ do["mssql_columnstore"] = True
if row["filter_definition"] is not None:
- indexes[row["index_id"]].setdefault("dialect_options", {})[
- "mssql_where"
- ] = row["filter_definition"]
+ do["mssql_where"] = row["filter_definition"]
rp = connection.execution_options(future_result=True).execute(
sql.text(
- "select ind_col.index_id, ind_col.object_id, col.name, "
- "ind_col.is_included_column "
- "from sys.columns as col "
- "join sys.tables as tab on tab.object_id=col.object_id "
- "join sys.index_columns as ind_col on "
- "(ind_col.column_id=col.column_id and "
- "ind_col.object_id=tab.object_id) "
- "join sys.schemas as sch on sch.schema_id=tab.schema_id "
- "where tab.name=:tabname "
- "and sch.name=:schname"
+ """
+select
+ ind_col.index_id,
+ col.name,
+ ind_col.is_included_column
+from
+ sys.columns as col
+join sys.tables as tab on
+ tab.object_id = col.object_id
+join sys.index_columns as ind_col on
+ ind_col.column_id = col.column_id
+ and ind_col.object_id = tab.object_id
+join sys.schemas as sch on
+ sch.schema_id = tab.schema_id
+where
+ tab.name = :tabname
+ and sch.name = :schname
+ """
)
.bindparams(
sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
.columns(name=sqltypes.Unicode())
)
for row in rp.mappings():
- if row["index_id"] in indexes:
- if row["is_included_column"]:
- indexes[row["index_id"]]["include_columns"].append(
- row["name"]
- )
+ if row["index_id"] not in indexes:
+ continue
+ index_def = indexes[row["index_id"]]
+ is_colstore = index_def["dialect_options"].get("mssql_columnstore")
+ is_clustered = index_def["dialect_options"].get("mssql_clustered")
+ if not (is_colstore and is_clustered):
+ # a clustered columnstore index includes all columns but does
+ # not want them in the index definition
+ if row["is_included_column"] and not is_colstore:
+ # a noncludsted columnstore index reports that includes
+ # columns but requires that are listed as normal columns
+ index_def["include_columns"].append(row["name"])
else:
- indexes[row["index_id"]]["column_names"].append(
- row["name"]
- )
+ index_def["column_names"].append(row["name"])
for index_info in indexes.values():
# NOTE: "root level" include_columns is legacy, now part of
# dialect_options (issue #7382)
- index_info.setdefault("dialect_options", {})[
- "mssql_include"
- ] = index_info["include_columns"]
+ index_info["dialect_options"]["mssql_include"] = index_info[
+ "include_columns"
+ ]
if indexes:
return list(indexes.values())
schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test (id)"
)
+ def test_index_empty(self):
+ metadata = MetaData()
+ idx = Index("foo")
+ Table("test", metadata, Column("id", Integer)).append_constraint(idx)
+ self.assert_compile(
+ schema.CreateIndex(idx), "CREATE INDEX foo ON test"
+ )
+
+ def test_index_colstore_clustering(self):
+ metadata = MetaData()
+ idx = Index("foo", mssql_clustered=True, mssql_columnstore=True)
+ Table("test", metadata, Column("id", Integer)).append_constraint(idx)
+ self.assert_compile(
+ schema.CreateIndex(idx),
+ "CREATE CLUSTERED COLUMNSTORE INDEX foo ON test",
+ )
+
+ def test_index_colstore_no_clustering(self):
+ metadata = MetaData()
+ tbl = Table("test", metadata, Column("id", Integer))
+ idx = Index(
+ "foo", tbl.c.id, mssql_clustered=False, mssql_columnstore=True
+ )
+ self.assert_compile(
+ schema.CreateIndex(idx),
+ "CREATE NONCLUSTERED COLUMNSTORE INDEX foo ON test (id)",
+ )
+
+ def test_index_not_colstore_clustering(self):
+ metadata = MetaData()
+ idx = Index("foo", mssql_clustered=True, mssql_columnstore=False)
+ Table("test", metadata, Column("id", Integer)).append_constraint(idx)
+ self.assert_compile(
+ schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test"
+ )
+
def test_index_where(self):
metadata = MetaData()
tbl = Table("test", metadata, Column("data", Integer))
from sqlalchemy.testing import is_true
from sqlalchemy.testing import mock
from sqlalchemy.testing import provision
+from sqlalchemy.testing.assertions import is_false
class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
for ix in ind:
if ix["dialect_options"]["mssql_clustered"]:
clustered_index = ix["name"]
+ is_false("mssql_columnstore" in ix["dialect_options"])
eq_(clustered_index, "idx_x")
for ix in ind:
assert ix["dialect_options"]["mssql_clustered"] == False
+ is_false("mssql_columnstore" in ix["dialect_options"])
t2 = Table("t", MetaData(), autoload_with=connection)
idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
CreateIndex(idx), "CREATE NONCLUSTERED INDEX idx_x ON t (x)"
)
+ @testing.only_if("mssql>=12")
+ def test_index_reflection_colstore_clustered(self, metadata, connection):
+ t1 = Table(
+ "t",
+ metadata,
+ Column("id", Integer),
+ Column("x", types.String(20)),
+ Column("y", types.Integer),
+ Index("idx_x", mssql_clustered=True, mssql_columnstore=True),
+ )
+ Index("idx_y", t1.c.y)
+ metadata.create_all(connection)
+ ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+ for ix in ind:
+ if ix["name"] == "idx_x":
+ is_true(ix["dialect_options"]["mssql_clustered"])
+ is_true(ix["dialect_options"]["mssql_columnstore"])
+ eq_(ix["dialect_options"]["mssql_include"], [])
+ eq_(ix["column_names"], [])
+ else:
+ is_false(ix["dialect_options"]["mssql_clustered"])
+ is_false("mssql_columnstore" in ix["dialect_options"])
+
+ t2 = Table("t", MetaData(), autoload_with=connection)
+ idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+ self.assert_compile(
+ CreateIndex(idx), "CREATE CLUSTERED COLUMNSTORE INDEX idx_x ON t"
+ )
+
+ @testing.only_if("mssql>=11")
+ def test_index_reflection_colstore_nonclustered(
+ self, metadata, connection
+ ):
+ t1 = Table(
+ "t",
+ metadata,
+ Column("id", Integer),
+ Column("x", types.String(20)),
+ Column("y", types.Integer),
+ )
+ Index("idx_x", t1.c.x, mssql_clustered=False, mssql_columnstore=True)
+ Index("idx_y", t1.c.y)
+ metadata.create_all(connection)
+ ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+ for ix in ind:
+ is_false(ix["dialect_options"]["mssql_clustered"])
+ if ix["name"] == "idx_x":
+ is_true(ix["dialect_options"]["mssql_columnstore"])
+ eq_(ix["dialect_options"]["mssql_include"], [])
+ eq_(ix["column_names"], ["x"])
+ else:
+ is_false("mssql_columnstore" in ix["dialect_options"])
+
+ t2 = Table("t", MetaData(), autoload_with=connection)
+ idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+ self.assert_compile(
+ CreateIndex(idx),
+ "CREATE NONCLUSTERED COLUMNSTORE INDEX idx_x ON t (x)",
+ )
+
+ @testing.only_if("mssql>=11")
+ def test_index_reflection_colstore_nonclustered_none(
+ self, metadata, connection
+ ):
+ t1 = Table(
+ "t",
+ metadata,
+ Column("id", Integer),
+ Column("x", types.String(20)),
+ Column("y", types.Integer),
+ )
+ Index("idx_x", t1.c.x, mssql_columnstore=True)
+ Index("idx_y", t1.c.y)
+ metadata.create_all(connection)
+ ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+ for ix in ind:
+ is_false(ix["dialect_options"]["mssql_clustered"])
+ if ix["name"] == "idx_x":
+ is_true(ix["dialect_options"]["mssql_columnstore"])
+ eq_(ix["dialect_options"]["mssql_include"], [])
+ eq_(ix["column_names"], ["x"])
+ else:
+ is_false("mssql_columnstore" in ix["dialect_options"])
+
+ t2 = Table("t", MetaData(), autoload_with=connection)
+ idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+ self.assert_compile(
+ CreateIndex(idx),
+ "CREATE NONCLUSTERED COLUMNSTORE INDEX idx_x ON t (x)",
+ )
+
+ @testing.only_if("mssql>=11")
+ def test_index_reflection_colstore_nonclustered_multicol(
+ self, metadata, connection
+ ):
+ t1 = Table(
+ "t",
+ metadata,
+ Column("id", Integer),
+ Column("x", types.String(20)),
+ Column("y", types.Integer),
+ )
+ Index(
+ "idx_xid",
+ t1.c.x,
+ t1.c.id,
+ mssql_clustered=False,
+ mssql_columnstore=True,
+ )
+ Index("idx_y", t1.c.y)
+ metadata.create_all(connection)
+ ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+ for ix in ind:
+ is_false(ix["dialect_options"]["mssql_clustered"])
+ if ix["name"] == "idx_xid":
+ is_true(ix["dialect_options"]["mssql_columnstore"])
+ eq_(ix["dialect_options"]["mssql_include"], [])
+ eq_(ix["column_names"], ["x", "id"])
+ else:
+ is_false("mssql_columnstore" in ix["dialect_options"])
+
+ t2 = Table("t", MetaData(), autoload_with=connection)
+ idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+ self.assert_compile(
+ CreateIndex(idx),
+ "CREATE NONCLUSTERED COLUMNSTORE INDEX idx_xid ON t (x, id)",
+ )
+
def test_primary_key_reflection_clustered(self, metadata, connection):
"""
A primary key will be clustered by default if no other clustered index