]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Clean up .execute in test/sql/test_deprecations.py
authorGord Thompson <gord@gordthompson.com>
Sun, 12 Apr 2020 21:27:06 +0000 (15:27 -0600)
committerGord Thompson <gord@gordthompson.com>
Sun, 12 Apr 2020 22:12:35 +0000 (16:12 -0600)
Change-Id: I488f0992d5f26e164a903cbced11422046403647

test/sql/test_deprecations.py

index 7c115789bb72c0395c4afd551cafdbed4a5ad59a..13d4cd1547dcf66b11e055b8fb45609c100269b0 100644 (file)
@@ -880,20 +880,24 @@ class KeyTargetingTest(fixtures.TablesTest):
 
     @classmethod
     def insert_data(cls):
-        cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
-        cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
-        cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
-        cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
-        cls.tables.content.insert().execute(type="t1")
-
-        if testing.requires.schemas.enabled:
-            cls.tables[
-                "%s.wschema" % testing.config.test_schema
-            ].insert().execute(dict(b="a1", q="c1"))
+        with testing.db.connect() as conn:
+            conn.execute(cls.tables.keyed1.insert(), dict(b="a1", q="c1"))
+            conn.execute(cls.tables.keyed2.insert(), dict(a="a2", b="b2"))
+            conn.execute(cls.tables.keyed3.insert(), dict(a="a3", d="d3"))
+            conn.execute(cls.tables.keyed4.insert(), dict(b="b4", q="q4"))
+            conn.execute(cls.tables.content.insert(), type="t1")
+
+            if testing.requires.schemas.enabled:
+                conn.execute(
+                    cls.tables[
+                        "%s.wschema" % testing.config.test_schema
+                    ].insert(),
+                    dict(b="a1", q="c1"),
+                )
 
-    def test_column_label_overlap_fallback(self):
+    def test_column_label_overlap_fallback(self, connection):
         content, bar = self.tables.content, self.tables.bar
-        row = testing.db.execute(
+        row = connection.execute(
             select([content.c.type.label("content_type")])
         ).first()
 
@@ -906,7 +910,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         ):
             in_(sql.column("content_type"), row)
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([func.now().label("content_type")])
         ).first()
         not_in_(content.c.type, row)
@@ -917,7 +921,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         ):
             in_(sql.column("content_type"), row)
 
-    def test_columnclause_schema_column_one(self):
+    def test_columnclause_schema_column_one(self, connection):
         keyed2 = self.tables.keyed2
 
         # this is addressed by [ticket:2932]
@@ -926,7 +930,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         # cols, which results in a more liberal comparison scheme
         a, b = sql.column("a"), sql.column("b")
         stmt = select([a, b]).select_from(table("keyed2"))
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         with testing.expect_deprecated(
             "Retreiving row values using Column objects "
@@ -939,12 +943,12 @@ class KeyTargetingTest(fixtures.TablesTest):
         ):
             in_(keyed2.c.b, row)
 
-    def test_columnclause_schema_column_two(self):
+    def test_columnclause_schema_column_two(self, connection):
         keyed2 = self.tables.keyed2
 
         a, b = sql.column("a"), sql.column("b")
         stmt = select([keyed2.c.a, keyed2.c.b])
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         with testing.expect_deprecated(
             "Retreiving row values using Column objects "
@@ -957,7 +961,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         ):
             in_(b, row)
 
-    def test_columnclause_schema_column_three(self):
+    def test_columnclause_schema_column_three(self, connection):
         keyed2 = self.tables.keyed2
 
         # originally addressed by [ticket:2932], however liberalized
@@ -965,7 +969,7 @@ class KeyTargetingTest(fixtures.TablesTest):
 
         a, b = sql.column("a"), sql.column("b")
         stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         with testing.expect_deprecated(
             "Retreiving row values using Column objects "
@@ -1000,7 +1004,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         ):
             in_(stmt.c.b, row)
 
-    def test_columnclause_schema_column_four(self):
+    def test_columnclause_schema_column_four(self, connection):
         keyed2 = self.tables.keyed2
 
         # this is also addressed by [ticket:2932]
