]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- rework sqlite FK and unique constraint system to combine both PRAGMA
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 13 Dec 2014 23:04:11 +0000 (18:04 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 14 Dec 2014 00:24:56 +0000 (19:24 -0500)
and regexp parsing of SQL in order to form a complete picture of
constraints + their names.  fixes #3244 fixes #3261
- factor various PRAGMA work to be centralized into one call

doc/build/changelog/changelog_09.rst
doc/build/changelog/changelog_10.rst
doc/build/changelog/migration_10.rst
lib/sqlalchemy/dialects/sqlite/base.py
test/dialect/test_sqlite.py

index f83afd2da4a4bdf64c8e798f1a2afc5cfbeab856..4198279594ccf9cd874118326a263ca5d6931079 100644 (file)
         replaced, however if the mapping were already used for querying, the
         old relationship would still be referenced within some registries.
 
-    .. change::
-        :tags: bug, sqlite
-        :versions: 1.0.0
-        :tickets: 3244
-
-        Fixed issue where un-named UNIQUE constraints were not being
-        reflected in SQLite. Now un-named UNIQUE constraints are returned
-        with a name of u''.
-
     .. change::
         :tags: bug, sql
         :versions: 1.0.0
index d6f36e97ea07ca75fdf04c83fcbbe64bf646ed13..4da7b94568832addba2127fd3fcb340dd7c1fb7f 100644 (file)
     series as well.  For changes that are specific to 1.0 with an emphasis
     on compatibility concerns, see :doc:`/changelog/migration_10`.
 
+    .. change::
+        :tags: bug, sqlite
+        :tickets: 3244, 3261
+
+        UNIQUE and FOREIGN KEY constraints are now fully reflected on
+        SQLite both with and without names.  Previously, foreign key
+        names were ignored and unnamed unique constraints were skipped.
+        Thanks to Jon Nelson for assistance with this.
+
     .. change::
         :tags: feature, examples
 
index cd5d420e532bc4e0b088d906c484427cac7dd6d1..e1fb13662b15370e4aff1f0667e62c614bca801b 100644 (file)
@@ -1680,6 +1680,25 @@ reflection from temp tables as well, which is :ticket:`3203`.
 
 :ticket:`3204`
 
+SQLite named and unnamed UNIQUE and FOREIGN KEY constraints will inspect and reflect
+-------------------------------------------------------------------------------------
+
+UNIQUE and FOREIGN KEY constraints are now fully reflected on
+SQLite both with and without names.  Previously, foreign key
+names were ignored and unnamed unique constraints were skipped.   In particular
+this will help with Alembic's new SQLite migration features.
+
+To achieve this, for both foreign keys and unique constraints, the result
+of PRAGMA foreign_keys, index_list, and index_info is combined with regular
+expression parsing of the CREATE TABLE statement overall to form a complete
+picture of the names of constraints, as well as differentiating UNIQUE
+constraints that were created as UNIQUE vs. unnamed INDEXes.
+
+:ticket:`3244`
+
+:ticket:`3261`
+
+
 .. _change_3220:
 
 Improved support for CTEs in Oracle
index 30d8a6ea3367a134db9ea567f6d602247ce05a07..e79299527aec0ea593198b78e8b4912999a5caf6 100644 (file)
@@ -913,22 +913,9 @@ class SQLiteDialect(default.DefaultDialect):
         return [row[0] for row in rs]
 
     def has_table(self, connection, table_name, schema=None):
-        quote = self.identifier_preparer.quote_identifier
-        if schema is not None:
-            pragma = "PRAGMA %s." % quote(schema)
-        else:
-            pragma = "PRAGMA "
-        qtable = quote(table_name)
-        statement = "%stable_info(%s)" % (pragma, qtable)
-        cursor = _pragma_cursor(connection.execute(statement))
-        row = cursor.fetchone()
-
-        # consume remaining rows, to work around
-        # http://www.sqlite.org/cvstrac/tktview?tn=1884
-        while not cursor.closed and cursor.fetchone() is not None:
-            pass
-
-        return row is not None
+        info = self._get_table_pragma(
+            connection, "table_info", table_name, schema=schema)
+        return bool(info)
 
     @reflection.cache
     def get_view_names(self, connection, schema=None, **kw):
@@ -970,18 +957,11 @@ class SQLiteDialect(default.DefaultDialect):
 
     @reflection.cache
     def get_columns(self, connection, table_name, schema=None, **kw):
-        quote = self.identifier_preparer.quote_identifier
-        if schema is not None:
-            pragma = "PRAGMA %s." % quote(schema)
-        else:
-            pragma = "PRAGMA "
-        qtable = quote(table_name)
-        statement = "%stable_info(%s)" % (pragma, qtable)
-        c = _pragma_cursor(connection.execute(statement))
+        info = self._get_table_pragma(
+            connection, "table_info", table_name, schema=schema)
 
-        rows = c.fetchall()
         columns = []
-        for row in rows:
+        for row in info:
             (name, type_, nullable, default, primary_key) = (
                 row[1], row[2].upper(), not row[3], row[4], row[5])
 
@@ -1068,92 +1048,192 @@ class SQLiteDialect(default.DefaultDialect):
 
     @reflection.cache
     def get_foreign_keys(self, connection, table_name, schema=None, **kw):
-        quote = self.identifier_preparer.quote_identifier
-        if schema is not None:
-            pragma = "PRAGMA %s." % quote(schema)
-        else:
-            pragma = "PRAGMA "
-        qtable = quote(table_name)
-        statement = "%sforeign_key_list(%s)" % (pragma, qtable)
-        c = _pragma_cursor(connection.execute(statement))
-        fkeys = []
+        # sqlite makes this *extremely difficult*.
+        # First, use the pragma to get the actual FKs.
+        pragma_fks = self._get_table_pragma(
+            connection, "foreign_key_list",
+            table_name, schema=schema
+        )
+
         fks = {}
-        while True:
-            row = c.fetchone()
-            if row is None:
-                break
+
+        for row in pragma_fks:
             (numerical_id, rtbl, lcol, rcol) = (
                 row[0], row[2], row[3], row[4])
 
-            self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol)
-        return fkeys
+            if rcol is None:
+                rcol = lcol
 
