From: Mike Bayer Date: Tue, 5 Sep 2006 17:16:26 +0000 (+0000) Subject: - added basic 'zblog' test suite X-Git-Tag: rel_0_2_8~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9a5247818904185d3f1e3f4bfbc66cc614a941c6;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - added basic 'zblog' test suite - better error message for mapper orphan detect --- diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py index fe0525b4ac..4197401f7d 100644 --- a/lib/sqlalchemy/orm/mapper.py +++ b/lib/sqlalchemy/orm/mapper.py @@ -145,7 +145,7 @@ class Mapper(object): for (key,klass) in self.delete_orphans: if not getattr(klass, key).hasparent(obj, optimistic=optimistic): if not has_identity(obj): - raise exceptions.FlushError("instance %s is an unsaved, pending instance and is an orphan" % obj) + raise exceptions.FlushError("instance %s is an unsaved, pending instance and is an orphan (is not attached to any parent '%s' instance via that classes' '%s' attribute)" % (obj, klass.__name__, key)) return True else: return False diff --git a/test/alltests.py b/test/alltests.py index 147d26d2cf..c60e73641d 100644 --- a/test/alltests.py +++ b/test/alltests.py @@ -6,10 +6,11 @@ import base.alltests as base import sql.alltests as sql import engine.alltests as engine import ext.alltests as ext +import zblog.alltests as zblog def suite(): alltests = unittest.TestSuite() - for suite in (base, engine, sql, orm, ext): + for suite in (base, engine, sql, orm, ext, zblog): alltests.addTest(suite.suite()) return alltests diff --git a/test/zblog/__init__.py b/test/zblog/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/zblog/alltests.py b/test/zblog/alltests.py new file mode 100644 index 0000000000..9368eb8207 --- /dev/null +++ b/test/zblog/alltests.py @@ -0,0 +1,18 @@ +import testbase +import unittest + +def suite(): + modules_to_test = ( + 'zblog.tests', + ) + alltests = unittest.TestSuite() + for name in modules_to_test: + mod = __import__(name) + for token in name.split('.')[1:]: + mod = getattr(mod, token) + alltests.addTest(unittest.findTestCases(mod, suiteClass=None)) + return alltests + + +if __name__ == '__main__': + testbase.runTests(suite()) diff --git a/test/zblog/blog.py b/test/zblog/blog.py new file mode 100644 index 0000000000..e234bbbc76 --- /dev/null +++ b/test/zblog/blog.py @@ -0,0 +1,36 @@ +__all__ = ['Blog', 'Post', 'Topic', 'TopicAssociation', 'Comment'] + +import datetime + +class Blog(object): + def __init__(self, owner=None): + self.owner = owner + +class Post(object): + topics = set + def __init__(self, user=None, headline=None, summary=None): + self.user = user + self.datetime = datetime.datetime.today() + self.headline = headline + self.summary = summary + self.comments = [] + self.comment_count = 0 + +class Topic(object): + def __init__(self, keyword=None, description=None): + self.keyword = keyword + self.description = description + +class TopicAssociation(object): + def __init__(self, post=None, topic=None, is_primary=False): + self.post = post + self.topic = topic + self.is_primary = is_primary + +class Comment(object): + def __init__(self, subject=None, body=None): + self.subject = subject + self.datetime = datetime.datetime.today() + self.body = body + + diff --git a/test/zblog/mappers.py b/test/zblog/mappers.py new file mode 100644 index 0000000000..831e6f680e --- /dev/null +++ b/test/zblog/mappers.py @@ -0,0 +1,114 @@ +"""mapper.py - defines mappers for domain objects, mapping operations""" + +import zblog.tables as tables +import zblog.user as user +from zblog.blog import * +from sqlalchemy import * +import sqlalchemy.util as util + +def zblog_mappers(): + # User mapper. Here, we redefine the names of some of the columns + # to different property names. normally the table columns are all + # sucked in automatically. + mapper(user.User, tables.users, properties={ + 'id':tables.users.c.user_id, + 'name':tables.users.c.user_name, + 'group':tables.users.c.groupname, + 'crypt_password':tables.users.c.password, + }) + + # blog mapper. this contains a reference to the user mapper, + # and also installs a "backreference" on that relationship to handle it + # in both ways. this will also attach a 'blogs' property to the user mapper. + mapper(Blog, tables.blogs, properties={ + 'id':tables.blogs.c.blog_id, + 'owner':relation(user.User, lazy=False, backref=backref('blogs', cascade="all, delete-orphan")), + }) + + # topic mapper. map all topic columns to the Topic class. + mapper(Topic, tables.topics) + + # TopicAssocation mapper. This is an "association" object, which is similar to + # a many-to-many relationship except extra data is associated with each pair + # of related data. because the topic_xref table doesnt have a primary key, + # the "primary key" columns of a TopicAssociation are defined manually here. + mapper(TopicAssociation,tables.topic_xref, + primary_key=[tables.topic_xref.c.post_id, tables.topic_xref.c.topic_id], + properties={ + 'topic':relation(Topic, lazy=False), + }) + + # Post mapper, these are posts within a blog. + # since we want the count of comments for each post, create a select that will get the posts + # and count the comments in one query. + posts_with_ccount = select( + [c for c in tables.posts.c if c.key != 'body'] + [ + func.count(tables.comments.c.comment_id).label('comment_count') + ], + from_obj = [ + outerjoin(tables.posts, tables.comments) + ], + group_by=[ + c for c in tables.posts.c if c.key != 'body' + ] + ) .alias('postswcount') + + # then create a Post mapper on that query. + # we have the body as "deferred" so that it loads only when needed, + # the user as a Lazy load, since the lazy load will run only once per user and + # its usually only one user's posts is needed per page, + # the owning blog is a lazy load since its also probably loaded into the identity map + # already, and topics is an eager load since that query has to be done per post in any + # case. + mapper(Post, posts_with_ccount, properties={ + 'id':posts_with_ccount.c.post_id, + 'body':deferred(tables.posts.c.body), + 'user':relation(user.User, lazy=True, backref=backref('posts', cascade="all, delete-orphan")), + 'blog':relation(Blog, lazy=True, backref=backref('posts', cascade="all, delete-orphan")), + 'topics':relation(TopicAssociation, lazy=False, private=True, association=Topic, backref='post') + }, is_primary=True, order_by=[desc(posts_with_ccount.c.datetime)]) + + + # comment mapper. This mapper is handling a hierarchical relationship on itself, and contains + # a lazy reference both to its parent comment and its list of child comments. + mapper(Comment, tables.comments, properties={ + 'id':tables.comments.c.comment_id, + 'post':relation(Post, lazy=True, backref=backref('comments', cascade="all, delete-orphan")), + 'user':relation(user.User, lazy=False, backref=backref('comments', cascade="all, delete-orphan")), + 'parent':relation(Comment, primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, foreignkey=tables.comments.c.comment_id, lazy=True, uselist=False), + 'replies':relation(Comment,primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, lazy=True, uselist=True, cascade="all"), + }, is_primary=True) + +# we define one special find-by for the comments of a post, which is going to make its own "noload" +# mapper and organize the comments into their correct hierarchy in one pass. hierarchical +# data normally needs to be loaded by separate queries for each set of children, unless you +# use a proprietary extension like CONNECT BY. +def find_by_post(post): + """returns a hierarchical collection of comments based on a given criterion. + uses a mapper that does not lazy load replies or parents, and instead + organizes comments into a hierarchical tree when the result is produced. + """ + q = session().query(Comment).options(noload('replies'), noload('parent')) + comments = q.select_by(post_id=post.id) + result = [] + d = {} + for c in comments: + d[c.id] = c + if c.parent_comment_id is None: + result.append(c) + c.parent=None + else: + parent = d[c.parent_comment_id] + parent.replies.append(c) + c.parent = parent + return result + +Comment.find_by_post = staticmethod(find_by_post) + +def start_session(): + """creates a new session for the start of a request.""" + trans.session = create_session(bind_to=zblog.database.engine, echo_uow=False) + +def session(): + return trans.session + diff --git a/test/zblog/tables.py b/test/zblog/tables.py new file mode 100644 index 0000000000..f01f18921b --- /dev/null +++ b/test/zblog/tables.py @@ -0,0 +1,52 @@ +from sqlalchemy import * + +metadata = MetaData() +"""application table metadata objects are described here.""" + +users = Table('users', metadata, + Column('user_id', Integer, primary_key=True), + Column('user_name', String(30), nullable=False), + Column('fullname', String(100), nullable=False), + Column('password', String(30), nullable=False), + Column('groupname', String(20), nullable=False), + ) + +blogs = Table('blogs', metadata, + Column('blog_id', Integer, primary_key=True), + Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False), + Column('name', String(100), nullable=False), + Column('description', String(500)) + ) + +posts = Table('posts', metadata, + Column('post_id', Integer, primary_key=True), + Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False), + Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False), + Column('datetime', DateTime, nullable=False), + Column('headline', String(500)), + Column('summary', String), + Column('body', String), + ) + +topics = Table('topics', metadata, + Column('topic_id', Integer, primary_key=True), + Column('keyword', String(50), nullable=False), + Column('description', String(500)) + ) + +topic_xref = Table('topic_post_xref', metadata, + Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False), + Column('is_primary', Boolean, nullable=False), + Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False) + ) + +comments = Table('comments', metadata, + Column('comment_id', Integer, primary_key=True), + Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False), + Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False), + Column('datetime', DateTime, nullable=False), + Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')), + Column('subject', String(500)), + Column('body', String), + ) + diff --git a/test/zblog/tests.py b/test/zblog/tests.py new file mode 100644 index 0000000000..931aa66e81 --- /dev/null +++ b/test/zblog/tests.py @@ -0,0 +1,98 @@ +from testbase import AssertMixin +import testbase +import unittest + +db = testbase.db +from sqlalchemy import * + +from zblog import mappers, tables +from zblog.user import * +from zblog.blog import * + +class ZBlogTest(AssertMixin): + + def create_tables(self): + tables.metadata.create_all(engine=db) + def drop_tables(self): + tables.metadata.drop_all(engine=db) + + def setUpAll(self): + self.create_tables() + def tearDownAll(self): + self.drop_tables() + def tearDown(self): + pass + def setUp(self): + pass + + +class SavePostTest(ZBlogTest): + def setUpAll(self): + super(SavePostTest, self).setUpAll() + mappers.zblog_mappers() + global blog_id, user_id + s = create_session(bind_to=db) + user = User('zbloguser', "Zblog User", "hello", group=administrator) + blog = Blog(owner=user) + blog.name = "this is a blog" + s.save(user) + s.save(blog) + s.flush() + blog_id = blog.id + user_id = user.id + s.close() + + def tearDownAll(self): + clear_mappers() + + def testattach(self): + """test that a transient/pending instance has proper bi-directional behavior. + + this requires that lazy loaders do not fire off for a transient/pending instance.""" + s = create_session(bind_to=db) + + trans = s.create_transaction() + try: + blog = s.query(Blog).get(blog_id) + post = Post(headline="asdf asdf", summary="asdfasfd") + s.save(post) + post.blog_id=blog_id + post.blog = blog + assert post in blog.posts + finally: + trans.rollback() + + def testoptimisticorphans(self): + """test that instances in the session with un-loaded parents will not + get marked as "orphans" and then deleted """ + s = create_session(bind_to=db) + + trans = s.create_transaction() + try: + blog = s.query(Blog).get(blog_id) + post = Post(headline="asdf asdf", summary="asdfasfd") + post.blog = blog + user = s.query(User).get(user_id) + post.user = user + s.save(post) + s.flush() + s.clear() + + blog = s.query(Blog).get(blog_id) + post = blog.posts[0] + comment = Comment(subject="some subject", body="some body") + comment.post = post + comment.user = user + s.flush() + s.clear() + + assert s.query(Post).get(post.id) is not None + + finally: + trans.rollback() + + +if __name__ == "__main__": + testbase.main() + + \ No newline at end of file diff --git a/test/zblog/user.py b/test/zblog/user.py new file mode 100644 index 0000000000..1dca0328ec --- /dev/null +++ b/test/zblog/user.py @@ -0,0 +1,41 @@ +"""user.py - handles user login and validation""" + +import random, string +try: + from crypt import crypt +except: + try: + from fcrypt import crypt + except: + raise "Need fcrypt module on non-Unix platform: http://home.clear.net.nz/pages/c.evans/sw/" + +administrator = 'admin' +user = 'user' +groups = [user, administrator] + +def cryptpw(password, salt=None): + if salt is None: + salt = string.join([chr(random.randint(ord('a'), ord('z'))), chr(random.randint(ord('a'), ord('z')))],'') + return crypt(password, salt) + +def checkpw(password, dbpw): + return cryptpw(password, dbpw[:2]) == dbpw + +class User(object): + def __init__(self, name=None, fullname=None, password=None, group=user): + self.name = name + self.fullname = fullname + self.password = password + self.group = group + + def is_administrator(self): + return self.group == administrator + + def _set_password(self, password): + if password: + self.crypt_password=cryptpw(password) + + password = property(lambda s: None, _set_password) + + def checkpw(self, password): + return checkpw(password, self.crypt_password) \ No newline at end of file