class FooWithEq(object):
+
def __init__(self, **kw):
for k in kw:
setattr(self, k, kw[k])
class Point(MutableComposite):
+
def __init__(self, x, y):
self.x = x
self.y = y
class MyPoint(Point):
+
@classmethod
def coerce(cls, key, value):
if isinstance(value, tuple):
eq_(f1.non_mutable_data, {'a': 'b'})
+
class MutableWithScalarPickleTest(_MutableDictTestBase, fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
MutableDict = cls._type_fixture()
mutable_pickle = MutableDict.as_mutable(PickleType)
Table('foo', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('skip', mutable_pickle),
- Column('data', mutable_pickle),
- Column('non_mutable_data', PickleType),
- Column('unrelated_data', String(50))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('skip', mutable_pickle),
+ Column('data', mutable_pickle),
+ Column('non_mutable_data', PickleType),
+ Column('unrelated_data', String(50))
+ )
def test_non_mutable(self):
self._test_non_mutable()
+
class MutableWithScalarJSONTest(_MutableDictTestBase, fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
import json
MutableDict = cls._type_fixture()
Table('foo', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', MutableDict.as_mutable(JSONEncodedDict)),
- Column('non_mutable_data', JSONEncodedDict),
- Column('unrelated_data', String(50))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', MutableDict.as_mutable(JSONEncodedDict)),
+ Column('non_mutable_data', JSONEncodedDict),
+ Column('unrelated_data', String(50))
+ )
def test_non_mutable(self):
self._test_non_mutable()
-class MutableAssocWithAttrInheritTest(_MutableDictTestBase, fixtures.MappedTest):
+
+class MutableAssocWithAttrInheritTest(_MutableDictTestBase,
+ fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
Table('foo', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', PickleType),
- Column('non_mutable_data', PickleType),
- Column('unrelated_data', String(50))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', PickleType),
+ Column('non_mutable_data', PickleType),
+ Column('unrelated_data', String(50))
+ )
Table('subfoo', metadata,
- Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
- )
+ Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
+ )
def setup_mappers(cls):
foo = cls.tables.foo
sess.commit()
eq_(f1.data, {'b': 'c'})
-class MutableAssociationScalarPickleTest(_MutableDictTestBase, fixtures.MappedTest):
+
+class MutableAssociationScalarPickleTest(_MutableDictTestBase,
+ fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
MutableDict = cls._type_fixture()
MutableDict.associate_with(PickleType)
Table('foo', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('skip', PickleType),
- Column('data', PickleType),
- Column('unrelated_data', String(50))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('skip', PickleType),
+ Column('data', PickleType),
+ Column('unrelated_data', String(50))
+ )
+
+
+class MutableAssociationScalarJSONTest(_MutableDictTestBase,
+ fixtures.MappedTest):
-class MutableAssociationScalarJSONTest(_MutableDictTestBase, fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):
import json
MutableDict.associate_with(JSONEncodedDict)
Table('foo', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('data', JSONEncodedDict),
- Column('unrelated_data', String(50))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', JSONEncodedDict),
+ Column('unrelated_data', String(50))
+ )
-class CustomMutableAssociationScalarJSONTest(_MutableDictTestBase, fixtures.MappedTest):
+class CustomMutableAssociationScalarJSONTest(_MutableDictTestBase,
+ fixtures.MappedTest):
CustomMutableDict = None
@classmethod
def _type_fixture(cls):
if not(getattr(cls, 'CustomMutableDict')):
- MutableDict = super(CustomMutableAssociationScalarJSONTest, cls)._type_fixture()
+ MutableDict = super(
+ CustomMutableAssociationScalarJSONTest, cls)._type_fixture()
+
class CustomMutableDict(MutableDict):
pass
cls.CustomMutableDict = CustomMutableDict
CustomMutableDict.associate_with(JSONEncodedDict)
Table('foo', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('data', JSONEncodedDict),
- Column('unrelated_data', String(50))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', JSONEncodedDict),
+ Column('unrelated_data', String(50))
+ )
def test_pickle_parent(self):
- # Picklers don't know how to pickle CustomMutableDict, but we aren't testing that here
+ # Picklers don't know how to pickle CustomMutableDict,
+ # but we aren't testing that here
pass
def test_coerce(self):
class _CompositeTestBase(object):
+
@classmethod
def define_tables(cls, metadata):
Table('foo', metadata,
- Column('id', Integer, primary_key=True,
- test_needs_autoincrement=True),
- Column('x', Integer),
- Column('y', Integer),
- Column('unrelated_data', String(50))
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('x', Integer),
+ Column('y', Integer),
+ Column('unrelated_data', String(50))
+ )
def setup(self):
from sqlalchemy.ext import mutable
mutable._setup_composite_listener()
super(_CompositeTestBase, self).setup()
-
def teardown(self):
# clear out mapper events
Mapper.dispatch._clear()
@classmethod
def _type_fixture(cls):
-
return Point
+
class MutableCompositesUnpickleTest(_CompositeTestBase, fixtures.MappedTest):
@classmethod
for loads, dumps in picklers():
loads(dumps(u1))
+
class MutableCompositesTest(_CompositeTestBase, fixtures.MappedTest):
@classmethod
eq_(f1.data.x, 5)
+
class MutableCompositeCallableTest(_CompositeTestBase, fixtures.MappedTest):
@classmethod
eq_(f1.data.x, 3)
-class MutableCompositeCustomCoerceTest(_CompositeTestBase, fixtures.MappedTest):
+class MutableCompositeCustomCoerceTest(_CompositeTestBase,
+ fixtures.MappedTest):
+
@classmethod
def _type_fixture(cls):
return MyPoint
-
@classmethod
def setup_mappers(cls):
foo = cls.tables.foo
class MutableInheritedCompositesTest(_CompositeTestBase, fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
Table('foo', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('x', Integer),
- Column('y', Integer)
- )
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('x', Integer),
+ Column('y', Integer)
+ )
Table('subfoo', metadata,
- Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
- )
+ Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
+ )
@classmethod
def setup_mappers(cls):
sess.add(f2)
f2.data.y = 12
assert f2 in sess.dirty
-
from sqlalchemy import event
from sqlalchemy.testing.mock import Mock, call
+
class _RemoveListeners(object):
+
def teardown(self):
events.MapperEvents._clear()
events.InstanceEvents._clear()
users, addresses = self.tables.users, self.tables.addresses
-
canary = []
+
class A(object):
pass
+
class B(A):
pass
mapper(A, users)
mapper(B, addresses, inherits=A,
- properties={'address_id': addresses.c.id})
+ properties={'address_id': addresses.c.id})
def init_a(target, args, kwargs):
canary.append(('init_a', target))
a = A()
eq_(canary, [('init_a', a), ('init_b', a),
- ('init_c', a), ('init_d', a), ('init_e', a)])
+ ('init_c', a), ('init_d', a), ('init_e', a)])
# test propagate flag
canary[:] = []
b = B()
eq_(canary, [('init_a', b), ('init_b', b), ('init_e', b)])
-
def listen_all(self, mapper, **kw):
canary = []
+
def evt(meth):
def go(*args, **kwargs):
canary.append(meth)
def test_basic(self):
User, users = self.classes.User, self.tables.users
-
mapper(User, users)
canary = self.listen_all(User)
named_canary = self.listen_all(User, named=True)
s = Session()
u2 = s.merge(u)
s = Session()
- u2 = s.merge(User(name='u2'))
+ u2 = s.merge(User(name='u2')) # noqa
s.commit()
s.query(User).order_by(User.id).first()
eq_(canary, ['load', 'load', 'load'])
-
def test_inheritance(self):
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
+ self.tables.addresses,
+ self.classes.User)
class AdminUser(User):
pass
mapper(User, users)
mapper(AdminUser, addresses, inherits=User,
- properties={'address_id': addresses.c.id})
+ properties={'address_id': addresses.c.id})
canary1 = self.listen_all(User, propagate=True)
canary2 = self.listen_all(User)
sess.delete(am)
sess.flush()
eq_(canary1, ['init', 'before_insert', 'after_insert',
- 'refresh', 'load',
- 'before_update', 'after_update', 'before_delete',
- 'after_delete'])
+ 'refresh', 'load',
+ 'before_update', 'after_update', 'before_delete',
+ 'after_delete'])
eq_(canary2, [])
eq_(canary3, ['init', 'before_insert', 'after_insert',
- 'refresh',
- 'load',
- 'before_update', 'after_update', 'before_delete',
- 'after_delete'])
+ 'refresh',
+ 'load',
+ 'before_update', 'after_update', 'before_delete',
+ 'after_delete'])
def test_inheritance_subclass_deferred(self):
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
-
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
class AdminUser(User):
pass
mapper(AdminUser, addresses, inherits=User,
- properties={'address_id': addresses.c.id})
+ properties={'address_id': addresses.c.id})
canary3 = self.listen_all(AdminUser)
sess = create_session()
sess.delete(am)
sess.flush()
eq_(canary1, ['init', 'before_insert', 'after_insert',
- 'refresh', 'load',
- 'before_update', 'after_update', 'before_delete',
- 'after_delete'])
+ 'refresh', 'load',
+ 'before_update', 'after_update', 'before_delete',
+ 'after_delete'])
eq_(canary2, [])
eq_(canary3, ['init', 'before_insert', 'after_insert',
- 'refresh', 'load',
- 'before_update', 'after_update', 'before_delete',
- 'after_delete'])
-
+ 'refresh', 'load',
+ 'before_update', 'after_update', 'before_delete',
+ 'after_delete'])
def test_before_after_only_collection(self):
"""before_update is called on parent for collection modifications,
"""
keywords, items, item_keywords, Keyword, Item = (
- self.tables.keywords,
- self.tables.items,
- self.tables.item_keywords,
- self.classes.Keyword,
- self.classes.Item)
-
+ self.tables.keywords,
+ self.tables.items,
+ self.tables.item_keywords,
+ self.classes.Keyword,
+ self.classes.Item)
mapper(Item, items, properties={
'keywords': relationship(Keyword, secondary=item_keywords)})
sess.flush()
eq_(canary1,
['init',
- 'before_insert', 'after_insert'])
+ 'before_insert', 'after_insert'])
eq_(canary2,
['init',
- 'before_insert', 'after_insert'])
+ 'before_insert', 'after_insert'])
- canary1[:]= []
- canary2[:]= []
+ canary1[:] = []
+ canary2[:] = []
i1.keywords.append(k1)
sess.flush()
assert_raises_message(
sa.exc.SAWarning,
"before_configured' and 'after_configured' ORM events only "
- "invoke with the mapper\(\) function or Mapper class as the target.",
+ "invoke with the mapper\(\) function or Mapper class as "
+ "the target.",
event.listen, User, 'before_configured', m1
)
assert_raises_message(
sa.exc.SAWarning,
"before_configured' and 'after_configured' ORM events only "
- "invoke with the mapper\(\) function or Mapper class as the target.",
+ "invoke with the mapper\(\) function or Mapper class as "
+ "the target.",
event.listen, User, 'after_configured', m1
)
def test_instrument_event(self):
Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
canary = []
+
def instrument_class(mapper, cls):
canary.append(cls)
)
-class DeclarativeEventListenTest(_RemoveListeners, fixtures.DeclarativeMappedTest):
+class DeclarativeEventListenTest(_RemoveListeners,
+ fixtures.DeclarativeMappedTest):
run_setup_classes = "each"
run_deletes = None
class DeferredMapperEventsTest(_RemoveListeners, _fixtures.FixtureTest):
+
""""test event listeners against unmapped classes.
This incurs special logic. Note if we ever do the "remove" case,
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
canary = []
+
def evt(x, y, z):
canary.append(x)
event.listen(User, "before_insert", evt, raw=True)
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
pass
canary = Mock()
+
def evt(x, y, z):
canary.append(x)
event.listen(User, "before_insert", canary, propagate=True, raw=True)
m = mapper(SubUser, users)
m.dispatch.before_insert(5, 6, 7)
eq_(canary.mock_calls,
- [call(5, 6, 7)])
+ [call(5, 6, 7)])
m2 = mapper(SubSubUser, users)
m2.dispatch.before_insert(8, 9, 10)
eq_(canary.mock_calls,
- [call(5, 6, 7), call(8, 9, 10)])
-
+ [call(5, 6, 7), call(8, 9, 10)])
def test_deferred_map_event_subclass_no_propagate(self):
"""
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
canary = []
+
def evt(x, y, z):
canary.append(x)
event.listen(User, "before_insert", evt, propagate=False)
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
m = mapper(SubUser, users)
canary = []
+
def evt(x, y, z):
canary.append(x)
event.listen(User, "before_insert", evt, propagate=True, raw=True)
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
m = mapper(SubUser, users)
canary = []
+
def evt(x):
canary.append(x)
event.listen(User, "load", evt, propagate=True, raw=True)
m.class_manager.dispatch.load(5)
eq_(canary, [5])
-
def test_deferred_instance_event_plain(self):
"""
1. instance event listen on class, w/o propagate
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
canary = []
+
def evt(x):
canary.append(x)
event.listen(User, "load", evt, raw=True)
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
pass
canary = []
+
def evt(x):
canary.append(x)
event.listen(User, "load", evt, propagate=True, raw=True)
def test_deferred_instance_event_subclass_propagate_baseclass(self):
"""
1. instance event listen on class, w propagate
- 2. map one subclass of class, map base class, leave 2nd subclass unmapped
+ 2. map one subclass of class, map base class, leave 2nd subclass
+ unmapped
3. event fire on sub should receive one and only one event
4. event fire on base should receive one and only one event
5. map 2nd subclass
6. event fire on 2nd subclass should receive one and only one event
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
m3 = mapper(SubUser2, users)
m3.class_manager.dispatch.load(instance)
eq_(canary.mock_calls, [call(instance.obj()),
- call(instance.obj()), call(instance.obj())])
+ call(instance.obj()), call(instance.obj())])
def test_deferred_instance_event_subclass_no_propagate(self):
"""
3. event fire on subclass should not receive event
"""
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
class SubUser(User):
pass
canary = []
+
def evt(x):
canary.append(x)
event.listen(User, "load", evt, propagate=False)
eq_(canary, [])
def test_deferred_instrument_event(self):
- users, User = (self.tables.users,
- self.classes.User)
+ User = self.classes.User
canary = []
+
def evt(x):
canary.append(x)
event.listen(User, "attribute_instrument", evt)
- instrumentation._instrumentation_factory.dispatch.attribute_instrument(User)
+ instrumentation._instrumentation_factory.\
+ dispatch.attribute_instrument(User)
eq_(canary, [User])
def test_isolation_instrument_event(self):
- users, User = (self.tables.users,
- self.classes.User)
+ User = self.classes.User
+
class Bar(object):
pass
canary = []
+
def evt(x):
canary.append(x)
event.listen(Bar, "attribute_instrument", evt)
- instrumentation._instrumentation_factory.dispatch.attribute_instrument(User)
+ instrumentation._instrumentation_factory.dispatch.\
+ attribute_instrument(User)
eq_(canary, [])
@testing.requires.predictable_gc
assert not dispatch.attribute_instrument
-
def test_deferred_instrument_event_subclass_propagate(self):
- users, User = (self.tables.users,
- self.classes.User)
+ User = self.classes.User
+
class SubUser(User):
pass
canary = []
+
def evt(x):
canary.append(x)
event.listen(User, "attribute_instrument", evt, propagate=True)
instrumentation._instrumentation_factory.dispatch.\
- attribute_instrument(SubUser)
+ attribute_instrument(SubUser)
eq_(canary, [SubUser])
def test_deferred_instrument_event_subclass_no_propagate(self):
users, User = (self.tables.users,
- self.classes.User)
+ self.classes.User)
+
class SubUser(User):
pass
canary = []
+
def evt(x):
canary.append(x)
event.listen(User, "attribute_instrument", evt, propagate=False)
mapper(SubUser, users)
- instrumentation._instrumentation_factory.dispatch.attribute_instrument(5)
+ instrumentation._instrumentation_factory.dispatch.\
+ attribute_instrument(5)
eq_(canary, [])
User = self.classes.User
canary = []
+
def load(target, ctx):
canary.append("load")
+
def refresh(target, ctx, attrs):
canary.append(("refresh", attrs))
class RemovalTest(_fixtures.FixtureTest):
run_inserts = None
-
def test_attr_propagated(self):
User = self.classes.User
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
+ self.tables.addresses,
+ self.classes.User)
class AdminUser(User):
pass
mapper(User, users)
mapper(AdminUser, addresses, inherits=User,
- properties={'address_id': addresses.c.id})
+ properties={'address_id': addresses.c.id})
fn = Mock()
event.listen(User.name, "set", fn, propagate=True)
eq_(fn.mock_calls, [call(u1, "u1")])
-
class RefreshTest(_fixtures.FixtureTest):
run_inserts = None
User = self.classes.User
canary = []
+
def load(target, ctx):
canary.append("load")
+
def refresh(target, ctx, attrs):
canary.append(("refresh", attrs))
assert "name" not in attributes.instance_state(u1).committed_state
assert u1 in sess.dirty
-
-
-
def test_repeated_rows(self):
User = self.classes.User
sess.commit()
sess.query(User).union_all(sess.query(User)).all()
- eq_(canary, [('refresh', set(['id','name']))])
+ eq_(canary, [('refresh', set(['id', 'name']))])
def test_via_refresh_state(self):
User = self.classes.User
sess.commit()
u1.name
- eq_(canary, [('refresh', set(['id','name']))])
+ eq_(canary, [('refresh', set(['id', 'name']))])
def test_was_expired(self):
User = self.classes.User
sess.expire(u1)
sess.query(User).first()
- eq_(canary, [('refresh', set(['id','name']))])
+ eq_(canary, [('refresh', set(['id', 'name']))])
def test_was_expired_via_commit(self):
User = self.classes.User
sess.commit()
sess.query(User).first()
- eq_(canary, [('refresh', set(['id','name']))])
+ eq_(canary, [('refresh', set(['id', 'name']))])
def test_was_expired_attrs(self):
User = self.classes.User
def my_listener_one(*arg, **kw):
pass
+
def my_listener_two(*arg, **kw):
pass
def my_listener_one(*arg, **kw):
pass
- scope = scoped_session(lambda:Session())
+ scope = scoped_session(lambda: Session())
assert_raises_message(
sa.exc.ArgumentError,
pass
class NotASession(object):
+
def __call__(self):
return Session()
def _listener_fixture(self, **kw):
canary = []
+
def listener(name):
def go(*arg, **kw):
canary.append(name)
def test_flush_autocommit_hook(self):
User, users = self.classes.User, self.tables.users
-
mapper(User, users)
- sess, canary = self._listener_fixture(autoflush=False,
- autocommit=True, expire_on_commit=False)
+ sess, canary = self._listener_fixture(
+ autoflush=False,
+ autocommit=True, expire_on_commit=False)
u = User(name='u1')
sess.add(u)
sess.flush()
eq_(
canary,
- [ 'before_attach', 'after_attach', 'before_flush',
- 'after_transaction_create', 'after_begin',
- 'after_flush', 'after_flush_postexec',
- 'before_commit', 'after_commit','after_transaction_end']
+ ['before_attach', 'after_attach', 'before_flush',
+ 'after_transaction_create', 'after_begin',
+ 'after_flush', 'after_flush_postexec',
+ 'before_commit', 'after_commit', 'after_transaction_end']
)
-
def test_rollback_hook(self):
User, users = self.classes.User, self.tables.users
sess, canary = self._listener_fixture()
sess.rollback()
eq_(canary,
- ['before_attach', 'after_attach', 'before_commit', 'before_flush',
- 'after_transaction_create', 'after_begin', 'after_flush',
- 'after_flush_postexec', 'after_transaction_end', 'after_commit',
- 'after_transaction_end', 'after_transaction_create',
- 'before_attach', 'after_attach', 'before_commit',
- 'before_flush', 'after_transaction_create', 'after_begin', 'after_rollback',
- 'after_transaction_end',
- 'after_soft_rollback', 'after_transaction_end','after_transaction_create',
- 'after_soft_rollback'])
+ ['before_attach', 'after_attach', 'before_commit', 'before_flush',
+ 'after_transaction_create', 'after_begin', 'after_flush',
+ 'after_flush_postexec', 'after_transaction_end', 'after_commit',
+ 'after_transaction_end', 'after_transaction_create',
+ 'before_attach', 'after_attach', 'before_commit',
+ 'before_flush', 'after_transaction_create', 'after_begin',
+ 'after_rollback',
+ 'after_transaction_end',
+ 'after_soft_rollback', 'after_transaction_end',
+ 'after_transaction_create',
+ 'after_soft_rollback'])
def test_can_use_session_in_outer_rollback_hook(self):
User, users = self.classes.User, self.tables.users
sess = Session()
assertions = []
+
@event.listens_for(sess, "after_soft_rollback")
def do_something(session, previous_transaction):
if session.is_active:
sess.rollback()
eq_(assertions, [True, True])
-
def test_flush_noautocommit_hook(self):
User, users = self.classes.User, self.tables.users
sess.add(u)
sess.flush()
eq_(canary, ['before_attach', 'after_attach', 'before_flush',
- 'after_transaction_create', 'after_begin',
- 'after_flush', 'after_flush_postexec',
- 'after_transaction_end'])
+ 'after_transaction_create', 'after_begin',
+ 'after_flush', 'after_flush_postexec',
+ 'after_transaction_end'])
def test_flush_in_commit_hook(self):
User, users = self.classes.User, self.tables.users
u.name = 'ed'
sess.commit()
- eq_(canary, ['before_commit', 'before_flush', 'after_transaction_create', 'after_flush',
- 'after_flush_postexec',
- 'after_transaction_end',
- 'after_commit',
- 'after_transaction_end', 'after_transaction_create',])
+ eq_(canary, ['before_commit', 'before_flush',
+ 'after_transaction_create', 'after_flush',
+ 'after_flush_postexec',
+ 'after_transaction_end',
+ 'after_commit',
+ 'after_transaction_end', 'after_transaction_create', ])
def test_state_before_attach(self):
User, users = self.classes.User, self.tables.users
assert inst not in session.new
mapper(User, users)
- u= User(name='u1')
+ u = User(name='u1')
sess.add(u)
sess.flush()
sess.expunge(u)
assert inst in session.new
mapper(User, users)
- u= User(name='u1')
+ u = User(name='u1')
sess.add(u)
sess.flush()
sess.expunge(u)
sess, canary = self._listener_fixture()
sess.commit()
eq_(canary, ['before_commit', 'after_commit',
- 'after_transaction_end',
- 'after_transaction_create'])
+ 'after_transaction_end',
+ 'after_transaction_create'])
def test_on_bulk_update_hook(self):
User, users = self.classes.User, self.tables.users
[call(sess, upd.query, upd.context, upd.result)]
)
-
def test_on_bulk_delete_hook(self):
User, users = self.classes.User, self.tables.users
def test_connection_emits_after_begin(self):
sess, canary = self._listener_fixture(bind=testing.db)
- conn = sess.connection()
+ sess.connection()
eq_(canary, ['after_begin'])
+ sess.close()
def test_reentrant_flush(self):
users, User = self.tables.users, self.classes.User
-
mapper(User, users)
def before_flush(session, flush_context, objects):
def test_before_flush_affects_flush_plan(self):
users, User = self.tables.users, self.classes.User
-
mapper(User, users)
def before_flush(session, flush_context, objects):
session.add(User(name='another %s' % obj.name))
for obj in list(session.deleted):
if isinstance(obj, User):
- x = session.query(User).filter(User.name
- == 'another %s' % obj.name).one()
+ x = session.query(User).filter(
+ User.name == 'another %s' % obj.name).one()
session.delete(x)
sess = Session()
[
User(name='another u1'),
User(name='u1')
- ]
+ ]
)
sess.flush()
[
User(name='another u1'),
User(name='u1')
- ]
+ ]
)
- u.name='u2'
+ u.name = 'u2'
sess.flush()
eq_(sess.query(User).order_by(User.name).all(),
[
User(name='another u1'),
User(name='another u2'),
User(name='u2')
- ]
+ ]
)
sess.delete(u)
eq_(sess.query(User).order_by(User.name).all(),
[
User(name='another u1'),
- ]
+ ]
)
def test_before_flush_affects_dirty(self):
sess.flush()
eq_(sess.query(User).order_by(User.name).all(),
[User(name='u1')]
- )
+ )
sess.add(User(name='u2'))
sess.flush()
[
User(name='u1 modified'),
User(name='u2')
- ]
+ ]
)
-
class MapperExtensionTest(_fixtures.FixtureTest):
+
"""Superseded by MapperEventsTest - test backwards
compatibility of MapperExtension."""
methods = []
class Ext(sa.orm.MapperExtension):
+
def instrument_class(self, mapper, cls):
methods.append('instrument_class')
return sa.orm.EXT_CONTINUE
- def init_instance(self, mapper, class_, oldinit, instance, args, kwargs):
+ def init_instance(
+ self, mapper, class_, oldinit, instance, args, kwargs):
methods.append('init_instance')
return sa.orm.EXT_CONTINUE
- def init_failed(self, mapper, class_, oldinit, instance, args, kwargs):
+ def init_failed(
+ self, mapper, class_, oldinit, instance, args, kwargs):
methods.append('init_failed')
return sa.orm.EXT_CONTINUE
def test_inheritance(self):
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
+ self.tables.addresses,
+ self.classes.User)
Ext, methods = self.extension()
mapper(User, users, extension=Ext())
mapper(AdminUser, addresses, inherits=User,
- properties={'address_id': addresses.c.id})
+ properties={'address_id': addresses.c.id})
sess = create_session()
am = AdminUser(name='au1', email_address='au1@e1')
"""
- keywords, items, item_keywords, Keyword, Item = (self.tables.keywords,
- self.tables.items,
- self.tables.item_keywords,
- self.classes.Keyword,
- self.classes.Item)
-
+ keywords, items, item_keywords, Keyword, Item = (
+ self.tables.keywords,
+ self.tables.items,
+ self.tables.item_keywords,
+ self.classes.Keyword,
+ self.classes.Item)
Ext1, methods1 = self.extension()
Ext2, methods2 = self.extension()
- mapper(Item, items, extension=Ext1() , properties={
+ mapper(Item, items, extension=Ext1(), properties={
'keywords': relationship(Keyword, secondary=item_keywords)})
mapper(Keyword, keywords, extension=Ext2())
sess.flush()
eq_(methods1,
['instrument_class', 'init_instance',
- 'before_insert', 'after_insert'])
+ 'before_insert', 'after_insert'])
eq_(methods2,
['instrument_class', 'init_instance',
- 'before_insert', 'after_insert'])
+ 'before_insert', 'after_insert'])
del methods1[:]
del methods2[:]
eq_(methods1, ['before_update', 'after_update'])
eq_(methods2, [])
-
def test_inheritance_with_dupes(self):
"""Inheritance with the same extension instance on both mappers."""
users, addresses, User = (self.tables.users,
- self.tables.addresses,
- self.classes.User)
+ self.tables.addresses,
+ self.classes.User)
Ext, methods = self.extension()
ext = Ext()
mapper(User, users, extension=ext)
mapper(AdminUser, addresses, inherits=User, extension=ext,
- properties={'address_id': addresses.c.id})
+ properties={'address_id': addresses.c.id})
sess = create_session()
am = AdminUser(name="au1", email_address="au1@e1")
'before_update', 'after_update', 'before_delete',
'after_delete'])
-
def test_unnecessary_methods_not_evented(self):
users = self.tables.users
class MyExtension(sa.orm.MapperExtension):
+
def before_insert(self, mapper, connection, instance):
pass
class AttributeExtensionTest(fixtures.MappedTest):
+
@classmethod
def define_tables(cls, metadata):
Table('t1',
- metadata,
- Column('id', Integer, primary_key=True),
- Column('type', String(40)),
- Column('data', String(50))
+ metadata,
+ Column('id', Integer, primary_key=True),
+ Column('type', String(40)),
+ Column('data', String(50))
- )
+ )
def test_cascading_extensions(self):
t1 = self.tables.t1
ext_msg = []
class Ex1(sa.orm.AttributeExtension):
+
def set(self, state, value, oldvalue, initiator):
ext_msg.append("Ex1 %r" % value)
return "ex1" + value
class Ex2(sa.orm.AttributeExtension):
+
def set(self, state, value, oldvalue, initiator):
ext_msg.append("Ex2 %r" % value)
return "ex2" + value
class A(fixtures.BasicEntity):
pass
+
class B(A):
pass
+
class C(B):
pass
- mapper(A, t1, polymorphic_on=t1.c.type, polymorphic_identity='a', properties={
- 'data':column_property(t1.c.data, extension=Ex1())
- })
+ mapper(
+ A, t1, polymorphic_on=t1.c.type, polymorphic_identity='a',
+ properties={
+ 'data': column_property(t1.c.data, extension=Ex1())
+ }
+ )
mapper(B, polymorphic_identity='b', inherits=A)
- mc = mapper(C, polymorphic_identity='c', inherits=B, properties={
- 'data':column_property(t1.c.data, extension=Ex2())
+ mapper(C, polymorphic_identity='c', inherits=B, properties={
+ 'data': column_property(t1.c.data, extension=Ex2())
})
a1 = A(data='a1')
eq_(c1.data, 'ex2c1')
a1.data = 'a2'
- b1.data='b2'
+ b1.data = 'b2'
c1.data = 'c2'
eq_(a1.data, 'ex1a2')
eq_(b1.data, 'ex1b2')
eq_(c1.data, 'ex2c2')
eq_(ext_msg, ["Ex1 'a1'", "Ex1 'b1'", "Ex2 'c1'",
- "Ex1 'a2'", "Ex1 'b2'", "Ex2 'c2'"])
-
+ "Ex1 'a2'", "Ex1 'b2'", "Ex2 'c2'"])
class SessionExtensionTest(_fixtures.FixtureTest):
mapper(User, users)
log = []
+
class MyExt(sa.orm.session.SessionExtension):
+
def before_commit(self, session):
log.append('before_commit')
+
def after_commit(self, session):
log.append('after_commit')
+
def after_rollback(self, session):
log.append('after_rollback')
+
def before_flush(self, session, flush_context, objects):
log.append('before_flush')
+
def after_flush(self, session, flush_context):
log.append('after_flush')
+
def after_flush_postexec(self, session, flush_context):
log.append('after_flush_postexec')
+
def after_begin(self, session, transaction, connection):
log.append('after_begin')
+
def after_attach(self, session, instance):
log.append('after_attach')
+
def after_bulk_update(
self,
session, query, query_context, result
- ):
+ ):
log.append('after_bulk_update')
def after_bulk_delete(
self,
session, query, query_context, result
- ):
+ ):
log.append('after_bulk_delete')
- sess = create_session(extension = MyExt())
+ sess = create_session(extension=MyExt())
u = User(name='u1')
sess.add(u)
sess.flush()
'after_flush_postexec',
'before_commit',
'after_commit',
- ]
+ ]
log = []
sess = create_session(autocommit=False, extension=MyExt())
u = User(name='u1')
log = []
sess = create_session(autocommit=False, extension=MyExt(),
bind=testing.db)
- conn = sess.connection()
+ sess.connection()
assert log == ['after_begin']
+ sess.close()
def test_multiple_extensions(self):
User, users = self.classes.User, self.tables.users
log = []
+
class MyExt1(sa.orm.session.SessionExtension):
+
def before_commit(self, session):
log.append('before_commit_one')
-
class MyExt2(sa.orm.session.SessionExtension):
+
def before_commit(self, session):
log.append('before_commit_two')
mapper(User, users)
- sess = create_session(extension = [MyExt1(), MyExt2()])
+ sess = create_session(extension=[MyExt1(), MyExt2()])
u = User(name='u1')
sess.add(u)
sess.flush()
assert log == [
'before_commit_one',
'before_commit_two',
- ]
+ ]
def test_unnecessary_methods_not_evented(self):
class MyExtension(sa.orm.session.SessionExtension):
+
def before_commit(self, session):
pass
q.all(),
[(7, 'jack')]
)
-