--- /dev/null
+.. change::
+ :tags: bug, orm
+ :tickets: 6812
+
+ Fixed issue where the unit of work would internally use a 2.0-deprecated
+ SQL expression form, emitting a deprecation warning when SQLALCHEMY_WARN_20
+ were enabled.
+
if secondary_update:
associationrow = secondary_update[0]
- statement = self.secondary.update(
+ statement = self.secondary.update().where(
sql.and_(
*[
c == sql.bindparam("old_" + c.key, type_=c.type)
# WHERE matches 3, 3 rows changed
department = employees_table.c.department
r = connection.execute(
- employees_table.update(department == "C"), {"department": "Z"}
+ employees_table.update().where(department == "C"),
+ {"department": "Z"},
)
assert r.rowcount == 3
department = employees_table.c.department
r = connection.execute(
- employees_table.update(department == "C"), {"department": "C"}
+ employees_table.update().where(department == "C"),
+ {"department": "C"},
)
eq_(r.rowcount, 3)
department = employees_table.c.department
stmt = (
- employees_table.update(department == "C")
+ employees_table.update()
+ .where(department == "C")
.values(name=employees_table.c.department + "Z")
.return_defaults()
)
# WHERE matches 3, 3 rows deleted
department = employees_table.c.department
- r = connection.execute(employees_table.delete(department == "C"))
+ r = connection.execute(
+ employees_table.delete().where(department == "C")
+ )
eq_(r.rowcount, 3)
@testing.requires.sane_multi_rowcount
#
# DML
#
- r"The (?:update|delete).whereclause parameter will be removed in "
- "SQLAlchemy 2.0.",
r"The (?:insert|update).values parameter will be removed in "
"SQLAlchemy 2.0.",
r"The update.preserve_parameter_order parameter will be removed in "
def test_update(self):
t = table("sometable", column("somecolumn"))
self.assert_compile(
- t.update(t.c.somecolumn == 7),
+ t.update().where(t.c.somecolumn == 7),
"UPDATE sometable SET somecolumn=:somecolum"
"n WHERE sometable.somecolumn = "
":somecolumn_1",
schema="paj",
)
self.assert_compile(
- tbl.delete(tbl.c.id == 1),
+ tbl.delete().where(tbl.c.id == 1),
"DELETE FROM paj.test WHERE paj.test.id = " ":id_1",
)
s = select(tbl.c.id).where(tbl.c.id == 1)
schema="banana.paj",
)
self.assert_compile(
- tbl.delete(tbl.c.id == 1),
+ tbl.delete().where(tbl.c.id == 1),
"DELETE FROM banana.paj.test WHERE " "banana.paj.test.id = :id_1",
)
s = select(tbl.c.id).where(tbl.c.id == 1)
schema="banana split.paj",
)
self.assert_compile(
- tbl.delete(tbl.c.id == 1),
+ tbl.delete().where(tbl.c.id == 1),
"DELETE FROM [banana split].paj.test WHERE "
"[banana split].paj.test.id = :id_1",
)
schema="banana split.paj with a space",
)
self.assert_compile(
- tbl.delete(tbl.c.id == 1),
+ tbl.delete().where(tbl.c.id == 1),
"DELETE FROM [banana split].[paj with a "
"space].test WHERE [banana split].[paj "
"with a space].test.id = :id_1",
conn.execute(tbl.insert(), {"id": 1})
eq_(conn.scalar(tbl.select()), 1)
- @testing.provide_metadata
- def test_delete_schema(self, connection):
- meta = self.metadata
-
- is_(connection.dialect.legacy_schema_aliasing, False)
-
- tbl = Table(
- "test",
- meta,
- Column("id", Integer, primary_key=True),
- schema=testing.config.test_schema,
- )
- tbl.create(connection)
- connection.execute(tbl.insert(), {"id": 1})
- eq_(connection.scalar(tbl.select()), 1)
- connection.execute(tbl.delete(tbl.c.id == 1))
- eq_(connection.scalar(tbl.select()), None)
-
@testing.provide_metadata
def test_delete_schema_legacy(self):
meta = self.metadata
tbl.create(conn)
conn.execute(tbl.insert(), {"id": 1})
eq_(conn.scalar(tbl.select()), 1)
- conn.execute(tbl.delete(tbl.c.id == 1))
+ conn.execute(tbl.delete().where(tbl.c.id == 1))
eq_(conn.scalar(tbl.select()), None)
tbl.create(connection)
connection.execute(tbl.insert(), {"id": 1})
eq_(connection.scalar(tbl.select()), 1)
- connection.execute(tbl.delete(tbl.c.id == 1))
+ connection.execute(tbl.delete().where(tbl.c.id == 1))
eq_(connection.scalar(tbl.select()), None)
@testing.provide_metadata
row[0].tzinfo.utcoffset(row[0]),
)
result = connection.execute(
- tztable.update(tztable.c.id == 1).returning(tztable.c.date),
+ tztable.update()
+ .where(tztable.c.id == 1)
+ .returning(tztable.c.date),
dict(
name="newname",
),
eq_(row[0], somedate)
eq_(row[0].tzinfo, None)
result = connection.execute(
- notztable.update(notztable.c.id == 1).returning(notztable.c.date),
+ notztable.update()
+ .where(notztable.c.id == 1)
+ .returning(notztable.c.date),
dict(
name="newname",
),
u.name = "foo"
sess.flush()
# change the value in the DB
- sess.execute(users.update(users.c.id == 7, values=dict(name="jack")))
+ sess.execute(
+ users.update(values=dict(name="jack")).where(users.c.id == 7)
+ )
sess.expire(u)
# object isn't refreshed yet, using dict to bypass trigger
assert u.__dict__.get("name") != "jack"
sess.expire(u)
assert "name" not in u.__dict__
- sess.execute(users.update(users.c.id == 7), dict(name="jack2"))
+ sess.execute(users.update().where(users.c.id == 7), dict(name="jack2"))
assert u.name == "jack2"
assert u.uname == "jack2"
assert "name" in u.__dict__
assert attributes.instance_state(o).dict["isopen"] == 1
sess.execute(
- orders.update(orders.c.id == 3),
+ orders.update().where(orders.c.id == 3),
dict(description="order 3 modified"),
)
sess.add(u1)
sess.commit()
- sess.execute(users.update(users.c.name == "ed").values(name="edward"))
+ sess.execute(
+ users.update().where(users.c.name == "ed").values(name="edward")
+ )
assert u1.name == "ed"
sess.expire_all()
)
)
cte = q.cte("deldup")
- stmt = delete(cte, text("RN > 1"))
+ stmt = delete(cte).where(text("RN > 1"))
eq_(stmt.compile().execution_options["autocommit"], True)
self.assert_compile(
)
result = connection.execute(
- table.update(table.c.persons > 4, dict(full=True)).returning(
- table.c.id
- )
+ table.update()
+ .values(dict(full=True))
+ .where(table.c.persons > 4)
+ .returning(table.c.id)
)
eq_(result.fetchall(), [(1,)])
)
result = connection.execute(
- table.update(table.c.persons > 2)
+ table.update()
+ .where(table.c.persons > 2)
.values(full=True)
.returning(table.c.id, table.c.full)
)
)
result = connection.execute(
- table.delete(table.c.persons > 4).returning(table.c.id)
+ table.delete().where(table.c.persons > 4).returning(table.c.id)
)
eq_(result.fetchall(), [(1,)])