]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Modernize test_defaults
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 9 Apr 2020 22:03:30 +0000 (18:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 10 Apr 2020 14:47:15 +0000 (10:47 -0400)
Use modern execution patterns, goal is so that these same tests
can work for the future engine

break sequence tests into test_sequences suite

sequence tests that are testing implicit execution patterns
at least move into their own suite that will go into test_deprecations
eventually.

Change-Id: I27cac9bd265c86ff2a3381ff9f844f60ef991cfc

lib/sqlalchemy/sql/schema.py
test/sql/test_defaults.py
test/sql/test_sequences.py [new file with mode: 0644]

index 5c6b1f3c6e73ab64a1d3982ff2d79421e855f25f..eb6c12f8012dd130a5a8aa4e110788b42c02c88a 100644 (file)
@@ -2100,10 +2100,17 @@ class DefaultGenerator(SchemaItem):
         else:
             self.column.default = self
 
+    @util.deprecated_20(
+        ":meth:`.DefaultGenerator.execute`",
+        alternative="All statement execution in SQLAlchemy 2.0 is performed "
+        "by the :meth:`.Connection.execute` method of :class:`.Connection`, "
+        "or in the ORM by the :meth:`.Session.execute` method of "
+        ":class:`.Session`.",
+    )
     def execute(self, bind=None, **kwargs):
         if bind is None:
             bind = _bind_or_error(self)
-        return bind._execute_default(self, **kwargs)
+        return bind.execute(self, **kwargs)
 
     def _execute_on_connection(self, connection, multiparams, params):
         return connection._execute_default(self, multiparams, params)
index 7d33933f82180b1363a667a36d3a6491d26d65dc..52e73b22331c2d3d1eb0cd32000d3c157e75e30d 100644 (file)
@@ -15,11 +15,7 @@ from sqlalchemy import Sequence
 from sqlalchemy import String
 from sqlalchemy import testing
 from sqlalchemy import Unicode
-from sqlalchemy import util
-from sqlalchemy.dialects import sqlite
-from sqlalchemy.schema import CreateSequence
 from sqlalchemy.schema import CreateTable
-from sqlalchemy.schema import DropSequence
 from sqlalchemy.sql import literal_column
 from sqlalchemy.sql import select
 from sqlalchemy.sql import text
@@ -30,9 +26,6 @@ from sqlalchemy.testing import eq_
 from sqlalchemy.testing import expect_warnings
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing import mock
-from sqlalchemy.testing.assertsql import AllOf
-from sqlalchemy.testing.assertsql import CompiledSQL
-from sqlalchemy.testing.assertsql import EachOf
 from sqlalchemy.testing.schema import Column
 from sqlalchemy.testing.schema import Table
 from sqlalchemy.types import TypeDecorator
@@ -41,9 +34,6 @@ from sqlalchemy.util import b
 from sqlalchemy.util import u
 
 
-t = f = f2 = ts = currenttime = metadata = default_generator = None
-
-
 class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
     __dialect__ = "default"
 
@@ -142,154 +132,7 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
         )
 
 
