import datetime, re, time
-from sqlalchemy import sql, schema, exc, pool, DefaultClause
+from sqlalchemy import schema as sa_schema
+from sqlalchemy import sql, exc, pool, DefaultClause
from sqlalchemy.engine import default
from sqlalchemy.engine import reflection
from sqlalchemy import types as sqltypes
return (row is not None)
@reflection.cache
- def get_columns(self, connection, tablename, schemaname=None,
- info_cache=None):
+ def get_table_names(self, connection, schema=None, **kw):
+ return self.table_names(connection, schema)
+
+ @reflection.cache
+ def get_view_names(self, connection, schema=None, **kw):
+ if schema is not None:
+ qschema = self.identifier_preparer.quote_identifier(schema)
+ master = '%s.sqlite_master' % qschema
+ s = ("SELECT name FROM %s "
+ "WHERE type='view' ORDER BY name") % (master,)
+ rs = connection.execute(s)
+ else:
+ try:
+ s = ("SELECT name FROM "
+ " (SELECT * FROM sqlite_master UNION ALL "
+ " SELECT * FROM sqlite_temp_master) "
+ "WHERE type='view' ORDER BY name")
+ rs = connection.execute(s)
+ except exc.DBAPIError:
+ raise
+ s = ("SELECT name FROM sqlite_master "
+ "WHERE type='view' ORDER BY name")
+ rs = connection.execute(s)
+
+ return [row[0] for row in rs]
+
+ @reflection.cache
+ def get_view_definition(self, connection, view_name, schema=None, **kw):
quote = self.identifier_preparer.quote_identifier
- if schemaname is not None:
- pragma = "PRAGMA %s." % quote(schemaname)
+ if schema is not None:
+ qschema = self.identifier_preparer.quote_identifier(schema)
+ master = '%s.sqlite_master' % qschema
+ s = ("SELECT sql FROM %s WHERE name = '%s'"
+ "AND type='view'") % (master, view_name)
+ rs = connection.execute(s)
+ else:
+ try:
+ s = ("SELECT sql FROM "
+ " (SELECT * FROM sqlite_master UNION ALL "
+ " SELECT * FROM sqlite_temp_master) "
+ "WHERE name = '%s' "
+ "AND type='view'") % view_name
+ rs = connection.execute(s)
+ except exc.DBAPIError:
+ raise
+ s = ("SELECT sql FROM sqlite_master WHERE name = '%s' "
+ "AND type='view'") % view_name
+ rs = connection.execute(s)
+
+ result = rs.fetchall()
+ if result:
+ return result[0].sql
+
+ @reflection.cache
+ def get_columns(self, connection, table_name, schema=None, **kw):
+ quote = self.identifier_preparer.quote_identifier
+ if schema is not None:
+ pragma = "PRAGMA %s." % quote(schema)
else:
pragma = "PRAGMA "
- qtable = quote(tablename)
+ qtable = quote(table_name)
c = connection.execute("%stable_info(%s)" % (pragma, qtable))
found_table = False
columns = []
return columns
@reflection.cache
- def get_foreign_keys(self, connection, tablename, schemaname=None,
- info_cache=None):
+ def get_primary_keys(self, connection, table_name, schema=None, **kw):
+ cols = self.get_columns(connection, table_name, schema, **kw)
+ pkeys = []
+ for col in cols:
+ if col['primary_key']:
+ pkeys.append(col['name'])
+ return pkeys
+
+ @reflection.cache
+ def get_foreign_keys(self, connection, table_name, schema=None, **kw):
quote = self.identifier_preparer.quote_identifier
- if schemaname is not None:
- pragma = "PRAGMA %s." % quote(schemaname)
+ if schema is not None:
+ pragma = "PRAGMA %s." % quote(schema)
else:
pragma = "PRAGMA "
- qtable = quote(tablename)
+ qtable = quote(table_name)
c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable))
fkeys = []
fks = {}
fk['referred_columns'].append(rcol)
return fkeys
- def get_unique_indexes(self, connection, tablename, schemaname=None,
- info_cache=None):
+ @reflection.cache
+ def get_indexes(self, connection, table_name, schema=None, **kw):
quote = self.identifier_preparer.quote_identifier
- if schemaname is not None:
- pragma = "PRAGMA %s." % quote(schemaname)
+ if schema is not None:
+ pragma = "PRAGMA %s." % quote(schema)
else:
pragma = "PRAGMA "
- qtable = quote(tablename)
+ qtable = quote(table_name)
+ c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
+ indexes = []
+ while True:
+ row = c.fetchone()
+ if row is None:
+ break
+ indexes.append(dict(name=row[1], column_names=[], unique=row[2]))
+ # loop thru unique indexes to get the column names.
+ for idx in indexes:
+ c = connection.execute("%sindex_info(%s)" % (pragma, idx['name']))
+ cols = idx['column_names']
+ while True:
+ row = c.fetchone()
+ if row is None:
+ break
+ cols.append(row[2])
+ return indexes
+
+ def get_unique_indexes(self, connection, table_name, schema=None, **kw):
+ quote = self.identifier_preparer.quote_identifier
+ if schema is not None:
+ pragma = "PRAGMA %s." % quote(schema)
+ else:
+ pragma = "PRAGMA "
+ qtable = quote(table_name)
c = connection.execute("%sindex_list(%s)" % (pragma, qtable))
unique_indexes = []
while True:
def reflecttable(self, connection, table, include_columns):
preparer = self.identifier_preparer
- tablename = table.name
- schemaname = table.schema
+ table_name = table.name
+ schema = table.schema
found_table = False
- info_cache = SQLiteInfoCache()
+
+ info_cache = {}
# columns
- for column in self.get_columns(connection, tablename, schemaname,
- info_cache):
+ for column in self.get_columns(connection, table_name, schema,
+ info_cache=info_cache):
name = column['name']
coltype = column['type']
nullable = column['nullable']
found_table = True
if include_columns and name not in include_columns:
continue
- table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
+ table.append_column(sa_schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs))
if not found_table:
raise exc.NoSuchTableError(table.name)
# foreign keys
- for fkey_d in self.get_foreign_keys(connection, tablename, schemaname,
- info_cache):
+ for fkey_d in self.get_foreign_keys(connection, table_name, schema,
+ info_cache=info_cache):
rtbl = fkey_d['referred_table']
rcols = fkey_d['referred_columns']
lcols = fkey_d['constrained_columns']
# look up the table based on the given table's engine, not 'self',
# since it could be a ProxyEngine
- remotetable = schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection)
+ remotetable = sa_schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection)
refspecs = ["%s.%s" % (rtbl, rcol) for rcol in rcols]
- table.append_constraint(schema.ForeignKeyConstraint(lcols, refspecs, link_to_name=True))
+ table.append_constraint(sa_schema.ForeignKeyConstraint(lcols, refspecs, link_to_name=True))
# this doesn't do anything ???
- unique_indexes = self.get_unique_indexes(connection, tablename,
- schemaname, info_cache)
+ unique_indexes = self.get_unique_indexes(connection, table_name,
+ schema, info_cache=info_cache)