]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Clean up .execute in test/sql/test_resultset.py
authorGord Thompson <gord@gordthompson.com>
Sun, 12 Apr 2020 17:58:58 +0000 (11:58 -0600)
committerGord Thompson <gord@gordthompson.com>
Sun, 12 Apr 2020 19:37:48 +0000 (13:37 -0600)
Change-Id: Ie4bb2b8e94361d15a3f1f1c21ce1c59cff1cf735

test/sql/test_resultset.py

index 253ad7b38e21c01e76690c71ef8f7ba16e73daf3..f8d245228acc17dfe75e2a85c7d821706e5640d4 100644 (file)
@@ -90,29 +90,31 @@ class ResultProxyTest(fixtures.TablesTest):
             test_needs_acid=True,
         )
 
-    def test_row_iteration(self):
+    def test_row_iteration(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             {"user_id": 7, "user_name": "jack"},
             {"user_id": 8, "user_name": "ed"},
             {"user_id": 9, "user_name": "fred"},
         )
-        r = users.select().execute()
+        r = connection.execute(users.select())
         rows = []
         for row in r:
             rows.append(row)
         eq_(len(rows), 3)
 
-    def test_row_next(self):
+    def test_row_next(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             {"user_id": 7, "user_name": "jack"},
             {"user_id": 8, "user_name": "ed"},
             {"user_id": 9, "user_name": "fred"},
         )
-        r = users.select().execute()
+        r = connection.execute(users.select())
         rows = []
         while True:
             row = next(r, "foo")
@@ -122,10 +124,11 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(len(rows), 3)
 
     @testing.requires.subqueries
-    def test_anonymous_rows(self):
+    def test_anonymous_rows(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             {"user_id": 7, "user_name": "jack"},
             {"user_id": 8, "user_name": "ed"},
             {"user_id": 9, "user_name": "fred"},
@@ -136,15 +139,17 @@ class ResultProxyTest(fixtures.TablesTest):
             .where(users.c.user_name == "jack")
             .scalar_subquery()
         )
-        for row in select([sel + 1, sel + 3], bind=users.bind).execute():
+        for row in connection.execute(
+            select([sel + 1, sel + 3], bind=users.bind)
+        ):
             eq_(row._mapping["anon_1"], 8)
             eq_(row._mapping["anon_2"], 10)
 
-    def test_row_comparison(self):
+    def test_row_comparison(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=7, user_name="jack")
-        rp = users.select().execute().first()
+        connection.execute(users.insert(), user_id=7, user_name="jack")
+        rp = connection.execute(users.select()).first()
 
         eq_(rp, rp)
         is_(not (rp != rp), True)
@@ -192,19 +197,19 @@ class ResultProxyTest(fixtures.TablesTest):
                     eq_(control, op(compare, rp))
 
     @testing.provide_metadata
-    def test_column_label_overlap_fallback(self):
+    def test_column_label_overlap_fallback(self, connection):
         content = Table("content", self.metadata, Column("type", String(30)))
         bar = Table("bar", self.metadata, Column("content_type", String(30)))
-        self.metadata.create_all(testing.db)
-        testing.db.execute(content.insert().values(type="t1"))
+        self.metadata.create_all(connection)
+        connection.execute(content.insert().values(type="t1"))
 
-        row = testing.db.execute(content.select(use_labels=True)).first()
+        row = connection.execute(content.select(use_labels=True)).first()
         in_(content.c.type, row._mapping)
         not_in_(bar.c.content_type, row._mapping)
 
         not_in_(bar.c.content_type, row._mapping)
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([func.now().label("content_type")])
         ).first()
 
@@ -212,10 +217,11 @@ class ResultProxyTest(fixtures.TablesTest):
 
         not_in_(bar.c.content_type, row._mapping)
 
-    def test_pickled_rows(self):
+    def test_pickled_rows(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             {"user_id": 7, "user_name": "jack"},
             {"user_id": 8, "user_name": "ed"},
             {"user_id": 9, "user_name": "fred"},
@@ -223,12 +229,11 @@ class ResultProxyTest(fixtures.TablesTest):
 
         for pickle in False, True:
             for use_labels in False, True:
-                result = (
-                    users.select(use_labels=use_labels)
-                    .order_by(users.c.user_id)
-                    .execute()
-                    .fetchall()
-                )
+                result = connection.execute(
+                    users.select(use_labels=use_labels).order_by(
+                        users.c.user_id
+                    )
+                ).fetchall()
 
                 if pickle:
                     result = util.pickle.loads(util.pickle.dumps(result))
@@ -255,8 +260,8 @@ class ResultProxyTest(fixtures.TablesTest):
                     lambda: result[0]._mapping["fake key"],
                 )
 
-    def test_column_error_printing(self):
-        result = testing.db.execute(select([1]))
+    def test_column_error_printing(self, connection):
+        result = connection.execute(select([1]))
         row = result.first()
 
         class unprintable(object):
@@ -283,42 +288,45 @@ class ResultProxyTest(fixtures.TablesTest):
                 lambda: row._mapping[accessor],
             )
 
-    def test_fetchmany(self):
+    def test_fetchmany(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=7, user_name="jack")
-        users.insert().execute(user_id=8, user_name="ed")
-        users.insert().execute(user_id=9, user_name="fred")
-        r = users.select().execute()
+        connection.execute(users.insert(), user_id=7, user_name="jack")
+        connection.execute(users.insert(), user_id=8, user_name="ed")
+        connection.execute(users.insert(), user_id=9, user_name="fred")
+        r = connection.execute(users.select())
         rows = []
         for row in r.fetchmany(size=2):
             rows.append(row)
         eq_(len(rows), 2)
 
-    def test_column_slices(self):
+    def test_column_slices(self, connection):
         users = self.tables.users
         addresses = self.tables.addresses
 
