]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- [bug] Fixed bug whereby "order_by='foreign_key'"
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 1 Dec 2011 19:21:43 +0000 (14:21 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Thu, 1 Dec 2011 19:21:43 +0000 (14:21 -0500)
option to Inspector.get_table_names
wasn't implementing the sort properly, replaced
with the existing sort algorithm
- clean up metadata usage in reflection tests

CHANGES
lib/sqlalchemy/engine/reflection.py
test/engine/test_reflection.py

diff --git a/CHANGES b/CHANGES
index 04e45d97d54c9f1dc7038cbbfc58a97096ac4877..4d4af021bda8a46b7ff9f6b1ca1b2d4fe828bdb9 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -129,6 +129,11 @@ CHANGES
     using a TypeDecorator that "switches" types,
     like the CHAR/UUID type.
 
+  - [bug] Fixed bug whereby "order_by='foreign_key'"
+    option to Inspector.get_table_names 
+    wasn't implementing the sort properly, replaced
+    with the existing sort algorithm
+
 - postgresql
    - [bug] Postgresql dialect memoizes that an ENUM of a 
      particular name was processed
index e87c1ce0fd3791742169f78b4914c60fad6ef01a..a69fc5b98a9e6eb27cf6fbbc0802ab0384faac5c 100644 (file)
@@ -27,6 +27,7 @@ methods such as get_table_names, get_columns, etc.
 import sqlalchemy
 from sqlalchemy import exc, sql
 from sqlalchemy import util
+from sqlalchemy.util import topological
 from sqlalchemy.types import TypeEngine
 from sqlalchemy import schema as sa_schema
 
@@ -150,27 +151,19 @@ class Inspector(object):
 
         if hasattr(self.dialect, 'get_table_names'):
             tnames = self.dialect.get_table_names(self.bind,
-            schema,
-                                                    info_cache=self.info_cache)
+            schema, info_cache=self.info_cache)
         else:
             tnames = self.engine.table_names(schema)
         if order_by == 'foreign_key':
-            ordered_tnames = tnames[:]
-            # Order based on foreign key dependencies.
+            import random
+            random.shuffle(tnames)
+
+            tuples = []
             for tname in tnames:
-                table_pos = tnames.index(tname)
-                fkeys = self.get_foreign_keys(tname, schema)
-                for fkey in fkeys:
-                    rtable = fkey['referred_table']
-                    if rtable in ordered_tnames:
-                        ref_pos = ordered_tnames.index(rtable)
-                        # Make sure it's lower in the list than anything it
-                        # references.
-                        if table_pos > ref_pos:
-                            ordered_tnames.pop(table_pos) # rtable moves up 1
-                            # insert just below rtable
-                            ordered_tnames.index(ref_pos, tname)
-            tnames = ordered_tnames
+                for fkey in self.get_foreign_keys(tname, schema):
+                    if tname != fkey['referred_table']:
+                        tuples.append((tname, fkey['referred_table']))
+            tnames = list(topological.sort(tuples, tnames))
         return tnames
 
     def get_table_options(self, table_name, schema=None, **kw):
index 63a37b112c39db1b641c1eb89a696b65ca3f0f5d..4beeab14d7c545e18831d273b16073f5eb66c14b 100644 (file)
@@ -20,8 +20,9 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
                      'Date is only supported on MSSQL 2008+')
     @testing.exclude('mysql', '<', (4, 1, 1),
                      'early types are squirrely')
+    @testing.provide_metadata
     def test_basic_reflection(self):
-        meta = MetaData(testing.db)
+        meta = self.metadata
 
         users = Table('engine_users', meta,
             Column('user_id', sa.INT, primary_key=True),
@@ -53,20 +54,18 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             )
         meta.create_all()
 
-        try:
-            meta2 = MetaData()
-            reflected_users = Table('engine_users', meta2,
-                                    autoload=True,
-                                    autoload_with=testing.db)
-            reflected_addresses = Table('engine_email_addresses',
-                    meta2, autoload=True, autoload_with=testing.db)
-            self.assert_tables_equal(users, reflected_users)
-            self.assert_tables_equal(addresses, reflected_addresses)
-        finally:
-            meta.drop_all()
+        meta2 = MetaData()
+        reflected_users = Table('engine_users', meta2,
+                                autoload=True,
+                                autoload_with=testing.db)
+        reflected_addresses = Table('engine_email_addresses',
+                meta2, autoload=True, autoload_with=testing.db)
+        self.assert_tables_equal(users, reflected_users)
+        self.assert_tables_equal(addresses, reflected_addresses)
 
+    @testing.provide_metadata
     def test_two_foreign_keys(self):
