import codecs
import datetime
+from functools import lru_cache
import operator
import re
from typing import Any
from ... import util
from ...engine import cursor as _cursor
from ...engine import default
+from ...engine import ObjectKind
+from ...engine import ObjectScope
from ...engine import reflection
from ...engine.reflection import ReflectionDefaults
from ...sql import coercions
from ...sql import compiler
from ...sql import elements
from ...sql import expression
-from ...sql import func
from ...sql import quoted_name
from ...sql import roles
from ...sql import sqltypes
return update_wrapper(wrap, fn)
+def _db_plus_owner_multi(fn):
+ def wrap(dialect, connection, schema=None, **kw):
+ dbname, owner = _owner_plus_db(dialect, schema)
+ return _switch_db(
+ dbname,
+ connection,
+ fn,
+ dialect,
+ connection,
+ dbname,
+ owner,
+ schema,
+ **kw,
+ )
+
+ return update_wrapper(wrap, fn)
+
+
def _switch_db(dbname, connection, fn, *arg, **kw):
if dbname:
current_db = connection.exec_driver_sql("select db_name()").scalar()
return c.first() is not None
- def _default_or_error(self, connection, tablename, owner, method, **kw):
- # TODO: try to avoid having to run a separate query here
- if self._internal_has_table(connection, tablename, owner, **kw):
- return method()
- else:
- raise exc.NoSuchTableError(f"{owner}.{tablename}")
-
- @reflection.cache
- @_db_plus_owner
- def get_indexes(self, connection, tablename, dbname, owner, schema, **kw):
- filter_definition = (
- "ind.filter_definition"
- if self.server_version_info >= MS_2008_VERSION
- else "NULL as filter_definition"
- )
- rp = connection.execution_options(future_result=True).execute(
- sql.text(f"""
-select
- ind.index_id,
- ind.is_unique,
- ind.name,
- ind.type,
- {filter_definition}
-from
- sys.indexes as ind
-join sys.tables as tab on
- ind.object_id = tab.object_id
-join sys.schemas as sch on
- sch.schema_id = tab.schema_id
-where
- tab.name = :tabname
- and sch.name = :schname
- and ind.is_primary_key = 0
- and ind.type != 0
-order by
- ind.name
- """)
- .bindparams(
- sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
- sql.bindparam("schname", owner, ischema.CoerceUnicode()),
- )
- .columns(name=sqltypes.Unicode())
- )
- indexes = {}
- for row in rp.mappings():
- indexes[row["index_id"]] = current = {
- "name": row["name"],
- "unique": row["is_unique"] == 1,
- "column_names": [],
- "include_columns": [],
- "dialect_options": {},
- }
-
- do = current["dialect_options"]
- index_type = row["type"]
- if index_type in {1, 2}:
- do["mssql_clustered"] = index_type == 1
- if index_type in {5, 6}:
- do["mssql_clustered"] = index_type == 5
- do["mssql_columnstore"] = True
- if row["filter_definition"] is not None:
- do["mssql_where"] = row["filter_definition"]
-
- rp = connection.execution_options(future_result=True).execute(
- sql.text("""
-select
- ind_col.index_id,
- col.name,
- ind_col.is_included_column
-from
- sys.columns as col
-join sys.tables as tab on
- tab.object_id = col.object_id
-join sys.index_columns as ind_col on
- ind_col.column_id = col.column_id
- and ind_col.object_id = tab.object_id
-join sys.schemas as sch on
- sch.schema_id = tab.schema_id
-where
- tab.name = :tabname
- and sch.name = :schname
-order by
- ind_col.index_id,
- ind_col.key_ordinal
- """)
- .bindparams(
- sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
- sql.bindparam("schname", owner, ischema.CoerceUnicode()),
- )
- .columns(name=sqltypes.Unicode())
- )
- for row in rp.mappings():
- if row["index_id"] not in indexes:
- continue
- index_def = indexes[row["index_id"]]
- is_colstore = index_def["dialect_options"].get("mssql_columnstore")
- is_clustered = index_def["dialect_options"].get("mssql_clustered")
- if not (is_colstore and is_clustered):
- # a clustered columnstore index includes all columns but does
- # not want them in the index definition
- if row["is_included_column"] and not is_colstore:
- # a noncludsted columnstore index reports that includes
- # columns but requires that are listed as normal columns
- index_def["include_columns"].append(row["name"])
- else:
- index_def["column_names"].append(row["name"])
- for index_info in indexes.values():
- # NOTE: "root level" include_columns is legacy, now part of
- # dialect_options (issue #7382)
- index_info["dialect_options"]["mssql_include"] = index_info[
- "include_columns"
- ]
-
- if indexes:
- return list(indexes.values())
- else:
- return self._default_or_error(
- connection, tablename, owner, ReflectionDefaults.indexes, **kw
- )
-
@reflection.cache
@_db_plus_owner
def get_view_definition(
else:
raise exc.NoSuchTableError(f"{owner}.{viewname}")
- @reflection.cache
- def get_table_comment(self, connection, table_name, schema=None, **kw):
- if not self.supports_comments:
- raise NotImplementedError(
- "Can't get table comments on current SQL Server version in use"
- )
-
- schema_name = schema if schema else self.default_schema_name
- COMMENT_SQL = """
- SELECT cast(com.value as nvarchar(max))
- FROM fn_listextendedproperty('MS_Description',
- 'schema', :schema, 'table', :table, NULL, NULL
- ) as com;
- """
-
- comment = connection.execute(
- sql.text(COMMENT_SQL).bindparams(
- sql.bindparam("schema", schema_name, ischema.CoerceUnicode()),
- sql.bindparam("table", table_name, ischema.CoerceUnicode()),
- )
- ).scalar()
- if comment:
- return {"text": comment}
- else:
- return self._default_or_error(
- connection,
- table_name,
- None,
- ReflectionDefaults.table_comment,
- **kw,
- )
-
def _temp_table_name_like_pattern(self, tablename):
# LIKE uses '%' to match zero or more characters and '_' to match any
# single character. We want to match literal underscores, so T-SQL
% tablename
) from ne
- @reflection.cache
- @_db_plus_owner
- def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
+ def _parse_column_info(
+ self,
+ name,
+ type_,
+ nullable,
+ maxlen,
+ numericprec,
+ numericscale,
+ default,
+ collation,
+ definition,
+ is_persisted,
+ is_identity,
+ identity_start,
+ identity_increment,
+ comment,
+ base_type=None,
+ ):
+ # Try to resolve the user type first (e.g., "sysname"),
+ # then fall back to the base type (e.g., "nvarchar").
+ # base_type may be None for CLR types (geography, geometry,
+ # hierarchyid) which have no corresponding base type.
+ coltype = self.ischema_names.get(type_, None)
+ if coltype is None and base_type is not None and base_type != type_:
+ coltype = self.ischema_names.get(base_type, None)
+ kwargs = {}
+
+ if coltype in (MSBinary, MSVarBinary, sqltypes.LargeBinary):
+ kwargs["length"] = maxlen if maxlen != -1 else None
+ elif coltype in (MSString, MSChar, MSText):
+ kwargs["length"] = maxlen if maxlen != -1 else None
+ if collation:
+ kwargs["collation"] = collation
+ elif coltype in (MSNVarchar, MSNChar, MSNText):
+ kwargs["length"] = maxlen // 2 if maxlen != -1 else None
+ if collation:
+ kwargs["collation"] = collation
+
+ if coltype is None:
+ if base_type is not None and base_type != type_:
+ util.warn(
+ "Did not recognize type '%s' (user type) or '%s' "
+ "(base type) of column '%s'" % (type_, base_type, name)
+ )
+ else:
+ util.warn(
+ "Did not recognize type '%s' of column '%s'"
+ % (type_, name)
+ )
+ coltype = sqltypes.NULLTYPE
+ else:
+ if issubclass(coltype, sqltypes.NumericCommon):
+ kwargs["precision"] = numericprec
+ if not issubclass(coltype, sqltypes.Float):
+ kwargs["scale"] = numericscale
+ coltype = coltype(**kwargs)
+
+ cdict = {
+ "name": name,
+ "type": coltype,
+ "nullable": nullable,
+ "default": default,
+ "autoincrement": is_identity is not None,
+ "comment": comment,
+ }
+
+ if definition is not None and is_persisted is not None:
+ cdict["computed"] = {
+ "sqltext": definition,
+ "persisted": is_persisted,
+ }
+
+ if is_identity is not None:
+ if identity_start is None or identity_increment is None:
+ cdict["identity"] = {}
+ else:
+ if isinstance(
+ coltype, (sqltypes.BigInteger, sqltypes.Integer)
+ ):
+ start = int(identity_start)
+ increment = int(identity_increment)
+ else:
+ start = identity_start
+ increment = identity_increment
+ cdict["identity"] = {
+ "start": start,
+ "increment": increment,
+ }
+
+ return cdict
+
+ @lru_cache()
+ def _columns_select(self):
+ """Build the unified Core sys.* select for column reflection.
+
+ Returns a ``select()`` that includes ``table_name`` and ``owner``
+ in the result so callers can group rows by table. The caller
+ applies the appropriate WHERE clause for either the single-table
+ case (``sys_columns.c.object_id == X``) or the multi-table case
+ (``sys_schemas.c.name == owner AND sys_objects.c.name.in_(names)``).
+
+ Used by :meth:`.get_multi_columns` (and indirectly by
+ :meth:`.get_columns` via the multi delegation).
+ """
sys_columns = ischema.sys_columns
sys_types = ischema.sys_types
sys_base_types = ischema.sys_types.alias("base_types")
computed_cols = ischema.computed_columns
identity_cols = ischema.identity_columns
extended_properties = ischema.extended_properties
-
- # to access sys tables, need an object_id.
- # object_id() can normally match to the unquoted name even if it
- # has special characters. however it also accepts quoted names,
- # which means for the special case that the name itself has
- # "quotes" (e.g. brackets for SQL Server) we need to "quote" (e.g.
- # bracket) that name anyway. Fixed as part of #12654
-
- is_temp_table = tablename.startswith("#")
- if is_temp_table:
- owner, tablename = self._get_internal_temp_table_name(
- connection, tablename
- )
-
- object_id_tokens = [self.identifier_preparer.quote(tablename)]
- if owner:
- object_id_tokens.insert(0, self.identifier_preparer.quote(owner))
-
- if is_temp_table:
- object_id_tokens.insert(0, "tempdb")
-
- object_id = func.object_id(".".join(object_id_tokens))
-
- whereclause = sys_columns.c.object_id == object_id
+ sys_objects = ischema.sys_objects
+ sys_schemas = ischema.sys_schemas
if self._supports_nvarchar_max:
computed_definition = computed_cols.c.definition
s = (
sql.select(
- sys_columns.c.name,
- sys_types.c.name,
+ sys_objects.c.name.label("table_name"),
+ sys_schemas.c.name.label("owner"),
+ sys_columns.c.name.label("column_name"),
+ sys_types.c.name.label("type_name"),
sys_base_types.c.name.label("base_type"),
sys_columns.c.is_nullable,
sys_columns.c.max_length,
sys_columns.c.precision,
sys_columns.c.scale,
- sys_default_constraints.c.definition,
+ sys_default_constraints.c.definition.label("default_value"),
sys_columns.c.collation_name,
- computed_definition,
+ computed_definition.label("computed_definition"),
computed_cols.c.is_persisted,
identity_cols.c.is_identity,
identity_cols.c.seed_value,
identity_cols.c.increment_value,
extended_properties.c.value.label("comment"),
+ sys_columns.c.column_id,
)
.select_from(sys_columns)
+ .join(
+ sys_objects,
+ onclause=sys_columns.c.object_id == sys_objects.c.object_id,
+ )
+ .join(
+ sys_schemas,
+ onclause=sys_objects.c.schema_id == sys_schemas.c.schema_id,
+ )
.join(
sys_types,
onclause=sys_columns.c.user_type_id
sys_columns.c.column_id == extended_properties.c.minor_id,
),
)
- .where(whereclause)
- .order_by(sys_columns.c.column_id)
+ .order_by(sys_objects.c.name, sys_columns.c.column_id)
)
+ return s
- if is_temp_table:
- exec_opts = {"schema_translate_map": {"sys": "tempdb.sys"}}
- else:
- exec_opts = {"schema_translate_map": {}}
- c = connection.execution_options(**exec_opts).execute(s)
-
- cols = []
- for row in c.mappings():
- name = row[sys_columns.c.name]
- type_ = row[sys_types.c.name]
- base_type = row["base_type"]
- nullable = row[sys_columns.c.is_nullable] == 1
- maxlen = row[sys_columns.c.max_length]
- numericprec = row[sys_columns.c.precision]
- numericscale = row[sys_columns.c.scale]
- default = row[sys_default_constraints.c.definition]
- collation = row[sys_columns.c.collation_name]
- definition = row[computed_definition]
- is_persisted = row[computed_cols.c.is_persisted]
- is_identity = row[identity_cols.c.is_identity]
- identity_start = row[identity_cols.c.seed_value]
- identity_increment = row[identity_cols.c.increment_value]
- comment = row[extended_properties.c.value]
-
- # Try to resolve the user type first (e.g., "sysname"),
- # then fall back to the base type (e.g., "nvarchar").
- # base_type may be None for CLR types (geography, geometry,
- # hierarchyid) which have no corresponding base type.
- coltype = self.ischema_names.get(type_, None)
- if (
- coltype is None
- and base_type is not None
- and base_type != type_
- ):
- coltype = self.ischema_names.get(base_type, None)
-
- kwargs = {}
-
- if coltype in (
- MSBinary,
- MSVarBinary,
- sqltypes.LargeBinary,
- ):
- kwargs["length"] = maxlen if maxlen != -1 else None
- elif coltype in (
- MSString,
- MSChar,
- MSText,
- ):
- kwargs["length"] = maxlen if maxlen != -1 else None
- if collation:
- kwargs["collation"] = collation
- elif coltype in (
- MSNVarchar,
- MSNChar,
- MSNText,
- ):
- kwargs["length"] = maxlen // 2 if maxlen != -1 else None
- if collation:
- kwargs["collation"] = collation
-
- if coltype is None:
- if base_type is not None and base_type != type_:
- util.warn(
- "Did not recognize type '%s' (user type) or '%s' "
- "(base type) of column '%s'" % (type_, base_type, name)
- )
- else:
- util.warn(
- "Did not recognize type '%s' of column '%s'"
- % (type_, name)
- )
- coltype = sqltypes.NULLTYPE
- else:
- if issubclass(coltype, sqltypes.NumericCommon):
- kwargs["precision"] = numericprec
-
- if not issubclass(coltype, sqltypes.Float):
- kwargs["scale"] = numericscale
-
- coltype = coltype(**kwargs)
- cdict = {
- "name": name,
- "type": coltype,
- "nullable": nullable,
- "default": default,
- "autoincrement": is_identity is not None,
- "comment": comment,
- }
-
- if definition is not None and is_persisted is not None:
- cdict["computed"] = {
- "sqltext": definition,
- "persisted": is_persisted,
- }
-
- if is_identity is not None:
- # identity_start and identity_increment are Decimal or None
- if identity_start is None or identity_increment is None:
- cdict["identity"] = {}
- else:
- if isinstance(coltype, sqltypes.BigInteger):
- start = int(identity_start)
- increment = int(identity_increment)
- elif isinstance(coltype, sqltypes.Integer):
- start = int(identity_start)
- increment = int(identity_increment)
- else:
- start = identity_start
- increment = identity_increment
-
- cdict["identity"] = {
- "start": start,
- "increment": increment,
- }
-
- cols.append(cdict)
-
- if cols:
- return cols
- else:
- return self._default_or_error(
- connection, tablename, owner, ReflectionDefaults.columns, **kw
- )
-
- @reflection.cache
- @_db_plus_owner
- def get_pk_constraint(
- self, connection, tablename, dbname, owner, schema, **kw
- ):
- pkeys = []
- TC = ischema.constraints
- C = ischema.key_constraints.alias("C")
-
- # Primary key constraints
- s = (
- sql.select(
- C.c.column_name,
- TC.c.constraint_type,
- C.c.constraint_name,
- func.objectproperty(
- func.object_id(
- C.c.table_schema + "." + C.c.constraint_name
- ),
- "CnstIsClustKey",
- ).label("is_clustered"),
- )
- .where(
- sql.and_(
- TC.c.constraint_name == C.c.constraint_name,
- TC.c.table_schema == C.c.table_schema,
- C.c.table_name == tablename,
- C.c.table_schema == owner,
- ),
- )
- .order_by(TC.c.constraint_name, C.c.ordinal_position)
- )
- c = connection.execution_options(future_result=True).execute(s)
- constraint_name = None
- is_clustered = None
- for row in c.mappings():
- if "PRIMARY" in row[TC.c.constraint_type.name]:
- pkeys.append(row["COLUMN_NAME"])
- if constraint_name is None:
- constraint_name = row[C.c.constraint_name.name]
- if is_clustered is None:
- is_clustered = row["is_clustered"]
- if pkeys:
- return {
- "constrained_columns": pkeys,
- "name": constraint_name,
- "dialect_options": {"mssql_clustered": is_clustered},
- }
- else:
- return self._default_or_error(
- connection,
- tablename,
- owner,
- ReflectionDefaults.pk_constraint,
- **kw,
- )
-
- @reflection.cache
- @_db_plus_owner
- def get_foreign_keys(
- self, connection, tablename, dbname, owner, schema, **kw
- ):
- # Foreign key constraints
- s = (
- text("""\
+ @staticmethod
+ def _fk_query_sql(fk_info_where, extra_cols=""):
+ return """\
WITH fk_info AS (
SELECT
ischema_ref_con.constraint_schema,
INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
INNER JOIN
INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
- ischema_key_col.table_schema = ischema_ref_con.constraint_schema
+ ischema_key_col.table_schema =
+ ischema_ref_con.constraint_schema
AND ischema_key_col.constraint_name =
- ischema_ref_con.constraint_name
- WHERE ischema_key_col.table_name = :tablename
- AND ischema_key_col.table_schema = :owner
+ ischema_ref_con.constraint_name
+ WHERE %(fk_info_where)s
),
constraint_info AS (
SELECT
sys.columns.name AS column_name
FROM
sys.indexes
- INNER JOIN
- sys.objects ON
- sys.objects.object_id = sys.indexes.object_id
- INNER JOIN
- sys.schemas ON
- sys.schemas.schema_id = sys.objects.schema_id
- INNER JOIN
- sys.index_columns ON
- sys.index_columns.object_id = sys.objects.object_id
+ INNER JOIN sys.objects
+ ON sys.objects.object_id = sys.indexes.object_id
+ INNER JOIN sys.schemas
+ ON sys.schemas.schema_id = sys.objects.schema_id
+ INNER JOIN sys.index_columns
+ ON sys.index_columns.object_id = sys.objects.object_id
AND sys.index_columns.index_id = sys.indexes.index_id
- INNER JOIN
- sys.columns ON
- sys.columns.object_id = sys.indexes.object_id
+ INNER JOIN sys.columns
+ ON sys.columns.object_id = sys.indexes.object_id
AND sys.columns.column_id = sys.index_columns.column_id
)
SELECT
+ %(extra_cols)s
fk_info.constraint_schema,
fk_info.constraint_name,
fk_info.ordinal_position,
AND constraint_info.ordinal_position = fk_info.ordinal_position
UNION
SELECT
+ %(extra_cols)s
fk_info.constraint_schema,
fk_info.constraint_name,
fk_info.ordinal_position,
ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
fk_info.ordinal_position
-""")
+""" % {
+ "fk_info_where": fk_info_where,
+ "extra_cols": extra_cols,
+ }
+
+ # --- multi-reflection API ---
+
+ def _partition_filter_names(
+ self, connection, owner, filter_names, scope, kind
+ ):
+ """Split caller-supplied ``filter_names`` between the bulk
+ ``sys.*`` query path (regular objects in the user database) and
+ the temp object path (objects living in ``tempdb``), and resolve
+ the existing-in-catalog names for the bulk path in one shot.
+
+ Returns the tuple
+ ``(run_bulk, regular_names, multi_object_names, temp_names)``:
+
+ * ``run_bulk`` -- whether the multi method should run its bulk
+ ``sys.*`` query at all.
+ * ``regular_names`` -- list of caller-supplied names (with the
+ caller's casing) for the bulk path, used to build the
+ case-insensitive name map. Empty list when no regular names
+ were requested.
+ * ``multi_object_names`` -- list of names (with server-side
+ casing) that actually exist in the catalog and match the
+ requested ``kind``. This is what the bulk SQL's IN clause
+ gets filtered with. Empty list when no matches.
+ * ``temp_names`` -- list of ``#``-prefixed temp object names
+ that the caller asked for and that the requested
+ ``scope``/``kind`` allows.
+
+ ``scope=ObjectScope.DEFAULT`` excludes temp objects entirely.
+ ``scope=ObjectScope.TEMPORARY`` excludes regular objects.
+ ``ObjectKind.VIEW`` (without TABLE) excludes temp because SQL
+ Server does not have temp views in the same form. SQL Server
+ also has no ``get_temp_table_names`` implementation, so temp
+ objects can only be reflected when the caller supplies their
+ names explicitly.
+ """
+ include_default = scope is not ObjectScope.TEMPORARY
+ include_temp = (
+ scope is not ObjectScope.DEFAULT and ObjectKind.TABLE in kind
+ )
+
+ if filter_names:
+ if include_temp:
+ temp_names = [n for n in filter_names if n.startswith("#")]
+ else:
+ temp_names = []
+
+ if include_default:
+ regular_names = [
+ n for n in filter_names if not n.startswith("#")
+ ]
+ run_bulk = bool(regular_names)
+ else:
+ regular_names = []
+ run_bulk = False
+ else:
+ temp_names = []
+ regular_names = []
+ run_bulk = include_default
+
+ if not run_bulk:
+ return (run_bulk, regular_names, [], temp_names)
+
+ type_filter = []
+ if ObjectKind.TABLE in kind:
+ type_filter.append("'U'")
+ if ObjectKind.VIEW in kind:
+ type_filter.append("'V'")
+ # SQL Server does not support materialized views, so ignore them
+ if not type_filter:
+ return (run_bulk, regular_names, [], temp_names)
+
+ query = (
+ "SELECT o.name "
+ "FROM sys.objects o "
+ "JOIN sys.schemas s ON o.schema_id = s.schema_id "
+ "WHERE s.name = :owner "
+ "AND o.type IN (%s)" % ", ".join(type_filter)
+ )
+ params = [sql.bindparam("owner", owner, ischema.CoerceUnicode())]
+ if regular_names:
+ query += " AND o.name IN :filter_names"
+ params.append(
+ sql.bindparam("filter_names", regular_names, expanding=True)
+ )
+
+ rp = connection.execute(sql.text(query).bindparams(*params))
+ multi_object_names = [row[0] for row in rp]
+
+ return (run_bulk, regular_names, multi_object_names, temp_names)
+
+ @staticmethod
+ def _multi_name_map(filter_names):
+ """Build a case-insensitive ``server_name -> user_name`` mapper.
+
+ MSSQL object names are case-insensitive under the default
+ collation, so a user may pass ``"sOmEtAbLe"`` for a table
+ physically stored as ``"SomeTable"``. The bulk SQL returns the
+ server-side casing, but result keys must use the user-supplied
+ casing so that downstream code (e.g.
+ :meth:`.Inspector.reflect_table`) can find the entry.
+
+ Returns a callable ``(server_name) -> user_name``. When
+ ``filter_names`` is empty/None, returns identity.
+ """
+ if not filter_names:
+ return lambda n: n
+ lookup = {n.lower(): n for n in filter_names}
+ return lambda n: lookup.get(n.lower(), n)
+
+ @staticmethod
+ def _value_or_raise(data, table, schema):
+ """Unwrap a single ``(schema, table)`` entry from a multi-method
+ result, raising :exc:`.NoSuchTableError` when missing.
+
+ Mirrors PostgreSQL's helper of the same name. Used by the
+ single-table reflection wrappers that delegate to the multi
+ implementation.
+ """
+ try:
+ return dict(data)[(schema, table)]
+ except KeyError:
+ raise exc.NoSuchTableError(
+ f"{schema}.{table}" if schema else table
+ ) from None
+
+ @_db_plus_owner_multi
+ def get_multi_columns(
+ self,
+ connection,
+ dbname,
+ owner,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ **kw,
+ ):
+ (
+ run_bulk,
+ regular_names,
+ multi_object_names,
+ temp_names,
+ ) = self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
+
+ result = {}
+
+ if run_bulk and multi_object_names:
+ name_map = self._multi_name_map(regular_names)
+ self._fetch_multi_columns(
+ connection,
+ owner=owner,
+ names=multi_object_names,
+ schema=schema,
+ name_map=name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {}},
+ )
+
+ if temp_names:
+ self._fetch_multi_columns_temp(
+ connection,
+ temp_names=temp_names,
+ schema=schema,
+ result=result,
+ )
+
+ return result.items()
+
+ def _fetch_multi_columns(
+ self, connection, owner, names, schema, name_map, result, exec_opts
+ ):
+ """Execute the unified columns select for a single catalog pass.
+
+ Used by :meth:`.get_multi_columns` for both the main-database pass
+ (no ``schema_translate_map``) and the tempdb pass (with
+ ``schema_translate_map={"sys": "tempdb.sys"}`` applied).
+ """
+ s = self._columns_select().where(
+ ischema.sys_schemas.c.name == owner,
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
+
+ rp = connection.execute(
+ s,
+ {"filter_names": names},
+ execution_options=exec_opts,
+ )
+
+ for row in rp.mappings():
+ table_name = name_map(row["table_name"])
+ cdict = self._parse_column_info(
+ name=row["column_name"],
+ type_=row["type_name"],
+ base_type=row["base_type"],
+ nullable=row["is_nullable"] == 1,
+ maxlen=row["max_length"],
+ numericprec=row["precision"],
+ numericscale=row["scale"],
+ default=row["default_value"],
+ collation=row["collation_name"],
+ definition=row["computed_definition"],
+ is_persisted=row["is_persisted"],
+ is_identity=row["is_identity"],
+ identity_start=row["seed_value"],
+ identity_increment=row["increment_value"],
+ comment=row["comment"],
+ )
+ result.setdefault((schema, table_name), []).append(cdict)
+
+ for n in names:
+ key = (schema, name_map(n))
+ if key not in result:
+ result[key] = ReflectionDefaults.columns()
+
+ def _fetch_multi_columns_temp(
+ self, connection, temp_names, schema, result
+ ):
+ """Run the unified columns select against tempdb for temp names.
+
+ Resolves each ``#name`` to its mangled tempdb name (one round
+ trip per temp via :meth:`._get_internal_temp_table_name`), then
+ runs a single bulk select against ``tempdb.sys.*`` via
+ ``schema_translate_map``. The result rows (keyed by mangled
+ name) are mapped back to the original ``#name`` for the caller.
+ """
+ resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ connection, temp_names
+ )
+
+ for temp_owner, mangled_names in resolved_by_owner.items():
+
+ def temp_name_map(server_name, _by=original_by_mangled):
+ return _by.get(server_name, server_name)
+
+ self._fetch_multi_columns(
+ connection,
+ owner=temp_owner,
+ names=mangled_names,
+ schema=schema,
+ name_map=temp_name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {"sys": "tempdb.sys"}},
+ )
+
+ def _resolve_temp_names(self, connection, temp_names):
+ """Resolve user-facing temp names (``#foo``) to their mangled
+ tempdb names, grouped by owner.
+
+ Returns ``(resolved_by_owner, original_by_mangled)``:
+
+ * ``resolved_by_owner`` -- ``{owner: [mangled_name, ...]}``,
+ one bulk query per owner (typically just ``'dbo'``).
+ * ``original_by_mangled`` -- ``{mangled_name: original_#name}``
+ so caller can map result rows back to the user-supplied name.
+
+ Names that don't resolve (no matching temp table in tempdb) are
+ silently dropped. The single-table reflection wrappers handle
+ the resulting absence by raising :exc:`.NoSuchTableError` via
+ :meth:`._value_or_raise`.
+ """
+ resolved_by_owner = {}
+ original_by_mangled = {}
+ for name in temp_names:
+ try:
+ temp_owner, mangled = self._get_internal_temp_table_name(
+ connection, name
+ )
+ except exc.NoSuchTableError:
+ continue
+ resolved_by_owner.setdefault(temp_owner, []).append(mangled)
+ original_by_mangled[mangled] = name
+ return resolved_by_owner, original_by_mangled
+
+ @_db_plus_owner_multi
+ def get_multi_pk_constraint(
+ self,
+ connection,
+ dbname,
+ owner,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ **kw,
+ ):
+ (
+ run_bulk,
+ regular_names,
+ multi_object_names,
+ temp_names,
+ ) = self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
+
+ result = {}
+
+ if run_bulk and multi_object_names:
+ name_map = self._multi_name_map(regular_names)
+ self._fetch_multi_pk_constraint(
+ connection,
+ owner=owner,
+ names=multi_object_names,
+ schema=schema,
+ name_map=name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {}},
+ )
+
+ if temp_names:
+ self._fetch_multi_pk_constraint_temp(
+ connection,
+ temp_names=temp_names,
+ schema=schema,
+ result=result,
+ )
+
+ return result.items()
+
+ @lru_cache()
+ def _pk_constraint_select(self):
+ """Build the unified Core sys.* select for PK constraint reflection.
+
+ Returns a ``select()`` that includes ``table_name`` and ``owner``
+ in the result so callers can group rows by table. The clustered
+ flag comes directly from ``sys.indexes.type``(1 = clustered).
+ """
+ sys_key_constraints = ischema.sys_key_constraints
+ sys_indexes = ischema.sys_indexes
+ sys_index_columns = ischema.sys_index_columns
+ sys_columns = ischema.sys_columns
+ sys_objects = ischema.sys_objects
+ sys_schemas = ischema.sys_schemas
+
+ s = (
+ sql.select(
+ sys_objects.c.name.label("table_name"),
+ sys_schemas.c.name.label("owner"),
+ sys_key_constraints.c.name.label("constraint_name"),
+ sys_columns.c.name.label("column_name"),
+ sys_index_columns.c.key_ordinal,
+ sys_indexes.c.type.label("index_type"),
+ )
+ .select_from(sys_key_constraints)
+ .join(
+ sys_objects,
+ onclause=sys_key_constraints.c.parent_object_id
+ == sys_objects.c.object_id,
+ )
+ .join(
+ sys_schemas,
+ onclause=sys_objects.c.schema_id == sys_schemas.c.schema_id,
+ )
+ .join(
+ sys_index_columns,
+ onclause=sql.and_(
+ sys_index_columns.c.object_id
+ == sys_key_constraints.c.parent_object_id,
+ sys_index_columns.c.index_id
+ == sys_key_constraints.c.unique_index_id,
+ ),
+ )
+ .join(
+ sys_columns,
+ onclause=sql.and_(
+ sys_columns.c.object_id == sys_index_columns.c.object_id,
+ sys_columns.c.column_id == sys_index_columns.c.column_id,
+ ),
+ )
+ .join(
+ sys_indexes,
+ onclause=sql.and_(
+ sys_indexes.c.object_id
+ == sys_key_constraints.c.parent_object_id,
+ sys_indexes.c.index_id
+ == sys_key_constraints.c.unique_index_id,
+ ),
+ )
+ .where(sys_key_constraints.c.type == "PK")
+ .order_by(sys_objects.c.name, sys_index_columns.c.key_ordinal)
+ )
+ return s
+
+ def _fetch_multi_pk_constraint(
+ self, connection, owner, names, schema, name_map, result, exec_opts
+ ):
+ """Execute the unified pk_constraint select for one catalog pass.
+
+ Used by :meth:`.get_multi_pk_constraint` for both the main DB
+ pass and the tempdb pass (with ``schema_translate_map`` set).
+ """
+ s = self._pk_constraint_select().where(
+ ischema.sys_schemas.c.name == owner,
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
+
+ rp = connection.execute(
+ s,
+ {"filter_names": names},
+ execution_options=exec_opts,
+ )
+
+ for row in rp.mappings():
+ table_name = name_map(row["table_name"])
+ key = (schema, table_name)
+ if key not in result:
+ result[key] = {
+ "constrained_columns": [],
+ "name": row["constraint_name"],
+ "dialect_options": {
+ "mssql_clustered": row["index_type"] == 1
+ },
+ }
+ result[key]["constrained_columns"].append(row["column_name"])
+
+ for n in names:
+ key = (schema, name_map(n))
+ if key not in result:
+ result[key] = ReflectionDefaults.pk_constraint()
+
+ def _fetch_multi_pk_constraint_temp(
+ self, connection, temp_names, schema, result
+ ):
+ """Run the unified pk_constraint select against tempdb."""
+ resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ connection, temp_names
+ )
+
+ for temp_owner, mangled_names in resolved_by_owner.items():
+
+ def temp_name_map(server_name, _by=original_by_mangled):
+ return _by.get(server_name, server_name)
+
+ self._fetch_multi_pk_constraint(
+ connection,
+ owner=temp_owner,
+ names=mangled_names,
+ schema=schema,
+ name_map=temp_name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {"sys": "tempdb.sys"}},
+ )
+
+ @_db_plus_owner_multi
+ def get_multi_foreign_keys(
+ self,
+ connection,
+ dbname,
+ owner,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ **kw,
+ ):
+ (
+ run_bulk,
+ regular_names,
+ multi_object_names,
+ temp_names,
+ ) = self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
+
+ final = {}
+
+ if run_bulk and multi_object_names:
+ name_map = self._multi_name_map(regular_names)
+ self._fetch_multi_foreign_keys(
+ connection,
+ dbname=dbname,
+ owner=owner,
+ names=multi_object_names,
+ schema=schema,
+ name_map=name_map,
+ final=final,
+ )
+
+ # FK reflection for temp tables: the INFORMATION_SCHEMA query in
+ # _fk_query_sql does not see tempdb objects (it queries the main
+ # DB's catalog). The pre-existing single-table behavior was to
+ # return empty foreign_keys for temp tables that exist. We
+ # preserve that here by checking existence and returning the
+ # default (empty) reflection for each temp that is reachable.
+ for name in temp_names:
+ if self._internal_has_table(connection, name, owner="dbo"):
+ final[(schema, name)] = ReflectionDefaults.foreign_keys()
+
+ return final.items()
+
+ def _fetch_multi_foreign_keys(
+ self, connection, dbname, owner, names, schema, name_map, final
+ ):
+ """Execute the unified FK query for the main-DB pass.
+
+ Uses the _fk_query_sql sql text, restricted to the provided owner and
+ IN-list of names. Groups result rows by table and constraint
+ name into the final dict.
+ """
+ rp = connection.execute(
+ sql.text(
+ self._fk_query_sql(
+ fk_info_where=(
+ "ischema_key_col.table_schema = :owner"
+ "\n AND ischema_key_col.table_name"
+ " IN :filter_names"
+ ),
+ extra_cols="fk_info.table_name,",
+ )
+ )
.bindparams(
- sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
sql.bindparam("owner", owner, ischema.CoerceUnicode()),
+ sql.bindparam("filter_names", names, expanding=True),
)
.columns(
constraint_schema=sqltypes.Unicode(),
constraint_name=sqltypes.Unicode(),
- table_schema=sqltypes.Unicode(),
table_name=sqltypes.Unicode(),
constrained_column=sqltypes.Unicode(),
referred_table_schema=sqltypes.Unicode(),
)
)
- # group rows by constraint ID, to handle multi-column FKs
- fkeys = util.defaultdict(
- lambda: {
- "name": None,
- "constrained_columns": [],
- "referred_schema": None,
- "referred_table": None,
- "referred_columns": [],
- "options": {},
- }
- )
-
- for r in connection.execute(s).all():
+ grouped = {}
+ for r in rp.all():
(
- _, # constraint schema
+ table_name,
+ _, # constraint_schema
rfknm,
- _, # ordinal position
+ _, # ordinal
scol,
rschema,
rtbl,
rcol,
- # TODO: we support match=<keyword> for foreign keys so
- # we can support this also, PG has match=FULL for example
- # but this seems to not be a valid value for SQL Server
- _, # match rule
+ _, # match
fkuprule,
fkdelrule,
) = r
- rec = fkeys[rfknm]
+ key = (schema, name_map(table_name))
+ if key not in grouped:
+ grouped[key] = util.defaultdict(
+ lambda: {
+ "name": None,
+ "constrained_columns": [],
+ "referred_schema": None,
+ "referred_table": None,
+ "referred_columns": [],
+ "options": {},
+ }
+ )
+
+ rec = grouped[key][rfknm]
rec["name"] = rfknm
if fkuprule != "NO ACTION":
rec["options"]["onupdate"] = fkuprule
-
if fkdelrule != "NO ACTION":
rec["options"]["ondelete"] = fkdelrule
rschema = dbname + "." + rschema
rec["referred_schema"] = rschema
- local_cols, remote_cols = (
- rec["constrained_columns"],
- rec["referred_columns"],
+ rec["constrained_columns"].append(scol)
+ rec["referred_columns"].append(rcol)
+
+ for key, fk_dict in grouped.items():
+ final[key] = list(fk_dict.values())
+
+ for n in names:
+ key = (schema, name_map(n))
+ if key not in final:
+ final[key] = ReflectionDefaults.foreign_keys()
+
+ @_db_plus_owner_multi
+ def get_multi_indexes(
+ self,
+ connection,
+ dbname,
+ owner,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ **kw,
+ ):
+ (
+ run_bulk,
+ regular_names,
+ multi_object_names,
+ temp_names,
+ ) = self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
+
+ result = {}
+
+ if run_bulk and multi_object_names:
+ name_map = self._multi_name_map(regular_names)
+ self._fetch_multi_indexes(
+ connection,
+ owner=owner,
+ names=multi_object_names,
+ schema=schema,
+ name_map=name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {}},
)
- local_cols.append(scol)
- remote_cols.append(rcol)
+ if temp_names:
+ self._fetch_multi_indexes_temp(
+ connection,
+ temp_names=temp_names,
+ schema=schema,
+ result=result,
+ )
- if fkeys:
- return list(fkeys.values())
+ return result.items()
+
+ @lru_cache()
+ def _indexes_metadata_select(self):
+ """Build the Core sys.* select for index metadata (one row per
+ index per table).
+
+ Used by :meth:`._fetch_multi_indexes` for both the main-DB pass
+ and the tempdb pass (via ``schema_translate_map``). Replaces a
+ previous ``sql.text()`` body that did not honor
+ ``schema_translate_map`` (which only rewrites Core
+ schema-bearing objects, not literal SQL text).
+ """
+ sys_indexes = ischema.sys_indexes
+ sys_objects = ischema.sys_objects
+ sys_schemas = ischema.sys_schemas
+
+ if self.server_version_info >= MS_2008_VERSION:
+ filter_definition = sys_indexes.c.filter_definition
else:
- return self._default_or_error(
+ filter_definition = sql.null()
+
+ s = (
+ sql.select(
+ sys_objects.c.name.label("table_name"),
+ sys_indexes.c.index_id,
+ sys_indexes.c.is_unique,
+ sys_indexes.c.name,
+ sys_indexes.c.type,
+ filter_definition.label("filter_definition"),
+ )
+ .select_from(sys_indexes)
+ .join(
+ sys_objects,
+ onclause=sys_indexes.c.object_id == sys_objects.c.object_id,
+ )
+ .join(
+ sys_schemas,
+ onclause=sys_schemas.c.schema_id == sys_objects.c.schema_id,
+ )
+ .where(sys_indexes.c.is_primary_key == 0)
+ .where(sys_indexes.c.type != 0)
+ .order_by(sys_objects.c.name, sys_indexes.c.name)
+ )
+ return s
+
+ @lru_cache()
+ def _indexes_columns_select(self):
+ """Build the Core sys.* select for index columns (one row per
+ index column per index per table)."""
+ sys_index_columns = ischema.sys_index_columns
+ sys_columns = ischema.sys_columns
+ sys_objects = ischema.sys_objects
+ sys_schemas = ischema.sys_schemas
+
+ s = (
+ sql.select(
+ sys_objects.c.name.label("table_name"),
+ sys_index_columns.c.index_id,
+ sys_columns.c.name,
+ sys_index_columns.c.is_included_column,
+ )
+ .select_from(sys_index_columns)
+ .join(
+ sys_columns,
+ onclause=sql.and_(
+ sys_columns.c.object_id == sys_index_columns.c.object_id,
+ sys_columns.c.column_id == sys_index_columns.c.column_id,
+ ),
+ )
+ .join(
+ sys_objects,
+ onclause=sys_objects.c.object_id
+ == sys_index_columns.c.object_id,
+ )
+ .join(
+ sys_schemas,
+ onclause=sys_schemas.c.schema_id == sys_objects.c.schema_id,
+ )
+ .order_by(
+ sys_objects.c.name,
+ sys_index_columns.c.index_id,
+ sys_index_columns.c.key_ordinal,
+ )
+ )
+ return s
+
+ def _fetch_multi_indexes(
+ self, connection, owner, names, schema, name_map, result, exec_opts
+ ):
+ """Execute the unified indexes queries for one catalog pass.
+
+ Runs two Core sys.* selects (index metadata + index columns),
+ grouped by table. Used for both the main-DB pass and the
+ tempdb pass (with ``schema_translate_map={"sys": "tempdb.sys"}``
+ applied).
+ """
+ meta_q = self._indexes_metadata_select().where(
+ ischema.sys_schemas.c.name == owner,
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
+
+ rp = connection.execute(
+ meta_q,
+ {"filter_names": names},
+ execution_options=exec_opts,
+ )
+
+ # {table_name: {index_id: index_dict}}
+ indexes_by_table = {}
+ for row in rp.mappings():
+ tname = name_map(row["table_name"])
+ if tname not in indexes_by_table:
+ indexes_by_table[tname] = {}
+
+ current = {
+ "name": row["name"],
+ "unique": row["is_unique"] == 1,
+ "column_names": [],
+ # NOTE: this is legacy, this is part of
+ # dialect_options now as of #7382
+ "include_columns": [],
+ "dialect_options": {},
+ }
+ do = current["dialect_options"]
+ index_type = row["type"]
+ if index_type in {1, 2}:
+ do["mssql_clustered"] = index_type == 1
+ if index_type in {5, 6}:
+ do["mssql_clustered"] = index_type == 5
+ do["mssql_columnstore"] = True
+ if row["filter_definition"] is not None:
+ do["mssql_where"] = row["filter_definition"]
+
+ indexes_by_table[tname][row["index_id"]] = current
+
+ cols_q = self._indexes_columns_select().where(
+ ischema.sys_schemas.c.name == owner,
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
+
+ rp2 = connection.execute(
+ cols_q,
+ {"filter_names": names},
+ execution_options=exec_opts,
+ )
+
+ for row in rp2.mappings():
+ tname = name_map(row["table_name"])
+ idx_id = row["index_id"]
+ if tname not in indexes_by_table:
+ continue
+ if idx_id not in indexes_by_table[tname]:
+ continue
+ index_def = indexes_by_table[tname][idx_id]
+ is_colstore = index_def["dialect_options"].get("mssql_columnstore")
+ is_clustered = index_def["dialect_options"].get("mssql_clustered")
+ if not (is_colstore and is_clustered):
+ # a clustered columnstore index includes all columns but does
+ # not want them in the index definition
+ if row["is_included_column"] and not is_colstore:
+ # a noncludsted columnstore index reports that includes
+ # columns but requires that are listed as normal columns
+ index_def["include_columns"].append(row["name"])
+ else:
+ index_def["column_names"].append(row["name"])
+
+ for tname, idx_dict in indexes_by_table.items():
+ for index_info in idx_dict.values():
+ index_info["dialect_options"]["mssql_include"] = index_info[
+ "include_columns"
+ ]
+ result[(schema, tname)] = list(idx_dict.values())
+
+ for n in names:
+ key = (schema, name_map(n))
+ if key not in result:
+ result[key] = ReflectionDefaults.indexes()
+
+ def _fetch_multi_indexes_temp(
+ self, connection, temp_names, schema, result
+ ):
+ """Run the unified indexes queries against tempdb."""
+ resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ connection, temp_names
+ )
+
+ for temp_owner, mangled_names in resolved_by_owner.items():
+
+ def temp_name_map(server_name, _by=original_by_mangled):
+ return _by.get(server_name, server_name)
+
+ self._fetch_multi_indexes(
connection,
- tablename,
- owner,
- ReflectionDefaults.foreign_keys,
- **kw,
+ owner=temp_owner,
+ names=mangled_names,
+ schema=schema,
+ name_map=temp_name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {"sys": "tempdb.sys"}},
+ )
+
+ @_db_plus_owner_multi
+ def get_multi_table_comment(
+ self,
+ connection,
+ dbname,
+ owner,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ **kw,
+ ):
+ if not self.supports_comments:
+ raise NotImplementedError(
+ "Can't get table comments on current SQL Server "
+ "version in use"
+ )
+
+ (
+ run_bulk,
+ regular_names,
+ multi_object_names,
+ temp_names,
+ ) = self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
+
+ result = {}
+
+ if run_bulk and multi_object_names:
+ name_map = self._multi_name_map(regular_names)
+ self._fetch_multi_table_comment(
+ connection,
+ owner=owner,
+ names=multi_object_names,
+ schema=schema,
+ name_map=name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {}},
+ )
+
+ if temp_names:
+ self._fetch_multi_table_comment_temp(
+ connection,
+ temp_names=temp_names,
+ schema=schema,
+ result=result,
+ )
+
+ return result.items()
+
+ @lru_cache()
+ def _table_comment_select(self):
+ """Build the Core sys.* select for table comment reflection."""
+ sys_objects = ischema.sys_objects
+ sys_schemas = ischema.sys_schemas
+ extended_properties = ischema.extended_properties
+
+ s = (
+ sql.select(
+ sys_objects.c.name.label("table_name"),
+ extended_properties.c.value.label("comment"),
+ )
+ .select_from(sys_objects)
+ .join(
+ sys_schemas,
+ onclause=sys_objects.c.schema_id == sys_schemas.c.schema_id,
+ )
+ .outerjoin(
+ extended_properties,
+ onclause=sql.and_(
+ extended_properties.c["class"] == 1,
+ extended_properties.c.name == "MS_Description",
+ extended_properties.c.major_id == sys_objects.c.object_id,
+ extended_properties.c.minor_id == 0,
+ ),
+ )
+ .where(sys_objects.c.type.in_(("U", "V")))
+ )
+ return s
+
+ def _fetch_multi_table_comment(
+ self, connection, owner, names, schema, name_map, result, exec_opts
+ ):
+ """Execute the unified table comment query for one catalog pass.
+
+ Core sys.* select so ``schema_translate_map`` actually rewrites
+ the catalog reference for the tempdb pass.
+ """
+ q = self._table_comment_select().where(
+ ischema.sys_schemas.c.name == owner,
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
+
+ rp = connection.execute(
+ q, {"filter_names": names}, execution_options=exec_opts
+ )
+
+ for row in rp.mappings():
+ table_name = name_map(row["table_name"])
+ comment = row["comment"]
+ result[(schema, table_name)] = (
+ {"text": comment} if comment else {"text": None}
+ )
+
+ for n in names:
+ key = (schema, name_map(n))
+ if key not in result:
+ result[key] = ReflectionDefaults.table_comment()
+
+ def _fetch_multi_table_comment_temp(
+ self, connection, temp_names, schema, result
+ ):
+ """Run the unified table comment query against tempdb."""
+ resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ connection, temp_names
+ )
+
+ for temp_owner, mangled_names in resolved_by_owner.items():
+
+ def temp_name_map(server_name, _by=original_by_mangled):
+ return _by.get(server_name, server_name)
+
+ self._fetch_multi_table_comment(
+ connection,
+ owner=temp_owner,
+ names=mangled_names,
+ schema=schema,
+ name_map=temp_name_map,
+ result=result,
+ exec_opts={"schema_translate_map": {"sys": "tempdb.sys"}},
)
+
+ # --- Single-table reflection wrappers (delegate to multi) ---
+
+ @reflection.cache
+ def get_columns(self, connection, table_name, schema=None, **kw):
+ data = self.get_multi_columns(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @reflection.cache
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+ data = self.get_multi_pk_constraint(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @reflection.cache
+ def get_foreign_keys(self, connection, table_name, schema=None, **kw):
+ data = self.get_multi_foreign_keys(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @reflection.cache
+ def get_indexes(self, connection, table_name, schema=None, **kw):
+ data = self.get_multi_indexes(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @reflection.cache
+ def get_table_comment(self, connection, table_name, schema=None, **kw):
+ data = self.get_multi_table_comment(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)