for (key,klass) in self.delete_orphans:
if not getattr(klass, key).hasparent(obj, optimistic=optimistic):
if not has_identity(obj):
- raise exceptions.FlushError("instance %s is an unsaved, pending instance and is an orphan" % obj)
+ raise exceptions.FlushError("instance %s is an unsaved, pending instance and is an orphan (is not attached to any parent '%s' instance via that classes' '%s' attribute)" % (obj, klass.__name__, key))
return True
else:
return False
import sql.alltests as sql
import engine.alltests as engine
import ext.alltests as ext
+import zblog.alltests as zblog
def suite():
alltests = unittest.TestSuite()
- for suite in (base, engine, sql, orm, ext):
+ for suite in (base, engine, sql, orm, ext, zblog):
alltests.addTest(suite.suite())
return alltests
--- /dev/null
+import testbase
+import unittest
+
+def suite():
+ modules_to_test = (
+ 'zblog.tests',
+ )
+ alltests = unittest.TestSuite()
+ for name in modules_to_test:
+ mod = __import__(name)
+ for token in name.split('.')[1:]:
+ mod = getattr(mod, token)
+ alltests.addTest(unittest.findTestCases(mod, suiteClass=None))
+ return alltests
+
+
+if __name__ == '__main__':
+ testbase.runTests(suite())
--- /dev/null
+__all__ = ['Blog', 'Post', 'Topic', 'TopicAssociation', 'Comment']
+
+import datetime
+
+class Blog(object):
+ def __init__(self, owner=None):
+ self.owner = owner
+
+class Post(object):
+ topics = set
+ def __init__(self, user=None, headline=None, summary=None):
+ self.user = user
+ self.datetime = datetime.datetime.today()
+ self.headline = headline
+ self.summary = summary
+ self.comments = []
+ self.comment_count = 0
+
+class Topic(object):
+ def __init__(self, keyword=None, description=None):
+ self.keyword = keyword
+ self.description = description
+
+class TopicAssociation(object):
+ def __init__(self, post=None, topic=None, is_primary=False):
+ self.post = post
+ self.topic = topic
+ self.is_primary = is_primary
+
+class Comment(object):
+ def __init__(self, subject=None, body=None):
+ self.subject = subject
+ self.datetime = datetime.datetime.today()
+ self.body = body
+
+
--- /dev/null
+"""mapper.py - defines mappers for domain objects, mapping operations"""
+
+import zblog.tables as tables
+import zblog.user as user
+from zblog.blog import *
+from sqlalchemy import *
+import sqlalchemy.util as util
+
+def zblog_mappers():
+ # User mapper. Here, we redefine the names of some of the columns
+ # to different property names. normally the table columns are all
+ # sucked in automatically.
+ mapper(user.User, tables.users, properties={
+ 'id':tables.users.c.user_id,
+ 'name':tables.users.c.user_name,
+ 'group':tables.users.c.groupname,
+ 'crypt_password':tables.users.c.password,
+ })
+
+ # blog mapper. this contains a reference to the user mapper,
+ # and also installs a "backreference" on that relationship to handle it
+ # in both ways. this will also attach a 'blogs' property to the user mapper.
+ mapper(Blog, tables.blogs, properties={
+ 'id':tables.blogs.c.blog_id,
+ 'owner':relation(user.User, lazy=False, backref=backref('blogs', cascade="all, delete-orphan")),
+ })
+
+ # topic mapper. map all topic columns to the Topic class.
+ mapper(Topic, tables.topics)
+
+ # TopicAssocation mapper. This is an "association" object, which is similar to
+ # a many-to-many relationship except extra data is associated with each pair
+ # of related data. because the topic_xref table doesnt have a primary key,
+ # the "primary key" columns of a TopicAssociation are defined manually here.
+ mapper(TopicAssociation,tables.topic_xref,
+ primary_key=[tables.topic_xref.c.post_id, tables.topic_xref.c.topic_id],
+ properties={
+ 'topic':relation(Topic, lazy=False),
+ })
+
+ # Post mapper, these are posts within a blog.
+ # since we want the count of comments for each post, create a select that will get the posts
+ # and count the comments in one query.
+ posts_with_ccount = select(
+ [c for c in tables.posts.c if c.key != 'body'] + [
+ func.count(tables.comments.c.comment_id).label('comment_count')
+ ],
+ from_obj = [
+ outerjoin(tables.posts, tables.comments)
+ ],
+ group_by=[
+ c for c in tables.posts.c if c.key != 'body'
+ ]
+ ) .alias('postswcount')
+
+ # then create a Post mapper on that query.
+ # we have the body as "deferred" so that it loads only when needed,
+ # the user as a Lazy load, since the lazy load will run only once per user and
+ # its usually only one user's posts is needed per page,
+ # the owning blog is a lazy load since its also probably loaded into the identity map
+ # already, and topics is an eager load since that query has to be done per post in any
+ # case.
+ mapper(Post, posts_with_ccount, properties={
+ 'id':posts_with_ccount.c.post_id,
+ 'body':deferred(tables.posts.c.body),
+ 'user':relation(user.User, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
+ 'blog':relation(Blog, lazy=True, backref=backref('posts', cascade="all, delete-orphan")),
+ 'topics':relation(TopicAssociation, lazy=False, private=True, association=Topic, backref='post')
+ }, is_primary=True, order_by=[desc(posts_with_ccount.c.datetime)])
+
+
+ # comment mapper. This mapper is handling a hierarchical relationship on itself, and contains
+ # a lazy reference both to its parent comment and its list of child comments.
+ mapper(Comment, tables.comments, properties={
+ 'id':tables.comments.c.comment_id,
+ 'post':relation(Post, lazy=True, backref=backref('comments', cascade="all, delete-orphan")),
+ 'user':relation(user.User, lazy=False, backref=backref('comments', cascade="all, delete-orphan")),
+ 'parent':relation(Comment, primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, foreignkey=tables.comments.c.comment_id, lazy=True, uselist=False),
+ 'replies':relation(Comment,primaryjoin=tables.comments.c.parent_comment_id==tables.comments.c.comment_id, lazy=True, uselist=True, cascade="all"),
+ }, is_primary=True)
+
+# we define one special find-by for the comments of a post, which is going to make its own "noload"
+# mapper and organize the comments into their correct hierarchy in one pass. hierarchical
+# data normally needs to be loaded by separate queries for each set of children, unless you
+# use a proprietary extension like CONNECT BY.
+def find_by_post(post):
+ """returns a hierarchical collection of comments based on a given criterion.
+ uses a mapper that does not lazy load replies or parents, and instead
+ organizes comments into a hierarchical tree when the result is produced.
+ """
+ q = session().query(Comment).options(noload('replies'), noload('parent'))
+ comments = q.select_by(post_id=post.id)
+ result = []
+ d = {}
+ for c in comments:
+ d[c.id] = c
+ if c.parent_comment_id is None:
+ result.append(c)
+ c.parent=None
+ else:
+ parent = d[c.parent_comment_id]
+ parent.replies.append(c)
+ c.parent = parent
+ return result
+
+Comment.find_by_post = staticmethod(find_by_post)
+
+def start_session():
+ """creates a new session for the start of a request."""
+ trans.session = create_session(bind_to=zblog.database.engine, echo_uow=False)
+
+def session():
+ return trans.session
+
--- /dev/null
+from sqlalchemy import *
+
+metadata = MetaData()
+"""application table metadata objects are described here."""
+
+users = Table('users', metadata,
+ Column('user_id', Integer, primary_key=True),
+ Column('user_name', String(30), nullable=False),
+ Column('fullname', String(100), nullable=False),
+ Column('password', String(30), nullable=False),
+ Column('groupname', String(20), nullable=False),
+ )
+
+blogs = Table('blogs', metadata,
+ Column('blog_id', Integer, primary_key=True),
+ Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False),
+ Column('name', String(100), nullable=False),
+ Column('description', String(500))
+ )
+
+posts = Table('posts', metadata,
+ Column('post_id', Integer, primary_key=True),
+ Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False),
+ Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
+ Column('datetime', DateTime, nullable=False),
+ Column('headline', String(500)),
+ Column('summary', String),
+ Column('body', String),
+ )
+
+topics = Table('topics', metadata,
+ Column('topic_id', Integer, primary_key=True),
+ Column('keyword', String(50), nullable=False),
+ Column('description', String(500))
+ )
+
+topic_xref = Table('topic_post_xref', metadata,
+ Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False),
+ Column('is_primary', Boolean, nullable=False),
+ Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False)
+ )
+
+comments = Table('comments', metadata,
+ Column('comment_id', Integer, primary_key=True),
+ Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
+ Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False),
+ Column('datetime', DateTime, nullable=False),
+ Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')),
+ Column('subject', String(500)),
+ Column('body', String),
+ )
+
--- /dev/null
+from testbase import AssertMixin
+import testbase
+import unittest
+
+db = testbase.db
+from sqlalchemy import *
+
+from zblog import mappers, tables
+from zblog.user import *
+from zblog.blog import *
+
+class ZBlogTest(AssertMixin):
+
+ def create_tables(self):
+ tables.metadata.create_all(engine=db)
+ def drop_tables(self):
+ tables.metadata.drop_all(engine=db)
+
+ def setUpAll(self):
+ self.create_tables()
+ def tearDownAll(self):
+ self.drop_tables()
+ def tearDown(self):
+ pass
+ def setUp(self):
+ pass
+
+
+class SavePostTest(ZBlogTest):
+ def setUpAll(self):
+ super(SavePostTest, self).setUpAll()
+ mappers.zblog_mappers()
+ global blog_id, user_id
+ s = create_session(bind_to=db)
+ user = User('zbloguser', "Zblog User", "hello", group=administrator)
+ blog = Blog(owner=user)
+ blog.name = "this is a blog"
+ s.save(user)
+ s.save(blog)
+ s.flush()
+ blog_id = blog.id
+ user_id = user.id
+ s.close()
+
+ def tearDownAll(self):
+ clear_mappers()
+
+ def testattach(self):
+ """test that a transient/pending instance has proper bi-directional behavior.
+
+ this requires that lazy loaders do not fire off for a transient/pending instance."""
+ s = create_session(bind_to=db)
+
+ trans = s.create_transaction()
+ try:
+ blog = s.query(Blog).get(blog_id)
+ post = Post(headline="asdf asdf", summary="asdfasfd")
+ s.save(post)
+ post.blog_id=blog_id
+ post.blog = blog
+ assert post in blog.posts
+ finally:
+ trans.rollback()
+
+ def testoptimisticorphans(self):
+ """test that instances in the session with un-loaded parents will not
+ get marked as "orphans" and then deleted """
+ s = create_session(bind_to=db)
+
+ trans = s.create_transaction()
+ try:
+ blog = s.query(Blog).get(blog_id)
+ post = Post(headline="asdf asdf", summary="asdfasfd")
+ post.blog = blog
+ user = s.query(User).get(user_id)
+ post.user = user
+ s.save(post)
+ s.flush()
+ s.clear()
+
+ blog = s.query(Blog).get(blog_id)
+ post = blog.posts[0]
+ comment = Comment(subject="some subject", body="some body")
+ comment.post = post
+ comment.user = user
+ s.flush()
+ s.clear()
+
+ assert s.query(Post).get(post.id) is not None
+
+ finally:
+ trans.rollback()
+
+
+if __name__ == "__main__":
+ testbase.main()
+
+
\ No newline at end of file
--- /dev/null
+"""user.py - handles user login and validation"""
+
+import random, string
+try:
+ from crypt import crypt
+except:
+ try:
+ from fcrypt import crypt
+ except:
+ raise "Need fcrypt module on non-Unix platform: http://home.clear.net.nz/pages/c.evans/sw/"
+
+administrator = 'admin'
+user = 'user'
+groups = [user, administrator]
+
+def cryptpw(password, salt=None):
+ if salt is None:
+ salt = string.join([chr(random.randint(ord('a'), ord('z'))), chr(random.randint(ord('a'), ord('z')))],'')
+ return crypt(password, salt)
+
+def checkpw(password, dbpw):
+ return cryptpw(password, dbpw[:2]) == dbpw
+
+class User(object):
+ def __init__(self, name=None, fullname=None, password=None, group=user):
+ self.name = name
+ self.fullname = fullname
+ self.password = password
+ self.group = group
+
+ def is_administrator(self):
+ return self.group == administrator
+
+ def _set_password(self, password):
+ if password:
+ self.crypt_password=cryptpw(password)
+
+ password = property(lambda s: None, _set_password)
+
+ def checkpw(self, password):
+ return checkpw(password, self.crypt_password)
\ No newline at end of file