),
)
- def test_processing(self):
+ def test_processing(self, connection):
users = self.tables.users
self._data_fixture()
- result = users.select().order_by(users.c.user_id).execute().fetchall()
+ result = connection.execute(
+ users.select().order_by(users.c.user_id)
+ ).fetchall()
for assertstr, assertint, assertint2, row in zip(
[
"BIND_INjackBIND_OUT",
for col in row[3], row[4]:
assert isinstance(col, util.text_type)
- def test_plain_in(self):
+ def test_plain_in(self, connection):
users = self.tables.users
self._data_fixture()
.where(users.c.goofy8.in_([15, 9]))
.order_by(users.c.user_id)
)
- result = testing.db.execute(stmt, {"goofy": [15, 9]})
+ result = connection.execute(stmt, {"goofy": [15, 9]})
eq_(result.fetchall(), [(3, 1500), (4, 900)])
- def test_expanding_in(self):
+ def test_expanding_in(self, connection):
users = self.tables.users
self._data_fixture()
.where(users.c.goofy8.in_(bindparam("goofy", expanding=True)))
.order_by(users.c.user_id)
)
- result = testing.db.execute(stmt, {"goofy": [15, 9]})
+ result = connection.execute(stmt, {"goofy": [15, 9]})
eq_(result.fetchall(), [(3, 1500), (4, 900)])
Table("t", metadata, Column("data", String(50)))
- def test_insert_round_trip_cast(self):
- self._test_insert_round_trip(cast)
+ def test_insert_round_trip_cast(self, connection):
+ self._test_insert_round_trip(cast, connection)
- def test_insert_round_trip_type_coerce(self):
- self._test_insert_round_trip(type_coerce)
+ def test_insert_round_trip_type_coerce(self, connection):
+ self._test_insert_round_trip(type_coerce, connection)
- def _test_insert_round_trip(self, coerce_fn):
+ def _test_insert_round_trip(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
eq_(
- select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
+ conn.execute(select([coerce_fn(t.c.data, MyType)])).fetchall(),
[("BIND_INd1BIND_OUT",)],
)
- def test_coerce_from_nulltype_cast(self):
- self._test_coerce_from_nulltype(cast)
+ def test_coerce_from_nulltype_cast(self, connection):
+ self._test_coerce_from_nulltype(cast, connection)
- def test_coerce_from_nulltype_type_coerce(self):
- self._test_coerce_from_nulltype(type_coerce)
+ def test_coerce_from_nulltype_type_coerce(self, connection):
+ self._test_coerce_from_nulltype(type_coerce, connection)
- def _test_coerce_from_nulltype(self, coerce_fn):
+ def _test_coerce_from_nulltype(self, coerce_fn, conn):
MyType = self.MyType
# test coerce from nulltype - e.g. use an object that
t = self.tables.t
- t.insert().values(data=coerce_fn(MyObj(), MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn(MyObj(), MyType)))
eq_(
- select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
+ conn.execute(select([coerce_fn(t.c.data, MyType)])).fetchall(),
[("BIND_INTHISISMYOBJBIND_OUT",)],
)
- def test_vs_non_coerced_cast(self):
- self._test_vs_non_coerced(cast)
+ def test_vs_non_coerced_cast(self, connection):
+ self._test_vs_non_coerced(cast, connection)
- def test_vs_non_coerced_type_coerce(self):
- self._test_vs_non_coerced(type_coerce)
+ def test_vs_non_coerced_type_coerce(self, connection):
+ self._test_vs_non_coerced(type_coerce, connection)
- def _test_vs_non_coerced(self, coerce_fn):
+ def _test_vs_non_coerced(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)])
- .execute()
- .fetchall(),
+ conn.execute(
+ select([t.c.data, coerce_fn(t.c.data, MyType)])
+ ).fetchall(),
[("BIND_INd1", "BIND_INd1BIND_OUT")],
)
- def test_vs_non_coerced_alias_cast(self):
- self._test_vs_non_coerced_alias(cast)
+ def test_vs_non_coerced_alias_cast(self, connection):
+ self._test_vs_non_coerced_alias(cast, connection)
- def test_vs_non_coerced_alias_type_coerce(self):
- self._test_vs_non_coerced_alias(type_coerce)
+ def test_vs_non_coerced_alias_type_coerce(self, connection):
+ self._test_vs_non_coerced_alias(type_coerce, connection)
- def _test_vs_non_coerced_alias(self, coerce_fn):
+ def _test_vs_non_coerced_alias(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
eq_(
- select([t.c.data.label("x"), coerce_fn(t.c.data, MyType)])
- .alias()
- .select()
- .execute()
- .fetchall(),
+ conn.execute(
+ select([t.c.data.label("x"), coerce_fn(t.c.data, MyType)])
+ .alias()
+ .select()
+ ).fetchall(),
[("BIND_INd1", "BIND_INd1BIND_OUT")],
)
- def test_vs_non_coerced_where_cast(self):
- self._test_vs_non_coerced_where(cast)
+ def test_vs_non_coerced_where_cast(self, connection):
+ self._test_vs_non_coerced_where(cast, connection)
- def test_vs_non_coerced_where_type_coerce(self):
- self._test_vs_non_coerced_where(type_coerce)
+ def test_vs_non_coerced_where_type_coerce(self, connection):
+ self._test_vs_non_coerced_where(type_coerce, connection)
- def _test_vs_non_coerced_where(self, coerce_fn):
+ def _test_vs_non_coerced_where(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
# coerce on left side
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)])
- .where(coerce_fn(t.c.data, MyType) == "d1")
- .execute()
- .fetchall(),
+ conn.execute(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).where(
+ coerce_fn(t.c.data, MyType) == "d1"
+ )
+ ).fetchall(),
[("BIND_INd1", "BIND_INd1BIND_OUT")],
)
# coerce on right side
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)])
- .where(t.c.data == coerce_fn("d1", MyType))
- .execute()
- .fetchall(),
+ conn.execute(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).where(
+ t.c.data == coerce_fn("d1", MyType)
+ )
+ ).fetchall(),
[("BIND_INd1", "BIND_INd1BIND_OUT")],
)
- def test_coerce_none_cast(self):
- self._test_coerce_none(cast)
+ def test_coerce_none_cast(self, connection):
+ self._test_coerce_none(cast, connection)
- def test_coerce_none_type_coerce(self):
- self._test_coerce_none(type_coerce)
+ def test_coerce_none_type_coerce(self, connection):
+ self._test_coerce_none(type_coerce, connection)
- def _test_coerce_none(self, coerce_fn):
+ def _test_coerce_none(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)])
- .where(t.c.data == coerce_fn(None, MyType))
- .execute()
- .fetchall(),
+ conn.execute(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).where(
+ t.c.data == coerce_fn(None, MyType)
+ )
+ ).fetchall(),
[],
)
eq_(
- select([t.c.data, coerce_fn(t.c.data, MyType)])
- .where(coerce_fn(t.c.data, MyType) == None)
- .execute() # noqa
- .fetchall(),
+ conn.execute(
+ select([t.c.data, coerce_fn(t.c.data, MyType)]).where(
+ coerce_fn(t.c.data, MyType) == None
+ )
+ ).fetchall(), # noqa
[],
)
- def test_resolve_clause_element_cast(self):
- self._test_resolve_clause_element(cast)
+ def test_resolve_clause_element_cast(self, connection):
+ self._test_resolve_clause_element(cast, connection)
- def test_resolve_clause_element_type_coerce(self):
- self._test_resolve_clause_element(type_coerce)
+ def test_resolve_clause_element_type_coerce(self, connection):
+ self._test_resolve_clause_element(type_coerce, connection)
- def _test_resolve_clause_element(self, coerce_fn):
+ def _test_resolve_clause_element(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
class MyFoob(object):
def __clause_element__(self):
return t.c.data
eq_(
- testing.db.execute(
+ conn.execute(
select([t.c.data, coerce_fn(MyFoob(), MyType)])
).fetchall(),
[("BIND_INd1", "BIND_INd1BIND_OUT")],
)
- def test_cast_replace_col_w_bind(self):
- self._test_replace_col_w_bind(cast)
+ def test_cast_replace_col_w_bind(self, connection):
+ self._test_replace_col_w_bind(cast, connection)
- def test_type_coerce_replace_col_w_bind(self):
- self._test_replace_col_w_bind(type_coerce)
+ def test_type_coerce_replace_col_w_bind(self, connection):
+ self._test_replace_col_w_bind(type_coerce, connection)
- def _test_replace_col_w_bind(self, coerce_fn):
+ def _test_replace_col_w_bind(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
stmt = select([t.c.data, coerce_fn(t.c.data, MyType)])
# original statement
eq_(
- testing.db.execute(stmt).fetchall(),
+ conn.execute(stmt).fetchall(),
[("BIND_INd1", "BIND_INd1BIND_OUT")],
)
# replaced with binds; CAST can't affect the bound parameter
# on the way in here
eq_(
- testing.db.execute(new_stmt).fetchall(),
+ conn.execute(new_stmt).fetchall(),
[("x", "BIND_INxBIND_OUT")]
if coerce_fn is type_coerce
else [("x", "xBIND_OUT")],
)
- def test_cast_bind(self):
- self._test_bind(cast)
+ def test_cast_bind(self, connection):
+ self._test_bind(cast, connection)
- def test_type_bind(self):
- self._test_bind(type_coerce)
+ def test_type_bind(self, connection):
+ self._test_bind(type_coerce, connection)
- def _test_bind(self, coerce_fn):
+ def _test_bind(self, coerce_fn, conn):
MyType = self.MyType
t = self.tables.t
- t.insert().values(data=coerce_fn("d1", MyType)).execute()
+ conn.execute(t.insert().values(data=coerce_fn("d1", MyType)))
stmt = select(
[
)
eq_(
- testing.db.execute(stmt).fetchall(),
+ conn.execute(stmt).fetchall(),
[("x", "BIND_INxBIND_OUT")]
if coerce_fn is type_coerce
else [("x", "xBIND_OUT")],
)
- def test_cast_existing_typed(self):
+ def test_cast_existing_typed(self, connection):
MyType = self.MyType
coerce_fn = cast
# when cast() is given an already typed value,
# the type does not take effect on the value itself.
eq_(
- testing.db.scalar(select([coerce_fn(literal("d1"), MyType)])),
+ connection.scalar(select([coerce_fn(literal("d1"), MyType)])),
"d1BIND_OUT",
)
- def test_type_coerce_existing_typed(self):
+ def test_type_coerce_existing_typed(self, connection):
MyType = self.MyType
coerce_fn = type_coerce
t = self.tables.t
# type_coerce does upgrade the given expression to the
# given type.
- t.insert().values(data=coerce_fn(literal("d1"), MyType)).execute()
+ connection.execute(
+ t.insert().values(data=coerce_fn(literal("d1"), MyType))
+ )
eq_(
- select([coerce_fn(t.c.data, MyType)]).execute().fetchall(),
+ connection.execute(
+ select([coerce_fn(t.c.data, MyType)])
+ ).fetchall(),
[("BIND_INd1BIND_OUT",)],
)
"foo",
)
- def test_validators_not_in_like_roundtrip(self):
+ def test_validators_not_in_like_roundtrip(self, connection):
enum_table = self.tables["non_native_enum_table"]
- enum_table.insert().execute(
+ connection.execute(
+ enum_table.insert(),
[
{"id": 1, "someenum": "two"},
{"id": 2, "someenum": "two"},
{"id": 3, "someenum": "one"},
- ]
+ ],
)
eq_(
- enum_table.select()
- .where(enum_table.c.someenum.like("%wo%"))
- .order_by(enum_table.c.id)
- .execute()
- .fetchall(),
+ connection.execute(
+ enum_table.select()
+ .where(enum_table.c.someenum.like("%wo%"))
+ .order_by(enum_table.c.id)
+ ).fetchall(),
[(1, "two", None), (2, "two", None)],
)
- def test_validators_not_in_concatenate_roundtrip(self):
+ def test_validators_not_in_concatenate_roundtrip(self, connection):
enum_table = self.tables["non_native_enum_table"]
- enum_table.insert().execute(
+ connection.execute(
+ enum_table.insert(),
[
{"id": 1, "someenum": "two"},
{"id": 2, "someenum": "two"},
{"id": 3, "someenum": "one"},
- ]
+ ],
)
eq_(
- select(["foo" + enum_table.c.someenum])
- .order_by(enum_table.c.id)
- .execute()
- .fetchall(),
+ connection.execute(
+ select(["foo" + enum_table.c.someenum]).order_by(
+ enum_table.c.id
+ )
+ ).fetchall(),
[("footwo",), ("footwo",), ("fooone",)],
)
- def test_round_trip(self):
+ def test_round_trip(self, connection):
enum_table = self.tables["enum_table"]
- enum_table.insert().execute(
+ connection.execute(
+ enum_table.insert(),
[
{"id": 1, "someenum": "two"},
{"id": 2, "someenum": "two"},
{"id": 3, "someenum": "one"},
- ]
+ ],
)
eq_(
- enum_table.select().order_by(enum_table.c.id).execute().fetchall(),
+ connection.execute(
+ enum_table.select().order_by(enum_table.c.id)
+ ).fetchall(),
[(1, "two"), (2, "two"), (3, "one")],
)
- def test_null_round_trip(self):
+ def test_null_round_trip(self, connection):
enum_table = self.tables.enum_table
non_native_enum_table = self.tables.non_native_enum_table
- with testing.db.connect() as conn:
- conn.execute(enum_table.insert(), {"id": 1, "someenum": None})
- eq_(conn.scalar(select([enum_table.c.someenum])), None)
+ connection.execute(enum_table.insert(), {"id": 1, "someenum": None})
+ eq_(connection.scalar(select([enum_table.c.someenum])), None)
- with testing.db.connect() as conn:
- conn.execute(
- non_native_enum_table.insert(), {"id": 1, "someenum": None}
- )
- eq_(conn.scalar(select([non_native_enum_table.c.someenum])), None)
+ connection.execute(
+ non_native_enum_table.insert(), {"id": 1, "someenum": None}
+ )
+ eq_(
+ connection.scalar(select([non_native_enum_table.c.someenum])), None
+ )
@testing.requires.enforces_check_constraints
def test_check_constraint(self, connection):
select([self.tables.non_native_enum_table.c.someotherenum]),
)
- def test_non_native_round_trip(self):
+ def test_non_native_round_trip(self, connection):
non_native_enum_table = self.tables["non_native_enum_table"]
- non_native_enum_table.insert().execute(
+ connection.execute(
+ non_native_enum_table.insert(),
[
{"id": 1, "someenum": "two"},
{"id": 2, "someenum": "two"},
{"id": 3, "someenum": "one"},
- ]
+ ],
)
eq_(
- select(
- [non_native_enum_table.c.id, non_native_enum_table.c.someenum]
- )
- .order_by(non_native_enum_table.c.id)
- .execute()
- .fetchall(),
+ connection.execute(
+ select(
+ [
+ non_native_enum_table.c.id,
+ non_native_enum_table.c.someenum,
+ ]
+ ).order_by(non_native_enum_table.c.id)
+ ).fetchall(),
[(1, "two"), (2, "two"), (3, "one")],
)
typ = Enum(self.SomeEnum, sort_key_function=None)
is_(typ.sort_key_function, None)
- def test_pep435_enum_round_trip(self):
+ def test_pep435_enum_round_trip(self, connection):
stdlib_enum_table = self.tables["stdlib_enum_table"]
- stdlib_enum_table.insert().execute(
+ connection.execute(
+ stdlib_enum_table.insert(),
[
{"id": 1, "someenum": self.SomeEnum.two},
{"id": 2, "someenum": self.SomeEnum.two},
{"id": 5, "someenum": self.SomeEnum.four},
{"id": 6, "someenum": "three"},
{"id": 7, "someenum": "four"},
- ]
+ ],
)
eq_(
- stdlib_enum_table.select()
- .order_by(stdlib_enum_table.c.id)
- .execute()
- .fetchall(),
+ connection.execute(
+ stdlib_enum_table.select().order_by(stdlib_enum_table.c.id)
+ ).fetchall(),
[
(1, self.SomeEnum.two),
(2, self.SomeEnum.two),
],
)
- def test_pep435_enum_values_callable_round_trip(self):
+ def test_pep435_enum_values_callable_round_trip(self, connection):
stdlib_enum_table_custom_values = self.tables["stdlib_enum_table2"]
- stdlib_enum_table_custom_values.insert().execute(
+ connection.execute(
+ stdlib_enum_table_custom_values.insert(),
[
{"id": 1, "someotherenum": self.SomeOtherEnum.AMember},
{"id": 2, "someotherenum": self.SomeOtherEnum.BMember},
{"id": 3, "someotherenum": self.SomeOtherEnum.AMember},
- ]
+ ],
)
eq_(
- stdlib_enum_table_custom_values.select()
- .order_by(stdlib_enum_table_custom_values.c.id)
- .execute()
- .fetchall(),
+ connection.execute(
+ stdlib_enum_table_custom_values.select().order_by(
+ stdlib_enum_table_custom_values.c.id
+ )
+ ).fetchall(),
[
(1, self.SomeOtherEnum.AMember),
(2, self.SomeOtherEnum.BMember),
],
)
- def test_pep435_enum_expanding_in(self):
+ def test_pep435_enum_expanding_in(self, connection):
stdlib_enum_table_custom_values = self.tables["stdlib_enum_table2"]
- stdlib_enum_table_custom_values.insert().execute(
+ connection.execute(
+ stdlib_enum_table_custom_values.insert(),
[
{"id": 1, "someotherenum": self.SomeOtherEnum.one},
{"id": 2, "someotherenum": self.SomeOtherEnum.two},
{"id": 3, "someotherenum": self.SomeOtherEnum.three},
- ]
+ ],
)
stmt = (
.order_by(stdlib_enum_table_custom_values.c.id)
)
eq_(
- testing.db.execute(
+ connection.execute(
stmt,
{"member": [self.SomeOtherEnum.one, self.SomeOtherEnum.three]},
).fetchall(),
dialect="default",
)
- def test_lookup_failure(self):
+ def test_lookup_failure(self, connection):
assert_raises(
exc.StatementError,
- self.tables["non_native_enum_table"].insert().execute,
+ connection.execute,
+ self.tables["non_native_enum_table"].insert(),
{"id": 4, "someotherenum": "four"},
)
@engines.close_first
def teardown(self):
- binary_table.delete().execute()
+ with testing.db.connect() as conn:
+ conn.execute(binary_table.delete())
@classmethod
def teardown_class(cls):
metadata.drop_all()
@testing.requires.non_broken_binary
- def test_round_trip(self):
+ def test_round_trip(self, connection):
testobj1 = pickleable.Foo("im foo 1")
testobj2 = pickleable.Foo("im foo 2")
testobj3 = pickleable.Foo("im foo 3")
stream1 = self.load_stream("binary_data_one.dat")
stream2 = self.load_stream("binary_data_two.dat")
- binary_table.insert().execute(
+ connection.execute(
+ binary_table.insert(),
primary_id=1,
misc="binary_data_one.dat",
data=stream1,
pickled=testobj1,
mypickle=testobj3,
)
- binary_table.insert().execute(
+ connection.execute(
+ binary_table.insert(),
primary_id=2,
misc="binary_data_two.dat",
data=stream2,
data_slice=stream2[0:99],
pickled=testobj2,
)
- binary_table.insert().execute(
+ connection.execute(
+ binary_table.insert(),
primary_id=3,
misc="binary_data_two.dat",
data=None,
}
),
):
- result = stmt.execute().fetchall()
+ result = connection.execute(stmt).fetchall()
eq_(stream1, result[0]._mapping["data"])
eq_(stream1[0:100], result[0]._mapping["data_slice"])
eq_(stream2, result[1]._mapping["data"])
)
@testing.requires.binary_comparisons
- def test_comparison(self):
+ def test_comparison(self, connection):
"""test that type coercion occurs on comparison for binary"""
expr = binary_table.c.data == "foo"
assert isinstance(expr.right.type, LargeBinary)
data = os.urandom(32)
- binary_table.insert().execute(data=data)
+ connection.execute(binary_table.insert(), data=data)
eq_(
- select([func.count("*")])
- .select_from(binary_table)
- .where(binary_table.c.data == data)
- .scalar(),
+ connection.scalar(
+ select([func.count("*")])
+ .select_from(binary_table)
+ .where(binary_table.c.data == data)
+ ),
1,
)
@testing.requires.binary_literals
- def test_literal_roundtrip(self):
+ def test_literal_roundtrip(self, connection):
compiled = select([cast(literal(util.b("foo")), LargeBinary)]).compile(
dialect=testing.db.dialect, compile_kwargs={"literal_binds": True}
)
- result = testing.db.execute(compiled)
+ result = connection.execute(compiled)
eq_(result.scalar(), util.b("foo"))
def test_bind_processor_no_dbapi(self):
meta.create_all()
- test_table.insert().execute(
- {
- "id": 1,
- "data": "somedata",
- "atimestamp": datetime.date(2007, 10, 15),
- "avalue": 25,
- "bvalue": "foo",
- }
- )
+ with testing.db.connect() as conn:
+ conn.execute(
+ test_table.insert(),
+ {
+ "id": 1,
+ "data": "somedata",
+ "atimestamp": datetime.date(2007, 10, 15),
+ "avalue": 25,
+ "bvalue": "foo",
+ },
+ )
@classmethod
def teardown_class(cls):
],
)
- def test_bind_adapt(self):
+ def test_bind_adapt(self, connection):
# test an untyped bind gets the left side's type
expr = test_table.c.atimestamp == bindparam("thedate")
eq_(expr.right.type._type_affinity, Date)
eq_(
- testing.db.execute(
+ connection.execute(
select(
[
test_table.c.id,
eq_(expr.right.type._type_affinity, MyCustomType)
eq_(
- testing.db.execute(
+ connection.execute(
test_table.select().where(expr), {"somevalue": 25}
).fetchall(),
[
eq_(expr.right.type._type_affinity, String)
eq_(
- testing.db.execute(
+ connection.execute(
test_table.select().where(expr), {"somevalue": "foo"}
).fetchall(),
[
def test_actual_literal_adapters(self, data, expected):
is_(literal(data).type.__class__, expected)
- def test_typedec_operator_adapt(self):
+ def test_typedec_operator_adapt(self, connection):
expr = test_table.c.bvalue + "hi"
assert expr.type.__class__ is MyTypeDec
assert expr.right.type.__class__ is MyTypeDec
eq_(
- testing.db.execute(select([expr.label("foo")])).scalar(),
+ connection.execute(select([expr.label("foo")])).scalar(),
"BIND_INfooBIND_INhiBIND_OUT",
)
dialect=default.DefaultDialect(supports_native_boolean=True),
)
- def test_typedec_righthand_coercion(self):
+ def test_typedec_righthand_coercion(self, connection):
class MyTypeDec(types.TypeDecorator):
impl = String
is_(expr.type.__class__, MyTypeDec)
eq_(
- testing.db.execute(select([expr.label("foo")])).scalar(),
+ connection.execute(select([expr.label("foo")])).scalar(),
"BIND_INfooBIND_IN6BIND_OUT",
)
expr = bindparam("bar") + bindparam("foo")
eq_(expr.type, types.NULLTYPE)
- def test_distinct(self):
+ def test_distinct(self, connection):
s = select([distinct(test_table.c.avalue)])
- eq_(testing.db.execute(s).scalar(), 25)
+ eq_(connection.execute(s).scalar(), 25)
s = select([test_table.c.avalue.distinct()])
- eq_(testing.db.execute(s).scalar(), 25)
+ eq_(connection.execute(s).scalar(), 25)
assert distinct(test_table.c.data).type == test_table.c.data.type
assert test_table.c.data.distinct().type == test_table.c.data.type
def _fixture(self, metadata, type_, data):
t = Table("t", metadata, Column("val", type_))
metadata.create_all()
- t.insert().execute(val=data)
+ with testing.db.connect() as conn:
+ conn.execute(t.insert(), val=data)
@testing.fails_on("sqlite", "Doesn't provide Decimal results natively")
@testing.provide_metadata
@engines.close_first
def teardown(self):
- interval_table.delete().execute()
+ with testing.db.connect() as conn:
+ conn.execute(interval_table.delete())
@classmethod
def teardown_class(cls):