]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- cleanup
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 22 Nov 2013 22:48:55 +0000 (17:48 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 22 Nov 2013 22:52:08 +0000 (17:52 -0500)
test/dialect/test_oracle.py

index 953dfe4e36a8ff66aca20b68cb0ab160a7f7d2a7..41996682d6b757a3a2b1626be353ee77ee4f5853 100644 (file)
@@ -11,12 +11,13 @@ from sqlalchemy.testing import assert_raises, assert_raises_message
 from sqlalchemy.testing.engines import testing_engine
 from sqlalchemy.dialects.oracle import cx_oracle, base as oracle
 from sqlalchemy.engine import default
+from sqlalchemy.util import u
 import decimal
 from sqlalchemy.testing.schema import Table, Column
 import datetime
 import os
 from sqlalchemy import sql
-
+from sqlalchemy.testing.mock import Mock
 
 class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
     __only_on__ = 'oracle+cx_oracle'
@@ -24,31 +25,31 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults):
     @classmethod
     def setup_class(cls):
         testing.db.execute("""
-create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number, z_out OUT varchar) IS
-  retval number;
-    begin
-    retval := 6;
-    x_out := 10;
-    y_out := x_in * 15;
-    z_out := NULL;
-    end;
+        create or replace procedure foo(x_in IN number, x_out OUT number,
+        y_out OUT number, z_out OUT varchar) IS
+        retval number;
+        begin
+            retval := 6;
+            x_out := 10;
+            y_out := x_in * 15;
+            z_out := NULL;
+        end;
         """)
 
     def test_out_params(self):
-        result = \
-            testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
+        result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, '
                                ':z_out); end;',
                                bindparams=[bindparam('x_in', Float),
                                outparam('x_out', Integer),
                                outparam('y_out', Float),
                                outparam('z_out', String)]), x_in=5)
-        eq_(result.out_parameters, {'x_out': 10, 'y_out': 75, 'z_out'
-            : None})
+        eq_(result.out_parameters,
+            {'x_out': 10, 'y_out': 75, 'z_out': None})
         assert isinstance(result.out_parameters['x_out'], int)
 
     @classmethod
     def teardown_class(cls):
-         testing.db.execute("DROP PROCEDURE foo")
+        testing.db.execute("DROP PROCEDURE foo")
 
 class CXOracleArgsTest(fixtures.TestBase):
     __only_on__ = 'oracle+cx_oracle'
