]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
support upsert for SQLite
authorRamonWill <ramonwilliams@hotmail.co.uk>
Mon, 14 Sep 2020 00:31:42 +0000 (01:31 +0100)
committerRamonWill <ramonwilliams@hotmail.co.uk>
Mon, 14 Sep 2020 00:31:42 +0000 (01:31 +0100)
doc/build/changelog/unreleased_14/4010.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/sqlite/__init__.py
lib/sqlalchemy/dialects/sqlite/base.py
lib/sqlalchemy/dialects/sqlite/dml.py [new file with mode: 0644]
test/dialect/test_sqlite.py

diff --git a/doc/build/changelog/unreleased_14/4010.rst b/doc/build/changelog/unreleased_14/4010.rst
new file mode 100644 (file)
index 0000000..2a40d41
--- /dev/null
@@ -0,0 +1,7 @@
+.. change::
+    :tags: sqlite, usecase
+    :tickets: 4010
+
+    Implemented DO NOTHING and DO UPDATE SET (Upsert) for the
+    INSERT.. ON CONFLICT Clause as understood by the SQLite dialect.
+    Pull request courtesy Ramon Williams.
index 142131f631bddeed30951f5caca9a58d763414ff..72402dd92bab59931899bafa1ab85c07e0e6ef62 100644 (file)
@@ -24,7 +24,8 @@ from .base import TEXT
 from .base import TIME
 from .base import TIMESTAMP
 from .base import VARCHAR
-
+from .dml import Insert
+from .dml import insert
 
 # default dialect
 base.dialect = dialect = pysqlite.dialect
@@ -47,5 +48,7 @@ __all__ = (
     "TIMESTAMP",
     "VARCHAR",
     "REAL",
+    "Insert",
+    "insert",
     "dialect",
 )
index 8ef35514abe29afae9a9487745f805fcee1a7607..82d01891d8afc3e5c407b36576035aa8d48c6c4d 100644 (file)
@@ -402,6 +402,253 @@ resolution algorithm is applied to the constraint itself::
         PRIMARY KEY (id) ON CONFLICT FAIL
     )
 
