]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add mssql_columnstore to index options.
authorFederico Caselli <cfederico87@gmail.com>
Wed, 28 Jun 2023 22:25:30 +0000 (00:25 +0200)
committerFederico Caselli <cfederico87@gmail.com>
Thu, 29 Jun 2023 20:27:54 +0000 (22:27 +0200)
Added support for creation and reflection of COLUMNSTORE
indexes in MSSQL dialect. Can be specified on indexes
specifying ``mssql_columnstore=True``.

Fixes: #7340
Change-Id: Ieb485b5c95b5c65b5b9f07fa7337eee3df545f95

doc/build/changelog/unreleased_20/7340.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
test/dialect/mssql/test_compiler.py
test/dialect/mssql/test_reflection.py

diff --git a/doc/build/changelog/unreleased_20/7340.rst b/doc/build/changelog/unreleased_20/7340.rst
new file mode 100644 (file)
index 0000000..13b6f24
--- /dev/null
@@ -0,0 +1,7 @@
+.. change::
+    :tags: usecase, mssql
+    :tickets: 7340
+
+    Added support for creation and reflection of COLUMNSTORE
+    indexes in MSSQL dialect. Can be specified on indexes
+    specifying ``mssql_columnstore=True``.
index 99c3abe7a4158bf77858ba33ff798118fca640a7..1f80aaef29d5987b365b565e2663b494b1e4b4cc 100644 (file)
@@ -741,6 +741,8 @@ Clustered Index Support
 The MSSQL dialect supports clustered indexes (and primary keys) via the
 ``mssql_clustered`` option.  This option is available to :class:`.Index`,
 :class:`.UniqueConstraint`. and :class:`.PrimaryKeyConstraint`.
+For indexes this option can be combined with the ``mssql_columnstore`` one
+to create a clustered columnstore index.
 
 To generate a clustered index::
 
@@ -782,6 +784,30 @@ which will render the table, for example, as::
   CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
                          PRIMARY KEY NONCLUSTERED (x, y))
 
+Columnstore Index Support
+-------------------------
+
+The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore``
+option.  This option is available to :class:`.Index`. It be combined with
+the ``mssql_clustered`` option to create a clustered columnstore index.
+
+To generate a columnstore index::
+
+    Index("my_index", table.c.x, mssql_columnstore=True)
+
+which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``.
+
+To generate a clustered columnstore index provide no columns::
+
+    idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True)
+    # required to associate the index with the table
+    table.append_constraint(idx)
+
+the above renders the index as
+``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``.
+
+.. versionadded:: 2.0.18
+
 MSSQL-Specific Index Options
 -----------------------------
 
@@ -2615,16 +2641,24 @@ class MSDDLCompiler(compiler.DDLCompiler):
             else:
                 text += "NONCLUSTERED "
 
-        text += "INDEX %s ON %s (%s)" % (
+        # handle columnstore option (has no negative value)
+        columnstore = index.dialect_options["mssql"]["columnstore"]
+        if columnstore:
+            text += "COLUMNSTORE "
+
+        text += "INDEX %s ON %s" % (
             self._prepared_index_name(index, include_schema=include_schema),
             preparer.format_table(index.table),
-            ", ".join(
+        )
+
+        # in some case mssql allows indexes with no columns defined
+        if len(index.expressions) > 0:
+            text += " (%s)" % ", ".join(
                 self.sql_compiler.process(
                     expr, include_table=False, literal_binds=True
                 )
                 for expr in index.expressions
-            ),
-        )
+            )
 
         # handle other included columns
         if index.dialect_options["mssql"]["include"]:
