From: Robin Thomas Date: Thu, 14 Apr 2016 16:57:15 +0000 (-0400) Subject: Add ON CONFLICT support for Postgresql X-Git-Tag: rel_1_1_0b1~8^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4e9ab7a72f0ad506cf519069fd67127f63e5f2aa;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add ON CONFLICT support for Postgresql Fixes: #3529 Co-authored-by: Mike Bayer Change-Id: Ie3bf6ad70d9be9f0e44938830e922db03573991a Pull-request: https://github.com/zzzeek/sqlalchemy/pull/258 --- diff --git a/doc/build/changelog/changelog_11.rst b/doc/build/changelog/changelog_11.rst index 8d20ef257d..6151ec3ffe 100644 --- a/doc/build/changelog/changelog_11.rst +++ b/doc/build/changelog/changelog_11.rst @@ -21,6 +21,18 @@ .. changelog:: :version: 1.1.0b1 + .. change:: + :tags: feature, postgresql + :tickets: 3529 + + Added support for Postgresql's INSERT..ON CONFLICT using a new + Postgresql-specific :class:`.postgresql.dml.Insert` object. + Pull request and extensive efforts here by Robin Thomas. + + .. seealso:: + + :ref:`change_3529` + .. change:: :tags: feature, postgresql :pullreq: bitbucket:84 diff --git a/doc/build/changelog/migration_11.rst b/doc/build/changelog/migration_11.rst index 73483f3dbc..ea932b5092 100644 --- a/doc/build/changelog/migration_11.rst +++ b/doc/build/changelog/migration_11.rst @@ -2026,6 +2026,42 @@ necessary to worry about the names themselves in the textual SQL. Dialect Improvements and Changes - Postgresql ============================================= +.. _change_3529: + +Support for INSERT..ON CONFLICT (DO UPDATE | DO NOTHING) +-------------------------------------------------------- + +The ``ON CONFLICT`` clause of ``INSERT`` added to Postgresql as of +version 9.5 is now supported using a Postgresql-specific version of the +:class:`.Insert` object, via :func:`sqlalchemy.dialects.postgresql.dml.insert`. +This :class:`.Insert` subclass adds two new methods :meth:`.Insert.on_conflict_do_update` +and :meth:`.Insert.on_conflict_do_nothing` which implement the full syntax +supported by Posgresql 9.5 in this area:: + + from sqlalchemy.dialects.postgresql import insert + + insert_stmt = insert(my_table). \\ + values(id='some_id', data='some data to insert') + + do_update_stmt = insert_stmt.on_conflict_do_update( + index_elements=[my_table.c.id], + set_=dict(data='some data to update') + ) + + conn.execute(do_update_stmt) + +The above will render:: + + INSERT INTO my_table (id, data) + VALUES (:id, :data) + ON CONFLICT id DO UPDATE SET data=:data_2 + +.. seealso:: + + :ref:`postgresql_insert_on_conflict` + +:ticket:`3529` + .. _change_3499_postgresql: ARRAY and JSON types now correctly specify "unhashable" diff --git a/doc/build/dialects/postgresql.rst b/doc/build/dialects/postgresql.rst index b4c90643d9..56b14a8d05 100644 --- a/doc/build/dialects/postgresql.rst +++ b/doc/build/dialects/postgresql.rst @@ -181,6 +181,14 @@ For example:: ExcludeConstraint(('room', '='), ('during', '&&')), ) +PostgreSQL DML Constructs +--------------------------- + +.. autofunction:: sqlalchemy.dialects.postgresql.dml.insert + +.. autoclass:: sqlalchemy.dialects.postgresql.dml.Insert + :members: + psycopg2 -------- diff --git a/lib/sqlalchemy/dialects/postgresql/__init__.py b/lib/sqlalchemy/dialects/postgresql/__init__.py index ffd100f675..fae1836212 100644 --- a/lib/sqlalchemy/dialects/postgresql/__init__.py +++ b/lib/sqlalchemy/dialects/postgresql/__init__.py @@ -19,6 +19,7 @@ from .hstore import HSTORE, hstore from .json import JSON, JSONB from .array import array, ARRAY, Any, All from .ext import aggregate_order_by, ExcludeConstraint, array_agg +from .dml import insert, Insert from .ranges import INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, \ TSTZRANGE @@ -31,5 +32,5 @@ __all__ = ( 'hstore', 'INT4RANGE', 'INT8RANGE', 'NUMRANGE', 'DATERANGE', 'TSRANGE', 'TSTZRANGE', 'json', 'JSON', 'JSONB', 'Any', 'All', 'DropEnumType', 'CreateEnumType', 'ExcludeConstraint', - 'aggregate_order_by', 'array_agg' + 'aggregate_order_by', 'array_agg', 'insert', 'Insert' ) diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 16b22129a2..688ce9e1c5 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -48,14 +48,14 @@ Transaction Isolation Level --------------------------- All Postgresql dialects support setting of transaction isolation level -both via a dialect-specific parameter :paramref:`.create_engine.isolation_level` -accepted by :func:`.create_engine`, -as well as the :paramref:`.Connection.execution_options.isolation_level` argument as passed to -:meth:`.Connection.execution_options`. When using a non-psycopg2 dialect, -this feature works by issuing the command +both via a dialect-specific parameter +:paramref:`.create_engine.isolation_level` accepted by :func:`.create_engine`, +as well as the :paramref:`.Connection.execution_options.isolation_level` +argument as passed to :meth:`.Connection.execution_options`. +When using a non-psycopg2 dialect, this feature works by issuing the command ``SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL `` for -each new connection. For the special AUTOCOMMIT isolation level, DBAPI-specific -techniques are used. +each new connection. For the special AUTOCOMMIT isolation level, +DBAPI-specific techniques are used. To set isolation level using :func:`.create_engine`:: @@ -247,6 +247,197 @@ use the :meth:`._UpdateBase.returning` method on a per-statement basis:: where(table.c.name=='foo') print result.fetchall() +.. _postgresql_insert_on_conflict: + +INSERT...ON CONFLICT (Upsert) +------------------------------ + +Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) +of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. +A candidate row will only be inserted if that row does not violate +any unique constraints. In the case of a unique constraint violation, +a secondary action can occur which can be either "DO UPDATE", indicating +that the data in the target row should be updated, or "DO NOTHING", +which indicates to silently skip this row. + +Conflicts are determined using existing unique constraints and indexes. These +constraints may be identified either using their name as stated in DDL, +or they may be *inferred* by stating the columns and conditions that comprise +the indexes. + +SQLAlchemy provides ``ON CONFLICT`` support via the Postgresql-specific +:func:`.postgresql.dml.insert()` function, which provides +the generative methods :meth:`~.postgresql.dml.Insert.on_conflict_do_update` +and :meth:`~.postgresql.dml.Insert.on_conflict_do_nothing`:: + + from sqlalchemy.dialects.postgresql import insert + + insert_stmt = insert(my_table).values( + id='some_existing_id', + data='inserted value') + + do_nothing_stmt = insert_stmt.on_conflict_do_nothing( + index_elements=['id'] + ) + + conn.execute(do_nothing_stmt) + + do_update_stmt = insert_stmt.on_conflict_do_update( + constraint='pk_my_table', + set_=dict(data='updated value') + ) + + conn.execute(do_update_stmt) + +Both methods supply the "target" of the conflict using either the +named constraint or by column inference: + +* The :paramref:`.Insert.on_conflict_do_update.index_elements` argument + specifies a sequence containing string column names, :class:`.Column` objects, + and/or SQL expression elements, which would identify a unique index:: + + do_update_stmt = insert_stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value') + ) + + do_update_stmt = insert_stmt.on_conflict_do_update( + index_elements=[my_table.c.id], + set_=dict(data='updated value') + ) + +* When using :paramref:`.Insert.on_conflict_do_update.index_elements` to + infer an index, a partial index can be inferred by also specifying the + use the :paramref:`.Insert.on_conflict_do_update.index_where` parameter:: + + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') + stmt = stmt.on_conflict_do_update( + index_elements=[my_table.c.user_email], + index_where=my_table.c.user_email.like('%@gmail.com'), + set_=dict(data=stmt.excluded.data) + ) + conn.execute(stmt) + + +* The :paramref:`.Insert.on_conflict_do_update.constraint` argument is + used to specify an index directly rather than inferring it. This can be + the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:: + + do_update_stmt = insert_stmt.on_conflict_do_update( + constraint='my_table_idx_1', + set_=dict(data='updated value') + ) + + do_update_stmt = insert_stmt.on_conflict_do_update( + constraint='my_table_pk', + set_=dict(data='updated value') + ) + +* The :paramref:`.Insert.on_conflict_do_update.constraint` argument may + also refer to a SQLAlchemy construct representing a constraint, + e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, + :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, + if the constraint has a name, it is used directly. Otherwise, if the + constraint is unnamed, then inference will be used, where the expressions + and optional WHERE clause of the constraint will be spelled out in the + construct. This use is especially convenient + to refer to the named or unnamed primary key of a :class:`.Table` using the + :attr:`.Table.primary_key` attribute:: + + do_update_stmt = insert_stmt.on_conflict_do_update( + constraint=my_table.primary_key, + set_=dict(data='updated value') + ) + +``ON CONFLICT...DO UPDATE`` is used to perform an update of the already +existing row, using any combination of new values as well as values +from the proposed insertion. These values are specified using the +:paramref:`.Insert.on_conflict_do_update.set_` parameter. This +parameter accepts a dictionary which consists of direct values +for UPDATE:: + + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(my_table).values(id='some_id', data='inserted value') + do_update_stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value') + ) + conn.execute(do_update_stmt) + +.. warning:: + + The :meth:`.Insert.on_conflict_do_update` method does **not** take into + account Python-side default UPDATE values or generation functions, e.g. + e.g. those specified using :paramref:`.Column.onupdate`. + These values will not be exercised for an ON CONFLICT style of UPDATE, + unless they are manually specified in the + :paramref:`.Insert.on_conflict_do_update.set_` dictionary. + +In order to refer to the proposed insertion row, the special alias +:attr:`~.postgresql.dml.Insert.excluded` is available as an attribute on +the :class:`.postgresql.dml.Insert` object; this object is a +:class:`.ColumnCollection` which alias contains all columns of the target +table:: + + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(my_table).values( + id='some_id', + data='inserted value', + author='jlh') + do_update_stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value', author=stmt.excluded.author) + ) + conn.execute(do_update_stmt) + +The :meth:`.Insert.on_conflict_do_update` method also accepts +a WHERE clause using the :paramref:`.Insert.on_conflict_do_update.where` +parameter, which will limit those rows which receive an UPDATE:: + + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(my_table).values( + id='some_id', + data='inserted value', + author='jlh') + on_update_stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=dict(data='updated value', author=stmt.excluded.author) + where=(my_table.c.status == 2) + ) + conn.execute(on_update_stmt) + +``ON CONFLICT`` may also be used to skip inserting a row entirely +if any conflict with a unique or exclusion constraint occurs; below +this is illustrated using the +:meth:`~.postgresql.dml.Insert.on_conflict_do_nothing` method:: + + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(my_table).values(id='some_id', data='inserted value') + stmt = stmt.on_conflict_do_nothing(index_elements=['id']) + conn.execute(stmt) + +If ``DO NOTHING`` is used without specifying any columns or constraint, +it has the effect of skipping the INSERT for any unique or exclusion +constraint violation which occurs:: + + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(my_table).values(id='some_id', data='inserted value') + stmt = stmt.on_conflict_do_nothing() + conn.execute(stmt) + +.. versionadded:: 1.1 Added support for Postgresql ON CONFLICT clauses + +.. seealso:: + + `INSERT .. ON CONFLICT `_ - in the Postgresql documentation. + .. _postgresql_match: Full Text Search @@ -354,6 +545,8 @@ Postgresql-Specific Index Options Several extensions to the :class:`.Index` construct are available, specific to the PostgreSQL dialect. +.. _postgresql_partial_indexes: + Partial Indexes ^^^^^^^^^^^^^^^^ @@ -663,7 +856,6 @@ _DECIMAL_TYPES = (1231, 1700) _FLOAT_TYPES = (700, 701, 1021, 1022) _INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016) - class BYTEA(sqltypes.LargeBinary): __visit_name__ = 'BYTEA' @@ -1223,6 +1415,68 @@ class PGCompiler(compiler.SQLCompiler): else: return "SUBSTRING(%s FROM %s)" % (s, start) + def _on_conflict_target(self, clause, **kw): + + if clause.constraint_target is not None: + target_text = 'ON CONSTRAINT %s' % clause.constraint_target + elif clause.inferred_target_elements is not None: + target_text = '(%s)' % ', '.join( + (self.preparer.quote(c) + if isinstance(c, util.string_types) + else + self.process(c, include_table=False, use_schema=False)) + for c in clause.inferred_target_elements + ) + if clause.inferred_target_whereclause is not None: + target_text += ' WHERE %s' % \ + self.process( + clause.inferred_target_whereclause, + include_table=False, + use_schema=False + ) + else: + target_text = '' + + return target_text + + def visit_on_conflict_do_nothing(self, on_conflict, **kw): + + target_text = self._on_conflict_target(on_conflict, **kw) + + if target_text: + return "ON CONFLICT %s DO NOTHING" % target_text + else: + return "ON CONFLICT DO NOTHING" + + def visit_on_conflict_do_update(self, on_conflict, **kw): + + clause = on_conflict + + target_text = self._on_conflict_target(on_conflict, **kw) + + action_set_ops = [] + for k, v in clause.update_values_to_set: + key_text = ( + self.preparer.quote(k) + if isinstance(k, util.string_types) + else self.process(k, use_schema=False) + ) + value_text = self.process( + v, + use_schema=False + ) + action_set_ops.append('%s = %s' % (key_text, value_text)) + action_text = ', '.join(action_set_ops) + if clause.update_whereclause is not None: + action_text += ' WHERE %s' % \ + self.process( + clause.update_whereclause, + include_table=False, + use_schema=False + ) + + return 'ON CONFLICT %s DO UPDATE SET %s' % (target_text, action_text) + class PGDDLCompiler(compiler.DDLCompiler): @@ -1706,7 +1960,7 @@ class PGDialect(default.DefaultDialect): "with_oids": None, "on_commit": None, "inherits": None - }) + }), ] reflection_options = ('postgresql_ignore_search_path', ) diff --git a/lib/sqlalchemy/dialects/postgresql/dml.py b/lib/sqlalchemy/dialects/postgresql/dml.py new file mode 100644 index 0000000000..e8e6cd165b --- /dev/null +++ b/lib/sqlalchemy/dialects/postgresql/dml.py @@ -0,0 +1,211 @@ +# postgresql/on_conflict.py +# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php + +from ...sql.elements import ClauseElement, _literal_as_binds +from ...sql.dml import Insert as StandardInsert +from ...sql.expression import alias +from ...sql import schema +from ...util.langhelpers import public_factory +from ...sql.base import _generative +from ... import util +from . import ext + +__all__ = ('Insert', 'insert') + + +class Insert(StandardInsert): + """Postgresql-specific implementation of INSERT. + + Adds methods for PG-specific syntaxes such as ON CONFLICT. + + .. versionadded:: 1.1 + + """ + + @util.memoized_property + def excluded(self): + """Provide the ``excluded`` namespace for an ON CONFLICT statement + + PG's ON CONFLICT clause allows reference to the row that would + be inserted, known as ``excluded``. This attribute provides + all columns in this row to be referenaceable. + + .. seealso:: + + :ref:`postgresql_insert_on_conflict` - example of how + to use :attr:`.Insert.excluded` + + """ + return alias(self.table, name='excluded').columns + + @_generative + def on_conflict_do_update( + self, + constraint=None, index_elements=None, + index_where=None, set_=None, where=None): + """ + Specifies a DO UPDATE SET action for ON CONFLICT clause. + + Either the ``constraint`` or ``index_elements`` argument is + required, but only one of these can be specified. + + :param constraint: + The name of a unique or exclusion constraint on the table, + or the constraint object itself if it has a .name attribute. + + :param index_elements: + A sequence consisting of string column names, :class:`.Column` + objects, or other column expression objects that will be used + to infer a target index. + + :param index_where: + Additional WHERE criterion that can be used to infer a + conditional target index. + + :param set_: + Required argument. A dictionary or other mapping object + with column names as keys and expressions or literals as values, + specifying the ``SET`` actions to take. + + .. warning:: This dictionary does **not** take into account + Python-specified default UPDATE values or generation functions, + e.g. those specified using :paramref:`.Column.onupdate`. + These values will not be exercised for an ON CONFLICT style of + UPDATE, unless they are manually specified in the + :paramref:`.Insert.on_conflict_do_update.set_` dictionary. + + :param where: + Optional argument. If present, can be a literal SQL + string or an acceptable expression for a ``WHERE`` clause + that restricts the rows affected by ``DO UPDATE SET``. Rows + not meeting the ``WHERE`` condition will not be updated + (effectively a ``DO NOTHING`` for those rows). + + .. versionadded:: 1.1 + + .. seealso:: + + :ref:`postgresql_insert_on_conflict` + + """ + self._post_values_clause = OnConflictDoUpdate( + constraint, index_elements, index_where, set_, where) + return self + + @_generative + def on_conflict_do_nothing( + self, + constraint=None, index_elements=None, index_where=None): + """ + Specifies a DO NOTHING action for ON CONFLICT clause. + + The ``constraint`` and ``index_elements`` arguments + are optional, but only one of these can be specified. + + :param constraint: + The name of a unique or exclusion constraint on the table, + or the constraint object itself if it has a .name attribute. + + :param index_elements: + A sequence consisting of string column names, :class:`.Column` + objects, or other column expression objects that will be used + to infer a target index. + + :param index_where: + Additional WHERE criterion that can be used to infer a + conditional target index. + + .. versionadded:: 1.1 + + .. seealso:: + + :ref:`postgresql_insert_on_conflict` + + """ + self._post_values_clause = OnConflictDoNothing( + constraint, index_elements, index_where) + return self + +insert = public_factory(Insert, '.dialects.postgresql.insert') + + +class OnConflictClause(ClauseElement): + def __init__( + self, + constraint=None, + index_elements=None, + index_where=None): + + if constraint is not None: + if not isinstance(constraint, util.string_types) and \ + isinstance(constraint, ( + schema.Index, schema.Constraint, + ext.ExcludeConstraint)): + constraint = getattr(constraint, 'name') or constraint + + if constraint is not None: + if index_elements is not None: + raise ValueError( + "'constraint' and 'index_elements' are mutually exclusive") + + if isinstance(constraint, util.string_types): + self.constraint_target = constraint + self.inferred_target_elements = None + self.inferred_target_whereclause = None + elif isinstance(constraint, schema.Index): + index_elements = constraint.expressions + index_where = \ + constraint.dialect_options['postgresql'].get("where") + elif isinstance(constraint, ext.ExcludeConstraint): + index_elements = constraint.columns + index_where = constraint.where + else: + index_elements = constraint.columns + index_where = \ + constraint.dialect_options['postgresql'].get("where") + + if index_elements is not None: + self.constraint_target = None + self.inferred_target_elements = index_elements + self.inferred_target_whereclause = index_where + elif constraint is None: + self.constraint_target = self.inferred_target_elements = \ + self.inferred_target_whereclause = None + + +class OnConflictDoNothing(OnConflictClause): + __visit_name__ = 'on_conflict_do_nothing' + + +class OnConflictDoUpdate(OnConflictClause): + __visit_name__ = 'on_conflict_do_update' + + def __init__( + self, + constraint=None, + index_elements=None, + index_where=None, + set_=None, + where=None): + super(OnConflictDoUpdate, self).__init__( + constraint=constraint, + index_elements=index_elements, + index_where=index_where) + + if self.inferred_target_elements is None and \ + self.constraint_target is None: + raise ValueError( + "Either constraint or index_elements, " + "but not both, must be specified unless DO NOTHING") + + if (not isinstance(set_, dict) or not set_): + raise ValueError("set parameter must be a non-empty dictionary") + self.update_values_to_set = [ + (key, _literal_as_binds(value)) + for key, value in set_.items() + ] + self.update_whereclause = where diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 6d9ab9039f..94c7db20a9 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -1984,6 +1984,12 @@ class SQLCompiler(Compiled): text += " VALUES (%s)" % \ ', '.join([c[1] for c in crud_params]) + if insert_stmt._post_values_clause is not None: + post_values_clause = self.process( + insert_stmt._post_values_clause, **kw) + if post_values_clause: + text += " " + post_values_clause + if returning_clause and not self.returning_precedes_values: text += " " + returning_clause diff --git a/lib/sqlalchemy/sql/dml.py b/lib/sqlalchemy/sql/dml.py index 8f368dcdbb..b54b4792de 100644 --- a/lib/sqlalchemy/sql/dml.py +++ b/lib/sqlalchemy/sql/dml.py @@ -194,6 +194,7 @@ class ValuesBase(UpdateBase): _has_multi_parameters = False _preserve_parameter_order = False select = None + _post_values_clause = None def __init__(self, table, values, prefixes): self.table = _interpret_as_from(table) diff --git a/test/dialect/postgresql/test_compiler.py b/test/dialect/postgresql/test_compiler.py index f85ff2682c..88110ba2df 100644 --- a/test/dialect/postgresql/test_compiler.py +++ b/test/dialect/postgresql/test_compiler.py @@ -5,7 +5,7 @@ from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, \ from sqlalchemy.testing import engines, fixtures from sqlalchemy import testing from sqlalchemy import Sequence, Table, Column, Integer, update, String,\ - insert, func, MetaData, Enum, Index, and_, delete, select, cast, text, \ + func, MetaData, Enum, Index, and_, delete, select, cast, text, \ Text from sqlalchemy.dialects.postgresql import ExcludeConstraint, array from sqlalchemy import exc, schema @@ -14,9 +14,8 @@ from sqlalchemy.dialects.postgresql import TSRANGE from sqlalchemy.orm import mapper, aliased, Session from sqlalchemy.sql import table, column, operators, literal_column from sqlalchemy.sql import util as sql_util -from sqlalchemy.util import u -from sqlalchemy.dialects.postgresql import aggregate_order_by - +from sqlalchemy.util import u, OrderedDict +from sqlalchemy.dialects.postgresql import aggregate_order_by, insert class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): __prefer__ = 'postgresql' @@ -186,7 +185,6 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): schema_translate_map=schema_translate_map ) - def test_create_table_with_tablespace(self): m = MetaData() tbl = Table( @@ -1035,6 +1033,254 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) +class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = postgresql.dialect() + + def setup(self): + self.table1 = table1 = table( + 'mytable', + column('myid', Integer), + column('name', String(128)), + column('description', String(128)), + ) + md = MetaData() + self.table_with_metadata = Table( + 'mytable', md, + Column('myid', Integer, primary_key=True), + Column('name', String(128)), + Column('description', String(128)) + ) + self.unique_constr = schema.UniqueConstraint( + table1.c.name, name='uq_name') + self.excl_constr = ExcludeConstraint( + (table1.c.name, '='), + (table1.c.description, '&&'), + name='excl_thing' + ) + self.excl_constr_anon = ExcludeConstraint( + (self.table_with_metadata.c.name, '='), + (self.table_with_metadata.c.description, '&&'), + where=self.table_with_metadata.c.description != 'foo' + ) + self.goofy_index = Index( + 'goofy_index', table1.c.name, + postgresql_where=table1.c.name > 'm' + ) + + def test_do_nothing_no_target(self): + + i = insert( + self.table1, values=dict(name='foo'), + ).on_conflict_do_nothing() + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT DO NOTHING') + + def test_do_nothing_index_elements_target(self): + + i = insert( + self.table1, values=dict(name='foo'), + ).on_conflict_do_nothing( + index_elements=['myid'], + ) + self.assert_compile( + i, + "INSERT INTO mytable (name) VALUES " + "(%(name)s) ON CONFLICT (myid) DO NOTHING" + ) + + def test_do_update_set_clause_literal(self): + i = insert(self.table_with_metadata).values(myid=1, name='foo') + i = i.on_conflict_do_update( + index_elements=['myid'], + set_=OrderedDict([ + ('name', "I'm a name"), + ('description', None)]) + ) + self.assert_compile( + i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = %(param_1)s, ' + 'description = NULL', + {"myid": 1, "name": "foo", "param_1": "I'm a name"} + + ) + + def test_do_update_str_index_elements_target_one(self): + i = insert(self.table_with_metadata).values(myid=1, name='foo') + i = i.on_conflict_do_update( + index_elements=['myid'], + set_=OrderedDict([ + ('name', i.excluded.name), + ('description', i.excluded.description)]) + ) + self.assert_compile(i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name, ' + 'description = excluded.description') + + def test_do_update_str_index_elements_target_two(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + index_elements=['myid'], + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_col_index_elements_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + index_elements=[self.table1.c.myid], + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_unnamed_pk_constraint_target(self): + i = insert( + self.table_with_metadata, values=dict(myid=1, name='foo')) + i = i.on_conflict_do_update( + constraint=self.table_with_metadata.primary_key, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_pk_constraint_index_elements_target(self): + i = insert( + self.table_with_metadata, values=dict(myid=1, name='foo')) + i = i.on_conflict_do_update( + index_elements=self.table_with_metadata.primary_key, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (myid, name) VALUES ' + '(%(myid)s, %(name)s) ON CONFLICT (myid) ' + 'DO UPDATE SET name = excluded.name') + + def test_do_update_named_unique_constraint_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.unique_constr, + set_=dict(myid=i.excluded.myid) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name ' + 'DO UPDATE SET myid = excluded.myid') + + def test_do_update_string_constraint_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.unique_constr.name, + set_=dict(myid=i.excluded.myid) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name ' + 'DO UPDATE SET myid = excluded.myid') + + def test_do_update_index_elements_where_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + index_elements=self.goofy_index.expressions, + index_where=self.goofy_index.dialect_options[ + 'postgresql']['where'], + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name) " + "WHERE name > %(name_1)s " + 'DO UPDATE SET name = excluded.name') + + def test_do_update_unnamed_index_target(self): + i = insert( + self.table1, values=dict(name='foo')) + + unnamed_goofy = Index( + None, self.table1.c.name, + postgresql_where=self.table1.c.name > 'm' + ) + + i = i.on_conflict_do_update( + constraint=unnamed_goofy, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name) " + "WHERE name > %(name_1)s " + 'DO UPDATE SET name = excluded.name') + + def test_do_update_unnamed_exclude_constraint_target(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.excl_constr_anon, + set_=dict(name=i.excluded.name) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + 'DO UPDATE SET name = excluded.name') + + def test_do_update_add_whereclause(self): + i = insert( + self.table1, values=dict(name='foo')) + i = i.on_conflict_do_update( + constraint=self.excl_constr_anon, + set_=dict(name=i.excluded.name), + where=( + (self.table1.c.name != 'brah') & + (self.table1.c.description != 'brah')) + ) + self.assert_compile(i, + 'INSERT INTO mytable (name) VALUES ' + "(%(name)s) ON CONFLICT (name, description) " + "WHERE description != %(description_1)s " + 'DO UPDATE SET name = excluded.name ' + "WHERE name != %(name_1)s " + "AND description != %(description_2)s") + + def test_quote_raw_string_col(self): + t = table('t', column("FancyName"), column("other name")) + + stmt = insert(t).values(FancyName='something new').\ + on_conflict_do_update( + index_elements=['FancyName', 'other name'], + set_=OrderedDict([ + ("FancyName", 'something updated'), + ("other name", "something else") + ]) + ) + + self.assert_compile( + stmt, + 'INSERT INTO t ("FancyName") VALUES (%(FancyName)s) ' + 'ON CONFLICT ("FancyName", "other name") ' + 'DO UPDATE SET "FancyName" = %(param_1)s, ' + '"other name" = %(param_2)s', + {'param_1': 'something updated', + 'param_2': 'something else', 'FancyName': 'something new'} + ) + + class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL): """Test 'DISTINCT' with SQL expression language and orm.Query with diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py new file mode 100644 index 0000000000..201287d628 --- /dev/null +++ b/test/dialect/postgresql/test_on_conflict.py @@ -0,0 +1,312 @@ +# coding: utf-8 + +from sqlalchemy.testing.assertions import eq_, assert_raises +from sqlalchemy.testing import fixtures +from sqlalchemy import testing +from sqlalchemy import Table, Column, Integer, String +from sqlalchemy import exc, schema +from sqlalchemy.dialects.postgresql import insert + + +class OnConflictTest(fixtures.TablesTest): + + __only_on__ = 'postgresql >= 9.5', + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + 'users', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(50)) + ) + + users_xtra = Table( + 'users_xtra', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(50)), + Column('login_email', String(50)), + Column('lets_index_this', String(50)) + ) + cls.unique_constraint = schema.UniqueConstraint( + users_xtra.c.login_email, name='uq_login_email') + cls.bogus_index = schema.Index( + 'idx_special_ops', + users_xtra.c.lets_index_this, + postgresql_where=users_xtra.c.lets_index_this > 'm') + + def test_bad_args(self): + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_nothing, + constraint='id', index_elements=['id'] + ) + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_update, + constraint='id', index_elements=['id'] + ) + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_update, constraint='id' + ) + assert_raises( + ValueError, + insert(self.tables.users).on_conflict_do_update + ) + + def test_on_conflict_do_nothing(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name='name1') + ) + conn.execute( + insert(users).on_conflict_do_nothing(), + dict(id=1, name='name2') + ) + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1')] + ) + + @testing.provide_metadata + def test_on_conflict_do_nothing_target(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute( + insert(users) + .on_conflict_do_nothing( + index_elements=users.primary_key.columns), + dict(id=1, name='name1') + ) + conn.execute( + insert(users) + .on_conflict_do_nothing( + index_elements=users.primary_key.columns), + dict(id=1, name='name2') + ) + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1')] + ) + + def test_on_conflict_do_update_one(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], + set_=dict(name=i.excluded.name)) + conn.execute(i, dict(id=1, name='name1')) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1')] + ) + + def test_on_conflict_do_update_two(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=[users.c.id], + set_=dict(id=i.excluded.id, name=i.excluded.name) + ) + + conn.execute(i, dict(id=1, name='name2')) + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name2')] + ) + + def test_on_conflict_do_update_three(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(name=i.excluded.name) + ) + conn.execute(i, dict(id=1, name='name3')) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name3')] + ) + + def test_on_conflict_do_update_four(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=i.excluded.id, name=i.excluded.name) + ).values(id=1, name='name4') + + conn.execute(i) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name4')] + ) + + def test_on_conflict_do_update_five(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), dict(id=1, name='name1')) + + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict(id=10, name="I'm a name") + ).values(id=1, name='name4') + + conn.execute(i) + + eq_( + conn.execute( + users.select().where(users.c.id == 10)).fetchall(), + [(10, "I'm a name")] + ) + + def _exotic_targets_fixture(self, conn): + users = self.tables.users_xtra + + conn.execute( + insert(users), + dict( + id=1, name='name1', + login_email='name1@gmail.com', lets_index_this='not' + ) + ) + conn.execute( + users.insert(), + dict( + id=2, name='name2', + login_email='name2@gmail.com', lets_index_this='not' + ) + ) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name1', 'name1@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_two(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try primary key constraint: cause an upsert on unique id column + i = insert(users) + i = i.on_conflict_do_update( + index_elements=users.primary_key.columns, + set_=dict( + name=i.excluded.name, + login_email=i.excluded.login_email) + ) + conn.execute(i, dict( + id=1, name='name2', login_email='name1@gmail.com', + lets_index_this='not') + ) + + eq_( + conn.execute(users.select().where(users.c.id == 1)).fetchall(), + [(1, 'name2', 'name1@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_three(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try unique constraint: cause an upsert on target + # login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + constraint=self.unique_constraint, + set_=dict(id=i.excluded.id, name=i.excluded.name, + login_email=i.excluded.login_email) + ) + # note: lets_index_this value totally ignored in SET clause. + conn.execute(i, dict( + id=42, name='nameunique', + login_email='name2@gmail.com', lets_index_this='unique') + ) + + eq_( + conn.execute( + users.select(). + where(users.c.login_email == 'name2@gmail.com') + ).fetchall(), + [(42, 'nameunique', 'name2@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_four(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try unique constraint by name: cause an + # upsert on target login_email, not id + i = insert(users) + i = i.on_conflict_do_update( + constraint=self.unique_constraint.name, + set_=dict( + id=i.excluded.id, name=i.excluded.name, + login_email=i.excluded.login_email) + ) + # note: lets_index_this value totally ignored in SET clause. + + conn.execute(i, dict( + id=43, name='nameunique2', + login_email='name2@gmail.com', lets_index_this='unique') + ) + + eq_( + conn.execute( + users.select(). + where(users.c.login_email == 'name2@gmail.com') + ).fetchall(), + [(43, 'nameunique2', 'name2@gmail.com', 'not')] + ) + + def test_on_conflict_do_update_exotic_targets_five(self): + users = self.tables.users_xtra + + with testing.db.connect() as conn: + self._exotic_targets_fixture(conn) + # try bogus index + i = insert(users) + i = i.on_conflict_do_update( + index_elements=self.bogus_index.columns, + index_where=self. + bogus_index.dialect_options['postgresql']['where'], + set_=dict( + name=i.excluded.name, + login_email=i.excluded.login_email) + ) + + assert_raises( + exc.ProgrammingError, conn.execute, i, + dict( + id=1, name='namebogus', login_email='bogus@gmail.com', + lets_index_this='bogus') + )