@engines.close_first
def teardown(self):
- addresses.delete().execute()
- users.delete().execute()
- users2.delete().execute()
+ with testing.db.connect() as conn:
+ conn.execute(addresses.delete())
+ conn.execute(users.delete())
+ conn.execute(users2.delete())
@classmethod
def teardown_class(cls):
@testing.fails_on(
"firebird", "kinterbasdb doesn't send full type information"
)
- def test_order_by_label(self):
+ def test_order_by_label(self, connection):
"""test that a label within an ORDER BY works on each backend.
This test should be modified to support [ticket:1068] when that ticket
"""
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
concat = ("test: " + users.c.user_name).label("thedata")
eq_(
- select([concat]).order_by("thedata").execute().fetchall(),
+ connection.execute(
+ select([concat]).order_by("thedata")
+ ).fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)],
)
eq_(
- select([concat]).order_by("thedata").execute().fetchall(),
+ connection.execute(
+ select([concat]).order_by("thedata")
+ ).fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)],
)
concat = ("test: " + users.c.user_name).label("thedata")
eq_(
- select([concat]).order_by(desc("thedata")).execute().fetchall(),
+ connection.execute(
+ select([concat]).order_by(desc("thedata"))
+ ).fetchall(),
[("test: jack",), ("test: fred",), ("test: ed",)],
)
@testing.requires.order_by_label_with_expression
- def test_order_by_label_compound(self):
- users.insert().execute(
+ def test_order_by_label_compound(self, connection):
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
concat = ("test: " + users.c.user_name).label("thedata")
eq_(
- select([concat])
- .order_by(literal_column("thedata") + "x")
- .execute()
- .fetchall(),
+ connection.execute(
+ select([concat]).order_by(literal_column("thedata") + "x")
+ ).fetchall(),
[("test: ed",), ("test: fred",), ("test: jack",)],
)
@testing.requires.boolean_col_expressions
- def test_or_and_as_columns(self):
+ def test_or_and_as_columns(self, connection):
true, false = literal(True), literal(False)
- eq_(testing.db.execute(select([and_(true, false)])).scalar(), False)
- eq_(testing.db.execute(select([and_(true, true)])).scalar(), True)
- eq_(testing.db.execute(select([or_(true, false)])).scalar(), True)
- eq_(testing.db.execute(select([or_(false, false)])).scalar(), False)
+ eq_(connection.execute(select([and_(true, false)])).scalar(), False)
+ eq_(connection.execute(select([and_(true, true)])).scalar(), True)
+ eq_(connection.execute(select([or_(true, false)])).scalar(), True)
+ eq_(connection.execute(select([or_(false, false)])).scalar(), False)
eq_(
- testing.db.execute(select([not_(or_(false, false))])).scalar(),
+ connection.execute(select([not_(or_(false, false))])).scalar(),
True,
)
- row = testing.db.execute(
+ row = connection.execute(
select(
[or_(false, false).label("x"), and_(true, false).label("y")]
)
assert row.x == False # noqa
assert row.y == False # noqa
- row = testing.db.execute(
+ row = connection.execute(
select([or_(true, false).label("x"), and_(true, false).label("y")])
).first()
assert row.x == True # noqa
assert row.y == False # noqa
- def test_like_ops(self):
- users.insert().execute(
+ def test_like_ops(self, connection):
+ connection.execute(
+ users.insert(),
{"user_id": 1, "user_name": "apples"},
{"user_id": 2, "user_name": "oranges"},
{"user_id": 3, "user_name": "bananas"},
[(5,)],
),
):
- eq_(expr.execute().fetchall(), result)
+ eq_(connection.execute(expr).fetchall(), result)
@testing.requires.mod_operator_as_percent_sign
@testing.emits_warning(".*now automatically escapes.*")
- def test_percents_in_text(self):
+ def test_percents_in_text(self, connection):
for expr, result in (
(text("select 6 % 10"), 6),
(text("select 17 % 10"), 7),
(text("select '%%%'"), "%%%"),
(text("select 'hello % world'"), "hello % world"),
):
- eq_(testing.db.scalar(expr), result)
+ eq_(connection.scalar(expr), result)
- def test_ilike(self):
- users.insert().execute(
+ def test_ilike(self, connection):
+ connection.execute(
+ users.insert(),
{"user_id": 1, "user_name": "one"},
{"user_id": 2, "user_name": "TwO"},
{"user_id": 3, "user_name": "ONE"},
)
eq_(
- select([users.c.user_id])
- .where(users.c.user_name.ilike("one"))
- .execute()
- .fetchall(),
+ connection.execute(
+ select([users.c.user_id]).where(users.c.user_name.ilike("one"))
+ ).fetchall(),
[(1,), (3,), (4,)],
)
eq_(
- select([users.c.user_id])
- .where(users.c.user_name.ilike("TWO"))
- .execute()
- .fetchall(),
+ connection.execute(
+ select([users.c.user_id]).where(users.c.user_name.ilike("TWO"))
+ ).fetchall(),
[(2,)],
)
if testing.against("postgresql"):
eq_(
- select([users.c.user_id])
- .where(users.c.user_name.like("one"))
- .execute()
- .fetchall(),
+ connection.execute(
+ select([users.c.user_id]).where(
+ users.c.user_name.like("one")
+ )
+ ).fetchall(),
[(1,)],
)
eq_(
- select([users.c.user_id])
- .where(users.c.user_name.like("TWO"))
- .execute()
- .fetchall(),
+ connection.execute(
+ select([users.c.user_id]).where(
+ users.c.user_name.like("TWO")
+ )
+ ).fetchall(),
[],
)
- def test_compiled_execute(self):
- users.insert().execute(user_id=7, user_name="jack")
+ def test_compiled_execute(self, connection):
+ connection.execute(users.insert(), user_id=7, user_name="jack")
s = select([users], users.c.user_id == bindparam("id")).compile()
- c = testing.db.connect()
- eq_(c.execute(s, id=7).first()._mapping["user_id"], 7)
+ eq_(connection.execute(s, id=7).first()._mapping["user_id"], 7)
- def test_compiled_insert_execute(self):
- users.insert().compile().execute(user_id=7, user_name="jack")
+ def test_compiled_insert_execute(self, connection):
+ connection.execute(
+ users.insert().compile(), user_id=7, user_name="jack"
+ )
s = select([users], users.c.user_id == bindparam("id")).compile()
- c = testing.db.connect()
- eq_(c.execute(s, id=7).first()._mapping["user_id"], 7)
+ eq_(connection.execute(s, id=7).first()._mapping["user_id"], 7)
- def test_repeated_bindparams(self):
+ def test_repeated_bindparams(self, connection):
"""Tests that a BindParam can be used more than once.
This should be run for DB-APIs with both positional and named
paramstyles.
"""
- users.insert().execute(user_id=7, user_name="jack")
- users.insert().execute(user_id=8, user_name="fred")
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), user_id=8, user_name="fred")
u = bindparam("userid")
s = users.select(and_(users.c.user_name == u, users.c.user_name == u))
- r = s.execute(userid="fred").fetchall()
+ r = connection.execute(s, userid="fred").fetchall()
assert len(r) == 1
def test_bindparam_detection(self):
a_eq(prep(r".\:that$ :other."), ".:that$ ?.")
@testing.requires.standalone_binds
- def test_select_from_bindparam(self):
+ def test_select_from_bindparam(self, connection):
"""Test result row processing when selecting from a plain bind
param."""
return "INT_%d" % value
eq_(
- testing.db.scalar(select([cast("INT_5", type_=MyInteger)])),
+ connection.scalar(select([cast("INT_5", type_=MyInteger)])),
"INT_5",
)
eq_(
- testing.db.scalar(
+ connection.scalar(
select([cast("INT_5", type_=MyInteger).label("foo")])
),
"INT_5",
)
- def test_order_by(self):
+ def test_order_by(self, connection):
"""Exercises ORDER BY clause generation.
Tests simple, compound, aliased and DESC clauses.
"""
- users.insert().execute(user_id=1, user_name="c")
- users.insert().execute(user_id=2, user_name="b")
- users.insert().execute(user_id=3, user_name="a")
+ connection.execute(users.insert(), user_id=1, user_name="c")
+ connection.execute(users.insert(), user_id=2, user_name="b")
+ connection.execute(users.insert(), user_id=3, user_name="a")
def a_eq(executable, wanted):
- got = list(executable.execute())
+ got = list(connection.execute(executable))
eq_(got, wanted)
for labels in False, True:
)
@testing.requires.nullsordering
- def test_order_by_nulls(self):
+ def test_order_by_nulls(self, connection):
"""Exercises ORDER BY clause generation.
Tests simple, compound, aliased and DESC clauses.
"""
- users.insert().execute(user_id=1)
- users.insert().execute(user_id=2, user_name="b")
- users.insert().execute(user_id=3, user_name="a")
+ connection.execute(users.insert(), user_id=1)
+ connection.execute(users.insert(), user_id=2, user_name="b")
+ connection.execute(users.insert(), user_id=3, user_name="a")
def a_eq(executable, wanted):
- got = list(executable.execute())
+ got = list(connection.execute(executable))
eq_(got, wanted)
for labels in False, True:
[(3, "a"), (2, "b"), (1, None)],
)
- def test_in_filtering(self):
+ def test_in_filtering(self, connection):
"""test the behavior of the in_() function."""
- users.insert().execute(user_id=7, user_name="jack")
- users.insert().execute(user_id=8, user_name="fred")
- users.insert().execute(user_id=9, user_name=None)
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), user_id=8, user_name="fred")
+ connection.execute(users.insert(), user_id=9, user_name=None)
s = users.select(users.c.user_name.in_([]))
- r = s.execute().fetchall()
+ r = connection.execute(s).fetchall()
# No username is in empty set
assert len(r) == 0
s = users.select(not_(users.c.user_name.in_([])))
- r = s.execute().fetchall()
+ r = connection.execute(s).fetchall()
assert len(r) == 3
s = users.select(users.c.user_name.in_(["jack", "fred"]))
- r = s.execute().fetchall()
+ r = connection.execute(s).fetchall()
assert len(r) == 2
s = users.select(not_(users.c.user_name.in_(["jack", "fred"])))
- r = s.execute().fetchall()
+ r = connection.execute(s).fetchall()
# Null values are not outside any set
assert len(r) == 0
- def test_expanding_in(self):
- testing.db.execute(
+ def test_expanding_in(self, connection):
+ connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
],
)
- with testing.db.connect() as conn:
- stmt = (
- select([users])
- .where(
- users.c.user_name.in_(bindparam("uname", expanding=True))
- )
- .order_by(users.c.user_id)
- )
+ stmt = (
+ select([users])
+ .where(users.c.user_name.in_(bindparam("uname", expanding=True)))
+ .order_by(users.c.user_id)
+ )
- eq_(
- conn.execute(stmt, {"uname": ["jack"]}).fetchall(),
- [(7, "jack")],
- )
+ eq_(
+ connection.execute(stmt, {"uname": ["jack"]}).fetchall(),
+ [(7, "jack")],
+ )
- eq_(
- conn.execute(stmt, {"uname": ["jack", "fred"]}).fetchall(),
- [(7, "jack"), (8, "fred")],
- )
+ eq_(
+ connection.execute(stmt, {"uname": ["jack", "fred"]}).fetchall(),
+ [(7, "jack"), (8, "fred")],
+ )
- eq_(conn.execute(stmt, {"uname": []}).fetchall(), [])
+ eq_(connection.execute(stmt, {"uname": []}).fetchall(), [])
- assert_raises_message(
- exc.StatementError,
- "'expanding' parameters can't be used with executemany()",
- conn.execute,
- users.update().where(
- users.c.user_name.in_(bindparam("uname", expanding=True))
- ),
- [{"uname": ["fred"]}, {"uname": ["ed"]}],
- )
+ assert_raises_message(
+ exc.StatementError,
+ "'expanding' parameters can't be used with executemany()",
+ connection.execute,
+ users.update().where(
+ users.c.user_name.in_(bindparam("uname", expanding=True))
+ ),
+ [{"uname": ["fred"]}, {"uname": ["ed"]}],
+ )
@testing.requires.no_quoting_special_bind_names
- def test_expanding_in_special_chars(self):
- testing.db.execute(
+ def test_expanding_in_special_chars(self, connection):
+ connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
],
)
- with testing.db.connect() as conn:
- stmt = (
- select([users])
- .where(users.c.user_name.in_(bindparam("u35", expanding=True)))
- .where(users.c.user_id == bindparam("u46"))
- .order_by(users.c.user_id)
- )
+ stmt = (
+ select([users])
+ .where(users.c.user_name.in_(bindparam("u35", expanding=True)))
+ .where(users.c.user_id == bindparam("u46"))
+ .order_by(users.c.user_id)
+ )
- eq_(
- conn.execute(
- stmt, {"u35": ["jack", "fred"], "u46": 7}
- ).fetchall(),
- [(7, "jack")],
- )
+ eq_(
+ connection.execute(
+ stmt, {"u35": ["jack", "fred"], "u46": 7}
+ ).fetchall(),
+ [(7, "jack")],
+ )
- stmt = (
- select([users])
- .where(
- users.c.user_name.in_(bindparam("u.35", expanding=True))
- )
- .where(users.c.user_id == bindparam("u.46"))
- .order_by(users.c.user_id)
- )
+ stmt = (
+ select([users])
+ .where(users.c.user_name.in_(bindparam("u.35", expanding=True)))
+ .where(users.c.user_id == bindparam("u.46"))
+ .order_by(users.c.user_id)
+ )
- eq_(
- conn.execute(
- stmt, {"u.35": ["jack", "fred"], "u.46": 7}
- ).fetchall(),
- [(7, "jack")],
- )
+ eq_(
+ connection.execute(
+ stmt, {"u.35": ["jack", "fred"], "u.46": 7}
+ ).fetchall(),
+ [(7, "jack")],
+ )
- def test_expanding_in_multiple(self):
- testing.db.execute(
+ def test_expanding_in_multiple(self, connection):
+ connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
],
)
- with testing.db.connect() as conn:
- stmt = (
- select([users])
- .where(
- users.c.user_name.in_(bindparam("uname", expanding=True))
- )
- .where(
- users.c.user_id.in_(bindparam("userid", expanding=True))
- )
- .order_by(users.c.user_id)
- )
+ stmt = (
+ select([users])
+ .where(users.c.user_name.in_(bindparam("uname", expanding=True)))
+ .where(users.c.user_id.in_(bindparam("userid", expanding=True)))
+ .order_by(users.c.user_id)
+ )
- eq_(
- conn.execute(
- stmt, {"uname": ["jack", "fred", "ed"], "userid": [8, 9]}
- ).fetchall(),
- [(8, "fred"), (9, "ed")],
- )
+ eq_(
+ connection.execute(
+ stmt, {"uname": ["jack", "fred", "ed"], "userid": [8, 9]}
+ ).fetchall(),
+ [(8, "fred"), (9, "ed")],
+ )
- def test_expanding_in_repeated(self):
- testing.db.execute(
+ def test_expanding_in_repeated(self, connection):
+ connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
],
)
- with testing.db.connect() as conn:
- stmt = (
- select([users])
- .where(
- users.c.user_name.in_(bindparam("uname", expanding=True))
- | users.c.user_name.in_(
- bindparam("uname2", expanding=True)
- )
- )
- .where(users.c.user_id == 8)
- )
- stmt = stmt.union(
- select([users])
- .where(
- users.c.user_name.in_(bindparam("uname", expanding=True))
- | users.c.user_name.in_(
- bindparam("uname2", expanding=True)
- )
- )
- .where(users.c.user_id == 9)
- ).order_by("user_id")
-
- eq_(
- conn.execute(
- stmt,
- {
- "uname": ["jack", "fred"],
- "uname2": ["ed"],
- "userid": [8, 9],
- },
- ).fetchall(),
- [(8, "fred"), (9, "ed")],
+ stmt = (
+ select([users])
+ .where(
+ users.c.user_name.in_(bindparam("uname", expanding=True))
+ | users.c.user_name.in_(bindparam("uname2", expanding=True))
+ )
+ .where(users.c.user_id == 8)
+ )
+ stmt = stmt.union(
+ select([users])
+ .where(
+ users.c.user_name.in_(bindparam("uname", expanding=True))
+ | users.c.user_name.in_(bindparam("uname2", expanding=True))
)
+ .where(users.c.user_id == 9)
+ ).order_by("user_id")
+
+ eq_(
+ connection.execute(
+ stmt,
+ {
+ "uname": ["jack", "fred"],
+ "uname2": ["ed"],
+ "userid": [8, 9],
+ },
+ ).fetchall(),
+ [(8, "fred"), (9, "ed")],
+ )
@testing.requires.tuple_in
- def test_expanding_in_composite(self):
- testing.db.execute(
+ def test_expanding_in_composite(self, connection):
+ connection.execute(
users.insert(),
[
dict(user_id=7, user_name="jack"),
],
)
- with testing.db.connect() as conn:
- stmt = (
- select([users])
- .where(
- tuple_(users.c.user_id, users.c.user_name).in_(
- bindparam("uname", expanding=True)
- )
+ stmt = (
+ select([users])
+ .where(
+ tuple_(users.c.user_id, users.c.user_name).in_(
+ bindparam("uname", expanding=True)
)
- .order_by(users.c.user_id)
)
+ .order_by(users.c.user_id)
+ )
- eq_(
- conn.execute(stmt, {"uname": [(7, "jack")]}).fetchall(),
- [(7, "jack")],
- )
+ eq_(
+ connection.execute(stmt, {"uname": [(7, "jack")]}).fetchall(),
+ [(7, "jack")],
+ )
- eq_(
- conn.execute(
- stmt, {"uname": [(7, "jack"), (8, "fred")]}
- ).fetchall(),
- [(7, "jack"), (8, "fred")],
- )
+ eq_(
+ connection.execute(
+ stmt, {"uname": [(7, "jack"), (8, "fred")]}
+ ).fetchall(),
+ [(7, "jack"), (8, "fred")],
+ )
- def test_expanding_in_dont_alter_compiled(self):
+ def test_expanding_in_dont_alter_compiled(self, connection):
"""test for issue #5048 """
class NameWithProcess(TypeDecorator):
Column("user_name", NameWithProcess()),
)
- with testing.db.connect() as conn:
- conn.execute(
- users.insert(),
- [
- dict(user_id=7, user_name="AB jack"),
- dict(user_id=8, user_name="BE fred"),
- dict(user_id=9, user_name="GP ed"),
- ],
- )
+ connection.execute(
+ users.insert(),
+ [
+ dict(user_id=7, user_name="AB jack"),
+ dict(user_id=8, user_name="BE fred"),
+ dict(user_id=9, user_name="GP ed"),
+ ],
+ )
- stmt = (
- select([users])
- .where(
- users.c.user_name.in_(bindparam("uname", expanding=True))
- )
- .order_by(users.c.user_id)
- )
+ stmt = (
+ select([users])
+ .where(users.c.user_name.in_(bindparam("uname", expanding=True)))
+ .order_by(users.c.user_id)
+ )
- compiled = stmt.compile(testing.db)
- eq_(len(compiled._bind_processors), 1)
+ compiled = stmt.compile(testing.db)
+ eq_(len(compiled._bind_processors), 1)
- eq_(
- conn.execute(
- compiled, {"uname": ["HJ jack", "RR fred"]}
- ).fetchall(),
- [(7, "jack"), (8, "fred")],
- )
+ eq_(
+ connection.execute(
+ compiled, {"uname": ["HJ jack", "RR fred"]}
+ ).fetchall(),
+ [(7, "jack"), (8, "fred")],
+ )
- eq_(len(compiled._bind_processors), 1)
+ eq_(len(compiled._bind_processors), 1)
@testing.fails_on("firebird", "uses sql-92 rules")
@testing.fails_on("sybase", "uses sql-92 rules")
@testing.skip_if(["mssql"])
- def test_bind_in(self):
+ def test_bind_in(self, connection):
"""test calling IN against a bind parameter.
this isn't allowed on several platforms since we
"""
- users.insert().execute(user_id=7, user_name="jack")
- users.insert().execute(user_id=8, user_name="fred")
- users.insert().execute(user_id=9, user_name=None)
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), user_id=8, user_name="fred")
+ connection.execute(users.insert(), user_id=9, user_name=None)
u = bindparam("search_key", type_=String)
s = users.select(not_(u.in_([])))
- r = s.execute(search_key="john").fetchall()
+ r = connection.execute(s, search_key="john").fetchall()
assert len(r) == 3
- r = s.execute(search_key=None).fetchall()
+ r = connection.execute(s, search_key=None).fetchall()
assert len(r) == 3
- def test_literal_in(self):
+ def test_literal_in(self, connection):
"""similar to test_bind_in but use a bind with a value."""
- users.insert().execute(user_id=7, user_name="jack")
- users.insert().execute(user_id=8, user_name="fred")
- users.insert().execute(user_id=9, user_name=None)
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), user_id=8, user_name="fred")
+ connection.execute(users.insert(), user_id=9, user_name=None)
s = users.select(not_(literal("john").in_([])))
- r = s.execute().fetchall()
+ r = connection.execute(s).fetchall()
assert len(r) == 3
@testing.requires.boolean_col_expressions
- def test_empty_in_filtering_static(self):
+ def test_empty_in_filtering_static(self, connection):
"""test the behavior of the in_() function when
comparing against an empty collection, specifically
that a proper boolean value is generated.
"""
- with testing.db.connect() as conn:
- conn.execute(
- users.insert(),
- [
- {"user_id": 7, "user_name": "jack"},
- {"user_id": 8, "user_name": "ed"},
- {"user_id": 9, "user_name": None},
- ],
- )
+ connection.execute(
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack"},
+ {"user_id": 8, "user_name": "ed"},
+ {"user_id": 9, "user_name": None},
+ ],
+ )
- s = users.select(users.c.user_name.in_([]) == True) # noqa
- r = conn.execute(s).fetchall()
- assert len(r) == 0
- s = users.select(users.c.user_name.in_([]) == False) # noqa
- r = conn.execute(s).fetchall()
- assert len(r) == 3
- s = users.select(users.c.user_name.in_([]) == None) # noqa
- r = conn.execute(s).fetchall()
- assert len(r) == 0
+ s = users.select(users.c.user_name.in_([]) == True) # noqa
+ r = connection.execute(s).fetchall()
+ assert len(r) == 0
+ s = users.select(users.c.user_name.in_([]) == False) # noqa
+ r = connection.execute(s).fetchall()
+ assert len(r) == 3
+ s = users.select(users.c.user_name.in_([]) == None) # noqa
+ r = connection.execute(s).fetchall()
+ assert len(r) == 0
class RequiredBindTest(fixtures.TablesTest):
)
metadata.create_all()
- users.insert().execute(user_id=1, user_name="john")
- addresses.insert().execute(address_id=1, user_id=1, address="addr1")
- users.insert().execute(user_id=2, user_name="jack")
- addresses.insert().execute(address_id=2, user_id=2, address="addr1")
- users.insert().execute(user_id=3, user_name="ed")
- addresses.insert().execute(address_id=3, user_id=3, address="addr2")
- users.insert().execute(user_id=4, user_name="wendy")
- addresses.insert().execute(address_id=4, user_id=4, address="addr3")
- users.insert().execute(user_id=5, user_name="laura")
- addresses.insert().execute(address_id=5, user_id=5, address="addr4")
- users.insert().execute(user_id=6, user_name="ralph")
- addresses.insert().execute(address_id=6, user_id=6, address="addr5")
- users.insert().execute(user_id=7, user_name="fido")
- addresses.insert().execute(address_id=7, user_id=7, address="addr5")
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), user_id=1, user_name="john")
+ conn.execute(
+ addresses.insert(), address_id=1, user_id=1, address="addr1"
+ )
+ conn.execute(users.insert(), user_id=2, user_name="jack")
+ conn.execute(
+ addresses.insert(), address_id=2, user_id=2, address="addr1"
+ )
+ conn.execute(users.insert(), user_id=3, user_name="ed")
+ conn.execute(
+ addresses.insert(), address_id=3, user_id=3, address="addr2"
+ )
+ conn.execute(users.insert(), user_id=4, user_name="wendy")
+ conn.execute(
+ addresses.insert(), address_id=4, user_id=4, address="addr3"
+ )
+ conn.execute(users.insert(), user_id=5, user_name="laura")
+ conn.execute(
+ addresses.insert(), address_id=5, user_id=5, address="addr4"
+ )
+ conn.execute(users.insert(), user_id=6, user_name="ralph")
+ conn.execute(
+ addresses.insert(), address_id=6, user_id=6, address="addr5"
+ )
+ conn.execute(users.insert(), user_id=7, user_name="fido")
+ conn.execute(
+ addresses.insert(), address_id=7, user_id=7, address="addr5"
+ )
@classmethod
def teardown_class(cls):
metadata.drop_all()
- def test_select_limit(self):
- r = (
+ def test_select_limit(self, connection):
+ r = connection.execute(
users.select(limit=3, order_by=[users.c.user_id])
- .execute()
- .fetchall()
- )
+ ).fetchall()
self.assert_(r == [(1, "john"), (2, "jack"), (3, "ed")], repr(r))
@testing.requires.offset
- def test_select_limit_offset(self):
+ def test_select_limit_offset(self, connection):
"""Test the interaction between limit and offset"""
- r = (
+ r = connection.execute(
users.select(limit=3, offset=2, order_by=[users.c.user_id])
- .execute()
- .fetchall()
- )
+ ).fetchall()
self.assert_(r == [(3, "ed"), (4, "wendy"), (5, "laura")])
- r = (
+ r = connection.execute(
users.select(offset=5, order_by=[users.c.user_id])
- .execute()
- .fetchall()
- )
+ ).fetchall()
self.assert_(r == [(6, "ralph"), (7, "fido")])
- def test_select_distinct_limit(self):
+ def test_select_distinct_limit(self, connection):
"""Test the interaction between limit and distinct"""
r = sorted(
[
x[0]
- for x in select([addresses.c.address])
- .distinct()
- .limit(3)
- .order_by(addresses.c.address)
- .execute()
- .fetchall()
+ for x in connection.execute(
+ select([addresses.c.address]).distinct().limit(3)
+ )
]
)
self.assert_(len(r) == 3, repr(r))
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
@testing.requires.offset
- def test_select_distinct_offset(self):
+ def test_select_distinct_offset(self, connection):
"""Test the interaction between distinct and offset"""
r = sorted(
[
x[0]
- for x in select([addresses.c.address])
- .distinct()
- .offset(1)
- .order_by(addresses.c.address)
- .execute()
- .fetchall()
+ for x in connection.execute(
+ select([addresses.c.address])
+ .distinct()
+ .offset(1)
+ .order_by(addresses.c.address)
+ ).fetchall()
]
)
eq_(len(r), 4)
self.assert_(r[0] != r[1] and r[1] != r[2] and r[2] != [3], repr(r))
@testing.requires.offset
- def test_select_distinct_limit_offset(self):
+ def test_select_distinct_limit_offset(self, connection):
"""Test the interaction between limit and limit/offset"""
- r = (
+ r = connection.execute(
select([addresses.c.address])
.order_by(addresses.c.address)
.distinct()
.offset(2)
.limit(3)
- .execute()
- .fetchall()
- )
+ ).fetchall()
self.assert_(len(r) == 3, repr(r))
self.assert_(r[0] != r[1] and r[1] != r[2], repr(r))
)
metadata.create_all()
- t1.insert().execute(
- [
- dict(col2="t1col2r1", col3="aaa", col4="aaa"),
- dict(col2="t1col2r2", col3="bbb", col4="bbb"),
- dict(col2="t1col2r3", col3="ccc", col4="ccc"),
- ]
- )
- t2.insert().execute(
- [
- dict(col2="t2col2r1", col3="aaa", col4="bbb"),
- dict(col2="t2col2r2", col3="bbb", col4="ccc"),
- dict(col2="t2col2r3", col3="ccc", col4="aaa"),
- ]
- )
- t3.insert().execute(
- [
- dict(col2="t3col2r1", col3="aaa", col4="ccc"),
- dict(col2="t3col2r2", col3="bbb", col4="aaa"),
- dict(col2="t3col2r3", col3="ccc", col4="bbb"),
- ]
- )
+ with testing.db.connect() as conn:
+ conn.execute(
+ t1.insert(),
+ [
+ dict(col2="t1col2r1", col3="aaa", col4="aaa"),
+ dict(col2="t1col2r2", col3="bbb", col4="bbb"),
+ dict(col2="t1col2r3", col3="ccc", col4="ccc"),
+ ],
+ )
+ conn.execute(
+ t2.insert(),
+ [
+ dict(col2="t2col2r1", col3="aaa", col4="bbb"),
+ dict(col2="t2col2r2", col3="bbb", col4="ccc"),
+ dict(col2="t2col2r3", col3="ccc", col4="aaa"),
+ ],
+ )
+ conn.execute(
+ t3.insert(),
+ [
+ dict(col2="t3col2r1", col3="aaa", col4="ccc"),
+ dict(col2="t3col2r2", col3="bbb", col4="aaa"),
+ dict(col2="t3col2r3", col3="ccc", col4="bbb"),
+ ],
+ )
@engines.close_first
def teardown(self):
return sorted([tuple(row) for row in executed.fetchall()])
@testing.requires.subqueries
- def test_union(self):
+ def test_union(self, connection):
(s1, s2) = (
select(
[t1.c.col3.label("col3"), t1.c.col4.label("col4")],
("bbb", "ccc"),
("ccc", "aaa"),
]
- found1 = self._fetchall_sorted(u.execute())
+ found1 = self._fetchall_sorted(connection.execute(u))
eq_(found1, wanted)
- found2 = self._fetchall_sorted(u.alias("bar").select().execute())
+ found2 = self._fetchall_sorted(
+ connection.execute(u.alias("bar").select())
+ )
eq_(found2, wanted)
@testing.fails_on("firebird", "doesn't like ORDER BY with UNIONs")
- def test_union_ordered(self):
+ def test_union_ordered(self, connection):
(s1, s2) = (
select(
[t1.c.col3.label("col3"), t1.c.col4.label("col4")],
("bbb", "ccc"),
("ccc", "aaa"),
]
- eq_(u.execute().fetchall(), wanted)
+ eq_(connection.execute(u).fetchall(), wanted)
@testing.fails_on("firebird", "doesn't like ORDER BY with UNIONs")
@testing.requires.subqueries
- def test_union_ordered_alias(self):
+ def test_union_ordered_alias(self, connection):
(s1, s2) = (
select(
[t1.c.col3.label("col3"), t1.c.col4.label("col4")],
("bbb", "ccc"),
("ccc", "aaa"),
]
- eq_(u.alias("bar").select().execute().fetchall(), wanted)
+ eq_(connection.execute(u.alias("bar").select()).fetchall(), wanted)
@testing.crashes("oracle", "FIXME: unknown, verify not fails_on")
@testing.fails_on(
testing.requires._mysql_not_mariadb_104, "FIXME: unknown"
)
@testing.fails_on("sqlite", "FIXME: unknown")
- def test_union_all(self):
+ def test_union_all(self, connection):
e = union_all(
select([t1.c.col3]),
union(select([t1.c.col3]), select([t1.c.col3])),
)
wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)]
- found1 = self._fetchall_sorted(e.execute())
+ found1 = self._fetchall_sorted(connection.execute(e))
eq_(found1, wanted)
- found2 = self._fetchall_sorted(e.alias("foo").select().execute())
+ found2 = self._fetchall_sorted(
+ connection.execute(e.alias("foo").select())
+ )
eq_(found2, wanted)
- def test_union_all_lightweight(self):
+ def test_union_all_lightweight(self, connection):
"""like test_union_all, but breaks the sub-union into
a subquery with an explicit column reference on the outside,
more palatable to a wider variety of engines.
e = union_all(select([t1.c.col3]), select([u.c.col3]))
wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)]
- found1 = self._fetchall_sorted(e.execute())
+ found1 = self._fetchall_sorted(connection.execute(e))
eq_(found1, wanted)
- found2 = self._fetchall_sorted(e.alias("foo").select().execute())
+ found2 = self._fetchall_sorted(
+ connection.execute(e.alias("foo").select())
+ )
eq_(found2, wanted)
@testing.requires.intersect
- def test_intersect(self):
+ def test_intersect(self, connection):
i = intersect(
select([t2.c.col3, t2.c.col4]),
select([t2.c.col3, t2.c.col4], t2.c.col4 == t3.c.col3),
wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
- found1 = self._fetchall_sorted(i.execute())
+ found1 = self._fetchall_sorted(connection.execute(i))
eq_(found1, wanted)
- found2 = self._fetchall_sorted(i.alias("bar").select().execute())
+ found2 = self._fetchall_sorted(
+ connection.execute(i.alias("bar").select())
+ )
eq_(found2, wanted)
@testing.requires.except_
@testing.fails_on("sqlite", "Can't handle this style of nesting")
- def test_except_style1(self):
+ def test_except_style1(self, connection):
e = except_(
union(
select([t1.c.col3, t1.c.col4]),
("ccc", "ccc"),
]
- found = self._fetchall_sorted(e.alias().select().execute())
+ found = self._fetchall_sorted(connection.execute(e.alias().select()))
eq_(found, wanted)
@testing.requires.except_
- def test_except_style2(self):
+ def test_except_style2(self, connection):
# same as style1, but add alias().select() to the except_().
# sqlite can handle it now.
("ccc", "ccc"),
]
- found1 = self._fetchall_sorted(e.execute())
+ found1 = self._fetchall_sorted(connection.execute(e))
eq_(found1, wanted)
- found2 = self._fetchall_sorted(e.alias().select().execute())
+ found2 = self._fetchall_sorted(connection.execute(e.alias().select()))
eq_(found2, wanted)
@testing.fails_on(
"Can't handle this style of nesting",
)
@testing.requires.except_
- def test_except_style3(self):
+ def test_except_style3(self, connection):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
e = except_(
select([t1.c.col3]), # aaa, bbb, ccc
select([t3.c.col3], t3.c.col3 == "ccc"), # ccc
),
)
- eq_(e.execute().fetchall(), [("ccc",)])
- eq_(e.alias("foo").select().execute().fetchall(), [("ccc",)])
+ eq_(connection.execute(e).fetchall(), [("ccc",)])
+ eq_(connection.execute(e.alias("foo").select()).fetchall(), [("ccc",)])
@testing.requires.except_
- def test_except_style4(self):
+ def test_except_style4(self, connection):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
e = except_(
select([t1.c.col3]), # aaa, bbb, ccc
.select(),
)
- eq_(e.execute().fetchall(), [("ccc",)])
- eq_(e.alias().select().execute().fetchall(), [("ccc",)])
+ eq_(connection.execute(e).fetchall(), [("ccc",)])
+ eq_(connection.execute(e.alias().select()).fetchall(), [("ccc",)])
@testing.requires.intersect
@testing.fails_on(
["sqlite", testing.requires._mysql_not_mariadb_104],
"sqlite can't handle leading parenthesis",
)
- def test_intersect_unions(self):
+ def test_intersect_unions(self, connection):
u = intersect(
union(
select([t1.c.col3, t1.c.col4]), select([t3.c.col3, t3.c.col4])
.select(),
)
wanted = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")]
- found = self._fetchall_sorted(u.execute())
+ found = self._fetchall_sorted(connection.execute(u))
eq_(found, wanted)
@testing.requires.intersect
- def test_intersect_unions_2(self):
+ def test_intersect_unions_2(self, connection):
u = intersect(
union(
select([t1.c.col3, t1.c.col4]), select([t3.c.col3, t3.c.col4])
.select(),
)
wanted = [("aaa", "ccc"), ("bbb", "aaa"), ("ccc", "bbb")]
- found = self._fetchall_sorted(u.execute())
+ found = self._fetchall_sorted(connection.execute(u))
eq_(found, wanted)
@testing.requires.intersect
- def test_intersect_unions_3(self):
+ def test_intersect_unions_3(self, connection):
u = intersect(
select([t2.c.col3, t2.c.col4]),
union(
.select(),
)
wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
- found = self._fetchall_sorted(u.execute())
+ found = self._fetchall_sorted(connection.execute(u))
eq_(found, wanted)
@testing.requires.intersect
- def test_composite_alias(self):
+ def test_composite_alias(self, connection):
ua = intersect(
select([t2.c.col3, t2.c.col4]),
union(
).alias()
wanted = [("aaa", "bbb"), ("bbb", "ccc"), ("ccc", "aaa")]
- found = self._fetchall_sorted(ua.select().execute())
+ found = self._fetchall_sorted(connection.execute(ua.select()))
eq_(found, wanted)
metadata.drop_all()
metadata.create_all()
- # t1.10 -> t2.20 -> t3.30
- # t1.11 -> t2.21
- # t1.12
- t1.insert().execute(
- {"t1_id": 10, "name": "t1 #10"},
- {"t1_id": 11, "name": "t1 #11"},
- {"t1_id": 12, "name": "t1 #12"},
- )
- t2.insert().execute(
- {"t2_id": 20, "t1_id": 10, "name": "t2 #20"},
- {"t2_id": 21, "t1_id": 11, "name": "t2 #21"},
- )
- t3.insert().execute({"t3_id": 30, "t2_id": 20, "name": "t3 #30"})
+ with testing.db.connect() as conn:
+ # t1.10 -> t2.20 -> t3.30
+ # t1.11 -> t2.21
+ # t1.12
+ conn.execute(
+ t1.insert(),
+ {"t1_id": 10, "name": "t1 #10"},
+ {"t1_id": 11, "name": "t1 #11"},
+ {"t1_id": 12, "name": "t1 #12"},
+ )
+ conn.execute(
+ t2.insert(),
+ {"t2_id": 20, "t1_id": 10, "name": "t2 #20"},
+ {"t2_id": 21, "t1_id": 11, "name": "t2 #21"},
+ )
+ conn.execute(
+ t3.insert(), {"t3_id": 30, "t2_id": 20, "name": "t3 #30"}
+ )
@classmethod
def teardown_class(cls):
def assertRows(self, statement, expected):
"""Execute a statement and assert that rows returned equal expected."""
-
- found = sorted([tuple(row) for row in statement.execute().fetchall()])
-
- eq_(found, sorted(expected))
+ with testing.db.connect() as conn:
+ found = sorted(
+ [tuple(row) for row in conn.execute(statement).fetchall()]
+ )
+ eq_(found, sorted(expected))
def test_join_x1(self):
"""Joins t1->t2."""
)
metadata.create_all()
- flds.insert().execute(
- [dict(intcol=5, strcol="foo"), dict(intcol=13, strcol="bar")]
- )
+ with testing.db.connect() as conn:
+ conn.execute(
+ flds.insert(),
+ [dict(intcol=5, strcol="foo"), dict(intcol=13, strcol="bar")],
+ )
@classmethod
def teardown_class(cls):
metadata.drop_all()
# TODO: seems like more tests warranted for this setup.
- def test_modulo(self):
+ def test_modulo(self, connection):
eq_(
- select([flds.c.intcol % 3], order_by=flds.c.idcol)
- .execute()
- .fetchall(),
+ connection.execute(
+ select([flds.c.intcol % 3], order_by=flds.c.idcol)
+ ).fetchall(),
[(2,), (1,)],
)
@testing.requires.window_functions
- def test_over(self):
+ def test_over(self, connection):
eq_(
- select(
- [flds.c.intcol, func.row_number().over(order_by=flds.c.strcol)]
- )
- .execute()
- .fetchall(),
+ connection.execute(
+ select(
+ [
+ flds.c.intcol,
+ func.row_number().over(order_by=flds.c.strcol),
+ ]
+ )
+ ).fetchall(),
[(13, 1), (5, 2)],
)