else:
return element
+def _literal_as_column(element):
+ if isinstance(element, Operators):
+ return element.clause_element()
+ elif _is_literal(element):
+ return literal_column(str(element))
+ else:
+ return element
+
def _literal_as_binds(element, name='literal', type_=None):
if isinstance(element, Operators):
return element.clause_element()
def __compare(self, op, obj, negate=None):
if obj is None or isinstance(obj, _Null):
if op == operator.eq:
- return _BinaryExpression(self.clause_element(), null(), Operators.is_, negate=Operators.isnot)
+ return _BinaryExpression(self.expression_element(), null(), Operators.is_, negate=Operators.isnot)
elif op == operator.ne:
- return _BinaryExpression(self.clause_element(), null(), Operators.isnot, negate=Operators.is_)
+ return _BinaryExpression(self.expression_element(), null(), Operators.isnot, negate=Operators.is_)
else:
raise exceptions.ArgumentError("Only '='/'!=' operators can be used with NULL")
else:
obj = self._check_literal(obj)
- return _BinaryExpression(self.clause_element(), obj, op, type_=sqltypes.Boolean, negate=negate)
+ return _BinaryExpression(self.expression_element(), obj, op, type_=sqltypes.Boolean, negate=negate)
def __operate(self, op, obj):
obj = self._check_literal(obj)
if op == operator.add and isinstance(type_, (sqltypes.Concatenable)):
op = ColumnOperators.concat_op
- return _BinaryExpression(self.clause_element(), obj, op, type_=type_)
+ return _BinaryExpression(self.expression_element(), obj, op, type_=type_)
operators = {
operator.add : (__operate,),
return other
def clause_element(self):
+ """Allow ``_CompareMixins`` to return the underlying ``ClauseElement``, for non-``ClauseElement`` ``_CompareMixins``."""
+
+ return self
+
+ def expression_element(self):
"""Allow ``_CompareMixins`` to return the appropriate object to be used in expressions."""
return self
_label = property(lambda s: s.name)
orig_set = property(lambda s:s.obj.orig_set)
- def clause_element(self):
+ def expression_element(self):
return self.obj
-
+
def _copy_internals(self):
self.obj = self.obj._clone()
"""
- def __init__(self, columns, whereclause=None, from_obj=None, distinct=False, having=None, correlate=True, **kwargs):
+ def __init__(self, columns, whereclause=None, from_obj=None, distinct=False, having=None, correlate=True, prefixes=None, **kwargs):
"""construct a Select object.
The public constructor for Select is the [sqlalchemy.sql#select()] function;
self._froms = util.OrderedSet()
self._whereclause = None
self._having = None
+ self._prefixes = []
if columns is not None:
for c in columns:
s.distinct = True
return s
+ def prefix_with(self, clause):
+ s = self._generate()
+ s.append_prefix(clause)
+ return s
+
def select_from(self, fromclause):
s = self._generate()
s.append_from(fromclause)
self.__correlate.add(fromclause)
def append_column(self, column):
- if _is_literal(column):
- column = literal_column(str(column))
+ column = _literal_as_column(column)
if isinstance(column, _ScalarSelect):
column = column.self_group(against=ColumnOperators.comma_op)
self._raw_columns.append(column)
-
+
+ def append_prefix(self, clause):
+ clause = _literal_as_text(clause)
+ self._prefixes.append(clause)
+
def append_whereclause(self, whereclause):
if self._whereclause is not None:
self._whereclause = and_(self._whereclause, _literal_as_text(whereclause))
def testalias(self):
# test the alias for a table1. column names stay the same, table name "changes" to "foo".
self.runtest(
- select([alias(table1, 'foo')])
+ select([table1.alias('foo')])
,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
-
+
+ for dialect in (firebird.dialect(), oracle.dialect()):
+ self.runtest(
+ select([table1.alias('foo')])
+ ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo"
+ ,dialect=dialect)
+
self.runtest(
- select([alias(table1, 'foo')])
- ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo"
- ,dialect=firebird.dialect())
+ select([table1.alias()])
+ ,"SELECT mytable_1.myid, mytable_1.name, mytable_1.description FROM mytable AS mytable_1")
# create a select for a join of two tables. use_labels means the column names will have
# labels tablename_columnname, which become the column keys accessible off the Selectable object.
WHERE mytable.myid = myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :t2view_mytable_myid"
)
+
+ def test_prefixes(self):
+ self.runtest(table1.select().prefix_with("SQL_CALC_FOUND_ROWS").prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
+ "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING mytable.myid, mytable.name, mytable.description FROM mytable"
+ )
+
def testtext(self):
self.runtest(
text("select * from foo where lala = bar") ,