import collections.abc as collections_abc
import contextlib
from enum import IntEnum
+import functools
import itertools
import operator
import re
generic_setinputsizes: Optional[_GenericSetInputSizesType],
batch_size: int,
sort_by_parameter_order: bool,
+ schema_translate_map: Optional[SchemaTranslateMapType],
) -> Iterator[_InsertManyValuesBatch]:
imv = self._insertmanyvalues
assert imv is not None
)
return
- executemany_values = f"({imv.single_values_expr})"
+ if schema_translate_map:
+ rst = functools.partial(
+ self.preparer._render_schema_translates,
+ schema_translate_map=schema_translate_map,
+ )
+ else:
+ rst = None
+
+ imv_single_values_expr = imv.single_values_expr
+ if rst:
+ imv_single_values_expr = rst(imv_single_values_expr)
+
+ executemany_values = f"({imv_single_values_expr})"
statement = statement.replace(executemany_values, "__EXECMANY_TOKEN__")
# Use optional insertmanyvalues_max_parameters
insert_crud_params = imv.insert_crud_params
assert insert_crud_params is not None
+ if rst:
+ insert_crud_params = [
+ (col, key, rst(expr), st)
+ for col, key, expr, st in insert_crud_params
+ ]
+
escaped_bind_names: Mapping[str, str]
expand_pos_lower_index = expand_pos_upper_index = 0
if imv.embed_values_counter:
executemany_values_w_comma = (
- f"({imv.single_values_expr}, _IMV_VALUES_COUNTER), "
+ f"({imv_single_values_expr}, _IMV_VALUES_COUNTER), "
)
else:
- executemany_values_w_comma = f"({imv.single_values_expr}), "
+ executemany_values_w_comma = f"({imv_single_values_expr}), "
all_names_we_will_expand: Set[str] = set()
for elem in imv.insert_crud_params:
from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import literal
+from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import Sequence
from sqlalchemy import sql
coll(expected_data),
)
+ @testing.requires.sequences
+ @testing.variation("explicit_sentinel", [True, False])
+ @testing.variation("sequence_actually_translates", [True, False])
+ @testing.variation("the_table_translates", [True, False])
+ def test_sequence_schema_translate(
+ self,
+ metadata,
+ connection,
+ explicit_sentinel,
+ warn_for_downgrades,
+ randomize_returning,
+ sort_by_parameter_order,
+ sequence_actually_translates,
+ the_table_translates,
+ ):
+ """test #11157"""
+
+ # so there's a bit of a bug which is that functions has_table()
+ # and has_sequence() do not take schema translate map into account,
+ # at all. So on MySQL, where we dont have transactional DDL, the
+ # DROP for Table / Sequence does not really work for all test runs
+ # when the schema is set to a "to be translated" kind of name.
+ # so, make a Table/Sequence with fixed schema name for the CREATE,
+ # then use a different object for the test that has a translate
+ # schema name
+ Table(
+ "t1",
+ metadata,
+ Column(
+ "id",
+ Integer,
+ Sequence("some_seq", start=1, schema=config.test_schema),
+ primary_key=True,
+ insert_sentinel=bool(explicit_sentinel),
+ ),
+ Column("data", String(50)),
+ schema=config.test_schema if the_table_translates else None,
+ )
+ metadata.create_all(connection)
+
+ if sequence_actually_translates:
+ connection = connection.execution_options(
+ schema_translate_map={
+ "should_be_translated": config.test_schema
+ }
+ )
+ sequence = Sequence(
+ "some_seq", start=1, schema="should_be_translated"
+ )
+ else:
+ connection = connection.execution_options(
+ schema_translate_map={"foo": "bar"}
+ )
+ sequence = Sequence("some_seq", start=1, schema=config.test_schema)
+
+ m2 = MetaData()
+ t1 = Table(
+ "t1",
+ m2,
+ Column(
+ "id",
+ Integer,
+ sequence,
+ primary_key=True,
+ insert_sentinel=bool(explicit_sentinel),
+ ),
+ Column("data", String(50)),
+ schema=(
+ "should_be_translated"
+ if sequence_actually_translates and the_table_translates
+ else config.test_schema if the_table_translates else None
+ ),
+ )
+
+ fixtures.insertmanyvalues_fixture(
+ connection,
+ randomize_rows=bool(randomize_returning),
+ warn_on_downgraded=bool(warn_for_downgrades),
+ )
+
+ stmt = insert(t1).returning(
+ t1.c.id,
+ t1.c.data,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ )
+ data = [{"data": f"d{i}"} for i in range(10)]
+
+ use_imv = testing.db.dialect.use_insertmanyvalues
+ if (
+ use_imv
+ and explicit_sentinel
+ and sort_by_parameter_order
+ and not (
+ testing.db.dialect.insertmanyvalues_implicit_sentinel
+ & InsertmanyvaluesSentinelOpts.SEQUENCE
+ )
+ ):
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"Column t1.id can't be explicitly marked as a sentinel "
+ r"column .* as the particular type of default generation",
+ ):
+ connection.execute(stmt, data)
+ return
+
+ with self._expect_downgrade_warnings(
+ warn_for_downgrades=warn_for_downgrades,
+ sort_by_parameter_order=sort_by_parameter_order,
+ server_autoincrement=True,
+ autoincrement_is_sequence=True,
+ ):
+ result = connection.execute(stmt, data)
+
+ if sort_by_parameter_order:
+ coll = list
+ else:
+ coll = set
+
+ expected_data = [(i + 1, f"d{i}") for i in range(10)]
+
+ eq_(
+ coll(result),
+ coll(expected_data),
+ )
+
@testing.combinations(
Integer(),
String(50),