@@ -1009,7 +1013,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
             a, b
         )
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         with testing.expect_deprecated(
             "Retreiving row values using Column objects "
@@ -1034,7 +1038,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         ):
             in_(stmt.c.keyed2_b, row)
 
-    def test_columnclause_schema_column_five(self):
+    def test_columnclause_schema_column_five(self, connection):
         keyed2 = self.tables.keyed2
 
         # this is also addressed by [ticket:2932]
@@ -1042,7 +1046,7 @@ class KeyTargetingTest(fixtures.TablesTest):
         stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
             keyed2_a=CHAR, keyed2_b=CHAR
         )
-        row = testing.db.execute(stmt).first()
+        row = connection.execute(stmt).first()
 
         with testing.expect_deprecated(
             "Retreiving row values using Column objects "
@@ -1115,12 +1119,12 @@ class ResultProxyTest(fixtures.TablesTest):
                 dict(user_id=2, user_name="jack"),
             )
 
-    def test_column_accessor_textual_select(self):
+    def test_column_accessor_textual_select(self, connection):
         users = self.tables.users
 
         # this will create column() objects inside
         # the select(), these need to match on name anyway
-        r = testing.db.execute(
+        r = connection.execute(
             select([column("user_id"), column("user_name")])
             .select_from(table("users"))
             .where(text("user_id=2"))
@@ -1147,10 +1151,10 @@ class ResultProxyTest(fixtures.TablesTest):
         ):
             eq_(r._mapping[users.c.user_name], "jack")
 
-    def test_column_accessor_basic_text(self):
+    def test_column_accessor_basic_text(self, connection):
         users = self.tables.users
 
-        r = testing.db.execute(
+        r = connection.execute(
             text("select * from users where user_id=2")
         ).first()
 
@@ -1185,13 +1189,13 @@ class ResultProxyTest(fixtures.TablesTest):
             eq_(r._mapping[users.c.user_name], "jack")
 
     @testing.provide_metadata
-    def test_column_label_overlap_fallback(self):
+    def test_column_label_overlap_fallback(self, connection):
         content = Table("content", self.metadata, Column("type", String(30)))
         bar = Table("bar", self.metadata, Column("content_type", String(30)))
         self.metadata.create_all(testing.db)
-        testing.db.execute(content.insert().values(type="t1"))
+        connection.execute(content.insert().values(type="t1"))
 
-        row = testing.db.execute(content.select(use_labels=True)).first()
+        row = connection.execute(content.select(use_labels=True)).first()
         in_(content.c.type, row._mapping)
         not_in_(bar.c.content_type, row)
         with testing.expect_deprecated(
@@ -1200,7 +1204,7 @@ class ResultProxyTest(fixtures.TablesTest):
         ):
             in_(sql.column("content_type"), row)
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([content.c.type.label("content_type")])
         ).first()
         with testing.expect_deprecated(
@@ -1217,7 +1221,7 @@ class ResultProxyTest(fixtures.TablesTest):
         ):
             in_(sql.column("content_type"), row)
 
-        row = testing.db.execute(
+        row = connection.execute(
             select([func.now().label("content_type")])
         ).first()
 
@@ -1243,80 +1247,80 @@ class ResultProxyTest(fixtures.TablesTest):
                 {"user_id": 9, "user_name": "fred"},
             )
 
-        for pickle in False, True:
-            for use_labels in False, True:
-                result = (
-                    users.select(use_labels=use_labels)
-                    .order_by(users.c.user_id)
-                    .execute()
-                    .fetchall()
-                )
+            for pickle in False, True:
+                for use_labels in False, True:
+                    result = conn.execute(
+                        users.select(use_labels=use_labels).order_by(
+                            users.c.user_id
+                        )
+                    ).fetchall()
+
+                    if pickle:
+                        result = util.pickle.loads(util.pickle.dumps(result))
+
+                    if pickle:
+                        with testing.expect_deprecated(
+                            "Retreiving row values using Column objects "
+                            "from a row that was unpickled"
+                        ):
+                            eq_(result[0]._mapping[users.c.user_id], 7)
+
+                        result[0]._keymap.pop(users.c.user_id)
+                        with testing.expect_deprecated(
+                            "Retreiving row values using Column objects "
+                            "from a row that was unpickled"
+                        ):
+                            eq_(result[0]._mapping[users.c.user_id], 7)
+
+                        with testing.expect_deprecated(
+                            "Retreiving row values using Column objects "
+                            "from a row that was unpickled"
+                        ):
+                            eq_(result[0]._mapping[users.c.user_name], "jack")
+
+                        result[0]._keymap.pop(users.c.user_name)
+                        with testing.expect_deprecated(
+                            "Retreiving row values using Column objects "
+                            "from a row that was unpickled"
+                        ):
+                            eq_(result[0]._mapping[users.c.user_name], "jack")
+
+                    if not pickle or use_labels:
+                        assert_raises(
+                            exc.NoSuchColumnError,
+                            lambda: result[0][addresses.c.user_id],
+                        )
+
+                        assert_raises(
+                            exc.NoSuchColumnError,
+                            lambda: result[0]._mapping[addresses.c.user_id],
+                        )
+                    else:
+                        # test with a different table.  name resolution is
+                        # causing 'user_id' to match when use_labels wasn't
+                        # used.
+                        with testing.expect_deprecated(
+                            "Retreiving row values using Column objects "
+                            "from a row that was unpickled"
+                        ):
+                            eq_(result[0]._mapping[addresses.c.user_id], 7)
+
+                        result[0]._keymap.pop(addresses.c.user_id)
+                        with testing.expect_deprecated(
+                            "Retreiving row values using Column objects "
+                            "from a row that was unpickled"
+                        ):
+                            eq_(result[0]._mapping[addresses.c.user_id], 7)
 
-                if pickle:
-                    result = util.pickle.loads(util.pickle.dumps(result))
-
-                if pickle:
-                    with testing.expect_deprecated(
-                        "Retreiving row values using Column objects "
-                        "from a row that was unpickled"
-                    ):
-                        eq_(result[0]._mapping[users.c.user_id], 7)
-
-                    result[0]._keymap.pop(users.c.user_id)
-                    with testing.expect_deprecated(
-                        "Retreiving row values using Column objects "
-                        "from a row that was unpickled"
-                    ):
-                        eq_(result[0]._mapping[users.c.user_id], 7)
-
-                    with testing.expect_deprecated(
-                        "Retreiving row values using Column objects "
-                        "from a row that was unpickled"
-                    ):
-                        eq_(result[0]._mapping[users.c.user_name], "jack")
-
-                    result[0]._keymap.pop(users.c.user_name)
-                    with testing.expect_deprecated(
-                        "Retreiving row values using Column objects "
-                        "from a row that was unpickled"
-                    ):
-                        eq_(result[0]._mapping[users.c.user_name], "jack")
-
-                if not pickle or use_labels:
                     assert_raises(
                         exc.NoSuchColumnError,
-                        lambda: result[0][addresses.c.user_id],
+                        lambda: result[0][addresses.c.address_id],
                     )
 
                     assert_raises(
                         exc.NoSuchColumnError,
-                        lambda: result[0]._mapping[addresses.c.user_id],
+                        lambda: result[0]._mapping[addresses.c.address_id],
                     )
-                else:
-                    # test with a different table.  name resolution is
-                    # causing 'user_id' to match when use_labels wasn't used.
-                    with testing.expect_deprecated(
-                        "Retreiving row values using Column objects "
-                        "from a row that was unpickled"
-                    ):
-                        eq_(result[0]._mapping[addresses.c.user_id], 7)
-
-                    result[0]._keymap.pop(addresses.c.user_id)
-                    with testing.expect_deprecated(
-                        "Retreiving row values using Column objects "
-                        "from a row that was unpickled"
-                    ):
-                        eq_(result[0]._mapping[addresses.c.user_id], 7)
-
-                assert_raises(
-                    exc.NoSuchColumnError,
-                    lambda: result[0][addresses.c.address_id],
-                )
-
-                assert_raises(
-                    exc.NoSuchColumnError,
-                    lambda: result[0]._mapping[addresses.c.address_id],
-                )
 
     @testing.requires.duplicate_names_in_cursor_description
     def test_ambiguous_column_case_sensitive(self):
@@ -1325,105 +1329,111 @@ class ResultProxyTest(fixtures.TablesTest):
         ):
             eng = engines.testing_engine(options=dict(case_sensitive=False))
 