-        meta = MetaData(testing.db)
+        meta = self.metadata
         t1 = Table(
             't1',
             meta,
@@ -80,73 +79,67 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         t3 = Table('t3', meta, Column('id', sa.Integer,
                    primary_key=True), test_needs_fk=True)
         meta.create_all()
-        try:
-            meta2 = MetaData()
-            t1r, t2r, t3r = [Table(x, meta2, autoload=True,
-                             autoload_with=testing.db) for x in ('t1',
-                             't2', 't3')]
-            assert t1r.c.t2id.references(t2r.c.id)
-            assert t1r.c.t3id.references(t3r.c.id)
-        finally:
-            meta.drop_all()
+        meta2 = MetaData()
+        t1r, t2r, t3r = [Table(x, meta2, autoload=True,
+                         autoload_with=testing.db) for x in ('t1',
+                         't2', 't3')]
+        assert t1r.c.t2id.references(t2r.c.id)
+        assert t1r.c.t3id.references(t3r.c.id)
 
     def test_nonexistent(self):
         meta = MetaData(testing.db)
         assert_raises(sa.exc.NoSuchTableError, Table, 'nonexistent',
                       meta, autoload=True)
 
+    @testing.provide_metadata
     def test_include_columns(self):
-        meta = MetaData(testing.db)
+        meta = self.metadata
         foo = Table('foo', meta, *[Column(n, sa.String(30))
                                    for n in ['a', 'b', 'c', 'd', 'e', 'f']])
         meta.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            foo = Table('foo', meta2, autoload=True,
-                        include_columns=['b', 'f', 'e'])
-            # test that cols come back in original order
-            eq_([c.name for c in foo.c], ['b', 'e', 'f'])
-            for c in ('b', 'f', 'e'):
-                assert c in foo.c
-            for c in ('a', 'c', 'd'):
-                assert c not in foo.c
-
-            # test against a table which is already reflected
-            meta3 = MetaData(testing.db)
-            foo = Table('foo', meta3, autoload=True)
-            foo = Table('foo', meta3, include_columns=['b', 'f', 'e'],
-                        extend_existing=True)
-            eq_([c.name for c in foo.c], ['b', 'e', 'f'])
-            for c in ('b', 'f', 'e'):
-                assert c in foo.c
-            for c in ('a', 'c', 'd'):
-                assert c not in foo.c
-        finally:
-            meta.drop_all()
+        meta2 = MetaData(testing.db)
+        foo = Table('foo', meta2, autoload=True,
+                    include_columns=['b', 'f', 'e'])
+        # test that cols come back in original order
+        eq_([c.name for c in foo.c], ['b', 'e', 'f'])
+        for c in ('b', 'f', 'e'):
+            assert c in foo.c
+        for c in ('a', 'c', 'd'):
+            assert c not in foo.c
+
+        # test against a table which is already reflected
+        meta3 = MetaData(testing.db)
+        foo = Table('foo', meta3, autoload=True)
+        foo = Table('foo', meta3, include_columns=['b', 'f', 'e'],
+                    extend_existing=True)
+        eq_([c.name for c in foo.c], ['b', 'e', 'f'])
+        for c in ('b', 'f', 'e'):
+            assert c in foo.c
+        for c in ('a', 'c', 'd'):
+            assert c not in foo.c
 
     @testing.emits_warning(r".*omitted columns")
+    @testing.provide_metadata
     def test_include_columns_indexes(self):
-        m = MetaData(testing.db)
+        m = self.metadata
 
         t1 = Table('t1', m, Column('a', sa.Integer), Column('b', sa.Integer))
         sa.Index('foobar', t1.c.a, t1.c.b)
         sa.Index('bat', t1.c.a)
         m.create_all()
-        try:
-            m2 = MetaData(testing.db)
-            t2 = Table('t1', m2, autoload=True)
-            assert len(t2.indexes) == 2
+        m2 = MetaData(testing.db)
+        t2 = Table('t1', m2, autoload=True)
+        assert len(t2.indexes) == 2
 
-            m2 = MetaData(testing.db)
-            t2 = Table('t1', m2, autoload=True, include_columns=['a'])
-            assert len(t2.indexes) == 1
+        m2 = MetaData(testing.db)
+        t2 = Table('t1', m2, autoload=True, include_columns=['a'])
+        assert len(t2.indexes) == 1
 
-            m2 = MetaData(testing.db)
-            t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b'])
-            assert len(t2.indexes) == 2
-        finally:
-            m.drop_all()
+        m2 = MetaData(testing.db)
+        t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b'])
+        assert len(t2.indexes) == 2
 
+    @testing.provide_metadata
     def test_autoincrement_col(self):
         """test that 'autoincrement' is reflected according to sqla's policy.
 
@@ -156,7 +149,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
 
         """
 
-        meta = MetaData(testing.db)
+        meta = self.metadata
         t1 = Table('test', meta,
             Column('id', sa.Integer, primary_key=True),
             Column('data', sa.String(50)),
@@ -170,19 +163,16 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             mysql_engine='MyISAM'
         )
         meta.create_all()
-        try:
-            m2 = MetaData(testing.db)
-            t1a = Table('test', m2, autoload=True)
-            assert t1a._autoincrement_column is t1a.c.id
-
-            t2a = Table('test2', m2, autoload=True)
-            assert t2a._autoincrement_column is t2a.c.id2
+        m2 = MetaData(testing.db)
+        t1a = Table('test', m2, autoload=True)
+        assert t1a._autoincrement_column is t1a.c.id
 
-        finally:
-            meta.drop_all()
+        t2a = Table('test2', m2, autoload=True)
+        assert t2a._autoincrement_column is t2a.c.id2
 
+    @testing.provide_metadata
     def test_unknown_types(self):
-        meta = MetaData(testing.db)
+        meta = self.metadata
         t = Table("test", meta,
             Column('foo', sa.DateTime))
 
@@ -201,10 +191,10 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
 
         finally:
             testing.db.dialect.ischema_names = ischema_names
-            t.drop()
 
+    @testing.provide_metadata
     def test_basic_override(self):
-        meta = MetaData(testing.db)
+        meta = self.metadata
         table = Table(
             'override_test', meta,
             Column('col1', sa.Integer, primary_key=True),
@@ -214,24 +204,22 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         table.create()
 
         meta2 = MetaData(testing.db)
-        try:
-            table = Table(
-                'override_test', meta2,
-                Column('col2', sa.Unicode()),
-                Column('col4', sa.String(30)), autoload=True)
-
-            self.assert_(isinstance(table.c.col1.type, sa.Integer))
-            self.assert_(isinstance(table.c.col2.type, sa.Unicode))
-            self.assert_(isinstance(table.c.col4.type, sa.String))
-        finally:
-            table.drop()
+        table = Table(
+            'override_test', meta2,
+            Column('col2', sa.Unicode()),
+            Column('col4', sa.String(30)), autoload=True)
+
+        self.assert_(isinstance(table.c.col1.type, sa.Integer))
+        self.assert_(isinstance(table.c.col2.type, sa.Unicode))
+        self.assert_(isinstance(table.c.col4.type, sa.String))
 
+    @testing.provide_metadata
     def test_override_pkfk(self):
         """test that you can override columns which contain foreign keys
         to other reflected tables, where the foreign key column is also
         a primary key column"""
 
-        meta = MetaData(testing.db)
+        meta = self.metadata
         users = Table('users', meta,
             Column('id', sa.Integer, primary_key=True),
             Column('name', sa.String(30)))
@@ -241,38 +229,35 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
 
 
         meta.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            a2 = Table('addresses', meta2,
-                Column('id', sa.Integer,
-                       sa.ForeignKey('users.id'), primary_key=True),
-                autoload=True)
-            u2 = Table('users', meta2, autoload=True)
-
-            assert list(a2.primary_key) == [a2.c.id]
-            assert list(u2.primary_key) == [u2.c.id]
-            assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id)
-
-            meta3 = MetaData(testing.db)
-            u3 = Table('users', meta3, autoload=True)
-            a3 = Table('addresses', meta3,
-                Column('id', sa.Integer, sa.ForeignKey('users.id'),
-                       primary_key=True),
-                autoload=True)
-
-            assert list(a3.primary_key) == [a3.c.id]
-            assert list(u3.primary_key) == [u3.c.id]
-            assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id)
-
-        finally:
-            meta.drop_all()
+        meta2 = MetaData(testing.db)
+        a2 = Table('addresses', meta2,
+            Column('id', sa.Integer,
+                   sa.ForeignKey('users.id'), primary_key=True),
+            autoload=True)
+        u2 = Table('users', meta2, autoload=True)
+
+        assert list(a2.primary_key) == [a2.c.id]
+        assert list(u2.primary_key) == [u2.c.id]
+        assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id)
+
+        meta3 = MetaData(testing.db)
+        u3 = Table('users', meta3, autoload=True)
+        a3 = Table('addresses', meta3,
+            Column('id', sa.Integer, sa.ForeignKey('users.id'),
+                   primary_key=True),
+            autoload=True)
+
+        assert list(a3.primary_key) == [a3.c.id]
+        assert list(u3.primary_key) == [u3.c.id]
+        assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id)
 
