eq_(
connection.execute(
- tt1.select(order_by=desc(u("méil")))
+ tt1.select().order_by(desc(u("méil")))
).fetchall(),
[(2, 7), (1, 5)],
)
eq_(
connection.execute(
- tt2.select(order_by=desc(u("méil")))
+ tt2.select().order_by(desc(u("méil")))
).fetchall(),
[(2, 2), (1, 1)],
)
eq_(
connection.execute(
- tt3.select(order_by=desc(ue("\u6e2c\u8a66_id")))
+ tt3.select().order_by(desc(ue("\u6e2c\u8a66_id")))
).fetchall(),
[(2, 7, 2, 2), (1, 5, 1, 1)],
)
#
# Core SQL constructs
#
- r"The legacy calling style of select\(\) is deprecated and will be "
- "removed",
r"The FromClause.select\(\) method will no longer accept keyword "
"arguments in version 2.0",
r"The Join.select\(\) method will no longer accept keyword arguments "
t = table("t", column("x", Integer), column("y", Integer))
cols = [t.c.x, t.c.x.label("q"), t.c.x.label("p"), t.c.y]
- s = select(cols).where(t.c.x == 5).order_by(t.c.y).limit(10).offset(20)
+ s = (
+ select(*cols)
+ .where(t.c.x == 5)
+ .order_by(t.c.y)
+ .limit(10)
+ .offset(20)
+ )
self.assert_compile(
s,
def test_for_update_of_join_one(self):
self.assert_compile(
- self.join.select(self.table2.c.mytable_id == 7).with_for_update(
- of=[self.join]
- ),
+ self.join.select()
+ .where(self.table2.c.mytable_id == 7)
+ .with_for_update(of=[self.join]),
"SELECT table2.mytable_id, "
"mytable.myid, mytable.name, mytable.description "
"FROM table2 "
ta, self.table2.c.mytable_id == ta.c.myid
)
self.assert_compile(
- alias_join.select(self.table2.c.mytable_id == 7).with_for_update(
- of=[alias_join]
- ),
+ alias_join.select()
+ .where(self.table2.c.mytable_id == 7)
+ .with_for_update(of=[alias_join]),
"SELECT table2.mytable_id, "
"mytable_1.myid, mytable_1.name, mytable_1.description "
"FROM table2 "
),
)
results = connection.execute(
- arrtable.select(order_by=[arrtable.c.intarr])
+ arrtable.select().order_by(arrtable.c.intarr)
).fetchall()
eq_(len(results), 2)
eq_(results[0].strarr, [util.ue("m\xe4\xe4"), util.ue("m\xf6\xf6")])
.union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
- .select(order_by=[text("ulist.id"), addresses.c.id])
+ .select()
+ .order_by(text("ulist.id"), addresses.c.id)
)
sess = fixture_session()
.union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(adalias)
- .select(order_by=[text("ulist.id"), adalias.c.id])
+ .select()
+ .order_by(text("ulist.id"), adalias.c.id)
)
def go():
.union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
- .select(order_by=[text("ulist.id"), addresses.c.id])
+ .select()
+ .order_by(text("ulist.id"), addresses.c.id)
)
sess = fixture_session()
q = sess.query(User)
sess = fixture_session()
- selectquery = users.outerjoin(addresses).select(
- users.c.id < 10,
- order_by=[users.c.id, addresses.c.id],
+ selectquery = (
+ users.outerjoin(addresses)
+ .select()
+ .where(users.c.id < 10)
+ .order_by(users.c.id, addresses.c.id)
)
q = sess.query(User)
q = sess.query(User)
adalias = addresses.alias("adalias")
- selectquery = users.outerjoin(adalias).select(
- order_by=[users.c.id, adalias.c.id]
+ selectquery = (
+ users.outerjoin(adalias)
+ .select()
+ .order_by(users.c.id, adalias.c.id)
)
# note this has multiple problems because we aren't giving Query
q = sess.query(User)
adalias = addresses.alias("adalias")
- selectquery = users.outerjoin(adalias).select(
- order_by=[users.c.id, adalias.c.id]
+ selectquery = (
+ users.outerjoin(adalias)
+ .select()
+ .order_by(users.c.id, adalias.c.id)
)
# note this has multiple problems because we aren't giving Query
)
self.assert_compile(
- sess.query(users, exists([1], from_obj=addresses))
+ sess.query(users, exists(text("1")).select_from(addresses))
.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
.statement,
"SELECT users.id AS users_id, users.name AS users_name, EXISTS "
.union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
- .select(order_by=[text("ulist.id"), addresses.c.id])
+ .select()
+ .order_by(text("ulist.id"), addresses.c.id)
)
sess = fixture_session()
q = sess.query(User)
.union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
- .select(order_by=[text("ulist.id"), addresses.c.id])
+ .select()
+ .order_by(text("ulist.id"), addresses.c.id)
)
sess = fixture_session()
q = sess.query(User)
.union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
- .select(order_by=[text("ulist.id"), addresses.c.id])
+ .select()
+ .order_by(text("ulist.id"), addresses.c.id)
)
sess = fixture_session()
.union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(adalias)
- .select(order_by=[text("ulist.id"), adalias.c.id])
+ .select()
+ .order_by(text("ulist.id"), adalias.c.id)
)
def go():
sess = fixture_session()
- selectquery = users.outerjoin(addresses).select(
- users.c.id < 10,
- order_by=[users.c.id, addresses.c.id],
+ selectquery = (
+ users.outerjoin(addresses)
+ .select()
+ .where(users.c.id < 10)
+ .order_by(users.c.id, addresses.c.id)
)
q = sess.query(User)
sess = fixture_session(future=True)
- selectquery = users.outerjoin(addresses).select(
- users.c.id < 10,
- order_by=[users.c.id, addresses.c.id],
+ selectquery = (
+ users.outerjoin(addresses)
+ .select()
+ .where(users.c.id < 10)
+ .order_by(users.c.id, addresses.c.id)
)
q = select(User)
sess = fixture_session(future=True)
- selectquery = users.outerjoin(addresses).select(
- order_by=[users.c.id, addresses.c.id]
+ selectquery = (
+ users.outerjoin(addresses)
+ .select()
+ .order_by(users.c.id, addresses.c.id)
)
result = sess.execute(
eq_(list(user_rows[0]), [u.id, "one2manytester"])
address_rows = conn.execute(
- addresses.select(order_by=[addresses.c.email_address]).where(
+ addresses.select()
+ .order_by(addresses.c.email_address)
+ .where(
addresses.c.id.in_([a.id, a2.id]),
)
).fetchall()
c1 = const(m1, bind=testing.db)
is_(c1.bind, testing.db)
+
+
+class FutureSelectTest(fixtures.TestBase, AssertsCompiledSQL):
+ __dialect__ = "default"
+
+ @testing.fixture
+ def table_fixture(self):
+ table1 = table(
+ "mytable",
+ column("myid", Integer),
+ column("name", String),
+ column("description", String),
+ )
+
+ table2 = table(
+ "myothertable",
+ column("otherid", Integer),
+ column("othername", String),
+ )
+ return table1, table2
+
+ def test_legacy_calling_style_kw_only(self, table_fixture):
+ table1, table2 = table_fixture
+ with testing.expect_deprecated_20(
+ "The legacy calling style of select"
+ ):
+ stmt = select(
+ whereclause=table1.c.myid == table2.c.otherid
+ ).add_columns(table1.c.myid)
+
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid FROM mytable, myothertable "
+ "WHERE mytable.myid = myothertable.otherid",
+ )
+
+ def test_legacy_calling_style_col_seq_only(self, table_fixture):
+ table1, table2 = table_fixture
+ with testing.expect_deprecated_20(
+ "The legacy calling style of select"
+ ):
+ # keep [] here
+ stmt = select([table1.c.myid]).where(
+ table1.c.myid == table2.c.otherid
+ )
+
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.myid FROM mytable, myothertable "
+ "WHERE mytable.myid = myothertable.otherid",
+ )
+
+ def test_new_calling_style_thing_ok_actually_use_iter(self, table_fixture):
+ table1, table2 = table_fixture
+
+ class Thing(object):
+ def __iter__(self):
+ return iter([table1.c.name, table1.c.description])
+
+ with testing.expect_deprecated_20(
+ "The legacy calling style of select"
+ ):
+ stmt = select(Thing())
+ self.assert_compile(
+ stmt,
+ "SELECT mytable.name, mytable.description FROM mytable",
+ )
+
+ def test_kw_triggers_old_style(self, table_fixture):
+ table1, table2 = table_fixture
+ with testing.expect_deprecated_20(
+ "The legacy calling style of select"
+ ):
+ assert_raises_message(
+ exc.ArgumentError,
+ r"select\(\) construct created in legacy mode, "
+ "i.e. with keyword arguments",
+ select,
+ table1.c.myid,
+ whereclause=table1.c.myid == table2.c.otherid,
+ )
exc.ArgumentError,
"Textual column expression 'f' should be explicitly declared",
select,
- [lambda: "foo"],
+ lambda: "foo",
)
def test_coercion_where_clause(self):
return stmt
a_eq(
- users.select(order_by=[users.c.user_id]).set_label_style(
- label_style
- ),
+ users.select()
+ .order_by(users.c.user_id)
+ .set_label_style(label_style),
[(1, "c"), (2, "b"), (3, "a")],
)
a_eq(
- users.select(
- order_by=[users.c.user_name, users.c.user_id],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(users.c.user_name, users.c.user_id)
+ .set_label_style(label_style),
[(3, "a"), (2, "b"), (1, "c")],
)
)
a_eq(
- users.select(
- distinct=True,
- order_by=[users.c.user_id],
- ).set_label_style(label_style),
+ users.select()
+ .distinct()
+ .order_by(users.c.user_id)
+ .set_label_style(label_style),
[(1, "c"), (2, "b"), (3, "a")],
)
)
a_eq(
- users.select(
- distinct=True,
- order_by=[desc(users.c.user_id)],
- ).set_label_style(label_style),
+ users.select()
+ .distinct()
+ .order_by(desc(users.c.user_id))
+ .set_label_style(label_style),
[(3, "a"), (2, "b"), (1, "c")],
)
else LABEL_STYLE_TABLENAME_PLUS_COL
)
a_eq(
- users.select(
- order_by=[users.c.user_name.nulls_first()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(users.c.user_name.nulls_first())
+ .set_label_style(label_style),
[(1, None), (3, "a"), (2, "b")],
)
a_eq(
- users.select(
- order_by=[users.c.user_name.nulls_last()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(users.c.user_name.nulls_last())
+ .set_label_style(label_style),
[(3, "a"), (2, "b"), (1, None)],
)
a_eq(
- users.select(
- order_by=[asc(users.c.user_name).nulls_first()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(asc(users.c.user_name).nulls_first())
+ .set_label_style(label_style),
[(1, None), (3, "a"), (2, "b")],
)
a_eq(
- users.select(
- order_by=[asc(users.c.user_name).nulls_last()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(asc(users.c.user_name).nulls_last())
+ .set_label_style(label_style),
[(3, "a"), (2, "b"), (1, None)],
)
a_eq(
- users.select(
- order_by=[users.c.user_name.desc().nulls_first()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(users.c.user_name.desc().nulls_first())
+ .set_label_style(label_style),
[(1, None), (2, "b"), (3, "a")],
)
a_eq(
- users.select(
- order_by=[users.c.user_name.desc().nulls_last()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(users.c.user_name.desc().nulls_last())
+ .set_label_style(label_style),
[(2, "b"), (3, "a"), (1, None)],
)
a_eq(
- users.select(
- order_by=[desc(users.c.user_name).nulls_first()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(desc(users.c.user_name).nulls_first())
+ .set_label_style(label_style),
[(1, None), (2, "b"), (3, "a")],
)
a_eq(
- users.select(
- order_by=[desc(users.c.user_name).nulls_last()],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(desc(users.c.user_name).nulls_last())
+ .set_label_style(label_style),
[(2, "b"), (3, "a"), (1, None)],
)
a_eq(
- users.select(
- order_by=[
- users.c.user_name.nulls_first(),
- users.c.user_id,
- ],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(
+ users.c.user_name.nulls_first(),
+ users.c.user_id,
+ )
+ .set_label_style(label_style),
[(1, None), (3, "a"), (2, "b")],
)
a_eq(
- users.select(
- order_by=[users.c.user_name.nulls_last(), users.c.user_id],
- ).set_label_style(label_style),
+ users.select()
+ .order_by(users.c.user_name.nulls_last(), users.c.user_id)
+ .set_label_style(label_style),
[(3, "a"), (2, "b"), (1, None)],
)
def test_select_limit(self, connection):
users, addresses = self.tables("users", "addresses")
r = connection.execute(
- users.select(limit=3, order_by=[users.c.user_id])
+ users.select().limit(3).order_by(users.c.user_id)
).fetchall()
self.assert_(r == [(1, "john"), (2, "jack"), (3, "ed")], repr(r))
users, addresses = self.tables("users", "addresses")
r = connection.execute(
- users.select(limit=3, offset=2, order_by=[users.c.user_id])
+ users.select().limit(3).offset(2).order_by(users.c.user_id)
).fetchall()
self.assert_(r == [(3, "ed"), (4, "wendy"), (5, "laura")])
r = connection.execute(
- users.select(offset=5, order_by=[users.c.user_id])
+ users.select().offset(5).order_by(users.c.user_id)
).fetchall()
self.assert_(r == [(6, "ralph"), (7, "fido")])
table1.c.MixedCase,
table1.c.a123,
]
- result = connection.execute(select(columns)).all()
+ result = connection.execute(select(*columns)).all()
assert result == [(1, 2, 3, 4), (2, 2, 3, 4), (4, 3, 2, 1)]
columns = [table2.c.d123, table2.c.u123, table2.c.MixedCase]
- result = connection.execute(select(columns)).all()
+ result = connection.execute(select(*columns)).all()
assert result == [(1, 2, 3), (2, 2, 3), (4, 3, 2)]
def test_use_labels(self, connection):
table1.c.a123,
]
result = connection.execute(
- select(columns).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+ select(*columns).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
).fetchall()
assert result == [(1, 2, 3, 4), (2, 2, 3, 4), (4, 3, 2, 1)]
columns = [table2.c.d123, table2.c.u123, table2.c.MixedCase]
result = connection.execute(
- select(columns).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+ select(*columns).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
).all()
assert result == [(1, 2, 3), (2, 2, 3), (4, 3, 2)]
)
self.assert_compile(
- table1.select(distinct=True).alias("LaLa").select(),
+ table1.select().distinct().alias("LaLa").select(),
"SELECT "
'"LaLa".lowercase, '
'"LaLa"."UPPERCASE", '
# Note that 'col1' is already quoted (literal_column)
columns = [sql.literal_column("'col1'").label("label1")]
- x = select(columns, from_obj=[table]).alias("alias1")
+ x = select(*columns).select_from(table).alias("alias1")
x = x.select()
self.assert_compile(
x,
# Note that 'Col1' is already quoted (literal_column)
columns = [sql.literal_column("'Col1'").label("Label1")]
- x = select(columns, from_obj=[table]).alias("Alias1")
+ x = select(*columns).select_from(table).alias("Alias1")
x = x.select()
self.assert_compile(
x,
.where(users.c.user_name == "jack")
.scalar_subquery()
)
- for row in connection.execute(
- select([sel + 1, sel + 3], bind=users.bind)
- ):
+ for row in connection.execute(select(sel + 1, sel + 3)):
eq_(row._mapping["anon_1"], 8)
eq_(row._mapping["anon_2"], 10)
def _adapt_result_columns_fixture_five(self):
users, teams = self.tables("users", "teams")
- return select([users.c.id, teams.c.id]).select_from(
+ return select(users.c.id, teams.c.id).select_from(
users.outerjoin(teams)
)
class FutureSelectTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def test_legacy_calling_style_kw_only(self):
- stmt = select(
- whereclause=table1.c.myid == table2.c.otherid
- ).add_columns(table1.c.myid)
-
- self.assert_compile(
- stmt,
- "SELECT mytable.myid FROM mytable, myothertable "
- "WHERE mytable.myid = myothertable.otherid",
- )
-
- def test_legacy_calling_style_col_seq_only(self):
- # keep [] here
- stmt = select([table1.c.myid]).where(table1.c.myid == table2.c.otherid)
-
- self.assert_compile(
- stmt,
- "SELECT mytable.myid FROM mytable, myothertable "
- "WHERE mytable.myid = myothertable.otherid",
- )
-
def test_new_calling_style(self):
stmt = select(table1.c.myid).where(table1.c.myid == table2.c.otherid)
"mytable.description FROM mytable",
)
- def test_new_calling_style_thing_ok_actually_use_iter(self):
- class Thing(object):
- def __iter__(self):
- return iter([table1.c.name, table1.c.description])
-
- stmt = select(Thing())
- self.assert_compile(
- stmt,
- "SELECT mytable.name, mytable.description FROM mytable",
- )
-
- def test_kw_triggers_old_style(self):
-
- assert_raises_message(
- exc.ArgumentError,
- r"select\(\) construct created in legacy mode, "
- "i.e. with keyword arguments",
- select,
- table1.c.myid,
- whereclause=table1.c.myid == table2.c.otherid,
- )
-
def test_join_nofrom_implicit_left_side_explicit_onclause(self):
stmt = select(table1).join(table2, table1.c.myid == table2.c.otherid)
)
def test_plain_exists(self):
- expr = exists([1])
+ expr = exists(text("1"))
eq_(type(expr.type), Boolean)
eq_(
[
)
def test_plain_exists_negate(self):
- expr = ~exists([1])
+ expr = ~exists(text("1"))
eq_(type(expr.type), Boolean)
eq_(
[
)
def test_plain_exists_double_negate(self):
- expr = ~(~exists([1]))
+ expr = ~(~exists(text("1")))
eq_(type(expr.type), Boolean)
eq_(
[
__dialect__ = "default"
def _test(self, fn, arg, offending_clause):
+ arg = util.to_list(arg)
+
assert_raises_message(
exc.ArgumentError,
r"Textual (?:SQL|column|SQL FROM) expression %(stmt)r should be "
r"explicitly declared (?:with|as) text\(%(stmt)r\)"
% {"stmt": util.ellipses_string(offending_clause)},
fn,
- arg,
+ *arg
)
def test_where(self):
)
for stmt in (
- binary_table.select(order_by=binary_table.c.primary_id),
+ binary_table.select().order_by(binary_table.c.primary_id),
text(
"select * from binary_table order by binary_table.primary_id",
).columns(