]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Enabled schema support on SQLite, added the temporary table namespace to table...
authorJason Kirtland <jek@discorporate.us>
Tue, 5 Feb 2008 23:31:14 +0000 (23:31 +0000)
committerJason Kirtland <jek@discorporate.us>
Tue, 5 Feb 2008 23:31:14 +0000 (23:31 +0000)
- TODO: add sqlite to the standard alternate schema tests. a little tricky, because unlike CREATE SCHEMA, an ATTACH DATABASE won't survive a pool dispose...

CHANGES
lib/sqlalchemy/databases/sqlite.py
test/dialect/sqlite.py

diff --git a/CHANGES b/CHANGES
index af9d1d7fb83da18c0517009391a82b3b6baa7b3a..c878bb9af29228f99d467c35cd52fcd986e58562 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -198,6 +198,14 @@ CHANGES
     - Warnings are now issued as type exceptions.SAWarning.
 
 - dialects
+    - Better support for schemas in SQLite (linked in by
+      ATTACH DATABASE ... AS name).  In some cases in the
+      past, schema names were ommitted from generated SQL
+      for SQLite.  This is no longer the case.
+
+    - table_names on SQLite now picks up temporary tables
+      as well.
+
     - Auto-detect an unspecified MySQL ANSI_QUOTES mode during
       reflection operations, support for changing the mode
       midstream.  Manual mode setting is still required if no
index 939a192289d965aa8c7f5944e79699bc8087422c..984b356e02b6f26e50847228bca7ac38b40199c2 100644 (file)
@@ -249,21 +249,53 @@ class SQLiteDialect(default.DefaultDialect):
         return isinstance(e, self.dbapi.ProgrammingError) and "Cannot operate on a closed database." in str(e)
 
     def table_names(self, connection, schema):
-        s = "SELECT name FROM sqlite_master WHERE type='table'"
-        return [row[0] for row in connection.execute(s)]
+        if schema is not None:
+            qschema = self.identifier_preparer.quote_identifier(schema)
+            master = '%s.sqlite_master' % qschema
+            s = ("SELECT name FROM %s "
+                 "WHERE type='table' ORDER BY name") % (master,)
+            rs = connection.execute(s)
+        else:
+            try:
+                s = ("SELECT name FROM "
+                     " (SELECT * FROM sqlite_master UNION ALL "
+                     "  SELECT * FROM sqlite_temp_master) "
+                     "WHERE type='table' ORDER BY name")
+                rs = connection.execute(s)
+            except exceptions.DBAPIError:
+                raise
+                s = ("SELECT name FROM sqlite_master "
+                     "WHERE type='table' ORDER BY name")
+                rs = connection.execute(s)
+
+        return [row[0] for row in rs]
 
     def has_table(self, connection, table_name, schema=None):
-        cursor = connection.execute("PRAGMA table_info(%s)" %
-           self.identifier_preparer.quote_identifier(table_name), {})
+        quote = self.identifier_preparer.quote_identifier
+        if schema is not None:
+            pragma = "PRAGMA %s." % quote(schema)
+        else:
+            pragma = "PRAGMA "
+        qtable = quote(table_name)
+        cursor = connection.execute("%stable_info(%s)" % (pragma, qtable))
         row = cursor.fetchone()
 
-        # consume remaining rows, to work around: http://www.sqlite.org/cvstrac/tktview?tn=1884
-        while cursor.fetchone() is not None:pass
+        # consume remaining rows, to work around
+        # http://www.sqlite.org/cvstrac/tktview?tn=1884
+        while cursor.fetchone() is not None:
+            pass
 
         return (row is not None)
 
     def reflecttable(self, connection, table, include_columns):
-        c = connection.execute("PRAGMA table_info(%s)" % self.identifier_preparer.format_table(table), {})
+        preparer = self.identifier_preparer
+        if table.schema is None:
+            pragma = "PRAGMA "
+        else:
+            pragma = "PRAGMA %s." % preparer.quote_identifier(table.schema)
+        qtable = preparer.format_table(table, False)
+
+        c = connection.execute("%stable_info(%s)" % (pragma, qtable))
         found_table = False
         while True:
             row = c.fetchone()
@@ -302,7 +334,7 @@ class SQLiteDialect(default.DefaultDialect):
         if not found_table:
             raise exceptions.NoSuchTableError(table.name)
 
