]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- the zblog example is obsolete, the tests don't really test it
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 17 Jan 2011 01:18:32 +0000 (20:18 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 17 Jan 2011 01:18:32 +0000 (20:18 -0500)
and a key feature of its mapping (the deferred col outside of the select)
doesn't work anyway.
- add a token "deferred on selectable" test to test_mapper.

test/orm/test_mapper.py
test/zblog/__init__.py [deleted file]
test/zblog/blog.py [deleted file]
test/zblog/mappers.py [deleted file]
test/zblog/tables.py [deleted file]
test/zblog/test_zblog.py [deleted file]
test/zblog/user.py [deleted file]

index b34c4ab8afae17582dbe95beecddd89f3644d22f..4de669b879a460bd1baec411dd64f4e2b5c5ae93 100644 (file)
@@ -1892,6 +1892,28 @@ class DeferredTest(_fixtures.FixtureTest):
             eq_(o1.description, 'order 1')
         self.sql_count_(0, go)
 
+    @testing.resolve_artifact_names
+    def test_map_selectable_wo_deferred(self):
+        """test mapping to a selectable with deferred cols,
+        the selectable doesn't include the deferred col.
+        
+        """
+
+        order_select = sa.select([
+                        orders.c.id, 
+                        orders.c.user_id, 
+                        orders.c.address_id, 
+                        orders.c.description,
+                        orders.c.isopen]).alias()
+        mapper(Order, order_select, properties={
+            'description':deferred(order_select.c.description)
+        })
+
+        sess = Session()
+        o1 = sess.query(Order).order_by(Order.id).first()
+        assert 'description' not in o1.__dict__
+        eq_(o1.description, 'order 1')
+
     @testing.resolve_artifact_names
     def test_deep_options(self):
         mapper(Item, items, properties=dict(
diff --git a/test/zblog/__init__.py b/test/zblog/__init__.py
deleted file mode 100644 (file)
index e69de29..0000000
diff --git a/test/zblog/blog.py b/test/zblog/blog.py
deleted file mode 100644 (file)
index 4c76354..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-import datetime
-
-
-__all__ = ['Blog', 'Post', 'Topic', 'TopicAssociation', 'Comment']
-
-class Blog(object):
-    def __init__(self, owner=None):
-        self.owner = owner
-
-class Post(object):
-    topics = set
-    def __init__(self, user=None, headline=None, summary=None):
-        self.user = user
-        self.datetime = datetime.datetime.today()
-        self.headline = headline
-        self.summary = summary
-        self.comments = []
-        self.comment_count = 0
-
-class Topic(object):
-    def __init__(self, keyword=None, description=None):
-        self.keyword = keyword
-        self.description = description
-
-class TopicAssociation(object):
-    def __init__(self, post=None, topic=None, is_primary=False):
-        self.post = post
-        self.topic = topic
-        self.is_primary = is_primary
-
-class Comment(object):
-    def __init__(self, subject=None, body=None):
-        self.subject = subject
-        self.datetime = datetime.datetime.today()
-        self.body = body
diff --git a/test/zblog/mappers.py b/test/zblog/mappers.py
deleted file mode 100644 (file)
index 4ad542e..0000000
+++ /dev/null
@@ -1,135 +0,0 @@
-"""mapper.py - defines mappers for domain objects, mapping operations"""
-
-from test.zblog import tables, user
-from test.zblog.blog import *
-from sqlalchemy import *
-from sqlalchemy.orm import *
-import sqlalchemy.util as util
-
-def zblog_mappers():
-    # User mapper.  Here, we redefine the names of some of the columns to
-    # different property names.  normally the table columns are all sucked in
-    # automatically.
-    mapper(user.User, tables.users, properties={
-        'id':tables.users.c.user_id,
-        'name':tables.users.c.user_name,
-        'group':tables.users.c.groupname,
-        'crypt_password':tables.users.c.password,
-    })
-
-    # blog mapper.  this contains a reference to the user mapper, and also
-    # installs a "backreference" on that relationship to handle it in both
-    # ways. this will also attach a 'blogs' property to the user mapper.
-    mapper(Blog, tables.blogs, properties={
-        'id':tables.blogs.c.blog_id,
-        'owner':relationship(user.User, lazy='joined',
-                         backref=backref('blogs', cascade="all, delete-orphan")),
-    })
-
-    # topic mapper.  map all topic columns to the Topic class.
-    mapper(Topic, tables.topics)
-
-    # TopicAssocation mapper.  This is an "association" object, which is
-    # similar to a many-to-many relationship except extra data is associated
-    # with each pair of related data.  because the topic_xref table doesnt
-    # have a primary key, the "primary key" columns of a TopicAssociation are
-    # defined manually here.
-    mapper(TopicAssociation,tables.topic_xref,
-                primary_key=[tables.topic_xref.c.post_id,
-                             tables.topic_xref.c.topic_id],
-                properties={
-                    'topic':relationship(Topic, lazy='joined'),
-                })
-
-    # Post mapper, these are posts within a blog.
-    # since we want the count of comments for each post, create a select that
-    # will get the posts and count the comments in one query.
-    posts_with_ccount = select(
-        [c for c in tables.posts.c if c.key != 'body'] + [
-            func.count(tables.comments.c.comment_id).label('comment_count')
-        ],
-        from_obj = [
-            outerjoin(tables.posts, tables.comments)
-        ],
-        group_by=[
-            c for c in tables.posts.c if c.key != 'body'
-        ]
-        ) .alias('postswcount')
-
-    # then create a Post mapper on that query.
-    # we have the body as "deferred" so that it loads only when needed, the
-    # user as a Lazy load, since the lazy load will run only once per user and
-    # its usually only one user's posts is needed per page, the owning blog is
-    # a lazy load since its also probably loaded into the identity map
-    # already, and topics is an eager load since that query has to be done per
-    # post in any case.
-    mapper(Post, posts_with_ccount, properties={
-        'id':posts_with_ccount.c.post_id,
-        'body':deferred(tables.posts.c.body),
-        'user':relationship(user.User, lazy='select',
-                        backref=backref('posts', cascade="all, delete-orphan")),
-        'blog':relationship(Blog, lazy='select',
-                        backref=backref('posts', cascade="all, delete-orphan")),
-        'topics':relationship(TopicAssociation, lazy='joined',
-                          cascade="all, delete-orphan",
-                          backref='post')
-    }, order_by=[desc(posts_with_ccount.c.datetime)])
-
-
-    # comment mapper.  This mapper is handling a hierarchical relationship on
-    # itself, and contains a lazy reference both to its parent comment and its
-    # list of child comments.
-    mapper(Comment, tables.comments, properties={
-        'id':tables.comments.c.comment_id,
-        'post':relationship(Post, lazy='select',
-                        backref=backref('comments',
-                                        cascade="all, delete-orphan")),
-        'user':relationship(user.User, lazy='joined',
-                        backref=backref('comments',
-                                        cascade="all, delete-orphan")),
-        'parent':relationship(Comment,
-                          primaryjoin=(tables.comments.c.parent_comment_id ==
-                                       tables.comments.c.comment_id),
-                          foreign_keys=[tables.comments.c.comment_id],
-                          lazy='select', uselist=False),
-        'replies':relationship(Comment,
-                           primaryjoin=(tables.comments.c.parent_comment_id ==
-                                        tables.comments.c.comment_id),
-                           lazy='select', uselist=True, cascade="all"),
-    })
-
-# we define one special find-by for the comments of a post, which is going to
-# make its own "noload" mapper and organize the comments into their correct
-# hierarchy in one pass. hierarchical data normally needs to be loaded by
-# separate queries for each set of children, unless you use a proprietary
-# extension like CONNECT BY.
-def find_by_post(post):
-    """returns a hierarchical collection of comments based on a given criterion.
-
-    Uses a mapper that does not lazy load replies or parents, and instead
-    organizes comments into a hierarchical tree when the result is produced.
-    """
-
-    q = session().query(Comment).options(noload('replies'), noload('parent'))
-    comments = q.select_by(post_id=post.id)
-    result = []
-    d = {}
-    for c in comments:
-        d[c.id] = c
-        if c.parent_comment_id is None:
-            result.append(c)
-            c.parent=None
-        else:
-            parent = d[c.parent_comment_id]
-            parent.replies.append(c)
-            c.parent = parent
-    return result
-
-Comment.find_by_post = staticmethod(find_by_post)
-
-def start_session():
-    """creates a new session for the start of a request."""
-    trans.session = create_session(bind_to=zblog.database.engine )
-
-def session():
-    return trans.session
diff --git a/test/zblog/tables.py b/test/zblog/tables.py
deleted file mode 100644 (file)
index 3326c28..0000000
+++ /dev/null
@@ -1,53 +0,0 @@
-"""application table metadata objects are described here."""
-
-from sqlalchemy import *
-from test.lib.schema import Table, Column
-
-metadata = MetaData()
-
-users = Table('users', metadata,
-    Column('user_id', Integer, primary_key=True, test_needs_autoincrement=True),
-    Column('user_name', String(30), nullable=False),
-    Column('fullname', String(100), nullable=False),
-    Column('password', String(40), nullable=False),
-    Column('groupname', String(20), nullable=False),
-    )
-
-blogs = Table('blogs', metadata,
-    Column('blog_id', Integer, primary_key=True, test_needs_autoincrement=True),
-    Column('owner_id', Integer, ForeignKey('users.user_id'), nullable=False),
-    Column('name', String(100), nullable=False),
-    Column('description', String(500))
-    )
-
-posts = Table('posts', metadata,
-    Column('post_id', Integer, primary_key=True, test_needs_autoincrement=True),
-    Column('blog_id', Integer, ForeignKey('blogs.blog_id'), nullable=False),
-    Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
-    Column('datetime', DateTime, nullable=False),
-    Column('headline', String(500)),
-    Column('summary', String(255)),
-    Column('body', Text),
-    )
-
-topics = Table('topics', metadata,
-    Column('topic_id', Integer, primary_key=True, test_needs_autoincrement=True),
-    Column('keyword', String(50), nullable=False),
-    Column('description', String(500))
-   )
-
-topic_xref = Table('topic_post_xref', metadata,
-    Column('topic_id', Integer, ForeignKey('topics.topic_id'), nullable=False),
-    Column('is_primary', Boolean, nullable=False),
-    Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False)
-   )
-
-comments = Table('comments', metadata,
-    Column('comment_id', Integer, primary_key=True, test_needs_autoincrement=True),
-    Column('user_id', Integer, ForeignKey('users.user_id'), nullable=False),
-    Column('post_id', Integer, ForeignKey('posts.post_id'), nullable=False),
-    Column('datetime', DateTime, nullable=False),
-    Column('parent_comment_id', Integer, ForeignKey('comments.comment_id')),
-    Column('subject', String(500)),
-    Column('body', Text),
-    )
diff --git a/test/zblog/test_zblog.py b/test/zblog/test_zblog.py
deleted file mode 100644 (file)
index 0ef1293..0000000
+++ /dev/null
@@ -1,116 +0,0 @@
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from test.lib import *
-from test.zblog import mappers, tables
-from test.zblog.user import *
-from test.zblog.blog import *
-
-
-class ZBlogTest(TestBase, AssertsExecutionResults):
-
-    @classmethod
-    def create_tables(cls):
-        tables.metadata.drop_all(bind=testing.db)
-        tables.metadata.create_all(bind=testing.db)
-
-    @classmethod
-    def drop_tables(cls):
-        tables.metadata.drop_all(bind=testing.db)
-
-    @classmethod
-    def setup_class(cls):
-        cls.create_tables()
-    @classmethod
-    def teardown_class(cls):
-        cls.drop_tables()
-    def teardown(self):
-        pass
-    def setup(self):
-        pass
-
-
-class SavePostTest(ZBlogTest):
-    @classmethod
-    def setup_class(cls):
-        super(SavePostTest, cls).setup_class()
-
-        mappers.zblog_mappers()
-        global blog_id, user_id
-        s = create_session(bind=testing.db)
-        user = User('zbloguser', "Zblog User", "hello", group=administrator)
-        blog = Blog(owner=user)
-        blog.name = "this is a blog"
-        s.add(user)
-        s.add(blog)
-        s.flush()
-        blog_id = blog.id
-        user_id = user.id
-        s.close()
-
-    @classmethod
-    def teardown_class(cls):
-        clear_mappers()
-        super(SavePostTest, cls).teardown_class()
-
-    def test_attach_noautoflush(self):
-        """Test pending backref behavior."""
-
-        s = create_session(bind=testing.db, autoflush=False)
-
-        s.begin()
-        try:
-            blog = s.query(Blog).get(blog_id)
-            post = Post(headline="asdf asdf", summary="asdfasfd")
-            s.add(post)
-            post.blog_id=blog_id
-            post.blog = blog
-            assert post in blog.posts
-        finally:
-            s.rollback()
-
-    def test_attach_autoflush(self):
-        s = create_session(bind=testing.db, autoflush=True)
-
-        s.begin()
-        try:
-            blog = s.query(Blog).get(blog_id)
-            user = s.query(User).get(user_id)
-            post = Post(headline="asdf asdf", summary="asdfasfd", user=user)
-            s.add(post)
-            post.blog_id=blog_id
-            post.blog = blog
-            assert post in blog.posts
-        finally:
-            s.rollback()
-
-    def testoptimisticorphans(self):
-        """test that instances in the session with un-loaded parents will not
-        get marked as "orphans" and then deleted """
-        s = create_session(bind=testing.db)
-
-        s.begin()
-        try:
-            blog = s.query(Blog).get(blog_id)
-            post = Post(headline="asdf asdf", summary="asdfasfd")
-            post.blog = blog
-            user = s.query(User).get(user_id)
-            post.user = user
-            s.add(post)
-            s.flush()
-            s.expunge_all()
-
-            user = s.query(User).get(user_id)
-            blog = s.query(Blog).get(blog_id)
-            post = blog.posts[0]
-            comment = Comment(subject="some subject", body="some body")
-            comment.post = post
-            comment.user = user
-            s.flush()
-            s.expunge_all()
-
-            assert s.query(Post).get(post.id) is not None
-
-        finally:
-            s.rollback()
-
-
diff --git a/test/zblog/user.py b/test/zblog/user.py
deleted file mode 100644 (file)
index 30f1e3d..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-"""user.py - handles user login and validation"""
-
-import random, string
-
-import sys
-if sys.version_info < (2, 5):
-    from sha import sha
-else:
-    from hashlib import sha1 as sha
-
-administrator = 'admin'
-user = 'user'
-groups = [user, administrator]
-
-def cryptpw(password, salt=None):
-    if salt is None:
-        salt = "".join([chr(random.randint(ord('a'), ord('z'))),
-                            chr(random.randint(ord('a'), ord('z')))])
-    return sha((password+ salt).encode('ascii')).hexdigest()
-
-def checkpw(password, dbpw):
-    return cryptpw(password, dbpw[:2]) == dbpw
-
-class User(object):
-    def __init__(self, name=None, fullname=None, password=None, group=user):
-        self.name = name
-        self.fullname = fullname
-        self.password = password
-        self.group = group
-
-    def is_administrator(self):
-        return self.group == administrator
-
-    def _set_password(self, password):
-        if password:
-            self.crypt_password=cryptpw(password)
-
-    password = property(lambda s: None, _set_password)
-
-    def checkpw(self, password):
-        return checkpw(password, self.crypt_password)