-class DefaultTest(fixtures.TestBase):
-    __backend__ = True
-
-    @classmethod
-    def setup_class(cls):
-        global t, f, f2, ts, currenttime, metadata, default_generator
-
-        db = testing.db
-        metadata = MetaData(db)
-        default_generator = {"x": 50}
-
-        def mydefault():
-            default_generator["x"] += 1
-            return default_generator["x"]
-
-        def myupdate_with_ctx(ctx):
-            conn = ctx.connection
-            return conn.execute(sa.select([sa.text("13")])).scalar()
-
-        def mydefault_using_connection(ctx):
-            conn = ctx.connection
-            return conn.execute(sa.select([sa.text("12")])).scalar()
-
-        use_function_defaults = testing.against("postgresql", "mssql")
-        is_oracle = testing.against("oracle")
-
-        class MyClass(object):
-            @classmethod
-            def gen_default(cls, ctx):
-                return "hi"
-
-        class MyType(TypeDecorator):
-            impl = String(50)
-
-            def process_bind_param(self, value, dialect):
-                if value is not None:
-                    value = "BIND" + value
-                return value
-
-        # select "count(1)" returns different results on different DBs also
-        # correct for "current_date" compatible as column default, value
-        # differences
-        currenttime = func.current_date(type_=sa.Date, bind=db)
-        if is_oracle:
-            ts = db.scalar(
-                sa.select(
-                    [
-                        func.trunc(
-                            func.current_timestamp(),
-                            sa.literal_column("'DAY'"),
-                            type_=sa.Date,
-                        )
-                    ]
-                )
-            )
-            assert isinstance(ts, datetime.date) and not isinstance(
-                ts, datetime.datetime
-            )
-            f = sa.select([func.length("abcdef")], bind=db).scalar()
-            f2 = sa.select([func.length("abcdefghijk")], bind=db).scalar()
-            # TODO: engine propagation across nested functions not working
-            currenttime = func.trunc(
-                currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date
-            )
-            def1 = currenttime
-            def2 = func.trunc(
-                sa.text("current_timestamp"),
-                sa.literal_column("'DAY'"),
-                type_=sa.Date,
-            )
-
-            deftype = sa.Date
-        elif use_function_defaults:
-            f = sa.select([func.length("abcdef")], bind=db).scalar()
-            f2 = sa.select([func.length("abcdefghijk")], bind=db).scalar()
-            def1 = currenttime
-            deftype = sa.Date
-            if testing.against("mssql"):
-                def2 = sa.text("getdate()")
-            else:
-                def2 = sa.text("current_date")
-            ts = db.scalar(func.current_date())
-        else:
-            f = len("abcdef")
-            f2 = len("abcdefghijk")
-            def1 = def2 = "3"
-            ts = 3
-            deftype = Integer
-
-        t = Table(
-            "default_test1",
-            metadata,
-            # python function
-            Column("col1", Integer, primary_key=True, default=mydefault),
-            # python literal
-            Column(
-                "col2",
-                String(20),
-                default="imthedefault",
-                onupdate="im the update",
-            ),
-            # preexecute expression
-            Column(
-                "col3",
-                Integer,
-                default=func.length("abcdef"),
-                onupdate=func.length("abcdefghijk"),
-            ),
-            # SQL-side default from sql expression
-            Column("col4", deftype, server_default=def1),
-            # SQL-side default from literal expression
-            Column("col5", deftype, server_default=def2),
-            # preexecute + update timestamp
-            Column("col6", sa.Date, default=currenttime, onupdate=currenttime),
-            Column("boolcol1", sa.Boolean, default=True),
-            Column("boolcol2", sa.Boolean, default=False),
-            # python function which uses ExecutionContext
-            Column(
-                "col7",
-                Integer,
-                default=mydefault_using_connection,
-                onupdate=myupdate_with_ctx,
-            ),
-            # python builtin
-            Column(
-                "col8",
-                sa.Date,
-                default=datetime.date.today,
-                onupdate=datetime.date.today,
-            ),
-            # combo
-            Column("col9", String(20), default="py", server_default="ddl"),
-            # python method w/ context
-            Column("col10", String(20), default=MyClass.gen_default),
-            # fixed default w/ type that has bound processor
-            Column("col11", MyType(), default="foo"),
-        )
-
-        t.create()
-
-    @classmethod
-    def teardown_class(cls):
-        t.drop()
-
-    def teardown(self):
-        default_generator["x"] = 50
-        t.delete().execute()
-
+class DefaultObjectTest(fixtures.TestBase):
     def test_bad_arg_signature(self):
         ex_msg = (
             "ColumnDefault Python function takes zero "
@@ -356,50 +199,89 @@ class DefaultTest(fixtures.TestBase):
             c = sa.ColumnDefault(fn)
             c.arg("context")
 
-    @testing.fails_on("firebird", "Data type unknown")
-    def test_standalone(self):
-        c = testing.db.engine.connect()
-        x = c.execute(t.c.col1.default)
-        y = t.c.col2.default.execute()
-        z = c.execute(t.c.col3.default)
-        assert 50 <= x <= 57
-        eq_(y, "imthedefault")
-        eq_(z, f)
-        eq_(f2, 11)
-
-    def test_py_vs_server_default_detection(self):
-        def has_(name, *wanted):
-            slots = [
-                "default",
-                "onupdate",
-                "server_default",
-                "server_onupdate",
-            ]
-            col = tbl.c[name]
-            for slot in wanted:
-                slots.remove(slot)
-                assert getattr(col, slot) is not None, getattr(col, slot)
-            for slot in slots:
-                assert getattr(col, slot) is None, getattr(col, slot)
-
-        tbl = t
-        has_("col1", "default")
-        has_("col2", "default", "onupdate")
-        has_("col3", "default", "onupdate")
-        has_("col4", "server_default")
-        has_("col5", "server_default")
-        has_("col6", "default", "onupdate")
-        has_("boolcol1", "default")
-        has_("boolcol2", "default")
-        has_("col7", "default", "onupdate")
-        has_("col8", "default", "onupdate")
-        has_("col9", "default", "server_default")
+    def _check_default_slots(self, tbl, name, *wanted):
+        slots = [
+            "default",
+            "onupdate",
+            "server_default",
+            "server_onupdate",
+        ]
+        col = tbl.c[name]
+        for slot in wanted:
+            slots.remove(slot)
+            assert getattr(col, slot) is not None, getattr(col, slot)
+        for slot in slots:
+            assert getattr(col, slot) is None, getattr(col, slot)
+
+    def test_py_vs_server_default_detection_one(self):
+        has_ = self._check_default_slots
+
+        metadata = MetaData()
+        tbl = Table(
+            "default_test",
+            metadata,
+            # python function
+            Column("col1", Integer, primary_key=True, default="1"),
+            # python literal
+            Column(
+                "col2",
+                String(20),
+                default="imthedefault",
+                onupdate="im the update",
+            ),
+            # preexecute expression
+            Column(
+                "col3",
+                Integer,
+                default=func.length("abcdef"),
+                onupdate=func.length("abcdefghijk"),
+            ),
+            # SQL-side default from sql expression
+            Column("col4", Integer, server_default="1"),
+            # SQL-side default from literal expression
+            Column("col5", Integer, server_default="1"),
+            # preexecute + update timestamp
+            Column(
+                "col6",
+                sa.Date,
+                default=datetime.datetime.today,
+                onupdate=datetime.datetime.today,
+            ),
+            Column("boolcol1", sa.Boolean, default=True),
+            Column("boolcol2", sa.Boolean, default=False),
+            # python function which uses ExecutionContext
+            Column("col7", Integer, default=lambda: 5, onupdate=lambda: 10,),
+            # python builtin
+            Column(
+                "col8",
+                sa.Date,
+                default=datetime.date.today,
+                onupdate=datetime.date.today,
+            ),
+            Column("col9", String(20), default="py", server_default="ddl"),
+        )
+
+        has_(tbl, "col1", "default")
+        has_(tbl, "col2", "default", "onupdate")
+        has_(tbl, "col3", "default", "onupdate")
+        has_(tbl, "col4", "server_default")
+        has_(tbl, "col5", "server_default")
+        has_(tbl, "col6", "default", "onupdate")
+        has_(tbl, "boolcol1", "default")
+        has_(tbl, "boolcol2", "default")
+        has_(tbl, "col7", "default", "onupdate")
+        has_(tbl, "col8", "default", "onupdate")
+        has_(tbl, "col9", "default", "server_default")
+
+    def test_py_vs_server_default_detection_two(self):
+        has_ = self._check_default_slots
 
+        metadata = MetaData()
         ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause
 
-        t2 = Table(
+        tbl = Table(
             "t2",
-            MetaData(),
+            metadata,
             Column("col1", Integer, Sequence("foo")),
             Column(
                 "col2", Integer, default=Sequence("foo"), server_default="y"
@@ -439,40 +321,218 @@ class DefaultTest(fixtures.TestBase):
                 onupdate="z",
             ),
         )
-        tbl = t2
-        has_("col1", "default")
-        has_("col2", "default", "server_default")
-        has_("col3", "default", "server_default")
-        has_("col4", "default", "server_default", "server_onupdate")
-        has_("col5", "default", "server_default", "onupdate")
-        has_("col6", "default", "server_default", "onupdate")
-        has_("col7", "default", "server_default", "onupdate")
+        has_(tbl, "col1", "default")
+        has_(tbl, "col2", "default", "server_default")
+        has_(tbl, "col3", "default", "server_default")
+        has_(tbl, "col4", "default", "server_default", "server_onupdate")
+        has_(tbl, "col5", "default", "server_default", "onupdate")
+        has_(tbl, "col6", "default", "server_default", "onupdate")
+        has_(tbl, "col7", "default", "server_default", "onupdate")
         has_(
-            "col8", "default", "server_default", "onupdate", "server_onupdate"
+            tbl,
+            "col8",
+            "default",
+            "server_default",
+            "onupdate",
+            "server_onupdate",
         )
 
-    @testing.fails_on("firebird", "Data type unknown")
-    def test_insert(self):
-        r = t.insert().execute()
+    def test_no_embed_in_sql(self):
+        """Using a DefaultGenerator, Sequence, DefaultClause
+        in the columns, where clause of a select, or in the values
+        clause of insert, update, raises an informative error"""
+
+        t = Table("some_table", MetaData(), Column("id", Integer))
+        for const in (
+            sa.Sequence("y"),
+            sa.ColumnDefault("y"),
+            sa.DefaultClause("y"),
+        ):
+            assert_raises_message(
+                sa.exc.ArgumentError,
+                r"SQL expression for WHERE/HAVING role expected, "
+                r"got \[(?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)\]",
+                t.select,
+                [const],
+            )
+            assert_raises_message(
+                sa.exc.ArgumentError,
+                "SQL expression element expected, got %s"
+                % const.__class__.__name__,
+                t.insert().values,
+                col4=const,
+            )
+            assert_raises_message(
+                sa.exc.ArgumentError,
+                "SQL expression element expected, got %s"
+                % const.__class__.__name__,
+                t.update().values,
+                col4=const,
+            )
+
+
+class DefaultRoundTripTest(fixtures.TablesTest):
+    __backend__ = True
+
+    @classmethod
+    def define_tables(cls, metadata):
+        default_generator = cls.default_generator = {"x": 50}
+
+        def mydefault():
+            default_generator["x"] += 1
+            return default_generator["x"]
+
+        def myupdate_with_ctx(ctx):
+            conn = ctx.connection
+            return conn.execute(sa.select([sa.text("13")])).scalar()
+
+        def mydefault_using_connection(ctx):
+            conn = ctx.connection
+            return conn.execute(sa.select([sa.text("12")])).scalar()
+
+        use_function_defaults = testing.against("postgresql", "mssql")
+        is_oracle = testing.against("oracle")
+
+        class MyClass(object):
+            @classmethod
+            def gen_default(cls, ctx):
+                return "hi"
+
+        class MyType(TypeDecorator):
+            impl = String(50)
+
+            def process_bind_param(self, value, dialect):
+                if value is not None:
+                    value = "BIND" + value
+                return value
+
+        cls.f = 6
+        cls.f2 = 11
+        with testing.db.connect() as conn:
+            currenttime = cls.currenttime = func.current_date(type_=sa.Date)
+            if is_oracle:
+                ts = conn.scalar(
+                    sa.select(
+                        [
+                            func.trunc(
+                                func.current_timestamp(),
+                                sa.literal_column("'DAY'"),
+                                type_=sa.Date,
+                            )
+                        ]
+                    )
+                )
+                currenttime = cls.currenttime = func.trunc(
+                    currenttime, sa.literal_column("'DAY'"), type_=sa.Date
+                )
+                def1 = currenttime
+                def2 = func.trunc(
+                    sa.text("current_timestamp"),
+                    sa.literal_column("'DAY'"),
+                    type_=sa.Date,
+                )
+
+                deftype = sa.Date
+            elif use_function_defaults:
+                def1 = currenttime
+                deftype = sa.Date
+                if testing.against("mssql"):
+                    def2 = sa.text("getdate()")
+                else:
+                    def2 = sa.text("current_date")
+                ts = conn.scalar(func.current_date())
+            else:
+                def1 = def2 = "3"
+                ts = 3
+                deftype = Integer
+
+            cls.ts = ts
+
+        Table(
+            "default_test",
+            metadata,
+            # python function
+            Column("col1", Integer, primary_key=True, default=mydefault),
+            # python literal
+            Column(
+                "col2",
+                String(20),
+                default="imthedefault",
+                onupdate="im the update",
+            ),
+            # preexecute expression
+            Column(
+                "col3",
+                Integer,
+                default=func.length("abcdef"),
+                onupdate=func.length("abcdefghijk"),
+            ),
+            # SQL-side default from sql expression
+            Column("col4", deftype, server_default=def1),
+            # SQL-side default from literal expression
+            Column("col5", deftype, server_default=def2),
+            # preexecute + update timestamp
+            Column("col6", sa.Date, default=currenttime, onupdate=currenttime),
+            Column("boolcol1", sa.Boolean, default=True),
+            Column("boolcol2", sa.Boolean, default=False),
+            # python function which uses ExecutionContext
+            Column(
+                "col7",
+                Integer,
+                default=mydefault_using_connection,
+                onupdate=myupdate_with_ctx,
+            ),
+            # python builtin
+            Column(
+                "col8",
+                sa.Date,
+                default=datetime.date.today,
+                onupdate=datetime.date.today,
+            ),
+            # combo
+            Column("col9", String(20), default="py", server_default="ddl"),
+            # python method w/ context
+            Column("col10", String(20), default=MyClass.gen_default),
+            # fixed default w/ type that has bound processor
+            Column("col11", MyType(), default="foo"),
+        )
+
+    def teardown(self):
+        self.default_generator["x"] = 50
+        super(DefaultRoundTripTest, self).teardown()
+
+    def test_standalone(self, connection):
+        t = self.tables.default_test
+        x = connection.execute(t.c.col1.default)
+        y = connection.execute(t.c.col2.default)
+        z = connection.execute(t.c.col3.default)
+        assert 50 <= x <= 57
+        eq_(y, "imthedefault")
+        eq_(z, self.f)
+
+    def test_insert(self, connection):
+        t = self.tables.default_test
+
+        r = connection.execute(t.insert())
         assert r.lastrow_has_defaults()
         eq_(
             set(r.context.postfetch_cols),
             set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]),
         )
 
-        r = t.insert().inline().execute()
+        r = connection.execute(t.insert().inline())
         assert r.lastrow_has_defaults()
         eq_(
             set(r.context.postfetch_cols),
             set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]),
         )
 
-        t.insert().execute()
+        connection.execute(t.insert())
 
