]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fix reflection of FK against a unique index
authorGord Thompson <gord@gordthompson.com>
Sat, 9 Oct 2021 19:58:36 +0000 (13:58 -0600)
committermike bayer <mike_mp@zzzcomputing.com>
Thu, 14 Oct 2021 22:38:18 +0000 (22:38 +0000)
Also implement reflection of ON DELETE, ON UPDATE
as the data is right there.

Fixes: #7160
Change-Id: Ifff871a8cb1d1bea235616042e16ed3b5c5f19f9

doc/build/changelog/unreleased_14/7160.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
test/dialect/mssql/test_reflection.py
test/requirements.py

diff --git a/doc/build/changelog/unreleased_14/7160.rst b/doc/build/changelog/unreleased_14/7160.rst
new file mode 100644 (file)
index 0000000..b086efc
--- /dev/null
@@ -0,0 +1,14 @@
+.. change::
+    :tags: bug, mssql
+    :tickets: 7160
+
+    Fixed issue with :meth:`.Inspector.get_foreign_keys` where foreign
+    keys were omitted if they were established against a unique
+    index instead of a unique constraint.
+
+
+.. change::
+    :tags: usecase, mssql
+
+    Added reflection support for SQL Server foreign key options, including
+    "ON UPDATE" and "ON DELETE" values of "CASCADE" and "SET NULL".
index 414caedc35884b40db9e87eb2d905349f1e7c62d..beec9348a4f3f7f3805288e10b474365dce7aead 100644 (file)
@@ -3264,33 +3264,122 @@ class MSDialect(default.DefaultDialect):
     def get_foreign_keys(
         self, connection, tablename, dbname, owner, schema, **kw
     ):
