]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Always include a schema name in SQLite PRAGMA
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 2 Aug 2019 17:03:29 +0000 (13:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 2 Aug 2019 21:41:47 +0000 (17:41 -0400)
Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that
reflection features to detect for table existence, list of table columns,
and list of foreign keys, would default to any table in any attached
database, when no schema name was given and the table did not exist in the
base schema.  The fix explicitly runs PRAGMA for the 'main' schema and then
the 'temp' schema if the 'main' returned no rows, to maintain the behavior
of tables + temp tables in the "no schema" namespace, attached tables only
in the "schema" namespace.

Fixes: #4793
Change-Id: I75bc03ef42581c46b98987510d2d2e701df07412
(cherry picked from commit e091775f1c4c817093e9a936a3abc79b5e311f93)

doc/build/changelog/unreleased_13/4793.rst [new file with mode: 0644]
doc/build/orm/tutorial.rst
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/testing/suite/test_reflection.py
test/dialect/test_sqlite.py

diff --git a/doc/build/changelog/unreleased_13/4793.rst b/doc/build/changelog/unreleased_13/4793.rst
new file mode 100644 (file)
index 0000000..2227ba1
--- /dev/null
@@ -0,0 +1,13 @@
+.. change:
+    :tags: bug, sqlite
+    :tickets: 4793
+
+    Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that
+    reflection features to detect for table existence, list of table columns,
+    and list of foreign keys, would default to any table in any attached
+    database, when no schema name was given and the table did not exist in the
+    base schema.  The fix explicitly runs PRAGMA for the 'main' schema and then
+    the 'temp' schema if the 'main' returned no rows, to maintain the behavior
+    of tables + temp tables in the "no schema" namespace, attached tables only
+    in the "schema" namespace.
+
index 283125c823e33371401415a66922c16eb01e4576..686b774b97a80db8b5f6f455bfe81fd5796f76e7 100644 (file)
@@ -210,7 +210,9 @@ the actual ``CREATE TABLE`` statement:
 
     >>> Base.metadata.create_all(engine)
     SELECT ...
-    PRAGMA table_info("users")
+    PRAGMA main.table_info("users")
+    ()
+    PRAGMA temp.table_info("users")
     ()
     CREATE TABLE users (
         id INTEGER NOT NULL, name VARCHAR,
index f8bf2c620867618b8609fd2b5673d1d7991516c3..4ea194c750c9c1947973e2f1d54b3a2da322aead 100644 (file)
@@ -1999,17 +1999,26 @@ class SQLiteDialect(default.DefaultDialect):
     def _get_table_pragma(self, connection, pragma, table_name, schema=None):
         quote = self.identifier_preparer.quote_identifier
         if schema is not None:
-            statement = "PRAGMA %s." % quote(schema)
+            statements = ["PRAGMA %s." % quote(schema)]
         else:
-            statement = "PRAGMA "
+            # because PRAGMA looks in all attached databases if no schema
+            # given, need to specify "main" schema, however since we want
+            # 'temp' tables in the same namespace as 'main', need to run
+            # the PRAGMA twice
+            statements = ["PRAGMA main.", "PRAGMA temp."]
+
         qtable = quote(table_name)
-        statement = "%s%s(%s)" % (statement, pragma, qtable)
-        cursor = connection.execute(statement)
-        if not cursor._soft_closed:
-            # work around SQLite issue whereby cursor.description
-            # is blank when PRAGMA returns no rows:
-            # http://www.sqlite.org/cvstrac/tktview?tn=1884
-            result = cursor.fetchall()
+        for statement in statements:
+            statement = "%s%s(%s)" % (statement, pragma, qtable)
+            cursor = connection.execute(statement)
+            if not cursor._soft_closed:
+                # work around SQLite issue whereby cursor.description
+                # is blank when PRAGMA returns no rows:
+                # http://www.sqlite.org/cvstrac/tktview?tn=1884
+                result = cursor.fetchall()
+            else:
+                result = []
+            if result:
+                return result
         else:
-            result = []
-        return result
+            return []
index e529a0753c7d1be422b509f4bc9a63b33d1d8145..a1297a5804747645794c6f190c8e6c9f8ce8dc10 100644 (file)
@@ -24,6 +24,8 @@ from ...engine.reflection import Inspector
 from ...schema import DDL
 from ...schema import Index
 from ...sql.elements import quoted_name
+from ...testing import is_false
+from ...testing import is_true
 
 
 metadata, users = None, None
@@ -40,11 +42,39 @@ class HasTableTest(fixtures.TablesTest):
             Column("id", Integer, primary_key=True),
             Column("data", String(50)),
         )
+        if testing.requires.schemas.enabled:
+            Table(
+                "test_table_s",
+                metadata,
+                Column("id", Integer, primary_key=True),
+                Column("data", String(50)),
+                schema=config.test_schema,
+            )
 
     def test_has_table(self):
         with config.db.begin() as conn:
-            assert config.db.dialect.has_table(conn, "test_table")
-            assert not config.db.dialect.has_table(conn, "nonexistent_table")
+            is_true(config.db.dialect.has_table(conn, "test_table"))
+            is_false(config.db.dialect.has_table(conn, "test_table_s"))
+            is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
+
+    @testing.requires.schemas
+    def test_has_table_schema(self):
+        with config.db.begin() as conn:
+            is_false(
+                config.db.dialect.has_table(
+                    conn, "test_table", schema=config.test_schema
+                )
+            )
+            is_true(
+                config.db.dialect.has_table(
+                    conn, "test_table_s", schema=config.test_schema
+                )
+            )
+            is_false(
+                config.db.dialect.has_table(
+                    conn, "nonexistent_table", schema=config.test_schema
+                )
+            )
 
 
 class ComponentReflectionTest(fixtures.TablesTest):
index c9332b7fac31af3762304bc335c83ce88b1705f5..bd24878ae93c66d06556bd95e0c693b2b0f4ffc3 100644 (file)
@@ -694,6 +694,9 @@ class AttachedDBTest(fixtures.TestBase):
     def _fixture(self):
         meta = self.metadata
         self.conn = testing.db.connect()
+        Table("created", meta, Column("foo", Integer), Column("bar", String))
+        Table("local_only", meta, Column("q", Integer), Column("p", Integer))
+
         ct = Table(
             "created",
             meta,
@@ -702,6 +705,14 @@ class AttachedDBTest(fixtures.TestBase):
             schema="test_schema",
         )
 
+        Table(
+            "another_created",
+            meta,
+            Column("bat", Integer),
+            Column("hoho", String),
+            schema="test_schema",
+        )
+
         meta.create_all(self.conn)
         return ct
 
@@ -717,15 +728,59 @@ class AttachedDBTest(fixtures.TestBase):
         insp = inspect(self.conn)
         eq_(insp.get_table_names("test_schema"), [])
 
+    def test_column_names(self):
+        self._fixture()
+        insp = inspect(self.conn)
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("created", schema="test_schema")
+            ],
+            ["id", "name"],
+        )
+        eq_(
+            [d["name"] for d in insp.get_columns("created", schema=None)],
+            ["foo", "bar"],
+        )
+
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("nonexistent", schema="test_schema")
+            ],
+            [],
+        )
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("another_created", schema=None)
+            ],
+            [],
+        )
+        eq_(
+            [
+                d["name"]
+                for d in insp.get_columns("local_only", schema="test_schema")
+            ],
+            [],
+        )
+        eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"])
+
     def test_table_names_present(self):
         self._fixture()
         insp = inspect(self.conn)
-        eq_(insp.get_table_names("test_schema"), ["created"])
+        eq_(
+            set(insp.get_table_names("test_schema")),
+            {"created", "another_created"},
+        )
 
     def test_table_names_system(self):
         self._fixture()
         insp = inspect(self.conn)
-        eq_(insp.get_table_names("test_schema"), ["created"])
+        eq_(
+            set(insp.get_table_names("test_schema")),
+            {"created", "another_created"},
+        )
 
     def test_schema_names(self):
         self._fixture()