From: Mike Bayer Date: Sat, 1 Sep 2012 00:03:57 +0000 (-0400) Subject: - [bug] Fixed a regression since 0.6 regarding X-Git-Tag: rel_0_7_9~37 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=90ad3c5e16ffe4f972144eebaf7a7e2a0397c9c0;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - [bug] Fixed a regression since 0.6 regarding result-row targeting. It should be possible to use a select() statement with string based columns in it, that is select(['id', 'name']).select_from('mytable'), and have this statement be targetable by Column objects with those names; this is the mechanism by which query(MyClass).from_statement(some_statement) works. At some point the specific case of using select(['id']), which is equivalent to select([literal_column('id')]), stopped working here, so this has been re-instated and of course tested. [ticket:2558] --- diff --git a/CHANGES b/CHANGES index 0044262517..1c2208e0c0 100644 --- a/CHANGES +++ b/CHANGES @@ -71,6 +71,21 @@ CHANGES ultimate name as a name inside the embedded UNION. [ticket:2552] + - [bug] Fixed a regression since 0.6 regarding + result-row targeting. It should be possible + to use a select() statement with string + based columns in it, that is + select(['id', 'name']).select_from('mytable'), + and have this statement be targetable by + Column objects with those names; this is the + mechanism by which + query(MyClass).from_statement(some_statement) + works. At some point the specific case of + using select(['id']), which is equivalent to + select([literal_column('id')]), stopped working + here, so this has been re-instated and of + course tested. [ticket:2558] + - engine - [bug] Fixed bug whereby a disconnect detect + dispose that occurs diff --git a/lib/sqlalchemy/sql/expression.py b/lib/sqlalchemy/sql/expression.py index 495ea0c8dd..ddff1e18bf 100644 --- a/lib/sqlalchemy/sql/expression.py +++ b/lib/sqlalchemy/sql/expression.py @@ -4057,11 +4057,15 @@ class ColumnClause(_Immutable, ColumnElement): self.is_literal = is_literal def _compare_name_for_result(self, other): - if self.table is not None and hasattr(other, 'proxy_set'): - return other.proxy_set.intersection(self.proxy_set) - else: + if self.is_literal or \ + self.table is None or \ + not hasattr(other, 'proxy_set') or ( + isinstance(other, ColumnClause) and other.is_literal + ): return super(ColumnClause, self).\ _compare_name_for_result(other) + else: + return other.proxy_set.intersection(self.proxy_set) def _get_table(self): return self.__dict__['table'] diff --git a/test/orm/test_query.py b/test/orm/test_query.py index c2af6b84b7..87a1ed3fec 100644 --- a/test/orm/test_query.py +++ b/test/orm/test_query.py @@ -1693,32 +1693,72 @@ class TextTest(QueryTest): def test_fulltext(self): User = self.classes.User - assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).from_statement("select * from users order by id").all() + eq_( + create_session().query(User). + from_statement("select * from users order by id").all(), + [User(id=7), User(id=8), User(id=9), User(id=10)] + ) - assert User(id=7) == create_session().query(User).from_statement("select * from users order by id").first() - assert None == create_session().query(User).from_statement("select * from users where name='nonexistent'").first() + eq_( + create_session().query(User). + from_statement("select * from users order by id").first(), + User(id=7) + ) + eq_( + create_session().query(User). + from_statement( + "select * from users where name='nonexistent'").first(), + None + ) def test_fragment(self): User = self.classes.User - assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (8, 9)").all() + eq_( + create_session().query(User).filter("id in (8, 9)").all(), + [User(id=8), User(id=9)] - assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter("id=9").all() + ) - assert [User(id=9)] == create_session().query(User).filter("name='fred'").filter(User.id==9).all() + eq_( + create_session().query(User).filter("name='fred'"). + filter("id=9").all(), + [User(id=9)] + ) + eq_( + create_session().query(User).filter("name='fred'"). + filter(User.id == 9).all(), + [User(id=9)] + ) def test_binds(self): User = self.classes.User - assert [User(id=8), User(id=9)] == create_session().query(User).filter("id in (:id1, :id2)").params(id1=8, id2=9).all() + eq_( + create_session().query(User).filter("id in (:id1, :id2)").\ + params(id1=8, id2=9).all(), + [User(id=8), User(id=9)] + ) def test_as_column(self): User = self.classes.User s = create_session() - assert_raises(sa_exc.InvalidRequestError, s.query, User.id, text("users.name")) + assert_raises(sa_exc.InvalidRequestError, s.query, + User.id, text("users.name")) + + eq_(s.query(User.id, "name").order_by(User.id).all(), + [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')]) - eq_(s.query(User.id, "name").order_by(User.id).all(), [(7, u'jack'), (8, u'ed'), (9, u'fred'), (10, u'chuck')]) + def test_via_select(self): + User = self.classes.User + s = create_session() + eq_( + s.query(User).from_statement( + select(['id', 'name']).select_from('users').order_by('id'), + ).all(), + [User(id=7), User(id=8), User(id=9), User(id=10)] + ) class ParentTest(QueryTest, AssertsCompiledSQL): __dialect__ = 'default' diff --git a/test/sql/test_query.py b/test/sql/test_query.py index f46ffd8123..0b696894cf 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -325,12 +325,16 @@ class QueryTest(fixtures.TestBase): row = testing.db.execute(select([content.c.type.label("content_type")])).first() assert content.c.type in row + assert bar.c.content_type not in row + assert sql.column('content_type') in row row = testing.db.execute(select([func.now().label("content_type")])).first() assert content.c.type not in row + assert bar.c.content_type not in row + assert sql.column('content_type') in row def test_pickled_rows(self): @@ -731,7 +735,23 @@ class QueryTest(fixtures.TestBase): dict(user_id=1, user_name='john'), dict(user_id=2, user_name='jack') ) - r = text("select * from query_users where user_id=2", bind=testing.db).execute().first() + r = testing.db.execute( + text("select * from query_users where user_id=2") + ).first() + self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) + self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') + + def test_column_accessor_textual_select(self): + users.insert().execute( + dict(user_id=1, user_name='john'), + dict(user_id=2, user_name='jack') + ) + # this will create column() objects inside + # the select(), these need to match on name anyway + r = testing.db.execute( + select(['user_id', 'user_name']).select_from('query_users'). + where('user_id=2') + ).first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') @@ -742,9 +762,11 @@ class QueryTest(fixtures.TestBase): # test a little sqlite weirdness - with the UNION, # cols come back as "query_users.user_id" in cursor.description - r = text("select query_users.user_id, query_users.user_name from query_users " - "UNION select query_users.user_id, query_users.user_name from query_users", - bind=testing.db).execute().first() + r = testing.db.execute( + text("select query_users.user_id, query_users.user_name from query_users " + "UNION select query_users.user_id, query_users.user_name from query_users" + ) + ).first() eq_(r['user_id'], 1) eq_(r['user_name'], "john") eq_(r.keys(), ["user_id", "user_name"])