]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Reflect PK of referred table if referred columns not present
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 Aug 2019 17:03:49 +0000 (13:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 Aug 2019 20:35:07 +0000 (16:35 -0400)
Fixed bug where a FOREIGN KEY that was set up to refer to the parent table
by table name only without the column names would not correctly be
reflected as far as setting up the "referred columns", since SQLite's
PRAGMA does not report on these columns if they weren't given explicitly.
For some reason this was harcoded to assume the name of the local column,
which might work for some cases but is not correct. The new approach
reflects the primary key of the referred table and uses the constraint
columns list as the referred columns list, if the remote column(s) aren't
present in the reflected pragma directly.

Fixes: #4810
Change-Id: I7789f83d68845ae197a782080af8ec64a7bf48cc
(cherry picked from commit f06c6ba67303e5c75d8ad044494193d8f97b2e2e)

doc/build/changelog/unreleased_13/4810.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/sqlite/base.py
test/dialect/test_sqlite.py

diff --git a/doc/build/changelog/unreleased_13/4810.rst b/doc/build/changelog/unreleased_13/4810.rst
new file mode 100644 (file)
index 0000000..b7a0dce
--- /dev/null
@@ -0,0 +1,14 @@
+.. change::
+    :tags: bug, sqlite, reflection
+    :tickets: 4810
+
+    Fixed bug where a FOREIGN KEY that was set up to refer to the parent table
+    by table name only without the column names would not correctly be
+    reflected as far as setting up the "referred columns", since SQLite's
+    PRAGMA does not report on these columns if they weren't given explicitly.
+    For some reason this was harcoded to assume the name of the local column,
+    which might work for some cases but is not correct. The new approach
+    reflects the primary key of the referred table and uses the constraint
+    columns list as the referred columns list, if the remote column(s) aren't
+    present in the reflected pragma directly.
+
index 9a0455a9d0cd62b47ee70028d94c878974155e42..6848c0e64cf69cec9a532ca7d1969d5a80baa53c 100644 (file)
@@ -1754,8 +1754,22 @@ class SQLiteDialect(default.DefaultDialect):
         for row in pragma_fks:
             (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
 
-            if rcol is None:
-                rcol = lcol
+            if not rcol:
+                # no referred column, which means it was not named in the
+                # original DDL.  The referred columns of the foreign key
+                # constraint are therefore the primary key of the referred
+                # table.
+                referred_pk = self.get_pk_constraint(
+                    connection, rtbl, schema=schema, **kw
+                )
+                # note that if table doesnt exist, we still get back a record,
+                # just it has no columns in it
+                referred_columns = referred_pk["constrained_columns"]
+            else:
+                # note we use this list only if this is the first column
+                # in the constraint.  for subsequent columns we ignore the
+                # list and append "rcol" if present.
+                referred_columns = []
 
             if self._broken_fk_pragma_quotes:
                 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
@@ -1768,13 +1782,15 @@ class SQLiteDialect(default.DefaultDialect):
                     "constrained_columns": [],
                     "referred_schema": schema,
                     "referred_table": rtbl,
-                    "referred_columns": [],
+                    "referred_columns": referred_columns,
                     "options": {},
                 }
                 fks[numerical_id] = fk
 
             fk["constrained_columns"].append(lcol)
-            fk["referred_columns"].append(rcol)
+
+            if rcol:
+                fk["referred_columns"].append(rcol)
 
         def fk_sig(constrained_columns, referred_table, referred_columns):
             return (
index 755fd6c296ff2b89f58d047028e5172ab73059a3..162282a6475df59bfa5f2d9ae3d11fb4c73e8288 100644 (file)
@@ -1770,10 +1770,45 @@ class ConstraintReflectionTest(fixtures.TestBase):
                 ")"
             )
 
+            conn.execute(
+                "CREATE TABLE implicit_referred (pk integer primary key)"
+            )
+            # single col foreign key with no referred column given,
+            # must assume primary key of referred table
+            conn.execute(
+                "CREATE TABLE implicit_referrer "
+                "(id integer REFERENCES implicit_referred)"
+            )
+
+            conn.execute(
+                "CREATE TABLE implicit_referred_comp "
+                "(pk1 integer, pk2 integer, primary key (pk1, pk2))"
+            )
+            # composite foreign key with no referred columns given,
+            # must assume primary key of referred table
+            conn.execute(
+                "CREATE TABLE implicit_referrer_comp "
+                "(id1 integer, id2 integer, foreign key(id1, id2) "
+                "REFERENCES implicit_referred_comp)"
+            )
+
+            # worst case - FK that refers to nonexistent table so we cant
+            # get pks.  requires FK pragma is turned off
+            conn.execute(
+                "CREATE TABLE implicit_referrer_comp_fake "
+                "(id1 integer, id2 integer, foreign key(id1, id2) "
+                "REFERENCES fake_table)"
+            )
+
     @classmethod
     def teardown_class(cls):
         with testing.db.begin() as conn:
             for name in [
+                "implicit_referrer_comp_fake",
+                "implicit_referrer",
+                "implicit_referred",
+                "implicit_referrer_comp",
+                "implicit_referred_comp",
                 "m",
                 "main.l",
                 "k",
@@ -1892,6 +1927,72 @@ class ConstraintReflectionTest(fixtures.TestBase):
             ],
         )
 
+    def test_foreign_key_implicit_parent(self):
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys("implicit_referrer")
+        eq_(
+            fks,
+            [
+                {
+                    "name": None,
+                    "constrained_columns": ["id"],
+                    "referred_schema": None,
+                    "referred_table": "implicit_referred",
+                    "referred_columns": ["pk"],
+                    "options": {},
+                }
+            ],
+        )
+
+    def test_foreign_key_composite_implicit_parent(self):
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys("implicit_referrer_comp")
+        eq_(
+            fks,
+            [
+                {
+                    "name": None,
+                    "constrained_columns": ["id1", "id2"],
+                    "referred_schema": None,
+                    "referred_table": "implicit_referred_comp",
+                    "referred_columns": ["pk1", "pk2"],
+                    "options": {},
+                }
+            ],
+        )
+
+    def test_foreign_key_implicit_missing_parent(self):
+        # test when the FK refers to a non-existent table and column names
+        # aren't given.   only sqlite allows this case to exist
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys("implicit_referrer_comp_fake")
+        # the referred table doesn't exist but the operation does not fail
+        eq_(
+            fks,
+            [
+                {
+                    "name": None,
+                    "constrained_columns": ["id1", "id2"],
+                    "referred_schema": None,
+                    "referred_table": "fake_table",
+                    "referred_columns": [],
+                    "options": {},
+                }
+            ],
+        )
+
+    def test_foreign_key_implicit_missing_parent_reflection(self):
+        # full Table reflection fails however, which is not a new behavior
+        m = MetaData()
+        assert_raises_message(
+            exc.NoSuchTableError,
+            "fake_table",
+            Table,
+            "implicit_referrer_comp_fake",
+            m,
+            autoload_with=testing.db,
+        )
+
     def test_unnamed_inline_foreign_key(self):
         inspector = Inspector(testing.db)
         fks = inspector.get_foreign_keys("e")