]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add ON CONFLICT support for Postgresql
authorRobin Thomas <robin.thomas@livestream.com>
Thu, 14 Apr 2016 16:57:15 +0000 (12:57 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 14 Jun 2016 19:03:14 +0000 (15:03 -0400)
Fixes: #3529
Co-authored-by: Mike Bayer <mike_mp@zzzcomputing.com>
Change-Id: Ie3bf6ad70d9be9f0e44938830e922db03573991a
Pull-request: https://github.com/zzzeek/sqlalchemy/pull/258

doc/build/changelog/changelog_11.rst
doc/build/changelog/migration_11.rst
doc/build/dialects/postgresql.rst
lib/sqlalchemy/dialects/postgresql/__init__.py
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/dialects/postgresql/dml.py [new file with mode: 0644]
lib/sqlalchemy/sql/compiler.py
lib/sqlalchemy/sql/dml.py
test/dialect/postgresql/test_compiler.py
test/dialect/postgresql/test_on_conflict.py [new file with mode: 0644]

index 8d20ef257d68f75c62e7efbbe67e4cd3d509a627..6151ec3ffe5dff6326d50c3eeddebce3f899b8ca 100644 (file)
 .. changelog::
     :version: 1.1.0b1
 
+    .. change::
+        :tags: feature, postgresql
+        :tickets: 3529
+
+        Added support for Postgresql's INSERT..ON CONFLICT using a new
+        Postgresql-specific :class:`.postgresql.dml.Insert` object.
+        Pull request and extensive efforts here by Robin Thomas.
+
+        .. seealso::
+
+            :ref:`change_3529`
+
     .. change::
         :tags: feature, postgresql
         :pullreq: bitbucket:84
index 73483f3dbc36244b39e90a5ead2e531770cb7092..ea932b50927e29a0ca08cee3034e2ee6254bb751 100644 (file)
@@ -2026,6 +2026,42 @@ necessary to worry about the names themselves in the textual SQL.
 Dialect Improvements and Changes - Postgresql
 =============================================
 
+.. _change_3529:
+
+Support for INSERT..ON CONFLICT (DO UPDATE | DO NOTHING)
+--------------------------------------------------------
+
+The ``ON CONFLICT`` clause of ``INSERT`` added to Postgresql as of
+version 9.5 is now supported using a Postgresql-specific version of the
+:class:`.Insert` object, via :func:`sqlalchemy.dialects.postgresql.dml.insert`.
+This :class:`.Insert` subclass adds two new methods :meth:`.Insert.on_conflict_do_update`
+and :meth:`.Insert.on_conflict_do_nothing` which implement the full syntax
+supported by Posgresql 9.5 in this area::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    insert_stmt = insert(my_table). \\
+        values(id='some_id', data='some data to insert')
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=[my_table.c.id],
+        set_=dict(data='some data to update')
+    )
+
+    conn.execute(do_update_stmt)
+
+The above will render::
+
+    INSERT INTO my_table (id, data)
+    VALUES (:id, :data)
+    ON CONFLICT id DO UPDATE SET data=:data_2
+
+.. seealso::
+
+    :ref:`postgresql_insert_on_conflict`
+
+:ticket:`3529`
+
 .. _change_3499_postgresql:
 
 ARRAY and JSON types now correctly specify "unhashable"
index b4c90643d98204ee08728dc012bd1e1d7bac161d..56b14a8d05fa31d5d639bf5b0a70ec665179fa9f 100644 (file)
@@ -181,6 +181,14 @@ For example::
           ExcludeConstraint(('room', '='), ('during', '&&')),
       )
 
+PostgreSQL DML Constructs
+---------------------------
+
+.. autofunction:: sqlalchemy.dialects.postgresql.dml.insert
+
+.. autoclass:: sqlalchemy.dialects.postgresql.dml.Insert
+  :members:
+
 psycopg2
 --------
 
index ffd100f6759543d36eec5170743a1ff97451d1ff..fae18362126cfd2a2a43feaf0fbc2dd2b53fe9d9 100644 (file)
@@ -19,6 +19,7 @@ from .hstore import HSTORE, hstore
 from .json import JSON, JSONB
 from .array import array, ARRAY, Any, All
 from .ext import aggregate_order_by, ExcludeConstraint, array_agg
+from .dml import insert, Insert
 
 from .ranges import INT4RANGE, INT8RANGE, NUMRANGE, DATERANGE, TSRANGE, \
     TSTZRANGE
@@ -31,5 +32,5 @@ __all__ = (
     'hstore', 'INT4RANGE', 'INT8RANGE', 'NUMRANGE', 'DATERANGE',
     'TSRANGE', 'TSTZRANGE', 'json', 'JSON', 'JSONB', 'Any', 'All',
     'DropEnumType', 'CreateEnumType', 'ExcludeConstraint',
-    'aggregate_order_by', 'array_agg'
+    'aggregate_order_by', 'array_agg', 'insert', 'Insert'
 )
