"Instance does not contain a non-NULL version value",
s1.commit)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_null_version_id_update(self):
Foo = self.classes.Foo
finally:
testing.db.dialect.supports_sane_rowcount = save
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support (update|delete)d rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
+ @testing.requires.sane_rowcount_w_returning
def test_basic(self):
Foo = self.classes.Foo
else:
s1.commit()
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_multiple_updates(self):
Foo = self.classes.Foo
[(f1.id, 'f1rev2', 2), (f2.id, 'f2rev2', 2)]
)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_bulk_insert(self):
Foo = self.classes.Foo
[(1, 'f1', 1), (2, 'f2', 1)]
)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_bulk_update(self):
Foo = self.classes.Foo
[(f1.id, 'f1rev2', 2), (f2.id, 'f2rev2', 2)]
)
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support (update|delete)d rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_bump_version(self):
"""test that version number can be bumped.
s1.commit()
eq_(s1.query(Foo).count(), 0)
- @testing.emits_warning(r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
def test_versioncheck(self):
"""query.with_lockmode performs a 'version check' on an already loaded
s1.close()
s1.query(Foo).with_for_update(read=True).get(f1s1.id)
- @testing.emits_warning(r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
def test_versioncheck_legacy(self):
"""query.with_lockmode performs a 'version check' on an already loaded
s1.commit()
s1.query(Foo).with_lockmode('read').get(f1s1.id)
- @testing.emits_warning(r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
@testing.requires.update_nowait
def test_versioncheck_for_update(self):
s1.refresh(f1s1, with_for_update={"nowait": True})
assert f1s1.version_id == f1s2.version_id
- @testing.emits_warning(r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
@testing.requires.update_nowait
def test_versioncheck_for_update_legacy(self):
s1.refresh(f1s1, lockmode='update_nowait')
assert f1s1.version_id == f1s2.version_id
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_update_multi_missing_broken_multi_rowcount(self):
@util.memoized_property
def rowcount(self):
assert f1s2.id == f1s1.id
assert f1s2.value == f1s1.value
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_no_version(self):
Foo = self.classes.Foo
s1.commit()
eq_(f3.version_id, 3)
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_correct_version(self):
Foo = self.classes.Foo
s1.commit()
eq_(f3.version_id, 3)
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_incorrect_version(self):
Foo = self.classes.Foo
s1.merge, f2
)
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_incorrect_version_not_in_session(self):
Foo = self.classes.Foo
s.flush()
return s, n1, n2
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_o2m_plain(self):
s, n1, n2 = self._fixture(o2m=True, post_update=False)
eq_(n1.version_id, 1)
eq_(n2.version_id, 2)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_m2o_plain(self):
s, n1, n2 = self._fixture(o2m=False, post_update=False)
eq_(n1.version_id, 2)
eq_(n2.version_id, 1)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_o2m_post_update(self):
s, n1, n2 = self._fixture(o2m=True, post_update=True)
eq_(n1.version_id, 1)
eq_(n2.version_id, 2)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_m2o_post_update(self):
s, n1, n2 = self._fixture(o2m=False, post_update=True)
eq_(n1.version_id, 2)
eq_(n2.version_id, 1)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_o2m_post_update_not_assoc_w_insert(self):
s, n1, n2 = self._fixture(o2m=True, post_update=True, insert=False)
eq_(n1.version_id, 1)
eq_(n2.version_id, 1)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_m2o_post_update_not_assoc_w_insert(self):
s, n1, n2 = self._fixture(o2m=False, post_update=True, insert=False)
eq_(n1.version_id, 1)
eq_(n2.version_id, 1)
+ @testing.requires.sane_rowcount_w_returning
def test_o2m_post_update_version_assert(self):
Node = self.classes.Node
s, n1, n2 = self._fixture(o2m=True, post_update=True)
s.flush
)
+ @testing.requires.sane_rowcount_w_returning
def test_m2o_post_update_version_assert(self):
Node = self.classes.Node
s1 = Session()
return s1
+ @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
def test_update(self):
Foo = self.classes.Foo
C, uselist=False, cascade='all, delete-orphan')})
mapper(C, c, version_id_col=c.c.version_id)
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_row_switch(self):
P = self.classes.P
session.add(P(id='P1', data="really a row-switch"))
session.commit()
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_child_row_switch(self):
P, C = self.classes.P, self.classes.C
version_id_generator=lambda x: make_uuid(),
)
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support updated rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_row_switch(self):
P = self.classes.P
session.add(P(id='P1', data="really a row-switch"))
session.commit()
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support (update|delete)d rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_child_row_switch_one(self):
P, C = self.classes.P, self.classes.C
p.c = C(data='child row-switch')
session.commit()
- @testing.emits_warning_on(
- '+zxjdbc', r'.*does not support (update|delete)d rowcount')
+ @testing.emits_warning(r".*versioning cannot be verified")
+ @testing.requires.sane_rowcount_w_returning
def test_child_row_switch_two(self):
P = self.classes.P
class Sub(Base):
pass
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_update_child_table_only(self):
Base, sub, base, Sub = (
self.classes.Base, self.tables.sub, self.tables.base,
def test_update_col_eager_defaults(self):
self._test_update_col(eager_defaults=True)
+ @testing.emits_warning(r".*versioning cannot be verified")
def _test_update_col(self, **kw):
sess = self._fixture(**kw)
)
self.assert_sql_execution(testing.db, sess.flush, *statements)
+ @testing.emits_warning(r".*versioning cannot be verified")
+ @testing.requires.updateable_autoincrement_pks
def test_sql_expr_bump(self):
sess = self._fixture()
eq_(f1.version_id, 2)
+ @testing.emits_warning(r".*versioning cannot be verified")
+ @testing.requires.updateable_autoincrement_pks
@testing.requires.returning
def test_sql_expr_w_mods_bump(self):
sess = self._fixture()
eq_(f1.id, 5)
eq_(f1.version_id, 2)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_multi_update(self):
sess = self._fixture()
]
self.assert_sql_execution(testing.db, sess.flush, *statements)
+ @testing.requires.sane_rowcount_w_returning
def test_concurrent_mod_err_expire_on_commit(self):
sess = self._fixture()
sess.commit
)
+ @testing.requires.sane_rowcount_w_returning
def test_concurrent_mod_err_noexpire_on_commit(self):
sess = self._fixture(expire_on_commit=False)
eq_(a1.vid, 1)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_update(self):
sess = Session()
a1 = self.classes.A()
eq_(a1.vid, 2)
+ @testing.requires.sane_rowcount_w_returning
def test_update_concurrent_check(self):
sess = Session()
a1 = self.classes.A()
sess.commit
)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_update_version_conditional(self):
sess = Session()
a1 = self.classes.A()
mapper(
cls.classes.B, cls.tables.b, inherits=cls.classes.A)
+ @testing.emits_warning(r".*versioning cannot be verified")
def test_no_increment(self):
sess = Session()
b1 = self.classes.B()