'Date is only supported on MSSQL 2008+')
@testing.exclude('mysql', '<', (4, 1, 1),
'early types are squirrely')
+ @testing.provide_metadata
def test_basic_reflection(self):
- meta = MetaData(testing.db)
+ meta = self.metadata
users = Table('engine_users', meta,
Column('user_id', sa.INT, primary_key=True),
)
meta.create_all()
- try:
- meta2 = MetaData()
- reflected_users = Table('engine_users', meta2,
- autoload=True,
- autoload_with=testing.db)
- reflected_addresses = Table('engine_email_addresses',
- meta2, autoload=True, autoload_with=testing.db)
- self.assert_tables_equal(users, reflected_users)
- self.assert_tables_equal(addresses, reflected_addresses)
- finally:
- meta.drop_all()
+ meta2 = MetaData()
+ reflected_users = Table('engine_users', meta2,
+ autoload=True,
+ autoload_with=testing.db)
+ reflected_addresses = Table('engine_email_addresses',
+ meta2, autoload=True, autoload_with=testing.db)
+ self.assert_tables_equal(users, reflected_users)
+ self.assert_tables_equal(addresses, reflected_addresses)
+ @testing.provide_metadata
def test_two_foreign_keys(self):
- meta = MetaData(testing.db)
+ meta = self.metadata
t1 = Table(
't1',
meta,
t3 = Table('t3', meta, Column('id', sa.Integer,
primary_key=True), test_needs_fk=True)
meta.create_all()
- try:
- meta2 = MetaData()
- t1r, t2r, t3r = [Table(x, meta2, autoload=True,
- autoload_with=testing.db) for x in ('t1',
- 't2', 't3')]
- assert t1r.c.t2id.references(t2r.c.id)
- assert t1r.c.t3id.references(t3r.c.id)
- finally:
- meta.drop_all()
+ meta2 = MetaData()
+ t1r, t2r, t3r = [Table(x, meta2, autoload=True,
+ autoload_with=testing.db) for x in ('t1',
+ 't2', 't3')]
+ assert t1r.c.t2id.references(t2r.c.id)
+ assert t1r.c.t3id.references(t3r.c.id)
def test_nonexistent(self):
meta = MetaData(testing.db)
assert_raises(sa.exc.NoSuchTableError, Table, 'nonexistent',
meta, autoload=True)
+ @testing.provide_metadata
def test_include_columns(self):
- meta = MetaData(testing.db)
+ meta = self.metadata
foo = Table('foo', meta, *[Column(n, sa.String(30))
for n in ['a', 'b', 'c', 'd', 'e', 'f']])
meta.create_all()
- try:
- meta2 = MetaData(testing.db)
- foo = Table('foo', meta2, autoload=True,
- include_columns=['b', 'f', 'e'])
- # test that cols come back in original order
- eq_([c.name for c in foo.c], ['b', 'e', 'f'])
- for c in ('b', 'f', 'e'):
- assert c in foo.c
- for c in ('a', 'c', 'd'):
- assert c not in foo.c
-
- # test against a table which is already reflected
- meta3 = MetaData(testing.db)
- foo = Table('foo', meta3, autoload=True)
- foo = Table('foo', meta3, include_columns=['b', 'f', 'e'],
- extend_existing=True)
- eq_([c.name for c in foo.c], ['b', 'e', 'f'])
- for c in ('b', 'f', 'e'):
- assert c in foo.c
- for c in ('a', 'c', 'd'):
- assert c not in foo.c
- finally:
- meta.drop_all()
+ meta2 = MetaData(testing.db)
+ foo = Table('foo', meta2, autoload=True,
+ include_columns=['b', 'f', 'e'])
+ # test that cols come back in original order
+ eq_([c.name for c in foo.c], ['b', 'e', 'f'])
+ for c in ('b', 'f', 'e'):
+ assert c in foo.c
+ for c in ('a', 'c', 'd'):
+ assert c not in foo.c
+
+ # test against a table which is already reflected
+ meta3 = MetaData(testing.db)
+ foo = Table('foo', meta3, autoload=True)
+ foo = Table('foo', meta3, include_columns=['b', 'f', 'e'],
+ extend_existing=True)
+ eq_([c.name for c in foo.c], ['b', 'e', 'f'])
+ for c in ('b', 'f', 'e'):
+ assert c in foo.c
+ for c in ('a', 'c', 'd'):
+ assert c not in foo.c
@testing.emits_warning(r".*omitted columns")
+ @testing.provide_metadata
def test_include_columns_indexes(self):
- m = MetaData(testing.db)
+ m = self.metadata
t1 = Table('t1', m, Column('a', sa.Integer), Column('b', sa.Integer))
sa.Index('foobar', t1.c.a, t1.c.b)
sa.Index('bat', t1.c.a)
m.create_all()
- try:
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True)
- assert len(t2.indexes) == 2
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+ assert len(t2.indexes) == 2
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True, include_columns=['a'])
- assert len(t2.indexes) == 1
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True, include_columns=['a'])
+ assert len(t2.indexes) == 1
- m2 = MetaData(testing.db)
- t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b'])
- assert len(t2.indexes) == 2
- finally:
- m.drop_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b'])
+ assert len(t2.indexes) == 2
+ @testing.provide_metadata
def test_autoincrement_col(self):
"""test that 'autoincrement' is reflected according to sqla's policy.
"""
- meta = MetaData(testing.db)
+ meta = self.metadata
t1 = Table('test', meta,
Column('id', sa.Integer, primary_key=True),
Column('data', sa.String(50)),
mysql_engine='MyISAM'
)
meta.create_all()
- try:
- m2 = MetaData(testing.db)
- t1a = Table('test', m2, autoload=True)
- assert t1a._autoincrement_column is t1a.c.id
-
- t2a = Table('test2', m2, autoload=True)
- assert t2a._autoincrement_column is t2a.c.id2
+ m2 = MetaData(testing.db)
+ t1a = Table('test', m2, autoload=True)
+ assert t1a._autoincrement_column is t1a.c.id
- finally:
- meta.drop_all()
+ t2a = Table('test2', m2, autoload=True)
+ assert t2a._autoincrement_column is t2a.c.id2
+ @testing.provide_metadata
def test_unknown_types(self):
- meta = MetaData(testing.db)
+ meta = self.metadata
t = Table("test", meta,
Column('foo', sa.DateTime))
finally:
testing.db.dialect.ischema_names = ischema_names
- t.drop()
+ @testing.provide_metadata
def test_basic_override(self):
- meta = MetaData(testing.db)
+ meta = self.metadata
table = Table(
'override_test', meta,
Column('col1', sa.Integer, primary_key=True),
table.create()
meta2 = MetaData(testing.db)
- try:
- table = Table(
- 'override_test', meta2,
- Column('col2', sa.Unicode()),
- Column('col4', sa.String(30)), autoload=True)
-
- self.assert_(isinstance(table.c.col1.type, sa.Integer))
- self.assert_(isinstance(table.c.col2.type, sa.Unicode))
- self.assert_(isinstance(table.c.col4.type, sa.String))
- finally:
- table.drop()
+ table = Table(
+ 'override_test', meta2,
+ Column('col2', sa.Unicode()),
+ Column('col4', sa.String(30)), autoload=True)
+
+ self.assert_(isinstance(table.c.col1.type, sa.Integer))
+ self.assert_(isinstance(table.c.col2.type, sa.Unicode))
+ self.assert_(isinstance(table.c.col4.type, sa.String))
+ @testing.provide_metadata
def test_override_pkfk(self):
"""test that you can override columns which contain foreign keys
to other reflected tables, where the foreign key column is also
a primary key column"""
- meta = MetaData(testing.db)
+ meta = self.metadata
users = Table('users', meta,
Column('id', sa.Integer, primary_key=True),
Column('name', sa.String(30)))
meta.create_all()
- try:
- meta2 = MetaData(testing.db)
- a2 = Table('addresses', meta2,
- Column('id', sa.Integer,
- sa.ForeignKey('users.id'), primary_key=True),
- autoload=True)
- u2 = Table('users', meta2, autoload=True)
-
- assert list(a2.primary_key) == [a2.c.id]
- assert list(u2.primary_key) == [u2.c.id]
- assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id)
-
- meta3 = MetaData(testing.db)
- u3 = Table('users', meta3, autoload=True)
- a3 = Table('addresses', meta3,
- Column('id', sa.Integer, sa.ForeignKey('users.id'),
- primary_key=True),
- autoload=True)
-
- assert list(a3.primary_key) == [a3.c.id]
- assert list(u3.primary_key) == [u3.c.id]
- assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id)
-
- finally:
- meta.drop_all()
+ meta2 = MetaData(testing.db)
+ a2 = Table('addresses', meta2,
+ Column('id', sa.Integer,
+ sa.ForeignKey('users.id'), primary_key=True),
+ autoload=True)
+ u2 = Table('users', meta2, autoload=True)
+
+ assert list(a2.primary_key) == [a2.c.id]
+ assert list(u2.primary_key) == [u2.c.id]
+ assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id)
+
+ meta3 = MetaData(testing.db)
+ u3 = Table('users', meta3, autoload=True)
+ a3 = Table('addresses', meta3,
+ Column('id', sa.Integer, sa.ForeignKey('users.id'),
+ primary_key=True),
+ autoload=True)
+
+ assert list(a3.primary_key) == [a3.c.id]
+ assert list(u3.primary_key) == [u3.c.id]
+ assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id)
+ @testing.provide_metadata
def test_override_nonexistent_fk(self):
"""test that you can override columns and create new foreign
keys to other reflected tables which have no foreign keys. this
is common with MySQL MyISAM tables."""
- meta = MetaData(testing.db)
+ meta = self.metadata
users = Table('users', meta,
Column('id', sa.Integer, primary_key=True),
Column('name', sa.String(30)))
Column('user_id', sa.Integer))
meta.create_all()
- try:
- meta2 = MetaData(testing.db)
- a2 = Table('addresses', meta2,
- Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+ meta2 = MetaData(testing.db)
+ a2 = Table('addresses', meta2,
+ Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+ autoload=True)
+ u2 = Table('users', meta2, autoload=True)
+ assert len(a2.c.user_id.foreign_keys) == 1
+ assert len(a2.foreign_keys) == 1
+ assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+ assert [c.parent for c in a2.c.user_id.foreign_keys] \
+ == [a2.c.user_id]
+ assert list(a2.c.user_id.foreign_keys)[0].parent \
+ is a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
+ meta3 = MetaData(testing.db)
+
+ u3 = Table('users', meta3, autoload=True)
+
+ a3 = Table('addresses', meta3, Column('user_id',
+ sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
- u2 = Table('users', meta2, autoload=True)
- assert len(a2.c.user_id.foreign_keys) == 1
- assert len(a2.foreign_keys) == 1
- assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
- assert [c.parent for c in a2.c.user_id.foreign_keys] \
- == [a2.c.user_id]
- assert list(a2.c.user_id.foreign_keys)[0].parent \
- is a2.c.user_id
- assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
- meta3 = MetaData(testing.db)
-
- u3 = Table('users', meta3, autoload=True)
-
- a3 = Table('addresses', meta3, Column('user_id',
- sa.Integer, sa.ForeignKey('users.id')),
- autoload=True)
- assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id)
-
- meta4 = MetaData(testing.db)
-
- u4 = Table('users', meta4,
- Column('id', sa.Integer, key='u_id', primary_key=True),
- autoload=True)
-
- a4 = Table(
- 'addresses',
- meta4,
- Column('id', sa.Integer, key='street',
- primary_key=True),
- Column('street', sa.String(30), key='user_id'),
- Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'
- ), key='id'),
- autoload=True,
- )
- assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id)
- assert list(u4.primary_key) == [u4.c.u_id]
- assert len(u4.columns) == 2
- assert len(u4.constraints) == 1
- assert len(a4.columns) == 3
- assert len(a4.constraints) == 2
- finally:
- meta.drop_all()
+ assert u3.join(a3).onclause.compare(u3.c.id == a3.c.user_id)
+
+ meta4 = MetaData(testing.db)
+
+ u4 = Table('users', meta4,
+ Column('id', sa.Integer, key='u_id', primary_key=True),
+ autoload=True)
+
+ a4 = Table(
+ 'addresses',
+ meta4,
+ Column('id', sa.Integer, key='street',
+ primary_key=True),
+ Column('street', sa.String(30), key='user_id'),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'
+ ), key='id'),
+ autoload=True,
+ )
+ assert u4.join(a4).onclause.compare(u4.c.u_id == a4.c.id)
+ assert list(u4.primary_key) == [u4.c.u_id]
+ assert len(u4.columns) == 2
+ assert len(u4.constraints) == 1
+ assert len(a4.columns) == 3
+ assert len(a4.constraints) == 2
@testing.provide_metadata
def test_override_composite_fk(self):
+ @testing.provide_metadata
def test_override_keys(self):
"""test that columns can be overridden with a 'key',
and that ForeignKey targeting during reflection still works."""
-
- meta = MetaData(testing.db)
+ meta = self.metadata
a1 = Table('a', meta,
Column('x', sa.Integer, primary_key=True),
Column('z', sa.Integer),
test_needs_fk=True
)
meta.create_all()
- try:
- m2 = MetaData(testing.db)
- a2 = Table('a', m2,
- Column('x', sa.Integer, primary_key=True, key='x1'),
- autoload=True)
- b2 = Table('b', m2, autoload=True)
- assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y)
- assert b2.c.y.references(a2.c.x1)
- finally:
- meta.drop_all()
+ m2 = MetaData(testing.db)
+ a2 = Table('a', m2,
+ Column('x', sa.Integer, primary_key=True, key='x1'),
+ autoload=True)
+ b2 = Table('b', m2, autoload=True)
+ assert a2.join(b2).onclause.compare(a2.c.x1 == b2.c.y)
+ assert b2.c.y.references(a2.c.x1)
@testing.provide_metadata
def test_nonreflected_fk_raises(self):
assert_raises(sa.exc.NoReferencedColumnError, a2.join, b2)
@testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness')
+ @testing.provide_metadata
def test_override_existing_fk(self):
"""test that you can override columns and specify new foreign
keys to other reflected tables, on columns which *do* already
have that foreign key, and that the FK is not duped. """
- meta = MetaData(testing.db)
+ meta = self.metadata
users = Table('users', meta,
Column('id', sa.Integer, primary_key=True),
Column('name', sa.String(30)),
test_needs_fk=True)
meta.create_all()
- try:
- meta2 = MetaData(testing.db)
- a2 = Table('addresses', meta2,
- Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
- autoload=True)
- u2 = Table('users', meta2, autoload=True)
- s = sa.select([a2])
-
- assert s.c.user_id is not None
- assert len(a2.foreign_keys) == 1
- assert len(a2.c.user_id.foreign_keys) == 1
- assert len(a2.constraints) == 2
- assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
- assert [c.parent for c in a2.c.user_id.foreign_keys] \
- == [a2.c.user_id]
- assert list(a2.c.user_id.foreign_keys)[0].parent \
- is a2.c.user_id
- assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
-
- meta2 = MetaData(testing.db)
- u2 = Table('users', meta2, Column('id', sa.Integer,
- primary_key=True), autoload=True)
- a2 = Table('addresses', meta2, Column('id', sa.Integer,
- primary_key=True), Column('user_id', sa.Integer,
- sa.ForeignKey('users.id')), autoload=True)
- s = sa.select([a2])
-
- assert s.c.user_id is not None
- assert len(a2.foreign_keys) == 1
- assert len(a2.c.user_id.foreign_keys) == 1
- assert len(a2.constraints) == 2
- assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
- assert [c.parent for c in a2.c.user_id.foreign_keys] \
- == [a2.c.user_id]
- assert list(a2.c.user_id.foreign_keys)[0].parent \
- is a2.c.user_id
- assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
+ meta2 = MetaData(testing.db)
+ a2 = Table('addresses', meta2,
+ Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+ autoload=True)
+ u2 = Table('users', meta2, autoload=True)
+ s = sa.select([a2])
+
+ assert s.c.user_id is not None
+ assert len(a2.foreign_keys) == 1
+ assert len(a2.c.user_id.foreign_keys) == 1
+ assert len(a2.constraints) == 2
+ assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+ assert [c.parent for c in a2.c.user_id.foreign_keys] \
+ == [a2.c.user_id]
+ assert list(a2.c.user_id.foreign_keys)[0].parent \
+ is a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
- finally:
- meta.drop_all()
+ meta2 = MetaData(testing.db)
+ u2 = Table('users', meta2, Column('id', sa.Integer,
+ primary_key=True), autoload=True)
+ a2 = Table('addresses', meta2, Column('id', sa.Integer,
+ primary_key=True), Column('user_id', sa.Integer,
+ sa.ForeignKey('users.id')), autoload=True)
+ s = sa.select([a2])
+
+ assert s.c.user_id is not None
+ assert len(a2.foreign_keys) == 1
+ assert len(a2.c.user_id.foreign_keys) == 1
+ assert len(a2.constraints) == 2
+ assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+ assert [c.parent for c in a2.c.user_id.foreign_keys] \
+ == [a2.c.user_id]
+ assert list(a2.c.user_id.foreign_keys)[0].parent \
+ is a2.c.user_id
+ assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
def test_pks_not_uniques(self):
"""test that primary key reflection not tripped up by unique
testing.db.execute("drop table book")
@testing.exclude('mysql', '<', (4, 1, 1), 'innodb funkiness')
+ @testing.provide_metadata
def test_composite_fk(self):
"""test reflection of composite foreign keys"""
- meta = MetaData(testing.db)
+ meta = self.metadata
multi = Table(
'multi', meta,
Column('multi_id', sa.Integer, primary_key=True),
)
meta.create_all()
- try:
- meta2 = MetaData()
- table = Table('multi', meta2, autoload=True,
- autoload_with=testing.db)
- table2 = Table('multi2', meta2, autoload=True,
- autoload_with=testing.db)
- self.assert_tables_equal(multi, table)
- self.assert_tables_equal(multi2, table2)
- j = sa.join(table, table2)
-
- self.assert_(sa.and_(table.c.multi_id == table2.c.foo,
- table.c.multi_rev == table2.c.bar,
- table.c.multi_hoho
- == table2.c.lala).compare(j.onclause))
- finally:
- meta.drop_all()
+ meta2 = MetaData()
+ table = Table('multi', meta2, autoload=True,
+ autoload_with=testing.db)
+ table2 = Table('multi2', meta2, autoload=True,
+ autoload_with=testing.db)
+ self.assert_tables_equal(multi, table)
+ self.assert_tables_equal(multi2, table2)
+ j = sa.join(table, table2)
+
+ self.assert_(sa.and_(table.c.multi_id == table2.c.foo,
+ table.c.multi_rev == table2.c.bar,
+ table.c.multi_hoho
+ == table2.c.lala).compare(j.onclause))
@testing.crashes('oracle', 'FIXME: unknown, confirm not fails_on')
"FIXME: should be supported via the "
"DELIMITED env var but that breaks "
"everything else for now")
+ @testing.provide_metadata
def test_reserved(self):
# check a table that uses an SQL reserved name doesn't cause an
# error
- meta = MetaData(testing.db)
+ meta = self.metadata
table_a = Table('select', meta, Column('not', sa.Integer,
primary_key=True), Column('from',
sa.String(12), nullable=False),
meta.create_all()
index_c.drop()
meta2 = MetaData(testing.db)
- try:
- table_a2 = Table('select', meta2, autoload=True)
- table_b2 = Table('false', meta2, autoload=True)
- table_c2 = Table('is', meta2, autoload=True)
- finally:
- meta.drop_all()
+ table_a2 = Table('select', meta2, autoload=True)
+ table_b2 = Table('false', meta2, autoload=True)
+ table_c2 = Table('is', meta2, autoload=True)
+ @testing.provide_metadata
def test_reflect_all(self):
existing = testing.db.table_names()
self.assert_(name not in existing)
self.assert_('rt_f' not in existing)
- baseline = MetaData(testing.db)
+ baseline = self.metadata
for name in names:
Table(name, baseline, Column('id', sa.Integer, primary_key=True))
baseline.create_all()
+ m1 = MetaData(testing.db)
+ self.assert_(not m1.tables)
+ m1.reflect()
+ self.assert_(nameset.issubset(set(m1.tables.keys())))
+
+ m2 = MetaData()
+ m2.reflect(testing.db, only=['rt_a', 'rt_b'])
+ self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b']))
+
+ m3 = MetaData()
+ c = testing.db.connect()
+ m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c')
+ self.assert_(set(m3.tables.keys()) == set(['rt_c']))
+
+ m4 = MetaData(testing.db)
try:
- m1 = MetaData(testing.db)
- self.assert_(not m1.tables)
- m1.reflect()
- self.assert_(nameset.issubset(set(m1.tables.keys())))
-
- m2 = MetaData()
- m2.reflect(testing.db, only=['rt_a', 'rt_b'])
- self.assert_(set(m2.tables.keys()) == set(['rt_a', 'rt_b']))
-
- m3 = MetaData()
- c = testing.db.connect()
- m3.reflect(bind=c, only=lambda name, meta: name == 'rt_c')
- self.assert_(set(m3.tables.keys()) == set(['rt_c']))
-
- m4 = MetaData(testing.db)
- try:
- m4.reflect(only=['rt_a', 'rt_f'])
- self.assert_(False)
- except sa.exc.InvalidRequestError, e:
- self.assert_(e.args[0].endswith('(rt_f)'))
-
- m5 = MetaData(testing.db)
- m5.reflect(only=[])
- self.assert_(not m5.tables)
-
- m6 = MetaData(testing.db)
- m6.reflect(only=lambda n, m: False)
- self.assert_(not m6.tables)
-
- m7 = MetaData(testing.db, reflect=True)
- self.assert_(nameset.issubset(set(m7.tables.keys())))
-
- try:
- m8 = MetaData(reflect=True)
- self.assert_(False)
- except sa.exc.ArgumentError, e:
- self.assert_(e.args[0]
- == 'A bind must be supplied in '
- 'conjunction with reflect=True')
- finally:
- baseline.drop_all()
+ m4.reflect(only=['rt_a', 'rt_f'])
+ self.assert_(False)
+ except sa.exc.InvalidRequestError, e:
+ self.assert_(e.args[0].endswith('(rt_f)'))
+
+ m5 = MetaData(testing.db)
+ m5.reflect(only=[])
+ self.assert_(not m5.tables)
+
+ m6 = MetaData(testing.db)
+ m6.reflect(only=lambda n, m: False)
+ self.assert_(not m6.tables)
+
+ m7 = MetaData(testing.db, reflect=True)
+ self.assert_(nameset.issubset(set(m7.tables.keys())))
+
+ try:
+ m8 = MetaData(reflect=True)
+ self.assert_(False)
+ except sa.exc.ArgumentError, e:
+ self.assert_(e.args[0]
+ == 'A bind must be supplied in '
+ 'conjunction with reflect=True')
if existing:
print "Other tables present in database, skipping some checks."
else:
+ baseline.drop_all()
m9 = MetaData(testing.db)
m9.reflect()
self.assert_(not m9.tables)
i = Inspector.from_engine(testing.db)
assert not c.closed
+ @testing.provide_metadata
def test_index_reflection(self):
- m1 = MetaData(testing.db)
+ m1 = self.metadata
t1 = Table('party', m1,
Column('id', sa.Integer, nullable=False),
Column('name', sa.String(20), index=True)
i1 = sa.Index('idx1', t1.c.id, unique=True)
i2 = sa.Index('idx2', t1.c.name, t1.c.id, unique=False)
m1.create_all()
- try:
- m2 = MetaData(testing.db)
- t2 = Table('party', m2, autoload=True)
-
- assert len(t2.indexes) == 3
- # Make sure indexes are in the order we expect them in
- tmp = [(idx.name, idx) for idx in t2.indexes]
- tmp.sort()
- r1, r2, r3 = [idx[1] for idx in tmp]
-
- assert r1.name == 'idx1'
- assert r2.name == 'idx2'
- assert r1.unique == True
- assert r2.unique == False
- assert r3.unique == False
- assert set([t2.c.id]) == set(r1.columns)
- assert set([t2.c.name, t2.c.id]) == set(r2.columns)
- assert set([t2.c.name]) == set(r3.columns)
- finally:
- m1.drop_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('party', m2, autoload=True)
+
+ assert len(t2.indexes) == 3
+ # Make sure indexes are in the order we expect them in
+ tmp = [(idx.name, idx) for idx in t2.indexes]
+ tmp.sort()
+ r1, r2, r3 = [idx[1] for idx in tmp]
+
+ assert r1.name == 'idx1'
+ assert r2.name == 'idx2'
+ assert r1.unique == True
+ assert r2.unique == False
+ assert r3.unique == False
+ assert set([t2.c.id]) == set(r1.columns)
+ assert set([t2.c.name, t2.c.id]) == set(r2.columns)
+ assert set([t2.c.name]) == set(r3.columns)
@testing.requires.views
@testing.provide_metadata
def test_views(self):
metadata = self.metadata
- users, addresses = createTables(metadata, None)
+ users, addresses, dingalings = createTables(metadata)
try:
metadata.create_all()
_create_views(metadata.bind, None)
@testing.provide_metadata
def test_reflect_all_with_views(self):
metadata = self.metadata
- users, addresses = createTables(metadata, None)
+ users, addresses, dingalings = createTables(metadata, None)
try:
metadata.create_all()
_create_views(metadata.bind, None)
m2.reflect(views=False)
eq_(
set(m2.tables),
- set([u'users', u'email_addresses'])
+ set(['users', 'email_addresses', 'dingalings'])
)
m2 = MetaData(testing.db)
m2.reflect(views=True)
eq_(
set(m2.tables),
- set([u'email_addresses_v', u'users_v',
- u'users', u'email_addresses'])
+ set(['email_addresses_v', 'users_v',
+ 'users', 'dingalings', 'email_addresses'])
)
finally:
_drop_views(metadata.bind)
metadata.create_all()
m2 = MetaData(schema="test_schema", bind=testing.db)
m2.reflect()
- eq_(m2.tables.keys(),
- [u'test_schema.users', u'test_schema.email_addresses']
+ eq_(
+ set(m2.tables),
+ set(['test_schema.dingalings', 'test_schema.users',
+ 'test_schema.email_addresses'])
)
class HasSequenceTest(fixtures.TestBase):
def createTables(meta, schema=None):
if schema:
- parent_user_id = Column('parent_user_id', sa.Integer,
- sa.ForeignKey('%s.users.user_id' % schema)
- )
+ schema_prefix = schema + "."
else:
- parent_user_id = Column('parent_user_id', sa.Integer,
- sa.ForeignKey('users.user_id')
- )
+ schema_prefix = ""
users = Table('users', meta,
Column('user_id', sa.INT, primary_key=True),
Column('test4', sa.Numeric(10, 2), nullable = False),
Column('test5', sa.Date),
Column('test5_1', sa.TIMESTAMP),
- parent_user_id,
+ Column('parent_user_id', sa.Integer,
+ sa.ForeignKey('%susers.user_id' % schema_prefix)),
Column('test6', sa.Date, nullable=False),
Column('test7', sa.Text),
Column('test8', sa.LargeBinary),
schema=schema,
test_needs_fk=True,
)
+ dingalings = Table("dingalings", meta,
+ Column('dingaling_id', sa.Integer, primary_key=True),
+ Column('address_id', sa.Integer,
+ sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+ Column('data', sa.String(30)),
+ schema=schema,
+ test_needs_fk=True,
+ )
addresses = Table('email_addresses', meta,
Column('address_id', sa.Integer),
Column('remote_user_id', sa.Integer,
schema=schema,
test_needs_fk=True,
)
- return (users, addresses)
+
+ return (users, addresses, dingalings)
def createIndexes(con, schema=None):
fullname = 'users'
insp = Inspector(testing.db)
eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
+ @testing.provide_metadata
def _test_get_table_names(self, schema=None, table_type='table',
order_by=None):
- meta = MetaData(testing.db)
- (users, addresses) = createTables(meta, schema)
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
_create_views(meta.bind, schema)
try:
else:
table_names = insp.get_table_names(schema,
order_by=order_by)
- table_names.sort()
if order_by == 'foreign_key':
- answer = ['users', 'email_addresses']
+ answer = ['dingalings', 'email_addresses', 'users']
+ eq_(table_names, answer)
else:
- answer = ['email_addresses', 'users']
- eq_(table_names, answer)
+ answer = ['dingalings', 'email_addresses', 'users']
+ eq_(sorted(table_names), answer)
finally:
_drop_views(meta.bind, schema)
- addresses.drop()
- users.drop()
def test_get_table_names(self):
self._test_get_table_names()
+ def test_get_table_names_fks(self):
+ self._test_get_table_names(order_by='foreign_key')
+
@testing.requires.schemas
def test_get_table_names_with_schema(self):
self._test_get_table_names('test_schema')
def _test_get_columns(self, schema=None, table_type='table'):
meta = MetaData(testing.db)
- users, addresses = createTables(meta, schema)
+ users, addresses, dingalings = createTables(meta, schema)
table_names = ['users', 'email_addresses']
meta.create_all()
if table_type == 'view':
finally:
if table_type == 'view':
_drop_views(meta.bind, schema)
- addresses.drop()
- users.drop()
+ meta.drop_all()
def test_get_columns(self):
self._test_get_columns()
def test_get_view_columns_with_schema(self):
self._test_get_columns(schema='test_schema', table_type='view')
+ @testing.provide_metadata
def _test_get_primary_keys(self, schema=None):
- meta = MetaData(testing.db)
- (users, addresses) = createTables(meta, schema)
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
insp = Inspector(meta.bind)
- try:
- users_pkeys = insp.get_primary_keys(users.name,
- schema=schema)
- eq_(users_pkeys, ['user_id'])
- addr_cons = insp.get_pk_constraint(addresses.name,
- schema=schema)
+ users_pkeys = insp.get_primary_keys(users.name,
+ schema=schema)
+ eq_(users_pkeys, ['user_id'])
+ addr_cons = insp.get_pk_constraint(addresses.name,
+ schema=schema)
- addr_pkeys = addr_cons['constrained_columns']
- eq_(addr_pkeys, ['address_id'])
+ addr_pkeys = addr_cons['constrained_columns']
+ eq_(addr_pkeys, ['address_id'])
- @testing.requires.reflects_pk_names
- def go():
- eq_(addr_cons['name'], 'email_ad_pk')
- go()
-
- finally:
- addresses.drop()
- users.drop()
+ @testing.requires.reflects_pk_names
+ def go():
+ eq_(addr_cons['name'], 'email_ad_pk')
+ go()
def test_get_primary_keys(self):
self._test_get_primary_keys()
def test_get_primary_keys_with_schema(self):
self._test_get_primary_keys(schema='test_schema')
+ @testing.provide_metadata
def _test_get_foreign_keys(self, schema=None):
- meta = MetaData(testing.db)
- (users, addresses) = createTables(meta, schema)
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
insp = Inspector(meta.bind)
- try:
- expected_schema = schema
- # users
- users_fkeys = insp.get_foreign_keys(users.name,
- schema=schema)
- fkey1 = users_fkeys[0]
- self.assert_(fkey1['name'] is not None)
- eq_(fkey1['referred_schema'], expected_schema)
- eq_(fkey1['referred_table'], users.name)
- eq_(fkey1['referred_columns'], ['user_id', ])
- eq_(fkey1['constrained_columns'], ['parent_user_id'])
- #addresses
- addr_fkeys = insp.get_foreign_keys(addresses.name,
- schema=schema)
- fkey1 = addr_fkeys[0]
- self.assert_(fkey1['name'] is not None)
- eq_(fkey1['referred_schema'], expected_schema)
- eq_(fkey1['referred_table'], users.name)
- eq_(fkey1['referred_columns'], ['user_id', ])
- eq_(fkey1['constrained_columns'], ['remote_user_id'])
- finally:
- addresses.drop()
- users.drop()
+ expected_schema = schema
+ # users
+ users_fkeys = insp.get_foreign_keys(users.name,
+ schema=schema)
+ fkey1 = users_fkeys[0]
+ self.assert_(fkey1['name'] is not None)
+ eq_(fkey1['referred_schema'], expected_schema)
+ eq_(fkey1['referred_table'], users.name)
+ eq_(fkey1['referred_columns'], ['user_id', ])
+ eq_(fkey1['constrained_columns'], ['parent_user_id'])
+ #addresses
+ addr_fkeys = insp.get_foreign_keys(addresses.name,
+ schema=schema)
+ fkey1 = addr_fkeys[0]
+ self.assert_(fkey1['name'] is not None)
+ eq_(fkey1['referred_schema'], expected_schema)
+ eq_(fkey1['referred_table'], users.name)
+ eq_(fkey1['referred_columns'], ['user_id', ])
+ eq_(fkey1['constrained_columns'], ['remote_user_id'])
def test_get_foreign_keys(self):
self._test_get_foreign_keys()
def test_get_foreign_keys_with_schema(self):
self._test_get_foreign_keys(schema='test_schema')
+ @testing.provide_metadata
def _test_get_indexes(self, schema=None):
- meta = MetaData(testing.db)
- (users, addresses) = createTables(meta, schema)
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
createIndexes(meta.bind, schema)
- try:
- # The database may decide to create indexes for foreign keys, etc.
- # so there may be more indexes than expected.
- insp = Inspector(meta.bind)
- indexes = insp.get_indexes('users', schema=schema)
- expected_indexes = [
- {'unique': False,
- 'column_names': ['test1', 'test2'],
- 'name': 'users_t_idx'}]
- index_names = [d['name'] for d in indexes]
- for e_index in expected_indexes:
- assert e_index['name'] in index_names
- index = indexes[index_names.index(e_index['name'])]
- for key in e_index:
- eq_(e_index[key], index[key])
-
- finally:
- addresses.drop()
- users.drop()
+ # The database may decide to create indexes for foreign keys, etc.
+ # so there may be more indexes than expected.
+ insp = Inspector(meta.bind)
+ indexes = insp.get_indexes('users', schema=schema)
+ expected_indexes = [
+ {'unique': False,
+ 'column_names': ['test1', 'test2'],
+ 'name': 'users_t_idx'}]
+ index_names = [d['name'] for d in indexes]
+ for e_index in expected_indexes:
+ assert e_index['name'] in index_names
+ index = indexes[index_names.index(e_index['name'])]
+ for key in e_index:
+ eq_(e_index[key], index[key])
def test_get_indexes(self):
self._test_get_indexes()
def test_get_indexes_with_schema(self):
self._test_get_indexes(schema='test_schema')
+ @testing.provide_metadata
def _test_get_view_definition(self, schema=None):
- meta = MetaData(testing.db)
- (users, addresses) = createTables(meta, schema)
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
_create_views(meta.bind, schema)
view_name1 = 'users_v'
self.assert_(v2)
finally:
_drop_views(meta.bind, schema)
- addresses.drop()
- users.drop()
@testing.requires.views
def test_get_view_definition(self):
def test_get_view_definition_with_schema(self):
self._test_get_view_definition(schema='test_schema')
+ @testing.only_on("postgresql", "PG specific feature")
+ @testing.provide_metadata
def _test_get_table_oid(self, table_name, schema=None):
- if testing.against('postgresql'):
- meta = MetaData(testing.db)
- (users, addresses) = createTables(meta, schema)
- meta.create_all()
- try:
- insp = create_inspector(meta.bind)
- oid = insp.get_table_oid(table_name, schema)
- self.assert_(isinstance(oid, (int, long)))
- finally:
- addresses.drop()
- users.drop()
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
+ meta.create_all()
+ insp = create_inspector(meta.bind)
+ oid = insp.get_table_oid(table_name, schema)
+ self.assert_(isinstance(oid, (int, long)))
def test_get_table_oid(self):
self._test_get_table_oid('users')