--- /dev/null
+.. change:
+ :tags: bug, sqlite
+ :tickets: 4793
+
+ Fixed bug where usage of "PRAGMA table_info" in SQLite dialect meant that
+ reflection features to detect for table existence, list of table columns,
+ and list of foreign keys, would default to any table in any attached
+ database, when no schema name was given and the table did not exist in the
+ base schema. The fix explicitly runs PRAGMA for the 'main' schema and then
+ the 'temp' schema if the 'main' returned no rows, to maintain the behavior
+ of tables + temp tables in the "no schema" namespace, attached tables only
+ in the "schema" namespace.
+
>>> Base.metadata.create_all(engine)
SELECT ...
- PRAGMA table_info("users")
+ PRAGMA main.table_info("users")
+ ()
+ PRAGMA temp.table_info("users")
()
CREATE TABLE users (
id INTEGER NOT NULL, name VARCHAR,
def _get_table_pragma(self, connection, pragma, table_name, schema=None):
quote = self.identifier_preparer.quote_identifier
if schema is not None:
- statement = "PRAGMA %s." % quote(schema)
+ statements = ["PRAGMA %s." % quote(schema)]
else:
- statement = "PRAGMA "
+ # because PRAGMA looks in all attached databases if no schema
+ # given, need to specify "main" schema, however since we want
+ # 'temp' tables in the same namespace as 'main', need to run
+ # the PRAGMA twice
+ statements = ["PRAGMA main.", "PRAGMA temp."]
+
qtable = quote(table_name)
- statement = "%s%s(%s)" % (statement, pragma, qtable)
- cursor = connection.execute(statement)
- if not cursor._soft_closed:
- # work around SQLite issue whereby cursor.description
- # is blank when PRAGMA returns no rows:
- # http://www.sqlite.org/cvstrac/tktview?tn=1884
- result = cursor.fetchall()
+ for statement in statements:
+ statement = "%s%s(%s)" % (statement, pragma, qtable)
+ cursor = connection.execute(statement)
+ if not cursor._soft_closed:
+ # work around SQLite issue whereby cursor.description
+ # is blank when PRAGMA returns no rows:
+ # http://www.sqlite.org/cvstrac/tktview?tn=1884
+ result = cursor.fetchall()
+ else:
+ result = []
+ if result:
+ return result
else:
- result = []
- return result
+ return []
from ...schema import DDL
from ...schema import Index
from ...sql.elements import quoted_name
+from ...testing import is_false
+from ...testing import is_true
metadata, users = None, None
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
+ if testing.requires.schemas.enabled:
+ Table(
+ "test_table_s",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(50)),
+ schema=config.test_schema,
+ )
def test_has_table(self):
with config.db.begin() as conn:
- assert config.db.dialect.has_table(conn, "test_table")
- assert not config.db.dialect.has_table(conn, "nonexistent_table")
+ is_true(config.db.dialect.has_table(conn, "test_table"))
+ is_false(config.db.dialect.has_table(conn, "test_table_s"))
+ is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
+
+ @testing.requires.schemas
+ def test_has_table_schema(self):
+ with config.db.begin() as conn:
+ is_false(
+ config.db.dialect.has_table(
+ conn, "test_table", schema=config.test_schema
+ )
+ )
+ is_true(
+ config.db.dialect.has_table(
+ conn, "test_table_s", schema=config.test_schema
+ )
+ )
+ is_false(
+ config.db.dialect.has_table(
+ conn, "nonexistent_table", schema=config.test_schema
+ )
+ )
class ComponentReflectionTest(fixtures.TablesTest):
def _fixture(self):
meta = self.metadata
self.conn = testing.db.connect()
+ Table("created", meta, Column("foo", Integer), Column("bar", String))
+ Table("local_only", meta, Column("q", Integer), Column("p", Integer))
+
ct = Table(
"created",
meta,
schema="test_schema",
)
+ Table(
+ "another_created",
+ meta,
+ Column("bat", Integer),
+ Column("hoho", String),
+ schema="test_schema",
+ )
+
meta.create_all(self.conn)
return ct
insp = inspect(self.conn)
eq_(insp.get_table_names("test_schema"), [])
+ def test_column_names(self):
+ self._fixture()
+ insp = inspect(self.conn)
+ eq_(
+ [
+ d["name"]
+ for d in insp.get_columns("created", schema="test_schema")
+ ],
+ ["id", "name"],
+ )
+ eq_(
+ [d["name"] for d in insp.get_columns("created", schema=None)],
+ ["foo", "bar"],
+ )
+
+ eq_(
+ [
+ d["name"]
+ for d in insp.get_columns("nonexistent", schema="test_schema")
+ ],
+ [],
+ )
+ eq_(
+ [
+ d["name"]
+ for d in insp.get_columns("another_created", schema=None)
+ ],
+ [],
+ )
+ eq_(
+ [
+ d["name"]
+ for d in insp.get_columns("local_only", schema="test_schema")
+ ],
+ [],
+ )
+ eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"])
+
def test_table_names_present(self):
self._fixture()
insp = inspect(self.conn)
- eq_(insp.get_table_names("test_schema"), ["created"])
+ eq_(
+ set(insp.get_table_names("test_schema")),
+ {"created", "another_created"},
+ )
def test_table_names_system(self):
self._fixture()
insp = inspect(self.conn)
- eq_(insp.get_table_names("test_schema"), ["created"])
+ eq_(
+ set(insp.get_table_names("test_schema")),
+ {"created", "another_created"},
+ )
def test_schema_names(self):
self._fixture()