]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
use sys.columns to allow accurate joining to other SYS tables
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 8 Jun 2025 17:01:45 +0000 (13:01 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 10 Jun 2025 13:07:30 +0000 (09:07 -0400)
Reworked SQL Server column reflection to be based on the ``sys.columns``
table rather than ``information_schema.columns`` view.  By correctly using
the SQL Server ``object_id()`` function as a lead and joining to related
tables on object_id rather than names, this repairs a variety of issues in
SQL Server reflection, including:

* Issue where reflected column comments would not correctly line up
with the columns themselves in the case that the table had been ALTERed
* Correctly targets tables with awkward names such as names with brackets,
when reflecting not just the basic table / columns but also extended
information including IDENTITY, computed columns, comments which
did not work previously
* Correctly targets IDENTITY, computed status from temporary tables
which did not work previously

Fixes: #12654
Change-Id: I3bf3088c3eec8d7d3d2abc9da35f9628ef78d537

doc/build/changelog/unreleased_20/12654.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/mssql/information_schema.py
lib/sqlalchemy/testing/requirements.py
lib/sqlalchemy/testing/suite/test_reflection.py
test/dialect/mssql/test_reflection.py
test/requirements.py

diff --git a/doc/build/changelog/unreleased_20/12654.rst b/doc/build/changelog/unreleased_20/12654.rst
new file mode 100644 (file)
index 0000000..6348953
--- /dev/null
@@ -0,0 +1,18 @@
+.. change::
+    :tags: bug, mssql
+    :tickets: 12654
+
+    Reworked SQL Server column reflection to be based on the ``sys.columns``
+    table rather than ``information_schema.columns`` view.  By correctly using
+    the SQL Server ``object_id()`` function as a lead and joining to related
+    tables on object_id rather than names, this repairs a variety of issues in
+    SQL Server reflection, including:
+
+    * Issue where reflected column comments would not correctly line up
+      with the columns themselves in the case that the table had been ALTERed
+    * Correctly targets tables with awkward names such as names with brackets,
+      when reflecting not just the basic table / columns but also extended
+      information including IDENTITY, computed columns, comments which
+      did not work previously
+    * Correctly targets IDENTITY, computed status from temporary tables
+      which did not work previously
index ed130051ef403d581373f872765003c71c3d2a7a..a71042a3f02a1c2267a334e1e7bd35277c25a875 100644 (file)
@@ -3594,27 +3594,36 @@ where
     @reflection.cache
     @_db_plus_owner
     def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
+        sys_columns = ischema.sys_columns
+        sys_types = ischema.sys_types
+        sys_default_constraints = ischema.sys_default_constraints
+        computed_cols = ischema.computed_columns
+        identity_cols = ischema.identity_columns
+        extended_properties = ischema.extended_properties
+
+        # to access sys tables, need an object_id.
+        # object_id() can normally match to the unquoted name even if it
+        # has special characters. however it also accepts quoted names,
+        # which means for the special case that the name itself has
+        # "quotes" (e.g. brackets for SQL Server) we need to "quote" (e.g.
+        # bracket) that name anyway.  Fixed as part of #12654
+
         is_temp_table = tablename.startswith("#")
         if is_temp_table:
             owner, tablename = self._get_internal_temp_table_name(
                 connection, tablename
             )
 
-            columns = ischema.mssql_temp_table_columns
-        else:
-            columns = ischema.columns
-
-        computed_cols = ischema.computed_columns
-        identity_cols = ischema.identity_columns
+        object_id_tokens = [self.identifier_preparer.quote(tablename)]
         if owner:
-            whereclause = sql.and_(
-                columns.c.table_name == tablename,
-                columns.c.table_schema == owner,
-            )
-            full_name = columns.c.table_schema + "." + columns.c.table_name
-        else:
-            whereclause = columns.c.table_name == tablename
-            full_name = columns.c.table_name
+            object_id_tokens.insert(0, self.identifier_preparer.quote(owner))
+
+        if is_temp_table:
+            object_id_tokens.insert(0, "tempdb")
+
+        object_id = func.object_id(".".join(object_id_tokens))
+
+        whereclause = sys_columns.c.object_id == object_id
 
         if self._supports_nvarchar_max:
             computed_definition = computed_cols.c.definition
@@ -3624,92 +3633,112 @@ where
                 computed_cols.c.definition, NVARCHAR(4000)
             )
 