@@ -90,7 +91,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase):
         metadata.create_all()
 
         table.insert().execute(
-            {"option":1, "plain":1, "union":1}
+            {"option": 1, "plain": 1, "union": 1}
         )
         eq_(
             testing.db.execute(table.select()).first(),
@@ -104,7 +105,6 @@ class QuotedBindRoundTripTest(fixtures.TestBase):
 
 
 class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
-
     __dialect__ = oracle.dialect()
 
     def test_true_false(self):
@@ -248,7 +248,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
 
     def test_use_binds_for_limits_enabled(self):
         t = table('sometable', column('col1'), column('col2'))
-        dialect = oracle.OracleDialect(use_binds_for_limits = True)
+        dialect = oracle.OracleDialect(use_binds_for_limits=True)
 
         self.assert_compile(select([t]).limit(10),
                 "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
@@ -346,8 +346,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
         )
 
         query = select([table1, table2], or_(table1.c.name == 'fred',
-                       table1.c.myid == 10, table2.c.othername != 'jack'
-                       'EXISTS (select yay from foo where boo = lar)'
+                       table1.c.myid == 10, table2.c.othername != 'jack',
+                       'EXISTS (select yay from foo where boo = lar)'
                        ), from_obj=[outerjoin(table1, table2,
                        table1.c.myid == table2.c.otherid)])
         self.assert_compile(query,
@@ -433,8 +433,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
                             'mytable.description AS description FROM '
                             'mytable LEFT OUTER JOIN myothertable ON '
                             'mytable.myid = myothertable.otherid) '
-                            'anon_1 ON thirdtable.userid = anon_1.myid'
-                            dialect=oracle.dialect(use_ansi=True))
+                            'anon_1 ON thirdtable.userid = anon_1.myid',
+                            dialect=oracle.dialect(use_ansi=True))
 
         self.assert_compile(q,
                             'SELECT thirdtable.userid, '
@@ -514,7 +514,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
     def test_returning_insert_labeled(self):
         t1 = table('t1', column('c1'), column('c2'), column('c3'))
         self.assert_compile(
-            t1.insert().values(c1=1).returning(t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
+            t1.insert().values(c1=1).returning(
+                        t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')),
             "INSERT INTO t1 (c1) VALUES (:c1) RETURNING "
                 "t1.c2, t1.c3 INTO :ret_0, :ret_1"
         )
@@ -564,32 +565,40 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
         )
 
 class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
-    __only_on__ = 'oracle'
 
-    def test_ora8_flags(self):
-        def server_version_info(self):
-            return (8, 2, 5)
+    def _dialect(self, server_version, **kw):
+        def server_version_info(conn):
+            return server_version
 
-        dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
+        dialect = oracle.dialect(
+                        dbapi=Mock(version="0.0.0", paramstyle="named"),
+                        **kw)
         dialect._get_server_version_info = server_version_info
+        dialect._check_unicode_returns = Mock()
+        dialect._check_unicode_description = Mock()
+        dialect._get_default_schema_name = Mock()
+        return dialect
+
+
+    def test_ora8_flags(self):
+        dialect = self._dialect((8, 2, 5))
 
         # before connect, assume modern DB
         assert dialect._supports_char_length
         assert dialect._supports_nchar
         assert dialect.use_ansi
 
-        dialect.initialize(testing.db.connect())
+        dialect.initialize(Mock())
         assert not dialect.implicit_returning
         assert not dialect._supports_char_length
         assert not dialect._supports_nchar
         assert not dialect.use_ansi
-        self.assert_compile(String(50),"VARCHAR2(50)",dialect=dialect)
-        self.assert_compile(Unicode(50),"VARCHAR2(50)",dialect=dialect)
-        self.assert_compile(UnicodeText(),"CLOB",dialect=dialect)
+        self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect)
+        self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect)
+        self.assert_compile(UnicodeText(), "CLOB", dialect=dialect)
 
-        dialect = oracle.dialect(implicit_returning=True,
-                                    dbapi=testing.db.dialect.dbapi)
-        dialect._get_server_version_info = server_version_info
+
+        dialect = self._dialect((8, 2, 5), implicit_returning=True)
         dialect.initialize(testing.db.connect())
         assert dialect.implicit_returning
 
@@ -597,26 +606,25 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
     def test_default_flags(self):
         """test with no initialization or server version info"""
 
-        dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
+        dialect = self._dialect(None)
+
         assert dialect._supports_char_length
         assert dialect._supports_nchar
         assert dialect.use_ansi
-        self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect)
-        self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect)
-        self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect)
+        self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+        self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+        self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
 
     def test_ora10_flags(self):
-        def server_version_info(self):
-            return (10, 2, 5)
-        dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi)
-        dialect._get_server_version_info = server_version_info
-        dialect.initialize(testing.db.connect())
+        dialect = self._dialect((10, 2, 5))
+
+        dialect.initialize(Mock())
         assert dialect._supports_char_length
         assert dialect._supports_nchar
         assert dialect.use_ansi
-        self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect)
-        self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect)
-        self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect)
+        self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect)
+        self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect)
+        self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect)
 
 
 class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -677,9 +685,10 @@ drop synonym test_schema.local_table;
             if stmt.strip():
                 testing.db.execute(stmt)
 
+    @testing.provide_metadata
     def test_create_same_names_explicit_schema(self):
         schema = testing.db.dialect.default_schema_name
-        meta = MetaData(testing.db)
+        meta = self.metadata
         parent = Table('parent', meta,
             Column('pid', Integer, primary_key=True),
             schema=schema
@@ -690,16 +699,14 @@ drop synonym test_schema.local_table;
             schema=schema
         )
         meta.create_all()