-        row = eng.execute(
-            select(
-                [
-                    literal_column("1").label("SOMECOL"),
-                    literal_column("1").label("SOMECOL"),
-                ]
-            )
-        ).first()
+        with eng.connect() as conn:
+            row = conn.execute(
+                select(
+                    [
+                        literal_column("1").label("SOMECOL"),
+                        literal_column("1").label("SOMECOL"),
+                    ]
+                )
+            ).first()
 
-        assert_raises_message(
-            exc.InvalidRequestError,
-            "Ambiguous column name",
-            lambda: row._mapping["somecol"],
-        )
+            assert_raises_message(
+                exc.InvalidRequestError,
+                "Ambiguous column name",
+                lambda: row._mapping["somecol"],
+            )
 
-    def test_row_getitem_string(self):
-        with testing.db.connect() as conn:
-            col = literal_column("1").label("foo")
-            row = conn.execute(select([col])).first()
+    def test_row_getitem_string(self, connection):
+        col = literal_column("1").label("foo")
+        row = connection.execute(select([col])).first()
 
-            with testing.expect_deprecated(
-                "Using non-integer/slice indices on Row is deprecated "
-                "and will be removed in version 2.0;"
-            ):
-                eq_(row["foo"], 1)
+        with testing.expect_deprecated(
+            "Using non-integer/slice indices on Row is deprecated "
+            "and will be removed in version 2.0;"
+        ):
+            eq_(row["foo"], 1)
 