-        object_id = func.object_id(full_name)
-
         s = (
             sql.select(
-                columns.c.column_name,
-                columns.c.data_type,
-                columns.c.is_nullable,
-                columns.c.character_maximum_length,
-                columns.c.numeric_precision,
-                columns.c.numeric_scale,
-                columns.c.column_default,
-                columns.c.collation_name,
+                sys_columns.c.name,
+                sys_types.c.name,
+                sys_columns.c.is_nullable,
+                sys_columns.c.max_length,
+                sys_columns.c.precision,
+                sys_columns.c.scale,
+                sys_default_constraints.c.definition,
+                sys_columns.c.collation_name,
                 computed_definition,
                 computed_cols.c.is_persisted,
                 identity_cols.c.is_identity,
                 identity_cols.c.seed_value,
                 identity_cols.c.increment_value,
-                ischema.extended_properties.c.value.label("comment"),
+                extended_properties.c.value.label("comment"),
+            )
+            .select_from(sys_columns)
+            .join(
+                sys_types,
+                onclause=sys_columns.c.user_type_id
+                == sys_types.c.user_type_id,
+            )
+            .outerjoin(
+                sys_default_constraints,
+                sql.and_(
+                    sys_default_constraints.c.object_id
+                    == sys_columns.c.default_object_id,
+                    sys_default_constraints.c.parent_column_id
+                    == sys_columns.c.column_id,
+                ),
             )
-            .select_from(columns)
             .outerjoin(
                 computed_cols,
                 onclause=sql.and_(
-                    computed_cols.c.object_id == object_id,
-                    computed_cols.c.name
-                    == columns.c.column_name.collate("DATABASE_DEFAULT"),
+                    computed_cols.c.object_id == sys_columns.c.object_id,
+                    computed_cols.c.column_id == sys_columns.c.column_id,
                 ),
             )
             .outerjoin(
                 identity_cols,
                 onclause=sql.and_(
-                    identity_cols.c.object_id == object_id,
-                    identity_cols.c.name
-                    == columns.c.column_name.collate("DATABASE_DEFAULT"),
+                    identity_cols.c.object_id == sys_columns.c.object_id,
+                    identity_cols.c.column_id == sys_columns.c.column_id,
                 ),
             )
             .outerjoin(
-                ischema.extended_properties,
+                extended_properties,
                 onclause=sql.and_(
-                    ischema.extended_properties.c["class"] == 1,
-                    ischema.extended_properties.c.major_id == object_id,
-                    ischema.extended_properties.c.minor_id
-                    == columns.c.ordinal_position,
-                    ischema.extended_properties.c.name == "MS_Description",
+                    extended_properties.c["class"] == 1,
+                    extended_properties.c.name == "MS_Description",
+                    sys_columns.c.object_id == extended_properties.c.major_id,
+                    sys_columns.c.column_id == extended_properties.c.minor_id,
                 ),
             )
             .where(whereclause)
-            .order_by(columns.c.ordinal_position)
+            .order_by(sys_columns.c.column_id)
         )
 
-        c = connection.execution_options(future_result=True).execute(s)
+        if is_temp_table:
+            exec_opts = {"schema_translate_map": {"sys": "tempdb.sys"}}
+        else:
+            exec_opts = {"schema_translate_map": {}}
+        c = connection.execution_options(**exec_opts).execute(s)
 
         cols = []
         for row in c.mappings():
-            name = row[columns.c.column_name]
-            type_ = row[columns.c.data_type]
-            nullable = row[columns.c.is_nullable] == "YES"
-            charlen = row[columns.c.character_maximum_length]
-            numericprec = row[columns.c.numeric_precision]
-            numericscale = row[columns.c.numeric_scale]
-            default = row[columns.c.column_default]
-            collation = row[columns.c.collation_name]
+            name = row[sys_columns.c.name]
+            type_ = row[sys_types.c.name]
+            nullable = row[sys_columns.c.is_nullable] == 1
+            maxlen = row[sys_columns.c.max_length]
+            numericprec = row[sys_columns.c.precision]
+            numericscale = row[sys_columns.c.scale]
+            default = row[sys_default_constraints.c.definition]
+            collation = row[sys_columns.c.collation_name]
             definition = row[computed_definition]
             is_persisted = row[computed_cols.c.is_persisted]
             is_identity = row[identity_cols.c.is_identity]
             identity_start = row[identity_cols.c.seed_value]
             identity_increment = row[identity_cols.c.increment_value]
-            comment = row[ischema.extended_properties.c.value]
+            comment = row[extended_properties.c.value]
 
             coltype = self.ischema_names.get(type_, None)
 
             kwargs = {}