-        try:
-            parent.insert().execute({'pid':1})
-            child.insert().execute({'cid':1, 'pid':1})
-            eq_(child.select().execute().fetchall(), [(1, 1)])
-        finally:
-            meta.drop_all()
+        parent.insert().execute({'pid': 1})
+        child.insert().execute({'cid': 1, 'pid': 1})
+        eq_(child.select().execute().fetchall(), [(1, 1)])
 
     def test_reflect_alt_table_owner_local_synonym(self):
         meta = MetaData(testing.db)
-        parent = Table('test_schema_ptable', meta, autoload=True, oracle_resolve_synonyms=True)
+        parent = Table('test_schema_ptable', meta, autoload=True,
+                            oracle_resolve_synonyms=True)
         self.assert_compile(parent.select(),
                 "SELECT test_schema_ptable.id, "
                 "test_schema_ptable.data FROM test_schema_ptable")
@@ -707,14 +714,16 @@ drop synonym test_schema.local_table;
 
     def test_reflect_alt_synonym_owner_local_table(self):
         meta = MetaData(testing.db)
-        parent = Table('local_table', meta, autoload=True, oracle_resolve_synonyms=True, schema="test_schema")
+        parent = Table('local_table', meta, autoload=True,
+                            oracle_resolve_synonyms=True, schema="test_schema")
         self.assert_compile(parent.select(),
                 "SELECT test_schema.local_table.id, "
                 "test_schema.local_table.data FROM test_schema.local_table")
         select([parent]).execute().fetchall()
 
+    @testing.provide_metadata
     def test_create_same_names_implicit_schema(self):
-        meta = MetaData(testing.db)
+        meta = self.metadata
         parent = Table('parent', meta,
             Column('pid', Integer, primary_key=True),
         )
@@ -723,12 +732,9 @@ drop synonym test_schema.local_table;
             Column('pid', Integer, ForeignKey('parent.pid')),
         )
         meta.create_all()
-        try:
-            parent.insert().execute({'pid':1})
-            child.insert().execute({'cid':1, 'pid':1})
-            eq_(child.select().execute().fetchall(), [(1, 1)])
-        finally:
-            meta.drop_all()
+        parent.insert().execute({'pid': 1})
+        child.insert().execute({'cid': 1, 'pid': 1})
+        eq_(child.select().execute().fetchall(), [(1, 1)])
 
 
     def test_reflect_alt_owner_explicit(self):
@@ -918,10 +924,16 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
         dbapi = FakeDBAPI()
 
         b = bindparam("foo", "hello world!")
-        assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+        eq_(
+            b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+            'STRING'
+        )
 
-        b = bindparam("foo", u"hello world!")
-        assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
+        b = bindparam("foo", "hello world!")
+        eq_(
+            b.type.dialect_impl(dialect).get_dbapi_type(dbapi),
+            'STRING'
+        )
 
     def test_long(self):
         self.assert_compile(oracle.LONG(), "LONG")
@@ -950,14 +962,14 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL):
         self.assert_compile(oracle.RAW(35), "RAW(35)")
 
     def test_char_length(self):
-        self.assert_compile(VARCHAR(50),"VARCHAR(50 CHAR)")
+        self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)")
 
         oracle8dialect = oracle.dialect()
         oracle8dialect.server_version_info = (8, 0)
-        self.assert_compile(VARCHAR(50),"VARCHAR(50)",dialect=oracle8dialect)
+        self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect)
 
-        self.assert_compile(NVARCHAR(50),"NVARCHAR2(50)")
-        self.assert_compile(CHAR(50),"CHAR(50)")
+        self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)")
+        self.assert_compile(CHAR(50), "CHAR(50)")
 
     def test_varchar_types(self):
         dialect = oracle.dialect()
@@ -1005,36 +1017,36 @@ class TypesTest(fixtures.TestBase):
                 dict(id=3, data="value 3")
             )
 
-            eq_(t.select().where(t.c.data=='value 2').execute().fetchall(),
+            eq_(
+                t.select().where(t.c.data == 'value 2').execute().fetchall(),
                 [(2, 'value 2                       ')]
-                )
+            )
 
             m2 = MetaData(testing.db)
             t2 = Table('t1', m2, autoload=True)
             assert type(t2.c.data.type) is CHAR
-            eq_(t2.select().where(t2.c.data=='value 2').execute().fetchall(),
+            eq_(
+                t2.select().where(t2.c.data == 'value 2').execute().fetchall(),
                 [(2, 'value 2                       ')]
-                )
+            )
 
         finally:
             t.drop()
 
     @testing.requires.returning
+    @testing.provide_metadata
     def test_int_not_float(self):
-        m = MetaData(testing.db)
+        m = self.metadata
         t1 = Table('t1', m, Column('foo', Integer))
         t1.create()
-        try:
-            r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
-            x = r.scalar()
-            assert x == 5
-            assert isinstance(x, int)
-
-            x = t1.select().scalar()
-            assert x == 5
-            assert isinstance(x, int)
-        finally:
-            t1.drop()
+        r = t1.insert().values(foo=5).returning(t1.c.foo).execute()
+        x = r.scalar()
+        assert x == 5
+        assert isinstance(x, int)
+
+        x = t1.select().scalar()
+        assert x == 5
+        assert isinstance(x, int)
 
     @testing.provide_metadata
     def test_rowid(self):
@@ -1051,7 +1063,7 @@ class TypesTest(fixtures.TestBase):
         # the ROWID type is not really needed here,
         # as cx_oracle just treats it as a string,
         # but we want to make sure the ROWID works...
-        rowid_col= column('rowid', oracle.ROWID)
+        rowid_col = column('rowid', oracle.ROWID)
         s3 = select([t.c.x, rowid_col]).\
                     where(rowid_col == cast(rowid, oracle.ROWID))
         eq_(s3.select().execute().fetchall(),
@@ -1077,8 +1089,9 @@ class TypesTest(fixtures.TestBase):
         eq_(row['day_interval'], datetime.timedelta(days=35,
             seconds=5743))
 
+    @testing.provide_metadata
     def test_numerics(self):
-        m = MetaData(testing.db)
+        m = self.metadata
         t1 = Table('t1', m,
             Column('intcol', Integer),
             Column('numericcol', Numeric(precision=9, scale=2)),
@@ -1091,41 +1104,38 @@ class TypesTest(fixtures.TestBase):
 
         )
         t1.create()
-        try:
-            t1.insert().execute(
-                intcol=1,
-                numericcol=5.2,
-                floatcol1=6.5,
-                floatcol2 = 8.5,
-                doubleprec = 9.5,
-                numbercol1=12,
-                numbercol2=14.85,
-                numbercol3=15.76
-                )
-
-            m2 = MetaData(testing.db)
-            t2 = Table('t1', m2, autoload=True)
+        t1.insert().execute(
+            intcol=1,
+            numericcol=5.2,
+            floatcol1=6.5,
+            floatcol2=8.5,
+            doubleprec=9.5,
+            numbercol1=12,
+            numbercol2=14.85,
+            numbercol3=15.76
+            )
 
-            for row in (
-                t1.select().execute().first(),
-                t2.select().execute().first()
-            ):
-                for i, (val, type_) in enumerate((
-                    (1, int),
-                    (decimal.Decimal("5.2"), decimal.Decimal),
-                    (6.5, float),
-                    (8.5, float),
-                    (9.5, float),
-                    (12, int),
-                    (decimal.Decimal("14.85"), decimal.Decimal),
-                    (15.76, float),
-                )):
-                    eq_(row[i], val)
-                    assert isinstance(row[i], type_), '%r is not %r' \
-                        % (row[i], type_)
+        m2 = MetaData(testing.db)
+        t2 = Table('t1', m2, autoload=True)
+
+        for row in (
+            t1.select().execute().first(),
+            t2.select().execute().first()
+        ):
+            for i, (val, type_) in enumerate((
+                (1, int),
+                (decimal.Decimal("5.2"), decimal.Decimal),
+                (6.5, float),
+                (8.5, float),
+                (9.5, float),
+                (12, int),
+                (decimal.Decimal("14.85"), decimal.Decimal),
+                (15.76, float),
+            )):
+                eq_(row[i], val)
+                assert isinstance(row[i], type_), '%r is not %r' \
+                    % (row[i], type_)
 
-        finally:
-            t1.drop()
 
 
     def test_numeric_no_decimal_mode(self):
@@ -1157,28 +1167,26 @@ class TypesTest(fixtures.TestBase):
         )
         foo.create()
 
