from sqlalchemy import sql, schema, exc, pool, DefaultClause
from sqlalchemy.engine import default
-from sqlalchemy.engine import default
+from sqlalchemy.engine import reflection
from sqlalchemy import types as sqltypes
from sqlalchemy import util
from sqlalchemy.sql import compiler, functions as sql_functions
'vacuum', 'values', 'view', 'virtual', 'when', 'where',
])
+class SQLiteInfoCache(reflection.DefaultInfoCache):
+ pass
+
class SQLiteDialect(default.DefaultDialect):
name = 'sqlite'
supports_alter = False
preparer = SQLiteIdentifierPreparer
ischema_names = ischema_names
colspecs = colspecs
+ info_cache = SQLiteInfoCache
def table_names(self, connection, schema):
if schema is not None:
return (row is not None)
- def reflecttable(self, connection, table, include_columns):
- preparer = self.identifier_preparer
- if table.schema is None:
- pragma = "PRAGMA "
+ @reflection.caches
+ def get_columns(self, connection, tablename, schemaname=None,
+ info_cache=None):
+ quote = self.identifier_preparer.quote_identifier
+ if schemaname is not None:
+ pragma = "PRAGMA %s." % quote(schemaname)
else:
- pragma = "PRAGMA %s." % preparer.quote_identifier(table.schema)
- qtable = preparer.format_table(table, False)
-
+ pragma = "PRAGMA "
+ qtable = quote(tablename)
c = connection.execute("%stable_info(%s)" % (pragma, qtable))
found_table = False
+ columns = []
while True:
row = c.fetchone()
if row is None:
break
-
- found_table = True
(name, type_, nullable, default, has_default, primary_key) = (row[1], row[2].upper(), not row[3], row[4], row[4] is not None, row[5])
name = re.sub(r'^\"|\"$', '', name)
- if include_columns and name not in include_columns:
- continue
match = re.match(r'(\w+)(\(.*?\))?', type_)
if match:
coltype = match.group(1)
else:
coltype = "VARCHAR"
args = ''
-
try:
coltype = self.ischema_names[coltype]
except KeyError:
util.warn("Did not recognize type '%s' of column '%s'" %
(coltype, name))
coltype = sqltypes.NullType
-
if args is not None:
args = re.findall(r'(\d+)', args)
coltype = coltype(*[int(a) for a in args])
-
colargs = []
if has_default:
colargs.append(DefaultClause(sql.text(default)))
- table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
-
- if not found_table:
- raise exc.NoSuchTableError(table.name)
-
+ columns.append({
+ 'name' : name,
+ 'type' : coltype,
+ 'nullable' : nullable,
+ 'default' : default,
+ 'colargs' : colargs,
+ 'primary_key': primary_key
+ })
+ return columns
+
+ @reflection.caches
+ def get_foreign_keys(self, connection, tablename, schemaname=None,
+ info_cache=None):
+ quote = self.identifier_preparer.quote_identifier
+ if schemaname is not None:
+ pragma = "PRAGMA %s." % quote(schemaname)
+ else:
+ pragma = "PRAGMA "
+ qtable = quote(tablename)
c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))
+ fkeys = []
fks = {}
while True:
row = c.fetchone()
if row is None:
break
- (constraint_name, tablename, localcol, remotecol) = (row[0], row[2], row[3], row[4])
- tablename = re.sub(r'^\"|\"$', '', tablename)
- localcol = re.sub(r'^\"|\"$', '', localcol)
- remotecol = re.sub(r'^\"|\"$', '', remotecol)
+ (constraint_name, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
+ rtbl = re.sub(r'^\"|\"$', '', rtbl)
+ lcol = re.sub(r'^\"|\"$', '', lcol)
+ rcol = re.sub(r'^\"|\"$', '', rcol)
try:
fk = fks[constraint_name]
except KeyError:
- fk = ([], [])
+ fk = {
+ 'name' : constraint_name,
+ 'constrained_columns' : [],
+ 'referred_schema' : None,
+ 'referred_table' : rtbl,
+ 'referred_columns' : []
+ }
+ fkeys.append(fk)
fks[constraint_name] = fk
# look up the table based on the given table's engine, not 'self',
# since it could be a ProxyEngine
- remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection)
- constrained_column = table.c[localcol].name
- refspec = ".".join([tablename, remotecol])
- if constrained_column not in fk[0]:
- fk[0].append(constrained_column)
- if refspec not in fk[1]:
- fk[1].append(refspec)
- for name, value in fks.iteritems():
- table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], link_to_name=True))
- # check for UNIQUE indexes
+ if lcol not in fk['constrained_columns']:
+ fk['constrained_columns'].append(lcol)
+ if rcol not in fk['referred_columns']:
+ fk['referred_columns'].append(rcol)
+ return fkeys
+
+ def get_unique_indexes(self, connection, tablename, schemaname=None,
+ info_cache=None):
+ quote = self.identifier_preparer.quote_identifier
+ if schemaname is not None:
+ pragma = "PRAGMA %s." % quote(schemaname)
+ else:
+ pragma = "PRAGMA "
+ qtable = quote(tablename)
c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
unique_indexes = []
while True:
if row is None:
break
cols.append(row[2])
+ return unique_indexes
+
+ def reflecttable(self, connection, table, include_columns):
+ preparer = self.identifier_preparer
+ tablename = table.name
+ schemaname = table.schema
+ found_table = False
+ info_cache = SQLiteInfoCache()
+
+ # columns
+ for column in self.get_columns(connection, tablename, schemaname,
+ info_cache):
+ name = column['name']
+ coltype = column['type']
+ nullable = column['nullable']
+ default = column['default']
+ colargs = column['colargs']
+ primary_key = column['primary_key']
+ found_table = True
+ if include_columns and name not in include_columns:
+ continue
+ table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
+ if not found_table:
+ raise exc.NoSuchTableError(table.name)
+
+ # foreign keys
+ for fkey_d in self.get_foreign_keys(connection, tablename, schemaname,
+ info_cache):
+ rtbl = fkey_d['referred_table']
+ rcols = fkey_d['referred_columns']
+ lcols = fkey_d['constrained_columns']
+ # look up the table based on the given table's engine, not 'self',
+ # since it could be a ProxyEngine
+ remotetable = schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection)
+ refspecs = ["%s.%s" % (rtbl, rcol) for rcol in rcols]
+ table.append_constraint(schema.ForeignKeyConstraint(lcols, refspecs, link_to_name=True))
+ # this doesn't do anything ???
+ unique_indexes = self.get_unique_indexes(connection, tablename,
+ schemaname, info_cache)