index 16b22129a295aa00158f12d99970afad8050d1f5..688ce9e1c53e98ae249b02fd2e719b27947b9e64 100644 (file)
@@ -48,14 +48,14 @@ Transaction Isolation Level
 ---------------------------
 
 All Postgresql dialects support setting of transaction isolation level
-both via a dialect-specific parameter :paramref:`.create_engine.isolation_level`
-accepted by :func:`.create_engine`,
-as well as the :paramref:`.Connection.execution_options.isolation_level` argument as passed to
-:meth:`.Connection.execution_options`.  When using a non-psycopg2 dialect,
-this feature works by issuing the command
+both via a dialect-specific parameter
+:paramref:`.create_engine.isolation_level` accepted by :func:`.create_engine`,
+as well as the :paramref:`.Connection.execution_options.isolation_level`
+argument as passed to :meth:`.Connection.execution_options`.
+When using a non-psycopg2 dialect, this feature works by issuing the command
 ``SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL <level>`` for
-each new connection.  For the special AUTOCOMMIT isolation level, DBAPI-specific
-techniques are used.
+each new connection.  For the special AUTOCOMMIT isolation level,
+DBAPI-specific techniques are used.
 
 To set isolation level using :func:`.create_engine`::
 
@@ -247,6 +247,197 @@ use the :meth:`._UpdateBase.returning` method on a per-statement basis::
         where(table.c.name=='foo')
     print result.fetchall()
 