-        users.insert().execute(user_id=1, user_name="john")
-        users.insert().execute(user_id=2, user_name="jack")
-        addresses.insert().execute(
-            address_id=1, user_id=2, address="foo@bar.com"
+        connection.execute(users.insert(), user_id=1, user_name="john")
+        connection.execute(users.insert(), user_id=2, user_name="jack")
+        connection.execute(
+            addresses.insert(), address_id=1, user_id=2, address="foo@bar.com"
         )
 
-        r = text("select * from addresses", bind=testing.db).execute().first()
+        r = connection.execute(
+            text("select * from addresses", bind=testing.db)
+        ).first()
         eq_(r[0:1], (1,))
         eq_(r[1:], (2, "foo@bar.com"))
         eq_(r[:-1], (1, 2))
 
-    def test_column_accessor_basic_compiled_mapping(self):
+    def test_column_accessor_basic_compiled_mapping(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             dict(user_id=1, user_name="john"),
             dict(user_id=2, user_name="jack"),
         )
 
-        r = users.select(users.c.user_id == 2).execute().first()
+        r = connection.execute(users.select(users.c.user_id == 2)).first()
         eq_(r.user_id, 2)
         eq_(r._mapping["user_id"], 2)
         eq_(r._mapping[users.c.user_id], 2)
@@ -327,15 +335,16 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(r._mapping["user_name"], "jack")
         eq_(r._mapping[users.c.user_name], "jack")
 
-    def test_column_accessor_basic_compiled_traditional(self):
+    def test_column_accessor_basic_compiled_traditional(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             dict(user_id=1, user_name="john"),
             dict(user_id=2, user_name="jack"),
         )
 
-        r = users.select(users.c.user_id == 2).execute().first()
+        r = connection.execute(users.select(users.c.user_id == 2)).first()
 
         eq_(r.user_id, 2)
         eq_(r._mapping["user_id"], 2)
@@ -345,28 +354,30 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(r._mapping["user_name"], "jack")
         eq_(r._mapping[users.c.user_name], "jack")
 
-    def test_row_getitem_string(self):
+    def test_row_getitem_string(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             dict(user_id=1, user_name="john"),
             dict(user_id=2, user_name="jack"),
         )
 
-        r = testing.db.execute(
+        r = connection.execute(
             text("select * from users where user_id=2")
         ).first()
 
         eq_(r._mapping["user_name"], "jack")
 
-    def test_column_accessor_basic_text(self):
+    def test_column_accessor_basic_text(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             dict(user_id=1, user_name="john"),
             dict(user_id=2, user_name="jack"),
         )
-        r = testing.db.execute(
+        r = connection.execute(
             text("select * from users where user_id=2")
         ).first()
 
@@ -379,14 +390,15 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(r.user_name, "jack")
         eq_(r._mapping["user_name"], "jack")
 
-    def test_column_accessor_text_colexplicit(self):
+    def test_column_accessor_text_colexplicit(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             dict(user_id=1, user_name="john"),
             dict(user_id=2, user_name="jack"),
         )
-        r = testing.db.execute(
+        r = connection.execute(
             text("select * from users where user_id=2").columns(
                 users.c.user_id, users.c.user_name
             )
@@ -400,16 +412,17 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(r._mapping["user_name"], "jack")
         eq_(r._mapping[users.c.user_name], "jack")
 
-    def test_column_accessor_textual_select(self):
+    def test_column_accessor_textual_select(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             dict(user_id=1, user_name="john"),
             dict(user_id=2, user_name="jack"),
         )
         # this will create column() objects inside
         # the select(), these need to match on name anyway
-        r = testing.db.execute(
+        r = connection.execute(
             select([column("user_id"), column("user_name")])
             .select_from(table("users"))
             .where(text("user_id=2"))
@@ -422,14 +435,14 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(r.user_name, "jack")
         eq_(r._mapping["user_name"], "jack")
 
-    def test_column_accessor_dotted_union(self):
+    def test_column_accessor_dotted_union(self, connection):
         users = self.tables.users
 
-        users.insert().execute(dict(user_id=1, user_name="john"))
+        connection.execute(users.insert(), dict(user_id=1, user_name="john"))
 
         # test a little sqlite < 3.10.0 weirdness - with the UNION,
         # cols come back as "users.user_id" in cursor.description
-        r = testing.db.execute(
+        r = connection.execute(
             text(
                 "select users.user_id, users.user_name "
                 "from users "
@@ -441,23 +454,20 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(r._mapping["user_name"], "john")
         eq_(list(r._fields), ["user_id", "user_name"])
 
-    def test_column_accessor_sqlite_raw(self):
+    def test_column_accessor_sqlite_raw(self, connection):
         users = self.tables.users
 
-        users.insert().execute(dict(user_id=1, user_name="john"))
+        connection.execute(users.insert(), dict(user_id=1, user_name="john"))
 
-        r = (
+        r = connection.execute(
             text(
                 "select users.user_id, users.user_name "
                 "from users "
                 "UNION select users.user_id, "
                 "users.user_name from users",
                 bind=testing.db,
-            )
-            .execution_options(sqlite_raw_colnames=True)
-            .execute()
-            .first()
-        )
+            ).execution_options(sqlite_raw_colnames=True)
+        ).first()
 
         if testing.against("sqlite < 3.10.0"):
             not_in_("user_id", r)
@@ -474,12 +484,12 @@ class ResultProxyTest(fixtures.TablesTest):
 
             eq_(list(r._fields), ["user_id", "user_name"])
 
-    def test_column_accessor_sqlite_translated(self):
+    def test_column_accessor_sqlite_translated(self, connection):
         users = self.tables.users
 
-        users.insert().execute(dict(user_id=1, user_name="john"))
+        connection.execute(users.insert(), dict(user_id=1, user_name="john"))
 
-        r = (
+        r = connection.execute(
             text(
                 "select users.user_id, users.user_name "
                 "from users "
@@ -487,9 +497,7 @@ class ResultProxyTest(fixtures.TablesTest):
                 "users.user_name from users",
                 bind=testing.db,
             )
-            .execute()
-            .first()
-        )
+        ).first()
         eq_(r._mapping["user_id"], 1)
         eq_(r._mapping["user_name"], "john")
 
@@ -502,44 +510,38 @@ class ResultProxyTest(fixtures.TablesTest):
 
         eq_(list(r._fields), ["user_id", "user_name"])
 
-    def test_column_accessor_labels_w_dots(self):
+    def test_column_accessor_labels_w_dots(self, connection):
         users = self.tables.users
 
-        users.insert().execute(dict(user_id=1, user_name="john"))
+        connection.execute(users.insert(), dict(user_id=1, user_name="john"))
         # test using literal tablename.colname
-        r = (
+        r = connection.execute(
             text(
                 'select users.user_id AS "users.user_id", '
                 'users.user_name AS "users.user_name" '
                 "from users",
                 bind=testing.db,
-            )
-            .execution_options(sqlite_raw_colnames=True)
-            .execute()
-            .first()
-        )
+            ).execution_options(sqlite_raw_colnames=True)
+        ).first()
         eq_(r._mapping["users.user_id"], 1)
         eq_(r._mapping["users.user_name"], "john")
         not_in_("user_name", r._mapping)
         eq_(list(r._fields), ["users.user_id", "users.user_name"])
 
-    def test_column_accessor_unary(self):
+    def test_column_accessor_unary(self, connection):
         users = self.tables.users
 
-        users.insert().execute(dict(user_id=1, user_name="john"))
+        connection.execute(users.insert(), dict(user_id=1, user_name="john"))
 
         # unary expressions
-        r = (
-            select([users.c.user_name.distinct()])
-            .order_by(users.c.user_name)
-            .execute()
-            .first()
-        )
+        r = connection.execute(
+            select([users.c.user_name.distinct()]).order_by(users.c.user_name)
+        ).first()
         eq_(r._mapping[users.c.user_name], "john")
         eq_(r.user_name, "john")
 
-    def test_column_accessor_err(self):
-        r = testing.db.execute(select([1])).first()
+    def test_column_accessor_err(self, connection):
+        r = connection.execute(select([1])).first()
         assert_raises_message(
             AttributeError,
             "Could not locate column in row for column 'foo'",
@@ -601,6 +603,7 @@ class ResultProxyTest(fixtures.TablesTest):
         )
 
     def test_connectionless_autoclose_rows_exhausted(self):
+        # TODO: deprecate for 2.0
         users = self.tables.users
         with testing.db.connect() as conn:
             conn.execute(users.insert(), dict(user_id=1, user_name="john"))
@@ -615,6 +618,7 @@ class ResultProxyTest(fixtures.TablesTest):
 
     @testing.requires.returning
     def test_connectionless_autoclose_crud_rows_exhausted(self):
+        # TODO: deprecate for 2.0
         users = self.tables.users
         stmt = (
             users.insert()
@@ -630,6 +634,7 @@ class ResultProxyTest(fixtures.TablesTest):
         assert connection.closed
 
     def test_connectionless_autoclose_no_rows(self):
+        # TODO: deprecate for 2.0
         result = testing.db.execute(text("select * from users"))
         connection = result.connection
         assert not connection.closed
@@ -638,6 +643,7 @@ class ResultProxyTest(fixtures.TablesTest):
 
     @testing.requires.updateable_autoincrement_pks
     def test_connectionless_autoclose_no_metadata(self):
+        # TODO: deprecate for 2.0
         result = testing.db.execute(text("update users set user_id=5"))
         connection = result.connection
         assert connection.closed
@@ -647,8 +653,8 @@ class ResultProxyTest(fixtures.TablesTest):
             result.fetchone,
         )
 
-    def test_row_case_sensitive(self):
-        row = testing.db.execute(
+    def test_row_case_sensitive(self, connection):
+        row = connection.execute(
             select(
                 [
                     literal_column("1").label("case_insensitive"),
@@ -670,75 +676,80 @@ class ResultProxyTest(fixtures.TablesTest):
         assert_raises(KeyError, lambda: row._mapping["casesensitive"])
 
     def test_row_case_sensitive_unoptimized(self):
-        ins_db = engines.testing_engine()
-        row = ins_db.execute(
-            select(
-                [
-                    literal_column("1").label("case_insensitive"),
-                    literal_column("2").label("CaseSensitive"),
-                    text("3 AS screw_up_the_cols"),
-                ]
-            )
-        ).first()
+        with engines.testing_engine().connect() as ins_conn:
+            row = ins_conn.execute(
+                select(
+                    [
+                        literal_column("1").label("case_insensitive"),
+                        literal_column("2").label("CaseSensitive"),
+                        text("3 AS screw_up_the_cols"),
+                    ]
+                )
+            ).first()
 
-        eq_(
-            list(row._fields),
-            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
-        )
+            eq_(
+                list(row._fields),
+                ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
+            )
 
-        in_("case_insensitive", row._keymap)
-        in_("CaseSensitive", row._keymap)
-        not_in_("casesensitive", row._keymap)
+            in_("case_insensitive", row._keymap)
+            in_("CaseSensitive", row._keymap)
+            not_in_("casesensitive", row._keymap)
 
-        eq_(row._mapping["case_insensitive"], 1)
-        eq_(row._mapping["CaseSensitive"], 2)
-        eq_(row._mapping["screw_up_the_cols"], 3)
+            eq_(row._mapping["case_insensitive"], 1)
+            eq_(row._mapping["CaseSensitive"], 2)
+            eq_(row._mapping["screw_up_the_cols"], 3)
 
-        assert_raises(KeyError, lambda: row._mapping["Case_insensitive"])
-        assert_raises(KeyError, lambda: row._mapping["casesensitive"])
-        assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"])
+            assert_raises(KeyError, lambda: row._mapping["Case_insensitive"])
+            assert_raises(KeyError, lambda: row._mapping["casesensitive"])
+            assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"])
 
-    def test_row_as_args(self):
+    def test_row_as_args(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="john")
-        r = users.select(users.c.user_id == 1).execute().first()
-        users.delete().execute()
-        users.insert().execute(r._mapping)
-        eq_(users.select().execute().fetchall(), [(1, "john")])
+        connection.execute(users.insert(), user_id=1, user_name="john")
+        r = connection.execute(users.select(users.c.user_id == 1)).first()
+        connection.execute(users.delete())
+        connection.execute(users.insert(), r._mapping)
+        eq_(connection.execute(users.select()).fetchall(), [(1, "john")])
 
-    def test_result_as_args(self):
+    def test_result_as_args(self, connection):
         users = self.tables.users
         users2 = self.tables.users2
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             [
                 dict(user_id=1, user_name="john"),
                 dict(user_id=2, user_name="ed"),
-            ]
+            ],
         )
-        r = users.select().execute()
-        users2.insert().execute([row._mapping for row in r])
+        r = connection.execute(users.select())
+        connection.execute(users2.insert(), [row._mapping for row in r])
         eq_(
-            users2.select().order_by(users2.c.user_id).execute().fetchall(),
+            connection.execute(
+                users2.select().order_by(users2.c.user_id)
+            ).fetchall(),
             [(1, "john"), (2, "ed")],
         )
 
-        users2.delete().execute()
-        r = users.select().execute()
-        users2.insert().execute(*[row._mapping for row in r])
+        connection.execute(users2.delete())
+        r = connection.execute(users.select())
+        connection.execute(users2.insert(), *[row._mapping for row in r])
         eq_(
-            users2.select().order_by(users2.c.user_id).execute().fetchall(),
+            connection.execute(
+                users2.select().order_by(users2.c.user_id)
+            ).fetchall(),
             [(1, "john"), (2, "ed")],
         )
 
     @testing.requires.duplicate_names_in_cursor_description
-    def test_ambiguous_column(self):
+    def test_ambiguous_column(self, connection):
         users = self.tables.users
         addresses = self.tables.addresses
 
-        users.insert().execute(user_id=1, user_name="john")
-        result = users.outerjoin(addresses).select().execute()
+        connection.execute(users.insert(), user_id=1, user_name="john")
+        result = connection.execute(users.outerjoin(addresses).select())
         r = result.first()
 
         assert_raises_message(
@@ -776,7 +787,7 @@ class ResultProxyTest(fixtures.TablesTest):
             lambda: r._mapping["user_id"],
         )
 
-        result = users.outerjoin(addresses).select().execute()
+        result = connection.execute(users.outerjoin(addresses).select())
         result = _result.BufferedColumnResultProxy(result.context)
         r = result.first()
         assert isinstance(r, _result.BufferedColumnRow)
@@ -787,16 +798,16 @@ class ResultProxyTest(fixtures.TablesTest):
         )
 
     @testing.requires.duplicate_names_in_cursor_description
-    def test_ambiguous_column_by_col(self):
+    def test_ambiguous_column_by_col(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="john")
+        connection.execute(users.insert(), user_id=1, user_name="john")
         ua = users.alias()
         u2 = users.alias()
-        result = (
-            select([users.c.user_id, ua.c.user_id])
-            .select_from(users.join(ua, true()))
-            .execute()
+        result = connection.execute(
+            select([users.c.user_id, ua.c.user_id]).select_from(
+                users.join(ua, true())
+            )
         )
         row = result.first()
 
@@ -815,18 +826,18 @@ class ResultProxyTest(fixtures.TablesTest):
         )
 
     @testing.requires.duplicate_names_in_cursor_description
-    def test_ambiguous_column_contains(self):
+    def test_ambiguous_column_contains(self, connection):
         users = self.tables.users
         addresses = self.tables.addresses
 
         # ticket 2702.  in 0.7 we'd get True, False.
         # in 0.8, both columns are present so it's True;
         # but when they're fetched you'll get the ambiguous error.
-        users.insert().execute(user_id=1, user_name="john")
-        result = (
-            select([users.c.user_id, addresses.c.user_id])
-            .select_from(users.outerjoin(addresses))
-            .execute()
+        connection.execute(users.insert(), user_id=1, user_name="john")
+        result = connection.execute(
+            select([users.c.user_id, addresses.c.user_id]).select_from(
+                users.outerjoin(addresses)
+            )
         )
         row = result.first()
 
@@ -840,106 +851,102 @@ class ResultProxyTest(fixtures.TablesTest):
             set([True]),
         )
 
-    def test_loose_matching_one(self):
+    def test_loose_matching_one(self, connection):
         users = self.tables.users
         addresses = self.tables.addresses
 
-        with testing.db.connect() as conn:
-            conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
-            conn.execute(
-                addresses.insert(),
-                {"address_id": 1, "user_id": 1, "address": "email"},
+        connection.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+        connection.execute(
+            addresses.insert(),
+            {"address_id": 1, "user_id": 1, "address": "email"},
+        )
+
+        # use some column labels in the SELECT
+        result = connection.execute(
+            TextualSelect(
+                text(
+                    "select users.user_name AS users_user_name, "
+                    "users.user_id AS user_id, "
+                    "addresses.address_id AS address_id "
+                    "FROM users JOIN addresses "
+                    "ON users.user_id = addresses.user_id "
+                    "WHERE users.user_id=1 "
+                ),
+                [users.c.user_id, users.c.user_name, addresses.c.address_id],
+                positional=False,
             )
+        )
+        row = result.first()
+        eq_(row._mapping[users.c.user_id], 1)
+        eq_(row._mapping[users.c.user_name], "john")
 
-            # use some column labels in the SELECT
-            result = conn.execute(
-                TextualSelect(
-                    text(
-                        "select users.user_name AS users_user_name, "
-                        "users.user_id AS user_id, "
-                        "addresses.address_id AS address_id "
-                        "FROM users JOIN addresses "
-                        "ON users.user_id = addresses.user_id "
-                        "WHERE users.user_id=1 "
-                    ),
-                    [
-                        users.c.user_id,
-                        users.c.user_name,
-                        addresses.c.address_id,
-                    ],
-                    positional=False,
-                )
-            )
-            row = result.first()
-            eq_(row._mapping[users.c.user_id], 1)
-            eq_(row._mapping[users.c.user_name], "john")
-
-    def test_loose_matching_two(self):
+    def test_loose_matching_two(self, connection):
         users = self.tables.users
         addresses = self.tables.addresses
 
-        with testing.db.connect() as conn:
-            conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
-            conn.execute(
-                addresses.insert(),
-                {"address_id": 1, "user_id": 1, "address": "email"},
-            )
-
-            # use some column labels in the SELECT
-            result = conn.execute(
-                TextualSelect(
-                    text(
-                        "select users.user_name AS users_user_name, "
-                        "users.user_id AS user_id, "
-                        "addresses.user_id "
-                        "FROM users JOIN addresses "
-                        "ON users.user_id = addresses.user_id "
-                        "WHERE users.user_id=1 "
-                    ),
-                    [users.c.user_id, users.c.user_name, addresses.c.user_id],
-                    positional=False,
-                )
+        connection.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+        connection.execute(
+            addresses.insert(),
+            {"address_id": 1, "user_id": 1, "address": "email"},
+        )
+
+        # use some column labels in the SELECT
+        result = connection.execute(
+            TextualSelect(
+                text(
+                    "select users.user_name AS users_user_name, "
+                    "users.user_id AS user_id, "
+                    "addresses.user_id "
+                    "FROM users JOIN addresses "
+                    "ON users.user_id = addresses.user_id "
+                    "WHERE users.user_id=1 "
+                ),
+                [users.c.user_id, users.c.user_name, addresses.c.user_id],
+                positional=False,
             )
-            row = result.first()
+        )
+        row = result.first()
 
-            assert_raises_message(
-                exc.InvalidRequestError,
-                "Ambiguous column name",
-                lambda: row._mapping[users.c.user_id],
-            )
-            assert_raises_message(
-                exc.InvalidRequestError,
-                "Ambiguous column name",
-                lambda: row._mapping[addresses.c.user_id],
-            )
-            eq_(row._mapping[users.c.user_name], "john")
+        assert_raises_message(
+            exc.InvalidRequestError,
+            "Ambiguous column name",
+            lambda: row._mapping[users.c.user_id],
+        )
+        assert_raises_message(
+            exc.InvalidRequestError,
+            "Ambiguous column name",
+            lambda: row._mapping[addresses.c.user_id],
+        )
+        eq_(row._mapping[users.c.user_name], "john")
 
-    def test_ambiguous_column_by_col_plus_label(self):
+    def test_ambiguous_column_by_col_plus_label(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="john")
-        result = select(
-            [
-                users.c.user_id,
-                type_coerce(users.c.user_id, Integer).label("foo"),
-            ]
-        ).execute()
+        connection.execute(users.insert(), user_id=1, user_name="john")
+        result = connection.execute(
+            select(
+                [
+                    users.c.user_id,
+                    type_coerce(users.c.user_id, Integer).label("foo"),
+                ]
+            )
+        )
         row = result.first()
         eq_(row._mapping[users.c.user_id], 1)
         eq_(row[1], 1)
 
-    def test_fetch_partial_result_map(self):
+    def test_fetch_partial_result_map(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=7, user_name="ed")
+        connection.execute(users.insert(), user_id=7, user_name="ed")
 
         t = text("select * from users").columns(user_name=String())
-        eq_(testing.db.execute(t).fetchall(), [(7, "ed")])
+        eq_(connection.execute(t).fetchall(), [(7, "ed")])
 
-    def test_fetch_unordered_result_map(self):
+    def test_fetch_unordered_result_map(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=7, user_name="ed")
+        connection.execute(users.insert(), user_id=7, user_name="ed")
 
         class Goofy1(TypeDecorator):
             impl = String
@@ -963,28 +970,28 @@ class ResultProxyTest(fixtures.TablesTest):
             "select user_name as a, user_name as b, "
             "user_name as c from users"
         ).columns(a=Goofy1(), b=Goofy2(), c=Goofy3())
-        eq_(testing.db.execute(t).fetchall(), [("eda", "edb", "edc")])
+        eq_(connection.execute(t).fetchall(), [("eda", "edb", "edc")])
 
     @testing.requires.subqueries
-    def test_column_label_targeting(self):
+    def test_column_label_targeting(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=7, user_name="ed")
+        connection.execute(users.insert(), user_id=7, user_name="ed")
 
         for s in (
             users.select().alias("foo"),
             users.select().alias(users.name),
         ):
-            row = s.select(use_labels=True).execute().first()
+            row = connection.execute(s.select(use_labels=True)).first()
             eq_(row._mapping[s.c.user_id], 7)
             eq_(row._mapping[s.c.user_name], "ed")
 
     @testing.requires.python3
-    def test_ro_mapping_py3k(self):
+    def test_ro_mapping_py3k(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="foo")
-        result = users.select().execute()
+        connection.execute(users.insert(), user_id=1, user_name="foo")
+        result = connection.execute(users.select())
 
         row = result.first()
         dict_row = row._asdict()
@@ -1003,11 +1010,11 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(odict_row.items(), mapping_row.items())
 
     @testing.requires.python2
-    def test_ro_mapping_py2k(self):
+    def test_ro_mapping_py2k(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="foo")
-        result = users.select().execute()
+        connection.execute(users.insert(), user_id=1, user_name="foo")
+        result = connection.execute(users.select())
 
         row = result.first()
         dict_row = row._asdict()
@@ -1023,33 +1030,33 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(odict_row.values(), list(mapping_row.values()))
         eq_(odict_row.items(), list(mapping_row.items()))
 
-    def test_keys(self):
+    def test_keys(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="foo")
-        result = users.select().execute()
+        connection.execute(users.insert(), user_id=1, user_name="foo")
+        result = connection.execute(users.select())
         eq_(result.keys(), ["user_id", "user_name"])
         row = result.first()
         eq_(list(row._mapping.keys()), ["user_id", "user_name"])
         eq_(row._fields, ("user_id", "user_name"))
 
-    def test_row_keys_legacy_dont_warn(self):
+    def test_row_keys_legacy_dont_warn(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="foo")
-        result = users.select().execute()
+        connection.execute(users.insert(), user_id=1, user_name="foo")
+        result = connection.execute(users.select())
         row = result.first()
         # DO NOT WARN DEPRECATED IN 1.x, ONLY 2.0 WARNING
         eq_(dict(row), {"user_id": 1, "user_name": "foo"})
         eq_(row.keys(), ["user_id", "user_name"])
 
-    def test_keys_anon_labels(self):
+    def test_keys_anon_labels(self, connection):
         """test [ticket:3483]"""
 
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="foo")
-        result = testing.db.execute(
+        connection.execute(users.insert(), user_id=1, user_name="foo")
+        result = connection.execute(
             select(
                 [
                     users.c.user_id,
@@ -1064,11 +1071,11 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(row._fields, ("user_id", "user_name_1", "count_1"))
         eq_(list(row._mapping.keys()), ["user_id", "user_name_1", "count_1"])
 
-    def test_items(self):
+    def test_items(self, connection):
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="foo")
-        r = users.select().execute().first()
+        connection.execute(users.insert(), user_id=1, user_name="foo")
+        r = connection.execute(users.select()).first()
         eq_(
             [(x[0].lower(), x[1]) for x in list(r._mapping.items())],
             [("user_id", 1), ("user_name", "foo")],
@@ -1088,27 +1095,30 @@ class ResultProxyTest(fixtures.TablesTest):
         r = connection.exec_driver_sql("select user_name from users").first()
         eq_(len(r), 1)
 
-    def test_sorting_in_python(self):
+    def test_sorting_in_python(self, connection):
         users = self.tables.users
 
-        users.insert().execute(
+        connection.execute(
+            users.insert(),
             dict(user_id=1, user_name="foo"),
             dict(user_id=2, user_name="bar"),
             dict(user_id=3, user_name="def"),
         )
 
-        rows = users.select().order_by(users.c.user_name).execute().fetchall()
+        rows = connection.execute(
+            users.select().order_by(users.c.user_name)
+        ).fetchall()
 
         eq_(rows, [(2, "bar"), (3, "def"), (1, "foo")])
 
         eq_(sorted(rows), [(1, "foo"), (2, "bar"), (3, "def")])
 
-    def test_column_order_with_simple_query(self):
+    def test_column_order_with_simple_query(self, connection):
         # should return values in column definition order
         users = self.tables.users
 
-        users.insert().execute(user_id=1, user_name="foo")
-        r = users.select(users.c.user_id == 1).execute().first()
+        connection.execute(users.insert(), user_id=1, user_name="foo")
+        r = connection.execute(users.select(users.c.user_id == 1)).first()
         eq_(r[0], 1)
         eq_(r[1], "foo")
         eq_([x.lower() for x in r._fields], ["user_id", "user_name"])
@@ -1128,10 +1138,10 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_([x.lower() for x in r._fields], ["user_name", "user_id"])
         eq_(list(r._mapping.values()), ["foo", 1])
 
-    @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()")
+    @testing.crashes("oracle", "FIXME: unknown, verify not fails_on()")
     @testing.crashes("firebird", "An identifier must begin with a letter")
     @testing.provide_metadata
-    def test_column_accessor_shadow(self):
+    def test_column_accessor_shadow(self, connection):
         shadowed = Table(
             "test_shadowed",
             self.metadata,
@@ -1142,8 +1152,9 @@ class ResultProxyTest(fixtures.TablesTest):
             Column("_parent", VARCHAR(20)),
             Column("_row", VARCHAR(20)),
         )
-        self.metadata.create_all()
-        shadowed.insert().execute(
+        self.metadata.create_all(connection)
+        connection.execute(
+            shadowed.insert(),
             shadow_id=1,
             shadow_name="The Shadow",
             parent="The Light",
@@ -1151,7 +1162,9 @@ class ResultProxyTest(fixtures.TablesTest):
             _parent="Hidden parent",
             _row="Hidden row",
         )
-        r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
+        r = connection.execute(
+            shadowed.select(shadowed.c.shadow_id == 1)
+        ).first()
 
         eq_(r.shadow_id, 1)
         eq_(r._mapping["shadow_id"], 1)
@@ -1221,25 +1234,26 @@ class ResultProxyTest(fixtures.TablesTest):
         with patch.object(
             engine.dialect.execution_ctx_cls, "rowcount"
         ) as mock_rowcount:
-            mock_rowcount.__get__ = Mock()
-            engine.execute(
-                t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
-            )
+            with engine.connect() as conn:
+                mock_rowcount.__get__ = Mock()
+                conn.execute(
+                    t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
+                )
 
-            eq_(len(mock_rowcount.__get__.mock_calls), 0)
+                eq_(len(mock_rowcount.__get__.mock_calls), 0)
 
-            eq_(
-                engine.execute(t.select()).fetchall(),
-                [("d1",), ("d2",), ("d3",)],
-            )
-            eq_(len(mock_rowcount.__get__.mock_calls), 0)
+                eq_(
+                    conn.execute(t.select()).fetchall(),
+                    [("d1",), ("d2",), ("d3",)],
+                )
+                eq_(len(mock_rowcount.__get__.mock_calls), 0)
 
-            engine.execute(t.update(), {"data": "d4"})
+                conn.execute(t.update(), {"data": "d4"})
 
-            eq_(len(mock_rowcount.__get__.mock_calls), 1)
+                eq_(len(mock_rowcount.__get__.mock_calls), 1)
 
-            engine.execute(t.delete())
-            eq_(len(mock_rowcount.__get__.mock_calls), 2)
+                conn.execute(t.delete())
+                eq_(len(mock_rowcount.__get__.mock_calls), 2)
 
     def test_row_is_sequence(self):
 
@@ -1259,17 +1273,17 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(hash(row), hash((1, "value", "foo")))
 
     @testing.provide_metadata
-    def test_row_getitem_indexes_compiled(self):
+    def test_row_getitem_indexes_compiled(self, connection):
         values = Table(
             "rp",
             self.metadata,
             Column("key", String(10), primary_key=True),
             Column("value", String(10)),
         )
-        values.create()
+        values.create(connection)
 
-        testing.db.execute(values.insert(), dict(key="One", value="Uno"))
-        row = testing.db.execute(values.select()).first()
+        connection.execute(values.insert(), dict(key="One", value="Uno"))
+        row = connection.execute(values.select()).first()
         eq_(row._mapping["key"], "One")
         eq_(row._mapping["value"], "Uno")
         eq_(row[0], "One")
@@ -1293,7 +1307,7 @@ class ResultProxyTest(fixtures.TablesTest):
 
     @testing.requires.cextensions
     def test_row_c_sequence_check(self):
-
+        # TODO: modernize for 2.0
         metadata = MetaData()
         metadata.bind = "sqlite://"
         users = Table(
@@ -1409,39 +1423,39 @@ class KeyTargetingTest(fixtures.TablesTest):
                 )
 
     @testing.requires.schemas
-    def test_keyed_accessor_wschema(self):
+    def test_keyed_accessor_wschema(self, connection):
         keyed1 = self.tables["%s.wschema" % testing.config.test_schema]
-        row = testing.db.execute(keyed1.select()).first()
+        row = connection.execute(keyed1.select()).first()
 
         eq_(row.b, "a1")
         eq_(row.q, "c1")
         eq_(row.a, "a1")
         eq_(row.c, "c1")
 
-    def test_keyed_accessor_single(self):
+    def test_keyed_accessor_single(self, connection):
         keyed1 = self.tables.keyed1
-        row = testing.db.execute(keyed1.select()).first()
+        row = connection.execute(keyed1.select()).first()
 
         eq_(row.b, "a1")
         eq_(row.q, "c1")
         eq_(row.a, "a1")
         eq_(row.c, "c1")
 
-    def test_keyed_accessor_single_labeled(self):
+    def test_keyed_accessor_single_labeled(self, connection):
         keyed1 = self.tables.keyed1
-        row = testing.db.execute(keyed1.select().apply_labels()).first()
+        row = connection.execute(keyed1.select().apply_labels()).first()
 
         eq_(row.keyed1_b, "a1")
         eq_(row.keyed1_q, "c1")
         eq_(row.keyed1_a, "a1")
         eq_(row.keyed1_c, "c1")
 
-    def _test_keyed_targeting_no_label_at_all(self, expression):
+    def _test_keyed_targeting_no_label_at_all(self, expression, conn):
         lt = literal_column("2")
         stmt = select([literal_column("1"), expression, lt]).select_from(
             self.tables.keyed1
         )
-        row = testing.db.execute(stmt).first()
+        row = conn.execute(stmt).first()
 
         eq_(row._mapping[expression], "a1")
         eq_(row._mapping[lt], 2)
@@ -1450,7 +1464,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         # easily.  we get around that because we know that "2" is unique
         eq_(row._mapping["2"], 2)
 
-    def test_keyed_targeting_no_label_at_all_one(self):
+    def test_keyed_targeting_no_label_at_all_one(self, connection):
         class not_named_max(expression.ColumnElement):
             name = "not_named_max"
 
@@ -1464,9 +1478,9 @@ class KeyTargetingTest(fixtures.TablesTest):
         eq_(str(select([not_named_max()])), "SELECT max(a)")
 
         nnm = not_named_max()
-        self._test_keyed_targeting_no_label_at_all(nnm)
+        self._test_keyed_targeting_no_label_at_all(nnm, connection)
 
-    def test_keyed_targeting_no_label_at_all_two(self):
+    def test_keyed_targeting_no_label_at_all_two(self, connection):
         class not_named_max(expression.ColumnElement):
             name = "not_named_max"
 
@@ -1479,24 +1493,24 @@ class KeyTargetingTest(fixtures.TablesTest):
         eq_(str(select([not_named_max()])), "SELECT max(a)")
 
         nnm = not_named_max()
-        self._test_keyed_targeting_no_label_at_all(nnm)
+        self._test_keyed_targeting_no_label_at_all(nnm, connection)
 
-    def test_keyed_targeting_no_label_at_all_text(self):
+    def test_keyed_targeting_no_label_at_all_text(self, connection):
         t1 = text("max(a)")
         t2 = text("min(a)")
 
         stmt = select([t1, t2]).select_from(self.tables.keyed1)
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         eq_(row._mapping[t1], "a1")
         eq_(row._mapping[t2], "a1")
 
     @testing.requires.duplicate_names_in_cursor_description
-    def test_keyed_accessor_composite_conflict_2(self):
+    def test_keyed_accessor_composite_conflict_2(self, connection):
         keyed1 = self.tables.keyed1
         keyed2 = self.tables.keyed2
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([keyed1, keyed2]).select_from(keyed1.join(keyed2, true()))
         ).first()
 
@@ -1522,14 +1536,16 @@ class KeyTargetingTest(fixtures.TablesTest):
         # illustrate why row.b above is ambiguous, and not "b2"; because
         # if we didn't have keyed2, now it matches row.a.  a new column
         # shouldn't be able to grab the value from a previous column.
-        row = testing.db.execute(select([keyed1])).first()
+        row = connection.execute(select([keyed1])).first()
         eq_(row.b, "a1")
 
-    def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(self):
+    def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(
+        self, connection
+    ):
         keyed1 = self.tables.keyed1
         keyed2 = self.tables.keyed2
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([keyed1, keyed2])
             .select_from(keyed1.join(keyed2, true()))
             .apply_labels()
@@ -1541,11 +1557,11 @@ class KeyTargetingTest(fixtures.TablesTest):
         eq_(row._mapping["keyed2_b"], "b2")
         eq_(row._mapping["keyed1_a"], "a1")
 
-    def test_keyed_accessor_composite_names_precedent(self):
+    def test_keyed_accessor_composite_names_precedent(self, connection):
         keyed1 = self.tables.keyed1
         keyed4 = self.tables.keyed4
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([keyed1, keyed4]).select_from(keyed1.join(keyed4, true()))
         ).first()
         eq_(row.b, "b4")
@@ -1554,11 +1570,11 @@ class KeyTargetingTest(fixtures.TablesTest):
         eq_(row.c, "c1")
 
     @testing.requires.duplicate_names_in_cursor_description
-    def test_keyed_accessor_composite_keys_precedent(self):
+    def test_keyed_accessor_composite_keys_precedent(self, connection):
         keyed1 = self.tables.keyed1
         keyed3 = self.tables.keyed3
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([keyed1, keyed3]).select_from(keyed1.join(keyed3, true()))
         ).first()
         eq_(row.q, "c1")
@@ -1578,11 +1594,11 @@ class KeyTargetingTest(fixtures.TablesTest):
         )
         eq_(row.d, "d3")
 
-    def test_keyed_accessor_composite_labeled(self):
+    def test_keyed_accessor_composite_labeled(self, connection):
         keyed1 = self.tables.keyed1
         keyed2 = self.tables.keyed2
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([keyed1, keyed2])
             .select_from(keyed1.join(keyed2, true()))
             .apply_labels()
@@ -1599,7 +1615,9 @@ class KeyTargetingTest(fixtures.TablesTest):
         assert_raises(KeyError, lambda: row._mapping["keyed2_c"])
         assert_raises(KeyError, lambda: row._mapping["keyed2_q"])
 
-    def test_keyed_accessor_column_is_repeated_multiple_times(self):
+    def test_keyed_accessor_column_is_repeated_multiple_times(
+        self, connection
+    ):
         # test new logic added as a result of the combination of #4892 and
         # #4887.   We allow duplicate columns, but we also have special logic
         # to disambiguate for the same column repeated, and as #4887 adds
@@ -1627,7 +1645,7 @@ class KeyTargetingTest(fixtures.TablesTest):
             .apply_labels()
         )
 
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         is_false(result._metadata.matched_on_name)
 
         # ensure the result map is the same number of cols so we can
@@ -1664,34 +1682,34 @@ class KeyTargetingTest(fixtures.TablesTest):
         eq_(row[6], "d3")
         eq_(row[7], "d3")
 
-    def test_columnclause_schema_column_one(self):
+    def test_columnclause_schema_column_one(self, connection):
         # originally addressed by [ticket:2932], however liberalized
         # Column-targeting rules are deprecated
         a, b = sql.column("a"), sql.column("b")
         stmt = select([a, b]).select_from(table("keyed2"))
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         in_(a, row._mapping)
         in_(b, row._mapping)
 
-    def test_columnclause_schema_column_two(self):
+    def test_columnclause_schema_column_two(self, connection):
         keyed2 = self.tables.keyed2
 
         stmt = select([keyed2.c.a, keyed2.c.b])
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         in_(keyed2.c.a, row._mapping)
         in_(keyed2.c.b, row._mapping)
 
-    def test_columnclause_schema_column_three(self):
+    def test_columnclause_schema_column_three(self, connection):
         # this is also addressed by [ticket:2932]
         stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         in_(stmt.selected_columns.a, row._mapping)
         in_(stmt.selected_columns.b, row._mapping)
 
-    def test_columnclause_schema_column_four(self):
+    def test_columnclause_schema_column_four(self, connection):
         # originally addressed by [ticket:2932], however liberalized
         # Column-targeting rules are deprecated
 
@@ -1699,7 +1717,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
             a, b
         )
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         in_(a, row._mapping)
         in_(b, row._mapping)
@@ -1707,13 +1725,13 @@ class KeyTargetingTest(fixtures.TablesTest):
         in_(stmt.selected_columns.keyed2_a, row._mapping)
         in_(stmt.selected_columns.keyed2_b, row._mapping)
 
-    def test_columnclause_schema_column_five(self):
+    def test_columnclause_schema_column_five(self, connection):
         # this is also addressed by [ticket:2932]
 
         stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
             keyed2_a=CHAR, keyed2_b=CHAR
         )
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         in_(stmt.selected_columns.keyed2_a, row._mapping)
         in_(stmt.selected_columns.keyed2_b, row._mapping)
@@ -1818,15 +1836,17 @@ class PositionalTextTest(fixtures.TablesTest):
 
     @classmethod
     def insert_data(cls):
-        cls.tables.text1.insert().execute(
-            [dict(a="a1", b="b1", c="c1", d="d1")]
-        )
+        with testing.db.connect() as conn:
+            conn.execute(
+                cls.tables.text1.insert(),
+                [dict(a="a1", b="b1", c="c1", d="d1")],
+            )
 
-    def test_via_column(self):
+    def test_via_column(self, connection):
         c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
         stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
 
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         eq_(row._mapping[c2], "b1")
@@ -1838,23 +1858,23 @@ class PositionalTextTest(fixtures.TablesTest):
         eq_(row._mapping["r"], "c1")
         eq_(row._mapping["d"], "d1")
 
-    def test_fewer_cols_than_sql_positional(self):
+    def test_fewer_cols_than_sql_positional(self, connection):
         c1, c2 = column("q"), column("p")
         stmt = text("select a, b, c, d from text1").columns(c1, c2)
 
         # no warning as this can be similar for non-positional
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         eq_(row._mapping[c1], "a1")
         eq_(row._mapping["c"], "c1")
 
-    def test_fewer_cols_than_sql_non_positional(self):
+    def test_fewer_cols_than_sql_non_positional(self, connection):
         c1, c2 = column("a"), column("p")
         stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR)
 
         # no warning as this can be similar for non-positional
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         # c1 name matches, locates
@@ -1868,7 +1888,7 @@ class PositionalTextTest(fixtures.TablesTest):
             lambda: row._mapping[c2],
         )
 
-    def test_more_cols_than_sql_positional(self):
+    def test_more_cols_than_sql_positional(self, connection):
         c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
         stmt = text("select a, b from text1").columns(c1, c2, c3, c4)
 
@@ -1876,7 +1896,7 @@ class PositionalTextTest(fixtures.TablesTest):
             r"Number of columns in textual SQL \(4\) is "
             r"smaller than number of columns requested \(2\)"
         ):
-            result = testing.db.execute(stmt)
+            result = connection.execute(stmt)
 
         row = result.first()
         eq_(row._mapping[c2], "b1")
@@ -1887,14 +1907,14 @@ class PositionalTextTest(fixtures.TablesTest):
             lambda: row._mapping[c3],
         )
 
-    def test_more_cols_than_sql_nonpositional(self):
+    def test_more_cols_than_sql_nonpositional(self, connection):
         c1, c2, c3, c4 = column("b"), column("a"), column("r"), column("d")
         stmt = TextualSelect(
             text("select a, b from text1"), [c1, c2, c3, c4], positional=False
         )
 
         # no warning for non-positional
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
 
         row = result.first()
         eq_(row._mapping[c1], "b1")
@@ -1906,7 +1926,7 @@ class PositionalTextTest(fixtures.TablesTest):
             lambda: row._mapping[c3],
         )
 
-    def test_more_cols_than_sql_nonpositional_labeled_cols(self):
+    def test_more_cols_than_sql_nonpositional_labeled_cols(self, connection):
         text1 = self.tables.text1
         c1, c2, c3, c4 = text1.c.b, text1.c.a, column("r"), column("d")
 
@@ -1919,7 +1939,7 @@ class PositionalTextTest(fixtures.TablesTest):
         )
 
         # no warning for non-positional
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
 
         row = result.first()
         eq_(row._mapping[c1], "b1")
@@ -1931,7 +1951,7 @@ class PositionalTextTest(fixtures.TablesTest):
             lambda: row._mapping[c3],
         )
 
-    def test_dupe_col_obj(self):
+    def test_dupe_col_obj(self, connection):
         c1, c2, c3 = column("q"), column("p"), column("r")
         stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2)
 
