--- /dev/null
+.. change::
+ :tags: sqlite, usecase
+ :tickets: 4010
+
+ Implemented DO NOTHING and DO UPDATE SET (Upsert) for the
+ INSERT.. ON CONFLICT Clause as understood by the SQLite dialect.
+ Pull request courtesy Ramon Williams.
from .base import TIME
from .base import TIMESTAMP
from .base import VARCHAR
-
+from .dml import Insert
+from .dml import insert
# default dialect
base.dialect = dialect = pysqlite.dialect
"TIMESTAMP",
"VARCHAR",
"REAL",
+ "Insert",
+ "insert",
"dialect",
)
PRIMARY KEY (id) ON CONFLICT FAIL
)
+.. _sqlite_insert_on_conflict
+
+INSERT...ON CONFLICT (Upsert)
+-----------------------------------
+From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
+of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
+statement. A candidate row will only be inserted if that row does not violate
+any unique constraints. In the case of a unique constraint violation, a
+secondary action can occur which can be either “DO UPDATE”, indicating that
+the data in the target row should be updated, or “DO NOTHING”, which indicates
+to silently skip this row.
+
+Conflicts are determined using existing uniqueness constraints.
+These constraints may be identified either using their column name as stated
+in DDL, or they may be *inferred* by stating the columns and conditions that
+comprise the indexes.
+
+A "uniqueness constraint" is an explicit UNIQUE or PRIMARY KEY constraint
+within the CREATE TABLE statement, or a unique index.
+
+SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
+:func:`_sqlite.insert()` function, which provides
+the generative methods :meth:`~.sqlite.Insert.on_conflict_do_update`
+and :meth:`~.sqlite.Insert.on_conflict_do_nothing`::
+
+ from sqlalchemy.dialects.sqlite import insert
+
+ insert_stmt = insert(my_table).values(
+ id='some_existing_id',
+ data='inserted value')
+
+ do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
+ index_elements=['id']
+ )
+
+ conn.execute(do_nothing_stmt)
+
+ do_update_stmt = insert_stmt.on_conflict_do_update(
+ constraint='id',
+ set_=dict(data='updated value')
+ )
+
+ conn.execute(do_update_stmt)
+
+Both methods supply the "target" of the conflict using either the
+named constraint or by column inference:
+
+* The :paramref:`.Insert.on_conflict_do_update.index_elements` argument
+ specifies a sequence containing string column names, :class:`_schema.Column`
+ objects, and/or SQL expression elements, which would identify a unique
+ index::
+
+
+ unique_index = schema.Index(
+ "unique_index_name",
+ my_table.c.id,
+ unique=True,
+ )
+
+ do_update_stmt = insert_stmt.on_conflict_do_update(
+ index_elements=['id'],
+ set_=dict(data='updated value')
+ )
+
+ do_update_stmt = insert_stmt.on_conflict_do_update(
+ index_elements=[my_table.c.id],
+ set_=dict(data='updated value')
+ )
+
+* When using :paramref:`.Insert.on_conflict_do_update.index_elements` to
+ infer an index, a partial index can be inferred by also specifying the
+ use the :paramref:`.Insert.on_conflict_do_update.index_where` parameter::
+
+ from sqlalchemy.dialects.sqlite import insert
+
+ unique_partial_index = schema.Index(
+ "unique_partial_index_name",
+ my_table.c.user_email,
+ unique=True,
+ sqlite_where=my_table.c.user_email.like('@gmail.com'),
+ )
+
+ stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
+ stmt = stmt.on_conflict_do_update(
+ index_elements=[my_table.c.user_email],
+ index_where=my_table.c.user_email.like('%@gmail.com'),
+ set_=dict(data=stmt.excluded.data)
+ )
+ conn.execute(stmt)
+
+* The :paramref:`.Insert.on_conflict_do_update.constraint` argument is
+ used to specify a constraint directly rather than inferring it. This can
+ either be the name of the underlying columns of a UNIQUE constraint or
+ a PRIMARY KEY constraint. SQLite does not support specifying the names of
+ an UNIQUE or PRIMARY KEY constraint and INDEX within the ON CONFLICT Clause.
+
+ That is, the following will not work::
+ unique_constraint_pk = schema.PrimaryKeyConstraint(
+ my_table.c.id, name="uq_user_pk"
+ )
+
+ do_update_stmt = insert_stmt.on_conflict_do_update(
+ constraint='uq_user_pk',
+ set_=dict(data='updated value')
+ )
+
+ But the statements below will work::
+ do_update_stmt_A = insert_stmt.on_conflict_do_update(
+ constraint=unique_constraint_pk,
+ set_=dict(id=1, data='updated value')
+ )
+
+ do_update_stmt_B = insert_stmt.on_conflict_do_update(
+ constraint='id',
+ set_=dict(id=1, data='updated value')
+ )
+
+* The :paramref:`.Insert.on_conflict_do_update.constraint` argument may
+ also refer to a SQLAlchemy construct representing a constraint,
+ e.g. :class:`.UniqueConstraint` or :class:`.PrimaryKeyConstraint`.
+ In this use, only the name of the constraint's first column is used directly.
+ If the constraint is unnamed, then inference will be used,
+ where the expressions and optional WHERE clause of the constraint will
+ be spelled out in the construct::
+
+ unique_constraint = schema.UniqueConstraint(
+ my_table.c.user_email, name="uq_user_email"
+ )
+
+ do_update_stmt = insert_stmt.on_conflict_do_update(
+ constraint=unique_constraint,
+ set_=dict(data='updated value')
+ )
+
+``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
+existing row, using any combination of new values as well as values
+from the proposed insertion. These values are specified using the
+:paramref:`.Insert.on_conflict_do_update.set_` parameter. This
+parameter accepts a dictionary which consists of direct values
+for UPDATE::
+
+ from sqlalchemy.dialects.sqlite import insert
+
+ stmt = insert(my_table).values(id='some_id', data='inserted value')
+ do_update_stmt = stmt.on_conflict_do_update(
+ index_elements=['id'],
+ set_=dict(data='updated value')
+ )
+ conn.execute(do_update_stmt)
+
+.. warning::
+
+ The :meth:`_expression.Insert.on_conflict_do_update`
+ method does **not** take into
+ account Python-side default UPDATE values or generation functions, e.g.
+ those specified using :paramref:`_schema.Column.onupdate`.
+ These values will not be exercised for an ON CONFLICT style of UPDATE,
+ unless they are manually specified in the
+ :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
+
+In order to refer to the proposed insertion row, the special alias
+:attr:`~.sqlite.Insert.excluded` is available as an attribute on
+the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
+on a column, that informs the DO UPDATE to update the row with the value that
+would have been inserted had the constraint not failed::
+
+ from sqlalchemy.dialects.sqlite import insert
+
+ stmt = insert(my_table).values(
+ id='some_id',
+ data='inserted value',
+ author='jlh')
+ do_update_stmt = stmt.on_conflict_do_update(
+ index_elements=['id'],
+ set_=dict(data='updated value', author=stmt.excluded.author)
+ )
+ conn.execute(do_update_stmt)
+
+The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
+a WHERE clause using the :paramref:`.Insert.on_conflict_do_update.where`
+parameter, which will limit those rows which receive an UPDATE::
+
+ from sqlalchemy.dialects.sqlite import insert
+
+ stmt = insert(my_table).values(
+ id='some_id',
+ data='inserted value',
+ author='jlh')
+ on_update_stmt = stmt.on_conflict_do_update(
+ index_elements=['id'],
+ set_=dict(data='updated value', author=stmt.excluded.author)
+ where=(my_table.c.status == 2)
+ )
+ conn.execute(on_update_stmt)
+
+``ON CONFLICT`` may also be used to skip inserting a row entirely
+if any conflict with a unique constraint occurs; below
+this is illustrated using the
+:meth:`~.sqlite.Insert.on_conflict_do_nothing` method::
+
+ from sqlalchemy.dialects.sqlite import insert
+
+ stmt = insert(my_table).values(id='some_id', data='inserted value')
+ stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
+ conn.execute(stmt)
+
+If ``DO NOTHING`` is used without specifying any columns or constraint,
+it has the effect of skipping the INSERT for any unique violation which
+occurs::
+
+ from sqlalchemy.dialects.sqlite import insert
+
+ stmt = insert(my_table).values(id='some_id', data='inserted value')
+ stmt = stmt.on_conflict_do_nothing()
+ conn.execute(stmt)
+
+The SQLite ``ON CONFLICT`` also supports multiple conflict targets. For example
+a unique index can be created over multiple columns, then this constraint will
+be inferred by listing the columns in index_elements::
+
+ unique_index = schema.Index(
+ "unique_index_name",
+ my_table.c.id,
+ my_table.c.data,
+ unique=True,
+ )
+
+ insert_stmt = insert(my_table).values(id='some_id', data='old_value')
+ do_update_stmt = insert_stmt.on_conflict_do_update(
+ index_elements=['id', 'data'],
+ set_=dict(data='new_value')
+ )
+
+This will render the following ``ON CONFLICT...DO UPDATE`` statement::
+ INSERT INTO my_table (id, data) VALUES ('some_id', 'old_value')
+ ON CONFLICT (id, date)
+ DO UPDATE SET data = 'new_value';
+
+
+.. versionadded:: 1.4 Added support for SQLite ON CONFLICT clauses
+
+.. seealso::
+
+ `Upsert
+ <https://sqlite.org/lang_UPSERT.htmlT>`_
+ - in the SQLite documentation.
+
.. _sqlite_type_reflection:
Type Reflection
from ... import util
from ...engine import default
from ...engine import reflection
+from ...sql import coercions
from ...sql import ColumnElement
from ...sql import compiler
+from ...sql import elements
+from ...sql import roles
from ...types import BLOB # noqa
from ...types import BOOLEAN # noqa
from ...types import CHAR # noqa
def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
+ def _on_conflict_target(self, clause, **kw):
+ if clause.constraint_target is not None:
+ target_text = "(%s)" % clause.constraint_target
+ elif clause.inferred_target_elements is not None:
+ target_text = "(%s)" % ", ".join(
+ (
+ self.preparer.quote(c)
+ if isinstance(c, util.string_types)
+ else self.process(c, include_table=False, use_schema=False)
+ )
+ for c in clause.inferred_target_elements
+ )
+ if clause.inferred_target_whereclause is not None:
+ target_text += " WHERE %s" % self.process(
+ clause.inferred_target_whereclause,
+ include_table=False,
+ use_schema=False,
+ literal_binds=True,
+ )
+
+ else:
+ target_text = ""
+
+ return target_text
+
+ def visit_on_conflict_do_nothing(self, on_conflict, **kw):
+
+ target_text = self._on_conflict_target(on_conflict, **kw)
+
+ if target_text:
+ return "ON CONFLICT %s DO NOTHING" % target_text
+ else:
+ return "ON CONFLICT DO NOTHING"
+
+ def visit_on_conflict_do_update(self, on_conflict, **kw):
+ clause = on_conflict
+
+ target_text = self._on_conflict_target(on_conflict, **kw)
+
+ action_set_ops = []
+
+ set_parameters = dict(clause.update_values_to_set)
+ # create a list of column assignment clauses as tuples
+
+ insert_statement = self.stack[-1]["selectable"]
+ cols = insert_statement.table.c
+ for c in cols:
+ col_key = c.key
+ if col_key in set_parameters:
+ value = set_parameters.pop(col_key)
+ if coercions._is_literal(value):
+ value = elements.BindParameter(None, value, type_=c.type)
+
+ else:
+ if (
+ isinstance(value, elements.BindParameter)
+ and value.type._isnull
+ ):
+ value = value._clone()
+ value.type = c.type
+ value_text = self.process(value.self_group(), use_schema=False)
+
+ key_text = self.preparer.quote(col_key)
+ action_set_ops.append("%s = %s" % (key_text, value_text))
+
+ # check for names that don't match columns
+ if set_parameters:
+ util.warn(
+ "Additional column names not matching "
+ "any column keys in table '%s': %s"
+ % (
+ self.statement.table.name,
+ (", ".join("'%s'" % c for c in set_parameters)),
+ )
+ )
+ for k, v in set_parameters.items():
+ key_text = (
+ self.preparer.quote(k)
+ if isinstance(k, util.string_types)
+ else self.process(k, use_schema=False)
+ )
+ value_text = self.process(
+ coercions.expect(roles.ExpressionElementRole, v),
+ use_schema=False,
+ )
+ action_set_ops.append("%s = %s" % (key_text, value_text))
+
+ action_text = ", ".join(action_set_ops)
+ if clause.update_whereclause is not None:
+ action_text += " WHERE %s" % self.process(
+ clause.update_whereclause, include_table=True, use_schema=False
+ )
+
+ return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
+
class SQLiteDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
--- /dev/null
+# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+from ... import util
+from ...sql import schema
+from ...sql.base import _generative
+from ...sql.dml import Insert as StandardInsert
+from ...sql.elements import ClauseElement
+from ...sql.expression import alias
+from ...util.langhelpers import public_factory
+
+
+__all__ = ("Insert", "insert")
+
+
+class Insert(StandardInsert):
+ """SQLite-specific implementation of INSERT.
+
+ Adds methods for SQLite-specific syntaxes such as ON CONFLICT.
+
+ The :class:`_sqlite.Insert` object is created using the
+ :func:`sqlalchemy.dialects.sqlite.insert` function.
+
+ .. versionadded:: 1.4
+
+ """
+
+ @util.memoized_property
+ def excluded(self):
+ """Provide the ``excluded`` namespace for an ON CONFLICT statement
+
+ SQLite's ON CONFLICT clause allows reference to the row that would
+ be inserted, known as ``excluded``. This attribute provides
+ all columns in this row to be referenceable.
+
+ .. seealso::
+
+ :ref:`sqlite_insert_on_conflict` - example of how
+ to use :attr:`_expression.Insert.excluded`
+
+ """
+ return alias(self.table, name="excluded").columns
+
+ @_generative
+ def on_conflict_do_update(
+ self,
+ constraint=None,
+ index_elements=None,
+ index_where=None,
+ set_=None,
+ where=None,
+ ):
+ r"""
+ Specifies a DO UPDATE SET action for ON CONFLICT clause.
+
+ Either the ``constraint`` or ``index_elements`` argument is
+ required, but only one of these can be specified.
+
+ :param constraint:
+ The column name of a unique or primary key constraint on the table,
+ or the constraint object itself.
+
+ :param index_elements:
+ A sequence consisting of string column names, :class:`_schema.Column`
+ objects, or other column expression objects that will be used
+ to infer a target index.
+
+ :param index_where:
+ Additional WHERE criterion that can be used to infer a
+ conditional target index.
+
+ :param set\_:
+ Required argument. A dictionary or other mapping object
+ with column names as keys and expressions or literals as values,
+ specifying the ``SET`` actions to take.
+ If the target :class:`_schema.Column` specifies a ".
+ key" attribute distinct
+ from the column name, that key should be used.
+
+ .. warning:: This dictionary does **not** take into account
+ Python-specified default UPDATE values or generation functions,
+ e.g. those specified using :paramref:`_schema.Column.onupdate`.
+ These values will not be exercised for an ON CONFLICT style of
+ UPDATE, unless they are manually specified in the
+ :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
+
+ :param where:
+ Optional argument. If present, can be a literal SQL
+ string or an acceptable expression for a ``WHERE`` clause
+ that restricts the rows affected by ``DO UPDATE SET``. Rows
+ not meeting the ``WHERE`` condition will not be updated
+ (effectively a ``DO NOTHING`` for those rows).
+
+ .. versionadded:: 1.4
+
+
+ .. seealso::
+
+ :ref:`sqlite_insert_on_conflict`
+
+ """
+ if isinstance(constraint, schema.UniqueConstraint) or isinstance(
+ constraint, schema.PrimaryKeyConstraint
+ ):
+ constraint = constraint.columns[0].name
+
+ self._post_values_clause = OnConflictDoUpdate(
+ constraint, index_elements, index_where, set_, where
+ )
+
+ @_generative
+ def on_conflict_do_nothing(
+ self, constraint=None, index_elements=None, index_where=None
+ ):
+ """
+ Specifies a DO NOTHING action for ON CONFLICT clause.
+
+ The ``constraint`` and ``index_elements`` arguments
+ are optional, but only one of these can be specified.
+
+ :param constraint:
+ The column name of a unique or primary key constraint on the table,
+ or the constraint object itself.
+
+ :param index_elements:
+ A sequence consisting of string column names, :class:`_schema.Column`
+ objects, or other column expression objects that will be used
+ to infer a target index.
+
+ :param index_where:
+ Additional WHERE criterion that can be used to infer a
+ conditional target index.
+
+ .. versionadded:: 1.4
+
+ .. seealso::
+
+ :ref:`sqlite_insert_on_conflict`
+
+ """
+
+ if isinstance(constraint, schema.UniqueConstraint) or isinstance(
+ constraint, schema.PrimaryKeyConstraint
+ ):
+ constraint = constraint.columns[0].name
+
+ self._post_values_clause = OnConflictDoNothing(
+ constraint, index_elements, index_where
+ )
+
+
+insert = public_factory(
+ Insert, ".dialects.sqlite.insert", ".dialects.sqlite.Insert"
+)
+
+
+class OnConflictClause(ClauseElement):
+ def __init__(self, constraint=None, index_elements=None, index_where=None):
+
+ if constraint is not None:
+ if not isinstance(constraint, util.string_types) and isinstance(
+ constraint, (schema.Index, schema.Constraint),
+ ):
+ constraint = getattr(constraint, "name") or constraint
+
+ if constraint is not None:
+ if index_elements is not None:
+ raise ValueError(
+ "'constraint' and 'index_elements' are mutually exclusive"
+ )
+
+ if isinstance(constraint, util.string_types):
+ self.constraint_target = constraint
+ self.inferred_target_elements = None
+ self.inferred_target_whereclause = None
+ elif isinstance(constraint, schema.Index):
+ index_elements = constraint.expressions
+ index_where = constraint.dialect_options["sqlite"].get("where")
+ else:
+ index_elements = constraint.columns
+ index_where = constraint.dialect_options["sqlite"].get("where")
+
+ if index_elements is not None:
+ self.constraint_target = None
+ self.inferred_target_elements = index_elements
+ self.inferred_target_whereclause = index_where
+ elif constraint is None:
+ self.constraint_target = (
+ self.inferred_target_elements
+ ) = self.inferred_target_whereclause = None
+
+
+class OnConflictDoNothing(OnConflictClause):
+ __visit_name__ = "on_conflict_do_nothing"
+
+
+class OnConflictDoUpdate(OnConflictClause):
+ __visit_name__ = "on_conflict_do_update"
+
+ def __init__(
+ self,
+ constraint=None,
+ index_elements=None,
+ index_where=None,
+ set_=None,
+ where=None,
+ ):
+ super(OnConflictDoUpdate, self).__init__(
+ constraint=constraint,
+ index_elements=index_elements,
+ index_where=index_where,
+ )
+
+ if (
+ self.inferred_target_elements is None
+ and self.constraint_target is None
+ ):
+ raise ValueError(
+ "Either constraint or index_elements, "
+ "but not both, must be specified unless DO NOTHING"
+ )
+
+ if not isinstance(set_, dict) or not set_:
+ raise ValueError("set parameter must be a non-empty dictionary")
+ self.update_values_to_set = [
+ (key, value) for key, value in set_.items()
+ ]
+ self.update_whereclause = where
from sqlalchemy import UniqueConstraint
from sqlalchemy import util
from sqlalchemy.dialects.sqlite import base as sqlite
+from sqlalchemy.dialects.sqlite import insert
from sqlalchemy.dialects.sqlite import pysqlite as pysqlite_dialect
from sqlalchemy.engine.url import make_url
from sqlalchemy.schema import CreateTable
self.table.c.myid.regexp_replace("pattern", "rep").compile,
dialect=sqlite.dialect(),
)
+
+
+class OnConflictTest(fixtures.TablesTest):
+
+ __only_on__ = "sqlite"
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50)),
+ )
+
+ class SpecialType(sqltypes.TypeDecorator):
+ impl = String
+
+ def process_bind_param(self, value, dialect):
+ return value + " processed"
+
+ Table(
+ "bind_targets",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", SpecialType()),
+ )
+
+ users_xtra = Table(
+ "users_xtra",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("name", String(50)),
+ Column("login_email", String(50)),
+ Column("lets_index_this", String(50)),
+ )
+ cls.unique_partial_index = schema.Index(
+ "idx_unique_partial_name",
+ users_xtra.c.name,
+ users_xtra.c.lets_index_this,
+ unique=True,
+ sqlite_where=users_xtra.c.lets_index_this == "unique_name",
+ )
+
+ cls.unique_constraint = schema.UniqueConstraint(
+ users_xtra.c.login_email, name="uq_login_email"
+ )
+ cls.bogus_index = schema.Index(
+ "idx_special_ops",
+ users_xtra.c.lets_index_this,
+ sqlite_where=users_xtra.c.lets_index_this > "m",
+ )
+
+ def test_bad_args(self):
+ assert_raises(
+ ValueError, insert(self.tables.users).on_conflict_do_update
+ )
+
+ def test_on_conflict_do_nothing(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(),
+ dict(id=1, name="name1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(),
+ dict(id=1, name="name2"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1")],
+ )
+
+ def test_on_conflict_do_nothing_connectionless(self, connection):
+ users = self.tables.users_xtra
+
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(constraint="login_email"),
+ dict(name="name1", login_email="email1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ result = connection.execute(
+ insert(users).on_conflict_do_nothing(constraint="login_email"),
+ dict(name="name2", login_email="email1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ connection.execute(
+ users.select().where(users.c.id == 1)
+ ).fetchall(),
+ [(1, "name1", "email1", None)],
+ )
+
+ @testing.provide_metadata
+ def test_on_conflict_do_nothing_target(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=users.primary_key.columns
+ ),
+ dict(id=1, name="name1"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ result = conn.execute(
+ insert(users).on_conflict_do_nothing(
+ index_elements=users.primary_key.columns
+ ),
+ dict(id=1, name="name2"),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1")],
+ )
+
+ def test_on_conflict_do_update_one(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id], set_=dict(name=i.excluded.name)
+ )
+ result = conn.execute(i, dict(id=1, name="name1"))
+
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1")],
+ )
+
+ def test_on_conflict_do_update_two(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.id],
+ set_=dict(id=i.excluded.id, name=i.excluded.name),
+ )
+
+ result = conn.execute(i, dict(id=1, name="name2"))
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name2")],
+ )
+
+ def test_on_conflict_do_update_three(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(name=i.excluded.name),
+ )
+ result = conn.execute(i, dict(id=1, name="name3"))
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name3")],
+ )
+
+ def test_on_conflict_do_update_four(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(id=i.excluded.id, name=i.excluded.name),
+ ).values(id=1, name="name4")
+
+ result = conn.execute(i)
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name4")],
+ )
+
+ def test_on_conflict_do_update_five(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(id=10, name="I'm a name"),
+ ).values(id=1, name="name4")
+
+ result = conn.execute(i)
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(
+ users.select().where(users.c.id == 10)
+ ).fetchall(),
+ [(10, "I'm a name")],
+ )
+
+ def test_on_conflict_do_update_multivalues(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), dict(id=1, name="name1"))
+ conn.execute(users.insert(), dict(id=2, name="name2"))
+
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(name="updated"),
+ where=(i.excluded.name != "name12"),
+ ).values(
+ [
+ dict(id=1, name="name11"),
+ dict(id=2, name="name12"),
+ dict(id=3, name="name13"),
+ dict(id=4, name="name14"),
+ ]
+ )
+
+ result = conn.execute(i)
+ eq_(result.inserted_primary_key, (None,))
+
+ eq_(
+ conn.execute(users.select().order_by(users.c.id)).fetchall(),
+ [(1, "updated"), (2, "name2"), (3, "name13"), (4, "name14")],
+ )
+
+ def _exotic_targets_fixture(self, conn):
+ users = self.tables.users_xtra
+
+ conn.execute(
+ insert(users),
+ dict(
+ id=1,
+ name="name1",
+ login_email="name1@gmail.com",
+ lets_index_this="not",
+ ),
+ )
+ conn.execute(
+ users.insert(),
+ dict(
+ id=2,
+ name="name2",
+ login_email="name2@gmail.com",
+ lets_index_this="not",
+ ),
+ )
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name1", "name1@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_two(self):
+ users = self.tables.users_xtra
+
+ with testing.db.connect() as conn:
+ self._exotic_targets_fixture(conn)
+ # try primary key constraint: cause an upsert on unique id column
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=users.primary_key.columns,
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+ result = conn.execute(
+ i,
+ dict(
+ id=1,
+ name="name2",
+ login_email="name1@gmail.com",
+ lets_index_this="not",
+ ),
+ )
+ eq_(result.inserted_primary_key, (1,))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [(1, "name2", "name1@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_three(self):
+ users = self.tables.users_xtra
+
+ with testing.db.connect() as conn:
+ self._exotic_targets_fixture(conn)
+ # try unique constraint: cause an upsert on target
+ # login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ constraint=self.unique_constraint,
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+ # note: lets_index_this value totally ignored in SET clause.
+ result = conn.execute(
+ i,
+ dict(
+ id=42,
+ name="nameunique",
+ login_email="name2@gmail.com",
+ lets_index_this="unique",
+ ),
+ )
+ eq_(result.inserted_primary_key, (42,))
+
+ eq_(
+ conn.execute(
+ users.select().where(
+ users.c.login_email == "name2@gmail.com"
+ )
+ ).fetchall(),
+ [(42, "nameunique", "name2@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_four(self):
+ users = self.tables.users_xtra
+
+ with testing.db.connect() as conn:
+ self._exotic_targets_fixture(conn)
+ # try unique constraint by name: cause an
+ # upsert on target login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ constraint="login_email",
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+ # note: lets_index_this value totally ignored in SET clause.
+
+ result = conn.execute(
+ i,
+ dict(
+ id=43,
+ name="nameunique2",
+ login_email="name2@gmail.com",
+ lets_index_this="unique",
+ ),
+ )
+ eq_(result.inserted_primary_key, (43,))
+
+ eq_(
+ conn.execute(
+ users.select().where(
+ users.c.login_email == "name2@gmail.com"
+ )
+ ).fetchall(),
+ [(43, "nameunique2", "name2@gmail.com", "not")],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_four_no_pk(self):
+ users = self.tables.users_xtra
+
+ with testing.db.connect() as conn:
+ self._exotic_targets_fixture(conn)
+ # try unique constraint by name: cause an
+ # upsert on target login_email, not id
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.login_email],
+ set_=dict(
+ id=i.excluded.id,
+ name=i.excluded.name,
+ login_email=i.excluded.login_email,
+ ),
+ )
+
+ conn.execute(i, dict(name="name3", login_email="name1@gmail.com"))
+
+ eq_(
+ conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+ [],
+ )
+
+ eq_(
+ conn.execute(users.select().order_by(users.c.id)).fetchall(),
+ [
+ (2, "name2", "name2@gmail.com", "not"),
+ (3, "name3", "name1@gmail.com", "not"),
+ ],
+ )
+
+ def test_on_conflict_do_update_exotic_targets_five(self):
+ users = self.tables.users_xtra
+
+ with testing.db.connect() as conn:
+ self._exotic_targets_fixture(conn)
+ # try bogus index
+ i = insert(users)
+
+ i = i.on_conflict_do_update(
+ index_elements=self.bogus_index.columns,
+ index_where=self.bogus_index.dialect_options["sqlite"][
+ "where"
+ ],
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+
+ assert_raises(
+ exc.OperationalError,
+ conn.execute,
+ i,
+ dict(
+ id=1,
+ name="namebogus",
+ login_email="bogus@gmail.com",
+ lets_index_this="bogus",
+ ),
+ )
+
+ def test_on_conflict_do_update_exotic_targets_six(self):
+ users = self.tables.users_xtra
+
+ with testing.db.connect() as conn:
+ conn.execute(
+ insert(users),
+ dict(
+ id=1,
+ name="name1",
+ login_email="mail1@gmail.com",
+ lets_index_this="unique_name",
+ ),
+ )
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=self.unique_partial_index.columns,
+ index_where=self.unique_partial_index.dialect_options[
+ "sqlite"
+ ]["where"],
+ set_=dict(
+ name=i.excluded.name, login_email=i.excluded.login_email
+ ),
+ )
+
+ conn.execute(
+ i,
+ [
+ dict(
+ name="name1",
+ login_email="mail2@gmail.com",
+ lets_index_this="unique_name",
+ )
+ ],
+ )
+
+ eq_(
+ conn.execute(users.select()).fetchall(),
+ [(1, "name1", "mail2@gmail.com", "unique_name")],
+ )
+
+ def test_on_conflict_do_update_no_row_actually_affected(self):
+ users = self.tables.users_xtra
+
+ with testing.db.connect() as conn:
+ self._exotic_targets_fixture(conn)
+ i = insert(users)
+ i = i.on_conflict_do_update(
+ index_elements=[users.c.login_email],
+ set_=dict(name="new_name"),
+ where=(i.excluded.name == "other_name"),
+ )
+ result = conn.execute(
+ i, dict(name="name2", login_email="name1@gmail.com")
+ )
+
+ # The last inserted primary key should be 2 here
+ # it is taking the result from the the exotic fixture
+ eq_(result.inserted_primary_key, (2,))
+
+ eq_(
+ conn.execute(users.select()).fetchall(),
+ [
+ (1, "name1", "name1@gmail.com", "not"),
+ (2, "name2", "name2@gmail.com", "not"),
+ ],
+ )
+
+ def test_on_conflict_do_update_special_types_in_set(self):
+ bind_targets = self.tables.bind_targets
+
+ with testing.db.connect() as conn:
+ i = insert(bind_targets)
+ conn.execute(i, {"id": 1, "data": "initial data"})
+
+ eq_(
+ conn.scalar(sql.select(bind_targets.c.data)),
+ "initial data processed",
+ )
+
+ i = insert(bind_targets)
+ i = i.on_conflict_do_update(
+ index_elements=[bind_targets.c.id],
+ set_=dict(data="new updated data"),
+ )
+ conn.execute(i, {"id": 1, "data": "new inserted data"})
+
+ eq_(
+ conn.scalar(sql.select(bind_targets.c.data)),
+ "new updated data processed",
+ )