From: Gord Thompson Date: Sun, 12 Apr 2020 17:58:58 +0000 (-0600) Subject: Clean up .execute in test/sql/test_resultset.py X-Git-Tag: rel_1_4_0b1~387^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=6eb11d877799cc42983c011ff16730af174b71e5;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Clean up .execute in test/sql/test_resultset.py Change-Id: Ie4bb2b8e94361d15a3f1f1c21ce1c59cff1cf735 --- diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 253ad7b38e..f8d245228a 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -90,29 +90,31 @@ class ResultProxyTest(fixtures.TablesTest): test_needs_acid=True, ) - def test_row_iteration(self): + def test_row_iteration(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), {"user_id": 7, "user_name": "jack"}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9, "user_name": "fred"}, ) - r = users.select().execute() + r = connection.execute(users.select()) rows = [] for row in r: rows.append(row) eq_(len(rows), 3) - def test_row_next(self): + def test_row_next(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), {"user_id": 7, "user_name": "jack"}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9, "user_name": "fred"}, ) - r = users.select().execute() + r = connection.execute(users.select()) rows = [] while True: row = next(r, "foo") @@ -122,10 +124,11 @@ class ResultProxyTest(fixtures.TablesTest): eq_(len(rows), 3) @testing.requires.subqueries - def test_anonymous_rows(self): + def test_anonymous_rows(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), {"user_id": 7, "user_name": "jack"}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9, "user_name": "fred"}, @@ -136,15 +139,17 @@ class ResultProxyTest(fixtures.TablesTest): .where(users.c.user_name == "jack") .scalar_subquery() ) - for row in select([sel + 1, sel + 3], bind=users.bind).execute(): + for row in connection.execute( + select([sel + 1, sel + 3], bind=users.bind) + ): eq_(row._mapping["anon_1"], 8) eq_(row._mapping["anon_2"], 10) - def test_row_comparison(self): + def test_row_comparison(self, connection): users = self.tables.users - users.insert().execute(user_id=7, user_name="jack") - rp = users.select().execute().first() + connection.execute(users.insert(), user_id=7, user_name="jack") + rp = connection.execute(users.select()).first() eq_(rp, rp) is_(not (rp != rp), True) @@ -192,19 +197,19 @@ class ResultProxyTest(fixtures.TablesTest): eq_(control, op(compare, rp)) @testing.provide_metadata - def test_column_label_overlap_fallback(self): + def test_column_label_overlap_fallback(self, connection): content = Table("content", self.metadata, Column("type", String(30))) bar = Table("bar", self.metadata, Column("content_type", String(30))) - self.metadata.create_all(testing.db) - testing.db.execute(content.insert().values(type="t1")) + self.metadata.create_all(connection) + connection.execute(content.insert().values(type="t1")) - row = testing.db.execute(content.select(use_labels=True)).first() + row = connection.execute(content.select(use_labels=True)).first() in_(content.c.type, row._mapping) not_in_(bar.c.content_type, row._mapping) not_in_(bar.c.content_type, row._mapping) - row = testing.db.execute( + row = connection.execute( select([func.now().label("content_type")]) ).first() @@ -212,10 +217,11 @@ class ResultProxyTest(fixtures.TablesTest): not_in_(bar.c.content_type, row._mapping) - def test_pickled_rows(self): + def test_pickled_rows(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), {"user_id": 7, "user_name": "jack"}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9, "user_name": "fred"}, @@ -223,12 +229,11 @@ class ResultProxyTest(fixtures.TablesTest): for pickle in False, True: for use_labels in False, True: - result = ( - users.select(use_labels=use_labels) - .order_by(users.c.user_id) - .execute() - .fetchall() - ) + result = connection.execute( + users.select(use_labels=use_labels).order_by( + users.c.user_id + ) + ).fetchall() if pickle: result = util.pickle.loads(util.pickle.dumps(result)) @@ -255,8 +260,8 @@ class ResultProxyTest(fixtures.TablesTest): lambda: result[0]._mapping["fake key"], ) - def test_column_error_printing(self): - result = testing.db.execute(select([1])) + def test_column_error_printing(self, connection): + result = connection.execute(select([1])) row = result.first() class unprintable(object): @@ -283,42 +288,45 @@ class ResultProxyTest(fixtures.TablesTest): lambda: row._mapping[accessor], ) - def test_fetchmany(self): + def test_fetchmany(self, connection): users = self.tables.users - users.insert().execute(user_id=7, user_name="jack") - users.insert().execute(user_id=8, user_name="ed") - users.insert().execute(user_id=9, user_name="fred") - r = users.select().execute() + connection.execute(users.insert(), user_id=7, user_name="jack") + connection.execute(users.insert(), user_id=8, user_name="ed") + connection.execute(users.insert(), user_id=9, user_name="fred") + r = connection.execute(users.select()) rows = [] for row in r.fetchmany(size=2): rows.append(row) eq_(len(rows), 2) - def test_column_slices(self): + def test_column_slices(self, connection): users = self.tables.users addresses = self.tables.addresses - users.insert().execute(user_id=1, user_name="john") - users.insert().execute(user_id=2, user_name="jack") - addresses.insert().execute( - address_id=1, user_id=2, address="foo@bar.com" + connection.execute(users.insert(), user_id=1, user_name="john") + connection.execute(users.insert(), user_id=2, user_name="jack") + connection.execute( + addresses.insert(), address_id=1, user_id=2, address="foo@bar.com" ) - r = text("select * from addresses", bind=testing.db).execute().first() + r = connection.execute( + text("select * from addresses", bind=testing.db) + ).first() eq_(r[0:1], (1,)) eq_(r[1:], (2, "foo@bar.com")) eq_(r[:-1], (1, 2)) - def test_column_accessor_basic_compiled_mapping(self): + def test_column_accessor_basic_compiled_mapping(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), dict(user_id=1, user_name="john"), dict(user_id=2, user_name="jack"), ) - r = users.select(users.c.user_id == 2).execute().first() + r = connection.execute(users.select(users.c.user_id == 2)).first() eq_(r.user_id, 2) eq_(r._mapping["user_id"], 2) eq_(r._mapping[users.c.user_id], 2) @@ -327,15 +335,16 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r._mapping["user_name"], "jack") eq_(r._mapping[users.c.user_name], "jack") - def test_column_accessor_basic_compiled_traditional(self): + def test_column_accessor_basic_compiled_traditional(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), dict(user_id=1, user_name="john"), dict(user_id=2, user_name="jack"), ) - r = users.select(users.c.user_id == 2).execute().first() + r = connection.execute(users.select(users.c.user_id == 2)).first() eq_(r.user_id, 2) eq_(r._mapping["user_id"], 2) @@ -345,28 +354,30 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r._mapping["user_name"], "jack") eq_(r._mapping[users.c.user_name], "jack") - def test_row_getitem_string(self): + def test_row_getitem_string(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), dict(user_id=1, user_name="john"), dict(user_id=2, user_name="jack"), ) - r = testing.db.execute( + r = connection.execute( text("select * from users where user_id=2") ).first() eq_(r._mapping["user_name"], "jack") - def test_column_accessor_basic_text(self): + def test_column_accessor_basic_text(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), dict(user_id=1, user_name="john"), dict(user_id=2, user_name="jack"), ) - r = testing.db.execute( + r = connection.execute( text("select * from users where user_id=2") ).first() @@ -379,14 +390,15 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_name, "jack") eq_(r._mapping["user_name"], "jack") - def test_column_accessor_text_colexplicit(self): + def test_column_accessor_text_colexplicit(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), dict(user_id=1, user_name="john"), dict(user_id=2, user_name="jack"), ) - r = testing.db.execute( + r = connection.execute( text("select * from users where user_id=2").columns( users.c.user_id, users.c.user_name ) @@ -400,16 +412,17 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r._mapping["user_name"], "jack") eq_(r._mapping[users.c.user_name], "jack") - def test_column_accessor_textual_select(self): + def test_column_accessor_textual_select(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), dict(user_id=1, user_name="john"), dict(user_id=2, user_name="jack"), ) # this will create column() objects inside # the select(), these need to match on name anyway - r = testing.db.execute( + r = connection.execute( select([column("user_id"), column("user_name")]) .select_from(table("users")) .where(text("user_id=2")) @@ -422,14 +435,14 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r.user_name, "jack") eq_(r._mapping["user_name"], "jack") - def test_column_accessor_dotted_union(self): + def test_column_accessor_dotted_union(self, connection): users = self.tables.users - users.insert().execute(dict(user_id=1, user_name="john")) + connection.execute(users.insert(), dict(user_id=1, user_name="john")) # test a little sqlite < 3.10.0 weirdness - with the UNION, # cols come back as "users.user_id" in cursor.description - r = testing.db.execute( + r = connection.execute( text( "select users.user_id, users.user_name " "from users " @@ -441,23 +454,20 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r._mapping["user_name"], "john") eq_(list(r._fields), ["user_id", "user_name"]) - def test_column_accessor_sqlite_raw(self): + def test_column_accessor_sqlite_raw(self, connection): users = self.tables.users - users.insert().execute(dict(user_id=1, user_name="john")) + connection.execute(users.insert(), dict(user_id=1, user_name="john")) - r = ( + r = connection.execute( text( "select users.user_id, users.user_name " "from users " "UNION select users.user_id, " "users.user_name from users", bind=testing.db, - ) - .execution_options(sqlite_raw_colnames=True) - .execute() - .first() - ) + ).execution_options(sqlite_raw_colnames=True) + ).first() if testing.against("sqlite < 3.10.0"): not_in_("user_id", r) @@ -474,12 +484,12 @@ class ResultProxyTest(fixtures.TablesTest): eq_(list(r._fields), ["user_id", "user_name"]) - def test_column_accessor_sqlite_translated(self): + def test_column_accessor_sqlite_translated(self, connection): users = self.tables.users - users.insert().execute(dict(user_id=1, user_name="john")) + connection.execute(users.insert(), dict(user_id=1, user_name="john")) - r = ( + r = connection.execute( text( "select users.user_id, users.user_name " "from users " @@ -487,9 +497,7 @@ class ResultProxyTest(fixtures.TablesTest): "users.user_name from users", bind=testing.db, ) - .execute() - .first() - ) + ).first() eq_(r._mapping["user_id"], 1) eq_(r._mapping["user_name"], "john") @@ -502,44 +510,38 @@ class ResultProxyTest(fixtures.TablesTest): eq_(list(r._fields), ["user_id", "user_name"]) - def test_column_accessor_labels_w_dots(self): + def test_column_accessor_labels_w_dots(self, connection): users = self.tables.users - users.insert().execute(dict(user_id=1, user_name="john")) + connection.execute(users.insert(), dict(user_id=1, user_name="john")) # test using literal tablename.colname - r = ( + r = connection.execute( text( 'select users.user_id AS "users.user_id", ' 'users.user_name AS "users.user_name" ' "from users", bind=testing.db, - ) - .execution_options(sqlite_raw_colnames=True) - .execute() - .first() - ) + ).execution_options(sqlite_raw_colnames=True) + ).first() eq_(r._mapping["users.user_id"], 1) eq_(r._mapping["users.user_name"], "john") not_in_("user_name", r._mapping) eq_(list(r._fields), ["users.user_id", "users.user_name"]) - def test_column_accessor_unary(self): + def test_column_accessor_unary(self, connection): users = self.tables.users - users.insert().execute(dict(user_id=1, user_name="john")) + connection.execute(users.insert(), dict(user_id=1, user_name="john")) # unary expressions - r = ( - select([users.c.user_name.distinct()]) - .order_by(users.c.user_name) - .execute() - .first() - ) + r = connection.execute( + select([users.c.user_name.distinct()]).order_by(users.c.user_name) + ).first() eq_(r._mapping[users.c.user_name], "john") eq_(r.user_name, "john") - def test_column_accessor_err(self): - r = testing.db.execute(select([1])).first() + def test_column_accessor_err(self, connection): + r = connection.execute(select([1])).first() assert_raises_message( AttributeError, "Could not locate column in row for column 'foo'", @@ -601,6 +603,7 @@ class ResultProxyTest(fixtures.TablesTest): ) def test_connectionless_autoclose_rows_exhausted(self): + # TODO: deprecate for 2.0 users = self.tables.users with testing.db.connect() as conn: conn.execute(users.insert(), dict(user_id=1, user_name="john")) @@ -615,6 +618,7 @@ class ResultProxyTest(fixtures.TablesTest): @testing.requires.returning def test_connectionless_autoclose_crud_rows_exhausted(self): + # TODO: deprecate for 2.0 users = self.tables.users stmt = ( users.insert() @@ -630,6 +634,7 @@ class ResultProxyTest(fixtures.TablesTest): assert connection.closed def test_connectionless_autoclose_no_rows(self): + # TODO: deprecate for 2.0 result = testing.db.execute(text("select * from users")) connection = result.connection assert not connection.closed @@ -638,6 +643,7 @@ class ResultProxyTest(fixtures.TablesTest): @testing.requires.updateable_autoincrement_pks def test_connectionless_autoclose_no_metadata(self): + # TODO: deprecate for 2.0 result = testing.db.execute(text("update users set user_id=5")) connection = result.connection assert connection.closed @@ -647,8 +653,8 @@ class ResultProxyTest(fixtures.TablesTest): result.fetchone, ) - def test_row_case_sensitive(self): - row = testing.db.execute( + def test_row_case_sensitive(self, connection): + row = connection.execute( select( [ literal_column("1").label("case_insensitive"), @@ -670,75 +676,80 @@ class ResultProxyTest(fixtures.TablesTest): assert_raises(KeyError, lambda: row._mapping["casesensitive"]) def test_row_case_sensitive_unoptimized(self): - ins_db = engines.testing_engine() - row = ins_db.execute( - select( - [ - literal_column("1").label("case_insensitive"), - literal_column("2").label("CaseSensitive"), - text("3 AS screw_up_the_cols"), - ] - ) - ).first() + with engines.testing_engine().connect() as ins_conn: + row = ins_conn.execute( + select( + [ + literal_column("1").label("case_insensitive"), + literal_column("2").label("CaseSensitive"), + text("3 AS screw_up_the_cols"), + ] + ) + ).first() - eq_( - list(row._fields), - ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], - ) + eq_( + list(row._fields), + ["case_insensitive", "CaseSensitive", "screw_up_the_cols"], + ) - in_("case_insensitive", row._keymap) - in_("CaseSensitive", row._keymap) - not_in_("casesensitive", row._keymap) + in_("case_insensitive", row._keymap) + in_("CaseSensitive", row._keymap) + not_in_("casesensitive", row._keymap) - eq_(row._mapping["case_insensitive"], 1) - eq_(row._mapping["CaseSensitive"], 2) - eq_(row._mapping["screw_up_the_cols"], 3) + eq_(row._mapping["case_insensitive"], 1) + eq_(row._mapping["CaseSensitive"], 2) + eq_(row._mapping["screw_up_the_cols"], 3) - assert_raises(KeyError, lambda: row._mapping["Case_insensitive"]) - assert_raises(KeyError, lambda: row._mapping["casesensitive"]) - assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"]) + assert_raises(KeyError, lambda: row._mapping["Case_insensitive"]) + assert_raises(KeyError, lambda: row._mapping["casesensitive"]) + assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"]) - def test_row_as_args(self): + def test_row_as_args(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="john") - r = users.select(users.c.user_id == 1).execute().first() - users.delete().execute() - users.insert().execute(r._mapping) - eq_(users.select().execute().fetchall(), [(1, "john")]) + connection.execute(users.insert(), user_id=1, user_name="john") + r = connection.execute(users.select(users.c.user_id == 1)).first() + connection.execute(users.delete()) + connection.execute(users.insert(), r._mapping) + eq_(connection.execute(users.select()).fetchall(), [(1, "john")]) - def test_result_as_args(self): + def test_result_as_args(self, connection): users = self.tables.users users2 = self.tables.users2 - users.insert().execute( + connection.execute( + users.insert(), [ dict(user_id=1, user_name="john"), dict(user_id=2, user_name="ed"), - ] + ], ) - r = users.select().execute() - users2.insert().execute([row._mapping for row in r]) + r = connection.execute(users.select()) + connection.execute(users2.insert(), [row._mapping for row in r]) eq_( - users2.select().order_by(users2.c.user_id).execute().fetchall(), + connection.execute( + users2.select().order_by(users2.c.user_id) + ).fetchall(), [(1, "john"), (2, "ed")], ) - users2.delete().execute() - r = users.select().execute() - users2.insert().execute(*[row._mapping for row in r]) + connection.execute(users2.delete()) + r = connection.execute(users.select()) + connection.execute(users2.insert(), *[row._mapping for row in r]) eq_( - users2.select().order_by(users2.c.user_id).execute().fetchall(), + connection.execute( + users2.select().order_by(users2.c.user_id) + ).fetchall(), [(1, "john"), (2, "ed")], ) @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column(self): + def test_ambiguous_column(self, connection): users = self.tables.users addresses = self.tables.addresses - users.insert().execute(user_id=1, user_name="john") - result = users.outerjoin(addresses).select().execute() + connection.execute(users.insert(), user_id=1, user_name="john") + result = connection.execute(users.outerjoin(addresses).select()) r = result.first() assert_raises_message( @@ -776,7 +787,7 @@ class ResultProxyTest(fixtures.TablesTest): lambda: r._mapping["user_id"], ) - result = users.outerjoin(addresses).select().execute() + result = connection.execute(users.outerjoin(addresses).select()) result = _result.BufferedColumnResultProxy(result.context) r = result.first() assert isinstance(r, _result.BufferedColumnRow) @@ -787,16 +798,16 @@ class ResultProxyTest(fixtures.TablesTest): ) @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column_by_col(self): + def test_ambiguous_column_by_col(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="john") + connection.execute(users.insert(), user_id=1, user_name="john") ua = users.alias() u2 = users.alias() - result = ( - select([users.c.user_id, ua.c.user_id]) - .select_from(users.join(ua, true())) - .execute() + result = connection.execute( + select([users.c.user_id, ua.c.user_id]).select_from( + users.join(ua, true()) + ) ) row = result.first() @@ -815,18 +826,18 @@ class ResultProxyTest(fixtures.TablesTest): ) @testing.requires.duplicate_names_in_cursor_description - def test_ambiguous_column_contains(self): + def test_ambiguous_column_contains(self, connection): users = self.tables.users addresses = self.tables.addresses # ticket 2702. in 0.7 we'd get True, False. # in 0.8, both columns are present so it's True; # but when they're fetched you'll get the ambiguous error. - users.insert().execute(user_id=1, user_name="john") - result = ( - select([users.c.user_id, addresses.c.user_id]) - .select_from(users.outerjoin(addresses)) - .execute() + connection.execute(users.insert(), user_id=1, user_name="john") + result = connection.execute( + select([users.c.user_id, addresses.c.user_id]).select_from( + users.outerjoin(addresses) + ) ) row = result.first() @@ -840,106 +851,102 @@ class ResultProxyTest(fixtures.TablesTest): set([True]), ) - def test_loose_matching_one(self): + def test_loose_matching_one(self, connection): users = self.tables.users addresses = self.tables.addresses - with testing.db.connect() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) - conn.execute( - addresses.insert(), - {"address_id": 1, "user_id": 1, "address": "email"}, + connection.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + connection.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = connection.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.address_id AS address_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [users.c.user_id, users.c.user_name, addresses.c.address_id], + positional=False, ) + ) + row = result.first() + eq_(row._mapping[users.c.user_id], 1) + eq_(row._mapping[users.c.user_name], "john") - # use some column labels in the SELECT - result = conn.execute( - TextualSelect( - text( - "select users.user_name AS users_user_name, " - "users.user_id AS user_id, " - "addresses.address_id AS address_id " - "FROM users JOIN addresses " - "ON users.user_id = addresses.user_id " - "WHERE users.user_id=1 " - ), - [ - users.c.user_id, - users.c.user_name, - addresses.c.address_id, - ], - positional=False, - ) - ) - row = result.first() - eq_(row._mapping[users.c.user_id], 1) - eq_(row._mapping[users.c.user_name], "john") - - def test_loose_matching_two(self): + def test_loose_matching_two(self, connection): users = self.tables.users addresses = self.tables.addresses - with testing.db.connect() as conn: - conn.execute(users.insert(), {"user_id": 1, "user_name": "john"}) - conn.execute( - addresses.insert(), - {"address_id": 1, "user_id": 1, "address": "email"}, - ) - - # use some column labels in the SELECT - result = conn.execute( - TextualSelect( - text( - "select users.user_name AS users_user_name, " - "users.user_id AS user_id, " - "addresses.user_id " - "FROM users JOIN addresses " - "ON users.user_id = addresses.user_id " - "WHERE users.user_id=1 " - ), - [users.c.user_id, users.c.user_name, addresses.c.user_id], - positional=False, - ) + connection.execute(users.insert(), {"user_id": 1, "user_name": "john"}) + connection.execute( + addresses.insert(), + {"address_id": 1, "user_id": 1, "address": "email"}, + ) + + # use some column labels in the SELECT + result = connection.execute( + TextualSelect( + text( + "select users.user_name AS users_user_name, " + "users.user_id AS user_id, " + "addresses.user_id " + "FROM users JOIN addresses " + "ON users.user_id = addresses.user_id " + "WHERE users.user_id=1 " + ), + [users.c.user_id, users.c.user_name, addresses.c.user_id], + positional=False, ) - row = result.first() + ) + row = result.first() - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row._mapping[users.c.user_id], - ) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: row._mapping[addresses.c.user_id], - ) - eq_(row._mapping[users.c.user_name], "john") + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row._mapping[users.c.user_id], + ) + assert_raises_message( + exc.InvalidRequestError, + "Ambiguous column name", + lambda: row._mapping[addresses.c.user_id], + ) + eq_(row._mapping[users.c.user_name], "john") - def test_ambiguous_column_by_col_plus_label(self): + def test_ambiguous_column_by_col_plus_label(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="john") - result = select( - [ - users.c.user_id, - type_coerce(users.c.user_id, Integer).label("foo"), - ] - ).execute() + connection.execute(users.insert(), user_id=1, user_name="john") + result = connection.execute( + select( + [ + users.c.user_id, + type_coerce(users.c.user_id, Integer).label("foo"), + ] + ) + ) row = result.first() eq_(row._mapping[users.c.user_id], 1) eq_(row[1], 1) - def test_fetch_partial_result_map(self): + def test_fetch_partial_result_map(self, connection): users = self.tables.users - users.insert().execute(user_id=7, user_name="ed") + connection.execute(users.insert(), user_id=7, user_name="ed") t = text("select * from users").columns(user_name=String()) - eq_(testing.db.execute(t).fetchall(), [(7, "ed")]) + eq_(connection.execute(t).fetchall(), [(7, "ed")]) - def test_fetch_unordered_result_map(self): + def test_fetch_unordered_result_map(self, connection): users = self.tables.users - users.insert().execute(user_id=7, user_name="ed") + connection.execute(users.insert(), user_id=7, user_name="ed") class Goofy1(TypeDecorator): impl = String @@ -963,28 +970,28 @@ class ResultProxyTest(fixtures.TablesTest): "select user_name as a, user_name as b, " "user_name as c from users" ).columns(a=Goofy1(), b=Goofy2(), c=Goofy3()) - eq_(testing.db.execute(t).fetchall(), [("eda", "edb", "edc")]) + eq_(connection.execute(t).fetchall(), [("eda", "edb", "edc")]) @testing.requires.subqueries - def test_column_label_targeting(self): + def test_column_label_targeting(self, connection): users = self.tables.users - users.insert().execute(user_id=7, user_name="ed") + connection.execute(users.insert(), user_id=7, user_name="ed") for s in ( users.select().alias("foo"), users.select().alias(users.name), ): - row = s.select(use_labels=True).execute().first() + row = connection.execute(s.select(use_labels=True)).first() eq_(row._mapping[s.c.user_id], 7) eq_(row._mapping[s.c.user_name], "ed") @testing.requires.python3 - def test_ro_mapping_py3k(self): + def test_ro_mapping_py3k(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="foo") - result = users.select().execute() + connection.execute(users.insert(), user_id=1, user_name="foo") + result = connection.execute(users.select()) row = result.first() dict_row = row._asdict() @@ -1003,11 +1010,11 @@ class ResultProxyTest(fixtures.TablesTest): eq_(odict_row.items(), mapping_row.items()) @testing.requires.python2 - def test_ro_mapping_py2k(self): + def test_ro_mapping_py2k(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="foo") - result = users.select().execute() + connection.execute(users.insert(), user_id=1, user_name="foo") + result = connection.execute(users.select()) row = result.first() dict_row = row._asdict() @@ -1023,33 +1030,33 @@ class ResultProxyTest(fixtures.TablesTest): eq_(odict_row.values(), list(mapping_row.values())) eq_(odict_row.items(), list(mapping_row.items())) - def test_keys(self): + def test_keys(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="foo") - result = users.select().execute() + connection.execute(users.insert(), user_id=1, user_name="foo") + result = connection.execute(users.select()) eq_(result.keys(), ["user_id", "user_name"]) row = result.first() eq_(list(row._mapping.keys()), ["user_id", "user_name"]) eq_(row._fields, ("user_id", "user_name")) - def test_row_keys_legacy_dont_warn(self): + def test_row_keys_legacy_dont_warn(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="foo") - result = users.select().execute() + connection.execute(users.insert(), user_id=1, user_name="foo") + result = connection.execute(users.select()) row = result.first() # DO NOT WARN DEPRECATED IN 1.x, ONLY 2.0 WARNING eq_(dict(row), {"user_id": 1, "user_name": "foo"}) eq_(row.keys(), ["user_id", "user_name"]) - def test_keys_anon_labels(self): + def test_keys_anon_labels(self, connection): """test [ticket:3483]""" users = self.tables.users - users.insert().execute(user_id=1, user_name="foo") - result = testing.db.execute( + connection.execute(users.insert(), user_id=1, user_name="foo") + result = connection.execute( select( [ users.c.user_id, @@ -1064,11 +1071,11 @@ class ResultProxyTest(fixtures.TablesTest): eq_(row._fields, ("user_id", "user_name_1", "count_1")) eq_(list(row._mapping.keys()), ["user_id", "user_name_1", "count_1"]) - def test_items(self): + def test_items(self, connection): users = self.tables.users - users.insert().execute(user_id=1, user_name="foo") - r = users.select().execute().first() + connection.execute(users.insert(), user_id=1, user_name="foo") + r = connection.execute(users.select()).first() eq_( [(x[0].lower(), x[1]) for x in list(r._mapping.items())], [("user_id", 1), ("user_name", "foo")], @@ -1088,27 +1095,30 @@ class ResultProxyTest(fixtures.TablesTest): r = connection.exec_driver_sql("select user_name from users").first() eq_(len(r), 1) - def test_sorting_in_python(self): + def test_sorting_in_python(self, connection): users = self.tables.users - users.insert().execute( + connection.execute( + users.insert(), dict(user_id=1, user_name="foo"), dict(user_id=2, user_name="bar"), dict(user_id=3, user_name="def"), ) - rows = users.select().order_by(users.c.user_name).execute().fetchall() + rows = connection.execute( + users.select().order_by(users.c.user_name) + ).fetchall() eq_(rows, [(2, "bar"), (3, "def"), (1, "foo")]) eq_(sorted(rows), [(1, "foo"), (2, "bar"), (3, "def")]) - def test_column_order_with_simple_query(self): + def test_column_order_with_simple_query(self, connection): # should return values in column definition order users = self.tables.users - users.insert().execute(user_id=1, user_name="foo") - r = users.select(users.c.user_id == 1).execute().first() + connection.execute(users.insert(), user_id=1, user_name="foo") + r = connection.execute(users.select(users.c.user_id == 1)).first() eq_(r[0], 1) eq_(r[1], "foo") eq_([x.lower() for x in r._fields], ["user_id", "user_name"]) @@ -1128,10 +1138,10 @@ class ResultProxyTest(fixtures.TablesTest): eq_([x.lower() for x in r._fields], ["user_name", "user_id"]) eq_(list(r._mapping.values()), ["foo", 1]) - @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()") + @testing.crashes("oracle", "FIXME: unknown, verify not fails_on()") @testing.crashes("firebird", "An identifier must begin with a letter") @testing.provide_metadata - def test_column_accessor_shadow(self): + def test_column_accessor_shadow(self, connection): shadowed = Table( "test_shadowed", self.metadata, @@ -1142,8 +1152,9 @@ class ResultProxyTest(fixtures.TablesTest): Column("_parent", VARCHAR(20)), Column("_row", VARCHAR(20)), ) - self.metadata.create_all() - shadowed.insert().execute( + self.metadata.create_all(connection) + connection.execute( + shadowed.insert(), shadow_id=1, shadow_name="The Shadow", parent="The Light", @@ -1151,7 +1162,9 @@ class ResultProxyTest(fixtures.TablesTest): _parent="Hidden parent", _row="Hidden row", ) - r = shadowed.select(shadowed.c.shadow_id == 1).execute().first() + r = connection.execute( + shadowed.select(shadowed.c.shadow_id == 1) + ).first() eq_(r.shadow_id, 1) eq_(r._mapping["shadow_id"], 1) @@ -1221,25 +1234,26 @@ class ResultProxyTest(fixtures.TablesTest): with patch.object( engine.dialect.execution_ctx_cls, "rowcount" ) as mock_rowcount: - mock_rowcount.__get__ = Mock() - engine.execute( - t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"} - ) + with engine.connect() as conn: + mock_rowcount.__get__ = Mock() + conn.execute( + t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"} + ) - eq_(len(mock_rowcount.__get__.mock_calls), 0) + eq_(len(mock_rowcount.__get__.mock_calls), 0) - eq_( - engine.execute(t.select()).fetchall(), - [("d1",), ("d2",), ("d3",)], - ) - eq_(len(mock_rowcount.__get__.mock_calls), 0) + eq_( + conn.execute(t.select()).fetchall(), + [("d1",), ("d2",), ("d3",)], + ) + eq_(len(mock_rowcount.__get__.mock_calls), 0) - engine.execute(t.update(), {"data": "d4"}) + conn.execute(t.update(), {"data": "d4"}) - eq_(len(mock_rowcount.__get__.mock_calls), 1) + eq_(len(mock_rowcount.__get__.mock_calls), 1) - engine.execute(t.delete()) - eq_(len(mock_rowcount.__get__.mock_calls), 2) + conn.execute(t.delete()) + eq_(len(mock_rowcount.__get__.mock_calls), 2) def test_row_is_sequence(self): @@ -1259,17 +1273,17 @@ class ResultProxyTest(fixtures.TablesTest): eq_(hash(row), hash((1, "value", "foo"))) @testing.provide_metadata - def test_row_getitem_indexes_compiled(self): + def test_row_getitem_indexes_compiled(self, connection): values = Table( "rp", self.metadata, Column("key", String(10), primary_key=True), Column("value", String(10)), ) - values.create() + values.create(connection) - testing.db.execute(values.insert(), dict(key="One", value="Uno")) - row = testing.db.execute(values.select()).first() + connection.execute(values.insert(), dict(key="One", value="Uno")) + row = connection.execute(values.select()).first() eq_(row._mapping["key"], "One") eq_(row._mapping["value"], "Uno") eq_(row[0], "One") @@ -1293,7 +1307,7 @@ class ResultProxyTest(fixtures.TablesTest): @testing.requires.cextensions def test_row_c_sequence_check(self): - + # TODO: modernize for 2.0 metadata = MetaData() metadata.bind = "sqlite://" users = Table( @@ -1409,39 +1423,39 @@ class KeyTargetingTest(fixtures.TablesTest): ) @testing.requires.schemas - def test_keyed_accessor_wschema(self): + def test_keyed_accessor_wschema(self, connection): keyed1 = self.tables["%s.wschema" % testing.config.test_schema] - row = testing.db.execute(keyed1.select()).first() + row = connection.execute(keyed1.select()).first() eq_(row.b, "a1") eq_(row.q, "c1") eq_(row.a, "a1") eq_(row.c, "c1") - def test_keyed_accessor_single(self): + def test_keyed_accessor_single(self, connection): keyed1 = self.tables.keyed1 - row = testing.db.execute(keyed1.select()).first() + row = connection.execute(keyed1.select()).first() eq_(row.b, "a1") eq_(row.q, "c1") eq_(row.a, "a1") eq_(row.c, "c1") - def test_keyed_accessor_single_labeled(self): + def test_keyed_accessor_single_labeled(self, connection): keyed1 = self.tables.keyed1 - row = testing.db.execute(keyed1.select().apply_labels()).first() + row = connection.execute(keyed1.select().apply_labels()).first() eq_(row.keyed1_b, "a1") eq_(row.keyed1_q, "c1") eq_(row.keyed1_a, "a1") eq_(row.keyed1_c, "c1") - def _test_keyed_targeting_no_label_at_all(self, expression): + def _test_keyed_targeting_no_label_at_all(self, expression, conn): lt = literal_column("2") stmt = select([literal_column("1"), expression, lt]).select_from( self.tables.keyed1 ) - row = testing.db.execute(stmt).first() + row = conn.execute(stmt).first() eq_(row._mapping[expression], "a1") eq_(row._mapping[lt], 2) @@ -1450,7 +1464,7 @@ class KeyTargetingTest(fixtures.TablesTest): # easily. we get around that because we know that "2" is unique eq_(row._mapping["2"], 2) - def test_keyed_targeting_no_label_at_all_one(self): + def test_keyed_targeting_no_label_at_all_one(self, connection): class not_named_max(expression.ColumnElement): name = "not_named_max" @@ -1464,9 +1478,9 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(str(select([not_named_max()])), "SELECT max(a)") nnm = not_named_max() - self._test_keyed_targeting_no_label_at_all(nnm) + self._test_keyed_targeting_no_label_at_all(nnm, connection) - def test_keyed_targeting_no_label_at_all_two(self): + def test_keyed_targeting_no_label_at_all_two(self, connection): class not_named_max(expression.ColumnElement): name = "not_named_max" @@ -1479,24 +1493,24 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(str(select([not_named_max()])), "SELECT max(a)") nnm = not_named_max() - self._test_keyed_targeting_no_label_at_all(nnm) + self._test_keyed_targeting_no_label_at_all(nnm, connection) - def test_keyed_targeting_no_label_at_all_text(self): + def test_keyed_targeting_no_label_at_all_text(self, connection): t1 = text("max(a)") t2 = text("min(a)") stmt = select([t1, t2]).select_from(self.tables.keyed1) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() eq_(row._mapping[t1], "a1") eq_(row._mapping[t2], "a1") @testing.requires.duplicate_names_in_cursor_description - def test_keyed_accessor_composite_conflict_2(self): + def test_keyed_accessor_composite_conflict_2(self, connection): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 - row = testing.db.execute( + row = connection.execute( select([keyed1, keyed2]).select_from(keyed1.join(keyed2, true())) ).first() @@ -1522,14 +1536,16 @@ class KeyTargetingTest(fixtures.TablesTest): # illustrate why row.b above is ambiguous, and not "b2"; because # if we didn't have keyed2, now it matches row.a. a new column # shouldn't be able to grab the value from a previous column. - row = testing.db.execute(select([keyed1])).first() + row = connection.execute(select([keyed1])).first() eq_(row.b, "a1") - def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(self): + def test_keyed_accessor_composite_conflict_2_fix_w_uselabels( + self, connection + ): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 - row = testing.db.execute( + row = connection.execute( select([keyed1, keyed2]) .select_from(keyed1.join(keyed2, true())) .apply_labels() @@ -1541,11 +1557,11 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row._mapping["keyed2_b"], "b2") eq_(row._mapping["keyed1_a"], "a1") - def test_keyed_accessor_composite_names_precedent(self): + def test_keyed_accessor_composite_names_precedent(self, connection): keyed1 = self.tables.keyed1 keyed4 = self.tables.keyed4 - row = testing.db.execute( + row = connection.execute( select([keyed1, keyed4]).select_from(keyed1.join(keyed4, true())) ).first() eq_(row.b, "b4") @@ -1554,11 +1570,11 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row.c, "c1") @testing.requires.duplicate_names_in_cursor_description - def test_keyed_accessor_composite_keys_precedent(self): + def test_keyed_accessor_composite_keys_precedent(self, connection): keyed1 = self.tables.keyed1 keyed3 = self.tables.keyed3 - row = testing.db.execute( + row = connection.execute( select([keyed1, keyed3]).select_from(keyed1.join(keyed3, true())) ).first() eq_(row.q, "c1") @@ -1578,11 +1594,11 @@ class KeyTargetingTest(fixtures.TablesTest): ) eq_(row.d, "d3") - def test_keyed_accessor_composite_labeled(self): + def test_keyed_accessor_composite_labeled(self, connection): keyed1 = self.tables.keyed1 keyed2 = self.tables.keyed2 - row = testing.db.execute( + row = connection.execute( select([keyed1, keyed2]) .select_from(keyed1.join(keyed2, true())) .apply_labels() @@ -1599,7 +1615,9 @@ class KeyTargetingTest(fixtures.TablesTest): assert_raises(KeyError, lambda: row._mapping["keyed2_c"]) assert_raises(KeyError, lambda: row._mapping["keyed2_q"]) - def test_keyed_accessor_column_is_repeated_multiple_times(self): + def test_keyed_accessor_column_is_repeated_multiple_times( + self, connection + ): # test new logic added as a result of the combination of #4892 and # #4887. We allow duplicate columns, but we also have special logic # to disambiguate for the same column repeated, and as #4887 adds @@ -1627,7 +1645,7 @@ class KeyTargetingTest(fixtures.TablesTest): .apply_labels() ) - result = testing.db.execute(stmt) + result = connection.execute(stmt) is_false(result._metadata.matched_on_name) # ensure the result map is the same number of cols so we can @@ -1664,34 +1682,34 @@ class KeyTargetingTest(fixtures.TablesTest): eq_(row[6], "d3") eq_(row[7], "d3") - def test_columnclause_schema_column_one(self): + def test_columnclause_schema_column_one(self, connection): # originally addressed by [ticket:2932], however liberalized # Column-targeting rules are deprecated a, b = sql.column("a"), sql.column("b") stmt = select([a, b]).select_from(table("keyed2")) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() in_(a, row._mapping) in_(b, row._mapping) - def test_columnclause_schema_column_two(self): + def test_columnclause_schema_column_two(self, connection): keyed2 = self.tables.keyed2 stmt = select([keyed2.c.a, keyed2.c.b]) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() in_(keyed2.c.a, row._mapping) in_(keyed2.c.b, row._mapping) - def test_columnclause_schema_column_three(self): + def test_columnclause_schema_column_three(self, connection): # this is also addressed by [ticket:2932] stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() in_(stmt.selected_columns.a, row._mapping) in_(stmt.selected_columns.b, row._mapping) - def test_columnclause_schema_column_four(self): + def test_columnclause_schema_column_four(self, connection): # originally addressed by [ticket:2932], however liberalized # Column-targeting rules are deprecated @@ -1699,7 +1717,7 @@ class KeyTargetingTest(fixtures.TablesTest): stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( a, b ) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() in_(a, row._mapping) in_(b, row._mapping) @@ -1707,13 +1725,13 @@ class KeyTargetingTest(fixtures.TablesTest): in_(stmt.selected_columns.keyed2_a, row._mapping) in_(stmt.selected_columns.keyed2_b, row._mapping) - def test_columnclause_schema_column_five(self): + def test_columnclause_schema_column_five(self, connection): # this is also addressed by [ticket:2932] stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns( keyed2_a=CHAR, keyed2_b=CHAR ) - row = testing.db.execute(stmt).first() + row = connection.execute(stmt).first() in_(stmt.selected_columns.keyed2_a, row._mapping) in_(stmt.selected_columns.keyed2_b, row._mapping) @@ -1818,15 +1836,17 @@ class PositionalTextTest(fixtures.TablesTest): @classmethod def insert_data(cls): - cls.tables.text1.insert().execute( - [dict(a="a1", b="b1", c="c1", d="d1")] - ) + with testing.db.connect() as conn: + conn.execute( + cls.tables.text1.insert(), + [dict(a="a1", b="b1", c="c1", d="d1")], + ) - def test_via_column(self): + def test_via_column(self, connection): c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d") stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c2], "b1") @@ -1838,23 +1858,23 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row._mapping["r"], "c1") eq_(row._mapping["d"], "d1") - def test_fewer_cols_than_sql_positional(self): + def test_fewer_cols_than_sql_positional(self, connection): c1, c2 = column("q"), column("p") stmt = text("select a, b, c, d from text1").columns(c1, c2) # no warning as this can be similar for non-positional - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c1], "a1") eq_(row._mapping["c"], "c1") - def test_fewer_cols_than_sql_non_positional(self): + def test_fewer_cols_than_sql_non_positional(self, connection): c1, c2 = column("a"), column("p") stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR) # no warning as this can be similar for non-positional - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() # c1 name matches, locates @@ -1868,7 +1888,7 @@ class PositionalTextTest(fixtures.TablesTest): lambda: row._mapping[c2], ) - def test_more_cols_than_sql_positional(self): + def test_more_cols_than_sql_positional(self, connection): c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d") stmt = text("select a, b from text1").columns(c1, c2, c3, c4) @@ -1876,7 +1896,7 @@ class PositionalTextTest(fixtures.TablesTest): r"Number of columns in textual SQL \(4\) is " r"smaller than number of columns requested \(2\)" ): - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c2], "b1") @@ -1887,14 +1907,14 @@ class PositionalTextTest(fixtures.TablesTest): lambda: row._mapping[c3], ) - def test_more_cols_than_sql_nonpositional(self): + def test_more_cols_than_sql_nonpositional(self, connection): c1, c2, c3, c4 = column("b"), column("a"), column("r"), column("d") stmt = TextualSelect( text("select a, b from text1"), [c1, c2, c3, c4], positional=False ) # no warning for non-positional - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c1], "b1") @@ -1906,7 +1926,7 @@ class PositionalTextTest(fixtures.TablesTest): lambda: row._mapping[c3], ) - def test_more_cols_than_sql_nonpositional_labeled_cols(self): + def test_more_cols_than_sql_nonpositional_labeled_cols(self, connection): text1 = self.tables.text1 c1, c2, c3, c4 = text1.c.b, text1.c.a, column("r"), column("d") @@ -1919,7 +1939,7 @@ class PositionalTextTest(fixtures.TablesTest): ) # no warning for non-positional - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c1], "b1") @@ -1931,7 +1951,7 @@ class PositionalTextTest(fixtures.TablesTest): lambda: row._mapping[c3], ) - def test_dupe_col_obj(self): + def test_dupe_col_obj(self, connection): c1, c2, c3 = column("q"), column("p"), column("r") stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2) @@ -1939,11 +1959,11 @@ class PositionalTextTest(fixtures.TablesTest): exc.InvalidRequestError, "Duplicate column expression requested in " "textual SQL: <.*.ColumnClause.*; p>", - testing.db.execute, + connection.execute, stmt, ) - def test_anon_aliased_unique(self): + def test_anon_aliased_unique(self, connection): text1 = self.tables.text1 c1 = text1.c.a.label(None) @@ -1952,7 +1972,7 @@ class PositionalTextTest(fixtures.TablesTest): c4 = text1.alias().c.d.label(None) stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c1], "a1") @@ -1968,7 +1988,7 @@ class PositionalTextTest(fixtures.TablesTest): lambda: row._mapping[text1.c.b], ) - def test_anon_aliased_overlapping(self): + def test_anon_aliased_overlapping(self, connection): text1 = self.tables.text1 c1 = text1.c.a.label(None) @@ -1977,7 +1997,7 @@ class PositionalTextTest(fixtures.TablesTest): c4 = text1.c.a.label(None) stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4) - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c1], "a1") @@ -1985,7 +2005,7 @@ class PositionalTextTest(fixtures.TablesTest): eq_(row._mapping[c3], "c1") eq_(row._mapping[c4], "d1") - def test_anon_aliased_name_conflict(self): + def test_anon_aliased_name_conflict(self, connection): text1 = self.tables.text1 c1 = text1.c.a.label("a") @@ -1998,7 +2018,7 @@ class PositionalTextTest(fixtures.TablesTest): stmt = text("select a, b as a, c as a, d as a from text1").columns( c1, c2, c3, c4 ) - result = testing.db.execute(stmt) + result = connection.execute(stmt) row = result.first() eq_(row._mapping[c1], "a1") @@ -2034,10 +2054,11 @@ class AlternateResultProxyTest(fixtures.TablesTest): @classmethod def insert_data(cls): - cls.engine.execute( - cls.tables.test.insert(), - [{"x": i, "y": "t_%d" % i} for i in range(1, 12)], - ) + with cls.engine.connect() as conn: + conn.execute( + cls.tables.test.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(1, 12)], + ) @contextmanager def _proxy_fixture(self, cls): @@ -2059,53 +2080,54 @@ class AlternateResultProxyTest(fixtures.TablesTest): def _test_proxy(self, cls): with self._proxy_fixture(cls): rows = [] - r = self.engine.execute(select([self.table])) - assert isinstance(r.cursor_strategy, cls) - for i in range(5): - rows.append(r.fetchone()) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + with self.engine.connect() as conn: + r = conn.execute(select([self.table])) + assert isinstance(r.cursor_strategy, cls) + for i in range(5): + rows.append(r.fetchone()) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - rows = r.fetchmany(3) - eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) + rows = r.fetchmany(3) + eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) - rows = r.fetchall() - eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) + rows = r.fetchall() + eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) - r = self.engine.execute(select([self.table])) - rows = r.fetchmany(None) - eq_(rows[0], (1, "t_1")) - # number of rows here could be one, or the whole thing - assert len(rows) == 1 or len(rows) == 11 + r = conn.execute(select([self.table])) + rows = r.fetchmany(None) + eq_(rows[0], (1, "t_1")) + # number of rows here could be one, or the whole thing + assert len(rows) == 1 or len(rows) == 11 - r = self.engine.execute(select([self.table]).limit(1)) - r.fetchone() - eq_(r.fetchone(), None) + r = conn.execute(select([self.table]).limit(1)) + r.fetchone() + eq_(r.fetchone(), None) - r = self.engine.execute(select([self.table]).limit(5)) - rows = r.fetchmany(6) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + r = conn.execute(select([self.table]).limit(5)) + rows = r.fetchmany(6) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - # result keeps going just fine with blank results... - eq_(r.fetchmany(2), []) + # result keeps going just fine with blank results... + eq_(r.fetchmany(2), []) - eq_(r.fetchmany(2), []) + eq_(r.fetchmany(2), []) - eq_(r.fetchall(), []) + eq_(r.fetchall(), []) - eq_(r.fetchone(), None) + eq_(r.fetchone(), None) - # until we close - r.close() + # until we close + r.close() - self._assert_result_closed(r) + self._assert_result_closed(r) - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.first(), (1, "t_1")) - self._assert_result_closed(r) + r = conn.execute(select([self.table]).limit(5)) + eq_(r.first(), (1, "t_1")) + self._assert_result_closed(r) - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.scalar(), 1) - self._assert_result_closed(r) + r = conn.execute(select([self.table]).limit(5)) + eq_(r.scalar(), 1) + self._assert_result_closed(r) def _assert_result_closed(self, r): assert_raises_message(