@@ -1939,11 +1959,11 @@ class PositionalTextTest(fixtures.TablesTest):
             exc.InvalidRequestError,
             "Duplicate column expression requested in "
             "textual SQL: <.*.ColumnClause.*; p>",
-            testing.db.execute,
+            connection.execute,
             stmt,
         )
 
-    def test_anon_aliased_unique(self):
+    def test_anon_aliased_unique(self, connection):
         text1 = self.tables.text1
 
         c1 = text1.c.a.label(None)
@@ -1952,7 +1972,7 @@ class PositionalTextTest(fixtures.TablesTest):
         c4 = text1.alias().c.d.label(None)
 
         stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         eq_(row._mapping[c1], "a1")
@@ -1968,7 +1988,7 @@ class PositionalTextTest(fixtures.TablesTest):
             lambda: row._mapping[text1.c.b],
         )
 
-    def test_anon_aliased_overlapping(self):
+    def test_anon_aliased_overlapping(self, connection):
         text1 = self.tables.text1
 
         c1 = text1.c.a.label(None)
@@ -1977,7 +1997,7 @@ class PositionalTextTest(fixtures.TablesTest):
         c4 = text1.c.a.label(None)
 
         stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         eq_(row._mapping[c1], "a1")
@@ -1985,7 +2005,7 @@ class PositionalTextTest(fixtures.TablesTest):
         eq_(row._mapping[c3], "c1")
         eq_(row._mapping[c4], "d1")
 
