git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Quick cleanup of defaults.py. The main DefaultTest is still a mess.
author Jason Kirtland <jek@discorporate.us>
Tue, 20 May 2008 00:14:51 +0000 (00:14 +0000)
committer Jason Kirtland <jek@discorporate.us>
Tue, 20 May 2008 00:14:51 +0000 (00:14 +0000)
test/engine/_base.py
test/orm/_base.py
test/orm/_fixtures.py
test/sql/_base.py [new file with mode: 0644]
test/sql/defaults.py
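
The diff below introduces a reusable TablesTest base class in test/engine/_base.py (re-exported for SQL-layer suites via the new test/sql/_base.py) and converts several defaults.py test cases to it. As a rough sketch of how a suite would use the new base class, here is a hypothetical subclass; the UsersTest name, the users table and the fixture rows are invented for illustration, while the run_* flags and the define_tables/fixtures hooks are the ones defined in the diff:

    # Hypothetical usage sketch; not part of this commit.
    from testlib import sa
    from engine import _base

    class UsersTest(_base.TablesTest):
        # create tables once per class, reload fixture rows before each test
        run_define_tables = 'once'
        run_inserts = 'each'

        def define_tables(self, metadata):
            sa.Table('users', metadata,
                     sa.Column('id', sa.Integer, primary_key=True),
                     sa.Column('name', sa.String(30)))

        def fixtures(self):
            # first row names the columns, the remaining rows are values
            return {'users': [('id', 'name'),
                              (1, 'jack'),
                              (2, 'ed')]}

        def test_fixture_rows(self):
            users = self.tables['users']
            assert len(users.select().execute().fetchall()) == 2

Table creation, fixture loading and per-test cleanup are then driven by the setUpAll/setUp/tearDown machinery shown in the first hunk below.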

diff --git a/test/engine/_base.py b/test/engine/_base.py
index 9ffdb5e1a6f5bf01d70ff5a787729af0ce3b5d89..c215b2e962fc3b3fffbab8204b1a8b8914366dba 100644 (file)
--- a/test/engine/_base.py
+++ b/test/engine/_base.py
@@ -1,4 +1,148 @@
-from testlib import testing
+import sys
+from testlib import sa, testing
+from testlib.testing import adict
+
+
+class TablesTest(testing.TestBase):
+    """An integration test that creates and uses tables."""
+
+    # 'once', 'each', None
+    run_setup_bind = 'once'
+
+    # 'once', 'each', None
+    run_define_tables = 'once'
+
+    # 'once', 'each', None
+    run_inserts = 'each'
+
+    # 'each', None
+    run_deletes = 'each'
+
+    # 'once', 'each', None
+    run_dispose_bind = None
+
+    _artifact_registries = ('tables', 'other_artifacts')
+
+    bind = None
+    metadata = None
+    tables = None
+    other_artifacts = None
+
+    def setUpAll(self):
+        if self.run_setup_bind is None:
+            assert self.bind is not None
+        assert self.run_deletes in (None, 'each')
+        if self.run_inserts == 'once':
+            assert self.run_deletes is None
+
+        cls = self.__class__
+        if cls.tables is None:
+            cls.tables = adict()
+        if cls.other_artifacts is None:
+            cls.other_artifacts = adict()
+
+        if self.bind is None:
+            setattr(type(self), 'bind', self.setup_bind())
+
+        if self.metadata is None:
+            setattr(type(self), 'metadata', sa.MetaData())
+
+        if self.metadata.bind is None:
+            self.metadata.bind = self.bind
+
+        if self.run_define_tables:
+            self.define_tables(self.metadata)
+            self.metadata.create_all()
+            self.tables.update(self.metadata.tables)
+
+        if self.run_inserts:
+            self._load_fixtures()
+            self.insert_data()
+
+    def setUp(self):
+        if self._sa_first_test:
+            return
+
+        cls = self.__class__
+
+        if self.run_setup_bind == 'each':
+            setattr(cls, 'bind', self.setup_bind())
+
+        if self.run_define_tables == 'each':
+            self.tables.clear()
+            self.metadata.drop_all()
+            self.metadata.clear()
+            self.define_tables(self.metadata)
+            self.metadata.create_all()
+            self.tables.update(self.metadata.tables)
+
+        if self.run_inserts == 'each':
+            self._load_fixtures()
+            self.insert_data()
+
+    def tearDown(self):
+        # no need to run deletes if tables are recreated on setup
+        if self.run_define_tables != 'each' and self.run_deletes:
+            for table in self.metadata.table_iterator(reverse=True):
+                try:
+                    table.delete().execute().close()
+                except sa.exc.DBAPIError, ex:
+                    print >> sys.stderr, "Error emptying table %s: %r" % (
+                        table, ex)
+
+        if self.run_dispose_bind == 'each':
+            self.dispose_bind(self.bind)
+
+    def tearDownAll(self):
+        self.metadata.drop_all()
+
+        if self.run_dispose_bind:
+            self.dispose_bind(self.bind)
+
+        self.metadata.bind = None
+
+        if self.run_setup_bind is not None:
+            self.bind = None
+
+    def setup_bind(self):
+        return testing.db
+
+    def dispose_bind(self, bind):
+        if hasattr(bind, 'dispose'):
+            bind.dispose()
+        elif hasattr(bind, 'close'):
+            bind.close()
+
+    def define_tables(self, metadata):
+        raise NotImplementedError()
+
+    def fixtures(self):
+        return {}
+
+    def insert_data(self):
+        pass
+
+    def sql_count_(self, count, fn):
+        self.assert_sql_count(self.bind, fn, count)
+
+    def sql_eq_(self, callable_, statements, with_sequences=None):
+        self.assert_sql(self.bind,
+                        callable_, statements, with_sequences)
+
+    def _load_fixtures(self):
+        headers, rows = {}, {}
+        for table, data in self.fixtures().iteritems():
+            if isinstance(table, basestring):
+                table = self.tables[table]
+            headers[table] = data[0]
+            rows[table] = data[1:]
+        for table in self.metadata.table_iterator(reverse=False):
+            if table not in headers:
+                continue
+            table.bind.execute(
+                table.insert(),
+                [dict(zip(headers[table], column_values))
+                 for column_values in rows[table]])
 
 
 class AltEngineTest(testing.TestBase):
diff --git a/test/orm/_base.py b/test/orm/_base.py
index b952952d7650b7ff29852347c712b408dd39f0c5..62285affbb73ab7ff2c1868d6faa2ed0f1d6fa49 100644 (file)
--- a/test/orm/_base.py
+++ b/test/orm/_base.py
@@ -199,7 +199,7 @@ class MappedTest(ORMTest):
 
         if self.run_setup_mappers == 'each':
             sa.orm.clear_mappers()
-        
+
         # no need to run deletes if tables are recreated on setup
         if self.run_define_tables != 'each' and self.run_deletes:
             for table in self.metadata.table_iterator(reverse=True):
diff --git a/test/orm/_fixtures.py b/test/orm/_fixtures.py
index 4ded57d9862d853a18330f53e45cb2947ecf0064..6be6c7bd036b2552951dd9ac1c6ce1b9a9c7a267 100644 (file)
--- a/test/orm/_fixtures.py
+++ b/test/orm/_fixtures.py
@@ -160,79 +160,30 @@ def _load_fixtures():
 def run_inserts_for(table, bind=None):
     table.info[('fixture', 'loader')](bind)
 
-
 class Base(_base.ComparableEntity):
     pass
 
-_recursion_stack = set()
-class ZBase(_base.BasicEntity):
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-    def __eq__(self, other):
-        """'passively' compare this object to another.
-
-        only look at attributes that are present on the source object.
-
-        """
-        if self in _recursion_stack:
-            return True
-        _recursion_stack.add(self)
-        try:
-            # pick the entity thats not SA persisted as the source
-            try:
-                state = attributes.instance_state(self)
-                key = state.key
-            except (KeyError, AttributeError):
-                key = None
-            if other is None:
-                a = self
-                b = other
-            elif key is not None:
-                a = other
-                b = self
-            else:
-                a = self
-                b = other
-
-            for attr in a.__dict__.keys():
-                if attr[0] == '_':
-                    continue
-                value = getattr(a, attr)
-                #print "looking at attr:", attr, "start value:", value
-                if hasattr(value, '__iter__') and not isinstance(value, basestring):
-                    try:
-                        # catch AttributeError so that lazy loaders trigger
-                        battr = getattr(b, attr)
-                    except AttributeError:
-                        #print "b class does not have attribute named '%s'" % attr
-                        #raise
-                        return False
-
-                    if list(value) == list(battr):
-                        continue
-                    else:
-                        return False
-                else:
-                    if value is not None:
-                        if value != getattr(b, attr, None):
-                            #print "2. Attribute named '%s' does not match that of b" % attr
-                            return False
-            else:
-                return True
-        finally:
-            _recursion_stack.remove(self)
 
 class User(Base):
     pass
+
+
 class Order(Base):
     pass
+
+
 class Item(Base):
     pass
+
+
 class Keyword(Base):
     pass
+
+
 class Address(Base):
     pass
+
+
 class Dingaling(Base):
     pass
 
@@ -253,7 +204,7 @@ class FixtureTest(_base.MappedTest):
     run_setup_mappers = 'each'
     run_inserts = 'each'
     run_deletes = 'each'
-    
+
     metadata = fixture_metadata
     fixture_classes = dict(User=User,
                            Order=Order,
diff --git a/test/sql/_base.py b/test/sql/_base.py
new file mode 100644 (file)
index 0000000..c1a107e
--- /dev/null
+++ b/test/sql/_base.py
@@ -0,0 +1,4 @@
+from engine import _base as engine_base
+
+
+TablesTest = engine_base.TablesTest
diff --git a/test/sql/defaults.py b/test/sql/defaults.py
index c496dfca84884490898c1937b625bea8f6379b44..fbea5888eb4a63a0119bb4016bcab300cdc24337 100644 (file)
--- a/test/sql/defaults.py
+++ b/test/sql/defaults.py
@@ -1,14 +1,14 @@
 import testenv; testenv.configure_for_tests()
 import datetime
-from sqlalchemy import *
-from sqlalchemy import exc, schema, util
-from sqlalchemy.orm import mapper, create_session
+from sqlalchemy import Sequence, Column, func
 from testlib import sa, testing
+from testlib.sa import MetaData, Table, Integer, String, ForeignKey
 from testlib.testing import eq_
-from testlib import *
+from testlib.compat import set
+from sql import _base
 
 
-class DefaultTest(TestBase):
+class DefaultTest(testing.TestBase):
 
     def setUpAll(self):
         global t, f, f2, ts, currenttime, metadata, default_generator
@@ -23,12 +23,12 @@ class DefaultTest(TestBase):
 
         def myupdate_with_ctx(ctx):
             conn = ctx.connection
-            return conn.execute(select([text('13')])).scalar()
+            return conn.execute(sa.select([sa.text('13')])).scalar()
 
         def mydefault_using_connection(ctx):
             conn = ctx.connection
             try:
-                return conn.execute(select([text('12')])).scalar()
+                return conn.execute(sa.select([sa.text('12')])).scalar()
             finally:
                 # ensure a "close()" on this connection does nothing,
                 # since its a "branched" connection
@@ -40,32 +40,32 @@ class DefaultTest(TestBase):
         # select "count(1)" returns different results on different DBs also
         # correct for "current_date" compatible as column default, value
         # differences
-        currenttime = func.current_date(type_=Date, bind=db)
+        currenttime = func.current_date(type_=sa.Date, bind=db)
 
         if is_oracle:
-            ts = db.scalar(select([func.trunc(func.sysdate(), literal_column("'DAY'"), type_=Date).label('today')]))
+            ts = db.scalar(sa.select([func.trunc(func.sysdate(), sa.literal_column("'DAY'"), type_=sa.Date).label('today')]))
             assert isinstance(ts, datetime.date) and not isinstance(ts, datetime.datetime)
-            f = select([func.length('abcdef')], bind=db).scalar()
-            f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+            f = sa.select([func.length('abcdef')], bind=db).scalar()
+            f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
             # TODO: engine propigation across nested functions not working
-            currenttime = func.trunc(currenttime, literal_column("'DAY'"), bind=db, type_=Date)
+            currenttime = func.trunc(currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date)
             def1 = currenttime
-            def2 = func.trunc(text("sysdate"), literal_column("'DAY'"), type_=Date)
+            def2 = func.trunc(sa.text("sysdate"), sa.literal_column("'DAY'"), type_=sa.Date)
 
-            deftype = Date
+            deftype = sa.Date
         elif use_function_defaults:
-            f = select([func.length('abcdef')], bind=db).scalar()
-            f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+            f = sa.select([func.length('abcdef')], bind=db).scalar()
+            f2 = sa.select([func.length('abcdefghijk')], bind=db).scalar()
             def1 = currenttime
             if testing.against('maxdb'):
-                def2 = text("curdate")
+                def2 = sa.text("curdate")
             else:
-                def2 = text("current_date")
-            deftype = Date
+                def2 = sa.text("current_date")
+            deftype = sa.Date
             ts = db.func.current_date().scalar()
         else:
-            f = select([func.length('abcdef')], bind=db).scalar()
-            f2 = select([func.length('abcdefghijk')], bind=db).scalar()
+            f = len('abcdef')
+            f2 = len('abcdefghijk')
             def1 = def2 = "3"
             ts = 3
             deftype = Integer
@@ -94,12 +94,12 @@ class DefaultTest(TestBase):
                    server_default=def2),
 
             # preexecute + update timestamp
-            Column('col6', Date,
+            Column('col6', sa.Date,
                    default=currenttime,
                    onupdate=currenttime),
 
-            Column('boolcol1', Boolean, default=True),
-            Column('boolcol2', Boolean, default=False),
+            Column('boolcol1', sa.Boolean, default=True),
+            Column('boolcol2', sa.Boolean, default=False),
 
             # python function which uses ExecutionContext
             Column('col7', Integer,
@@ -107,7 +107,7 @@ class DefaultTest(TestBase):
                    onupdate=myupdate_with_ctx),
 
             # python builtin
-            Column('col8', Date,
+            Column('col8', sa.Date,
                    default=datetime.date.today,
                    onupdate=datetime.date.today),
             # combo
@@ -123,7 +123,7 @@ class DefaultTest(TestBase):
         default_generator['x'] = 50
         t.delete().execute()
 
-    def test_bad_argsignature(self):
+    def test_bad_arg_signature(self):
         ex_msg = \
           "ColumnDefault Python function takes zero or one positional arguments"
 
@@ -138,13 +138,11 @@ class DefaultTest(TestBase):
         fn4 = FN4()
 
         for fn in fn1, fn2, fn3, fn4:
-            try:
-                c = ColumnDefault(fn)
-                assert False, str(fn)
-            except exc.ArgumentError, e:
-                assert str(e) == ex_msg
+            self.assertRaisesMessage(sa.exc.ArgumentError,
+                                     ex_msg,
+                                     sa.ColumnDefault, fn)
 
-    def test_argsignature(self):
+    def test_arg_signature(self):
         def fn1(): pass
         def fn2(): pass
         def fn3(x=1): pass
@@ -166,17 +164,18 @@ class DefaultTest(TestBase):
         fn8 = FN8()
 
         for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8:
-            c = ColumnDefault(fn)
+            c = sa.ColumnDefault(fn)
 
-    def teststandalone(self):
+    @testing.fails_on('firebird') # 'Data type unknown'
+    def test_standalone(self):
         c = testing.db.engine.contextual_connect()
         x = c.execute(t.c.col1.default)
         y = t.c.col2.default.execute()
         z = c.execute(t.c.col3.default)
-        self.assert_(50 <= x <= 57)
-        self.assert_(y == 'imthedefault')
-        self.assert_(z == f)
-        self.assert_(f2==11)
+        assert 50 <= x <= 57
+        eq_(y, 'imthedefault')
+        eq_(z, f)
+        eq_(f2, 11)
 
     def test_py_vs_server_default_detection(self):
 
@@ -202,6 +201,8 @@ class DefaultTest(TestBase):
         has_('col8', 'default', 'onupdate')
         has_('col9', 'default', 'server_default')
 
+        ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause
+
         t2 = Table('t2', MetaData(),
                    Column('col1', Integer, Sequence('foo')),
                    Column('col2', Integer,
@@ -244,31 +245,38 @@ class DefaultTest(TestBase):
         has_('col7', 'default', 'server_default', 'onupdate')
         has_('col8', 'default', 'server_default', 'onupdate', 'server_onupdate')
 
+    @testing.fails_on('firebird') # 'Data type unknown'
     def test_insert(self):
         r = t.insert().execute()
         assert r.lastrow_has_defaults()
-        eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+        eq_(set(r.context.postfetch_cols),
+            set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
 
         r = t.insert(inline=True).execute()
         assert r.lastrow_has_defaults()
-        eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+        eq_(set(r.context.postfetch_cols),
+            set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
 
         t.insert().execute()
 
-        ctexec = select([currenttime.label('now')], bind=testing.db).scalar()
+        ctexec = sa.select([currenttime.label('now')], bind=testing.db).scalar()
         l = t.select().execute()
         today = datetime.date.today()
         eq_(l.fetchall(), [
-            (x, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')
+            (x, 'imthedefault', f, ts, ts, ctexec, True, False,
+             12, today, 'py')
             for x in range(51, 54)])
 
         t.insert().execute(col9=None)
         assert r.lastrow_has_defaults()
-        eq_(util.Set(r.context.postfetch_cols), util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
+        eq_(set(r.context.postfetch_cols),
+            set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
 
         eq_(t.select(t.c.col1==54).execute().fetchall(),
-            [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None)])
+            [(54, 'imthedefault', f, ts, ts, ctexec, True, False,
+              12, today, None)])
 
+    @testing.fails_on('firebird') # 'Data type unknown'
     def test_insertmany(self):
         # MySQL-Python 1.2.2 breaks functions in execute_many :(
         if (testing.against('mysql') and
@@ -281,15 +289,19 @@ class DefaultTest(TestBase):
         l = t.select().execute()
         today = datetime.date.today()
         eq_(l.fetchall(),
-            [(51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
-             (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py'),
-             (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')])
+            [(51, 'imthedefault', f, ts, ts, ctexec, True, False,
+              12, today, 'py'),
+             (52, 'imthedefault', f, ts, ts, ctexec, True, False,
+              12, today, 'py'),
+             (53, 'imthedefault', f, ts, ts, ctexec, True, False,
+              12, today, 'py')])
 
     def test_insert_values(self):
         t.insert(values={'col3':50}).execute()
         l = t.select().execute()
-        self.assert_(l.fetchone()['col3'] == 50)
+        eq_(50, l.fetchone()['col3'])
 
+    @testing.fails_on('firebird') # 'Data type unknown'
     def test_updatemany(self):
         # MySQL-Python 1.2.2 breaks functions in execute_many :(
         if (testing.against('mysql') and
@@ -298,44 +310,49 @@ class DefaultTest(TestBase):
 
         t.insert().execute({}, {}, {})
 
-        t.update(t.c.col1==bindparam('pkval')).execute(
-            {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False},
-        )
+        t.update(t.c.col1==sa.bindparam('pkval')).execute(
+            {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False})
 
-        t.update(t.c.col1==bindparam('pkval')).execute(
+        t.update(t.c.col1==sa.bindparam('pkval')).execute(
             {'pkval':51,},
             {'pkval':52,},
-            {'pkval':53,},
-        )
+            {'pkval':53,})
 
         l = t.select().execute()
         ctexec = currenttime.scalar()
         today = datetime.date.today()
         eq_(l.fetchall(),
-            [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today, 'py'),
-             (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py'),
-             (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today, 'py')])
-
-    def testupdate(self):
+            [(51, 'im the update', f2, ts, ts, ctexec, False, False,
+              13, today, 'py'),
+             (52, 'im the update', f2, ts, ts, ctexec, True, False,
+              13, today, 'py'),
+             (53, 'im the update', f2, ts, ts, ctexec, True, False,
+              13, today, 'py')])
+
+    @testing.fails_on('firebird') # 'Data type unknown'
+    def test_update(self):
         r = t.insert().execute()
         pk = r.last_inserted_ids()[0]
         t.update(t.c.col1==pk).execute(col4=None, col5=None)
         ctexec = currenttime.scalar()
         l = t.select(t.c.col1==pk).execute()
         l = l.fetchone()
-        self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today(), 'py'))
-        self.assert_(f2==11)
+        eq_(l,
+            (pk, 'im the update', f2, None, None, ctexec, True, False,
+             13, datetime.date.today(), 'py'))
+        eq_(11, f2)
 
-    def testupdatevalues(self):
+    @testing.fails_on('firebird') # 'Data type unknown'
+    def test_update_values(self):
         r = t.insert().execute()
         pk = r.last_inserted_ids()[0]
         t.update(t.c.col1==pk, values={'col3': 55}).execute()
         l = t.select(t.c.col1==pk).execute()
         l = l.fetchone()
-        self.assert_(l['col3'] == 55)
+        eq_(55, l['col3'])
 
     @testing.fails_on_everything_except('postgres')
-    def testpassiveoverride(self):
+    def test_passive_override(self):
         """
         Primarily for postgres, tests that when we get a primary key column
         back from reflecting a table which has a default value on it, we
@@ -345,6 +362,7 @@ class DefaultTest(TestBase):
         locate the just inserted row.
 
         """
+        # TODO: move this to dialect/postgres
         try:
             meta = MetaData(testing.db)
             testing.db.execute("""
@@ -360,58 +378,48 @@ class DefaultTest(TestBase):
             t = Table("speedy_users", meta, autoload=True)
             t.insert().execute(user_name='user', user_password='lala')
             l = t.select().execute().fetchall()
-            self.assert_(l == [(1, 'user', 'lala')])
+            eq_(l, [(1, 'user', 'lala')])
         finally:
             testing.db.execute("drop table speedy_users", None)
 
-class PKDefaultTest(TestBase):
-    __requires__ = ('subqueries',)
-
-    def setUpAll(self):
-        global metadata, t1, t2
 
-        metadata = MetaData(testing.db)
+class PKDefaultTest(_base.TablesTest):
+    __requires__ = ('subqueries',)
 
+    def define_tables(self, metadata):
         t2 = Table('t2', metadata,
             Column('nextid', Integer))
 
-        t1 = Table('t1', metadata,
-            Column('id', Integer, primary_key=True,
-                   default=select([func.max(t2.c.nextid)]).as_scalar()),
-            Column('data', String(30)))
-
-        metadata.create_all()
-
-    def tearDownAll(self):
-        metadata.drop_all()
+        Table('t1', metadata,
+              Column('id', Integer, primary_key=True,
+                     default=sa.select([func.max(t2.c.nextid)]).as_scalar()),
+              Column('data', String(30)))
 
     @testing.crashes('mssql', 'FIXME: unknown, verify not fails_on')
+    @testing.resolve_artifact_names
     def test_basic(self):
         t2.insert().execute(nextid=1)
         r = t1.insert().execute(data='hi')
-        assert r.last_inserted_ids() == [1]
+        eq_([1], r.last_inserted_ids())
 
         t2.insert().execute(nextid=2)
         r = t1.insert().execute(data='there')
-        assert r.last_inserted_ids() == [2]
-
+        eq_([2], r.last_inserted_ids())
 
-class PKIncrementTest(TestBase):
-    def setUp(self):
-        global aitable, aimeta
 
-        aimeta = MetaData(testing.db)
-        aitable = Table("aitest", aimeta,
-            Column('id', Integer, Sequence('ai_id_seq', optional=True),
-                   primary_key=True),
-            Column('int1', Integer),
-            Column('str1', String(20)))
-        aimeta.create_all()
+class PKIncrementTest(_base.TablesTest):
+    run_define_tables = 'each'
 
-    def tearDown(self):
-        aimeta.drop_all()
+    def define_tables(self, metadata):
+        Table("aitable", metadata,
+              Column('id', Integer, Sequence('ai_id_seq', optional=True),
+                     primary_key=True),
+              Column('int1', Integer),
+              Column('str1', String(20)))
 
     # TODO: add coverage for increment on a secondary column in a key
+    @testing.fails_on('firebird') # data type unknown
+    @testing.resolve_artifact_names
     def _test_autoincrement(self, bind):
         ids = set()
         rs = bind.execute(aitable.insert(), int1=1)
@@ -438,13 +446,14 @@ class PKIncrementTest(TestBase):
         self.assert_(last not in ids)
         ids.add(last)
 
-        self.assert_(
-            list(bind.execute(aitable.select().order_by(aitable.c.id))) ==
+        eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
             [(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
 
+    @testing.resolve_artifact_names
     def test_autoincrement_autocommit(self):
         self._test_autoincrement(testing.db)
 
+    @testing.resolve_artifact_names
     def test_autoincrement_transaction(self):
         con = testing.db.connect()
         tx = con.begin()
@@ -463,64 +472,57 @@ class PKIncrementTest(TestBase):
             con.close()
 
 
-class AutoIncrementTest(TestBase):
+class AutoIncrementTest(_base.TablesTest):
     __requires__ = ('identity',)
+    run_define_tables = 'each'
+
+    def define_tables(self, metadata):
+        """Each test manipulates self.metadata individually."""
 
     @testing.exclude('sqlite', '<', (3, 4), 'no database support')
     def test_autoincrement_single_col(self):
-        metadata = MetaData(testing.db)
+        single = Table('single', self.metadata,
+                       Column('id', Integer, primary_key=True))
+        single.create()
 
-        single = Table('single', metadata,
-            Column('id', Integer, primary_key=True))
-        metadata.create_all()
-        try:
-            r = single.insert().execute()
-            id_ = r.last_inserted_ids()[0]
-            assert id_ is not None
-            eq_(select([func.count(text('*'))], from_obj=single).scalar(), 1)
-        finally:
-            metadata.drop_all()
+        r = single.insert().execute()
+        id_ = r.last_inserted_ids()[0]
+        assert id_ is not None
+        eq_(1, sa.select([func.count(sa.text('*'))], from_obj=single).scalar())
 
     def test_autoincrement_fk(self):
-        metadata = MetaData(testing.db)
-
-        nodes = Table('nodes', metadata,
+        nodes = Table('nodes', self.metadata,
             Column('id', Integer, primary_key=True),
             Column('parent_id', Integer, ForeignKey('nodes.id')),
             Column('data', String(30)))
-        metadata.create_all()
-        try:
-            r = nodes.insert().execute(data='foo')
-            id_ = r.last_inserted_ids()[0]
-            nodes.insert().execute(data='bar', parent_id=id_)
-        finally:
-            metadata.drop_all()
+        nodes.create()
+
+        r = nodes.insert().execute(data='foo')
+        id_ = r.last_inserted_ids()[0]
+        nodes.insert().execute(data='bar', parent_id=id_)
 
     @testing.fails_on('sqlite')
     def test_non_autoincrement(self):
         # sqlite INT primary keys can be non-unique! (only for ints)
-        meta = MetaData(testing.db)
-        nonai_table = Table("nonaitest", meta,
+        nonai = Table("nonaitest", self.metadata,
             Column('id', Integer, autoincrement=False, primary_key=True),
             Column('data', String(20)))
-        nonai_table.create(checkfirst=True)
+        nonai.create()
+
+
         try:
-            try:
-                # postgres + mysql strict will fail on first row,
-                # mysql in legacy mode fails on second row
-                nonai_table.insert().execute(data='row 1')
-                nonai_table.insert().execute(data='row 2')
-                assert False
-            except exc.SQLError, e:
-                print "Got exception", str(e)
-                assert True
-
-            nonai_table.insert().execute(id=1, data='row 1')
-        finally:
-            nonai_table.drop()
+            # postgres + mysql strict will fail on first row,
+            # mysql in legacy mode fails on second row
+            nonai.insert().execute(data='row 1')
+            nonai.insert().execute(data='row 2')
+            assert False
+        except sa.exc.SQLError, e:
+            assert True
+
+        nonai.insert().execute(id=1, data='row 1')
 
 
-class SequenceTest(TestBase):
+class SequenceTest(testing.TestBase):
     __requires__ = ('sequences',)
 
     def setUpAll(self):
@@ -529,7 +531,7 @@ class SequenceTest(TestBase):
         cartitems = Table("cartitems", metadata,
             Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
             Column("description", String(40)),
-            Column("createdate", DateTime())
+            Column("createdate", sa.DateTime())
         )
         sometable = Table( 'Manager', metadata,
                Column('obj_id', Integer, Sequence('obj_id_seq'), ),
@@ -551,14 +553,12 @@ class SequenceTest(TestBase):
 
         sometable.insert().execute(
             {'name':'name3'},
-            {'name':'name4'}
-        )
-        assert sometable.select().execute().fetchall() == [
-            (1, "somename", 1),
-            (2, "someother", 2),
-            (3, "name3", 3),
-            (4, "name4", 4),
-        ]
+            {'name':'name4'})
+        eq_(sometable.select().execute().fetchall(),
+            [(1, "somename", 1),
+             (2, "someother", 2),
+             (3, "name3", 3),
+             (4, "name4", 4)])
 
     def testsequence(self):
         cartitems.insert().execute(description='hi')
@@ -568,13 +568,13 @@ class SequenceTest(TestBase):
         assert r.last_inserted_ids() and r.last_inserted_ids()[0] is not None
         id_ = r.last_inserted_ids()[0]
 
-        assert select([func.count(cartitems.c.cart_id)],
-                      and_(cartitems.c.description == 'lala',
-                           cartitems.c.cart_id == id_)).scalar() == 1
+        eq_(1,
+            sa.select([func.count(cartitems.c.cart_id)],
+                      sa.and_(cartitems.c.description == 'lala',
+                              cartitems.c.cart_id == id_)).scalar())
 
         cartitems.select().execute().fetchall()
 
-
     @testing.fails_on('maxdb')
     # maxdb db-api seems to double-execute NEXTVAL internally somewhere,
     # throwing off the numbers for these tests...
@@ -583,7 +583,7 @@ class SequenceTest(TestBase):
         s.create()
         try:
             x = s.execute()
-            self.assert_(x == 1)
+            eq_(x, 1)
         finally:
             s.drop()
 
@@ -593,7 +593,7 @@ class SequenceTest(TestBase):
         s.create(bind=testing.db)
         try:
             x = s.execute(testing.db)
-            self.assert_(x == 1)
+            eq_(x, 1)
         finally:
             s.drop(testing.db)