]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- added first() method to ResultProxy
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 20 Jun 2009 14:53:32 +0000 (14:53 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 20 Jun 2009 14:53:32 +0000 (14:53 +0000)
- corrected all the hanging fetchone()s in test_query so that jython passes without hanging

06CHANGES
lib/sqlalchemy/engine/base.py
test/sql/test_query.py

index 34489a6b44cabe9a6acfc69e70776fd99f6a799d..c71eb1fdba3a46eea777a3fa80f938e2c2b03d72 100644 (file)
--- a/06CHANGES
+++ b/06CHANGES
@@ -12,7 +12,8 @@
     - transaction isolation level may be specified with
       create_engine(... isolation_level="..."); available on
       postgresql and sqlite. [ticket:443]
-
+    - added first() method to ResultProxy, returns first row and closes
+      result set immediately.
 
 - schema
     - deprecated metadata.connect() and threadlocalmetadata.connect() have been 
index 7d2ab0a12a9a6cdb196f24a9c9aca653b7e681b7..dfe59924622e1f9600ee9d8813df9ea736d1684a 100644 (file)
@@ -1801,7 +1801,12 @@ class ResultProxy(object):
             raise
 
     def fetchmany(self, size=None):
-        """Fetch many rows, just like DB-API ``cursor.fetchmany(size=cursor.arraysize)``."""
+        """Fetch many rows, just like DB-API ``cursor.fetchmany(size=cursor.arraysize)``.
+        
+        If rows are present, the cursor remains open after this is called.
+        Else the cursor is automatically closed and an empty list is returned.
+        
+        """
 
         try:
             process_row = self._process_row
@@ -1814,7 +1819,12 @@ class ResultProxy(object):
             raise
 
     def fetchone(self):
-        """Fetch one row, just like DB-API ``cursor.fetchone()``."""
+        """Fetch one row, just like DB-API ``cursor.fetchone()``.
+        
+        If a row is present, the cursor remains open after this is called.
+        Else the cursor is automatically closed and None is returned.
+        
+        """
 
         try:
             row = self._fetchone_impl()
@@ -1827,9 +1837,12 @@ class ResultProxy(object):
             self.connection._handle_dbapi_exception(e, None, None, self.cursor, self.context)
             raise
 
-    def scalar(self):
-        """Fetch the first column of the first row, and close the result set."""
-
+    def first(self):
+        """Fetch the first row and then close the result set unconditionally.
+        
+        Returns None if no row is present.
+        
+        """
         try:
             row = self._fetchone_impl()
         except Exception, e:
@@ -1838,12 +1851,24 @@ class ResultProxy(object):
 
         try:
             if row is not None:
-                return self._process_row(self, row)[0]
+                return self._process_row(self, row)
             else:
                 return None
         finally:
             self.close()
-
+        
+        
+    def scalar(self):
+        """Fetch the first column of the first row, and close the result set.
+        
+        Returns None if no row is present.
+        
+        """
+        row = self.first()
+        if row is not None:
+            return row[0]
+        else:
+            return None
 
 class BufferedRowResultProxy(ResultProxy):
     """A ResultProxy with row buffering behavior.
@@ -1913,6 +1938,7 @@ class BufferedColumnResultProxy(ResultProxy):
     of scope unless explicitly fetched.  Currently this includes just
     cx_Oracle LOB objects, but this behavior is known to exist in
     other DB-APIs as well (Pygresql, currently unsupported).
+    
     """
 
     _process_row = BufferedColumnRow
index 6711802130da277915f334d359ba4575798b38c4..f22a5c22a3201eb0657207b6360ce617ff674916 100644 (file)
@@ -4,7 +4,7 @@ from sqlalchemy import *
 from sqlalchemy import exc, sql
 from sqlalchemy.engine import default
 from sqlalchemy.test import *
-from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.testing import eq_, assert_raises_message
 
 class QueryTest(TestBase):
 
@@ -54,7 +54,7 @@ class QueryTest(TestBase):
         assert users.count().scalar() == 1
 
         users.update(users.c.user_id == 7).execute(user_name = 'fred')
-        assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
+        assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred'
 
     def test_lastrow_accessor(self):
         """Tests the last_inserted_ids() and lastrow_has_id() functions."""
@@ -74,12 +74,9 @@ class QueryTest(TestBase):
 
             if result.lastrow_has_defaults():
                 criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())])
-                row = table.select(criterion).execute().fetchone()
-                try:
-                    for c in table.c:
-                        ret[c.key] = row[c]
-                finally:
-                    row.close()
+                row = table.select(criterion).execute().first()
+                for c in table.c:
+                    ret[c.key] = row[c]
             return ret
 
         for supported, table, values, assertvalues in [
@@ -200,7 +197,7 @@ class QueryTest(TestBase):
         
     def test_row_comparison(self):
         users.insert().execute(user_id = 7, user_name = 'jack')
-        rp = users.select().execute().fetchone()
+        rp = users.select().execute().first()
 
         self.assert_(rp == rp)
         self.assert_(not(rp != rp))
@@ -223,11 +220,11 @@ class QueryTest(TestBase):
         eq_(testing.db.execute(select([or_(false, false)])).scalar(), False)
         eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True)
 
-        row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone()
+        row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first()
         assert row.x == False
         assert row.y == False
 
-        row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone()
+        row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first()
         assert row.x == True
         assert row.y == False
         
@@ -380,7 +377,7 @@ class QueryTest(TestBase):
             s = select([datetable.alias('x').c.today]).as_scalar()
             s2 = select([datetable.c.id, s.label('somelabel')])
             #print s2.c.somelabel.type
