from alembic.testing import assert_raises_message
from alembic.testing import config
from alembic.testing import eq_
+from alembic.testing import exclusions
from alembic.testing import is_
from alembic.testing import is_not_
from alembic.testing import mock
"pre_cache" if not disable_pre_cache else "no_pre_cache"
],
)
+
+
+class AutogenPlaceholderTableTest(AutogenFixtureTest, TestBase):
+ """test for placeholder table creation for non-reflected FK targets (issue
+ #1787).
+
+ When foreign keys reference tables that are filtered out by include_name,
+ the fix creates placeholder Table objects so that the FK can be processed
+ without error.
+ """
+
+ __backend__ = True
+
+ def test_fk_to_filtered_table_single_column(self):
+ """Test FK to a table filtered out by include_name - single column."""
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema: parent and child tables with FK
+ Table(
+ "parent",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ ForeignKeyConstraint(["parent_id"], ["parent.id"]),
+ )
+
+ # Model schema: child table only, no FK
+ # (parent is filtered out, so FK should be dropped too)
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ )
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table" and name == "parent":
+ return False
+ return True
+
+ # Should not raise NoReferencedTableError during comparison
+ # The fix creates placeholder tables for reflected FKs that
+ # reference filtered tables, allowing the comparison to proceed
+ diffs = self._fixture(m1, m2, name_filters=include_name)
+ # We expect one diff: drop the FK constraint
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], "remove_fk")
+
+ def test_fk_to_filtered_table_composite(self):
+ """Test FK to a table filtered out by include_name - composite FK."""
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema: parent and child with composite FK
+ Table(
+ "parent",
+ m1,
+ Column("id", Integer),
+ Column("version", Integer),
+ PrimaryKeyConstraint("id", "version"),
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ Column("parent_version", Integer),
+ ForeignKeyConstraint(
+ ["parent_id", "parent_version"],
+ ["parent.id", "parent.version"],
+ ),
+ )
+
+ # Model schema: only child table, no FK
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ Column("parent_version", Integer),
+ )
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table" and name == "parent":
+ return False
+ return True
+
+ # Should not raise NoReferencedTableError
+ diffs = self._fixture(m1, m2, name_filters=include_name)
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], "remove_fk")
+
+ def test_fk_to_filtered_table_with_include_object(self):
+ """Test FK filtering with include_object can access referred_table."""
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema
+ Table(
+ "parent",
+ m1,
+ Column("id", Integer, primary_key=True),
+ )
+
+ Table(
+ "parent_partition_1",
+ m1,
+ Column("id", Integer, primary_key=True),
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ ForeignKeyConstraint(["parent_id"], ["parent.id"]),
+ # FK to partition (should be filtered by include_object)
+ ForeignKeyConstraint(
+ ["parent_id"],
+ ["parent_partition_1.id"],
+ name="fk_to_partition",
+ ),
+ )
+
+ # Model schema: parent and child with FK to parent only
+ # (no partitions, no FK to partition)
+ Table(
+ "parent",
+ m2,
+ Column("id", Integer, primary_key=True),
+ )
+
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ ForeignKeyConstraint(["parent_id"], ["parent.id"]),
+ )
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table" and name.startswith("parent_partition"):
+ return False
+ return True
+
+ def include_object(obj, name, type_, reflected, compare_to):
+ if type_ == "foreign_key_constraint":
+ # Should be able to access obj.referred_table without error
+ # The fix creates placeholder tables for filtered FK targets
+ if obj.referred_table.name.startswith("parent_partition"):
+ return False
+ return True
+
+ # Should not raise NoReferencedTableError when accessing
+ # obj.referred_table in include_object
+ diffs = self._fixture(
+ m1, m2, name_filters=include_name, object_filters=include_object
+ )
+ eq_(diffs, [])
+
+ def test_fk_placeholder_type_preservation(self):
+ """Test that placeholder tables preserve column types correctly."""
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema
+ Table(
+ "parent",
+ m1,
+ Column("id", BigInteger),
+ Column("code", String(100)),
+ PrimaryKeyConstraint("id", "code"),
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", BigInteger),
+ Column("parent_code", String(100)),
+ ForeignKeyConstraint(
+ ["parent_id", "parent_code"], ["parent.id", "parent.code"]
+ ),
+ )
+
+ # Model schema: only child, no FK
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", BigInteger),
+ Column("parent_code", String(100)),
+ )
+
+ inspected_type_affinities = {}
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table" and name == "parent":
+ return False
+ return True
+
+ def include_object(obj, name, type_, reflected, compare_to):
+ if type_ == "foreign_key_constraint" and reflected:
+ # Inspect the referred table columns created by the placeholder
+ referred_table = obj.referred_table
+ # Build dict of column name -> type affinity
+ inspected_type_affinities.update(
+ {
+ col.name: col.type._type_affinity
+ for col in referred_table.c
+ }
+ )
+ return True
+
+ # Should create placeholder with correct types and not crash
+ diffs = self._fixture(
+ m1, m2, name_filters=include_name, object_filters=include_object
+ )
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], "remove_fk")
+
+ # Verify the placeholder table columns have correct type affinities
+ # BigInteger has Integer affinity, String has String affinity
+ eq_(
+ inspected_type_affinities,
+ {"id": Integer, "code": String},
+ )
+
+ @exclusions.only_on("postgresql")
+ def test_fk_to_filtered_table_cross_schema(self):
+ """Test FK to a filtered table in a different schema.
+
+ PostgreSQL-only test since include_schemas=True on MySQL would
+ pick up system databases like 'mysql', 'information_schema', etc.
+ """
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema: parent in test schema, child in default
+ Table(
+ "parent",
+ m1,
+ Column("id", Integer, primary_key=True),
+ schema=config.test_schema,
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ ForeignKeyConstraint(
+ ["parent_id"], [f"{config.test_schema}.parent.id"]
+ ),
+ )
+
+ # Model schema: only child, no FK
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ )
+
+ def include_name(name, type_, parent_names):
+ if (
+ type_ == "table"
+ and name == "parent"
+ and parent_names.get("schema_name") == config.test_schema
+ ):
+ return False
+ return True
+
+ # Should not raise NoReferencedTableError
+ diffs = self._fixture(
+ m1, m2, include_schemas=True, name_filters=include_name
+ )
+ assert all(diff[0] in ("remove_fk", "remove_table") for diff in diffs)
+
+ def test_multiple_fks_to_same_filtered_table(self):
+ """Test multiple FKs referencing the same filtered table."""
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema
+ Table(
+ "parent",
+ m1,
+ Column("id", Integer, primary_key=True),
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id_1", Integer),
+ Column("parent_id_2", Integer),
+ ForeignKeyConstraint(["parent_id_1"], ["parent.id"]),
+ ForeignKeyConstraint(["parent_id_2"], ["parent.id"]),
+ )
+
+ # Model schema: only child, no FKs
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id_1", Integer),
+ Column("parent_id_2", Integer),
+ )
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table" and name == "parent":
+ return False
+ return True
+
+ # Should not raise NoReferencedTableError
+ # Should only create one placeholder table even with multiple FKs
+ diffs = self._fixture(m1, m2, name_filters=include_name)
+ eq_(len(diffs), 2)
+ eq_(diffs[0][0], "remove_fk")
+ eq_(diffs[1][0], "remove_fk")
+
+ def test_fk_to_different_columns_in_filtered_table(self):
+ """Test multiple FKs referencing different columns in same filtered
+ table.
+
+ This tests that the placeholder table accumulates columns from all FKs
+ rather than only having columns from the first FK processed.
+ """
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema
+ Table(
+ "parent",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("code", String(50), unique=True),
+ Column("version", Integer, unique=True),
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ Column("parent_code", String(50)),
+ Column("parent_version", Integer),
+ # FK to parent.id
+ ForeignKeyConstraint(["parent_id"], ["parent.id"]),
+ # FK to parent.code (different column)
+ ForeignKeyConstraint(["parent_code"], ["parent.code"]),
+ # FK to parent.version (yet another column)
+ ForeignKeyConstraint(["parent_version"], ["parent.version"]),
+ )
+
+ # Model schema: only child, no FKs
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ Column("parent_code", String(50)),
+ Column("parent_version", Integer),
+ )
+
+ placeholder_columns = {}
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table" and name == "parent":
+ return False
+ return True
+
+ def include_object(obj, name, type_, reflected, compare_to):
+ if type_ == "foreign_key_constraint" and reflected:
+ # Track table columns (will be a placeholder in newer
+ # SQLAlchemy versions, not in 1.4)
+ referred_table = obj.referred_table
+ if referred_table.name == "parent":
+ placeholder_columns.update(
+ {
+ col.name: col.type._type_affinity
+ for col in referred_table.c
+ }
+ )
+ return True
+
+ # Should not raise NoReferencedTableError
+ # Placeholder should accumulate all needed columns
+ diffs = self._fixture(
+ m1, m2, name_filters=include_name, object_filters=include_object
+ )
+ eq_(len(diffs), 3)
+ eq_(diffs[0][0], "remove_fk")
+ eq_(diffs[1][0], "remove_fk")
+ eq_(diffs[2][0], "remove_fk")
+
+ # Verify placeholder table has all three columns
+ eq_(
+ placeholder_columns,
+ {"id": Integer, "code": String, "version": Integer},
+ )
+
+ def test_fk_chain_with_filtered_middle_table(self):
+ """Test FK chain where middle table is filtered."""
+ m1 = MetaData()
+ m2 = MetaData()
+
+ # Database schema: grandparent -> parent -> child
+ Table(
+ "grandparent",
+ m1,
+ Column("id", Integer, primary_key=True),
+ )
+
+ Table(
+ "parent",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("grandparent_id", Integer),
+ ForeignKeyConstraint(["grandparent_id"], ["grandparent.id"]),
+ )
+
+ Table(
+ "child",
+ m1,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ ForeignKeyConstraint(["parent_id"], ["parent.id"]),
+ )
+
+ # Model schema: grandparent and child only (parent filtered, so no FK)
+ Table(
+ "grandparent",
+ m2,
+ Column("id", Integer, primary_key=True),
+ )
+
+ Table(
+ "child",
+ m2,
+ Column("id", Integer, primary_key=True),
+ Column("parent_id", Integer),
+ )
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table" and name == "parent":
+ return False
+ return True
+
+ # Should not raise NoReferencedTableError
+ diffs = self._fixture(m1, m2, name_filters=include_name)
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], "remove_fk")