From: Mike Bayer Date: Thu, 1 Dec 2011 19:21:43 +0000 (-0500) Subject: - [bug] Fixed bug whereby "order_by='foreign_key'" X-Git-Tag: rel_0_7_4~40 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=896bc4c5b6616492c113d80c1328032413d22b5e;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - [bug] Fixed bug whereby "order_by='foreign_key'" option to Inspector.get_table_names wasn't implementing the sort properly, replaced with the existing sort algorithm - clean up metadata usage in reflection tests --- diff --git a/CHANGES b/CHANGES index 04e45d97d5..4d4af021bd 100644 --- a/CHANGES +++ b/CHANGES @@ -129,6 +129,11 @@ CHANGES using a TypeDecorator that "switches" types, like the CHAR/UUID type. + - [bug] Fixed bug whereby "order_by='foreign_key'" + option to Inspector.get_table_names + wasn't implementing the sort properly, replaced + with the existing sort algorithm + - postgresql - [bug] Postgresql dialect memoizes that an ENUM of a particular name was processed diff --git a/lib/sqlalchemy/engine/reflection.py b/lib/sqlalchemy/engine/reflection.py index e87c1ce0fd..a69fc5b98a 100644 --- a/lib/sqlalchemy/engine/reflection.py +++ b/lib/sqlalchemy/engine/reflection.py @@ -27,6 +27,7 @@ methods such as get_table_names, get_columns, etc. import sqlalchemy from sqlalchemy import exc, sql from sqlalchemy import util +from sqlalchemy.util import topological from sqlalchemy.types import TypeEngine from sqlalchemy import schema as sa_schema @@ -150,27 +151,19 @@ class Inspector(object): if hasattr(self.dialect, 'get_table_names'): tnames = self.dialect.get_table_names(self.bind, - schema, - info_cache=self.info_cache) + schema, info_cache=self.info_cache) else: tnames = self.engine.table_names(schema) if order_by == 'foreign_key': - ordered_tnames = tnames[:] - # Order based on foreign key dependencies. + import random + random.shuffle(tnames) + + tuples = [] for tname in tnames: - table_pos = tnames.index(tname) - fkeys = self.get_foreign_keys(tname, schema) - for fkey in fkeys: - rtable = fkey['referred_table'] - if rtable in ordered_tnames: - ref_pos = ordered_tnames.index(rtable) - # Make sure it's lower in the list than anything it - # references. - if table_pos > ref_pos: - ordered_tnames.pop(table_pos) # rtable moves up 1 - # insert just below rtable - ordered_tnames.index(ref_pos, tname) - tnames = ordered_tnames + for fkey in self.get_foreign_keys(tname, schema): + if tname != fkey['referred_table']: + tuples.append((tname, fkey['referred_table'])) + tnames = list(topological.sort(tuples, tnames)) return tnames def get_table_options(self, table_name, schema=None, **kw): diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index 63a37b112c..4beeab14d7 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -20,8 +20,9 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): 'Date is only supported on MSSQL 2008+') @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') + @testing.provide_metadata def test_basic_reflection(self): - meta = MetaData(testing.db) + meta = self.metadata users = Table('engine_users', meta, Column('user_id', sa.INT, primary_key=True), @@ -53,20 +54,18 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): ) meta.create_all() - try: - meta2 = MetaData() - reflected_users = Table('engine_users', meta2, - autoload=True, - autoload_with=testing.db) - reflected_addresses = Table('engine_email_addresses', - meta2, autoload=True, autoload_with=testing.db) - self.assert_tables_equal(users, reflected_users) - self.assert_tables_equal(addresses, reflected_addresses) - finally: - meta.drop_all() + meta2 = MetaData() + reflected_users = Table('engine_users', meta2, + autoload=True, + autoload_with=testing.db) + reflected_addresses = Table('engine_email_addresses', + meta2, autoload=True, autoload_with=testing.db) + self.assert_tables_equal(users, reflected_users) + self.assert_tables_equal(addresses, reflected_addresses) + @testing.provide_metadata def test_two_foreign_keys(self): - meta = MetaData(testing.db) + meta = self.metadata t1 = Table( 't1', meta, @@ -80,73 +79,67 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): t3 = Table('t3', meta, Column('id', sa.Integer, primary_key=True), test_needs_fk=True) meta.create_all() - try: - meta2 = MetaData() - t1r, t2r, t3r = [Table(x, meta2, autoload=True, - autoload_with=testing.db) for x in ('t1', - 't2', 't3')] - assert t1r.c.t2id.references(t2r.c.id) - assert t1r.c.t3id.references(t3r.c.id) - finally: - meta.drop_all() + meta2 = MetaData() + t1r, t2r, t3r = [Table(x, meta2, autoload=True, + autoload_with=testing.db) for x in ('t1', + 't2', 't3')] + assert t1r.c.t2id.references(t2r.c.id) + assert t1r.c.t3id.references(t3r.c.id) def test_nonexistent(self): meta = MetaData(testing.db) assert_raises(sa.exc.NoSuchTableError, Table, 'nonexistent', meta, autoload=True) + @testing.provide_metadata def test_include_columns(self): - meta = MetaData(testing.db) + meta = self.metadata foo = Table('foo', meta, *[Column(n, sa.String(30)) for n in ['a', 'b', 'c', 'd', 'e', 'f']]) meta.create_all() - try: - meta2 = MetaData(testing.db) - foo = Table('foo', meta2, autoload=True, - include_columns=['b', 'f', 'e']) - # test that cols come back in original order - eq_([c.name for c in foo.c], ['b', 'e', 'f']) - for c in ('b', 'f', 'e'): - assert c in foo.c - for c in ('a', 'c', 'd'): - assert c not in foo.c - - # test against a table which is already reflected - meta3 = MetaData(testing.db) - foo = Table('foo', meta3, autoload=True) - foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], - extend_existing=True) - eq_([c.name for c in foo.c], ['b', 'e', 'f']) - for c in ('b', 'f', 'e'): - assert c in foo.c - for c in ('a', 'c', 'd'): - assert c not in foo.c - finally: - meta.drop_all() + meta2 = MetaData(testing.db) + foo = Table('foo', meta2, autoload=True, + include_columns=['b', 'f', 'e']) + # test that cols come back in original order + eq_([c.name for c in foo.c], ['b', 'e', 'f']) + for c in ('b', 'f', 'e'): + assert c in foo.c + for c in ('a', 'c', 'd'): + assert c not in foo.c + + # test against a table which is already reflected + meta3 = MetaData(testing.db) + foo = Table('foo', meta3, autoload=True) + foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], + extend_existing=True) + eq_([c.name for c in foo.c], ['b', 'e', 'f']) + for c in ('b', 'f', 'e'): + assert c in foo.c + for c in ('a', 'c', 'd'): + assert c not in foo.c @testing.emits_warning(r".*omitted columns") + @testing.provide_metadata def test_include_columns_indexes(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('a', sa.Integer), Column('b', sa.Integer)) sa.Index('foobar', t1.c.a, t1.c.b) sa.Index('bat', t1.c.a) m.create_all() - try: - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - assert len(t2.indexes) == 2 + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + assert len(t2.indexes) == 2 - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True, include_columns=['a']) - assert len(t2.indexes) == 1 + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True, include_columns=['a']) + assert len(t2.indexes) == 1 - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b']) - assert len(t2.indexes) == 2 - finally: - m.drop_all() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b']) + assert len(t2.indexes) == 2 + @testing.provide_metadata def test_autoincrement_col(self): """test that 'autoincrement' is reflected according to sqla's policy. @@ -156,7 +149,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): """ - meta = MetaData(testing.db) + meta = self.metadata t1 = Table('test', meta, Column('id', sa.Integer, primary_key=True), Column('data', sa.String(50)), @@ -170,19 +163,16 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): mysql_engine='MyISAM' ) meta.create_all() - try: - m2 = MetaData(testing.db) - t1a = Table('test', m2, autoload=True) - assert t1a._autoincrement_column is t1a.c.id - - t2a = Table('test2', m2, autoload=True) - assert t2a._autoincrement_column is t2a.c.id2 + m2 = MetaData(testing.db) + t1a = Table('test', m2, autoload=True) + assert t1a._autoincrement_column is t1a.c.id - finally: - meta.drop_all() + t2a = Table('test2', m2, autoload=True) + assert t2a._autoincrement_column is t2a.c.id2 + @testing.provide_metadata def test_unknown_types(self): - meta = MetaData(testing.db) + meta = self.metadata t = Table("test", meta, Column('foo', sa.DateTime)) @@ -201,10 +191,10 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): finally: testing.db.dialect.ischema_names = ischema_names - t.drop() + @testing.provide_metadata def test_basic_override(self): - meta = MetaData(testing.db) + meta = self.metadata table = Table( 'override_test', meta, Column('col1', sa.Integer, primary_key=True), @@ -214,24 +204,22 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): table.create() meta2 = MetaData(testing.db) - try: - table = Table( - 'override_test', meta2, - Column('col2', sa.Unicode()), - Column('col4', sa.String(30)), autoload=True) - - self.assert_(isinstance(table.c.col1.type, sa.Integer)) - self.assert_(isinstance(table.c.col2.type, sa.Unicode)) - self.assert_(isinstance(table.c.col4.type, sa.String)) - finally: - table.drop() + table = Table( + 'override_test', meta2, + Column('col2', sa.Unicode()), + Column('col4', sa.String(30)), autoload=True) + + self.assert_(isinstance(table.c.col1.type, sa.Integer)) + self.assert_(isinstance(table.c.col2.type, sa.Unicode)) + self.assert_(isinstance(table.c.col4.type, sa.String)) + @testing.provide_metadata def test_override_pkfk(self): """test that you can override columns which contain foreign keys to other reflected tables, where the foreign key column is also a primary key column""" - meta = MetaData(testing.db) + meta = self.metadata users = Table('users', meta, Column('id', sa.Integer, primary_key=True), Column('name', sa.String(30))) @@ -241,38 +229,35 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): meta.create_all() - try: - meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('id', sa.Integer, - sa.ForeignKey('users.id'), primary_key=True), - autoload=True) - u2 = Table('users', meta2, autoload=True) - - assert list(a2.primary_key) == [a2.c.id] - assert list(u2.primary_key) == [u2.c.id] - assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id) - - meta3 = MetaData(testing.db) - u3 = Table('users', meta3, autoload=True) - a3 = Table('addresses', meta3, - Column('id', sa.Integer, sa.ForeignKey('users.id'), - primary_key=True), - autoload=True) - - assert list(a3.primary_key) == [a3.c.id] - assert list(u3.primary_key) == [u3.c.id] - assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id) - - finally: - meta.drop_all() + meta2 = MetaData(testing.db) + a2 = Table('addresses', meta2, + Column('id', sa.Integer, + sa.ForeignKey('users.id'), primary_key=True), + autoload=True) + u2 = Table('users', meta2, autoload=True) + + assert list(a2.primary_key) == [a2.c.id] + assert list(u2.primary_key) == [u2.c.id] + assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id) + + meta3 = MetaData(testing.db) + u3 = Table('users', meta3, autoload=True) + a3 = Table('addresses', meta3, + Column('id', sa.Integer, sa.ForeignKey('users.id'), + primary_key=True), + autoload=True) + + assert list(a3.primary_key) == [a3.c.id] + assert list(u3.primary_key) == [u3.c.id] + assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id) + @testing.provide_metadata def test_override_nonexistent_fk(self): """test that you can override columns and create new foreign keys to other reflected tables which have no foreign keys. this is common with MySQL MyISAM tables.""" - meta = MetaData(testing.db) + meta = self.metadata users = Table('users', meta, Column('id', sa.Integer, primary_key=True), Column('name', sa.String(30))) @@ -282,53 +267,50 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): Column('user_id', sa.Integer)) meta.create_all() - try: - meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('user_id',sa.Integer, sa.ForeignKey('users.id')), + meta2 = MetaData(testing.db) + a2 = Table('addresses', meta2, + Column('user_id',sa.Integer, sa.ForeignKey('users.id')), + autoload=True) + u2 = Table('users', meta2, autoload=True) + assert len(a2.c.user_id.foreign_keys) == 1 + assert len(a2.foreign_keys) == 1 + assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) + meta3 = MetaData(testing.db) + + u3 = Table('users', meta3, autoload=True) + + a3 = Table('addresses', meta3, Column('user_id', + sa.Integer, sa.ForeignKey('users.id')), autoload=True) - u2 = Table('users', meta2, autoload=True) - assert len(a2.c.user_id.foreign_keys) == 1 - assert len(a2.foreign_keys) == 1 - assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] \ - == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent \ - is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) - meta3 = MetaData(testing.db) - - u3 = Table('users', meta3, autoload=True) - - a3 = Table('addresses', meta3, Column('user_id', - sa.Integer, sa.ForeignKey('users.id')), - autoload=True) - assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id) - - meta4 = MetaData(testing.db) - - u4 = Table('users', meta4, - Column('id', sa.Integer, key='u_id', primary_key=True), - autoload=True) - - a4 = Table( - 'addresses', - meta4, - Column('id', sa.Integer, key='street', - primary_key=True), - Column('street', sa.String(30), key='user_id'), - Column('user_id', sa.Integer, sa.ForeignKey('users.u_id' - ), key='id'), - autoload=True, - ) - assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id) - assert list(u4.primary_key) == [u4.c.u_id] - assert len(u4.columns) == 2 - assert len(u4.constraints) == 1 - assert len(a4.columns) == 3 - assert len(a4.constraints) == 2 - finally: - meta.drop_all() + assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id) + + meta4 = MetaData(testing.db) + + u4 = Table('users', meta4, + Column('id', sa.Integer, key='u_id', primary_key=True), + autoload=True) + + a4 = Table( + 'addresses', + meta4, + Column('id', sa.Integer, key='street', + primary_key=True), + Column('street', sa.String(30), key='user_id'), + Column('user_id', sa.Integer, sa.ForeignKey('users.u_id' + ), key='id'), + autoload=True, + ) + assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id) + assert list(u4.primary_key) == [u4.c.u_id] + assert len(u4.columns) == 2 + assert len(u4.constraints) == 1 + assert len(a4.columns) == 3 + assert len(a4.constraints) == 2 @testing.provide_metadata def test_override_composite_fk(self): @@ -369,12 +351,12 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): + @testing.provide_metadata def test_override_keys(self): """test that columns can be overridden with a 'key', and that ForeignKey targeting during reflection still works.""" - - meta = MetaData(testing.db) + meta = self.metadata a1 = Table('a', meta, Column('x', sa.Integer, primary_key=True), Column('z', sa.Integer), @@ -385,16 +367,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): test_needs_fk=True ) meta.create_all() - try: - m2 = MetaData(testing.db) - a2 = Table('a', m2, - Column('x', sa.Integer, primary_key=True, key='x1'), - autoload=True) - b2 = Table('b', m2, autoload=True) - assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y) - assert b2.c.y.references(a2.c.x1) - finally: - meta.drop_all() + m2 = MetaData(testing.db) + a2 = Table('a', m2, + Column('x', sa.Integer, primary_key=True, key='x1'), + autoload=True) + b2 = Table('b', m2, autoload=True) + assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y) + assert b2.c.y.references(a2.c.x1) @testing.provide_metadata def test_nonreflected_fk_raises(self): @@ -422,12 +401,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): assert_raises(sa.exc.NoReferencedColumnError, a2.join, b2) @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness') + @testing.provide_metadata def test_override_existing_fk(self): """test that you can override columns and specify new foreign keys to other reflected tables, on columns which *do* already have that foreign key, and that the FK is not duped. """ - meta = MetaData(testing.db) + meta = self.metadata users = Table('users', meta, Column('id', sa.Integer, primary_key=True), Column('name', sa.String(30)), @@ -438,46 +418,42 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): test_needs_fk=True) meta.create_all() - try: - meta2 = MetaData(testing.db) - a2 = Table('addresses', meta2, - Column('user_id',sa.Integer, sa.ForeignKey('users.id')), - autoload=True) - u2 = Table('users', meta2, autoload=True) - s = sa.select([a2]) - - assert s.c.user_id is not None - assert len(a2.foreign_keys) == 1 - assert len(a2.c.user_id.foreign_keys) == 1 - assert len(a2.constraints) == 2 - assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] \ - == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent \ - is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) - - meta2 = MetaData(testing.db) - u2 = Table('users', meta2, Column('id', sa.Integer, - primary_key=True), autoload=True) - a2 = Table('addresses', meta2, Column('id', sa.Integer, - primary_key=True), Column('user_id', sa.Integer, - sa.ForeignKey('users.id')), autoload=True) - s = sa.select([a2]) - - assert s.c.user_id is not None - assert len(a2.foreign_keys) == 1 - assert len(a2.c.user_id.foreign_keys) == 1 - assert len(a2.constraints) == 2 - assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] - assert [c.parent for c in a2.c.user_id.foreign_keys] \ - == [a2.c.user_id] - assert list(a2.c.user_id.foreign_keys)[0].parent \ - is a2.c.user_id - assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) + meta2 = MetaData(testing.db) + a2 = Table('addresses', meta2, + Column('user_id',sa.Integer, sa.ForeignKey('users.id')), + autoload=True) + u2 = Table('users', meta2, autoload=True) + s = sa.select([a2]) + + assert s.c.user_id is not None + assert len(a2.foreign_keys) == 1 + assert len(a2.c.user_id.foreign_keys) == 1 + assert len(a2.constraints) == 2 + assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) - finally: - meta.drop_all() + meta2 = MetaData(testing.db) + u2 = Table('users', meta2, Column('id', sa.Integer, + primary_key=True), autoload=True) + a2 = Table('addresses', meta2, Column('id', sa.Integer, + primary_key=True), Column('user_id', sa.Integer, + sa.ForeignKey('users.id')), autoload=True) + s = sa.select([a2]) + + assert s.c.user_id is not None + assert len(a2.foreign_keys) == 1 + assert len(a2.c.user_id.foreign_keys) == 1 + assert len(a2.constraints) == 2 + assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id] + assert [c.parent for c in a2.c.user_id.foreign_keys] \ + == [a2.c.user_id] + assert list(a2.c.user_id.foreign_keys)[0].parent \ + is a2.c.user_id + assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id) def test_pks_not_uniques(self): """test that primary key reflection not tripped up by unique @@ -539,10 +515,11 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): testing.db.execute("drop table book") @testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness') + @testing.provide_metadata def test_composite_fk(self): """test reflection of composite foreign keys""" - meta = MetaData(testing.db) + meta = self.metadata multi = Table( 'multi', meta, Column('multi_id', sa.Integer, primary_key=True), @@ -565,22 +542,19 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): ) meta.create_all() - try: - meta2 = MetaData() - table = Table('multi', meta2, autoload=True, - autoload_with=testing.db) - table2 = Table('multi2', meta2, autoload=True, - autoload_with=testing.db) - self.assert_tables_equal(multi, table) - self.assert_tables_equal(multi2, table2) - j = sa.join(table, table2) - - self.assert_(sa.and_(table.c.multi_id == table2.c.foo, - table.c.multi_rev == table2.c.bar, - table.c.multi_hoho - == table2.c.lala).compare(j.onclause)) - finally: - meta.drop_all() + meta2 = MetaData() + table = Table('multi', meta2, autoload=True, + autoload_with=testing.db) + table2 = Table('multi2', meta2, autoload=True, + autoload_with=testing.db) + self.assert_tables_equal(multi, table) + self.assert_tables_equal(multi2, table2) + j = sa.join(table, table2) + + self.assert_(sa.and_(table.c.multi_id == table2.c.foo, + table.c.multi_rev == table2.c.bar, + table.c.multi_hoho + == table2.c.lala).compare(j.onclause)) @testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on') @@ -588,12 +562,13 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): "FIXME: should be supported via the " "DELIMITED env var but that breaks " "everything else for now") + @testing.provide_metadata def test_reserved(self): # check a table that uses an SQL reserved name doesn't cause an # error - meta = MetaData(testing.db) + meta = self.metadata table_a = Table('select', meta, Column('not', sa.Integer, primary_key=True), Column('from', sa.String(12), nullable=False), @@ -625,13 +600,11 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): meta.create_all() index_c.drop() meta2 = MetaData(testing.db) - try: - table_a2 = Table('select', meta2, autoload=True) - table_b2 = Table('false', meta2, autoload=True) - table_c2 = Table('is', meta2, autoload=True) - finally: - meta.drop_all() + table_a2 = Table('select', meta2, autoload=True) + table_b2 = Table('false', meta2, autoload=True) + table_c2 = Table('is', meta2, autoload=True) + @testing.provide_metadata def test_reflect_all(self): existing = testing.db.table_names() @@ -642,57 +615,55 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): self.assert_(name not in existing) self.assert_('rt_f' not in existing) - baseline = MetaData(testing.db) + baseline = self.metadata for name in names: Table(name, baseline, Column('id', sa.Integer, primary_key=True)) baseline.create_all() + m1 = MetaData(testing.db) + self.assert_(not m1.tables) + m1.reflect() + self.assert_(nameset.issubset(set(m1.tables.keys()))) + + m2 = MetaData() + m2.reflect(testing.db, only=['rt_a', 'rt_b']) + self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b'])) + + m3 = MetaData() + c = testing.db.connect() + m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c') + self.assert_(set(m3.tables.keys()) == set(['rt_c'])) + + m4 = MetaData(testing.db) try: - m1 = MetaData(testing.db) - self.assert_(not m1.tables) - m1.reflect() - self.assert_(nameset.issubset(set(m1.tables.keys()))) - - m2 = MetaData() - m2.reflect(testing.db, only=['rt_a', 'rt_b']) - self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b'])) - - m3 = MetaData() - c = testing.db.connect() - m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c') - self.assert_(set(m3.tables.keys()) == set(['rt_c'])) - - m4 = MetaData(testing.db) - try: - m4.reflect(only=['rt_a', 'rt_f']) - self.assert_(False) - except sa.exc.InvalidRequestError, e: - self.assert_(e.args[0].endswith('(rt_f)')) - - m5 = MetaData(testing.db) - m5.reflect(only=[]) - self.assert_(not m5.tables) - - m6 = MetaData(testing.db) - m6.reflect(only=lambda n, m: False) - self.assert_(not m6.tables) - - m7 = MetaData(testing.db, reflect=True) - self.assert_(nameset.issubset(set(m7.tables.keys()))) - - try: - m8 = MetaData(reflect=True) - self.assert_(False) - except sa.exc.ArgumentError, e: - self.assert_(e.args[0] - == 'A bind must be supplied in ' - 'conjunction with reflect=True') - finally: - baseline.drop_all() + m4.reflect(only=['rt_a', 'rt_f']) + self.assert_(False) + except sa.exc.InvalidRequestError, e: + self.assert_(e.args[0].endswith('(rt_f)')) + + m5 = MetaData(testing.db) + m5.reflect(only=[]) + self.assert_(not m5.tables) + + m6 = MetaData(testing.db) + m6.reflect(only=lambda n, m: False) + self.assert_(not m6.tables) + + m7 = MetaData(testing.db, reflect=True) + self.assert_(nameset.issubset(set(m7.tables.keys()))) + + try: + m8 = MetaData(reflect=True) + self.assert_(False) + except sa.exc.ArgumentError, e: + self.assert_(e.args[0] + == 'A bind must be supplied in ' + 'conjunction with reflect=True') if existing: print "Other tables present in database, skipping some checks." else: + baseline.drop_all() m9 = MetaData(testing.db) m9.reflect() self.assert_(not m9.tables) @@ -709,8 +680,9 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): i = Inspector.from_engine(testing.db) assert not c.closed + @testing.provide_metadata def test_index_reflection(self): - m1 = MetaData(testing.db) + m1 = self.metadata t1 = Table('party', m1, Column('id', sa.Integer, nullable=False), Column('name', sa.String(20), index=True) @@ -718,32 +690,29 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): i1 = sa.Index('idx1', t1.c.id, unique=True) i2 = sa.Index('idx2', t1.c.name, t1.c.id, unique=False) m1.create_all() - try: - m2 = MetaData(testing.db) - t2 = Table('party', m2, autoload=True) - - assert len(t2.indexes) == 3 - # Make sure indexes are in the order we expect them in - tmp = [(idx.name, idx) for idx in t2.indexes] - tmp.sort() - r1, r2, r3 = [idx[1] for idx in tmp] - - assert r1.name == 'idx1' - assert r2.name == 'idx2' - assert r1.unique == True - assert r2.unique == False - assert r3.unique == False - assert set([t2.c.id]) == set(r1.columns) - assert set([t2.c.name, t2.c.id]) == set(r2.columns) - assert set([t2.c.name]) == set(r3.columns) - finally: - m1.drop_all() + m2 = MetaData(testing.db) + t2 = Table('party', m2, autoload=True) + + assert len(t2.indexes) == 3 + # Make sure indexes are in the order we expect them in + tmp = [(idx.name, idx) for idx in t2.indexes] + tmp.sort() + r1, r2, r3 = [idx[1] for idx in tmp] + + assert r1.name == 'idx1' + assert r2.name == 'idx2' + assert r1.unique == True + assert r2.unique == False + assert r3.unique == False + assert set([t2.c.id]) == set(r1.columns) + assert set([t2.c.name, t2.c.id]) == set(r2.columns) + assert set([t2.c.name]) == set(r3.columns) @testing.requires.views @testing.provide_metadata def test_views(self): metadata = self.metadata - users, addresses = createTables(metadata, None) + users, addresses, dingalings = createTables(metadata) try: metadata.create_all() _create_views(metadata.bind, None) @@ -765,7 +734,7 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): @testing.provide_metadata def test_reflect_all_with_views(self): metadata = self.metadata - users, addresses = createTables(metadata, None) + users, addresses, dingalings = createTables(metadata, None) try: metadata.create_all() _create_views(metadata.bind, None) @@ -774,15 +743,15 @@ class ReflectionTest(fixtures.TestBase, ComparesTables): m2.reflect(views=False) eq_( set(m2.tables), - set([u'users', u'email_addresses']) + set(['users', 'email_addresses', 'dingalings']) ) m2 = MetaData(testing.db) m2.reflect(views=True) eq_( set(m2.tables), - set([u'email_addresses_v', u'users_v', - u'users', u'email_addresses']) + set(['email_addresses_v', 'users_v', + 'users', 'dingalings', 'email_addresses']) ) finally: _drop_views(metadata.bind) @@ -1034,8 +1003,10 @@ class SchemaTest(fixtures.TestBase): metadata.create_all() m2 = MetaData(schema="test_schema", bind=testing.db) m2.reflect() - eq_(m2.tables.keys(), - [u'test_schema.users', u'test_schema.email_addresses'] + eq_( + set(m2.tables), + set(['test_schema.dingalings', 'test_schema.users', + 'test_schema.email_addresses']) ) class HasSequenceTest(fixtures.TestBase): @@ -1085,13 +1056,9 @@ class HasSequenceTest(fixtures.TestBase): def createTables(meta, schema=None): if schema: - parent_user_id = Column('parent_user_id', sa.Integer, - sa.ForeignKey('%s.users.user_id' % schema) - ) + schema_prefix = schema + "." else: - parent_user_id = Column('parent_user_id', sa.Integer, - sa.ForeignKey('users.user_id') - ) + schema_prefix = "" users = Table('users', meta, Column('user_id', sa.INT, primary_key=True), @@ -1102,7 +1069,8 @@ def createTables(meta, schema=None): Column('test4', sa.Numeric(10, 2), nullable = False), Column('test5', sa.Date), Column('test5_1', sa.TIMESTAMP), - parent_user_id, + Column('parent_user_id', sa.Integer, + sa.ForeignKey('%susers.user_id' % schema_prefix)), Column('test6', sa.Date, nullable=False), Column('test7', sa.Text), Column('test8', sa.LargeBinary), @@ -1112,6 +1080,14 @@ def createTables(meta, schema=None): schema=schema, test_needs_fk=True, ) + dingalings = Table("dingalings", meta, + Column('dingaling_id', sa.Integer, primary_key=True), + Column('address_id', sa.Integer, + sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)), + Column('data', sa.String(30)), + schema=schema, + test_needs_fk=True, + ) addresses = Table('email_addresses', meta, Column('address_id', sa.Integer), Column('remote_user_id', sa.Integer, @@ -1121,7 +1097,8 @@ def createTables(meta, schema=None): schema=schema, test_needs_fk=True, ) - return (users, addresses) + + return (users, addresses, dingalings) def createIndexes(con, schema=None): fullname = 'users' @@ -1242,10 +1219,11 @@ class ComponentReflectionTest(fixtures.TestBase): insp = Inspector(testing.db) eq_(insp.default_schema_name, testing.db.dialect.default_schema_name) + @testing.provide_metadata def _test_get_table_names(self, schema=None, table_type='table', order_by=None): - meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) meta.create_all() _create_views(meta.bind, schema) try: @@ -1257,20 +1235,21 @@ class ComponentReflectionTest(fixtures.TestBase): else: table_names = insp.get_table_names(schema, order_by=order_by) - table_names.sort() if order_by == 'foreign_key': - answer = ['users', 'email_addresses'] + answer = ['dingalings', 'email_addresses', 'users'] + eq_(table_names, answer) else: - answer = ['email_addresses', 'users'] - eq_(table_names, answer) + answer = ['dingalings', 'email_addresses', 'users'] + eq_(sorted(table_names), answer) finally: _drop_views(meta.bind, schema) - addresses.drop() - users.drop() def test_get_table_names(self): self._test_get_table_names() + def test_get_table_names_fks(self): + self._test_get_table_names(order_by='foreign_key') + @testing.requires.schemas def test_get_table_names_with_schema(self): self._test_get_table_names('test_schema') @@ -1285,7 +1264,7 @@ class ComponentReflectionTest(fixtures.TestBase): def _test_get_columns(self, schema=None, table_type='table'): meta = MetaData(testing.db) - users, addresses = createTables(meta, schema) + users, addresses, dingalings = createTables(meta, schema) table_names = ['users', 'email_addresses'] meta.create_all() if table_type == 'view': @@ -1331,8 +1310,7 @@ class ComponentReflectionTest(fixtures.TestBase): finally: if table_type == 'view': _drop_views(meta.bind, schema) - addresses.drop() - users.drop() + meta.drop_all() def test_get_columns(self): self._test_get_columns() @@ -1350,29 +1328,25 @@ class ComponentReflectionTest(fixtures.TestBase): def test_get_view_columns_with_schema(self): self._test_get_columns(schema='test_schema', table_type='view') + @testing.provide_metadata def _test_get_primary_keys(self, schema=None): - meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) meta.create_all() insp = Inspector(meta.bind) - try: - users_pkeys = insp.get_primary_keys(users.name, - schema=schema) - eq_(users_pkeys, ['user_id']) - addr_cons = insp.get_pk_constraint(addresses.name, - schema=schema) + users_pkeys = insp.get_primary_keys(users.name, + schema=schema) + eq_(users_pkeys, ['user_id']) + addr_cons = insp.get_pk_constraint(addresses.name, + schema=schema) - addr_pkeys = addr_cons['constrained_columns'] - eq_(addr_pkeys, ['address_id']) + addr_pkeys = addr_cons['constrained_columns'] + eq_(addr_pkeys, ['address_id']) - @testing.requires.reflects_pk_names - def go(): - eq_(addr_cons['name'], 'email_ad_pk') - go() - - finally: - addresses.drop() - users.drop() + @testing.requires.reflects_pk_names + def go(): + eq_(addr_cons['name'], 'email_ad_pk') + go() def test_get_primary_keys(self): self._test_get_primary_keys() @@ -1381,34 +1355,31 @@ class ComponentReflectionTest(fixtures.TestBase): def test_get_primary_keys_with_schema(self): self._test_get_primary_keys(schema='test_schema') + @testing.provide_metadata def _test_get_foreign_keys(self, schema=None): - meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) meta.create_all() insp = Inspector(meta.bind) - try: - expected_schema = schema - # users - users_fkeys = insp.get_foreign_keys(users.name, - schema=schema) - fkey1 = users_fkeys[0] - self.assert_(fkey1['name'] is not None) - eq_(fkey1['referred_schema'], expected_schema) - eq_(fkey1['referred_table'], users.name) - eq_(fkey1['referred_columns'], ['user_id', ]) - eq_(fkey1['constrained_columns'], ['parent_user_id']) - #addresses - addr_fkeys = insp.get_foreign_keys(addresses.name, - schema=schema) - fkey1 = addr_fkeys[0] - self.assert_(fkey1['name'] is not None) - eq_(fkey1['referred_schema'], expected_schema) - eq_(fkey1['referred_table'], users.name) - eq_(fkey1['referred_columns'], ['user_id', ]) - eq_(fkey1['constrained_columns'], ['remote_user_id']) - finally: - addresses.drop() - users.drop() + expected_schema = schema + # users + users_fkeys = insp.get_foreign_keys(users.name, + schema=schema) + fkey1 = users_fkeys[0] + self.assert_(fkey1['name'] is not None) + eq_(fkey1['referred_schema'], expected_schema) + eq_(fkey1['referred_table'], users.name) + eq_(fkey1['referred_columns'], ['user_id', ]) + eq_(fkey1['constrained_columns'], ['parent_user_id']) + #addresses + addr_fkeys = insp.get_foreign_keys(addresses.name, + schema=schema) + fkey1 = addr_fkeys[0] + self.assert_(fkey1['name'] is not None) + eq_(fkey1['referred_schema'], expected_schema) + eq_(fkey1['referred_table'], users.name) + eq_(fkey1['referred_columns'], ['user_id', ]) + eq_(fkey1['constrained_columns'], ['remote_user_id']) def test_get_foreign_keys(self): self._test_get_foreign_keys() @@ -1417,30 +1388,26 @@ class ComponentReflectionTest(fixtures.TestBase): def test_get_foreign_keys_with_schema(self): self._test_get_foreign_keys(schema='test_schema') + @testing.provide_metadata def _test_get_indexes(self, schema=None): - meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) meta.create_all() createIndexes(meta.bind, schema) - try: - # The database may decide to create indexes for foreign keys, etc. - # so there may be more indexes than expected. - insp = Inspector(meta.bind) - indexes = insp.get_indexes('users', schema=schema) - expected_indexes = [ - {'unique': False, - 'column_names': ['test1', 'test2'], - 'name': 'users_t_idx'}] - index_names = [d['name'] for d in indexes] - for e_index in expected_indexes: - assert e_index['name'] in index_names - index = indexes[index_names.index(e_index['name'])] - for key in e_index: - eq_(e_index[key], index[key]) - - finally: - addresses.drop() - users.drop() + # The database may decide to create indexes for foreign keys, etc. + # so there may be more indexes than expected. + insp = Inspector(meta.bind) + indexes = insp.get_indexes('users', schema=schema) + expected_indexes = [ + {'unique': False, + 'column_names': ['test1', 'test2'], + 'name': 'users_t_idx'}] + index_names = [d['name'] for d in indexes] + for e_index in expected_indexes: + assert e_index['name'] in index_names + index = indexes[index_names.index(e_index['name'])] + for key in e_index: + eq_(e_index[key], index[key]) def test_get_indexes(self): self._test_get_indexes() @@ -1449,9 +1416,10 @@ class ComponentReflectionTest(fixtures.TestBase): def test_get_indexes_with_schema(self): self._test_get_indexes(schema='test_schema') + @testing.provide_metadata def _test_get_view_definition(self, schema=None): - meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) meta.create_all() _create_views(meta.bind, schema) view_name1 = 'users_v' @@ -1464,8 +1432,6 @@ class ComponentReflectionTest(fixtures.TestBase): self.assert_(v2) finally: _drop_views(meta.bind, schema) - addresses.drop() - users.drop() @testing.requires.views def test_get_view_definition(self): @@ -1476,18 +1442,15 @@ class ComponentReflectionTest(fixtures.TestBase): def test_get_view_definition_with_schema(self): self._test_get_view_definition(schema='test_schema') + @testing.only_on("postgresql", "PG specific feature") + @testing.provide_metadata def _test_get_table_oid(self, table_name, schema=None): - if testing.against('postgresql'): - meta = MetaData(testing.db) - (users, addresses) = createTables(meta, schema) - meta.create_all() - try: - insp = create_inspector(meta.bind) - oid = insp.get_table_oid(table_name, schema) - self.assert_(isinstance(oid, (int, long))) - finally: - addresses.drop() - users.drop() + meta = self.metadata + users, addresses, dingalings = createTables(meta, schema) + meta.create_all() + insp = create_inspector(meta.bind) + oid = insp.get_table_oid(table_name, schema) + self.assert_(isinstance(oid, (int, long))) def test_get_table_oid(self): self._test_get_table_oid('users')