return s
default_schema_name = connection.dialect.default_schema_name
- col_tuples = [
- (
- lower(rec["referred_schema"] or default_schema_name),
- lower(rec["referred_table"]),
- col_name,
- )
- for rec in fkeys
- for col_name in rec["referred_columns"]
- ]
- if col_tuples:
- correct_for_wrong_fk_case = connection.execute(
- sql.text(
- """
- select table_schema, table_name, column_name
- from information_schema.columns
- where (table_schema, table_name, lower(column_name)) in
- :table_data;
- """
- ).bindparams(sql.bindparam("table_data", expanding=True)),
- dict(table_data=col_tuples),
+ # NOTE: using (table_schema, table_name, lower(column_name)) in (...)
+ # is very slow since mysql does not seem able to properly use indexse.
+ # Unpack the where condition instead.
+ schema_by_table_by_column = defaultdict(lambda: defaultdict(list))
+ for rec in fkeys:
+ sch = lower(rec["referred_schema"] or default_schema_name)
+ tbl = lower(rec["referred_table"])
+ for col_name in rec["referred_columns"]:
+ schema_by_table_by_column[sch][tbl].append(col_name)
+
+ if schema_by_table_by_column:
+
+ condition = sql.or_(
+ *(
+ sql.and_(
+ _info_columns.c.table_schema == schema,
+ sql.or_(
+ *(
+ sql.and_(
+ _info_columns.c.table_name == table,
+ sql.func.lower(
+ _info_columns.c.column_name
+ ).in_(columns),
+ )
+ for table, columns in tables.items()
+ )
+ ),
+ )
+ for schema, tables in schema_by_table_by_column.items()
+ )
)
+ select = sql.select(
+ _info_columns.c.table_schema,
+ _info_columns.c.table_name,
+ _info_columns.c.column_name,
+ ).where(condition)
+
+ correct_for_wrong_fk_case = connection.execute(select)
+
# in casing=0, table name and schema name come back in their
# exact case.
# in casing=1, table name and schema name come back in lower
return item.decode(self.charset)
else:
return item
+
+
+_info_columns = sql.table(
+ "columns",
+ sql.column("table_schema", VARCHAR(64)),
+ sql.column("table_name", VARCHAR(64)),
+ sql.column("column_name", VARCHAR(64)),
+ schema="information_schema",
+)