+.. _postgresql_insert_on_conflict:
+
+INSERT...ON CONFLICT (Upsert)
+------------------------------
+
+Starting with version 9.5, PostgreSQL allows "upserts" (update or insert)
+of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement.
+A candidate row will only be inserted if that row does not violate
+any unique constraints.  In the case of a unique constraint violation,
+a secondary action can occur which can be either "DO UPDATE", indicating
+that the data in the target row should be updated, or "DO NOTHING",
+which indicates to silently skip this row.
+
+Conflicts are determined using existing unique constraints and indexes.  These
+constraints may be identified either using their name as stated in DDL,
+or they may be *inferred* by stating the columns and conditions that comprise
+the indexes.
+
+SQLAlchemy provides ``ON CONFLICT`` support via the Postgresql-specific
+:func:`.postgresql.dml.insert()` function, which provides
+the generative methods :meth:`~.postgresql.dml.Insert.on_conflict_do_update`
+and :meth:`~.postgresql.dml.Insert.on_conflict_do_nothing`::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    insert_stmt = insert(my_table).values(
+        id='some_existing_id',
+        data='inserted value')
+
+    do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
+        index_elements=['id']
+    )
+
+    conn.execute(do_nothing_stmt)
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        constraint='pk_my_table',
+        set_=dict(data='updated value')
+    )
+
+    conn.execute(do_update_stmt)
+
+Both methods supply the "target" of the conflict using either the
+named constraint or by column inference:
+
+* The :paramref:`.Insert.on_conflict_do_update.index_elements` argument
+  specifies a sequence containing string column names, :class:`.Column` objects,
+  and/or SQL expression elements, which would identify a unique index::
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value')
+    )
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        index_elements=[my_table.c.id],
+        set_=dict(data='updated value')
+    )
+
+* When using :paramref:`.Insert.on_conflict_do_update.index_elements` to
+  infer an index, a partial index can be inferred by also specifying the
+  use the :paramref:`.Insert.on_conflict_do_update.index_where` parameter::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
+    stmt = stmt.on_conflict_do_update(
+        index_elements=[my_table.c.user_email],
+        index_where=my_table.c.user_email.like('%@gmail.com'),
+        set_=dict(data=stmt.excluded.data)
+        )
+    conn.execute(stmt)
+
+
+* The :paramref:`.Insert.on_conflict_do_update.constraint` argument is
+  used to specify an index directly rather than inferring it.  This can be
+  the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX::
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        constraint='my_table_idx_1',
+        set_=dict(data='updated value')
+    )
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        constraint='my_table_pk',
+        set_=dict(data='updated value')
+    )
+
+* The :paramref:`.Insert.on_conflict_do_update.constraint` argument may
+  also refer to a SQLAlchemy construct representing a constraint,
+  e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
+  :class:`.Index`, or :class:`.ExcludeConstraint`.   In this use,
+  if the constraint has a name, it is used directly.  Otherwise, if the
+  constraint is unnamed, then inference will be used, where the expressions
+  and optional WHERE clause of the constraint will be spelled out in the
+  construct.  This use is especially convenient
+  to refer to the named or unnamed primary key of a :class:`.Table` using the
+  :attr:`.Table.primary_key` attribute::
+
+    do_update_stmt = insert_stmt.on_conflict_do_update(
+        constraint=my_table.primary_key,
+        set_=dict(data='updated value')
+    )
+
+``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
+existing row, using any combination of new values as well as values
+from the proposed insertion.   These values are specified using the
+:paramref:`.Insert.on_conflict_do_update.set_` parameter.  This
+parameter accepts a dictionary which consists of direct values
+for UPDATE::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    stmt = insert(my_table).values(id='some_id', data='inserted value')
+    do_update_stmt = stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value')
+        )
+    conn.execute(do_update_stmt)
+
+.. warning::
+
+    The :meth:`.Insert.on_conflict_do_update` method does **not** take into
+    account Python-side default UPDATE values or generation functions, e.g.
+    e.g. those specified using :paramref:`.Column.onupdate`.
+    These values will not be exercised for an ON CONFLICT style of UPDATE,
+    unless they are manually specified in the
+    :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
+
+In order to refer to the proposed insertion row, the special alias
+:attr:`~.postgresql.dml.Insert.excluded` is available as an attribute on
+the :class:`.postgresql.dml.Insert` object; this object is a
+:class:`.ColumnCollection` which alias contains all columns of the target
+table::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    stmt = insert(my_table).values(
+        id='some_id',
+        data='inserted value',
+        author='jlh')
+    do_update_stmt = stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value', author=stmt.excluded.author)
+        )
+    conn.execute(do_update_stmt)
+
+The :meth:`.Insert.on_conflict_do_update` method also accepts
+a WHERE clause using the :paramref:`.Insert.on_conflict_do_update.where`
+parameter, which will limit those rows which receive an UPDATE::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    stmt = insert(my_table).values(
+        id='some_id',
+        data='inserted value',
+        author='jlh')
+    on_update_stmt = stmt.on_conflict_do_update(
+        index_elements=['id'],
+        set_=dict(data='updated value', author=stmt.excluded.author)
+        where=(my_table.c.status == 2)
+        )
+    conn.execute(on_update_stmt)
+
+``ON CONFLICT`` may also be used to skip inserting a row entirely
+if any conflict with a unique or exclusion constraint occurs; below
+this is illustrated using the
+:meth:`~.postgresql.dml.Insert.on_conflict_do_nothing` method::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    stmt = insert(my_table).values(id='some_id', data='inserted value')
+    stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
+    conn.execute(stmt)
+
+If ``DO NOTHING`` is used without specifying any columns or constraint,
+it has the effect of skipping the INSERT for any unique or exclusion
+constraint violation which occurs::
+
+    from sqlalchemy.dialects.postgresql import insert
+
+    stmt = insert(my_table).values(id='some_id', data='inserted value')
+    stmt = stmt.on_conflict_do_nothing()
+    conn.execute(stmt)
+
+.. versionadded:: 1.1 Added support for Postgresql ON CONFLICT clauses
+
+.. seealso::
+
+    `INSERT .. ON CONFLICT <http://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ - in the Postgresql documentation.
+
 .. _postgresql_match:
 
 Full Text Search
@@ -354,6 +545,8 @@ Postgresql-Specific Index Options
 Several extensions to the :class:`.Index` construct are available, specific
 to the PostgreSQL dialect.
 
+.. _postgresql_partial_indexes:
+
 Partial Indexes
 ^^^^^^^^^^^^^^^^
 
@@ -663,7 +856,6 @@ _DECIMAL_TYPES = (1231, 1700)
 _FLOAT_TYPES = (700, 701, 1021, 1022)
 _INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
 
-
 class BYTEA(sqltypes.LargeBinary):
     __visit_name__ = 'BYTEA'
 
@@ -1223,6 +1415,68 @@ class PGCompiler(compiler.SQLCompiler):
         else:
             return "SUBSTRING(%s FROM %s)" % (s, start)
 
+    def _on_conflict_target(self, clause, **kw):
+
+        if clause.constraint_target is not None:
+            target_text = 'ON CONSTRAINT %s' % clause.constraint_target
+        elif clause.inferred_target_elements is not None:
+            target_text = '(%s)' % ', '.join(
+                (self.preparer.quote(c)
+                 if isinstance(c, util.string_types)
+                 else
+                 self.process(c, include_table=False, use_schema=False))
+                for c in clause.inferred_target_elements
+            )
+            if clause.inferred_target_whereclause is not None:
+                target_text += ' WHERE %s' % \
+                    self.process(
+                        clause.inferred_target_whereclause,
+                        include_table=False,
+                        use_schema=False
+                    )
+        else:
+            target_text = ''
+
+        return target_text
+
+    def visit_on_conflict_do_nothing(self, on_conflict, **kw):
+
+        target_text = self._on_conflict_target(on_conflict, **kw)
+
+        if target_text:
+            return "ON CONFLICT %s DO NOTHING" % target_text
+        else:
+            return "ON CONFLICT DO NOTHING"
+
+    def visit_on_conflict_do_update(self, on_conflict, **kw):
+
+        clause = on_conflict
+
+        target_text = self._on_conflict_target(on_conflict, **kw)
+
+        action_set_ops = []
+        for k, v in clause.update_values_to_set:
+            key_text = (
+                self.preparer.quote(k)
+                if isinstance(k, util.string_types)
+                else self.process(k, use_schema=False)
+            )
+            value_text = self.process(
+                v,
+                use_schema=False
+            )
+            action_set_ops.append('%s = %s' % (key_text, value_text))
+        action_text = ', '.join(action_set_ops)
+        if clause.update_whereclause is not None:
+            action_text += ' WHERE %s' % \
+                self.process(
+                    clause.update_whereclause,
+                    include_table=False,
+                    use_schema=False
+                )
+
+        return 'ON CONFLICT %s DO UPDATE SET %s' % (target_text, action_text)
+
 
 class PGDDLCompiler(compiler.DDLCompiler):
 
@@ -1706,7 +1960,7 @@ class PGDialect(default.DefaultDialect):
             "with_oids": None,
             "on_commit": None,
             "inherits": None
-        })
+        }),
     ]
 
     reflection_options = ('postgresql_ignore_search_path', )
diff --git a/lib/sqlalchemy/dialects/postgresql/dml.py b/lib/sqlalchemy/dialects/postgresql/dml.py
new file mode 100644 (file)
index 0000000..e8e6cd1
--- /dev/null
@@ -0,0 +1,211 @@
+# postgresql/on_conflict.py
+# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+from ...sql.elements import ClauseElement, _literal_as_binds
+from ...sql.dml import Insert as StandardInsert
+from ...sql.expression import alias
+from ...sql import schema
+from ...util.langhelpers import public_factory
+from ...sql.base import _generative
+from ... import util
+from . import ext
+
+__all__ = ('Insert', 'insert')
+
+
+class Insert(StandardInsert):
+    """Postgresql-specific implementation of INSERT.
+
+    Adds methods for PG-specific syntaxes such as ON CONFLICT.
+
+    .. versionadded:: 1.1
+
+    """
+
+    @util.memoized_property
+    def excluded(self):
+        """Provide the ``excluded`` namespace for an ON CONFLICT statement
+
+        PG's ON CONFLICT clause allows reference to the row that would
+        be inserted, known as ``excluded``.  This attribute provides
+        all columns in this row to be referenaceable.
+
+        .. seealso::
+
+            :ref:`postgresql_insert_on_conflict` - example of how
+            to use :attr:`.Insert.excluded`
+
+        """
+        return alias(self.table, name='excluded').columns
+
+    @_generative
+    def on_conflict_do_update(
+            self,
+            constraint=None, index_elements=None,
+            index_where=None, set_=None, where=None):
+        """
+        Specifies a DO UPDATE SET action for ON CONFLICT clause.
+
+        Either the ``constraint`` or ``index_elements`` argument is
+        required, but only one of these can be specified.
+
+        :param constraint:
+        The name of a unique or exclusion constraint on the table,
+        or the constraint object itself if it has a .name attribute.
+
+        :param index_elements:
+        A sequence consisting of string column names, :class:`.Column`
+        objects, or other column expression objects that will be used
+        to infer a target index.
+
+        :param index_where:
+        Additional WHERE criterion that can be used to infer a
+        conditional target index.
+
+        :param set_:
+        Required argument. A dictionary or other mapping object
+        with column names as keys and expressions or literals as values,
+        specifying the ``SET`` actions to take.
+
+        .. warning:: This dictionary does **not** take into account
+           Python-specified default UPDATE values or generation functions,
+           e.g. those specified using :paramref:`.Column.onupdate`.
+           These values will not be exercised for an ON CONFLICT style of
+           UPDATE, unless they are manually specified in the
+           :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
+
+        :param where:
+        Optional argument. If present, can be a literal SQL
+        string or an acceptable expression for a ``WHERE`` clause
+        that restricts the rows affected by ``DO UPDATE SET``. Rows
+        not meeting the ``WHERE`` condition will not be updated
+        (effectively a ``DO NOTHING`` for those rows).
+
+        .. versionadded:: 1.1
+
+        .. seealso::
+
+            :ref:`postgresql_insert_on_conflict`
+
+        """
+        self._post_values_clause = OnConflictDoUpdate(
+            constraint, index_elements, index_where, set_, where)
+        return self
+
+    @_generative
+    def on_conflict_do_nothing(
+            self,
+            constraint=None, index_elements=None, index_where=None):
+        """
+        Specifies a DO NOTHING action for ON CONFLICT clause.
+
+        The ``constraint`` and ``index_elements`` arguments
+        are optional, but only one of these can be specified.
+
+        :param constraint:
+        The name of a unique or exclusion constraint on the table,
+        or the constraint object itself if it has a .name attribute.
+
+        :param index_elements:
+        A sequence consisting of string column names, :class:`.Column`
+        objects, or other column expression objects that will be used
+        to infer a target index.
+
+        :param index_where:
+        Additional WHERE criterion that can be used to infer a
+        conditional target index.
+
+        .. versionadded:: 1.1
+
+        .. seealso::
+
+            :ref:`postgresql_insert_on_conflict`
+
+        """
+        self._post_values_clause = OnConflictDoNothing(
+            constraint, index_elements, index_where)
+        return self
+
+insert = public_factory(Insert, '.dialects.postgresql.insert')
+
+
+class OnConflictClause(ClauseElement):
+    def __init__(
+            self,
+            constraint=None,
+            index_elements=None,
+            index_where=None):
+
+        if constraint is not None:
+            if not isinstance(constraint, util.string_types) and \
+                    isinstance(constraint, (
+                        schema.Index, schema.Constraint,
+                        ext.ExcludeConstraint)):
+                constraint = getattr(constraint, 'name') or constraint
+
+        if constraint is not None:
+            if index_elements is not None:
+                raise ValueError(
+                    "'constraint' and 'index_elements' are mutually exclusive")
+
+            if isinstance(constraint, util.string_types):
+                self.constraint_target = constraint
+                self.inferred_target_elements = None
+                self.inferred_target_whereclause = None
+            elif isinstance(constraint, schema.Index):
+                index_elements = constraint.expressions
+                index_where = \
+                    constraint.dialect_options['postgresql'].get("where")
+            elif isinstance(constraint, ext.ExcludeConstraint):
+                index_elements = constraint.columns
+                index_where = constraint.where
+            else:
+                index_elements = constraint.columns
+                index_where = \
+                    constraint.dialect_options['postgresql'].get("where")
+
+        if index_elements is not None:
+            self.constraint_target = None
+            self.inferred_target_elements = index_elements
+            self.inferred_target_whereclause = index_where
+        elif constraint is None:
+            self.constraint_target = self.inferred_target_elements = \
+                self.inferred_target_whereclause = None
+
+
+class OnConflictDoNothing(OnConflictClause):
+    __visit_name__ = 'on_conflict_do_nothing'
+
+
+class OnConflictDoUpdate(OnConflictClause):
+    __visit_name__ = 'on_conflict_do_update'
+
+    def __init__(
+            self,
+            constraint=None,
+            index_elements=None,
+            index_where=None,
+            set_=None,
+            where=None):
+        super(OnConflictDoUpdate, self).__init__(
+            constraint=constraint,
+            index_elements=index_elements,
+            index_where=index_where)
+
+        if self.inferred_target_elements is None and \
+                self.constraint_target is None:
+            raise ValueError(
+                "Either constraint or index_elements, "
+                "but not both, must be specified unless DO NOTHING")
+
+        if (not isinstance(set_, dict) or not set_):
+            raise ValueError("set parameter must be a non-empty dictionary")
+        self.update_values_to_set = [
+            (key, _literal_as_binds(value))
+            for key, value in set_.items()
+        ]
+        self.update_whereclause = where
index 6d9ab9039f1e1175d24de76ee61f3ada9098e272..94c7db20a9d70644a322f22a2d6ee81259b406bc 100644 (file)
@@ -1984,6 +1984,12 @@ class SQLCompiler(Compiled):
             text += " VALUES (%s)" % \
                 ', '.join([c[1] for c in crud_params])
 
+        if insert_stmt._post_values_clause is not None:
+            post_values_clause = self.process(
+                insert_stmt._post_values_clause, **kw)
+            if post_values_clause:
+                text += " " + post_values_clause
+
         if returning_clause and not self.returning_precedes_values:
             text += " " + returning_clause
 
index 8f368dcdbb3cbe2a6beac829aac9e7cdef551d90..b54b4792dead0eb92179a063432acf7bc983b57f 100644 (file)
@@ -194,6 +194,7 @@ class ValuesBase(UpdateBase):
     _has_multi_parameters = False
     _preserve_parameter_order = False
     select = None
+    _post_values_clause = None
 
     def __init__(self, table, values, prefixes):
         self.table = _interpret_as_from(table)
index f85ff2682c523f80a1eed2785465940a915952b5..88110ba2df12635e7eedf232a57e7574cefb5498 100644 (file)
@@ -5,7 +5,7 @@ from sqlalchemy.testing.assertions import AssertsCompiledSQL, is_, \
 from sqlalchemy.testing import engines, fixtures
 from sqlalchemy import testing
 from sqlalchemy import Sequence, Table, Column, Integer, update, String,\
-    insert, func, MetaData, Enum, Index, and_, delete, select, cast, text, \
+    func, MetaData, Enum, Index, and_, delete, select, cast, text, \
     Text
 from sqlalchemy.dialects.postgresql import ExcludeConstraint, array
 from sqlalchemy import exc, schema
@@ -14,9 +14,8 @@ from sqlalchemy.dialects.postgresql import TSRANGE
 from sqlalchemy.orm import mapper, aliased, Session
 from sqlalchemy.sql import table, column, operators, literal_column
 from sqlalchemy.sql import util as sql_util
-from sqlalchemy.util import u
-from sqlalchemy.dialects.postgresql import aggregate_order_by
-
+from sqlalchemy.util import u, OrderedDict
+from sqlalchemy.dialects.postgresql import aggregate_order_by, insert
 
 class SequenceTest(fixtures.TestBase, AssertsCompiledSQL):
     __prefer__ = 'postgresql'
@@ -186,7 +185,6 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
             schema_translate_map=schema_translate_map
         )
 
-
     def test_create_table_with_tablespace(self):
         m = MetaData()
         tbl = Table(
@@ -1035,6 +1033,254 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
         )
 
 
+class InsertOnConflictTest(fixtures.TestBase, AssertsCompiledSQL):
+    __dialect__ = postgresql.dialect()
+
+    def setup(self):
+        self.table1 = table1 = table(
+            'mytable',
+            column('myid', Integer),
+            column('name', String(128)),
+            column('description', String(128)),
+        )
+        md = MetaData()
+        self.table_with_metadata = Table(
+            'mytable', md,
+            Column('myid', Integer, primary_key=True),
+            Column('name', String(128)),
+            Column('description', String(128))
+        )
+        self.unique_constr = schema.UniqueConstraint(
+            table1.c.name, name='uq_name')
+        self.excl_constr = ExcludeConstraint(
+            (table1.c.name, '='),
+            (table1.c.description, '&&'),
+            name='excl_thing'
+        )
+        self.excl_constr_anon = ExcludeConstraint(
+            (self.table_with_metadata.c.name, '='),
+            (self.table_with_metadata.c.description, '&&'),
+            where=self.table_with_metadata.c.description != 'foo'
+        )
+        self.goofy_index = Index(
+            'goofy_index', table1.c.name,
+            postgresql_where=table1.c.name > 'm'
+        )
+
+    def test_do_nothing_no_target(self):
+
+        i = insert(
+            self.table1, values=dict(name='foo'),
+        ).on_conflict_do_nothing()
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            '(%(name)s) ON CONFLICT DO NOTHING')
+
+    def test_do_nothing_index_elements_target(self):
+
+        i = insert(
+            self.table1, values=dict(name='foo'),
+        ).on_conflict_do_nothing(
+            index_elements=['myid'],
+        )
+        self.assert_compile(
+            i,
+            "INSERT INTO mytable (name) VALUES "
+            "(%(name)s) ON CONFLICT (myid) DO NOTHING"
+        )
+
+    def test_do_update_set_clause_literal(self):
+        i = insert(self.table_with_metadata).values(myid=1, name='foo')
+        i = i.on_conflict_do_update(
+            index_elements=['myid'],
+            set_=OrderedDict([
+                ('name', "I'm a name"),
+                ('description', None)])
+        )
+        self.assert_compile(
+            i,
+            'INSERT INTO mytable (myid, name) VALUES '
+            '(%(myid)s, %(name)s) ON CONFLICT (myid) '
+            'DO UPDATE SET name = %(param_1)s, '
+            'description = NULL',
+            {"myid": 1, "name": "foo", "param_1": "I'm a name"}
+
+        )
+
+    def test_do_update_str_index_elements_target_one(self):
+        i = insert(self.table_with_metadata).values(myid=1, name='foo')
+        i = i.on_conflict_do_update(
+            index_elements=['myid'],
+            set_=OrderedDict([
+                ('name', i.excluded.name),
+                ('description', i.excluded.description)])
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (myid, name) VALUES '
+                            '(%(myid)s, %(name)s) ON CONFLICT (myid) '
+                            'DO UPDATE SET name = excluded.name, '
+                            'description = excluded.description')
+
+    def test_do_update_str_index_elements_target_two(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+        i = i.on_conflict_do_update(
+            index_elements=['myid'],
+            set_=dict(name=i.excluded.name)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            '(%(name)s) ON CONFLICT (myid) '
+                            'DO UPDATE SET name = excluded.name')
+
+    def test_do_update_col_index_elements_target(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+        i = i.on_conflict_do_update(
+            index_elements=[self.table1.c.myid],
+            set_=dict(name=i.excluded.name)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            '(%(name)s) ON CONFLICT (myid) '
+                            'DO UPDATE SET name = excluded.name')
+
+    def test_do_update_unnamed_pk_constraint_target(self):
+        i = insert(
+            self.table_with_metadata, values=dict(myid=1, name='foo'))
+        i = i.on_conflict_do_update(
+            constraint=self.table_with_metadata.primary_key,
+            set_=dict(name=i.excluded.name)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (myid, name) VALUES '
+                            '(%(myid)s, %(name)s) ON CONFLICT (myid) '
+                            'DO UPDATE SET name = excluded.name')
+
+    def test_do_update_pk_constraint_index_elements_target(self):
+        i = insert(
+            self.table_with_metadata, values=dict(myid=1, name='foo'))
+        i = i.on_conflict_do_update(
+            index_elements=self.table_with_metadata.primary_key,
+            set_=dict(name=i.excluded.name)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (myid, name) VALUES '
+                            '(%(myid)s, %(name)s) ON CONFLICT (myid) '
+                            'DO UPDATE SET name = excluded.name')
+
+    def test_do_update_named_unique_constraint_target(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+        i = i.on_conflict_do_update(
+            constraint=self.unique_constr,
+            set_=dict(myid=i.excluded.myid)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name '
+                            'DO UPDATE SET myid = excluded.myid')
+
+    def test_do_update_string_constraint_target(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+        i = i.on_conflict_do_update(
+            constraint=self.unique_constr.name,
+            set_=dict(myid=i.excluded.myid)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            '(%(name)s) ON CONFLICT ON CONSTRAINT uq_name '
+                            'DO UPDATE SET myid = excluded.myid')
+
+    def test_do_update_index_elements_where_target(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+        i = i.on_conflict_do_update(
+            index_elements=self.goofy_index.expressions,
+            index_where=self.goofy_index.dialect_options[
+                'postgresql']['where'],
+            set_=dict(name=i.excluded.name)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            "(%(name)s) ON CONFLICT (name) "
+                            "WHERE name > %(name_1)s "
+                            'DO UPDATE SET name = excluded.name')
+
+    def test_do_update_unnamed_index_target(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+
+        unnamed_goofy = Index(
+            None, self.table1.c.name,
+            postgresql_where=self.table1.c.name > 'm'
+        )
+
+        i = i.on_conflict_do_update(
+            constraint=unnamed_goofy,
+            set_=dict(name=i.excluded.name)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            "(%(name)s) ON CONFLICT (name) "
+                            "WHERE name > %(name_1)s "
+                            'DO UPDATE SET name = excluded.name')
+
+    def test_do_update_unnamed_exclude_constraint_target(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+        i = i.on_conflict_do_update(
+            constraint=self.excl_constr_anon,
+            set_=dict(name=i.excluded.name)
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            "(%(name)s) ON CONFLICT (name, description) "
+                            "WHERE description != %(description_1)s "
+                            'DO UPDATE SET name = excluded.name')
+
+    def test_do_update_add_whereclause(self):
+        i = insert(
+            self.table1, values=dict(name='foo'))
+        i = i.on_conflict_do_update(
+            constraint=self.excl_constr_anon,
+            set_=dict(name=i.excluded.name),
+            where=(
+                (self.table1.c.name != 'brah') &
+                (self.table1.c.description != 'brah'))
+        )
+        self.assert_compile(i,
+                            'INSERT INTO mytable (name) VALUES '
+                            "(%(name)s) ON CONFLICT (name, description) "
+                            "WHERE description != %(description_1)s "
+                            'DO UPDATE SET name = excluded.name '
+                            "WHERE name != %(name_1)s "
+                            "AND description != %(description_2)s")
+
+    def test_quote_raw_string_col(self):
+        t = table('t', column("FancyName"), column("other name"))
+
+        stmt = insert(t).values(FancyName='something new').\
+            on_conflict_do_update(
+                index_elements=['FancyName', 'other name'],
+                set_=OrderedDict([
+                    ("FancyName", 'something updated'),
+                    ("other name", "something else")
+                ])
+        )
+
+        self.assert_compile(
+            stmt,
+            'INSERT INTO t ("FancyName") VALUES (%(FancyName)s) '
+            'ON CONFLICT ("FancyName", "other name") '
+            'DO UPDATE SET "FancyName" = %(param_1)s, '
+            '"other name" = %(param_2)s',
+            {'param_1': 'something updated',
+             'param_2': 'something else', 'FancyName': 'something new'}
+        )
+
+
 class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL):
 
     """Test 'DISTINCT' with SQL expression language and orm.Query with
diff --git a/test/dialect/postgresql/test_on_conflict.py b/test/dialect/postgresql/test_on_conflict.py
new file mode 100644 (file)
index 0000000..201287d
--- /dev/null
@@ -0,0 +1,312 @@
+# coding: utf-8
+
+from sqlalchemy.testing.assertions import eq_, assert_raises
+from sqlalchemy.testing import fixtures
+from sqlalchemy import testing
+from sqlalchemy import Table, Column, Integer, String
+from sqlalchemy import exc, schema
+from sqlalchemy.dialects.postgresql import insert
+
+
+class OnConflictTest(fixtures.TablesTest):
+
+    __only_on__ = 'postgresql >= 9.5',
+    __backend__ = True
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            'users', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('name', String(50))
+        )
+
+        users_xtra = Table(
+            'users_xtra', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('name', String(50)),
+            Column('login_email', String(50)),
+            Column('lets_index_this', String(50))
+        )
+        cls.unique_constraint = schema.UniqueConstraint(
+            users_xtra.c.login_email, name='uq_login_email')
+        cls.bogus_index = schema.Index(
+            'idx_special_ops',
+            users_xtra.c.lets_index_this,
+            postgresql_where=users_xtra.c.lets_index_this > 'm')
+
+    def test_bad_args(self):
+        assert_raises(
+            ValueError,
+            insert(self.tables.users).on_conflict_do_nothing,
+            constraint='id', index_elements=['id']
+        )
+        assert_raises(
+            ValueError,
+            insert(self.tables.users).on_conflict_do_update,
+            constraint='id', index_elements=['id']
+        )
+        assert_raises(
+            ValueError,
+            insert(self.tables.users).on_conflict_do_update, constraint='id'
+        )
+        assert_raises(
+            ValueError,
+            insert(self.tables.users).on_conflict_do_update
+        )
+
+    def test_on_conflict_do_nothing(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(
+                insert(users).on_conflict_do_nothing(),
+                dict(id=1, name='name1')
+            )
+            conn.execute(
+                insert(users).on_conflict_do_nothing(),
+                dict(id=1, name='name2')
+            )
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, 'name1')]
+            )
+
+    @testing.provide_metadata
+    def test_on_conflict_do_nothing_target(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(
+                insert(users)
+                .on_conflict_do_nothing(
+                    index_elements=users.primary_key.columns),
+                dict(id=1, name='name1')
+            )
+            conn.execute(
+                insert(users)
+                .on_conflict_do_nothing(
+                    index_elements=users.primary_key.columns),
+                dict(id=1, name='name2')
+            )
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, 'name1')]
+            )
+
+    def test_on_conflict_do_update_one(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name='name1'))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=[users.c.id],
+                set_=dict(name=i.excluded.name))
+            conn.execute(i, dict(id=1, name='name1'))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, 'name1')]
+            )
+
+    def test_on_conflict_do_update_two(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name='name1'))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=[users.c.id],
+                set_=dict(id=i.excluded.id, name=i.excluded.name)
+            )
+
+            conn.execute(i, dict(id=1, name='name2'))
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, 'name2')]
+            )
+
+    def test_on_conflict_do_update_three(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name='name1'))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(name=i.excluded.name)
+            )
+            conn.execute(i, dict(id=1, name='name3'))
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, 'name3')]
+            )
+
+    def test_on_conflict_do_update_four(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name='name1'))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(id=i.excluded.id, name=i.excluded.name)
+            ).values(id=1, name='name4')
+
+            conn.execute(i)
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, 'name4')]
+            )
+
+    def test_on_conflict_do_update_five(self):
+        users = self.tables.users
+
+        with testing.db.connect() as conn:
+            conn.execute(users.insert(), dict(id=1, name='name1'))
+
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(id=10, name="I'm a name")
+            ).values(id=1, name='name4')
+
+            conn.execute(i)
+
+            eq_(
+                conn.execute(
+                    users.select().where(users.c.id == 10)).fetchall(),
+                [(10, "I'm a name")]
+            )
+
+    def _exotic_targets_fixture(self, conn):
+        users = self.tables.users_xtra
+
+        conn.execute(
+            insert(users),
+            dict(
+                id=1, name='name1',
+                login_email='name1@gmail.com', lets_index_this='not'
+            )
+        )
+        conn.execute(
+            users.insert(),
+            dict(
+                id=2, name='name2',
+                login_email='name2@gmail.com', lets_index_this='not'
+            )
+        )
+
+        eq_(
+            conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+            [(1, 'name1', 'name1@gmail.com', 'not')]
+        )
+
+    def test_on_conflict_do_update_exotic_targets_two(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try primary key constraint: cause an upsert on unique id column
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=users.primary_key.columns,
+                set_=dict(
+                    name=i.excluded.name,
+                    login_email=i.excluded.login_email)
+            )
+            conn.execute(i, dict(
+                id=1, name='name2', login_email='name1@gmail.com',
+                lets_index_this='not')
+            )
+
+            eq_(
+                conn.execute(users.select().where(users.c.id == 1)).fetchall(),
+                [(1, 'name2', 'name1@gmail.com', 'not')]
+            )
+
+    def test_on_conflict_do_update_exotic_targets_three(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try unique constraint: cause an upsert on target
+            # login_email, not id
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                constraint=self.unique_constraint,
+                set_=dict(id=i.excluded.id, name=i.excluded.name,
+                          login_email=i.excluded.login_email)
+            )
+            # note: lets_index_this value totally ignored in SET clause.
+            conn.execute(i, dict(
+                id=42, name='nameunique',
+                login_email='name2@gmail.com', lets_index_this='unique')
+            )
+
+            eq_(
+                conn.execute(
+                    users.select().
+                    where(users.c.login_email == 'name2@gmail.com')
+                ).fetchall(),
+                [(42, 'nameunique', 'name2@gmail.com', 'not')]
+            )
+
+    def test_on_conflict_do_update_exotic_targets_four(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try unique constraint by name: cause an
+            # upsert on target login_email, not id
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                constraint=self.unique_constraint.name,
+                set_=dict(
+                    id=i.excluded.id, name=i.excluded.name,
+                    login_email=i.excluded.login_email)
+            )
+            # note: lets_index_this value totally ignored in SET clause.
+
+            conn.execute(i, dict(
+                id=43, name='nameunique2',
+                login_email='name2@gmail.com', lets_index_this='unique')
+            )
+
+            eq_(
+                conn.execute(
+                    users.select().
+                    where(users.c.login_email == 'name2@gmail.com')
+                ).fetchall(),
+                [(43, 'nameunique2', 'name2@gmail.com', 'not')]
+            )
+
+    def test_on_conflict_do_update_exotic_targets_five(self):
+        users = self.tables.users_xtra
+
+        with testing.db.connect() as conn:
+            self._exotic_targets_fixture(conn)
+            # try bogus index
+            i = insert(users)
+            i = i.on_conflict_do_update(
+                index_elements=self.bogus_index.columns,
+                index_where=self.
+                bogus_index.dialect_options['postgresql']['where'],
+                set_=dict(
+                    name=i.excluded.name,
+                    login_email=i.excluded.login_email)
+            )
+
+            assert_raises(
+                exc.ProgrammingError, conn.execute, i,
+                dict(
+                    id=1, name='namebogus', login_email='bogus@gmail.com',
+                    lets_index_this='bogus')
+            )