elif len(multiparams) == 1:
zero = multiparams[0]
if isinstance(zero, (list, tuple)):
- if not zero or isinstance(zero[0], (list, tuple, dict)):
+ if not zero or hasattr(zero[0], '__iter__'):
return zero
else:
return [zero]
- elif isinstance(zero, dict):
+ elif hasattr(zero, 'keys'):
return [zero]
else:
return [[zero]]
else:
- if isinstance(multiparams[0], (list, tuple, dict)):
+ if hasattr(multiparams[0], '__iter__'):
return multiparams
else:
return [multiparams]
class QueryTest(TestBase):
def setUpAll(self):
- global users, addresses, metadata
+ global users, users2, addresses, metadata
metadata = MetaData(testing.db)
users = Table('query_users', metadata,
Column('user_id', INT, primary_key = True),
Column('address_id', Integer, primary_key=True),
Column('user_id', Integer, ForeignKey('query_users.user_id')),
Column('address', String(30)))
+
+ users2 = Table('u2', metadata,
+ Column('user_id', INT, primary_key = True),
+ Column('user_name', VARCHAR(20)),
+ )
metadata.create_all()
def tearDown(self):
addresses.delete().execute()
users.delete().execute()
+ users2.delete().execute()
def tearDownAll(self):
metadata.drop_all()
self.assert_(r['query_users.user_id']) == 1
self.assert_(r['query_users.user_name']) == "john"
-
+ def test_row_as_args(self):
+ users.insert().execute(user_id=1, user_name='john')
+ r = users.select(users.c.user_id==1).execute().fetchone()
+ users.delete().execute()
+ users.insert().execute(r)
+ assert users.select().execute().fetchall() == [(1, 'john')]
+
+ def test_result_as_args(self):
+ users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')])
+ r = users.select().execute()
+ users2.insert().execute(list(r))
+ assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
+
+ users2.delete().execute()
+ r = users.select().execute()
+ users2.insert().execute(*list(r))
+ assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
+
def test_ambiguous_column(self):
users.insert().execute(user_id=1, user_name='john')
r = users.outerjoin(addresses).select().execute().fetchone()