Column("full", Boolean),
Column("goofy", GoofyType(50)),
)
- table.create(checkfirst=True)
+ with testing.db.connect() as conn:
+ table.create(conn, checkfirst=True)
def teardown(self):
- table.drop()
+ with testing.db.connect() as conn:
+ table.drop(conn)
- def test_column_targeting(self):
- result = (
- table.insert()
- .returning(table.c.id, table.c.full)
- .execute({"persons": 1, "full": False})
+ def test_column_targeting(self, connection):
+ result = connection.execute(
+ table.insert().returning(table.c.id, table.c.full),
+ {"persons": 1, "full": False},
)
row = result.first()._mapping
assert row[table.c.full] == row["full"]
assert row["full"] is False
- result = (
+ result = connection.execute(
table.insert()
.values(persons=5, full=True, goofy="somegoofy")
.returning(table.c.persons, table.c.full, table.c.goofy)
- .execute()
)
row = result.first()._mapping
assert row[table.c.persons] == row["persons"] == 5
eq_(row["goofy"], "FOOsomegoofyBAR")
@testing.fails_on("firebird", "fb can't handle returning x AS y")
- def test_labeling(self):
- result = (
+ def test_labeling(self, connection):
+ result = connection.execute(
table.insert()
.values(persons=6)
.returning(table.c.persons.label("lala"))
- .execute()
)
row = result.first()._mapping
assert row["lala"] == 6
@testing.fails_on(
"firebird", "fb/kintersbasdb can't handle the bind params"
)
- def test_anon_expressions(self):
- result = (
+ def test_anon_expressions(self, connection):
+ result = connection.execute(
table.insert()
.values(goofy="someOTHERgoofy")
.returning(func.lower(table.c.goofy, type_=GoofyType))
- .execute()
)
row = result.first()
eq_(row[0], "foosomeothergoofyBAR")
- result = (
- table.insert()
- .values(persons=12)
- .returning(table.c.persons + 18)
- .execute()
+ result = connection.execute(
+ table.insert().values(persons=12).returning(table.c.persons + 18)
)
row = result.first()
eq_(row[0], 30)
- def test_update_returning(self):
- table.insert().execute(
- [{"persons": 5, "full": False}, {"persons": 3, "full": False}]
+ def test_update_returning(self, connection):
+ connection.execute(
+ table.insert(),
+ [{"persons": 5, "full": False}, {"persons": 3, "full": False}],
)
- result = (
- table.update(table.c.persons > 4, dict(full=True))
- .returning(table.c.id)
- .execute()
+ result = connection.execute(
+ table.update(table.c.persons > 4, dict(full=True)).returning(
+ table.c.id
+ )
)
eq_(result.fetchall(), [(1,)])
- result2 = (
- select([table.c.id, table.c.full]).order_by(table.c.id).execute()
+ result2 = connection.execute(
+ select([table.c.id, table.c.full]).order_by(table.c.id)
)
eq_(result2.fetchall(), [(1, True), (2, False)])
- def test_insert_returning(self):
- result = (
- table.insert()
- .returning(table.c.id)
- .execute({"persons": 1, "full": False})
+ def test_insert_returning(self, connection):
+ result = connection.execute(
+ table.insert().returning(table.c.id), {"persons": 1, "full": False}
)
eq_(result.fetchall(), [(1,)])
@testing.requires.multivalues_inserts
- def test_multirow_returning(self):
+ def test_multirow_returning(self, connection):
ins = (
table.insert()
.returning(table.c.id, table.c.persons)
]
)
)
- result = testing.db.execute(ins)
+ result = connection.execute(ins)
eq_(result.fetchall(), [(1, 1), (2, 2), (3, 3)])
- def test_no_ipk_on_returning(self):
- result = testing.db.execute(
+ def test_no_ipk_on_returning(self, connection):
+ result = connection.execute(
table.insert().returning(table.c.id), {"persons": 1, "full": False}
)
assert_raises_message(
)
eq_([dict(row._mapping) for row in result4], [{"persons": 10}])
- def test_delete_returning(self):
- table.insert().execute(
- [{"persons": 5, "full": False}, {"persons": 3, "full": False}]
+ def test_delete_returning(self, connection):
+ connection.execute(
+ table.insert(),
+ [{"persons": 5, "full": False}, {"persons": 3, "full": False}],
)
- result = (
- table.delete(table.c.persons > 4).returning(table.c.id).execute()
+ result = connection.execute(
+ table.delete(table.c.persons > 4).returning(table.c.id)
)
eq_(result.fetchall(), [(1,)])
- result2 = (
- select([table.c.id, table.c.full]).order_by(table.c.id).execute()
+ result2 = connection.execute(
+ select([table.c.id, table.c.full]).order_by(table.c.id)
)
eq_(result2.fetchall(), [(2, False)])
__backend__ = True
@testing.provide_metadata
- def test_select_doesnt_pollute_result(self):
+ def test_select_doesnt_pollute_result(self, connection):
class MyType(TypeDecorator):
impl = Integer
t2 = Table("t2", self.metadata, Column("x", Integer))
- self.metadata.create_all(testing.db)
- with testing.db.connect() as conn:
- conn.execute(t1.insert().values(x=5))
+ self.metadata.create_all(connection)
+ connection.execute(t1.insert().values(x=5))
- stmt = (
- t2.insert()
- .values(x=select([t1.c.x]).scalar_subquery())
- .returning(t2.c.x)
- )
+ stmt = (
+ t2.insert()
+ .values(x=select([t1.c.x]).scalar_subquery())
+ .returning(t2.c.x)
+ )
- result = conn.execute(stmt)
- eq_(result.scalar(), 5)
+ result = connection.execute(stmt)
+ eq_(result.scalar(), 5)
class SequenceReturningTest(fixtures.TestBase):
Column("id", Integer, seq, primary_key=True),
Column("data", String(50)),
)
- table.create(checkfirst=True)
+ with testing.db.connect() as conn:
+ table.create(conn, checkfirst=True)
def teardown(self):
- table.drop()
+ with testing.db.connect() as conn:
+ table.drop(conn)
- def test_insert(self):
- r = table.insert().values(data="hi").returning(table.c.id).execute()
+ def test_insert(self, connection):
+ r = connection.execute(
+ table.insert().values(data="hi").returning(table.c.id)
+ )
assert r.first() == (1,)
- assert seq.execute() == 2
+ assert connection.execute(seq) == 2
class KeyReturningTest(fixtures.TestBase, AssertsExecutionResults):
),
Column("data", String(20)),
)
- table.create(checkfirst=True)
+ with testing.db.connect() as conn:
+ table.create(conn, checkfirst=True)
def teardown(self):
- table.drop()
+ with testing.db.connect() as conn:
+ table.drop(conn)
@testing.exclude("firebird", "<", (2, 0), "2.0+ feature")
@testing.exclude("postgresql", "<", (8, 2), "8.2+ feature")
- def test_insert(self):
- result = (
- table.insert().returning(table.c.foo_id).execute(data="somedata")
+ def test_insert(self, connection):
+ result = connection.execute(
+ table.insert().returning(table.c.foo_id), data="somedata"
)
row = result.first()._mapping
assert row[table.c.foo_id] == row["id"] == 1
- result = table.select().execute().first()._mapping
+ result = connection.execute(table.select()).first()._mapping
assert row[table.c.foo_id] == row["id"] == 1
Column("upddef", Integer, onupdate=IncDefault()),
)
- def test_chained_insert_pk(self):
+ def test_chained_insert_pk(self, connection):
t1 = self.tables.t1
- result = testing.db.execute(
+ result = connection.execute(
t1.insert().values(upddef=1).return_defaults(t1.c.insdef)
)
eq_(
[1, 0],
)
- def test_arg_insert_pk(self):
+ def test_arg_insert_pk(self, connection):
t1 = self.tables.t1
- result = testing.db.execute(
+ result = connection.execute(
t1.insert(return_defaults=[t1.c.insdef]).values(upddef=1)
)
eq_(
[1, 0],
)
- def test_chained_update_pk(self):
+ def test_chained_update_pk(self, connection):
t1 = self.tables.t1
- testing.db.execute(t1.insert().values(upddef=1))
- result = testing.db.execute(
+ connection.execute(t1.insert().values(upddef=1))
+ result = connection.execute(
t1.update().values(data="d1").return_defaults(t1.c.upddef)
)
eq_(
[result.returned_defaults._mapping[k] for k in (t1.c.upddef,)], [1]
)
- def test_arg_update_pk(self):
+ def test_arg_update_pk(self, connection):
t1 = self.tables.t1
- testing.db.execute(t1.insert().values(upddef=1))
- result = testing.db.execute(
+ connection.execute(t1.insert().values(upddef=1))
+ result = connection.execute(
t1.update(return_defaults=[t1.c.upddef]).values(data="d1")
)
eq_(
[result.returned_defaults._mapping[k] for k in (t1.c.upddef,)], [1]
)
- def test_insert_non_default(self):
+ def test_insert_non_default(self, connection):
"""test that a column not marked at all as a
default works with this feature."""
t1 = self.tables.t1
- result = testing.db.execute(
+ result = connection.execute(
t1.insert().values(upddef=1).return_defaults(t1.c.data)
)
eq_(
[1, None],
)
- def test_update_non_default(self):
+ def test_update_non_default(self, connection):
"""test that a column not marked at all as a
default works with this feature."""
t1 = self.tables.t1
- testing.db.execute(t1.insert().values(upddef=1))
- result = testing.db.execute(
+ connection.execute(t1.insert().values(upddef=1))
+ result = connection.execute(
t1.update().values(upddef=2).return_defaults(t1.c.data)
)
eq_(
[None],
)
- def test_insert_non_default_plus_default(self):
+ def test_insert_non_default_plus_default(self, connection):
t1 = self.tables.t1
- result = testing.db.execute(
+ result = connection.execute(
t1.insert()
.values(upddef=1)
.return_defaults(t1.c.data, t1.c.insdef)
{"id": 1, "data": None, "insdef": 0},
)
- def test_update_non_default_plus_default(self):
+ def test_update_non_default_plus_default(self, connection):
t1 = self.tables.t1
- testing.db.execute(t1.insert().values(upddef=1))
- result = testing.db.execute(
+ connection.execute(t1.insert().values(upddef=1))
+ result = connection.execute(
t1.update()
.values(insdef=2)
.return_defaults(t1.c.data, t1.c.upddef)
{"data": None, "upddef": 1},
)
- def test_insert_all(self):
+ def test_insert_all(self, connection):
t1 = self.tables.t1
- result = testing.db.execute(
+ result = connection.execute(
t1.insert().values(upddef=1).return_defaults()
)
eq_(
{"id": 1, "data": None, "insdef": 0},
)
- def test_update_all(self):
+ def test_update_all(self, connection):
t1 = self.tables.t1
- testing.db.execute(t1.insert().values(upddef=1))
- result = testing.db.execute(
+ connection.execute(t1.insert().values(upddef=1))
+ result = connection.execute(
t1.update().values(insdef=2).return_defaults()
)
eq_(dict(result.returned_defaults._mapping), {"upddef": 1})