]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- added basic 'zblog' test suite
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 5 Sep 2006 17:16:26 +0000 (17:16 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 5 Sep 2006 17:16:26 +0000 (17:16 +0000)
- better error message for mapper orphan detect

lib/sqlalchemy/orm/mapper.py
test/alltests.py
test/zblog/__init__.py [new file with mode: 0644]
test/zblog/alltests.py [new file with mode: 0644]
test/zblog/blog.py [new file with mode: 0644]
test/zblog/mappers.py [new file with mode: 0644]
test/zblog/tables.py [new file with mode: 0644]
test/zblog/tests.py [new file with mode: 0644]
test/zblog/user.py [new file with mode: 0644]

index fe0525b4acf591db99c6b95122fc30fa1922b2cf..4197401f7d4a2011190aa77d63d5eb4181772bbb 100644 (file)
@@ -145,7 +145,7 @@ class Mapper(object):
         for (key,klass) in self.delete_orphans:
             if not getattr(klass, key).hasparent(obj, optimistic=optimistic):
                 if not has_identity(obj):
-                    raise exceptions.FlushError("instance %s is an unsaved, pending instance and is an orphan" % obj)
+                    raise exceptions.FlushError("instance %s is an unsaved, pending instance and is an orphan (is not attached to any parent '%s' instance via that classes' '%s' attribute)" % (obj, klass.__name__, key))
                 return True
         else:
             return False
index 147d26d2cf88e861ef6c3b0cb5cddaafe8589a1c..c60e73641d52449c3b141970444ee0499f4dbada 100644 (file)
@@ -6,10 +6,11 @@ import base.alltests as base
 import sql.alltests as sql
 import engine.alltests as engine
 import ext.alltests as ext
+import zblog.alltests as zblog
 
 def suite():
     alltests = unittest.TestSuite()
-    for suite in (base, engine, sql, orm, ext):
+    for suite in (base, engine, sql, orm, ext, zblog):
         alltests.addTest(suite.suite())
     return alltests
 
diff --git a/test/zblog/__init__.py b/test/zblog/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/test/zblog/alltests.py b/test/zblog/alltests.py
new file mode 100644 (file)
index 0000000..9368eb8
--- /dev/null
@@ -0,0 +1,18 @@
+import testbase
+import unittest
+
+def suite():
+    modules_to_test = (
+        'zblog.tests',
+        )
+    alltests = unittest.TestSuite()
+    for name in modules_to_test:
+        mod = __import__(name)
+        for token in name.split('.')[1:]:
+            mod = getattr(mod, token)
+        alltests.addTest(unittest.findTestCases(mod, suiteClass=None))
+    return alltests
+
+
+if __name__ == '__main__':
+    testbase.runTests(suite())
diff --git a/test/zblog/blog.py b/test/zblog/blog.py
new file mode 100644 (file)
index 0000000..e234bbb
--- /dev/null
@@ -0,0 +1,36 @@
+__all__ = ['Blog', 'Post', 'Topic', 'TopicAssociation', 'Comment']
+
+import datetime
+
+class Blog(object):
+    def __init__(self, owner=None):
+        self.owner = owner
+        
+class Post(object):
+    topics = set
+    def __init__(self, user=None, headline=None, summary=None):
+        self.user = user
+        self.datetime = datetime.datetime.today()
+        self.headline = headline
+        self.summary = summary
+        self.comments = []
+        self.comment_count = 0
+        
+class Topic(object):
+    def __init__(self, keyword=None, description=None):
+        self.keyword = keyword
+        self.description = description
+
+class TopicAssociation(object):
+    def __init__(self, post=None, topic=None, is_primary=False):
+        self.post = post
+        self.topic = topic
+        self.is_primary = is_primary
+              
+class Comment(object):
+    def __init__(self, subject=None, body=None):
+        self.subject = subject
+        self.datetime = datetime.datetime.today()
+        self.body = body
+        
+        
diff --git a/test/zblog/mappers.py b/test/zblog/mappers.py
new file mode 100644 (file)
index 0000000..831e6f6
--- /dev/null
@@ -0,0 +1,114 @@
+"""mapper.py - defines mappers for domain objects, mapping operations"""
+
+import zblog.tables as tables
+import zblog.user as user
+from zblog.blog import *
+from sqlalchemy import *
+import sqlalchemy.util as util
+
+def zblog_mappers():
+    # User mapper.  Here, we redefine the names of some of the columns
+    # to different property names.  normally the table columns are all
+    # sucked in automatically.
+    mapper(user.User, tables.users, properties={
+        'id':tables.users.c.user_id,
+        'name':tables.users.c.user_name,
+        'group':tables.users.c.groupname,
+        'crypt_password':tables.users.c.password,
+    })
+
+    # blog mapper.  this contains a reference to the user mapper,
+    # and also installs a "backreference" on that relationship to handle it
+    # in both ways. this will also attach a 'blogs' property to the user mapper.
+    mapper(Blog, tables.blogs, properties={
+        'id':tables.blogs.c.blog_id,
+        'owner':relation(user.User, lazy=False, backref=backref('blogs', cascade="all, delete-orphan")),
+    })
+
+    # topic mapper.  map all topic columns to the Topic class.
+    mapper(Topic, tables.topics)
+        
+    # TopicAssocation mapper.  This is an "association" object, which is similar to
+    # a many-to-many relationship except extra data is associated with each pair 
+    # of related data.  because the topic_xref table doesnt have a primary key,
+    # the "primary key" columns of a TopicAssociation are defined manually here.
+    mapper(TopicAssociation,tables.topic_xref, 
+                primary_key=[tables.topic_xref.c.post_id, tables.topic_xref.c.topic_id], 
+                properties={
+                    'topic':relation(Topic, lazy=False),
+                })
+
+    # Post mapper, these are posts within a blog.  
+    # since we want the count of comments for each post, create a select that will get the posts
+    # and count the comments in one query.
+    posts_with_ccount = select(
+        [c for c in tables.posts.c if c.key != 'body'] + [
+            func.count(tables.comments.c.comment_id).label('comment_count')
+        ],
+        from_obj = [
+            outerjoin(tables.posts, tables.comments)
+        ],
+        group_by=[
+            c for c in tables.posts.c if c.key != 'body'
+        ]
+        ) .alias('postswcount')
+
+    # then create a Post mapper on that query.  
+    # we have the body as "deferred" so that it loads only when needed,
+    # the user as a Lazy load, since the lazy load will run only once per user and
+    # its usually only one user's posts is needed per page,
+    # the owning blog is a lazy load since its also probably loaded into the identity map
+    # already, and topics is an eager load since that query has to be done per post in any
+    # case.
+    mapper(Post, posts_with_ccount, properties={
+        'id':posts_with_ccount.c.post_id,
+        'body':deferred(tables.posts.c.body),
+        'user':relation(user.User, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
+        'blog':relation(Blog, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
+        'topics':relation(TopicAssociation, lazy=False, private=True, association=Topic, backref='post')
+    }, is_primary=True, order_by=[desc(posts_with_ccount.c.datetime)])
+
+
+    # comment mapper.  This mapper is handling a hierarchical relationship on itself, and contains
+    # a lazy reference both to its parent comment and its list of child comments.
+    mapper(Comment, tables.comments, properties={
+        'id':tables.comments.c.comment_id,
+        'post':relation(Post, lazy=True, backref=backref('comments', cascade="all, delete-orphan")),
+        'user':relation(user.User, lazy=False, backref=backref('comments', cascade="all, delete-orphan")),
+        'parent':relation(Comment, primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, foreignkey=tables.comments.c.comment_id, lazy=True, uselist=False),
+        'replies':relation(Comment,primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, lazy=True, uselist=True, cascade="all"),
+    }, is_primary=True)
+
+# we define one special find-by for the comments of a post, which is going to make its own "noload"
+# mapper and organize the comments into their correct hierarchy in one pass. hierarchical
+# data normally needs to be loaded by separate queries for each set of children, unless you
+# use a proprietary extension like CONNECT BY.
+def find_by_post(post):
+    """returns a hierarchical collection of comments based on a given criterion.  
+    uses a mapper that does not lazy load replies or parents, and instead
+    organizes comments into a hierarchical tree when the result is produced.
+    """
+    q = session().query(Comment).options(noload('replies'), noload('parent'))
+    comments = q.select_by(post_id=post.id)
+    result = []
+    d = {}
+    for c in comments:
+        d[c.id] = c
+        if c.parent_comment_id is None:
+            result.append(c)
+            c.parent=None
+        else:
+            parent = d[c.parent_comment_id]
+            parent.replies.append(c)
+            c.parent = parent
+    return result
+
+Comment.find_by_post = staticmethod(find_by_post)
+
+def start_session():
+    """creates a new session for the start of a request."""
+    trans.session = create_session(bind_to=zblog.database.engine, echo_uow=False)
+
+def session():
+    return trans.session
+    
diff --git a/test/zblog/tables.py b/test/zblog/tables.py
new file mode 100644 (file)
index 0000000..f01f189
--- /dev/null
@@ -0,0 +1,52 @@
+from sqlalchemy import *
+
+metadata = MetaData()
+"""application table metadata objects are described here."""
+
+users = Table('users', metadata, 
+    Column('user_id', Integer, primary_key=True),
+    Column('user_name', String(30), nullable=False),
+    Column('fullname', String(100), nullable=False),
+    Column('password', String(30), nullable=False),
+    Column('groupname', String(20), nullable=False),
+    )
+
+blogs = Table('blogs', metadata, 
+    Column('blog_id', Integer, primary_key=True),
+    Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False),
+    Column('name', String(100), nullable=False),
+    Column('description', String(500))
+    )
+    
+posts = Table('posts', metadata,
+    Column('post_id', Integer, primary_key=True),
+    Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False),
+    Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
+    Column('datetime', DateTime, nullable=False),
+    Column('headline', String(500)),
+    Column('summary', String),
+    Column('body', String),
+    )
+    
+topics = Table('topics', metadata,
+    Column('topic_id', Integer, primary_key=True),
+    Column('keyword', String(50), nullable=False),
+    Column('description', String(500))
+   )
+  
+topic_xref = Table('topic_post_xref', metadata, 
+    Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False),
+    Column('is_primary', Boolean, nullable=False),
+    Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False)
+   )
+
+comments = Table('comments', metadata, 
+    Column('comment_id', Integer, primary_key=True),
+    Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
+    Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False),
+    Column('datetime', DateTime, nullable=False),
+    Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')),
+    Column('subject', String(500)),
+    Column('body', String),
+    )
+    
diff --git a/test/zblog/tests.py b/test/zblog/tests.py
new file mode 100644 (file)
index 0000000..931aa66
--- /dev/null
@@ -0,0 +1,98 @@
+from testbase import AssertMixin
+import testbase
+import unittest
+
+db = testbase.db
+from sqlalchemy import *
+
+from zblog import mappers, tables
+from zblog.user import *
+from zblog.blog import *
+
+class ZBlogTest(AssertMixin):
+
+    def create_tables(self):
+        tables.metadata.create_all(engine=db)
+    def drop_tables(self):
+        tables.metadata.drop_all(engine=db)
+        
+    def setUpAll(self):
+        self.create_tables()
+    def tearDownAll(self):
+        self.drop_tables()
+    def tearDown(self):
+        pass
+    def setUp(self):
+        pass
+
+
+class SavePostTest(ZBlogTest):
+    def setUpAll(self):
+        super(SavePostTest, self).setUpAll()
+        mappers.zblog_mappers()
+        global blog_id, user_id
+        s = create_session(bind_to=db)
+        user = User('zbloguser', "Zblog User", "hello", group=administrator)
+        blog = Blog(owner=user)
+        blog.name = "this is a blog"
+        s.save(user)
+        s.save(blog)
+        s.flush()
+        blog_id = blog.id
+        user_id = user.id
+        s.close()
+
+    def tearDownAll(self):
+        clear_mappers()
+        
+    def testattach(self):
+        """test that a transient/pending instance has proper bi-directional behavior.
+        
+        this requires that lazy loaders do not fire off for a transient/pending instance."""
+        s = create_session(bind_to=db)
+
+        trans = s.create_transaction()
+        try:
+            blog = s.query(Blog).get(blog_id)
+            post = Post(headline="asdf asdf", summary="asdfasfd")
+            s.save(post)
+            post.blog_id=blog_id
+            post.blog = blog
+            assert post in blog.posts
+        finally:
+            trans.rollback()
+            
+    def testoptimisticorphans(self):
+        """test that instances in the session with un-loaded parents will not 
+        get marked as "orphans" and then deleted """
+        s = create_session(bind_to=db)
+        
+        trans = s.create_transaction()
+        try:
+            blog = s.query(Blog).get(blog_id)
+            post = Post(headline="asdf asdf", summary="asdfasfd")
+            post.blog = blog
+            user = s.query(User).get(user_id)
+            post.user = user
+            s.save(post)
+            s.flush()
+            s.clear()
+
+            blog = s.query(Blog).get(blog_id)
+            post = blog.posts[0]
+            comment = Comment(subject="some subject", body="some body")
+            comment.post = post
+            comment.user = user
+            s.flush()
+            s.clear()
+            
+            assert s.query(Post).get(post.id) is not None
+            
+        finally:
+            trans.rollback()
+        
+            
+if __name__ == "__main__":
+    testbase.main()
+
+        
\ No newline at end of file
diff --git a/test/zblog/user.py b/test/zblog/user.py
new file mode 100644 (file)
index 0000000..1dca032
--- /dev/null
@@ -0,0 +1,41 @@
+"""user.py - handles user login and validation"""
+
+import random, string
+try:
+    from crypt import crypt
+except:
+    try:
+        from fcrypt import crypt
+    except:
+        raise "Need fcrypt module on non-Unix platform:  http://home.clear.net.nz/pages/c.evans/sw/"
+
+administrator = 'admin'
+user = 'user'
+groups = [user, administrator]
+
+def cryptpw(password, salt=None):
+    if salt is None:
+        salt = string.join([chr(random.randint(ord('a'), ord('z'))), chr(random.randint(ord('a'), ord('z')))],'')
+    return crypt(password, salt)
+    
+def checkpw(password, dbpw):
+    return cryptpw(password, dbpw[:2]) == dbpw
+
+class User(object):
+    def __init__(self, name=None, fullname=None, password=None, group=user):
+        self.name = name
+        self.fullname = fullname
+        self.password = password
+        self.group = group
+
+    def is_administrator(self):
+        return self.group == administrator
+
+    def _set_password(self, password):
+        if password:
+            self.crypt_password=cryptpw(password)
+
+    password = property(lambda s: None, _set_password)
+
+    def checkpw(self, password):
+        return checkpw(password, self.crypt_password)
\ No newline at end of file