+    @testing.provide_metadata
     def test_override_nonexistent_fk(self):
         """test that you can override columns and create new foreign
         keys to other reflected tables which have no foreign keys.  this
         is common with MySQL MyISAM tables."""
 
-        meta = MetaData(testing.db)
+        meta = self.metadata
         users = Table('users', meta,
             Column('id', sa.Integer, primary_key=True),
             Column('name', sa.String(30)))
@@ -282,53 +267,50 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             Column('user_id', sa.Integer))
 
         meta.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            a2 = Table('addresses', meta2,
-                    Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+        meta2 = MetaData(testing.db)
+        a2 = Table('addresses', meta2,
+                Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+               autoload=True)
+        u2 = Table('users', meta2, autoload=True)
+        assert len(a2.c.user_id.foreign_keys) == 1
+        assert len(a2.foreign_keys) == 1
+        assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+        assert [c.parent for c in a2.c.user_id.foreign_keys] \
+            == [a2.c.user_id]
+        assert list(a2.c.user_id.foreign_keys)[0].parent \
+            is a2.c.user_id
+        assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
+        meta3 = MetaData(testing.db)
+
+        u3 = Table('users', meta3, autoload=True)
+
+        a3 = Table('addresses', meta3, Column('user_id',
+                   sa.Integer, sa.ForeignKey('users.id')),
                    autoload=True)
-            u2 = Table('users', meta2, autoload=True)
-            assert len(a2.c.user_id.foreign_keys) == 1
-            assert len(a2.foreign_keys) == 1
-            assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
-            assert [c.parent for c in a2.c.user_id.foreign_keys] \
-                == [a2.c.user_id]
-            assert list(a2.c.user_id.foreign_keys)[0].parent \
-                is a2.c.user_id
-            assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
-            meta3 = MetaData(testing.db)
-
-            u3 = Table('users', meta3, autoload=True)
-
-            a3 = Table('addresses', meta3, Column('user_id',
-                       sa.Integer, sa.ForeignKey('users.id')),
-                       autoload=True)
-            assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id)
-
-            meta4 = MetaData(testing.db)
-
-            u4 = Table('users', meta4, 
-                    Column('id', sa.Integer, key='u_id', primary_key=True),
-                    autoload=True)
-
-            a4 = Table(
-                'addresses',
-                meta4,
-                Column('id', sa.Integer, key='street',
-                       primary_key=True),
-                Column('street', sa.String(30), key='user_id'),
-                Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'
-                       ), key='id'),
-                autoload=True,
-                )
-            assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id)
-            assert list(u4.primary_key) == [u4.c.u_id]
-            assert len(u4.columns) == 2
-            assert len(u4.constraints) == 1
-            assert len(a4.columns) == 3
-            assert len(a4.constraints) == 2
-        finally:
-            meta.drop_all()
+        assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id)
+
+        meta4 = MetaData(testing.db)
+
+        u4 = Table('users', meta4, 
+                Column('id', sa.Integer, key='u_id', primary_key=True),
+                autoload=True)
+
+        a4 = Table(
+            'addresses',
+            meta4,
+            Column('id', sa.Integer, key='street',
+                   primary_key=True),
+            Column('street', sa.String(30), key='user_id'),
+            Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'
+                   ), key='id'),
+            autoload=True,
+            )
+        assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id)
+        assert list(u4.primary_key) == [u4.c.u_id]
+        assert len(u4.columns) == 2
+        assert len(u4.constraints) == 1
+        assert len(a4.columns) == 3
+        assert len(a4.constraints) == 2
 
     @testing.provide_metadata
     def test_override_composite_fk(self):
