class Article(object):
# create "keywords" proxied association
keywords = AssociationProxy('keyword_associations', 'keyword', creator=create_keyword_association)
-
+### orderinglist
+
+**Author:** Jason Kirtland
+
+`orderinglist` is a helper for mutable ordered relations. It will intercept
+list operations performed on a relation collection and automatically
+synchronize changes in list position with an attribute on the related objects.
+(See [advdatamapping_properties_customlist](rel:advdatamapping_properties_customlist) for more information on the general pattern.)
+
+Example: Two tables that store slides in a presentation. Each slide
+has a number of bullet points, displayed in order by the 'position'
+column on the bullets table. These bullets can be inserted and re-ordered
+by your end users, and you need to update the 'position' column of all
+affected rows when changes are made.
+
+ {python}
+ slides_table = Table('Slides', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String))
+
+ bullets_table = Table('Bullets', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('slide_id', Integer, ForeignKey('Slides.id')),
+ Column('position', Integer),
+ Column('text', String))
+
+ class Slide(object):
+ pass
+ class Bullet(object):
+ pass
+
+ mapper(Slide, slides_table, properties={
+ 'bullets': relation(Bullet, order_by=[bullets_table.c.position])
+ })
+ mapper(Bullet, bullets_table)
+
+The standard relation mapping will produce a list-like attribute on each Slide
+containing all related Bullets, but coping with changes in ordering is totally
+your responsibility. If you insert a Bullet into that list, there is no
+magic- it won't have a position attribute unless you assign it it one, and
+you'll need to manually renumber all the subsequent Bullets in the list to
+accommodate the insert.
+
+An `orderinglist` can automate this and manage the 'position' attribute on all
+related bullets for you.
+
+ {python}
+ mapper(Slide, slides_table, properties={
+ 'bullets': relation(Bullet,
+ collection_class=ordering_list('position'),
+ order_by=[bullets_table.c.position])
+ })
+ mapper(Bullet, bullets_table)
+
+ s = Slide()
+ s.bullets.append(Bullet())
+ s.bullets.append(Bullet())
+ s.bullets[1].position
+ >>> 1
+ s.bullets.insert(1, Bullet())
+ s.bullets[2].position
+ >>> 2
+
+Use the `ordering_list` function to set up the `collection_class` on relations
+(as in the mapper example above). This implementation depends on the list
+starting in the proper order, so be SURE to put an order_by on your relation.
+
+`ordering_list` takes the name of the related object's ordering attribute as
+an argument. By default, the zero-based integer index of the object's
+position in the `ordering_list` is synchronized with the ordering attribute:
+index 0 will get position 0, index 1 position 1, etc. To start numbering at 1
+or some other integer, provide `count_from=1`.
+
+Ordering values are not limited to incrementing integers. Almost any scheme
+can implemented by supplying a custom `ordering_func` that maps a Python list
+index to any value you require. See the [module
+documentation](rel:docstrings_sqlalchemy.ext.orderinglist) for more
+information, and also check out the unit tests for examples of stepped
+numbering, alphabetical and Fibonacci numbering.
+
### threadlocal
**Author:** Mike Bayer and Daniel Miller
--- /dev/null
+"""
+A custom list implementation for mapped relations that syncs position in a
+Python list with a position attribute on the mapped objects.
+"""
+
+__all__ = [ 'ordering_list' ]
+
+
+def ordering_list(attr, count_from=None, **kw):
+ """
+ Prepares an OrderingList factory for use as an argument to a
+ Mapper relation's 'collection_class' option. Arguments are:
+
+ attr
+ Name of the mapped attribute to use for storage and retrieval of
+ ordering information
+
+ count_from (optional)
+ Set up an integer-based ordering, starting at 'count_from'. For example,
+ ordering_list('pos', count_from=1) would create a 1-based list in SQL,
+ storing the value in the 'pos' column. Ignored if ordering_func is
+ supplied.
+
+ Passes along any keyword arguments to OrderingList constructor.
+ """
+
+ kw = _unsugar_count_from(count_from=count_from, **kw)
+ return lambda: OrderingList(attr, **kw)
+
+# Ordering utility functions
+def count_from_0(index, collection):
+ """Numbering function: consecutive integers starting at 0."""
+
+ return index
+
+def count_from_1(index, collection):
+ """Numbering function: consecutive integers starting at 1."""
+
+ return index + 1
+
+def count_from_n_factory(start):
+ """Numbering function: consecutive integers starting at arbitrary start."""
+
+ def f(index, collection):
+ return index + start
+ try:
+ f.__name__ = 'count_from_%i' % start
+ except TypeError:
+ pass
+ return f
+
+def _unsugar_count_from(**kw):
+ """Keyword argument filter, prepares a simple ordering_func from
+ a 'count_from' argument, otherwise passes ordering_func on unchanged."""
+
+ count_from = kw.pop('count_from', None)
+ if kw.get('ordering_func', None) is None and count_from is not None:
+ if count_from == 0:
+ kw['ordering_func'] = count_from_0
+ elif count_from == 1:
+ kw['ordering_func'] = count_from_1
+ else:
+ kw['ordering_func'] = count_from_n_factory(count_from)
+ return kw
+
+class OrderingList(list):
+ def __init__(self, ordering_attr=None, ordering_func=None,
+ reorder_on_append=False):
+ """
+ A 'collection_class' list implementation that syncs position in a
+ Python list with a position attribute on the mapped objects.
+
+ This implementation counts on the list starting in the proper
+ order, so be SURE to put an order_by on your relation.
+ Arguments are:
+
+ ordering_attr
+ Name of the attribute that stores the object's order in the relation.
+
+ ordering_func
+ Optional. A function that maps the position in the Python list to a
+ value to store in the ordering_attr. Values returned are usually
+ (but need not be!) integers.
+
+ ordering_funcs are called with two positional parameters: index of
+ the element in the list, and the list itself.
+
+ If omitted, list indexes are used for the attribute values. Two
+ basic pre-built numbering functions are provided: 'count_from_0' and
+ 'count_from_1'. For more exotic examples like stepped numbering,
+ alphabetical and Fibonacci numbering, see the unit tests.
+
+ reorder_on_append
+ Default false. When appending an object with an existing (non-None)
+ ordering value, that value will be left untouched unless
+ reorder_on_append is true. This is an optimization to avoid a
+ variety of dangerous unexpected database writes.
+
+ SQLAlchemy will add instances to the list via append() when your
+ object loads. If for some reason the result set from the database
+ skips a step in the ordering (say, row '1' is missing but you get
+ '2', '3', and '4'), reorder_on_append=True would immediately
+ renumber the items to '1', '2', '3'. If you have multiple sessions
+ making changes, any of whom happen to load this collection even in
+ passing, all of the sessions would try to 'clean up' the numbering
+ in their commits, possibly causing all but one to fail with a
+ concurrent modification error. Spooky action at a distance.
+
+ Recommend leaving this with the default of False, and just call
+ ._reorder() if you're doing append() operations with previously
+ ordered instances or doing housekeeping after manual sql operations.
+ """
+
+ self.ordering_attr = ordering_attr
+ if ordering_func is None:
+ ordering_func = count_from_0
+ self.ordering_func = ordering_func
+ self.reorder_on_append = reorder_on_append
+
+ # More complex serialization schemes (multi column, e.g.) are possible by
+ # subclassing and reimplementing these two methods.
+ def _get_order_value(self, entity):
+ return getattr(entity, self.ordering_attr)
+
+ def _set_order_value(self, entity, value):
+ setattr(entity, self.ordering_attr, value)
+
+ def _reorder(self):
+ """Sweep through the list and ensure that each object has accurate
+ ordering information set."""
+
+ for index, entity in enumerate(self):
+ self._order_entity(index, entity, True)
+
+ def _order_entity(self, index, entity, reorder=True):
+ have = self._get_order_value(entity)
+
+ # Don't disturb existing ordering if reorder is False
+ if have is not None and not reorder:
+ return
+
+ should_be = self.ordering_func(index, self)
+ if have <> should_be:
+ self._set_order_value(entity, should_be)
+
+ def append(self, entity):
+ super(OrderingList, self).append(entity)
+ self._order_entity(len(self) - 1, entity, self.reorder_on_append)
+
+ def _raw_append(self, entity):
+ """Append without any ordering behavior."""
+
+ super(OrderingList, self).append(entity)
+
+ def insert(self, index, entity):
+ self[index:index] = [entity]
+
+ def remove(self, entity):
+ super(OrderingList, self).remove(entity)
+ self._reorder()
+
+ def pop(self, index=-1):
+ entity = super(OrderingList, self).pop(index)
+ self._reorder()
+ return entity
+
+ def __setitem__(self, index, entity):
+ super(OrderingList, self).__setitem__(index, entity)
+ self._order_entity(index, entity, True)
+
+ def __delitem__(self, index):
+ super(OrderingList, self).__delitem__(index)
+ self._reorder()
+
+ def __setslice__(self, start, end, values):
+ super(OrderingList, self).__setslice__(start, end, values)
+ self._reorder()
+
+ def __delslice__(self, start, end):
+ super(OrderingList, self).__delslice__(start, end)
+ self._reorder()
--- /dev/null
+from testbase import PersistTest
+import sqlalchemy.util as util
+import unittest, sys, os
+import testbase
+from sqlalchemy import *
+from sqlalchemy.ext.orderinglist import *
+
+db = testbase.db
+metadata = None
+
+# order in whole steps
+def step_numbering(step):
+ def f(index, collection):
+ return step * index
+ return f
+
+# almost fibonacci- skip the first 2 steps
+# e.g. 1, 2, 3, 5, 8, ... instead of 0, 1, 1, 2, 3, ...
+# otherwise ordering of the elements at '1' is undefined... ;)
+def fibonacci_numbering(order_col):
+ def f(index, collection):
+ if index == 0:
+ return 1
+ elif index == 1:
+ return 2
+ else:
+ return (getattr(collection[index - 1], order_col) +
+ getattr(collection[index - 2], order_col))
+ return f
+
+# 0 -> A, 1 -> B, ... 25 -> Z, 26 -> AA, 27 -> AB, ...
+def alpha_ordering(index, collection):
+ s = ''
+ while index > 25:
+ d = index / 26
+ s += chr((d % 26) + 64)
+ index -= d * 26
+ s += chr(index + 65)
+ return s
+
+class OrderingListTest(PersistTest):
+ def setUp(self):
+ global metadata, slides_table, bullets_table, Slide, Bullet
+ slides_table, bullets_table = None, None
+ Slide, Bullet = None, None
+ if metadata:
+ metadata.clear()
+
+ def _setup(self, test_collection_class):
+ """Build a relation situation using the given test_collection_class
+ factory"""
+
+ global metadata, slides_table, bullets_table, Slide, Bullet
+
+ metadata = BoundMetaData(db)
+ slides_table = Table('test_Slides', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('name', String))
+ bullets_table = Table('test_Bullets', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('slide_id', Integer,
+ ForeignKey('test_Slides.id')),
+ Column('position', Integer),
+ Column('text', String))
+
+ class Slide(object):
+ def __init__(self, name):
+ self.name = name
+ def __repr__(self):
+ return '<Slide "%s">' % self.name
+
+ class Bullet(object):
+ def __init__(self, text):
+ self.text = text
+ def __repr__(self):
+ return '<Bullet "%s" pos %s>' % (self.text, self.position)
+
+ mapper(Slide, slides_table, properties={
+ 'bullets': relation(Bullet, lazy=False,
+ collection_class=test_collection_class,
+ backref='slide',
+ order_by=[bullets_table.c.position])
+ })
+ mapper(Bullet, bullets_table)
+
+ metadata.create_all()
+
+ def tearDown(self):
+ metadata.drop_all()
+
+ def test_append_no_reorder(self):
+ self._setup(ordering_list('position', count_from=1,
+ reorder_on_append=False))
+
+ s1 = Slide('Slide #1')
+
+ self.assert_(not s1.bullets)
+ self.assert_(len(s1.bullets) == 0)
+
+ s1.bullets.append(Bullet('s1/b1'))
+
+ self.assert_(s1.bullets)
+ self.assert_(len(s1.bullets) == 1)
+ self.assert_(s1.bullets[0].position == 1)
+
+ s1.bullets.append(Bullet('s1/b2'))
+
+ self.assert_(len(s1.bullets) == 2)
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+
+ bul = Bullet('s1/b100')
+ bul.position = 100
+ s1.bullets.append(bul)
+
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+ self.assert_(s1.bullets[2].position == 100)
+
+ s1.bullets.append(Bullet('s1/b4'))
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+ self.assert_(s1.bullets[2].position == 100)
+ self.assert_(s1.bullets[3].position == 4)
+
+ s1.bullets._reorder()
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+ self.assert_(s1.bullets[2].position == 3)
+ self.assert_(s1.bullets[3].position == 4)
+
+ session = create_session()
+ session.save(s1)
+ session.flush()
+
+ id = s1.id
+ session.clear()
+ del s1
+
+ srt = session.query(Slide).get(id)
+
+ self.assert_(srt.bullets)
+ self.assert_(len(srt.bullets) == 4)
+
+ titles = ['s1/b1','s1/b2','s1/b100','s1/b4']
+ found = [b.text for b in srt.bullets]
+
+ self.assert_(titles == found)
+
+ def test_append_reorder(self):
+ self._setup(ordering_list('position', count_from=1,
+ reorder_on_append=True))
+
+ s1 = Slide('Slide #1')
+
+ self.assert_(not s1.bullets)
+ self.assert_(len(s1.bullets) == 0)
+
+ s1.bullets.append(Bullet('s1/b1'))
+
+ self.assert_(s1.bullets)
+ self.assert_(len(s1.bullets) == 1)
+ self.assert_(s1.bullets[0].position == 1)
+
+ s1.bullets.append(Bullet('s1/b2'))
+
+ self.assert_(len(s1.bullets) == 2)
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+
+ bul = Bullet('s1/b100')
+ bul.position = 100
+ s1.bullets.append(bul)
+
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+ self.assert_(s1.bullets[2].position == 3)
+
+ s1.bullets.append(Bullet('s1/b4'))
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+ self.assert_(s1.bullets[2].position == 3)
+ self.assert_(s1.bullets[3].position == 4)
+
+ s1.bullets._reorder()
+ self.assert_(s1.bullets[0].position == 1)
+ self.assert_(s1.bullets[1].position == 2)
+ self.assert_(s1.bullets[2].position == 3)
+ self.assert_(s1.bullets[3].position == 4)
+
+ session = create_session()
+ session.save(s1)
+ session.flush()
+
+ id = s1.id
+ session.clear()
+ del s1
+
+ srt = session.query(Slide).get(id)
+
+ self.assert_(srt.bullets)
+ self.assert_(len(srt.bullets) == 4)
+
+ titles = ['s1/b1','s1/b2','s1/b100','s1/b4']
+ found = [b.text for b in srt.bullets]
+
+ self.assert_(titles == found)
+
+ def test_insert(self):
+ self._setup(ordering_list('position'))
+
+ s1 = Slide('Slide #1')
+ s1.bullets.append(Bullet('1'))
+ s1.bullets.append(Bullet('2'))
+ s1.bullets.append(Bullet('3'))
+ s1.bullets.append(Bullet('4'))
+
+ self.assert_(s1.bullets[0].position == 0)
+ self.assert_(s1.bullets[1].position == 1)
+ self.assert_(s1.bullets[2].position == 2)
+ self.assert_(s1.bullets[3].position == 3)
+
+ s1.bullets.insert(2, Bullet('insert_at_2'))
+ self.assert_(s1.bullets[0].position == 0)
+ self.assert_(s1.bullets[1].position == 1)
+ self.assert_(s1.bullets[2].position == 2)
+ self.assert_(s1.bullets[3].position == 3)
+ self.assert_(s1.bullets[4].position == 4)
+
+ self.assert_(s1.bullets[1].text == '2')
+ self.assert_(s1.bullets[2].text == 'insert_at_2')
+ self.assert_(s1.bullets[3].text == '3')
+
+ s1.bullets.insert(999, Bullet('999'))
+
+ self.assert_(len(s1.bullets) == 6)
+ self.assert_(s1.bullets[5].position == 5)
+
+ session = create_session()
+ session.save(s1)
+ session.flush()
+
+ id = s1.id
+ session.clear()
+ del s1
+
+ srt = session.query(Slide).get(id)
+
+ self.assert_(srt.bullets)
+ self.assert_(len(srt.bullets) == 6)
+
+ texts = ['1','2','insert_at_2','3','4','999']
+ found = [b.text for b in srt.bullets]
+
+ self.assert_(texts == found)
+
+ def test_slice(self):
+ self._setup(ordering_list('position'))
+
+ b = [ Bullet('1'), Bullet('2'), Bullet('3'),
+ Bullet('4'), Bullet('5'), Bullet('6') ]
+ s1 = Slide('Slide #1')
+
+ # 1, 2, 3
+ s1.bullets[0:3] = b[0:3]
+ for i in 0, 1, 2:
+ self.assert_(s1.bullets[i].position == i)
+ self.assert_(s1.bullets[i] == b[i])
+
+ # 1, 4, 5, 6, 3
+ s1.bullets[1:2] = b[3:6]
+ for li, bi in (0,0), (1,3), (2,4), (3,5), (4,2):
+ self.assert_(s1.bullets[li].position == li)
+ self.assert_(s1.bullets[li] == b[bi])
+
+ # 1, 6, 3
+ del s1.bullets[1:3]
+ for li, bi in (0,0), (1,5), (2,2):
+ self.assert_(s1.bullets[li].position == li)
+ self.assert_(s1.bullets[li] == b[bi])
+
+ session = create_session()
+ session.save(s1)
+ session.flush()
+
+ id = s1.id
+ session.clear()
+ del s1
+
+ srt = session.query(Slide).get(id)
+
+ self.assert_(srt.bullets)
+ self.assert_(len(srt.bullets) == 3)
+
+ texts = ['1', '6', '3']
+ for i, text in enumerate(texts):
+ self.assert_(srt.bullets[i].position == i)
+ self.assert_(srt.bullets[i].text == text)
+
+ def test_replace1(self):
+ self._setup(ordering_list('position'))
+
+ s1 = Slide('Slide #1')
+ s1.bullets = [ Bullet('1'), Bullet('2'), Bullet('3') ]
+
+ self.assert_(len(s1.bullets) == 3)
+ self.assert_(s1.bullets[2].position == 2)
+
+ session = create_session()
+ session.save(s1)
+ session.flush()
+
+ new_bullet = Bullet('new 2')
+ self.assert_(new_bullet.position is None)
+
+ # naive replacement, no database deletion should occur
+ # with current InstrumentedList __setitem__ semantics
+ s1.bullets[1] = new_bullet
+
+ self.assert_(new_bullet.position == 1)
+ self.assert_(len(s1.bullets) == 3)
+
+ id = s1.id
+
+ session.flush()
+ session.clear()
+
+ srt = session.query(Slide).get(id)
+
+ self.assert_(srt.bullets)
+ self.assert_(len(srt.bullets) == 4)
+
+ self.assert_(srt.bullets[1].text == '2')
+ self.assert_(srt.bullets[2].text == 'new 2')
+
+ def test_replace2(self):
+ self._setup(ordering_list('position'))
+
+ s1 = Slide('Slide #1')
+ s1.bullets = [ Bullet('1'), Bullet('2'), Bullet('3') ]
+
+ self.assert_(len(s1.bullets) == 3)
+ self.assert_(s1.bullets[2].position == 2)
+
+ session = create_session()
+ session.save(s1)
+ session.flush()
+
+ new_bullet = Bullet('new 2')
+ self.assert_(new_bullet.position is None)
+
+ # mark existing bullet as db-deleted before replacement.
+ session.delete(s1.bullets[1])
+ s1.bullets[1] = new_bullet
+
+ self.assert_(new_bullet.position == 1)
+ self.assert_(len(s1.bullets) == 3)
+
+ id = s1.id
+
+ session.flush()
+ session.clear()
+
+ srt = session.query(Slide).get(id)
+
+ self.assert_(srt.bullets)
+ self.assert_(len(srt.bullets) == 3)
+
+ self.assert_(srt.bullets[1].text == 'new 2')
+ self.assert_(srt.bullets[2].text == '3')
+
+ def test_funky_ordering(self):
+ class Pos(object):
+ def __init__(self):
+ self.position = None
+
+ step_factory = ordering_list('position',
+ ordering_func=step_numbering(2))
+
+ stepped = step_factory()
+ stepped.append(Pos())
+ stepped.append(Pos())
+ stepped.append(Pos())
+ stepped.append(Pos())
+
+ for li, pos in (0,0), (1,2), (2,4), (3,6):
+ self.assert_(stepped[li].position == pos)
+
+ fib_factory = ordering_list('position',
+ ordering_func=fibonacci_numbering('position'))
+
+ fibbed = fib_factory()
+ fibbed.append(Pos())
+ fibbed.append(Pos())
+ fibbed.append(Pos())
+ fibbed.append(Pos())
+ fibbed.append(Pos())
+
+ for li, pos in (0,1), (1,2), (2,3), (3,5), (4,8):
+ self.assert_(fibbed[li].position == pos)
+
+ fibbed.insert(2, Pos())
+ fibbed.insert(4, Pos())
+ fibbed.insert(6, Pos())
+
+ for li, pos in (0,1), (1,2), (2,3), (3,5), (4,8), (5,13), (6,21), (7,34):
+ self.assert_(fibbed[li].position == pos)
+
+ alpha_factory = ordering_list('position',
+ ordering_func=alpha_ordering)
+ alpha = alpha_factory()
+ alpha.append(Pos())
+ alpha.append(Pos())
+ alpha.append(Pos())
+
+ alpha.insert(1, Pos())
+
+ for li, pos in (0,'A'), (1,'B'), (2,'C'), (3,'D'):
+ self.assert_(alpha[li].position == pos)
+
+if __name__ == "__main__":
+ testbase.main()