-            eq_(row._mapping["foo"], 1)
+        eq_(row._mapping["foo"], 1)
 
-    def test_row_getitem_column(self):
-        with testing.db.connect() as conn:
-            col = literal_column("1").label("foo")
-            row = conn.execute(select([col])).first()
+    def test_row_getitem_column(self, connection):
+        col = literal_column("1").label("foo")
+        row = connection.execute(select([col])).first()
 
-            with testing.expect_deprecated(
-                "Using non-integer/slice indices on Row is deprecated "
-                "and will be removed in version 2.0;"
-            ):
-                eq_(row[col], 1)
+        with testing.expect_deprecated(
+            "Using non-integer/slice indices on Row is deprecated "
+            "and will be removed in version 2.0;"
+        ):
+            eq_(row[col], 1)
 
-            eq_(row._mapping[col], 1)
+        eq_(row._mapping[col], 1)
 
     def test_row_case_insensitive(self):
         with testing.expect_deprecated(
             "The create_engine.case_sensitive parameter is deprecated"
         ):
-            ins_db = engines.testing_engine(options={"case_sensitive": False})
-        row = ins_db.execute(
-            select(
-                [
-                    literal_column("1").label("case_insensitive"),
-                    literal_column("2").label("CaseSensitive"),
-                ]
-            )
-        ).first()
+            with engines.testing_engine(
+                options={"case_sensitive": False}
+            ).connect() as ins_conn:
+                row = ins_conn.execute(
+                    select(
+                        [
+                            literal_column("1").label("case_insensitive"),
+                            literal_column("2").label("CaseSensitive"),
+                        ]
+                    )
+                ).first()
 
-        eq_(list(row._mapping.keys()), ["case_insensitive", "CaseSensitive"])
+                eq_(
+                    list(row._mapping.keys()),
+                    ["case_insensitive", "CaseSensitive"],
+                )
 
-        in_("case_insensitive", row._keymap)
-        in_("CaseSensitive", row._keymap)
-        in_("casesensitive", row._keymap)
+                in_("case_insensitive", row._keymap)
+                in_("CaseSensitive", row._keymap)
+                in_("casesensitive", row._keymap)
 
-        eq_(row._mapping["case_insensitive"], 1)
-        eq_(row._mapping["CaseSensitive"], 2)
-        eq_(row._mapping["Case_insensitive"], 1)
-        eq_(row._mapping["casesensitive"], 2)
+                eq_(row._mapping["case_insensitive"], 1)
+                eq_(row._mapping["CaseSensitive"], 2)
+                eq_(row._mapping["Case_insensitive"], 1)
+                eq_(row._mapping["casesensitive"], 2)
 
     def test_row_case_insensitive_unoptimized(self):
         with testing.expect_deprecated(
             "The create_engine.case_sensitive parameter is deprecated"
         ):
