@classmethod
def insert_data(cls):
- config.db.execute(
- cls.tables.plain_pk.insert(),
- [
- {"id": 1, "data": "d1"},
- {"id": 2, "data": "d2"},
- {"id": 3, "data": "d3"},
- ],
- )
+ with config.db.connect() as conn:
+ conn.execute(
+ cls.tables.plain_pk.insert(),
+ [
+ {"id": 1, "data": "d1"},
+ {"id": 2, "data": "d2"},
+ {"id": 3, "data": "d3"},
+ ],
+ )
- config.db.execute(
- cls.tables.has_dates.insert(),
- [{"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}],
- )
+ conn.execute(
+ cls.tables.has_dates.insert(),
+ [{"id": 1, "today": datetime.datetime(2006, 5, 12, 12, 0, 0)}],
+ )
- def test_via_attr(self):
- row = config.db.execute(
+ def test_via_attr(self, connection):
+ row = connection.execute(
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
eq_(row.id, 1)
eq_(row.data, "d1")
- def test_via_string(self):
- row = config.db.execute(
+ def test_via_string(self, connection):
+ row = connection.execute(
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
eq_(row._mapping["id"], 1)
eq_(row._mapping["data"], "d1")
- def test_via_int(self):
- row = config.db.execute(
+ def test_via_int(self, connection):
+ row = connection.execute(
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
eq_(row[0], 1)
eq_(row[1], "d1")
- def test_via_col_object(self):
- row = config.db.execute(
+ def test_via_col_object(self, connection):
+ row = connection.execute(
self.tables.plain_pk.select().order_by(self.tables.plain_pk.c.id)
).first()
eq_(row._mapping[self.tables.plain_pk.c.data], "d1")
@requirements.duplicate_names_in_cursor_description
- def test_row_with_dupe_names(self):
- result = config.db.execute(
+ def test_row_with_dupe_names(self, connection):
+ result = connection.execute(
select(
[
self.tables.plain_pk.c.data,
eq_(result.keys(), ["data", "data"])
eq_(row, ("d1", "d1"))
- def test_row_w_scalar_select(self):
+ def test_row_w_scalar_select(self, connection):
"""test that a scalar select as a column is returned as such
and that type conversion works OK.
datetable = self.tables.has_dates
s = select([datetable.alias("x").c.today]).scalar_subquery()
s2 = select([datetable.c.id, s.label("somelabel")])
- row = config.db.execute(s2).first()
+ row = connection.execute(s2).first()
eq_(row.somelabel, datetime.datetime(2006, 5, 12, 12, 0, 0))
sql.column("spaces % more spaces"),
)
- def test_single_roundtrip(self):
+ def test_single_roundtrip(self, connection):
percent_table = self.tables.percent_table
for params in [
{"percent%": 5, "spaces % more spaces": 12},
{"percent%": 9, "spaces % more spaces": 10},
{"percent%": 11, "spaces % more spaces": 9},
]:
- config.db.execute(percent_table.insert(), params)
- self._assert_table()
+ connection.execute(percent_table.insert(), params)
+ self._assert_table(connection)
- def test_executemany_roundtrip(self):
+ def test_executemany_roundtrip(self, connection):
percent_table = self.tables.percent_table
- config.db.execute(
+ connection.execute(
percent_table.insert(), {"percent%": 5, "spaces % more spaces": 12}
)
- config.db.execute(
+ connection.execute(
percent_table.insert(),
[
{"percent%": 7, "spaces % more spaces": 11},
{"percent%": 11, "spaces % more spaces": 9},
],
)
- self._assert_table()
+ self._assert_table(connection)
- def _assert_table(self):
+ def _assert_table(self, conn):
percent_table = self.tables.percent_table
lightweight_percent_table = self.tables.lightweight_percent_table
):
eq_(
list(
- config.db.execute(
- table.select().order_by(table.c["percent%"])
- )
+ conn.execute(table.select().order_by(table.c["percent%"]))
),
[(5, 12), (7, 11), (9, 10), (11, 9)],
)
eq_(
list(
- config.db.execute(
+ conn.execute(
table.select()
.where(table.c["spaces % more spaces"].in_([9, 10]))
.order_by(table.c["percent%"])
[(9, 10), (11, 9)],
)
- row = config.db.execute(
+ row = conn.execute(
table.select().order_by(table.c["percent%"])
).first()
eq_(row._mapping["percent%"], 5)
eq_(row._mapping[table.c["percent%"]], 5)
eq_(row._mapping[table.c["spaces % more spaces"]], 12)
- config.db.execute(
+ conn.execute(
percent_table.update().values(
{percent_table.c["spaces % more spaces"]: 15}
)
eq_(
list(
- config.db.execute(
+ conn.execute(
percent_table.select().order_by(
percent_table.c["percent%"]
)