def fullobject(select):
"""Iterate over the full result row."""
- return list(select.execute().fetchone())
+ return list(select.execute().first())
for x in xrange(ITERATIONS):
# Zoos
for x in xrange(ITERATIONS):
# Edit
- SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().fetchone()
+ SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().first()
Zoo.update(Zoo.c.ID==SDZ['ID']).execute(
Name=u'The San Diego Zoo',
Founded = datetime.date(1900, 1, 1),
Admission = "35.00")
# Test edits
- SDZ = Zoo.select(Zoo.c.Name==u'The San Diego Zoo').execute().fetchone()
+ SDZ = Zoo.select(Zoo.c.Name==u'The San Diego Zoo').execute().first()
assert SDZ['Founded'] == datetime.date(1900, 1, 1), SDZ['Founded']
# Change it back
Admission = "0")
# Test re-edits
- SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().fetchone()
+ SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().first()
assert SDZ['Founded'] == datetime.date(1935, 9, 13)
def test_baseline_7_multiview(self):
try:
t.insert(values=dict(name='dante')).execute()
t.insert(values=dict(name='alighieri')).execute()
- select([func.count(t.c.id)],func.length(t.c.name)==5).execute().fetchone()[0] == 1
+ select([func.count(t.c.id)],func.length(t.c.name)==5).execute().first()[0] == 1
finally:
meta.drop_all()
vals = []
for i in xrange(3):
cr.execute('SELECT busto.NEXTVAL FROM DUAL')
- vals.append(cr.fetchone()[0])
+ vals.append(cr.first()[0])
# should be 1,2,3, but no...
self.assert_(vals != [1,2,3])
result = cattable.insert().values(description='PHP').execute()
eq_([10], result.last_inserted_ids())
lastcat = cattable.select().order_by(desc(cattable.c.id)).execute()
- eq_((10, 'PHP'), lastcat.fetchone())
+ eq_((10, 'PHP'), lastcat.first())
def test_executemany(self):
cattable.insert().execute([
con.execute(u"insert into unitest_table values ('bien mangé')".encode('UTF-8'))
try:
- r = t1.select().execute().fetchone()
+ r = t1.select().execute().first()
assert isinstance(r[1], unicode), '%s is %s instead of unicode, working on %s' % (
r[1], type(r[1]), meta.bind)
tbl.insert().execute()
if 'int_y' in tbl.c:
assert select([tbl.c.int_y]).scalar() == 1
- assert list(tbl.select().execute().fetchone()).count(1) == 1
+ assert list(tbl.select().execute().first()).count(1) == 1
else:
- assert 1 not in list(tbl.select().execute().fetchone())
+ assert 1 not in list(tbl.select().execute().first())
class BinaryTest(TestBase, AssertsExecutionResults):
tbl.insert().execute()
if 'int_y' in tbl.c:
assert select([tbl.c.int_y]).scalar() == 1
- assert list(tbl.select().execute().fetchone()).count(1) == 1
+ assert list(tbl.select().execute().first()).count(1) == 1
else:
- assert 1 not in list(tbl.select().execute().fetchone())
+ assert 1 not in list(tbl.select().execute().first())
finally:
meta.drop_all()
assert isinstance(t2.c.data.type, sqltypes.NVARCHAR)
data = u'm’a réveillé.'
t2.insert().execute(data=data)
- eq_(t2.select().execute().fetchone()['data'], data)
+ eq_(t2.select().execute().first()['data'], data)
finally:
metadata.drop_all()
t.create(engine)
try:
engine.execute(t.insert(), id=1, data='this is text', bindata='this is binary')
- row = engine.execute(t.select()).fetchone()
+ row = engine.execute(t.select()).first()
eq_(row['data'].read(), 'this is text')
eq_(row['bindata'].read(), 'this is binary')
finally:
somedate = testing.db.connect().scalar(func.current_timestamp().select())
tztable.insert().execute(id=1, name='row1', date=somedate)
c = tztable.update(tztable.c.id==1).execute(name='newname')
- print tztable.select(tztable.c.id==1).execute().fetchone()
+ print tztable.select(tztable.c.id==1).execute().first()
def test_without_timezone(self):
# get a date without a tzinfo
somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
notztable.insert().execute(id=1, name='row1', date=somedate)
c = notztable.update(notztable.c.id==1).execute(name='newname')
- print notztable.select(tztable.c.id==1).execute().fetchone()
+ print notztable.select(tztable.c.id==1).execute().first()
class ArrayTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgresql'
connection = engine.connect()
s = select(["timestamp '2007-12-25'"])
- result = connection.execute(s).fetchone()
+ result = connection.execute(s).first()
eq_(result[0], datetime.datetime(2007, 12, 25, 0, 0))
class ServerSideCursorsTest(TestBase, AssertsExecutionResults):
l = sa.select([users, addresses],
sa.and_(users.c.id==addresses.c.user_id,
addresses.c.id==a.id)).execute()
- eq_(l.fetchone().values(),
+ eq_(l.first().values(),
[a.user.id, 'asdf8d', a.id, a.user_id, 'theater@foo.com'])
@testing.resolve_artifact_names
def test_insert_values(self):
t.insert(values={'col3':50}).execute()
l = t.select().execute()
- eq_(50, l.fetchone()['col3'])
+ eq_(50, l.first()['col3'])
@testing.fails_on('firebird', 'Data type unknown')
def test_updatemany(self):
t.update(t.c.col1==pk).execute(col4=None, col5=None)
ctexec = currenttime.scalar()
l = t.select(t.c.col1==pk).execute()
- l = l.fetchone()
+ l = l.first()
eq_(l,
(pk, 'im the update', f2, None, None, ctexec, True, False,
13, datetime.date.today(), 'py'))
pk = r.last_inserted_ids()[0]
t.update(t.c.col1==pk, values={'col3': 55}).execute()
l = t.select(t.c.col1==pk).execute()
- l = l.fetchone()
+ l = l.first()
eq_(55, l['col3'])
@testing.fails_on_everything_except('postgresql')
meta.create_all()
try:
t.insert(values=dict(value=func.length("one"))).execute()
- assert t.select().execute().fetchone()['value'] == 3
+ assert t.select().execute().first()['value'] == 3
t.update(values=dict(value=func.length("asfda"))).execute()
- assert t.select().execute().fetchone()['value'] == 5
+ assert t.select().execute().first()['value'] == 5
r = t.insert(values=dict(value=func.length("sfsaafsda"))).execute()
id = r.last_inserted_ids()[0]
- assert t.select(t.c.id==id).execute().fetchone()['value'] == 9
+ assert t.select(t.c.id==id).execute().first()['value'] == 9
t.update(values={t.c.value:func.length("asdf")}).execute()
- assert t.select().execute().fetchone()['value'] == 4
+ assert t.select().execute().first()['value'] == 4
print "--------------------------"
t2.insert().execute()
t2.insert(values=dict(value=func.length("one"))).execute()
t2.delete().execute()
t2.insert(values=dict(value=func.length("one") + 8)).execute()
- assert t2.select().execute().fetchone()['value'] == 11
+ assert t2.select().execute().first()['value'] == 11
t2.update(values=dict(value=func.length("asfda"))).execute()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (5, "thisisstuff")
+ assert select([t2.c.value, t2.c.stuff]).execute().first() == (5, "thisisstuff")
t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
- print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
- assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
+ print "HI", select([t2.c.value, t2.c.stuff]).execute().first()
+ assert select([t2.c.value, t2.c.stuff]).execute().first() == (9, "foo")
finally:
meta.drop_all()
# construct a column-based FROM object out of a function, like in [ticket:172]
s = select([sql.column('date', type_=DateTime)], from_obj=[testing.db.func.current_date()])
- q = s.execute().fetchone()[s.c.date]
+ q = s.execute().first()[s.c.date]
r = s.alias('datequery').select().scalar()
assert x == y == z == w == q == r
'd': datetime.date(2010, 5, 1) })
rs = select([extract('year', table.c.dt),
extract('month', table.c.d)]).execute()
- row = rs.fetchone()
+ row = rs.first()
assert row[0] == 2010
assert row[1] == 5
rs.close()
unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
- x = unicode_table.select().execute().fetchone()
+ x = unicode_table.select().execute().first()
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
dict(unicode_varchar=unicodedata,unicode_text=unicodedata)
)
- x = unicode_table.select().execute().fetchone()
+ x = unicode_table.select().execute().first()
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
- x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().fetchone()
+ x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().first()
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
@testing.fails_on('oracle', 'oracle converts empty strings to a blank space')
def test_roundtrip(self):
delta = datetime.datetime(2006, 10, 5) - datetime.datetime(2005, 8, 17)
interval_table.insert().execute(interval=delta)
- assert interval_table.select().execute().fetchone()['interval'] == delta
+ assert interval_table.select().execute().first()['interval'] == delta
def test_null(self):
interval_table.insert().execute(id=1, inverval=None)
- assert interval_table.select().execute().fetchone()['interval'] is None
+ assert interval_table.select().execute().first()['interval'] is None
class BooleanTest(TestBase, AssertsExecutionResults):
@classmethod