]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Always include a schema name in SQLite PRAGMA
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 2 Aug 2019 17:03:29 +0000 (13:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 2 Aug 2019 21:41:24 +0000 (17:41 -0400)
Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that
reflection features to detect for table existence, list of table columns,
and list of foreign keys, would default to any table in any attached
database, when no schema name was given and the table did not exist in the
base schema.  The fix explicitly runs PRAGMA for the 'main' schema and then
the 'temp' schema if the 'main' returned no rows, to maintain the behavior
of tables + temp tables in the "no schema" namespace, attached tables only
in the "schema" namespace.

Fixes: #4793
Change-Id: I75bc03ef42581c46b98987510d2d2e701df07412

doc/build/changelog/unreleased_13/4793.rst [new file with mode: 0644]
doc/build/orm/tutorial.rst
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/testing/suite/test_reflection.py
test/dialect/test_sqlite.py

diff --git a/doc/build/changelog/unreleased_13/4793.rst b/doc/build/changelog/unreleased_13/4793.rst
new file mode 100644 (file)
index 0000000..2227ba1
--- /dev/null
@@ -0,0 +1,13 @@
+.. change:
+    :tags: bug, sqlite
+    :tickets: 4793
+
+    Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that
+    reflection features to detect for table existence, list of table columns,
+    and list of foreign keys, would default to any table in any attached
+    database, when no schema name was given and the table did not exist in the
+    base schema.  The fix explicitly runs PRAGMA for the 'main' schema and then
+    the 'temp' schema if the 'main' returned no rows, to maintain the behavior
+    of tables + temp tables in the "no schema" namespace, attached tables only
+    in the "schema" namespace.
+
index 21b70af089d639e0adb07fcfbbc462da64662504..1132c96fc893648276db42e59361aa8e572984a4 100644 (file)
@@ -210,7 +210,9 @@ the actual ``CREATE TABLE`` statement:
 
     >>> Base.metadata.create_all(engine)
     SELECT ...
-    PRAGMA table_info("users")
+    PRAGMA main.table_info("users")
+    ()
+    PRAGMA temp.table_info("users")
     ()
     CREATE TABLE users (
         id INTEGER NOT NULL, name VARCHAR,
index c9309cbad57faf79ed7bd9bc31f6cb3451339d4a..ef8507d0522bc933e3ffb3a1cbd0b8a9ffd23579 100644 (file)
@@ -2003,17 +2003,26 @@ class SQLiteDialect(default.DefaultDialect):
     def _get_table_pragma(self, connection, pragma, table_name, schema=None):
         quote = self.identifier_preparer.quote_identifier
         if schema is not None:
-            statement = "PRAGMA %s." % quote(schema)
+            statements = ["PRAGMA %s." % quote(schema)]
         else:
-            statement = "PRAGMA "
+            # because PRAGMA looks in all attached databases if no schema
+            # given, need to specify "main" schema, however since we want
+            # 'temp' tables in the same namespace as 'main', need to run
+            # the PRAGMA twice
+            statements = ["PRAGMA main.", "PRAGMA temp."]
+
         qtable = quote(table_name)
-        statement = "%s%s(%s)" % (statement, pragma, qtable)
-        cursor = connection.execute(statement)
-        if not cursor._soft_closed:
-            # work around SQLite issue whereby cursor.description
-            # is blank when PRAGMA returns no rows:
-            # http://www.sqlite.org/cvstrac/tktview?tn=1884
-            result = cursor.fetchall()
+        for statement in statements:
+            statement = "%s%s(%s)" % (statement, pragma, qtable)
+            cursor = connection.execute(statement)
+            if not cursor._soft_closed:
+                # work around SQLite issue whereby cursor.description
+                # is blank when PRAGMA returns no rows:
+                # http://www.sqlite.org/cvstrac/tktview?tn=1884
+                result = cursor.fetchall()
+            else:
+                result = []
+            if result:
+                return result
         else:
-            result = []
-        return result
+            return []
index 53e1599b3533e5905ed7a0c39eda057b27667cbf..9ca13ec4e58527880bd563debade1ae995aea745 100644 (file)
@@ -24,6 +24,8 @@ from ...engine.reflection import Inspector
 from ...schema import DDL
 from ...schema import Index
 from ...sql.elements import quoted_name
+from ...testing import is_false
+from ...testing import is_true
 
 
 metadata, users = None, None
@@ -40,11 +42,39 @@ class HasTableTest(fixtures.TablesTest):
             Column("id", Integer, primary_key=True),
             Column("data", String(50)),
         )
+        if testing.requires.schemas.enabled:
+            Table(
+                "test_table_s",
+                metadata,
+                Column("id", Integer, primary_key=True),
+                Column("data", String(50)),
+                schema=config.test_schema,
+            )
 
     def test_has_table(self):
         with config.db.begin() as conn:
-            assert config.db.dialect.has_table(conn, "test_table")
-            assert not config.db.dialect.has_table(conn, "nonexistent_table")
+            is_true(config.db.dialect.has_table(conn, "test_table"))
+            is_false(config.db.dialect.has_table(conn, "test_table_s"))
+            is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
+
+    @testing.requires.schemas
+    def test_has_table_schema(self):
+        with config.db.begin() as conn:
+            is_false(
+                config.db.dialect.has_table(
+                    conn, "test_table", schema=config.test_schema
+                )
+            )
+            is_true(
+                config.db.dialect.has_table(
+                    conn, "test_table_s", schema=config.test_schema
+                )
+            )
+            is_false(
+                config.db.dialect.has_table(
+                    conn, "nonexistent_table", schema=config.test_schema
+                )
+            )
 
 
 class ComponentReflectionTest(fixtures.TablesTest):
index e2004069d17abde87d6f01c08247d66190b3e72f..e727005103dfa3d138c4ad128b02999e880b6b91 100644 (file)
@@ -694,6 +694,9 @@ class AttachedDBTest(fixtures.TestBase):
     def _fixture(self):
         meta = self.metadata
         self.conn = testing.db.connect()
+        Table("created", meta, Column("foo", Integer), Column("bar", String))
+        Table("local_only", meta, Column("q", Integer), Column("p", Integer))
+
         ct = Table(
             "created",
             meta,
@@ -702,6 +705,14 @@ class AttachedDBTest(fixtures.TestBase):
             schema="test_schema",
         )
 
+        Table(
+            "another_created",
+            meta,
+            Column("bat", Integer),
+            Column("hoho", String),
+            schema="test_schema",
+        )
+
         meta.create_all(self.conn)
         return ct
 
@@ -717,15 +728,59 @@ class AttachedDBTest(fixtures.TestBase):
         insp = inspect(self.conn)
         eq_(insp.get_table_names("test_schema"), [])
 
+    def test_column_names(self):
+        self._fixture()
+        insp = inspect(self.conn)
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("created", schema="test_schema")
+            ],
+            ["id", "name"],
+        )
+        eq_(
+            [d["name"] for d in insp.get_columns("created", schema=None)],
+            ["foo", "bar"],
+        )
+
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("nonexistent", schema="test_schema")
+            ],
+            [],
+        )
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("another_created", schema=None)
+            ],
+            [],
+        )
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("local_only", schema="test_schema")
+            ],
+            [],
+        )
+        eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"])
+
     def test_table_names_present(self):
         self._fixture()
         insp = inspect(self.conn)
-        eq_(insp.get_table_names("test_schema"), ["created"])
+        eq_(
+            set(insp.get_table_names("test_schema")),
+            {"created", "another_created"},
+        )
 
     def test_table_names_system(self):
         self._fixture()
         insp = inspect(self.conn)
-        eq_(insp.get_table_names("test_schema"), ["created"])
+        eq_(
+            set(insp.get_table_names("test_schema")),
+            {"created", "another_created"},
+        )
 
     def test_schema_names(self):
         self._fixture()