)
result = connection.execute(
- tbl.select(tbl.c.col_a.is_distinct_from(tbl.c.col_b))
+ tbl.select().where(tbl.c.col_a.is_distinct_from(tbl.c.col_b))
).fetchall()
eq_(
len(result),
1 if expected_row_count_for_is == 0 else 0
)
result = connection.execute(
- tbl.select(tbl.c.col_a.is_not_distinct_from(tbl.c.col_b))
+ tbl.select().where(tbl.c.col_a.is_not_distinct_from(tbl.c.col_b))
).fetchall()
eq_(
len(result),
# be "error" however for I98b8defdf7c37b818b3824d02f7668e3f5f31c94
# we are moving one at a time
for msg in [
- #
- # Core execution
- #
- # r".*DefaultGenerator.execute\(\)",
- #
- #
#
# Core SQL constructs
#
- r"The FromClause\.select\(\).whereclause parameter is deprecated",
r"Set functions such as union\(\), union_all\(\), extract\(\),",
r"The legacy calling style of select\(\) is deprecated and will be "
"removed",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(of=table1),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(of=table1),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE",
def test_for_update_basic(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s FOR UPDATE",
)
def test_for_update_read(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- read=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(read=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
)
def test_for_update_skip_locked(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- skip_locked=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE SKIP LOCKED",
def test_for_update_read_and_skip_locked(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- read=True, skip_locked=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(read=True, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"LOCK IN SHARE MODE SKIP LOCKED",
def test_for_update_nowait(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- nowait=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE NOWAIT",
def test_for_update_read_and_nowait(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- read=True, nowait=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(read=True, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"LOCK IN SHARE MODE NOWAIT",
def test_for_update_of_nowait(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- of=self.table1, nowait=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(of=self.table1, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE OF mytable NOWAIT",
def test_for_update_of_basic(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- of=self.table1
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(of=self.table1),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE OF mytable",
def test_for_update_of_skip_locked(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- of=self.table1, skip_locked=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(of=self.table1, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE OF mytable SKIP LOCKED",
def test_for_update_of_column_list_aliased(self):
ta = self.table1.alias()
self.assert_compile(
- ta.select(ta.c.myid == 7).with_for_update(
- of=[ta.c.myid, ta.c.name]
- ),
+ ta.select()
+ .where(ta.c.myid == 7)
+ .with_for_update(of=[ta.c.myid, ta.c.name]),
"SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
"FROM mytable AS mytable_1 "
"WHERE mytable_1.myid = %s FOR UPDATE OF mytable_1",
def test_for_update_of_read_nowait(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- read=True, of=self.table1, nowait=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(read=True, of=self.table1, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"LOCK IN SHARE MODE OF mytable NOWAIT",
def test_for_update_of_read_skip_locked(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- read=True, of=self.table1, skip_locked=True
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(read=True, of=self.table1, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"LOCK IN SHARE MODE OF mytable SKIP LOCKED",
def test_for_update_of_read_nowait_column_list(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(
read=True,
of=[self.table1.c.myid, self.table1.c.name],
nowait=True,
def test_for_update_of_read(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- read=True, of=self.table1
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(read=True, of=self.table1),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"LOCK IN SHARE MODE OF mytable",
def test_for_update_textual_of(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- of=text("mytable")
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(of=text("mytable")),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE OF mytable",
)
self.assert_compile(
- self.table1.select(self.table1.c.myid == 7).with_for_update(
- of=literal_column("mytable")
- ),
+ self.table1.select()
+ .where(self.table1.c.myid == 7)
+ .with_for_update(of=literal_column("mytable")),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %s "
"FOR UPDATE OF mytable",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(),
+ table1.select().where(table1.c.myid == 7).with_for_update(),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- of=table1.c.myid
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(of=table1.c.myid),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 "
"FOR UPDATE OF mytable.myid",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(nowait=True),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- nowait=True, of=table1.c.myid
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(nowait=True, of=table1.c.myid),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 "
"FOR UPDATE OF mytable.myid NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- nowait=True, of=[table1.c.myid, table1.c.name]
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(nowait=True, of=[table1.c.myid, table1.c.name]),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF "
"mytable.myid, mytable.name NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
skip_locked=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description "
# key_share has no effect
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(key_share=True),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(key_share=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
)
# read has no effect
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, key_share=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, key_share=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
)
ta = table1.alias()
self.assert_compile(
- ta.select(ta.c.myid == 7).with_for_update(
- of=[ta.c.myid, ta.c.name]
- ),
+ ta.select()
+ .where(ta.c.myid == 7)
+ .with_for_update(of=[ta.c.myid, ta.c.name]),
"SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
"FROM mytable mytable_1 "
"WHERE mytable_1.myid = :myid_1 FOR UPDATE OF "
# ensure of=text() for of works
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, of=text("table1")
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, of=text("table1")),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF table1",
)
# ensure of=literal_column() for of works
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, of=literal_column("table1")
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, of=literal_column("table1")),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE OF table1",
)
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(),
+ table1.select().where(table1.c.myid == 7).with_for_update(),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(nowait=True),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- skip_locked=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR UPDATE SKIP LOCKED",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(read=True),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, nowait=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- key_share=True, nowait=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(key_share=True, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR NO KEY UPDATE NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- key_share=True, read=True, nowait=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(key_share=True, read=True, nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR KEY SHARE NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, skip_locked=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR SHARE SKIP LOCKED",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- of=table1.c.myid
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(of=table1.c.myid),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR UPDATE OF mytable",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, nowait=True, of=table1
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, nowait=True, of=table1),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR SHARE OF mytable NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
key_share=True, read=True, nowait=True, of=table1
),
"SELECT mytable.myid, mytable.name, mytable.description "
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, nowait=True, of=table1.c.myid
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, nowait=True, of=table1.c.myid),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR SHARE OF mytable NOWAIT",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
read=True, nowait=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description "
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
read=True,
skip_locked=True,
of=[table1.c.myid, table1.c.name],
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
skip_locked=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description "
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
read=True, skip_locked=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description "
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
key_share=True, nowait=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description "
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
key_share=True,
skip_locked=True,
of=[table1.c.myid, table1.c.name],
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(
key_share=True, of=[table1.c.myid, table1.c.name]
),
"SELECT mytable.myid, mytable.name, mytable.description "
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(key_share=True),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(key_share=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR NO KEY UPDATE",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, key_share=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, key_share=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR KEY SHARE",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, key_share=True, of=table1
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, key_share=True, of=table1),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR KEY SHARE OF mytable",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, of=table1
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, of=table1),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR SHARE OF mytable",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- read=True, key_share=True, skip_locked=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(read=True, key_share=True, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR KEY SHARE SKIP LOCKED",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- key_share=True, skip_locked=True
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(key_share=True, skip_locked=True),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR NO KEY UPDATE SKIP LOCKED",
ta = table1.alias()
self.assert_compile(
- ta.select(ta.c.myid == 7).with_for_update(
- of=[ta.c.myid, ta.c.name]
- ),
+ ta.select()
+ .where(ta.c.myid == 7)
+ .with_for_update(of=[ta.c.myid, ta.c.name]),
"SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
"FROM mytable AS mytable_1 "
"WHERE mytable_1.myid = %(myid_1)s FOR UPDATE OF mytable_1",
table2 = table("table2", column("mytable_id"))
join = table2.join(table1, table2.c.mytable_id == table1.c.myid)
self.assert_compile(
- join.select(table2.c.mytable_id == 7).with_for_update(of=[join]),
+ join.select()
+ .where(table2.c.mytable_id == 7)
+ .with_for_update(of=[join]),
"SELECT table2.mytable_id, "
"mytable.myid, mytable.name, mytable.description "
"FROM table2 "
join = table2.join(ta, table2.c.mytable_id == ta.c.myid)
self.assert_compile(
- join.select(table2.c.mytable_id == 7).with_for_update(of=[join]),
+ join.select()
+ .where(table2.c.mytable_id == 7)
+ .with_for_update(of=[join]),
"SELECT table2.mytable_id, "
"mytable_1.myid, mytable_1.name, mytable_1.description "
"FROM table2 "
# ensure of=text() for of works
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- of=text("table1")
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(of=text("table1")),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR UPDATE OF table1",
# ensure literal_column of works
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(
- of=literal_column("table1")
- ),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(of=literal_column("table1")),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = %(myid_1)s "
"FOR UPDATE OF table1",
)
self.assert_compile(
- table1.select(table1.c.myid == 7).with_for_update(of=table1),
+ table1.select()
+ .where(table1.c.myid == 7)
+ .with_for_update(of=table1),
"SELECT testschema.mytable.myid, testschema.mytable.name "
"FROM testschema.mytable "
"WHERE testschema.mytable.myid = %(myid_1)s "
Column("b", String(30), nullable=0),
)
- cls.tables.bar = foo.select(foo.c.b == "bar").alias("bar")
- cls.tables.baz = foo.select(foo.c.b == "baz").alias("baz")
+ cls.tables.bar = foo.select().where(foo.c.b == "bar").alias("bar")
+ cls.tables.baz = foo.select().where(foo.c.b == "baz").alias("baz")
def test_load(self, connection):
foo, bar, baz = self.tables.foo, self.tables.bar, self.tables.baz
sess.execute(users_unbound.insert(), params=dict(id=2, name="jack"))
eq_(
sess.execute(
- users_unbound.select(users_unbound.c.id == 2)
+ users_unbound.select().where(users_unbound.c.id == 2)
).fetchall(),
[(2, "jack")],
)
eq_(
- sess.execute(users_unbound.select(User.id == 2)).fetchall(),
+ sess.execute(
+ users_unbound.select().where(User.id == 2)
+ ).fetchall(),
[(2, "jack")],
)
eq_(
sess.execute(
- users_unbound.select(users_unbound.c.id == 2)
+ users_unbound.select().where(users_unbound.c.id == 2)
).fetchall(),
[(2, "jack")],
)
eq_(
- sess.execute(users_unbound.select(User.id == 2)).fetchall(),
+ sess.execute(
+ users_unbound.select().where(User.id == 2)
+ ).fetchall(),
[(2, "jack")],
)
)
query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
+ users.select()
+ .where(users.c.id == 7)
+ .union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
.select(order_by=[text("ulist.id"), addresses.c.id])
# the adapter created by contains_eager()
adalias = addresses.alias()
query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
+ users.select()
+ .where(users.c.id == 7)
+ .union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(adalias)
.select(order_by=[text("ulist.id"), adalias.c.id])
# mapper(User, users, properties={"addresses": relationship(Address)})
# mapper(Address, addresses)
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
sess = fixture_session()
with self._expect_implicit_subquery():
)
sess = fixture_session()
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
with self._expect_implicit_subquery():
eq_(
def test_join_no_order_by(self):
User, users = self.classes.User, self.tables.users
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
sess = fixture_session()
with self._expect_implicit_subquery():
self.classes.User,
)
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
sess = fixture_session()
def go():
)
query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
+ users.select()
+ .where(users.c.id == 7)
+ .union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
.select(order_by=[text("ulist.id"), addresses.c.id])
with testing.expect_deprecated(r"Query.values?\(\) is deprecated"):
assert list(sess.query(User).values()) == list()
- sel = users.select(User.id.in_([7, 8])).alias()
+ sel = users.select().where(User.id.in_([7, 8])).alias()
q = sess.query(User)
with testing.expect_deprecated(r"Query.values?\(\) is deprecated"):
q2 = q.select_entity_from(sel).values(User.name)
with testing.expect_deprecated(r"Query.values?\(\) is deprecated"):
assert list(sess.query(User).values()) == list()
- sel = users.select(User.id.in_([7, 8])).alias()
+ sel = users.select().where(User.id.in_([7, 8])).alias()
q = sess.query(User)
u2 = aliased(User)
with testing.expect_deprecated(r"Query.values?\(\) is deprecated"):
.alias("max_orders_by_user")
)
- max_orders = orders.select(
- orders.c.id == max_orders_by_user.c.order_id
- ).alias("max_orders")
+ max_orders = (
+ orders.select()
+ .where(orders.c.id == max_orders_by_user.c.order_id)
+ .alias("max_orders")
+ )
mapper(Order, orders)
mapper(
)
query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
+ users.select()
+ .where(users.c.id == 7)
+ .union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
.select(order_by=[text("ulist.id"), addresses.c.id])
)
query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
+ users.select()
+ .where(users.c.id == 7)
+ .union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
.select(order_by=[text("ulist.id"), addresses.c.id])
)
query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
+ users.select()
+ .where(users.c.id == 7)
+ .union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(addresses)
.select(order_by=[text("ulist.id"), addresses.c.id])
# the adapter created by contains_eager()
adalias = addresses.alias()
query = (
- users.select(users.c.id == 7)
- .union(users.select(users.c.id > 7))
+ users.select()
+ .where(users.c.id == 7)
+ .union(users.select().where(users.c.id > 7))
.alias("ulist")
.outerjoin(adalias)
.select(order_by=[text("ulist.id"), adalias.c.id])
)
sess = fixture_session()
- sel = users.select(User.id.in_([7, 8])).alias()
+ sel = users.select().where(User.id.in_([7, 8])).alias()
q = sess.query(User.name)
q2 = q.select_entity_from(sel).all()
eq_(list(q2), [("jack",), ("ed",)])
mapper(User, users, properties={"addresses": relationship(Address)})
mapper(Address, addresses)
- sel = users.select(users.c.id.in_([7, 8])).alias()
+ sel = users.select().where(users.c.id.in_([7, 8])).alias()
sess = fixture_session()
eq_(
mapper(User, users)
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
sess = fixture_session()
eq_(
mapper(User, users, properties={"addresses": relationship(Address)})
mapper(Address, addresses)
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
sess = fixture_session()
eq_(
mapper(Keyword, keywords)
sess = fixture_session()
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
eq_(
sess.query(User)
sess = fixture_session()
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
def go():
eq_(
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
- sel2 = orders.select(orders.c.id.in_([1, 2, 3]))
+ sel2 = orders.select().where(orders.c.id.in_([1, 2, 3]))
eq_(
sess.query(Order)
.select_entity_from(sel2.subquery())
)
mapper(Address, addresses)
- sel = users.select(users.c.id.in_([7, 8]))
+ sel = users.select().where(users.c.id.in_([7, 8]))
sess = fixture_session()
def go():
class SomeUser(object):
pass
- s = users.select(users.c.id != 12).alias("users")
+ s = users.select().where(users.c.id != 12).alias("users")
m = mapper(SomeUser, s)
assert s.primary_key == m.primary_key
.alias("max_orders_by_user")
)
- max_orders = orders.select(
- orders.c.id == max_orders_by_user.c.order_id
- ).alias("max_orders")
+ max_orders = (
+ orders.select()
+ .where(orders.c.id == max_orders_by_user.c.order_id)
+ .alias("max_orders")
+ )
mapper(Order, orders)
mapper(
.alias("max_orders_by_user")
)
- max_orders = orders.select(
- orders.c.id == max_orders_by_user.c.order_id
- ).alias("max_orders")
+ max_orders = (
+ orders.select()
+ .where(orders.c.id == max_orders_by_user.c.order_id)
+ .alias("max_orders")
+ )
mapper(Order, orders)
mapper(
conn = session.connection()
user_rows = conn.execute(
- users.select(users.c.id.in_([u.id]))
+ users.select().where(users.c.id.in_([u.id]))
).fetchall()
eq_(list(user_rows[0]), [u.id, "one2manytester"])
address_rows = conn.execute(
- addresses.select(
+ addresses.select(order_by=[addresses.c.email_address]).where(
addresses.c.id.in_([a.id, a2.id]),
- order_by=[addresses.c.email_address],
)
).fetchall()
eq_(list(address_rows[0]), [a2.id, u.id, "lala@test.org"])
session.flush()
address_rows = conn.execute(
- addresses.select(addresses.c.id == addressid)
+ addresses.select().where(addresses.c.id == addressid)
).fetchall()
eq_(list(address_rows[0]), [addressid, userid, "somethingnew@foo.com"])
self.assert_(u.id == userid and a2.id == addressid)
conn = session.connection()
user_rows = conn.execute(
- users.select(users.c.id.in_([u.foo_id]))
+ users.select().where(users.c.id.in_([u.foo_id]))
).fetchall()
eq_(list(user_rows[0]), [u.foo_id, "multitester"])
address_rows = conn.execute(
- addresses.select(addresses.c.id.in_([u.id]))
+ addresses.select().where(addresses.c.id.in_([u.id]))
).fetchall()
eq_(list(address_rows[0]), [u.id, u.foo_id, "multi@test.org"])
session.flush()
user_rows = conn.execute(
- users.select(users.c.id.in_([u.foo_id]))
+ users.select().where(users.c.id.in_([u.foo_id]))
).fetchall()
eq_(list(user_rows[0]), [u.foo_id, "imnew"])
address_rows = conn.execute(
- addresses.select(addresses.c.id.in_([u.id]))
+ addresses.select().where(addresses.c.id.in_([u.id]))
).fetchall()
eq_(list(address_rows[0]), [u.id, u.foo_id, "lala@hey.com"])
def test_operator_precedence_1(self):
self.assert_compile(
- self.table2.select((self.table2.c.field == 5) == None), # noqa
+ self.table2.select().where(
+ (self.table2.c.field == 5) == None
+ ), # noqa
"SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL",
)
def test_operator_precedence_2(self):
self.assert_compile(
- self.table2.select(
+ self.table2.select().where(
(self.table2.c.field + 5) == self.table2.c.field
),
"SELECT op.field FROM op WHERE op.field + :field_1 = op.field",
def test_operator_precedence_3(self):
self.assert_compile(
- self.table2.select((self.table2.c.field + 5) * 6),
+ self.table2.select().where((self.table2.c.field + 5) * 6),
"SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1",
)
def test_operator_precedence_4(self):
self.assert_compile(
- self.table2.select((self.table2.c.field * 5) + 6),
+ self.table2.select().where((self.table2.c.field * 5) + 6),
"SELECT op.field FROM op WHERE op.field * :field_1 + :param_1",
)
def test_operator_precedence_5(self):
self.assert_compile(
- self.table2.select(5 + self.table2.c.field.in_([5, 6])),
+ self.table2.select().where(5 + self.table2.c.field.in_([5, 6])),
"SELECT op.field FROM op WHERE :param_1 + "
"(op.field IN ([POSTCOMPILE_field_1]))",
)
def test_operator_precedence_6(self):
self.assert_compile(
- self.table2.select((5 + self.table2.c.field).in_([5, 6])),
+ self.table2.select().where((5 + self.table2.c.field).in_([5, 6])),
"SELECT op.field FROM op WHERE :field_1 + op.field "
"IN ([POSTCOMPILE_param_1])",
)
def test_operator_precedence_7(self):
self.assert_compile(
- self.table2.select(
+ self.table2.select().where(
not_(and_(self.table2.c.field == 5, self.table2.c.field == 7))
),
"SELECT op.field FROM op WHERE NOT "
def test_operator_precedence_8(self):
self.assert_compile(
- self.table2.select(not_(self.table2.c.field == 5)),
+ self.table2.select().where(not_(self.table2.c.field == 5)),
"SELECT op.field FROM op WHERE op.field != :field_1",
)
def test_operator_precedence_9(self):
self.assert_compile(
- self.table2.select(not_(self.table2.c.field.between(5, 6))),
+ self.table2.select().where(
+ not_(self.table2.c.field.between(5, 6))
+ ),
"SELECT op.field FROM op WHERE "
"op.field NOT BETWEEN :field_1 AND :field_2",
)
def test_operator_precedence_10(self):
self.assert_compile(
- self.table2.select(not_(self.table2.c.field) == 5),
+ self.table2.select().where(not_(self.table2.c.field) == 5),
"SELECT op.field FROM op WHERE (NOT op.field) = :param_1",
)
def test_operator_precedence_11(self):
self.assert_compile(
- self.table2.select(
+ self.table2.select().where(
(self.table2.c.field == self.table2.c.field).between(
False, True
)
def test_operator_precedence_12(self):
self.assert_compile(
- self.table2.select(
+ self.table2.select().where(
between(
(self.table2.c.field == self.table2.c.field), False, True
)
def test_operator_precedence_13(self):
self.assert_compile(
- self.table2.select(
+ self.table2.select().where(
self.table2.c.field.match(self.table2.c.field).is_(None)
),
"SELECT op.field FROM op WHERE (op.field MATCH op.field) IS NULL",
def test_op_operators(self):
self.assert_compile(
- self.table1.select(self.table1.c.myid.op("hoho")(12) == 14),
+ self.table1.select().where(
+ self.table1.c.myid.op("hoho")(12) == 14
+ ),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable WHERE (mytable.myid hoho :myid_1) = :param_1",
)
def test_negate_operators_2(self):
self.assert_compile(
- self.table1.select(
+ self.table1.select().where(
(self.table1.c.myid != 12) & ~(self.table1.c.name == "john")
),
"SELECT mytable.myid, mytable.name FROM "
def test_negate_operators_3(self):
self.assert_compile(
- self.table1.select(
+ self.table1.select().where(
(self.table1.c.myid != 12)
& ~(self.table1.c.name.between("jack", "john"))
),
def test_negate_operators_4(self):
self.assert_compile(
- self.table1.select(
+ self.table1.select().where(
(self.table1.c.myid != 12)
& ~and_(
self.table1.c.name == "john",
def test_negate_operators_5(self):
self.assert_compile(
- self.table1.select(
+ self.table1.select().where(
(self.table1.c.myid != 12) & ~self.table1.c.name
),
"SELECT mytable.myid, mytable.name FROM "
connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
u = bindparam("userid")
- s = users.select(and_(users.c.user_name == u, users.c.user_name == u))
+ s = users.select().where(
+ and_(users.c.user_name == u, users.c.user_name == u)
+ )
r = connection.execute(s, dict(userid="fred")).fetchall()
assert len(r) == 1
connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
connection.execute(users.insert(), dict(user_id=9, user_name=None))
- s = users.select(users.c.user_name.in_([]))
+ s = users.select().where(users.c.user_name.in_([]))
r = connection.execute(s).fetchall()
# No username is in empty set
assert len(r) == 0
- s = users.select(not_(users.c.user_name.in_([])))
+ s = users.select().where(not_(users.c.user_name.in_([])))
r = connection.execute(s).fetchall()
assert len(r) == 3
- s = users.select(users.c.user_name.in_(["jack", "fred"]))
+ s = users.select().where(users.c.user_name.in_(["jack", "fred"]))
r = connection.execute(s).fetchall()
assert len(r) == 2
- s = users.select(not_(users.c.user_name.in_(["jack", "fred"])))
+ s = users.select().where(not_(users.c.user_name.in_(["jack", "fred"])))
r = connection.execute(s).fetchall()
# Null values are not outside any set
assert len(r) == 0
u = bindparam("search_key", type_=String)
- s = users.select(not_(u.in_([])))
+ s = users.select().where(not_(u.in_([])))
r = connection.execute(s, dict(search_key="john")).fetchall()
assert len(r) == 3
r = connection.execute(s, dict(search_key=None)).fetchall()
connection.execute(users.insert(), dict(user_id=8, user_name="fred"))
connection.execute(users.insert(), dict(user_id=9, user_name=None))
- s = users.select(not_(literal("john").in_([])))
+ s = users.select().where(not_(literal("john").in_([])))
r = connection.execute(s).fetchall()
assert len(r) == 3
],
)
- s = users.select(users.c.user_name.in_([]) == True) # noqa
+ s = users.select().where(users.c.user_name.in_([]) == True) # noqa
r = connection.execute(s).fetchall()
assert len(r) == 0
- s = users.select(users.c.user_name.in_([]) == False) # noqa
+ s = users.select().where(users.c.user_name.in_([]) == False) # noqa
r = connection.execute(s).fetchall()
assert len(r) == 3
- s = users.select(users.c.user_name.in_([]) == None) # noqa
+ s = users.select().where(users.c.user_name.in_([]) == None) # noqa
r = connection.execute(s).fetchall()
assert len(r) == 0
],
)
- r = connection.execute(users.select(users.c.user_id == 2)).first()
+ r = connection.execute(
+ users.select().where(users.c.user_id == 2)
+ ).first()
eq_(r.user_id, 2)
eq_(r._mapping["user_id"], 2)
eq_(r._mapping[users.c.user_id], 2)
],
)
- r = connection.execute(users.select(users.c.user_id == 2)).first()
+ r = connection.execute(
+ users.select().where(users.c.user_id == 2)
+ ).first()
eq_(r.user_id, 2)
eq_(r._mapping["user_id"], 2)
users = self.tables.users
connection.execute(users.insert(), dict(user_id=1, user_name="john"))
- r = connection.execute(users.select(users.c.user_id == 1)).first()
+ r = connection.execute(
+ users.select().where(users.c.user_id == 1)
+ ).first()
connection.execute(users.delete())
connection.execute(users.insert(), r._mapping)
eq_(connection.execute(users.select()).fetchall(), [(1, "john")])
users = self.tables.users
connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
- r = connection.execute(users.select(users.c.user_id == 1)).first()
+ r = connection.execute(
+ users.select().where(users.c.user_id == 1)
+ ).first()
eq_(r[0], 1)
eq_(r[1], "foo")
eq_([x.lower() for x in r._fields], ["user_id", "user_name"])
),
)
r = connection.execute(
- shadowed.select(shadowed.c.shadow_id == 1)
+ shadowed.select().where(shadowed.c.shadow_id == 1)
).first()
eq_(r.shadow_id, 1)
item_join = polymorphic_union(
{
- "BaseItem": base_item_table.select(
- base_item_table.c.child_name == "BaseItem"
- ).subquery(),
+ "BaseItem": base_item_table.select()
+ .where(base_item_table.c.child_name == "BaseItem")
+ .subquery(),
"Item": base_item_table.join(item_table),
},
None,