From: Gord Thompson Date: Sun, 12 Apr 2020 21:27:06 +0000 (-0600) Subject: Clean up .execute in test/sql/test_deprecations.py X-Git-Tag: rel_1_4_0b1~386^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a731c005b84517fe6f857467151ce9a3aa60119c;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Clean up .execute in test/sql/test_deprecations.py Change-Id: I488f0992d5f26e164a903cbced11422046403647 --- diff --git a/test/sql/test_deprecations.py b/test/sql/test_deprecations.py index 7c115789bb..13d4cd1547 100644 --- a/test/sql/test_deprecations.py +++ b/test/sql/test_deprecations.py @@ -880,20 +880,24 @@ class KeyTargetingTest(fixtures.TablesTest): @classmethod def insert_data(cls): - cls.tables.keyed1.insert().execute(dict(b="a1", q="c1")) - cls.tables.keyed2.insert().execute(dict(a="a2", b="b2")) - cls.tables.keyed3.insert().execute(dict(a="a3", d="d3")) - cls.tables.keyed4.insert().execute(dict(b="b4", q="q4")) - cls.tables.content.insert().execute(type="t1") - - if testing.requires.schemas.enabled: - cls.tables[ - "%s.wschema" % testing.config.test_schema - ].insert().execute(dict(b="a1", q="c1")) + with testing.db.connect() as conn: + conn.execute(cls.tables.keyed1.insert(), dict(b="a1", q="c1")) + conn.execute(cls.tables.keyed2.insert(), dict(a="a2", b="b2")) + conn.execute(cls.tables.keyed3.insert(), dict(a="a3", d="d3")) + conn.execute(cls.tables.keyed4.insert(), dict(b="b4", q="q4")) + conn.execute(cls.tables.content.insert(), type="t1") + + if testing.requires.schemas.enabled: + conn.execute( + cls.tables[ + "%s.wschema" % testing.config.test_schema + ].insert(), + dict(b="a1", q="c1"), + ) - def test_column_label_overlap_fallback(self): + def test_column_label_overlap_fallback(self, connection): content, bar = self.tables.content, self.tables.bar - row = testing.db.execute( + row = connection.execute( select([content.c.type.label("content_type")]) ).first() @@ -906,7 +910,7 @@ class KeyTargetingTest(fixtures.TablesTest): ): in_(sql.column("content_type"), row) - row = testing.db.execute( + row = connection.execute( select([func.now().label("content_type")]) ).first() not_in_(content.c.type, row) @@ -917,7 +921,7 @@ class KeyTargetingTest(fixtures.TablesTest): ): in_(sql.column("content_type"), row) - def test_columnclause_schema_column_one(self): + def test_columnclause_schema_column_one(self, connection): keyed2 = self.tables.keyed2 # this is addressed by [ticket:2932] @@ -926,7 +930,7 @@ class KeyTargetingTest(fixtures.TablesTest): # cols, which results in a more liberal comparison scheme a, b = sql.column("a"), sql.column("b") stmt = select([a, b]).select_from(table("keyed2")) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() with testing.expect_deprecated( "Retreiving row values using Column objects " @@ -939,12 +943,12 @@ class KeyTargetingTest(fixtures.TablesTest): ): in_(keyed2.c.b, row) - def test_columnclause_schema_column_two(self): + def test_columnclause_schema_column_two(self, connection): keyed2 = self.tables.keyed2 a, b = sql.column("a"), sql.column("b") stmt = select([keyed2.c.a, keyed2.c.b]) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() with testing.expect_deprecated( "Retreiving row values using Column objects " @@ -957,7 +961,7 @@ class KeyTargetingTest(fixtures.TablesTest): ): in_(b, row) - def test_columnclause_schema_column_three(self): + def test_columnclause_schema_column_three(self, connection): keyed2 = self.tables.keyed2 # originally addressed by [ticket:2932], however liberalized @@ -965,7 +969,7 @@ class KeyTargetingTest(fixtures.TablesTest): a, b = sql.column("a"), sql.column("b") stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() with testing.expect_deprecated( "Retreiving row values using Column objects " @@ -1000,7 +1004,7 @@ class KeyTargetingTest(fixtures.TablesTest): ): in_(stmt.c.b, row) - def test_columnclause_schema_column_four(self): + def test_columnclause_schema_column_four(self, connection): keyed2 = self.tables.keyed2 # this is also addressed by [ticket:2932] @@ -1009,7 +1013,7 @@ class KeyTargetingTest(fixtures.TablesTest): stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( a, b ) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() with testing.expect_deprecated( "Retreiving row values using Column objects " @@ -1034,7 +1038,7 @@ class KeyTargetingTest(fixtures.TablesTest): ): in_(stmt.c.keyed2_b, row) - def test_columnclause_schema_column_five(self): + def test_columnclause_schema_column_five(self, connection): keyed2 = self.tables.keyed2 # this is also addressed by [ticket:2932] @@ -1042,7 +1046,7 @@ class KeyTargetingTest(fixtures.TablesTest): stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( keyed2_a=CHAR, keyed2_b=CHAR ) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() with testing.expect_deprecated( "Retreiving row values using Column objects " @@ -1115,12 +1119,12 @@ class ResultProxyTest(fixtures.TablesTest): dict(user_id=2, user_name="jack"), ) - def test_column_accessor_textual_select(self): + def test_column_accessor_textual_select(self, connection): users = self.tables.users # this will create column() objects inside # the select(), these need to match on name anyway - r = testing.db.execute( + r = connection.execute( select([column("user_id"), column("user_name")]) .select_from(table("users")) .where(text("user_id=2")) @@ -1147,10 +1151,10 @@ class ResultProxyTest(fixtures.TablesTest): ): eq_(r._mapping[users.c.user_name], "jack") - def test_column_accessor_basic_text(self): + def test_column_accessor_basic_text(self, connection): users = self.tables.users - r = testing.db.execute( + r = connection.execute( text("select * from users where user_id=2") ).first() @@ -1185,13 +1189,13 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r._mapping[users.c.user_name], "jack") @testing.provide_metadata - def test_column_label_overlap_fallback(self): + def test_column_label_overlap_fallback(self, connection): content = Table("content", self.metadata, Column("type", String(30))) bar = Table("bar", self.metadata, Column("content_type", String(30))) self.metadata.create_all(testing.db) - testing.db.execute(content.insert().values(type="t1")) + connection.execute(content.insert().values(type="t1")) - row = testing.db.execute(content.select(use_labels=True)).first() + row = connection.execute(content.select(use_labels=True)).first() in_(content.c.type, row._mapping) not_in_(bar.c.content_type, row) with testing.expect_deprecated( @@ -1200,7 +1204,7 @@ class ResultProxyTest(fixtures.TablesTest): ): in_(sql.column("content_type"), row) - row = testing.db.execute( + row = connection.execute( select([content.c.type.label("content_type")]) ).first() with testing.expect_deprecated( @@ -1217,7 +1221,7 @@ class ResultProxyTest(fixtures.TablesTest): ): in_(sql.column("content_type"), row) - row = testing.db.execute( + row = connection.execute( select([func.now().label("content_type")]) ).first() @@ -1243,80 +1247,80 @@ class ResultProxyTest(fixtures.TablesTest): {"user_id": 9, "user_name": "fred"}, ) - for pickle in False, True: - for use_labels in False, True: - result = ( - users.select(use_labels=use_labels) - .order_by(users.c.user_id) - .execute() - .fetchall() - ) + for pickle in False, True: + for use_labels in False, True: + result = conn.execute( + users.select(use_labels=use_labels).order_by( + users.c.user_id + ) + ).fetchall() + + if pickle: + result = util.pickle.loads(util.pickle.dumps(result)) + + if pickle: + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0]._mapping[users.c.user_id], 7) + + result[0]._keymap.pop(users.c.user_id) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0]._mapping[users.c.user_id], 7) + + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0]._mapping[users.c.user_name], "jack") + + result[0]._keymap.pop(users.c.user_name) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0]._mapping[users.c.user_name], "jack") + + if not pickle or use_labels: + assert_raises( + exc.NoSuchColumnError, + lambda: result[0][addresses.c.user_id], + ) + + assert_raises( + exc.NoSuchColumnError, + lambda: result[0]._mapping[addresses.c.user_id], + ) + else: + # test with a different table. name resolution is + # causing 'user_id' to match when use_labels wasn't + # used. + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0]._mapping[addresses.c.user_id], 7) + + result[0]._keymap.pop(addresses.c.user_id) + with testing.expect_deprecated( + "Retreiving row values using Column objects " + "from a row that was unpickled" + ): + eq_(result[0]._mapping[addresses.c.user_id], 7) - if pickle: - result = util.pickle.loads(util.pickle.dumps(result)) - - if pickle: - with testing.expect_deprecated( - "Retreiving row values using Column objects " - "from a row that was unpickled" - ): - eq_(result[0]._mapping[users.c.user_id], 7) - - result[0]._keymap.pop(users.c.user_id) - with testing.expect_deprecated( - "Retreiving row values using Column objects " - "from a row that was unpickled" - ): - eq_(result[0]._mapping[users.c.user_id], 7) - - with testing.expect_deprecated( - "Retreiving row values using Column objects " - "from a row that was unpickled" - ): - eq_(result[0]._mapping[users.c.user_name], "jack") - - result[0]._keymap.pop(users.c.user_name) - with testing.expect_deprecated( - "Retreiving row values using Column objects " - "from a row that was unpickled" - ): - eq_(result[0]._mapping[users.c.user_name], "jack") - - if not pickle or use_labels: assert_raises( exc.NoSuchColumnError, - lambda: result[0][addresses.c.user_id], + lambda: result[0][addresses.c.address_id], ) assert_raises( exc.NoSuchColumnError, - lambda: result[0]._mapping[addresses.c.user_id], + lambda: result[0]._mapping[addresses.c.address_id], ) - else: - # test with a different table. name resolution is - # causing 'user_id' to match when use_labels wasn't used. - with testing.expect_deprecated( - "Retreiving row values using Column objects " - "from a row that was unpickled" - ): - eq_(result[0]._mapping[addresses.c.user_id], 7) - - result[0]._keymap.pop(addresses.c.user_id) - with testing.expect_deprecated( - "Retreiving row values using Column objects " - "from a row that was unpickled" - ): - eq_(result[0]._mapping[addresses.c.user_id], 7) - - assert_raises( - exc.NoSuchColumnError, - lambda: result[0][addresses.c.address_id], - ) - - assert_raises( - exc.NoSuchColumnError, - lambda: result[0]._mapping[addresses.c.address_id], - ) @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column_case_sensitive(self): @@ -1325,105 +1329,111 @@ class ResultProxyTest(fixtures.TablesTest): ): eng = engines.testing_engine(options=dict(case_sensitive=False)) - row = eng.execute( - select( - [ - literal_column("1").label("SOMECOL"), - literal_column("1").label("SOMECOL"), - ] - ) - ).first() + with eng.connect() as conn: + row = conn.execute( + select( + [ + literal_column("1").label("SOMECOL"), + literal_column("1").label("SOMECOL"), + ] + ) + ).first() - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row._mapping["somecol"], - ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row._mapping["somecol"], + ) - def test_row_getitem_string(self): - with testing.db.connect() as conn: - col = literal_column("1").label("foo") - row = conn.execute(select([col])).first() + def test_row_getitem_string(self, connection): + col = literal_column("1").label("foo") + row = connection.execute(select([col])).first() - with testing.expect_deprecated( - "Using non-integer/slice indices on Row is deprecated " - "and will be removed in version 2.0;" - ): - eq_(row["foo"], 1) + with testing.expect_deprecated( + "Using non-integer/slice indices on Row is deprecated " + "and will be removed in version 2.0;" + ): + eq_(row["foo"], 1) - eq_(row._mapping["foo"], 1) + eq_(row._mapping["foo"], 1) - def test_row_getitem_column(self): - with testing.db.connect() as conn: - col = literal_column("1").label("foo") - row = conn.execute(select([col])).first() + def test_row_getitem_column(self, connection): + col = literal_column("1").label("foo") + row = connection.execute(select([col])).first() - with testing.expect_deprecated( - "Using non-integer/slice indices on Row is deprecated " - "and will be removed in version 2.0;" - ): - eq_(row[col], 1) + with testing.expect_deprecated( + "Using non-integer/slice indices on Row is deprecated " + "and will be removed in version 2.0;" + ): + eq_(row[col], 1) - eq_(row._mapping[col], 1) + eq_(row._mapping[col], 1) def test_row_case_insensitive(self): with testing.expect_deprecated( "The create_engine.case_sensitive parameter is deprecated" ): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select( - [ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - ] - ) - ).first() + with engines.testing_engine( + options={"case_sensitive": False} + ).connect() as ins_conn: + row = ins_conn.execute( + select( + [ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + ] + ) + ).first() - eq_(list(row._mapping.keys()), ["case_insensitive", "CaseSensitive"]) + eq_( + list(row._mapping.keys()), + ["case_insensitive", "CaseSensitive"], + ) - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) + in_("case_insensitive", row._keymap) + in_("CaseSensitive", row._keymap) + in_("casesensitive", row._keymap) - eq_(row._mapping["case_insensitive"], 1) - eq_(row._mapping["CaseSensitive"], 2) - eq_(row._mapping["Case_insensitive"], 1) - eq_(row._mapping["casesensitive"], 2) + eq_(row._mapping["case_insensitive"], 1) + eq_(row._mapping["CaseSensitive"], 2) + eq_(row._mapping["Case_insensitive"], 1) + eq_(row._mapping["casesensitive"], 2) def test_row_case_insensitive_unoptimized(self): with testing.expect_deprecated( "The create_engine.case_sensitive parameter is deprecated" ): - ins_db = engines.testing_engine(options={"case_sensitive": False}) - row = ins_db.execute( - select( - [ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols"), - ] - ) - ).first() + with engines.testing_engine( + options={"case_sensitive": False} + ).connect() as ins_conn: + row = ins_conn.execute( + select( + [ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + text("3 AS screw_up_the_cols"), + ] + ) + ).first() - eq_( - list(row._mapping.keys()), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], - ) + eq_( + list(row._mapping.keys()), + ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], + ) - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - in_("casesensitive", row._keymap) + in_("case_insensitive", row._keymap) + in_("CaseSensitive", row._keymap) + in_("casesensitive", row._keymap) - eq_(row._mapping["case_insensitive"], 1) - eq_(row._mapping["CaseSensitive"], 2) - eq_(row._mapping["screw_up_the_cols"], 3) - eq_(row._mapping["Case_insensitive"], 1) - eq_(row._mapping["casesensitive"], 2) - eq_(row._mapping["screw_UP_the_cols"], 3) + eq_(row._mapping["case_insensitive"], 1) + eq_(row._mapping["CaseSensitive"], 2) + eq_(row._mapping["screw_up_the_cols"], 3) + eq_(row._mapping["Case_insensitive"], 1) + eq_(row._mapping["casesensitive"], 2) + eq_(row._mapping["screw_UP_the_cols"], 3) - def test_row_keys_deprecated(self): - r = testing.db.execute( + def test_row_keys_deprecated(self, connection): + r = connection.execute( text("select * from users where user_id=2") ).first() @@ -1432,8 +1442,8 @@ class ResultProxyTest(fixtures.TablesTest): ): eq_(r.keys(), ["user_id", "user_name"]) - def test_row_contains_key_deprecated(self): - r = testing.db.execute( + def test_row_contains_key_deprecated(self, connection): + r = connection.execute( text("select * from users where user_id=2") ).first() @@ -1472,11 +1482,13 @@ class PositionalTextTest(fixtures.TablesTest): @classmethod def insert_data(cls): - cls.tables.text1.insert().execute( - [dict(a="a1", b="b1", c="c1", d="d1")] - ) + with testing.db.connect() as conn: + conn.execute( + cls.tables.text1.insert(), + [dict(a="a1", b="b1", c="c1", d="d1")], + ) - def test_anon_aliased_overlapping(self): + def test_anon_aliased_overlapping(self, connection): text1 = self.tables.text1 c1 = text1.c.a.label(None) @@ -1485,7 +1497,7 @@ class PositionalTextTest(fixtures.TablesTest): c4 = text1.c.a.label(None) stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() with testing.expect_deprecated( @@ -1494,7 +1506,7 @@ class PositionalTextTest(fixtures.TablesTest): ): eq_(row._mapping[text1.c.a], "a1") - def test_anon_aliased_unique(self): + def test_anon_aliased_unique(self, connection): text1 = self.tables.text1 c1 = text1.c.a.label(None) @@ -1503,7 +1515,7 @@ class PositionalTextTest(fixtures.TablesTest): c4 = text1.alias().c.d.label(None) stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c1], "a1")