-        foo.insert().execute(
-            {'idata':5, 'ndata':decimal.Decimal("45.6"),
-                    'ndata2':decimal.Decimal("45.0"),
-                    'nidata':decimal.Decimal('53'), 'fdata':45.68392},
-        )
+        foo.insert().execute({
+                'idata': 5,
+                'ndata': decimal.Decimal("45.6"),
+                'ndata2': decimal.Decimal("45.0"),
+                'nidata': decimal.Decimal('53'),
+                'fdata': 45.68392
+            })
 
-        stmt = """
-        SELECT
-            idata,
-            ndata,
-            ndata2,
-            nidata,
-            fdata
-        FROM foo
-        """
+        stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo"
 
 
         row = testing.db.execute(stmt).fetchall()[0]
-        eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, int, float])
+        eq_(
+            [type(x) for x in row],
+            [int, decimal.Decimal, decimal.Decimal, int, float]
+        )
         eq_(
             row,
-            (5, decimal.Decimal('45.6'), decimal.Decimal('45'), 53, 45.683920000000001)
+            (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+                53, 45.683920000000001)
         )
 
         # with a nested subquery,
@@ -1202,7 +1210,10 @@ class TypesTest(fixtures.TestBase):
         FROM dual
         """
         row = testing.db.execute(stmt).fetchall()[0]
-        eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal])
+        eq_(
+            [type(x) for x in row],
+            [int, decimal.Decimal, int, int, decimal.Decimal]
+        )
         eq_(
             row,
             (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
@@ -1210,15 +1221,20 @@ class TypesTest(fixtures.TestBase):
 
         row = testing.db.execute(text(stmt,
                                 typemap={
-                                        'idata':Integer(),
-                                        'ndata':Numeric(20, 2),
-                                        'ndata2':Numeric(20, 2),
-                                        'nidata':Numeric(5, 0),
-                                        'fdata':Float()
+                                        'idata': Integer(),
+                                        'ndata': Numeric(20, 2),
+                                        'ndata2': Numeric(20, 2),
+                                        'nidata': Numeric(5, 0),
+                                        'fdata': Float()
                                 })).fetchall()[0]
-        eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float])
-        eq_(row,
-            (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001)
+        eq_(
+            [type(x) for x in row],
+            [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+        )
+        eq_(
+            row,
+            (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+                decimal.Decimal('53'), 45.683920000000001)
         )
 
         stmt = """