-        c = connection.execute("PRAGMA foreign_key_list(%s)" % self.identifier_preparer.format_table(table), {})
+        c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))
         fks = {}
         while True:
             row = c.fetchone()
@@ -318,7 +350,6 @@ class SQLiteDialect(default.DefaultDialect):
                 fk = ([],[])
                 fks[constraint_name] = fk
 
-            #print "row! " + repr([key for key in row.keys()]), repr(row)
             # look up the table based on the given table's engine, not 'self',
             # since it could be a ProxyEngine
             remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection)
@@ -331,7 +362,7 @@ class SQLiteDialect(default.DefaultDialect):
         for name, value in fks.iteritems():
             table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1]))
         # check for UNIQUE indexes
-        c = connection.execute("PRAGMA index_list(%s)" % self.identifier_preparer.format_table(table), {})
+        c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
         unique_indexes = []
         while True:
             row = c.fetchone()
@@ -341,7 +372,7 @@ class SQLiteDialect(default.DefaultDialect):
                 unique_indexes.append(row[1])
         # loop thru unique indexes for one that includes the primary key
         for idx in unique_indexes:
-            c = connection.execute("PRAGMA index_info(" + idx + ")", {})
+            c = connection.execute("%sindex_info(%s)" % (pragma, idx))
             cols = []
             while True:
                 row = c.fetchone()
@@ -443,7 +474,7 @@ class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
         ])
 
     def __init__(self, dialect):
-        super(SQLiteIdentifierPreparer, self).__init__(dialect, omit_schema=True)
+        super(SQLiteIdentifierPreparer, self).__init__(dialect)
 
 dialect = SQLiteDialect
 dialect.poolclass = pool.SingletonThreadPool
index 5041cca89e3eab315170db4511ecdb36b9bbf7ad..70658330f88d79f320ff3e29fe010f554239d339 100644 (file)
@@ -155,6 +155,67 @@ class DialectTest(AssertMixin):
             testing.db.execute("drop table django_content_type")
 
 
+    def test_attached_as_schema(self):
+        cx = testing.db.connect()
+        try:
+            cx.execute('ATTACH DATABASE ":memory:" AS  alt_schema')
+            dialect = cx.dialect
+            assert dialect.table_names(cx, 'alt_schema') == []
+
+            meta = MetaData(cx)
+            Table('created', meta, Column('id', Integer),
+                  schema='alt_schema')
+            alt_master = Table('sqlite_master', meta, autoload=True,
+                               schema='alt_schema')
+            meta.create_all(cx)
+
+            self.assertEquals(dialect.table_names(cx, 'alt_schema'),
+                              ['created'])
+            assert len(alt_master.c) > 0
+
+            meta.clear()
+            reflected = Table('created', meta, autoload=True,
+                              schema='alt_schema')
+            assert len(reflected.c) == 1
+
+            cx.execute(reflected.insert(), dict(id=1))
+            r = cx.execute(reflected.select()).fetchall()
+            assert list(r) == [(1,)]
+
+            cx.execute(reflected.update(), dict(id=2))
+            r = cx.execute(reflected.select()).fetchall()
+            assert list(r) == [(2,)]
+
+            cx.execute(reflected.delete(reflected.c.id==2))
+            r = cx.execute(reflected.select()).fetchall()
+            assert list(r) == []
+
+            # note that sqlite_master is cleared, above
+            meta.drop_all()
+
+            assert dialect.table_names(cx, 'alt_schema') == []
+        finally:
+            cx.execute('DETACH DATABASE alt_schema')
+
+    @testing.exclude('sqlite', '<', (2, 6))
+    def test_temp_table_reflection(self):
+        cx = testing.db.connect()
+        try:
+            cx.execute('CREATE TEMPORARY TABLE tempy (id INT)')
+
+            assert 'tempy' in cx.dialect.table_names(cx, None)
+
+            meta = MetaData(cx)
+            tempy = Table('tempy', meta, autoload=True)
+            assert len(tempy.c) == 1
+            meta.drop_all()
+        except:
+            try:
+                cx.execute('DROP TABLE tempy')
+            except exceptions.DBAPIError:
+                pass
+            raise
+
 class InsertTest(AssertMixin):
     """Tests inserts and autoincrement."""