:ticket:`4449`
+.. _change_4808:
+
+New "post compile" bound parameters used for LIMIT/OFFSET in Oracle, SQL Server
+-------------------------------------------------------------------------------
+
+A major goal of the 1.4 series is to establish that all Core SQL constructs
+are completely cacheable, meaning that a particular :class:`.Compiled`
+structure will produce an identical SQL string regardless of any SQL parameters
+used with it, which notably includes those used to specify the LIMIT and
+OFFSET values, typically used for pagination and "top N" style results.
+
+While SQLAlchemy has used bound parameters for LIMIT/OFFSET schemes for many
+years, a few outliers remained where such parameters were not allowed, including
+a SQL Server "TOP N" statement, such as::
+
+ SELECT TOP 5 mytable.id, mytable.data FROM mytable
+
+as well as with Oracle, where the FIRST_ROWS() hint (which SQLAlchemy will
+use if the ``optimize_limits=True`` parameter is passed to
+:func:`.create_engine` with an Oracle URL) does not allow them,
+but also that using bound parameters with ROWNUM comparisons has been reported
+as producing slower query plans::
+
+ SELECT anon_1.id, anon_1.data FROM (
+ SELECT /*+ FIRST_ROWS(5) */
+ anon_2.id AS id,
+ anon_2.data AS data,
+ ROWNUM AS ora_rn FROM (
+ SELECT mytable.id, mytable.data FROM mytable
+ ) anon_2
+ WHERE ROWNUM <= :param_1
+ ) anon_1 WHERE ora_rn > :param_2
+
+In order to allow for all statements to be unconditionally cacheable at the
+compilation level, a new form of bound parameter called a "post compile"
+parameter has been added, which makes use of the same mechanism as that
+of "expanding IN parameters". This is a :func:`.bindparam` that behaves
+identically to any other bound parameter except that parameter value will
+be rendered literally into the SQL string before sending it to the DBAPI
+``cursor.execute()`` method. The new parameter is used internally by the
+SQL Server and Oracle dialects, so that the drivers receive the literal
+rendered value but the rest of SQLAlchemy can still consider this as a
+bound parameter. The above two statements when stringified using
+``str(statement.compile(dialect=<dialect>))`` now look like::
+
+ SELECT TOP [POSTCOMPILE_param_1] mytable.id, mytable.data FROM mytable
+
+and::
+
+ SELECT anon_1.id, anon_1.data FROM (
+ SELECT /*+ FIRST_ROWS([POSTCOMPILE__ora_frow_1]) */
+ anon_2.id AS id,
+ anon_2.data AS data,
+ ROWNUM AS ora_rn FROM (
+ SELECT mytable.id, mytable.data FROM mytable
+ ) anon_2
+ WHERE ROWNUM <= [POSTCOMPILE_param_1]
+ ) anon_1 WHERE ora_rn > [POSTCOMPILE_param_2]
+
+The ``[POSTCOMPILE_<param>]`` format is also what is seen when an
+"expanding IN" is used.
+
+When viewing the SQL logging output, the final form of the statement will
+be seen::
+
+ SELECT anon_1.id, anon_1.data FROM (
+ SELECT /*+ FIRST_ROWS(5) */
+ anon_2.id AS id,
+ anon_2.data AS data,
+ ROWNUM AS ora_rn FROM (
+ SELECT mytable.id AS id, mytable.data AS data FROM mytable
+ ) anon_2
+ WHERE ROWNUM <= 8
+ ) anon_1 WHERE ora_rn > 3
+
+
+The "post compile parameter" feature is exposed as public API through the
+:paramref:`.bindparam.literal_execute` parameter, however is currently not
+intended for general use. The literal values are rendered using the
+:meth:`.TypeEngine.literal_processor` of the underlying datatype, which in
+SQLAlchemy has **extremely limited** scope, supporting only integers and simple
+string values.
+
+:ticket:`4808`
+
.. _change_4712:
Connection-level transactions can now be inactive based on subtransaction
--- /dev/null
+.. change::
+ :tags: feature, sql, mssql, oracle
+ :tickets: 4808
+
+ Added new "post compile parameters" feature. This feature allows a
+ :func:`.bindparam` construct to have its value rendered into the SQL string
+ before being passed to the DBAPI driver, but after the compilation step,
+ using the "literal render" feature of the compiler. The immediate
+ rationale for this feature is to support LIMIT/OFFSET schemes that don't
+ work or perform well as bound parameters handled by the database driver,
+ while still allowing for SQLAlchemy SQL constructs to be cacheable in their
+ compiled form. The immediate targets for the new feature are the "TOP
+ N" clause used by SQL Server (and Sybase) which does not support a bound
+ parameter, as well as the "ROWNUM" and optional "FIRST_ROWS()" schemes used
+ by the Oracle dialect, the former of which has been known to perform better
+ without bound parameters and the latter of which does not support a bound
+ parameter. The feature builds upon the mechanisms first developed to
+ support "expanding" parameters for IN expressions. As part of this
+ feature, the Oracle ``use_binds_for_limits`` feature is turned on
+ unconditionally and this flag is now deprecated.
+
+ .. seealso::
+
+ :ref:`change_4808`
--------------------
MSSQL has no support for the LIMIT or OFFSET keywords. LIMIT is
-supported directly through the ``TOP`` Transact SQL keyword::
+supported directly through the ``TOP`` Transact SQL keyword. A statement
+such as::
- select.limit
+ select([some_table]).limit(5)
-will yield::
+will render similarly to::
- SELECT TOP n
+ SELECT TOP 5 col1, col2.. FROM table
-If using SQL Server 2005 or above, LIMIT with OFFSET
-support is available through the ``ROW_NUMBER OVER`` construct.
-For versions below 2005, LIMIT with OFFSET usage will fail.
+LIMIT with OFFSET support is implemented using the using the ``ROW_NUMBER()``
+window function. A statement such as::
+
+ select([some_table]).order_by(some_table.c.col3).limit(5).offset(10)
+
+will render similarly to::
+
+ SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
+ ROW_NUMBER() OVER (ORDER BY col3) AS
+ mssql_rn FROM table WHERE t.x = :x_1) AS
+ anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1
+
+Note that when using LIMIT and OFFSET together, the statement must have
+an ORDER BY as well.
.. _mssql_isolation_level:
# ODBC drivers and possibly others
# don't support bind params in the SELECT clause on SQL Server.
# so have to use literal here.
- s += "TOP %d " % select._limit
-
+ kw["literal_execute"] = True
+ s += "TOP %s " % self.process(select._limit_clause, **kw)
if s:
return s
else:
* ``optimize_limits`` - defaults to ``False``. see the section on
LIMIT/OFFSET.
-* ``use_binds_for_limits`` - defaults to ``True``. see the section on
+* ``use_binds_for_limits`` - deprecated. see the section on
LIMIT/OFFSET.
Auto Increment Behavior
LIMIT/OFFSET Support
--------------------
-Oracle has no support for the LIMIT or OFFSET keywords. SQLAlchemy uses
-a wrapped subquery approach in conjunction with ROWNUM. The exact methodology
-is taken from
-http://www.oracle.com/technetwork/issue-archive/2006/06-sep/o56asktom-086197.html .
+Oracle has no direct support for LIMIT and OFFSET until version 12c.
+To achieve this behavior across all widely used versions of Oracle starting
+with the 8 series, SQLAlchemy currently makes use of ROWNUM to achieve
+LIMIT/OFFSET; the exact methodology is taken from
+https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
-There are two options which affect its behavior:
+There is currently a single option to affect its behavior:
-* the "FIRST ROWS()" optimization keyword is not used by default. To enable
+* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
the usage of this optimization directive, specify ``optimize_limits=True``
to :func:`.create_engine`.
-* the values passed for the limit/offset are sent as bound parameters. Some
- users have observed that Oracle produces a poor query plan when the values
- are sent as binds and not rendered literally. To render the limit/offset
- values literally within the SQL statement, specify
- ``use_binds_for_limits=False`` to :func:`.create_engine`.
-
-Some users have reported better performance when the entirely different
-approach of a window query is used, i.e. ROW_NUMBER() OVER (ORDER BY), to
-provide LIMIT/OFFSET (note that the majority of users don't observe this).
-To suit this case the method used for LIMIT/OFFSET can be replaced entirely.
-See the recipe at
-http://www.sqlalchemy.org/trac/wiki/UsageRecipes/WindowFunctionsByDefault
-which installs a select compiler that overrides the generation of limit/offset
-with a window function.
+
+.. versionchanged:: 1.4
+ The Oracle dialect renders limit/offset integer values using a "post
+ compile" scheme which renders the integer directly before passing the
+ statement to the cursor for execution. The ``use_binds_for_limits`` flag
+ no longer has an effect.
+
+ .. seealso::
+
+ :ref:`change_4808`.
+
+Support for changing the row number strategy, which would include one that
+makes use of the ``row_number()`` window function as well as one that makes
+use of the Oracle 12c "FETCH FIRST N ROW / OFFSET N ROWS" keywords may be
+added in a future release.
+
.. _oracle_returning:
from ...types import CLOB
from ...types import FLOAT
from ...types import INTEGER
+from ...types import Integer
from ...types import NCHAR
from ...types import NVARCHAR
from ...types import TIMESTAMP
limit_clause = select._limit_clause
offset_clause = select._offset_clause
if limit_clause is not None or offset_clause is not None:
- # See http://www.oracle.com/technology/oramag/oracle/06-sep/\
- # o56asktom.html
- #
- # Generalized form of an Oracle pagination query:
- # select ... from (
- # select /*+ FIRST_ROWS(N) */ ...., rownum as ora_rn from
- # ( select distinct ... where ... order by ...
- # ) where ROWNUM <= :limit+:offset
- # ) where ora_rn > :offset
- # Outer select and "ROWNUM as ora_rn" can be dropped if
- # limit=0
+ # currently using form at:
+ # https://blogs.oracle.com/oraclemagazine/\
+ # on-rownum-and-limiting-results
kwargs["select_wraps_for"] = orig_select = select
select = select._generate()
and self.dialect.optimize_limits
and select._simple_int_limit
):
+ param = sql.bindparam(
+ "_ora_frow",
+ select._limit,
+ type_=Integer,
+ literal_execute=True,
+ unique=True,
+ )
limitselect = limitselect.prefix_with(
- "/*+ FIRST_ROWS(%d) */" % select._limit
+ expression.text(
+ "/*+ FIRST_ROWS(:_ora_frow) */"
+ ).bindparams(param)
)
limitselect._oracle_visit = True
# If needed, add the limiting clause
if limit_clause is not None:
- if not self.dialect.use_binds_for_limits:
- # use simple int limits, will raise an exception
- # if the limit isn't specified this way
+ if select._simple_int_limit and (
+ offset_clause is None or select._simple_int_offset
+ ):
max_row = select._limit
if offset_clause is not None:
max_row += select._offset
- max_row = sql.literal_column("%d" % max_row)
+ max_row = sql.bindparam(
+ None,
+ max_row,
+ type_=Integer,
+ literal_execute=True,
+ unique=True,
+ )
else:
max_row = limit_clause
if offset_clause is not None:
adapter.traverse(elem) for elem in for_update.of
]
- if not self.dialect.use_binds_for_limits:
- offset_clause = sql.literal_column(
- "%d" % select._offset
+ if select._simple_int_offset:
+ offset_clause = sql.bindparam(
+ None,
+ select._offset,
+ Integer,
+ literal_execute=True,
+ unique=True,
)
+
offsetselect = offsetselect.where(
sql.literal_column("ora_rn") > offset_clause
)
(sa_schema.Index, {"bitmap": False, "compress": False}),
]
+ @util.deprecated_params(
+ use_binds_for_limits=(
+ "1.4",
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated. The dialect now renders LIMIT /OFFSET integers "
+ "inline in all cases using a post-compilation hook, so that the "
+ "value is still represented by a 'bound parameter' on the Core "
+ "Expression side.",
+ )
+ )
def __init__(
self,
use_ansi=True,
optimize_limits=False,
- use_binds_for_limits=True,
+ use_binds_for_limits=None,
use_nchar_for_unicode=False,
exclude_tablespaces=("SYSTEM", "SYSAUX"),
**kwargs
self._use_nchar_for_unicode = use_nchar_for_unicode
self.use_ansi = use_ansi
self.optimize_limits = optimize_limits
- self.use_binds_for_limits = use_binds_for_limits
self.exclude_tablespaces = exclude_tablespaces
def initialize(self, connection):
quote is True
or quote is not False
and self.preparer._bindparam_requires_quotes(name)
+ and not kw.get("post_compile", False)
):
- if kw.get("expanding", False):
- raise exc.CompileError(
- "Can't use expanding feature with parameter name "
- "%r on Oracle; it requires quoting which is not supported "
- "in this context." % name
- )
+ # interesting to note about expanding parameters - since the
+ # new parameters take the form <paramname>_<int>, at least if
+ # they are originally formed from reserved words, they no longer
+ # need quoting :). names that include illegal characters
+ # won't work however.
quoted_name = '"%s"' % name
self._quoted_bind_names[name] = quoted_name
return OracleCompiler.bindparam_string(self, quoted_name, **kw)
def get_select_precolumns(self, select, **kw):
s = select._distinct and "DISTINCT " or ""
- # TODO: don't think Sybase supports
- # bind params for FIRST / TOP
- limit = select._limit
- if limit:
- # if select._limit == 1:
- # s += "FIRST "
- # else:
- # s += "TOP %s " % (select._limit,)
- s += "TOP %s " % (limit,)
- offset = select._offset
- if offset:
+
+ if select._simple_int_limit and not select._offset:
+ kw["literal_execute"] = True
+ s += "TOP %s " % self.process(select._limit_clause, **kw)
+
+ if select._offset:
raise NotImplementedError("Sybase ASE does not support OFFSET")
return s
processors = compiled._bind_processors
- if compiled.contains_expanding_parameters:
- positiontup = self._expand_in_parameters(compiled, processors)
+ if compiled.literal_execute_params:
+ positiontup = self._literal_execute_parameters(
+ compiled, processors
+ )
elif compiled.positional:
positiontup = self.compiled.positiontup
return self
- def _expand_in_parameters(self, compiled, processors):
- """handle special 'expanding' parameters, IN tuples that are rendered
- on a per-parameter basis for an otherwise fixed SQL statement string.
+ def _literal_execute_parameters(self, compiled, processors):
+ """handle special post compile parameters.
+
+ These include:
+
+ * "expanding" parameters -typically IN tuples that are rendered
+ on a per-parameter basis for an otherwise fixed SQL statement string.
+
+ * literal_binds compiled with the literal_execute flag. Used for
+ things like SQL Server "TOP N" where the driver does not accommodate
+ N as a bound parameter.
"""
if self.executemany:
raise exc.InvalidRequestError(
- "'expanding' parameters can't be used with " "executemany()"
+ "'literal_execute' or 'expanding' parameters can't be "
+ "used with executemany()"
)
- if self.compiled.positional and self.compiled._numeric_binds:
- # I'm not familiar with any DBAPI that uses 'numeric'
+ if compiled.positional and compiled._numeric_binds:
+ # I'm not familiar with any DBAPI that uses 'numeric'.
+ # strategy would likely be to make use of numbers greater than
+ # the highest number present; then for expanding parameters,
+ # append them to the end of the parameter list. that way
+ # we avoid having to renumber all the existing parameters.
raise NotImplementedError(
- "'expanding' bind parameters not supported with "
- "'numeric' paramstyle at this time."
+ "'post-compile' bind parameters are not supported with "
+ "the 'numeric' paramstyle at this time."
)
self._expanded_parameters = {}
to_update_sets = {}
for name in (
- self.compiled.positiontup
+ compiled.positiontup
if compiled.positional
- else self.compiled.binds
+ else compiled.bind_names.values()
):
- parameter = self.compiled.binds[name]
- if parameter.expanding:
+ parameter = compiled.binds[name]
+ if parameter in compiled.literal_execute_params:
+
+ if not parameter.expanding:
+ value = compiled_params.pop(name)
+ replacement_expressions[
+ name
+ ] = compiled.render_literal_bindparam(
+ parameter, render_literal_value=value
+ )
+ continue
if name in replacement_expressions:
to_update = to_update_sets[name]
# param.
values = compiled_params.pop(name)
- if not values:
- to_update = to_update_sets[name] = []
- replacement_expressions[
- name
- ] = self.compiled.visit_empty_set_expr(
- parameter._expanding_in_types
- if parameter._expanding_in_types
- else [parameter.type]
- )
+ leep = compiled._literal_execute_expanding_parameter
+ to_update, replacement_expr = leep(name, parameter, values)
- elif isinstance(values[0], (tuple, list)):
- to_update = to_update_sets[name] = [
- ("%s_%s_%s" % (name, i, j), value)
- for i, tuple_element in enumerate(values, 1)
- for j, value in enumerate(tuple_element, 1)
- ]
- replacement_expressions[name] = (
- "VALUES " if self.dialect.tuple_in_values else ""
- ) + ", ".join(
- "(%s)"
- % ", ".join(
- self.compiled.bindtemplate
- % {
- "name": to_update[
- i * len(tuple_element) + j
- ][0]
- }
- for j, value in enumerate(tuple_element)
- )
- for i, tuple_element in enumerate(values)
- )
- else:
- to_update = to_update_sets[name] = [
- ("%s_%s" % (name, i), value)
- for i, value in enumerate(values, 1)
- ]
- replacement_expressions[name] = ", ".join(
- self.compiled.bindtemplate % {"name": key}
- for key, value in to_update
- )
+ to_update_sets[name] = to_update
+ replacement_expressions[name] = replacement_expr
- compiled_params.update(to_update)
- processors.update(
- (key, processors[name])
- for key, value in to_update
- if name in processors
- )
- if compiled.positional:
- positiontup.extend(name for name, value in to_update)
- self._expanded_parameters[name] = [
- expand_key for expand_key, value in to_update
- ]
+ if not parameter.literal_execute:
+ compiled_params.update(to_update)
+
+ processors.update(
+ (key, processors[name])
+ for key, value in to_update
+ if name in processors
+ )
+ if compiled.positional:
+ positiontup.extend(name for name, value in to_update)
+ self._expanded_parameters[name] = [
+ expand_key for expand_key, value in to_update
+ ]
elif compiled.positional:
positiontup.append(name)
return replacement_expressions[m.group(1)]
self.statement = re.sub(
- r"\[EXPANDING_(\S+)\]", process_expanding, self.statement
+ r"\[POSTCOMPILE_(\S+)\]", process_expanding, self.statement
)
return positiontup
inputsizes = {}
for bindparam in self.compiled.bind_names:
+ if bindparam in self.compiled.literal_execute_params:
+ continue
dialect_impl = bindparam.type._unwrapped_dialect_impl(self.dialect)
dialect_impl_cls = type(dialect_impl)
def _post_coercion(self, resolved, expr, **kw):
if (
- isinstance(resolved, elements.BindParameter)
+ isinstance(resolved, (elements.Grouping, elements.BindParameter))
and resolved.type._isnull
+ and not expr.type._isnull
):
- resolved = resolved._clone()
- resolved.type = expr.type
+ resolved = resolved._with_binary_element_type(expr.type)
return resolved
element = element._with_expanding_in_types(
[elem.type for elem in expr]
)
+
return element
else:
return element
from . import schema
from . import selectable
from . import sqltypes
+from .base import NO_ARG
from .. import exc
from .. import util
columns with the table name (i.e. MySQL only)
"""
- contains_expanding_parameters = False
- """True if we've encountered bindparam(..., expanding=True).
-
- These need to be converted before execution time against the
- string statement.
-
- """
-
ansi_bind_rules = False
"""SQL 92 doesn't allow bind parameters to be used
in the columns clause of a SELECT, nor does it allow
"""
+ literal_execute_params = frozenset()
+
insert_prefetch = update_prefetch = ()
def __init__(
% self.dialect.name
)
+ def _literal_execute_expanding_parameter_literal_binds(
+ self, parameter, values
+ ):
+ if not values:
+ replacement_expression = self.visit_empty_set_expr(
+ parameter._expanding_in_types
+ if parameter._expanding_in_types
+ else [parameter.type]
+ )
+
+ elif isinstance(values[0], (tuple, list)):
+ replacement_expression = (
+ "VALUES " if self.dialect.tuple_in_values else ""
+ ) + ", ".join(
+ "(%s)"
+ % (
+ ", ".join(
+ self.render_literal_value(value, parameter.type)
+ for value in tuple_element
+ )
+ )
+ for i, tuple_element in enumerate(values)
+ )
+ else:
+ replacement_expression = ", ".join(
+ self.render_literal_value(value, parameter.type)
+ for value in values
+ )
+
+ return (), replacement_expression
+
+ def _literal_execute_expanding_parameter(self, name, parameter, values):
+ if parameter.literal_execute:
+ return self._literal_execute_expanding_parameter_literal_binds(
+ parameter, values
+ )
+
+ if not values:
+ to_update = []
+ replacement_expression = self.visit_empty_set_expr(
+ parameter._expanding_in_types
+ if parameter._expanding_in_types
+ else [parameter.type]
+ )
+
+ elif isinstance(values[0], (tuple, list)):
+ to_update = [
+ ("%s_%s_%s" % (name, i, j), value)
+ for i, tuple_element in enumerate(values, 1)
+ for j, value in enumerate(tuple_element, 1)
+ ]
+ replacement_expression = (
+ "VALUES " if self.dialect.tuple_in_values else ""
+ ) + ", ".join(
+ "(%s)"
+ % (
+ ", ".join(
+ self.bindtemplate
+ % {"name": to_update[i * len(tuple_element) + j][0]}
+ for j, value in enumerate(tuple_element)
+ )
+ )
+ for i, tuple_element in enumerate(values)
+ )
+ else:
+ to_update = [
+ ("%s_%s" % (name, i), value)
+ for i, value in enumerate(values, 1)
+ ]
+ replacement_expression = ", ".join(
+ self.bindtemplate % {"name": key} for key, value in to_update
+ )
+
+ return to_update, replacement_expression
+
def visit_binary(
self, binary, override_operator=None, eager_grouping=False, **kw
):
within_columns_clause=False,
literal_binds=False,
skip_bind_expression=False,
+ literal_execute=False,
**kwargs
):
skip_bind_expression=True,
within_columns_clause=within_columns_clause,
literal_binds=literal_binds,
+ literal_execute=literal_execute,
**kwargs
)
- if literal_binds or (within_columns_clause and self.ansi_bind_rules):
- if bindparam.value is None and bindparam.callable is None:
- raise exc.CompileError(
- "Bind parameter '%s' without a "
- "renderable value not allowed here." % bindparam.key
- )
- return self.render_literal_bindparam(
+ if not literal_binds:
+ post_compile = (
+ literal_execute
+ or bindparam.literal_execute
+ or bindparam.expanding
+ )
+ else:
+ post_compile = False
+
+ if not literal_execute and (
+ literal_binds or (within_columns_clause and self.ansi_bind_rules)
+ ):
+ ret = self.render_literal_bindparam(
bindparam, within_columns_clause=True, **kwargs
)
+ if bindparam.expanding:
+ ret = "(%s)" % ret
+ return ret
name = self._truncate_bindparam(bindparam)
self.binds[bindparam.key] = self.binds[name] = bindparam
- return self.bindparam_string(
- name, expanding=bindparam.expanding, **kwargs
+ if post_compile:
+ self.literal_execute_params |= {bindparam}
+
+ ret = self.bindparam_string(
+ name,
+ post_compile=post_compile,
+ expanding=bindparam.expanding,
+ **kwargs
)
+ if bindparam.expanding:
+ ret = "(%s)" % ret
+ return ret
+
+ def render_literal_bindparam(
+ self, bindparam, render_literal_value=NO_ARG, **kw
+ ):
+ if render_literal_value is not NO_ARG:
+ value = render_literal_value
+ else:
+ if bindparam.value is None and bindparam.callable is None:
+ raise exc.CompileError(
+ "Bind parameter '%s' without a "
+ "renderable value not allowed here." % bindparam.key
+ )
+ value = bindparam.effective_value
- def render_literal_bindparam(self, bindparam, **kw):
- value = bindparam.effective_value
- return self.render_literal_value(value, bindparam.type)
+ if bindparam.expanding:
+ leep = self._literal_execute_expanding_parameter_literal_binds
+ to_update, replacement_expr = leep(bindparam, value)
+ return replacement_expr
+ else:
+ return self.render_literal_value(value, bindparam.type)
def render_literal_value(self, value, type_):
"""Render the value of a bind parameter as a quoted literal.
return derived + "_" + str(anonymous_counter)
def bindparam_string(
- self, name, positional_names=None, expanding=False, **kw
+ self,
+ name,
+ positional_names=None,
+ post_compile=False,
+ expanding=False,
+ **kw
):
if self.positional:
if positional_names is not None:
positional_names.append(name)
else:
self.positiontup.append(name)
- if expanding:
- self.contains_expanding_parameters = True
- return "([EXPANDING_%s])" % name
+ if post_compile:
+ return "[POSTCOMPILE_%s]" % name
else:
return self.bindtemplate % {"name": name}
return c
+ def _with_binary_element_type(self, type_):
+ """in the context of binary expression, convert the type of this
+ object to the one given.
+
+ applies only to :class:`.ColumnElement` classes.
+
+ """
+ return self
+
def _cache_key(self, **kw):
"""return an optional cache key.
def type(self):
return type_api.NULLTYPE
+ def _with_binary_element_type(self, type_):
+ cloned = self._clone()
+ cloned._copy_internals(
+ clone=lambda element: element._with_binary_element_type(type_)
+ )
+ cloned.type = type_
+ return cloned
+
@util.memoized_property
def comparator(self):
try:
callable_=None,
expanding=False,
isoutparam=False,
+ literal_execute=False,
_compared_to_operator=None,
_compared_to_type=None,
):
:func:`.outparam`
+ :param literal_execute:
+ if True, the bound parameter will be rendered in the compile phase
+ with a special "POSTCOMPILE" token, and the SQLAlchemy compiler will
+ render the final value of the parameter into the SQL statement at
+ statement execution time, omitting the value from the parameter
+ dictionary / list passed to DBAPI ``cursor.execute()``. This
+ produces a similar effect as that of using the ``literal_binds``,
+ compilation flag, however takes place as the statement is sent to
+ the DBAPI ``cursor.execute()`` method, rather than when the statement
+ is compiled. The primary use of this
+ capability is for rendering LIMIT / OFFSET clauses for database
+ drivers that can't accommodate for bound parameters in these
+ contexts, while allowing SQL constructs to be cacheable at the
+ compilation level.
+
+ .. versionadded:: 1.4 Added "post compile" bound parameters
+
+ .. seealso::
+
+ :ref:`change_4808`.
+
+
+
+
"""
if isinstance(key, ColumnClause):
type_ = key.type
self.isoutparam = isoutparam
self.required = required
self.expanding = expanding
+ self.literal_execute = literal_execute
if type_ is None:
if _compared_to_type is not None:
for bind in binds:
try:
- existing = new_params[bind.key]
+ # the regex used for text() currently will not match
+ # a unique/anonymous key in any case, so use the _orig_key
+ # so that a text() construct can support unique parameters
+ existing = new_params[bind._orig_key]
except KeyError:
raise exc.ArgumentError(
"This text() construct doesn't define a "
- "bound parameter named %r" % bind.key
+ "bound parameter named %r" % bind._orig_key
)
else:
- new_params[existing.key] = bind
+ new_params[existing._orig_key] = bind
for key, value in names_to_values.items():
try:
def literal_processor(self, dialect):
def process(value):
- return str(value)
+ return str(int(value))
return process
class CursorSQL(SQLMatchRule):
- consume_statement = False
-
- def __init__(self, statement, params=None):
+ def __init__(self, statement, params=None, consume_statement=True):
self.statement = statement
self.params = params
+ self.consume_statement = consume_statement
def process_statement(self, execute_observed):
stmt = execute_observed.statements[0]
return exclusions.open()
+ @property
+ def standard_cursor_sql(self):
+ """Target database passes SQL-92 style statements to cursor.execute()
+ when a statement like select() or insert() is run.
+
+ A very small portion of dialect-level tests will ensure that certain
+ conditions are present in SQL strings, and these tests use very basic
+ SQL that will work on any SQL-like platform in order to assert results.
+
+ It's normally a given for any pep-249 DBAPI that a statement like
+ "SELECT id, name FROM table WHERE some_table.id=5" will work.
+ However, there are dialects that don't actually produce SQL Strings
+ and instead may work with symbolic objects instead, or dialects that
+ aren't working with SQL, so for those this requirement can be marked
+ as excluded.
+
+ """
+
+ return exclusions.open()
+
@property
def on_update_cascade(self):
""""target database must support ON UPDATE..CASCADE behavior in
+from .. import AssertsCompiledSQL
+from .. import AssertsExecutionResults
from .. import config
from .. import fixtures
from ..assertions import eq_
from ..assertions import in_
+from ..assertsql import CursorSQL
from ..schema import Column
from ..schema import Table
from ... import bindparam
from ... import select
from ... import String
from ... import testing
+from ... import text
from ... import true
from ... import tuple_
from ... import union
params={"l": 2, "o": 1},
)
+ @testing.requires.sql_expression_limit_offset
+ def test_expr_offset(self):
+ table = self.tables.some_table
+ self._assert_result(
+ select([table])
+ .order_by(table.c.id)
+ .offset(literal_column("1") + literal_column("2")),
+ [(4, 4, 5)],
+ )
+
+ @testing.requires.sql_expression_limit_offset
+ def test_expr_limit(self):
+ table = self.tables.some_table
+ self._assert_result(
+ select([table])
+ .order_by(table.c.id)
+ .limit(literal_column("1") + literal_column("2")),
+ [(1, 1, 2), (2, 2, 3), (3, 3, 4)],
+ )
+
+ @testing.requires.sql_expression_limit_offset
+ def test_expr_limit_offset(self):
+ table = self.tables.some_table
+ self._assert_result(
+ select([table])
+ .order_by(table.c.id)
+ .limit(literal_column("1") + literal_column("1"))
+ .offset(literal_column("1") + literal_column("1")),
+ [(3, 3, 4), (4, 4, 5)],
+ )
+
+ @testing.requires.sql_expression_limit_offset
+ def test_simple_limit_expr_offset(self):
+ table = self.tables.some_table
+ self._assert_result(
+ select([table])
+ .order_by(table.c.id)
+ .limit(2)
+ .offset(literal_column("1") + literal_column("1")),
+ [(3, 3, 4), (4, 4, 5)],
+ )
+
+ @testing.requires.sql_expression_limit_offset
+ def test_expr_limit_simple_offset(self):
+ table = self.tables.some_table
+ self._assert_result(
+ select([table])
+ .order_by(table.c.id)
+ .limit(literal_column("1") + literal_column("1"))
+ .offset(2),
+ [(3, 3, 4), (4, 4, 5)],
+ )
+
class CompoundSelectTest(fixtures.TablesTest):
__backend__ = True
)
+class PostCompileParamsTest(
+ AssertsExecutionResults, AssertsCompiledSQL, fixtures.TablesTest
+):
+ __backend__ = True
+
+ __requires__ = ("standard_cursor_sql",)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "some_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("x", Integer),
+ Column("y", Integer),
+ Column("z", String(50)),
+ )
+
+ @classmethod
+ def insert_data(cls):
+ config.db.execute(
+ cls.tables.some_table.insert(),
+ [
+ {"id": 1, "x": 1, "y": 2, "z": "z1"},
+ {"id": 2, "x": 2, "y": 3, "z": "z2"},
+ {"id": 3, "x": 3, "y": 4, "z": "z3"},
+ {"id": 4, "x": 4, "y": 5, "z": "z4"},
+ ],
+ )
+
+ def test_compile(self):
+ table = self.tables.some_table
+
+ stmt = select([table.c.id]).where(
+ table.c.x == bindparam("q", literal_execute=True)
+ )
+
+ self.assert_compile(
+ stmt,
+ "SELECT some_table.id FROM some_table "
+ "WHERE some_table.x = [POSTCOMPILE_q]",
+ {},
+ )
+
+ def test_compile_literal_binds(self):
+ table = self.tables.some_table
+
+ stmt = select([table.c.id]).where(
+ table.c.x == bindparam("q", 10, literal_execute=True)
+ )
+
+ self.assert_compile(
+ stmt,
+ "SELECT some_table.id FROM some_table WHERE some_table.x = 10",
+ {},
+ literal_binds=True,
+ )
+
+ def test_execute(self):
+ table = self.tables.some_table
+
+ stmt = select([table.c.id]).where(
+ table.c.x == bindparam("q", literal_execute=True)
+ )
+
+ with self.sql_execution_asserter() as asserter:
+ with config.db.connect() as conn:
+ conn.execute(stmt, q=10)
+
+ asserter.assert_(
+ CursorSQL(
+ "SELECT some_table.id \nFROM some_table "
+ "\nWHERE some_table.x = 10",
+ () if config.db.dialect.positional else {},
+ )
+ )
+
+ def test_execute_expanding_plus_literal_execute(self):
+ table = self.tables.some_table
+
+ stmt = select([table.c.id]).where(
+ table.c.x.in_(bindparam("q", expanding=True, literal_execute=True))
+ )
+
+ with self.sql_execution_asserter() as asserter:
+ with config.db.connect() as conn:
+ conn.execute(stmt, q=[5, 6, 7])
+
+ asserter.assert_(
+ CursorSQL(
+ "SELECT some_table.id \nFROM some_table "
+ "\nWHERE some_table.x IN (5, 6, 7)",
+ () if config.db.dialect.positional else {},
+ )
+ )
+
+ @testing.requires.tuple_in
+ def test_execute_tuple_expanding_plus_literal_execute(self):
+ table = self.tables.some_table
+
+ stmt = select([table.c.id]).where(
+ tuple_(table.c.x, table.c.y).in_(
+ bindparam("q", expanding=True, literal_execute=True)
+ )
+ )
+
+ with self.sql_execution_asserter() as asserter:
+ with config.db.connect() as conn:
+ conn.execute(stmt, q=[(5, 10), (12, 18)])
+
+ asserter.assert_(
+ CursorSQL(
+ "SELECT some_table.id \nFROM some_table "
+ "\nWHERE (some_table.x, some_table.y) "
+ "IN (%s(5, 10), (12, 18))"
+ % ("VALUES " if config.db.dialect.tuple_in_values else ""),
+ () if config.db.dialect.positional else {},
+ )
+ )
+
+
class ExpandingBoundInTest(fixtures.TablesTest):
__backend__ = True
params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
)
+ @testing.requires.tuple_in
+ def test_bound_in_heterogeneous_two_tuple_text(self):
+ stmt = text(
+ "select id FROM some_table WHERE (x, z) IN :q ORDER BY id"
+ ).bindparams(bindparam("q", expanding=True))
+
+ self._assert_result(
+ stmt,
+ [(2,), (3,), (4,)],
+ params={"q": [(2, "z2"), (3, "z3"), (4, "z4")]},
+ )
+
def test_empty_set_against_integer(self):
table = self.tables.some_table
self.assert_compile(
s,
- "SELECT TOP 10 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
- checkparams={"x_1": 5},
+ "SELECT TOP [POSTCOMPILE_param_1] t.x, t.y FROM t "
+ "WHERE t.x = :x_1 ORDER BY t.y",
+ checkparams={"x_1": 5, "param_1": 10},
)
def test_limit_zero_using_top(self):
self.assert_compile(
s,
- "SELECT TOP 0 t.x, t.y FROM t WHERE t.x = :x_1 ORDER BY t.y",
- checkparams={"x_1": 5},
+ "SELECT TOP [POSTCOMPILE_param_1] t.x, t.y FROM t "
+ "WHERE t.x = :x_1 ORDER BY t.y",
+ checkparams={"x_1": 5, "param_1": 0},
)
c = s.compile(dialect=mssql.dialect())
eq_(len(c._result_columns), 2)
# of zero, so produces TOP 0
self.assert_compile(
s,
- "SELECT TOP 0 t.x, t.y FROM t " "WHERE t.x = :x_1 ORDER BY t.y",
- checkparams={"x_1": 5},
+ "SELECT TOP [POSTCOMPILE_param_1] t.x, t.y FROM t "
+ "WHERE t.x = :x_1 ORDER BY t.y",
+ checkparams={"x_1": 5, "param_1": 0},
)
def test_primary_key_no_identity(self):
DialectSQL(
"INSERT INTO t1 (data) VALUES (:data)", {"data": "somedata"}
),
- CursorSQL("SELECT @@identity AS lastrowid"),
+ CursorSQL(
+ "SELECT @@identity AS lastrowid", consume_statement=False
+ ),
)
@testing.provide_metadata
# even with pyodbc, we don't embed the scope identity on a
# DEFAULT VALUES insert
asserter.assert_(
- CursorSQL("INSERT INTO t1 DEFAULT VALUES"),
- CursorSQL("SELECT scope_identity() AS lastrowid"),
+ CursorSQL(
+ "INSERT INTO t1 DEFAULT VALUES", consume_statement=False
+ ),
+ CursorSQL(
+ "SELECT scope_identity() AS lastrowid", consume_statement=False
+ ),
)
@testing.only_on("mssql+pyodbc")
CursorSQL(
"INSERT INTO t1 (data) VALUES (?); select scope_identity()",
("somedata",),
+ consume_statement=False,
)
)
# coding: utf-8
-
-
from sqlalchemy import and_
from sqlalchemy import bindparam
-from sqlalchemy import exc
from sqlalchemy import except_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Index
from sqlalchemy import Integer
+from sqlalchemy import literal
from sqlalchemy import literal_column
from sqlalchemy import MetaData
from sqlalchemy import or_
from sqlalchemy import Sequence
from sqlalchemy import sql
from sqlalchemy import String
+from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import type_coerce
from sqlalchemy import TypeDecorator
from sqlalchemy.sql import column
from sqlalchemy.sql import quoted_name
from sqlalchemy.sql import table
-from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
t.update().values(plain=5), 'UPDATE s SET "plain"=:"plain"'
)
- def test_bindparam_quote_raise_on_expanding(self):
- assert_raises_message(
- exc.CompileError,
- "Can't use expanding feature with parameter name 'uid' on "
- "Oracle; it requires quoting which is not supported in this "
- "context",
- bindparam("uid", expanding=True).compile,
+ def test_bindparam_quote_works_on_expanding(self):
+ self.assert_compile(
+ bindparam("uid", expanding=True),
+ "([POSTCOMPILE_uid])",
dialect=cx_oracle.dialect(),
)
"anon_2.col2 AS col2, ROWNUM AS ora_rn FROM (SELECT "
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable) anon_2 WHERE ROWNUM <= "
- ":param_1 + :param_2) anon_1 WHERE ora_rn > :param_2",
- checkparams={"param_1": 10, "param_2": 20},
+ "[POSTCOMPILE_param_1]) anon_1 WHERE ora_rn > "
+ "[POSTCOMPILE_param_2]",
+ checkparams={"param_1": 30, "param_2": 20},
)
c = s.compile(dialect=oracle.OracleDialect())
eq_(len(c._result_columns), 2)
assert t.c.col1 in set(c._create_result_map()["col1"][1])
+ def test_limit_one_firstrows(self):
+ t = table("sometable", column("col1"), column("col2"))
+ s = select([t])
+ s = select([t]).limit(10).offset(20)
+ self.assert_compile(
+ s,
+ "SELECT anon_1.col1, anon_1.col2 FROM "
+ "(SELECT /*+ FIRST_ROWS([POSTCOMPILE__ora_frow_1]) */ "
+ "anon_2.col1 AS col1, "
+ "anon_2.col2 AS col2, ROWNUM AS ora_rn FROM (SELECT "
+ "sometable.col1 AS col1, sometable.col2 AS "
+ "col2 FROM sometable) anon_2 WHERE ROWNUM <= "
+ "[POSTCOMPILE_param_1]) anon_1 WHERE ora_rn > "
+ "[POSTCOMPILE_param_2]",
+ checkparams={"_ora_frow_1": 10, "param_1": 30, "param_2": 20},
+ dialect=oracle.OracleDialect(optimize_limits=True),
+ )
+
def test_limit_two(self):
t = table("sometable", column("col1"), column("col2"))
s = select([t]).limit(10).offset(20).subquery()
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2 FROM sometable) anon_3 "
- "WHERE ROWNUM <= :param_1 + :param_2) anon_2 "
- "WHERE ora_rn > :param_2) anon_1",
- checkparams={"param_1": 10, "param_2": 20},
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_2 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2]) anon_1",
+ checkparams={"param_1": 30, "param_2": 20},
)
self.assert_compile(
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2 FROM sometable) anon_3 "
- "WHERE ROWNUM <= :param_1 + :param_2) anon_2 "
- "WHERE ora_rn > :param_2) anon_1",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_2 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2]) anon_1",
)
c = s2.compile(dialect=oracle.OracleDialect())
eq_(len(c._result_columns), 2)
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable ORDER BY "
"sometable.col2) anon_2 WHERE ROWNUM <= "
- ":param_1 + :param_2) anon_1 WHERE ora_rn > :param_2",
- checkparams={"param_1": 10, "param_2": 20},
+ "[POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2]",
+ checkparams={"param_1": 30, "param_2": 20},
)
c = s.compile(dialect=oracle.OracleDialect())
eq_(len(c._result_columns), 2)
"SELECT anon_1.col1, anon_1.col2 FROM (SELECT "
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable ORDER BY "
- "sometable.col2) anon_1 WHERE ROWNUM <= :param_1 "
+ "sometable.col2) anon_1 WHERE ROWNUM <= [POSTCOMPILE_param_1] "
+ "FOR UPDATE",
+ checkparams={"param_1": 10},
+ )
+
+ def test_limit_four_firstrows(self):
+ t = table("sometable", column("col1"), column("col2"))
+
+ s = select([t]).with_for_update().limit(10).order_by(t.c.col2)
+ self.assert_compile(
+ s,
+ "SELECT /*+ FIRST_ROWS([POSTCOMPILE__ora_frow_1]) */ "
+ "anon_1.col1, anon_1.col2 FROM (SELECT "
+ "sometable.col1 AS col1, sometable.col2 AS "
+ "col2 FROM sometable ORDER BY "
+ "sometable.col2) anon_1 WHERE ROWNUM <= [POSTCOMPILE_param_1] "
"FOR UPDATE",
+ checkparams={"param_1": 10, "_ora_frow_1": 10},
+ dialect=oracle.OracleDialect(optimize_limits=True),
)
def test_limit_five(self):
"sometable.col1 AS col1, sometable.col2 AS "
"col2 FROM sometable ORDER BY "
"sometable.col2) anon_2 WHERE ROWNUM <= "
- ":param_1 + :param_2) anon_1 WHERE ora_rn > :param_2 FOR "
+ "[POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2] FOR "
"UPDATE",
+ checkparams={"param_1": 30, "param_2": 20},
+ )
+
+ def test_limit_six(self):
+ t = table("sometable", column("col1"), column("col2"))
+
+ s = (
+ select([t])
+ .limit(10)
+ .offset(literal(10) + literal(20))
+ .order_by(t.c.col2)
+ )
+ self.assert_compile(
+ s,
+ "SELECT anon_1.col1, anon_1.col2 FROM (SELECT anon_2.col1 AS "
+ "col1, anon_2.col2 AS col2, ROWNUM AS ora_rn FROM "
+ "(SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
+ "FROM sometable ORDER BY sometable.col2) anon_2 WHERE "
+ "ROWNUM <= :param_1 + :param_2 + :param_3) anon_1 "
+ "WHERE ora_rn > :param_2 + :param_3",
+ checkparams={"param_1": 10, "param_2": 10, "param_3": 20},
)
def test_limit_special_quoting(self):
'SELECT anon_1."SUM(ABC)" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
- "WHERE ROWNUM <= :param_1",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
col = literal_column("SUM(ABC)").label(quoted_name("SUM(ABC)", True))
'SELECT anon_1."SUM(ABC)" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
- "WHERE ROWNUM <= :param_1",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
col = literal_column("SUM(ABC)").label("SUM(ABC)_")
'SELECT anon_1."SUM(ABC)_" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)_" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
- "WHERE ROWNUM <= :param_1",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
col = literal_column("SUM(ABC)").label(quoted_name("SUM(ABC)_", True))
'SELECT anon_1."SUM(ABC)_" FROM '
'(SELECT SUM(ABC) AS "SUM(ABC)_" '
"FROM my_table ORDER BY SUM(ABC)) anon_1 "
- "WHERE ROWNUM <= :param_1",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]",
)
def test_for_update(self):
"SELECT anon_1.myid, anon_1.name FROM "
"(SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_1 "
- "WHERE ROWNUM <= :param_1 FOR UPDATE OF anon_1.name NOWAIT",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1] "
+ "FOR UPDATE OF anon_1.name NOWAIT",
+ checkparams={"param_1": 10, "myid_1": 7},
)
def test_for_update_of_w_limit_adaption_col_unpresent(self):
"SELECT anon_1.myid FROM "
"(SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_1 "
- "WHERE ROWNUM <= :param_1 FOR UPDATE OF anon_1.name NOWAIT",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1] "
+ "FOR UPDATE OF anon_1.name NOWAIT",
)
def test_for_update_of_w_limit_offset_adaption_col_present(self):
"ROWNUM AS ora_rn "
"FROM (SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_2 "
- "WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
- "WHERE ora_rn > :param_2 "
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2] "
"FOR UPDATE OF anon_1.name NOWAIT",
+ checkparams={"param_1": 60, "param_2": 50, "myid_1": 7},
)
def test_for_update_of_w_limit_offset_adaption_col_unpresent(self):
"ROWNUM AS ora_rn, anon_2.name AS name "
"FROM (SELECT mytable.myid AS myid, mytable.name AS name "
"FROM mytable WHERE mytable.myid = :myid_1) anon_2 "
- "WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
- "WHERE ora_rn > :param_2 "
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2] "
"FOR UPDATE OF anon_1.name NOWAIT",
+ checkparams={"param_1": 60, "param_2": 50, "myid_1": 7},
)
def test_for_update_of_w_limit_offset_adaption_partial_col_unpresent(self):
"mytable.bar AS bar, "
"mytable.foo AS foo FROM mytable "
"WHERE mytable.myid = :myid_1) anon_2 "
- "WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
- "WHERE ora_rn > :param_2 "
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2] "
"FOR UPDATE OF anon_1.foo, anon_1.bar NOWAIT",
+ checkparams={"param_1": 60, "param_2": 50, "myid_1": 7},
)
def test_limit_preserves_typing_information(self):
def test_use_binds_for_limits_disabled_one(self):
t = table("sometable", column("col1"), column("col2"))
- dialect = oracle.OracleDialect(use_binds_for_limits=False)
+ with testing.expect_deprecated(
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated."
+ ):
+ dialect = oracle.OracleDialect(use_binds_for_limits=False)
self.assert_compile(
select([t]).limit(10),
"SELECT anon_1.col1, anon_1.col2 FROM "
"(SELECT sometable.col1 AS col1, "
- "sometable.col2 AS col2 FROM sometable) anon_1 WHERE ROWNUM <= 10",
+ "sometable.col2 AS col2 FROM sometable) anon_1 "
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_disabled_two(self):
t = table("sometable", column("col1"), column("col2"))
- dialect = oracle.OracleDialect(use_binds_for_limits=False)
+ with testing.expect_deprecated(
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated."
+ ):
+ dialect = oracle.OracleDialect(use_binds_for_limits=False)
self.assert_compile(
select([t]).offset(10),
"SELECT anon_1.col1, anon_1.col2 FROM (SELECT "
"anon_2.col1 AS col1, anon_2.col2 AS col2, ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
- "FROM sometable) anon_2) anon_1 WHERE ora_rn > 10",
+ "FROM sometable) anon_2) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_disabled_three(self):
t = table("sometable", column("col1"), column("col2"))
- dialect = oracle.OracleDialect(use_binds_for_limits=False)
+ with testing.expect_deprecated(
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated."
+ ):
+ dialect = oracle.OracleDialect(use_binds_for_limits=False)
self.assert_compile(
select([t]).limit(10).offset(10),
"SELECT anon_1.col1, anon_1.col2 FROM (SELECT "
"anon_2.col1 AS col1, anon_2.col2 AS col2, ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
- "FROM sometable) anon_2 WHERE ROWNUM <= 20) anon_1 "
- "WHERE ora_rn > 10",
+ "FROM sometable) anon_2 "
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2]",
dialect=dialect,
)
def test_use_binds_for_limits_enabled_one(self):
t = table("sometable", column("col1"), column("col2"))
- dialect = oracle.OracleDialect(use_binds_for_limits=True)
+ with testing.expect_deprecated(
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated."
+ ):
+ dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(
select([t]).limit(10),
"SELECT anon_1.col1, anon_1.col2 FROM "
"(SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2 FROM sometable) anon_1 WHERE ROWNUM "
- "<= :param_1",
+ "<= [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_enabled_two(self):
t = table("sometable", column("col1"), column("col2"))
- dialect = oracle.OracleDialect(use_binds_for_limits=True)
+ with testing.expect_deprecated(
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated."
+ ):
+ dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(
select([t]).offset(10),
"(SELECT anon_2.col1 AS col1, anon_2.col2 AS col2, "
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
- "FROM sometable) anon_2) anon_1 WHERE ora_rn > :param_1",
+ "FROM sometable) anon_2) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_1]",
dialect=dialect,
)
def test_use_binds_for_limits_enabled_three(self):
t = table("sometable", column("col1"), column("col2"))
- dialect = oracle.OracleDialect(use_binds_for_limits=True)
+ with testing.expect_deprecated(
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated."
+ ):
+ dialect = oracle.OracleDialect(use_binds_for_limits=True)
self.assert_compile(
select([t]).limit(10).offset(10),
"ROWNUM AS ora_rn "
"FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 "
"FROM sometable) anon_2 "
- "WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
- "WHERE ora_rn > :param_2",
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2]",
dialect=dialect,
- checkparams={"param_1": 10, "param_2": 10},
+ checkparams={"param_1": 20, "param_2": 10},
)
def test_long_labels(self):
"thirdtable.userid(+) = "
"myothertable.otherid AND mytable.myid = "
"myothertable.otherid ORDER BY mytable.name) anon_2 "
- "WHERE ROWNUM <= :param_1 + :param_2) anon_1 "
- "WHERE ora_rn > :param_2",
- checkparams={"param_1": 10, "param_2": 5},
+ "WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
+ "WHERE ora_rn > [POSTCOMPILE_param_2]",
+ checkparams={"param_1": 15, "param_2": 5},
dialect=oracle.dialect(use_ansi=False),
)
testing.db.execute(t.insert(), {"100K": 10})
eq_(testing.db.scalar(t.select()), 10)
+ @testing.provide_metadata
+ def test_expanding_quote_roundtrip(self):
+ t = Table("asfd", self.metadata, Column("foo", Integer))
+ t.create()
+
+ with testing.db.connect() as conn:
+ conn.execute(
+ select([t]).where(
+ t.c.foo.in_(bindparam("uid", expanding=True))
+ ),
+ uid=[1, 2, 3],
+ )
+
class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL):
def _dialect(self, server_version, **kw):
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{"id": 30, "data": "d1"},
),
- CursorSQL("select nextval('my_seq')"),
+ CursorSQL("select nextval('my_seq')", consume_statement=False),
DialectSQL(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
{"id": 1, "data": "d2"},
"a.type AS a_type, "
"asub.asubdata AS asub_asubdata FROM a JOIN asub "
"ON a.id = asub.id "
- "WHERE a.id IN ([EXPANDING_primary_keys]) "
+ "WHERE a.id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY a.id",
{"primary_keys": [2]},
),
"SELECT c.a_sub_id AS c_a_sub_id, "
"c.id AS c_id "
"FROM c WHERE c.a_sub_id "
- "IN ([EXPANDING_primary_keys]) ORDER BY c.a_sub_id",
+ "IN ([POSTCOMPILE_primary_keys]) ORDER BY c.a_sub_id",
{"primary_keys": [2]},
),
),
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id FROM b "
- "WHERE b.a_id IN ([EXPANDING_primary_keys]) "
+ "WHERE b.a_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY b.a_id",
{"primary_keys": [1, 2]},
),
"engineers.primary_language AS engineers_primary_language "
"FROM people JOIN engineers "
"ON people.person_id = engineers.person_id "
- "WHERE people.person_id IN ([EXPANDING_primary_keys]) "
+ "WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [1, 2, 5]},
),
"managers.manager_name AS managers_manager_name "
"FROM people JOIN managers "
"ON people.person_id = managers.person_id "
- "WHERE people.person_id IN ([EXPANDING_primary_keys]) "
+ "WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [3, 4]},
),
"people.person_id AS people_person_id, "
"people.name AS people_name, people.type AS people_type "
"FROM people WHERE people.company_id "
- "IN ([EXPANDING_primary_keys]) "
+ "IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.company_id, people.person_id",
{"primary_keys": [1, 2]},
),
"managers.manager_name AS managers_manager_name "
"FROM people JOIN managers "
"ON people.person_id = managers.person_id "
- "WHERE people.person_id IN ([EXPANDING_primary_keys]) "
+ "WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [3, 4]},
),
"engineers.primary_language AS engineers_primary_language "
"FROM people JOIN engineers "
"ON people.person_id = engineers.person_id "
- "WHERE people.person_id IN ([EXPANDING_primary_keys]) "
+ "WHERE people.person_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY people.person_id",
{"primary_keys": [1, 2, 5]},
),
"c.c_data AS c_c_data, c.e_data AS c_e_data, "
"c.d_data AS c_d_data "
"FROM a JOIN c ON a.id = c.id "
- "WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
+ "WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
CompiledSQL(
"c.c_data AS c_c_data, "
"c.d_data AS c_d_data, c.e_data AS c_e_data "
"FROM a JOIN c ON a.id = c.id "
- "WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
+ "WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
),
"c.c_data AS c_c_data, c.e_data AS c_e_data, "
"c.d_data AS c_d_data "
"FROM a JOIN c ON a.id = c.id "
- "WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
+ "WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
CompiledSQL(
"c.c_data AS c_c_data, c.d_data AS c_d_data, "
"c.e_data AS c_e_data "
"FROM a JOIN c ON a.id = c.id "
- "WHERE a.id IN ([EXPANDING_primary_keys]) ORDER BY a.id",
+ "WHERE a.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a.id",
[{"primary_keys": [1, 2]}],
),
),
"e.id AS e_id, e.e_data AS e_e_data FROM a JOIN c "
"ON a.id = c.id LEFT OUTER JOIN d ON c.id = d.id "
"LEFT OUTER JOIN e ON c.id = e.id) AS poly "
- "WHERE poly.a_id IN ([EXPANDING_primary_keys]) "
+ "WHERE poly.a_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY poly.a_id",
[{"primary_keys": [1, 2]}],
),
"e.e_data AS e_e_data FROM a JOIN c ON a.id = c.id "
"LEFT OUTER JOIN d ON c.id = d.id "
"LEFT OUTER JOIN e ON c.id = e.id) AS poly "
- "WHERE poly.a_id IN ([EXPANDING_primary_keys]) "
+ "WHERE poly.a_id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY poly.a_id",
[{"primary_keys": [1, 2]}],
),
"child.type AS child_type "
"FROM child JOIN child_subclass1 "
"ON child.id = child_subclass1.id "
- "WHERE child.id IN ([EXPANDING_primary_keys]) "
+ "WHERE child.id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY child.id",
[{"primary_keys": [1]}],
),
"ON child.id = child_subclass1.id "
"LEFT OUTER JOIN other AS other_1 "
"ON child_subclass1.id = other_1.child_subclass_id "
- "WHERE child.id IN ([EXPANDING_primary_keys]) "
+ "WHERE child.id IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY child.id",
[{"primary_keys": [1]}],
),
"SELECT a_1.id AS a_1_id, b.id AS b_id FROM a AS a_1 "
"JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) "
"ON a_1.b_id = b.id WHERE a_1.id "
- "IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
+ "IN ([POSTCOMPILE_primary_keys]) ORDER BY a_1.id",
[{"primary_keys": [1]}],
),
)
"FROM (SELECT anon_2.users_id AS users_id, "
"anon_2.users_name AS users_name FROM "
"(SELECT users.id AS users_id, users.name AS users_name "
- "FROM users) anon_2 WHERE ROWNUM <= :param_1) anon_1 "
+ "FROM users) anon_2 WHERE ROWNUM <= [POSTCOMPILE_param_1]) anon_1 "
"LEFT OUTER JOIN addresses addresses_1 "
"ON anon_1.users_id = addresses_1.user_id FOR UPDATE",
dialect="oracle",
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
- "IN ([EXPANDING_primary_keys]) "
+ "IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
- "IN ([EXPANDING_primary_keys]) "
+ "IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
- "IN ([EXPANDING_primary_keys]) "
+ "IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
- "IN ([EXPANDING_primary_keys]) "
+ "IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
"paperwork.paperwork_id AS paperwork_paperwork_id, "
"paperwork.description AS paperwork_description "
"FROM paperwork WHERE paperwork.person_id "
- "IN ([EXPANDING_primary_keys]) "
+ "IN ([POSTCOMPILE_primary_keys]) "
"ORDER BY paperwork.person_id, paperwork.paperwork_id",
[{"primary_keys": [1]}],
),
CompiledSQL(
"SELECT b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2, b.id AS b_id "
"FROM b WHERE (b.a_id1, b.a_id2) IN "
- "([EXPANDING_primary_keys]) ORDER BY b.a_id1, b.a_id2, b.id",
+ "([POSTCOMPILE_primary_keys]) ORDER BY b.a_id1, b.a_id2, b.id",
[{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
),
)
),
CompiledSQL(
"SELECT a.id1 AS a_id1, a.id2 AS a_id2 FROM a "
- "WHERE (a.id1, a.id2) IN ([EXPANDING_primary_keys])",
+ "WHERE (a.id1, a.id2) IN ([POSTCOMPILE_primary_keys])",
[{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
),
)
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id "
"FROM b WHERE b.a_id IN "
- "([EXPANDING_primary_keys]) ORDER BY b.a_id, b.id",
+ "([POSTCOMPILE_primary_keys]) ORDER BY b.a_id, b.id",
{"primary_keys": list(range(1, 48))},
),
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id "
"FROM b WHERE b.a_id IN "
- "([EXPANDING_primary_keys]) ORDER BY b.a_id, b.id",
+ "([POSTCOMPILE_primary_keys]) ORDER BY b.a_id, b.id",
{"primary_keys": list(range(48, 95))},
),
CompiledSQL(
"SELECT b.a_id AS b_a_id, b.id AS b_id "
"FROM b WHERE b.a_id IN "
- "([EXPANDING_primary_keys]) ORDER BY b.a_id, b.id",
+ "([POSTCOMPILE_primary_keys]) ORDER BY b.a_id, b.id",
{"primary_keys": list(range(95, 101))},
),
)
# chunk size is 47. so first chunk are a 1->47...
CompiledSQL(
"SELECT a.id AS a_id FROM a WHERE a.id IN "
- "([EXPANDING_primary_keys])",
+ "([POSTCOMPILE_primary_keys])",
{"primary_keys": list(range(1, 48))},
),
# second chunk is a 48-94
CompiledSQL(
"SELECT a.id AS a_id FROM a WHERE a.id IN "
- "([EXPANDING_primary_keys])",
+ "([POSTCOMPILE_primary_keys])",
{"primary_keys": list(range(48, 95))},
),
# third and final chunk 95-100.
CompiledSQL(
"SELECT a.id AS a_id FROM a WHERE a.id IN "
- "([EXPANDING_primary_keys])",
+ "([POSTCOMPILE_primary_keys])",
{"primary_keys": list(range(95, 101))},
),
)
"SELECT foo_1.id AS foo_1_id, "
"foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
"FROM foo AS foo_1 "
- "WHERE foo_1.id IN ([EXPANDING_primary_keys])",
+ "WHERE foo_1.id IN ([POSTCOMPILE_primary_keys])",
{"primary_keys": [3]},
),
CompiledSQL(
"SELECT foo_1.id AS foo_1_id, "
"foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
"FROM foo AS foo_1 "
- "WHERE foo_1.id IN ([EXPANDING_primary_keys])",
+ "WHERE foo_1.id IN ([POSTCOMPILE_primary_keys])",
{"primary_keys": [1]},
),
)
CompiledSQL(
"SELECT role.user_id AS role_user_id, role.id AS role_id "
"FROM role WHERE role.user_id "
- "IN ([EXPANDING_primary_keys]) ORDER BY role.user_id",
+ "IN ([POSTCOMPILE_primary_keys]) ORDER BY role.user_id",
{"primary_keys": [1]},
),
)
),
CompiledSQL(
"SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
- "FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
+ "FROM b WHERE b.id IN ([POSTCOMPILE_primary_keys])",
[{"primary_keys": [1, 2]}],
),
)
"SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
"b.y AS b_y "
"FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
- "WHERE a_1.id IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
+ "WHERE a_1.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a_1.id",
[{"primary_keys": [1, 3]}],
),
)
),
CompiledSQL(
"SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
- "FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
+ "FROM b WHERE b.id IN ([POSTCOMPILE_primary_keys])",
[{"primary_keys": [1, 2]}],
),
)
"SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
"b.y AS b_y "
"FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
- "WHERE a_1.id IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
+ "WHERE a_1.id IN ([POSTCOMPILE_primary_keys]) ORDER BY a_1.id",
[{"primary_keys": [1, 2, 3, 4, 5]}],
),
)
checkparams=params,
)
- def test_limit_offset_select_literal_binds(self):
- stmt = select([1]).limit(5).offset(6)
- self.assert_compile(
- stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
- )
-
- def test_limit_offset_compound_select_literal_binds(self):
- stmt = select([1]).union(select([2])).limit(5).offset(6)
- self.assert_compile(
- stmt,
- "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
- literal_binds=True,
- )
-
def test_select_precol_compile_ordering(self):
s1 = (
select([column("x")])
"SELECT mytable.myid FROM mytable",
)
- def test_multiple_col_binds(self):
- self.assert_compile(
- select(
- [literal_column("*")],
- or_(
- table1.c.myid == 12,
- table1.c.myid == "asdf",
- table1.c.myid == "foo",
- ),
- ),
- "SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
- "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
- )
-
def test_order_by_nulls(self):
self.assert_compile(
table2.select(
dialect=mysql.dialect(),
)
- def test_render_binds_as_literal(self):
- """test a compiler that renders binds inline into
- SQL in the columns clause."""
-
- dialect = default.DefaultDialect()
-
- class Compiler(dialect.statement_compiler):
- ansi_bind_rules = True
-
- dialect.statement_compiler = Compiler
-
- self.assert_compile(
- select([literal("someliteral")]),
- "SELECT 'someliteral' AS anon_1",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([table1.c.myid + 3]),
- "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([table1.c.myid.in_([4, 5, 6])]),
- "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([func.mod(table1.c.myid, 5)]),
- "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([literal("foo").in_([])]),
- "SELECT 1 != 1 AS anon_1",
- dialect=dialect,
- )
-
- self.assert_compile(
- select([literal(util.b("foo"))]),
- "SELECT 'foo' AS anon_1",
- dialect=dialect,
- )
-
- # test callable
- self.assert_compile(
- select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
- "SELECT mytable.myid = 5 AS anon_1 FROM mytable",
- dialect=dialect,
- )
-
- empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
- empty_in_dialect.statement_compiler = Compiler
-
- assert_raises_message(
- exc.CompileError,
- "Bind parameter 'foo' without a "
- "renderable value not allowed here.",
- bindparam("foo").in_([]).compile,
- dialect=empty_in_dialect,
- )
-
def test_collate(self):
# columns clause
self.assert_compile(
" LIMIT -1 OFFSET :param_2) AS anon_2",
)
- def test_binds(self):
- for (
- stmt,
- expected_named_stmt,
- expected_positional_stmt,
- expected_default_params_dict,
- expected_default_params_list,
- test_param_dict,
- expected_test_params_dict,
- expected_test_params_list,
- ) in [
- (
- select(
- [table1, table2],
- and_(
- table1.c.myid == table2.c.otherid,
- table1.c.name == bindparam("mytablename"),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable WHERE mytable.myid = myothertable.otherid "
- "AND mytable.name = :mytablename",
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable WHERE mytable.myid = myothertable.otherid AND "
- "mytable.name = ?",
- {"mytablename": None},
- [None],
- {"mytablename": 5},
- {"mytablename": 5},
- [5],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myid"),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable, myothertable WHERE mytable.myid = :myid "
- "OR myothertable.otherid = :myid",
- "SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable, myothertable WHERE mytable.myid = ? "
- "OR myothertable.otherid = ?",
- {"myid": None},
- [None, None],
- {"myid": 5},
- {"myid": 5},
- [5, 5],
- ),
- (
- text(
- "SELECT mytable.myid, mytable.name, "
- "mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid"
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = ? OR "
- "myothertable.otherid = ?",
- {"myid": None},
- [None, None],
- {"myid": 5},
- {"myid": 5},
- [5, 5],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid", unique=True),
- table2.c.otherid == bindparam("myid", unique=True),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid_1 OR myothertable.otherid = :myid_2",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = ? "
- "OR myothertable.otherid = ?",
- {"myid_1": None, "myid_2": None},
- [None, None],
- {"myid_1": 5, "myid_2": 6},
- {"myid_1": 5, "myid_2": 6},
- [5, 6],
- ),
- (
- bindparam("test", type_=String, required=False) + text("'hi'"),
- ":test || 'hi'",
- "? || 'hi'",
- {"test": None},
- [None],
- {},
- {"test": None},
- [None],
- ),
- (
- # testing select.params() here - bindparam() objects
- # must get required flag set to False
- select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myotherid"),
- ),
- ).params({"myid": 8, "myotherid": 7}),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid OR myothertable.otherid = :myotherid",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- "? OR myothertable.otherid = ?",
- {"myid": 8, "myotherid": 7},
- [8, 7],
- {"myid": 5},
- {"myid": 5, "myotherid": 7},
- [5, 7],
- ),
- (
- select(
- [table1],
- or_(
- table1.c.myid
- == bindparam("myid", value=7, unique=True),
- table2.c.otherid
- == bindparam("myid", value=8, unique=True),
- ),
- ),
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- ":myid_1 OR myothertable.otherid = :myid_2",
- "SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = "
- "? OR myothertable.otherid = ?",
- {"myid_1": 7, "myid_2": 8},
- [7, 8],
- {"myid_1": 5, "myid_2": 6},
- {"myid_1": 5, "myid_2": 6},
- [5, 6],
- ),
- ]:
+ def test_cast(self):
+ tbl = table(
+ "casttest",
+ column("id", Integer),
+ column("v1", Float),
+ column("v2", Float),
+ column("ts", TIMESTAMP),
+ )
- self.assert_compile(
- stmt, expected_named_stmt, params=expected_default_params_dict
- )
- self.assert_compile(
- stmt, expected_positional_stmt, dialect=sqlite.dialect()
- )
- nonpositional = stmt.compile()
- positional = stmt.compile(dialect=sqlite.dialect())
- pp = positional.params
- eq_(
- [pp[k] for k in positional.positiontup],
- expected_default_params_list,
- )
-
- eq_(
- nonpositional.construct_params(test_param_dict),
- expected_test_params_dict,
- )
- pp = positional.construct_params(test_param_dict)
- eq_(
- [pp[k] for k in positional.positiontup],
- expected_test_params_list,
- )
-
- # check that params() doesn't modify original statement
- s = select(
- [table1],
- or_(
- table1.c.myid == bindparam("myid"),
- table2.c.otherid == bindparam("myotherid"),
- ),
- )
- s2 = s.params({"myid": 8, "myotherid": 7})
- s3 = s2.params({"myid": 9})
- assert s.compile().params == {"myid": None, "myotherid": None}
- assert s2.compile().params == {"myid": 8, "myotherid": 7}
- assert s3.compile().params == {"myid": 9, "myotherid": 7}
-
- # test using same 'unique' param object twice in one compile
- s = (
- select([table1.c.myid])
- .where(table1.c.myid == 12)
- .scalar_subquery()
- )
- s2 = select([table1, s], table1.c.myid == s)
- self.assert_compile(
- s2,
- "SELECT mytable.myid, mytable.name, mytable.description, "
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
- ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
- )
- positional = s2.compile(dialect=sqlite.dialect())
-
- pp = positional.params
- assert [pp[k] for k in positional.positiontup] == [12, 12]
-
- # check that conflicts with "unique" params are caught
- s = select(
- [table1],
- or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
- )
- assert_raises_message(
- exc.CompileError,
- "conflicts with unique bind parameter " "of the same name",
- str,
- s,
- )
-
- s = select(
- [table1],
- or_(
- table1.c.myid == 7,
- table1.c.myid == 8,
- table1.c.myid == bindparam("myid_1"),
- ),
- )
- assert_raises_message(
- exc.CompileError,
- "conflicts with unique bind parameter " "of the same name",
- str,
- s,
- )
-
- def _test_binds_no_hash_collision(self):
- """test that construct_params doesn't corrupt dict
- due to hash collisions"""
-
- total_params = 100000
-
- in_clause = [":in%d" % i for i in range(total_params)]
- params = dict(("in%d" % i, i) for i in range(total_params))
- t = text("text clause %s" % ", ".join(in_clause))
- eq_(len(t.bindparams), total_params)
- c = t.compile()
- pp = c.construct_params(params)
- eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
- eq_(len(set(pp.values())), total_params)
-
- def test_bind_as_col(self):
- t = table("foo", column("id"))
-
- s = select([t, literal("lala").label("hoho")])
- self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
-
- assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
-
- def test_bind_callable(self):
- expr = column("x") == bindparam("key", callable_=lambda: 12)
- self.assert_compile(expr, "x = :key", {"x": 12})
-
- def test_bind_params_missing(self):
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x'",
- select([table1])
- .where(
- and_(
- table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True),
- )
- )
- .compile()
- .construct_params,
- params=dict(y=5),
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x'",
- select([table1])
- .where(table1.c.myid == bindparam("x", required=True))
- .compile()
- .construct_params,
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x', "
- "in parameter group 2",
- select([table1])
- .where(
- and_(
- table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True),
- )
- )
- .compile()
- .construct_params,
- params=dict(y=5),
- _group_number=2,
- )
-
- assert_raises_message(
- exc.InvalidRequestError,
- r"A value is required for bind parameter 'x', "
- "in parameter group 2",
- select([table1])
- .where(table1.c.myid == bindparam("x", required=True))
- .compile()
- .construct_params,
- _group_number=2,
- )
-
- def test_tuple(self):
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
- "(mytable.myid, mytable.name) IN "
- "((:param_1, :param_2), (:param_3, :param_4))",
- )
-
- dialect = default.DefaultDialect()
- dialect.tuple_in_values = True
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
- "(mytable.myid, mytable.name) IN "
- "(VALUES (:param_1, :param_2), (:param_3, :param_4))",
- dialect=dialect,
- )
-
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- [tuple_(table2.c.otherid, table2.c.othername)]
- ),
- "(mytable.myid, mytable.name) IN "
- "((myothertable.otherid, myothertable.othername))",
- )
-
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- select([table2.c.otherid, table2.c.othername])
- ),
- "(mytable.myid, mytable.name) IN (SELECT "
- "myothertable.otherid, myothertable.othername FROM myothertable)",
- )
-
- def test_expanding_parameter(self):
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- bindparam("foo", expanding=True)
- ),
- "(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
- )
-
- dialect = default.DefaultDialect()
- dialect.tuple_in_values = True
- self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- bindparam("foo", expanding=True)
- ),
- "(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
- dialect=dialect,
- )
-
- self.assert_compile(
- table1.c.myid.in_(bindparam("foo", expanding=True)),
- "mytable.myid IN ([EXPANDING_foo])",
- )
-
- def test_cast(self):
- tbl = table(
- "casttest",
- column("id", Integer),
- column("v1", Float),
- column("v2", Float),
- column("ts", TIMESTAMP),
- )
-
- def check_results(dialect, expected_results, literal):
- eq_(
- len(expected_results),
- 5,
- "Incorrect number of expected results",
+ def check_results(dialect, expected_results, literal):
+ eq_(
+ len(expected_results),
+ 5,
+ "Incorrect number of expected results",
)
eq_(
str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)),
assert_raises(exc.ArgumentError, and_, ("a",), ("b",))
+class BindParameterTest(AssertsCompiledSQL, fixtures.TestBase):
+ __dialect__ = "default"
+
+ def test_binds(self):
+ for (
+ stmt,
+ expected_named_stmt,
+ expected_positional_stmt,
+ expected_default_params_dict,
+ expected_default_params_list,
+ test_param_dict,
+ expected_test_params_dict,
+ expected_test_params_list,
+ ) in [
+ (
+ select(
+ [table1, table2],
+ and_(
+ table1.c.myid == table2.c.otherid,
+ table1.c.name == bindparam("mytablename"),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable, "
+ "myothertable WHERE mytable.myid = myothertable.otherid "
+ "AND mytable.name = :mytablename",
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "myothertable.otherid, myothertable.othername FROM mytable, "
+ "myothertable WHERE mytable.myid = myothertable.otherid AND "
+ "mytable.name = ?",
+ {"mytablename": None},
+ [None],
+ {"mytablename": 5},
+ {"mytablename": 5},
+ [5],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myid"),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, myothertable WHERE mytable.myid = :myid "
+ "OR myothertable.otherid = :myid",
+ "SELECT mytable.myid, mytable.name, mytable.description "
+ "FROM mytable, myothertable WHERE mytable.myid = ? "
+ "OR myothertable.otherid = ?",
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
+ ),
+ (
+ text(
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid"
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = ? OR "
+ "myothertable.otherid = ?",
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid", unique=True),
+ table2.c.otherid == bindparam("myid", unique=True),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid_1 OR myothertable.otherid = :myid_2",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = ? "
+ "OR myothertable.otherid = ?",
+ {"myid_1": None, "myid_2": None},
+ [None, None],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
+ ),
+ (
+ bindparam("test", type_=String, required=False) + text("'hi'"),
+ ":test || 'hi'",
+ "? || 'hi'",
+ {"test": None},
+ [None],
+ {},
+ {"test": None},
+ [None],
+ ),
+ (
+ # testing select.params() here - bindparam() objects
+ # must get required flag set to False
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ ).params({"myid": 8, "myotherid": 7}),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid OR myothertable.otherid = :myotherid",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ "? OR myothertable.otherid = ?",
+ {"myid": 8, "myotherid": 7},
+ [8, 7],
+ {"myid": 5},
+ {"myid": 5, "myotherid": 7},
+ [5, 7],
+ ),
+ (
+ select(
+ [table1],
+ or_(
+ table1.c.myid
+ == bindparam("myid", value=7, unique=True),
+ table2.c.otherid
+ == bindparam("myid", value=8, unique=True),
+ ),
+ ),
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ ":myid_1 OR myothertable.otherid = :myid_2",
+ "SELECT mytable.myid, mytable.name, mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = "
+ "? OR myothertable.otherid = ?",
+ {"myid_1": 7, "myid_2": 8},
+ [7, 8],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
+ ),
+ ]:
+
+ self.assert_compile(
+ stmt, expected_named_stmt, params=expected_default_params_dict
+ )
+ self.assert_compile(
+ stmt, expected_positional_stmt, dialect=sqlite.dialect()
+ )
+ nonpositional = stmt.compile()
+ positional = stmt.compile(dialect=sqlite.dialect())
+ pp = positional.params
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_default_params_list,
+ )
+
+ eq_(
+ nonpositional.construct_params(test_param_dict),
+ expected_test_params_dict,
+ )
+ pp = positional.construct_params(test_param_dict)
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_test_params_list,
+ )
+
+ # check that params() doesn't modify original statement
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ )
+ s2 = s.params({"myid": 8, "myotherid": 7})
+ s3 = s2.params({"myid": 9})
+ assert s.compile().params == {"myid": None, "myotherid": None}
+ assert s2.compile().params == {"myid": 8, "myotherid": 7}
+ assert s3.compile().params == {"myid": 9, "myotherid": 7}
+
+ # test using same 'unique' param object twice in one compile
+ s = (
+ select([table1.c.myid])
+ .where(table1.c.myid == 12)
+ .scalar_subquery()
+ )
+ s2 = select([table1, s], table1.c.myid == s)
+ self.assert_compile(
+ s2,
+ "SELECT mytable.myid, mytable.name, mytable.description, "
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
+ ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
+ )
+ positional = s2.compile(dialect=sqlite.dialect())
+
+ pp = positional.params
+ assert [pp[k] for k in positional.positiontup] == [12, 12]
+
+ # check that conflicts with "unique" params are caught
+ s = select(
+ [table1],
+ or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
+
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == 7,
+ table1.c.myid == 8,
+ table1.c.myid == bindparam("myid_1"),
+ ),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
+
+ def _test_binds_no_hash_collision(self):
+ """test that construct_params doesn't corrupt dict
+ due to hash collisions"""
+
+ total_params = 100000
+
+ in_clause = [":in%d" % i for i in range(total_params)]
+ params = dict(("in%d" % i, i) for i in range(total_params))
+ t = text("text clause %s" % ", ".join(in_clause))
+ eq_(len(t.bindparams), total_params)
+ c = t.compile()
+ pp = c.construct_params(params)
+ eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
+ eq_(len(set(pp.values())), total_params)
+
+ def test_bind_as_col(self):
+ t = table("foo", column("id"))
+
+ s = select([t, literal("lala").label("hoho")])
+ self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
+
+ assert [str(c) for c in s.subquery().c] == ["anon_1.id", "anon_1.hoho"]
+
+ def test_bind_callable(self):
+ expr = column("x") == bindparam("key", callable_=lambda: 12)
+ self.assert_compile(expr, "x = :key", {"x": 12})
+
+ def test_bind_params_missing(self):
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x'",
+ select([table1])
+ .where(
+ and_(
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True),
+ )
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x'",
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x', "
+ "in parameter group 2",
+ select([table1])
+ .where(
+ and_(
+ table1.c.myid == bindparam("x", required=True),
+ table1.c.name == bindparam("y", required=True),
+ )
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
+ _group_number=2,
+ )
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ r"A value is required for bind parameter 'x', "
+ "in parameter group 2",
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ _group_number=2,
+ )
+
+ def test_tuple(self):
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
+ "(mytable.myid, mytable.name) IN "
+ "((:param_1, :param_2), (:param_3, :param_4))",
+ )
+
+ dialect = default.DefaultDialect()
+ dialect.tuple_in_values = True
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
+ "(mytable.myid, mytable.name) IN "
+ "(VALUES (:param_1, :param_2), (:param_3, :param_4))",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ [tuple_(table2.c.otherid, table2.c.othername)]
+ ),
+ "(mytable.myid, mytable.name) IN "
+ "((myothertable.otherid, myothertable.othername))",
+ )
+
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ select([table2.c.otherid, table2.c.othername])
+ ),
+ "(mytable.myid, mytable.name) IN (SELECT "
+ "myothertable.otherid, myothertable.othername FROM myothertable)",
+ )
+
+ def test_expanding_parameter(self):
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ bindparam("foo", expanding=True)
+ ),
+ "(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
+ )
+
+ dialect = default.DefaultDialect()
+ dialect.tuple_in_values = True
+ self.assert_compile(
+ tuple_(table1.c.myid, table1.c.name).in_(
+ bindparam("foo", expanding=True)
+ ),
+ "(mytable.myid, mytable.name) IN ([POSTCOMPILE_foo])",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ table1.c.myid.in_(bindparam("foo", expanding=True)),
+ "mytable.myid IN ([POSTCOMPILE_foo])",
+ )
+
+ def test_limit_offset_select_literal_binds(self):
+ stmt = select([1]).limit(5).offset(6)
+ self.assert_compile(
+ stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
+ )
+
+ def test_limit_offset_compound_select_literal_binds(self):
+ stmt = select([1]).union(select([2])).limit(5).offset(6)
+ self.assert_compile(
+ stmt,
+ "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
+ literal_binds=True,
+ )
+
+ def test_multiple_col_binds(self):
+ self.assert_compile(
+ select(
+ [literal_column("*")],
+ or_(
+ table1.c.myid == 12,
+ table1.c.myid == "asdf",
+ table1.c.myid == "foo",
+ ),
+ ),
+ "SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
+ "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
+ )
+
+ def test_render_binds_as_literal(self):
+ """test a compiler that renders binds inline into
+ SQL in the columns clause."""
+
+ dialect = default.DefaultDialect()
+
+ class Compiler(dialect.statement_compiler):
+ ansi_bind_rules = True
+
+ dialect.statement_compiler = Compiler
+
+ self.assert_compile(
+ select([literal("someliteral")]),
+ "SELECT 'someliteral' AS anon_1",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([table1.c.myid + 3]),
+ "SELECT mytable.myid + 3 AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([table1.c.myid.in_([4, 5, 6])]),
+ "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([func.mod(table1.c.myid, 5)]),
+ "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([literal("foo").in_([])]),
+ "SELECT 1 != 1 AS anon_1",
+ dialect=dialect,
+ )
+
+ self.assert_compile(
+ select([literal(util.b("foo"))]),
+ "SELECT 'foo' AS anon_1",
+ dialect=dialect,
+ )
+
+ # test callable
+ self.assert_compile(
+ select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
+ "SELECT mytable.myid = 5 AS anon_1 FROM mytable",
+ dialect=dialect,
+ )
+
+ empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
+ empty_in_dialect.statement_compiler = Compiler
+
+ assert_raises_message(
+ exc.CompileError,
+ "Bind parameter 'foo' without a "
+ "renderable value not allowed here.",
+ bindparam("foo").in_([]).compile,
+ dialect=empty_in_dialect,
+ )
+
+ def test_render_literal_execute_parameter(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid == bindparam("foo", 5, literal_execute=True)
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid = [POSTCOMPILE_foo]",
+ )
+
+ def test_render_literal_execute_parameter_literal_binds(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid == bindparam("foo", 5, literal_execute=True)
+ ),
+ "SELECT mytable.myid FROM mytable " "WHERE mytable.myid = 5",
+ literal_binds=True,
+ )
+
+ def test_render_expanding_parameter(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid.in_(bindparam("foo", expanding=True))
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid IN ([POSTCOMPILE_foo])",
+ )
+
+ def test_render_expanding_parameter_literal_binds(self):
+ self.assert_compile(
+ select([table1.c.myid]).where(
+ table1.c.myid.in_(bindparam("foo", [1, 2, 3], expanding=True))
+ ),
+ "SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid IN (1, 2, 3)",
+ literal_binds=True,
+ )
+
+
class UnsupportedTest(fixtures.TestBase):
def test_unsupported_element_str_visit_name(self):
from sqlalchemy.sql.expression import ClauseElement
+from sqlalchemy import bindparam
from sqlalchemy import Column
from sqlalchemy import exc
from sqlalchemy import Integer
from sqlalchemy.sql.elements import Null
from sqlalchemy.sql.selectable import FromGrouping
from sqlalchemy.sql.selectable import SelectStatementGrouping
+from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
)
)
+ def test_offset_or_limit_role_only_ints_or_clauseelement(self):
+ assert_raises(ValueError, select([t]).limit, "some limit")
+
+ assert_raises(ValueError, select([t]).offset, "some offset")
+
+ def test_offset_or_limit_role_clauseelement(self):
+ bind = bindparam("x")
+ stmt = select([t]).limit(bind)
+ is_(stmt._limit_clause, bind)
+
+ stmt = select([t]).offset(bind)
+ is_(stmt._offset_clause, bind)
+
def test_from_clause_is_not_a_select(self):
assert_raises_message(
exc.ArgumentError,
dialect="postgresql",
)
+ def test_unique_binds(self):
+ # unique binds can be used in text() however they uniquify across
+ # multiple text() constructs only, not within a single text
+
+ t1 = text("select :foo").bindparams(bindparam("foo", 5, unique=True))
+ t2 = text("select :foo").bindparams(bindparam("foo", 10, unique=True))
+ stmt = select([t1, t2])
+ self.assert_compile(
+ stmt,
+ "SELECT select :foo_1, select :foo_2",
+ checkparams={"foo_1": 5, "foo_2": 10},
+ )
+
def test_binds_compiled_positional(self):
self.assert_compile(
text(
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.sql import column
from sqlalchemy.sql import ddl
+from sqlalchemy.sql import elements
from sqlalchemy.sql import null
from sqlalchemy.sql import operators
from sqlalchemy.sql import sqltypes
],
)
+ def test_grouped_bind_adapt(self):
+ expr = test_table.c.atimestamp == elements.Grouping(
+ bindparam("thedate")
+ )
+ eq_(expr.right.type._type_affinity, Date)
+ eq_(expr.right.element.type._type_affinity, Date)
+
+ expr = test_table.c.atimestamp == elements.Grouping(
+ elements.Grouping(bindparam("thedate"))
+ )
+ eq_(expr.right.type._type_affinity, Date)
+ eq_(expr.right.element.type._type_affinity, Date)
+ eq_(expr.right.element.element.type._type_affinity, Date)
+
def test_bind_adapt_update(self):
bp = bindparam("somevalue")
stmt = test_table.update().values(avalue=bp)
eq_(row["non_native_interval"], None)
+class IntegerTest(fixtures.TestBase):
+ def test_integer_literal_processor(self):
+ typ = Integer()
+ eq_(typ._cached_literal_processor(testing.db.dialect)(5), "5")
+
+ assert_raises(
+ ValueError,
+ typ._cached_literal_processor(testing.db.dialect),
+ "notanint",
+ )
+
+
class BooleanTest(
fixtures.TablesTest, AssertsExecutionResults, AssertsCompiledSQL
):