Column('address', String(30)))
metadata.create_all()
- def setUp(self):
- self.users = users
def tearDown(self):
- self.users.delete().execute()
+ users.delete().execute()
def tearDownAll(self):
metadata.drop_all()
def testinsert(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- print repr(self.users.select().execute().fetchall())
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ assert users.count().scalar() == 1
def testupdate(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- print repr(self.users.select().execute().fetchall())
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ assert users.count().scalar() == 1
- self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred')
- print repr(self.users.select().execute().fetchall())
+ users.update(users.c.user_id == 7).execute(user_name = 'fred')
+ assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
def testrowiteration(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'ed')
- self.users.insert().execute(user_id = 9, user_name = 'fred')
- r = self.users.select().execute()
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'ed')
+ users.insert().execute(user_id = 9, user_name = 'fred')
+ r = users.select().execute()
l = []
for row in r:
l.append(row)
self.assert_(len(l) == 3)
def test_fetchmany(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'ed')
- self.users.insert().execute(user_id = 9, user_name = 'fred')
- r = self.users.select().execute()
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'ed')
+ users.insert().execute(user_id = 9, user_name = 'fred')
+ r = users.select().execute()
l = []
for row in r.fetchmany(size=2):
l.append(row)
self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
def test_compiled_execute(self):
- s = select([self.users], self.users.c.user_id==bindparam('id')).compile()
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ s = select([users], users.c.user_id==bindparam('id')).compile()
c = testbase.db.connect()
- print repr(c.execute(s, id=7).fetchall())
-
- def test_global_metadata(self):
- t1 = Table('table1', Column('col1', Integer, primary_key=True),
- Column('col2', String(20)))
- t2 = Table('table2', Column('col1', Integer, primary_key=True),
- Column('col2', String(20)))
-
- assert t1.c.col1
- global_connect(testbase.db)
- default_metadata.create_all()
- try:
- assert t1.count().scalar() == 0
- finally:
- default_metadata.drop_all()
- default_metadata.clear()
-
- @testbase.supported('postgres')
- def testpassiveoverride(self):
- """primarily for postgres, tests that when we get a primary key column back
- from reflecting a table which has a default value on it, we pre-execute
- that PassiveDefault upon insert, even though PassiveDefault says
- "let the database execute this", because in postgres we must have all the primary
- key values in memory before insert; otherwise we cant locate the just inserted row."""
- try:
- meta = BoundMetaData(testbase.db)
- testbase.db.execute("""
- CREATE TABLE speedy_users
- (
- speedy_user_id SERIAL PRIMARY KEY,
-
- user_name VARCHAR NOT NULL,
- user_password VARCHAR NOT NULL
- );
- """, None)
-
- t = Table("speedy_users", meta, autoload=True)
- t.insert().execute(user_name='user', user_password='lala')
- l = t.select().execute().fetchall()
- self.assert_(l == [(1, 'user', 'lala')])
- finally:
- testbase.db.execute("drop table speedy_users", None)
+ assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
@testbase.supported('postgres')
def testschema(self):
def test_repeated_bindparams(self):
"""test that a BindParam can be used more than once.
this should be run for dbs with both positional and named paramstyles."""
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
u = bindparam('userid')
- s = self.users.select(or_(self.users.c.user_name==u, self.users.c.user_name==u))
+ s = users.select(or_(users.c.user_name==u, users.c.user_name==u))
r = s.execute(userid='fred').fetchall()
assert len(r) == 1
def test_bindparam_shortname(self):
"""test the 'shortname' field on BindParamClause."""
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'fred')
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
u = bindparam('userid', shortname='someshortname')
- s = self.users.select(self.users.c.user_name==u)
+ s = users.select(users.c.user_name==u)
r = s.execute(someshortname='fred').fetchall()
assert len(r) == 1
def testdelete(self):
- self.users.insert().execute(user_id = 7, user_name = 'jack')
- self.users.insert().execute(user_id = 8, user_name = 'fred')
- print repr(self.users.select().execute().fetchall())
+ users.insert().execute(user_id = 7, user_name = 'jack')
+ users.insert().execute(user_id = 8, user_name = 'fred')
+ print repr(users.select().execute().fetchall())
- self.users.delete(self.users.c.user_name == 'fred').execute()
+ users.delete(users.c.user_name == 'fred').execute()
- print repr(self.users.select().execute().fetchall())
+ print repr(users.select().execute().fetchall())
def testselectlimit(self):
- self.users.insert().execute(user_id=1, user_name='john')
- self.users.insert().execute(user_id=2, user_name='jack')
- self.users.insert().execute(user_id=3, user_name='ed')
- self.users.insert().execute(user_id=4, user_name='wendy')
- self.users.insert().execute(user_id=5, user_name='laura')
- self.users.insert().execute(user_id=6, user_name='ralph')
- self.users.insert().execute(user_id=7, user_name='fido')
- r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
+ users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=2, user_name='jack')
+ users.insert().execute(user_id=3, user_name='ed')
+ users.insert().execute(user_id=4, user_name='wendy')
+ users.insert().execute(user_id=5, user_name='laura')
+ users.insert().execute(user_id=6, user_name='ralph')
+ users.insert().execute(user_id=7, user_name='fido')
+ r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
@testbase.unsupported('mssql')
def testselectlimitoffset(self):
- self.users.insert().execute(user_id=1, user_name='john')
- self.users.insert().execute(user_id=2, user_name='jack')
- self.users.insert().execute(user_id=3, user_name='ed')
- self.users.insert().execute(user_id=4, user_name='wendy')
- self.users.insert().execute(user_id=5, user_name='laura')
- self.users.insert().execute(user_id=6, user_name='ralph')
- self.users.insert().execute(user_id=7, user_name='fido')
- r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+ users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=2, user_name='jack')
+ users.insert().execute(user_id=3, user_name='ed')
+ users.insert().execute(user_id=4, user_name='wendy')
+ users.insert().execute(user_id=5, user_name='laura')
+ users.insert().execute(user_id=6, user_name='ralph')
+ users.insert().execute(user_id=7, user_name='fido')
+ r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
- r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
+ r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
@testbase.supported('mssql')
def testselectlimitoffset_mssql(self):
try:
- r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+ r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
assert False # InvalidRequestError should have been raised
except exceptions.InvalidRequestError:
pass
datetable.drop()
def test_column_accessor(self):
- self.users.insert().execute(user_id=1, user_name='john')
- self.users.insert().execute(user_id=2, user_name='jack')
- r = self.users.select(self.users.c.user_id==2).execute().fetchone()
- self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
- self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+ users.insert().execute(user_id=1, user_name='john')
+ users.insert().execute(user_id=2, user_name='jack')
+ r = users.select(users.c.user_id==2).execute().fetchone()
+ self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+ self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
r = text("select * from query_users where user_id=2", engine=testbase.db).execute().fetchone()
- self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
- self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+ self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+ self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
def test_keys(self):
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select().execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().fetchone()
self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
def test_items(self):
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select().execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().fetchone()
self.assertEqual([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
def test_len(self):
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select().execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select().execute().fetchone()
self.assertEqual(len(r), 2)
r.close()
r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
def test_column_order_with_simple_query(self):
# should return values in column definition order
- self.users.insert().execute(user_id=1, user_name='foo')
- r = self.users.select(self.users.c.user_id==1).execute().fetchone()
+ users.insert().execute(user_id=1, user_name='foo')
+ r = users.select(users.c.user_id==1).execute().fetchone()
self.assertEqual(r[0], 1)
self.assertEqual(r[1], 'foo')
self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
def test_column_order_with_text_query(self):
# should return values in query order
- self.users.insert().execute(user_id=1, user_name='foo')
+ users.insert().execute(user_id=1, user_name='foo')
r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
self.assertEqual(r[0], 'foo')
self.assertEqual(r[1], 1)