]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
refactored. tests/dialects/sqlite and tests/engine/reflection pass
authorRandall Smith <randall@tnr.cc>
Thu, 19 Feb 2009 05:40:35 +0000 (05:40 +0000)
committerRandall Smith <randall@tnr.cc>
Thu, 19 Feb 2009 05:40:35 +0000 (05:40 +0000)
lib/sqlalchemy/dialects/sqlite/base.py

index b8a1d7d71df2b9071574397dbaef471310ec5172..8b13b4dccab2722295517711be5b68d77711fb8d 100644 (file)
@@ -27,7 +27,7 @@ import datetime, re, time
 
 from sqlalchemy import sql, schema, exc, pool, DefaultClause
 from sqlalchemy.engine import default
-from sqlalchemy.engine import default
+from sqlalchemy.engine import reflection
 from sqlalchemy import types as sqltypes
 from sqlalchemy import util
 from sqlalchemy.sql import compiler, functions as sql_functions
@@ -236,6 +236,9 @@ class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
         'vacuum', 'values', 'view', 'virtual', 'when', 'where',
         ])
 
+class SQLiteInfoCache(reflection.DefaultInfoCache):
+    pass
+
 class SQLiteDialect(default.DefaultDialect):
     name = 'sqlite'
     supports_alter = False
@@ -251,6 +254,7 @@ class SQLiteDialect(default.DefaultDialect):
     preparer = SQLiteIdentifierPreparer
     ischema_names = ischema_names
     colspecs = colspecs
+    info_cache = SQLiteInfoCache
     
     def table_names(self, connection, schema):
         if schema is not None:
@@ -291,26 +295,24 @@ class SQLiteDialect(default.DefaultDialect):
 
         return (row is not None)
 
-    def reflecttable(self, connection, table, include_columns):
-        preparer = self.identifier_preparer
-        if table.schema is None:
-            pragma = "PRAGMA "
+    @reflection.caches
+    def get_columns(self, connection, tablename, schemaname=None,
+                                                        info_cache=None):
+        quote = self.identifier_preparer.quote_identifier
+        if schemaname is not None:
+            pragma = "PRAGMA %s." % quote(schemaname)
         else:
-            pragma = "PRAGMA %s." % preparer.quote_identifier(table.schema)
-        qtable = preparer.format_table(table, False)
-
+            pragma = "PRAGMA "
+        qtable = quote(tablename)
         c = connection.execute("%stable_info(%s)" % (pragma, qtable))
         found_table = False
+        columns = []
         while True:
             row = c.fetchone()
             if row is None:
                 break
-
-            found_table = True
             (name, type_, nullable, default, has_default, primary_key) = (row[1], row[2].upper(), not row[3], row[4], row[4] is not None, row[5])
             name = re.sub(r'^\"|\"$', '', name)
-            if include_columns and name not in include_columns:
-                continue
             match = re.match(r'(\w+)(\(.*?\))?', type_)
             if match:
                 coltype = match.group(1)
@@ -318,54 +320,77 @@ class SQLiteDialect(default.DefaultDialect):
             else:
                 coltype = "VARCHAR"
                 args = ''
-
             try:
                 coltype = self.ischema_names[coltype]
             except KeyError:
                 util.warn("Did not recognize type '%s' of column '%s'" %
                           (coltype, name))
                 coltype = sqltypes.NullType
-
             if args is not None:
                 args = re.findall(r'(\d+)', args)
                 coltype = coltype(*[int(a) for a in args])
-
             colargs = []
             if has_default:
                 colargs.append(DefaultClause(sql.text(default)))
-            table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
-
-        if not found_table:
-            raise exc.NoSuchTableError(table.name)
-
+            columns.append({
+                'name' : name,
+                'type' : coltype,
+                'nullable' : nullable,
+                'default' : default,
+                'colargs' : colargs,
+                'primary_key': primary_key
+            })
+        return columns
+
+    @reflection.caches
+    def get_foreign_keys(self, connection, tablename, schemaname=None,
+                                                        info_cache=None):
+        quote = self.identifier_preparer.quote_identifier
+        if schemaname is not None:
+            pragma = "PRAGMA %s." % quote(schemaname)
+        else:
+            pragma = "PRAGMA "
+        qtable = quote(tablename)
         c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))
