well as relationships. also defines some MapperOptions that can be used with the
properties."""
-from sqlalchemy import sql, schema, util, attributes, exceptions
+from sqlalchemy import sql, schema, util, attributes, exceptions, sql_util
import sync
import mapper
import session as sessionlib
self.eagertarget = self.target.alias()
if self.secondary:
self.eagersecondary = self.secondary.alias()
- self.aliasizer = Aliasizer(self.target, self.secondary, aliases={
+ self.aliasizer = sql_util.Aliasizer(self.target, self.secondary, aliases={
self.target:self.eagertarget,
self.secondary:self.eagersecondary
})
self.eagerprimary.accept_visitor(self.aliasizer)
#print "JOINS:", str(self.eagerprimary), "|", str(self.eagersecondaryjoin)
else:
- self.aliasizer = Aliasizer(self.target, aliases={self.target:self.eagertarget})
+ self.aliasizer = sql_util.Aliasizer(self.target, aliases={self.target:self.eagertarget})
self.eagerprimary = self.primaryjoin.copy_container()
self.eagerprimary.accept_visitor(self.aliasizer)
p.do_init_subclass(prop.key, prop.parent, recursion_stack)
p._create_eager_chain(recursion_stack=recursion_stack)
p.eagerprimary = p.eagerprimary.copy_container()
-# aliasizer = Aliasizer(p.parent.mapped_table, aliases={p.parent.mapped_table:self.eagertarget})
+# aliasizer = sql_util.Aliasizer(p.parent.mapped_table, aliases={p.parent.mapped_table:self.eagertarget})
p.eagerprimary.accept_visitor(self.aliasizer)
#print "new eagertqarget", p.eagertarget.name, (p.secondary and p.secondary.name or "none"), p.parent.mapped_table.name
finally:
prop = ColumnProperty(*oldprop.columns, **self.kwargs)
mapper._compile_property(key, prop)
-class Aliasizer(sql.ClauseVisitor):
- """converts a table instance within an expression to be an alias of that table."""
- def __init__(self, *tables, **kwargs):
- self.tables = {}
- self.aliases = kwargs.get('aliases', {})
- for t in tables:
- self.tables[t] = t
- if not self.aliases.has_key(t):
- self.aliases[t] = sql.alias(t)
- if isinstance(t, sql.Join):
- for t2 in t.columns:
- self.tables[t2.table] = t2
- self.aliases[t2.table] = self.aliases[t]
- self.binary = None
- def get_alias(self, table):
- return self.aliases[table]
- def visit_compound(self, compound):
- self.visit_clauselist(compound)
- def visit_clauselist(self, clist):
- for i in range(0, len(clist.clauses)):
- if isinstance(clist.clauses[i], schema.Column) and self.tables.has_key(clist.clauses[i].table):
- orig = clist.clauses[i]
- clist.clauses[i] = self.get_alias(clist.clauses[i].table).corresponding_column(clist.clauses[i])
- def visit_binary(self, binary):
- if isinstance(binary.left, schema.Column) and self.tables.has_key(binary.left.table):
- binary.left = self.get_alias(binary.left.table).corresponding_column(binary.left)
- if isinstance(binary.right, schema.Column) and self.tables.has_key(binary.right.table):
- binary.right = self.get_alias(binary.right.table).corresponding_column(binary.right)
class BinaryVisitor(sql.ClauseVisitor):
def __init__(self, func):
# the MIT License: http://www.opensource.org/licenses/mit-license.php
import session as sessionlib
-from sqlalchemy import sql, util, exceptions
+from sqlalchemy import sql, util, exceptions, sql_util
+
import mapper
if self._should_nest(**kwargs):
from_obj.append(self.table)
- s2 = sql.select(self.table.primary_key, whereclause, use_labels=True, from_obj=from_obj, **kwargs)
+
+ # if theres an order by, add those columns to the column list
+ # of the "rowcount" query we're going to make
+ if order_by:
+ order_by = util.to_list(order_by) or []
+ cf = sql_util.ColumnFinder()
+ [o.accept_visitor(cf) for o in order_by]
+ else:
+ cf = []
+
+ s2 = sql.select(self.table.primary_key + list(cf), whereclause, use_labels=True, from_obj=from_obj, **kwargs)
# raise "ok first thing", str(s2)
if not kwargs.get('distinct', False) and order_by:
s2.order_by(*util.to_list(order_by))
crit.append(s3.primary_key[i] == self.table.primary_key[i])
statement = sql.select([], sql.and_(*crit), from_obj=[self.table], use_labels=True)
# raise "OK statement", str(statement)
+
+ # now for the order by, convert the columns to their corresponding columns
+ # in the "rowcount" query, and tack that new order by onto the "rowcount" query
if order_by:
- # copy the order_by, since eager loaders will modify it, and we want the
- # "inner" order_by to remain untouched
- # see test/orm/mapper.py EagerTest.testmorelimit
- order_by = [o.copy_container() for o in util.to_list(order_by)]
+ class Aliasizer(sql_util.Aliasizer):
+ def get_alias(self, table):
+ return s3
+ order_by = [o.copy_container() for o in order_by]
+ aliasizer = Aliasizer(*[t for t in sql_util.TableFinder(s3)])
+ [o.accept_visitor(aliasizer) for o in order_by]
statement.order_by(*util.to_list(order_by))
else:
from_obj.append(self.table)
import sqlalchemy.sql as sql
import sqlalchemy.schema as schema
+import sqlalchemy.util as util
"""utility functions that build upon SQL and Schema constructs"""
if self.check_columns:
column.table.accept_visitor(self)
+class ColumnFinder(sql.ClauseVisitor):
+ def __init__(self):
+ self.columns = util.Set()
+ def visit_column(self, c):
+ self.columns.add(c)
+ def __iter__(self):
+ return iter(self.columns)
+
+class Aliasizer(sql.ClauseVisitor):
+ """converts a table instance within an expression to be an alias of that table."""
+ def __init__(self, *tables, **kwargs):
+ self.tables = {}
+ self.aliases = kwargs.get('aliases', {})
+ for t in tables:
+ self.tables[t] = t
+ if not self.aliases.has_key(t):
+ self.aliases[t] = sql.alias(t)
+ if isinstance(t, sql.Join):
+ for t2 in t.columns:
+ self.tables[t2.table] = t2
+ self.aliases[t2.table] = self.aliases[t]
+ self.binary = None
+ def get_alias(self, table):
+ return self.aliases[table]
+ def visit_compound(self, compound):
+ self.visit_clauselist(compound)
+ def visit_clauselist(self, clist):
+ for i in range(0, len(clist.clauses)):
+ if isinstance(clist.clauses[i], schema.Column) and self.tables.has_key(clist.clauses[i].table):
+ orig = clist.clauses[i]
+ clist.clauses[i] = self.get_alias(clist.clauses[i].table).corresponding_column(clist.clauses[i])
+ def visit_binary(self, binary):
+ if isinstance(binary.left, schema.Column) and self.tables.has_key(binary.left.table):
+ binary.left = self.get_alias(binary.left.table).corresponding_column(binary.left)
+ if isinstance(binary.right, schema.Column) and self.tables.has_key(binary.right.table):
+ binary.right = self.get_alias(binary.right.table).corresponding_column(binary.right)
self.assert_result(l, Item, *[item_keyword_result[1], item_keyword_result[2]])
def testmorelimit(self):
- """tests that the ORDER BY doesnt get clobbered with a nested eager load, when the ORDER BY
- is an expression. requires the copying of the order by clause in query.compile()"""
+ """test that the ORDER BY is propigated from the inner select to the outer select, when using the
+ 'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
ordermapper = mapper(Order, orders, properties = dict(
items = relation(mapper(Item, orderitems), lazy = False)
))
l = q.select(q.join_to('orders'), order_by=desc(orders.c.user_id), limit=2, offset=1)
self.assert_result(l, User, *(user_all_result[2], user_all_result[0]))
+ l = q.select(q.join_to('addresses'), order_by=desc(addresses.c.email_address), limit=1, offset=0)
+ self.assert_result(l, User, *(user_all_result[0],))
+
def testonetoone(self):
m = mapper(User, users, properties = dict(
address = relation(mapper(Address, addresses), lazy = False, uselist = False)