+.. _sqlite_insert_on_conflict
+
+INSERT...ON CONFLICT (Upsert)
+-----------------------------------
+From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
+of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
+statement. A candidate row will only be inserted if that row does not violate
+any unique constraints. In the case of a unique constraint violation, a
+secondary action can occur which can be either “DO UPDATE”, indicating that
+the data in the target row should be updated, or “DO NOTHING”, which indicates
+to silently skip this row.
+
+Conflicts are determined using existing uniqueness constraints.
+These constraints may be identified either using their column name as stated
+in DDL, or they may be *inferred* by stating the columns and conditions that
+comprise the indexes.
+
+A "uniqueness constraint" is an explicit UNIQUE or PRIMARY KEY constraint
+within the CREATE TABLE statement, or a unique index.
+
+SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
+:func:`_sqlite.insert()` function, which provides
+the generative methods :meth:`~.sqlite.Insert.on_conflict_do_update`
+and :meth:`~.sqlite.Insert.on_conflict_do_nothing`::
+
+    from sqlalchemy.dialects.sqlite import insert
+
+    insert_stmt = insert(my_table).values(
+        id='some_existing_id',
+        data='inserted value')
+
+    do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
+        index_elements=['id']
+    )
+
+    conn.execute(do_nothing_stmt)
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        constraint='id',
+        set_=dict(data='updated value')
+    )
+
+    conn.execute(do_update_stmt)
+
+Both methods supply the "target" of the conflict using either the
+named constraint or by column inference:
+
+* The :paramref:`.Insert.on_conflict_do_update.index_elements` argument
+  specifies a sequence containing string column names, :class:`_schema.Column`
+  objects, and/or SQL expression elements, which would identify a unique
+  index::
+
+
+    unique_index = schema.Index(
+        "unique_index_name",
+        my_table.c.id,
+        unique=True,
+    )
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value')
+    )
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=[my_table.c.id],
+        set_=dict(data='updated value')
+    )
+
+* When using :paramref:`.Insert.on_conflict_do_update.index_elements` to
+  infer an index, a partial index can be inferred by also specifying the
+  use the :paramref:`.Insert.on_conflict_do_update.index_where` parameter::
+
+    from sqlalchemy.dialects.sqlite import insert
+
+    unique_partial_index = schema.Index(
+        "unique_partial_index_name",
+        my_table.c.user_email,
+        unique=True,
+        sqlite_where=my_table.c.user_email.like('@gmail.com'),
+    )
+
+    stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
+    stmt = stmt.on_conflict_do_update(
+        index_elements=[my_table.c.user_email],
+        index_where=my_table.c.user_email.like('%@gmail.com'),
+        set_=dict(data=stmt.excluded.data)
+        )
+    conn.execute(stmt)
+
+* The :paramref:`.Insert.on_conflict_do_update.constraint` argument is
+  used to specify a constraint directly rather than inferring it.  This can
+  either be the name of the underlying columns of a UNIQUE constraint or
+  a PRIMARY KEY constraint. SQLite does not support specifying the names of
+  an UNIQUE or PRIMARY KEY constraint and INDEX within the ON CONFLICT Clause.
+
+  That is, the following will not work::
+    unique_constraint_pk = schema.PrimaryKeyConstraint(
+        my_table.c.id, name="uq_user_pk"
+    )
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        constraint='uq_user_pk',
+        set_=dict(data='updated value')
+    )
+
+ But the statements below will work::
+    do_update_stmt_A = insert_stmt.on_conflict_do_update(
+        constraint=unique_constraint_pk,
+        set_=dict(id=1, data='updated value')
+    )
+
+    do_update_stmt_B = insert_stmt.on_conflict_do_update(
+        constraint='id',
+        set_=dict(id=1, data='updated value')
+    )
+
+* The :paramref:`.Insert.on_conflict_do_update.constraint` argument may
+  also refer to a SQLAlchemy construct representing a constraint,
+  e.g. :class:`.UniqueConstraint` or :class:`.PrimaryKeyConstraint`.
+  In this use, only the name of the constraint's first column is used directly.
+  If the constraint is unnamed, then inference will be used,
+  where the expressions and optional WHERE clause of the constraint will
+  be spelled out in the construct::
+
+    unique_constraint = schema.UniqueConstraint(
+        my_table.c.user_email, name="uq_user_email"
+    )
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        constraint=unique_constraint,
+        set_=dict(data='updated value')
+    )
+
+``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
+existing row, using any combination of new values as well as values
+from the proposed insertion.   These values are specified using the
+:paramref:`.Insert.on_conflict_do_update.set_` parameter.  This
+parameter accepts a dictionary which consists of direct values
+for UPDATE::
+
+    from sqlalchemy.dialects.sqlite import insert
+
+    stmt = insert(my_table).values(id='some_id', data='inserted value')
+    do_update_stmt = stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value')
+        )
+    conn.execute(do_update_stmt)
+
+.. warning::
+
+    The :meth:`_expression.Insert.on_conflict_do_update`
+    method does **not** take into
+    account Python-side default UPDATE values or generation functions, e.g.
+    those specified using :paramref:`_schema.Column.onupdate`.
+    These values will not be exercised for an ON CONFLICT style of UPDATE,
+    unless they are manually specified in the
+    :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
+
+In order to refer to the proposed insertion row, the special alias
+:attr:`~.sqlite.Insert.excluded` is available as an attribute on
+the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
+on a column, that informs the DO UPDATE to update the row with the value that
+would have been inserted had the constraint not failed::
+
+    from sqlalchemy.dialects.sqlite import insert
+
+    stmt = insert(my_table).values(
+        id='some_id',
+        data='inserted value',
+        author='jlh')
+    do_update_stmt = stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value', author=stmt.excluded.author)
+        )
+    conn.execute(do_update_stmt)
+
+The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
+a WHERE clause using the :paramref:`.Insert.on_conflict_do_update.where`
+parameter, which will limit those rows which receive an UPDATE::
+
+    from sqlalchemy.dialects.sqlite import insert
+
+    stmt = insert(my_table).values(
+        id='some_id',
+        data='inserted value',
+        author='jlh')
+    on_update_stmt = stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value', author=stmt.excluded.author)
+        where=(my_table.c.status == 2)
+        )
+    conn.execute(on_update_stmt)
+
+``ON CONFLICT`` may also be used to skip inserting a row entirely
+if any conflict with a unique constraint occurs; below
+this is illustrated using the
+:meth:`~.sqlite.Insert.on_conflict_do_nothing` method::
+
+    from sqlalchemy.dialects.sqlite import insert
+
+    stmt = insert(my_table).values(id='some_id', data='inserted value')
+    stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
+    conn.execute(stmt)
+
+If ``DO NOTHING`` is used without specifying any columns or constraint,
+it has the effect of skipping the INSERT for any unique violation which
+occurs::
+
+    from sqlalchemy.dialects.sqlite import insert
+
+    stmt = insert(my_table).values(id='some_id', data='inserted value')
+    stmt = stmt.on_conflict_do_nothing()
+    conn.execute(stmt)
+
+The SQLite ``ON CONFLICT`` also supports multiple conflict targets. For example
+a unique index can be created over multiple columns, then this constraint will
+be inferred by listing the columns in index_elements::
+
+    unique_index = schema.Index(
+        "unique_index_name",
+        my_table.c.id,
+        my_table.c.data,
+        unique=True,
+    )
+
+    insert_stmt = insert(my_table).values(id='some_id', data='old_value')
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=['id', 'data'],
+        set_=dict(data='new_value')
+    )
+
+This will render the following ``ON CONFLICT...DO UPDATE`` statement::
+    INSERT INTO my_table (id, data) VALUES ('some_id', 'old_value')
+    ON CONFLICT (id, date)
+    DO UPDATE SET data = 'new_value';
+
+
+.. versionadded:: 1.4 Added support for SQLite ON CONFLICT clauses
+
+.. seealso::
+
+    `Upsert
+    <https://sqlite.org/lang_UPSERT.htmlT>`_
+    - in the SQLite documentation.
+
 .. _sqlite_type_reflection:
 
 Type Reflection
