]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Reflect PK of referred table if referred columns not present
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 Aug 2019 17:03:49 +0000 (13:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 18 Aug 2019 20:34:48 +0000 (16:34 -0400)
Fixed bug where a FOREIGN KEY that was set up to refer to the parent table
by table name only without the column names would not correctly be
reflected as far as setting up the "referred columns", since SQLite's
PRAGMA does not report on these columns if they weren't given explicitly.
For some reason this was harcoded to assume the name of the local column,
which might work for some cases but is not correct. The new approach
reflects the primary key of the referred table and uses the constraint
columns list as the referred columns list, if the remote column(s) aren't
present in the reflected pragma directly.

Fixes: #4810
Change-Id: I7789f83d68845ae197a782080af8ec64a7bf48cc

doc/build/changelog/unreleased_13/4810.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/sqlite/base.py
test/dialect/test_sqlite.py

diff --git a/doc/build/changelog/unreleased_13/4810.rst b/doc/build/changelog/unreleased_13/4810.rst
new file mode 100644 (file)
index 0000000..b7a0dce
--- /dev/null
@@ -0,0 +1,14 @@
+.. change::
+    :tags: bug, sqlite, reflection
+    :tickets: 4810
+
+    Fixed bug where a FOREIGN KEY that was set up to refer to the parent table
+    by table name only without the column names would not correctly be
+    reflected as far as setting up the "referred columns", since SQLite's
+    PRAGMA does not report on these columns if they weren't given explicitly.
+    For some reason this was harcoded to assume the name of the local column,
+    which might work for some cases but is not correct. The new approach
+    reflects the primary key of the referred table and uses the constraint
+    columns list as the referred columns list, if the remote column(s) aren't
+    present in the reflected pragma directly.
+
index 78ce18ac6415e3a45d30a07f42c582661e6a60a9..7be0e06dc384507dd157b9bc0ccb37c75573c2c6 100644 (file)
@@ -1754,8 +1754,22 @@ class SQLiteDialect(default.DefaultDialect):
         for row in pragma_fks:
             (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
 
-            if rcol is None:
-                rcol = lcol
+            if not rcol:
+                # no referred column, which means it was not named in the
+                # original DDL.  The referred columns of the foreign key
+                # constraint are therefore the primary key of the referred
+                # table.
+                referred_pk = self.get_pk_constraint(
+                    connection, rtbl, schema=schema, **kw
+                )
+                # note that if table doesnt exist, we still get back a record,
+                # just it has no columns in it
+                referred_columns = referred_pk["constrained_columns"]
+            else:
+                # note we use this list only if this is the first column
+                # in the constraint.  for subsequent columns we ignore the
+                # list and append "rcol" if present.
+                referred_columns = []
 
             if self._broken_fk_pragma_quotes:
                 rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
@@ -1768,13 +1782,15 @@ class SQLiteDialect(default.DefaultDialect):
                     "constrained_columns": [],
                     "referred_schema": schema,
                     "referred_table": rtbl,
-                    "referred_columns": [],
+                    "referred_columns": referred_columns,
                     "options": {},
                 }
                 fks[numerical_id] = fk
 
             fk["constrained_columns"].append(lcol)
-            fk["referred_columns"].append(rcol)
+
+            if rcol:
+                fk["referred_columns"].append(rcol)
 
         def fk_sig(constrained_columns, referred_table, referred_columns):
             return (
index 1e894da5566e0ffbc6af239bf4f617fd63255194..b5a16206879677c932c5eae7bdfc5de5a49dd09f 100644 (file)
@@ -1770,10 +1770,45 @@ class ConstraintReflectionTest(fixtures.TestBase):
                 ")"
             )
 
+            conn.execute(
+                "CREATE TABLE implicit_referred (pk integer primary key)"
+            )
+            # single col foreign key with no referred column given,
+            # must assume primary key of referred table
+            conn.execute(
+                "CREATE TABLE implicit_referrer "
+                "(id integer REFERENCES implicit_referred)"
+            )
+
+            conn.execute(
+                "CREATE TABLE implicit_referred_comp "
+                "(pk1 integer, pk2 integer, primary key (pk1, pk2))"
+            )
+            # composite foreign key with no referred columns given,
+            # must assume primary key of referred table
+            conn.execute(
+                "CREATE TABLE implicit_referrer_comp "
+                "(id1 integer, id2 integer, foreign key(id1, id2) "
+                "REFERENCES implicit_referred_comp)"
+            )
+
+            # worst case - FK that refers to nonexistent table so we cant
+            # get pks.  requires FK pragma is turned off
+            conn.execute(
+                "CREATE TABLE implicit_referrer_comp_fake "
+                "(id1 integer, id2 integer, foreign key(id1, id2) "
+                "REFERENCES fake_table)"
+            )
+
     @classmethod
     def teardown_class(cls):
         with testing.db.begin() as conn:
             for name in [
+                "implicit_referrer_comp_fake",
+                "implicit_referrer",
+                "implicit_referred",
+                "implicit_referrer_comp",
+                "implicit_referred_comp",
                 "m",
                 "main.l",
                 "k",
@@ -1892,6 +1927,72 @@ class ConstraintReflectionTest(fixtures.TestBase):
             ],
         )
 
+    def test_foreign_key_implicit_parent(self):
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys("implicit_referrer")
+        eq_(
+            fks,
+            [
+                {
+                    "name": None,
+                    "constrained_columns": ["id"],
+                    "referred_schema": None,
+                    "referred_table": "implicit_referred",
+                    "referred_columns": ["pk"],
+                    "options": {},
+                }
+            ],
+        )
+
+    def test_foreign_key_composite_implicit_parent(self):
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys("implicit_referrer_comp")
+        eq_(
+            fks,
+            [
+                {
+                    "name": None,
+                    "constrained_columns": ["id1", "id2"],
+                    "referred_schema": None,
+                    "referred_table": "implicit_referred_comp",
+                    "referred_columns": ["pk1", "pk2"],
+                    "options": {},
+                }
+            ],
+        )
+
+    def test_foreign_key_implicit_missing_parent(self):
+        # test when the FK refers to a non-existent table and column names
+        # aren't given.   only sqlite allows this case to exist
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys("implicit_referrer_comp_fake")
+        # the referred table doesn't exist but the operation does not fail
+        eq_(
+            fks,
+            [
+                {
+                    "name": None,
+                    "constrained_columns": ["id1", "id2"],
+                    "referred_schema": None,
+                    "referred_table": "fake_table",
+                    "referred_columns": [],
+                    "options": {},
+                }
+            ],
+        )
+
+    def test_foreign_key_implicit_missing_parent_reflection(self):
+        # full Table reflection fails however, which is not a new behavior
+        m = MetaData()
+        assert_raises_message(
+            exc.NoSuchTableError,
+            "fake_table",
+            Table,
+            "implicit_referrer_comp_fake",
+            m,
+            autoload_with=testing.db,
+        )
+
     def test_unnamed_inline_foreign_key(self):
         inspector = Inspector(testing.db)
         fks = inspector.get_foreign_keys("e")