@@ -1244,39 +1260,55 @@ class TypesTest(fixtures.TestBase):
         )
         WHERE ROWNUM >= 0) anon_1
         """
-        row =testing.db.execute(stmt).fetchall()[0]
-        eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal])
-        eq_(row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')))
+        row = testing.db.execute(stmt).fetchall()[0]
+        eq_(
+            [type(x) for x in row],
+            [int, decimal.Decimal, int, int, decimal.Decimal]
+        )
+        eq_(
+            row,
+            (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))
+        )
 
         row = testing.db.execute(text(stmt,
                                 typemap={
-                                        'anon_1_idata':Integer(),
-                                        'anon_1_ndata':Numeric(20, 2),
-                                        'anon_1_ndata2':Numeric(20, 2),
-                                        'anon_1_nidata':Numeric(5, 0),
-                                        'anon_1_fdata':Float()
+                                        'anon_1_idata': Integer(),
+                                        'anon_1_ndata': Numeric(20, 2),
+                                        'anon_1_ndata2': Numeric(20, 2),
+                                        'anon_1_nidata': Numeric(5, 0),
+                                        'anon_1_fdata': Float()
                                 })).fetchall()[0]
-        eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float])
-        eq_(row,
-            (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001)
+        eq_(
+            [type(x) for x in row],
+            [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]
+        )
+        eq_(
+            row,
+            (5, decimal.Decimal('45.6'), decimal.Decimal('45'),
+                decimal.Decimal('53'), 45.683920000000001)
         )
 
         row = testing.db.execute(text(stmt,
                                 typemap={
-                                        'anon_1_idata':Integer(),
-                                        'anon_1_ndata':Numeric(20, 2, asdecimal=False),
-                                        'anon_1_ndata2':Numeric(20, 2, asdecimal=False),
-                                        'anon_1_nidata':Numeric(5, 0, asdecimal=False),
-                                        'anon_1_fdata':Float(asdecimal=True)
+                                        'anon_1_idata': Integer(),
+                                        'anon_1_ndata': Numeric(20, 2, asdecimal=False),
+                                        'anon_1_ndata2': Numeric(20, 2, asdecimal=False),
+                                        'anon_1_nidata': Numeric(5, 0, asdecimal=False),
+                                        'anon_1_fdata': Float(asdecimal=True)
                                 })).fetchall()[0]
-        eq_([type(x) for x in row], [int, float, float, float, decimal.Decimal])
-        eq_(row,
+        eq_(
+            [type(x) for x in row],
+            [int, float, float, float, decimal.Decimal]
+        )
+        eq_(
+            row,
             (5, 45.6, 45, 53, decimal.Decimal('45.68392'))
         )
 
 
+    @testing.provide_metadata
     def test_reflect_dates(self):
-        metadata = MetaData(testing.db)
+        metadata = self.metadata
         Table(
             "date_types", metadata,
             Column('d1', DATE),
@@ -1285,20 +1317,16 @@ class TypesTest(fixtures.TestBase):
             Column('d4', oracle.INTERVAL(second_precision=5)),
         )
         metadata.create_all()
-        try:
-            m = MetaData(testing.db)
-            t1 = Table(
-                "date_types", m,
-                autoload=True)
-            assert isinstance(t1.c.d1.type, DATE)
-            assert isinstance(t1.c.d2.type, TIMESTAMP)
-            assert not t1.c.d2.type.timezone
-            assert isinstance(t1.c.d3.type, TIMESTAMP)
-            assert t1.c.d3.type.timezone
-            assert isinstance(t1.c.d4.type, oracle.INTERVAL)
-
-        finally:
-            metadata.drop_all()
+        m = MetaData(testing.db)
+        t1 = Table(
+            "date_types", m,
+            autoload=True)
+        assert isinstance(t1.c.d1.type, DATE)
+        assert isinstance(t1.c.d2.type, TIMESTAMP)
+        assert not t1.c.d2.type.timezone
+        assert isinstance(t1.c.d3.type, TIMESTAMP)
+        assert t1.c.d3.type.timezone
+        assert isinstance(t1.c.d4.type, oracle.INTERVAL)
 
     def test_reflect_all_types_schema(self):
         types_table = Table('all_types', MetaData(testing.db),
@@ -1326,7 +1354,7 @@ class TypesTest(fixtures.TestBase):
     @testing.provide_metadata
     def test_reflect_nvarchar(self):
         metadata = self.metadata
-        t = Table('t', metadata,
+        Table('t', metadata,
             Column('data', sqltypes.NVARCHAR(255))
         )
         metadata.create_all()
@@ -1348,22 +1376,20 @@ class TypesTest(fixtures.TestBase):
         assert isinstance(res, unicode)
 
 
+    @testing.provide_metadata
     def test_char_length(self):
-        metadata = MetaData(testing.db)
+        metadata = self.metadata
         t1 = Table('t1', metadata,
               Column("c1", VARCHAR(50)),
               Column("c2", NVARCHAR(250)),
               Column("c3", CHAR(200))
         )
         t1.create()
-        try:
-            m2 = MetaData(testing.db)
-            t2 = Table('t1', m2, autoload=True)
-            eq_(t2.c.c1.type.length, 50)
-            eq_(t2.c.c2.type.length, 250)
-            eq_(t2.c.c3.type.length, 200)
-        finally:
-            t1.drop()
+        m2 = MetaData(testing.db)
+        t2 = Table('t1', m2, autoload=True)
+        eq_(t2.c.c1.type.length, 50)
+        eq_(t2.c.c2.type.length, 250)
+        eq_(t2.c.c3.type.length, 200)
 
     @testing.provide_metadata
     def test_long_type(self):
@@ -1379,8 +1405,6 @@ class TypesTest(fixtures.TestBase):
             "xyz"
         )
 
-
-
     def test_longstring(self):
         metadata = MetaData(testing.db)
         testing.db.execute("""