@@ -369,12 +351,12 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
 
 
 
+    @testing.provide_metadata
     def test_override_keys(self):
         """test that columns can be overridden with a 'key', 
         and that ForeignKey targeting during reflection still works."""
 
-
-        meta = MetaData(testing.db)
+        meta = self.metadata
         a1 = Table('a', meta,
             Column('x', sa.Integer, primary_key=True),
             Column('z', sa.Integer),
@@ -385,16 +367,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             test_needs_fk=True
         )
         meta.create_all()
-        try:
-            m2 = MetaData(testing.db)
-            a2 = Table('a', m2, 
-                    Column('x', sa.Integer, primary_key=True, key='x1'),
-                    autoload=True)
-            b2 = Table('b', m2, autoload=True)
-            assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y)
-            assert b2.c.y.references(a2.c.x1)
-        finally:
-            meta.drop_all()
+        m2 = MetaData(testing.db)
+        a2 = Table('a', m2, 
+                Column('x', sa.Integer, primary_key=True, key='x1'),
+                autoload=True)
+        b2 = Table('b', m2, autoload=True)
+        assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y)
+        assert b2.c.y.references(a2.c.x1)
 
     @testing.provide_metadata
     def test_nonreflected_fk_raises(self):
@@ -422,12 +401,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         assert_raises(sa.exc.NoReferencedColumnError, a2.join, b2)
 
     @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness')
+    @testing.provide_metadata
     def test_override_existing_fk(self):
         """test that you can override columns and specify new foreign
         keys to other reflected tables, on columns which *do* already
         have that foreign key, and that the FK is not duped. """
 
-        meta = MetaData(testing.db)
+        meta = self.metadata
         users = Table('users', meta,
             Column('id', sa.Integer, primary_key=True),
             Column('name', sa.String(30)),
@@ -438,46 +418,42 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             test_needs_fk=True)
 
         meta.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            a2 = Table('addresses', meta2, 
-                    Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
-                    autoload=True)
-            u2 = Table('users', meta2, autoload=True)
-            s = sa.select([a2])
-
-            assert s.c.user_id is not None
-            assert len(a2.foreign_keys) == 1
-            assert len(a2.c.user_id.foreign_keys) == 1
-            assert len(a2.constraints) == 2
-            assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
-            assert [c.parent for c in a2.c.user_id.foreign_keys] \
-                == [a2.c.user_id]
-            assert list(a2.c.user_id.foreign_keys)[0].parent \
-                is a2.c.user_id
-            assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
-
-            meta2 = MetaData(testing.db)
-            u2 = Table('users', meta2, Column('id', sa.Integer,
-                       primary_key=True), autoload=True)
-            a2 = Table('addresses', meta2, Column('id', sa.Integer,
-                       primary_key=True), Column('user_id', sa.Integer,
-                       sa.ForeignKey('users.id')), autoload=True)
-            s = sa.select([a2])
-
-            assert s.c.user_id is not None
-            assert len(a2.foreign_keys) == 1
-            assert len(a2.c.user_id.foreign_keys) == 1
-            assert len(a2.constraints) == 2
-            assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
-            assert [c.parent for c in a2.c.user_id.foreign_keys] \
-                == [a2.c.user_id]
-            assert list(a2.c.user_id.foreign_keys)[0].parent \
-                is a2.c.user_id
-            assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
+        meta2 = MetaData(testing.db)
+        a2 = Table('addresses', meta2, 
+                Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+                autoload=True)
+        u2 = Table('users', meta2, autoload=True)
+        s = sa.select([a2])
+
+        assert s.c.user_id is not None
+        assert len(a2.foreign_keys) == 1
+        assert len(a2.c.user_id.foreign_keys) == 1
+        assert len(a2.constraints) == 2
+        assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+        assert [c.parent for c in a2.c.user_id.foreign_keys] \
+            == [a2.c.user_id]
+        assert list(a2.c.user_id.foreign_keys)[0].parent \
+            is a2.c.user_id
+        assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
 
