clauses = []
for column in cols:
- val = on_duplicate.update.get(column.key)
- if val is None:
+ try:
+ val = on_duplicate.update[column.key]
+ except KeyError:
continue
- elif coercions._is_literal(val):
+
+ if coercions._is_literal(val):
val = elements.BindParameter(None, val, type_=column.type)
value_text = self.process(val.self_group(), use_schema=False)
elif isinstance(val, elements.BindParameter) and val.type._isnull:
[(1, "ab", "bz", False)],
)
+ def test_on_duplicate_key_update_null(self):
+ foos = self.tables.foos
+ with testing.db.connect() as conn:
+ conn.execute(insert(foos, dict(id=1, bar="b", baz="bz")))
+ stmt = insert(foos).values(
+ [dict(id=1, bar="ab"), dict(id=2, bar="b")]
+ )
+ stmt = stmt.on_duplicate_key_update(updated_once=None)
+ result = conn.execute(stmt)
+ eq_(result.inserted_primary_key, [2])
+ eq_(
+ conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
+ [(1, "b", "bz", None)],
+ )
+
def test_on_duplicate_key_update_preserve_order(self):
foos = self.tables.foos
with testing.db.connect() as conn: