]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- [bug] Fixed a regression since 0.6 regarding
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 1 Sep 2012 00:03:57 +0000 (20:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 1 Sep 2012 00:03:57 +0000 (20:03 -0400)
    result-row targeting.   It should be possible
    to use a select() statement with string
    based columns in it, that is
    select(['id', 'name']).select_from('mytable'),
    and have this statement be targetable by
    Column objects with those names; this is the
    mechanism by which
    query(MyClass).from_statement(some_statement)
    works.  At some point the specific case of
    using select(['id']), which is equivalent to
    select([literal_column('id')]), stopped working
    here, so this has been re-instated and of
    course tested. [ticket:2558]

CHANGES
lib/sqlalchemy/sql/expression.py
test/orm/test_query.py
test/sql/test_query.py

diff --git a/CHANGES b/CHANGES
index 0044262517e7721196b028cf13079492701db8c6..1c2208e0c0b7d9999aae3aa90253bffc58360ce4 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -71,6 +71,21 @@ CHANGES
     ultimate name as a name inside the embedded
     UNION. [ticket:2552]
 
+  - [bug] Fixed a regression since 0.6 regarding
+    result-row targeting.   It should be possible
+    to use a select() statement with string
+    based columns in it, that is
+    select(['id', 'name']).select_from('mytable'),
+    and have this statement be targetable by
+    Column objects with those names; this is the
+    mechanism by which
+    query(MyClass).from_statement(some_statement)
+    works.  At some point the specific case of
+    using select(['id']), which is equivalent to
+    select([literal_column('id')]), stopped working
+    here, so this has been re-instated and of
+    course tested. [ticket:2558]
+
 - engine
   - [bug] Fixed bug whereby
     a disconnect detect + dispose that occurs
index 495ea0c8dde3228977be468d7323947190962302..ddff1e18bf76a5f1b6d50dbb0a0aa5474475cae8 100644 (file)
@@ -4057,11 +4057,15 @@ class ColumnClause(_Immutable, ColumnElement):
         self.is_literal = is_literal
 
     def _compare_name_for_result(self, other):
-        if self.table is not None and hasattr(other, 'proxy_set'):
-            return other.proxy_set.intersection(self.proxy_set)
-        else:
+        if self.is_literal or \
+            self.table is None or \
+            not hasattr(other, 'proxy_set') or (
+            isinstance(other, ColumnClause) and other.is_literal
+        ):
             return super(ColumnClause, self).\
                     _compare_name_for_result(other)
+        else:
+            return other.proxy_set.intersection(self.proxy_set)
 
     def _get_table(self):
         return self.__dict__['table']
index c2af6b84b7d6c40ad519c6be89a7dfc3da0ed61f..87a1ed3fec69297652c0de9c2767ec2d5712c6ae 100644 (file)
@@ -1693,32 +1693,72 @@ class TextTest(QueryTest):
     def test_fulltext(self):
         User = self.classes.User
 
-        assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users order by id").all()
+        eq_(
+            create_session().query(User).
+            from_statement("select * from users order by id").all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)]
+        )
 
-        assert User(id=7) == create_session().query(User).from_statement("select * from users order by id").first()
-        assert None == create_session().query(User).from_statement("select * from users where name='nonexistent'").first()
+        eq_(
+            create_session().query(User).
+                from_statement("select * from users order by id").first(),
+            User(id=7)
+        )
+        eq_(
+            create_session().query(User).
+                from_statement(
+                    "select * from users where name='nonexistent'").first(),
+            None
+        )
 
     def test_fragment(self):
         User = self.classes.User
 
-        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (8, 9)").all()
+        eq_(
+            create_session().query(User).filter("id in (8, 9)").all(),
+            [User(id=8), User(id=9)]
 
-        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter("id=9").all()
+        )
 
-        assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter(User.id==9).all()
+        eq_(
+            create_session().query(User).filter("name='fred'").
+                filter("id=9").all(),
+            [User(id=9)]
+        )
+        eq_(
+            create_session().query(User).filter("name='fred'").
+                filter(User.id == 9).all(),
+            [User(id=9)]
+        )
 
     def test_binds(self):
         User = self.classes.User
 
-        assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all()
+        eq_(
+            create_session().query(User).filter("id in (:id1, :id2)").\
+                params(id1=8, id2=9).all(),
+            [User(id=8), User(id=9)]
+        )
 
     def test_as_column(self):
         User = self.classes.User
 
         s = create_session()
-        assert_raises(sa_exc.InvalidRequestError, s.query, User.id, text("users.name"))
+        assert_raises(sa_exc.InvalidRequestError, s.query,
+                    User.id, text("users.name"))
+
+        eq_(s.query(User.id, "name").order_by(User.id).all(),
+                [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')])
 
-        eq_(s.query(User.id, "name").order_by(User.id).all(), [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')])
+    def test_via_select(self):
+        User = self.classes.User
+        s = create_session()
+        eq_(
+            s.query(User).from_statement(
+                select(['id', 'name']).select_from('users').order_by('id'),
+            ).all(),
+            [User(id=7), User(id=8), User(id=9), User(id=10)]
+        )
 
 class ParentTest(QueryTest, AssertsCompiledSQL):
     __dialect__ = 'default'
index f46ffd8123e18825d3be39e22c484e31f3d8dd89..0b696894cf7aa1fc684eea77278a58b4b2b9a282 100644 (file)
@@ -325,12 +325,16 @@ class QueryTest(fixtures.TestBase):
 
         row = testing.db.execute(select([content.c.type.label("content_type")])).first()
         assert content.c.type in row
+
         assert bar.c.content_type not in row
+
         assert sql.column('content_type') in row
 
         row = testing.db.execute(select([func.now().label("content_type")])).first()
         assert content.c.type not in row
+
         assert bar.c.content_type not in row
+
         assert sql.column('content_type') in row
 
     def test_pickled_rows(self):
@@ -731,7 +735,23 @@ class QueryTest(fixtures.TestBase):
             dict(user_id=1, user_name='john'),
             dict(user_id=2, user_name='jack')
         )
-        r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
+        r = testing.db.execute(
+                text("select * from query_users where user_id=2")
+            ).first()
+        self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+        self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
+
+    def test_column_accessor_textual_select(self):
+        users.insert().execute(
+            dict(user_id=1, user_name='john'),
+            dict(user_id=2, user_name='jack')
+        )
+        # this will create column() objects inside
+        # the select(), these need to match on name anyway
+        r = testing.db.execute(
+            select(['user_id', 'user_name']).select_from('query_users').
+                where('user_id=2')
+        ).first()
         self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
         self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
 
@@ -742,9 +762,11 @@ class QueryTest(fixtures.TestBase):
 
         # test a little sqlite weirdness - with the UNION,
         # cols come back as "query_users.user_id" in cursor.description
-        r = text("select query_users.user_id, query_users.user_name from query_users "
-            "UNION select query_users.user_id, query_users.user_name from query_users",
-            bind=testing.db).execute().first()
+        r = testing.db.execute(
+                text("select query_users.user_id, query_users.user_name from query_users "
+                "UNION select query_users.user_id, query_users.user_name from query_users"
+                )
+            ).first()
         eq_(r['user_id'], 1)
         eq_(r['user_name'], "john")
         eq_(r.keys(), ["user_id", "user_name"])