@@ -600,8 +847,11 @@ from ... import types as sqltypes
 from ... import util
 from ...engine import default
 from ...engine import reflection
+from ...sql import coercions
 from ...sql import ColumnElement
 from ...sql import compiler
+from ...sql import elements
+from ...sql import roles
 from ...types import BLOB  # noqa
 from ...types import BOOLEAN  # noqa
 from ...types import CHAR  # noqa
@@ -1083,6 +1333,101 @@ class SQLiteCompiler(compiler.SQLCompiler):
     def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
         return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
 
+    def _on_conflict_target(self, clause, **kw):
+        if clause.constraint_target is not None:
+            target_text = "(%s)" % clause.constraint_target
+        elif clause.inferred_target_elements is not None:
+            target_text = "(%s)" % ", ".join(
+                (
+                    self.preparer.quote(c)
+                    if isinstance(c, util.string_types)
+                    else self.process(c, include_table=False, use_schema=False)
+                )
+                for c in clause.inferred_target_elements
+            )
+            if clause.inferred_target_whereclause is not None:
+                target_text += " WHERE %s" % self.process(
+                    clause.inferred_target_whereclause,
+                    include_table=False,
+                    use_schema=False,
+                    literal_binds=True,
+                )
+
+        else:
+            target_text = ""
+
+        return target_text
+
+    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
+
+        target_text = self._on_conflict_target(on_conflict, **kw)
+
+        if target_text:
+            return "ON CONFLICT %s DO NOTHING" % target_text
+        else:
+            return "ON CONFLICT DO NOTHING"
+
+    def visit_on_conflict_do_update(self, on_conflict, **kw):
+        clause = on_conflict
+
+        target_text = self._on_conflict_target(on_conflict, **kw)
+
+        action_set_ops = []
+
+        set_parameters = dict(clause.update_values_to_set)
+        # create a list of column assignment clauses as tuples
+
+        insert_statement = self.stack[-1]["selectable"]
+        cols = insert_statement.table.c
+        for c in cols:
+            col_key = c.key
+            if col_key in set_parameters:
+                value = set_parameters.pop(col_key)
+                if coercions._is_literal(value):
+                    value = elements.BindParameter(None, value, type_=c.type)
+
+                else:
+                    if (
+                        isinstance(value, elements.BindParameter)
+                        and value.type._isnull
+                    ):
+                        value = value._clone()
+                        value.type = c.type
+                value_text = self.process(value.self_group(), use_schema=False)
+
+                key_text = self.preparer.quote(col_key)
+                action_set_ops.append("%s = %s" % (key_text, value_text))
+
+        # check for names that don't match columns
+        if set_parameters:
+            util.warn(
+                "Additional column names not matching "
+                "any column keys in table '%s': %s"
+                % (
+                    self.statement.table.name,
+                    (", ".join("'%s'" % c for c in set_parameters)),
+                )
+            )
+            for k, v in set_parameters.items():
+                key_text = (
+                    self.preparer.quote(k)
+                    if isinstance(k, util.string_types)
+                    else self.process(k, use_schema=False)
+                )
+                value_text = self.process(
+                    coercions.expect(roles.ExpressionElementRole, v),
+                    use_schema=False,
+                )
+                action_set_ops.append("%s = %s" % (key_text, value_text))
+
+        action_text = ", ".join(action_set_ops)
+        if clause.update_whereclause is not None:
+            action_text += " WHERE %s" % self.process(
+                clause.update_whereclause, include_table=True, use_schema=False
+            )
+
+        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
+
 
 class SQLiteDDLCompiler(compiler.DDLCompiler):
     def get_column_specification(self, column, **kwargs):
