unless we know it was a PK switch that
triggered the change. [ticket:1856]
+ - The value of version_id_col can be changed
+ manually, and this will result in an UPDATE
+ of the row. Versioned UPDATEs and DELETEs
+ now use the "committed" value of the
+ version_id_col in the WHERE clause and
+ not the pending changed value. The
+ version generator is also bypassed if
+ manual changes are present on the attribute.
+ [ticket:1857]
+
- sql
- Changed the scheme used to generate truncated
"auto" index names when using the "index=True"
for col in mapper._cols_by_table[table]:
if col is mapper.version_id_col:
params[col._label] = \
- mapper._get_state_attr_by_column(
+ mapper._get_committed_state_attr_by_column(
row_switch or state,
row_switch and row_switch.dict
or state_dict,
col)
- params[col.key] = \
+
+ prop = mapper._columntoproperty[col]
+ history = attributes.get_state_history(
+ state, prop.key, passive=True
+ )
+ if history.added:
+ params[col.key] = history.added[0]
+ hasdata = True
+ else:
+ params[col.key] = \
mapper.version_id_generator(
params[col._label])
- # HACK: check for history, in case the
- # history is only
- # in a different table than the one
- # where the version_id_col is.
- for prop in mapper._columntoproperty.itervalues():
- history = attributes.get_state_history(
- state, prop.key, passive=True)
- if history.added:
- hasdata = True
+ # HACK: check for history, in case the
+ # history is only
+ # in a different table than the one
+ # where the version_id_col is.
+ for prop in mapper._columntoproperty.\
+ itervalues():
+ history = attributes.get_state_history(
+ state, prop.key,
+ passive=True)
+ if history.added:
+ hasdata = True
elif mapper.polymorphic_on is not None and \
mapper.polymorphic_on.shares_lineage(col) and \
col not in pks:
if mapper.version_id_col is not None and \
table.c.contains_column(mapper.version_id_col):
params[mapper.version_id_col.key] = \
- mapper._get_state_attr_by_column(
+ mapper._get_committed_state_attr_by_column(
state, state_dict,
mapper.version_id_col)
'after_delete'])
@testing.resolve_artifact_names
- def test_after_with_no_changes(self):
- """after_update is called even if no columns were updated."""
+ def test_before_after_only_collection(self):
+ """before_update is called on parent for collection modifications,
+ after_update is called even if no columns were updated.
+
+ """
- Ext, methods = self.extension()
+ Ext1, methods1 = self.extension()
+ Ext2, methods2 = self.extension()
- mapper(Item, items, extension=Ext() , properties={
+ mapper(Item, items, extension=Ext1() , properties={
'keywords': relationship(Keyword, secondary=item_keywords)})
- mapper(Keyword, keywords, extension=Ext())
+ mapper(Keyword, keywords, extension=Ext2())
sess = create_session()
i1 = Item(description="i1")
sess.add(i1)
sess.add(k1)
sess.flush()
- eq_(methods,
- ['instrument_class', 'instrument_class', 'init_instance',
- 'init_instance', 'before_insert', 'after_insert',
- 'before_insert', 'after_insert'])
-
- del methods[:]
+ eq_(methods1,
+ ['instrument_class', 'init_instance',
+ 'before_insert', 'after_insert'])
+ eq_(methods2,
+ ['instrument_class', 'init_instance',
+ 'before_insert', 'after_insert'])
+
+ del methods1[:]
+ del methods2[:]
i1.keywords.append(k1)
sess.flush()
- eq_(methods, ['before_update', 'after_update'])
+ eq_(methods1, ['before_update', 'after_update'])
+ eq_(methods2, [])
@testing.resolve_artifact_names
else:
s1.commit()
+ @testing.emits_warning(r'.*does not support updated rowcount')
+ @testing.resolve_artifact_names
+ def test_bump_version(self):
+ """test that version number can be bumped.
+
+ Ensures that the UPDATE or DELETE is against the
+ last committed version of version_id_col, not the modified
+ state.
+
+ """
+ mapper(Foo, version_table,
+ version_id_col=version_table.c.version_id)
+
+ s1 = sessionmaker()()
+ f1 = Foo(value='f1')
+ s1.add(f1)
+ s1.commit()
+ eq_(f1.version_id, 1)
+ f1.version_id = 2
+ s1.commit()
+ eq_(f1.version_id, 2)
+
+ # skip an id, test that history
+ # is honored
+ f1.version_id = 4
+ f1.value = "something new"
+ s1.commit()
+ eq_(f1.version_id, 4)
+
+ f1.version_id = 5
+ s1.delete(f1)
+ s1.commit()
+ eq_(s1.query(Foo).count(), 0)
+
@testing.emits_warning(r'.*does not support updated rowcount')
@engines.close_open_connections
@testing.resolve_artifact_names
assert P.c.property.strategy.use_get
session = sessionmaker()()
- session.add(P(id=1, data='P version 1'))
+ session.add(P(id='P1', data='P version 1'))
session.commit()
session.close()
assert P.c.property.strategy.use_get
session = sessionmaker()()
- session.add(P(id=1, data='P version 1'))
+ session.add(P(id='P1', data='P version 1'))
session.commit()
session.close()
def test_child_row_switch_two(self):
Session = sessionmaker()
+ # TODO: not sure this test is
+ # testing exactly what its looking for
+
sess1 = Session()
- sess1.add(P(id=1, data='P version 1'))
+ sess1.add(P(id='P1', data='P version 1'))
sess1.commit()
sess1.close()
sess1.delete(p1)
sess1.commit()
+ # this can be removed and it still passes
sess1.add(P(id='P1', data='P version 2'))
sess1.commit()