self._insertmanyvalues = _InsertManyValues(
True,
self.dialect.default_metavalue_token,
- cast(
- "List[crud._CrudParamElementStr]", crud_params_single
- ),
+ crud_params_single,
counted_bindparam,
sort_by_parameter_order=(
insert_stmt._sort_by_parameter_order
for crud_param_set in crud_params_struct.all_multi_params
),
)
- else:
- insert_single_values_expr = ", ".join(
- [
- value
- for _, _, value, _ in cast(
- "List[crud._CrudParamElementStr]",
- crud_params_single,
- )
- ]
- )
+ elif use_insertmanyvalues:
+ if (
+ implicit_sentinel
+ and (
+ self.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
+ )
+ # this is checking if we have
+ # INSERT INTO table (id) VALUES (DEFAULT).
+ and not (crud_params_struct.is_default_metavalue_only)
+ ):
+ # if we have a sentinel column that is server generated,
+ # then for selected backends render the VALUES list as a
+ # subquery. This is the orderable form supported by
+ # PostgreSQL and in fewer cases SQL Server
+ embed_sentinel_value = True
+
+ render_bind_casts = (
+ self.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
+ )
- if use_insertmanyvalues:
- if (
- implicit_sentinel
- and (
- self.dialect.insertmanyvalues_implicit_sentinel
- & InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
- )
- # this is checking if we have
- # INSERT INTO table (id) VALUES (DEFAULT).
- and not (crud_params_struct.is_default_metavalue_only)
- ):
- # if we have a sentinel column that is server generated,
- # then for selected backends render the VALUES list as a
- # subquery. This is the orderable form supported by
- # PostgreSQL and SQL Server.
- embed_sentinel_value = True
+ add_sentinel_set = add_sentinel_cols or ()
- render_bind_casts = (
- self.dialect.insertmanyvalues_implicit_sentinel
- & InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
- )
+ insert_single_values_expr = ", ".join(
+ [
+ value
+ for col, _, value, _ in crud_params_single
+ if col not in add_sentinel_set
+ ]
+ )
- colnames = ", ".join(
- f"p{i}" for i, _ in enumerate(crud_params_single)
- )
+ colnames = ", ".join(
+ f"p{i}"
+ for i, cp in enumerate(crud_params_single)
+ if cp[0] not in add_sentinel_set
+ )
- if render_bind_casts:
- # render casts for the SELECT list. For PG, we are
- # already rendering bind casts in the parameter list,
- # selectively for the more "tricky" types like ARRAY.
- # however, even for the "easy" types, if the parameter
- # is NULL for every entry, PG gives up and says
- # "it must be TEXT", which fails for other easy types
- # like ints. So we cast on this side too.
- colnames_w_cast = ", ".join(
+ if render_bind_casts:
+ # render casts for the SELECT list. For PG, we are
+ # already rendering bind casts in the parameter list,
+ # selectively for the more "tricky" types like ARRAY.
+ # however, even for the "easy" types, if the parameter
+ # is NULL for every entry, PG gives up and says
+ # "it must be TEXT", which fails for other easy types
+ # like ints. So we cast on this side too.
+ colnames_w_cast = ", ".join(
+ (
self.render_bind_cast(
col.type,
col.type._unwrapped_dialect_impl(self.dialect),
f"p{i}",
)
- for i, (col, *_) in enumerate(crud_params_single)
+ if col not in add_sentinel_set
+ else expr
+ )
+ for i, (col, _, expr, _) in enumerate(
+ crud_params_single
)
- else:
- colnames_w_cast = colnames
-
- text += (
- f" SELECT {colnames_w_cast} FROM "
- f"(VALUES ({insert_single_values_expr})) "
- f"AS imp_sen({colnames}, sen_counter) "
- "ORDER BY sen_counter"
)
else:
- # otherwise, if no sentinel or backend doesn't support
- # orderable subquery form, use a plain VALUES list
- embed_sentinel_value = False
- text += f" VALUES ({insert_single_values_expr})"
+ colnames_w_cast = ", ".join(
+ (f"p{i}" if col not in add_sentinel_set else expr)
+ for i, (col, _, expr, _) in enumerate(
+ crud_params_single
+ )
+ )
- self._insertmanyvalues = _InsertManyValues(
- is_default_expr=False,
- single_values_expr=insert_single_values_expr,
- insert_crud_params=cast(
- "List[crud._CrudParamElementStr]",
- crud_params_single,
- ),
- num_positional_params_counted=counted_bindparam,
- sort_by_parameter_order=(
- insert_stmt._sort_by_parameter_order
- ),
- includes_upsert_behaviors=(
- insert_stmt._post_values_clause is not None
- ),
- sentinel_columns=add_sentinel_cols,
- num_sentinel_columns=(
- len(add_sentinel_cols) if add_sentinel_cols else 0
- ),
- sentinel_param_keys=named_sentinel_params,
- implicit_sentinel=implicit_sentinel,
- embed_values_counter=embed_sentinel_value,
+ insert_crud_params = [
+ elem
+ for elem in crud_params_single
+ if elem[0] not in add_sentinel_set
+ ]
+
+ text += (
+ f" SELECT {colnames_w_cast} FROM "
+ f"(VALUES ({insert_single_values_expr})) "
+ f"AS imp_sen({colnames}, sen_counter) "
+ "ORDER BY sen_counter"
)
else:
+ # otherwise, if no sentinel or backend doesn't support
+ # orderable subquery form, use a plain VALUES list
+ embed_sentinel_value = False
+ insert_crud_params = crud_params_single
+ insert_single_values_expr = ", ".join(
+ [value for _, _, value, _ in crud_params_single]
+ )
+
text += f" VALUES ({insert_single_values_expr})"
+ self._insertmanyvalues = _InsertManyValues(
+ is_default_expr=False,
+ single_values_expr=insert_single_values_expr,
+ insert_crud_params=insert_crud_params,
+ num_positional_params_counted=counted_bindparam,
+ sort_by_parameter_order=(insert_stmt._sort_by_parameter_order),
+ includes_upsert_behaviors=(
+ insert_stmt._post_values_clause is not None
+ ),
+ sentinel_columns=add_sentinel_cols,
+ num_sentinel_columns=(
+ len(add_sentinel_cols) if add_sentinel_cols else 0
+ ),
+ sentinel_param_keys=named_sentinel_params,
+ implicit_sentinel=implicit_sentinel,
+ embed_values_counter=embed_sentinel_value,
+ )
+
+ else:
+ insert_single_values_expr = ", ".join(
+ [value for _, _, value, _ in crud_params_single]
+ )
+
+ text += f" VALUES ({insert_single_values_expr})"
+
if insert_stmt._post_values_clause is not None:
post_values_clause = self.process(
insert_stmt._post_values_clause, **kw
metadata.create_all(connection)
self._assert_data_noautoincrement(connection, table)
+ def test_full_cursor_insertmanyvalues_sql(self, metadata, connection):
+ """test compilation/ execution of the subquery form including
+ the fix for #13015
+
+ The specific form in question for #13015 is only supported by the
+ PostgreSQL dialect right now. MSSQL would also use it for a server
+ side function that produces monotonic values, but we have no support
+ for that outside of sequence next right now, where SQL Server doesn't
+ support invokving the sequence outside of the VALUES tuples.
+
+ """
+
+ my_table = Table(
+ "my_table",
+ metadata,
+ Column("data1", String(50)),
+ Column(
+ "id",
+ Integer,
+ Sequence("foo_id_seq", start=1, data_type=Integer),
+ primary_key=True,
+ ),
+ Column("data2", String(50)),
+ )
+
+ my_table.create(connection)
+ with self.sql_execution_asserter(connection) as assert_:
+ connection.execute(
+ my_table.insert().returning(
+ my_table.c.data1,
+ my_table.c.id,
+ sort_by_parameter_order=True,
+ ),
+ [
+ {"data1": f"d1 row {i}", "data2": f"d2 row {i}"}
+ for i in range(10)
+ ],
+ )
+
+ render_bind_casts = (
+ String().dialect_impl(connection.dialect).render_bind_cast
+ )
+
+ if render_bind_casts:
+ varchar_cast = "::VARCHAR"
+ else:
+ varchar_cast = ""
+
+ if connection.dialect.paramstyle == "pyformat":
+ params = ", ".join(
+ f"(%(data1__{i})s{varchar_cast}, %(data2__{i})s"
+ f"{varchar_cast}, {i})"
+ for i in range(0, 10)
+ )
+ parameters = {}
+ for i in range(10):
+ parameters[f"data1__{i}"] = f"d1 row {i}"
+ parameters[f"data2__{i}"] = f"d2 row {i}"
+
+ elif connection.dialect.paramstyle == "numeric_dollar":
+ params = ", ".join(
+ f"(${i * 2 + 1}{varchar_cast}, "
+ f"${i * 2 + 2}{varchar_cast}, {i})"
+ for i in range(0, 10)
+ )
+ parameters = ()
+ for i in range(10):
+ parameters += (f"d1 row {i}", f"d2 row {i}")
+ elif connection.dialect.paramstyle == "format":
+ params = ", ".join(
+ f"(%s{varchar_cast}, %s{varchar_cast}, {i})"
+ for i in range(0, 10)
+ )
+ parameters = ()
+ for i in range(10):
+ parameters += (f"d1 row {i}", f"d2 row {i}")
+ elif connection.dialect.paramstyle == "qmark":
+ params = ", ".join(
+ f"(?{varchar_cast}, ?{varchar_cast}, {i})"
+ for i in range(0, 10)
+ )
+ parameters = ()
+ for i in range(10):
+ parameters += (f"d1 row {i}", f"d2 row {i}")
+ else:
+ assert False
+
+ assert_.assert_(
+ CursorSQL(
+ "INSERT INTO my_table (data1, id, data2) "
+ f"SELECT p0::VARCHAR, nextval('foo_id_seq'), p2::VARCHAR "
+ f"FROM (VALUES {params}) "
+ "AS imp_sen(p0, p2, sen_counter) ORDER BY sen_counter "
+ "RETURNING my_table.data1, my_table.id, my_table.id AS id__1",
+ parameters,
+ )
+ )
+
def _ints_and_strs_setinputsizes(self, connection):
return (
connection.dialect._bind_typing_render_casts