diff --git a/lib/sqlalchemy/dialects/sqlite/dml.py b/lib/sqlalchemy/dialects/sqlite/dml.py
new file mode 100644 (file)
index 0000000..2040ceb
--- /dev/null
@@ -0,0 +1,231 @@
+# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+from ... import util
+from ...sql import schema
+from ...sql.base import _generative
+from ...sql.dml import Insert as StandardInsert
+from ...sql.elements import ClauseElement
+from ...sql.expression import alias
+from ...util.langhelpers import public_factory
+
+
+__all__ = ("Insert", "insert")
+
+
+class Insert(StandardInsert):
+    """SQLite-specific implementation of INSERT.
+
+    Adds methods for SQLite-specific syntaxes such as ON CONFLICT.
+
+    The :class:`_sqlite.Insert` object is created using the
+    :func:`sqlalchemy.dialects.sqlite.insert` function.
+
+    .. versionadded:: 1.4
+
+    """
+
+    @util.memoized_property
+    def excluded(self):
+        """Provide the ``excluded`` namespace for an ON CONFLICT statement
+
+        SQLite's ON CONFLICT clause allows reference to the row that would
+        be inserted, known as ``excluded``.  This attribute provides
+        all columns in this row to be referenceable.
+
+        .. seealso::
+
+            :ref:`sqlite_insert_on_conflict` - example of how
+            to use :attr:`_expression.Insert.excluded`
+
+        """
+        return alias(self.table, name="excluded").columns
+
+    @_generative
+    def on_conflict_do_update(
+        self,
+        constraint=None,
+        index_elements=None,
+        index_where=None,
+        set_=None,
+        where=None,
+    ):
+        r"""
+        Specifies a DO UPDATE SET action for ON CONFLICT clause.
+
+        Either the ``constraint`` or ``index_elements`` argument is
+        required, but only one of these can be specified.
+
+        :param constraint:
+         The column name of a unique or primary key constraint on the table,
+         or the constraint object itself.
+
+        :param index_elements:
+         A sequence consisting of string column names, :class:`_schema.Column`
+         objects, or other column expression objects that will be used
+         to infer a target index.
+
+        :param index_where:
+         Additional WHERE criterion that can be used to infer a
+         conditional target index.
+
+        :param set\_:
+         Required argument. A dictionary or other mapping object
+         with column names as keys and expressions or literals as values,
+         specifying the ``SET`` actions to take.
+         If the target :class:`_schema.Column` specifies a ".
+         key" attribute distinct
+         from the column name, that key should be used.
+
+         .. warning:: This dictionary does **not** take into account
+            Python-specified default UPDATE values or generation functions,
+            e.g. those specified using :paramref:`_schema.Column.onupdate`.
+            These values will not be exercised for an ON CONFLICT style of
+            UPDATE, unless they are manually specified in the
+            :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
+
+        :param where:
+         Optional argument. If present, can be a literal SQL
+         string or an acceptable expression for a ``WHERE`` clause
+         that restricts the rows affected by ``DO UPDATE SET``. Rows
+         not meeting the ``WHERE`` condition will not be updated
+         (effectively a ``DO NOTHING`` for those rows).
+
+         .. versionadded:: 1.4
+
+
+        .. seealso::
+
+            :ref:`sqlite_insert_on_conflict`
+
+        """
+        if isinstance(constraint, schema.UniqueConstraint) or isinstance(
+            constraint, schema.PrimaryKeyConstraint
+        ):
+            constraint = constraint.columns[0].name
+
+        self._post_values_clause = OnConflictDoUpdate(
+            constraint, index_elements, index_where, set_, where
+        )
+
+    @_generative
+    def on_conflict_do_nothing(
+        self, constraint=None, index_elements=None, index_where=None
+    ):
+        """
+        Specifies a DO NOTHING action for ON CONFLICT clause.
+
+        The ``constraint`` and ``index_elements`` arguments
+        are optional, but only one of these can be specified.
+
+        :param constraint:
+         The column name of a unique or primary key constraint on the table,
+         or the constraint object itself.
+
+        :param index_elements:
+         A sequence consisting of string column names, :class:`_schema.Column`
+         objects, or other column expression objects that will be used
+         to infer a target index.
+
+        :param index_where:
+         Additional WHERE criterion that can be used to infer a
+         conditional target index.
+
+         .. versionadded:: 1.4
+
+        .. seealso::
+
+            :ref:`sqlite_insert_on_conflict`
+
+        """
+
+        if isinstance(constraint, schema.UniqueConstraint) or isinstance(
+            constraint, schema.PrimaryKeyConstraint
+        ):
+            constraint = constraint.columns[0].name
+
+        self._post_values_clause = OnConflictDoNothing(
+            constraint, index_elements, index_where
+        )
+
+
+insert = public_factory(
+    Insert, ".dialects.sqlite.insert", ".dialects.sqlite.Insert"
+)
+
+
+class OnConflictClause(ClauseElement):
+    def __init__(self, constraint=None, index_elements=None, index_where=None):
+
+        if constraint is not None:
+            if not isinstance(constraint, util.string_types) and isinstance(
+                constraint, (schema.Index, schema.Constraint),
+            ):
+                constraint = getattr(constraint, "name") or constraint
+
+        if constraint is not None:
+            if index_elements is not None:
+                raise ValueError(
+                    "'constraint' and 'index_elements' are mutually exclusive"
+                )
+
+            if isinstance(constraint, util.string_types):
+                self.constraint_target = constraint
+                self.inferred_target_elements = None
+                self.inferred_target_whereclause = None
+            elif isinstance(constraint, schema.Index):
+                index_elements = constraint.expressions
+                index_where = constraint.dialect_options["sqlite"].get("where")
+            else:
+                index_elements = constraint.columns
+                index_where = constraint.dialect_options["sqlite"].get("where")
+
+        if index_elements is not None:
+            self.constraint_target = None
+            self.inferred_target_elements = index_elements
+            self.inferred_target_whereclause = index_where
+        elif constraint is None:
+            self.constraint_target = (
+                self.inferred_target_elements
+            ) = self.inferred_target_whereclause = None
+
+
+class OnConflictDoNothing(OnConflictClause):
+    __visit_name__ = "on_conflict_do_nothing"
+
+
+class OnConflictDoUpdate(OnConflictClause):
+    __visit_name__ = "on_conflict_do_update"
+
+    def __init__(
+        self,
+        constraint=None,
+        index_elements=None,
+        index_where=None,
+        set_=None,
+        where=None,
+    ):
+        super(OnConflictDoUpdate, self).__init__(
+            constraint=constraint,
+            index_elements=index_elements,
+            index_where=index_where,
+        )
+
+        if (
+            self.inferred_target_elements is None
+            and self.constraint_target is None
+        ):
+            raise ValueError(
+                "Either constraint or index_elements, "
+                "but not both, must be specified unless DO NOTHING"
+            )
+
+        if not isinstance(set_, dict) or not set_:
+            raise ValueError("set parameter must be a non-empty dictionary")
+        self.update_values_to_set = [
+            (key, value) for key, value in set_.items()
+        ]
+        self.update_whereclause = where
index cb418e99d6970aaa90a52dc4f67333115a3f3164..49ce5128e654f5f603481bfd13a3ff0540418f43 100644 (file)
@@ -36,6 +36,7 @@ from sqlalchemy import types as sqltypes
 from sqlalchemy import UniqueConstraint
 from sqlalchemy import util
 from sqlalchemy.dialects.sqlite import base as sqlite