-    def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol):
-        # sqlite won't return rcol if the table was created with REFERENCES
-        # <tablename>, no col
-        if rcol is None:
-            rcol = lcol
+            if self._broken_fk_pragma_quotes:
+                rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
 
-        if self._broken_fk_pragma_quotes:
-            rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
+            if numerical_id in fks:
+                fk = fks[numerical_id]
+            else:
+                fk = fks[numerical_id] = {
+                    'name': None,
+                    'constrained_columns': [],
+                    'referred_schema': None,
+                    'referred_table': rtbl,
+                    'referred_columns': [],
+                }
+                fks[numerical_id] = fk
 
-        try:
-            fk = fks[numerical_id]
-        except KeyError:
-            fk = {
-                'name': None,
-                'constrained_columns': [],
-                'referred_schema': None,
-                'referred_table': rtbl,
-                'referred_columns': [],
-            }
-            fkeys.append(fk)
-            fks[numerical_id] = fk
-
-        if lcol not in fk['constrained_columns']:
             fk['constrained_columns'].append(lcol)
-        if rcol not in fk['referred_columns']:
             fk['referred_columns'].append(rcol)
-        return fk
+
+        def fk_sig(constrained_columns, referred_table, referred_columns):
+            return tuple(constrained_columns) + (referred_table,) + \
+                tuple(referred_columns)
+
+        # then, parse the actual SQL and attempt to find DDL that matches
+        # the names as well.   SQLite saves the DDL in whatever format
+        # it was typed in as, so need to be liberal here.
+
+        keys_by_signature = dict(
+            (
+                fk_sig(
+                    fk['constrained_columns'],
+                    fk['referred_table'], fk['referred_columns']),
+                fk
+            ) for fk in fks.values()
+        )
+
+        table_data = self._get_table_sql(connection, table_name, schema=schema)
+        if table_data is None:
+            # system tables, etc.
+            return []
+
+        def parse_fks():
+            FK_PATTERN = (
+                '(?:CONSTRAINT (\w+) +)?'
+                'FOREIGN KEY *\( *(.+?) *\) +'
+                'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\)'
+            )
+
+            for match in re.finditer(FK_PATTERN, table_data, re.I):
+                (
+                    constraint_name, constrained_columns,
+                    referred_quoted_name, referred_name,
+                    referred_columns) = match.group(1, 2, 3, 4, 5)
+                constrained_columns = list(
+                    self._find_cols_in_sig(constrained_columns))
+                if not referred_columns:
+                    referred_columns = constrained_columns
+                else:
+                    referred_columns = list(
+                        self._find_cols_in_sig(referred_columns))
+                referred_name = referred_quoted_name or referred_name
+                yield (
+                    constraint_name, constrained_columns,
+                    referred_name, referred_columns)
+        fkeys = []
+
+        for (
+            constraint_name, constrained_columns,
+                referred_name, referred_columns) in parse_fks():
+            sig = fk_sig(
+                constrained_columns, referred_name, referred_columns)
+            if sig not in keys_by_signature:
+                util.warn(
+                    "WARNING: SQL-parsed foreign key constraint "
+                    "'%s' could not be located in PRAGMA "
+                    "foreign_keys for table %s" % (
+                        sig,
+                        table_name
+                    ))
+                continue
+            key = keys_by_signature.pop(sig)
+            key['name'] = constraint_name
+            fkeys.append(key)
+        # assume the remainders are the unnamed, inline constraints, just
+        # use them as is as it's extremely difficult to parse inline
+        # constraints
+        fkeys.extend(keys_by_signature.values())
+        return fkeys
+
+    def _find_cols_in_sig(self, sig):
+        for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
+            yield match.group(1) or match.group(2)
+
+    @reflection.cache
+    def get_unique_constraints(self, connection, table_name,
+                               schema=None, **kw):
+
+        auto_index_by_sig = {}
+        for idx in self.get_indexes(
+                connection, table_name, schema=schema,
+                include_auto_indexes=True, **kw):
+            if not idx['name'].startswith("sqlite_autoindex"):
+                continue
+            sig = tuple(idx['column_names'])
+            auto_index_by_sig[sig] = idx
+
+        table_data = self._get_table_sql(
+            connection, table_name, schema=schema, **kw)
+        if not table_data:
+            return []
+
+        unique_constraints = []
+
+        def parse_uqs():
+            UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) +)?UNIQUE *\((.+?)\)'
+            INLINE_UNIQUE_PATTERN = (
+                '(?:(".+?")|([a-z0-9]+)) '
+                '+[a-z0-9_ ]+? +UNIQUE')
+
+            for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
+                name, cols = match.group(1, 2)
+                yield name, list(self._find_cols_in_sig(cols))
+
+            # we need to match inlines as well, as we seek to differentiate
+            # a UNIQUE constraint from a UNIQUE INDEX, even though these
+            # are kind of the same thing :)
+            for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
+                cols = list(
+                    self._find_cols_in_sig(match.group(1) or match.group(2)))
+                yield None, cols
+
+        for name, cols in parse_uqs():
+            sig = tuple(cols)
+            if sig in auto_index_by_sig:
+                auto_index_by_sig.pop(sig)
+                parsed_constraint = {
+                    'name': name,
+                    'column_names': cols
+                }
+                unique_constraints.append(parsed_constraint)
+        # NOTE: auto_index_by_sig might not be empty here,
+        # the PRIMARY KEY may have an entry.
+        return unique_constraints
 
     @reflection.cache
     def get_indexes(self, connection, table_name, schema=None, **kw):
-        quote = self.identifier_preparer.quote_identifier
-        if schema is not None:
-            pragma = "PRAGMA %s." % quote(schema)
-        else:
-            pragma = "PRAGMA "
-        include_auto_indexes = kw.pop('include_auto_indexes', False)
-        qtable = quote(table_name)
-        statement = "%sindex_list(%s)" % (pragma, qtable)
-        c = _pragma_cursor(connection.execute(statement))
+        pragma_indexes = self._get_table_pragma(
+            connection, "index_list", table_name, schema=schema)
         indexes = []
-        while True:
-            row = c.fetchone()
-            if row is None:
-                break
+
+        include_auto_indexes = kw.pop('include_auto_indexes', False)
+        for row in pragma_indexes:
             # ignore implicit primary key index.
             # http://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
-            elif (not include_auto_indexes and
-                  row[1].startswith('sqlite_autoindex')):
+            if (not include_auto_indexes and
+                    row[1].startswith('sqlite_autoindex')):
                 continue
 
             indexes.append(dict(name=row[1], column_names=[], unique=row[2]))
+
         # loop thru unique indexes to get the column names.
         for idx in indexes:
-            statement = "%sindex_info(%s)" % (pragma, quote(idx['name']))
-            c = connection.execute(statement)
-            cols = idx['column_names']
-            while True:
-                row = c.fetchone()
-                if row is None:
-                    break
-                cols.append(row[2])
+            pragma_index = self._get_table_pragma(
+                connection, "index_info", idx['name'])
+
+            for row in pragma_index:
+                idx['column_names'].append(row[2])
         return indexes
 
     @reflection.cache