-            ins_db = engines.testing_engine(options={"case_sensitive": False})
-        row = ins_db.execute(
-            select(
-                [
-                    literal_column("1").label("case_insensitive"),
-                    literal_column("2").label("CaseSensitive"),
-                    text("3 AS screw_up_the_cols"),
-                ]
-            )
-        ).first()
+            with engines.testing_engine(
+                options={"case_sensitive": False}
+            ).connect() as ins_conn:
+                row = ins_conn.execute(
+                    select(
+                        [
+                            literal_column("1").label("case_insensitive"),
+                            literal_column("2").label("CaseSensitive"),
+                            text("3 AS screw_up_the_cols"),
+                        ]
+                    )
+                ).first()
 
-        eq_(
-            list(row._mapping.keys()),
-            ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
-        )
+                eq_(
+                    list(row._mapping.keys()),
+                    ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
+                )
 
-        in_("case_insensitive", row._keymap)
-        in_("CaseSensitive", row._keymap)
-        in_("casesensitive", row._keymap)
+                in_("case_insensitive", row._keymap)
+                in_("CaseSensitive", row._keymap)
+                in_("casesensitive", row._keymap)
 
-        eq_(row._mapping["case_insensitive"], 1)
-        eq_(row._mapping["CaseSensitive"], 2)
-        eq_(row._mapping["screw_up_the_cols"], 3)
-        eq_(row._mapping["Case_insensitive"], 1)
-        eq_(row._mapping["casesensitive"], 2)
-        eq_(row._mapping["screw_UP_the_cols"], 3)
+                eq_(row._mapping["case_insensitive"], 1)
+                eq_(row._mapping["CaseSensitive"], 2)
+                eq_(row._mapping["screw_up_the_cols"], 3)
+                eq_(row._mapping["Case_insensitive"], 1)
+                eq_(row._mapping["casesensitive"], 2)
+                eq_(row._mapping["screw_UP_the_cols"], 3)
 
-    def test_row_keys_deprecated(self):
-        r = testing.db.execute(
+    def test_row_keys_deprecated(self, connection):
+        r = connection.execute(
             text("select * from users where user_id=2")
         ).first()
 
@@ -1432,8 +1442,8 @@ class ResultProxyTest(fixtures.TablesTest):
         ):
             eq_(r.keys(), ["user_id", "user_name"])
 
-    def test_row_contains_key_deprecated(self):
-        r = testing.db.execute(
+    def test_row_contains_key_deprecated(self, connection):
+        r = connection.execute(
             text("select * from users where user_id=2")
         ).first()
 
@@ -1472,11 +1482,13 @@ class PositionalTextTest(fixtures.TablesTest):
 
     @classmethod
     def insert_data(cls):
-        cls.tables.text1.insert().execute(
-            [dict(a="a1", b="b1", c="c1", d="d1")]
-        )
+        with testing.db.connect() as conn:
+            conn.execute(
+                cls.tables.text1.insert(),
+                [dict(a="a1", b="b1", c="c1", d="d1")],
+            )
 
-    def test_anon_aliased_overlapping(self):
+    def test_anon_aliased_overlapping(self, connection):
         text1 = self.tables.text1
 
         c1 = text1.c.a.label(None)
@@ -1485,7 +1497,7 @@ class PositionalTextTest(fixtures.TablesTest):
         c4 = text1.c.a.label(None)
 
         stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         with testing.expect_deprecated(
@@ -1494,7 +1506,7 @@ class PositionalTextTest(fixtures.TablesTest):
         ):
             eq_(row._mapping[text1.c.a], "a1")
 
-    def test_anon_aliased_unique(self):
+    def test_anon_aliased_unique(self, connection):
         text1 = self.tables.text1
 
         c1 = text1.c.a.label(None)
@@ -1503,7 +1515,7 @@ class PositionalTextTest(fixtures.TablesTest):
         c4 = text1.alias().c.d.label(None)
 
         stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
-        result = testing.db.execute(stmt)
+        result = connection.execute(stmt)
         row = result.first()
 
         eq_(row._mapping[c1], "a1")