+from sqlalchemy.dialects.sqlite import insert
 from sqlalchemy.dialects.sqlite import pysqlite as pysqlite_dialect
 from sqlalchemy.engine.url import make_url
 from sqlalchemy.schema import CreateTable
@@ -2683,3 +2684,542 @@ class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL):
             self.table.c.myid.regexp_replace("pattern", "rep").compile,
             dialect=sqlite.dialect(),
         )
+
+
+class OnConflictTest(fixtures.TablesTest):
+
+    __only_on__ = "sqlite"
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "users",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("name", String(50)),
+        )
+
+        class SpecialType(sqltypes.TypeDecorator):
+            impl = String
+
+            def process_bind_param(self, value, dialect):
+                return value + " processed"
+
+        Table(
+            "bind_targets",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("data", SpecialType()),
+        )
+
+        users_xtra = Table(
+            "users_xtra",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("name", String(50)),
+            Column("login_email", String(50)),
+            Column("lets_index_this", String(50)),
+        )
+        cls.unique_partial_index = schema.Index(
+            "idx_unique_partial_name",
+            users_xtra.c.name,
+            users_xtra.c.lets_index_this,
+            unique=True,
+            sqlite_where=users_xtra.c.lets_index_this == "unique_name",
+        )
+
+        cls.unique_constraint = schema.UniqueConstraint(
+            users_xtra.c.login_email, name="uq_login_email"
+        )
+        cls.bogus_index = schema.Index(
+            "idx_special_ops",
+            users_xtra.c.lets_index_this,
+            sqlite_where=users_xtra.c.lets_index_this > "m",
+        )
+
+    def test_bad_args(self):
+        assert_raises(
+            ValueError, insert(self.tables.users).on_conflict_do_update
+        )
+
+    def test_on_conflict_do_nothing(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            result = conn.execute(
+                insert(users).on_conflict_do_nothing(),
+                dict(id=1, name="name1"),
+            )
+            eq_(result.inserted_primary_key, (1,))
+
+            result = conn.execute(
+                insert(users).on_conflict_do_nothing(),
+                dict(id=1, name="name2"),
+            )
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, "name1")],
+            )
+
+    def test_on_conflict_do_nothing_connectionless(self, connection):
+        users = self.tables.users_xtra
+
+        result = connection.execute(
+            insert(users).on_conflict_do_nothing(constraint="login_email"),
+            dict(name="name1", login_email="email1"),
+        )
+        eq_(result.inserted_primary_key, (1,))
+
+        result = connection.execute(
+            insert(users).on_conflict_do_nothing(constraint="login_email"),
+            dict(name="name2", login_email="email1"),
+        )
+        eq_(result.inserted_primary_key, (1,))
+
+        eq_(
+            connection.execute(
+                users.select().where(users.c.id == 1)
+            ).fetchall(),
+            [(1, "name1", "email1", None)],
+        )
+
+    @testing.provide_metadata
+    def test_on_conflict_do_nothing_target(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            result = conn.execute(
+                insert(users).on_conflict_do_nothing(
+                    index_elements=users.primary_key.columns
+                ),
+                dict(id=1, name="name1"),
+            )
+            eq_(result.inserted_primary_key, (1,))
+
+            result = conn.execute(
+                insert(users).on_conflict_do_nothing(
+                    index_elements=users.primary_key.columns
+                ),
+                dict(id=1, name="name2"),
+            )
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, "name1")],
+            )
+
+    def test_on_conflict_do_update_one(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name="name1"))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=[users.c.id], set_=dict(name=i.excluded.name)
+            )
+            result = conn.execute(i, dict(id=1, name="name1"))
+
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, "name1")],
+            )
+
+    def test_on_conflict_do_update_two(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name="name1"))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=[users.c.id],
+                set_=dict(id=i.excluded.id, name=i.excluded.name),
+            )
+
+            result = conn.execute(i, dict(id=1, name="name2"))
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, "name2")],
+            )
+
+    def test_on_conflict_do_update_three(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name="name1"))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(name=i.excluded.name),
+            )
+            result = conn.execute(i, dict(id=1, name="name3"))
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, "name3")],
+            )
+
+    def test_on_conflict_do_update_four(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name="name1"))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(id=i.excluded.id, name=i.excluded.name),
+            ).values(id=1, name="name4")
+
+            result = conn.execute(i)
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, "name4")],
+            )
+
+    def test_on_conflict_do_update_five(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name="name1"))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(id=10, name="I'm a name"),
+            ).values(id=1, name="name4")
+
+            result = conn.execute(i)
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(
+                    users.select().where(users.c.id == 10)
+                ).fetchall(),
+                [(10, "I'm a name")],
+            )
+
+    def test_on_conflict_do_update_multivalues(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name="name1"))
+            conn.execute(users.insert(), dict(id=2, name="name2"))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(name="updated"),
+                where=(i.excluded.name != "name12"),
+            ).values(
+                [
+                    dict(id=1, name="name11"),
+                    dict(id=2, name="name12"),
+                    dict(id=3, name="name13"),
+                    dict(id=4, name="name14"),
+                ]
+            )
+
+            result = conn.execute(i)
+            eq_(result.inserted_primary_key, (None,))
+
+            eq_(
+                conn.execute(users.select().order_by(users.c.id)).fetchall(),
+                [(1, "updated"), (2, "name2"), (3, "name13"), (4, "name14")],
+            )
+
+    def _exotic_targets_fixture(self, conn):
+        users = self.tables.users_xtra
+
+        conn.execute(
+            insert(users),
+            dict(
+                id=1,
+                name="name1",
+                login_email="name1@gmail.com",
+                lets_index_this="not",
+            ),
+        )
+        conn.execute(
+            users.insert(),
+            dict(
+                id=2,
+                name="name2",
+                login_email="name2@gmail.com",
+                lets_index_this="not",
+            ),
+        )
+
+        eq_(
+            conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+            [(1, "name1", "name1@gmail.com", "not")],
+        )
+
+    def test_on_conflict_do_update_exotic_targets_two(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try primary key constraint: cause an upsert on unique id column
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(
+                    name=i.excluded.name, login_email=i.excluded.login_email
+                ),
+            )
+            result = conn.execute(
+                i,
+                dict(
+                    id=1,
+                    name="name2",
+                    login_email="name1@gmail.com",
+                    lets_index_this="not",
+                ),
+            )
+            eq_(result.inserted_primary_key, (1,))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, "name2", "name1@gmail.com", "not")],
+            )
+
+    def test_on_conflict_do_update_exotic_targets_three(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try unique constraint: cause an upsert on target
+            # login_email, not id
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                constraint=self.unique_constraint,
+                set_=dict(
+                    id=i.excluded.id,
+                    name=i.excluded.name,
+                    login_email=i.excluded.login_email,
+                ),
+            )
+            # note: lets_index_this value totally ignored in SET clause.
+            result = conn.execute(
+                i,
+                dict(
+                    id=42,
+                    name="nameunique",
+                    login_email="name2@gmail.com",
+                    lets_index_this="unique",
+                ),
+            )
+            eq_(result.inserted_primary_key, (42,))
+
+            eq_(
+                conn.execute(
+                    users.select().where(
+                        users.c.login_email == "name2@gmail.com"
+                    )
+                ).fetchall(),
+                [(42, "nameunique", "name2@gmail.com", "not")],
+            )
+
+    def test_on_conflict_do_update_exotic_targets_four(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try unique constraint by name: cause an
+            # upsert on target login_email, not id
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                constraint="login_email",
+                set_=dict(
+                    id=i.excluded.id,
+                    name=i.excluded.name,
+                    login_email=i.excluded.login_email,
+                ),
+            )
+            # note: lets_index_this value totally ignored in SET clause.
+
+            result = conn.execute(
+                i,
+                dict(
+                    id=43,
+                    name="nameunique2",
+                    login_email="name2@gmail.com",
+                    lets_index_this="unique",
+                ),
+            )
+            eq_(result.inserted_primary_key, (43,))
+
+            eq_(
+                conn.execute(
+                    users.select().where(
+                        users.c.login_email == "name2@gmail.com"
+                    )
+                ).fetchall(),
+                [(43, "nameunique2", "name2@gmail.com", "not")],
+            )
+
+    def test_on_conflict_do_update_exotic_targets_four_no_pk(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try unique constraint by name: cause an
+            # upsert on target login_email, not id
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=[users.c.login_email],
+                set_=dict(
+                    id=i.excluded.id,
+                    name=i.excluded.name,
+                    login_email=i.excluded.login_email,
+                ),
+            )
+
+            conn.execute(i, dict(name="name3", login_email="name1@gmail.com"))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [],
+            )
+
+            eq_(
+                conn.execute(users.select().order_by(users.c.id)).fetchall(),
+                [
+                    (2, "name2", "name2@gmail.com", "not"),
+                    (3, "name3", "name1@gmail.com", "not"),
+                ],
+            )
+
+    def test_on_conflict_do_update_exotic_targets_five(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try bogus index
+            i = insert(users)
+
+            i = i.on_conflict_do_update(
+                index_elements=self.bogus_index.columns,
+                index_where=self.bogus_index.dialect_options["sqlite"][
+                    "where"
+                ],
+                set_=dict(
+                    name=i.excluded.name, login_email=i.excluded.login_email
+                ),
+            )
+
+            assert_raises(
+                exc.OperationalError,
+                conn.execute,
+                i,
+                dict(
+                    id=1,
+                    name="namebogus",
+                    login_email="bogus@gmail.com",
+                    lets_index_this="bogus",
+                ),
+            )
+
+    def test_on_conflict_do_update_exotic_targets_six(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            conn.execute(
+                insert(users),
+                dict(
+                    id=1,
+                    name="name1",
+                    login_email="mail1@gmail.com",
+                    lets_index_this="unique_name",
+                ),
+            )
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=self.unique_partial_index.columns,
+                index_where=self.unique_partial_index.dialect_options[
+                    "sqlite"
+                ]["where"],
+                set_=dict(
+                    name=i.excluded.name, login_email=i.excluded.login_email
+                ),
+            )
+
+            conn.execute(
+                i,
+                [
+                    dict(
+                        name="name1",
+                        login_email="mail2@gmail.com",
+                        lets_index_this="unique_name",
+                    )
+                ],
+            )
+
+            eq_(
+                conn.execute(users.select()).fetchall(),
+                [(1, "name1", "mail2@gmail.com", "unique_name")],
+            )
+
+    def test_on_conflict_do_update_no_row_actually_affected(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=[users.c.login_email],
+                set_=dict(name="new_name"),
+                where=(i.excluded.name == "other_name"),
+            )
+            result = conn.execute(
+                i, dict(name="name2", login_email="name1@gmail.com")
+            )
+
+            # The last inserted primary key should be 2 here
+            # it is taking the result from the the exotic fixture
+            eq_(result.inserted_primary_key, (2,))
+
+            eq_(
+                conn.execute(users.select()).fetchall(),
+                [
+                    (1, "name1", "name1@gmail.com", "not"),
+                    (2, "name2", "name2@gmail.com", "not"),
+                ],
+            )
+
+    def test_on_conflict_do_update_special_types_in_set(self):
+        bind_targets = self.tables.bind_targets
+
+        with testing.db.connect() as conn:
+            i = insert(bind_targets)
+            conn.execute(i, {"id": 1, "data": "initial data"})
+
+            eq_(
+                conn.scalar(sql.select(bind_targets.c.data)),
+                "initial data processed",
+            )
+
+            i = insert(bind_targets)
+            i = i.on_conflict_do_update(
+                index_elements=[bind_targets.c.id],
+                set_=dict(data="new updated data"),
+            )
+            conn.execute(i, {"id": 1, "data": "new inserted data"})
+
+            eq_(
+                conn.scalar(sql.select(bind_targets.c.data)),
+                "new updated data processed",
+            )