"""Build the unified Core sys.* select for column reflection.
Returns a ``select()`` that includes ``table_name`` and ``owner``
- in the result so callers can group rows by table. The caller
- applies the appropriate WHERE clause for either the single-table
- case (``sys_columns.c.object_id == X``) or the multi-table case
- (``sys_schemas.c.name == owner AND sys_objects.c.name.in_(names)``).
+ in the result so callers can group rows by table.
Used by :meth:`.get_multi_columns` (and indirectly by
:meth:`.get_columns` via the multi delegation).
sys_columns.c.column_id == extended_properties.c.minor_id,
),
)
+ .where(
+ ischema.sys_schemas.c.name == sql.bindparam("owner"),
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
.order_by(sys_objects.c.name, sys_columns.c.column_id)
)
return s
the existing-in-catalog names for the bulk path in one shot.
Returns the tuple
- ``(run_bulk, regular_names, multi_object_names, temp_names)``:
+ ``(regular_names, multi_object_names, temp_names)``:
- * ``run_bulk`` -- whether the multi method should run its bulk
- ``sys.*`` query at all.
* ``regular_names`` -- list of caller-supplied names (with the
caller's casing) for the bulk path, used to build the
case-insensitive name map. Empty list when no regular names
* ``multi_object_names`` -- list of names (with server-side
casing) that actually exist in the catalog and match the
requested ``kind``. This is what the bulk SQL's IN clause
- gets filtered with. Empty list when no matches.
+ gets filtered with. Empty list when no matches, disabling
+ the bulk query.
* ``temp_names`` -- list of ``#``-prefixed temp object names
that the caller asked for and that the requested
``scope``/``kind`` allows.
run_bulk = include_default
if not run_bulk:
- return (run_bulk, regular_names, [], temp_names)
+ return (regular_names, [], temp_names)
type_filter = []
if ObjectKind.TABLE in kind:
type_filter.append("'V'")
# SQL Server does not support materialized views, so ignore them
if not type_filter:
- return (run_bulk, regular_names, [], temp_names)
+ return (regular_names, [], temp_names)
query = (
"SELECT o.name "
rp = connection.execute(sql.text(query).bindparams(*params))
multi_object_names = [row[0] for row in rp]
- return (run_bulk, regular_names, multi_object_names, temp_names)
+ return (regular_names, multi_object_names, temp_names)
@staticmethod
def _multi_name_map(filter_names):
kind,
**kw,
):
- (
- run_bulk,
- regular_names,
- multi_object_names,
- temp_names,
- ) = self._partition_filter_names(
- connection, owner, filter_names, scope, kind
+ regular_names, multi_object_names, temp_names = (
+ self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
)
result = {}
- if run_bulk and multi_object_names:
+ if multi_object_names:
name_map = self._multi_name_map(regular_names)
self._fetch_multi_columns(
connection,
(no ``schema_translate_map``) and the tempdb pass (with
``schema_translate_map={"sys": "tempdb.sys"}`` applied).
"""
- s = self._columns_select().where(
- ischema.sys_schemas.c.name == owner,
- ischema.sys_objects.c.name.in_(
- sql.bindparam("filter_names", expanding=True)
- ),
- )
+ s = self._columns_select()
rp = connection.execute(
s,
- {"filter_names": names},
+ {"owner": owner, "filter_names": names},
execution_options=exec_opts,
)
``schema_translate_map``. The result rows (keyed by mangled
name) are mapped back to the original ``#name`` for the caller.
"""
- resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ resolved_by_owner, temp_name_map = self._resolve_temp_names(
connection, temp_names
)
-
for temp_owner, mangled_names in resolved_by_owner.items():
-
- def temp_name_map(server_name, _by=original_by_mangled):
- return _by.get(server_name, server_name)
-
self._fetch_multi_columns(
connection,
owner=temp_owner,
"""Resolve user-facing temp names (``#foo``) to their mangled
tempdb names, grouped by owner.
- Returns ``(resolved_by_owner, original_by_mangled)``:
+ Returns ``(resolved_by_owner, temp_name_map)``:
* ``resolved_by_owner`` -- ``{owner: [mangled_name, ...]}``,
one bulk query per owner (typically just ``'dbo'``).
- * ``original_by_mangled`` -- ``{mangled_name: original_#name}``
- so caller can map result rows back to the user-supplied name.
+ * ``temp_name_map`` -- a function that maps mangled names back
+ to the user-supplied name.
Names that don't resolve (no matching temp table in tempdb) are
silently dropped. The single-table reflection wrappers handle
continue
resolved_by_owner.setdefault(temp_owner, []).append(mangled)
original_by_mangled[mangled] = name
- return resolved_by_owner, original_by_mangled
+
+ def temp_name_map(server_name):
+ return original_by_mangled.get(server_name, server_name)
+
+ return resolved_by_owner, temp_name_map
@_db_plus_owner_multi
def get_multi_pk_constraint(
kind,
**kw,
):
- (
- run_bulk,
- regular_names,
- multi_object_names,
- temp_names,
- ) = self._partition_filter_names(
- connection, owner, filter_names, scope, kind
+ regular_names, multi_object_names, temp_names = (
+ self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
)
result = {}
- if run_bulk and multi_object_names:
+ if multi_object_names:
name_map = self._multi_name_map(regular_names)
self._fetch_multi_pk_constraint(
connection,
== sys_key_constraints.c.unique_index_id,
),
)
- .where(sys_key_constraints.c.type == "PK")
+ .where(
+ sys_key_constraints.c.type == "PK",
+ ischema.sys_schemas.c.name == sql.bindparam("owner"),
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
.order_by(sys_objects.c.name, sys_index_columns.c.key_ordinal)
)
return s
Used by :meth:`.get_multi_pk_constraint` for both the main DB
pass and the tempdb pass (with ``schema_translate_map`` set).
"""
- s = self._pk_constraint_select().where(
- ischema.sys_schemas.c.name == owner,
- ischema.sys_objects.c.name.in_(
- sql.bindparam("filter_names", expanding=True)
- ),
- )
+ s = self._pk_constraint_select()
rp = connection.execute(
s,
- {"filter_names": names},
+ {"owner": owner, "filter_names": names},
execution_options=exec_opts,
)
self, connection, temp_names, schema, result
):
"""Run the unified pk_constraint select against tempdb."""
- resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ resolved_by_owner, temp_name_map = self._resolve_temp_names(
connection, temp_names
)
for temp_owner, mangled_names in resolved_by_owner.items():
-
- def temp_name_map(server_name, _by=original_by_mangled):
- return _by.get(server_name, server_name)
-
self._fetch_multi_pk_constraint(
connection,
owner=temp_owner,
kind,
**kw,
):
- (
- run_bulk,
- regular_names,
- multi_object_names,
- temp_names,
- ) = self._partition_filter_names(
- connection, owner, filter_names, scope, kind
+ regular_names, multi_object_names, temp_names = (
+ self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
)
final = {}
- if run_bulk and multi_object_names:
+ if multi_object_names:
name_map = self._multi_name_map(regular_names)
self._fetch_multi_foreign_keys(
connection,
kind,
**kw,
):
- (
- run_bulk,
- regular_names,
- multi_object_names,
- temp_names,
- ) = self._partition_filter_names(
- connection, owner, filter_names, scope, kind
+ regular_names, multi_object_names, temp_names = (
+ self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
)
result = {}
- if run_bulk and multi_object_names:
+ if multi_object_names:
name_map = self._multi_name_map(regular_names)
self._fetch_multi_indexes(
connection,
sys_schemas,
onclause=sys_schemas.c.schema_id == sys_objects.c.schema_id,
)
- .where(sys_indexes.c.is_primary_key == 0)
- .where(sys_indexes.c.type != 0)
+ .where(
+ sys_indexes.c.is_primary_key == 0,
+ sys_indexes.c.type != 0,
+ ischema.sys_schemas.c.name == sql.bindparam("owner"),
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
.order_by(sys_objects.c.name, sys_indexes.c.name)
)
return s
sys_schemas,
onclause=sys_schemas.c.schema_id == sys_objects.c.schema_id,
)
+ .where(
+ ischema.sys_schemas.c.name == sql.bindparam("owner"),
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
.order_by(
sys_objects.c.name,
sys_index_columns.c.index_id,
tempdb pass (with ``schema_translate_map={"sys": "tempdb.sys"}``
applied).
"""
- meta_q = self._indexes_metadata_select().where(
- ischema.sys_schemas.c.name == owner,
- ischema.sys_objects.c.name.in_(
- sql.bindparam("filter_names", expanding=True)
- ),
- )
+ meta_q = self._indexes_metadata_select()
rp = connection.execute(
meta_q,
- {"filter_names": names},
+ {"owner": owner, "filter_names": names},
execution_options=exec_opts,
)
indexes_by_table[tname][row["index_id"]] = current
- cols_q = self._indexes_columns_select().where(
- ischema.sys_schemas.c.name == owner,
- ischema.sys_objects.c.name.in_(
- sql.bindparam("filter_names", expanding=True)
- ),
- )
+ cols_q = self._indexes_columns_select()
rp2 = connection.execute(
cols_q,
- {"filter_names": names},
+ {"owner": owner, "filter_names": names},
execution_options=exec_opts,
)
self, connection, temp_names, schema, result
):
"""Run the unified indexes queries against tempdb."""
- resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ resolved_by_owner, temp_name_map = self._resolve_temp_names(
connection, temp_names
)
for temp_owner, mangled_names in resolved_by_owner.items():
-
- def temp_name_map(server_name, _by=original_by_mangled):
- return _by.get(server_name, server_name)
-
self._fetch_multi_indexes(
connection,
owner=temp_owner,
"version in use"
)
- (
- run_bulk,
- regular_names,
- multi_object_names,
- temp_names,
- ) = self._partition_filter_names(
- connection, owner, filter_names, scope, kind
+ regular_names, multi_object_names, temp_names = (
+ self._partition_filter_names(
+ connection, owner, filter_names, scope, kind
+ )
)
result = {}
- if run_bulk and multi_object_names:
+ if multi_object_names:
name_map = self._multi_name_map(regular_names)
self._fetch_multi_table_comment(
connection,
extended_properties.c.minor_id == 0,
),
)
- .where(sys_objects.c.type.in_(("U", "V")))
+ .where(
+ sys_objects.c.type.in_(("U", "V")),
+ ischema.sys_schemas.c.name == sql.bindparam("owner"),
+ ischema.sys_objects.c.name.in_(
+ sql.bindparam("filter_names", expanding=True)
+ ),
+ )
)
return s
Core sys.* select so ``schema_translate_map`` actually rewrites
the catalog reference for the tempdb pass.
"""
- q = self._table_comment_select().where(
- ischema.sys_schemas.c.name == owner,
- ischema.sys_objects.c.name.in_(
- sql.bindparam("filter_names", expanding=True)
- ),
- )
+ q = self._table_comment_select()
rp = connection.execute(
- q, {"filter_names": names}, execution_options=exec_opts
+ q,
+ {"owner": owner, "filter_names": names},
+ execution_options=exec_opts,
)
for row in rp.mappings():
self, connection, temp_names, schema, result
):
"""Run the unified table comment query against tempdb."""
- resolved_by_owner, original_by_mangled = self._resolve_temp_names(
+ resolved_by_owner, temp_name_map = self._resolve_temp_names(
connection, temp_names
)
-
for temp_owner, mangled_names in resolved_by_owner.items():
-
- def temp_name_map(server_name, _by=original_by_mangled):
- return _by.get(server_name, server_name)
-
self._fetch_multi_table_comment(
connection,
owner=temp_owner,