raise
def fetchmany(self, size=None):
- """Fetch many rows, just like DB-API ``cursor.fetchmany(size=cursor.arraysize)``."""
+ """Fetch many rows, just like DB-API ``cursor.fetchmany(size=cursor.arraysize)``.
+
+ If rows are present, the cursor remains open after this is called.
+ Else the cursor is automatically closed and an empty list is returned.
+
+ """
try:
process_row = self._process_row
raise
def fetchone(self):
- """Fetch one row, just like DB-API ``cursor.fetchone()``."""
+ """Fetch one row, just like DB-API ``cursor.fetchone()``.
+
+ If a row is present, the cursor remains open after this is called.
+ Else the cursor is automatically closed and None is returned.
+
+ """
try:
row = self._fetchone_impl()
self.connection._handle_dbapi_exception(e, None, None, self.cursor, self.context)
raise
- def scalar(self):
- """Fetch the first column of the first row, and close the result set."""
-
+ def first(self):
+ """Fetch the first row and then close the result set unconditionally.
+
+ Returns None if no row is present.
+
+ """
try:
row = self._fetchone_impl()
except Exception, e:
try:
if row is not None:
- return self._process_row(self, row)[0]
+ return self._process_row(self, row)
else:
return None
finally:
self.close()
-
+
+
+ def scalar(self):
+ """Fetch the first column of the first row, and close the result set.
+
+ Returns None if no row is present.
+
+ """
+ row = self.first()
+ if row is not None:
+ return row[0]
+ else:
+ return None
class BufferedRowResultProxy(ResultProxy):
"""A ResultProxy with row buffering behavior.
of scope unless explicitly fetched. Currently this includes just
cx_Oracle LOB objects, but this behavior is known to exist in
other DB-APIs as well (Pygresql, currently unsupported).
+
"""
_process_row = BufferedColumnRow
from sqlalchemy import exc, sql
from sqlalchemy.engine import default
from sqlalchemy.test import *
-from sqlalchemy.test.testing import eq_
+from sqlalchemy.test.testing import eq_, assert_raises_message
class QueryTest(TestBase):
assert users.count().scalar() == 1
users.update(users.c.user_id == 7).execute(user_name = 'fred')
- assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
+ assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred'
def test_lastrow_accessor(self):
"""Tests the last_inserted_ids() and lastrow_has_id() functions."""
if result.lastrow_has_defaults():
criterion = and_(*[col==id for col, id in zip(table.primary_key, result.last_inserted_ids())])
- row = table.select(criterion).execute().fetchone()
- try:
- for c in table.c:
- ret[c.key] = row[c]
- finally:
- row.close()
+ row = table.select(criterion).execute().first()
+ for c in table.c:
+ ret[c.key] = row[c]
return ret
for supported, table, values, assertvalues in [
def test_row_comparison(self):
users.insert().execute(user_id = 7, user_name = 'jack')
- rp = users.select().execute().fetchone()
+ rp = users.select().execute().first()
self.assert_(rp == rp)
self.assert_(not(rp != rp))
eq_(testing.db.execute(select([or_(false, false)])).scalar(), False)
eq_(testing.db.execute(select([not_(or_(false, false))])).scalar(), True)
- row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone()
+ row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).first()
assert row.x == False
assert row.y == False
- row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone()
+ row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first()
assert row.x == True
assert row.y == False
s = select([datetable.alias('x').c.today]).as_scalar()
s2 = select([datetable.c.id, s.label('somelabel')])
#print s2.c.somelabel.type
- assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
+ assert isinstance(s2.execute().first()['somelabel'], datetime.datetime)
finally:
datetable.drop()
users.insert().execute(user_id=2, user_name='jack')
addresses.insert().execute(address_id=1, user_id=2, address='foo@bar.com')
- r = users.select(users.c.user_id==2).execute().fetchone()
+ r = users.select(users.c.user_id==2).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
- r = text("select * from query_users where user_id=2", bind=testing.db).execute().fetchone()
+
+ r = text("select * from query_users where user_id=2", bind=testing.db).execute().first()
self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
-
+
# test slices
- r = text("select * from query_addresses", bind=testing.db).execute().fetchone()
+ r = text("select * from query_addresses", bind=testing.db).execute().first()
self.assert_(r[0:1] == (1,))
self.assert_(r[1:] == (2, 'foo@bar.com'))
self.assert_(r[:-1] == (1, 2))
-
+
# test a little sqlite weirdness - with the UNION, cols come back as "query_users.user_id" in cursor.description
r = text("select query_users.user_id, query_users.user_name from query_users "
- "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().fetchone()
+ "UNION select query_users.user_id, query_users.user_name from query_users", bind=testing.db).execute().first()
self.assert_(r['user_id']) == 1
self.assert_(r['user_name']) == "john"
# test using literal tablename.colname
- r = text('select query_users.user_id AS "query_users.user_id", query_users.user_name AS "query_users.user_name" from query_users', bind=testing.db).execute().fetchone()
+ r = text('select query_users.user_id AS "query_users.user_id", '
+ 'query_users.user_name AS "query_users.user_name" from query_users',
+ bind=testing.db).execute().first()
self.assert_(r['query_users.user_id']) == 1
self.assert_(r['query_users.user_name']) == "john"
@testing.fails_on('oracle', 'oracle result keys() are all uppercase, not getting into this.')
def test_row_as_args(self):
users.insert().execute(user_id=1, user_name='john')
- r = users.select(users.c.user_id==1).execute().fetchone()
+ r = users.select(users.c.user_id==1).execute().first()
users.delete().execute()
users.insert().execute(r)
eq_(users.select().execute().fetchall(), [(1, 'john')])
def test_ambiguous_column(self):
users.insert().execute(user_id=1, user_name='john')
- r = users.outerjoin(addresses).select().execute().fetchone()
- try:
- print r['user_id']
- assert False
- except exc.InvalidRequestError, e:
- assert str(e) == "Ambiguous column name 'user_id' in result set! try 'use_labels' option on select statement." or \
- str(e) == "Ambiguous column name 'USER_ID' in result set! try 'use_labels' option on select statement."
+ r = users.outerjoin(addresses).select().execute().first()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "Ambiguous column name",
+ lambda: r['user_id']
+ )
@testing.requires.subqueries
def test_column_label_targeting(self):
users.select().alias('foo'),
users.select().alias(users.name),
):
- row = s.select(use_labels=True).execute().fetchone()
- try:
- assert row[s.c.user_id] == 7
- assert row[s.c.user_name] == 'ed'
- finally:
- row.close()
+ row = s.select(use_labels=True).execute().first()
+ assert row[s.c.user_id] == 7
+ assert row[s.c.user_name] == 'ed'
def test_keys(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
- try:
- eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
- finally:
- r.close()
+ r = users.select().execute().first()
+ eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
def test_items(self):
users.insert().execute(user_id=1, user_name='foo')
- r = users.select().execute().fetchone()
- try:
- eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
- finally:
- r.close()
+ r = users.select().execute().first()
+ eq_([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
def test_len(self):
users.insert().execute(user_id=1, user_name='foo')
- try:
- r = users.select().execute().fetchone()
- eq_(len(r), 2)
- finally:
- r.close()
+ r = users.select().execute().first()
+ eq_(len(r), 2)
- r = testing.db.execute('select user_name, user_id from query_users').fetchone()
- try:
- eq_(len(r), 2)
- finally:
- r.close()
- try:
- r = testing.db.execute('select user_name from query_users').fetchone()
- eq_(len(r), 1)
- finally:
- r.close()
+ r = testing.db.execute('select user_name, user_id from query_users').first()
+ eq_(len(r), 2)
+ r = testing.db.execute('select user_name from query_users').first()
+ eq_(len(r), 1)
def test_cant_execute_join(self):
try:
def test_column_order_with_simple_query(self):
# should return values in column definition order
users.insert().execute(user_id=1, user_name='foo')
- r = users.select(users.c.user_id==1).execute().fetchone()
+ r = users.select(users.c.user_id==1).execute().first()
eq_(r[0], 1)
eq_(r[1], 'foo')
eq_([x.lower() for x in r.keys()], ['user_id', 'user_name'])
def test_column_order_with_text_query(self):
# should return values in query order
users.insert().execute(user_id=1, user_name='foo')
- r = testing.db.execute('select user_name, user_id from query_users').fetchone()
+ r = testing.db.execute('select user_name, user_id from query_users').first()
eq_(r[0], 'foo')
eq_(r[1], 1)
eq_([x.lower() for x in r.keys()], ['user_name', 'user_id'])
shadowed.create(checkfirst=True)
try:
shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
- r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
+ r = shadowed.select(shadowed.c.shadow_id==1).execute().first()
self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')