From 468db416dbf284f0e7dddde90ec9641dc89428c6 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 13 Dec 2014 18:04:11 -0500 Subject: [PATCH] - rework sqlite FK and unique constraint system to combine both PRAGMA and regexp parsing of SQL in order to form a complete picture of constraints + their names. fixes #3244 fixes #3261 - factor various PRAGMA work to be centralized into one call --- doc/build/changelog/changelog_09.rst | 9 - doc/build/changelog/changelog_10.rst | 9 + doc/build/changelog/migration_10.rst | 19 ++ lib/sqlalchemy/dialects/sqlite/base.py | 299 +++++++++++------- test/dialect/test_sqlite.py | 414 +++++++++++++++++-------- 5 files changed, 506 insertions(+), 244 deletions(-) diff --git a/doc/build/changelog/changelog_09.rst b/doc/build/changelog/changelog_09.rst index f83afd2da4..4198279594 100644 --- a/doc/build/changelog/changelog_09.rst +++ b/doc/build/changelog/changelog_09.rst @@ -59,15 +59,6 @@ replaced, however if the mapping were already used for querying, the old relationship would still be referenced within some registries. - .. change:: - :tags: bug, sqlite - :versions: 1.0.0 - :tickets: 3244 - - Fixed issue where un-named UNIQUE constraints were not being - reflected in SQLite. Now un-named UNIQUE constraints are returned - with a name of u''. - .. change:: :tags: bug, sql :versions: 1.0.0 diff --git a/doc/build/changelog/changelog_10.rst b/doc/build/changelog/changelog_10.rst index d6f36e97ea..4da7b94568 100644 --- a/doc/build/changelog/changelog_10.rst +++ b/doc/build/changelog/changelog_10.rst @@ -22,6 +22,15 @@ series as well. For changes that are specific to 1.0 with an emphasis on compatibility concerns, see :doc:`/changelog/migration_10`. + .. change:: + :tags: bug, sqlite + :tickets: 3244, 3261 + + UNIQUE and FOREIGN KEY constraints are now fully reflected on + SQLite both with and without names. Previously, foreign key + names were ignored and unnamed unique constraints were skipped. + Thanks to Jon Nelson for assistance with this. + .. change:: :tags: feature, examples diff --git a/doc/build/changelog/migration_10.rst b/doc/build/changelog/migration_10.rst index cd5d420e53..e1fb13662b 100644 --- a/doc/build/changelog/migration_10.rst +++ b/doc/build/changelog/migration_10.rst @@ -1680,6 +1680,25 @@ reflection from temp tables as well, which is :ticket:`3203`. :ticket:`3204` +SQLite named and unnamed UNIQUE and FOREIGN KEY constraints will inspect and reflect +------------------------------------------------------------------------------------- + +UNIQUE and FOREIGN KEY constraints are now fully reflected on +SQLite both with and without names. Previously, foreign key +names were ignored and unnamed unique constraints were skipped. In particular +this will help with Alembic's new SQLite migration features. + +To achieve this, for both foreign keys and unique constraints, the result +of PRAGMA foreign_keys, index_list, and index_info is combined with regular +expression parsing of the CREATE TABLE statement overall to form a complete +picture of the names of constraints, as well as differentiating UNIQUE +constraints that were created as UNIQUE vs. unnamed INDEXes. + +:ticket:`3244` + +:ticket:`3261` + + .. _change_3220: Improved support for CTEs in Oracle diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 30d8a6ea33..e79299527a 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -913,22 +913,9 @@ class SQLiteDialect(default.DefaultDialect): return [row[0] for row in rs] def has_table(self, connection, table_name, schema=None): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - statement = "%stable_info(%s)" % (pragma, qtable) - cursor = _pragma_cursor(connection.execute(statement)) - row = cursor.fetchone() - - # consume remaining rows, to work around - # http://www.sqlite.org/cvstrac/tktview?tn=1884 - while not cursor.closed and cursor.fetchone() is not None: - pass - - return row is not None + info = self._get_table_pragma( + connection, "table_info", table_name, schema=schema) + return bool(info) @reflection.cache def get_view_names(self, connection, schema=None, **kw): @@ -970,18 +957,11 @@ class SQLiteDialect(default.DefaultDialect): @reflection.cache def get_columns(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - statement = "%stable_info(%s)" % (pragma, qtable) - c = _pragma_cursor(connection.execute(statement)) + info = self._get_table_pragma( + connection, "table_info", table_name, schema=schema) - rows = c.fetchall() columns = [] - for row in rows: + for row in info: (name, type_, nullable, default, primary_key) = ( row[1], row[2].upper(), not row[3], row[4], row[5]) @@ -1068,92 +1048,192 @@ class SQLiteDialect(default.DefaultDialect): @reflection.cache def get_foreign_keys(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - qtable = quote(table_name) - statement = "%sforeign_key_list(%s)" % (pragma, qtable) - c = _pragma_cursor(connection.execute(statement)) - fkeys = [] + # sqlite makes this *extremely difficult*. + # First, use the pragma to get the actual FKs. + pragma_fks = self._get_table_pragma( + connection, "foreign_key_list", + table_name, schema=schema + ) + fks = {} - while True: - row = c.fetchone() - if row is None: - break + + for row in pragma_fks: (numerical_id, rtbl, lcol, rcol) = ( row[0], row[2], row[3], row[4]) - self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol) - return fkeys + if rcol is None: + rcol = lcol - def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol): - # sqlite won't return rcol if the table was created with REFERENCES - # , no col - if rcol is None: - rcol = lcol + if self._broken_fk_pragma_quotes: + rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl) - if self._broken_fk_pragma_quotes: - rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl) + if numerical_id in fks: + fk = fks[numerical_id] + else: + fk = fks[numerical_id] = { + 'name': None, + 'constrained_columns': [], + 'referred_schema': None, + 'referred_table': rtbl, + 'referred_columns': [], + } + fks[numerical_id] = fk - try: - fk = fks[numerical_id] - except KeyError: - fk = { - 'name': None, - 'constrained_columns': [], - 'referred_schema': None, - 'referred_table': rtbl, - 'referred_columns': [], - } - fkeys.append(fk) - fks[numerical_id] = fk - - if lcol not in fk['constrained_columns']: fk['constrained_columns'].append(lcol) - if rcol not in fk['referred_columns']: fk['referred_columns'].append(rcol) - return fk + + def fk_sig(constrained_columns, referred_table, referred_columns): + return tuple(constrained_columns) + (referred_table,) + \ + tuple(referred_columns) + + # then, parse the actual SQL and attempt to find DDL that matches + # the names as well. SQLite saves the DDL in whatever format + # it was typed in as, so need to be liberal here. + + keys_by_signature = dict( + ( + fk_sig( + fk['constrained_columns'], + fk['referred_table'], fk['referred_columns']), + fk + ) for fk in fks.values() + ) + + table_data = self._get_table_sql(connection, table_name, schema=schema) + if table_data is None: + # system tables, etc. + return [] + + def parse_fks(): + FK_PATTERN = ( + '(?:CONSTRAINT (\w+) +)?' + 'FOREIGN KEY *\( *(.+?) *\) +' + 'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\)' + ) + + for match in re.finditer(FK_PATTERN, table_data, re.I): + ( + constraint_name, constrained_columns, + referred_quoted_name, referred_name, + referred_columns) = match.group(1, 2, 3, 4, 5) + constrained_columns = list( + self._find_cols_in_sig(constrained_columns)) + if not referred_columns: + referred_columns = constrained_columns + else: + referred_columns = list( + self._find_cols_in_sig(referred_columns)) + referred_name = referred_quoted_name or referred_name + yield ( + constraint_name, constrained_columns, + referred_name, referred_columns) + fkeys = [] + + for ( + constraint_name, constrained_columns, + referred_name, referred_columns) in parse_fks(): + sig = fk_sig( + constrained_columns, referred_name, referred_columns) + if sig not in keys_by_signature: + util.warn( + "WARNING: SQL-parsed foreign key constraint " + "'%s' could not be located in PRAGMA " + "foreign_keys for table %s" % ( + sig, + table_name + )) + continue + key = keys_by_signature.pop(sig) + key['name'] = constraint_name + fkeys.append(key) + # assume the remainders are the unnamed, inline constraints, just + # use them as is as it's extremely difficult to parse inline + # constraints + fkeys.extend(keys_by_signature.values()) + return fkeys + + def _find_cols_in_sig(self, sig): + for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): + yield match.group(1) or match.group(2) + + @reflection.cache + def get_unique_constraints(self, connection, table_name, + schema=None, **kw): + + auto_index_by_sig = {} + for idx in self.get_indexes( + connection, table_name, schema=schema, + include_auto_indexes=True, **kw): + if not idx['name'].startswith("sqlite_autoindex"): + continue + sig = tuple(idx['column_names']) + auto_index_by_sig[sig] = idx + + table_data = self._get_table_sql( + connection, table_name, schema=schema, **kw) + if not table_data: + return [] + + unique_constraints = [] + + def parse_uqs(): + UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) +)?UNIQUE *\((.+?)\)' + INLINE_UNIQUE_PATTERN = ( + '(?:(".+?")|([a-z0-9]+)) ' + '+[a-z0-9_ ]+? +UNIQUE') + + for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): + name, cols = match.group(1, 2) + yield name, list(self._find_cols_in_sig(cols)) + + # we need to match inlines as well, as we seek to differentiate + # a UNIQUE constraint from a UNIQUE INDEX, even though these + # are kind of the same thing :) + for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): + cols = list( + self._find_cols_in_sig(match.group(1) or match.group(2))) + yield None, cols + + for name, cols in parse_uqs(): + sig = tuple(cols) + if sig in auto_index_by_sig: + auto_index_by_sig.pop(sig) + parsed_constraint = { + 'name': name, + 'column_names': cols + } + unique_constraints.append(parsed_constraint) + # NOTE: auto_index_by_sig might not be empty here, + # the PRIMARY KEY may have an entry. + return unique_constraints @reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - pragma = "PRAGMA %s." % quote(schema) - else: - pragma = "PRAGMA " - include_auto_indexes = kw.pop('include_auto_indexes', False) - qtable = quote(table_name) - statement = "%sindex_list(%s)" % (pragma, qtable) - c = _pragma_cursor(connection.execute(statement)) + pragma_indexes = self._get_table_pragma( + connection, "index_list", table_name, schema=schema) indexes = [] - while True: - row = c.fetchone() - if row is None: - break + + include_auto_indexes = kw.pop('include_auto_indexes', False) + for row in pragma_indexes: # ignore implicit primary key index. # http://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html - elif (not include_auto_indexes and - row[1].startswith('sqlite_autoindex')): + if (not include_auto_indexes and + row[1].startswith('sqlite_autoindex')): continue indexes.append(dict(name=row[1], column_names=[], unique=row[2])) + # loop thru unique indexes to get the column names. for idx in indexes: - statement = "%sindex_info(%s)" % (pragma, quote(idx['name'])) - c = connection.execute(statement) - cols = idx['column_names'] - while True: - row = c.fetchone() - if row is None: - break - cols.append(row[2]) + pragma_index = self._get_table_pragma( + connection, "index_info", idx['name']) + + for row in pragma_index: + idx['column_names'].append(row[2]) return indexes @reflection.cache - def get_unique_constraints(self, connection, table_name, - schema=None, **kw): + def _get_table_sql(self, connection, table_name, schema=None, **kw): try: s = ("SELECT sql FROM " " (SELECT * FROM sqlite_master UNION ALL " @@ -1165,27 +1245,22 @@ class SQLiteDialect(default.DefaultDialect): s = ("SELECT sql FROM sqlite_master WHERE name = '%s' " "AND type = 'table'") % table_name rs = connection.execute(s) - row = rs.fetchone() - if row is None: - # sqlite won't return the schema for the sqlite_master or - # sqlite_temp_master tables from this query. These tables - # don't have any unique constraints anyway. - return [] - table_data = row[0] - - UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) )?UNIQUE \(([^\)]+)\)' - return [ - {'name': name, - 'column_names': [col.strip(' "') for col in cols.split(',')]} - for name, cols in re.findall(UNIQUE_PATTERN, table_data) - ] + return rs.scalar() - -def _pragma_cursor(cursor): - """work around SQLite issue whereby cursor.description - is blank when PRAGMA returns no rows.""" - - if cursor.closed: - cursor.fetchone = lambda: None - cursor.fetchall = lambda: [] - return cursor + def _get_table_pragma(self, connection, pragma, table_name, schema=None): + quote = self.identifier_preparer.quote_identifier + if schema is not None: + statement = "PRAGMA %s." % quote(schema) + else: + statement = "PRAGMA " + qtable = quote(table_name) + statement = "%s%s(%s)" % (statement, pragma, qtable) + cursor = connection.execute(statement) + if not cursor.closed: + # work around SQLite issue whereby cursor.description + # is blank when PRAGMA returns no rows: + # http://www.sqlite.org/cvstrac/tktview?tn=1884 + result = cursor.fetchall() + else: + result = [] + return result diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index b4524dc273..44e4eda42a 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -22,6 +22,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ from sqlalchemy import testing from sqlalchemy.schema import CreateTable from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.testing import mock class TestTypes(fixtures.TestBase, AssertsExecutionResults): @@ -500,30 +501,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): # assert j.onclause.compare(table1.c['"id"'] # == table2.c['"aid"']) - def test_legacy_quoted_identifiers_unit(self): - dialect = sqlite.dialect() - dialect._broken_fk_pragma_quotes = True - - for row in [ - (0, 'target', 'tid', 'id'), - (0, '"target"', 'tid', 'id'), - (0, '[target]', 'tid', 'id'), - (0, "'target'", 'tid', 'id'), - (0, '`target`', 'tid', 'id'), - ]: - fks = {} - fkeys = [] - dialect._parse_fk(fks, fkeys, *row) - eq_( - fkeys, - [{ - 'referred_table': 'target', - 'referred_columns': ['id'], - 'referred_schema': None, - 'name': None, - 'constrained_columns': ['tid'] - }]) - @testing.provide_metadata def test_description_encoding(self): # amazingly, pysqlite seems to still deliver cursor.description @@ -557,69 +534,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): e = create_engine('sqlite+pysqlite:///foo.db') assert e.pool.__class__ is pool.NullPool - @testing.provide_metadata - def test_dont_reflect_autoindex(self): - meta = self.metadata - Table('foo', meta, Column('bar', String, primary_key=True)) - meta.create_all() - inspector = Inspector(testing.db) - eq_(inspector.get_indexes('foo'), []) - eq_( - inspector.get_indexes('foo', include_auto_indexes=True), - [{ - 'unique': 1, - 'name': 'sqlite_autoindex_foo_1', - 'column_names': ['bar']}]) - - @testing.provide_metadata - def test_create_index_with_schema(self): - """Test creation of index with explicit schema""" - - meta = self.metadata - Table( - 'foo', meta, Column('bar', String, index=True), - schema='main') - meta.create_all() - inspector = Inspector(testing.db) - eq_( - inspector.get_indexes('foo', schema='main'), - [{'unique': 0, 'name': u'ix_main_foo_bar', - 'column_names': [u'bar']}]) - - @testing.provide_metadata - def test_get_unique_constraints(self): - meta = self.metadata - Table( - 'foo', meta, Column('f', Integer), - UniqueConstraint('f', name='foo_f')) - Table( - 'bar', meta, Column('b', Integer), - UniqueConstraint('b', name='bar_b'), - prefixes=['TEMPORARY']) - meta.create_all() - inspector = Inspector(testing.db) - eq_(inspector.get_unique_constraints('foo'), - [{'column_names': [u'f'], 'name': u'foo_f'}]) - eq_(inspector.get_unique_constraints('bar'), - [{'column_names': [u'b'], 'name': u'bar_b'}]) - - def test_get_unnamed_unique_constraints(self): - meta = MetaData(testing.db) - t1 = Table('foo', meta, Column('f', Integer), - UniqueConstraint('f')) - t2 = Table('bar', meta, Column('b', Integer), - UniqueConstraint('b'), - prefixes=['TEMPORARY']) - meta.create_all() - from sqlalchemy.engine.reflection import Inspector - try: - inspector = Inspector(testing.db) - eq_(inspector.get_unique_constraints('foo'), - [{'column_names': [u'f'], 'name': u''}]) - eq_(inspector.get_unique_constraints('bar'), - [{'column_names': [u'b'], 'name': u''}]) - finally: - meta.drop_all() class AttachedMemoryDBTest(fixtures.TestBase): @@ -1072,52 +986,306 @@ class ReflectHeadlessFKsTest(fixtures.TestBase): assert b.c.id.references(a.c.id) -class ReflectFKConstraintTest(fixtures.TestBase): +class ConstraintReflectionTest(fixtures.TestBase): __only_on__ = 'sqlite' - def setup(self): - testing.db.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)") - testing.db.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)") - testing.db.execute( - "CREATE TABLE b (id INTEGER PRIMARY KEY, " - "FOREIGN KEY(id) REFERENCES a1(id)," - "FOREIGN KEY(id) REFERENCES a2(id)" - ")") - testing.db.execute( - "CREATE TABLE c (id INTEGER, " - "CONSTRAINT bar PRIMARY KEY(id)," - "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id)," - "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)" - ")") + @classmethod + def setup_class(cls): + with testing.db.begin() as conn: + + conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)") + conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)") + conn.execute( + "CREATE TABLE b (id INTEGER PRIMARY KEY, " + "FOREIGN KEY(id) REFERENCES a1(id)," + "FOREIGN KEY(id) REFERENCES a2(id)" + ")") + conn.execute( + "CREATE TABLE c (id INTEGER, " + "CONSTRAINT bar PRIMARY KEY(id)," + "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id)," + "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)" + ")") + conn.execute( + # the lower casing + inline is intentional here + "CREATE TABLE d (id INTEGER, x INTEGER unique)") + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d1 ' + '(id INTEGER, "some ( STUPID n,ame" INTEGER unique)') + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)') + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)') + + conn.execute( + # lower casing + inline is intentional + "CREATE TABLE e (id INTEGER, x INTEGER references a2(id))") + conn.execute( + 'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER ' + 'references a2 ("some ( STUPID n,ame"))') + conn.execute( + 'CREATE TABLE e2 (id INTEGER, ' + '"some ( STUPID n,ame" INTEGER NOT NULL ' + 'references a2 ("some ( STUPID n,ame"))') + + conn.execute( + "CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))" + ) + conn.execute( + "CREATE TEMPORARY TABLE g " + "(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))" + ) + conn.execute( + # intentional broken casing + "CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))" + ) + conn.execute( + "CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))" + ) + conn.execute( + "CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, " + "PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes i(x,y))" + ) + conn.execute( + "CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, " + "PRIMARY KEY(id), " + "conSTRAINT my_fk FOreiGN KEY ( q , p ) " + "REFERENCes i ( x , y ))" + ) - def teardown(self): - testing.db.execute("drop table c") - testing.db.execute("drop table b") - testing.db.execute("drop table a1") - testing.db.execute("drop table a2") + meta = MetaData() + Table( + 'l', meta, Column('bar', String, index=True), + schema='main') + + Table( + 'm', meta, + Column('id', Integer, primary_key=True), + Column('x', String(30)), + UniqueConstraint('x') + ) - def test_name_is_none(self): + Table( + 'n', meta, + Column('id', Integer, primary_key=True), + Column('x', String(30)), + UniqueConstraint('x'), + prefixes=['TEMPORARY'] + ) + + meta.create_all(conn) + + # will contain an "autoindex" + conn.execute("create table o (foo varchar(20) primary key)") + + @classmethod + def teardown_class(cls): + with testing.db.begin() as conn: + for name in [ + "m", "main.l", "k", "j", "i", "h", "g", "f", "e", "e1", + "d", "d1", "d2", "c", "b", "a1", "a2"]: + conn.execute("drop table %s" % name) + + def test_legacy_quoted_identifiers_unit(self): + dialect = sqlite.dialect() + dialect._broken_fk_pragma_quotes = True + + for row in [ + (0, None, 'target', 'tid', 'id', None), + (0, None, '"target"', 'tid', 'id', None), + (0, None, '[target]', 'tid', 'id', None), + (0, None, "'target'", 'tid', 'id', None), + (0, None, '`target`', 'tid', 'id', None), + ]: + def _get_table_pragma(*arg, **kw): + return [row] + + def _get_table_sql(*arg, **kw): + return "CREATE TABLE foo "\ + "(tid INTEGER, "\ + "FOREIGN KEY(tid) REFERENCES %s (id))" % row[2] + with mock.patch.object( + dialect, "_get_table_pragma", _get_table_pragma): + with mock.patch.object( + dialect, '_get_table_sql', _get_table_sql): + + fkeys = dialect.get_foreign_keys(None, 'foo') + eq_( + fkeys, + [{ + 'referred_table': 'target', + 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, + 'constrained_columns': ['tid'] + }]) + + def test_foreign_key_name_is_none(self): # and not "0" - meta = MetaData() - b = Table('b', meta, autoload=True, autoload_with=testing.db) + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('b') eq_( - [con.name for con in b.constraints], - [None, None, None] + fks, + [ + {'referred_table': 'a1', 'referred_columns': ['id'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['id']}, + {'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['id']}, + ] ) - def test_name_not_none(self): - # we don't have names for PK constraints, - # it appears we get back None in the pragma for - # FKs also (also it doesn't even appear to be documented on - # sqlite's docs - # at http://www.sqlite.org/pragma.html#pragma_foreign_key_list - # how did we ever know that's the "name" field ??) + def test_foreign_key_name_is_not_none(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('c') + eq_( + fks, + [ + { + 'referred_table': 'a1', 'referred_columns': ['id'], + 'referred_schema': None, 'name': 'foo1', + 'constrained_columns': ['id']}, + { + 'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, 'name': 'foo2', + 'constrained_columns': ['id']}, + ] + ) - meta = MetaData() - c = Table('c', meta, autoload=True, autoload_with=testing.db) + def test_unnamed_inline_foreign_key(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('e') + eq_( + fks, + [{ + 'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['x'] + }] + ) + + def test_unnamed_inline_foreign_key_quoted(self): + inspector = Inspector(testing.db) + + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('e1') + eq_( + fks, + [{ + 'referred_table': 'a2', + 'referred_columns': ['some ( STUPID n,ame'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['some ( STUPID n,ame'] + }] + ) + fks = inspector.get_foreign_keys('e2') + eq_( + fks, + [{ + 'referred_table': 'a2', + 'referred_columns': ['some ( STUPID n,ame'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['some ( STUPID n,ame'] + }] + ) + + def test_foreign_key_composite_broken_casing(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('j') + eq_( + fks, + [{ + 'referred_table': 'i', + 'referred_columns': ['x', 'y'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['q', 'p']}] + ) + fks = inspector.get_foreign_keys('k') + eq_( + fks, + [{'referred_table': 'i', 'referred_columns': ['x', 'y'], + 'referred_schema': None, 'name': 'my_fk', + 'constrained_columns': ['q', 'p']}] + ) + + def test_dont_reflect_autoindex(self): + inspector = Inspector(testing.db) + eq_(inspector.get_indexes('o'), []) + eq_( + inspector.get_indexes('o', include_auto_indexes=True), + [{ + 'unique': 1, + 'name': 'sqlite_autoindex_o_1', + 'column_names': ['foo']}]) + + def test_create_index_with_schema(self): + """Test creation of index with explicit schema""" + + inspector = Inspector(testing.db) + eq_( + inspector.get_indexes('l', schema='main'), + [{'unique': 0, 'name': u'ix_main_l_bar', + 'column_names': [u'bar']}]) + + def test_unique_constraint_named(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("f"), + [{'column_names': ['x'], 'name': 'foo_fx'}] + ) + + def test_unique_constraint_named_broken_casing(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("h"), + [{'column_names': ['x'], 'name': 'foo_hx'}] + ) + + def test_unique_constraint_named_broken_temp(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("g"), + [{'column_names': ['x'], 'name': 'foo_gx'}] + ) + + def test_unique_constraint_unnamed_inline(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("d"), + [{'column_names': ['x'], 'name': None}] + ) + + def test_unique_constraint_unnamed_inline_quoted(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("d1"), + [{'column_names': ['some ( STUPID n,ame'], 'name': None}] + ) + eq_( + inspector.get_unique_constraints("d2"), + [{'column_names': ['some STUPID n,ame'], 'name': None}] + ) + eq_( + inspector.get_unique_constraints("d3"), + [{'column_names': ['some STUPID n,ame'], 'name': None}] + ) + + def test_unique_constraint_unnamed_normal(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("m"), + [{'column_names': ['x'], 'name': None}] + ) + + def test_unique_constraint_unnamed_normal_temporary(self): + inspector = Inspector(testing.db) eq_( - set([con.name for con in c.constraints]), - set([None, None]) + inspector.get_unique_constraints("n"), + [{'column_names': ['x'], 'name': None}] ) -- 2.47.3