test_needs_acid=True,
)
- def test_row_iteration(self):
+ def test_row_iteration(self, connection):
users = self.tables.users
- users.insert().execute(
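+        # several parameter dicts passed in one Connection.execute() call run
+        # the INSERT as an executemany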
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
)
- r = users.select().execute()
+ r = connection.execute(users.select())
rows = []
for row in r:
rows.append(row)
eq_(len(rows), 3)
- def test_row_next(self):
+ def test_row_next(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
)
- r = users.select().execute()
+ r = connection.execute(users.select())
rows = []
while True:
row = next(r, "foo")
eq_(len(rows), 3)
@testing.requires.subqueries
- def test_anonymous_rows(self):
+ def test_anonymous_rows(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
.where(users.c.user_name == "jack")
.scalar_subquery()
)
- for row in select([sel + 1, sel + 3], bind=users.bind).execute():
+        for row in connection.execute(select([sel + 1, sel + 3])):
eq_(row._mapping["anon_1"], 8)
eq_(row._mapping["anon_2"], 10)
- def test_row_comparison(self):
+ def test_row_comparison(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="jack")
- rp = users.select().execute().first()
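+        # a single row of parameters, given here as keyword arguments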
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ rp = connection.execute(users.select()).first()
eq_(rp, rp)
is_(not (rp != rp), True)
eq_(control, op(compare, rp))
@testing.provide_metadata
- def test_column_label_overlap_fallback(self):
+ def test_column_label_overlap_fallback(self, connection):
content = Table("content", self.metadata, Column("type", String(30)))
bar = Table("bar", self.metadata, Column("content_type", String(30)))
- self.metadata.create_all(testing.db)
- testing.db.execute(content.insert().values(type="t1"))
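+        # create_all() takes the Connection directly as its bind, so the DDL
+        # runs on the same connection as the rest of the test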
+ self.metadata.create_all(connection)
+ connection.execute(content.insert().values(type="t1"))
- row = testing.db.execute(content.select(use_labels=True)).first()
+ row = connection.execute(content.select(use_labels=True)).first()
in_(content.c.type, row._mapping)
not_in_(bar.c.content_type, row._mapping)
not_in_(bar.c.content_type, row._mapping)
- row = testing.db.execute(
+ row = connection.execute(
select([func.now().label("content_type")])
).first()
not_in_(bar.c.content_type, row._mapping)
- def test_pickled_rows(self):
+ def test_pickled_rows(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
{"user_id": 7, "user_name": "jack"},
{"user_id": 8, "user_name": "ed"},
{"user_id": 9, "user_name": "fred"},
for pickle in False, True:
for use_labels in False, True:
- result = (
- users.select(use_labels=use_labels)
- .order_by(users.c.user_id)
- .execute()
- .fetchall()
- )
+ result = connection.execute(
+ users.select(use_labels=use_labels).order_by(
+ users.c.user_id
+ )
+ ).fetchall()
if pickle:
result = util.pickle.loads(util.pickle.dumps(result))
lambda: result[0]._mapping["fake key"],
)
- def test_column_error_printing(self):
- result = testing.db.execute(select([1]))
+ def test_column_error_printing(self, connection):
+ result = connection.execute(select([1]))
row = result.first()
class unprintable(object):
lambda: row._mapping[accessor],
)
- def test_fetchmany(self):
+ def test_fetchmany(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="jack")
- users.insert().execute(user_id=8, user_name="ed")
- users.insert().execute(user_id=9, user_name="fred")
- r = users.select().execute()
+ connection.execute(users.insert(), user_id=7, user_name="jack")
+ connection.execute(users.insert(), user_id=8, user_name="ed")
+ connection.execute(users.insert(), user_id=9, user_name="fred")
+ r = connection.execute(users.select())
rows = []
for row in r.fetchmany(size=2):
rows.append(row)
eq_(len(rows), 2)
- def test_column_slices(self):
+ def test_column_slices(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- users.insert().execute(user_id=1, user_name="john")
- users.insert().execute(user_id=2, user_name="jack")
- addresses.insert().execute(
- address_id=1, user_id=2, address="foo@bar.com"
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ connection.execute(users.insert(), user_id=2, user_name="jack")
+ connection.execute(
+ addresses.insert(), address_id=1, user_id=2, address="foo@bar.com"
)
- r = text("select * from addresses", bind=testing.db).execute().first()
+        r = connection.execute(text("select * from addresses")).first()
eq_(r[0:1], (1,))
eq_(r[1:], (2, "foo@bar.com"))
eq_(r[:-1], (1, 2))
- def test_column_accessor_basic_compiled_mapping(self):
+ def test_column_accessor_basic_compiled_mapping(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = users.select(users.c.user_id == 2).execute().first()
+ r = connection.execute(users.select(users.c.user_id == 2)).first()
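+        # Row._mapping supports dict-style lookup by string name or by the
+        # Column object itself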
eq_(r.user_id, 2)
eq_(r._mapping["user_id"], 2)
eq_(r._mapping[users.c.user_id], 2)
eq_(r._mapping["user_name"], "jack")
eq_(r._mapping[users.c.user_name], "jack")
- def test_column_accessor_basic_compiled_traditional(self):
+ def test_column_accessor_basic_compiled_traditional(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = users.select(users.c.user_id == 2).execute().first()
+ r = connection.execute(users.select(users.c.user_id == 2)).first()
eq_(r.user_id, 2)
eq_(r._mapping["user_id"], 2)
eq_(r._mapping["user_name"], "jack")
eq_(r._mapping[users.c.user_name], "jack")
- def test_row_getitem_string(self):
+ def test_row_getitem_string(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = testing.db.execute(
+ r = connection.execute(
text("select * from users where user_id=2")
).first()
eq_(r._mapping["user_name"], "jack")
- def test_column_accessor_basic_text(self):
+ def test_column_accessor_basic_text(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = testing.db.execute(
+ r = connection.execute(
text("select * from users where user_id=2")
).first()
eq_(r.user_name, "jack")
eq_(r._mapping["user_name"], "jack")
- def test_column_accessor_text_colexplicit(self):
+ def test_column_accessor_text_colexplicit(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
- r = testing.db.execute(
+ r = connection.execute(
text("select * from users where user_id=2").columns(
users.c.user_id, users.c.user_name
)
eq_(r._mapping["user_name"], "jack")
eq_(r._mapping[users.c.user_name], "jack")
- def test_column_accessor_textual_select(self):
+ def test_column_accessor_textual_select(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="jack"),
)
# this will create column() objects inside
# the select(), these need to match on name anyway
- r = testing.db.execute(
+ r = connection.execute(
select([column("user_id"), column("user_name")])
.select_from(table("users"))
.where(text("user_id=2"))
eq_(r.user_name, "jack")
eq_(r._mapping["user_name"], "jack")
- def test_column_accessor_dotted_union(self):
+ def test_column_accessor_dotted_union(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
# test a little sqlite < 3.10.0 weirdness - with the UNION,
# cols come back as "users.user_id" in cursor.description
- r = testing.db.execute(
+ r = connection.execute(
text(
"select users.user_id, users.user_name "
"from users "
eq_(r._mapping["user_name"], "john")
eq_(list(r._fields), ["user_id", "user_name"])
- def test_column_accessor_sqlite_raw(self):
+ def test_column_accessor_sqlite_raw(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
- r = (
+ r = connection.execute(
text(
"select users.user_id, users.user_name "
"from users "
"UNION select users.user_id, "
"users.user_name from users",
bind=testing.db,
- )
- .execution_options(sqlite_raw_colnames=True)
- .execute()
- .first()
- )
+ ).execution_options(sqlite_raw_colnames=True)
+ ).first()
if testing.against("sqlite < 3.10.0"):
not_in_("user_id", r)
eq_(list(r._fields), ["user_id", "user_name"])
- def test_column_accessor_sqlite_translated(self):
+ def test_column_accessor_sqlite_translated(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
- r = (
+ r = connection.execute(
text(
"select users.user_id, users.user_name "
"from users "
"users.user_name from users",
bind=testing.db,
)
- .execute()
- .first()
- )
+ ).first()
eq_(r._mapping["user_id"], 1)
eq_(r._mapping["user_name"], "john")
eq_(list(r._fields), ["user_id", "user_name"])
- def test_column_accessor_labels_w_dots(self):
+ def test_column_accessor_labels_w_dots(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
# test using literal tablename.colname
- r = (
+ r = connection.execute(
text(
'select users.user_id AS "users.user_id", '
'users.user_name AS "users.user_name" '
"from users",
bind=testing.db,
- )
- .execution_options(sqlite_raw_colnames=True)
- .execute()
- .first()
- )
+ ).execution_options(sqlite_raw_colnames=True)
+ ).first()
eq_(r._mapping["users.user_id"], 1)
eq_(r._mapping["users.user_name"], "john")
not_in_("user_name", r._mapping)
eq_(list(r._fields), ["users.user_id", "users.user_name"])
- def test_column_accessor_unary(self):
+ def test_column_accessor_unary(self, connection):
users = self.tables.users
- users.insert().execute(dict(user_id=1, user_name="john"))
+ connection.execute(users.insert(), dict(user_id=1, user_name="john"))
# unary expressions
- r = (
- select([users.c.user_name.distinct()])
- .order_by(users.c.user_name)
- .execute()
- .first()
- )
+ r = connection.execute(
+ select([users.c.user_name.distinct()]).order_by(users.c.user_name)
+ ).first()
eq_(r._mapping[users.c.user_name], "john")
eq_(r.user_name, "john")
- def test_column_accessor_err(self):
- r = testing.db.execute(select([1])).first()
+ def test_column_accessor_err(self, connection):
+ r = connection.execute(select([1])).first()
assert_raises_message(
AttributeError,
"Could not locate column in row for column 'foo'",
)
def test_connectionless_autoclose_rows_exhausted(self):
+ # TODO: deprecate for 2.0
users = self.tables.users
with testing.db.connect() as conn:
conn.execute(users.insert(), dict(user_id=1, user_name="john"))
@testing.requires.returning
def test_connectionless_autoclose_crud_rows_exhausted(self):
+ # TODO: deprecate for 2.0
users = self.tables.users
stmt = (
users.insert()
assert connection.closed
def test_connectionless_autoclose_no_rows(self):
+ # TODO: deprecate for 2.0
result = testing.db.execute(text("select * from users"))
connection = result.connection
assert not connection.closed
@testing.requires.updateable_autoincrement_pks
def test_connectionless_autoclose_no_metadata(self):
+ # TODO: deprecate for 2.0
result = testing.db.execute(text("update users set user_id=5"))
connection = result.connection
assert connection.closed
result.fetchone,
)
- def test_row_case_sensitive(self):
- row = testing.db.execute(
+ def test_row_case_sensitive(self, connection):
+ row = connection.execute(
select(
[
literal_column("1").label("case_insensitive"),
assert_raises(KeyError, lambda: row._mapping["casesensitive"])
def test_row_case_sensitive_unoptimized(self):
- ins_db = engines.testing_engine()
- row = ins_db.execute(
- select(
- [
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- text("3 AS screw_up_the_cols"),
- ]
- )
- ).first()
+ with engines.testing_engine().connect() as ins_conn:
+ row = ins_conn.execute(
+ select(
+ [
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive"),
+ text("3 AS screw_up_the_cols"),
+ ]
+ )
+ ).first()
- eq_(
- list(row._fields),
- ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
- )
+ eq_(
+ list(row._fields),
+ ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
+ )
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- not_in_("casesensitive", row._keymap)
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ not_in_("casesensitive", row._keymap)
- eq_(row._mapping["case_insensitive"], 1)
- eq_(row._mapping["CaseSensitive"], 2)
- eq_(row._mapping["screw_up_the_cols"], 3)
+ eq_(row._mapping["case_insensitive"], 1)
+ eq_(row._mapping["CaseSensitive"], 2)
+ eq_(row._mapping["screw_up_the_cols"], 3)
- assert_raises(KeyError, lambda: row._mapping["Case_insensitive"])
- assert_raises(KeyError, lambda: row._mapping["casesensitive"])
- assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"])
+ assert_raises(KeyError, lambda: row._mapping["Case_insensitive"])
+ assert_raises(KeyError, lambda: row._mapping["casesensitive"])
+ assert_raises(KeyError, lambda: row._mapping["screw_UP_the_cols"])
- def test_row_as_args(self):
+ def test_row_as_args(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="john")
- r = users.select(users.c.user_id == 1).execute().first()
- users.delete().execute()
- users.insert().execute(r._mapping)
- eq_(users.select().execute().fetchall(), [(1, "john")])
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ r = connection.execute(users.select(users.c.user_id == 1)).first()
+ connection.execute(users.delete())
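+        # the RowMapping from the earlier SELECT serves directly as the
+        # parameter dictionary for the INSERT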
+ connection.execute(users.insert(), r._mapping)
+ eq_(connection.execute(users.select()).fetchall(), [(1, "john")])
- def test_result_as_args(self):
+ def test_result_as_args(self, connection):
users = self.tables.users
users2 = self.tables.users2
- users.insert().execute(
+ connection.execute(
+ users.insert(),
[
dict(user_id=1, user_name="john"),
dict(user_id=2, user_name="ed"),
- ]
+ ],
)
- r = users.select().execute()
- users2.insert().execute([row._mapping for row in r])
+ r = connection.execute(users.select())
+ connection.execute(users2.insert(), [row._mapping for row in r])
eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
+ connection.execute(
+ users2.select().order_by(users2.c.user_id)
+ ).fetchall(),
[(1, "john"), (2, "ed")],
)
- users2.delete().execute()
- r = users.select().execute()
- users2.insert().execute(*[row._mapping for row in r])
+ connection.execute(users2.delete())
+ r = connection.execute(users.select())
+ connection.execute(users2.insert(), *[row._mapping for row in r])
eq_(
- users2.select().order_by(users2.c.user_id).execute().fetchall(),
+ connection.execute(
+ users2.select().order_by(users2.c.user_id)
+ ).fetchall(),
[(1, "john"), (2, "ed")],
)
@testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column(self):
+ def test_ambiguous_column(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- users.insert().execute(user_id=1, user_name="john")
- result = users.outerjoin(addresses).select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ result = connection.execute(users.outerjoin(addresses).select())
r = result.first()
assert_raises_message(
lambda: r._mapping["user_id"],
)
- result = users.outerjoin(addresses).select().execute()
+ result = connection.execute(users.outerjoin(addresses).select())
result = _result.BufferedColumnResultProxy(result.context)
r = result.first()
assert isinstance(r, _result.BufferedColumnRow)
)
@testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_by_col(self):
+ def test_ambiguous_column_by_col(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="john")
+ connection.execute(users.insert(), user_id=1, user_name="john")
ua = users.alias()
u2 = users.alias()
- result = (
- select([users.c.user_id, ua.c.user_id])
- .select_from(users.join(ua, true()))
- .execute()
+ result = connection.execute(
+ select([users.c.user_id, ua.c.user_id]).select_from(
+ users.join(ua, true())
+ )
)
row = result.first()
)
@testing.requires.duplicate_names_in_cursor_description
- def test_ambiguous_column_contains(self):
+ def test_ambiguous_column_contains(self, connection):
users = self.tables.users
addresses = self.tables.addresses
# ticket 2702. in 0.7 we'd get True, False.
# in 0.8, both columns are present so it's True;
# but when they're fetched you'll get the ambiguous error.
- users.insert().execute(user_id=1, user_name="john")
- result = (
- select([users.c.user_id, addresses.c.user_id])
- .select_from(users.outerjoin(addresses))
- .execute()
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ result = connection.execute(
+ select([users.c.user_id, addresses.c.user_id]).select_from(
+ users.outerjoin(addresses)
+ )
)
row = result.first()
set([True]),
)
- def test_loose_matching_one(self):
+ def test_loose_matching_one(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
- conn.execute(
- addresses.insert(),
- {"address_id": 1, "user_id": 1, "address": "email"},
+ connection.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+ connection.execute(
+ addresses.insert(),
+ {"address_id": 1, "user_id": 1, "address": "email"},
+ )
+
+ # use some column labels in the SELECT
+ result = connection.execute(
+ TextualSelect(
+ text(
+ "select users.user_name AS users_user_name, "
+ "users.user_id AS user_id, "
+ "addresses.address_id AS address_id "
+ "FROM users JOIN addresses "
+ "ON users.user_id = addresses.user_id "
+ "WHERE users.user_id=1 "
+ ),
+ [users.c.user_id, users.c.user_name, addresses.c.address_id],
+ positional=False,
)
+ )
+ row = result.first()
+ eq_(row._mapping[users.c.user_id], 1)
+ eq_(row._mapping[users.c.user_name], "john")
- # use some column labels in the SELECT
- result = conn.execute(
- TextualSelect(
- text(
- "select users.user_name AS users_user_name, "
- "users.user_id AS user_id, "
- "addresses.address_id AS address_id "
- "FROM users JOIN addresses "
- "ON users.user_id = addresses.user_id "
- "WHERE users.user_id=1 "
- ),
- [
- users.c.user_id,
- users.c.user_name,
- addresses.c.address_id,
- ],
- positional=False,
- )
- )
- row = result.first()
- eq_(row._mapping[users.c.user_id], 1)
- eq_(row._mapping[users.c.user_name], "john")
-
- def test_loose_matching_two(self):
+ def test_loose_matching_two(self, connection):
users = self.tables.users
addresses = self.tables.addresses
- with testing.db.connect() as conn:
- conn.execute(users.insert(), {"user_id": 1, "user_name": "john"})
- conn.execute(
- addresses.insert(),
- {"address_id": 1, "user_id": 1, "address": "email"},
- )
-
- # use some column labels in the SELECT
- result = conn.execute(
- TextualSelect(
- text(
- "select users.user_name AS users_user_name, "
- "users.user_id AS user_id, "
- "addresses.user_id "
- "FROM users JOIN addresses "
- "ON users.user_id = addresses.user_id "
- "WHERE users.user_id=1 "
- ),
- [users.c.user_id, users.c.user_name, addresses.c.user_id],
- positional=False,
- )
+ connection.execute(users.insert(), {"user_id": 1, "user_name": "john"})
+ connection.execute(
+ addresses.insert(),
+ {"address_id": 1, "user_id": 1, "address": "email"},
+ )
+
+ # use some column labels in the SELECT
+ result = connection.execute(
+ TextualSelect(
+ text(
+ "select users.user_name AS users_user_name, "
+ "users.user_id AS user_id, "
+ "addresses.user_id "
+ "FROM users JOIN addresses "
+ "ON users.user_id = addresses.user_id "
+ "WHERE users.user_id=1 "
+ ),
+ [users.c.user_id, users.c.user_name, addresses.c.user_id],
+ positional=False,
)
- row = result.first()
+ )
+ row = result.first()
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row._mapping[users.c.user_id],
- )
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row._mapping[addresses.c.user_id],
- )
- eq_(row._mapping[users.c.user_name], "john")
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row._mapping[users.c.user_id],
+ )
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row._mapping[addresses.c.user_id],
+ )
+ eq_(row._mapping[users.c.user_name], "john")
- def test_ambiguous_column_by_col_plus_label(self):
+ def test_ambiguous_column_by_col_plus_label(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="john")
- result = select(
- [
- users.c.user_id,
- type_coerce(users.c.user_id, Integer).label("foo"),
- ]
- ).execute()
+ connection.execute(users.insert(), user_id=1, user_name="john")
+ result = connection.execute(
+ select(
+ [
+ users.c.user_id,
+ type_coerce(users.c.user_id, Integer).label("foo"),
+ ]
+ )
+ )
row = result.first()
eq_(row._mapping[users.c.user_id], 1)
eq_(row[1], 1)
- def test_fetch_partial_result_map(self):
+ def test_fetch_partial_result_map(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="ed")
+ connection.execute(users.insert(), user_id=7, user_name="ed")
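+        # only user_name is given an explicit type here; the remaining
+        # columns still come through, matched on name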
t = text("select * from users").columns(user_name=String())
- eq_(testing.db.execute(t).fetchall(), [(7, "ed")])
+ eq_(connection.execute(t).fetchall(), [(7, "ed")])
- def test_fetch_unordered_result_map(self):
+ def test_fetch_unordered_result_map(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="ed")
+ connection.execute(users.insert(), user_id=7, user_name="ed")
class Goofy1(TypeDecorator):
impl = String
"select user_name as a, user_name as b, "
"user_name as c from users"
).columns(a=Goofy1(), b=Goofy2(), c=Goofy3())
- eq_(testing.db.execute(t).fetchall(), [("eda", "edb", "edc")])
+ eq_(connection.execute(t).fetchall(), [("eda", "edb", "edc")])
@testing.requires.subqueries
- def test_column_label_targeting(self):
+ def test_column_label_targeting(self, connection):
users = self.tables.users
- users.insert().execute(user_id=7, user_name="ed")
+ connection.execute(users.insert(), user_id=7, user_name="ed")
for s in (
users.select().alias("foo"),
users.select().alias(users.name),
):
- row = s.select(use_labels=True).execute().first()
+ row = connection.execute(s.select(use_labels=True)).first()
eq_(row._mapping[s.c.user_id], 7)
eq_(row._mapping[s.c.user_name], "ed")
@testing.requires.python3
- def test_ro_mapping_py3k(self):
+ def test_ro_mapping_py3k(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
row = result.first()
dict_row = row._asdict()
eq_(odict_row.items(), mapping_row.items())
@testing.requires.python2
- def test_ro_mapping_py2k(self):
+ def test_ro_mapping_py2k(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
row = result.first()
dict_row = row._asdict()
eq_(odict_row.values(), list(mapping_row.values()))
eq_(odict_row.items(), list(mapping_row.items()))
- def test_keys(self):
+ def test_keys(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
eq_(result.keys(), ["user_id", "user_name"])
row = result.first()
eq_(list(row._mapping.keys()), ["user_id", "user_name"])
eq_(row._fields, ("user_id", "user_name"))
- def test_row_keys_legacy_dont_warn(self):
+ def test_row_keys_legacy_dont_warn(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = users.select().execute()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
row = result.first()
# DO NOT WARN DEPRECATED IN 1.x, ONLY 2.0 WARNING
eq_(dict(row), {"user_id": 1, "user_name": "foo"})
eq_(row.keys(), ["user_id", "user_name"])
- def test_keys_anon_labels(self):
+ def test_keys_anon_labels(self, connection):
"""test [ticket:3483]"""
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- result = testing.db.execute(
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(
select(
[
users.c.user_id,
eq_(row._fields, ("user_id", "user_name_1", "count_1"))
eq_(list(row._mapping.keys()), ["user_id", "user_name_1", "count_1"])
- def test_items(self):
+ def test_items(self, connection):
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- r = users.select().execute().first()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ r = connection.execute(users.select()).first()
eq_(
[(x[0].lower(), x[1]) for x in list(r._mapping.items())],
[("user_id", 1), ("user_name", "foo")],
r = connection.exec_driver_sql("select user_name from users").first()
eq_(len(r), 1)
- def test_sorting_in_python(self):
+ def test_sorting_in_python(self, connection):
users = self.tables.users
- users.insert().execute(
+ connection.execute(
+ users.insert(),
dict(user_id=1, user_name="foo"),
dict(user_id=2, user_name="bar"),
dict(user_id=3, user_name="def"),
)
- rows = users.select().order_by(users.c.user_name).execute().fetchall()
+ rows = connection.execute(
+ users.select().order_by(users.c.user_name)
+ ).fetchall()
eq_(rows, [(2, "bar"), (3, "def"), (1, "foo")])
eq_(sorted(rows), [(1, "foo"), (2, "bar"), (3, "def")])
- def test_column_order_with_simple_query(self):
+ def test_column_order_with_simple_query(self, connection):
# should return values in column definition order
users = self.tables.users
- users.insert().execute(user_id=1, user_name="foo")
- r = users.select(users.c.user_id == 1).execute().first()
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ r = connection.execute(users.select(users.c.user_id == 1)).first()
eq_(r[0], 1)
eq_(r[1], "foo")
eq_([x.lower() for x in r._fields], ["user_id", "user_name"])
eq_([x.lower() for x in r._fields], ["user_name", "user_id"])
eq_(list(r._mapping.values()), ["foo", 1])
- @testing.crashes("oracle", "FIXME: unknown, varify not fails_on()")
+ @testing.crashes("oracle", "FIXME: unknown, verify not fails_on()")
@testing.crashes("firebird", "An identifier must begin with a letter")
@testing.provide_metadata
- def test_column_accessor_shadow(self):
+ def test_column_accessor_shadow(self, connection):
shadowed = Table(
"test_shadowed",
self.metadata,
Column("_parent", VARCHAR(20)),
Column("_row", VARCHAR(20)),
)
- self.metadata.create_all()
- shadowed.insert().execute(
+ self.metadata.create_all(connection)
+ connection.execute(
+ shadowed.insert(),
shadow_id=1,
shadow_name="The Shadow",
parent="The Light",
_parent="Hidden parent",
_row="Hidden row",
)
- r = shadowed.select(shadowed.c.shadow_id == 1).execute().first()
+ r = connection.execute(
+ shadowed.select(shadowed.c.shadow_id == 1)
+ ).first()
eq_(r.shadow_id, 1)
eq_(r._mapping["shadow_id"], 1)
with patch.object(
engine.dialect.execution_ctx_cls, "rowcount"
) as mock_rowcount:
- mock_rowcount.__get__ = Mock()
- engine.execute(
- t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
- )
+ with engine.connect() as conn:
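+                # rowcount should be consulted only for UPDATE and DELETE, so
+                # the mocked accessor must go untouched for INSERT and SELECT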
+ mock_rowcount.__get__ = Mock()
+ conn.execute(
+ t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
+ )
- eq_(len(mock_rowcount.__get__.mock_calls), 0)
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
- eq_(
- engine.execute(t.select()).fetchall(),
- [("d1",), ("d2",), ("d3",)],
- )
- eq_(len(mock_rowcount.__get__.mock_calls), 0)
+ eq_(
+ conn.execute(t.select()).fetchall(),
+ [("d1",), ("d2",), ("d3",)],
+ )
+ eq_(len(mock_rowcount.__get__.mock_calls), 0)
- engine.execute(t.update(), {"data": "d4"})
+ conn.execute(t.update(), {"data": "d4"})
- eq_(len(mock_rowcount.__get__.mock_calls), 1)
+ eq_(len(mock_rowcount.__get__.mock_calls), 1)
- engine.execute(t.delete())
- eq_(len(mock_rowcount.__get__.mock_calls), 2)
+ conn.execute(t.delete())
+ eq_(len(mock_rowcount.__get__.mock_calls), 2)
def test_row_is_sequence(self):
eq_(hash(row), hash((1, "value", "foo")))
@testing.provide_metadata
- def test_row_getitem_indexes_compiled(self):
+ def test_row_getitem_indexes_compiled(self, connection):
values = Table(
"rp",
self.metadata,
Column("key", String(10), primary_key=True),
Column("value", String(10)),
)
- values.create()
+ values.create(connection)
- testing.db.execute(values.insert(), dict(key="One", value="Uno"))
- row = testing.db.execute(values.select()).first()
+ connection.execute(values.insert(), dict(key="One", value="Uno"))
+ row = connection.execute(values.select()).first()
eq_(row._mapping["key"], "One")
eq_(row._mapping["value"], "Uno")
eq_(row[0], "One")
@testing.requires.cextensions
def test_row_c_sequence_check(self):
-
+ # TODO: modernize for 2.0
metadata = MetaData()
metadata.bind = "sqlite://"
users = Table(
)
@testing.requires.schemas
- def test_keyed_accessor_wschema(self):
+ def test_keyed_accessor_wschema(self, connection):
keyed1 = self.tables["%s.wschema" % testing.config.test_schema]
- row = testing.db.execute(keyed1.select()).first()
+ row = connection.execute(keyed1.select()).first()
eq_(row.b, "a1")
eq_(row.q, "c1")
eq_(row.a, "a1")
eq_(row.c, "c1")
- def test_keyed_accessor_single(self):
+ def test_keyed_accessor_single(self, connection):
keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select()).first()
+ row = connection.execute(keyed1.select()).first()
eq_(row.b, "a1")
eq_(row.q, "c1")
eq_(row.a, "a1")
eq_(row.c, "c1")
- def test_keyed_accessor_single_labeled(self):
+ def test_keyed_accessor_single_labeled(self, connection):
keyed1 = self.tables.keyed1
- row = testing.db.execute(keyed1.select().apply_labels()).first()
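+        # apply_labels() prefixes each column with its table name, hence the
+        # keyed1_* attribute names asserted below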
+ row = connection.execute(keyed1.select().apply_labels()).first()
eq_(row.keyed1_b, "a1")
eq_(row.keyed1_q, "c1")
eq_(row.keyed1_a, "a1")
eq_(row.keyed1_c, "c1")
- def _test_keyed_targeting_no_label_at_all(self, expression):
+ def _test_keyed_targeting_no_label_at_all(self, expression, conn):
lt = literal_column("2")
stmt = select([literal_column("1"), expression, lt]).select_from(
self.tables.keyed1
)
- row = testing.db.execute(stmt).first()
+ row = conn.execute(stmt).first()
eq_(row._mapping[expression], "a1")
eq_(row._mapping[lt], 2)
# easily. we get around that because we know that "2" is unique
eq_(row._mapping["2"], 2)
- def test_keyed_targeting_no_label_at_all_one(self):
+ def test_keyed_targeting_no_label_at_all_one(self, connection):
class not_named_max(expression.ColumnElement):
name = "not_named_max"
eq_(str(select([not_named_max()])), "SELECT max(a)")
nnm = not_named_max()
- self._test_keyed_targeting_no_label_at_all(nnm)
+ self._test_keyed_targeting_no_label_at_all(nnm, connection)
- def test_keyed_targeting_no_label_at_all_two(self):
+ def test_keyed_targeting_no_label_at_all_two(self, connection):
class not_named_max(expression.ColumnElement):
name = "not_named_max"
eq_(str(select([not_named_max()])), "SELECT max(a)")
nnm = not_named_max()
- self._test_keyed_targeting_no_label_at_all(nnm)
+ self._test_keyed_targeting_no_label_at_all(nnm, connection)
- def test_keyed_targeting_no_label_at_all_text(self):
+ def test_keyed_targeting_no_label_at_all_text(self, connection):
t1 = text("max(a)")
t2 = text("min(a)")
stmt = select([t1, t2]).select_from(self.tables.keyed1)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
eq_(row._mapping[t1], "a1")
eq_(row._mapping[t2], "a1")
@testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_conflict_2(self):
+ def test_keyed_accessor_composite_conflict_2(self, connection):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed2]).select_from(keyed1.join(keyed2, true()))
).first()
# illustrate why row.b above is ambiguous, and not "b2"; because
# if we didn't have keyed2, now it matches row.a. a new column
# shouldn't be able to grab the value from a previous column.
- row = testing.db.execute(select([keyed1])).first()
+ row = connection.execute(select([keyed1])).first()
eq_(row.b, "a1")
- def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(self):
+ def test_keyed_accessor_composite_conflict_2_fix_w_uselabels(
+ self, connection
+ ):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed2])
.select_from(keyed1.join(keyed2, true()))
.apply_labels()
eq_(row._mapping["keyed2_b"], "b2")
eq_(row._mapping["keyed1_a"], "a1")
- def test_keyed_accessor_composite_names_precedent(self):
+ def test_keyed_accessor_composite_names_precedent(self, connection):
keyed1 = self.tables.keyed1
keyed4 = self.tables.keyed4
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed4]).select_from(keyed1.join(keyed4, true()))
).first()
eq_(row.b, "b4")
eq_(row.c, "c1")
@testing.requires.duplicate_names_in_cursor_description
- def test_keyed_accessor_composite_keys_precedent(self):
+ def test_keyed_accessor_composite_keys_precedent(self, connection):
keyed1 = self.tables.keyed1
keyed3 = self.tables.keyed3
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed3]).select_from(keyed1.join(keyed3, true()))
).first()
eq_(row.q, "c1")
)
eq_(row.d, "d3")
- def test_keyed_accessor_composite_labeled(self):
+ def test_keyed_accessor_composite_labeled(self, connection):
keyed1 = self.tables.keyed1
keyed2 = self.tables.keyed2
- row = testing.db.execute(
+ row = connection.execute(
select([keyed1, keyed2])
.select_from(keyed1.join(keyed2, true()))
.apply_labels()
assert_raises(KeyError, lambda: row._mapping["keyed2_c"])
assert_raises(KeyError, lambda: row._mapping["keyed2_q"])
- def test_keyed_accessor_column_is_repeated_multiple_times(self):
+ def test_keyed_accessor_column_is_repeated_multiple_times(
+ self, connection
+ ):
# test new logic added as a result of the combination of #4892 and
# #4887. We allow duplicate columns, but we also have special logic
# to disambiguate for the same column repeated, and as #4887 adds
.apply_labels()
)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
is_false(result._metadata.matched_on_name)
# ensure the result map is the same number of cols so we can
eq_(row[6], "d3")
eq_(row[7], "d3")
- def test_columnclause_schema_column_one(self):
+ def test_columnclause_schema_column_one(self, connection):
# originally addressed by [ticket:2932], however liberalized
# Column-targeting rules are deprecated
a, b = sql.column("a"), sql.column("b")
stmt = select([a, b]).select_from(table("keyed2"))
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(a, row._mapping)
in_(b, row._mapping)
- def test_columnclause_schema_column_two(self):
+ def test_columnclause_schema_column_two(self, connection):
keyed2 = self.tables.keyed2
stmt = select([keyed2.c.a, keyed2.c.b])
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(keyed2.c.a, row._mapping)
in_(keyed2.c.b, row._mapping)
- def test_columnclause_schema_column_three(self):
+ def test_columnclause_schema_column_three(self, connection):
# this is also addressed by [ticket:2932]
stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(stmt.selected_columns.a, row._mapping)
in_(stmt.selected_columns.b, row._mapping)
- def test_columnclause_schema_column_four(self):
+ def test_columnclause_schema_column_four(self, connection):
# originally addressed by [ticket:2932], however liberalized
# Column-targeting rules are deprecated
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
a, b
)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(a, row._mapping)
in_(b, row._mapping)
in_(stmt.selected_columns.keyed2_a, row._mapping)
in_(stmt.selected_columns.keyed2_b, row._mapping)
- def test_columnclause_schema_column_five(self):
+ def test_columnclause_schema_column_five(self, connection):
# this is also addressed by [ticket:2932]
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
keyed2_a=CHAR, keyed2_b=CHAR
)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
in_(stmt.selected_columns.keyed2_a, row._mapping)
in_(stmt.selected_columns.keyed2_b, row._mapping)
@classmethod
def insert_data(cls):
- cls.tables.text1.insert().execute(
- [dict(a="a1", b="b1", c="c1", d="d1")]
- )
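+        # class-level setup runs outside the per-test connection fixture, so
+        # it opens its own connection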
+ with testing.db.connect() as conn:
+ conn.execute(
+ cls.tables.text1.insert(),
+ [dict(a="a1", b="b1", c="c1", d="d1")],
+ )
- def test_via_column(self):
+ def test_via_column(self, connection):
c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c2], "b1")
eq_(row._mapping["r"], "c1")
eq_(row._mapping["d"], "d1")
- def test_fewer_cols_than_sql_positional(self):
+ def test_fewer_cols_than_sql_positional(self, connection):
c1, c2 = column("q"), column("p")
stmt = text("select a, b, c, d from text1").columns(c1, c2)
# no warning as this can be similar for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
eq_(row._mapping["c"], "c1")
- def test_fewer_cols_than_sql_non_positional(self):
+ def test_fewer_cols_than_sql_non_positional(self, connection):
c1, c2 = column("a"), column("p")
stmt = text("select a, b, c, d from text1").columns(c2, c1, d=CHAR)
# no warning as this can be similar for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
# c1 name matches, locates
lambda: row._mapping[c2],
)
- def test_more_cols_than_sql_positional(self):
+ def test_more_cols_than_sql_positional(self, connection):
c1, c2, c3, c4 = column("q"), column("p"), column("r"), column("d")
stmt = text("select a, b from text1").columns(c1, c2, c3, c4)
r"Number of columns in textual SQL \(4\) is "
r"smaller than number of columns requested \(2\)"
):
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c2], "b1")
lambda: row._mapping[c3],
)
- def test_more_cols_than_sql_nonpositional(self):
+ def test_more_cols_than_sql_nonpositional(self, connection):
c1, c2, c3, c4 = column("b"), column("a"), column("r"), column("d")
stmt = TextualSelect(
text("select a, b from text1"), [c1, c2, c3, c4], positional=False
)
# no warning for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "b1")
lambda: row._mapping[c3],
)
- def test_more_cols_than_sql_nonpositional_labeled_cols(self):
+ def test_more_cols_than_sql_nonpositional_labeled_cols(self, connection):
text1 = self.tables.text1
c1, c2, c3, c4 = text1.c.b, text1.c.a, column("r"), column("d")
)
# no warning for non-positional
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "b1")
lambda: row._mapping[c3],
)
- def test_dupe_col_obj(self):
+ def test_dupe_col_obj(self, connection):
c1, c2, c3 = column("q"), column("p"), column("r")
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c2)
exc.InvalidRequestError,
"Duplicate column expression requested in "
"textual SQL: <.*.ColumnClause.*; p>",
- testing.db.execute,
+ connection.execute,
stmt,
)
- def test_anon_aliased_unique(self):
+ def test_anon_aliased_unique(self, connection):
text1 = self.tables.text1
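+        # label(None) generates an anonymous label, giving each column a
+        # distinct auto-generated name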
c1 = text1.c.a.label(None)
c4 = text1.alias().c.d.label(None)
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
lambda: row._mapping[text1.c.b],
)
- def test_anon_aliased_overlapping(self):
+ def test_anon_aliased_overlapping(self, connection):
text1 = self.tables.text1
c1 = text1.c.a.label(None)
c4 = text1.c.a.label(None)
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
eq_(row._mapping[c3], "c1")
eq_(row._mapping[c4], "d1")
- def test_anon_aliased_name_conflict(self):
+ def test_anon_aliased_name_conflict(self, connection):
text1 = self.tables.text1
c1 = text1.c.a.label("a")
stmt = text("select a, b as a, c as a, d as a from text1").columns(
c1, c2, c3, c4
)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")
@classmethod
def insert_data(cls):
- cls.engine.execute(
- cls.tables.test.insert(),
- [{"x": i, "y": "t_%d" % i} for i in range(1, 12)],
- )
+ with cls.engine.connect() as conn:
+ conn.execute(
+ cls.tables.test.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(1, 12)],
+ )
@contextmanager
def _proxy_fixture(self, cls):
def _test_proxy(self, cls):
with self._proxy_fixture(cls):
rows = []
- r = self.engine.execute(select([self.table]))
- assert isinstance(r.cursor_strategy, cls)
- for i in range(5):
- rows.append(r.fetchone())
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ with self.engine.connect() as conn:
+ r = conn.execute(select([self.table]))
+ assert isinstance(r.cursor_strategy, cls)
+ for i in range(5):
+ rows.append(r.fetchone())
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
- rows = r.fetchmany(3)
- eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+ rows = r.fetchmany(3)
+ eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
- rows = r.fetchall()
- eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+ rows = r.fetchall()
+ eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
- r = self.engine.execute(select([self.table]))
- rows = r.fetchmany(None)
- eq_(rows[0], (1, "t_1"))
- # number of rows here could be one, or the whole thing
- assert len(rows) == 1 or len(rows) == 11
+ r = conn.execute(select([self.table]))
+ rows = r.fetchmany(None)
+ eq_(rows[0], (1, "t_1"))
+ # number of rows here could be one, or the whole thing
+ assert len(rows) == 1 or len(rows) == 11
- r = self.engine.execute(select([self.table]).limit(1))
- r.fetchone()
- eq_(r.fetchone(), None)
+ r = conn.execute(select([self.table]).limit(1))
+ r.fetchone()
+ eq_(r.fetchone(), None)
- r = self.engine.execute(select([self.table]).limit(5))
- rows = r.fetchmany(6)
- eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+ r = conn.execute(select([self.table]).limit(5))
+ rows = r.fetchmany(6)
+ eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
- # result keeps going just fine with blank results...
- eq_(r.fetchmany(2), [])
+ # result keeps going just fine with blank results...
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchmany(2), [])
+ eq_(r.fetchmany(2), [])
- eq_(r.fetchall(), [])
+ eq_(r.fetchall(), [])
- eq_(r.fetchone(), None)
+ eq_(r.fetchone(), None)
- # until we close
- r.close()
+ # until we close
+ r.close()
- self._assert_result_closed(r)
+ self._assert_result_closed(r)
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.first(), (1, "t_1"))
- self._assert_result_closed(r)
+ r = conn.execute(select([self.table]).limit(5))
+ eq_(r.first(), (1, "t_1"))
+ self._assert_result_closed(r)
- r = self.engine.execute(select([self.table]).limit(5))
- eq_(r.scalar(), 1)
- self._assert_result_closed(r)
+ r = conn.execute(select([self.table]).limit(5))
+ eq_(r.scalar(), 1)
+ self._assert_result_closed(r)
def _assert_result_closed(self, r):
assert_raises_message(