@classmethod
def insert_data(cls):
- cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
- cls.tables.keyed2.insert().execute(dict(a="a2", b="b2"))
- cls.tables.keyed3.insert().execute(dict(a="a3", d="d3"))
- cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
- cls.tables.content.insert().execute(type="t1")
-
- if testing.requires.schemas.enabled:
- cls.tables[
- "%s.wschema" % testing.config.test_schema
- ].insert().execute(dict(b="a1", q="c1"))
+ with testing.db.connect() as conn:
+ conn.execute(cls.tables.keyed1.insert(), dict(b="a1", q="c1"))
+ conn.execute(cls.tables.keyed2.insert(), dict(a="a2", b="b2"))
+ conn.execute(cls.tables.keyed3.insert(), dict(a="a3", d="d3"))
+ conn.execute(cls.tables.keyed4.insert(), dict(b="b4", q="q4"))
+ conn.execute(cls.tables.content.insert(), type="t1")
+
+ if testing.requires.schemas.enabled:
+ conn.execute(
+ cls.tables[
+ "%s.wschema" % testing.config.test_schema
+ ].insert(),
+ dict(b="a1", q="c1"),
+ )
- def test_column_label_overlap_fallback(self):
+ def test_column_label_overlap_fallback(self, connection):
content, bar = self.tables.content, self.tables.bar
- row = testing.db.execute(
+ row = connection.execute(
select([content.c.type.label("content_type")])
).first()
):
in_(sql.column("content_type"), row)
- row = testing.db.execute(
+ row = connection.execute(
select([func.now().label("content_type")])
).first()
not_in_(content.c.type, row)
):
in_(sql.column("content_type"), row)
- def test_columnclause_schema_column_one(self):
+ def test_columnclause_schema_column_one(self, connection):
keyed2 = self.tables.keyed2
# this is addressed by [ticket:2932]
# cols, which results in a more liberal comparison scheme
a, b = sql.column("a"), sql.column("b")
stmt = select([a, b]).select_from(table("keyed2"))
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
with testing.expect_deprecated(
"Retreiving row values using Column objects "
):
in_(keyed2.c.b, row)
- def test_columnclause_schema_column_two(self):
+ def test_columnclause_schema_column_two(self, connection):
keyed2 = self.tables.keyed2
a, b = sql.column("a"), sql.column("b")
stmt = select([keyed2.c.a, keyed2.c.b])
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
with testing.expect_deprecated(
"Retreiving row values using Column objects "
):
in_(b, row)
- def test_columnclause_schema_column_three(self):
+ def test_columnclause_schema_column_three(self, connection):
keyed2 = self.tables.keyed2
# originally addressed by [ticket:2932], however liberalized
a, b = sql.column("a"), sql.column("b")
stmt = text("select a, b from keyed2").columns(a=CHAR, b=CHAR)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
with testing.expect_deprecated(
"Retreiving row values using Column objects "
):
in_(stmt.c.b, row)
- def test_columnclause_schema_column_four(self):
+ def test_columnclause_schema_column_four(self, connection):
keyed2 = self.tables.keyed2
# this is also addressed by [ticket:2932]
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
a, b
)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
with testing.expect_deprecated(
"Retreiving row values using Column objects "
):
in_(stmt.c.keyed2_b, row)
- def test_columnclause_schema_column_five(self):
+ def test_columnclause_schema_column_five(self, connection):
keyed2 = self.tables.keyed2
# this is also addressed by [ticket:2932]
stmt = text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
keyed2_a=CHAR, keyed2_b=CHAR
)
- row = testing.db.execute(stmt).first()
+ row = connection.execute(stmt).first()
with testing.expect_deprecated(
"Retreiving row values using Column objects "
dict(user_id=2, user_name="jack"),
)
- def test_column_accessor_textual_select(self):
+ def test_column_accessor_textual_select(self, connection):
users = self.tables.users
# this will create column() objects inside
# the select(), these need to match on name anyway
- r = testing.db.execute(
+ r = connection.execute(
select([column("user_id"), column("user_name")])
.select_from(table("users"))
.where(text("user_id=2"))
):
eq_(r._mapping[users.c.user_name], "jack")
- def test_column_accessor_basic_text(self):
+ def test_column_accessor_basic_text(self, connection):
users = self.tables.users
- r = testing.db.execute(
+ r = connection.execute(
text("select * from users where user_id=2")
).first()
eq_(r._mapping[users.c.user_name], "jack")
@testing.provide_metadata
- def test_column_label_overlap_fallback(self):
+ def test_column_label_overlap_fallback(self, connection):
content = Table("content", self.metadata, Column("type", String(30)))
bar = Table("bar", self.metadata, Column("content_type", String(30)))
self.metadata.create_all(testing.db)
- testing.db.execute(content.insert().values(type="t1"))
+ connection.execute(content.insert().values(type="t1"))
- row = testing.db.execute(content.select(use_labels=True)).first()
+ row = connection.execute(content.select(use_labels=True)).first()
in_(content.c.type, row._mapping)
not_in_(bar.c.content_type, row)
with testing.expect_deprecated(
):
in_(sql.column("content_type"), row)
- row = testing.db.execute(
+ row = connection.execute(
select([content.c.type.label("content_type")])
).first()
with testing.expect_deprecated(
):
in_(sql.column("content_type"), row)
- row = testing.db.execute(
+ row = connection.execute(
select([func.now().label("content_type")])
).first()
{"user_id": 9, "user_name": "fred"},
)
- for pickle in False, True:
- for use_labels in False, True:
- result = (
- users.select(use_labels=use_labels)
- .order_by(users.c.user_id)
- .execute()
- .fetchall()
- )
+ for pickle in False, True:
+ for use_labels in False, True:
+ result = conn.execute(
+ users.select(use_labels=use_labels).order_by(
+ users.c.user_id
+ )
+ ).fetchall()
+
+ if pickle:
+ result = util.pickle.loads(util.pickle.dumps(result))
+
+ if pickle:
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0]._mapping[users.c.user_id], 7)
+
+ result[0]._keymap.pop(users.c.user_id)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0]._mapping[users.c.user_id], 7)
+
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0]._mapping[users.c.user_name], "jack")
+
+ result[0]._keymap.pop(users.c.user_name)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0]._mapping[users.c.user_name], "jack")
+
+ if not pickle or use_labels:
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0][addresses.c.user_id],
+ )
+
+ assert_raises(
+ exc.NoSuchColumnError,
+ lambda: result[0]._mapping[addresses.c.user_id],
+ )
+ else:
+ # test with a different table. name resolution is
+ # causing 'user_id' to match when use_labels wasn't
+ # used.
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0]._mapping[addresses.c.user_id], 7)
+
+ result[0]._keymap.pop(addresses.c.user_id)
+ with testing.expect_deprecated(
+ "Retreiving row values using Column objects "
+ "from a row that was unpickled"
+ ):
+ eq_(result[0]._mapping[addresses.c.user_id], 7)
- if pickle:
- result = util.pickle.loads(util.pickle.dumps(result))
-
- if pickle:
- with testing.expect_deprecated(
- "Retreiving row values using Column objects "
- "from a row that was unpickled"
- ):
- eq_(result[0]._mapping[users.c.user_id], 7)
-
- result[0]._keymap.pop(users.c.user_id)
- with testing.expect_deprecated(
- "Retreiving row values using Column objects "
- "from a row that was unpickled"
- ):
- eq_(result[0]._mapping[users.c.user_id], 7)
-
- with testing.expect_deprecated(
- "Retreiving row values using Column objects "
- "from a row that was unpickled"
- ):
- eq_(result[0]._mapping[users.c.user_name], "jack")
-
- result[0]._keymap.pop(users.c.user_name)
- with testing.expect_deprecated(
- "Retreiving row values using Column objects "
- "from a row that was unpickled"
- ):
- eq_(result[0]._mapping[users.c.user_name], "jack")
-
- if not pickle or use_labels:
assert_raises(
exc.NoSuchColumnError,
- lambda: result[0][addresses.c.user_id],
+ lambda: result[0][addresses.c.address_id],
)
assert_raises(
exc.NoSuchColumnError,
- lambda: result[0]._mapping[addresses.c.user_id],
+ lambda: result[0]._mapping[addresses.c.address_id],
)
- else:
- # test with a different table. name resolution is
- # causing 'user_id' to match when use_labels wasn't used.
- with testing.expect_deprecated(
- "Retreiving row values using Column objects "
- "from a row that was unpickled"
- ):
- eq_(result[0]._mapping[addresses.c.user_id], 7)
-
- result[0]._keymap.pop(addresses.c.user_id)
- with testing.expect_deprecated(
- "Retreiving row values using Column objects "
- "from a row that was unpickled"
- ):
- eq_(result[0]._mapping[addresses.c.user_id], 7)
-
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0][addresses.c.address_id],
- )
-
- assert_raises(
- exc.NoSuchColumnError,
- lambda: result[0]._mapping[addresses.c.address_id],
- )
@testing.requires.duplicate_names_in_cursor_description
def test_ambiguous_column_case_sensitive(self):
):
eng = engines.testing_engine(options=dict(case_sensitive=False))
- row = eng.execute(
- select(
- [
- literal_column("1").label("SOMECOL"),
- literal_column("1").label("SOMECOL"),
- ]
- )
- ).first()
+ with eng.connect() as conn:
+ row = conn.execute(
+ select(
+ [
+ literal_column("1").label("SOMECOL"),
+ literal_column("1").label("SOMECOL"),
+ ]
+ )
+ ).first()
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: row._mapping["somecol"],
- )
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: row._mapping["somecol"],
+ )
- def test_row_getitem_string(self):
- with testing.db.connect() as conn:
- col = literal_column("1").label("foo")
- row = conn.execute(select([col])).first()
+ def test_row_getitem_string(self, connection):
+ col = literal_column("1").label("foo")
+ row = connection.execute(select([col])).first()
- with testing.expect_deprecated(
- "Using non-integer/slice indices on Row is deprecated "
- "and will be removed in version 2.0;"
- ):
- eq_(row["foo"], 1)
+ with testing.expect_deprecated(
+ "Using non-integer/slice indices on Row is deprecated "
+ "and will be removed in version 2.0;"
+ ):
+ eq_(row["foo"], 1)
- eq_(row._mapping["foo"], 1)
+ eq_(row._mapping["foo"], 1)
- def test_row_getitem_column(self):
- with testing.db.connect() as conn:
- col = literal_column("1").label("foo")
- row = conn.execute(select([col])).first()
+ def test_row_getitem_column(self, connection):
+ col = literal_column("1").label("foo")
+ row = connection.execute(select([col])).first()
- with testing.expect_deprecated(
- "Using non-integer/slice indices on Row is deprecated "
- "and will be removed in version 2.0;"
- ):
- eq_(row[col], 1)
+ with testing.expect_deprecated(
+ "Using non-integer/slice indices on Row is deprecated "
+ "and will be removed in version 2.0;"
+ ):
+ eq_(row[col], 1)
- eq_(row._mapping[col], 1)
+ eq_(row._mapping[col], 1)
def test_row_case_insensitive(self):
with testing.expect_deprecated(
"The create_engine.case_sensitive parameter is deprecated"
):
- ins_db = engines.testing_engine(options={"case_sensitive": False})
- row = ins_db.execute(
- select(
- [
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- ]
- )
- ).first()
+ with engines.testing_engine(
+ options={"case_sensitive": False}
+ ).connect() as ins_conn:
+ row = ins_conn.execute(
+ select(
+ [
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive"),
+ ]
+ )
+ ).first()
- eq_(list(row._mapping.keys()), ["case_insensitive", "CaseSensitive"])
+ eq_(
+ list(row._mapping.keys()),
+ ["case_insensitive", "CaseSensitive"],
+ )
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- in_("casesensitive", row._keymap)
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ in_("casesensitive", row._keymap)
- eq_(row._mapping["case_insensitive"], 1)
- eq_(row._mapping["CaseSensitive"], 2)
- eq_(row._mapping["Case_insensitive"], 1)
- eq_(row._mapping["casesensitive"], 2)
+ eq_(row._mapping["case_insensitive"], 1)
+ eq_(row._mapping["CaseSensitive"], 2)
+ eq_(row._mapping["Case_insensitive"], 1)
+ eq_(row._mapping["casesensitive"], 2)
def test_row_case_insensitive_unoptimized(self):
with testing.expect_deprecated(
"The create_engine.case_sensitive parameter is deprecated"
):
- ins_db = engines.testing_engine(options={"case_sensitive": False})
- row = ins_db.execute(
- select(
- [
- literal_column("1").label("case_insensitive"),
- literal_column("2").label("CaseSensitive"),
- text("3 AS screw_up_the_cols"),
- ]
- )
- ).first()
+ with engines.testing_engine(
+ options={"case_sensitive": False}
+ ).connect() as ins_conn:
+ row = ins_conn.execute(
+ select(
+ [
+ literal_column("1").label("case_insensitive"),
+ literal_column("2").label("CaseSensitive"),
+ text("3 AS screw_up_the_cols"),
+ ]
+ )
+ ).first()
- eq_(
- list(row._mapping.keys()),
- ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
- )
+ eq_(
+ list(row._mapping.keys()),
+ ["case_insensitive", "CaseSensitive", "screw_up_the_cols"],
+ )
- in_("case_insensitive", row._keymap)
- in_("CaseSensitive", row._keymap)
- in_("casesensitive", row._keymap)
+ in_("case_insensitive", row._keymap)
+ in_("CaseSensitive", row._keymap)
+ in_("casesensitive", row._keymap)
- eq_(row._mapping["case_insensitive"], 1)
- eq_(row._mapping["CaseSensitive"], 2)
- eq_(row._mapping["screw_up_the_cols"], 3)
- eq_(row._mapping["Case_insensitive"], 1)
- eq_(row._mapping["casesensitive"], 2)
- eq_(row._mapping["screw_UP_the_cols"], 3)
+ eq_(row._mapping["case_insensitive"], 1)
+ eq_(row._mapping["CaseSensitive"], 2)
+ eq_(row._mapping["screw_up_the_cols"], 3)
+ eq_(row._mapping["Case_insensitive"], 1)
+ eq_(row._mapping["casesensitive"], 2)
+ eq_(row._mapping["screw_UP_the_cols"], 3)
- def test_row_keys_deprecated(self):
- r = testing.db.execute(
+ def test_row_keys_deprecated(self, connection):
+ r = connection.execute(
text("select * from users where user_id=2")
).first()
):
eq_(r.keys(), ["user_id", "user_name"])
- def test_row_contains_key_deprecated(self):
- r = testing.db.execute(
+ def test_row_contains_key_deprecated(self, connection):
+ r = connection.execute(
text("select * from users where user_id=2")
).first()
@classmethod
def insert_data(cls):
- cls.tables.text1.insert().execute(
- [dict(a="a1", b="b1", c="c1", d="d1")]
- )
+ with testing.db.connect() as conn:
+ conn.execute(
+ cls.tables.text1.insert(),
+ [dict(a="a1", b="b1", c="c1", d="d1")],
+ )
- def test_anon_aliased_overlapping(self):
+ def test_anon_aliased_overlapping(self, connection):
text1 = self.tables.text1
c1 = text1.c.a.label(None)
c4 = text1.c.a.label(None)
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
with testing.expect_deprecated(
):
eq_(row._mapping[text1.c.a], "a1")
- def test_anon_aliased_unique(self):
+ def test_anon_aliased_unique(self, connection):
text1 = self.tables.text1
c1 = text1.c.a.label(None)
c4 = text1.alias().c.d.label(None)
stmt = text("select a, b, c, d from text1").columns(c1, c2, c3, c4)
- result = testing.db.execute(stmt)
+ result = connection.execute(stmt)
row = result.first()
eq_(row._mapping[c1], "a1")