--- /dev/null
+.. change::
+ :tags: bug, mssql
+ :tickets: 12654
+
+ Reworked SQL Server column reflection to be based on the ``sys.columns``
+ table rather than ``information_schema.columns`` view. By correctly using
+ the SQL Server ``object_id()`` function as a lead and joining to related
+ tables on object_id rather than names, this repairs a variety of issues in
+ SQL Server reflection, including:
+
+ * Issue where reflected column comments would not correctly line up
+ with the columns themselves in the case that the table had been ALTERed
+ * Correctly targets tables with awkward names such as names with brackets,
+ when reflecting not just the basic table / columns but also extended
+ information including IDENTITY, computed columns, comments which
+ did not work previously
+ * Correctly targets IDENTITY, computed status from temporary tables
+ which did not work previously
@reflection.cache
@_db_plus_owner
def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
+ sys_columns = ischema.sys_columns
+ sys_types = ischema.sys_types
+ sys_default_constraints = ischema.sys_default_constraints
+ computed_cols = ischema.computed_columns
+ identity_cols = ischema.identity_columns
+ extended_properties = ischema.extended_properties
+
+ # to access sys tables, need an object_id.
+ # object_id() can normally match to the unquoted name even if it
+ # has special characters. however it also accepts quoted names,
+ # which means for the special case that the name itself has
+ # "quotes" (e.g. brackets for SQL Server) we need to "quote" (e.g.
+ # bracket) that name anyway. Fixed as part of #12654
+
is_temp_table = tablename.startswith("#")
if is_temp_table:
owner, tablename = self._get_internal_temp_table_name(
connection, tablename
)
- columns = ischema.mssql_temp_table_columns
- else:
- columns = ischema.columns
-
- computed_cols = ischema.computed_columns
- identity_cols = ischema.identity_columns
+ object_id_tokens = [self.identifier_preparer.quote(tablename)]
if owner:
- whereclause = sql.and_(
- columns.c.table_name == tablename,
- columns.c.table_schema == owner,
- )
- full_name = columns.c.table_schema + "." + columns.c.table_name
- else:
- whereclause = columns.c.table_name == tablename
- full_name = columns.c.table_name
+ object_id_tokens.insert(0, self.identifier_preparer.quote(owner))
+
+ if is_temp_table:
+ object_id_tokens.insert(0, "tempdb")
+
+ object_id = func.object_id(".".join(object_id_tokens))
+
+ whereclause = sys_columns.c.object_id == object_id
if self._supports_nvarchar_max:
computed_definition = computed_cols.c.definition
computed_cols.c.definition, NVARCHAR(4000)
)
- object_id = func.object_id(full_name)
-
s = (
sql.select(
- columns.c.column_name,
- columns.c.data_type,
- columns.c.is_nullable,
- columns.c.character_maximum_length,
- columns.c.numeric_precision,
- columns.c.numeric_scale,
- columns.c.column_default,
- columns.c.collation_name,
+ sys_columns.c.name,
+ sys_types.c.name,
+ sys_columns.c.is_nullable,
+ sys_columns.c.max_length,
+ sys_columns.c.precision,
+ sys_columns.c.scale,
+ sys_default_constraints.c.definition,
+ sys_columns.c.collation_name,
computed_definition,
computed_cols.c.is_persisted,
identity_cols.c.is_identity,
identity_cols.c.seed_value,
identity_cols.c.increment_value,
- ischema.extended_properties.c.value.label("comment"),
+ extended_properties.c.value.label("comment"),
+ )
+ .select_from(sys_columns)
+ .join(
+ sys_types,
+ onclause=sys_columns.c.user_type_id
+ == sys_types.c.user_type_id,
+ )
+ .outerjoin(
+ sys_default_constraints,
+ sql.and_(
+ sys_default_constraints.c.object_id
+ == sys_columns.c.default_object_id,
+ sys_default_constraints.c.parent_column_id
+ == sys_columns.c.column_id,
+ ),
)
- .select_from(columns)
.outerjoin(
computed_cols,
onclause=sql.and_(
- computed_cols.c.object_id == object_id,
- computed_cols.c.name
- == columns.c.column_name.collate("DATABASE_DEFAULT"),
+ computed_cols.c.object_id == sys_columns.c.object_id,
+ computed_cols.c.column_id == sys_columns.c.column_id,
),
)
.outerjoin(
identity_cols,
onclause=sql.and_(
- identity_cols.c.object_id == object_id,
- identity_cols.c.name
- == columns.c.column_name.collate("DATABASE_DEFAULT"),
+ identity_cols.c.object_id == sys_columns.c.object_id,
+ identity_cols.c.column_id == sys_columns.c.column_id,
),
)
.outerjoin(
- ischema.extended_properties,
+ extended_properties,
onclause=sql.and_(
- ischema.extended_properties.c["class"] == 1,
- ischema.extended_properties.c.major_id == object_id,
- ischema.extended_properties.c.minor_id
- == columns.c.ordinal_position,
- ischema.extended_properties.c.name == "MS_Description",
+ extended_properties.c["class"] == 1,
+ extended_properties.c.name == "MS_Description",
+ sys_columns.c.object_id == extended_properties.c.major_id,
+ sys_columns.c.column_id == extended_properties.c.minor_id,
),
)
.where(whereclause)
- .order_by(columns.c.ordinal_position)
+ .order_by(sys_columns.c.column_id)
)
- c = connection.execution_options(future_result=True).execute(s)
+ if is_temp_table:
+ exec_opts = {"schema_translate_map": {"sys": "tempdb.sys"}}
+ else:
+ exec_opts = {"schema_translate_map": {}}
+ c = connection.execution_options(**exec_opts).execute(s)
cols = []
for row in c.mappings():
- name = row[columns.c.column_name]
- type_ = row[columns.c.data_type]
- nullable = row[columns.c.is_nullable] == "YES"
- charlen = row[columns.c.character_maximum_length]
- numericprec = row[columns.c.numeric_precision]
- numericscale = row[columns.c.numeric_scale]
- default = row[columns.c.column_default]
- collation = row[columns.c.collation_name]
+ name = row[sys_columns.c.name]
+ type_ = row[sys_types.c.name]
+ nullable = row[sys_columns.c.is_nullable] == 1
+ maxlen = row[sys_columns.c.max_length]
+ numericprec = row[sys_columns.c.precision]
+ numericscale = row[sys_columns.c.scale]
+ default = row[sys_default_constraints.c.definition]
+ collation = row[sys_columns.c.collation_name]
definition = row[computed_definition]
is_persisted = row[computed_cols.c.is_persisted]
is_identity = row[identity_cols.c.is_identity]
identity_start = row[identity_cols.c.seed_value]
identity_increment = row[identity_cols.c.increment_value]
- comment = row[ischema.extended_properties.c.value]
+ comment = row[extended_properties.c.value]
coltype = self.ischema_names.get(type_, None)
kwargs = {}
+
if coltype in (
+ MSBinary,
+ MSVarBinary,
+ sqltypes.LargeBinary,
+ ):
+ kwargs["length"] = maxlen if maxlen != -1 else None
+ elif coltype in (
MSString,
MSChar,
+ MSText,
+ ):
+ kwargs["length"] = maxlen if maxlen != -1 else None
+ if collation:
+ kwargs["collation"] = collation
+ elif coltype in (
MSNVarchar,
MSNChar,
- MSText,
MSNText,
- MSBinary,
- MSVarBinary,
- sqltypes.LargeBinary,
):
- if charlen == -1:
- charlen = None
- kwargs["length"] = charlen
+ kwargs["length"] = maxlen / 2 if maxlen != -1 else None
if collation:
kwargs["collation"] = collation
schema="INFORMATION_SCHEMA",
)
-mssql_temp_table_columns = Table(
- "COLUMNS",
+sys_columns = Table(
+ "columns",
ischema,
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
- Column("IS_NULLABLE", Integer, key="is_nullable"),
- Column("DATA_TYPE", String, key="data_type"),
- Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
- Column(
- "CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length"
- ),
- Column("NUMERIC_PRECISION", Integer, key="numeric_precision"),
- Column("NUMERIC_SCALE", Integer, key="numeric_scale"),
- Column("COLUMN_DEFAULT", Integer, key="column_default"),
- Column("COLLATION_NAME", String, key="collation_name"),
- schema="tempdb.INFORMATION_SCHEMA",
+ Column("object_id", Integer),
+ Column("name", CoerceUnicode),
+ Column("column_id", Integer),
+ Column("default_object_id", Integer),
+ Column("user_type_id", Integer),
+ Column("is_nullable", Integer),
+ Column("ordinal_position", Integer),
+ Column("max_length", Integer),
+ Column("precision", Integer),
+ Column("scale", Integer),
+ Column("collation_name", String),
+ schema="sys",
+)
+
+sys_types = Table(
+ "types",
+ ischema,
+ Column("name", CoerceUnicode, key="name"),
+ Column("system_type_id", Integer, key="system_type_id"),
+ Column("user_type_id", Integer, key="user_type_id"),
+ Column("schema_id", Integer, key="schema_id"),
+ Column("max_length", Integer, key="max_length"),
+ Column("precision", Integer, key="precision"),
+ Column("scale", Integer, key="scale"),
+ Column("collation_name", CoerceUnicode, key="collation_name"),
+ Column("is_nullable", Boolean, key="is_nullable"),
+ Column("is_user_defined", Boolean, key="is_user_defined"),
+ Column("is_assembly_type", Boolean, key="is_assembly_type"),
+ Column("default_object_id", Integer, key="default_object_id"),
+ Column("rule_object_id", Integer, key="rule_object_id"),
+ Column("is_table_type", Boolean, key="is_table_type"),
+ schema="sys",
)
constraints = Table(
schema="INFORMATION_SCHEMA",
)
+sys_default_constraints = Table(
+ "default_constraints",
+ ischema,
+ Column("object_id", Integer),
+ Column("name", CoerceUnicode),
+ Column("schema_id", Integer),
+ Column("parent_column_id", Integer),
+ Column("definition", CoerceUnicode),
+ schema="sys",
+)
+
column_constraints = Table(
"CONSTRAINT_COLUMN_USAGE",
ischema,
ischema,
Column("object_id", Integer),
Column("name", CoerceUnicode),
+ Column("column_id", Integer),
Column("is_computed", Boolean),
Column("is_persisted", Boolean),
Column("definition", CoerceUnicode),
ischema,
Column("object_id", Integer),
Column("name", CoerceUnicode),
+ Column("column_id", Integer),
Column("is_identity", Boolean),
Column("seed_value", NumericSqlVariant),
Column("increment_value", NumericSqlVariant),
return exclusions.closed()
+ @property
+ def temp_table_comment_reflection(self):
+ """indicates if database supports comments on temp tables and
+ the dialect can reflect them"""
+ return exclusions.closed()
+
@property
def comment_reflection(self):
"""Indicates if the database support table comment reflection"""
return exclusions.open()
+ @property
+ def nvarchar_types(self):
+ """target database supports NVARCHAR and NCHAR as an actual datatype"""
+ return exclusions.closed()
+
@property
def unicode_data_no_special_types(self):
"""Target database/dialect can receive / deliver / compare data with
)
-class BizarroCharacterFKResolutionTest(fixtures.TestBase):
- """tests for #10275"""
+class BizarroCharacterTest(fixtures.TestBase):
__backend__ = True
- __requires__ = ("foreign_key_constraint_reflection",)
- @testing.combinations(
- ("id",), ("(3)",), ("col%p",), ("[brack]",), argnames="columnname"
- )
+ def column_names():
+ return testing.combinations(
+ ("plainname",),
+ ("(3)",),
+ ("col%p",),
+ ("[brack]",),
+ argnames="columnname",
+ )
+
+ def table_names():
+ return testing.combinations(
+ ("plain",),
+ ("(2)",),
+ ("per % cent",),
+ ("[brackets]",),
+ argnames="tablename",
+ )
+
@testing.variation("use_composite", [True, False])
- @testing.combinations(
- ("plain",),
- ("(2)",),
- ("per % cent",),
- ("[brackets]",),
- argnames="tablename",
- )
+ @column_names()
+ @table_names()
+ @testing.requires.foreign_key_constraint_reflection
def test_fk_ref(
self, connection, metadata, use_composite, tablename, columnname
):
+ """tests for #10275"""
tt = Table(
tablename,
metadata,
if use_composite:
assert o2.c.ref2.references(t1.c[1])
+ @column_names()
+ @table_names()
+ @testing.requires.identity_columns
+ def test_reflect_identity(
+ self, tablename, columnname, connection, metadata
+ ):
+ Table(
+ tablename,
+ metadata,
+ Column(columnname, Integer, Identity(), primary_key=True),
+ )
+ metadata.create_all(connection)
+ insp = inspect(connection)
+
+ eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
+
+ @column_names()
+ @table_names()
+ @testing.requires.comment_reflection
+ def test_reflect_comments(
+ self, tablename, columnname, connection, metadata
+ ):
+ Table(
+ tablename,
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column(columnname, Integer, comment="some comment"),
+ )
+ metadata.create_all(connection)
+ insp = inspect(connection)
+
+ eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
+
+
+class TempTableElementsTest(fixtures.TestBase):
+
+ __backend__ = True
+
+ __requires__ = ("temp_table_reflection",)
+
+ @testing.fixture
+ def tablename(self):
+ return get_temp_table_name(
+ config, config.db, f"ident_tmp_{config.ident}"
+ )
+
+ @testing.requires.identity_columns
+ def test_reflect_identity(self, tablename, connection, metadata):
+ Table(
+ tablename,
+ metadata,
+ Column("id", Integer, Identity(), primary_key=True),
+ )
+ metadata.create_all(connection)
+ insp = inspect(connection)
+
+ eq_(insp.get_columns(tablename)[0]["identity"]["start"], 1)
+
+ @testing.requires.temp_table_comment_reflection
+ def test_reflect_comments(self, tablename, connection, metadata):
+ Table(
+ tablename,
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("foobar", Integer, comment="some comment"),
+ )
+ metadata.create_all(connection)
+ insp = inspect(connection)
+
+ eq_(insp.get_columns(tablename)[1]["comment"], "some comment")
+
class QuotedNameArgumentTest(fixtures.TablesTest):
run_create_tables = "once"
eq_(typ.scale, 5)
@testing.requires.table_reflection
- def test_varchar_reflection(self, connection, metadata):
- typ = self._type_round_trip(
- connection, metadata, sql_types.String(52)
- )[0]
- assert isinstance(typ, sql_types.String)
+ @testing.combinations(
+ sql_types.String,
+ sql_types.VARCHAR,
+ sql_types.CHAR,
+ (sql_types.NVARCHAR, testing.requires.nvarchar_types),
+ (sql_types.NCHAR, testing.requires.nvarchar_types),
+ argnames="type_",
+ )
+ def test_string_length_reflection(self, connection, metadata, type_):
+ typ = self._type_round_trip(connection, metadata, type_(52))[0]
+ if issubclass(type_, sql_types.VARCHAR):
+ assert isinstance(typ, sql_types.VARCHAR)
+ elif issubclass(type_, sql_types.CHAR):
+ assert isinstance(typ, sql_types.CHAR)
+ else:
+ assert isinstance(typ, sql_types.String)
+
eq_(typ.length, 52)
@testing.requires.table_reflection
"ComponentReflectionTestExtra",
"TableNoColumnsTest",
"QuotedNameArgumentTest",
- "BizarroCharacterFKResolutionTest",
+ "BizarroCharacterTest",
"HasTableTest",
"HasIndexTest",
"NormalizedNameTest",
"ComputedReflectionTest",
"IdentityReflectionTest",
"CompositeKeyReflectionTest",
+ "TempTableElementsTest",
)
},
)
+ def test_comments_with_dropped_column(self, metadata, connection):
+ """test issue #12654"""
+
+ Table(
+ "tbl_with_comments",
+ metadata,
+ Column(
+ "id", types.Integer, primary_key=True, comment="pk comment"
+ ),
+ Column("foobar", Integer, comment="comment_foobar"),
+ Column("foo", Integer, comment="comment_foo"),
+ Column(
+ "bar",
+ Integer,
+ comment="comment_bar",
+ ),
+ )
+ metadata.create_all(connection)
+ insp = inspect(connection)
+ eq_(
+ {
+ c["name"]: c["comment"]
+ for c in insp.get_columns("tbl_with_comments")
+ },
+ {
+ "id": "pk comment",
+ "foobar": "comment_foobar",
+ "foo": "comment_foo",
+ "bar": "comment_bar",
+ },
+ )
+
+ connection.exec_driver_sql(
+ "ALTER TABLE [tbl_with_comments] DROP COLUMN [foobar]"
+ )
+ insp = inspect(connection)
+ eq_(
+ {
+ c["name"]: c["comment"]
+ for c in insp.get_columns("tbl_with_comments")
+ },
+ {
+ "id": "pk comment",
+ "foo": "comment_foo",
+ "bar": "comment_bar",
+ },
+ )
+
class InfoCoerceUnicodeTest(fixtures.TestBase, AssertsCompiledSQL):
def test_info_unicode_cast_no_2000(self):
def fk_constraint_option_reflection_onupdate_restrict(self):
return only_on(["postgresql", "sqlite", self._mysql_80])
+ @property
+ def temp_table_comment_reflection(self):
+ return only_on(["postgresql", "mysql", "mariadb", "oracle"])
+
@property
def comment_reflection(self):
return only_on(["postgresql", "mysql", "mariadb", "oracle", "mssql"])
"""
return exclusions.open()
+ @property
+ def nvarchar_types(self):
+ return only_on(["mssql", "oracle", "sqlite", "mysql", "mariadb"])
+
@property
def unicode_data_no_special_types(self):
"""Target database/dialect can receive / deliver / compare data with