-    def test_anon_aliased_name_conflict(self):
+    def test_anon_aliased_name_conflict(self, connection):
         text1 = self.tables.text1
 
         c1 = text1.c.a.label("a")
@@ -1998,7 +2018,7 @@ class PositionalTextTest(fixtures.TablesTest):
         stmt = text("select a, b as a, c as a, d as a from text1").columns(
             c1, c2, c3, c4
         )
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         eq_(row._mapping[c1], "a1")
@@ -2034,10 +2054,11 @@ class AlternateResultProxyTest(fixtures.TablesTest):
 
     @classmethod
     def insert_data(cls):
-        cls.engine.execute(
-            cls.tables.test.insert(),
-            [{"x": i, "y": "t_%d" % i} for i in range(1, 12)],
-        )
+        with cls.engine.connect() as conn:
+            conn.execute(
+                cls.tables.test.insert(),
+                [{"x": i, "y": "t_%d" % i} for i in range(1, 12)],
+            )
 
     @contextmanager
     def _proxy_fixture(self, cls):
@@ -2059,53 +2080,54 @@ class AlternateResultProxyTest(fixtures.TablesTest):
     def _test_proxy(self, cls):
         with self._proxy_fixture(cls):
             rows = []
-            r = self.engine.execute(select([self.table]))
-            assert isinstance(r.cursor_strategy, cls)
-            for i in range(5):
-                rows.append(r.fetchone())
-            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+            with self.engine.connect() as conn:
+                r = conn.execute(select([self.table]))
+                assert isinstance(r.cursor_strategy, cls)
+                for i in range(5):
+                    rows.append(r.fetchone())
+                eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
 