-        RR = ischema.ref_constraints
-        C = ischema.key_constraints.alias("C")
-        R = ischema.key_constraints.alias("R")
         # Foreign key constraints
         s = (
-            sql.select(
-                C.c.column_name,
-                R.c.table_schema,
-                R.c.table_name,
-                R.c.column_name,
-                RR.c.constraint_name,
-                RR.c.match_option,
-                RR.c.update_rule,
-                RR.c.delete_rule,
+            text(
+                """\
+WITH fk_info AS (
+    SELECT
+        ischema_ref_con.constraint_schema,
+        ischema_ref_con.constraint_name,
+        ischema_key_col.ordinal_position,
+        ischema_key_col.table_schema,
+        ischema_key_col.table_name,
+        ischema_ref_con.unique_constraint_schema,
+        ischema_ref_con.unique_constraint_name,
+        ischema_ref_con.match_option,
+        ischema_ref_con.update_rule,
+        ischema_ref_con.delete_rule,
+        ischema_key_col.column_name AS constrained_column
+    FROM
+        INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
+        INNER JOIN
+        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
+            ischema_key_col.table_schema = ischema_ref_con.constraint_schema
+            AND ischema_key_col.constraint_name =
+            ischema_ref_con.constraint_name
+    WHERE ischema_key_col.table_name = :tablename
+        AND ischema_key_col.table_schema = :owner
+),
+constraint_info AS (
+    SELECT
+        ischema_key_col.constraint_schema,
+        ischema_key_col.constraint_name,
+        ischema_key_col.ordinal_position,
+        ischema_key_col.table_schema,
+        ischema_key_col.table_name,
+        ischema_key_col.column_name
+    FROM
+        INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
+),
+index_info AS (
+    SELECT
+        sys.schemas.name AS index_schema,
+        sys.indexes.name AS index_name,
+        sys.index_columns.key_ordinal AS ordinal_position,
+        sys.schemas.name AS table_schema,
+        sys.objects.name AS table_name,
+        sys.columns.name AS column_name
+    FROM
+        sys.indexes
+        INNER JOIN
+        sys.objects ON
+            sys.objects.object_id = sys.indexes.object_id
+        INNER JOIN
+        sys.schemas ON
+            sys.schemas.schema_id = sys.objects.schema_id
+        INNER JOIN
+        sys.index_columns ON
+            sys.index_columns.object_id = sys.objects.object_id
+            AND sys.index_columns.index_id = sys.indexes.index_id
+        INNER JOIN
+        sys.columns ON
+            sys.columns.object_id = sys.indexes.object_id
+            AND sys.columns.column_id = sys.index_columns.column_id
+)
+    SELECT
+        fk_info.constraint_schema,
+        fk_info.constraint_name,
+        fk_info.ordinal_position,
+        fk_info.constrained_column,
+        constraint_info.table_schema AS referred_table_schema,
+        constraint_info.table_name AS referred_table_name,
+        constraint_info.column_name AS referred_column,
+        fk_info.match_option,
+        fk_info.update_rule,
+        fk_info.delete_rule
+    FROM
+        fk_info INNER JOIN constraint_info ON
+            constraint_info.constraint_schema =
+                fk_info.unique_constraint_schema
+            AND constraint_info.constraint_name =
+                fk_info.unique_constraint_name
+            AND constraint_info.ordinal_position = fk_info.ordinal_position
+    UNION
+    SELECT
+        fk_info.constraint_schema,
+        fk_info.constraint_name,
+        fk_info.ordinal_position,
+        fk_info.constrained_column,
+        index_info.table_schema AS referred_table_schema,
+        index_info.table_name AS referred_table_name,
+        index_info.column_name AS referred_column,
+        fk_info.match_option,
+        fk_info.update_rule,
+        fk_info.delete_rule
+    FROM
+        fk_info INNER JOIN index_info ON
+            index_info.index_schema = fk_info.unique_constraint_schema
+            AND index_info.index_name = fk_info.unique_constraint_name
+            AND index_info.ordinal_position = fk_info.ordinal_position
+
+    ORDER BY constraint_schema, constraint_name, ordinal_position
+"""
             )
-            .where(
-                sql.and_(
-                    C.c.table_name == tablename,
-                    C.c.table_schema == owner,
-                    RR.c.constraint_schema == C.c.table_schema,
-                    C.c.constraint_name == RR.c.constraint_name,
-                    R.c.constraint_name == RR.c.unique_constraint_name,
-                    R.c.constraint_schema == RR.c.unique_constraint_schema,
-                    C.c.ordinal_position == R.c.ordinal_position,
-                )
+            .bindparams(
+                sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
+                sql.bindparam("owner", owner, ischema.CoerceUnicode()),
+            )
+            .columns(
+                constraint_schema=sqltypes.Unicode(),
+                constraint_name=sqltypes.Unicode(),
+                table_schema=sqltypes.Unicode(),
+                table_name=sqltypes.Unicode(),
+                constrained_column=sqltypes.Unicode(),
+                referred_table_schema=sqltypes.Unicode(),
+                referred_table_name=sqltypes.Unicode(),
+                referred_column=sqltypes.Unicode(),
             )
-            .order_by(RR.c.constraint_name, R.c.ordinal_position)
         )
 
         # group rows by constraint ID, to handle multi-column FKs
@@ -3303,15 +3392,37 @@ class MSDialect(default.DefaultDialect):
                 "referred_schema": None,
                 "referred_table": None,
                 "referred_columns": [],
+                "options": {},
             }
 
         fkeys = util.defaultdict(fkey_rec)
 
         for r in connection.execute(s).fetchall():
-            scol, rschema, rtbl, rcol, rfknm, fkmatch, fkuprule, fkdelrule = r
+            (
+                _,  # constraint schema
+                rfknm,
+                _,  # ordinal position
+                scol,
+                rschema,
+                rtbl,
+                rcol,
+                # TODO: we support match=<keyword> for foreign keys so
+                # we can support this also, PG has match=FULL for example
+                # but this seems to not be a valid value for SQL Server
+                _,  # match rule
+                fkuprule,
+                fkdelrule,
+            ) = r
 
             rec = fkeys[rfknm]
             rec["name"] = rfknm
+
+            if fkuprule != "NO ACTION":
+                rec["options"]["onupdate"] = fkuprule
+
+            if fkdelrule != "NO ACTION":
+                rec["options"]["ondelete"] = fkdelrule
+
             if not rec["referred_table"]:
                 rec["referred_table"] = rtbl
                 if schema is not None or owner != rschema:
index 12bb46d913e5f631544e2334ba7e7deb7369b806..1fa301e282bd2a5d588a57465590e10f395ac214 100644 (file)
@@ -7,6 +7,7 @@ from sqlalchemy import DDL
 from sqlalchemy import event
 from sqlalchemy import exc
 from sqlalchemy import ForeignKey
+from sqlalchemy import ForeignKeyConstraint
 from sqlalchemy import Identity
 from sqlalchemy import Index
 from sqlalchemy import inspect
@@ -217,6 +218,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
                     "referred_schema": "test_schema",
                     "referred_table": "subject",
                     "referred_columns": ["id"],
+                    "options": {},
                 }
             ],
         )
