import weakref
from .. import exc, orm, util
from ..orm import collections, interfaces
-from ..sql import not_
+from ..sql import not_, or_
def association_proxy(target_collection, attr, **kw):
return not self._get_property().\
mapper.get_property(self.value_attr).uselist
+ @util.memoized_property
+ def _target_is_object(self):
+ return getattr(self.target_class, self.value_attr).impl.uses_objects
+
def __get__(self, obj, class_):
if self.owning_class is None:
self.owning_class = class_ and class_ or type(obj)
"""
- return self._comparator.has(
+ if self._target_is_object:
+ return self._comparator.has(
getattr(self.target_class, self.value_attr).\
has(criterion, **kwargs)
)
+ else:
+ if criterion is not None or kwargs:
+ raise exc.ArgumentError(
+ "Non-empty has() not allowed for "
+ "column-targeted association proxy; use ==")
+ return self._comparator.has()
def contains(self, obj):
"""Produce a proxied 'contains' expression using EXISTS.
return self._comparator.any(**{self.value_attr: obj})
def __eq__(self, obj):
- return self._comparator.has(**{self.value_attr: obj})
+ # note the has() here will fail for collections; eq_()
+ # is only allowed with a scalar.
+ if obj is None:
+ return or_(
+ self._comparator.has(**{self.value_attr: obj}),
+ self._comparator == None
+ )
+ else:
+ return self._comparator.has(**{self.value_attr: obj})
def __ne__(self, obj):
- return not_(self.__eq__(obj))
+ # note the has() here will fail for collections; eq_()
+ # is only allowed with a scalar.
+ return self._comparator.has(
+ getattr(self.target_class, self.value_attr) != obj)
class _lazy_collection(object):
Table('singular', metadata,
Column('id', Integer,
primary_key=True, test_needs_autoincrement=True),
+ Column('value', String(50))
)
@classmethod
# nonuselist -> uselist
singular_keywords = association_proxy('singular', 'keywords')
+ # m2o -> scalar
+ # nonuselist
+ singular_value = association_proxy('singular', 'value')
+
class Keyword(cls.Comparable):
def __init__(self, keyword):
self.keyword = keyword
'fox', 'jumped', 'over',
'the', 'lazy',
)
- for ii in range(4):
+ for ii in range(16):
user = User('user%d' % ii)
- user.singular = Singular()
+
+ if ii % 2 == 0:
+ user.singular = Singular(value=("singular%d" % ii)
+ if ii % 4 == 0 else None)
session.add(user)
- for jj in words[ii:ii + 3]:
+ for jj in words[(ii % len(words)):((ii + 3) % len(words))]:
k = Keyword(jj)
user.keywords.append(k)
- user.singular.keywords.append(k)
+ if ii % 3 == None:
+ user.singular.keywords.append(k)
+
orphan = Keyword('orphan')
orphan.user_keyword = UserKeyword(keyword=orphan, user=None)
session.add(orphan)
+
+ keyword_with_nothing = Keyword('kwnothing')
+ session.add(keyword_with_nothing)
+
session.commit()
cls.u = user
cls.kw = user.keywords[0]
self.classes.Keyword)
self._equivalent(self.session.query(Keyword).
- filter(Keyword.user.has(User.name
- == 'user2')),
+ filter(Keyword.user.has(User.name == 'user2')),
self.session.query(Keyword).
filter(Keyword.user_keyword.has(
- UserKeyword.user.has(User.name
- == 'user2'))))
+ UserKeyword.user.has(User.name == 'user2'))))
def test_filter_any_criterion_nul_ul(self):
User, Keyword, Singular = (self.classes.User,
self.classes.Singular)
self._equivalent(
- self.session.query(User).\
- filter(User.singular_keywords.any(Keyword.keyword=='jumped')),
- self.session.query(User).\
+ self.session.query(User).
+ filter(User.singular_keywords.any(
+ Keyword.keyword == 'jumped')),
+ self.session.query(User).
filter(
User.singular.has(
- Singular.keywords.any(Keyword.keyword=='jumped')
+ Singular.keywords.any(Keyword.keyword == 'jumped')
)
)
)
def test_filter_ne_nul_nul(self):
Keyword = self.classes.Keyword
- self._equivalent(self.session.query(Keyword).filter(Keyword.user
- != self.u),
+ self._equivalent(self.session.query(Keyword).filter(Keyword.user != self.u),
self.session.query(Keyword).
- filter(not_(Keyword.user_keyword.has(user=self.u))))
+ filter(
+ Keyword.user_keyword.has(Keyword.user != self.u)
+ )
+ )
def test_filter_eq_null_nul_nul(self):
UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword
- self._equivalent(self.session.query(Keyword).filter(Keyword.user
- == None),
- self.session.query(Keyword).
- filter(Keyword.user_keyword.has(UserKeyword.user
- == None)))
+ self._equivalent(
+ self.session.query(Keyword).filter(Keyword.user == None),
+ self.session.query(Keyword).
+ filter(
+ or_(
+ Keyword.user_keyword.has(UserKeyword.user == None),
+ Keyword.user_keyword == None
+ )
+
+ )
+ )
+
+ def test_filter_ne_null_nul_nul(self):
+ UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword
+
+ self._equivalent(
+ self.session.query(Keyword).filter(Keyword.user != None),
+ self.session.query(Keyword).
+ filter(
+ Keyword.user_keyword.has(UserKeyword.user != None),
+ )
+ )
+
+ def test_filter_eq_None_nul(self):
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ self._equivalent(
+ self.session.query(User).filter(User.singular_value == None),
+ self.session.query(User).filter(
+ or_(
+ User.singular.has(Singular.value==None),
+ User.singular == None
+ )
+ )
+ )
+
+ def test_filter_eq_value_nul(self):
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ self._equivalent(
+ self.session.query(User).filter(User.singular_value == "singular4"),
+ self.session.query(User).filter(
+ User.singular.has(Singular.value=="singular4"),
+ )
+ )
+
+ def test_filter_ne_None_nul(self):
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ self._equivalent(
+ self.session.query(User).filter(User.singular_value != None),
+ self.session.query(User).filter(
+ User.singular.has(Singular.value != None),
+ )
+ )
+
+ def test_has_nul(self):
+ # a special case where we provide an empty has() on a
+ # non-object-targeted association proxy.
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ self._equivalent(
+ self.session.query(User).filter(User.singular_value.has()),
+ self.session.query(User).filter(
+ User.singular.has(),
+ )
+ )
+
+ def test_nothas_nul(self):
+ # a special case where we provide an empty has() on a
+ # non-object-targeted association proxy.
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ self._equivalent(
+ self.session.query(User).filter(~User.singular_value.has()),
+ self.session.query(User).filter(
+ ~User.singular.has(),
+ )
+ )
+
+ def test_has_criterion_nul(self):
+ # but we don't allow that with any criterion...
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ assert_raises_message(
+ exc.ArgumentError,
+ "Non-empty has\(\) not allowed",
+ User.singular_value.has,
+ User.singular_value == "singular4"
+ )
+
+ def test_has_kwargs_nul(self):
+ # ... or kwargs
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ assert_raises_message(
+ exc.ArgumentError,
+ "Non-empty has\(\) not allowed",
+ User.singular_value.has, singular_value="singular4"
+ )
+
+ def test_filter_ne_value_nul(self):
+ User = self.classes.User
+ Singular = self.classes.Singular
+
+ self._equivalent(
+ self.session.query(User).filter(User.singular_value != "singular4"),
+ self.session.query(User).filter(
+ User.singular.has(Singular.value != "singular4"),
+ )
+ )
def test_filter_scalar_contains_fails_nul_nul(self):
Keyword = self.classes.Keyword