from sqlalchemy.sql import expression
from sqlalchemy.engine import base, default, reflection
from sqlalchemy.sql import compiler
-
+from sqlalchemy.sql.elements import _literal_as_binds
from sqlalchemy.types import (BIGINT, BLOB, DATE, FLOAT, INTEGER, NUMERIC,
SMALLINT, TEXT, TIME, TIMESTAMP, Integer)
result = ""
if select._limit:
- result += "FIRST %s " % self.process(sql.literal(select._limit))
+ result += "FIRST %s " % self.process(_literal_as_binds(select._limit))
if select._offset:
- result += "SKIP %s " % self.process(sql.literal(select._offset))
+ result += "SKIP %s " % self.process(_literal_as_binds(select._offset))
if select._distinct:
result += "DISTINCT "
return result
from ...engine import default, reflection
from ...sql import compiler, expression, operators
from ... import types as sqltypes
+from sqlalchemy.sql.elements import _literal_as_binds
try:
from uuid import UUID as _python_UUID
def limit_clause(self, select):
text = ""
if select._limit is not None:
- text += " \n LIMIT " + self.process(sql.literal(select._limit))
+ text += " \n LIMIT " + self.process(_literal_as_binds(select._limit))
if select._offset is not None:
if select._limit is None:
text += " \n LIMIT ALL"
- text += " OFFSET " + self.process(sql.literal(select._offset))
+ text += " OFFSET " + self.process(_literal_as_binds(select._offset))
return text
def format_from_hint_text(self, sqltext, table, hint, iscrud):
from ... import util
from ...engine import default, reflection
from ...sql import compiler
+from sqlalchemy.sql.elements import _literal_as_binds
from ...types import (BLOB, BOOLEAN, CHAR, DATE, DECIMAL, FLOAT, INTEGER, REAL,
NUMERIC, SMALLINT, TEXT, TIMESTAMP, VARCHAR)
def limit_clause(self, select):
text = ""
if select._limit is not None:
- text += "\n LIMIT " + self.process(sql.literal(select._limit))
+ text += "\n LIMIT " + self.process(_literal_as_binds(select._limit))
if select._offset is not None:
if select._limit is None:
text += "\n LIMIT " + self.process(sql.literal(-1))
- text += " OFFSET " + self.process(sql.literal(select._offset))
+ text += " OFFSET " + self.process(_literal_as_binds(select._offset))
else:
text += " OFFSET " + self.process(sql.literal(0))
return text
import decimal
import itertools
import operator
+from sqlalchemy.sql.elements import _literal_as_binds
RESERVED_WORDS = set([
'all', 'analyse', 'analyze', 'and', 'any', 'array',
def limit_clause(self, select):
text = ""
if select._limit is not None:
- text += "\n LIMIT " + self.process(elements.literal(select._limit))
+ text += "\n LIMIT " + self.process(_literal_as_binds(select._limit))
if select._offset is not None:
if select._limit is None:
text += "\n LIMIT -1"
- text += " OFFSET " + self.process(elements.literal(select._offset))
+ text += " OFFSET " + self.process(_literal_as_binds(select._offset))
return text
def visit_table(self, table, asfrom=False, iscrud=False, ashint=False,
self._execution_options.union(
{'autocommit': autocommit})
if limit is not None:
- self._limit = util.asint(limit)
+ self._limit = limit
if offset is not None:
- self._offset = util.asint(offset)
+ self._offset = offset
self._bind = bind
if order_by is not None:
"""return a new selectable with the given LIMIT criterion
applied."""
- self._limit = util.asint(limit)
+ self._limit = limit
@_generative
def offset(self, offset):
"""return a new selectable with the given OFFSET criterion
applied."""
- self._offset = util.asint(offset)
+ self._offset = offset
@_generative
def order_by(self, *clauses):
self._group_by_clause = ClauseList(*clauses)
+ def _copy_internals(self, clone=_clone, **kw):
+ if isinstance(self._limit, ClauseElement):
+ self._limit = clone(self._limit)
+ if isinstance(self._offset, ClauseElement):
+ self._offset = clone(self._offset)
+
class CompoundSelect(GenerativeSelect):
"""Forms the basis of ``UNION``, ``UNION ALL``, and other
SELECT-based set operations.
"addition of columns to underlying selectables")
def _copy_internals(self, clone=_clone, **kw):
+ super(CompoundSelect, self)._copy_internals(clone, **kw)
self._reset_exported()
self.selects = [clone(s, **kw) for s in self.selects]
if hasattr(self, '_col_map'):
return False
def _copy_internals(self, clone=_clone, **kw):
+ super(Select, self)._copy_internals(clone, **kw)
# Select() object has been cloned and probably adapted by the
# given clone function. Apply the cloning function to internal
assert [] == sess.query(User).order_by(User.id)[3:3]
assert [] == sess.query(User).order_by(User.id)[0:0]
+ def test_select_with_bindparam_limit(self):
+ """Does a query allow bindparam for the limit?"""
+ sess = create_session()
+ users = []
+
+ q1 = sess.query(self.classes.User).order_by(self.classes.User.id).limit(sa.bindparam('n'))
+ for n in xrange(1,4):
+ users[:] = q1.params(n=n).all()
+ assert len(users) == n
@testing.requires.boolean_col_expressions
def test_exists(self):