@@ -3052,7 +3086,15 @@ class MSDialect(default.DefaultDialect):
     construct_arguments = [
         (sa_schema.PrimaryKeyConstraint, {"clustered": None}),
         (sa_schema.UniqueConstraint, {"clustered": None}),
-        (sa_schema.Index, {"clustered": None, "include": None, "where": None}),
+        (
+            sa_schema.Index,
+            {
+                "clustered": None,
+                "include": None,
+                "where": None,
+                "columnstore": None,
+            },
+        ),
         (
             sa_schema.Column,
             {"identity_start": None, "identity_increment": None},
@@ -3379,18 +3421,27 @@ class MSDialect(default.DefaultDialect):
         )
         rp = connection.execution_options(future_result=True).execute(
             sql.text(
-                "select ind.index_id, ind.is_unique, ind.name, "
-                "case when ind.index_id = 1 "
-                "then cast(1 as bit) "
-                "else cast(0 as bit) end as is_clustered, "
-                f"{filter_definition} "
-                "from sys.indexes as ind join sys.tables as tab on "
-                "ind.object_id=tab.object_id "
-                "join sys.schemas as sch on sch.schema_id=tab.schema_id "
-                "where tab.name = :tabname "
-                "and sch.name=:schname "
-                "and ind.is_primary_key=0 and ind.type != 0 "
-                "order by ind.name "
+                f"""
+select
+    ind.index_id,
+    ind.is_unique,
+    ind.name,
+    ind.type,
+    {filter_definition}
+from
+    sys.indexes as ind
+join sys.tables as tab on
+    ind.object_id = tab.object_id
+join sys.schemas as sch on
+    sch.schema_id = tab.schema_id
+where
+    tab.name = :tabname
+    and sch.name = :schname
+    and ind.is_primary_key = 0
+    and ind.type != 0
+order by
+    ind.name
+                """
             )
             .bindparams(
                 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
@@ -3400,31 +3451,44 @@ class MSDialect(default.DefaultDialect):
         )
         indexes = {}
         for row in rp.mappings():
-            indexes[row["index_id"]] = {
+            indexes[row["index_id"]] = current = {
                 "name": row["name"],
                 "unique": row["is_unique"] == 1,
                 "column_names": [],
                 "include_columns": [],
-                "dialect_options": {"mssql_clustered": row["is_clustered"]},
+                "dialect_options": {},
             }
 
+            do = current["dialect_options"]
+            index_type = row["type"]
+            if index_type in {1, 2}:
+                do["mssql_clustered"] = index_type == 1
+            if index_type in {5, 6}:
+                do["mssql_clustered"] = index_type == 5
+                do["mssql_columnstore"] = True
             if row["filter_definition"] is not None:
-                indexes[row["index_id"]].setdefault("dialect_options", {})[
-                    "mssql_where"
-                ] = row["filter_definition"]
+                do["mssql_where"] = row["filter_definition"]
 
         rp = connection.execution_options(future_result=True).execute(
             sql.text(
-                "select ind_col.index_id, ind_col.object_id, col.name, "
-                "ind_col.is_included_column "
-                "from sys.columns as col "
-                "join sys.tables as tab on tab.object_id=col.object_id "
-                "join sys.index_columns as ind_col on "
-                "(ind_col.column_id=col.column_id and "
-                "ind_col.object_id=tab.object_id) "
-                "join sys.schemas as sch on sch.schema_id=tab.schema_id "
-                "where tab.name=:tabname "
-                "and sch.name=:schname"
+                """
+select
+    ind_col.index_id,
+    col.name,
+    ind_col.is_included_column
+from
+    sys.columns as col
+join sys.tables as tab on
+    tab.object_id = col.object_id
+join sys.index_columns as ind_col on
+    ind_col.column_id = col.column_id
+    and ind_col.object_id = tab.object_id
+join sys.schemas as sch on
+    sch.schema_id = tab.schema_id
+where
+    tab.name = :tabname
+    and sch.name = :schname
+            """
             )
             .bindparams(
                 sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
@@ -3433,21 +3497,26 @@ class MSDialect(default.DefaultDialect):
             .columns(name=sqltypes.Unicode())
         )
         for row in rp.mappings():
-            if row["index_id"] in indexes:
-                if row["is_included_column"]:
-                    indexes[row["index_id"]]["include_columns"].append(
-                        row["name"]
-                    )
+            if row["index_id"] not in indexes:
+                continue
+            index_def = indexes[row["index_id"]]
+            is_colstore = index_def["dialect_options"].get("mssql_columnstore")
+            is_clustered = index_def["dialect_options"].get("mssql_clustered")
+            if not (is_colstore and is_clustered):
+                # a clustered columnstore index includes all columns but does
+                # not want them in the index definition
+                if row["is_included_column"] and not is_colstore:
+                    # a noncludsted columnstore index reports that includes
+                    # columns but requires that are listed as normal columns
+                    index_def["include_columns"].append(row["name"])
                 else:
-                    indexes[row["index_id"]]["column_names"].append(
-                        row["name"]
-                    )
+                    index_def["column_names"].append(row["name"])
         for index_info in indexes.values():
             # NOTE: "root level" include_columns is legacy, now part of
             #       dialect_options (issue #7382)
-            index_info.setdefault("dialect_options", {})[
-                "mssql_include"
-            ] = index_info["include_columns"]
+            index_info["dialect_options"]["mssql_include"] = index_info[
+                "include_columns"
+            ]
 
         if indexes:
             return list(indexes.values())
index 93edbd9cb9f693e5419a53c52da9dc5f08029772..de7d85afc0459c2dce0ff039baaffc300992e50f 100644 (file)
@@ -1375,6 +1375,42 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
             schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test (id)"
         )
 
+    def test_index_empty(self):
+        metadata = MetaData()
+        idx = Index("foo")
+        Table("test", metadata, Column("id", Integer)).append_constraint(idx)
+        self.assert_compile(
+            schema.CreateIndex(idx), "CREATE INDEX foo ON test"
+        )
+
+    def test_index_colstore_clustering(self):
+        metadata = MetaData()
+        idx = Index("foo", mssql_clustered=True, mssql_columnstore=True)
+        Table("test", metadata, Column("id", Integer)).append_constraint(idx)
+        self.assert_compile(
+            schema.CreateIndex(idx),
+            "CREATE CLUSTERED COLUMNSTORE INDEX foo ON test",
+        )
+
+    def test_index_colstore_no_clustering(self):
+        metadata = MetaData()
+        tbl = Table("test", metadata, Column("id", Integer))
+        idx = Index(
+            "foo", tbl.c.id, mssql_clustered=False, mssql_columnstore=True
+        )
+        self.assert_compile(
+            schema.CreateIndex(idx),
+            "CREATE NONCLUSTERED COLUMNSTORE INDEX foo ON test (id)",
+        )
+
+    def test_index_not_colstore_clustering(self):
+        metadata = MetaData()
+        idx = Index("foo", mssql_clustered=True, mssql_columnstore=False)
+        Table("test", metadata, Column("id", Integer)).append_constraint(idx)
+        self.assert_compile(
+            schema.CreateIndex(idx), "CREATE CLUSTERED INDEX foo ON test"
+        )
+
     def test_index_where(self):
         metadata = MetaData()
         tbl = Table("test", metadata, Column("data", Integer))
index e360568f77725e08d96ad4577c92a9a1f9c7c7e1..122c5d7e3250d906bb04454528d66c230f5844d1 100644 (file)
@@ -35,6 +35,7 @@ from sqlalchemy.testing import is_
 from sqlalchemy.testing import is_true
 from sqlalchemy.testing import mock
 from sqlalchemy.testing import provision
+from sqlalchemy.testing.assertions import is_false
 
 
 class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
@@ -653,6 +654,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
         for ix in ind:
             if ix["dialect_options"]["mssql_clustered"]:
                 clustered_index = ix["name"]
+                is_false("mssql_columnstore" in ix["dialect_options"])
 
         eq_(clustered_index, "idx_x")
 
@@ -704,6 +706,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
 
         for ix in ind:
             assert ix["dialect_options"]["mssql_clustered"] == False
+            is_false("mssql_columnstore" in ix["dialect_options"])
 
         t2 = Table("t", MetaData(), autoload_with=connection)
         idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
@@ -712,6 +715,142 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
             CreateIndex(idx), "CREATE NONCLUSTERED INDEX idx_x ON t (x)"
         )
 
+    @testing.only_if("mssql>=12")
+    def test_index_reflection_colstore_clustered(self, metadata, connection):
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+            Index("idx_x", mssql_clustered=True, mssql_columnstore=True),
+        )
+        Index("idx_y", t1.c.y)
+        metadata.create_all(connection)
+        ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+        for ix in ind:
+            if ix["name"] == "idx_x":
+                is_true(ix["dialect_options"]["mssql_clustered"])
+                is_true(ix["dialect_options"]["mssql_columnstore"])
+                eq_(ix["dialect_options"]["mssql_include"], [])
+                eq_(ix["column_names"], [])
+            else:
+                is_false(ix["dialect_options"]["mssql_clustered"])
+                is_false("mssql_columnstore" in ix["dialect_options"])
+
+        t2 = Table("t", MetaData(), autoload_with=connection)
+        idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+        self.assert_compile(
+            CreateIndex(idx), "CREATE CLUSTERED COLUMNSTORE INDEX idx_x ON t"
+        )
+
+    @testing.only_if("mssql>=11")
+    def test_index_reflection_colstore_nonclustered(
+        self, metadata, connection
+    ):
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        Index("idx_x", t1.c.x, mssql_clustered=False, mssql_columnstore=True)
+        Index("idx_y", t1.c.y)
+        metadata.create_all(connection)
+        ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+        for ix in ind:
+            is_false(ix["dialect_options"]["mssql_clustered"])
+            if ix["name"] == "idx_x":
+                is_true(ix["dialect_options"]["mssql_columnstore"])
+                eq_(ix["dialect_options"]["mssql_include"], [])
+                eq_(ix["column_names"], ["x"])
+            else:
+                is_false("mssql_columnstore" in ix["dialect_options"])
+
+        t2 = Table("t", MetaData(), autoload_with=connection)
+        idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+        self.assert_compile(
+            CreateIndex(idx),
+            "CREATE NONCLUSTERED COLUMNSTORE INDEX idx_x ON t (x)",
+        )
+
+    @testing.only_if("mssql>=11")
+    def test_index_reflection_colstore_nonclustered_none(
+        self, metadata, connection
+    ):
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        Index("idx_x", t1.c.x, mssql_columnstore=True)
+        Index("idx_y", t1.c.y)
+        metadata.create_all(connection)
+        ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+        for ix in ind:
+            is_false(ix["dialect_options"]["mssql_clustered"])
+            if ix["name"] == "idx_x":
+                is_true(ix["dialect_options"]["mssql_columnstore"])
+                eq_(ix["dialect_options"]["mssql_include"], [])
+                eq_(ix["column_names"], ["x"])
+            else:
+                is_false("mssql_columnstore" in ix["dialect_options"])
+
+        t2 = Table("t", MetaData(), autoload_with=connection)
+        idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+        self.assert_compile(
+            CreateIndex(idx),
+            "CREATE NONCLUSTERED COLUMNSTORE INDEX idx_x ON t (x)",
+        )
+
+    @testing.only_if("mssql>=11")
+    def test_index_reflection_colstore_nonclustered_multicol(
+        self, metadata, connection
+    ):
+        t1 = Table(
+            "t",
+            metadata,
+            Column("id", Integer),
+            Column("x", types.String(20)),
+            Column("y", types.Integer),
+        )
+        Index(
+            "idx_xid",
+            t1.c.x,
+            t1.c.id,
+            mssql_clustered=False,
+            mssql_columnstore=True,
+        )
+        Index("idx_y", t1.c.y)
+        metadata.create_all(connection)
+        ind = testing.db.dialect.get_indexes(connection, "t", None)
+
+        for ix in ind:
+            is_false(ix["dialect_options"]["mssql_clustered"])
+            if ix["name"] == "idx_xid":
+                is_true(ix["dialect_options"]["mssql_columnstore"])
+                eq_(ix["dialect_options"]["mssql_include"], [])
+                eq_(ix["column_names"], ["x", "id"])
+            else:
+                is_false("mssql_columnstore" in ix["dialect_options"])
+
+        t2 = Table("t", MetaData(), autoload_with=connection)
+        idx = list(sorted(t2.indexes, key=lambda idx: idx.name))[0]
+
+        self.assert_compile(
+            CreateIndex(idx),
+            "CREATE NONCLUSTERED COLUMNSTORE INDEX idx_xid ON t (x, id)",
+        )
+
     def test_primary_key_reflection_clustered(self, metadata, connection):
         """
         A primary key will be clustered by default if no other clustered index