@@ -1431,15 +1455,16 @@ class EuroNumericTest(fixtures.TestBase):
             del os.environ['NLS_LANG']
         self.engine.dispose()
 
-    @testing.provide_metadata
     def test_output_type_handler(self):
-        metadata = self.metadata
         for stmt, exp, kw in [
             ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}),
             ("SELECT 15 FROM DUAL", 15, {}),
-            ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", decimal.Decimal("15"), {}),
-            ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", decimal.Decimal("0.1"), {}),
-            ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), {'num':decimal.Decimal("2.5")})
+            ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL",
+                                    decimal.Decimal("15"), {}),
+            ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL",
+                                    decimal.Decimal("0.1"), {}),
+            ("SELECT :num FROM DUAL", decimal.Decimal("2.5"),
+                                    {'num': decimal.Decimal("2.5")})
         ]:
             test_exp = self.engine.scalar(stmt, **kw)
             eq_(
@@ -1519,97 +1544,86 @@ class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL):
 class UnsupportedIndexReflectTest(fixtures.TestBase):
     __only_on__ = 'oracle'
 
-    def setup(self):
-        global metadata
-        metadata = MetaData(testing.db)
-        t1 = Table('test_index_reflect', metadata,
+    @testing.emits_warning("No column names")
+    @testing.provide_metadata
+    def test_reflect_functional_index(self):
+        metadata = self.metadata
+        Table('test_index_reflect', metadata,
                     Column('data', String(20), primary_key=True)
                 )
         metadata.create_all()
 
-    def teardown(self):
-        metadata.drop_all()
-
-    @testing.emits_warning("No column names")
-    def test_reflect_functional_index(self):
         testing.db.execute('CREATE INDEX DATA_IDX ON '
                            'TEST_INDEX_REFLECT (UPPER(DATA))')
         m2 = MetaData(testing.db)
-        t2 = Table('test_index_reflect', m2, autoload=True)
+        Table('test_index_reflect', m2, autoload=True)
 
 class RoundTripIndexTest(fixtures.TestBase):
     __only_on__ = 'oracle'
 
+    @testing.provide_metadata
     def test_basic(self):
-        engine = testing.db
-        metadata = MetaData(engine)
+        metadata = self.metadata
 
-        table=Table("sometable", metadata,
+        table = Table("sometable", metadata,
             Column("id_a", Unicode(255), primary_key=True),
             Column("id_b", Unicode(255), primary_key=True, unique=True),
             Column("group", Unicode(255), primary_key=True),
             Column("col", Unicode(255)),
-            UniqueConstraint('col','group'),
+            UniqueConstraint('col', 'group'),
         )
 
         # "group" is a keyword, so lower case
         normalind = Index('tableind', table.c.id_b, table.c.group)
 
-        # create
         metadata.create_all()
-        try:
-            # round trip, create from reflection
-            mirror = MetaData(engine)
-            mirror.reflect()
-            metadata.drop_all()
-            mirror.create_all()
-
-            # inspect the reflected creation
-            inspect = MetaData(engine)
-            inspect.reflect()
-
-            def obj_definition(obj):
-                return obj.__class__, tuple([c.name for c in
-                        obj.columns]), getattr(obj, 'unique', None)
-
-            # find what the primary k constraint name should be
-            primaryconsname = engine.execute(
-               text("""SELECT constraint_name
-                   FROM all_constraints
-                   WHERE table_name = :table_name
-                   AND owner = :owner
-                   AND constraint_type = 'P' """),
-               table_name=table.name.upper(),
-               owner=engine.url.username.upper()).fetchall()[0][0]
-
-            reflectedtable = inspect.tables[table.name]
-
-            # make a dictionary of the reflected objects:
-
-            reflected = dict([(obj_definition(i), i) for i in
-                             reflectedtable.indexes
-                             | reflectedtable.constraints])
-
-            # assert we got primary key constraint and its name, Error
-            # if not in dict
-
-            assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
-                             'group'), None)].name.upper() \
-                == primaryconsname.upper()
-
-            # Error if not in dict
-
-            assert reflected[(Index, ('id_b', 'group'), False)].name \
-                == normalind.name
-            assert (Index, ('id_b', ), True) in reflected
-            assert (Index, ('col', 'group'), True) in reflected
-            assert len(reflectedtable.constraints) == 1
-            assert len(reflectedtable.indexes) == 3
+        mirror = MetaData(testing.db)
+        mirror.reflect()
+        metadata.drop_all()
+        mirror.create_all()
 