@@ -384,6 +386,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
                     "referred_schema": referred_schema,
                     "name": "fkfoo",
                     "constrained_columns": ["foo_id"],
+                    "options": {},
                 }
             ],
         )
@@ -399,6 +402,56 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
         )
         eq_(m2.tables["%s.foo" % referred_schema].schema, referred_schema)
 
+    def test_fk_on_unique_index(self, metadata, connection):
+        # test for issue #7160
+        Table(
+            "uidx_parent",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("uidx_col1", Integer, nullable=False),
+            Column("uidx_col2", Integer, nullable=False),
+            Index(
+                "UIDX_composite",
+                "uidx_col1",
+                "uidx_col2",
+                unique=True,
+            ),
+        )
+
+        Table(
+            "uidx_child",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("parent_uidx_col1", Integer, nullable=False),
+            Column("parent_uidx_col2", Integer, nullable=False),
+            ForeignKeyConstraint(
+                ["parent_uidx_col1", "parent_uidx_col2"],
+                ["uidx_parent.uidx_col1", "uidx_parent.uidx_col2"],
+                name="FK_uidx_parent",
+            ),
+        )
+
+        metadata.create_all(connection)
+
+        inspector = inspect(connection)
+        fk_info = inspector.get_foreign_keys("uidx_child")
+        eq_(
+            fk_info,
+            [
+                {
+                    "referred_table": "uidx_parent",
+                    "referred_columns": ["uidx_col1", "uidx_col2"],
+                    "referred_schema": None,
+                    "name": "FK_uidx_parent",
+                    "constrained_columns": [
+                        "parent_uidx_col1",
+                        "parent_uidx_col2",
+                    ],
+                    "options": {},
+                }
+            ],
+        )
+
     def test_indexes_cols(self, metadata, connection):
 
         t1 = Table("t", metadata, Column("x", Integer), Column("y", Integer))
index 7efd6cbd50da4a289457656281ed4fba619cf416..d1dd7fc0e189f5a290bee235e499eb02bbd4b70a 100644 (file)
@@ -132,7 +132,9 @@ class DefaultRequirements(SuiteRequirements):
 
     @property
     def foreign_key_constraint_option_reflection_ondelete(self):
-        return only_on(["postgresql", "mysql", "mariadb", "sqlite", "oracle"])
+        return only_on(
+            ["postgresql", "mysql", "mariadb", "sqlite", "oracle", "mssql"]
+        )
 
     @property
     def fk_constraint_option_reflection_ondelete_restrict(self):
@@ -140,11 +142,11 @@ class DefaultRequirements(SuiteRequirements):
 
     @property
     def fk_constraint_option_reflection_ondelete_noaction(self):
-        return only_on(["postgresql", "mysql", "mariadb", "sqlite"])
+        return only_on(["postgresql", "mysql", "mariadb", "sqlite", "mssql"])
 
     @property
     def foreign_key_constraint_option_reflection_onupdate(self):
-        return only_on(["postgresql", "mysql", "mariadb", "sqlite"])
+        return only_on(["postgresql", "mysql", "mariadb", "sqlite", "mssql"])
 
     @property
     def fk_constraint_option_reflection_onupdate_restrict(self):