-            rows = r.fetchmany(3)
-            eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+                rows = r.fetchmany(3)
+                eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
 
-            rows = r.fetchall()
-            eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+                rows = r.fetchall()
+                eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
 
-            r = self.engine.execute(select([self.table]))
-            rows = r.fetchmany(None)
-            eq_(rows[0], (1, "t_1"))
-            # number of rows here could be one, or the whole thing
-            assert len(rows) == 1 or len(rows) == 11
+                r = conn.execute(select([self.table]))
+                rows = r.fetchmany(None)
+                eq_(rows[0], (1, "t_1"))
+                # number of rows here could be one, or the whole thing
+                assert len(rows) == 1 or len(rows) == 11
 
-            r = self.engine.execute(select([self.table]).limit(1))
-            r.fetchone()
-            eq_(r.fetchone(), None)
+                r = conn.execute(select([self.table]).limit(1))
+                r.fetchone()
+                eq_(r.fetchone(), None)
 
-            r = self.engine.execute(select([self.table]).limit(5))
-            rows = r.fetchmany(6)
-            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+                r = conn.execute(select([self.table]).limit(5))
+                rows = r.fetchmany(6)
+                eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
 
-            # result keeps going just fine with blank results...
-            eq_(r.fetchmany(2), [])
+                # result keeps going just fine with blank results...
+                eq_(r.fetchmany(2), [])
 
-            eq_(r.fetchmany(2), [])
+                eq_(r.fetchmany(2), [])
 
-            eq_(r.fetchall(), [])
+                eq_(r.fetchall(), [])
 
-            eq_(r.fetchone(), None)
+                eq_(r.fetchone(), None)
 
-            # until we close
-            r.close()
+                # until we close
+                r.close()
 
-            self._assert_result_closed(r)
+                self._assert_result_closed(r)
 
-            r = self.engine.execute(select([self.table]).limit(5))
-            eq_(r.first(), (1, "t_1"))
-            self._assert_result_closed(r)
+                r = conn.execute(select([self.table]).limit(5))
+                eq_(r.first(), (1, "t_1"))
+                self._assert_result_closed(r)
 
-            r = self.engine.execute(select([self.table]).limit(5))
-            eq_(r.scalar(), 1)
-            self._assert_result_closed(r)
+                r = conn.execute(select([self.table]).limit(5))
+                eq_(r.scalar(), 1)
+                self._assert_result_closed(r)
 
     def _assert_result_closed(self, r):
         assert_raises_message(