col = mapper.version_id_col
params[col._label] = update_version_id
- if col.key not in params and \
+ if (bulk or col.key not in params) and \
mapper.version_id_generator is not False:
val = mapper.version_id_generator(update_version_id)
params[col.key] = val
[(f1.id, 'f1rev2', 2), (f2.id, 'f2rev2', 2)]
)
+ def test_bulk_insert(self):
+ Foo = self.classes.Foo
+
+ s1 = self._fixture()
+ s1.bulk_insert_mappings(
+ Foo,
+ [{"id": 1, "value": "f1"}, {"id": 2, "value": "f2"}]
+ )
+ eq_(
+ s1.query(Foo.id, Foo.value, Foo.version_id).order_by(Foo.id).all(),
+ [(1, 'f1', 1), (2, 'f2', 1)]
+ )
+
+ def test_bulk_update(self):
+ Foo = self.classes.Foo
+
+ s1 = self._fixture()
+ f1 = Foo(value='f1')
+ f2 = Foo(value='f2')
+ s1.add_all((f1, f2))
+ s1.commit()
+
+ s1.bulk_update_mappings(
+ Foo,
+ [
+ {"id": f1.id, "value": "f1rev2", "version_id": 1},
+ {"id": f2.id, "value": "f2rev2", "version_id": 1},
+
+ ]
+ )
+ s1.commit()
+
+ eq_(
+ s1.query(Foo.id, Foo.value, Foo.version_id).order_by(Foo.id).all(),
+ [(f1.id, 'f1rev2', 2), (f2.id, 'f2rev2', 2)]
+ )
+
@testing.emits_warning_on(
'+zxjdbc', r'.*does not support (update|delete)d rowcount')
def test_bump_version(self):