from sqlalchemy.orm.interfaces import AttributeExtension
from sqlalchemy import exc as sa_exc
from test.lib import *
-from test.lib.testing import eq_, ne_, assert_raises, assert_raises_message
+from test.lib.testing import eq_, ne_, assert_raises, \
+ assert_raises_message
from test.orm import _base
from test.lib.util import gc_collect, all_partial_orderings
from sqlalchemy.util import cmp, jython, topological
MyTest, MyTest2 = None, None
def test_basic(self):
- class User(object):pass
+ class User(object):
+ pass
instrumentation.register_class(User)
- attributes.register_attribute(User, 'user_id', uselist=False, useobject=False)
- attributes.register_attribute(User, 'user_name', uselist=False, useobject=False)
- attributes.register_attribute(User, 'email_address', uselist=False, useobject=False)
-
+ attributes.register_attribute(User, 'user_id', uselist=False,
+ useobject=False)
+ attributes.register_attribute(User, 'user_name', uselist=False,
+ useobject=False)
+ attributes.register_attribute(User, 'email_address',
+ uselist=False, useobject=False)
u = User()
u.user_id = 7
u.user_name = 'john'
u.email_address = 'lala@123.com'
-
- self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')
+ self.assert_(u.user_id == 7 and u.user_name == 'john'
+ and u.email_address == 'lala@123.com')
attributes.instance_state(u).commit_all(attributes.instance_dict(u))
- self.assert_(u.user_id == 7 and u.user_name == 'john' and u.email_address == 'lala@123.com')
-
+ self.assert_(u.user_id == 7 and u.user_name == 'john'
+ and u.email_address == 'lala@123.com')
u.user_name = 'heythere'
u.email_address = 'foo@bar.com'
- self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.email_address == 'foo@bar.com')
+ self.assert_(u.user_id == 7 and u.user_name == 'heythere'
+ and u.email_address == 'foo@bar.com')
def test_pickleness(self):
instrumentation.register_class(MyTest)
self.assert_(o4.mt2[0].b is None)
def test_state_gc(self):
- """test that InstanceState always has a dict, even after host object gc'ed."""
+ """test that InstanceState always has a dict, even after host
+ object gc'ed."""
class Foo(object):
pass
attributes.register_attribute(Foo, 'b', uselist=False, useobject=False)
f = Foo()
- attributes.instance_state(f).expire(attributes.instance_dict(f), set())
- eq_(f.a, "this is a")
+ attributes.instance_state(f).expire(attributes.instance_dict(f),
+ set())
+ eq_(f.a, 'this is a')
eq_(f.b, 12)
-
- f.a = "this is some new a"
- attributes.instance_state(f).expire(attributes.instance_dict(f), set())
- eq_(f.a, "this is a")
+ f.a = 'this is some new a'
+ attributes.instance_state(f).expire(attributes.instance_dict(f),
+ set())
+ eq_(f.a, 'this is a')
eq_(f.b, 12)
-
- attributes.instance_state(f).expire(attributes.instance_dict(f), set())
- f.a = "this is another new a"
- eq_(f.a, "this is another new a")
+ attributes.instance_state(f).expire(attributes.instance_dict(f),
+ set())
+ f.a = 'this is another new a'
+ eq_(f.a, 'this is another new a')
eq_(f.b, 12)
-
- attributes.instance_state(f).expire(attributes.instance_dict(f), set())
- eq_(f.a, "this is a")
+ attributes.instance_state(f).expire(attributes.instance_dict(f),
+ set())
+ eq_(f.a, 'this is a')
eq_(f.b, 12)
-
del f.a
eq_(f.a, None)
eq_(f.b, 12)
-
- attributes.instance_state(f).commit_all(attributes.instance_dict(f), set())
+ attributes.instance_state(f).commit_all(attributes.instance_dict(f),
+ set())
eq_(f.a, None)
eq_(f.b, 12)
instrumentation.register_class(User)
instrumentation.register_class(Address)
- attributes.register_attribute(User, 'user_id', uselist=False, useobject=False)
- attributes.register_attribute(User, 'user_name', uselist=False, useobject=False)
- attributes.register_attribute(User, 'addresses', uselist = True, useobject=True)
- attributes.register_attribute(Address, 'address_id', uselist=False, useobject=False)
- attributes.register_attribute(Address, 'email_address', uselist=False, useobject=False)
+ attributes.register_attribute(User, 'user_id', uselist=False,
+ useobject=False)
+ attributes.register_attribute(User, 'user_name', uselist=False,
+ useobject=False)
+ attributes.register_attribute(User, 'addresses', uselist=True,
+ useobject=True)
+ attributes.register_attribute(Address, 'address_id',
+ uselist=False, useobject=False)
+ attributes.register_attribute(Address, 'email_address',
+ uselist=False, useobject=False)
u = User()
u.user_id = 7
a.email_address = 'lala@123.com'
u.addresses.append(a)
- self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')
- u, attributes.instance_state(a).commit_all(attributes.instance_dict(a))
- self.assert_(u.user_id == 7 and u.user_name == 'john' and u.addresses[0].email_address == 'lala@123.com')
+ self.assert_(u.user_id == 7 and u.user_name == 'john'
+ and u.addresses[0].email_address == 'lala@123.com')
+ (u,
+ attributes.instance_state(a).commit_all(attributes.instance_dict(a)))
+ self.assert_(u.user_id == 7 and u.user_name == 'john'
+ and u.addresses[0].email_address == 'lala@123.com')
u.user_name = 'heythere'
a = Address()
a.address_id = 11
a.email_address = 'foo@bar.com'
u.addresses.append(a)
- self.assert_(u.user_id == 7 and u.user_name == 'heythere' and u.addresses[0].email_address == 'lala@123.com' and u.addresses[1].email_address == 'foo@bar.com')
+
+ eq_(u.user_id, 7)
+ eq_(u.user_name, 'heythere')
+ eq_(u.addresses[0].email_address,'lala@123.com')
+ eq_(u.addresses[1].email_address,'foo@bar.com')
def test_extension_commit_attr(self):
"""test that an extension which commits attribute history
def hist(key, shouldmatch, fn, *arg):
attributes.instance_state(f1).commit_all(attributes.instance_dict(f1))
fn(*arg)
- histories.append((shouldmatch, attributes.get_history(f1, key)))
+ histories.append((shouldmatch,
+ attributes.get_history(f1, key)))
f1 = Foo()
hist('bars', True, f1.bars.append, b3)
x.bars
b = Bar(id=4)
b.foos.append(x)
- attributes.instance_state(x).expire_attributes(attributes.instance_dict(x), ['bars'])
+ attributes.instance_state(x).expire_attributes(attributes.instance_dict(x),
+ ['bars'])
assert_raises(AssertionError, b.foos.remove, x)
def test_scalar_listener(self):
- # listeners on ScalarAttributeImpl and MutableScalarAttributeImpl aren't used normally.
- # test that they work for the benefit of user extensions
+
+ # listeners on ScalarAttributeImpl and
+ # MutableScalarAttributeImpl aren't used normally. test that
+ # they work for the benefit of user extensions
+
class Foo(object):
+
pass
results = []
return child
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'x', uselist=False, mutable_scalars=False, useobject=False, extension=ReceiveEvents())
- attributes.register_attribute(Foo, 'y', uselist=False, mutable_scalars=True, useobject=False, copy_function=lambda x:x, extension=ReceiveEvents())
+ attributes.register_attribute(Foo, 'x', uselist=False,
+ mutable_scalars=False, useobject=False,
+ extension=ReceiveEvents())
+ attributes.register_attribute(Foo, 'y', uselist=False,
+ mutable_scalars=True, useobject=False,
+ copy_function=lambda x: x, extension=ReceiveEvents())
f = Foo()
f.x = 5
def test_inheritance2(self):
- """test that the attribute manager can properly traverse the managed attributes of an object,
- if the object is of a descendant class with managed attributes in the parent class"""
- class Foo(object):pass
- class Bar(Foo):pass
+ """test that the attribute manager can properly traverse the
+ managed attributes of an object, if the object is of a
+ descendant class with managed attributes in the parent class"""
+
+ class Foo(object):
+ pass
+
+ class Bar(Foo):
+ pass
class Element(object):
_state = True
instrumentation.register_class(Foo)
instrumentation.register_class(Bar)
- attributes.register_attribute(Foo, 'element', uselist=False, useobject=True)
+ attributes.register_attribute(Foo, 'element', uselist=False,
+ useobject=True)
el = Element()
x = Bar()
x.element = el
- eq_(attributes.get_state_history(attributes.instance_state(x), 'element'), ([el], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(x),
+ 'element'), ([el], (), ()))
attributes.instance_state(x).commit_all(attributes.instance_dict(x))
-
- (added, unchanged, deleted) = attributes.get_state_history(attributes.instance_state(x), 'element')
+ added, unchanged, deleted = \
+ attributes.get_state_history(attributes.instance_state(x),
+ 'element')
assert added == ()
assert unchanged == [el]
instrumentation.register_class(Foo)
instrumentation.register_class(Bar)
+ bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3),
+ Bar(id=4)]
- bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)]
def func1(state, passive):
- return "this is func 1"
+ return 'this is func 1'
+
def func2(state, passive):
return [bar1, bar2, bar3]
callable_=func2, useobject=True)
attributes.register_attribute(Bar, 'id', uselist=False,
useobject=True)
-
x = Foo()
attributes.instance_state(x).commit_all(attributes.instance_dict(x))
x.col2.append(bar4)
- eq_(attributes.get_state_history(attributes.instance_state(x), 'col2'), ([bar4], [bar1, bar2, bar3], []))
+ eq_(attributes.get_state_history(attributes.instance_state(x),
+ 'col2'), ([bar4], [bar1, bar2, bar3], []))
def test_parenttrack(self):
- class Foo(object):pass
- class Bar(object):pass
+ class Foo(object):
+ pass
+
+ class Bar(object):
+ pass
instrumentation.register_class(Foo)
instrumentation.register_class(Bar)
-
- attributes.register_attribute(Foo, 'element', uselist=False, trackparent=True, useobject=True)
- attributes.register_attribute(Bar, 'element', uselist=False, trackparent=True, useobject=True)
-
+ attributes.register_attribute(Foo, 'element', uselist=False,
+ trackparent=True, useobject=True)
+ attributes.register_attribute(Bar, 'element', uselist=False,
+ trackparent=True, useobject=True)
f1 = Foo()
f2 = Foo()
b1 = Bar()
b2 = Bar()
-
f1.element = b1
b2.element = f2
-
assert attributes.has_parent(Foo, b1, 'element')
assert not attributes.has_parent(Foo, b2, 'element')
assert not attributes.has_parent(Foo, f2, 'element')
assert attributes.has_parent(Bar, f2, 'element')
-
b2.element = None
assert not attributes.has_parent(Bar, f2, 'element')
- # test that double assignment doesn't accidentally reset the 'parent' flag.
+ # test that double assignment doesn't accidentally reset the
+ # 'parent' flag.
+
b3 = Bar()
f4 = Foo()
b3.element = f4
def test_mutablescalars(self):
"""test detection of changes on mutable scalar items"""
- class Foo(object):pass
+
+ class Foo(object):
+ pass
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'element', uselist=False, copy_function=lambda x:[y for y in x], mutable_scalars=True, useobject=False)
+ attributes.register_attribute(Foo, 'element', uselist=False,
+ copy_function=lambda x: [y for y in x],
+ mutable_scalars=True, useobject=False)
x = Foo()
x.element = ['one', 'two', 'three']
attributes.instance_state(x).commit_all(attributes.instance_dict(x))
x.element[1] = 'five'
assert attributes.instance_state(x).modified
-
instrumentation.unregister_class(Foo)
-
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'element', uselist=False, useobject=False)
+ attributes.register_attribute(Foo, 'element', uselist=False,
+ useobject=False)
x = Foo()
x.element = ['one', 'two', 'three']
attributes.instance_state(x).commit_all(attributes.instance_dict(x))
assert not attributes.instance_state(x).modified
def test_descriptorattributes(self):
- """changeset: 1633 broke ability to use ORM to map classes with unusual
- descriptor attributes (for example, classes that inherit from ones
- implementing zope.interface.Interface).
- This is a simple regression test to prevent that defect.
- """
+ """changeset: 1633 broke ability to use ORM to map classes with
+ unusual descriptor attributes (for example, classes that inherit
+ from ones implementing zope.interface.Interface). This is a
+ simple regression test to prevent that defect. """
+
class des(object):
+
def __get__(self, instance, owner):
raise AttributeError('fake attribute')
def test_collectionclasses(self):
- class Foo(object):pass
- instrumentation.register_class(Foo)
+ class Foo(object):
+ pass
- attributes.register_attribute(Foo, "collection", uselist=True, typecallable=set, useobject=True)
- assert attributes.manager_of_class(Foo).is_instrumented("collection")
+ instrumentation.register_class(Foo)
+ attributes.register_attribute(Foo, 'collection', uselist=True,
+ typecallable=set, useobject=True)
+ assert attributes.manager_of_class(Foo).is_instrumented('collection'
+ )
assert isinstance(Foo().collection, set)
-
- attributes.unregister_attribute(Foo, "collection")
- assert not attributes.manager_of_class(Foo).is_instrumented("collection")
-
+ attributes.unregister_attribute(Foo, 'collection')
+ assert not attributes.manager_of_class(Foo).is_instrumented('collection'
+ )
try:
- attributes.register_attribute(Foo, "collection", uselist=True, typecallable=dict, useobject=True)
+ attributes.register_attribute(Foo, 'collection',
+ uselist=True, typecallable=dict, useobject=True)
assert False
except sa_exc.ArgumentError, e:
- assert str(e) == "Type InstrumentedDict must elect an appender method to be a collection class"
+ assert str(e) \
+ == 'Type InstrumentedDict must elect an appender '\
+ 'method to be a collection class'
class MyDict(dict):
+
@collection.appender
def append(self, item):
self[item.foo] = item
+
@collection.remover
def remove(self, item):
del self[item.foo]
- attributes.register_attribute(Foo, "collection", uselist=True, typecallable=MyDict, useobject=True)
+
+ attributes.register_attribute(Foo, 'collection', uselist=True,
+ typecallable=MyDict, useobject=True)
assert isinstance(Foo().collection, MyDict)
+ attributes.unregister_attribute(Foo, 'collection')
- attributes.unregister_attribute(Foo, "collection")
+ class MyColl(object):
+ pass
- class MyColl(object):pass
try:
- attributes.register_attribute(Foo, "collection", uselist=True, typecallable=MyColl, useobject=True)
+ attributes.register_attribute(Foo, 'collection',
+ uselist=True, typecallable=MyColl, useobject=True)
assert False
except sa_exc.ArgumentError, e:
- assert str(e) == "Type MyColl must elect an appender method to be a collection class"
+ assert str(e) \
+ == 'Type MyColl must elect an appender method to be a '\
+ 'collection class'
class MyColl(object):
+
@collection.iterator
def __iter__(self):
return iter([])
+
@collection.appender
def append(self, item):
pass
+
@collection.remover
def remove(self, item):
pass
- attributes.register_attribute(Foo, "collection", uselist=True, typecallable=MyColl, useobject=True)
+
+ attributes.register_attribute(Foo, 'collection', uselist=True,
+ typecallable=MyColl, useobject=True)
try:
Foo().collection
assert True
pass
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'someattr', uselist=False, useobject=False)
-
+ attributes.register_attribute(Foo, 'someattr', uselist=False,
+ useobject=False)
f = Foo()
- eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f), attributes.instance_dict(f)), None)
-
+ eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f),
+ attributes.instance_dict(f)), None)
f.someattr = 3
- eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f), attributes.instance_dict(f)), None)
-
+ eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f),
+ attributes.instance_dict(f)), None)
f = Foo()
f.someattr = 3
- eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f), attributes.instance_dict(f)), None)
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f), attributes.instance_dict(f)), 3)
+ eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f),
+ attributes.instance_dict(f)), None)
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(Foo.someattr.impl.get_committed_value(attributes.instance_state(f),
+ attributes.instance_dict(f)), 3)
def test_scalar(self):
class Foo(_base.BasicEntity):
pass
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'someattr', uselist=False, useobject=False)
+ attributes.register_attribute(Foo, 'someattr', uselist=False,
+ useobject=False)
# case 1. new object
- f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
-
- f.someattr = "hi"
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['hi'], (), ()))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['hi'], ()))
+ f = Foo()
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), (), ()))
+ f.someattr = 'hi'
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), (['hi'], (), ()))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), ['hi'], ()))
f.someattr = 'there'
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['there'], (), ['hi']))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['there'], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), (['there'], (), ['hi']))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), ['there'], ()))
del f.someattr
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ['there']))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), (), ['there']))
+
+ # case 2. object with direct dictionary settings (similar to a
+ # load operation)
- # case 2. object with direct dictionary settings (similar to a load operation)
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), ['new'], ()))
f.someattr = 'old'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['old'], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), (['old'], (), ['new']))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), ['old'], ()))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['old'], ()))
+ # setting None on uninitialized is currently a change for a
+ # scalar attribute no lazyload occurs so this allows overwrite
+ # operation to proceed
- # setting None on uninitialized is currently a change for a scalar attribute
- # no lazyload occurs so this allows overwrite operation to proceed
f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), (), ()))
f.someattr = None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([None], (), ()))
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), ['new'], ()))
f.someattr = None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([None], (), ['new']))
# set same value twice
+
f = Foo()
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
f.someattr = 'one'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['one'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), (['one'], (), ()))
f.someattr = 'two'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['two'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), (['two'], (), ()))
def test_mutable_scalar(self):
pass
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'someattr', uselist=False, useobject=False, mutable_scalars=True, copy_function=dict)
+ attributes.register_attribute(Foo, 'someattr', uselist=False,
+ useobject=False, mutable_scalars=True,
+ copy_function=dict)
# case 1. new object
- f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
-
- f.someattr = {'foo':'hi'}
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'foo':'hi'}], (), ()))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'hi'}], ()))
- eq_(attributes.instance_state(f).committed_state['someattr'], {'foo':'hi'})
+ f = Foo()
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), (), ()))
+ f.someattr = {'foo': 'hi'}
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([{'foo': 'hi'}], (), ()))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [{'foo': 'hi'}], ()))
+ eq_(attributes.instance_state(f).committed_state['someattr'],
+ {'foo': 'hi'})
f.someattr['foo'] = 'there'
- eq_(attributes.instance_state(f).committed_state['someattr'], {'foo':'hi'})
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'foo':'there'}], (), [{'foo':'hi'}]))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'there'}], ()))
+ eq_(attributes.instance_state(f).committed_state['someattr'],
+ {'foo': 'hi'})
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([{'foo': 'there'}], (), [{'foo': 'hi'}]))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [{'foo': 'there'}], ()))
+
+ # case 2. object with direct dictionary settings (similar to a
+ # load operation)
- # case 2. object with direct dictionary settings (similar to a load operation)
f = Foo()
- f.__dict__['someattr'] = {'foo':'new'}
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'new'}], ()))
-
- f.someattr = {'foo':'old'}
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'foo':'old'}], (), [{'foo':'new'}]))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'foo':'old'}], ()))
+ f.__dict__['someattr'] = {'foo': 'new'}
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [{'foo': 'new'}], ()))
+ f.someattr = {'foo': 'old'}
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([{'foo': 'old'}], (), [{'foo': 'new'}]))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [{'foo': 'old'}], ()))
def test_flag_modified(self):
class Foo(_base.BasicEntity):
pass
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'someattr', uselist=False, useobject=False)
-
+ attributes.register_attribute(Foo, 'someattr', uselist=False,
+ useobject=False)
f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), (), ()))
-
- f.someattr = {'a':'b'}
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'a':'b'},], (), ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), (), ()))
+ f.someattr = {'a': 'b'}
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([{'a': 'b'}], (), ()))
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'a':'b'},], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [{'a': 'b'}], ()))
f.someattr['a'] = 'c'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [{'a':'c'},], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [{'a': 'c'}], ()))
attributes.flag_modified(f, 'someattr')
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([{'a':'c'},], (), ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([{'a': 'c'}], (), ()))
f.someattr = ['a']
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([['a']], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([['a']], (), ()))
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [['a']], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [['a']], ()))
f.someattr[0] = 'b'
f.someattr.append('c')
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [['b', 'c']], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [['b', 'c']], ()))
attributes.flag_modified(f, 'someattr')
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([['b', 'c']], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([['b', 'c']], (), ()))
def test_use_object(self):
class Foo(_base.BasicEntity):
old = Bar(name='old')
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'someattr', uselist=False, useobject=True)
+ attributes.register_attribute(Foo, 'someattr', uselist=False,
+ useobject=True)
# case 1. new object
- f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
+ f = Foo()
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [None], ()))
f.someattr = hi
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], (), ()))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi], (), ()))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [hi], ()))
f.someattr = there
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], (), [hi]))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [there], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([there], (), [hi]))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [there], ()))
del f.someattr
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), [there]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([None], (), [there]))
+
+ # case 2. object with direct dictionary settings (similar to a
+ # load operation)
- # case 2. object with direct dictionary settings (similar to a load operation)
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), ['new'], ()))
f.someattr = old
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([old], (), ['new']))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [old], ()))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [old], ()))
+ # setting None on uninitialized is currently not a change for an
+ # object attribute (this is different than scalar attribute). a
+ # lazyload has occured so if its None, its really None
- # setting None on uninitialized is currently not a change for an object attribute
- # (this is different than scalar attribute). a lazyload has occured so if its
- # None, its really None
f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [None], ()))
f.someattr = None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [None], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [None], ()))
f = Foo()
f.__dict__['someattr'] = 'new'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), ['new'], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), ['new'], ()))
f.someattr = None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([None], (), ['new']))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([None], (), ['new']))
# set same value twice
+
f = Foo()
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
f.someattr = 'one'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['one'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), (['one'], (), ()))
f.someattr = 'two'
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), (['two'], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), (['two'], (), ()))
def test_object_collections_set(self):
class Foo(_base.BasicEntity):
instrumentation.register_class(Foo)
instrumentation.register_class(Bar)
- attributes.register_attribute(Foo, 'someattr', uselist=True, useobject=True)
-
+ attributes.register_attribute(Foo, 'someattr', uselist=True,
+ useobject=True)
hi = Bar(name='hi')
there = Bar(name='there')
old = Bar(name='old')
new = Bar(name='new')
# case 1. new object
- f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
+ f = Foo()
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [], ()))
f.someattr = [hi]
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi], [], []))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [hi], ()))
f.someattr = [there]
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], [], [hi]))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [there], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([there], [], [hi]))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [there], ()))
f.someattr = [hi]
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], [there]))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi], [], [there]))
f.someattr = [old, new]
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old, new], [], [there]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([old, new], [], [there]))
+
+ # case 2. object with direct settings (similar to a load
+ # operation)
- # case 2. object with direct settings (similar to a load operation)
f = Foo()
collection = attributes.init_collection(f, 'someattr')
collection.append_without_event(new)
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [new], ()))
f.someattr = [old]
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old], [], [new]))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [old], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([old], [], [new]))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [old], ()))
def test_dict_collections(self):
class Foo(_base.BasicEntity):
pass
from sqlalchemy.orm.collections import attribute_mapped_collection
-
instrumentation.register_class(Foo)
instrumentation.register_class(Bar)
- attributes.register_attribute(Foo, 'someattr', uselist=True, useobject=True, typecallable=attribute_mapped_collection('name'))
-
+ attributes.register_attribute(Foo, 'someattr', uselist=True,
+ useobject=True,
+ typecallable=attribute_mapped_collection('name'))
hi = Bar(name='hi')
there = Bar(name='there')
old = Bar(name='old')
new = Bar(name='new')
-
f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [], ()))
f.someattr['hi'] = hi
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi], [], []))
f.someattr['there'] = there
- eq_(tuple([set(x) for x in attributes.get_state_history(attributes.instance_state(f), 'someattr')]), (set([hi, there]), set(), set()))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(tuple([set(x) for x in attributes.get_state_history(attributes.instance_state(f), 'someattr')]), (set(), set([hi, there]), set()))
+ eq_(tuple([set(x) for x in
+ attributes.get_state_history(attributes.instance_state(f),
+ 'someattr')]), (set([hi, there]), set(), set()))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(tuple([set(x) for x in
+ attributes.get_state_history(attributes.instance_state(f),
+ 'someattr')]), (set(), set([hi, there]), set()))
def test_object_collections_mutate(self):
class Foo(_base.BasicEntity):
pass
instrumentation.register_class(Foo)
- attributes.register_attribute(Foo, 'someattr', uselist=True, useobject=True)
- attributes.register_attribute(Foo, 'id', uselist=False, useobject=False)
+ attributes.register_attribute(Foo, 'someattr', uselist=True,
+ useobject=True)
+ attributes.register_attribute(Foo, 'id', uselist=False,
+ useobject=False)
instrumentation.register_class(Bar)
-
hi = Bar(name='hi')
there = Bar(name='there')
old = Bar(name='old')
new = Bar(name='new')
# case 1. new object
- f = Foo(id=1)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [], ()))
+ f = Foo(id=1)
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [], ()))
f.someattr.append(hi)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi], [], []))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [hi], ()))
f.someattr.append(there)
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], [hi], []))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
-
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi, there], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([there], [hi], []))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [hi, there], ()))
f.someattr.remove(there)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [hi], [there]))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([], [hi], [there]))
f.someattr.append(old)
f.someattr.append(new)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old, new], [hi], [there]))
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi, old, new], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([old, new], [hi], [there]))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [hi, old, new], ()))
f.someattr.pop(0)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [old, new], [hi]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([], [old, new], [hi]))
+
+ # case 2. object with direct settings (similar to a load
+ # operation)
- # case 2. object with direct settings (similar to a load operation)
f = Foo()
f.__dict__['id'] = 1
collection = attributes.init_collection(f, 'someattr')
collection.append_without_event(new)
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [new], ()))
f.someattr.append(old)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([old], [new], []))
-
- attributes.instance_state(f).commit(attributes.instance_dict(f), ['someattr'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new, old], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([old], [new], []))
+ attributes.instance_state(f).commit(attributes.instance_dict(f),
+ ['someattr'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [new, old], ()))
f = Foo()
collection = attributes.init_collection(f, 'someattr')
collection.append_without_event(new)
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [new], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [new], ()))
f.id = 1
f.someattr.remove(new)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [], [new]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([], [], [new]))
# case 3. mixing appends with sets
+
f = Foo()
f.someattr.append(hi)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi], [], []))
f.someattr.append(there)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi, there], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi, there], [], []))
f.someattr = [there]
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([there], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([there], [], []))
# case 4. ensure duplicates show up, order is maintained
+
f = Foo()
f.someattr.append(hi)
f.someattr.append(there)
f.someattr.append(hi)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([hi, there, hi], [], []))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([hi, there, hi], [], []))
attributes.instance_state(f).commit_all(attributes.instance_dict(f))
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ((), [hi, there, hi], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ((), [hi, there, hi], ()))
f.someattr = []
- eq_(attributes.get_state_history(attributes.instance_state(f), 'someattr'), ([], [], [hi, there, hi]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'someattr'), ([], [], [hi, there, hi]))
def test_collections_via_backref(self):
class Foo(_base.BasicEntity):
instrumentation.register_class(Foo)
instrumentation.register_class(Bar)
- attributes.register_attribute(Foo, 'bars', uselist=True, backref='foo', trackparent=True, useobject=True)
- attributes.register_attribute(Bar, 'foo', uselist=False, backref='bars', trackparent=True, useobject=True)
-
+ attributes.register_attribute(Foo, 'bars', uselist=True,
+ backref='foo', trackparent=True, useobject=True)
+ attributes.register_attribute(Bar, 'foo', uselist=False,
+ backref='bars', trackparent=True, useobject=True)
f1 = Foo()
b1 = Bar()
- eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ((), [], ()))
- eq_(attributes.get_state_history(attributes.instance_state(b1), 'foo'), ((), [None], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1),
+ 'bars'), ((), [], ()))
+ eq_(attributes.get_state_history(attributes.instance_state(b1),
+ 'foo'), ((), [None], ()))
- #b1.foo = f1
- f1.bars.append(b1)
- eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ([b1], [], []))
- eq_(attributes.get_state_history(attributes.instance_state(b1), 'foo'), ([f1], (), ()))
+ # b1.foo = f1
+ f1.bars.append(b1)
+ eq_(attributes.get_state_history(attributes.instance_state(f1),
+ 'bars'), ([b1], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(b1),
+ 'foo'), ([f1], (), ()))
b2 = Bar()
f1.bars.append(b2)
- eq_(attributes.get_state_history(attributes.instance_state(f1), 'bars'), ([b1, b2], [], []))
- eq_(attributes.get_state_history(attributes.instance_state(b1), 'foo'), ([f1], (), ()))
- eq_(attributes.get_state_history(attributes.instance_state(b2), 'foo'), ([f1], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(f1),
+ 'bars'), ([b1, b2], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(b1),
+ 'foo'), ([f1], (), ()))
+ eq_(attributes.get_state_history(attributes.instance_state(b2),
+ 'foo'), ([f1], (), ()))
def test_lazy_backref_collections(self):
class Foo(_base.BasicEntity):
useobject=True)
attributes.register_attribute(Bar, 'foo', uselist=False,
backref='bars', trackparent=True, useobject=True)
-
- bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)]
+ bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3),
+ Bar(id=4)]
lazy_load = [bar1, bar2, bar3]
-
f = Foo()
bar4 = Bar()
bar4.foo = f
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar2, bar3], []))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([bar4], [bar1, bar2, bar3], []))
lazy_load = None
f = Foo()
bar4 = Bar()
bar4.foo = f
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [], []))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([bar4], [], []))
lazy_load = [bar1, bar2, bar3]
- attributes.instance_state(f).expire_attributes(attributes.instance_dict(f), ['bars'])
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ((), [bar1, bar2, bar3], ()))
+ attributes.instance_state(f).expire_attributes(attributes.instance_dict(f),
+ ['bars'])
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ((), [bar1, bar2, bar3], ()))
def test_collections_via_lazyload(self):
class Foo(_base.BasicEntity):
instrumentation.register_class(Bar)
attributes.register_attribute(Foo, 'bars', uselist=True,
callable_=lazyload, trackparent=True, useobject=True)
-
- bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)]
+ bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3),
+ Bar(id=4)]
lazy_load = [bar1, bar2, bar3]
-
f = Foo()
f.bars = []
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([], [], [bar1, bar2, bar3]))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([], [], [bar1, bar2, bar3]))
f = Foo()
f.bars.append(bar4)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar2, bar3], []) )
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([bar4], [bar1, bar2, bar3], []))
f = Foo()
f.bars.remove(bar2)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([], [bar1, bar3], [bar2]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([], [bar1, bar3], [bar2]))
f.bars.append(bar4)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar4], [bar1, bar3], [bar2]))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([bar4], [bar1, bar3], [bar2]))
f = Foo()
del f.bars[1]
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([], [bar1, bar3], [bar2]))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([], [bar1, bar3], [bar2]))
lazy_load = None
f = Foo()
f.bars.append(bar2)
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bars'), ([bar2], [], []))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bars'), ([bar2], [], []))
def test_scalar_via_lazyload(self):
class Foo(_base.BasicEntity):
pass
lazy_load = None
+
def lazyload(state, passive):
return lazy_load
callable_=lazyload, useobject=False)
lazy_load = 'hi'
- # with scalar non-object and active_history=False, the lazy callable is only executed on gets, not history
- # operations
+ # with scalar non-object and active_history=False, the lazy
+ # callable is only executed on gets, not history operations
f = Foo()
- eq_(f.bar, "hi")
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
-
+ eq_(f.bar, 'hi')
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ((), ['hi'], ()))
f = Foo()
f.bar = None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([None], (), ()))
f = Foo()
- f.bar = "there"
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), (["there"], (), ()))
- f.bar = "hi"
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), (["hi"], (), ()))
-
+ f.bar = 'there'
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), (['there'], (), ()))
+ f.bar = 'hi'
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), (['hi'], (), ()))
f = Foo()
- eq_(f.bar, "hi")
+ eq_(f.bar, 'hi')
del f.bar
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ((), (), ['hi']))
assert f.bar is None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([None], (), ['hi']))
def test_scalar_via_lazyload_with_active(self):
class Foo(_base.BasicEntity):
pass
lazy_load = None
+
def lazyload(state, passive):
return lazy_load
active_history=True)
lazy_load = 'hi'
- # active_history=True means the lazy callable is executed on set as well as get,
- # causing the old value to appear in the history
+ # active_history=True means the lazy callable is executed on set
+ # as well as get, causing the old value to appear in the history
f = Foo()
- eq_(f.bar, "hi")
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
-
+ eq_(f.bar, 'hi')
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ((), ['hi'], ()))
f = Foo()
f.bar = None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ['hi']))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([None], (), ['hi']))
f = Foo()
- f.bar = "there"
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), (["there"], (), ['hi']))
- f.bar = "hi"
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), ["hi"], ()))
-
+ f.bar = 'there'
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), (['there'], (), ['hi']))
+ f.bar = 'hi'
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ((), ['hi'], ()))
f = Foo()
- eq_(f.bar, "hi")
+ eq_(f.bar, 'hi')
del f.bar
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ((), (), ['hi']))
assert f.bar is None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), ["hi"]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([None], (), ['hi']))
def test_scalar_object_via_lazyload(self):
class Foo(_base.BasicEntity):
bar1, bar2 = [Bar(id=1), Bar(id=2)]
lazy_load = bar1
- # with scalar object, the lazy callable is only executed on gets and history
- # operations
+ # with scalar object, the lazy callable is only executed on gets
+ # and history operations
f = Foo()
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), [bar1], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ((), [bar1], ()))
f = Foo()
f.bar = None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([None], (), [bar1]))
f = Foo()
f.bar = bar2
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([bar2], (), [bar1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([bar2], (), [bar1]))
f.bar = bar1
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ((), [bar1], ()))
-
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ((), [bar1], ()))
f = Foo()
eq_(f.bar, bar1)
del f.bar
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([None], (), [bar1]))
assert f.bar is None
- eq_(attributes.get_state_history(attributes.instance_state(f), 'bar'), ([None], (), [bar1]))
+ eq_(attributes.get_state_history(attributes.instance_state(f),
+ 'bar'), ([None], (), [bar1]))
+
+ def test_deprecated_flags(self):
+ assert_raises_message(
+ sa_exc.SADeprecationWarning,
+ "Passing True for 'passive' is deprecated. "
+ "Use attributes.PASSIVE_NO_INITIALIZE",
+ attributes.get_history, object(), 'foo', True
+ )
+
+ assert_raises_message(
+ sa_exc.SADeprecationWarning,
+ "Passing False for 'passive' is deprecated. "
+ "Use attributes.PASSIVE_OFF",
+ attributes.get_history, object(), 'foo', False
+ )
class ListenerTest(_base.ORMTest):
def test_receive_changes(self):
instrumentation.register_class(Foo)
instrumentation.register_class(Bar)
- attributes.register_attribute(Foo, 'data', uselist=False, useobject=False)
- attributes.register_attribute(Foo, 'barlist', uselist=True, useobject=True)
- attributes.register_attribute(Foo, 'barset', typecallable=set, uselist=True, useobject=True)
- attributes.register_attribute(Bar, 'data', uselist=False, useobject=False)
-
+ attributes.register_attribute(Foo, 'data', uselist=False,
+ useobject=False)
+ attributes.register_attribute(Foo, 'barlist', uselist=True,
+ useobject=True)
+ attributes.register_attribute(Foo, 'barset', typecallable=set,
+ uselist=True, useobject=True)
+ attributes.register_attribute(Bar, 'data', uselist=False,
+ useobject=False)
event.listen(Foo.data, 'set', on_set, retval=True)
event.listen(Foo.barlist, 'append', append, retval=True)
event.listen(Foo.barset, 'append', append, retval=True)
-
f1 = Foo()
- f1.data = "some data"
- eq_(f1.data, "some data modified")
+ f1.data = 'some data'
+ eq_(f1.data, 'some data modified')
b1 = Bar()
- b1.data = "some bar"
+ b1.data = 'some bar'
f1.barlist.append(b1)
- assert b1.data == "some bar"
- assert f1.barlist[0].data == "some bar appended"
-
+ assert b1.data == 'some bar'
+ assert f1.barlist[0].data == 'some bar appended'
f1.barset.add(b1)
- assert f1.barset.pop().data == "some bar appended"
+ assert f1.barset.pop().data == 'some bar appended'
def test_propagate(self):
classes = [None, None, None]
instrumentation.register_class(classes[2])
def attr_a():
- attributes.register_attribute(classes[0], 'attrib', uselist=False, useobject=False)
+ attributes.register_attribute(classes[0], 'attrib',
+ uselist=False, useobject=False)
def attr_b():
- attributes.register_attribute(classes[1], 'attrib', uselist=False, useobject=False)
+ attributes.register_attribute(classes[1], 'attrib',
+ uselist=False, useobject=False)
def attr_c():
- attributes.register_attribute(classes[2], 'attrib', uselist=False, useobject=False)
+ attributes.register_attribute(classes[2], 'attrib',
+ uselist=False, useobject=False)
def set(state, value, oldvalue, initiator):
canary.append(value)