From: Randall Smith Date: Thu, 19 Feb 2009 05:40:35 +0000 (+0000) Subject: refactored. tests/dialects/sqlite and tests/engine/reflection pass X-Git-Tag: rel_0_6_6~276 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=734fef3af6eca34b3ec25721e4791ebea155a3c7;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git refactored. tests/dialects/sqlite and tests/engine/reflection pass --- diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index b8a1d7d71d..8b13b4dcca 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -27,7 +27,7 @@ import datetime, re, time from sqlalchemy import sql, schema, exc, pool, DefaultClause from sqlalchemy.engine import default -from sqlalchemy.engine import default +from sqlalchemy.engine import reflection from sqlalchemy import types as sqltypes from sqlalchemy import util from sqlalchemy.sql import compiler, functions as sql_functions @@ -236,6 +236,9 @@ class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): 'vacuum', 'values', 'view', 'virtual', 'when', 'where', ]) +class SQLiteInfoCache(reflection.DefaultInfoCache): + pass + class SQLiteDialect(default.DefaultDialect): name = 'sqlite' supports_alter = False @@ -251,6 +254,7 @@ class SQLiteDialect(default.DefaultDialect): preparer = SQLiteIdentifierPreparer ischema_names = ischema_names colspecs = colspecs + info_cache = SQLiteInfoCache def table_names(self, connection, schema): if schema is not None: @@ -291,26 +295,24 @@ class SQLiteDialect(default.DefaultDialect): return (row is not None) - def reflecttable(self, connection, table, include_columns): - preparer = self.identifier_preparer - if table.schema is None: - pragma = "PRAGMA " + @reflection.caches + def get_columns(self, connection, tablename, schemaname=None, + info_cache=None): + quote = self.identifier_preparer.quote_identifier + if schemaname is not None: + pragma = "PRAGMA %s." % quote(schemaname) else: - pragma = "PRAGMA %s." % preparer.quote_identifier(table.schema) - qtable = preparer.format_table(table, False) - + pragma = "PRAGMA " + qtable = quote(tablename) c = connection.execute("%stable_info(%s)" % (pragma, qtable)) found_table = False + columns = [] while True: row = c.fetchone() if row is None: break - - found_table = True (name, type_, nullable, default, has_default, primary_key) = (row[1], row[2].upper(), not row[3], row[4], row[4] is not None, row[5]) name = re.sub(r'^\"|\"$', '', name) - if include_columns and name not in include_columns: - continue match = re.match(r'(\w+)(\(.*?\))?', type_) if match: coltype = match.group(1) @@ -318,54 +320,77 @@ class SQLiteDialect(default.DefaultDialect): else: coltype = "VARCHAR" args = '' - try: coltype = self.ischema_names[coltype] except KeyError: util.warn("Did not recognize type '%s' of column '%s'" % (coltype, name)) coltype = sqltypes.NullType - if args is not None: args = re.findall(r'(\d+)', args) coltype = coltype(*[int(a) for a in args]) - colargs = [] if has_default: colargs.append(DefaultClause(sql.text(default))) - table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs)) - - if not found_table: - raise exc.NoSuchTableError(table.name) - + columns.append({ + 'name' : name, + 'type' : coltype, + 'nullable' : nullable, + 'default' : default, + 'colargs' : colargs, + 'primary_key': primary_key + }) + return columns + + @reflection.caches + def get_foreign_keys(self, connection, tablename, schemaname=None, + info_cache=None): + quote = self.identifier_preparer.quote_identifier + if schemaname is not None: + pragma = "PRAGMA %s." % quote(schemaname) + else: + pragma = "PRAGMA " + qtable = quote(tablename) c = connection.execute("%sforeign_key_list(%s)" % (pragma, qtable)) + fkeys = [] fks = {} while True: row = c.fetchone() if row is None: break - (constraint_name, tablename, localcol, remotecol) = (row[0], row[2], row[3], row[4]) - tablename = re.sub(r'^\"|\"$', '', tablename) - localcol = re.sub(r'^\"|\"$', '', localcol) - remotecol = re.sub(r'^\"|\"$', '', remotecol) + (constraint_name, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) + rtbl = re.sub(r'^\"|\"$', '', rtbl) + lcol = re.sub(r'^\"|\"$', '', lcol) + rcol = re.sub(r'^\"|\"$', '', rcol) try: fk = fks[constraint_name] except KeyError: - fk = ([], []) + fk = { + 'name' : constraint_name, + 'constrained_columns' : [], + 'referred_schema' : None, + 'referred_table' : rtbl, + 'referred_columns' : [] + } + fkeys.append(fk) fks[constraint_name] = fk # look up the table based on the given table's engine, not 'self', # since it could be a ProxyEngine - remotetable = schema.Table(tablename, table.metadata, autoload=True, autoload_with=connection) - constrained_column = table.c[localcol].name - refspec = ".".join([tablename, remotecol]) - if constrained_column not in fk[0]: - fk[0].append(constrained_column) - if refspec not in fk[1]: - fk[1].append(refspec) - for name, value in fks.iteritems(): - table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1], link_to_name=True)) - # check for UNIQUE indexes + if lcol not in fk['constrained_columns']: + fk['constrained_columns'].append(lcol) + if rcol not in fk['referred_columns']: + fk['referred_columns'].append(rcol) + return fkeys + + def get_unique_indexes(self, connection, tablename, schemaname=None, + info_cache=None): + quote = self.identifier_preparer.quote_identifier + if schemaname is not None: + pragma = "PRAGMA %s." % quote(schemaname) + else: + pragma = "PRAGMA " + qtable = quote(tablename) c = connection.execute("%sindex_list(%s)" % (pragma, qtable)) unique_indexes = [] while True: @@ -383,4 +408,43 @@ class SQLiteDialect(default.DefaultDialect): if row is None: break cols.append(row[2]) + return unique_indexes + + def reflecttable(self, connection, table, include_columns): + preparer = self.identifier_preparer + tablename = table.name + schemaname = table.schema + found_table = False + info_cache = SQLiteInfoCache() + + # columns + for column in self.get_columns(connection, tablename, schemaname, + info_cache): + name = column['name'] + coltype = column['type'] + nullable = column['nullable'] + default = column['default'] + colargs = column['colargs'] + primary_key = column['primary_key'] + found_table = True + if include_columns and name not in include_columns: + continue + table.append_column(schema.Column(name, coltype, primary_key = primary_key, nullable = nullable, *colargs)) + if not found_table: + raise exc.NoSuchTableError(table.name) + + # foreign keys + for fkey_d in self.get_foreign_keys(connection, tablename, schemaname, + info_cache): + rtbl = fkey_d['referred_table'] + rcols = fkey_d['referred_columns'] + lcols = fkey_d['constrained_columns'] + # look up the table based on the given table's engine, not 'self', + # since it could be a ProxyEngine + remotetable = schema.Table(rtbl, table.metadata, autoload=True, autoload_with=connection) + refspecs = ["%s.%s" % (rtbl, rcol) for rcol in rcols] + table.append_constraint(schema.ForeignKeyConstraint(lcols, refspecs, link_to_name=True)) + # this doesn't do anything ??? + unique_indexes = self.get_unique_indexes(connection, tablename, + schemaname, info_cache)