from sqlalchemy.orm import create_session, sessionmaker, attributes, \
make_transient, Session
import sqlalchemy as sa
-from test.lib import engines, testing, config
+from test.lib import engines, config
+from test.lib import testing
from sqlalchemy import Integer, String, Sequence
from test.lib.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
exc as orm_exc, object_session
from sqlalchemy.util import pypy
from test.lib import fixtures
-from test.lib import fixtures
from test.orm import _fixtures
+from sqlalchemy import event, ForeignKey
-class SessionTest(_fixtures.FixtureTest):
+class BindTest(_fixtures.FixtureTest):
run_inserts = None
- def test_no_close_on_flush(self):
- """Flush() doesn't close a connection the session didn't open"""
-
- User, users = self.classes.User, self.tables.users
-
- c = testing.db.connect()
- c.execute("select * from users")
-
- mapper(User, users)
- s = create_session(bind=c)
- s.add(User(name='first'))
- s.flush()
- c.execute("select * from users")
-
- def test_close(self):
- """close() doesn't close a connection the session didn't open"""
-
- User, users = self.classes.User, self.tables.users
-
- c = testing.db.connect()
- c.execute("select * from users")
-
- mapper(User, users)
- s = create_session(bind=c)
- s.add(User(name='first'))
- s.flush()
- c.execute("select * from users")
- s.close()
- c.execute("select * from users")
-
- def test_object_session_raises(self):
- User = self.classes.User
-
- assert_raises(
- orm_exc.UnmappedInstanceError,
- object_session,
- object()
- )
-
- assert_raises(
- orm_exc.UnmappedInstanceError,
- object_session,
- User()
- )
-
- @testing.requires.sequences
- def test_sequence_execute(self):
- seq = Sequence("some_sequence")
- seq.create(testing.db)
- try:
- sess = create_session(bind=testing.db)
- eq_(sess.execute(seq), 1)
- finally:
- seq.drop(testing.db)
-
-
-
- @engines.close_open_connections
def test_mapped_binds(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
sess.close()
- @engines.close_open_connections
def test_table_binds(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
sess.close()
- @engines.close_open_connections
def test_bind_from_metadata(self):
users, User = self.tables.users, self.classes.User
assert len(session.query(User).filter_by(name='Johnny').all()) == 0
session.close()
- @testing.requires.independent_connections
- @engines.close_open_connections
- def test_transaction(self):
- User, users = self.classes.User, self.tables.users
+ def test_bind_arguments(self):
+ users, Address, addresses, User = (self.tables.users,
+ self.classes.Address,
+ self.tables.addresses,
+ self.classes.User)
mapper(User, users)
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
+ mapper(Address, addresses)
- sess = create_session(autocommit=False, bind=conn1)
- u = User(name='x')
- sess.add(u)
- sess.flush()
- assert conn1.execute("select count(1) from users").scalar() == 1
- assert conn2.execute("select count(1) from users").scalar() == 0
- sess.commit()
- assert conn1.execute("select count(1) from users").scalar() == 1
+ e1 = engines.testing_engine()
+ e2 = engines.testing_engine()
+ e3 = engines.testing_engine()
+
+ sess = Session(e3)
+ sess.bind_mapper(User, e1)
+ sess.bind_mapper(Address, e2)
+
+ assert sess.connection().engine is e3
+ assert sess.connection(bind=e1).engine is e1
+ assert sess.connection(mapper=Address, bind=e1).engine is e1
+ assert sess.connection(mapper=Address).engine is e2
+ assert sess.connection(clause=addresses.select()).engine is e2
+ assert sess.connection(mapper=User,
+ clause=addresses.select()).engine is e1
+ assert sess.connection(mapper=User,
+ clause=addresses.select(),
+ bind=e2).engine is e2
- assert testing.db.connect().execute('select count(1) from users'
- ).scalar() == 1
sess.close()
- @testing.requires.independent_connections
@engines.close_open_connections
- def test_autoflush(self):
- User, users = self.classes.User, self.tables.users
+ def test_bound_connection(self):
+ users, User = self.tables.users, self.classes.User
- bind = self.metadata.bind
mapper(User, users)
- conn1 = bind.connect()
- conn2 = bind.connect()
-
- sess = create_session(bind=conn1, autocommit=False, autoflush=True)
- u = User()
- u.name='ed'
+ c = testing.db.connect()
+ sess = create_session(bind=c)
+ sess.begin()
+ transaction = sess.transaction
+ u = User(name='u1')
sess.add(u)
- u2 = sess.query(User).filter_by(name='ed').one()
- assert u2 is u
- eq_(conn1.execute("select count(1) from users").scalar(), 1)
- eq_(conn2.execute("select count(1) from users").scalar(), 0)
- sess.commit()
- eq_(conn1.execute("select count(1) from users").scalar(), 1)
- eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
+ sess.flush()
+ assert transaction._connection_for_bind(testing.db) \
+ is transaction._connection_for_bind(c) is c
+
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'Session already has a Connection '
+ 'associated',
+ transaction._connection_for_bind,
+ testing.db.connect())
+ transaction.rollback()
+ assert len(sess.query(User).all()) == 0
sess.close()
- @testing.requires.python26
- def test_with_no_autoflush(self):
+ def test_bound_connection_transactional(self):
User, users = self.classes.User, self.tables.users
mapper(User, users)
- sess = Session()
+ c = testing.db.connect()
- u = User()
- u.name = 'ed'
+ sess = create_session(bind=c, autocommit=False)
+ u = User(name='u1')
sess.add(u)
- def go(obj):
- assert u not in sess.query(User).all()
- testing.run_as_contextmanager(sess.no_autoflush, go)
- assert u in sess.new
- assert u in sess.query(User).all()
- assert u not in sess.new
-
- def test_make_transient(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
- sess = create_session()
- sess.add(User(name='test'))
sess.flush()
-
- u1 = sess.query(User).first()
- make_transient(u1)
- assert u1 not in sess
- sess.add(u1)
- assert u1 in sess.new
-
- u1 = sess.query(User).first()
- sess.expunge(u1)
- make_transient(u1)
- sess.add(u1)
- assert u1 in sess.new
-
- # test expired attributes
- # get unexpired
- u1 = sess.query(User).first()
- sess.expire(u1)
- make_transient(u1)
- assert u1.id is None
- assert u1.name is None
-
- # works twice
- make_transient(u1)
-
sess.close()
+ assert not c.in_transaction()
+ assert c.scalar("select count(1) from users") == 0
- u1.name = 'test2'
- sess.add(u1)
- sess.flush()
- assert u1 in sess
- sess.delete(u1)
+ sess = create_session(bind=c, autocommit=False)
+ u = User(name='u2')
+ sess.add(u)
sess.flush()
- assert u1 not in sess
+ sess.commit()
+ assert not c.in_transaction()
+ assert c.scalar("select count(1) from users") == 1
+ c.execute("delete from users")
+ assert c.scalar("select count(1) from users") == 0
- assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
- make_transient(u1)
- sess.add(u1)
+ c = testing.db.connect()
+
+ trans = c.begin()
+ sess = create_session(bind=c, autocommit=True)
+ u = User(name='u3')
+ sess.add(u)
sess.flush()
- assert u1 in sess
+ assert c.in_transaction()
+ trans.commit()
+ assert not c.in_transaction()
+ assert c.scalar("select count(1) from users") == 1
- def test_make_transient_plus_rollback(self):
- # test for [ticket:2182]
- users, User = self.tables.users, self.classes.User
+class ExecutionTest(_fixtures.FixtureTest):
+ run_inserts = None
- mapper(User, users)
- sess = Session()
- u1 = User(name='test')
- sess.add(u1)
- sess.commit()
+ @testing.requires.sequences
+ def test_sequence_execute(self):
+ seq = Sequence("some_sequence")
+ seq.create(testing.db)
+ try:
+ sess = create_session(bind=testing.db)
+ eq_(sess.execute(seq), 1)
+ finally:
+ seq.drop(testing.db)
- sess.delete(u1)
- sess.flush()
- make_transient(u1)
- sess.rollback()
+ def test_textual_execute(self):
+ """test that Session.execute() converts to text()"""
- def test_deleted_flag(self):
- users, User = self.tables.users, self.classes.User
+ users = self.tables.users
- mapper(User, users)
- sess = sessionmaker()()
+ sess = create_session(bind=self.metadata.bind)
+ users.insert().execute(id=7, name='jack')
- u1 = User(name='u1')
- sess.add(u1)
- sess.commit()
+ # use :bindparam style
+ eq_(sess.execute("select * from users where id=:id",
+ {'id':7}).fetchall(),
+ [(7, u'jack')])
- sess.delete(u1)
- sess.flush()
- assert u1 not in sess
- assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
- sess.rollback()
- assert u1 in sess
- sess.delete(u1)
+ # use :bindparam style
+ eq_(sess.scalar("select id from users where id=:id",
+ {'id':7}),
+ 7)
+
+ def test_parameter_execute(self):
+ users = self.tables.users
+ sess = Session(bind=testing.db)
+ sess.execute(users.insert(), [
+ {"id": 7, "name": "u7"},
+ {"id": 8, "name": "u8"}
+ ]
+ )
+ sess.execute(users.insert(), {"id": 9, "name": "u9"})
+ eq_(
+ sess.execute(sa.select([users.c.id]).\
+ order_by(users.c.id)).fetchall(),
+ [(7, ), (8, ), (9, )]
+ )
+
+
+class TransScopingTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ def test_no_close_on_flush(self):
+ """Flush() doesn't close a connection the session didn't open"""
+
+ User, users = self.classes.User, self.tables.users
+
+ c = testing.db.connect()
+ c.execute("select * from users")
+
+ mapper(User, users)
+ s = create_session(bind=c)
+ s.add(User(name='first'))
+ s.flush()
+ c.execute("select * from users")
+
+ def test_close(self):
+ """close() doesn't close a connection the session didn't open"""
+
+ User, users = self.classes.User, self.tables.users
+
+ c = testing.db.connect()
+ c.execute("select * from users")
+
+ mapper(User, users)
+ s = create_session(bind=c)
+ s.add(User(name='first'))
+ s.flush()
+ c.execute("select * from users")
+ s.close()
+ c.execute("select * from users")
+
+ @testing.requires.independent_connections
+ @engines.close_open_connections
+ def test_transaction(self):
+ User, users = self.classes.User, self.tables.users
+
+ mapper(User, users)
+ conn1 = testing.db.connect()
+ conn2 = testing.db.connect()
+
+ sess = create_session(autocommit=False, bind=conn1)
+ u = User(name='x')
+ sess.add(u)
+ sess.flush()
+ assert conn1.execute("select count(1) from users").scalar() == 1
+ assert conn2.execute("select count(1) from users").scalar() == 0
+ sess.commit()
+ assert conn1.execute("select count(1) from users").scalar() == 1
+
+ assert testing.db.connect().execute('select count(1) from users'
+ ).scalar() == 1
+ sess.close()
+
+class SessionUtilTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+ def test_object_session_raises(self):
+ User = self.classes.User
+
+ assert_raises(
+ orm_exc.UnmappedInstanceError,
+ object_session,
+ object()
+ )
+
+ assert_raises(
+ orm_exc.UnmappedInstanceError,
+ object_session,
+ User()
+ )
+
+ def test_make_transient(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+ sess = create_session()
+ sess.add(User(name='test'))
+ sess.flush()
+
+ u1 = sess.query(User).first()
+ make_transient(u1)
+ assert u1 not in sess
+ sess.add(u1)
+ assert u1 in sess.new
+
+ u1 = sess.query(User).first()
+ sess.expunge(u1)
+ make_transient(u1)
+ sess.add(u1)
+ assert u1 in sess.new
+
+ # test expired attributes
+ # get unexpired
+ u1 = sess.query(User).first()
+ sess.expire(u1)
+ make_transient(u1)
+ assert u1.id is None
+ assert u1.name is None
+
+ # works twice
+ make_transient(u1)
+
+ sess.close()
+
+ u1.name = 'test2'
+ sess.add(u1)
+ sess.flush()
+ assert u1 in sess
+ sess.delete(u1)
+ sess.flush()
+ assert u1 not in sess
+
+ assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
+ make_transient(u1)
+ sess.add(u1)
+ sess.flush()
+ assert u1 in sess
+
+ def test_make_transient_plus_rollback(self):
+ # test for [ticket:2182]
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+ sess = Session()
+ u1 = User(name='test')
+ sess.add(u1)
+ sess.commit()
+
+ sess.delete(u1)
+ sess.flush()
+ make_transient(u1)
+ sess.rollback()
+
+class SessionStateTest(_fixtures.FixtureTest):
+ run_inserts = None
+
+
+ @testing.requires.independent_connections
+ @engines.close_open_connections
+ def test_autoflush(self):
+ User, users = self.classes.User, self.tables.users
+
+ bind = self.metadata.bind
+ mapper(User, users)
+ conn1 = bind.connect()
+ conn2 = bind.connect()
+
+ sess = create_session(bind=conn1, autocommit=False, autoflush=True)
+ u = User()
+ u.name = 'ed'
+ sess.add(u)
+ u2 = sess.query(User).filter_by(name='ed').one()
+ assert u2 is u
+ eq_(conn1.execute("select count(1) from users").scalar(), 1)
+ eq_(conn2.execute("select count(1) from users").scalar(), 0)
+ sess.commit()
+ eq_(conn1.execute("select count(1) from users").scalar(), 1)
+ eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
+ sess.close()
+
+ @testing.requires.python26
+ def test_with_no_autoflush(self):
+ User, users = self.classes.User, self.tables.users
+
+ mapper(User, users)
+ sess = Session()
+
+ u = User()
+ u.name = 'ed'
+ sess.add(u)
+ def go(obj):
+ assert u not in sess.query(User).all()
+ testing.run_as_contextmanager(sess.no_autoflush, go)
+ assert u in sess.new
+ assert u in sess.query(User).all()
+ assert u not in sess.new
+
+
+ def test_deleted_flag(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ sess = sessionmaker()()
+
+ u1 = User(name='u1')
+ sess.add(u1)
+ sess.commit()
+
+ sess.delete(u1)
+ sess.flush()
+ assert u1 not in sess
+ assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
+ sess.rollback()
+ assert u1 in sess
+
+ sess.delete(u1)
sess.commit()
assert u1 not in sess
assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
sess.rollback()
assert not sess.is_active
- def test_textual_execute(self):
- """test that Session.execute() converts to text()"""
- users = self.tables.users
+ @engines.close_open_connections
+ def test_add_delete(self):
+ User, Address, addresses, users = (self.classes.User,
+ self.classes.Address,
+ self.tables.addresses,
+ self.tables.users)
- sess = create_session(bind=self.metadata.bind)
- users.insert().execute(id=7, name='jack')
+ s = create_session()
+ mapper(User, users, properties={
+ 'addresses':relationship(Address, cascade="all, delete")
+ })
+ mapper(Address, addresses)
- # use :bindparam style
- eq_(sess.execute("select * from users where id=:id",
- {'id':7}).fetchall(),
- [(7, u'jack')])
+ user = User(name='u1')
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'is not persisted', s.delete, user)
- # use :bindparam style
- eq_(sess.scalar("select id from users where id=:id",
- {'id':7}),
- 7)
+ s.add(user)
+ s.flush()
+ user = s.query(User).one()
+ s.expunge(user)
+ assert user not in s
- def test_parameter_execute(self):
- users = self.tables.users
- sess = Session(bind=testing.db)
- sess.execute(users.insert(), [
- {"id": 7, "name": "u7"},
- {"id": 8, "name": "u8"}
- ]
- )
- sess.execute(users.insert(), {"id": 9, "name": "u9"})
- eq_(
- sess.execute(sa.select([users.c.id]).\
- order_by(users.c.id)).fetchall(),
- [(7, ), (8, ), (9, )]
- )
+ # modify outside of session, assert changes remain/get saved
+ user.name = "fred"
+ s.add(user)
+ assert user in s
+ assert user in s.dirty
+ s.flush()
+ s.expunge_all()
+ assert s.query(User).count() == 1
+ user = s.query(User).one()
+ assert user.name == 'fred'
+ # ensure its not dirty if no changes occur
+ s.expunge_all()
+ assert user not in s
+ s.add(user)
+ assert user in s
+ assert user not in s.dirty
- @engines.close_open_connections
- def test_bound_connection(self):
+ s2 = create_session()
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'is already attached to session',
+ s2.delete, user)
+ u2 = s2.query(User).get(user.id)
+ assert_raises_message(sa.exc.InvalidRequestError,
+ 'another instance with key', s.delete, u2)
+ s.expire(user)
+ s.expunge(user)
+ assert user not in s
+ s.delete(user)
+ assert user in s
+
+ s.flush()
+ assert user not in s
+ assert s.query(User).count() == 0
+
+
+ @testing.uses_deprecated()
+ def test_identity_conflict(self):
users, User = self.tables.users, self.classes.User
mapper(User, users)
- c = testing.db.connect()
- sess = create_session(bind=c)
- sess.begin()
- transaction = sess.transaction
- u = User(name='u1')
- sess.add(u)
- sess.flush()
- assert transaction._connection_for_bind(testing.db) \
- is transaction._connection_for_bind(c) is c
+ for s in (
+ create_session(),
+ create_session(weak_identity_map=False),
+ ):
+ users.delete().execute()
+ u1 = User(name="ed")
+ s.add(u1)
+ s.flush()
+ s.expunge(u1)
+ u2 = s.query(User).first()
+ s.expunge(u2)
+ s.identity_map.add(sa.orm.attributes.instance_state(u1))
+ assert_raises(AssertionError, s.identity_map.add,
+ sa.orm.attributes.instance_state(u2))
+
+ def test_pickled_update(self):
+ users, User = self.tables.users, pickleable.User
+
+ mapper(User, users)
+ sess1 = create_session()
+ sess2 = create_session()
+ u1 = User(name='u1')
+ sess1.add(u1)
assert_raises_message(sa.exc.InvalidRequestError,
- 'Session already has a Connection '
- 'associated',
- transaction._connection_for_bind,
- testing.db.connect())
- transaction.rollback()
- assert len(sess.query(User).all()) == 0
- sess.close()
+ 'already attached to session', sess2.add,
+ u1)
+ u2 = pickle.loads(pickle.dumps(u1))
+ sess2.add(u2)
- def test_bound_connection_transactional(self):
- User, users = self.classes.User, self.tables.users
+ def test_duplicate_update(self):
+ users, User = self.tables.users, self.classes.User
mapper(User, users)
- c = testing.db.connect()
+ Session = sessionmaker()
+ sess = Session()
- sess = create_session(bind=c, autocommit=False)
- u = User(name='u1')
- sess.add(u)
+ u1 = User(name='u1')
+ sess.add(u1)
sess.flush()
- sess.close()
- assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 0
+ assert u1.id is not None
- sess = create_session(bind=c, autocommit=False)
- u = User(name='u2')
- sess.add(u)
- sess.flush()
- sess.commit()
- assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 1
- c.execute("delete from users")
- assert c.scalar("select count(1) from users") == 0
+ sess.expunge(u1)
- c = testing.db.connect()
+ assert u1 not in sess
+ assert Session.object_session(u1) is None
+
+ u2 = sess.query(User).get(u1.id)
+ assert u2 is not None and u2 is not u1
+ assert u2 in sess
+
+ assert_raises(Exception, lambda: sess.add(u1))
+
+ sess.expunge(u2)
+ assert u2 not in sess
+ assert Session.object_session(u2) is None
+
+ u1.name = "John"
+ u2.name = "Doe"
+
+ sess.add(u1)
+ assert u1 in sess
+ assert Session.object_session(u1) is sess
- trans = c.begin()
- sess = create_session(bind=c, autocommit=True)
- u = User(name='u3')
- sess.add(u)
sess.flush()
- assert c.in_transaction()
- trans.commit()
- assert not c.in_transaction()
- assert c.scalar("select count(1) from users") == 1
- def test_bind_arguments(self):
- users, Address, addresses, User = (self.tables.users,
- self.classes.Address,
- self.tables.addresses,
- self.classes.User)
+ sess.expunge_all()
+
+ u3 = sess.query(User).get(u1.id)
+ assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
+
+ def test_no_double_save(self):
+ users = self.tables.users
+
+ sess = create_session()
+ class Foo(object):
+ def __init__(self):
+ sess.add(self)
+ class Bar(Foo):
+ def __init__(self):
+ sess.add(self)
+ Foo.__init__(self)
+ mapper(Foo, users)
+ mapper(Bar, users)
+
+ b = Bar()
+ assert b in sess
+ assert len(list(sess)) == 1
+
+ def test_identity_map_mutate(self):
+ users, User = self.tables.users, self.classes.User
mapper(User, users)
- mapper(Address, addresses)
- e1 = engines.testing_engine()
- e2 = engines.testing_engine()
- e3 = engines.testing_engine()
+ sess = Session()
- sess = Session(e3)
- sess.bind_mapper(User, e1)
- sess.bind_mapper(Address, e2)
+ sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
+ sess.commit()
- assert sess.connection().engine is e3
- assert sess.connection(bind=e1).engine is e1
- assert sess.connection(mapper=Address, bind=e1).engine is e1
- assert sess.connection(mapper=Address).engine is e2
- assert sess.connection(clause=addresses.select()).engine is e2
- assert sess.connection(mapper=User,
- clause=addresses.select()).engine is e1
- assert sess.connection(mapper=User,
- clause=addresses.select(),
- bind=e2).engine is e2
+ # TODO: what are we testing here ? that iteritems() can
+ # withstand a change? should this be
+ # more directly attempting to manipulate the identity_map ?
+ u1, u2, u3 = sess.query(User).all()
+ for i, (key, value) in enumerate(sess.identity_map.iteritems()):
+ if i == 2:
+ del u3
+ gc_collect()
- sess.close()
+class SessionStateWFixtureTest(_fixtures.FixtureTest):
- @engines.close_open_connections
- def test_add_delete(self):
- User, Address, addresses, users = (self.classes.User,
- self.classes.Address,
+ def test_autoflush_rollback(self):
+ Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
- self.tables.users)
-
+ self.tables.users,
+ self.classes.User)
- s = create_session()
- mapper(User, users, properties={
- 'addresses':relationship(Address, cascade="all, delete")
- })
mapper(Address, addresses)
+ mapper(User, users, properties={
+ 'addresses':relationship(Address)})
- user = User(name='u1')
+ sess = create_session(autocommit=False, autoflush=True)
+ u = sess.query(User).get(8)
+ newad = Address(email_address='a new address')
+ u.addresses.append(newad)
+ u.name = 'some new name'
+ assert u.name == 'some new name'
+ assert len(u.addresses) == 4
+ assert newad in u.addresses
+ sess.rollback()
+ assert u.name == 'ed'
+ assert len(u.addresses) == 3
- assert_raises_message(sa.exc.InvalidRequestError,
- 'is not persisted', s.delete, user)
+ assert newad not in u.addresses
+ # pending objects dont get expired
+ assert newad.email_address == 'a new address'
- s.add(user)
- s.flush()
- user = s.query(User).one()
- s.expunge(user)
- assert user not in s
+ def test_expunge_cascade(self):
+ Address, addresses, users, User = (self.classes.Address,
+ self.tables.addresses,
+ self.tables.users,
+ self.classes.User)
- # modify outside of session, assert changes remain/get saved
- user.name = "fred"
- s.add(user)
- assert user in s
- assert user in s.dirty
- s.flush()
- s.expunge_all()
- assert s.query(User).count() == 1
- user = s.query(User).one()
- assert user.name == 'fred'
+ mapper(Address, addresses)
+ mapper(User, users, properties={
+ 'addresses':relationship(Address,
+ backref=backref("user", cascade="all"),
+ cascade="all")})
+
+ session = create_session()
+ u = session.query(User).filter_by(id=7).one()
+
+ # get everything to load in both directions
+ print [a.user for a in u.addresses]
+
+ # then see if expunge fails
+ session.expunge(u)
- # ensure its not dirty if no changes occur
- s.expunge_all()
- assert user not in s
- s.add(user)
- assert user in s
- assert user not in s.dirty
+ assert sa.orm.object_session(u) is None
+ assert sa.orm.attributes.instance_state(u).session_id is None
+ for a in u.addresses:
+ assert sa.orm.object_session(a) is None
+ assert sa.orm.attributes.instance_state(a).session_id is None
- s2 = create_session()
- assert_raises_message(sa.exc.InvalidRequestError,
- 'is already attached to session',
- s2.delete, user)
- u2 = s2.query(User).get(user.id)
- assert_raises_message(sa.exc.InvalidRequestError,
- 'another instance with key', s.delete, u2)
- s.expire(user)
- s.expunge(user)
- assert user not in s
- s.delete(user)
- assert user in s
- s.flush()
- assert user not in s
- assert s.query(User).count() == 0
+class WeakIdentityMapTest(_fixtures.FixtureTest):
+ run_inserts = None
- def test_weak_ref(self):
+ @testing.requires.predictable_gc
+ def test_weakref(self):
"""test the weak-referencing identity map, which strongly-
references modified items."""
assert user.name == 'fred'
assert s.identity_map
- def test_weak_ref_pickled(self):
+ @testing.requires.predictable_gc
+ def test_weakref_pickled(self):
users, User = self.tables.users, pickleable.User
s = create_session()
assert not s.identity_map
- @testing.uses_deprecated()
- def test_identity_conflict(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
- for s in (
- create_session(),
- create_session(weak_identity_map=False),
- ):
- users.delete().execute()
- u1 = User(name="ed")
- s.add(u1)
- s.flush()
- s.expunge(u1)
- u2 = s.query(User).first()
- s.expunge(u2)
- s.identity_map.add(sa.orm.attributes.instance_state(u1))
-
- assert_raises(AssertionError, s.identity_map.add,
- sa.orm.attributes.instance_state(u2))
-
-
+ @testing.requires.predictable_gc
def test_weakref_with_cycles_o2m(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
user = s.query(User).options(joinedload(User.addresses)).one()
eq_(user, User(name="ed", addresses=[Address(email_address="ed2")]))
+ @testing.requires.predictable_gc
def test_weakref_with_cycles_o2o(self):
Address, addresses, users, User = (self.classes.Address,
self.tables.addresses,
user = s.query(User).options(joinedload(User.address)).one()
eq_(user, User(name="ed", address=Address(email_address="ed2")))
+ def test_auto_detach_on_gc_session(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ sess = Session()
+
+ u1 = User(name='u1')
+ sess.add(u1)
+ sess.commit()
+
+ # can't add u1 to Session,
+ # already belongs to u2
+ s2 = Session()
+ assert_raises_message(
+ sa.exc.InvalidRequestError,
+ r".*is already attached to session",
+ s2.add, u1
+ )
+
+ # garbage collect sess
+ del sess
+ gc_collect()
+
+ # s2 lets it in now despite u1 having
+ # session_key
+ s2.add(u1)
+ assert u1 in s2
+
+
+class StrongIdentityMapTest(_fixtures.FixtureTest):
+ run_inserts = None
+
@testing.uses_deprecated()
def test_strong_ref(self):
users, User = self.tables.users, self.classes.User
self.assert_(len(s.identity_map) == 0)
- def test_pickled_update(self):
- users, User = self.tables.users, pickleable.User
-
- mapper(User, users)
- sess1 = create_session()
- sess2 = create_session()
- u1 = User(name='u1')
- sess1.add(u1)
- assert_raises_message(sa.exc.InvalidRequestError,
- 'already attached to session', sess2.add,
- u1)
- u2 = pickle.loads(pickle.dumps(u1))
- sess2.add(u2)
-
- def test_duplicate_update(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
- Session = sessionmaker()
- sess = Session()
-
- u1 = User(name='u1')
- sess.add(u1)
- sess.flush()
- assert u1.id is not None
-
- sess.expunge(u1)
-
- assert u1 not in sess
- assert Session.object_session(u1) is None
-
- u2 = sess.query(User).get(u1.id)
- assert u2 is not None and u2 is not u1
- assert u2 in sess
-
- assert_raises(Exception, lambda: sess.add(u1))
-
- sess.expunge(u2)
- assert u2 not in sess
- assert Session.object_session(u2) is None
-
- u1.name = "John"
- u2.name = "Doe"
-
- sess.add(u1)
- assert u1 in sess
- assert Session.object_session(u1) is sess
-
- sess.flush()
-
- sess.expunge_all()
-
- u3 = sess.query(User).get(u1.id)
- assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
-
- def test_no_double_save(self):
- users = self.tables.users
-
- sess = create_session()
- class Foo(object):
- def __init__(self):
- sess.add(self)
- class Bar(Foo):
- def __init__(self):
- sess.add(self)
- Foo.__init__(self)
- mapper(Foo, users)
- mapper(Bar, users)
-
- b = Bar()
- assert b in sess
- assert len(list(sess)) == 1
-
- def test_identity_map_mutate(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
-
- sess = Session()
-
- sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
- sess.commit()
-
- u1, u2, u3 = sess.query(User).all()
- for i, (key, value) in enumerate(sess.identity_map.iteritems()):
- if i == 2:
- del u3
- gc_collect()
-
- def test_auto_detach_on_gc_session(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
-
- sess = Session()
-
- u1 = User(name='u1')
- sess.add(u1)
- sess.commit()
-
- # can't add u1 to Session,
- # already belongs to u2
- s2 = Session()
- assert_raises_message(
- sa.exc.InvalidRequestError,
- r".*is already attached to session",
- s2.add, u1
- )
-
- # garbage collect sess
- del sess
- gc_collect()
-
- # s2 lets it in now despite u1 having
- # session_key
- s2.add(u1)
- assert u1 in s2
-
-class SessionDataTest(_fixtures.FixtureTest):
- def test_expunge_cascade(self):
- Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
- mapper(Address, addresses)
- mapper(User, users, properties={
- 'addresses':relationship(Address,
- backref=backref("user", cascade="all"),
- cascade="all")})
-
- session = create_session()
- u = session.query(User).filter_by(id=7).one()
-
- # get everything to load in both directions
- print [a.user for a in u.addresses]
-
- # then see if expunge fails
- session.expunge(u)
-
- assert sa.orm.object_session(u) is None
- assert sa.orm.attributes.instance_state(u).session_id is None
- for a in u.addresses:
- assert sa.orm.object_session(a) is None
- assert sa.orm.attributes.instance_state(a).session_id is None
-
- def test_autoflush_rollback(self):
- Address, addresses, users, User = (self.classes.Address,
- self.tables.addresses,
- self.tables.users,
- self.classes.User)
-
- mapper(Address, addresses)
- mapper(User, users, properties={
- 'addresses':relationship(Address)})
-
- sess = create_session(autocommit=False, autoflush=True)
- u = sess.query(User).get(8)
- newad = Address(email_address='a new address')
- u.addresses.append(newad)
- u.name = 'some new name'
- assert u.name == 'some new name'
- assert len(u.addresses) == 4
- assert newad in u.addresses
- sess.rollback()
- assert u.name == 'ed'
- assert len(u.addresses) == 3
-
- assert newad not in u.addresses
- # pending objects dont get expired
- assert newad.email_address == 'a new address'
-
class IsModifiedTest(_fixtures.FixtureTest):
run_inserts = None
sess.flush()
self.bind.commit()
+
+