+
             if coltype in (
+                MSBinary,
+                MSVarBinary,
+                sqltypes.LargeBinary,
+            ):
+                kwargs["length"] = maxlen if maxlen != -1 else None
+            elif coltype in (
                 MSString,
                 MSChar,
+                MSText,
+            ):
+                kwargs["length"] = maxlen if maxlen != -1 else None
+                if collation:
+                    kwargs["collation"] = collation
+            elif coltype in (
                 MSNVarchar,
                 MSNChar,
-                MSText,
                 MSNText,
-                MSBinary,
-                MSVarBinary,
-                sqltypes.LargeBinary,
             ):
-                if charlen == -1:
-                    charlen = None
-                kwargs["length"] = charlen
+                kwargs["length"] = maxlen / 2 if maxlen != -1 else None
                 if collation:
                     kwargs["collation"] = collation
 
index b60bb158b4631ee4ba64e5e9c71277eceb93d765..5a68e3a30997d4c1ea6d9aaa13c2406ce7d4e071 100644 (file)
@@ -88,23 +88,41 @@ columns = Table(
     schema="INFORMATION_SCHEMA",
 )
 
-mssql_temp_table_columns = Table(
-    "COLUMNS",
+sys_columns = Table(
+    "columns",
     ischema,
-    Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
-    Column("TABLE_NAME", CoerceUnicode, key="table_name"),
-    Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
-    Column("IS_NULLABLE", Integer, key="is_nullable"),
-    Column("DATA_TYPE", String, key="data_type"),
-    Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
-    Column(
-        "CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length"
-    ),
-    Column("NUMERIC_PRECISION", Integer, key="numeric_precision"),
-    Column("NUMERIC_SCALE", Integer, key="numeric_scale"),
-    Column("COLUMN_DEFAULT", Integer, key="column_default"),
-    Column("COLLATION_NAME", String, key="collation_name"),
-    schema="tempdb.INFORMATION_SCHEMA",
+    Column("object_id", Integer),
+    Column("name", CoerceUnicode),
+    Column("column_id", Integer),
+    Column("default_object_id", Integer),
+    Column("user_type_id", Integer),
+    Column("is_nullable", Integer),
+    Column("ordinal_position", Integer),
+    Column("max_length", Integer),
+    Column("precision", Integer),
+    Column("scale", Integer),
+    Column("collation_name", String),
+    schema="sys",
+)
+
+sys_types = Table(
+    "types",
+    ischema,
+    Column("name", CoerceUnicode, key="name"),
+    Column("system_type_id", Integer, key="system_type_id"),
+    Column("user_type_id", Integer, key="user_type_id"),
+    Column("schema_id", Integer, key="schema_id"),
+    Column("max_length", Integer, key="max_length"),
+    Column("precision", Integer, key="precision"),
+    Column("scale", Integer, key="scale"),
+    Column("collation_name", CoerceUnicode, key="collation_name"),
+    Column("is_nullable", Boolean, key="is_nullable"),
+    Column("is_user_defined", Boolean, key="is_user_defined"),
+    Column("is_assembly_type", Boolean, key="is_assembly_type"),
+    Column("default_object_id", Integer, key="default_object_id"),
+    Column("rule_object_id", Integer, key="rule_object_id"),
+    Column("is_table_type", Boolean, key="is_table_type"),
+    schema="sys",
 )
 
 constraints = Table(
@@ -117,6 +135,17 @@ constraints = Table(
     schema="INFORMATION_SCHEMA",
 )
 
+sys_default_constraints = Table(
+    "default_constraints",
+    ischema,
+    Column("object_id", Integer),
+    Column("name", CoerceUnicode),
+    Column("schema_id", Integer),
+    Column("parent_column_id", Integer),
+    Column("definition", CoerceUnicode),
+    schema="sys",
+)
+
 column_constraints = Table(
     "CONSTRAINT_COLUMN_USAGE",
     ischema,
@@ -182,6 +211,7 @@ computed_columns = Table(
     ischema,
     Column("object_id", Integer),
     Column("name", CoerceUnicode),
+    Column("column_id", Integer),
     Column("is_computed", Boolean),
     Column("is_persisted", Boolean),
     Column("definition", CoerceUnicode),
@@ -220,6 +250,7 @@ identity_columns = Table(
     ischema,
     Column("object_id", Integer),
     Column("name", CoerceUnicode),
+    Column("column_id", Integer),
     Column("is_identity", Boolean),
     Column("seed_value", NumericSqlVariant),
     Column("increment_value", NumericSqlVariant),
index f0384eb91af0c4f653d198364bbdb81cb6412408..2f208ec008ad92e6c24971a00ef5c9e9bb0ea451 100644 (file)
@@ -658,6 +658,12 @@ class SuiteRequirements(Requirements):
 
         return exclusions.closed()
 
+    @property
+    def temp_table_comment_reflection(self):
+        """indicates if database supports comments on temp tables and
+        the dialect can reflect them"""
+        return exclusions.closed()
+
     @property
     def comment_reflection(self):
         """Indicates if the database support table comment reflection"""
@@ -823,6 +829,11 @@ class SuiteRequirements(Requirements):
 
         return exclusions.open()
 
+    @property
+    def nvarchar_types(self):
+        """target database supports NVARCHAR and NCHAR as an actual datatype"""
+        return exclusions.closed()
+
     @property
     def unicode_data_no_special_types(self):
         """Target database/dialect can receive / deliver / compare data with
index 5cf860c6a07a63210335c42c976c6361c2fd877f..efb2ad505c6c1dd7439d29455a5e251defe1f39b 100644 (file)
@@ -298,26 +298,36 @@ class HasIndexTest(fixtures.TablesTest):
         )
 
 
-class BizarroCharacterFKResolutionTest(fixtures.TestBase):
-    """tests for #10275"""
+class BizarroCharacterTest(fixtures.TestBase):
 
     __backend__ = True
-    __requires__ = ("foreign_key_constraint_reflection",)
 
-    @testing.combinations(
-        ("id",), ("(3)",), ("col%p",), ("[brack]",), argnames="columnname"
-    )
+    def column_names():
+        return testing.combinations(
+            ("plainname",),
+            ("(3)",),
+            ("col%p",),
+            ("[brack]",),
+            argnames="columnname",
+        )
+
+    def table_names():
+        return testing.combinations(
+            ("plain",),
+            ("(2)",),
+            ("per % cent",),
+            ("[brackets]",),
+            argnames="tablename",
+        )
+
     @testing.variation("use_composite", [True, False])
-    @testing.combinations(
-        ("plain",),
-        ("(2)",),
-        ("per % cent",),
-        ("[brackets]",),
-        argnames="tablename",
-    )
+    @column_names()
+    @table_names()
+    @testing.requires.foreign_key_constraint_reflection
     def test_fk_ref(
         self, connection, metadata, use_composite, tablename, columnname
     ):
+        """tests for #10275"""
         tt = Table(
             tablename,
             metadata,
@@ -357,6 +367,77 @@ class BizarroCharacterFKResolutionTest(fixtures.TestBase):
         if use_composite:
             assert o2.c.ref2.references(t1.c[1])
 
+    @column_names()
+    @table_names()
+    @testing.requires.identity_columns
+    def test_reflect_identity(
+        self, tablename, columnname, connection, metadata
+    ):
+        Table(
+            tablename,
+            metadata,
+            Column(columnname, Integer, Identity(), primary_key=True),
+        )
+        metadata.create_all(connection)
+        insp = inspect(connection)
+
+        eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
+
+    @column_names()
+    @table_names()
+    @testing.requires.comment_reflection
+    def test_reflect_comments(
+        self, tablename, columnname, connection, metadata
+    ):
+        Table(
+            tablename,
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column(columnname, Integer, comment="some comment"),
+        )
+        metadata.create_all(connection)
+        insp = inspect(connection)
+
+        eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
+
+
+class TempTableElementsTest(fixtures.TestBase):
+
+    __backend__ = True
+
+    __requires__ = ("temp_table_reflection",)
+
+    @testing.fixture
+    def tablename(self):
+        return get_temp_table_name(
+            config, config.db, f"ident_tmp_{config.ident}"
+        )
+
+    @testing.requires.identity_columns
+    def test_reflect_identity(self, tablename, connection, metadata):
+        Table(
+            tablename,
+            metadata,
+            Column("id", Integer, Identity(), primary_key=True),
+        )
+        metadata.create_all(connection)
+        insp = inspect(connection)
+
+        eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
+
+    @testing.requires.temp_table_comment_reflection
+    def test_reflect_comments(self, tablename, connection, metadata):
+        Table(
+            tablename,
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("foobar", Integer, comment="some comment"),
+        )
+        metadata.create_all(connection)
+        insp = inspect(connection)
+
+        eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
+
 
 class QuotedNameArgumentTest(fixtures.TablesTest):
     run_create_tables = "once"
@@ -2772,11 +2853,23 @@ class ComponentReflectionTestExtra(ComparesIndexes, fixtures.TestBase):
             eq_(typ.scale, 5)
 
     @testing.requires.table_reflection
-    def test_varchar_reflection(self, connection, metadata):
-        typ = self._type_round_trip(
-            connection, metadata, sql_types.String(52)
-        )[0]
-        assert isinstance(typ, sql_types.String)
+    @testing.combinations(
+        sql_types.String,
+        sql_types.VARCHAR,
+        sql_types.CHAR,
+        (sql_types.NVARCHAR, testing.requires.nvarchar_types),
+        (sql_types.NCHAR, testing.requires.nvarchar_types),
+        argnames="type_",
+    )
+    def test_string_length_reflection(self, connection, metadata, type_):
+        typ = self._type_round_trip(connection, metadata, type_(52))[0]
+        if issubclass(type_, sql_types.VARCHAR):
+            assert isinstance(typ, sql_types.VARCHAR)
+        elif issubclass(type_, sql_types.CHAR):
+            assert isinstance(typ, sql_types.CHAR)
+        else:
+            assert isinstance(typ, sql_types.String)
+
         eq_(typ.length, 52)
 
     @testing.requires.table_reflection
@@ -3266,11 +3359,12 @@ __all__ = (
     "ComponentReflectionTestExtra",
     "TableNoColumnsTest",
     "QuotedNameArgumentTest",
-    "BizarroCharacterFKResolutionTest",
+    "BizarroCharacterTest",
     "HasTableTest",
     "HasIndexTest",
     "NormalizedNameTest",
     "ComputedReflectionTest",
     "IdentityReflectionTest",
     "CompositeKeyReflectionTest",
+    "TempTableElementsTest",
 )
index 7222ba47ae3e83ff9c3f3452184dfc929114a4cc..06e5147fbeefd244cc46b4d91e42bc54467f9b64 100644 (file)
@@ -985,6 +985,54 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
             },
         )
 
+    def test_comments_with_dropped_column(self, metadata, connection):
+        """test issue #12654"""
+
+        Table(
+            "tbl_with_comments",
+            metadata,
+            Column(
+                "id", types.Integer, primary_key=True, comment="pk comment"
+            ),
+            Column("foobar", Integer, comment="comment_foobar"),
+            Column("foo", Integer, comment="comment_foo"),
+            Column(
+                "bar",
+                Integer,
+                comment="comment_bar",
+            ),
+        )
+        metadata.create_all(connection)
+        insp = inspect(connection)
+        eq_(
+            {
+                c["name"]: c["comment"]
+                for c in insp.get_columns("tbl_with_comments")
+            },
+            {
+                "id": "pk comment",
+                "foobar": "comment_foobar",
+                "foo": "comment_foo",
+                "bar": "comment_bar",
+            },
+        )
+
+        connection.exec_driver_sql(
+            "ALTER TABLE [tbl_with_comments] DROP COLUMN [foobar]"
+        )
+        insp = inspect(connection)
+        eq_(
+            {
+                c["name"]: c["comment"]
+                for c in insp.get_columns("tbl_with_comments")
+            },
+            {
+                "id": "pk comment",
+                "foo": "comment_foo",
+                "bar": "comment_bar",
+            },
+        )
+
 
 class InfoCoerceUnicodeTest(fixtures.TestBase, AssertsCompiledSQL):
     def test_info_unicode_cast_no_2000(self):
index 1f4a4eb392344e9c278f01215df54bc1594cd83e..72b609f21f1e30037975a083808c13cc9d0c3e33 100644 (file)
@@ -159,6 +159,10 @@ class DefaultRequirements(SuiteRequirements):
     def fk_constraint_option_reflection_onupdate_restrict(self):
         return only_on(["postgresql", "sqlite", self._mysql_80])
 
+    @property
+    def temp_table_comment_reflection(self):
+        return only_on(["postgresql", "mysql", "mariadb", "oracle"])
+
     @property
     def comment_reflection(self):
         return only_on(["postgresql", "mysql", "mariadb", "oracle", "mssql"])
@@ -993,6 +997,10 @@ class DefaultRequirements(SuiteRequirements):
         """
         return exclusions.open()
 
+    @property
+    def nvarchar_types(self):
+        return only_on(["mssql", "oracle", "sqlite", "mysql", "mariadb"])
+
     @property
     def unicode_data_no_special_types(self):
         """Target database/dialect can receive / deliver / compare data with