-    def get_unique_constraints(self, connection, table_name,
-                               schema=None, **kw):
+    def _get_table_sql(self, connection, table_name, schema=None, **kw):
         try:
             s = ("SELECT sql FROM "
                  " (SELECT * FROM sqlite_master UNION ALL "
@@ -1165,27 +1245,22 @@ class SQLiteDialect(default.DefaultDialect):
             s = ("SELECT sql FROM sqlite_master WHERE name = '%s' "
                  "AND type = 'table'") % table_name
             rs = connection.execute(s)
-        row = rs.fetchone()
-        if row is None:
-            # sqlite won't return the schema for the sqlite_master or
-            # sqlite_temp_master tables from this query. These tables
-            # don't have any unique constraints anyway.
-            return []
-        table_data = row[0]
-
-        UNIQUE_PATTERN = '(?:CONSTRAINT (\w+) )?UNIQUE \(([^\)]+)\)'
-        return [
-            {'name': name,
-             'column_names': [col.strip(' "') for col in cols.split(',')]}
-            for name, cols in re.findall(UNIQUE_PATTERN, table_data)
-        ]
+        return rs.scalar()
 
-
-def _pragma_cursor(cursor):
-    """work around SQLite issue whereby cursor.description
-    is blank when PRAGMA returns no rows."""
-
-    if cursor.closed:
-        cursor.fetchone = lambda: None
-        cursor.fetchall = lambda: []
-    return cursor
+    def _get_table_pragma(self, connection, pragma, table_name, schema=None):
+        quote = self.identifier_preparer.quote_identifier
+        if schema is not None:
+            statement = "PRAGMA %s." % quote(schema)
+        else:
+            statement = "PRAGMA "
+        qtable = quote(table_name)
+        statement = "%s%s(%s)" % (statement, pragma, qtable)
+        cursor = connection.execute(statement)
+        if not cursor.closed:
+            # work around SQLite issue whereby cursor.description
+            # is blank when PRAGMA returns no rows:
+            # http://www.sqlite.org/cvstrac/tktview?tn=1884
+            result = cursor.fetchall()
+        else:
+            result = []
+        return result
index b4524dc2731554a41cdc92dd79b558539b406f6d..44e4eda42a31b535086ac9d5efef1c81d6183025 100644 (file)
@@ -22,6 +22,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
 from sqlalchemy import testing
 from sqlalchemy.schema import CreateTable
 from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.testing import mock
 
 
 class TestTypes(fixtures.TestBase, AssertsExecutionResults):
@@ -500,30 +501,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
         # assert j.onclause.compare(table1.c['"id"']
         #        == table2.c['"aid"'])
 
-    def test_legacy_quoted_identifiers_unit(self):
-        dialect = sqlite.dialect()
-        dialect._broken_fk_pragma_quotes = True
-
-        for row in [
-            (0, 'target', 'tid', 'id'),
-            (0, '"target"', 'tid', 'id'),
-            (0, '[target]', 'tid', 'id'),
-            (0, "'target'", 'tid', 'id'),
-            (0, '`target`', 'tid', 'id'),
-        ]:
-            fks = {}
-            fkeys = []
-            dialect._parse_fk(fks, fkeys, *row)
-            eq_(
-                fkeys,
-                [{
-                    'referred_table': 'target',
-                    'referred_columns': ['id'],
-                    'referred_schema': None,
-                    'name': None,
-                    'constrained_columns': ['tid']
-                }])
-
     @testing.provide_metadata
     def test_description_encoding(self):
         # amazingly, pysqlite seems to still deliver cursor.description
@@ -557,69 +534,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
         e = create_engine('sqlite+pysqlite:///foo.db')
         assert e.pool.__class__ is pool.NullPool
 
-    @testing.provide_metadata
-    def test_dont_reflect_autoindex(self):
-        meta = self.metadata
-        Table('foo', meta, Column('bar', String, primary_key=True))
-        meta.create_all()
-        inspector = Inspector(testing.db)
-        eq_(inspector.get_indexes('foo'), [])
-        eq_(
-            inspector.get_indexes('foo', include_auto_indexes=True),
-            [{
-                'unique': 1,
-                'name': 'sqlite_autoindex_foo_1',
-                'column_names': ['bar']}])
-
-    @testing.provide_metadata
-    def test_create_index_with_schema(self):
-        """Test creation of index with explicit schema"""
-
-        meta = self.metadata
-        Table(
-            'foo', meta, Column('bar', String, index=True),
-            schema='main')
-        meta.create_all()
-        inspector = Inspector(testing.db)
-        eq_(
-            inspector.get_indexes('foo', schema='main'),
-            [{'unique': 0, 'name': u'ix_main_foo_bar',
-              'column_names': [u'bar']}])
-
-    @testing.provide_metadata
-    def test_get_unique_constraints(self):
-        meta = self.metadata
-        Table(
-            'foo', meta, Column('f', Integer),
-            UniqueConstraint('f', name='foo_f'))
-        Table(
-            'bar', meta, Column('b', Integer),
-            UniqueConstraint('b', name='bar_b'),
-            prefixes=['TEMPORARY'])
-        meta.create_all()
-        inspector = Inspector(testing.db)
-        eq_(inspector.get_unique_constraints('foo'),
-            [{'column_names': [u'f'], 'name': u'foo_f'}])
-        eq_(inspector.get_unique_constraints('bar'),
-            [{'column_names': [u'b'], 'name': u'bar_b'}])
-
-    def test_get_unnamed_unique_constraints(self):
-        meta = MetaData(testing.db)
-        t1 = Table('foo', meta, Column('f', Integer),
-                   UniqueConstraint('f'))
-        t2 = Table('bar', meta, Column('b', Integer),
-                   UniqueConstraint('b'),
-                   prefixes=['TEMPORARY'])
-        meta.create_all()
-        from sqlalchemy.engine.reflection import Inspector
-        try:
-            inspector = Inspector(testing.db)
-            eq_(inspector.get_unique_constraints('foo'),
-                [{'column_names': [u'f'], 'name': u''}])
-            eq_(inspector.get_unique_constraints('bar'),
-                [{'column_names': [u'b'], 'name': u''}])
-        finally:
-            meta.drop_all()
 
 
 class AttachedMemoryDBTest(fixtures.TestBase):
@@ -1072,52 +986,306 @@ class ReflectHeadlessFKsTest(fixtures.TestBase):
         assert b.c.id.references(a.c.id)
 
 
-class ReflectFKConstraintTest(fixtures.TestBase):
+class ConstraintReflectionTest(fixtures.TestBase):
     __only_on__ = 'sqlite'
 
-    def setup(self):
-        testing.db.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
-        testing.db.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
-        testing.db.execute(
-            "CREATE TABLE b (id INTEGER PRIMARY KEY, "
-            "FOREIGN KEY(id) REFERENCES a1(id),"
-            "FOREIGN KEY(id) REFERENCES a2(id)"
-            ")")
-        testing.db.execute(
-            "CREATE TABLE c (id INTEGER, "
-            "CONSTRAINT bar PRIMARY KEY(id),"
-            "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
-            "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
-            ")")
+    @classmethod
+    def setup_class(cls):
+        with testing.db.begin() as conn:
+
+            conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
+            conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
+            conn.execute(
+                "CREATE TABLE b (id INTEGER PRIMARY KEY, "
+                "FOREIGN KEY(id) REFERENCES a1(id),"
+                "FOREIGN KEY(id) REFERENCES a2(id)"
+                ")")
+            conn.execute(
+                "CREATE TABLE c (id INTEGER, "
+                "CONSTRAINT bar PRIMARY KEY(id),"
+                "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
+                "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
+                ")")
+            conn.execute(
+                # the lower casing + inline is intentional here
+                "CREATE TABLE d (id INTEGER, x INTEGER unique)")
+            conn.execute(
+                # the lower casing + inline is intentional here
+                'CREATE TABLE d1 '
+                '(id INTEGER, "some ( STUPID n,ame" INTEGER unique)')
+            conn.execute(
+                # the lower casing + inline is intentional here
+                'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)')
+            conn.execute(
+                # the lower casing + inline is intentional here
+                'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)')
+
+            conn.execute(
+                # lower casing + inline is intentional
+                "CREATE TABLE e (id INTEGER, x INTEGER references a2(id))")
+            conn.execute(
+                'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER '
+                'references a2   ("some ( STUPID n,ame"))')
+            conn.execute(
+                'CREATE TABLE e2 (id INTEGER, '
+                '"some ( STUPID n,ame" INTEGER NOT NULL  '
+                'references a2   ("some ( STUPID n,ame"))')
+
+            conn.execute(
+                "CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))"
+            )
+            conn.execute(
+                "CREATE TEMPORARY TABLE g "
+                "(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))"
+            )
+            conn.execute(
+                # intentional broken casing
+                "CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))"
+            )
+            conn.execute(
+                "CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))"
+            )
+            conn.execute(
+                "CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, "
+                "PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes  i(x,y))"
+            )
+            conn.execute(
+                "CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, "
+                "PRIMARY KEY(id), "
+                "conSTRAINT my_fk FOreiGN KEY (  q  , p  )   "
+                "REFERENCes   i    (  x ,   y ))"
+            )
 
-    def teardown(self):
-        testing.db.execute("drop table c")
-        testing.db.execute("drop table b")
-        testing.db.execute("drop table a1")
-        testing.db.execute("drop table a2")
+            meta = MetaData()
+            Table(
+                'l', meta, Column('bar', String, index=True),
+                schema='main')
+
+            Table(
+                'm', meta,
+                Column('id', Integer, primary_key=True),
+                Column('x', String(30)),
+                UniqueConstraint('x')
+            )
 
-    def test_name_is_none(self):
+            Table(
+                'n', meta,
+                Column('id', Integer, primary_key=True),
+                Column('x', String(30)),
+                UniqueConstraint('x'),
+                prefixes=['TEMPORARY']
+            )
+
+            meta.create_all(conn)
+
+            # will contain an "autoindex"
+            conn.execute("create table o (foo varchar(20) primary key)")
+
+    @classmethod
+    def teardown_class(cls):
+        with testing.db.begin() as conn:
+            for name in [
+                "m", "main.l", "k", "j", "i", "h", "g", "f", "e", "e1",
+                    "d", "d1", "d2", "c", "b", "a1", "a2"]:
+                conn.execute("drop table %s" % name)
+
+    def test_legacy_quoted_identifiers_unit(self):
+        dialect = sqlite.dialect()
+        dialect._broken_fk_pragma_quotes = True
+
+        for row in [
+            (0, None, 'target', 'tid', 'id', None),
+            (0, None, '"target"', 'tid', 'id', None),
+            (0, None, '[target]', 'tid', 'id', None),
+            (0, None, "'target'", 'tid', 'id', None),
+            (0, None, '`target`', 'tid', 'id', None),
+        ]:
+            def _get_table_pragma(*arg, **kw):
+                return [row]
+
+            def _get_table_sql(*arg, **kw):
+                return "CREATE TABLE foo "\
+                    "(tid INTEGER, "\
+                    "FOREIGN KEY(tid) REFERENCES %s (id))" % row[2]
+            with mock.patch.object(
+                    dialect, "_get_table_pragma", _get_table_pragma):
+                with mock.patch.object(
+                        dialect, '_get_table_sql', _get_table_sql):
+
+                    fkeys = dialect.get_foreign_keys(None, 'foo')
+                    eq_(
+                        fkeys,
+                        [{
+                            'referred_table': 'target',
+                            'referred_columns': ['id'],
+                            'referred_schema': None,
+                            'name': None,
+                            'constrained_columns': ['tid']
+                        }])
+
+    def test_foreign_key_name_is_none(self):
         # and not "0"
-        meta = MetaData()
-        b = Table('b', meta, autoload=True, autoload_with=testing.db)
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys('b')
         eq_(
-            [con.name for con in b.constraints],
-            [None, None, None]
+            fks,
+            [
+                {'referred_table': 'a1', 'referred_columns': ['id'],
+                 'referred_schema': None, 'name': None,
+                 'constrained_columns': ['id']},
+                {'referred_table': 'a2', 'referred_columns': ['id'],
+                 'referred_schema': None, 'name': None,
+                 'constrained_columns': ['id']},
+            ]
         )
 
-    def test_name_not_none(self):
-        # we don't have names for PK constraints,
-        # it appears we get back None in the pragma for
-        # FKs also (also it doesn't even appear to be documented on
-        # sqlite's docs
-        # at http://www.sqlite.org/pragma.html#pragma_foreign_key_list
-        # how did we ever know that's the "name" field ??)
+    def test_foreign_key_name_is_not_none(self):
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys('c')
+        eq_(
+            fks,
+            [
+                {
+                    'referred_table': 'a1', 'referred_columns': ['id'],
+                    'referred_schema': None, 'name': 'foo1',
+                    'constrained_columns': ['id']},
+                {
+                    'referred_table': 'a2', 'referred_columns': ['id'],
+                    'referred_schema': None, 'name': 'foo2',
+                    'constrained_columns': ['id']},
+            ]
+        )
 
-        meta = MetaData()
-        c = Table('c', meta, autoload=True, autoload_with=testing.db)
+    def test_unnamed_inline_foreign_key(self):
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys('e')
+        eq_(
+            fks,
+            [{
+                'referred_table': 'a2', 'referred_columns': ['id'],
+                'referred_schema': None,
+                'name': None, 'constrained_columns': ['x']
+            }]
+        )
+
+    def test_unnamed_inline_foreign_key_quoted(self):
+        inspector = Inspector(testing.db)
+
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys('e1')
+        eq_(
+            fks,
+            [{
+                'referred_table': 'a2',
+                'referred_columns': ['some ( STUPID n,ame'],
+                'referred_schema': None,
+                'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+            }]
+        )
+        fks = inspector.get_foreign_keys('e2')
+        eq_(
+            fks,
+            [{
+                'referred_table': 'a2',
+                'referred_columns': ['some ( STUPID n,ame'],
+                'referred_schema': None,
+                'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+            }]
+        )
+
+    def test_foreign_key_composite_broken_casing(self):
+        inspector = Inspector(testing.db)
+        fks = inspector.get_foreign_keys('j')
+        eq_(
+            fks,
+            [{
+                'referred_table': 'i',
+                'referred_columns': ['x', 'y'],
+                'referred_schema': None, 'name': None,
+                'constrained_columns': ['q', 'p']}]
+        )
+        fks = inspector.get_foreign_keys('k')
+        eq_(
+            fks,
+            [{'referred_table': 'i', 'referred_columns': ['x', 'y'],
+             'referred_schema': None, 'name': 'my_fk',
+             'constrained_columns': ['q', 'p']}]
+        )
+
+    def test_dont_reflect_autoindex(self):
+        inspector = Inspector(testing.db)
+        eq_(inspector.get_indexes('o'), [])
+        eq_(
+            inspector.get_indexes('o', include_auto_indexes=True),
+            [{
+                'unique': 1,
+                'name': 'sqlite_autoindex_o_1',
+                'column_names': ['foo']}])
+
+    def test_create_index_with_schema(self):
+        """Test creation of index with explicit schema"""
+
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_indexes('l', schema='main'),
+            [{'unique': 0, 'name': u'ix_main_l_bar',
+              'column_names': [u'bar']}])
+
+    def test_unique_constraint_named(self):
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_unique_constraints("f"),
+            [{'column_names': ['x'], 'name': 'foo_fx'}]
+        )
+
+    def test_unique_constraint_named_broken_casing(self):
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_unique_constraints("h"),
+            [{'column_names': ['x'], 'name': 'foo_hx'}]
+        )
+
+    def test_unique_constraint_named_broken_temp(self):
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_unique_constraints("g"),
+            [{'column_names': ['x'], 'name': 'foo_gx'}]
+        )
+
+    def test_unique_constraint_unnamed_inline(self):
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_unique_constraints("d"),
+            [{'column_names': ['x'], 'name': None}]
+        )
+
+    def test_unique_constraint_unnamed_inline_quoted(self):
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_unique_constraints("d1"),
+            [{'column_names': ['some ( STUPID n,ame'], 'name': None}]
+        )
+        eq_(
+            inspector.get_unique_constraints("d2"),
+            [{'column_names': ['some STUPID n,ame'], 'name': None}]
+        )
+        eq_(
+            inspector.get_unique_constraints("d3"),
+            [{'column_names': ['some STUPID n,ame'], 'name': None}]
+        )
+
+    def test_unique_constraint_unnamed_normal(self):
+        inspector = Inspector(testing.db)
+        eq_(
+            inspector.get_unique_constraints("m"),
+            [{'column_names': ['x'], 'name': None}]
+        )
+
+    def test_unique_constraint_unnamed_normal_temporary(self):
+        inspector = Inspector(testing.db)
         eq_(
-            set([con.name for con in c.constraints]),
-            set([None, None])
+            inspector.get_unique_constraints("n"),
+            [{'column_names': ['x'], 'name': None}]
         )