From: Mike Bayer Date: Mon, 17 Jan 2011 01:18:32 +0000 (-0500) Subject: - the zblog example is obsolete, the tests don't really test it X-Git-Tag: rel_0_7b1~66 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a9a67667901e24eefb36e01334eed9bf25b2b144;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - the zblog example is obsolete, the tests don't really test it and a key feature of its mapping (the deferred col outside of the select) doesn't work anyway. - add a token "deferred on selectable" test to test_mapper. --- diff --git a/test/orm/test_mapper.py b/test/orm/test_mapper.py index b34c4ab8af..4de669b879 100644 --- a/test/orm/test_mapper.py +++ b/test/orm/test_mapper.py @@ -1892,6 +1892,28 @@ class DeferredTest(_fixtures.FixtureTest): eq_(o1.description, 'order 1') self.sql_count_(0, go) + @testing.resolve_artifact_names + def test_map_selectable_wo_deferred(self): + """test mapping to a selectable with deferred cols, + the selectable doesn't include the deferred col. + + """ + + order_select = sa.select([ + orders.c.id, + orders.c.user_id, + orders.c.address_id, + orders.c.description, + orders.c.isopen]).alias() + mapper(Order, order_select, properties={ + 'description':deferred(order_select.c.description) + }) + + sess = Session() + o1 = sess.query(Order).order_by(Order.id).first() + assert 'description' not in o1.__dict__ + eq_(o1.description, 'order 1') + @testing.resolve_artifact_names def test_deep_options(self): mapper(Item, items, properties=dict( diff --git a/test/zblog/__init__.py b/test/zblog/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/zblog/blog.py b/test/zblog/blog.py deleted file mode 100644 index 4c7635430d..0000000000 --- a/test/zblog/blog.py +++ /dev/null @@ -1,35 +0,0 @@ -import datetime - - -__all__ = ['Blog', 'Post', 'Topic', 'TopicAssociation', 'Comment'] - -class Blog(object): - def __init__(self, owner=None): - self.owner = owner - -class Post(object): - topics = set - def __init__(self, user=None, headline=None, summary=None): - self.user = user - self.datetime = datetime.datetime.today() - self.headline = headline - self.summary = summary - self.comments = [] - self.comment_count = 0 - -class Topic(object): - def __init__(self, keyword=None, description=None): - self.keyword = keyword - self.description = description - -class TopicAssociation(object): - def __init__(self, post=None, topic=None, is_primary=False): - self.post = post - self.topic = topic - self.is_primary = is_primary - -class Comment(object): - def __init__(self, subject=None, body=None): - self.subject = subject - self.datetime = datetime.datetime.today() - self.body = body diff --git a/test/zblog/mappers.py b/test/zblog/mappers.py deleted file mode 100644 index 4ad542ec14..0000000000 --- a/test/zblog/mappers.py +++ /dev/null @@ -1,135 +0,0 @@ -"""mapper.py - defines mappers for domain objects, mapping operations""" - -from test.zblog import tables, user -from test.zblog.blog import * -from sqlalchemy import * -from sqlalchemy.orm import * -import sqlalchemy.util as util - -def zblog_mappers(): - # User mapper. Here, we redefine the names of some of the columns to - # different property names. normally the table columns are all sucked in - # automatically. - mapper(user.User, tables.users, properties={ - 'id':tables.users.c.user_id, - 'name':tables.users.c.user_name, - 'group':tables.users.c.groupname, - 'crypt_password':tables.users.c.password, - }) - - # blog mapper. this contains a reference to the user mapper, and also - # installs a "backreference" on that relationship to handle it in both - # ways. this will also attach a 'blogs' property to the user mapper. - mapper(Blog, tables.blogs, properties={ - 'id':tables.blogs.c.blog_id, - 'owner':relationship(user.User, lazy='joined', - backref=backref('blogs', cascade="all, delete-orphan")), - }) - - # topic mapper. map all topic columns to the Topic class. - mapper(Topic, tables.topics) - - # TopicAssocation mapper. This is an "association" object, which is - # similar to a many-to-many relationship except extra data is associated - # with each pair of related data. because the topic_xref table doesnt - # have a primary key, the "primary key" columns of a TopicAssociation are - # defined manually here. - mapper(TopicAssociation,tables.topic_xref, - primary_key=[tables.topic_xref.c.post_id, - tables.topic_xref.c.topic_id], - properties={ - 'topic':relationship(Topic, lazy='joined'), - }) - - # Post mapper, these are posts within a blog. - # since we want the count of comments for each post, create a select that - # will get the posts and count the comments in one query. - posts_with_ccount = select( - [c for c in tables.posts.c if c.key != 'body'] + [ - func.count(tables.comments.c.comment_id).label('comment_count') - ], - from_obj = [ - outerjoin(tables.posts, tables.comments) - ], - group_by=[ - c for c in tables.posts.c if c.key != 'body' - ] - ) .alias('postswcount') - - # then create a Post mapper on that query. - # we have the body as "deferred" so that it loads only when needed, the - # user as a Lazy load, since the lazy load will run only once per user and - # its usually only one user's posts is needed per page, the owning blog is - # a lazy load since its also probably loaded into the identity map - # already, and topics is an eager load since that query has to be done per - # post in any case. - mapper(Post, posts_with_ccount, properties={ - 'id':posts_with_ccount.c.post_id, - 'body':deferred(tables.posts.c.body), - 'user':relationship(user.User, lazy='select', - backref=backref('posts', cascade="all, delete-orphan")), - 'blog':relationship(Blog, lazy='select', - backref=backref('posts', cascade="all, delete-orphan")), - 'topics':relationship(TopicAssociation, lazy='joined', - cascade="all, delete-orphan", - backref='post') - }, order_by=[desc(posts_with_ccount.c.datetime)]) - - - # comment mapper. This mapper is handling a hierarchical relationship on - # itself, and contains a lazy reference both to its parent comment and its - # list of child comments. - mapper(Comment, tables.comments, properties={ - 'id':tables.comments.c.comment_id, - 'post':relationship(Post, lazy='select', - backref=backref('comments', - cascade="all, delete-orphan")), - 'user':relationship(user.User, lazy='joined', - backref=backref('comments', - cascade="all, delete-orphan")), - 'parent':relationship(Comment, - primaryjoin=(tables.comments.c.parent_comment_id == - tables.comments.c.comment_id), - foreign_keys=[tables.comments.c.comment_id], - lazy='select', uselist=False), - 'replies':relationship(Comment, - primaryjoin=(tables.comments.c.parent_comment_id == - tables.comments.c.comment_id), - lazy='select', uselist=True, cascade="all"), - }) - -# we define one special find-by for the comments of a post, which is going to -# make its own "noload" mapper and organize the comments into their correct -# hierarchy in one pass. hierarchical data normally needs to be loaded by -# separate queries for each set of children, unless you use a proprietary -# extension like CONNECT BY. -def find_by_post(post): - """returns a hierarchical collection of comments based on a given criterion. - - Uses a mapper that does not lazy load replies or parents, and instead - organizes comments into a hierarchical tree when the result is produced. - """ - - q = session().query(Comment).options(noload('replies'), noload('parent')) - comments = q.select_by(post_id=post.id) - result = [] - d = {} - for c in comments: - d[c.id] = c - if c.parent_comment_id is None: - result.append(c) - c.parent=None - else: - parent = d[c.parent_comment_id] - parent.replies.append(c) - c.parent = parent - return result - -Comment.find_by_post = staticmethod(find_by_post) - -def start_session(): - """creates a new session for the start of a request.""" - trans.session = create_session(bind_to=zblog.database.engine ) - -def session(): - return trans.session diff --git a/test/zblog/tables.py b/test/zblog/tables.py deleted file mode 100644 index 3326c28fbe..0000000000 --- a/test/zblog/tables.py +++ /dev/null @@ -1,53 +0,0 @@ -"""application table metadata objects are described here.""" - -from sqlalchemy import * -from test.lib.schema import Table, Column - -metadata = MetaData() - -users = Table('users', metadata, - Column('user_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('user_name', String(30), nullable=False), - Column('fullname', String(100), nullable=False), - Column('password', String(40), nullable=False), - Column('groupname', String(20), nullable=False), - ) - -blogs = Table('blogs', metadata, - Column('blog_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False), - Column('name', String(100), nullable=False), - Column('description', String(500)) - ) - -posts = Table('posts', metadata, - Column('post_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False), - Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False), - Column('datetime', DateTime, nullable=False), - Column('headline', String(500)), - Column('summary', String(255)), - Column('body', Text), - ) - -topics = Table('topics', metadata, - Column('topic_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('keyword', String(50), nullable=False), - Column('description', String(500)) - ) - -topic_xref = Table('topic_post_xref', metadata, - Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False), - Column('is_primary', Boolean, nullable=False), - Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False) - ) - -comments = Table('comments', metadata, - Column('comment_id', Integer, primary_key=True, test_needs_autoincrement=True), - Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False), - Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False), - Column('datetime', DateTime, nullable=False), - Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')), - Column('subject', String(500)), - Column('body', Text), - ) diff --git a/test/zblog/test_zblog.py b/test/zblog/test_zblog.py deleted file mode 100644 index 0ef1293e9b..0000000000 --- a/test/zblog/test_zblog.py +++ /dev/null @@ -1,116 +0,0 @@ -from sqlalchemy import * -from sqlalchemy.orm import * -from test.lib import * -from test.zblog import mappers, tables -from test.zblog.user import * -from test.zblog.blog import * - - -class ZBlogTest(TestBase, AssertsExecutionResults): - - @classmethod - def create_tables(cls): - tables.metadata.drop_all(bind=testing.db) - tables.metadata.create_all(bind=testing.db) - - @classmethod - def drop_tables(cls): - tables.metadata.drop_all(bind=testing.db) - - @classmethod - def setup_class(cls): - cls.create_tables() - @classmethod - def teardown_class(cls): - cls.drop_tables() - def teardown(self): - pass - def setup(self): - pass - - -class SavePostTest(ZBlogTest): - @classmethod - def setup_class(cls): - super(SavePostTest, cls).setup_class() - - mappers.zblog_mappers() - global blog_id, user_id - s = create_session(bind=testing.db) - user = User('zbloguser', "Zblog User", "hello", group=administrator) - blog = Blog(owner=user) - blog.name = "this is a blog" - s.add(user) - s.add(blog) - s.flush() - blog_id = blog.id - user_id = user.id - s.close() - - @classmethod - def teardown_class(cls): - clear_mappers() - super(SavePostTest, cls).teardown_class() - - def test_attach_noautoflush(self): - """Test pending backref behavior.""" - - s = create_session(bind=testing.db, autoflush=False) - - s.begin() - try: - blog = s.query(Blog).get(blog_id) - post = Post(headline="asdf asdf", summary="asdfasfd") - s.add(post) - post.blog_id=blog_id - post.blog = blog - assert post in blog.posts - finally: - s.rollback() - - def test_attach_autoflush(self): - s = create_session(bind=testing.db, autoflush=True) - - s.begin() - try: - blog = s.query(Blog).get(blog_id) - user = s.query(User).get(user_id) - post = Post(headline="asdf asdf", summary="asdfasfd", user=user) - s.add(post) - post.blog_id=blog_id - post.blog = blog - assert post in blog.posts - finally: - s.rollback() - - def testoptimisticorphans(self): - """test that instances in the session with un-loaded parents will not - get marked as "orphans" and then deleted """ - s = create_session(bind=testing.db) - - s.begin() - try: - blog = s.query(Blog).get(blog_id) - post = Post(headline="asdf asdf", summary="asdfasfd") - post.blog = blog - user = s.query(User).get(user_id) - post.user = user - s.add(post) - s.flush() - s.expunge_all() - - user = s.query(User).get(user_id) - blog = s.query(Blog).get(blog_id) - post = blog.posts[0] - comment = Comment(subject="some subject", body="some body") - comment.post = post - comment.user = user - s.flush() - s.expunge_all() - - assert s.query(Post).get(post.id) is not None - - finally: - s.rollback() - - diff --git a/test/zblog/user.py b/test/zblog/user.py deleted file mode 100644 index 30f1e3da16..0000000000 --- a/test/zblog/user.py +++ /dev/null @@ -1,41 +0,0 @@ -"""user.py - handles user login and validation""" - -import random, string - -import sys -if sys.version_info < (2, 5): - from sha import sha -else: - from hashlib import sha1 as sha - -administrator = 'admin' -user = 'user' -groups = [user, administrator] - -def cryptpw(password, salt=None): - if salt is None: - salt = "".join([chr(random.randint(ord('a'), ord('z'))), - chr(random.randint(ord('a'), ord('z')))]) - return sha((password+ salt).encode('ascii')).hexdigest() - -def checkpw(password, dbpw): - return cryptpw(password, dbpw[:2]) == dbpw - -class User(object): - def __init__(self, name=None, fullname=None, password=None, group=user): - self.name = name - self.fullname = fullname - self.password = password - self.group = group - - def is_administrator(self): - return self.group == administrator - - def _set_password(self, password): - if password: - self.crypt_password=cryptpw(password) - - password = property(lambda s: None, _set_password) - - def checkpw(self, password): - return checkpw(password, self.crypt_password)