-        finally:
-            meta.drop_all()
+        meta2 = MetaData(testing.db)
+        u2 = Table('users', meta2, Column('id', sa.Integer,
+                   primary_key=True), autoload=True)
+        a2 = Table('addresses', meta2, Column('id', sa.Integer,
+                   primary_key=True), Column('user_id', sa.Integer,
+                   sa.ForeignKey('users.id')), autoload=True)
+        s = sa.select([a2])
+
+        assert s.c.user_id is not None
+        assert len(a2.foreign_keys) == 1
+        assert len(a2.c.user_id.foreign_keys) == 1
+        assert len(a2.constraints) == 2
+        assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+        assert [c.parent for c in a2.c.user_id.foreign_keys] \
+            == [a2.c.user_id]
+        assert list(a2.c.user_id.foreign_keys)[0].parent \
+            is a2.c.user_id
+        assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
 
     def test_pks_not_uniques(self):
         """test that primary key reflection not tripped up by unique
@@ -539,10 +515,11 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             testing.db.execute("drop table book")
 
     @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness')
+    @testing.provide_metadata
     def test_composite_fk(self):
         """test reflection of composite foreign keys"""
 
-        meta = MetaData(testing.db)
+        meta = self.metadata
         multi = Table(
             'multi', meta,
             Column('multi_id', sa.Integer, primary_key=True),
@@ -565,22 +542,19 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         )
         meta.create_all()
 
-        try:
-            meta2 = MetaData()
-            table = Table('multi', meta2, autoload=True,
-                          autoload_with=testing.db)
-            table2 = Table('multi2', meta2, autoload=True,
-                           autoload_with=testing.db)
-            self.assert_tables_equal(multi, table)
-            self.assert_tables_equal(multi2, table2)
-            j = sa.join(table, table2)
-
-            self.assert_(sa.and_(table.c.multi_id == table2.c.foo,
-                         table.c.multi_rev == table2.c.bar,
-                         table.c.multi_hoho
-                         == table2.c.lala).compare(j.onclause))
-        finally:
-            meta.drop_all()
+        meta2 = MetaData()
+        table = Table('multi', meta2, autoload=True,
+                      autoload_with=testing.db)
+        table2 = Table('multi2', meta2, autoload=True,
+                       autoload_with=testing.db)
+        self.assert_tables_equal(multi, table)
+        self.assert_tables_equal(multi2, table2)
+        j = sa.join(table, table2)
+
+        self.assert_(sa.and_(table.c.multi_id == table2.c.foo,
+                     table.c.multi_rev == table2.c.bar,
+                     table.c.multi_hoho
+                     == table2.c.lala).compare(j.onclause))
 
 
     @testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on')
@@ -588,12 +562,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
                         "FIXME: should be supported via the "
                         "DELIMITED env var but that breaks "
                         "everything else for now")
+    @testing.provide_metadata
     def test_reserved(self):
 
         # check a table that uses an SQL reserved name doesn't cause an
         # error
 
-        meta = MetaData(testing.db)
+        meta = self.metadata
         table_a = Table('select', meta, Column('not', sa.Integer,
                         primary_key=True), Column('from',
                         sa.String(12), nullable=False),
@@ -625,13 +600,11 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         meta.create_all()
         index_c.drop()
         meta2 = MetaData(testing.db)
-        try:
-            table_a2 = Table('select', meta2, autoload=True)
-            table_b2 = Table('false', meta2, autoload=True)
-            table_c2 = Table('is', meta2, autoload=True)
-        finally:
-            meta.drop_all()
+        table_a2 = Table('select', meta2, autoload=True)
+        table_b2 = Table('false', meta2, autoload=True)
+        table_c2 = Table('is', meta2, autoload=True)
 
+    @testing.provide_metadata
     def test_reflect_all(self):
         existing = testing.db.table_names()
 
@@ -642,57 +615,55 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             self.assert_(name not in existing)
         self.assert_('rt_f' not in existing)
 
-        baseline = MetaData(testing.db)
+        baseline = self.metadata
         for name in names:
             Table(name, baseline, Column('id', sa.Integer, primary_key=True))
         baseline.create_all()
 
+        m1 = MetaData(testing.db)
+        self.assert_(not m1.tables)
+        m1.reflect()
+        self.assert_(nameset.issubset(set(m1.tables.keys())))
+
+        m2 = MetaData()
+        m2.reflect(testing.db, only=['rt_a', 'rt_b'])
+        self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b']))
+
+        m3 = MetaData()
+        c = testing.db.connect()
+        m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c')
+        self.assert_(set(m3.tables.keys()) == set(['rt_c']))
+
+        m4 = MetaData(testing.db)
         try:
-            m1 = MetaData(testing.db)
-            self.assert_(not m1.tables)
-            m1.reflect()
-            self.assert_(nameset.issubset(set(m1.tables.keys())))
-
-            m2 = MetaData()
-            m2.reflect(testing.db, only=['rt_a', 'rt_b'])
-            self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b']))
-
-            m3 = MetaData()
-            c = testing.db.connect()
-            m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c')
-            self.assert_(set(m3.tables.keys()) == set(['rt_c']))
-
-            m4 = MetaData(testing.db)
-            try:
-                m4.reflect(only=['rt_a', 'rt_f'])
-                self.assert_(False)
-            except sa.exc.InvalidRequestError, e:
-                self.assert_(e.args[0].endswith('(rt_f)'))
-
-            m5 = MetaData(testing.db)
-            m5.reflect(only=[])
-            self.assert_(not m5.tables)
-
-            m6 = MetaData(testing.db)
-            m6.reflect(only=lambda n, m: False)
-            self.assert_(not m6.tables)
-
-            m7 = MetaData(testing.db, reflect=True)
-            self.assert_(nameset.issubset(set(m7.tables.keys())))
-
-            try:
-                m8 = MetaData(reflect=True)
-                self.assert_(False)
-            except sa.exc.ArgumentError, e:
-                self.assert_(e.args[0]
-                             == 'A bind must be supplied in '
-                             'conjunction with reflect=True')
-        finally:
-            baseline.drop_all()
+            m4.reflect(only=['rt_a', 'rt_f'])
+            self.assert_(False)
+        except sa.exc.InvalidRequestError, e:
+            self.assert_(e.args[0].endswith('(rt_f)'))
+
+        m5 = MetaData(testing.db)
+        m5.reflect(only=[])
+        self.assert_(not m5.tables)
+
+        m6 = MetaData(testing.db)
+        m6.reflect(only=lambda n, m: False)
+        self.assert_(not m6.tables)
+
+        m7 = MetaData(testing.db, reflect=True)
+        self.assert_(nameset.issubset(set(m7.tables.keys())))
+
+        try:
+            m8 = MetaData(reflect=True)
+            self.assert_(False)
+        except sa.exc.ArgumentError, e:
+            self.assert_(e.args[0]
+                         == 'A bind must be supplied in '
+                         'conjunction with reflect=True')
 
         if existing:
             print "Other tables present in database, skipping some checks."
         else:
+            baseline.drop_all()
             m9 = MetaData(testing.db)
             m9.reflect()
             self.assert_(not m9.tables)
@@ -709,8 +680,9 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         i = Inspector.from_engine(testing.db)
         assert not c.closed
 
+    @testing.provide_metadata
     def test_index_reflection(self):
-        m1 = MetaData(testing.db)
+        m1 = self.metadata
         t1 = Table('party', m1,
             Column('id', sa.Integer, nullable=False),
             Column('name', sa.String(20), index=True)
@@ -718,32 +690,29 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
         i1 = sa.Index('idx1', t1.c.id, unique=True)
         i2 = sa.Index('idx2', t1.c.name, t1.c.id, unique=False)
         m1.create_all()
-        try:
-            m2 = MetaData(testing.db)
-            t2 = Table('party', m2, autoload=True)
-
-            assert len(t2.indexes) == 3
-            # Make sure indexes are in the order we expect them in
-            tmp = [(idx.name, idx) for idx in t2.indexes]
-            tmp.sort()
-            r1, r2, r3 = [idx[1] for idx in tmp]
-
-            assert r1.name == 'idx1'
-            assert r2.name == 'idx2'
-            assert r1.unique == True
-            assert r2.unique == False
-            assert r3.unique == False
-            assert set([t2.c.id]) == set(r1.columns)
-            assert set([t2.c.name, t2.c.id]) == set(r2.columns)
-            assert set([t2.c.name]) == set(r3.columns)
-        finally:
-            m1.drop_all()
+        m2 = MetaData(testing.db)
+        t2 = Table('party', m2, autoload=True)
+
+        assert len(t2.indexes) == 3
+        # Make sure indexes are in the order we expect them in
+        tmp = [(idx.name, idx) for idx in t2.indexes]
+        tmp.sort()
+        r1, r2, r3 = [idx[1] for idx in tmp]
+
+        assert r1.name == 'idx1'
+        assert r2.name == 'idx2'
+        assert r1.unique == True
+        assert r2.unique == False
+        assert r3.unique == False
+        assert set([t2.c.id]) == set(r1.columns)
+        assert set([t2.c.name, t2.c.id]) == set(r2.columns)
+        assert set([t2.c.name]) == set(r3.columns)
 
     @testing.requires.views
     @testing.provide_metadata
     def test_views(self):
         metadata = self.metadata
-        users, addresses = createTables(metadata, None)
+        users, addresses, dingalings = createTables(metadata)
         try:
             metadata.create_all()
             _create_views(metadata.bind, None)
@@ -765,7 +734,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
     @testing.provide_metadata
     def test_reflect_all_with_views(self):
         metadata = self.metadata
-        users, addresses = createTables(metadata, None)
+        users, addresses, dingalings = createTables(metadata, None)
         try:
             metadata.create_all()
             _create_views(metadata.bind, None)
@@ -774,15 +743,15 @@ class ReflectionTest(fixtures.TestBase, ComparesTables):
             m2.reflect(views=False)
             eq_(
                 set(m2.tables), 
-                set([u'users', u'email_addresses'])
+                set(['users', 'email_addresses', 'dingalings'])
             )
 
             m2 = MetaData(testing.db)
             m2.reflect(views=True)
             eq_(
                 set(m2.tables), 
-                set([u'email_addresses_v', u'users_v', 
-                            u'users', u'email_addresses'])
+                set(['email_addresses_v', 'users_v', 
+                            'users', 'dingalings', 'email_addresses'])
             )
         finally:
             _drop_views(metadata.bind)
@@ -1034,8 +1003,10 @@ class SchemaTest(fixtures.TestBase):
         metadata.create_all()
         m2 = MetaData(schema="test_schema", bind=testing.db)
         m2.reflect()
-        eq_(m2.tables.keys(), 
-            [u'test_schema.users', u'test_schema.email_addresses']
+        eq_(
+            set(m2.tables), 
+            set(['test_schema.dingalings', 'test_schema.users', 
+                'test_schema.email_addresses'])
         )
 
 class HasSequenceTest(fixtures.TestBase):
@@ -1085,13 +1056,9 @@ class HasSequenceTest(fixtures.TestBase):
 
 def createTables(meta, schema=None):
     if schema:
-        parent_user_id = Column('parent_user_id', sa.Integer,
-            sa.ForeignKey('%s.users.user_id' % schema)
-        )
+        schema_prefix = schema + "."
     else:
-        parent_user_id = Column('parent_user_id', sa.Integer,
-            sa.ForeignKey('users.user_id')
-        )
+        schema_prefix = ""
 
     users = Table('users', meta,
         Column('user_id', sa.INT, primary_key=True),
@@ -1102,7 +1069,8 @@ def createTables(meta, schema=None):
         Column('test4', sa.Numeric(10, 2), nullable = False),
         Column('test5', sa.Date),
         Column('test5_1', sa.TIMESTAMP),
-        parent_user_id,
+        Column('parent_user_id', sa.Integer,
+                    sa.ForeignKey('%susers.user_id' % schema_prefix)),
         Column('test6', sa.Date, nullable=False),
         Column('test7', sa.Text),
         Column('test8', sa.LargeBinary),
@@ -1112,6 +1080,14 @@ def createTables(meta, schema=None):
         schema=schema,
         test_needs_fk=True,
     )
+    dingalings = Table("dingalings", meta,
+              Column('dingaling_id', sa.Integer, primary_key=True),
+              Column('address_id', sa.Integer, 
+                    sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+              Column('data', sa.String(30)),
+              schema=schema,
+              test_needs_fk=True,
+        )
     addresses = Table('email_addresses', meta,
         Column('address_id', sa.Integer),
         Column('remote_user_id', sa.Integer,
@@ -1121,7 +1097,8 @@ def createTables(meta, schema=None):
         schema=schema,
         test_needs_fk=True,
     )
-    return (users, addresses)
+
+    return (users, addresses, dingalings)
 
 def createIndexes(con, schema=None):
     fullname = 'users'
@@ -1242,10 +1219,11 @@ class ComponentReflectionTest(fixtures.TestBase):
         insp = Inspector(testing.db)
         eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
 
+    @testing.provide_metadata
     def _test_get_table_names(self, schema=None, table_type='table',
                               order_by=None):
-        meta = MetaData(testing.db)
-        (users, addresses) = createTables(meta, schema)
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
         meta.create_all()
         _create_views(meta.bind, schema)
         try:
@@ -1257,20 +1235,21 @@ class ComponentReflectionTest(fixtures.TestBase):
             else:
                 table_names = insp.get_table_names(schema,
                                                    order_by=order_by)
-                table_names.sort()
                 if order_by == 'foreign_key':
-                    answer = ['users', 'email_addresses']
+                    answer = ['dingalings', 'email_addresses', 'users']
+                    eq_(table_names, answer)
                 else:
-                    answer = ['email_addresses', 'users']
-            eq_(table_names, answer)
+                    answer = ['dingalings', 'email_addresses', 'users']
+                    eq_(sorted(table_names), answer)
         finally:
             _drop_views(meta.bind, schema)
-            addresses.drop()
-            users.drop()
 
     def test_get_table_names(self):
         self._test_get_table_names()
 
+    def test_get_table_names_fks(self):
+        self._test_get_table_names(order_by='foreign_key')
+
     @testing.requires.schemas
     def test_get_table_names_with_schema(self):
         self._test_get_table_names('test_schema')
@@ -1285,7 +1264,7 @@ class ComponentReflectionTest(fixtures.TestBase):
 
     def _test_get_columns(self, schema=None, table_type='table'):
         meta = MetaData(testing.db)
-        users, addresses = createTables(meta, schema)
+        users, addresses, dingalings = createTables(meta, schema)
         table_names = ['users', 'email_addresses']
         meta.create_all()
         if table_type == 'view':
@@ -1331,8 +1310,7 @@ class ComponentReflectionTest(fixtures.TestBase):
         finally:
             if table_type == 'view':
                 _drop_views(meta.bind, schema)
-            addresses.drop()
-            users.drop()
+            meta.drop_all()
 
     def test_get_columns(self):
         self._test_get_columns()
@@ -1350,29 +1328,25 @@ class ComponentReflectionTest(fixtures.TestBase):
     def test_get_view_columns_with_schema(self):
         self._test_get_columns(schema='test_schema', table_type='view')
 
+    @testing.provide_metadata
     def _test_get_primary_keys(self, schema=None):
-        meta = MetaData(testing.db)
-        (users, addresses) = createTables(meta, schema)
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
         meta.create_all()
         insp = Inspector(meta.bind)
-        try:
-            users_pkeys = insp.get_primary_keys(users.name,
-                                                schema=schema)
-            eq_(users_pkeys,  ['user_id'])
-            addr_cons = insp.get_pk_constraint(addresses.name,
-                                                schema=schema)
+        users_pkeys = insp.get_primary_keys(users.name,
+                                            schema=schema)
+        eq_(users_pkeys,  ['user_id'])
+        addr_cons = insp.get_pk_constraint(addresses.name,
+                                            schema=schema)
 
-            addr_pkeys = addr_cons['constrained_columns']
-            eq_(addr_pkeys,  ['address_id'])
+        addr_pkeys = addr_cons['constrained_columns']
+        eq_(addr_pkeys,  ['address_id'])
 
-            @testing.requires.reflects_pk_names
-            def go():
-                eq_(addr_cons['name'], 'email_ad_pk')
-            go()
-
-        finally:
-            addresses.drop()
-            users.drop()
+        @testing.requires.reflects_pk_names
+        def go():
+            eq_(addr_cons['name'], 'email_ad_pk')
+        go()
 
     def test_get_primary_keys(self):
         self._test_get_primary_keys()
@@ -1381,34 +1355,31 @@ class ComponentReflectionTest(fixtures.TestBase):
     def test_get_primary_keys_with_schema(self):
         self._test_get_primary_keys(schema='test_schema')
 
+    @testing.provide_metadata
     def _test_get_foreign_keys(self, schema=None):
-        meta = MetaData(testing.db)
-        (users, addresses) = createTables(meta, schema)
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
         meta.create_all()
         insp = Inspector(meta.bind)
-        try:
-            expected_schema = schema
-            # users
-            users_fkeys = insp.get_foreign_keys(users.name,
-                                                schema=schema)
-            fkey1 = users_fkeys[0]
-            self.assert_(fkey1['name'] is not None)
-            eq_(fkey1['referred_schema'], expected_schema)
-            eq_(fkey1['referred_table'], users.name)
-            eq_(fkey1['referred_columns'], ['user_id', ])
-            eq_(fkey1['constrained_columns'], ['parent_user_id'])
-            #addresses
-            addr_fkeys = insp.get_foreign_keys(addresses.name,
-                                               schema=schema)
-            fkey1 = addr_fkeys[0]
-            self.assert_(fkey1['name'] is not None)
-            eq_(fkey1['referred_schema'], expected_schema)
-            eq_(fkey1['referred_table'], users.name)
-            eq_(fkey1['referred_columns'], ['user_id', ])
-            eq_(fkey1['constrained_columns'], ['remote_user_id'])
-        finally:
-            addresses.drop()
-            users.drop()
+        expected_schema = schema
+        # users
+        users_fkeys = insp.get_foreign_keys(users.name,
+                                            schema=schema)
+        fkey1 = users_fkeys[0]
+        self.assert_(fkey1['name'] is not None)
+        eq_(fkey1['referred_schema'], expected_schema)
+        eq_(fkey1['referred_table'], users.name)
+        eq_(fkey1['referred_columns'], ['user_id', ])
+        eq_(fkey1['constrained_columns'], ['parent_user_id'])
+        #addresses
+        addr_fkeys = insp.get_foreign_keys(addresses.name,
+                                           schema=schema)
+        fkey1 = addr_fkeys[0]
+        self.assert_(fkey1['name'] is not None)
+        eq_(fkey1['referred_schema'], expected_schema)
+        eq_(fkey1['referred_table'], users.name)
+        eq_(fkey1['referred_columns'], ['user_id', ])
+        eq_(fkey1['constrained_columns'], ['remote_user_id'])
 
     def test_get_foreign_keys(self):
         self._test_get_foreign_keys()
@@ -1417,30 +1388,26 @@ class ComponentReflectionTest(fixtures.TestBase):
     def test_get_foreign_keys_with_schema(self):
         self._test_get_foreign_keys(schema='test_schema')
 
+    @testing.provide_metadata
     def _test_get_indexes(self, schema=None):
-        meta = MetaData(testing.db)
-        (users, addresses) = createTables(meta, schema)
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
         meta.create_all()
         createIndexes(meta.bind, schema)
-        try:
-            # The database may decide to create indexes for foreign keys, etc.
-            # so there may be more indexes than expected.
-            insp = Inspector(meta.bind)
-            indexes = insp.get_indexes('users', schema=schema)
-            expected_indexes = [
-                {'unique': False,
-                 'column_names': ['test1', 'test2'],
-                 'name': 'users_t_idx'}]
-            index_names = [d['name'] for d in indexes]
-            for e_index in expected_indexes:
-                assert e_index['name'] in index_names
-                index = indexes[index_names.index(e_index['name'])]
-                for key in e_index:
-                    eq_(e_index[key], index[key])
-
-        finally:
-            addresses.drop()
-            users.drop()
+        # The database may decide to create indexes for foreign keys, etc.
+        # so there may be more indexes than expected.
+        insp = Inspector(meta.bind)
+        indexes = insp.get_indexes('users', schema=schema)
+        expected_indexes = [
+            {'unique': False,
+             'column_names': ['test1', 'test2'],
+             'name': 'users_t_idx'}]
+        index_names = [d['name'] for d in indexes]
+        for e_index in expected_indexes:
+            assert e_index['name'] in index_names
+            index = indexes[index_names.index(e_index['name'])]
+            for key in e_index:
+                eq_(e_index[key], index[key])
 
     def test_get_indexes(self):
         self._test_get_indexes()
@@ -1449,9 +1416,10 @@ class ComponentReflectionTest(fixtures.TestBase):
     def test_get_indexes_with_schema(self):
         self._test_get_indexes(schema='test_schema')
 
+    @testing.provide_metadata
     def _test_get_view_definition(self, schema=None):
-        meta = MetaData(testing.db)
-        (users, addresses) = createTables(meta, schema)
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
         meta.create_all()
         _create_views(meta.bind, schema)
         view_name1 = 'users_v'
@@ -1464,8 +1432,6 @@ class ComponentReflectionTest(fixtures.TestBase):
             self.assert_(v2)
         finally:
             _drop_views(meta.bind, schema)
-            addresses.drop()
-            users.drop()
 
     @testing.requires.views
     def test_get_view_definition(self):
@@ -1476,18 +1442,15 @@ class ComponentReflectionTest(fixtures.TestBase):
     def test_get_view_definition_with_schema(self):
         self._test_get_view_definition(schema='test_schema')
 
+    @testing.only_on("postgresql", "PG specific feature")
+    @testing.provide_metadata
     def _test_get_table_oid(self, table_name, schema=None):
-        if testing.against('postgresql'):
-            meta = MetaData(testing.db)
-            (users, addresses) = createTables(meta, schema)
-            meta.create_all()
-            try:
-                insp = create_inspector(meta.bind)
-                oid = insp.get_table_oid(table_name, schema)
-                self.assert_(isinstance(oid, (int, long)))
-            finally:
-                addresses.drop()
-                users.drop()
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        insp = create_inspector(meta.bind)
+        oid = insp.get_table_oid(table_name, schema)
+        self.assert_(isinstance(oid, (int, long)))
 
     def test_get_table_oid(self):
         self._test_get_table_oid('users')