def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(row, (config.db.dialect.default_sequence_base, "some data"))
+ eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
- def test_autoincrement_on_insert(self):
+ def test_autoincrement_on_insert(self, connection):
- config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
- self._assert_round_trip(self.tables.autoinc_pk, config.db)
+ connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+ self._assert_round_trip(self.tables.autoinc_pk, connection)
- def test_last_inserted_id(self):
+ def test_last_inserted_id(self, connection):
- r = config.db.execute(
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
- pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(r.inserted_primary_key, [pk])
@requirements.dbapi_lastrowid
- def test_native_lastrowid_autoinc(self):
- r = config.db.execute(
+ def test_native_lastrowid_autoinc(self, connection):
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
lastrowid = r.lastrowid
- pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(lastrowid, pk)
assert not r.returns_rows
@requirements.returning
- def test_autoclose_on_insert_implicit_returning(self):
- r = config.db.execute(
+ def test_autoclose_on_insert_implicit_returning(self, connection):
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
assert r._soft_closed
assert not r.returns_rows
@requirements.empty_inserts
- def test_empty_insert(self):
- r = config.db.execute(self.tables.autoinc_pk.insert())
+ def test_empty_insert(self, connection):
+ r = connection.execute(self.tables.autoinc_pk.insert())
assert r._soft_closed
assert not r.closed
- r = config.db.execute(
+ r = connection.execute(
self.tables.autoinc_pk.select().where(
self.tables.autoinc_pk.c.id != None
)
assert len(r.fetchall())
@requirements.insert_from_select
- def test_insert_from_select_autoinc(self):
+ def test_insert_from_select_autoinc(self, connection):
src_table = self.tables.manual_pk
dest_table = self.tables.autoinc_pk
- config.db.execute(
+ connection.execute(
src_table.insert(),
[
dict(id=1, data="data1"),
],
)
- result = config.db.execute(
+ result = connection.execute(
dest_table.insert().from_select(
("data",),
select([src_table.c.data]).where(
eq_(result.inserted_primary_key, [None])
- result = config.db.execute(
+ result = connection.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
)
eq_(result.fetchall(), [("data2",), ("data3",)])
@requirements.insert_from_select
- def test_insert_from_select_autoinc_no_rows(self):
+ def test_insert_from_select_autoinc_no_rows(self, connection):
src_table = self.tables.manual_pk
dest_table = self.tables.autoinc_pk
- result = config.db.execute(
+ result = connection.execute(
dest_table.insert().from_select(
("data",),
select([src_table.c.data]).where(
)
eq_(result.inserted_primary_key, [None])
- result = config.db.execute(
+ result = connection.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
)
eq_(result.fetchall(), [])
@requirements.insert_from_select
- def test_insert_from_select(self):
+ def test_insert_from_select(self, connection):
table = self.tables.manual_pk
- config.db.execute(
+ connection.execute(
table.insert(),
[
dict(id=1, data="data1"),
],
)
- config.db.execute(
+ connection.execute(
table.insert(inline=True).from_select(
("id", "data"),
select([table.c.id + 5, table.c.data]).where(
)
eq_(
- config.db.execute(
+ connection.execute(
select([table.c.data]).order_by(table.c.data)
).fetchall(),
[("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
)
@requirements.insert_from_select
- def test_insert_from_select_with_defaults(self):
+ def test_insert_from_select_with_defaults(self, connection):
table = self.tables.includes_defaults
- config.db.execute(
+ connection.execute(
table.insert(),
[
dict(id=1, data="data1"),
],
)
- config.db.execute(
+ connection.execute(
table.insert(inline=True).from_select(
("id", "data"),
select([table.c.id + 5, table.c.data]).where(
)
eq_(
- config.db.execute(
+ connection.execute(
select([table]).order_by(table.c.data, table.c.id)
).fetchall(),
[
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(row, (config.db.dialect.default_sequence_base, "some data"))
+ eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
@classmethod
def define_tables(cls, metadata):
)
@requirements.fetch_rows_post_commit
- def test_explicit_returning_pk_autocommit(self):
- engine = config.db
+ def test_explicit_returning_pk_autocommit(self, connection):
table = self.tables.autoinc_pk
- with engine.begin() as conn:
- r = conn.execute(
- table.insert().returning(table.c.id), data="some data"
- )
+ r = connection.execute(
+ table.insert().returning(table.c.id), data="some data"
+ )
pk = r.first()[0]
- fetched_pk = config.db.scalar(select([table.c.id]))
+ fetched_pk = connection.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
- def test_explicit_returning_pk_no_autocommit(self):
- engine = config.db
+ def test_explicit_returning_pk_no_autocommit(self, connection):
table = self.tables.autoinc_pk
- with engine.begin() as conn:
- r = conn.execute(
- table.insert().returning(table.c.id), data="some data"
- )
- pk = r.first()[0]
- fetched_pk = config.db.scalar(select([table.c.id]))
+ r = connection.execute(
+ table.insert().returning(table.c.id), data="some data"
+ )
+ pk = r.first()[0]
+ fetched_pk = connection.scalar(select([table.c.id]))
eq_(fetched_pk, pk)
- def test_autoincrement_on_insert_implicit_returning(self):
+ def test_autoincrement_on_insert_implicit_returning(self, connection):
- config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
- self._assert_round_trip(self.tables.autoinc_pk, config.db)
+ connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+ self._assert_round_trip(self.tables.autoinc_pk, connection)
- def test_last_inserted_id_implicit_returning(self):
+ def test_last_inserted_id_implicit_returning(self, connection):
- r = config.db.execute(
+ r = connection.execute(
self.tables.autoinc_pk.insert(), data="some data"
)
- pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
eq_(r.inserted_primary_key, [pk])