-        ctexec = sa.select(
-            [currenttime.label("now")], bind=testing.db
+        ctexec = connection.execute(
+            sa.select([self.currenttime.label("now")])
         ).scalar()
-        result = t.select().order_by(t.c.col1).execute()
+        result = connection.execute(t.select().order_by(t.c.col1))
         today = datetime.date.today()
         eq_(
             result.fetchall(),
@@ -480,9 +540,9 @@ class DefaultTest(fixtures.TestBase):
                 (
                     x,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -496,22 +556,26 @@ class DefaultTest(fixtures.TestBase):
             ],
         )
 
-        t.insert().execute(col9=None)
+        connection.execute(t.insert(), dict(col9=None))
+
+        # TODO: why are we looking at 'r' when we just executed something
+        # else ?
         assert r.lastrow_has_defaults()
+
         eq_(
             set(r.context.postfetch_cols),
             set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]),
         )
 
         eq_(
-            t.select(t.c.col1 == 54).execute().fetchall(),
+            list(connection.execute(t.select().where(t.c.col1 == 54))),
             [
                 (
                     54,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -524,21 +588,23 @@ class DefaultTest(fixtures.TestBase):
             ],
         )
 
-    def test_insertmany(self):
-        t.insert().execute({}, {}, {})
+    def test_insertmany(self, connection):
+        t = self.tables.default_test
+
+        connection.execute(t.insert(), [{}, {}, {}])
 
-        ctexec = currenttime.scalar()
-        result = t.select().order_by(t.c.col1).execute()
+        ctexec = connection.scalar(self.currenttime)
+        result = connection.execute(t.select().order_by(t.c.col1))
         today = datetime.date.today()
         eq_(
-            result.fetchall(),
+            list(result),
             [
                 (
                     51,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -551,9 +617,9 @@ class DefaultTest(fixtures.TestBase):
                 (
                     52,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -566,9 +632,9 @@ class DefaultTest(fixtures.TestBase):
                 (
                     53,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -582,22 +648,23 @@ class DefaultTest(fixtures.TestBase):
         )
 
     @testing.requires.multivalues_inserts
-    def test_insert_multivalues(self):
+    def test_insert_multivalues(self, connection):
+        t = self.tables.default_test
 
-        t.insert().values([{}, {}, {}]).execute()
+        connection.execute(t.insert().values([{}, {}, {}]))
 
-        ctexec = currenttime.scalar()
-        result = t.select().order_by(t.c.col1).execute()
+        ctexec = connection.execute(self.currenttime).scalar()
+        result = connection.execute(t.select().order_by(t.c.col1))
         today = datetime.date.today()
         eq_(
-            result.fetchall(),
+            list(result),
             [
                 (
                     51,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -610,9 +677,9 @@ class DefaultTest(fixtures.TestBase):
                 (
                     52,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -625,9 +692,9 @@ class DefaultTest(fixtures.TestBase):
                 (
                     53,
                     "imthedefault",
-                    f,
-                    ts,
-                    ts,
+                    self.f,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -640,84 +707,52 @@ class DefaultTest(fixtures.TestBase):
             ],
         )
 
-    def test_no_embed_in_sql(self):
-        """Using a DefaultGenerator, Sequence, DefaultClause
-        in the columns, where clause of a select, or in the values
-        clause of insert, update, raises an informative error"""
-
-        for const in (
-            sa.Sequence("y"),
-            sa.ColumnDefault("y"),
-            sa.DefaultClause("y"),
-        ):
-            assert_raises_message(
-                sa.exc.ArgumentError,
-                r"SQL expression for WHERE/HAVING role expected, "
-                r"got \[(?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)\]",
-                t.select,
-                [const],
-            )
-            assert_raises_message(
-                sa.exc.ArgumentError,
-                "SQL expression element expected, got %s"
-                % const.__class__.__name__,
-                t.insert().values,
-                col4=const,
-            )
-            assert_raises_message(
-                sa.exc.ArgumentError,
-                "SQL expression element expected, got %s"
-                % const.__class__.__name__,
-                t.update().values,
-                col4=const,
-            )
-
-    def test_missing_many_param(self):
+    def test_missing_many_param(self, connection):
+        t = self.tables.default_test
         assert_raises_message(
             exc.StatementError,
             "A value is required for bind parameter 'col7', in parameter "
             "group 1",
-            t.insert().execute,
+            connection.execute,
+            t.insert(),
             {"col4": 7, "col7": 12, "col8": 19},
             {"col4": 7, "col8": 19},
             {"col4": 7, "col7": 12, "col8": 19},
         )
 
-    def test_insert_values(self):
-        t.insert(values={"col3": 50}).execute()
-        result = t.select().execute()
+    def test_insert_values(self, connection):
+        t = self.tables.default_test
+        connection.execute(t.insert().values(col3=50))
+        result = connection.execute(t.select().order_by(t.c.col1))
         eq_(50, result.first()._mapping["col3"])
 
-    @testing.fails_on("firebird", "Data type unknown")
-    def test_updatemany(self):
-        # MySQL-Python 1.2.2 breaks functions in execute_many :(
-        if testing.against(
-            "mysql+mysqldb"
-        ) and testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2):
-            return
+    def test_updatemany(self, connection):
+        t = self.tables.default_test
 
-        t.insert().execute({}, {}, {})
+        connection.execute(t.insert(), [{}, {}, {}])
 
-        t.update(t.c.col1 == sa.bindparam("pkval")).execute(
-            {"pkval": 51, "col7": None, "col8": None, "boolcol1": False}
+        connection.execute(
+            t.update().where(t.c.col1 == sa.bindparam("pkval")),
+            {"pkval": 51, "col7": None, "col8": None, "boolcol1": False},
         )
 
-        t.update(t.c.col1 == sa.bindparam("pkval")).execute(
-            {"pkval": 51}, {"pkval": 52}, {"pkval": 53}
+        connection.execute(
+            t.update().where(t.c.col1 == sa.bindparam("pkval")),
+            [{"pkval": 51}, {"pkval": 52}, {"pkval": 53}],
         )
 
-        result = t.select().execute()
-        ctexec = currenttime.scalar()
+        ctexec = connection.scalar(self.currenttime)
         today = datetime.date.today()
+        result = connection.execute(t.select().order_by(t.c.col1))
         eq_(
-            result.fetchall(),
+            list(result),
             [
                 (
                     51,
                     "im the update",
-                    f2,
-                    ts,
-                    ts,
+                    self.f2,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     False,
                     False,
@@ -730,9 +765,9 @@ class DefaultTest(fixtures.TestBase):
                 (
                     52,
                     "im the update",
-                    f2,
-                    ts,
-                    ts,
+                    self.f2,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -745,9 +780,9 @@ class DefaultTest(fixtures.TestBase):
                 (
                     53,
                     "im the update",
-                    f2,
-                    ts,
-                    ts,
+                    self.f2,
+                    self.ts,
+                    self.ts,
                     ctexec,
                     True,
                     False,
@@ -760,20 +795,22 @@ class DefaultTest(fixtures.TestBase):
             ],
         )
 
-    @testing.fails_on("firebird", "Data type unknown")
-    def test_update(self):
-        r = t.insert().execute()
+    def test_update(self, connection):
+        t = self.tables.default_test
+        r = connection.execute(t.insert())
         pk = r.inserted_primary_key[0]
-        t.update(t.c.col1 == pk).execute(col4=None, col5=None)
-        ctexec = currenttime.scalar()
-        result = t.select(t.c.col1 == pk).execute()
+        connection.execute(
+            t.update().where(t.c.col1 == pk), dict(col4=None, col5=None)
+        )
+        ctexec = connection.scalar(self.currenttime)
+        result = connection.execute(t.select().where(t.c.col1 == pk))
         result = result.first()
         eq_(
             result,
             (
                 pk,
                 "im the update",
-                f2,
+                self.f2,
                 None,
                 None,
                 ctexec,
@@ -786,16 +823,15 @@ class DefaultTest(fixtures.TestBase):
                 "BINDfoo",
             ),
         )
-        eq_(11, f2)
 
-    @testing.fails_on("firebird", "Data type unknown")
-    def test_update_values(self):
-        r = t.insert().execute()
+    def test_update_values(self, connection):
+        t = self.tables.default_test
+        r = connection.execute(t.insert())
         pk = r.inserted_primary_key[0]
-        t.update(t.c.col1 == pk, values={"col3": 55}).execute()
-        result = t.select(t.c.col1 == pk).execute()
-        result = result.first()
-        eq_(55, result._mapping["col3"])
+        connection.execute(t.update().where(t.c.col1 == pk).values(col3=55))
+        result = connection.execute(t.select().where(t.c.col1 == pk))
+        row = result.first()
+        eq_(55, row._mapping["col3"])
 
 
 class CTEDefaultTest(fixtures.TablesTest):
@@ -820,79 +856,62 @@ class CTEDefaultTest(fixtures.TablesTest):
             Column("u", Integer, onupdate=1),
         )
 
-    def _test_a_in_b(self, a, b):
+    @testing.combinations(
+        ("update", "select", testing.requires.ctes_on_dml),
+        ("update", "select", testing.requires.ctes_on_dml),
+        ("update", "select", testing.requires.ctes_on_dml),
+        ("select", "update"),
+        ("select", "insert"),
+        argnames="a, b",
+    )
+    def test_a_in_b(self, a, b, connection):
         q = self.tables.q
         p = self.tables.p
 
-        with testing.db.connect() as conn:
-            if a == "delete":
-                conn.execute(q.insert().values(y=10, z=1))
-                cte = q.delete().where(q.c.z == 1).returning(q.c.z).cte("c")
-                expected = None
-            elif a == "insert":
-                cte = q.insert().values(z=1, y=10).returning(q.c.z).cte("c")
-                expected = (2, 10)
-            elif a == "update":
-                conn.execute(q.insert().values(x=5, y=10, z=1))
-                cte = (
-                    q.update()
-                    .where(q.c.z == 1)
-                    .values(x=7)
-                    .returning(q.c.z)
-                    .cte("c")
-                )
-                expected = (7, 5)
-            elif a == "select":
-                conn.execute(q.insert().values(x=5, y=10, z=1))
-                cte = sa.select([q.c.z]).cte("c")
-                expected = (5, 10)
-
-            if b == "select":
-                conn.execute(p.insert().values(s=1))
-                stmt = select([p.c.s, cte.c.z]).where(p.c.s == cte.c.z)
-            elif b == "insert":
-                sel = select([1, cte.c.z])
-                stmt = (
-                    p.insert()
-                    .from_select(["s", "t"], sel)
-                    .returning(p.c.s, p.c.t)
-                )
-            elif b == "delete":
-                stmt = (
-                    p.insert().values(s=1, t=cte.c.z).returning(p.c.s, cte.c.z)
-                )
-            elif b == "update":
-                conn.execute(p.insert().values(s=1))
-                stmt = (
-                    p.update()
-                    .values(t=5)
-                    .where(p.c.s == cte.c.z)
-                    .returning(p.c.u, cte.c.z)
-                )
-            eq_(conn.execute(stmt).fetchall(), [(1, 1)])
-
-            eq_(conn.execute(select([q.c.x, q.c.y])).fetchone(), expected)
-
-    @testing.requires.ctes_on_dml
-    def test_update_in_select(self):
-        self._test_a_in_b("update", "select")
-
-    @testing.requires.ctes_on_dml
-    def test_delete_in_select(self):
-        self._test_a_in_b("update", "select")
-
-    @testing.requires.ctes_on_dml
-    def test_insert_in_select(self):
-        self._test_a_in_b("update", "select")
-
-    def test_select_in_update(self):
-        self._test_a_in_b("select", "update")
-
-    def test_select_in_insert(self):
-        self._test_a_in_b("select", "insert")
+        conn = connection
+        if a == "delete":
+            conn.execute(q.insert().values(y=10, z=1))
+            cte = q.delete().where(q.c.z == 1).returning(q.c.z).cte("c")
+            expected = None
+        elif a == "insert":
+            cte = q.insert().values(z=1, y=10).returning(q.c.z).cte("c")
+            expected = (2, 10)
+        elif a == "update":
+            conn.execute(q.insert().values(x=5, y=10, z=1))
+            cte = (
+                q.update()
+                .where(q.c.z == 1)
+                .values(x=7)
+                .returning(q.c.z)
+                .cte("c")
+            )
+            expected = (7, 5)
+        elif a == "select":
+            conn.execute(q.insert().values(x=5, y=10, z=1))
+            cte = sa.select([q.c.z]).cte("c")
+            expected = (5, 10)
+
+        if b == "select":
+            conn.execute(p.insert().values(s=1))
+            stmt = select([p.c.s, cte.c.z]).where(p.c.s == cte.c.z)
+        elif b == "insert":
+            sel = select([1, cte.c.z])
+            stmt = (
+                p.insert().from_select(["s", "t"], sel).returning(p.c.s, p.c.t)
+            )
+        elif b == "delete":
+            stmt = p.insert().values(s=1, t=cte.c.z).returning(p.c.s, cte.c.z)
+        elif b == "update":
+            conn.execute(p.insert().values(s=1))
+            stmt = (
+                p.update()
+                .values(t=5)
+                .where(p.c.s == cte.c.z)
+                .returning(p.c.u, cte.c.z)
+            )
+        eq_(list(conn.execute(stmt)), [(1, 1)])
 
-    # TODO: updates / inserts can be run in one statement w/ CTE ?
-    # deletes?
+        eq_(conn.execute(select([q.c.x, q.c.y])).first(), expected)
 
 
 class PKDefaultTest(fixtures.TablesTest):
@@ -1019,75 +1038,45 @@ class PKIncrementTest(fixtures.TablesTest):
         self._test_autoincrement(testing.db)
 
     def test_autoincrement_transaction(self):
-        con = testing.db.connect()
-        tx = con.begin()
-        try:
-            try:
-                self._test_autoincrement(con)
-            except Exception:
-                try:
-                    tx.rollback()
-                except Exception:
-                    pass
-                raise
-            else:
-                tx.commit()
-        finally:
-            con.close()
+        with testing.db.begin() as conn:
+            self._test_autoincrement(conn)
 
 
 class EmptyInsertTest(fixtures.TestBase):
     __backend__ = True
 
-    @testing.exclude("sqlite", "<", (3, 3, 8), "no empty insert support")
     @testing.fails_on("oracle", "FIXME: unknown")
     @testing.provide_metadata
-    def test_empty_insert(self):
+    def test_empty_insert(self, connection):
         t1 = Table(
             "t1",
             self.metadata,
             Column("is_true", Boolean, server_default=("1")),
         )
-        self.metadata.create_all()
-        t1.insert().execute()
-        eq_(1, select([func.count(text("*"))], from_obj=t1).scalar())
-        eq_(True, t1.select().scalar())
+        self.metadata.create_all(connection)
+        connection.execute(t1.insert())
+        eq_(
+            1,
+            connection.scalar(select([func.count(text("*"))]).select_from(t1)),
+        )
+        eq_(True, connection.scalar(t1.select()))
 
 
-class AutoIncrementTest(fixtures.TablesTest):
+class AutoIncrementTest(fixtures.TestBase):
     __requires__ = ("identity",)
-    run_define_tables = "each"
     __backend__ = True
 
-    @classmethod
-    def define_tables(cls, metadata):
-        """Each test manipulates self.metadata individually."""
-
-    @testing.exclude("sqlite", "<", (3, 4), "no database support")
-    def test_autoincrement_single_col(self):
+    @testing.provide_metadata
+    def test_autoincrement_single_col(self, connection):
         single = Table(
             "single", self.metadata, Column("id", Integer, primary_key=True)
         )
-        single.create()
+        self.metadata.create_all(connection)
 
-        r = single.insert().execute()
+        r = connection.execute(single.insert())
         id_ = r.inserted_primary_key[0]
         eq_(id_, 1)
-        eq_(1, sa.select([func.count(sa.text("*"))], from_obj=single).scalar())
-
-    def test_autoincrement_fk(self):
-        nodes = Table(
-            "nodes",
-            self.metadata,
-            Column("id", Integer, primary_key=True),
-            Column("parent_id", Integer, ForeignKey("nodes.id")),
-            Column("data", String(30)),
-        )
-        nodes.create()
-
-        r = nodes.insert().execute(data="foo")
-        id_ = r.inserted_primary_key[0]
-        nodes.insert().execute(data="bar", parent_id=id_)
+        eq_(connection.scalar(sa.select([single.c.id])), 1)
 
     def test_autoinc_detection_no_affinity(self):
         class MyType(TypeDecorator):
@@ -1124,7 +1113,8 @@ class AutoIncrementTest(fixtures.TablesTest):
         assert x._autoincrement_column is None
 
     @testing.only_on("sqlite")
-    def test_non_autoincrement(self):
+    @testing.provide_metadata
+    def test_non_autoincrement(self, connection):
         # sqlite INT primary keys can be non-unique! (only for ints)
         nonai = Table(
             "nonaitest",
@@ -1132,17 +1122,14 @@ class AutoIncrementTest(fixtures.TablesTest):
             Column("id", Integer, autoincrement=False, primary_key=True),
             Column("data", String(20)),
         )
-        nonai.create()
-
-        def go():
-            # postgresql + mysql strict will fail on first row,
-            # mysql in legacy mode fails on second row
-            nonai.insert().execute(data="row 1")
-            nonai.insert().execute(data="row 2")
+        nonai.create(connection)
 
         # just testing SQLite for now, it passes
         with expect_warnings(".*has no Python-side or server-side default.*"):
-            go()
+            # postgresql + mysql strict will fail on first row,
+            # mysql in legacy mode fails on second row
+            connection.execute(nonai.insert(), dict(data="row 1"))
+            connection.execute(nonai.insert(), dict(data="row 2"))
 
     @testing.metadata_fixture(ddl="function")
     def dataset_no_autoinc(self, metadata):
@@ -1169,7 +1156,7 @@ class AutoIncrementTest(fixtures.TablesTest):
         return dataset_no_autoinc
 
     def test_col_w_optional_sequence_non_autoinc_no_firing(
-        self, dataset_no_autoinc
+        self, dataset_no_autoinc, connection
     ):
         """this is testing that a Table which includes a Sequence, when
         run against a DB that does not support sequences, the Sequence
@@ -1178,18 +1165,17 @@ class AutoIncrementTest(fixtures.TablesTest):
         """
         dataset_no_autoinc.c.set_id.default.optional = True
 
-        with testing.db.connect() as conn:
-            conn.execute(dataset_no_autoinc.insert())
-            eq_(
-                conn.scalar(
-                    select([func.count("*")]).select_from(dataset_no_autoinc)
-                ),
-                1,
-            )
+        connection.execute(dataset_no_autoinc.insert())
+        eq_(
+            connection.scalar(
+                select([func.count("*")]).select_from(dataset_no_autoinc)
+            ),
+            1,
+        )
 
     @testing.fails_if(testing.requires.sequences)
     def test_col_w_nonoptional_sequence_non_autoinc_no_firing(
-        self, dataset_no_autoinc
+        self, dataset_no_autoinc, connection
     ):
         """When the sequence is not optional and sequences are supported,
         the test fails because we didn't create the sequence.
@@ -1197,511 +1183,12 @@ class AutoIncrementTest(fixtures.TablesTest):
         """
         dataset_no_autoinc.c.set_id.default.optional = False
 
-        with testing.db.connect() as conn:
-            conn.execute(dataset_no_autoinc.insert())
-            eq_(
-                conn.scalar(
-                    select([func.count("*")]).select_from(dataset_no_autoinc)
-                ),
-                1,
-            )
-
-
-class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
-    __dialect__ = "default"
-    __backend__ = True
-
-    def test_create_drop_ddl(self):
-        self.assert_compile(
-            CreateSequence(Sequence("foo_seq")), "CREATE SEQUENCE foo_seq"
-        )
-
-        self.assert_compile(
-            CreateSequence(Sequence("foo_seq", start=5)),
-            "CREATE SEQUENCE foo_seq START WITH 5",
-        )
-
-        self.assert_compile(
-            CreateSequence(Sequence("foo_seq", increment=2)),
-            "CREATE SEQUENCE foo_seq INCREMENT BY 2",
-        )
-
-        self.assert_compile(
-            CreateSequence(Sequence("foo_seq", increment=2, start=5)),
-            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 5",
-        )
-
-        self.assert_compile(
-            CreateSequence(
-                Sequence("foo_seq", increment=2, start=0, minvalue=0)
-            ),
-            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 MINVALUE 0",
-        )
-
-        self.assert_compile(
-            CreateSequence(
-                Sequence("foo_seq", increment=2, start=1, maxvalue=5)
-            ),
-            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 MAXVALUE 5",
-        )
-
-        self.assert_compile(
-            CreateSequence(
-                Sequence("foo_seq", increment=2, start=1, nomaxvalue=True)
-            ),
-            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 NO MAXVALUE",
-        )
-
-        self.assert_compile(
-            CreateSequence(
-                Sequence("foo_seq", increment=2, start=0, nominvalue=True)
-            ),
-            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 NO MINVALUE",
-        )
-
-        self.assert_compile(
-            CreateSequence(
-                Sequence("foo_seq", start=1, maxvalue=10, cycle=True)
-            ),
-            "CREATE SEQUENCE foo_seq START WITH 1 MAXVALUE 10 CYCLE",
-        )
-
-        self.assert_compile(
-            CreateSequence(Sequence("foo_seq", cache=1000, order=True)),
-            "CREATE SEQUENCE foo_seq CACHE 1000 ORDER",
-        )
-
-        self.assert_compile(
-            CreateSequence(Sequence("foo_seq", order=True)),
-            "CREATE SEQUENCE foo_seq ORDER",
-        )
-
-        self.assert_compile(
-            DropSequence(Sequence("foo_seq")), "DROP SEQUENCE foo_seq"
-        )
-
-
-class SequenceExecTest(fixtures.TestBase):
-    __requires__ = ("sequences",)
-    __backend__ = True
-
-    @classmethod
-    def setup_class(cls):
-        cls.seq = Sequence("my_sequence")
-        cls.seq.create(testing.db)
-
-    @classmethod
-    def teardown_class(cls):
-        cls.seq.drop(testing.db)
-
-    def _assert_seq_result(self, ret):
-        """asserts return of next_value is an int"""
-
-        assert isinstance(ret, util.int_types)
-        assert ret > 0
-
-    def test_implicit_connectionless(self):
-        s = Sequence("my_sequence", metadata=MetaData(testing.db))
-        self._assert_seq_result(s.execute())
-
-    def test_explicit(self):
-        s = Sequence("my_sequence")
-        self._assert_seq_result(s.execute(testing.db))
-
-    def test_explicit_optional(self):
-        """test dialect executes a Sequence, returns nextval, whether
-        or not "optional" is set """
-
-        s = Sequence("my_sequence", optional=True)
-        self._assert_seq_result(s.execute(testing.db))
-
-    def test_func_implicit_connectionless_execute(self):
-        """test func.next_value().execute()/.scalar() works
-        with connectionless execution. """
-
-        s = Sequence("my_sequence", metadata=MetaData(testing.db))
-        self._assert_seq_result(s.next_value().execute().scalar())
-
-    def test_func_explicit(self):
-        s = Sequence("my_sequence")
-        self._assert_seq_result(testing.db.scalar(s.next_value()))
-
-    def test_func_implicit_connectionless_scalar(self):
-        """test func.next_value().execute()/.scalar() works. """
-
-        s = Sequence("my_sequence", metadata=MetaData(testing.db))
-        self._assert_seq_result(s.next_value().scalar())
-
-    def test_func_embedded_select(self):
-        """test can use next_value() in select column expr"""
-
-        s = Sequence("my_sequence")
-        self._assert_seq_result(testing.db.scalar(select([s.next_value()])))
-
-    @testing.fails_on("oracle", "ORA-02287: sequence number not allowed here")
-    @testing.provide_metadata
-    def test_func_embedded_whereclause(self):
-        """test can use next_value() in whereclause"""
-
-        metadata = self.metadata
-        t1 = Table("t", metadata, Column("x", Integer))
-        t1.create(testing.db)
-        testing.db.execute(t1.insert(), [{"x": 1}, {"x": 300}, {"x": 301}])
-        s = Sequence("my_sequence")
+        connection.execute(dataset_no_autoinc.insert())
         eq_(
-            testing.db.execute(
-                t1.select().where(t1.c.x > s.next_value())
-            ).fetchall(),
-            [(300,), (301,)],
-        )
-
-    @testing.provide_metadata
-    def test_func_embedded_valuesbase(self):
-        """test can use next_value() in values() of _ValuesBase"""
-
-        metadata = self.metadata
-        t1 = Table("t", metadata, Column("x", Integer))
-        t1.create(testing.db)
-        s = Sequence("my_sequence")
-        testing.db.execute(t1.insert().values(x=s.next_value()))
-        self._assert_seq_result(testing.db.scalar(t1.select()))
-
-    @testing.requires.supports_lastrowid
-    @testing.provide_metadata
-    def test_inserted_pk_no_returning_w_lastrowid(self):
-        """test inserted_primary_key contains the pk when
-        pk_col=next_value(), lastrowid is supported."""
-
-        metadata = self.metadata
-        e = engines.testing_engine(options={"implicit_returning": False})
-        s = Sequence("my_sequence")
-        metadata.bind = e
-        t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
-        t1.create()
-        r = e.execute(t1.insert().values(x=s.next_value()))
-        self._assert_seq_result(r.inserted_primary_key[0])
-
-    @testing.requires.no_lastrowid_support
-    @testing.provide_metadata
-    def test_inserted_pk_no_returning_no_lastrowid(self):
-        """test inserted_primary_key contains [None] when
-        pk_col=next_value(), implicit returning is not used."""
-
-        metadata = self.metadata
-        e = engines.testing_engine(options={"implicit_returning": False})
-        s = Sequence("my_sequence")
-        metadata.bind = e
-        t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
-        t1.create()
-        r = e.execute(t1.insert().values(x=s.next_value()))
-        eq_(r.inserted_primary_key, [None])
-
-    @testing.requires.returning
-    @testing.provide_metadata
-    def test_inserted_pk_implicit_returning(self):
-        """test inserted_primary_key contains the result when
-        pk_col=next_value(), when implicit returning is used."""
-
-        metadata = self.metadata
-        e = engines.testing_engine(options={"implicit_returning": True})
-        s = Sequence("my_sequence")
-        metadata.bind = e
-        t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
-        t1.create()
-        r = e.execute(t1.insert().values(x=s.next_value()))
-        self._assert_seq_result(r.inserted_primary_key[0])
-
-
-class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
-    __requires__ = ("sequences",)
-    __backend__ = True
-
-    @testing.fails_on("firebird", "no FB support for start/increment")
-    def test_start_increment(self):
-        for seq in (
-            Sequence("foo_seq"),
-            Sequence("foo_seq", start=8),
-            Sequence("foo_seq", increment=5),
-        ):
-            seq.create(testing.db)
-            try:
-                values = [testing.db.execute(seq) for i in range(3)]
-                start = seq.start or 1
-                inc = seq.increment or 1
-                assert values == list(range(start, start + inc * 3, inc))
-
-            finally:
-                seq.drop(testing.db)
-
-    def _has_sequence(self, name):
-        return testing.db.dialect.has_sequence(testing.db, name)
-
-    def test_nextval_render(self):
-        """test dialect renders the "nextval" construct,
-        whether or not "optional" is set """
-
-        for s in (Sequence("my_seq"), Sequence("my_seq", optional=True)):
-            assert str(s.next_value().compile(dialect=testing.db.dialect)) in (
-                "nextval('my_seq')",
-                "nextval(my_seq)",
-                "gen_id(my_seq, 1)",
-                "my_seq.nextval",
-            )
-
-    def test_nextval_unsupported(self):
-        """test next_value() used on non-sequence platform
-        raises NotImplementedError."""
-
-        s = Sequence("my_seq")
-        d = sqlite.dialect()
-        assert_raises_message(
-            NotImplementedError,
-            "Dialect 'sqlite' does not support sequence increments.",
-            s.next_value().compile,
-            dialect=d,
-        )
-
-    def test_checkfirst_sequence(self):
-        s = Sequence("my_sequence")
-        s.create(testing.db, checkfirst=False)
-        assert self._has_sequence("my_sequence")
-        s.create(testing.db, checkfirst=True)
-        s.drop(testing.db, checkfirst=False)
-        assert not self._has_sequence("my_sequence")
-        s.drop(testing.db, checkfirst=True)
-
-    def test_checkfirst_metadata(self):
-        m = MetaData()
-        Sequence("my_sequence", metadata=m)
-        m.create_all(testing.db, checkfirst=False)
-        assert self._has_sequence("my_sequence")
-        m.create_all(testing.db, checkfirst=True)
-        m.drop_all(testing.db, checkfirst=False)
-        assert not self._has_sequence("my_sequence")
-        m.drop_all(testing.db, checkfirst=True)
-
-    def test_checkfirst_table(self):
-        m = MetaData()
-        s = Sequence("my_sequence")
-        t = Table("t", m, Column("c", Integer, s, primary_key=True))
-        t.create(testing.db, checkfirst=False)
-        assert self._has_sequence("my_sequence")
-        t.create(testing.db, checkfirst=True)
-        t.drop(testing.db, checkfirst=False)
-        assert not self._has_sequence("my_sequence")
-        t.drop(testing.db, checkfirst=True)
-
-    @testing.provide_metadata
-    def test_table_overrides_metadata_create(self):
-        metadata = self.metadata
-        Sequence("s1", metadata=metadata)
-        s2 = Sequence("s2", metadata=metadata)
-        s3 = Sequence("s3")
-        t = Table("t", metadata, Column("c", Integer, s3, primary_key=True))
-        assert s3.metadata is metadata
-
-        t.create(testing.db, checkfirst=True)
-        s3.drop(testing.db)
-
-        # 't' is created, and 's3' won't be
-        # re-created since it's linked to 't'.
-        # 's1' and 's2' are, however.
-        metadata.create_all(testing.db)
-        assert self._has_sequence("s1")
-        assert self._has_sequence("s2")
-        assert not self._has_sequence("s3")
-
-        s2.drop(testing.db)
-        assert self._has_sequence("s1")
-        assert not self._has_sequence("s2")
-
-        metadata.drop_all(testing.db)
-        assert not self._has_sequence("s1")
-        assert not self._has_sequence("s2")
-
-    @testing.requires.returning
-    @testing.provide_metadata
-    def test_freestanding_sequence_via_autoinc(self):
-        t = Table(
-            "some_table",
-            self.metadata,
-            Column(
-                "id",
-                Integer,
-                autoincrement=True,
-                primary_key=True,
-                default=Sequence(
-                    "my_sequence", metadata=self.metadata
-                ).next_value(),
-            ),
-        )
-        self.metadata.create_all(testing.db)
-
-        result = testing.db.execute(t.insert())
-        eq_(result.inserted_primary_key, [1])
-
-
-cartitems = sometable = metadata = None
-
-
-class TableBoundSequenceTest(fixtures.TestBase):
-    __requires__ = ("sequences",)
-    __backend__ = True
-
-    @classmethod
-    def setup_class(cls):
-        global cartitems, sometable, metadata
-        metadata = MetaData(testing.db)
-        cartitems = Table(
-            "cartitems",
-            metadata,
-            Column(
-                "cart_id", Integer, Sequence("cart_id_seq"), primary_key=True
-            ),
-            Column("description", String(40)),
-            Column("createdate", sa.DateTime()),
-        )
-        sometable = Table(
-            "Manager",
-            metadata,
-            Column("obj_id", Integer, Sequence("obj_id_seq")),
-            Column("name", String(128)),
-            Column(
-                "id",
-                Integer,
-                Sequence("Manager_id_seq", optional=True),
-                primary_key=True,
+            connection.scalar(
+                select([func.count("*")]).select_from(dataset_no_autoinc)
             ),
-        )
-
-        metadata.create_all()
-
-    @classmethod
-    def teardown_class(cls):
-        metadata.drop_all()
-
-    def test_insert_via_seq(self):
-        cartitems.insert().execute(description="hi")
-        cartitems.insert().execute(description="there")
-        r = cartitems.insert().execute(description="lala")
-
-        assert r.inserted_primary_key and r.inserted_primary_key[0] is not None
-        id_ = r.inserted_primary_key[0]
-
-        eq_(
             1,
-            sa.select(
-                [func.count(cartitems.c.cart_id)],
-                sa.and_(
-                    cartitems.c.description == "lala",
-                    cartitems.c.cart_id == id_,
-                ),
-            ).scalar(),
-        )
-
-        cartitems.select().execute().fetchall()
-
-    def test_seq_nonpk(self):
-        """test sequences fire off as defaults on non-pk columns"""
-
-        engine = engines.testing_engine(options={"implicit_returning": False})
-        result = engine.execute(sometable.insert(), name="somename")
-
-        assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
-
-        result = engine.execute(sometable.insert(), name="someother")
-        assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
-
-        sometable.insert().execute({"name": "name3"}, {"name": "name4"})
-        eq_(
-            sometable.select().order_by(sometable.c.id).execute().fetchall(),
-            [
-                (1, "somename", 1),
-                (2, "someother", 2),
-                (3, "name3", 3),
-                (4, "name4", 4),
-            ],
-        )
-
-
-class SequenceAsServerDefaultTest(
-    testing.AssertsExecutionResults, fixtures.TablesTest
-):
-    __requires__ = ("sequences_as_server_defaults",)
-    __backend__ = True
-
-    run_create_tables = "each"
-
-    @classmethod
-    def define_tables(cls, metadata):
-        m = metadata
-
-        s = Sequence("t_seq", metadata=m)
-        Table(
-            "t_seq_test",
-            m,
-            Column("id", Integer, s, server_default=s.next_value()),
-            Column("data", String(50)),
-        )
-
-        s2 = Sequence("t_seq_2", metadata=m)
-        Table(
-            "t_seq_test_2",
-            m,
-            Column("id", Integer, server_default=s2.next_value()),
-            Column("data", String(50)),
-        )
-
-    def test_default_textual_w_default(self):
-        with testing.db.connect() as conn:
-            conn.exec_driver_sql(
-                "insert into t_seq_test (data) values ('some data')"
-            )
-
-            eq_(conn.exec_driver_sql("select id from t_seq_test").scalar(), 1)
-
-    def test_default_core_w_default(self):
-        t_seq_test = self.tables.t_seq_test
-        with testing.db.connect() as conn:
-            conn.execute(t_seq_test.insert().values(data="some data"))
-
-            eq_(conn.scalar(select([t_seq_test.c.id])), 1)
-
-    def test_default_textual_server_only(self):
-        with testing.db.connect() as conn:
-            conn.exec_driver_sql(
-                "insert into t_seq_test_2 (data) values ('some data')"
-            )
-
-            eq_(
-                conn.exec_driver_sql("select id from t_seq_test_2").scalar(), 1
-            )
-
-    def test_default_core_server_only(self):
-        t_seq_test = self.tables.t_seq_test_2
-        with testing.db.connect() as conn:
-            conn.execute(t_seq_test.insert().values(data="some data"))
-
-            eq_(conn.scalar(select([t_seq_test.c.id])), 1)
-
-    def test_drop_ordering(self):
-        self.assert_sql_execution(
-            testing.db,
-            lambda: self.metadata.drop_all(checkfirst=False),
-            AllOf(
-                CompiledSQL("DROP TABLE t_seq_test_2", {}),
-                EachOf(
-                    CompiledSQL("DROP TABLE t_seq_test", {}),
-                    CompiledSQL(
-                        "DROP SEQUENCE t_seq",  # dropped as part of t_seq_test
-                        {},
-                    ),
-                ),
-            ),
-            CompiledSQL(
-                "DROP SEQUENCE t_seq_2",  # dropped as part of metadata level
-                {},
-            ),
         )
 
 
@@ -1748,19 +1235,20 @@ class SpecialTypePKTest(fixtures.TestBase):
             implicit_returning=implicit_returning,
         )
 
-        t.create()
-        r = t.insert().values(data=5).execute()
-
-        # we don't pre-fetch 'server_default'.
-        if "server_default" in kw and (
-            not testing.db.dialect.implicit_returning or not implicit_returning
-        ):
-            eq_(r.inserted_primary_key, [None])
-        else:
-            eq_(r.inserted_primary_key, ["INT_1"])
-        r.close()
+        with testing.db.connect() as conn:
+            t.create(conn)
+            r = conn.execute(t.insert().values(data=5))
+
+            # we don't pre-fetch 'server_default'.
+            if "server_default" in kw and (
+                not testing.db.dialect.implicit_returning
+                or not implicit_returning
+            ):
+                eq_(r.inserted_primary_key, [None])
+            else:
+                eq_(r.inserted_primary_key, ["INT_1"])
 
-        eq_(t.select().execute().first(), ("INT_1", 5))
+            eq_(conn.execute(t.select()).first(), ("INT_1", 5))
 
     def test_plain(self):
         # among other things, tests that autoincrement
@@ -1807,7 +1295,7 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
     __backend__ = True
 
     @testing.provide_metadata
-    def test_string_default_none_on_insert(self):
+    def test_string_default_none_on_insert(self, connection):
         """Test that without implicit returning, we return None for
         a string server default.
 
@@ -1827,14 +1315,14 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
             Column("data", String(10)),
             implicit_returning=False,
         )
-        metadata.create_all()
-        r = t.insert().execute(data="data")
+        metadata.create_all(connection)
+        r = connection.execute(t.insert(), dict(data="data"))
         eq_(r.inserted_primary_key, [None])
-        eq_(t.select().execute().fetchall(), [("key_one", "data")])
+        eq_(list(connection.execute(t.select())), [("key_one", "data")])
 
     @testing.requires.returning
     @testing.provide_metadata
-    def test_string_default_on_insert_with_returning(self):
+    def test_string_default_on_insert_with_returning(self, connection):
         """With implicit_returning, we get a string PK default back no
         problem."""
 
@@ -1847,13 +1335,13 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
             ),
             Column("data", String(10)),
         )
-        metadata.create_all()
-        r = t.insert().execute(data="data")
+        metadata.create_all(connection)
+        r = connection.execute(t.insert(), dict(data="data"))
         eq_(r.inserted_primary_key, ["key_one"])
-        eq_(t.select().execute().fetchall(), [("key_one", "data")])
+        eq_(list(connection.execute(t.select())), [("key_one", "data")])
 
     @testing.provide_metadata
-    def test_int_default_none_on_insert(self):
+    def test_int_default_none_on_insert(self, connection):
         metadata = self.metadata
         t = Table(
             "x",
@@ -1863,16 +1351,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
             implicit_returning=False,
         )
         assert t._autoincrement_column is None
-        metadata.create_all()
-        r = t.insert().execute(data="data")
+        metadata.create_all(connection)
+        r = connection.execute(t.insert(), dict(data="data"))
         eq_(r.inserted_primary_key, [None])
         if testing.against("sqlite"):
-            eq_(t.select().execute().fetchall(), [(1, "data")])
+            eq_(list(connection.execute(t.select())), [(1, "data")])
         else:
-            eq_(t.select().execute().fetchall(), [(5, "data")])
+            eq_(list(connection.execute(t.select())), [(5, "data")])
 
     @testing.provide_metadata
-    def test_autoincrement_reflected_from_server_default(self):
+    def test_autoincrement_reflected_from_server_default(self, connection):
         metadata = self.metadata
         t = Table(
             "x",
@@ -1882,14 +1370,14 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
             implicit_returning=False,
         )
         assert t._autoincrement_column is None
-        metadata.create_all()
+        metadata.create_all(connection)
 
-        m2 = MetaData(metadata.bind)
-        t2 = Table("x", m2, autoload=True, implicit_returning=False)
+        m2 = MetaData()
+        t2 = Table("x", m2, autoload_with=connection, implicit_returning=False)
         assert t2._autoincrement_column is None
 
     @testing.provide_metadata
-    def test_int_default_none_on_insert_reflected(self):
+    def test_int_default_none_on_insert_reflected(self, connection):
         metadata = self.metadata
         Table(
             "x",
@@ -1898,21 +1386,21 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
             Column("data", String(10)),
             implicit_returning=False,
         )
-        metadata.create_all()
+        metadata.create_all(connection)
 
-        m2 = MetaData(metadata.bind)
-        t2 = Table("x", m2, autoload=True, implicit_returning=False)
+        m2 = MetaData()
+        t2 = Table("x", m2, autoload_with=connection, implicit_returning=False)
 
-        r = t2.insert().execute(data="data")
+        r = connection.execute(t2.insert(), dict(data="data"))
         eq_(r.inserted_primary_key, [None])
         if testing.against("sqlite"):
-            eq_(t2.select().execute().fetchall(), [(1, "data")])
+            eq_(list(connection.execute(t2.select())), [(1, "data")])
         else:
-            eq_(t2.select().execute().fetchall(), [(5, "data")])
+            eq_(list(connection.execute(t2.select())), [(5, "data")])
 
     @testing.requires.returning
     @testing.provide_metadata
-    def test_int_default_on_insert_with_returning(self):
+    def test_int_default_on_insert_with_returning(self, connection):
         metadata = self.metadata
         t = Table(
             "x",
@@ -1921,10 +1409,10 @@ class ServerDefaultsOnPKTest(fixtures.TestBase):
             Column("data", String(10)),
         )
 
-        metadata.create_all()
-        r = t.insert().execute(data="data")
+        metadata.create_all(connection)
+        r = connection.execute(t.insert(), dict(data="data"))
         eq_(r.inserted_primary_key, [5])
-        eq_(t.select().execute().fetchall(), [(5, "data")])
+        eq_(list(connection.execute(t.select())), [(5, "data")])
 
 
 class UnicodeDefaultsTest(fixtures.TestBase):
@@ -1950,20 +1438,23 @@ class UnicodeDefaultsTest(fixtures.TestBase):
         )
 
 
-class InsertFromSelectTest(fixtures.TestBase):
+class InsertFromSelectTest(fixtures.TablesTest):
     __backend__ = True
 
-    def _fixture(self):
-        data = Table(
-            "data", self.metadata, Column("x", Integer), Column("y", Integer)
-        )
-        data.create()
-        testing.db.execute(data.insert(), {"x": 2, "y": 5}, {"x": 7, "y": 12})
-        return data
+    @classmethod
+    def define_tables(cls, metadata):
+        Table("data", metadata, Column("x", Integer), Column("y", Integer))
+
+    @classmethod
+    def insert_data(cls):
+        data = cls.tables.data
+
+        with testing.db.connect() as conn:
+            conn.execute(data.insert(), [{"x": 2, "y": 5}, {"x": 7, "y": 12}])
 
     @testing.provide_metadata
-    def test_insert_from_select_override_defaults(self):
-        data = self._fixture()
+    def test_insert_from_select_override_defaults(self, connection):
+        data = self.tables.data
 
         table = Table(
             "sometable",
@@ -1973,21 +1464,21 @@ class InsertFromSelectTest(fixtures.TestBase):
             Column("y", Integer),
         )
 
-        table.create()
+        table.create(connection)
 
         sel = select([data.c.x, data.c.y])
 
         ins = table.insert().from_select(["x", "y"], sel)
-        testing.db.execute(ins)
+        connection.execute(ins)
 
         eq_(
-            testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+            list(connection.execute(table.select().order_by(table.c.x))),
             [(2, 12, 5), (7, 12, 12)],
         )
 
     @testing.provide_metadata
-    def test_insert_from_select_fn_defaults(self):
-        data = self._fixture()
+    def test_insert_from_select_fn_defaults(self, connection):
+        data = self.tables.data
 
         counter = itertools.count(1)
 
@@ -2002,16 +1493,16 @@ class InsertFromSelectTest(fixtures.TestBase):
             Column("y", Integer),
         )
 
-        table.create()
+        table.create(connection)
 
         sel = select([data.c.x, data.c.y])
 
         ins = table.insert().from_select(["x", "y"], sel)
-        testing.db.execute(ins)
+        connection.execute(ins)
 
         # counter is only called once!
         eq_(
-            testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+            list(connection.execute(table.select().order_by(table.c.x))),
             [(2, 1, 5), (7, 1, 12)],
         )
 
@@ -2039,7 +1530,15 @@ class CurrentParametersTest(fixtures.TablesTest):
         some_table.c.x.default.arg = gen_default
         return fn
 
-    def _test(self, exec_type, usemethod):
+    @testing.combinations(
+        ("single", "attribute"),
+        ("single", "method"),
+        ("executemany", "attribute"),
+        ("executemany", "method"),
+        ("multivalues", "method", testing.requires.multivalues_inserts),
+        argnames="exec_type, usemethod",
+    )
+    def test_parameters(self, exec_type, usemethod, connection):
         collect = mock.Mock()
 
         @self._fixture
@@ -2057,25 +1556,8 @@ class CurrentParametersTest(fixtures.TablesTest):
         else:
             stmt, params = table.insert(), parameters
 
-        with testing.db.connect() as conn:
-            conn.execute(stmt, params)
+        connection.execute(stmt, params)
         eq_(
             collect.mock_calls,
             [mock.call({"y": param["y"], "x": None}) for param in parameters],
         )
-
-    def test_single_w_attribute(self):
-        self._test("single", "attribute")
-
-    def test_single_w_method(self):
-        self._test("single", "method")
-
-    def test_executemany_w_attribute(self):
-        self._test("executemany", "attribute")
-
-    def test_executemany_w_method(self):
-        self._test("executemany", "method")
-
-    @testing.requires.multivalues_inserts
-    def test_multivalued_w_method(self):
-        self._test("multivalues", "method")
diff --git a/test/sql/test_sequences.py b/test/sql/test_sequences.py
new file mode 100644 (file)
index 0000000..8beee51
--- /dev/null
@@ -0,0 +1,555 @@
+import sqlalchemy as sa
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import Sequence
+from sqlalchemy import String
+from sqlalchemy import testing
+from sqlalchemy import util
+from sqlalchemy.dialects import sqlite
+from sqlalchemy.schema import CreateSequence
+from sqlalchemy.schema import DropSequence
+from sqlalchemy.sql import select
+from sqlalchemy.testing import assert_raises_message
+from sqlalchemy.testing import engines
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.assertsql import AllOf
+from sqlalchemy.testing.assertsql import CompiledSQL
+from sqlalchemy.testing.assertsql import EachOf
+from sqlalchemy.testing.schema import Column
+from sqlalchemy.testing.schema import Table
+
+
+class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+    __dialect__ = "default"
+    __backend__ = True
+
+    def test_create_drop_ddl(self):
+        self.assert_compile(
+            CreateSequence(Sequence("foo_seq")), "CREATE SEQUENCE foo_seq"
+        )
+
+        self.assert_compile(
+            CreateSequence(Sequence("foo_seq", start=5)),
+            "CREATE SEQUENCE foo_seq START WITH 5",
+        )
+
+        self.assert_compile(
+            CreateSequence(Sequence("foo_seq", increment=2)),
+            "CREATE SEQUENCE foo_seq INCREMENT BY 2",
+        )
+
+        self.assert_compile(
+            CreateSequence(Sequence("foo_seq", increment=2, start=5)),
+            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 5",
+        )
+
+        self.assert_compile(
+            CreateSequence(
+                Sequence("foo_seq", increment=2, start=0, minvalue=0)
+            ),
+            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 MINVALUE 0",
+        )
+
+        self.assert_compile(
+            CreateSequence(
+                Sequence("foo_seq", increment=2, start=1, maxvalue=5)
+            ),
+            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 MAXVALUE 5",
+        )
+
+        self.assert_compile(
+            CreateSequence(
+                Sequence("foo_seq", increment=2, start=1, nomaxvalue=True)
+            ),
+            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 NO MAXVALUE",
+        )
+
+        self.assert_compile(
+            CreateSequence(
+                Sequence("foo_seq", increment=2, start=0, nominvalue=True)
+            ),
+            "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 NO MINVALUE",
+        )
+
+        self.assert_compile(
+            CreateSequence(
+                Sequence("foo_seq", start=1, maxvalue=10, cycle=True)
+            ),
+            "CREATE SEQUENCE foo_seq START WITH 1 MAXVALUE 10 CYCLE",
+        )
+
+        self.assert_compile(
+            CreateSequence(Sequence("foo_seq", cache=1000, order=True)),
+            "CREATE SEQUENCE foo_seq CACHE 1000 ORDER",
+        )
+
+        self.assert_compile(
+            CreateSequence(Sequence("foo_seq", order=True)),
+            "CREATE SEQUENCE foo_seq ORDER",
+        )
+
+        self.assert_compile(
+            DropSequence(Sequence("foo_seq")), "DROP SEQUENCE foo_seq"
+        )
+
+
+class LegacySequenceExecTest(fixtures.TestBase):
+    __requires__ = ("sequences",)
+    __backend__ = True
+
+    @classmethod
+    def setup_class(cls):
+        cls.seq = Sequence("my_sequence")
+        cls.seq.create(testing.db)
+
+    @classmethod
+    def teardown_class(cls):
+        cls.seq.drop(testing.db)
+
+    def _assert_seq_result(self, ret):
+        """asserts return of next_value is an int"""
+
+        assert isinstance(ret, util.int_types)
+        assert ret > 0
+
+    def test_implicit_connectionless(self):
+        s = Sequence("my_sequence", metadata=MetaData(testing.db))
+        self._assert_seq_result(s.execute())
+
+    def test_explicit(self, connection):
+        s = Sequence("my_sequence")
+        self._assert_seq_result(s.execute(connection))
+
+    def test_explicit_optional(self):
+        """test dialect executes a Sequence, returns nextval, whether
+        or not "optional" is set """
+
+        s = Sequence("my_sequence", optional=True)
+        self._assert_seq_result(s.execute(testing.db))
+
+    def test_func_implicit_connectionless_execute(self):
+        """test func.next_value().execute()/.scalar() works
+        with connectionless execution. """
+
+        s = Sequence("my_sequence", metadata=MetaData(testing.db))
+        self._assert_seq_result(s.next_value().execute().scalar())
+
+    def test_func_explicit(self):
+        s = Sequence("my_sequence")
+        self._assert_seq_result(testing.db.scalar(s.next_value()))
+
+    def test_func_implicit_connectionless_scalar(self):
+        """test func.next_value().execute()/.scalar() works. """
+
+        s = Sequence("my_sequence", metadata=MetaData(testing.db))
+        self._assert_seq_result(s.next_value().scalar())
+
+    def test_func_embedded_select(self):
+        """test can use next_value() in select column expr"""
+
+        s = Sequence("my_sequence")
+        self._assert_seq_result(testing.db.scalar(select([s.next_value()])))
+
+
+class SequenceExecTest(fixtures.TestBase):
+    __requires__ = ("sequences",)
+    __backend__ = True
+
+    @classmethod
+    def setup_class(cls):
+        cls.seq = Sequence("my_sequence")
+        cls.seq.create(testing.db)
+
+    @classmethod
+    def teardown_class(cls):
+        cls.seq.drop(testing.db)
+
+    def _assert_seq_result(self, ret):
+        """asserts return of next_value is an int"""
+
+        assert isinstance(ret, util.int_types)
+        assert ret > 0
+
+    def test_execute(self, connection):
+        s = Sequence("my_sequence")
+        self._assert_seq_result(connection.execute(s))
+
+    def test_execute_optional(self, connection):
+        """test dialect executes a Sequence, returns nextval, whether
+        or not "optional" is set """
+
+        s = Sequence("my_sequence", optional=True)
+        self._assert_seq_result(connection.execute(s))
+
+    def test_execute_next_value(self, connection):
+        """test func.next_value().execute()/.scalar() works
+        with connectionless execution. """
+
+        s = Sequence("my_sequence")
+        self._assert_seq_result(connection.scalar(s.next_value()))
+
+    def test_execute_optional_next_value(self, connection):
+        """test func.next_value().execute()/.scalar() works
+        with connectionless execution. """
+
+        s = Sequence("my_sequence", optional=True)
+        self._assert_seq_result(connection.scalar(s.next_value()))
+
+    def test_func_embedded_select(self, connection):
+        """test can use next_value() in select column expr"""
+
+        s = Sequence("my_sequence")
+        self._assert_seq_result(connection.scalar(select([s.next_value()])))
+
+    @testing.fails_on("oracle", "ORA-02287: sequence number not allowed here")
+    @testing.provide_metadata
+    def test_func_embedded_whereclause(self, connection):
+        """test can use next_value() in whereclause"""
+
+        metadata = self.metadata
+        t1 = Table("t", metadata, Column("x", Integer))
+        t1.create(testing.db)
+        connection.execute(t1.insert(), [{"x": 1}, {"x": 300}, {"x": 301}])
+        s = Sequence("my_sequence")
+        eq_(
+            list(
+                connection.execute(t1.select().where(t1.c.x > s.next_value()))
+            ),
+            [(300,), (301,)],
+        )
+
+    @testing.provide_metadata
+    def test_func_embedded_valuesbase(self, connection):
+        """test can use next_value() in values() of _ValuesBase"""
+
+        metadata = self.metadata
+        t1 = Table("t", metadata, Column("x", Integer))
+        t1.create(testing.db)
+        s = Sequence("my_sequence")
+        connection.execute(t1.insert().values(x=s.next_value()))
+        self._assert_seq_result(connection.scalar(t1.select()))
+
+    @testing.requires.supports_lastrowid
+    @testing.provide_metadata
+    def test_inserted_pk_no_returning_w_lastrowid(self):
+        """test inserted_primary_key contains the pk when
+        pk_col=next_value(), lastrowid is supported."""
+
+        metadata = self.metadata
+        t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+        t1.create(testing.db)
+        e = engines.testing_engine(options={"implicit_returning": False})
+        s = Sequence("my_sequence")
+
+        with e.connect() as conn:
+            r = conn.execute(t1.insert().values(x=s.next_value()))
+            self._assert_seq_result(r.inserted_primary_key[0])
+
+    @testing.requires.no_lastrowid_support
+    @testing.provide_metadata
+    def test_inserted_pk_no_returning_no_lastrowid(self):
+        """test inserted_primary_key contains [None] when
+        pk_col=next_value(), implicit returning is not used."""
+
+        metadata = self.metadata
+        t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+        t1.create(testing.db)
+
+        e = engines.testing_engine(options={"implicit_returning": False})
+        s = Sequence("my_sequence")
+        with e.connect() as conn:
+            r = conn.execute(t1.insert().values(x=s.next_value()))
+            eq_(r.inserted_primary_key, [None])
+
+    @testing.requires.returning
+    @testing.provide_metadata
+    def test_inserted_pk_implicit_returning(self):
+        """test inserted_primary_key contains the result when
+        pk_col=next_value(), when implicit returning is used."""
+
+        metadata = self.metadata
+        s = Sequence("my_sequence")
+        t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+        t1.create(testing.db)
+
+        e = engines.testing_engine(options={"implicit_returning": True})
+        with e.connect() as conn:
+            r = conn.execute(t1.insert().values(x=s.next_value()))
+            self._assert_seq_result(r.inserted_primary_key[0])
+
+
+class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+    __requires__ = ("sequences",)
+    __backend__ = True
+
+    @testing.combinations(
+        (Sequence("foo_seq"),),
+        (Sequence("foo_seq", start=8),),
+        (Sequence("foo_seq", increment=5),),
+    )
+    def test_start_increment(self, seq):
+        seq.create(testing.db)
+        try:
+            with testing.db.connect() as conn:
+                values = [conn.execute(seq) for i in range(3)]
+                start = seq.start or 1
+                inc = seq.increment or 1
+                eq_(values, list(range(start, start + inc * 3, inc)))
+
+        finally:
+            seq.drop(testing.db)
+
+    def _has_sequence(self, connection, name):
+        return testing.db.dialect.has_sequence(connection, name)
+
+    def test_nextval_unsupported(self):
+        """test next_value() used on non-sequence platform
+        raises NotImplementedError."""
+
+        s = Sequence("my_seq")
+        d = sqlite.dialect()
+        assert_raises_message(
+            NotImplementedError,
+            "Dialect 'sqlite' does not support sequence increments.",
+            s.next_value().compile,
+            dialect=d,
+        )
+
+    def test_checkfirst_sequence(self, connection):
+        s = Sequence("my_sequence")
+        s.create(connection, checkfirst=False)
+        assert self._has_sequence(connection, "my_sequence")
+        s.create(connection, checkfirst=True)
+        s.drop(connection, checkfirst=False)
+        assert not self._has_sequence(connection, "my_sequence")
+        s.drop(connection, checkfirst=True)
+
+    def test_checkfirst_metadata(self, connection):
+        m = MetaData()
+        Sequence("my_sequence", metadata=m)
+        m.create_all(connection, checkfirst=False)
+        assert self._has_sequence(connection, "my_sequence")
+        m.create_all(connection, checkfirst=True)
+        m.drop_all(connection, checkfirst=False)
+        assert not self._has_sequence(connection, "my_sequence")
+        m.drop_all(connection, checkfirst=True)
+
+    def test_checkfirst_table(self, connection):
+        m = MetaData()
+        s = Sequence("my_sequence")
+        t = Table("t", m, Column("c", Integer, s, primary_key=True))
+        t.create(connection, checkfirst=False)
+        assert self._has_sequence(connection, "my_sequence")
+        t.create(connection, checkfirst=True)
+        t.drop(connection, checkfirst=False)
+        assert not self._has_sequence(connection, "my_sequence")
+        t.drop(connection, checkfirst=True)
+
+    @testing.provide_metadata
+    def test_table_overrides_metadata_create(self, connection):
+        metadata = self.metadata
+        Sequence("s1", metadata=metadata)
+        s2 = Sequence("s2", metadata=metadata)
+        s3 = Sequence("s3")
+        t = Table("t", metadata, Column("c", Integer, s3, primary_key=True))
+        assert s3.metadata is metadata
+
+        t.create(connection, checkfirst=True)
+        s3.drop(connection)
+
+        # 't' is created, and 's3' won't be
+        # re-created since it's linked to 't'.
+        # 's1' and 's2' are, however.
+        metadata.create_all(connection)
+        assert self._has_sequence(connection, "s1")
+        assert self._has_sequence(connection, "s2")
+        assert not self._has_sequence(connection, "s3")
+
+        s2.drop(connection)
+        assert self._has_sequence(connection, "s1")
+        assert not self._has_sequence(connection, "s2")
+
+        metadata.drop_all(connection)
+        assert not self._has_sequence(connection, "s1")
+        assert not self._has_sequence(connection, "s2")
+
+    @testing.requires.returning
+    @testing.provide_metadata
+    def test_freestanding_sequence_via_autoinc(self, connection):
+        t = Table(
+            "some_table",
+            self.metadata,
+            Column(
+                "id",
+                Integer,
+                autoincrement=True,
+                primary_key=True,
+                default=Sequence(
+                    "my_sequence", metadata=self.metadata
+                ).next_value(),
+            ),
+        )
+        self.metadata.create_all(connection)
+
+        result = connection.execute(t.insert())
+        eq_(result.inserted_primary_key, [1])
+
+
+class TableBoundSequenceTest(fixtures.TablesTest):
+    __requires__ = ("sequences",)
+    __backend__ = True
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "cartitems",
+            metadata,
+            Column(
+                "cart_id", Integer, Sequence("cart_id_seq"), primary_key=True
+            ),
+            Column("description", String(40)),
+            Column("createdate", sa.DateTime()),
+        )
+
+        # a little bit of implicit case sensitive naming test going on here
+        Table(
+            "Manager",
+            metadata,
+            Column("obj_id", Integer, Sequence("obj_id_seq")),
+            Column("name", String(128)),
+            Column(
+                "id",
+                Integer,
+                Sequence("Manager_id_seq", optional=True),
+                primary_key=True,
+            ),
+        )
+
+    def test_insert_via_seq(self, connection):
+        cartitems = self.tables.cartitems
+
+        connection.execute(cartitems.insert(), dict(description="hi"))
+        connection.execute(cartitems.insert(), dict(description="there"))
+        r = connection.execute(cartitems.insert(), dict(description="lala"))
+
+        eq_(r.inserted_primary_key[0], 3)
+
+        eq_(
+            connection.scalar(
+                sa.select([cartitems.c.cart_id]).where(
+                    cartitems.c.description == "lala"
+                ),
+            ),
+            3,
+        )
+
+    def test_seq_nonpk(self):
+        """test sequences fire off as defaults on non-pk columns"""
+
+        sometable = self.tables.Manager
+
+        engine = engines.testing_engine(options={"implicit_returning": False})
+
+        with engine.connect() as conn:
+            result = conn.execute(sometable.insert(), dict(name="somename"))
+
+            eq_(result.postfetch_cols(), [sometable.c.obj_id])
+
+            result = conn.execute(sometable.insert(), dict(name="someother"))
+
+            conn.execute(
+                sometable.insert(), [{"name": "name3"}, {"name": "name4"}]
+            )
+            eq_(
+                list(
+                    conn.execute(sometable.select().order_by(sometable.c.id))
+                ),
+                [
+                    (1, "somename", 1),
+                    (2, "someother", 2),
+                    (3, "name3", 3),
+                    (4, "name4", 4),
+                ],
+            )
+
+
+class SequenceAsServerDefaultTest(
+    testing.AssertsExecutionResults, fixtures.TablesTest
+):
+    __requires__ = ("sequences_as_server_defaults",)
+    __backend__ = True
+
+    run_create_tables = "each"
+
+    @classmethod
+    def define_tables(cls, metadata):
+        m = metadata
+
+        s = Sequence("t_seq", metadata=m)
+        Table(
+            "t_seq_test",
+            m,
+            Column("id", Integer, s, server_default=s.next_value()),
+            Column("data", String(50)),
+        )
+
+        s2 = Sequence("t_seq_2", metadata=m)
+        Table(
+            "t_seq_test_2",
+            m,
+            Column("id", Integer, server_default=s2.next_value()),
+            Column("data", String(50)),
+        )
+
+    def test_default_textual_w_default(self, connection):
+        connection.exec_driver_sql(
+            "insert into t_seq_test (data) values ('some data')"
+        )
+
+        eq_(
+            connection.exec_driver_sql("select id from t_seq_test").scalar(), 1
+        )
+
+    def test_default_core_w_default(self, connection):
+        t_seq_test = self.tables.t_seq_test
+        connection.execute(t_seq_test.insert().values(data="some data"))
+
+        eq_(connection.scalar(select([t_seq_test.c.id])), 1)
+
+    def test_default_textual_server_only(self, connection):
+        connection.exec_driver_sql(
+            "insert into t_seq_test_2 (data) values ('some data')"
+        )
+
+        eq_(
+            connection.exec_driver_sql("select id from t_seq_test_2").scalar(),
+            1,
+        )
+
+    def test_default_core_server_only(self, connection):
+        t_seq_test = self.tables.t_seq_test_2
+        connection.execute(t_seq_test.insert().values(data="some data"))
+
+        eq_(connection.scalar(select([t_seq_test.c.id])), 1)
+
+    def test_drop_ordering(self):
+        with self.sql_execution_asserter(testing.db) as asserter:
+            self.metadata.drop_all(checkfirst=False)
+
+        asserter.assert_(
+            AllOf(
+                CompiledSQL("DROP TABLE t_seq_test_2", {}),
+                EachOf(
+                    CompiledSQL("DROP TABLE t_seq_test", {}),
+                    CompiledSQL(
+                        "DROP SEQUENCE t_seq",  # dropped as part of t_seq_test
+                        {},
+                    ),
+                ),
+            ),
+            CompiledSQL(
+                "DROP SEQUENCE t_seq_2",  # dropped as part of metadata level
+                {},
+            ),
+        )