From: RamonWill Date: Mon, 14 Sep 2020 00:31:42 +0000 (+0100) Subject: support upsert for SQLite X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9b26063c41059ffb7c92adf844847ada7d086ffb;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git support upsert for SQLite --- diff --git a/doc/build/changelog/unreleased_14/4010.rst b/doc/build/changelog/unreleased_14/4010.rst new file mode 100644 index 0000000000..2a40d417ce --- /dev/null +++ b/doc/build/changelog/unreleased_14/4010.rst @@ -0,0 +1,7 @@ +.. change:: + :tags: sqlite, usecase + :tickets: 4010 + + Implemented DO NOTHING and DO UPDATE SET (Upsert) for the + INSERT.. ON CONFLICT Clause as understood by the SQLite dialect. + Pull request courtesy Ramon Williams. diff --git a/lib/sqlalchemy/dialects/sqlite/__init__.py b/lib/sqlalchemy/dialects/sqlite/__init__.py index 142131f631..72402dd92b 100644 --- a/lib/sqlalchemy/dialects/sqlite/__init__.py +++ b/lib/sqlalchemy/dialects/sqlite/__init__.py @@ -24,7 +24,8 @@ from .base import TEXT from .base import TIME from .base import TIMESTAMP from .base import VARCHAR - +from .dml import Insert +from .dml import insert # default dialect base.dialect = dialect = pysqlite.dialect @@ -47,5 +48,7 @@ __all__ = ( "TIMESTAMP", "VARCHAR", "REAL", + "Insert", + "insert", "dialect", ) diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 8ef35514ab..82d01891d8 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -402,6 +402,253 @@ resolution algorithm is applied to the constraint itself:: PRIMARY KEY (id) ON CONFLICT FAIL ) +.. _sqlite_insert_on_conflict + +INSERT...ON CONFLICT (Upsert) +----------------------------------- +From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) +of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` +statement. A candidate row will only be inserted if that row does not violate +any unique constraints. In the case of a unique constraint violation, a +secondary action can occur which can be either “DO UPDATE”, indicating that +the data in the target row should be updated, or “DO NOTHING”, which indicates +to silently skip this row. + +Conflicts are determined using existing uniqueness constraints. +These constraints may be identified either using their column name as stated +in DDL, or they may be *inferred* by stating the columns and conditions that +comprise the indexes. + +A "uniqueness constraint" is an explicit UNIQUE or PRIMARY KEY constraint +within the CREATE TABLE statement, or a unique index. + +SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific +:func:`_sqlite.insert()` function, which provides +the generative methods :meth:`~.sqlite.Insert.on_conflict_do_update` +and :meth:`~.sqlite.Insert.on_conflict_do_nothing`:: + + from sqlalchemy.dialects.sqlite import insert + + insert_stmt = insert(my_table).values( + id='some_existing_id', + data='inserted value') + + do_nothing_stmt = insert_stmt.on_conflict_do_nothing( + index_elements=['id'] + ) + + conn.execute(do_nothing_stmt) + + do_update_stmt = insert_stmt.on_conflict_do_update( + constraint='id', + set_=dict(data='updated value') + ) + + conn.execute(do_update_stmt) + +Both methods supply the "target" of the conflict using either the +named constraint or by column inference: + +* The :paramref:`.Insert.on_conflict_do_update.index_elements` argument + specifies a sequence containing string column names, :class:`_schema.Column` + objects, and/or SQL expression elements, which would identify a unique + index:: + + + unique_index = schema.Index( + "unique_index_name", + my_table.c.id, + unique=True, + ) + + do_update_stmt = insert_stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value') + ) + + do_update_stmt = insert_stmt.on_conflict_do_update( + index_elements=[my_table.c.id], + set_=dict(data='updated value') + ) + +* When using :paramref:`.Insert.on_conflict_do_update.index_elements` to + infer an index, a partial index can be inferred by also specifying the + use the :paramref:`.Insert.on_conflict_do_update.index_where` parameter:: + + from sqlalchemy.dialects.sqlite import insert + + unique_partial_index = schema.Index( + "unique_partial_index_name", + my_table.c.user_email, + unique=True, + sqlite_where=my_table.c.user_email.like('@gmail.com'), + ) + + stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') + stmt = stmt.on_conflict_do_update( + index_elements=[my_table.c.user_email], + index_where=my_table.c.user_email.like('%@gmail.com'), + set_=dict(data=stmt.excluded.data) + ) + conn.execute(stmt) + +* The :paramref:`.Insert.on_conflict_do_update.constraint` argument is + used to specify a constraint directly rather than inferring it. This can + either be the name of the underlying columns of a UNIQUE constraint or + a PRIMARY KEY constraint. SQLite does not support specifying the names of + an UNIQUE or PRIMARY KEY constraint and INDEX within the ON CONFLICT Clause. + + That is, the following will not work:: + unique_constraint_pk = schema.PrimaryKeyConstraint( + my_table.c.id, name="uq_user_pk" + ) + + do_update_stmt = insert_stmt.on_conflict_do_update( + constraint='uq_user_pk', + set_=dict(data='updated value') + ) + + But the statements below will work:: + do_update_stmt_A = insert_stmt.on_conflict_do_update( + constraint=unique_constraint_pk, + set_=dict(id=1, data='updated value') + ) + + do_update_stmt_B = insert_stmt.on_conflict_do_update( + constraint='id', + set_=dict(id=1, data='updated value') + ) + +* The :paramref:`.Insert.on_conflict_do_update.constraint` argument may + also refer to a SQLAlchemy construct representing a constraint, + e.g. :class:`.UniqueConstraint` or :class:`.PrimaryKeyConstraint`. + In this use, only the name of the constraint's first column is used directly. + If the constraint is unnamed, then inference will be used, + where the expressions and optional WHERE clause of the constraint will + be spelled out in the construct:: + + unique_constraint = schema.UniqueConstraint( + my_table.c.user_email, name="uq_user_email" + ) + + do_update_stmt = insert_stmt.on_conflict_do_update( + constraint=unique_constraint, + set_=dict(data='updated value') + ) + +``ON CONFLICT...DO UPDATE`` is used to perform an update of the already +existing row, using any combination of new values as well as values +from the proposed insertion. These values are specified using the +:paramref:`.Insert.on_conflict_do_update.set_` parameter. This +parameter accepts a dictionary which consists of direct values +for UPDATE:: + + from sqlalchemy.dialects.sqlite import insert + + stmt = insert(my_table).values(id='some_id', data='inserted value') + do_update_stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value') + ) + conn.execute(do_update_stmt) + +.. warning:: + + The :meth:`_expression.Insert.on_conflict_do_update` + method does **not** take into + account Python-side default UPDATE values or generation functions, e.g. + those specified using :paramref:`_schema.Column.onupdate`. + These values will not be exercised for an ON CONFLICT style of UPDATE, + unless they are manually specified in the + :paramref:`.Insert.on_conflict_do_update.set_` dictionary. + +In order to refer to the proposed insertion row, the special alias +:attr:`~.sqlite.Insert.excluded` is available as an attribute on +the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix +on a column, that informs the DO UPDATE to update the row with the value that +would have been inserted had the constraint not failed:: + + from sqlalchemy.dialects.sqlite import insert + + stmt = insert(my_table).values( + id='some_id', + data='inserted value', + author='jlh') + do_update_stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value', author=stmt.excluded.author) + ) + conn.execute(do_update_stmt) + +The :meth:`_expression.Insert.on_conflict_do_update` method also accepts +a WHERE clause using the :paramref:`.Insert.on_conflict_do_update.where` +parameter, which will limit those rows which receive an UPDATE:: + + from sqlalchemy.dialects.sqlite import insert + + stmt = insert(my_table).values( + id='some_id', + data='inserted value', + author='jlh') + on_update_stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value', author=stmt.excluded.author) + where=(my_table.c.status == 2) + ) + conn.execute(on_update_stmt) + +``ON CONFLICT`` may also be used to skip inserting a row entirely +if any conflict with a unique constraint occurs; below +this is illustrated using the +:meth:`~.sqlite.Insert.on_conflict_do_nothing` method:: + + from sqlalchemy.dialects.sqlite import insert + + stmt = insert(my_table).values(id='some_id', data='inserted value') + stmt = stmt.on_conflict_do_nothing(index_elements=['id']) + conn.execute(stmt) + +If ``DO NOTHING`` is used without specifying any columns or constraint, +it has the effect of skipping the INSERT for any unique violation which +occurs:: + + from sqlalchemy.dialects.sqlite import insert + + stmt = insert(my_table).values(id='some_id', data='inserted value') + stmt = stmt.on_conflict_do_nothing() + conn.execute(stmt) + +The SQLite ``ON CONFLICT`` also supports multiple conflict targets. For example +a unique index can be created over multiple columns, then this constraint will +be inferred by listing the columns in index_elements:: + + unique_index = schema.Index( + "unique_index_name", + my_table.c.id, + my_table.c.data, + unique=True, + ) + + insert_stmt = insert(my_table).values(id='some_id', data='old_value') + do_update_stmt = insert_stmt.on_conflict_do_update( + index_elements=['id', 'data'], + set_=dict(data='new_value') + ) + +This will render the following ``ON CONFLICT...DO UPDATE`` statement:: + INSERT INTO my_table (id, data) VALUES ('some_id', 'old_value') + ON CONFLICT (id, date) + DO UPDATE SET data = 'new_value'; + + +.. versionadded:: 1.4 Added support for SQLite ON CONFLICT clauses + +.. seealso:: + + `Upsert + `_ + - in the SQLite documentation. + .. _sqlite_type_reflection: Type Reflection @@ -600,8 +847,11 @@ from ... import types as sqltypes from ... import util from ...engine import default from ...engine import reflection +from ...sql import coercions from ...sql import ColumnElement from ...sql import compiler +from ...sql import elements +from ...sql import roles from ...types import BLOB # noqa from ...types import BOOLEAN # noqa from ...types import CHAR # noqa @@ -1083,6 +1333,101 @@ class SQLiteCompiler(compiler.SQLCompiler): def visit_not_regexp_match_op_binary(self, binary, operator, **kw): return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) + def _on_conflict_target(self, clause, **kw): + if clause.constraint_target is not None: + target_text = "(%s)" % clause.constraint_target + elif clause.inferred_target_elements is not None: + target_text = "(%s)" % ", ".join( + ( + self.preparer.quote(c) + if isinstance(c, util.string_types) + else self.process(c, include_table=False, use_schema=False) + ) + for c in clause.inferred_target_elements + ) + if clause.inferred_target_whereclause is not None: + target_text += " WHERE %s" % self.process( + clause.inferred_target_whereclause, + include_table=False, + use_schema=False, + literal_binds=True, + ) + + else: + target_text = "" + + return target_text + + def visit_on_conflict_do_nothing(self, on_conflict, **kw): + + target_text = self._on_conflict_target(on_conflict, **kw) + + if target_text: + return "ON CONFLICT %s DO NOTHING" % target_text + else: + return "ON CONFLICT DO NOTHING" + + def visit_on_conflict_do_update(self, on_conflict, **kw): + clause = on_conflict + + target_text = self._on_conflict_target(on_conflict, **kw) + + action_set_ops = [] + + set_parameters = dict(clause.update_values_to_set) + # create a list of column assignment clauses as tuples + + insert_statement = self.stack[-1]["selectable"] + cols = insert_statement.table.c + for c in cols: + col_key = c.key + if col_key in set_parameters: + value = set_parameters.pop(col_key) + if coercions._is_literal(value): + value = elements.BindParameter(None, value, type_=c.type) + + else: + if ( + isinstance(value, elements.BindParameter) + and value.type._isnull + ): + value = value._clone() + value.type = c.type + value_text = self.process(value.self_group(), use_schema=False) + + key_text = self.preparer.quote(col_key) + action_set_ops.append("%s = %s" % (key_text, value_text)) + + # check for names that don't match columns + if set_parameters: + util.warn( + "Additional column names not matching " + "any column keys in table '%s': %s" + % ( + self.statement.table.name, + (", ".join("'%s'" % c for c in set_parameters)), + ) + ) + for k, v in set_parameters.items(): + key_text = ( + self.preparer.quote(k) + if isinstance(k, util.string_types) + else self.process(k, use_schema=False) + ) + value_text = self.process( + coercions.expect(roles.ExpressionElementRole, v), + use_schema=False, + ) + action_set_ops.append("%s = %s" % (key_text, value_text)) + + action_text = ", ".join(action_set_ops) + if clause.update_whereclause is not None: + action_text += " WHERE %s" % self.process( + clause.update_whereclause, include_table=True, use_schema=False + ) + + return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) + class SQLiteDDLCompiler(compiler.DDLCompiler): def get_column_specification(self, column, **kwargs): diff --git a/lib/sqlalchemy/dialects/sqlite/dml.py b/lib/sqlalchemy/dialects/sqlite/dml.py new file mode 100644 index 0000000000..2040ceb29f --- /dev/null +++ b/lib/sqlalchemy/dialects/sqlite/dml.py @@ -0,0 +1,231 @@ +# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +from ... import util +from ...sql import schema +from ...sql.base import _generative +from ...sql.dml import Insert as StandardInsert +from ...sql.elements import ClauseElement +from ...sql.expression import alias +from ...util.langhelpers import public_factory + + +__all__ = ("Insert", "insert") + + +class Insert(StandardInsert): + """SQLite-specific implementation of INSERT. + + Adds methods for SQLite-specific syntaxes such as ON CONFLICT. + + The :class:`_sqlite.Insert` object is created using the + :func:`sqlalchemy.dialects.sqlite.insert` function. + + .. versionadded:: 1.4 + + """ + + @util.memoized_property + def excluded(self): + """Provide the ``excluded`` namespace for an ON CONFLICT statement + + SQLite's ON CONFLICT clause allows reference to the row that would + be inserted, known as ``excluded``. This attribute provides + all columns in this row to be referenceable. + + .. seealso:: + + :ref:`sqlite_insert_on_conflict` - example of how + to use :attr:`_expression.Insert.excluded` + + """ + return alias(self.table, name="excluded").columns + + @_generative + def on_conflict_do_update( + self, + constraint=None, + index_elements=None, + index_where=None, + set_=None, + where=None, + ): + r""" + Specifies a DO UPDATE SET action for ON CONFLICT clause. + + Either the ``constraint`` or ``index_elements`` argument is + required, but only one of these can be specified. + + :param constraint: + The column name of a unique or primary key constraint on the table, + or the constraint object itself. + + :param index_elements: + A sequence consisting of string column names, :class:`_schema.Column` + objects, or other column expression objects that will be used + to infer a target index. + + :param index_where: + Additional WHERE criterion that can be used to infer a + conditional target index. + + :param set\_: + Required argument. A dictionary or other mapping object + with column names as keys and expressions or literals as values, + specifying the ``SET`` actions to take. + If the target :class:`_schema.Column` specifies a ". + key" attribute distinct + from the column name, that key should be used. + + .. warning:: This dictionary does **not** take into account + Python-specified default UPDATE values or generation functions, + e.g. those specified using :paramref:`_schema.Column.onupdate`. + These values will not be exercised for an ON CONFLICT style of + UPDATE, unless they are manually specified in the + :paramref:`.Insert.on_conflict_do_update.set_` dictionary. + + :param where: + Optional argument. If present, can be a literal SQL + string or an acceptable expression for a ``WHERE`` clause + that restricts the rows affected by ``DO UPDATE SET``. Rows + not meeting the ``WHERE`` condition will not be updated + (effectively a ``DO NOTHING`` for those rows). + + .. versionadded:: 1.4 + + + .. seealso:: + + :ref:`sqlite_insert_on_conflict` + + """ + if isinstance(constraint, schema.UniqueConstraint) or isinstance( + constraint, schema.PrimaryKeyConstraint + ): + constraint = constraint.columns[0].name + + self._post_values_clause = OnConflictDoUpdate( + constraint, index_elements, index_where, set_, where + ) + + @_generative + def on_conflict_do_nothing( + self, constraint=None, index_elements=None, index_where=None + ): + """ + Specifies a DO NOTHING action for ON CONFLICT clause. + + The ``constraint`` and ``index_elements`` arguments + are optional, but only one of these can be specified. + + :param constraint: + The column name of a unique or primary key constraint on the table, + or the constraint object itself. + + :param index_elements: + A sequence consisting of string column names, :class:`_schema.Column` + objects, or other column expression objects that will be used + to infer a target index. + + :param index_where: + Additional WHERE criterion that can be used to infer a + conditional target index. + + .. versionadded:: 1.4 + + .. seealso:: + + :ref:`sqlite_insert_on_conflict` + + """ + + if isinstance(constraint, schema.UniqueConstraint) or isinstance( + constraint, schema.PrimaryKeyConstraint + ): + constraint = constraint.columns[0].name + + self._post_values_clause = OnConflictDoNothing( + constraint, index_elements, index_where + ) + + +insert = public_factory( + Insert, ".dialects.sqlite.insert", ".dialects.sqlite.Insert" +) + + +class OnConflictClause(ClauseElement): + def __init__(self, constraint=None, index_elements=None, index_where=None): + + if constraint is not None: + if not isinstance(constraint, util.string_types) and isinstance( + constraint, (schema.Index, schema.Constraint), + ): + constraint = getattr(constraint, "name") or constraint + + if constraint is not None: + if index_elements is not None: + raise ValueError( + "'constraint' and 'index_elements' are mutually exclusive" + ) + + if isinstance(constraint, util.string_types): + self.constraint_target = constraint + self.inferred_target_elements = None + self.inferred_target_whereclause = None + elif isinstance(constraint, schema.Index): + index_elements = constraint.expressions + index_where = constraint.dialect_options["sqlite"].get("where") + else: + index_elements = constraint.columns + index_where = constraint.dialect_options["sqlite"].get("where") + + if index_elements is not None: + self.constraint_target = None + self.inferred_target_elements = index_elements + self.inferred_target_whereclause = index_where + elif constraint is None: + self.constraint_target = ( + self.inferred_target_elements + ) = self.inferred_target_whereclause = None + + +class OnConflictDoNothing(OnConflictClause): + __visit_name__ = "on_conflict_do_nothing" + + +class OnConflictDoUpdate(OnConflictClause): + __visit_name__ = "on_conflict_do_update" + + def __init__( + self, + constraint=None, + index_elements=None, + index_where=None, + set_=None, + where=None, + ): + super(OnConflictDoUpdate, self).__init__( + constraint=constraint, + index_elements=index_elements, + index_where=index_where, + ) + + if ( + self.inferred_target_elements is None + and self.constraint_target is None + ): + raise ValueError( + "Either constraint or index_elements, " + "but not both, must be specified unless DO NOTHING" + ) + + if not isinstance(set_, dict) or not set_: + raise ValueError("set parameter must be a non-empty dictionary") + self.update_values_to_set = [ + (key, value) for key, value in set_.items() + ] + self.update_whereclause = where diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index cb418e99d6..49ce5128e6 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -36,6 +36,7 @@ from sqlalchemy import types as sqltypes from sqlalchemy import UniqueConstraint from sqlalchemy import util from sqlalchemy.dialects.sqlite import base as sqlite +from sqlalchemy.dialects.sqlite import insert from sqlalchemy.dialects.sqlite import pysqlite as pysqlite_dialect from sqlalchemy.engine.url import make_url from sqlalchemy.schema import CreateTable @@ -2683,3 +2684,542 @@ class RegexpTest(fixtures.TestBase, testing.AssertsCompiledSQL): self.table.c.myid.regexp_replace("pattern", "rep").compile, dialect=sqlite.dialect(), ) + + +class OnConflictTest(fixtures.TablesTest): + + __only_on__ = "sqlite" + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + ) + + class SpecialType(sqltypes.TypeDecorator): + impl = String + + def process_bind_param(self, value, dialect): + return value + " processed" + + Table( + "bind_targets", + metadata, + Column("id", Integer, primary_key=True), + Column("data", SpecialType()), + ) + + users_xtra = Table( + "users_xtra", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + Column("login_email", String(50)), + Column("lets_index_this", String(50)), + ) + cls.unique_partial_index = schema.Index( + "idx_unique_partial_name", + users_xtra.c.name, + users_xtra.c.lets_index_this, + unique=True, + sqlite_where=users_xtra.c.lets_index_this == "unique_name", + ) + + cls.unique_constraint = schema.UniqueConstraint( + users_xtra.c.login_email, name="uq_login_email" + ) + cls.bogus_index = schema.Index( + "idx_special_ops", + users_xtra.c.lets_index_this, + sqlite_where=users_xtra.c.lets_index_this > "m", + ) + + def test_bad_args(self): + assert_raises( + ValueError, insert(self.tables.users).on_conflict_do_update + ) + + def test_on_conflict_do_nothing(self): + users = self.tables.users + + with testing.db.connect() as conn: + result = conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name="name1"), + ) + eq_(result.inserted_primary_key, (1,)) + + result = conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name="name2"), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1")], + ) + + def test_on_conflict_do_nothing_connectionless(self, connection): + users = self.tables.users_xtra + + result = connection.execute( + insert(users).on_conflict_do_nothing(constraint="login_email"), + dict(name="name1", login_email="email1"), + ) + eq_(result.inserted_primary_key, (1,)) + + result = connection.execute( + insert(users).on_conflict_do_nothing(constraint="login_email"), + dict(name="name2", login_email="email1"), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + connection.execute( + users.select().where(users.c.id == 1) + ).fetchall(), + [(1, "name1", "email1", None)], + ) + + @testing.provide_metadata + def test_on_conflict_do_nothing_target(self): + users = self.tables.users + + with testing.db.connect() as conn: + result = conn.execute( + insert(users).on_conflict_do_nothing( + index_elements=users.primary_key.columns + ), + dict(id=1, name="name1"), + ) + eq_(result.inserted_primary_key, (1,)) + + result = conn.execute( + insert(users).on_conflict_do_nothing( + index_elements=users.primary_key.columns + ), + dict(id=1, name="name2"), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1")], + ) + + def test_on_conflict_do_update_one(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], set_=dict(name=i.excluded.name) + ) + result = conn.execute(i, dict(id=1, name="name1")) + + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1")], + ) + + def test_on_conflict_do_update_two(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], + set_=dict(id=i.excluded.id, name=i.excluded.name), + ) + + result = conn.execute(i, dict(id=1, name="name2")) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name2")], + ) + + def test_on_conflict_do_update_three(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(name=i.excluded.name), + ) + result = conn.execute(i, dict(id=1, name="name3")) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name3")], + ) + + def test_on_conflict_do_update_four(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=i.excluded.id, name=i.excluded.name), + ).values(id=1, name="name4") + + result = conn.execute(i) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name4")], + ) + + def test_on_conflict_do_update_five(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name="name1")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=10, name="I'm a name"), + ).values(id=1, name="name4") + + result = conn.execute(i) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute( + users.select().where(users.c.id == 10) + ).fetchall(), + [(10, "I'm a name")], + ) + + def test_on_conflict_do_update_multivalues(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name="name1")) + conn.execute(users.insert(), dict(id=2, name="name2")) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(name="updated"), + where=(i.excluded.name != "name12"), + ).values( + [ + dict(id=1, name="name11"), + dict(id=2, name="name12"), + dict(id=3, name="name13"), + dict(id=4, name="name14"), + ] + ) + + result = conn.execute(i) + eq_(result.inserted_primary_key, (None,)) + + eq_( + conn.execute(users.select().order_by(users.c.id)).fetchall(), + [(1, "updated"), (2, "name2"), (3, "name13"), (4, "name14")], + ) + + def _exotic_targets_fixture(self, conn): + users = self.tables.users_xtra + + conn.execute( + insert(users), + dict( + id=1, + name="name1", + login_email="name1@gmail.com", + lets_index_this="not", + ), + ) + conn.execute( + users.insert(), + dict( + id=2, + name="name2", + login_email="name2@gmail.com", + lets_index_this="not", + ), + ) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name1", "name1@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_two(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try primary key constraint: cause an upsert on unique id column + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict( + name=i.excluded.name, login_email=i.excluded.login_email + ), + ) + result = conn.execute( + i, + dict( + id=1, + name="name2", + login_email="name1@gmail.com", + lets_index_this="not", + ), + ) + eq_(result.inserted_primary_key, (1,)) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, "name2", "name1@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_three(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try unique constraint: cause an upsert on target + # login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + constraint=self.unique_constraint, + set_=dict( + id=i.excluded.id, + name=i.excluded.name, + login_email=i.excluded.login_email, + ), + ) + # note: lets_index_this value totally ignored in SET clause. + result = conn.execute( + i, + dict( + id=42, + name="nameunique", + login_email="name2@gmail.com", + lets_index_this="unique", + ), + ) + eq_(result.inserted_primary_key, (42,)) + + eq_( + conn.execute( + users.select().where( + users.c.login_email == "name2@gmail.com" + ) + ).fetchall(), + [(42, "nameunique", "name2@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_four(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try unique constraint by name: cause an + # upsert on target login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + constraint="login_email", + set_=dict( + id=i.excluded.id, + name=i.excluded.name, + login_email=i.excluded.login_email, + ), + ) + # note: lets_index_this value totally ignored in SET clause. + + result = conn.execute( + i, + dict( + id=43, + name="nameunique2", + login_email="name2@gmail.com", + lets_index_this="unique", + ), + ) + eq_(result.inserted_primary_key, (43,)) + + eq_( + conn.execute( + users.select().where( + users.c.login_email == "name2@gmail.com" + ) + ).fetchall(), + [(43, "nameunique2", "name2@gmail.com", "not")], + ) + + def test_on_conflict_do_update_exotic_targets_four_no_pk(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try unique constraint by name: cause an + # upsert on target login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.login_email], + set_=dict( + id=i.excluded.id, + name=i.excluded.name, + login_email=i.excluded.login_email, + ), + ) + + conn.execute(i, dict(name="name3", login_email="name1@gmail.com")) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [], + ) + + eq_( + conn.execute(users.select().order_by(users.c.id)).fetchall(), + [ + (2, "name2", "name2@gmail.com", "not"), + (3, "name3", "name1@gmail.com", "not"), + ], + ) + + def test_on_conflict_do_update_exotic_targets_five(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try bogus index + i = insert(users) + + i = i.on_conflict_do_update( + index_elements=self.bogus_index.columns, + index_where=self.bogus_index.dialect_options["sqlite"][ + "where" + ], + set_=dict( + name=i.excluded.name, login_email=i.excluded.login_email + ), + ) + + assert_raises( + exc.OperationalError, + conn.execute, + i, + dict( + id=1, + name="namebogus", + login_email="bogus@gmail.com", + lets_index_this="bogus", + ), + ) + + def test_on_conflict_do_update_exotic_targets_six(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + conn.execute( + insert(users), + dict( + id=1, + name="name1", + login_email="mail1@gmail.com", + lets_index_this="unique_name", + ), + ) + i = insert(users) + i = i.on_conflict_do_update( + index_elements=self.unique_partial_index.columns, + index_where=self.unique_partial_index.dialect_options[ + "sqlite" + ]["where"], + set_=dict( + name=i.excluded.name, login_email=i.excluded.login_email + ), + ) + + conn.execute( + i, + [ + dict( + name="name1", + login_email="mail2@gmail.com", + lets_index_this="unique_name", + ) + ], + ) + + eq_( + conn.execute(users.select()).fetchall(), + [(1, "name1", "mail2@gmail.com", "unique_name")], + ) + + def test_on_conflict_do_update_no_row_actually_affected(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.login_email], + set_=dict(name="new_name"), + where=(i.excluded.name == "other_name"), + ) + result = conn.execute( + i, dict(name="name2", login_email="name1@gmail.com") + ) + + # The last inserted primary key should be 2 here + # it is taking the result from the the exotic fixture + eq_(result.inserted_primary_key, (2,)) + + eq_( + conn.execute(users.select()).fetchall(), + [ + (1, "name1", "name1@gmail.com", "not"), + (2, "name2", "name2@gmail.com", "not"), + ], + ) + + def test_on_conflict_do_update_special_types_in_set(self): + bind_targets = self.tables.bind_targets + + with testing.db.connect() as conn: + i = insert(bind_targets) + conn.execute(i, {"id": 1, "data": "initial data"}) + + eq_( + conn.scalar(sql.select(bind_targets.c.data)), + "initial data processed", + ) + + i = insert(bind_targets) + i = i.on_conflict_do_update( + index_elements=[bind_targets.c.id], + set_=dict(data="new updated data"), + ) + conn.execute(i, {"id": 1, "data": "new inserted data"}) + + eq_( + conn.scalar(sql.select(bind_targets.c.data)), + "new updated data processed", + )