-        finally:
-            metadata.drop_all()
+        inspect = MetaData(testing.db)
+        inspect.reflect()
+
+        def obj_definition(obj):
+            return obj.__class__, tuple([c.name for c in
+                    obj.columns]), getattr(obj, 'unique', None)
+
+        # find what the primary k constraint name should be
+        primaryconsname = testing.db.execute(
+           text("""SELECT constraint_name
+               FROM all_constraints
+               WHERE table_name = :table_name
+               AND owner = :owner
+               AND constraint_type = 'P' """),
+           table_name=table.name.upper(),
+           owner=testing.db.url.username.upper()).fetchall()[0][0]
 
+        reflectedtable = inspect.tables[table.name]
 
+        # make a dictionary of the reflected objects:
+
+        reflected = dict([(obj_definition(i), i) for i in
+                         reflectedtable.indexes
+                         | reflectedtable.constraints])
+
+        # assert we got primary key constraint and its name, Error
+        # if not in dict
+
+        assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
+                         'group'), None)].name.upper() \
+            == primaryconsname.upper()
+
+        # Error if not in dict
+
+        eq_(
+            reflected[(Index, ('id_b', 'group'), False)].name,
+            normalind.name
+        )
+        assert (Index, ('id_b', ), True) in reflected
+        assert (Index, ('col', 'group'), True) in reflected
+        eq_(len(reflectedtable.constraints), 1)
+        eq_(len(reflectedtable.indexes), 3)
 
 class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
 
@@ -1656,11 +1670,11 @@ class ExecuteTest(fixtures.TestBase):
         metadata.create_all()
 
         t.insert().execute(
-            {'id':1, 'data':1},
-            {'id':2, 'data':7},
-            {'id':3, 'data':12},
-            {'id':4, 'data':15},
-            {'id':5, 'data':32},
+            {'id': 1, 'data': 1},
+            {'id': 2, 'data': 7},
+            {'id': 3, 'data': 12},
+            {'id': 4, 'data': 15},
+            {'id': 5, 'data': 32},
         )
 
         # here, we can't use ORDER BY.
@@ -1685,34 +1699,34 @@ class UnicodeSchemaTest(fixtures.TestBase):
     @testing.provide_metadata
     def test_quoted_column_non_unicode(self):
         metadata = self.metadata
-        table=Table("atable", metadata,
+        table = Table("atable", metadata,
             Column("_underscorecolumn", Unicode(255), primary_key=True),
         )
         metadata.create_all()
 
         table.insert().execute(
-            {'_underscorecolumn': u'’é'},
+            {'_underscorecolumn': u('’é')},
         )
         result = testing.db.execute(
-            table.select().where(table.c._underscorecolumn==u'’é')
+            table.select().where(table.c._underscorecolumn == u('’é'))
         ).scalar()
-        eq_(result, u'’é')
+        eq_(result, u('’é'))
 
     @testing.provide_metadata
     def test_quoted_column_unicode(self):
         metadata = self.metadata
-        table=Table("atable", metadata,
-            Column(u"méil", Unicode(255), primary_key=True),
+        table = Table("atable", metadata,
+            Column(u("méil"), Unicode(255), primary_key=True),
         )
         metadata.create_all()
 
         table.insert().execute(
-            {u'méil': u'’é'},
+            {u('méil'): u('’é')},
         )
         result = testing.db.execute(
-            table.select().where(table.c[u'méil']==u'’é')
+            table.select().where(table.c[u('méil')] == u('’é'))
         ).scalar()
-        eq_(result, u'’é')
+        eq_(result, u('’é'))
 
 
 class DBLinkReflectionTest(fixtures.TestBase):