-            assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
+            assert isinstance(s2.execute().first()['somelabel'], datetime.datetime)
         finally:
             datetable.drop()
 
@@ -451,35 +448,37 @@ class QueryTest(TestBase):
         users.insert().execute(user_id=2, user_name='jack')
         addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
 
-        r = users.select(users.c.user_id==2).execute().fetchone()
+        r = users.select(users.c.user_id==2).execute().first()
         self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
         self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
-        r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone()
+        
+        r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
         self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
         self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
+        
         # test slices
-        r = text("select * from query_addresses", bind=testing.db).execute().fetchone()
+        r = text("select * from query_addresses", bind=testing.db).execute().first()
         self.assert_(r[0:1] == (1,))
         self.assert_(r[1:] == (2, 'foo@bar.com'))
         self.assert_(r[:-1] == (1, 2))
-
+        
         # test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description
         r = text("select query_users.user_id, query_users.user_name from query_users "
-            "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone()
+            "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().first()
         self.assert_(r['user_id']) == 1
         self.assert_(r['user_name']) == "john"
 
         # test using literal tablename.colname
-        r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone()
+        r = text('select query_users.user_id AS "query_users.user_id", '
+                'query_users.user_name AS "query_users.user_name" from query_users', 
+                bind=testing.db).execute().first()
         self.assert_(r['query_users.user_id']) == 1
         self.assert_(r['query_users.user_name']) == "john"
 
     @testing.fails_on('oracle', 'oracle result keys() are all uppercase, not getting into this.')
     def test_row_as_args(self):
         users.insert().execute(user_id=1, user_name='john')
-        r = users.select(users.c.user_id==1).execute().fetchone()
+        r = users.select(users.c.user_id==1).execute().first()
         users.delete().execute()
         users.insert().execute(r)
         eq_(users.select().execute().fetchall(), [(1, 'john')])
@@ -498,13 +497,12 @@ class QueryTest(TestBase):
         
     def test_ambiguous_column(self):
         users.insert().execute(user_id=1, user_name='john')
-        r = users.outerjoin(addresses).select().execute().fetchone()
-        try:
-            print r['user_id']
-            assert False
-        except exc.InvalidRequestError, e:
-            assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
-                   str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
+        r = users.outerjoin(addresses).select().execute().first()
+        assert_raises_message(
+            exc.InvalidRequestError,
+            "Ambiguous column name",
+            lambda: r['user_id']
+        )
 
     @testing.requires.subqueries
     def test_column_label_targeting(self):
@@ -514,47 +512,29 @@ class QueryTest(TestBase):
             users.select().alias('foo'),
             users.select().alias(users.name),
         ):
-            row = s.select(use_labels=True).execute().fetchone()
-            try:
-                assert row[s.c.user_id] == 7
-                assert row[s.c.user_name] == 'ed'
-            finally:
-                row.close()
+            row = s.select(use_labels=True).execute().first()
+            assert row[s.c.user_id] == 7
+            assert row[s.c.user_name] == 'ed'
 
     def test_keys(self):
         users.insert().execute(user_id=1, user_name='foo')
-        r = users.select().execute().fetchone()
-        try:
-            eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
-        finally:
-            r.close()
+        r = users.select().execute().first()
+        eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
 
     def test_items(self):
         users.insert().execute(user_id=1, user_name='foo')
-        r = users.select().execute().fetchone()
-        try:
-            eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
-        finally:
-            r.close()
+        r = users.select().execute().first()
+        eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
 
     def test_len(self):
         users.insert().execute(user_id=1, user_name='foo')
-        try:
-            r = users.select().execute().fetchone()
-            eq_(len(r), 2)
-        finally:
-            r.close()
+        r = users.select().execute().first()
+        eq_(len(r), 2)
             
-        r = testing.db.execute('select user_name, user_id from query_users').fetchone()
-        try:
-            eq_(len(r), 2)
-        finally:
-            r.close()
-        try:
-            r = testing.db.execute('select user_name from query_users').fetchone()
-            eq_(len(r), 1)
-        finally:
-            r.close()
+        r = testing.db.execute('select user_name, user_id from query_users').first()
+        eq_(len(r), 2)
+        r = testing.db.execute('select user_name from query_users').first()
+        eq_(len(r), 1)
 
     def test_cant_execute_join(self):
         try:
@@ -567,7 +547,7 @@ class QueryTest(TestBase):
     def test_column_order_with_simple_query(self):
         # should return values in column definition order
         users.insert().execute(user_id=1, user_name='foo')
-        r = users.select(users.c.user_id==1).execute().fetchone()
+        r = users.select(users.c.user_id==1).execute().first()
         eq_(r[0], 1)
         eq_(r[1], 'foo')
         eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
@@ -576,7 +556,7 @@ class QueryTest(TestBase):
     def test_column_order_with_text_query(self):
         # should return values in query order
         users.insert().execute(user_id=1, user_name='foo')
-        r = testing.db.execute('select user_name, user_id from query_users').fetchone()
+        r = testing.db.execute('select user_name, user_id from query_users').first()
         eq_(r[0], 'foo')
         eq_(r[1], 1)
         eq_([x.lower() for x in r.keys()], ['user_name', 'user_id'])
@@ -598,7 +578,7 @@ class QueryTest(TestBase):
         shadowed.create(checkfirst=True)
         try:
             shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
-            r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
+            r = shadowed.select(shadowed.c.shadow_id==1).execute().first()
             self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
             self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
             self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')