+        fkeys = []
         fks = {}
         while True:
             row = c.fetchone()
             if row is None:
                 break
-            (constraint_name, tablename, localcol, remotecol) = (row[0], row[2], row[3], row[4])
-            tablename = re.sub(r'^\"|\"$', '', tablename)
-            localcol = re.sub(r'^\"|\"$', '', localcol)
-            remotecol = re.sub(r'^\"|\"$', '', remotecol)
+            (constraint_name, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
+            rtbl = re.sub(r'^\"|\"$', '', rtbl)
+            lcol = re.sub(r'^\"|\"$', '', lcol)
+            rcol = re.sub(r'^\"|\"$', '', rcol)
             try:
                 fk = fks[constraint_name]
             except KeyError:
-                fk = ([], [])
+                fk = {
+                    'name' : constraint_name,
+                    'constrained_columns' : [],
+                    'referred_schema' : None,
+                    'referred_table' : rtbl,
+                    'referred_columns' : []
+                }
+                fkeys.append(fk)
                 fks[constraint_name] = fk
 
             # look up the table based on the given table's engine, not 'self',
             # since it could be a ProxyEngine
-            remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection)
-            constrained_column = table.c[localcol].name
-            refspec = ".".join([tablename, remotecol])
-            if constrained_column not in fk[0]:
-                fk[0].append(constrained_column)
-            if refspec not in fk[1]:
-                fk[1].append(refspec)
-        for name, value in fks.iteritems():
-            table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], link_to_name=True))
-        # check for UNIQUE indexes
+            if lcol not in fk['constrained_columns']:
+                fk['constrained_columns'].append(lcol)
+            if rcol not in fk['referred_columns']:
+                fk['referred_columns'].append(rcol)
+        return fkeys
+
+    def get_unique_indexes(self, connection, tablename, schemaname=None,
+                                                            info_cache=None):
+        quote = self.identifier_preparer.quote_identifier
+        if schemaname is not None:
+            pragma = "PRAGMA %s." % quote(schemaname)
+        else:
+            pragma = "PRAGMA "
+        qtable = quote(tablename)
         c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
         unique_indexes = []
         while True:
@@ -383,4 +408,43 @@ class SQLiteDialect(default.DefaultDialect):
                 if row is None:
                     break
                 cols.append(row[2])
+        return unique_indexes
+
+    def reflecttable(self, connection, table, include_columns):
+        preparer = self.identifier_preparer
+        tablename = table.name
+        schemaname = table.schema
+        found_table = False
+        info_cache = SQLiteInfoCache()
+
+        # columns
+        for column in self.get_columns(connection, tablename, schemaname,
+                                                                info_cache):
+            name = column['name']
+            coltype = column['type']
+            nullable = column['nullable']
+            default = column['default']
+            colargs = column['colargs']
+            primary_key = column['primary_key']
+            found_table = True
+            if include_columns and name not in include_columns:
+                continue
+            table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
+        if not found_table:
+            raise exc.NoSuchTableError(table.name)
+
+        # foreign keys
+        for fkey_d in self.get_foreign_keys(connection, tablename, schemaname,
+                                                                   info_cache):
 
+            rtbl = fkey_d['referred_table']
+            rcols = fkey_d['referred_columns']
+            lcols = fkey_d['constrained_columns']
+            # look up the table based on the given table's engine, not 'self',
+            # since it could be a ProxyEngine
+            remotetable = schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection)
+            refspecs = ["%s.%s" % (rtbl, rcol) for rcol in rcols]
+            table.append_constraint(schema.ForeignKeyConstraint(lcols, refspecs, link_to_name=True))
+        # this doesn't do anything ???
+        unique_indexes = self.get_unique_indexes(connection, tablename, 
+                                                 schemaname, info_cache)