from sqlalchemy.ext.associationproxy import _AssociationList
from sqlalchemy.test import *
from sqlalchemy.test.util import gc_collect
+from sqlalchemy.sql import not_
+from test.orm import _base
class DictCollection(dict):
self.name = name
def __call__(self, obj):
- return getattr(obj, self.name)
\ No newline at end of file
+ return getattr(obj, self.name)
+
+class ComparatorTest(_base.MappedTest):
+ run_inserts = 'once'
+ run_deletes = None
+ run_setup_mappers = 'once'
+
+ @classmethod
+ def define_tables(cls, metadata):
+
+ Table(
+ 'userkeywords', metadata,
+ Column('keyword_id', Integer, ForeignKey('keywords.id'),
+ primary_key=True),
+ Column('user_id', Integer, ForeignKey('users.id')))
+
+ Table(
+ 'users', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(64)))
+
+ Table(
+ 'keywords', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('keyword', String(64)))
+
+ @classmethod
+ def setup_classes(cls):
+ class User(_base.ComparableEntity):
+ def __init__(self, name):
+ self.name = name
+ keywords = association_proxy('user_keywords', 'keyword',
+ creator=lambda k: UserKeyword(keyword=k))
+
+ class Keyword(_base.ComparableEntity):
+ def __init__(self, keyword):
+ self.keyword = keyword
+ user = association_proxy('user_keyword', 'user')
+
+ class UserKeyword(_base.ComparableEntity):
+ def __init__(self, user=None, keyword=None):
+ self.user = user
+ self.keyword = keyword
+
+ @classmethod
+ @testing.resolve_artifact_names
+ def setup_mappers(cls):
+
+ mapper(User, users)
+ mapper(Keyword, keywords, properties={
+ 'user_keyword': relation(UserKeyword, uselist=False)
+ })
+ mapper(UserKeyword, userkeywords, properties={
+ 'user': relation(User, backref='user_keywords'),
+ 'keyword': relation(Keyword),
+ })
+
+ @classmethod
+ @testing.resolve_artifact_names
+ def insert_data(cls):
+ session = sessionmaker()()
+ words = ('quick', 'brown', 'fox', 'jumped', 'over', 'the', 'lazy')
+ for ii in range(4):
+ user = User('user%d' % ii)
+ session.add(user)
+ for jj in words[ii:ii+3]:
+ user.keywords.append(Keyword(jj))
+
+ orphan = Keyword('orphan')
+ orphan.user_keyword = UserKeyword(keyword=orphan, user=None)
+ session.add(orphan)
+ session.commit()
+
+ cls.u = user
+ cls.kw = user.keywords[0]
+ cls.session = session
+
+ def _equivalent(self, q_proxy, q_direct):
+ eq_(q_proxy.all(), q_direct.all())
+
+ @testing.resolve_artifact_names
+ def test_filter_any_kwarg(self):
+ self._equivalent(
+ self.session.query(User).\
+ filter(User.keywords.any(keyword='jumped')),
+ self.session.query(User).\
+ filter(User.user_keywords.any(
+ UserKeyword.keyword.has(keyword='jumped'))))
+
+ @testing.resolve_artifact_names
+ def test_filter_has_kwarg(self):
+ self._equivalent(
+ self.session.query(Keyword).\
+ filter(Keyword.user.has(name='user2')),
+ self.session.query(Keyword).\
+ filter(Keyword.user_keyword.has(
+ UserKeyword.user.has(name='user2'))))
+
+ @testing.resolve_artifact_names
+ def test_filter_any_criterion(self):
+ self._equivalent(
+ self.session.query(User).\
+ filter(User.keywords.any(Keyword.keyword == 'jumped')),
+ self.session.query(User).\
+ filter(User.user_keywords.any(
+ UserKeyword.keyword.has(Keyword.keyword == 'jumped'))))
+
+ @testing.resolve_artifact_names
+ def test_filter_has_criterion(self):
+ self._equivalent(
+ self.session.query(Keyword).\
+ filter(Keyword.user.has(User.name == 'user2')),
+ self.session.query(Keyword).\
+ filter(Keyword.user_keyword.has(
+ UserKeyword.user.has(User.name == 'user2'))))
+
+ @testing.resolve_artifact_names
+ def test_filter_contains(self):
+ self._equivalent(
+ self.session.query(User).\
+ filter(User.keywords.contains(self.kw)),
+ self.session.query(User).\
+ filter(User.user_keywords.any(keyword=self.kw)))
+
+ @testing.resolve_artifact_names
+ def test_filter_eq(self):
+ self._equivalent(
+ self.session.query(Keyword).\
+ filter(Keyword.user == self.u),
+ self.session.query(Keyword).\
+ filter(Keyword.user_keyword.has(user=self.u)))
+
+ @testing.resolve_artifact_names
+ def test_filter_ne(self):
+ self._equivalent(
+ self.session.query(Keyword).\
+ filter(Keyword.user != self.u),
+ self.session.query(Keyword).\
+ filter(not_(Keyword.user_keyword.has(user=self.u))))
+
+ @testing.resolve_artifact_names
+ def test_filter_eq_null(self):
+ self._equivalent(
+ self.session.query(Keyword).\
+ filter(Keyword.user == None),
+ self.session.query(Keyword).\
+ filter(Keyword.user_keyword.has(UserKeyword.user == None)))
+
+ @testing.resolve_artifact_names
+ def test_filter_scalar_contains_fails(self):
+ assert_raises(exceptions.InvalidRequestError, lambda: Keyword.user.contains(self.u))
+
+ @testing.resolve_artifact_names
+ def test_filter_scalar_any_fails(self):
+ assert_raises(exceptions.InvalidRequestError, lambda: Keyword.user.any(name='user2'))
+
+ @testing.resolve_artifact_names
+ def test_filter_collection_has_fails(self):
+ assert_raises(exceptions.InvalidRequestError, lambda: User.keywords.has(keyword='quick'))
+
+ @testing.resolve_artifact_names
+ def test_filter_collection_eq_fails(self):
+ assert_raises(exceptions.InvalidRequestError, lambda: User.keywords == self.kw)
+
+ @testing.resolve_artifact_names
+ def test_filter_collection_ne_fails(self):
+ assert_raises(exceptions.InvalidRequestError, lambda: User.keywords != self.kw)