from ... import exc
from ... import util
+from ...sql.base import _exclusive_against
from ...sql.base import _generative
from ...sql.dml import Insert as StandardInsert
from ...sql.elements import ClauseElement
return alias(self.table, name="inserted")
@_generative
+ @_exclusive_against(
+ "_post_values_clause",
+ msgs={
+ "_post_values_clause": "This Insert construct already "
+ "has an ON DUPLICATE KEY clause present"
+ },
+ )
def on_duplicate_key_update(self, *args, **kw):
r"""
Specifies the ON DUPLICATE KEY UPDATE clause.
from ...sql import coercions
from ...sql import roles
from ...sql import schema
+from ...sql.base import _exclusive_against
from ...sql.base import _generative
from ...sql.dml import Insert as StandardInsert
from ...sql.elements import ClauseElement
"""
return alias(self.table, name="excluded").columns
+ _on_conflict_exclusive = _exclusive_against(
+ "_post_values_clause",
+ msgs={
+ "_post_values_clause": "This Insert construct already has "
+ "an ON CONFLICT clause established"
+ },
+ )
+
@_generative
+ @_on_conflict_exclusive
def on_conflict_do_update(
self,
constraint=None,
)
@_generative
+ @_on_conflict_exclusive
def on_conflict_do_nothing(
self, constraint=None, index_elements=None, index_where=None
):
from ... import util
from ...sql import coercions
from ...sql import roles
+from ...sql.base import _exclusive_against
from ...sql.base import _generative
from ...sql.dml import Insert as StandardInsert
from ...sql.elements import ClauseElement
"""
return alias(self.table, name="excluded").columns
+ _on_conflict_exclusive = _exclusive_against(
+ "_post_values_clause",
+ msgs={
+ "_post_values_clause": "This Insert construct already has "
+ "an ON CONFLICT clause established"
+ },
+ )
+
@_generative
+ @_on_conflict_exclusive
def on_conflict_do_update(
self,
index_elements=None,
)
@_generative
+ @_on_conflict_exclusive
def on_conflict_do_nothing(self, index_elements=None, index_where=None):
"""
Specifies a DO NOTHING action for ON CONFLICT clause.
return decorated
+def _exclusive_against(*names, **kw):
+ msgs = kw.pop("msgs", {})
+
+ defaults = kw.pop("defaults", {})
+
+ getters = [
+ (name, operator.attrgetter(name), defaults.get(name, None))
+ for name in names
+ ]
+
+ @util.decorator
+ def check(fn, self, *args, **kw):
+ for name, getter, default_ in getters:
+ if getter(self) is not default_:
+ msg = msgs.get(
+ name,
+ "Method %s() has already been invoked on this %s construct"
+ % (fn.__name__, self.__class__),
+ )
+ raise exc.InvalidRequestError(msg)
+ return fn(self, *args, **kw)
+
+ return check
+
+
def _clone(element, **kw):
return element._clone()
from . import roles
from . import util as sql_util
from .base import _entity_namespace_key
+from .base import _exclusive_against
from .base import _from_objects
from .base import _generative
from .base import ColumnCollection
self._setup_prefixes(prefixes)
@_generative
+ @_exclusive_against(
+ "_select_names",
+ "_ordered_values",
+ msgs={
+ "_select_names": "This construct already inserts from a SELECT",
+ "_ordered_values": "This statement already has ordered "
+ "values present",
+ },
+ )
def values(self, *args, **kwargs):
r"""Specify a fixed VALUES clause for an INSERT statement, or the SET
clause for an UPDATE.
"""
- if self._select_names:
- raise exc.InvalidRequestError(
- "This construct already inserts from a SELECT"
- )
- elif self._ordered_values:
- raise exc.ArgumentError(
- "This statement already has ordered values present"
- )
-
if args:
# positional case. this is currently expensive. we don't
# yet have positional-only args so we have to check the length.
self._values = util.immutabledict(arg)
@_generative
+ @_exclusive_against(
+ "_returning",
+ msgs={
+ "_returning": "RETURNING is already configured on this statement"
+ },
+ defaults={"_returning": _returning},
+ )
def return_defaults(self, *cols):
"""Make use of a :term:`RETURNING` clause for the purpose
of fetching server-side expressions and defaults.
:attr:`_engine.CursorResult.inserted_primary_key_rows`
"""
- if self._returning:
- raise exc.InvalidRequestError(
- "RETURNING is already configured on this statement"
- )
self._return_defaults = cols or True
Column("baz", String(10)),
)
+ def test_no_call_twice(self):
+ stmt = insert(self.table).values(
+ [{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
+ )
+ stmt = stmt.on_duplicate_key_update(
+ bar=stmt.inserted.bar, baz=stmt.inserted.baz
+ )
+ with testing.expect_raises_message(
+ exc.InvalidRequestError,
+ "This Insert construct already has an "
+ "ON DUPLICATE KEY clause present",
+ ):
+ stmt = stmt.on_duplicate_key_update(
+ bar=stmt.inserted.bar, baz=stmt.inserted.baz
+ )
+
def test_from_values(self):
stmt = insert(self.table).values(
[{"id": 1, "bar": "ab"}, {"id": 2, "bar": "b"}]
"goofy_index", table1.c.name, postgresql_where=table1.c.name > "m"
)
+ def test_on_conflict_do_no_call_twice(self):
+ users = self.table1
+
+ for stmt in (
+ insert(users).on_conflict_do_nothing(),
+ insert(users).on_conflict_do_update(
+ index_elements=[users.c.myid], set_=dict(name="foo")
+ ),
+ ):
+ for meth in (
+ stmt.on_conflict_do_nothing,
+ stmt.on_conflict_do_update,
+ ):
+
+ with testing.expect_raises_message(
+ exc.InvalidRequestError,
+ "This Insert construct already has an "
+ "ON CONFLICT clause established",
+ ):
+ meth()
+
def test_do_nothing_no_target(self):
i = insert(
ValueError, insert(self.tables.users).on_conflict_do_update
)
+ def test_on_conflict_do_no_call_twice(self):
+ users = self.tables.users
+
+ for stmt in (
+ insert(users).on_conflict_do_nothing(),
+ insert(users).on_conflict_do_update(
+ index_elements=[users.c.id], set_=dict(name="foo")
+ ),
+ ):
+ for meth in (
+ stmt.on_conflict_do_nothing,
+ stmt.on_conflict_do_update,
+ ):
+
+ with testing.expect_raises_message(
+ exc.InvalidRequestError,
+ "This Insert construct already has an "
+ "ON CONFLICT clause established",
+ ):
+ meth()
+
def test_on_conflict_do_nothing(self, connection):
users = self.tables.users
stmt = table1.update().ordered_values(("myid", 1), ("name", "d1"))
assert_raises_message(
- exc.ArgumentError,
+ exc.InvalidRequestError,
"This statement already has ordered values present",
stmt.values,
{"myid": 2, "name": "d2"},