]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
raise informative error when selectable can't be extended
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 31 May 2022 14:48:16 +0000 (10:48 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 31 May 2022 17:01:09 +0000 (13:01 -0400)
An informative error is raised for the use case where
:meth:`.Insert.from_select` is being passed a "compound select" object such
as a UNION, yet the INSERT statement needs to append additional columns to
support Python-side or explicit SQL defaults from the table metadata. In
this case a subquery of the compound object should be passed.

Fixes: #8073
Change-Id: Ic4a5dbf84ec49d2451901be05cb9cf6ae93f02b7
(cherry picked from commit 7474df2159f89d684d32aabb15014ef95cea1641)

doc/build/changelog/unreleased_14/8073.rst [new file with mode: 0644]
lib/sqlalchemy/sql/crud.py
test/sql/test_insert.py

diff --git a/doc/build/changelog/unreleased_14/8073.rst b/doc/build/changelog/unreleased_14/8073.rst
new file mode 100644 (file)
index 0000000..57add15
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: bug, sql
+    :tickets: 8073
+
+    An informative error is raised for the use case where
+    :meth:`.Insert.from_select` is being passed a "compound select" object such
+    as a UNION, yet the INSERT statement needs to append additional columns to
+    support Python-side or explicit SQL defaults from the table metadata. In
+    this case a subquery of the compound object should be passed.
index 804777c29eeddc96fb1a3b70cefb31a500d8a3a2..920c8b35687ef85d6fd6fe2be018030f315c40ed 100644 (file)
@@ -16,6 +16,7 @@ from . import coercions
 from . import dml
 from . import elements
 from . import roles
+from .selectable import Select
 from .. import exc
 from .. import util
 
@@ -339,10 +340,20 @@ def _scan_insert_from_select_cols(
     if add_select_cols:
         values.extend(add_select_cols)
         ins_from_select = compiler.stack[-1]["insert_from_select"]
+        if not isinstance(ins_from_select, Select):
+            raise exc.CompileError(
+                "Can't extend statement for INSERT..FROM SELECT to include "
+                "additional default-holding column(s) "
+                "%s.  Convert the selectable to a subquery() first, or pass "
+                "include_defaults=False to Insert.from_select() to skip these "
+                "columns."
+                % (", ".join(repr(key) for _, key, _ in add_select_cols),)
+            )
         ins_from_select = ins_from_select._generate()
-        ins_from_select._raw_columns = tuple(
-            ins_from_select._raw_columns
-        ) + tuple(expr for col, col_expr, expr in add_select_cols)
+        # copy raw_columns
+        ins_from_select._raw_columns = list(ins_from_select._raw_columns) + [
+            expr for col, col_expr, expr in add_select_cols
+        ]
         compiler.stack[-1]["insert_from_select"] = ins_from_select
 
 
index 51045daac223bf4d78f42c5ef007977029733372..741859fb2cf7abb4c3c16f80607a4d2b69282e65 100644 (file)
@@ -24,6 +24,7 @@ from sqlalchemy.testing import assert_raises
 from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import AssertsCompiledSQL
 from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
 from sqlalchemy.testing import expect_warnings
 from sqlalchemy.testing import fixtures
 
@@ -662,6 +663,75 @@ class InsertTest(_InsertTestBase, fixtures.TablesTest, AssertsCompiledSQL):
             checkparams={"name_1": "foo", "foo": None},
         )
 
+    def test_insert_from_select_fn_defaults_compound(self):
+        """test #8073"""
+
+        metadata = MetaData()
+
+        table = Table(
+            "sometable",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("foo", Integer, default="foo"),
+            Column("bar", Integer, default="bar"),
+        )
+        table1 = self.tables.mytable
+        sel = (
+            select(table1.c.myid)
+            .where(table1.c.name == "foo")
+            .union(select(table1.c.myid).where(table1.c.name == "foo"))
+        )
+        ins = table.insert().from_select(["id"], sel)
+        with expect_raises_message(
+            exc.CompileError,
+            r"Can't extend statement for INSERT..FROM SELECT to include "
+            r"additional default-holding column\(s\) 'foo', 'bar'.  "
+            r"Convert the selectable to a subquery\(\) first, or pass "
+            r"include_defaults=False to Insert.from_select\(\) to skip these "
+            r"columns.",
+        ):
+            ins.compile()
+
+    def test_insert_from_select_fn_defaults_compound_subquery(self):
+        """test #8073"""
+
+        metadata = MetaData()
+
+        def foo(ctx):
+            return 12
+
+        table = Table(
+            "sometable",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("foo", Integer, default="foo"),
+            Column("bar", Integer, default="bar"),
+        )
+        table1 = self.tables.mytable
+        sel = (
+            select(table1.c.myid)
+            .where(table1.c.name == "foo")
+            .union(select(table1.c.myid).where(table1.c.name == "foo"))
+            .subquery()
+        )
+
+        ins = table.insert().from_select(["id"], sel)
+        self.assert_compile(
+            ins,
+            "INSERT INTO sometable (id, foo, bar) SELECT anon_1.myid, "
+            ":foo AS anon_2, :bar AS anon_3 FROM "
+            "(SELECT mytable.myid AS myid FROM mytable "
+            "WHERE mytable.name = :name_1 UNION "
+            "SELECT mytable.myid AS myid FROM mytable "
+            "WHERE mytable.name = :name_2) AS anon_1",
+            checkparams={
+                "foo": None,
+                "bar": None,
+                "name_1": "foo",
+                "name_2": "foo",
+            },
+        )
+
     def test_insert_from_select_dont_mutate_raw_columns(self):
         # test [ticket:3603]
         from sqlalchemy import table