+import contextlib
import datetime
import uuid
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertsql import CompiledSQL
from sqlalchemy.testing.mock import patch
return uuid.uuid4().hex
+@contextlib.contextmanager
+def conditional_sane_rowcount_warnings(
+ update=False, delete=False, only_returning=False
+):
+ warnings = ()
+ if (
+ only_returning
+ and not testing.db.dialect.supports_sane_rowcount_returning
+ ) or (
+ not only_returning and not testing.db.dialect.supports_sane_rowcount
+ ):
+ if update:
+ warnings += (
+ "Dialect .* does not support updated rowcount - "
+ "versioning cannot be verified.",
+ )
+ if delete:
+ warnings += (
+ "Dialect .* does not support deleted rowcount - "
+ "versioning cannot be verified.",
+ )
+
+ with expect_warnings(*warnings):
+ yield
+ else:
+ yield
+
+
class NullVersionIdTest(fixtures.MappedTest):
__backend__ = True
s1.commit,
)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_null_version_id_update(self):
Foo = self.classes.Foo
f1.value = "f1rev2"
f1.version_id = None
- assert_raises_message(
- sa.orm.exc.FlushError,
- "Instance does not contain a non-NULL version value",
- s1.commit,
- )
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ assert_raises_message(
+ sa.orm.exc.FlushError,
+ "Instance does not contain a non-NULL version value",
+ s1.commit,
+ )
class VersioningTest(fixtures.MappedTest):
finally:
testing.db.dialect.supports_sane_rowcount = save
- @testing.emits_warning(r".*versioning cannot be verified")
- @testing.requires.sane_rowcount_w_returning
def test_basic(self):
Foo = self.classes.Foo
s1.commit()
f1.value = "f1rev2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
s2 = create_session(autocommit=False)
f1_s = s2.query(Foo).get(f1.id)
f1_s.value = "f1rev3"
- s2.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s2.commit()
f1.value = "f1rev3mine"
# Only dialects with a sane rowcount can detect the
# StaleDataError
- if testing.db.dialect.supports_sane_rowcount:
+ if testing.db.dialect.supports_sane_rowcount_returning:
assert_raises_message(
sa.orm.exc.StaleDataError,
r"UPDATE statement on table 'version_table' expected "
),
s1.rollback()
else:
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
# new in 0.5 ! don't need to close the session
f1 = s1.query(Foo).get(f1.id)
f2 = s1.query(Foo).get(f2.id)
f1_s.value = "f1rev4"
- s2.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s2.commit()
s1.delete(f1)
s1.delete(f2)
- if testing.db.dialect.supports_sane_rowcount:
+ if testing.db.dialect.supports_sane_multi_rowcount:
assert_raises_message(
sa.orm.exc.StaleDataError,
r"DELETE statement on table 'version_table' expected "
s1.commit,
)
else:
- s1.commit()
+ with conditional_sane_rowcount_warnings(delete=True):
+ s1.commit()
- @testing.emits_warning(r".*versioning cannot be verified")
def test_multiple_updates(self):
Foo = self.classes.Foo
f1.value = "f1rev2"
f2.value = "f2rev2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
eq_(
s1.query(Foo.id, Foo.value, Foo.version_id).order_by(Foo.id).all(),
[(f1.id, "f1rev2", 2), (f2.id, "f2rev2", 2)],
)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_bulk_insert(self):
Foo = self.classes.Foo
[(1, "f1", 1), (2, "f2", 1)],
)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_bulk_update(self):
Foo = self.classes.Foo
s1.add_all((f1, f2))
s1.commit()
- s1.bulk_update_mappings(
- Foo,
- [
- {"id": f1.id, "value": "f1rev2", "version_id": 1},
- {"id": f2.id, "value": "f2rev2", "version_id": 1},
- ],
- )
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.bulk_update_mappings(
+ Foo,
+ [
+ {"id": f1.id, "value": "f1rev2", "version_id": 1},
+ {"id": f2.id, "value": "f2rev2", "version_id": 1},
+ ],
+ )
s1.commit()
eq_(
[(f1.id, "f1rev2", 2), (f2.id, "f2rev2", 2)],
)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_bump_version(self):
"""test that version number can be bumped.
s1.commit()
eq_(f1.version_id, 1)
f1.version_id = 2
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
eq_(f1.version_id, 2)
# skip an id, test that history
# is honored
f1.version_id = 4
f1.value = "something new"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
eq_(f1.version_id, 4)
f1.version_id = 5
s1.delete(f1)
- s1.commit()
+ with conditional_sane_rowcount_warnings(delete=True):
+ s1.commit()
eq_(s1.query(Foo).count(), 0)
- @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
def test_versioncheck(self):
"""query.with_lockmode performs a 'version check' on an already loaded
s2 = create_session(autocommit=False)
f1s2 = s2.query(Foo).get(f1s1.id)
f1s2.value = "f1 new value"
- s2.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s2.commit()
# load, version is wrong
assert_raises_message(
s1.close()
s1.query(Foo).with_for_update(read=True).get(f1s1.id)
- @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
def test_versioncheck_legacy(self):
"""query.with_lockmode performs a 'version check' on an already loaded
s2 = create_session(autocommit=False)
f1s2 = s2.query(Foo).get(f1s1.id)
f1s2.value = "f1 new value"
- s2.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s2.commit()
# load, version is wrong
assert_raises_message(
s1.commit()
s1.query(Foo).with_for_update(read=True).get(f1s1.id)
- @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
@testing.requires.update_nowait
def test_versioncheck_for_update(self):
)
s1.rollback()
- s2.commit()
+ with conditional_sane_rowcount_warnings(update=True):
+ s2.commit()
s1.refresh(f1s1, with_for_update={"nowait": True})
assert f1s1.version_id == f1s2.version_id
- @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
@testing.requires.update_nowait
def test_versioncheck_for_update_legacy(self):
)
s1.rollback()
- s2.commit()
+ with conditional_sane_rowcount_warnings(update=True):
+ s2.commit()
s1.refresh(f1s1, lockmode="update_nowait")
assert f1s1.version_id == f1s2.version_id
- @testing.emits_warning(r".*versioning cannot be verified")
def test_update_multi_missing_broken_multi_rowcount(self):
@util.memoized_property
def rowcount(self):
with patch.object(
config.db.dialect, "supports_sane_multi_rowcount", False
- ):
- with patch(
- "sqlalchemy.engine.result.ResultProxy.rowcount", rowcount
+ ), patch("sqlalchemy.engine.result.ResultProxy.rowcount", rowcount):
+
+ Foo = self.classes.Foo
+ s1 = self._fixture()
+ f1s1 = Foo(value="f1 value")
+ s1.add(f1s1)
+ s1.commit()
+
+ f1s1.value = "f2 value"
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
):
+ s1.flush()
+ eq_(f1s1.version_id, 2)
- Foo = self.classes.Foo
- s1 = self._fixture()
- f1s1 = Foo(value="f1 value")
- s1.add(f1s1)
- s1.commit()
+ def test_update_delete_no_plain_rowcount(self):
+
+ with patch.object(
+ config.db.dialect, "supports_sane_rowcount", False
+ ), patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False
+ ):
+ Foo = self.classes.Foo
+ s1 = self._fixture()
+ f1s1 = Foo(value="f1 value")
+ s1.add(f1s1)
+ s1.commit()
- f1s1.value = "f2 value"
+ f1s1.value = "f2 value"
+
+ with expect_warnings(
+ "Dialect .* does not support updated rowcount - "
+ "versioning cannot be verified."
+ ):
+ s1.flush()
+ eq_(f1s1.version_id, 2)
+
+ s1.delete(f1s1)
+ with expect_warnings(
+ "Dialect .* does not support deleted rowcount - "
+ "versioning cannot be verified."
+ ):
s1.flush()
- eq_(f1s1.version_id, 2)
- @testing.emits_warning(r".*does not support updated rowcount")
@engines.close_open_connections
def test_noversioncheck(self):
"""test query.with_lockmode works when the mapper has no version id
assert f1s2.id == f1s1.id
assert f1s2.value == f1s1.value
- @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_no_version(self):
Foo = self.classes.Foo
s1.commit()
f1.value = "f2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
f2 = Foo(id=f1.id, value="f3")
f3 = s1.merge(f2)
assert f3 is f1
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
eq_(f3.version_id, 3)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_correct_version(self):
Foo = self.classes.Foo
s1.commit()
f1.value = "f2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
f2 = Foo(id=f1.id, value="f3", version_id=2)
f3 = s1.merge(f2)
assert f3 is f1
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
eq_(f3.version_id, 3)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_incorrect_version(self):
Foo = self.classes.Foo
s1.commit()
f1.value = "f2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
f2 = Foo(id=f1.id, value="f3", version_id=1)
assert_raises_message(
f2,
)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_merge_incorrect_version_not_in_session(self):
Foo = self.classes.Foo
s1.commit()
f1.value = "f2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
f2 = Foo(id=f1.id, value="f3", version_id=1)
s1.close()
s.flush()
return s, n1, n2
- @testing.emits_warning(r".*versioning cannot be verified")
def test_o2m_plain(self):
s, n1, n2 = self._fixture(o2m=True, post_update=False)
n1.related.append(n2)
- s.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s.flush()
eq_(n1.version_id, 1)
eq_(n2.version_id, 2)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_m2o_plain(self):
s, n1, n2 = self._fixture(o2m=False, post_update=False)
n1.related = n2
- s.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s.flush()
eq_(n1.version_id, 2)
eq_(n2.version_id, 1)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_o2m_post_update(self):
s, n1, n2 = self._fixture(o2m=True, post_update=True)
n1.related.append(n2)
- s.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s.flush()
eq_(n1.version_id, 1)
eq_(n2.version_id, 2)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_m2o_post_update(self):
s, n1, n2 = self._fixture(o2m=False, post_update=True)
n1.related = n2
- s.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s.flush()
eq_(n1.version_id, 2)
eq_(n2.version_id, 1)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_o2m_post_update_not_assoc_w_insert(self):
s, n1, n2 = self._fixture(o2m=True, post_update=True, insert=False)
n1.related.append(n2)
s.add_all([n1, n2])
- s.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s.flush()
eq_(n1.version_id, 1)
eq_(n2.version_id, 1)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_m2o_post_update_not_assoc_w_insert(self):
s, n1, n2 = self._fixture(o2m=False, post_update=True, insert=False)
n1.related = n2
s.add_all([n1, n2])
- s.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s.flush()
eq_(n1.version_id, 1)
eq_(n2.version_id, 1)
s.flush,
)
+ def test_o2m_post_update_no_sane_rowcount(self):
+ Node = self.classes.Node
+ s, n1, n2 = self._fixture(o2m=True, post_update=True)
+
+ n1.related.append(n2)
+
+ with patch.object(
+ config.db.dialect, "supports_sane_rowcount", False
+ ), patch.object(
+ config.db.dialect, "supports_sane_multi_rowcount", False
+ ):
+ s2 = Session(bind=s.connection(Node))
+ s2.query(Node).filter(Node.id == n2.id).update({"version_id": 3})
+ s2.commit()
+
+ with expect_warnings(
+ "Dialect .* does not support updated rowcount - "
+ "versioning cannot be verified."
+ ):
+ s.flush()
+
@testing.requires.sane_rowcount_w_returning
def test_m2o_post_update_version_assert(self):
Node = self.classes.Node
s1 = Session()
return s1
- @testing.emits_warning(r".*versioning cannot be verified")
@engines.close_open_connections
def test_update(self):
Foo = self.classes.Foo
s1.commit()
f1.value = "f1rev2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
class RowSwitchTest(fixtures.MappedTest):
)
mapper(C, c, version_id_col=c.c.version_id)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_row_switch(self):
P = self.classes.P
p = session.query(P).first()
session.delete(p)
session.add(P(id="P1", data="really a row-switch"))
- session.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ session.commit()
- @testing.emits_warning(r".*versioning cannot be verified")
def test_child_row_switch(self):
P, C = self.classes.P, self.classes.C
p = session.query(P).first()
p.c = C(data="child row-switch")
- session.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ session.commit()
class AlternateGeneratorTest(fixtures.MappedTest):
version_id_generator=lambda x: make_uuid(),
)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_row_switch(self):
P = self.classes.P
p = session.query(P).first()
session.delete(p)
session.add(P(id="P1", data="really a row-switch"))
- session.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ session.commit()
- @testing.emits_warning(r".*versioning cannot be verified")
def test_child_row_switch_one(self):
P, C = self.classes.P, self.classes.C
p = session.query(P).first()
p.c = C(data="child row-switch")
- session.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ session.commit()
- @testing.emits_warning(r".*versioning cannot be verified")
@testing.requires.sane_rowcount_w_returning
def test_child_row_switch_two(self):
P = self.classes.P
sess2.commit,
)
else:
- sess2.commit
+ sess2.commit()
class PlainInheritanceTest(fixtures.MappedTest):
class Sub(Base):
pass
- @testing.emits_warning(r".*versioning cannot be verified")
def test_update_child_table_only(self):
Base, sub, base, Sub = (
self.classes.Base,
s.commit()
s1.sub_data = "s2"
- s.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s.commit()
eq_(s1.version_id, 2)
def test_update_col_eager_defaults(self):
self._test_update_col(eager_defaults=True)
- @testing.emits_warning(r".*versioning cannot be verified")
def _test_update_col(self, **kw):
sess = self._fixture(**kw)
lambda ctx: [{"param_1": 1}],
)
)
- self.assert_sql_execution(testing.db, sess.flush, *statements)
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ self.assert_sql_execution(testing.db, sess.flush, *statements)
- @testing.emits_warning(r".*versioning cannot be verified")
@testing.requires.updateable_autoincrement_pks
def test_sql_expr_bump(self):
sess = self._fixture()
f1.id = self.classes.Foo.id + 0
- sess.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ sess.flush()
eq_(f1.version_id, 2)
- @testing.emits_warning(r".*versioning cannot be verified")
@testing.requires.updateable_autoincrement_pks
@testing.requires.returning
def test_sql_expr_w_mods_bump(self):
f1.id = self.classes.Foo.id + 3
- sess.flush()
+ with conditional_sane_rowcount_warnings(update=True):
+ sess.flush()
eq_(f1.id, 5)
eq_(f1.version_id, 2)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_multi_update(self):
sess = self._fixture()
),
]
)
- self.assert_sql_execution(testing.db, sess.flush, *statements)
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ self.assert_sql_execution(testing.db, sess.flush, *statements)
def test_delete_col(self):
sess = self._fixture()
lambda ctx: [{"id": 1, "version_id": 1}],
)
]
- self.assert_sql_execution(testing.db, sess.flush, *statements)
+ with conditional_sane_rowcount_warnings(delete=True):
+ self.assert_sql_execution(testing.db, sess.flush, *statements)
@testing.requires.sane_rowcount_w_returning
def test_concurrent_mod_err_expire_on_commit(self):
eq_(a1.vid, 1)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_update(self):
sess = Session()
a1 = self.classes.A()
a1.vid = 2
a1.data = "d2"
- sess.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ sess.commit()
eq_(a1.vid, 2)
a1.data = "d2"
assert_raises(orm_exc.StaleDataError, sess.commit)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_update_version_conditional(self):
sess = Session()
a1 = self.classes.A()
# change the data and UPDATE without
# incrementing version id
a1.data = "d2"
- sess.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ sess.commit()
eq_(a1.vid, 1)
a1.data = "d3"
a1.vid = 2
- sess.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ sess.commit()
eq_(a1.vid, 2)
mapper(cls.classes.B, cls.tables.b, inherits=cls.classes.A)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_no_increment(self):
sess = Session()
b1 = self.classes.B()
# change col on subtable only without
# incrementing version id
b1.b_data = "bd2"
- sess.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ sess.commit()
eq_(b1.vid, 1)
b1.b_data = "d3"
b1.vid = 2
- sess.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ sess.commit()
eq_(b1.vid, 2)
s1 = Session()
return s1
- @testing.emits_warning(r".*versioning cannot be verified")
def test_implicit(self):
Foo = self.classes.Foo
f1.value = "f1rev2"
f2.value = "f2rev2"
- s1.commit()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.commit()
eq_(
s1.query(Foo.id, Foo.value, Foo.version_id).order_by(Foo.id).all(),
[(f1.id, "f1rev2", 2), (f2.id, "f2rev2", 2)],
)
- @testing.emits_warning(r".*versioning cannot be verified")
def test_explicit(self):
Foo = self.classes.Foo
f1.version_id = 2
f2.value = "f2rev2"
f2.version_id = 2
- s1.flush()
+ with conditional_sane_rowcount_warnings(
+ update=True, only_returning=True
+ ):
+ s1.flush()
eq_(
s1.query(Foo.id, Foo.value, Foo.version_id).order_by(Foo.id).all(),