.. automodule:: examples.versioned_rows
+.. _examples_vertical_tables:
+
Vertical Attribute Mapping
--------------------------
records for each change. The given extensions generate an anonymous "history" class which
represents historical versions of the target object.
+Compare to the :ref:`examples_versioned_rows` examples which write updates
+as new rows in the same table, without using a separate history table.
+
Usage is illustrated via a unit test module ``test_versioning.py``, which can
be run via nose::
"""
-Illustrates an extension which versions data by storing new rows for each change;
-that is, what would normally be an UPDATE becomes an INSERT.
+Several examples that illustrate the technique of intercepting changes
+that would be first interpreted as an UPDATE on a row, and instead turning
+it into an INSERT of a new row, leaving the previous row intact as
+a historical version.
+
+Compare to the :ref:`examples_versioned_history` example which writes a
+history row to a separate history table.
.. autosource::
-"""A variant of the versioned_rows example. Here
-we store a dictionary of key/value pairs, storing the k/v's in a
+"""A variant of the versioned_rows example built around the
+concept of a "vertical table" structure, like those illustrated in
+:ref:`examples_vertical_tables` examples.
+
+Here we store a dictionary of key/value pairs, storing the k/v's in a
"vertical" fashion where each key gets a row. The value is split out
into two separate datatypes, string and int - the range of datatype
storage can be adjusted for individual needs.
from sqlalchemy import Column, String, Integer, ForeignKey, \
create_engine
-from sqlalchemy.orm.interfaces import SessionExtension
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import attributes, relationship, backref, \
- sessionmaker, make_transient, validates
+ sessionmaker, make_transient, validates, Session
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm.collections import attribute_mapped_collection
+from sqlalchemy import event
+
-class VersionExtension(SessionExtension):
+@event.listens_for(Session, "before_flush")
+def before_flush(session, flush_context, instances):
"""Apply the new_version() method of objects which are
marked as dirty during a flush.
- See http://www.sqlalchemy.org/trac/wiki/UsageRecipes/VersionedRows
-
"""
- def before_flush(self, session, flush_context, instances):
- for instance in session.dirty:
- if hasattr(instance, 'new_version') and \
+ for instance in session.dirty:
+ if hasattr(instance, 'new_version') and \
session.is_modified(instance, passive=True):
- # make it transient
- instance.new_version(session)
+ # make it transient
+ instance.new_version(session)
- # re-add
- session.add(instance)
+ # re-add
+ session.add(instance)
Base = declarative_base()
+
class ConfigData(Base):
"""Represent a series of key/value pairs.
id = Column(Integer, primary_key=True)
"""Primary key column of this ConfigData."""
- elements = relationship("ConfigValueAssociation",
- collection_class=attribute_mapped_collection("name"),
- backref=backref("config_data"),
- lazy="subquery"
- )
+ elements = relationship(
+ "ConfigValueAssociation",
+ collection_class=attribute_mapped_collection("name"),
+ backref=backref("config_data"),
+ lazy="subquery"
+ )
"""Dictionary-backed collection of ConfigValueAssociation objects,
keyed to the name of the associated ConfigValue.
# the new ones associate with the new ConfigData,
# the old ones stay associated with the old ConfigData
for elem in hist.unchanged:
- self.elements[elem.name] = ConfigValueAssociation(elem.config_value)
+ self.elements[elem.name] = ConfigValueAssociation(
+ elem.config_value)
# we also need to expire changes on each ConfigValueAssociation
# that is to remain associated with the old ConfigData.
config_id = Column(ForeignKey('config.id'), primary_key=True)
"""Reference the primary key of the ConfigData object."""
-
config_value_id = Column(ForeignKey('config_value.id'), primary_key=True)
"""Reference the primary key of the ConfigValue object."""
"""
if value != self.config_value.value:
self.config_data.elements[self.name] = \
- ConfigValueAssociation(
- ConfigValue(self.config_value.name, value)
- )
+ ConfigValueAssociation(
+ ConfigValue(self.config_value.name, value)
+ )
+
class ConfigValue(Base):
"""Represent an individual key/value pair at a given point in time.
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
- originating_config_id = Column(Integer, ForeignKey('config.id'),
- nullable=False)
+ originating_config_id = Column(
+ Integer, ForeignKey('config.id'),
+ nullable=False)
int_value = Column(Integer)
string_value = Column(String(255))
if __name__ == '__main__':
engine = create_engine('sqlite://', echo=True)
Base.metadata.create_all(engine)
- Session = sessionmaker(bind=engine, extension=VersionExtension())
+ Session = sessionmaker(engine)
sess = Session()
config = ConfigData({
- 'user_name':'twitter',
- 'hash_id':'4fedffca37eaf',
- 'x':27,
- 'y':450
- })
+ 'user_name': 'twitter',
+ 'hash_id': '4fedffca37eaf',
+ 'x': 27,
+ 'y': 450
+ })
sess.add(config)
sess.commit()
# two versions have been created.
assert config.data == {
- 'user_name':'yahoo',
- 'hash_id':'4fedffca37eaf',
- 'x':27,
- 'y':450
+ 'user_name': 'yahoo',
+ 'hash_id': '4fedffca37eaf',
+ 'x': 27,
+ 'y': 450
}
old_config = sess.query(ConfigData).get(version_one)
assert old_config.data == {
- 'user_name':'twitter',
- 'hash_id':'4fedffca37eaf',
- 'x':27,
- 'y':450
+ 'user_name': 'twitter',
+ 'hash_id': '4fedffca37eaf',
+ 'x': 27,
+ 'y': 450
}
# the history of any key can be acquired using
# the originating_config_id attribute
history = sess.query(ConfigValue).\
- filter(ConfigValue.name=='user_name').\
- order_by(ConfigValue.originating_config_id).\
- all()
+ filter(ConfigValue.name == 'user_name').\
+ order_by(ConfigValue.originating_config_id).\
+ all()
- assert [(h.value, h.originating_config_id) for h in history] == \
- [('twitter', version_one), ('yahoo', version_two)]
+ assert [(h.value, h.originating_config_id) for h in history] == (
+ [('twitter', version_one), ('yahoo', version_two)]
+ )
row is inserted with the new data, keeping the old row intact.
"""
-from sqlalchemy.orm import *
-from sqlalchemy import *
-from sqlalchemy.orm.interfaces import SessionExtension
+from sqlalchemy.orm import sessionmaker, relationship, make_transient, \
+ backref, Session
+from sqlalchemy import Column, ForeignKey, create_engine, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import attributes
+from sqlalchemy import event
+
class Versioned(object):
def new_version(self, session):
- # if on SQLA 0.6.1 or earlier,
- # make sure 'id' isn't expired.
- # self.id
-
# make us transient (removes persistent
# identity).
make_transient(self)
# a new PK will be generated on INSERT.
self.id = None
-class VersionExtension(SessionExtension):
- def before_flush(self, session, flush_context, instances):
- for instance in session.dirty:
- if not isinstance(instance, Versioned):
- continue
- if not session.is_modified(instance, passive=True):
- continue
- if not attributes.instance_state(instance).has_identity:
- continue
+@event.listens_for(Session, "before_flush")
+def before_flush(session, flush_context, instances):
+ for instance in session.dirty:
+ if not isinstance(instance, Versioned):
+ continue
+ if not session.is_modified(instance, passive=True):
+ continue
+
+ if not attributes.instance_state(instance).has_identity:
+ continue
- # make it transient
- instance.new_version(session)
- # re-add
- session.add(instance)
+ # make it transient
+ instance.new_version(session)
+ # re-add
+ session.add(instance)
Base = declarative_base()
engine = create_engine('sqlite://', echo=True)
-Session = sessionmaker(engine, extension=[VersionExtension()])
+Session = sessionmaker(engine)
# example 1, simple versioning
+
class Example(Versioned, Base):
__tablename__ = 'example'
id = Column(Integer, primary_key=True)
e1.data = 'e2'
session.commit()
-assert session.query(Example.id, Example.data).order_by(Example.id).all() == \
- [(1, 'e1'), (2, 'e2')]
+assert session.query(Example.id, Example.data).order_by(Example.id).all() == (
+ [(1, 'e1'), (2, 'e2')]
+)
# example 2, versioning with a parent
+
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
child_id = Column(Integer, ForeignKey('child.id'))
child = relationship("Child", backref=backref('parent', uselist=False))
+
class Child(Versioned, Base):
__tablename__ = 'child'
session.commit()
assert p1.child_id == 2
-assert session.query(Child.id, Child.data).order_by(Child.id).all() == \
- [(1, 'c1'), (2, 'c2')]
\ No newline at end of file
+assert session.query(Child.id, Child.data).order_by(Child.id).all() == (
+ [(1, 'c1'), (2, 'c2')]
+)
--- /dev/null
+"""Illustrates a method to intercept changes on objects, turning
+an UPDATE statement on a single row into an INSERT statement, so that a new
+row is inserted with the new data, keeping the old row intact.
+
+This example adds a numerical version_id to the Versioned class as well
+as the ability to see which row is the most "current" vesion.
+
+"""
+from sqlalchemy.orm import sessionmaker, relationship, make_transient, \
+ backref, Session, column_property
+from sqlalchemy import Column, ForeignKeyConstraint, create_engine, \
+ Integer, String, Boolean, select, func
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import attributes
+from sqlalchemy import event
+
+
+class Versioned(object):
+ # we have a composite primary key consisting of "id"
+ # and "version_id"
+ id = Column(Integer, primary_key=True)
+ version_id = Column(Integer, primary_key=True, default=1)
+
+ # optional - add a persisted is_current_version column
+ is_current_version = Column(Boolean, default=True)
+
+ # optional - add a calculated is_current_version column
+ @classmethod
+ def __declare_last__(cls):
+ alias = cls.__table__.alias()
+ cls.calc_is_current_version = column_property(
+ select([func.max(alias.c.version_id) == cls.version_id]).where(
+ alias.c.id == cls.id
+ )
+ )
+
+ def new_version(self, session):
+ # optional - set previous version to have is_current_version=False
+ old_id = self.id
+ session.query(self.__class__).filter_by(id=old_id).update(
+ values=dict(is_current_version=False), synchronize_session=False)
+
+ # make us transient (removes persistent
+ # identity).
+ make_transient(self)
+
+ # increment version_id, which means we have a new PK.
+ self.version_id += 1
+
+
+@event.listens_for(Session, "before_flush")
+def before_flush(session, flush_context, instances):
+ for instance in session.dirty:
+ if not isinstance(instance, Versioned):
+ continue
+ if not session.is_modified(instance, passive=True):
+ continue
+
+ if not attributes.instance_state(instance).has_identity:
+ continue
+
+ # make it transient
+ instance.new_version(session)
+
+ # re-add
+ session.add(instance)
+
+Base = declarative_base()
+
+engine = create_engine('sqlite://', echo=True)
+
+Session = sessionmaker(engine)
+
+# example 1, simple versioning
+
+
+class Example(Versioned, Base):
+ __tablename__ = 'example'
+ data = Column(String)
+
+Base.metadata.create_all(engine)
+
+session = Session()
+e1 = Example(id=1, data='e1')
+session.add(e1)
+session.commit()
+
+e1.data = 'e2'
+session.commit()
+
+assert session.query(
+ Example.id,
+ Example.version_id,
+ Example.is_current_version,
+ Example.calc_is_current_version,
+ Example.data).order_by(Example.id, Example.version_id).all() == (
+ [(1, 1, False, False, 'e1'), (1, 2, True, True, 'e2')]
+)
+
+# example 2, versioning with a parent
+
+
+class Parent(Base):
+ __tablename__ = 'parent'
+ id = Column(Integer, primary_key=True)
+ child_id = Column(Integer)
+ child_version_id = Column(Integer)
+ child = relationship("Child", backref=backref('parent', uselist=False))
+
+ __table_args__ = (
+ ForeignKeyConstraint(
+ ['child_id', 'child_version_id'],
+ ['child.id', 'child.version_id'],
+ ),
+ )
+
+
+class Child(Versioned, Base):
+ __tablename__ = 'child'
+
+ data = Column(String)
+
+ def new_version(self, session):
+ # expire parent's reference to us
+ session.expire(self.parent, ['child'])
+
+ # create new version
+ Versioned.new_version(self, session)
+
+ # re-add ourselves to the parent. this causes the
+ # parent foreign key to be updated also
+ self.parent.child = self
+
+Base.metadata.create_all(engine)
+
+session = Session()
+
+p1 = Parent(child=Child(id=1, data='c1'))
+session.add(p1)
+session.commit()
+
+p1.child.data = 'c2'
+session.commit()
+
+assert p1.child_id == 1
+assert p1.child.version_id == 2
+
+assert session.query(
+ Child.id,
+ Child.version_id,
+ Child.is_current_version,
+ Child.calc_is_current_version,
+ Child.data).order_by(Child.id, Child.version_id).all() == (
+ [(1, 1, False, False, 'c1'), (1, 2, True, True, 'c2')]
+)