from .elements import ColumnClause
from .schema import default_is_clause_element
from .schema import default_is_sequence
+from .selectable import Select
from .selectable import TableClause
from .. import exc
from .. import util
if add_select_cols:
values.extend(add_select_cols)
ins_from_select = compiler.stack[-1]["insert_from_select"]
+ if not isinstance(ins_from_select, Select):
+ raise exc.CompileError(
+ f"Can't extend statement for INSERT..FROM SELECT to include "
+ f"additional default-holding column(s) "
+ f"""{
+ ', '.join(repr(key) for _, key, _ in add_select_cols)
+ }. Convert the selectable to a subquery() first, or pass """
+ "include_defaults=False to Insert.from_select() to skip these "
+ "columns."
+ )
ins_from_select = ins_from_select._generate()
- ins_from_select._raw_columns = tuple(
- ins_from_select._raw_columns
- ) + tuple(expr for col, col_expr, expr in add_select_cols)
+ # copy raw_columns
+ ins_from_select._raw_columns = list(ins_from_select._raw_columns) + [
+ expr for col, col_expr, expr in add_select_cols
+ ]
compiler.stack[-1]["insert_from_select"] = ins_from_select
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
checkparams={"name_1": "foo", "foo": None},
)
+ def test_insert_from_select_fn_defaults_compound(self):
+ """test #8073"""
+
+ metadata = MetaData()
+
+ table = Table(
+ "sometable",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("foo", Integer, default="foo"),
+ Column("bar", Integer, default="bar"),
+ )
+ table1 = self.tables.mytable
+ sel = (
+ select(table1.c.myid)
+ .where(table1.c.name == "foo")
+ .union(select(table1.c.myid).where(table1.c.name == "foo"))
+ )
+ ins = table.insert().from_select(["id"], sel)
+ with expect_raises_message(
+ exc.CompileError,
+ r"Can't extend statement for INSERT..FROM SELECT to include "
+ r"additional default-holding column\(s\) 'foo', 'bar'. "
+ r"Convert the selectable to a subquery\(\) first, or pass "
+ r"include_defaults=False to Insert.from_select\(\) to skip these "
+ r"columns.",
+ ):
+ ins.compile()
+
+ def test_insert_from_select_fn_defaults_compound_subquery(self):
+ """test #8073"""
+
+ metadata = MetaData()
+
+ def foo(ctx):
+ return 12
+
+ table = Table(
+ "sometable",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("foo", Integer, default="foo"),
+ Column("bar", Integer, default="bar"),
+ )
+ table1 = self.tables.mytable
+ sel = (
+ select(table1.c.myid)
+ .where(table1.c.name == "foo")
+ .union(select(table1.c.myid).where(table1.c.name == "foo"))
+ .subquery()
+ )
+
+ ins = table.insert().from_select(["id"], sel)
+ self.assert_compile(
+ ins,
+ "INSERT INTO sometable (id, foo, bar) SELECT anon_1.myid, "
+ ":foo AS anon_2, :bar AS anon_3 FROM "
+ "(SELECT mytable.myid AS myid FROM mytable "
+ "WHERE mytable.name = :name_1 UNION "
+ "SELECT mytable.myid AS myid FROM mytable "
+ "WHERE mytable.name = :name_2) AS anon_1",
+ checkparams={
+ "foo": None,
+ "bar": None,
+ "name_1": "foo",
+ "name_2": "foo",
+ },
+ )
+
def test_insert_from_select_dont_mutate_raw_columns(self):
# test [ticket:3603]
from sqlalchemy import table