except tsa.exc.DBAPIError:
assert True
- @testing.fails_on('mssql', 'rowcount returns -1')
+ @testing.fails_if(lambda: not testing.db.dialect.supports_sane_rowcount)
def test_empty_insert(self):
"""test that execute() interprets [] as a list with no params"""
result = testing.db.execute(users.insert().values(user_name=bindparam('name')), [])
try:
sess2.flush()
- assert False
+ assert not testing.db.dialect.supports_sane_rowcount
except orm_exc.ConcurrentModificationError, e:
assert True
sess2.refresh(s2)
- assert s2.subdata == 'sess1 subdata'
+ if testing.db.dialect.supports_sane_rowcount:
+ assert s2.subdata == 'sess1 subdata'
s2.subdata = 'sess2 subdata'
sess2.flush()
try:
s1.subdata = 'some new subdata'
sess.flush()
- assert False
+ assert not testing.db.dialect.supports_sane_rowcount
except orm_exc.ConcurrentModificationError, e:
assert True
eq_([john.age, jack.age, jill.age, jane.age], [25,37,29,27])
eq_(sess.query(User.age).order_by(User.id).all(), zip([25,37,29,27]))
+ @testing.fails_if(lambda: not testing.db.dialect.supports_sane_rowcount)
@testing.resolve_artifact_names
def test_update_returns_rowcount(self):
sess = create_session(bind=testing.db, autocommit=False)
rowcount = sess.query(User).filter(User.age > 29).update({'age': User.age - 10})
eq_(rowcount, 2)
+ @testing.fails_if(lambda: not testing.db.dialect.supports_sane_rowcount)
@testing.resolve_artifact_names
def test_delete_returns_rowcount(self):
sess = create_session(bind=testing.db, autocommit=False)