--- /dev/null
+.. change::
+ :tags: bug, engine
+ :tickets: 11160
+
+ Made a change to the adjustment made in version 2.0.10 for :ticket:`9618`,
+ which added the behavior of reconciling RETURNING rows from a bulk INSERT
+ to the parameters that were passed to it. This behavior included a
+ comparison of already-DB-converted bound parameter values against returned
+ row values that was not always "symmetrical" for SQL column types such as
+ UUIDs, depending on specifics of how different DBAPIs receive such values
+ versus how they return them, necessitating the need for additional
+ "sentinel value resolver" methods on these column types. Unfortunately
+ this broke third party column types such as UUID/GUID types in libraries
+ like SQLModel which did not implement this special method, raising an error
+ "Can't match sentinel values in result set to parameter sets". Rather than
+ attempt to further explain and document this implementation detail of the
+ "insertmanyvalues" feature including a public version of the new
+ method, the approach is intead revised to no longer need this extra
+ conversion step, and the logic that does the comparison now works on the
+ pre-converted bound parameter value compared to the post-result-processed
+ value, which should always be of a matching datatype. In the unusual case
+ that a custom SQL column type that also happens to be used in a "sentinel"
+ column for bulk INSERT is not receiving and returning the same value type,
+ the "Can't match" error will be raised, however the mitigation is
+ straightforward in that the same Python datatype should be passed as that
+ returned.
return process
- def _sentinel_value_resolver(self, dialect):
- if not self.native_uuid:
- # dealing entirely with strings going in and out of
- # CHAR(32)
- return None
-
- # true if we expect the returned UUID values to be strings
- # pymssql sends UUID objects back, pyodbc sends strings,
- # however pyodbc converts them to uppercase coming back, so
- # need special logic here
- character_based_uuid = not dialect.supports_native_uuid
-
- if character_based_uuid:
- # we sent UUID objects in all cases, see bind_processor()
- def process(uuid_value):
- return str(uuid_value).upper()
-
- return process
- elif not self.as_uuid:
- return _python_UUID
- else:
- return None
-
class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]):
__visit_name__ = "UNIQUEIDENTIFIER"
else:
return None
- def _sentinel_value_resolver(self, dialect):
- """Return a callable that will receive the uuid object or string
- as it is normally passed to the DB in the parameter set, after
- bind_processor() is called. Convert this value to match
- what it would be as coming back from MariaDB RETURNING. this seems
- to be *after* SQLAlchemy's datatype has converted, so these
- will be UUID objects if as_uuid=True and dashed strings if
- as_uuid=False
-
- """
-
- if not dialect._allows_uuid_binds:
-
- def process(value):
- return (
- f"{value[0:8]}-{value[8:12]}-"
- f"{value[12:16]}-{value[16:20]}-{value[20:]}"
- )
-
- return process
- elif self.as_uuid:
- return str
- else:
- return None
-
class MariaDBDialect(MySQLDialect):
is_mariadb = True
from ..sql.elements import BindParameter
from ..sql.schema import Column
from ..sql.type_api import _BindProcessorType
+ from ..sql.type_api import _ResultProcessorType
from ..sql.type_api import TypeEngine
# When we're handed literal SQL, ensure it's a SELECT query
context = cast(DefaultExecutionContext, context)
compiled = cast(SQLCompiler, context.compiled)
+ _composite_sentinel_proc: Sequence[
+ Optional[_ResultProcessorType[Any]]
+ ] = ()
+ _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
+ _sentinel_proc_initialized: bool = False
+
+ compiled_parameters = context.compiled_parameters
+
imv = compiled._insertmanyvalues
assert imv is not None
"insertmanyvalues_page_size", self.insertmanyvalues_page_size
)
- sentinel_value_resolvers = None
-
if compiled.schema_translate_map:
schema_translate_map = context.execution_options.get(
"schema_translate_map", {}
sort_by_parameter_order = imv.sort_by_parameter_order
- if imv.num_sentinel_columns:
- sentinel_value_resolvers = (
- compiled._imv_sentinel_value_resolvers
- )
else:
sort_by_parameter_order = False
result = None
for imv_batch in compiled._deliver_insertmanyvalues_batches(
statement,
parameters,
+ compiled_parameters,
generic_setinputsizes,
batch_size,
sort_by_parameter_order,
yield imv_batch
if is_returning:
+
rows = context.fetchall_for_returning(cursor)
# I would have thought "is_returning: Final[bool]"
# otherwise, create dictionaries to match up batches
# with parameters
assert imv.sentinel_param_keys
+ assert imv.sentinel_columns
+
+ _nsc = imv.num_sentinel_columns
+ if not _sentinel_proc_initialized:
+ if composite_sentinel:
+ _composite_sentinel_proc = [
+ col.type._cached_result_processor(
+ self, cursor_desc[1]
+ )
+ for col, cursor_desc in zip(
+ imv.sentinel_columns,
+ cursor.description[-_nsc:],
+ )
+ ]
+ else:
+ _scalar_sentinel_proc = (
+ imv.sentinel_columns[0]
+ ).type._cached_result_processor(
+ self, cursor.description[-1][1]
+ )
+ _sentinel_proc_initialized = True
+
+ rows_by_sentinel: Union[
+ Dict[Tuple[Any, ...], Any],
+ Dict[Any, Any],
+ ]
if composite_sentinel:
- _nsc = imv.num_sentinel_columns
rows_by_sentinel = {
- tuple(row[-_nsc:]): row for row in rows
+ tuple(
+ (proc(val) if proc else val)
+ for val, proc in zip(
+ row[-_nsc:], _composite_sentinel_proc
+ )
+ ): row
+ for row in rows
+ }
+ elif _scalar_sentinel_proc:
+ rows_by_sentinel = {
+ _scalar_sentinel_proc(row[-1]): row for row in rows
}
else:
rows_by_sentinel = {row[-1]: row for row in rows}
)
try:
- if composite_sentinel:
- if sentinel_value_resolvers:
- # composite sentinel (PK) with value resolvers
- ordered_rows = [
- rows_by_sentinel[
- tuple(
- (
- _resolver(parameters[_spk]) # type: ignore # noqa: E501
- if _resolver
- else parameters[_spk] # type: ignore # noqa: E501
- )
- for _resolver, _spk in zip(
- sentinel_value_resolvers,
- imv.sentinel_param_keys,
- )
- )
- ]
- for parameters in imv_batch.batch
- ]
- else:
- # composite sentinel (PK) with no value
- # resolvers
- ordered_rows = [
- rows_by_sentinel[
- tuple(
- parameters[_spk] # type: ignore
- for _spk in imv.sentinel_param_keys
- )
- ]
- for parameters in imv_batch.batch
- ]
- else:
- _sentinel_param_key = imv.sentinel_param_keys[0]
- if (
- sentinel_value_resolvers
- and sentinel_value_resolvers[0]
- ):
- # single-column sentinel with value resolver
- _sentinel_value_resolver = (
- sentinel_value_resolvers[0]
- )
- ordered_rows = [
- rows_by_sentinel[
- _sentinel_value_resolver(
- parameters[_sentinel_param_key] # type: ignore # noqa: E501
- )
- ]
- for parameters in imv_batch.batch
- ]
- else:
- # single-column sentinel with no value resolver
- ordered_rows = [
- rows_by_sentinel[
- parameters[_sentinel_param_key] # type: ignore # noqa: E501
- ]
- for parameters in imv_batch.batch
- ]
+ ordered_rows = [
+ rows_by_sentinel[sentinel_keys]
+ for sentinel_keys in imv_batch.sentinel_values
+ ]
except KeyError as ke:
# see test_insert_exec.py::
# IMVSentinelTest::test_sentinel_cant_match_keys
from .selectable import Select
from .selectable import SelectState
from .type_api import _BindProcessorType
- from .type_api import _SentinelProcessorType
from ..engine.cursor import CursorResultMetaData
from ..engine.interfaces import _CoreSingleExecuteParams
from ..engine.interfaces import _DBAPIAnyExecuteParams
"""
- sentinel_param_keys: Optional[Sequence[Union[str, int]]] = None
- """parameter str keys / int indexes in each param dictionary / tuple
+ sentinel_param_keys: Optional[Sequence[str]] = None
+ """parameter str keys in each param dictionary / tuple
that would link to the client side "sentinel" values for that row, which
we can use to match up parameter sets to result rows.
.. versionadded:: 2.0.10
+ .. versionchanged:: 2.0.29 - the sequence is now string dictionary keys
+ only, used against the "compiled parameteters" collection before
+ the parameters were converted by bound parameter processors
+
"""
implicit_sentinel: bool = False
replaced_parameters: _DBAPIAnyExecuteParams
processed_setinputsizes: Optional[_GenericSetInputSizesType]
batch: Sequence[_DBAPISingleExecuteParams]
+ sentinel_values: Sequence[Tuple[Any, ...]]
current_batch_size: int
batchnum: int
total_batches: int
for v in self._insertmanyvalues.insert_crud_params
]
- sentinel_param_int_idxs = (
- [
- self.positiontup.index(cast(str, _param_key))
- for _param_key in self._insertmanyvalues.sentinel_param_keys # noqa: E501
- ]
- if self._insertmanyvalues.sentinel_param_keys is not None
- else None
- )
-
self._insertmanyvalues = self._insertmanyvalues._replace(
single_values_expr=single_values_expr,
insert_crud_params=insert_crud_params,
- sentinel_param_keys=sentinel_param_int_idxs,
)
def _process_numeric(self):
for v in self._insertmanyvalues.insert_crud_params
]
- sentinel_param_int_idxs = (
- [
- self.positiontup.index(cast(str, _param_key))
- for _param_key in self._insertmanyvalues.sentinel_param_keys # noqa: E501
- ]
- if self._insertmanyvalues.sentinel_param_keys is not None
- else None
- )
-
self._insertmanyvalues = self._insertmanyvalues._replace(
# This has the numbers (:1, :2)
single_values_expr=single_values_expr,
# The single binds are instead %s so they can be formatted
insert_crud_params=insert_crud_params,
- sentinel_param_keys=sentinel_param_int_idxs,
)
@util.memoized_property
if value is not None
}
- @util.memoized_property
- def _imv_sentinel_value_resolvers(
- self,
- ) -> Optional[Sequence[Optional[_SentinelProcessorType[Any]]]]:
- imv = self._insertmanyvalues
- if imv is None or imv.sentinel_columns is None:
- return None
-
- sentinel_value_resolvers = [
- _scol.type._cached_sentinel_value_processor(self.dialect)
- for _scol in imv.sentinel_columns
- ]
- if util.NONE_SET.issuperset(sentinel_value_resolvers):
- return None
- else:
- return sentinel_value_resolvers
-
def is_subquery(self):
return len(self.stack) > 1
self,
statement: str,
parameters: _DBAPIMultiExecuteParams,
+ compiled_parameters: List[_MutableCoreSingleExecuteParams],
generic_setinputsizes: Optional[_GenericSetInputSizesType],
batch_size: int,
sort_by_parameter_order: bool,
imv = self._insertmanyvalues
assert imv is not None
+ if not imv.sentinel_param_keys:
+ _sentinel_from_params = None
+ else:
+ _sentinel_from_params = operator.itemgetter(
+ *imv.sentinel_param_keys
+ )
+
lenparams = len(parameters)
if imv.is_default_expr and not self.dialect.supports_default_metavalue:
# backend doesn't support
downgraded = False
if use_row_at_a_time:
- for batchnum, param in enumerate(
- cast("Sequence[_DBAPISingleExecuteParams]", parameters), 1
+ for batchnum, (param, compiled_param) in enumerate(
+ cast(
+ "Sequence[Tuple[_DBAPISingleExecuteParams, _MutableCoreSingleExecuteParams]]", # noqa: E501
+ zip(parameters, compiled_parameters),
+ ),
+ 1,
):
yield _InsertManyValuesBatch(
statement,
param,
generic_setinputsizes,
[param],
+ (
+ [_sentinel_from_params(compiled_param)]
+ if _sentinel_from_params
+ else []
+ ),
1,
batchnum,
lenparams,
)
batches = cast("List[Sequence[Any]]", list(parameters))
+ compiled_batches = cast(
+ "List[Sequence[Any]]", list(compiled_parameters)
+ )
processed_setinputsizes: Optional[_GenericSetInputSizesType] = None
batchnum = 1
while batches:
batch = batches[0:batch_size]
+ compiled_batch = compiled_batches[0:batch_size]
+
batches[0:batch_size] = []
+ compiled_batches[0:batch_size] = []
+
if batches:
current_batch_size = batch_size
else:
replaced_parameters,
processed_setinputsizes,
batch,
+ (
+ [_sentinel_from_params(cb) for cb in compiled_batch]
+ if _sentinel_from_params
+ else []
+ ),
current_batch_size,
batchnum,
total_batches,
return process
- def _sentinel_value_resolver(self, dialect):
- """For the "insertmanyvalues" feature only, return a callable that
- will receive the uuid object or string
- as it is normally passed to the DB in the parameter set, after
- bind_processor() is called. Convert this value to match
- what it would be as coming back from a RETURNING or similar
- statement for the given backend.
-
- Individual dialects and drivers may need their own implementations
- based on how their UUID types send data and how the drivers behave
- (e.g. pyodbc)
-
- """
- if not self.native_uuid or not dialect.supports_native_uuid:
- # dealing entirely with strings going in and out of
- # CHAR(32)
- return None
-
- elif self.as_uuid:
- # we sent UUID objects and we are getting UUID objects back
- return None
- else:
- # we sent strings and we are getting UUID objects back
- return _python_UUID
-
class UUID(Uuid[_UUID_RETURN], type_api.NativeForEmulated):
"""Represent the SQL UUID type.
"""
return None
- def _sentinel_value_resolver(
- self, dialect: Dialect
- ) -> Optional[_SentinelProcessorType[_T]]:
- """Return an optional callable that will match parameter values
- (post-bind processing) to result values
- (pre-result-processing), for use in the "sentinel" feature.
-
- .. versionadded:: 2.0.10
-
- """
- return None
-
@util.memoized_property
def _has_bind_expression(self) -> bool:
"""memoized boolean, check if bind_expression is implemented.
d["result"][coltype] = rp
return rp
- def _cached_sentinel_value_processor(
- self, dialect: Dialect
- ) -> Optional[_SentinelProcessorType[_T]]:
- try:
- return dialect._type_memos[self]["sentinel"]
- except KeyError:
- pass
-
- d = self._dialect_info(dialect)
- d["sentinel"] = bp = d["impl"]._sentinel_value_resolver(dialect)
- return bp
-
def _cached_custom_processor(
self, dialect: Dialect, key: str, fn: Callable[[TypeEngine[_T]], _O]
) -> _O:
# by not having the other methods we assert that those aren't being
# used
+ @property
+ def description(self):
+ return self.cursor.description
+
def fetchall(self):
rows = self.cursor.fetchall()
rows = list(rows)
asyncmy = mysql+asyncmy://scott:tiger@127.0.0.1:3306/test?charset=utf8mb4
mariadb = mariadb+mysqldb://scott:tiger@127.0.0.1:3306/test
mariadb_connector = mariadb+mariadbconnector://scott:tiger@127.0.0.1:3306/test
-mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes
-mssql_async = mssql+aioodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes
-pymssql = mssql+pymssql://scott:tiger^5HHH@mssql2017:1433/test
-docker_mssql = mssql+pyodbc://scott:tiger^5HHH@127.0.0.1:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes
+mssql = mssql+pyodbc://scott:tiger^5HHH@mssql2022:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Encrypt=Optional
+mssql_async = mssql+aioodbc://scott:tiger^5HHH@mssql2022:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Encrypt=Optional
+pymssql = mssql+pymssql://scott:tiger^5HHH@mssql2022:1433/test
+docker_mssql = mssql+pyodbc://scott:tiger^5HHH@127.0.0.1:1433/test?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes&Encrypt=Optional
oracle = oracle+cx_oracle://scott:tiger@oracle18c/xe
cxoracle = oracle+cx_oracle://scott:tiger@oracle18c/xe
oracledb = oracle+oracledb://scott:tiger@oracle18c/xe
"""test assertions to ensure sentinel values passed in parameter
structures can be identified when they come back in cursor.fetchall().
- Values that are further modified by the database driver or by
- SQL expressions (as in the case below) before being INSERTed
- won't match coming back out, so datatypes need to implement
- _sentinel_value_resolver() if this is the case.
+ Sentinels are now matched based on the data on the outside of the
+ type, that is, before the bind, and after the result.
"""
if resolve_sentinel_values:
- def _sentinel_value_resolver(self, dialect):
- def fix_sentinels(value):
- return value.lower()
-
- return fix_sentinels
+ def process_result_value(self, value, dialect):
+ return value.replace("upper", "UPPER")
t1 = Table(
"data",
connection.execute(stmt, data)
else:
result = connection.execute(stmt, data)
- eq_(
- set(result.all()),
- {(f"d{i}", f"upper_d{i}") for i in range(10)},
- )
+ if resolve_sentinel_values:
+ eq_(
+ set(result.all()),
+ {(f"d{i}", f"UPPER_d{i}") for i in range(10)},
+ )
+ else:
+ eq_(
+ set(result.all()),
+ {(f"d{i}", f"upper_d{i}") for i in range(10)},
+ )
@testing.variation("add_insert_sentinel", [True, False])
def test_sentinel_insert_default_pk_only(