From b4a835044d270d58f17377557a23244e773ef450 Mon Sep 17 00:00:00 2001 From: Gord Thompson Date: Sun, 12 Apr 2020 17:38:04 -0600 Subject: [PATCH] Clean up .execute in test/sql/test_query.py Change-Id: I3f3a3b3f7a7ec67ef475a9792372e1c9a600fafb --- test/sql/test_query.py | 810 +++++++++++++++++++++-------------------- 1 file changed, 413 insertions(+), 397 deletions(-) diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 83c5da3427..bca7c262b7 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -84,9 +84,10 @@ class QueryTest(fixtures.TestBase): @engines.close_first def teardown(self): - addresses.delete().execute() - users.delete().execute() - users2.delete().execute() + with testing.db.connect() as conn: + conn.execute(addresses.delete()) + conn.execute(users.delete()) + conn.execute(users2.delete()) @classmethod def teardown_class(cls): @@ -95,7 +96,7 @@ class QueryTest(fixtures.TestBase): @testing.fails_on( "firebird", "kinterbasdb doesn't send full type information" ) - def test_order_by_label(self): + def test_order_by_label(self, connection): """test that a label within an ORDER BY works on each backend. This test should be modified to support [ticket:1068] when that ticket @@ -104,7 +105,8 @@ class QueryTest(fixtures.TestBase): """ - users.insert().execute( + connection.execute( + users.insert(), {"user_id": 7, "user_name": "jack"}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9, "user_name": "fred"}, @@ -112,24 +114,31 @@ class QueryTest(fixtures.TestBase): concat = ("test: " + users.c.user_name).label("thedata") eq_( - select([concat]).order_by("thedata").execute().fetchall(), + connection.execute( + select([concat]).order_by("thedata") + ).fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)], ) eq_( - select([concat]).order_by("thedata").execute().fetchall(), + connection.execute( + select([concat]).order_by("thedata") + ).fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)], ) concat = ("test: " + users.c.user_name).label("thedata") eq_( - select([concat]).order_by(desc("thedata")).execute().fetchall(), + connection.execute( + select([concat]).order_by(desc("thedata")) + ).fetchall(), [("test: jack",), ("test: fred",), ("test: ed",)], ) @testing.requires.order_by_label_with_expression - def test_order_by_label_compound(self): - users.insert().execute( + def test_order_by_label_compound(self, connection): + connection.execute( + users.insert(), {"user_id": 7, "user_name": "jack"}, {"user_id": 8, "user_name": "ed"}, {"user_id": 9, "user_name": "fred"}, @@ -137,27 +146,26 @@ class QueryTest(fixtures.TestBase): concat = ("test: " + users.c.user_name).label("thedata") eq_( - select([concat]) - .order_by(literal_column("thedata") + "x") - .execute() - .fetchall(), + connection.execute( + select([concat]).order_by(literal_column("thedata") + "x") + ).fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)], ) @testing.requires.boolean_col_expressions - def test_or_and_as_columns(self): + def test_or_and_as_columns(self, connection): true, false = literal(True), literal(False) - eq_(testing.db.execute(select([and_(true, false)])).scalar(), False) - eq_(testing.db.execute(select([and_(true, true)])).scalar(), True) - eq_(testing.db.execute(select([or_(true, false)])).scalar(), True) - eq_(testing.db.execute(select([or_(false, false)])).scalar(), False) + eq_(connection.execute(select([and_(true, false)])).scalar(), False) + eq_(connection.execute(select([and_(true, true)])).scalar(), True) + eq_(connection.execute(select([or_(true, false)])).scalar(), True) + eq_(connection.execute(select([or_(false, false)])).scalar(), False) eq_( - testing.db.execute(select([not_(or_(false, false))])).scalar(), + connection.execute(select([not_(or_(false, false))])).scalar(), True, ) - row = testing.db.execute( + row = connection.execute( select( [or_(false, false).label("x"), and_(true, false).label("y")] ) @@ -165,14 +173,15 @@ class QueryTest(fixtures.TestBase): assert row.x == False # noqa assert row.y == False # noqa - row = testing.db.execute( + row = connection.execute( select([or_(true, false).label("x"), and_(true, false).label("y")]) ).first() assert row.x == True # noqa assert row.y == False # noqa - def test_like_ops(self): - users.insert().execute( + def test_like_ops(self, connection): + connection.execute( + users.insert(), {"user_id": 1, "user_name": "apples"}, {"user_id": 2, "user_name": "oranges"}, {"user_id": 3, "user_name": "bananas"}, @@ -206,11 +215,11 @@ class QueryTest(fixtures.TestBase): [(5,)], ), ): - eq_(expr.execute().fetchall(), result) + eq_(connection.execute(expr).fetchall(), result) @testing.requires.mod_operator_as_percent_sign @testing.emits_warning(".*now automatically escapes.*") - def test_percents_in_text(self): + def test_percents_in_text(self, connection): for expr, result in ( (text("select 6 % 10"), 6), (text("select 17 % 10"), 7), @@ -219,10 +228,11 @@ class QueryTest(fixtures.TestBase): (text("select '%%%'"), "%%%"), (text("select 'hello % world'"), "hello % world"), ): - eq_(testing.db.scalar(expr), result) + eq_(connection.scalar(expr), result) - def test_ilike(self): - users.insert().execute( + def test_ilike(self, connection): + connection.execute( + users.insert(), {"user_id": 1, "user_name": "one"}, {"user_id": 2, "user_name": "TwO"}, {"user_id": 3, "user_name": "ONE"}, @@ -230,62 +240,62 @@ class QueryTest(fixtures.TestBase): ) eq_( - select([users.c.user_id]) - .where(users.c.user_name.ilike("one")) - .execute() - .fetchall(), + connection.execute( + select([users.c.user_id]).where(users.c.user_name.ilike("one")) + ).fetchall(), [(1,), (3,), (4,)], ) eq_( - select([users.c.user_id]) - .where(users.c.user_name.ilike("TWO")) - .execute() - .fetchall(), + connection.execute( + select([users.c.user_id]).where(users.c.user_name.ilike("TWO")) + ).fetchall(), [(2,)], ) if testing.against("postgresql"): eq_( - select([users.c.user_id]) - .where(users.c.user_name.like("one")) - .execute() - .fetchall(), + connection.execute( + select([users.c.user_id]).where( + users.c.user_name.like("one") + ) + ).fetchall(), [(1,)], ) eq_( - select([users.c.user_id]) - .where(users.c.user_name.like("TWO")) - .execute() - .fetchall(), + connection.execute( + select([users.c.user_id]).where( + users.c.user_name.like("TWO") + ) + ).fetchall(), [], ) - def test_compiled_execute(self): - users.insert().execute(user_id=7, user_name="jack") + def test_compiled_execute(self, connection): + connection.execute(users.insert(), user_id=7, user_name="jack") s = select([users], users.c.user_id == bindparam("id")).compile() - c = testing.db.connect() - eq_(c.execute(s, id=7).first()._mapping["user_id"], 7) + eq_(connection.execute(s, id=7).first()._mapping["user_id"], 7) - def test_compiled_insert_execute(self): - users.insert().compile().execute(user_id=7, user_name="jack") + def test_compiled_insert_execute(self, connection): + connection.execute( + users.insert().compile(), user_id=7, user_name="jack" + ) s = select([users], users.c.user_id == bindparam("id")).compile() - c = testing.db.connect() - eq_(c.execute(s, id=7).first()._mapping["user_id"], 7) + eq_(connection.execute(s, id=7).first()._mapping["user_id"], 7) - def test_repeated_bindparams(self): + def test_repeated_bindparams(self, connection): """Tests that a BindParam can be used more than once. This should be run for DB-APIs with both positional and named paramstyles. """ - users.insert().execute(user_id=7, user_name="jack") - users.insert().execute(user_id=8, user_name="fred") + connection.execute(users.insert(), user_id=7, user_name="jack") + connection.execute(users.insert(), user_id=8, user_name="fred") u = bindparam("userid") s = users.select(and_(users.c.user_name == u, users.c.user_name == u)) - r = s.execute(userid="fred").fetchall() + r = connection.execute(s, userid="fred").fetchall() assert len(r) == 1 def test_bindparam_detection(self): @@ -322,7 +332,7 @@ class QueryTest(fixtures.TestBase): a_eq(prep(r".\:that$ :other."), ".:that$ ?.") @testing.requires.standalone_binds - def test_select_from_bindparam(self): + def test_select_from_bindparam(self, connection): """Test result row processing when selecting from a plain bind param.""" @@ -336,28 +346,28 @@ class QueryTest(fixtures.TestBase): return "INT_%d" % value eq_( - testing.db.scalar(select([cast("INT_5", type_=MyInteger)])), + connection.scalar(select([cast("INT_5", type_=MyInteger)])), "INT_5", ) eq_( - testing.db.scalar( + connection.scalar( select([cast("INT_5", type_=MyInteger).label("foo")]) ), "INT_5", ) - def test_order_by(self): + def test_order_by(self, connection): """Exercises ORDER BY clause generation. Tests simple, compound, aliased and DESC clauses. """ - users.insert().execute(user_id=1, user_name="c") - users.insert().execute(user_id=2, user_name="b") - users.insert().execute(user_id=3, user_name="a") + connection.execute(users.insert(), user_id=1, user_name="c") + connection.execute(users.insert(), user_id=2, user_name="b") + connection.execute(users.insert(), user_id=3, user_name="a") def a_eq(executable, wanted): - got = list(executable.execute()) + got = list(connection.execute(executable)) eq_(got, wanted) for labels in False, True: @@ -444,18 +454,18 @@ class QueryTest(fixtures.TestBase): ) @testing.requires.nullsordering - def test_order_by_nulls(self): + def test_order_by_nulls(self, connection): """Exercises ORDER BY clause generation. Tests simple, compound, aliased and DESC clauses. """ - users.insert().execute(user_id=1) - users.insert().execute(user_id=2, user_name="b") - users.insert().execute(user_id=3, user_name="a") + connection.execute(users.insert(), user_id=1) + connection.execute(users.insert(), user_id=2, user_name="b") + connection.execute(users.insert(), user_id=3, user_name="a") def a_eq(executable, wanted): - got = list(executable.execute()) + got = list(connection.execute(executable)) eq_(got, wanted) for labels in False, True: @@ -538,33 +548,33 @@ class QueryTest(fixtures.TestBase): [(3, "a"), (2, "b"), (1, None)], ) - def test_in_filtering(self): + def test_in_filtering(self, connection): """test the behavior of the in_() function.""" - users.insert().execute(user_id=7, user_name="jack") - users.insert().execute(user_id=8, user_name="fred") - users.insert().execute(user_id=9, user_name=None) + connection.execute(users.insert(), user_id=7, user_name="jack") + connection.execute(users.insert(), user_id=8, user_name="fred") + connection.execute(users.insert(), user_id=9, user_name=None) s = users.select(users.c.user_name.in_([])) - r = s.execute().fetchall() + r = connection.execute(s).fetchall() # No username is in empty set assert len(r) == 0 s = users.select(not_(users.c.user_name.in_([]))) - r = s.execute().fetchall() + r = connection.execute(s).fetchall() assert len(r) == 3 s = users.select(users.c.user_name.in_(["jack", "fred"])) - r = s.execute().fetchall() + r = connection.execute(s).fetchall() assert len(r) == 2 s = users.select(not_(users.c.user_name.in_(["jack", "fred"]))) - r = s.execute().fetchall() + r = connection.execute(s).fetchall() # Null values are not outside any set assert len(r) == 0 - def test_expanding_in(self): - testing.db.execute( + def test_expanding_in(self, connection): + connection.execute( users.insert(), [ dict(user_id=7, user_name="jack"), @@ -573,40 +583,37 @@ class QueryTest(fixtures.TestBase): ], ) - with testing.db.connect() as conn: - stmt = ( - select([users]) - .where( - users.c.user_name.in_(bindparam("uname", expanding=True)) - ) - .order_by(users.c.user_id) - ) + stmt = ( + select([users]) + .where(users.c.user_name.in_(bindparam("uname", expanding=True))) + .order_by(users.c.user_id) + ) - eq_( - conn.execute(stmt, {"uname": ["jack"]}).fetchall(), - [(7, "jack")], - ) + eq_( + connection.execute(stmt, {"uname": ["jack"]}).fetchall(), + [(7, "jack")], + ) - eq_( - conn.execute(stmt, {"uname": ["jack", "fred"]}).fetchall(), - [(7, "jack"), (8, "fred")], - ) + eq_( + connection.execute(stmt, {"uname": ["jack", "fred"]}).fetchall(), + [(7, "jack"), (8, "fred")], + ) - eq_(conn.execute(stmt, {"uname": []}).fetchall(), []) + eq_(connection.execute(stmt, {"uname": []}).fetchall(), []) - assert_raises_message( - exc.StatementError, - "'expanding' parameters can't be used with executemany()", - conn.execute, - users.update().where( - users.c.user_name.in_(bindparam("uname", expanding=True)) - ), - [{"uname": ["fred"]}, {"uname": ["ed"]}], - ) + assert_raises_message( + exc.StatementError, + "'expanding' parameters can't be used with executemany()", + connection.execute, + users.update().where( + users.c.user_name.in_(bindparam("uname", expanding=True)) + ), + [{"uname": ["fred"]}, {"uname": ["ed"]}], + ) @testing.requires.no_quoting_special_bind_names - def test_expanding_in_special_chars(self): - testing.db.execute( + def test_expanding_in_special_chars(self, connection): + connection.execute( users.insert(), [ dict(user_id=7, user_name="jack"), @@ -614,39 +621,36 @@ class QueryTest(fixtures.TestBase): ], ) - with testing.db.connect() as conn: - stmt = ( - select([users]) - .where(users.c.user_name.in_(bindparam("u35", expanding=True))) - .where(users.c.user_id == bindparam("u46")) - .order_by(users.c.user_id) - ) + stmt = ( + select([users]) + .where(users.c.user_name.in_(bindparam("u35", expanding=True))) + .where(users.c.user_id == bindparam("u46")) + .order_by(users.c.user_id) + ) - eq_( - conn.execute( - stmt, {"u35": ["jack", "fred"], "u46": 7} - ).fetchall(), - [(7, "jack")], - ) + eq_( + connection.execute( + stmt, {"u35": ["jack", "fred"], "u46": 7} + ).fetchall(), + [(7, "jack")], + ) - stmt = ( - select([users]) - .where( - users.c.user_name.in_(bindparam("u.35", expanding=True)) - ) - .where(users.c.user_id == bindparam("u.46")) - .order_by(users.c.user_id) - ) + stmt = ( + select([users]) + .where(users.c.user_name.in_(bindparam("u.35", expanding=True))) + .where(users.c.user_id == bindparam("u.46")) + .order_by(users.c.user_id) + ) - eq_( - conn.execute( - stmt, {"u.35": ["jack", "fred"], "u.46": 7} - ).fetchall(), - [(7, "jack")], - ) + eq_( + connection.execute( + stmt, {"u.35": ["jack", "fred"], "u.46": 7} + ).fetchall(), + [(7, "jack")], + ) - def test_expanding_in_multiple(self): - testing.db.execute( + def test_expanding_in_multiple(self, connection): + connection.execute( users.insert(), [ dict(user_id=7, user_name="jack"), @@ -655,27 +659,22 @@ class QueryTest(fixtures.TestBase): ], ) - with testing.db.connect() as conn: - stmt = ( - select([users]) - .where( - users.c.user_name.in_(bindparam("uname", expanding=True)) - ) - .where( - users.c.user_id.in_(bindparam("userid", expanding=True)) - ) - .order_by(users.c.user_id) - ) + stmt = ( + select([users]) + .where(users.c.user_name.in_(bindparam("uname", expanding=True))) + .where(users.c.user_id.in_(bindparam("userid", expanding=True))) + .order_by(users.c.user_id) + ) - eq_( - conn.execute( - stmt, {"uname": ["jack", "fred", "ed"], "userid": [8, 9]} - ).fetchall(), - [(8, "fred"), (9, "ed")], - ) + eq_( + connection.execute( + stmt, {"uname": ["jack", "fred", "ed"], "userid": [8, 9]} + ).fetchall(), + [(8, "fred"), (9, "ed")], + ) - def test_expanding_in_repeated(self): - testing.db.execute( + def test_expanding_in_repeated(self, connection): + connection.execute( users.insert(), [ dict(user_id=7, user_name="jack"), @@ -684,43 +683,38 @@ class QueryTest(fixtures.TestBase): ], ) - with testing.db.connect() as conn: - stmt = ( - select([users]) - .where( - users.c.user_name.in_(bindparam("uname", expanding=True)) - | users.c.user_name.in_( - bindparam("uname2", expanding=True) - ) - ) - .where(users.c.user_id == 8) - ) - stmt = stmt.union( - select([users]) - .where( - users.c.user_name.in_(bindparam("uname", expanding=True)) - | users.c.user_name.in_( - bindparam("uname2", expanding=True) - ) - ) - .where(users.c.user_id == 9) - ).order_by("user_id") - - eq_( - conn.execute( - stmt, - { - "uname": ["jack", "fred"], - "uname2": ["ed"], - "userid": [8, 9], - }, - ).fetchall(), - [(8, "fred"), (9, "ed")], + stmt = ( + select([users]) + .where( + users.c.user_name.in_(bindparam("uname", expanding=True)) + | users.c.user_name.in_(bindparam("uname2", expanding=True)) + ) + .where(users.c.user_id == 8) + ) + stmt = stmt.union( + select([users]) + .where( + users.c.user_name.in_(bindparam("uname", expanding=True)) + | users.c.user_name.in_(bindparam("uname2", expanding=True)) ) + .where(users.c.user_id == 9) + ).order_by("user_id") + + eq_( + connection.execute( + stmt, + { + "uname": ["jack", "fred"], + "uname2": ["ed"], + "userid": [8, 9], + }, + ).fetchall(), + [(8, "fred"), (9, "ed")], + ) @testing.requires.tuple_in - def test_expanding_in_composite(self): - testing.db.execute( + def test_expanding_in_composite(self, connection): + connection.execute( users.insert(), [ dict(user_id=7, user_name="jack"), @@ -729,30 +723,29 @@ class QueryTest(fixtures.TestBase): ], ) - with testing.db.connect() as conn: - stmt = ( - select([users]) - .where( - tuple_(users.c.user_id, users.c.user_name).in_( - bindparam("uname", expanding=True) - ) + stmt = ( + select([users]) + .where( + tuple_(users.c.user_id, users.c.user_name).in_( + bindparam("uname", expanding=True) ) - .order_by(users.c.user_id) ) + .order_by(users.c.user_id) + ) - eq_( - conn.execute(stmt, {"uname": [(7, "jack")]}).fetchall(), - [(7, "jack")], - ) + eq_( + connection.execute(stmt, {"uname": [(7, "jack")]}).fetchall(), + [(7, "jack")], + ) - eq_( - conn.execute( - stmt, {"uname": [(7, "jack"), (8, "fred")]} - ).fetchall(), - [(7, "jack"), (8, "fred")], - ) + eq_( + connection.execute( + stmt, {"uname": [(7, "jack"), (8, "fred")]} + ).fetchall(), + [(7, "jack"), (8, "fred")], + ) - def test_expanding_in_dont_alter_compiled(self): + def test_expanding_in_dont_alter_compiled(self, connection): """test for issue #5048 """ class NameWithProcess(TypeDecorator): @@ -768,40 +761,37 @@ class QueryTest(fixtures.TestBase): Column("user_name", NameWithProcess()), ) - with testing.db.connect() as conn: - conn.execute( - users.insert(), - [ - dict(user_id=7, user_name="AB jack"), - dict(user_id=8, user_name="BE fred"), - dict(user_id=9, user_name="GP ed"), - ], - ) + connection.execute( + users.insert(), + [ + dict(user_id=7, user_name="AB jack"), + dict(user_id=8, user_name="BE fred"), + dict(user_id=9, user_name="GP ed"), + ], + ) - stmt = ( - select([users]) - .where( - users.c.user_name.in_(bindparam("uname", expanding=True)) - ) - .order_by(users.c.user_id) - ) + stmt = ( + select([users]) + .where(users.c.user_name.in_(bindparam("uname", expanding=True))) + .order_by(users.c.user_id) + ) - compiled = stmt.compile(testing.db) - eq_(len(compiled._bind_processors), 1) + compiled = stmt.compile(testing.db) + eq_(len(compiled._bind_processors), 1) - eq_( - conn.execute( - compiled, {"uname": ["HJ jack", "RR fred"]} - ).fetchall(), - [(7, "jack"), (8, "fred")], - ) + eq_( + connection.execute( + compiled, {"uname": ["HJ jack", "RR fred"]} + ).fetchall(), + [(7, "jack"), (8, "fred")], + ) - eq_(len(compiled._bind_processors), 1) + eq_(len(compiled._bind_processors), 1) @testing.fails_on("firebird", "uses sql-92 rules") @testing.fails_on("sybase", "uses sql-92 rules") @testing.skip_if(["mssql"]) - def test_bind_in(self): + def test_bind_in(self, connection): """test calling IN against a bind parameter. this isn't allowed on several platforms since we @@ -809,56 +799,55 @@ class QueryTest(fixtures.TestBase): """ - users.insert().execute(user_id=7, user_name="jack") - users.insert().execute(user_id=8, user_name="fred") - users.insert().execute(user_id=9, user_name=None) + connection.execute(users.insert(), user_id=7, user_name="jack") + connection.execute(users.insert(), user_id=8, user_name="fred") + connection.execute(users.insert(), user_id=9, user_name=None) u = bindparam("search_key", type_=String) s = users.select(not_(u.in_([]))) - r = s.execute(search_key="john").fetchall() + r = connection.execute(s, search_key="john").fetchall() assert len(r) == 3 - r = s.execute(search_key=None).fetchall() + r = connection.execute(s, search_key=None).fetchall() assert len(r) == 3 - def test_literal_in(self): + def test_literal_in(self, connection): """similar to test_bind_in but use a bind with a value.""" - users.insert().execute(user_id=7, user_name="jack") - users.insert().execute(user_id=8, user_name="fred") - users.insert().execute(user_id=9, user_name=None) + connection.execute(users.insert(), user_id=7, user_name="jack") + connection.execute(users.insert(), user_id=8, user_name="fred") + connection.execute(users.insert(), user_id=9, user_name=None) s = users.select(not_(literal("john").in_([]))) - r = s.execute().fetchall() + r = connection.execute(s).fetchall() assert len(r) == 3 @testing.requires.boolean_col_expressions - def test_empty_in_filtering_static(self): + def test_empty_in_filtering_static(self, connection): """test the behavior of the in_() function when comparing against an empty collection, specifically that a proper boolean value is generated. """ - with testing.db.connect() as conn: - conn.execute( - users.insert(), - [ - {"user_id": 7, "user_name": "jack"}, - {"user_id": 8, "user_name": "ed"}, - {"user_id": 9, "user_name": None}, - ], - ) + connection.execute( + users.insert(), + [ + {"user_id": 7, "user_name": "jack"}, + {"user_id": 8, "user_name": "ed"}, + {"user_id": 9, "user_name": None}, + ], + ) - s = users.select(users.c.user_name.in_([]) == True) # noqa - r = conn.execute(s).fetchall() - assert len(r) == 0 - s = users.select(users.c.user_name.in_([]) == False) # noqa - r = conn.execute(s).fetchall() - assert len(r) == 3 - s = users.select(users.c.user_name.in_([]) == None) # noqa - r = conn.execute(s).fetchall() - assert len(r) == 0 + s = users.select(users.c.user_name.in_([]) == True) # noqa + r = connection.execute(s).fetchall() + assert len(r) == 0 + s = users.select(users.c.user_name.in_([]) == False) # noqa + r = connection.execute(s).fetchall() + assert len(r) == 3 + s = users.select(users.c.user_name.in_([]) == None) # noqa + r = connection.execute(s).fetchall() + assert len(r) == 0 class RequiredBindTest(fixtures.TablesTest): @@ -951,98 +940,102 @@ class LimitTest(fixtures.TestBase): ) metadata.create_all() - users.insert().execute(user_id=1, user_name="john") - addresses.insert().execute(address_id=1, user_id=1, address="addr1") - users.insert().execute(user_id=2, user_name="jack") - addresses.insert().execute(address_id=2, user_id=2, address="addr1") - users.insert().execute(user_id=3, user_name="ed") - addresses.insert().execute(address_id=3, user_id=3, address="addr2") - users.insert().execute(user_id=4, user_name="wendy") - addresses.insert().execute(address_id=4, user_id=4, address="addr3") - users.insert().execute(user_id=5, user_name="laura") - addresses.insert().execute(address_id=5, user_id=5, address="addr4") - users.insert().execute(user_id=6, user_name="ralph") - addresses.insert().execute(address_id=6, user_id=6, address="addr5") - users.insert().execute(user_id=7, user_name="fido") - addresses.insert().execute(address_id=7, user_id=7, address="addr5") + with testing.db.connect() as conn: + conn.execute(users.insert(), user_id=1, user_name="john") + conn.execute( + addresses.insert(), address_id=1, user_id=1, address="addr1" + ) + conn.execute(users.insert(), user_id=2, user_name="jack") + conn.execute( + addresses.insert(), address_id=2, user_id=2, address="addr1" + ) + conn.execute(users.insert(), user_id=3, user_name="ed") + conn.execute( + addresses.insert(), address_id=3, user_id=3, address="addr2" + ) + conn.execute(users.insert(), user_id=4, user_name="wendy") + conn.execute( + addresses.insert(), address_id=4, user_id=4, address="addr3" + ) + conn.execute(users.insert(), user_id=5, user_name="laura") + conn.execute( + addresses.insert(), address_id=5, user_id=5, address="addr4" + ) + conn.execute(users.insert(), user_id=6, user_name="ralph") + conn.execute( + addresses.insert(), address_id=6, user_id=6, address="addr5" + ) + conn.execute(users.insert(), user_id=7, user_name="fido") + conn.execute( + addresses.insert(), address_id=7, user_id=7, address="addr5" + ) @classmethod def teardown_class(cls): metadata.drop_all() - def test_select_limit(self): - r = ( + def test_select_limit(self, connection): + r = connection.execute( users.select(limit=3, order_by=[users.c.user_id]) - .execute() - .fetchall() - ) + ).fetchall() self.assert_(r == [(1, "john"), (2, "jack"), (3, "ed")], repr(r)) @testing.requires.offset - def test_select_limit_offset(self): + def test_select_limit_offset(self, connection): """Test the interaction between limit and offset""" - r = ( + r = connection.execute( users.select(limit=3, offset=2, order_by=[users.c.user_id]) - .execute() - .fetchall() - ) + ).fetchall() self.assert_(r == [(3, "ed"), (4, "wendy"), (5, "laura")]) - r = ( + r = connection.execute( users.select(offset=5, order_by=[users.c.user_id]) - .execute() - .fetchall() - ) + ).fetchall() self.assert_(r == [(6, "ralph"), (7, "fido")]) - def test_select_distinct_limit(self): + def test_select_distinct_limit(self, connection): """Test the interaction between limit and distinct""" r = sorted( [ x[0] - for x in select([addresses.c.address]) - .distinct() - .limit(3) - .order_by(addresses.c.address) - .execute() - .fetchall() + for x in connection.execute( + select([addresses.c.address]).distinct().limit(3) + ) ] ) self.assert_(len(r) == 3, repr(r)) self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) @testing.requires.offset - def test_select_distinct_offset(self): + def test_select_distinct_offset(self, connection): """Test the interaction between distinct and offset""" r = sorted( [ x[0] - for x in select([addresses.c.address]) - .distinct() - .offset(1) - .order_by(addresses.c.address) - .execute() - .fetchall() + for x in connection.execute( + select([addresses.c.address]) + .distinct() + .offset(1) + .order_by(addresses.c.address) + ).fetchall() ] ) eq_(len(r), 4) self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r)) @testing.requires.offset - def test_select_distinct_limit_offset(self): + def test_select_distinct_limit_offset(self, connection): """Test the interaction between limit and limit/offset""" - r = ( + r = connection.execute( select([addresses.c.address]) .order_by(addresses.c.address) .distinct() .offset(2) .limit(3) - .execute() - .fetchall() - ) + ).fetchall() self.assert_(len(r) == 3, repr(r)) self.assert_(r[0] != r[1] and r[1] != r[2], repr(r)) @@ -1099,27 +1092,31 @@ class CompoundTest(fixtures.TestBase): ) metadata.create_all() - t1.insert().execute( - [ - dict(col2="t1col2r1", col3="aaa", col4="aaa"), - dict(col2="t1col2r2", col3="bbb", col4="bbb"), - dict(col2="t1col2r3", col3="ccc", col4="ccc"), - ] - ) - t2.insert().execute( - [ - dict(col2="t2col2r1", col3="aaa", col4="bbb"), - dict(col2="t2col2r2", col3="bbb", col4="ccc"), - dict(col2="t2col2r3", col3="ccc", col4="aaa"), - ] - ) - t3.insert().execute( - [ - dict(col2="t3col2r1", col3="aaa", col4="ccc"), - dict(col2="t3col2r2", col3="bbb", col4="aaa"), - dict(col2="t3col2r3", col3="ccc", col4="bbb"), - ] - ) + with testing.db.connect() as conn: + conn.execute( + t1.insert(), + [ + dict(col2="t1col2r1", col3="aaa", col4="aaa"), + dict(col2="t1col2r2", col3="bbb", col4="bbb"), + dict(col2="t1col2r3", col3="ccc", col4="ccc"), + ], + ) + conn.execute( + t2.insert(), + [ + dict(col2="t2col2r1", col3="aaa", col4="bbb"), + dict(col2="t2col2r2", col3="bbb", col4="ccc"), + dict(col2="t2col2r3", col3="ccc", col4="aaa"), + ], + ) + conn.execute( + t3.insert(), + [ + dict(col2="t3col2r1", col3="aaa", col4="ccc"), + dict(col2="t3col2r2", col3="bbb", col4="aaa"), + dict(col2="t3col2r3", col3="ccc", col4="bbb"), + ], + ) @engines.close_first def teardown(self): @@ -1133,7 +1130,7 @@ class CompoundTest(fixtures.TestBase): return sorted([tuple(row) for row in executed.fetchall()]) @testing.requires.subqueries - def test_union(self): + def test_union(self, connection): (s1, s2) = ( select( [t1.c.col3.label("col3"), t1.c.col4.label("col4")], @@ -1152,14 +1149,16 @@ class CompoundTest(fixtures.TestBase): ("bbb", "ccc"), ("ccc", "aaa"), ] - found1 = self._fetchall_sorted(u.execute()) + found1 = self._fetchall_sorted(connection.execute(u)) eq_(found1, wanted) - found2 = self._fetchall_sorted(u.alias("bar").select().execute()) + found2 = self._fetchall_sorted( + connection.execute(u.alias("bar").select()) + ) eq_(found2, wanted) @testing.fails_on("firebird", "doesn't like ORDER BY with UNIONs") - def test_union_ordered(self): + def test_union_ordered(self, connection): (s1, s2) = ( select( [t1.c.col3.label("col3"), t1.c.col4.label("col4")], @@ -1178,11 +1177,11 @@ class CompoundTest(fixtures.TestBase): ("bbb", "ccc"), ("ccc", "aaa"), ] - eq_(u.execute().fetchall(), wanted) + eq_(connection.execute(u).fetchall(), wanted) @testing.fails_on("firebird", "doesn't like ORDER BY with UNIONs") @testing.requires.subqueries - def test_union_ordered_alias(self): + def test_union_ordered_alias(self, connection): (s1, s2) = ( select( [t1.c.col3.label("col3"), t1.c.col4.label("col4")], @@ -1201,7 +1200,7 @@ class CompoundTest(fixtures.TestBase): ("bbb", "ccc"), ("ccc", "aaa"), ] - eq_(u.alias("bar").select().execute().fetchall(), wanted) + eq_(connection.execute(u.alias("bar").select()).fetchall(), wanted) @testing.crashes("oracle", "FIXME: unknown, verify not fails_on") @testing.fails_on( @@ -1212,20 +1211,22 @@ class CompoundTest(fixtures.TestBase): testing.requires._mysql_not_mariadb_104, "FIXME: unknown" ) @testing.fails_on("sqlite", "FIXME: unknown") - def test_union_all(self): + def test_union_all(self, connection): e = union_all( select([t1.c.col3]), union(select([t1.c.col3]), select([t1.c.col3])), ) wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)] - found1 = self._fetchall_sorted(e.execute()) + found1 = self._fetchall_sorted(connection.execute(e)) eq_(found1, wanted) - found2 = self._fetchall_sorted(e.alias("foo").select().execute()) + found2 = self._fetchall_sorted( + connection.execute(e.alias("foo").select()) + ) eq_(found2, wanted) - def test_union_all_lightweight(self): + def test_union_all_lightweight(self, connection): """like test_union_all, but breaks the sub-union into a subquery with an explicit column reference on the outside, more palatable to a wider variety of engines. @@ -1237,14 +1238,16 @@ class CompoundTest(fixtures.TestBase): e = union_all(select([t1.c.col3]), select([u.c.col3])) wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)] - found1 = self._fetchall_sorted(e.execute()) + found1 = self._fetchall_sorted(connection.execute(e)) eq_(found1, wanted) - found2 = self._fetchall_sorted(e.alias("foo").select().execute()) + found2 = self._fetchall_sorted( + connection.execute(e.alias("foo").select()) + ) eq_(found2, wanted) @testing.requires.intersect - def test_intersect(self): + def test_intersect(self, connection): i = intersect( select([t2.c.col3, t2.c.col4]), select([t2.c.col3, t2.c.col4], t2.c.col4 == t3.c.col3), @@ -1252,15 +1255,17 @@ class CompoundTest(fixtures.TestBase): wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")] - found1 = self._fetchall_sorted(i.execute()) + found1 = self._fetchall_sorted(connection.execute(i)) eq_(found1, wanted) - found2 = self._fetchall_sorted(i.alias("bar").select().execute()) + found2 = self._fetchall_sorted( + connection.execute(i.alias("bar").select()) + ) eq_(found2, wanted) @testing.requires.except_ @testing.fails_on("sqlite", "Can't handle this style of nesting") - def test_except_style1(self): + def test_except_style1(self, connection): e = except_( union( select([t1.c.col3, t1.c.col4]), @@ -1279,11 +1284,11 @@ class CompoundTest(fixtures.TestBase): ("ccc", "ccc"), ] - found = self._fetchall_sorted(e.alias().select().execute()) + found = self._fetchall_sorted(connection.execute(e.alias().select())) eq_(found, wanted) @testing.requires.except_ - def test_except_style2(self): + def test_except_style2(self, connection): # same as style1, but add alias().select() to the except_(). # sqlite can handle it now. @@ -1307,10 +1312,10 @@ class CompoundTest(fixtures.TestBase): ("ccc", "ccc"), ] - found1 = self._fetchall_sorted(e.execute()) + found1 = self._fetchall_sorted(connection.execute(e)) eq_(found1, wanted) - found2 = self._fetchall_sorted(e.alias().select().execute()) + found2 = self._fetchall_sorted(connection.execute(e.alias().select())) eq_(found2, wanted) @testing.fails_on( @@ -1318,7 +1323,7 @@ class CompoundTest(fixtures.TestBase): "Can't handle this style of nesting", ) @testing.requires.except_ - def test_except_style3(self): + def test_except_style3(self, connection): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( select([t1.c.col3]), # aaa, bbb, ccc @@ -1327,11 +1332,11 @@ class CompoundTest(fixtures.TestBase): select([t3.c.col3], t3.c.col3 == "ccc"), # ccc ), ) - eq_(e.execute().fetchall(), [("ccc",)]) - eq_(e.alias("foo").select().execute().fetchall(), [("ccc",)]) + eq_(connection.execute(e).fetchall(), [("ccc",)]) + eq_(connection.execute(e.alias("foo").select()).fetchall(), [("ccc",)]) @testing.requires.except_ - def test_except_style4(self): + def test_except_style4(self, connection): # aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc e = except_( select([t1.c.col3]), # aaa, bbb, ccc @@ -1343,15 +1348,15 @@ class CompoundTest(fixtures.TestBase): .select(), ) - eq_(e.execute().fetchall(), [("ccc",)]) - eq_(e.alias().select().execute().fetchall(), [("ccc",)]) + eq_(connection.execute(e).fetchall(), [("ccc",)]) + eq_(connection.execute(e.alias().select()).fetchall(), [("ccc",)]) @testing.requires.intersect @testing.fails_on( ["sqlite", testing.requires._mysql_not_mariadb_104], "sqlite can't handle leading parenthesis", ) - def test_intersect_unions(self): + def test_intersect_unions(self, connection): u = intersect( union( select([t1.c.col3, t1.c.col4]), select([t3.c.col3, t3.c.col4]) @@ -1363,12 +1368,12 @@ class CompoundTest(fixtures.TestBase): .select(), ) wanted = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")] - found = self._fetchall_sorted(u.execute()) + found = self._fetchall_sorted(connection.execute(u)) eq_(found, wanted) @testing.requires.intersect - def test_intersect_unions_2(self): + def test_intersect_unions_2(self, connection): u = intersect( union( select([t1.c.col3, t1.c.col4]), select([t3.c.col3, t3.c.col4]) @@ -1382,12 +1387,12 @@ class CompoundTest(fixtures.TestBase): .select(), ) wanted = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")] - found = self._fetchall_sorted(u.execute()) + found = self._fetchall_sorted(connection.execute(u)) eq_(found, wanted) @testing.requires.intersect - def test_intersect_unions_3(self): + def test_intersect_unions_3(self, connection): u = intersect( select([t2.c.col3, t2.c.col4]), union( @@ -1399,12 +1404,12 @@ class CompoundTest(fixtures.TestBase): .select(), ) wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")] - found = self._fetchall_sorted(u.execute()) + found = self._fetchall_sorted(connection.execute(u)) eq_(found, wanted) @testing.requires.intersect - def test_composite_alias(self): + def test_composite_alias(self, connection): ua = intersect( select([t2.c.col3, t2.c.col4]), union( @@ -1417,7 +1422,7 @@ class CompoundTest(fixtures.TestBase): ).alias() wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")] - found = self._fetchall_sorted(ua.select().execute()) + found = self._fetchall_sorted(connection.execute(ua.select())) eq_(found, wanted) @@ -1466,19 +1471,24 @@ class JoinTest(fixtures.TestBase): metadata.drop_all() metadata.create_all() - # t1.10 -> t2.20 -> t3.30 - # t1.11 -> t2.21 - # t1.12 - t1.insert().execute( - {"t1_id": 10, "name": "t1 #10"}, - {"t1_id": 11, "name": "t1 #11"}, - {"t1_id": 12, "name": "t1 #12"}, - ) - t2.insert().execute( - {"t2_id": 20, "t1_id": 10, "name": "t2 #20"}, - {"t2_id": 21, "t1_id": 11, "name": "t2 #21"}, - ) - t3.insert().execute({"t3_id": 30, "t2_id": 20, "name": "t3 #30"}) + with testing.db.connect() as conn: + # t1.10 -> t2.20 -> t3.30 + # t1.11 -> t2.21 + # t1.12 + conn.execute( + t1.insert(), + {"t1_id": 10, "name": "t1 #10"}, + {"t1_id": 11, "name": "t1 #11"}, + {"t1_id": 12, "name": "t1 #12"}, + ) + conn.execute( + t2.insert(), + {"t2_id": 20, "t1_id": 10, "name": "t2 #20"}, + {"t2_id": 21, "t1_id": 11, "name": "t2 #21"}, + ) + conn.execute( + t3.insert(), {"t3_id": 30, "t2_id": 20, "name": "t3 #30"} + ) @classmethod def teardown_class(cls): @@ -1486,10 +1496,11 @@ class JoinTest(fixtures.TestBase): def assertRows(self, statement, expected): """Execute a statement and assert that rows returned equal expected.""" - - found = sorted([tuple(row) for row in statement.execute().fetchall()]) - - eq_(found, sorted(expected)) + with testing.db.connect() as conn: + found = sorted( + [tuple(row) for row in conn.execute(statement).fetchall()] + ) + eq_(found, sorted(expected)) def test_join_x1(self): """Joins t1->t2.""" @@ -1805,30 +1816,35 @@ class OperatorTest(fixtures.TestBase): ) metadata.create_all() - flds.insert().execute( - [dict(intcol=5, strcol="foo"), dict(intcol=13, strcol="bar")] - ) + with testing.db.connect() as conn: + conn.execute( + flds.insert(), + [dict(intcol=5, strcol="foo"), dict(intcol=13, strcol="bar")], + ) @classmethod def teardown_class(cls): metadata.drop_all() # TODO: seems like more tests warranted for this setup. - def test_modulo(self): + def test_modulo(self, connection): eq_( - select([flds.c.intcol % 3], order_by=flds.c.idcol) - .execute() - .fetchall(), + connection.execute( + select([flds.c.intcol % 3], order_by=flds.c.idcol) + ).fetchall(), [(2,), (1,)], ) @testing.requires.window_functions - def test_over(self): + def test_over(self, connection): eq_( - select( - [flds.c.intcol, func.row_number().over(order_by=flds.c.strcol)] - ) - .execute() - .fetchall(), + connection.execute( + select( + [ + flds.c.intcol, + func.row_number().over(order_by=flds.c.strcol), + ] + ) + ).fetchall(), [(13, 1), (5, 2)], ) -- 2.39.5