The PostgreSQL dialect now supports reflection of expression based indexes.
The reflection is supported both when using
:meth:`_engine.Inspector.get_indexes` and when reflecting a
:class:`_schema.Table` using :paramref:`_schema.Table.autoload_with`.
Thanks to immerrr and Aidan Kane for the help on this ticket.
Fixes: #7442
Change-Id: I3e36d557235286c0f7f6d8276272ff9225058d48
--- /dev/null
+.. change::
+ :tags: reflection, postgresql
+ :tickets: 7442
+
+ The PostgreSQL dialect now supports reflection of expression based indexes.
+ The reflection is supported both when using
+ :meth:`_engine.Inspector.get_indexes` and when reflecting a
+ :class:`_schema.Table` using :paramref:`_schema.Table.autoload_with`.
+ Thanks to immerrr and Aidan Kane for the help on this ticket.
select(
idx_sq.c.indexrelid,
idx_sq.c.indrelid,
- pg_catalog.pg_attribute.c.attname,
+ # NOTE: always using pg_get_indexdef is too slow, so only
+ # invoke it when the element is an expression
+ sql.case(
+ (
+ idx_sq.c.attnum == 0,
+ pg_catalog.pg_get_indexdef(
+ idx_sq.c.indexrelid, idx_sq.c.ord + 1, True
+ ),
+ ),
+ else_=pg_catalog.pg_attribute.c.attname,
+ ).label("element"),
+ (idx_sq.c.attnum == 0).label("is_expr"),
)
- .select_from(pg_catalog.pg_attribute)
- .join(
- idx_sq,
+ .select_from(idx_sq)
+ .outerjoin(
+ # do not remove rows where idx_sq.c.attnum is 0
+ pg_catalog.pg_attribute,
sql.and_(
pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum,
pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid,
select(
attr_sq.c.indexrelid,
attr_sq.c.indrelid,
- sql.func.array_agg(attr_sq.c.attname).label("cols"),
+ sql.func.array_agg(attr_sq.c.element).label("elements"),
+ sql.func.array_agg(attr_sq.c.is_expr).label(
+ "elements_is_expr"
+ ),
)
.group_by(attr_sq.c.indexrelid, attr_sq.c.indrelid)
.subquery("idx_cols")
pg_catalog.pg_index.c.indrelid,
pg_class_index.c.relname.label("relname_index"),
pg_catalog.pg_index.c.indisunique,
- pg_catalog.pg_index.c.indexprs,
pg_catalog.pg_constraint.c.conrelid.is_not(None).label(
"has_constraint"
),
pg_catalog.pg_index.c.indoption,
pg_class_index.c.reloptions,
pg_catalog.pg_am.c.amname,
- pg_catalog.pg_get_expr(
- pg_catalog.pg_index.c.indpred,
- pg_catalog.pg_index.c.indrelid,
+ sql.case(
+ # pg_get_expr is very fast so this case has almost no
+ # performance impact
+ (
+ pg_catalog.pg_index.c.indpred.is_not(None),
+ pg_catalog.pg_get_expr(
+ pg_catalog.pg_index.c.indpred,
+ pg_catalog.pg_index.c.indrelid,
+ ),
+ ),
+ else_=sql.null(),
).label("filter_definition"),
indnkeyatts,
- cols_sq.c.cols.label("index_cols"),
+ cols_sq.c.elements,
+ cols_sq.c.elements_is_expr,
)
.select_from(pg_catalog.pg_index)
.where(
table_indexes = indexes[(schema, table_name)]
- if row["indexprs"]:
- tn = (
- table_name
- if schema is None
- else f"{schema}.{table_name}"
- )
- util.warn(
- "Skipped unsupported reflection of "
- f"expression-based index {index_name} of "
- f"table {tn}"
- )
- continue
-
- all_cols = row["index_cols"]
+ all_elements = row["elements"]
+ all_elements_is_expr = row["elements_is_expr"]
indnkeyatts = row["indnkeyatts"]
# "The number of key columns in the index, not counting any
# included columns, which are merely stored and do not
# participate in the index semantics"
- if indnkeyatts and all_cols[indnkeyatts:]:
+ if indnkeyatts and len(all_elements) > indnkeyatts:
# this is a "covering index" which has INCLUDE columns
# as well as regular index columns
- inc_cols = all_cols[indnkeyatts:]
- idx_cols = all_cols[:indnkeyatts]
+ inc_cols = all_elements[indnkeyatts:]
+ idx_elements = all_elements[:indnkeyatts]
+ idx_elements_is_expr = all_elements_is_expr[
+ :indnkeyatts
+ ]
+ # postgresql does not support expressions on included
+ # columns as of v14: "ERROR: expressions are not
+ # supported in included columns".
+ assert all(
+ not is_expr
+ for is_expr in all_elements_is_expr[indnkeyatts:]
+ )
else:
- idx_cols = all_cols
+ idx_elements = all_elements
+ idx_elements_is_expr = all_elements_is_expr
inc_cols = []
- index = {
- "name": index_name,
- "unique": row["indisunique"],
- "column_names": idx_cols,
- }
+ index = {"name": index_name, "unique": row["indisunique"]}
+ if any(idx_elements_is_expr):
+ index["column_names"] = [
+ None if is_expr else expr
+ for expr, is_expr in zip(
+ idx_elements, idx_elements_is_expr
+ )
+ ]
+ index["expressions"] = idx_elements
+ else:
+ index["column_names"] = idx_elements
sorting = {}
for col_index, col_flags in enumerate(row["indoption"]):
if col_flags & 0x02:
col_sorting += ("nulls_first",)
if col_sorting:
- sorting[idx_cols[col_index]] = col_sorting
+ sorting[idx_elements[col_index]] = col_sorting
if sorting:
index["column_sorting"] = sorting
if row["has_constraint"]:
format_type = _pg_cat.format_type
pg_get_expr = _pg_cat.pg_get_expr
pg_get_constraintdef = _pg_cat.pg_get_constraintdef
+pg_get_indexdef = _pg_cat.pg_get_indexdef
# constants
RELKINDS_TABLE_NO_FOREIGN = ("r", "p")
name: Optional[str]
"""index name"""
- column_names: List[str]
- """column names which the index refers towards"""
+ column_names: List[Optional[str]]
+ """column names which the index refers towards.
+ An element of this list is ``None`` if it's an expression and is
+ returned in the ``expressions`` list.
+ """
+
+ expressions: NotRequired[List[str]]
+ """Expressions that compose the index. This list, when present, contains
+ both plain column names (that are also in ``column_names``) and
+ expressions (that are ``None`` in ``column_names``).
+ """
unique: bool
"""whether or not the index has a unique flag"""
)
)
- _index_sort_exprs = [
- ("asc", operators.asc_op),
- ("desc", operators.desc_op),
- ("nulls_first", operators.nulls_first_op),
- ("nulls_last", operators.nulls_last_op),
- ]
+ _index_sort_exprs = {
+ "asc": operators.asc_op,
+ "desc": operators.desc_op,
+ "nulls_first": operators.nulls_first_op,
+ "nulls_last": operators.nulls_last_op,
+ }
def _reflect_indexes(
self,
for index_d in indexes:
name = index_d["name"]
columns = index_d["column_names"]
+ expressions = index_d.get("expressions")
column_sorting = index_d.get("column_sorting", {})
unique = index_d["unique"]
flavor = index_d.get("type", "index")
continue
# look for columns by orig name in cols_by_orig_name,
# but support columns that are in-Python only as fallback
- idx_col: Any
- idx_cols = []
- for c in columns:
- try:
- idx_col = (
- cols_by_orig_name[c]
- if c in cols_by_orig_name
- else table.c[c]
- )
- except KeyError:
- util.warn(
- "%s key '%s' was not located in "
- "columns for table '%s'" % (flavor, c, table.name)
- )
- continue
- c_sorting = column_sorting.get(c, ())
- for k, op in self._index_sort_exprs:
- if k in c_sorting:
- idx_col = op(idx_col)
- idx_cols.append(idx_col)
-
- sa_schema.Index(
- name,
- *idx_cols,
- _table=table,
- **dict(list(dialect_options.items()) + [("unique", unique)]),
- )
+ idx_element: Any
+ idx_elements = []
+ for index, c in enumerate(columns):
+ if c is None:
+ if not expressions:
+ util.warn(
+ f"Skipping {flavor} {name!r} because key "
+ f"{index+1} reflected as None but no "
+ "'expressions' were returned"
+ )
+ break
+ idx_element = sql.text(expressions[index])
+ else:
+ try:
+ if c in cols_by_orig_name:
+ idx_element = cols_by_orig_name[c]
+ else:
+ idx_element = table.c[c]
+ except KeyError:
+ util.warn(
+ f"{flavor} key {c!r} was not located in "
+ f"columns for table {table.name!r}"
+ )
+ continue
+ for option in column_sorting.get(c, ()):
+ if option in self._index_sort_exprs:
+ op = self._index_sort_exprs[option]
+ idx_element = op(idx_element)
+ idx_elements.append(idx_element)
+ else:
+ sa_schema.Index(
+ name,
+ *idx_elements,
+ _table=table,
+ unique=unique,
+ **dialect_options,
+ )
def _reflect_unique_constraints(
self,
from .assertions import assert_warns_message
from .assertions import AssertsCompiledSQL
from .assertions import AssertsExecutionResults
+from .assertions import ComparesIndexes
from .assertions import ComparesTables
from .assertions import emits_warning
from .assertions import emits_warning_on
def assert_statement_count(self, db, count):
return self.assert_execution(db, assertsql.CountStatements(count))
+
+
+class ComparesIndexes:
+ def compare_table_index_with_expected(
+ self, table: schema.Table, expected: list, dialect_name: str
+ ):
+ eq_(len(table.indexes), len(expected))
+ idx_dict = {idx.name: idx for idx in table.indexes}
+ for exp in expected:
+ idx = idx_dict[exp["name"]]
+ eq_(idx.unique, exp["unique"])
+ cols = [c for c in exp["column_names"] if c is not None]
+ eq_(len(idx.columns), len(cols))
+ for c in cols:
+ is_true(c in idx.columns)
+ exprs = exp.get("expressions")
+ if exprs:
+ eq_(len(idx.expressions), len(exprs))
+ for idx_exp, expr, col in zip(
+ idx.expressions, exprs, exp["column_names"]
+ ):
+ if col is None:
+ eq_(idx_exp.text, expr)
+ if (
+ exp.get("dialect_options")
+ and f"{dialect_name}_include" in exp["dialect_options"]
+ ):
+ eq_(
+ idx.dialect_options[dialect_name]["include"],
+ exp["dialect_options"][f"{dialect_name}_include"],
+ )
"""target database supports CREATE INDEX against SQL expressions."""
return exclusions.closed()
+ @property
+ def reflect_indexes_with_expressions(self):
+ """target database supports reflection of indexes with
+ SQL expressions."""
+ return exclusions.closed()
+
@property
def unique_constraint_reflection(self):
"""target dialect supports reflection of unique constraints"""
from ...schema import Index
from ...sql.elements import quoted_name
from ...sql.schema import BLANK_SCHEMA
+from ...testing import ComparesIndexes
from ...testing import ComparesTables
from ...testing import is_false
from ...testing import is_true
eq_(multi, {(None, "empty_v"): []})
-class ComponentReflectionTestExtra(fixtures.TestBase):
+class ComponentReflectionTestExtra(ComparesIndexes, fixtures.TestBase):
__backend__ = True
metadata,
Column("x", String(30)),
Column("y", String(30)),
+ Column("z", String(30)),
)
- Index("t_idx", func.lower(t.c.x), func.lower(t.c.y))
+ Index("t_idx", func.lower(t.c.x), t.c.z, func.lower(t.c.y))
Index("t_idx_2", t.c.x)
expected = [
{"name": "t_idx_2", "column_names": ["x"], "unique": False}
]
- if testing.requires.index_reflects_included_columns.enabled:
- expected[0]["include_columns"] = []
- expected[0]["dialect_options"] = {
- "%s_include" % connection.engine.name: []
+
+ def completeIndex(entry):
+ if testing.requires.index_reflects_included_columns.enabled:
+ entry["include_columns"] = []
+ entry["dialect_options"] = {
+ f"{connection.engine.name}_include": []
+ }
+
+ completeIndex(expected[0])
+
+ class filtering_str(str):
+ def __eq__(self, other):
+ # loose comparison: accept any reflected expression string
+ # that mentions "lower" and either column "x" or "y"
+ return "lower" in other and ("x" in other or "y" in other)
+
+ if testing.requires.reflect_indexes_with_expressions.enabled:
+ expr_index = {
+ "name": "t_idx",
+ "column_names": [None, "z", None],
+ "expressions": [
+ filtering_str("lower(x)"),
+ "z",
+ filtering_str("lower(y)"),
+ ],
+ "unique": False,
}
+ completeIndex(expr_index)
+ expected.insert(0, expr_index)
+ eq_(insp.get_indexes("t"), expected)
+ m2 = MetaData()
+ t2 = Table("t", m2, autoload_with=connection)
+ else:
+ with expect_warnings(
+ "Skipped unsupported reflection of expression-based "
+ "index t_idx"
+ ):
+ eq_(insp.get_indexes("t"), expected)
+ m2 = MetaData()
+ t2 = Table("t", m2, autoload_with=connection)
- with expect_warnings(
- "Skipped unsupported reflection of expression-based index t_idx"
- ):
- eq_(
- insp.get_indexes("t"),
- expected,
- )
+ self.compare_table_index_with_expected(
+ t2, expected, connection.engine.name
+ )
@testing.requires.index_reflects_included_columns
def test_reflect_covering_index(self, metadata, connection):
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
+from sqlalchemy import Text
from sqlalchemy import UniqueConstraint
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_warns
from sqlalchemy.testing.assertions import AssertsExecutionResults
+from sqlalchemy.testing.assertions import ComparesIndexes
from sqlalchemy.testing.assertions import eq_
from sqlalchemy.testing.assertions import expect_raises
from sqlalchemy.testing.assertions import is_
class ReflectionTest(
- ReflectionFixtures, AssertsCompiledSQL, fixtures.TestBase
+ ReflectionFixtures, AssertsCompiledSQL, ComparesIndexes, fixtures.TestBase
):
__only_on__ = "postgresql"
__backend__ = True
A_seq.drop(connection)
def test_index_reflection(self, metadata, connection):
- """Reflecting expression-based indexes should warn"""
+ """Reflecting expression-based indexes works"""
Table(
"party",
Column("id", String(10), nullable=False),
Column("name", String(20), index=True),
Column("aname", String(20)),
+ Column("other", String(20)),
)
metadata.create_all(connection)
- connection.exec_driver_sql("create index idx1 on party ((id || name))")
+ connection.exec_driver_sql(
+ """
+ create index idx3 on party
+ (lower(name::text), other, lower(aname::text))
+ """
+ )
+ connection.exec_driver_sql(
+ "create index idx1 on party ((id || name), (other || id::text))"
+ )
connection.exec_driver_sql(
"create unique index idx2 on party (id) where name = 'test'"
)
connection.exec_driver_sql(
"""
- create index idx3 on party using btree
- (lower(name::text), lower(aname::text))
+ create index idx4 on party using btree
+ (name nulls first, lower(other), aname desc)
+ where name != 'foo'
"""
)
- def go():
- m2 = MetaData()
- t2 = Table("party", m2, autoload_with=connection)
- assert len(t2.indexes) == 2
-
- # Make sure indexes are in the order we expect them in
-
- tmp = [(idx.name, idx) for idx in t2.indexes]
- tmp.sort()
- r1, r2 = [idx[1] for idx in tmp]
- assert r1.name == "idx2"
- assert r1.unique is True
- assert r2.unique is False
- assert [t2.c.id] == r1.columns
- assert [t2.c.name] == r2.columns
-
- testing.assert_warnings(
- go,
- [
- "Skipped unsupported reflection of "
- "expression-based index idx1 of table party",
- "Skipped unsupported reflection of "
- "expression-based index idx3 of table party",
- ],
- )
+ expected = [
+ {
+ "name": "idx1",
+ "column_names": [None, None],
+ "expressions": [
+ "(id::text || name::text)",
+ "(other::text || id::text)",
+ ],
+ "unique": False,
+ "include_columns": [],
+ "dialect_options": {"postgresql_include": []},
+ },
+ {
+ "name": "idx2",
+ "column_names": ["id"],
+ "unique": True,
+ "include_columns": [],
+ "dialect_options": {
+ "postgresql_include": [],
+ "postgresql_where": "((name)::text = 'test'::text)",
+ },
+ },
+ {
+ "name": "idx3",
+ "column_names": [None, "other", None],
+ "expressions": [
+ "lower(name::text)",
+ "other",
+ "lower(aname::text)",
+ ],
+ "unique": False,
+ "include_columns": [],
+ "dialect_options": {"postgresql_include": []},
+ },
+ {
+ "name": "idx4",
+ "column_names": ["name", None, "aname"],
+ "expressions": ["name", "lower(other::text)", "aname"],
+ "unique": False,
+ "include_columns": [],
+ "dialect_options": {
+ "postgresql_include": [],
+ "postgresql_where": "((name)::text <> 'foo'::text)",
+ },
+ "column_sorting": {
+ "aname": ("desc",),
+ "name": ("nulls_first",),
+ },
+ },
+ {
+ "name": "ix_party_name",
+ "column_names": ["name"],
+ "unique": False,
+ "include_columns": [],
+ "dialect_options": {"postgresql_include": []},
+ },
+ ]
+ if connection.dialect.server_version_info < (11,):
+ for index in expected:
+ index.pop("include_columns")
+ index["dialect_options"].pop("postgresql_include")
+ if not index["dialect_options"]:
+ index.pop("dialect_options")
+
+ insp = inspect(connection)
+ eq_(insp.get_indexes("party"), expected)
+
+ m2 = MetaData()
+ t2 = Table("party", m2, autoload_with=connection)
+ self.compare_table_index_with_expected(t2, expected, "postgresql")
def test_index_reflection_partial(self, metadata, connection):
"""Reflect the filter definition on partial indexes"""
Column("id", Integer, primary_key=True),
Column("x", ARRAY(Integer)),
Column("name", String(20)),
+ Column("aname", String(20)),
+ Column("other", Text()),
)
metadata.create_all(connection)
connection.exec_driver_sql("CREATE INDEX idx1 ON t (x) INCLUDE (name)")
-
- # prior to #5205, this would return:
- # [{'column_names': ['x', 'name'],
- # 'name': 'idx1', 'unique': False}]
+ connection.exec_driver_sql(
+ """
+ create index idx3 on t
+ (lower(name::text), other desc nulls last, lower(aname::text))
+ include (id, x)
+ """
+ )
+ connection.exec_driver_sql(
+ """
+ create unique index idx2 on t using btree
+ (lower(other), (id * id)) include (id)
+ """
+ )
ind = connection.dialect.get_indexes(connection, "t", None)
eq_(
"include_columns": ["name"],
"dialect_options": {"postgresql_include": ["name"]},
"name": "idx1",
- }
+ },
+ {
+ "name": "idx2",
+ "column_names": [None, None],
+ "expressions": ["lower(other)", "(id * id)"],
+ "unique": True,
+ "include_columns": ["id"],
+ "dialect_options": {"postgresql_include": ["id"]},
+ },
+ {
+ "name": "idx3",
+ "column_names": [None, "other", None],
+ "expressions": [
+ "lower(name::text)",
+ "other",
+ "lower(aname::text)",
+ ],
+ "unique": False,
+ "include_columns": ["id", "x"],
+ "dialect_options": {"postgresql_include": ["id", "x"]},
+ "column_sorting": {
+ "other": ("desc", "nulls_last"),
+ },
+ },
],
)
eq_(list(t.indexes)[0].columns, [t.c.b])
+ def test_index_reflection_expression_not_found(self, connection, metadata):
+ t = Table("x", metadata, Column("a", Integer), Column("b", Integer))
+ sa.Index("x_ix", t.c.a)
+ sa.Index("x_iy", t.c.a, t.c.b)
+ metadata.create_all(connection)
+
+ gri = Inspector._get_reflection_info
+
+ def mock_gri(self, *a, **kw):
+ res = gri(self, *a, **kw)
+ for idx in res.indexes[(None, "x")]:
+ if idx["name"] == "x_iy":
+ idx["column_names"][1] = None
+ idx.pop("expressions", None)
+ return res
+
+ with testing.mock.patch.object(
+ Inspector, "_get_reflection_info", mock_gri
+ ):
+ m = MetaData()
+ with testing.expect_warnings(
+ "Skipping index 'x_iy' because key 2 reflected as None"
+ ):
+ t = Table("x", m, autoload_with=connection)
+
+ eq_(len(t.indexes), 1)
+ eq_(list(t.indexes)[0].name, "x_ix")
+
@testing.requires.views
def test_views(self, connection, metadata):
users, addresses, dingalings = createTables(metadata)
def indexes_with_expressions(self):
return only_on(["postgresql", "sqlite>=3.9.0"])
+ @property
+ def reflect_indexes_with_expressions(self):
+ return only_on(["postgresql"])
+
@property
def temp_table_names(self):
"""target dialect supports listing of temporary table names"""