See the example ``examples/association/proxied_association.py``.
"""
+import weakref, itertools
import sqlalchemy.exceptions as exceptions
import sqlalchemy.orm as orm
import sqlalchemy.util as util
-import weakref
def association_proxy(targetcollection, attr, **kw):
"""Convenience function for use in mapped classes. Implements a Python
return self._get(self.col[index])
def __setitem__(self, index, value):
- self._set(self.col[index], value)
+ if not isinstance(index, slice):
+ self._set(self.col[index], value)
+ else:
+ if index.stop is None:
+ stop = len(self)
+ elif index.stop < 0:
+ stop = len(self) + index.stop
+ else:
+ stop = index.stop
+ step = index.step or 1
+
+ rng = range(index.start or 0, stop, step)
+ if step == 1:
+ for i in rng:
+ del self[index.start]
+ i = index.start
+ for item in value:
+ self.insert(i, item)
+ i += 1
+ else:
+ if len(value) != len(rng):
+ raise ValueError(
+ "attempt to assign sequence of size %s to "
+ "extended slice of size %s" % (len(value),
+ len(rng)))
+ for i, item in zip(rng, value):
+ self._set(self.col[i], item)
def __delitem__(self, index):
del self.col[index]
del self.col[start:end]
def __iter__(self):
- """Iterate over proxied values. For the actual domain objects,
- iterate over .col instead or just use the underlying collection
- directly from its property on the parent."""
+ """Iterate over proxied values.
+
+ For the actual domain objects, iterate over .col instead or
+ just use the underlying collection directly from its property
+ on the parent.
+ """
+
for member in self.col:
yield self._get(member)
raise StopIteration
item = self._create(value, **kw)
self.col.append(item)
+ def count(self, value):
+ return sum([1 for _ in
+ itertools.ifilter(lambda v: v == value, iter(self))])
+
def extend(self, values):
for v in values:
self.append(v)
def pop(self, index=-1):
return self.getter(self.col.pop(index))
+ def remove(self, value):
+ for i, val in enumerate(self):
+ if val == value:
+ del self.col[i]
+ return
+ raise ValueError("value not in list")
+
+ def reverse(self):
+ """Not supported, use reversed(mylist)"""
+
+ raise NotImplementedError
+
+ def sort(self):
+ """Not supported, use sorted(mylist)"""
+
+ raise NotImplementedError
+
def clear(self):
del self.col[0:len(self.col)]
self.assert_(p1.children[1] == 'changed-in-place')
assert p1._children[1].id == inplace_id
+ p1.children.append('changed-in-place')
+ self.assert_(p1.children.count('changed-in-place') == 2)
+
+ p1.children.remove('changed-in-place')
+ self.assert_(p1.children.count('changed-in-place') == 1)
+
+ p1 = self.roundtrip(p1)
+ self.assert_(p1.children.count('changed-in-place') == 1)
+
p1._children = []
self.assert_(len(p1.children) == 0)
+ after = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
+ p1.children = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
+ self.assert_(len(p1.children) == 10)
+ self.assert_([c.name for c in p1._children] == after)
+
+ p1.children[2:6] = ['x'] * 4
+ after = ['a', 'b', 'x', 'x', 'x', 'x', 'g', 'h', 'i', 'j']
+ self.assert_(p1.children == after)
+ self.assert_([c.name for c in p1._children] == after)
+
+ p1.children[2:6] = ['y']
+ after = ['a', 'b', 'y', 'g', 'h', 'i', 'j']
+ self.assert_(p1.children == after)
+ self.assert_([c.name for c in p1._children] == after)
+
+ p1.children[2:3] = ['z'] * 4
+ after = ['a', 'b', 'z', 'z', 'z', 'z', 'g', 'h', 'i', 'j']
+ self.assert_(p1.children == after)
+ self.assert_([c.name for c in p1._children] == after)
+
+ p1.children[2::2] = ['O'] * 4
+ after = ['a', 'b', 'O', 'z', 'O', 'z', 'O', 'h', 'O', 'j']
+ self.assert_(p1.children == after)
+ self.assert_([c.name for c in p1._children] == after)
+
class DefaultTest(_CollectionOperations):
def __init__(self, *args, **kw):
super(DefaultTest, self).__init__(*args, **kw)
self.assert_(len(p1.children) == 2)
self.assert_(len(p1._children) == 2)
- self.assert_(set([o.name for o in p1._children]) == set(['regular', 'proxied']))
+ self.assert_(set([o.name for o in p1._children]) ==
+ set(['regular', 'proxied']))
ch2 = None
for o in p1._children: