SET_RE = re.compile(
r"\s*SET\s+(?:(?:GLOBAL|SESSION)\s+)?\w", re.I | re.UNICODE
)
-
+ON_DUP_ALIAS_NAME = "_new"
# old names
MSTime = TIME
else:
cols = statement.table.c
+ alias_clause = ""
clauses = []
# traverses through all table columns to preserve table column order
for column in (col for col in cols if col.key in on_duplicate.update):
-
val = on_duplicate.update[column.key]
if coercions._is_literal(val):
else:
def replace(obj):
+ nonlocal alias
if (
isinstance(obj, elements.BindParameter)
and obj.type._isnull
obj.type = column.type
return obj
elif (
- isinstance(obj, elements.ColumnClause)
- and obj.table is on_duplicate.inserted_alias
+ isinstance(obj, elements.ColumnClause)
+ and obj.table is on_duplicate.inserted_alias
):
- obj = literal_column(
- "VALUES(" + self.preparer.quote(obj.name) + ")"
+ if not alias_clause:
+ alias = f"AS {ON_DUP_ALIAS_NAME} "
+ return literal_column(
+ f"{ON_DUP_ALIAS_NAME}.{self.preparer.quote(obj.name)}"
)
- return obj
else:
- # element is not replaced
- return None
+ # element is not replaced
+ return None
val = visitors.replacement_traverse(val, {}, replace)
value_text = self.process(val.self_group(), use_schema=False)
(", ".join("'%s'" % c for c in non_matching)),
)
)
-
- return "ON DUPLICATE KEY UPDATE " + ", ".join(clauses)
+ return alias_clause + "ON DUPLICATE KEY UPDATE " + ", ".join(clauses)
def visit_concat_op_expression_clauselist(
self, clauselist, operator, **kw
)
expected_sql = (
"INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
- "ON DUPLICATE KEY UPDATE bar = VALUES(bar), baz = VALUES(baz)"
+ f"AS {mysql.ON_DUP_ALIAS_NAME} ON DUPLICATE KEY UPDATE "
+ f"bar = {mysql.ON_DUP_ALIAS_NAME}.bar, "
+ f"baz = {mysql.ON_DUP_ALIAS_NAME}.baz"
)
self.assert_compile(stmt, expected_sql)
baz=stmt.inserted.baz + "some literal" + stmt.inserted.bar,
)
expected_sql = (
- "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) ON "
- "DUPLICATE KEY UPDATE bar = coalesce(VALUES(bar)), "
- "baz = (concat(VALUES(baz), %s, VALUES(bar)))"
+ "INSERT INTO foos (id, bar) VALUES (%s, %s), (%s, %s) "
+ f"AS {mysql.ON_DUP_ALIAS_NAME} ON DUPLICATE KEY UPDATE bar = "
+ f"coalesce({mysql.ON_DUP_ALIAS_NAME}.bar), "
+ f"baz = (concat({mysql.ON_DUP_ALIAS_NAME}.baz, %s, {mysql.ON_DUP_ALIAS_NAME}.bar))"
)
self.assert_compile(
stmt,
},
)
-
class RegexpCommon(testing.AssertsCompiledSQL):
def setup_test(self):
self.table = table(