from sqlalchemy.orm import backref
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import configure_mappers
-from sqlalchemy.orm import create_session
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm import foreign
from sqlalchemy.orm import mapper
def test_list_assignment_new(self):
User, Order = self.classes.User, self.classes.Order
- sess = Session()
- u = User(
- name="jack",
- orders=[
- Order(description="order 1"),
- Order(description="order 2"),
- ],
- )
- sess.add(u)
- sess.commit()
-
- eq_(
- u,
- User(
+ with Session() as sess:
+ u = User(
name="jack",
orders=[
Order(description="order 1"),
Order(description="order 2"),
],
- ),
- )
+ )
+ sess.add(u)
+ sess.commit()
+
+ eq_(
+ u,
+ User(
+ name="jack",
+ orders=[
+ Order(description="order 1"),
+ Order(description="order 2"),
+ ],
+ ),
+ )
def test_list_assignment_replace(self):
User, Order = self.classes.User, self.classes.Order
- sess = Session()
- u = User(
- name="jack",
- orders=[
- Order(description="someorder"),
- Order(description="someotherorder"),
- ],
- )
- sess.add(u)
-
- u.orders = [Order(description="order 3"), Order(description="order 4")]
- sess.commit()
-
- eq_(
- u,
- User(
+ with Session() as sess:
+ u = User(
name="jack",
orders=[
- Order(description="order 3"),
- Order(description="order 4"),
+ Order(description="someorder"),
+ Order(description="someotherorder"),
],
- ),
- )
+ )
+ sess.add(u)
- # order 1, order 2 have been deleted
- eq_(
- sess.query(Order).order_by(Order.id).all(),
- [Order(description="order 3"), Order(description="order 4")],
- )
+ u.orders = [
+ Order(description="order 3"),
+ Order(description="order 4"),
+ ]
+ sess.commit()
+
+ eq_(
+ u,
+ User(
+ name="jack",
+ orders=[
+ Order(description="order 3"),
+ Order(description="order 4"),
+ ],
+ ),
+ )
+
+ # order 1, order 2 have been deleted
+ eq_(
+ sess.query(Order).order_by(Order.id).all(),
+ [Order(description="order 3"), Order(description="order 4")],
+ )
def test_standalone_orphan(self):
Order = self.classes.Order
- sess = Session()
- o5 = Order(description="order 5")
- sess.add(o5)
- assert_raises(sa_exc.DBAPIError, sess.flush)
+ with Session() as sess:
+ o5 = Order(description="order 5")
+ sess.add(o5)
+ assert_raises(sa_exc.DBAPIError, sess.flush)
def test_save_update_sends_pending(self):
"""test that newly added and deleted collection items are
def test_remove_pending_from_collection(self):
User, Order = self.classes.User, self.classes.Order
- sess = Session()
+ with Session() as sess:
- u = User(name="jack")
- sess.add(u)
- sess.commit()
+ u = User(name="jack")
+ sess.add(u)
+ sess.commit()
- o1 = Order()
- u.orders.append(o1)
- assert o1 in sess
- u.orders.remove(o1)
- assert o1 not in sess
+ o1 = Order()
+ u.orders.append(o1)
+ assert o1 in sess
+ u.orders.remove(o1)
+ assert o1 not in sess
def test_remove_pending_from_pending_parent(self):
# test issue #4040
User, Order = self.classes.User, self.classes.Order
- sess = Session()
+ with Session() as sess:
- u = User(name="jack")
+ u = User(name="jack")
- o1 = Order()
- sess.add(o1)
+ o1 = Order()
+ sess.add(o1)
- # object becomes an orphan, but parent is not in session
- u.orders.append(o1)
- u.orders.remove(o1)
+ # object becomes an orphan, but parent is not in session
+ u.orders.append(o1)
+ u.orders.remove(o1)
- sess.add(u)
+ sess.add(u)
- assert o1 in sess
+ assert o1 in sess
- sess.flush()
+ sess.flush()
- assert o1 not in sess
+ assert o1 not in sess
def test_delete(self):
User, users, orders, Order = (
self.classes.Order,
)
- sess = create_session()
- u = User(
- name="jack",
- orders=[
- Order(description="someorder"),
- Order(description="someotherorder"),
- ],
- )
- sess.add(u)
- sess.flush()
-
- sess.delete(u)
- sess.flush()
- eq_(select(func.count("*")).select_from(users).scalar(), 0)
- eq_(select(func.count("*")).select_from(orders).scalar(), 0)
+ with Session() as sess:
+ u = User(
+ name="jack",
+ orders=[
+ Order(description="someorder"),
+ Order(description="someotherorder"),
+ ],
+ )
+ sess.add(u)
+ sess.flush()
+
+ sess.delete(u)
+ sess.flush()
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 0,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(orders)
+ ).scalar(),
+ 0,
+ )
def test_delete_unloaded_collections(self):
"""Unloaded collections are still included in a delete-cascade
self.classes.Address,
)
- sess = create_session()
- u = User(
- name="jack",
- addresses=[
- Address(email_address="address1"),
- Address(email_address="address2"),
- ],
- )
- sess.add(u)
- sess.flush()
- sess.expunge_all()
- eq_(select(func.count("*")).select_from(addresses).scalar(), 2)
- eq_(select(func.count("*")).select_from(users).scalar(), 1)
+ with Session() as sess:
+ u = User(
+ name="jack",
+ addresses=[
+ Address(email_address="address1"),
+ Address(email_address="address2"),
+ ],
+ )
+ sess.add(u)
+ sess.flush()
+ sess.expunge_all()
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(addresses)
+ ).scalar(),
+ 2,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 1,
+ )
- u = sess.query(User).get(u.id)
+ u = sess.get(User, u.id)
- assert "addresses" not in u.__dict__
- sess.delete(u)
- sess.flush()
- eq_(select(func.count("*")).select_from(addresses).scalar(), 0)
- eq_(select(func.count("*")).select_from(users).scalar(), 0)
+ assert "addresses" not in u.__dict__
+ sess.delete(u)
+ sess.flush()
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(addresses)
+ ).scalar(),
+ 0,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 0,
+ )
def test_cascades_onlycollection(self):
"""Cascade only reaches instances that are still part of the
self.tables.orders,
)
- sess = create_session()
- u = User(
- name="jack",
- orders=[
- Order(description="someorder"),
- Order(description="someotherorder"),
- ],
- )
- sess.add(u)
- sess.flush()
-
- o = u.orders[0]
- del u.orders[0]
- sess.delete(u)
- assert u in sess.deleted
- assert o not in sess.deleted
- assert o in sess
-
- u2 = User(name="newuser", orders=[o])
- sess.add(u2)
- sess.flush()
- sess.expunge_all()
- eq_(select(func.count("*")).select_from(users).scalar(), 1)
- eq_(select(func.count("*")).select_from(orders).scalar(), 1)
- eq_(
- sess.query(User).all(),
- [User(name="newuser", orders=[Order(description="someorder")])],
- )
+ with Session(autoflush=False) as sess:
+ u = User(
+ name="jack",
+ orders=[
+ Order(description="someorder"),
+ Order(description="someotherorder"),
+ ],
+ )
+ sess.add(u)
+ sess.flush()
+
+ o = u.orders[0]
+ del u.orders[0]
+ sess.delete(u)
+ assert u in sess.deleted
+ assert o not in sess.deleted
+ assert o in sess
+
+ u2 = User(name="newuser", orders=[o])
+ sess.add(u2)
+ sess.flush()
+ sess.expunge_all()
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 1,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(orders)
+ ).scalar(),
+ 1,
+ )
+ eq_(
+ sess.query(User).all(),
+ [
+ User(
+ name="newuser", orders=[Order(description="someorder")]
+ )
+ ],
+ )
def test_cascade_nosideeffects(self):
"""test that cascade leaves the state of unloaded
self.classes.Address,
)
- sess = create_session()
+ sess = Session()
u = User(name="jack")
sess.add(u)
assert "orders" not in u.__dict__
self.classes.Order,
)
- sess = create_session()
+ sess = Session()
u = User(
name="jack",
orders=[
)
sess.add(u)
sess.flush()
- eq_(select(func.count("*")).select_from(users).scalar(), 1)
- eq_(select(func.count("*")).select_from(orders).scalar(), 2)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(users)).scalar(),
+ 1,
+ )
+ eq_(
+ sess.execute(select(func.count("*")).select_from(orders)).scalar(),
+ 2,
+ )
del u.orders[0]
sess.delete(u)
sess.flush()
- eq_(select(func.count("*")).select_from(users).scalar(), 0)
- eq_(select(func.count("*")).select_from(orders).scalar(), 0)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(users)).scalar(),
+ 0,
+ )
+ eq_(
+ sess.execute(select(func.count("*")).select_from(orders)).scalar(),
+ 0,
+ )
def test_collection_orphans(self):
User, users, orders, Order = (
self.classes.Order,
)
- sess = create_session()
- u = User(
- name="jack",
- orders=[
- Order(description="someorder"),
- Order(description="someotherorder"),
- ],
- )
- sess.add(u)
- sess.flush()
-
- eq_(select(func.count("*")).select_from(users).scalar(), 1)
- eq_(select(func.count("*")).select_from(orders).scalar(), 2)
+ with Session() as sess:
+ u = User(
+ name="jack",
+ orders=[
+ Order(description="someorder"),
+ Order(description="someotherorder"),
+ ],
+ )
+ sess.add(u)
+ sess.flush()
+
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 1,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(orders)
+ ).scalar(),
+ 2,
+ )
- u.orders[:] = []
+ u.orders[:] = []
- sess.flush()
+ sess.flush()
- eq_(select(func.count("*")).select_from(users).scalar(), 1)
- eq_(select(func.count("*")).select_from(orders).scalar(), 0)
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 1,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(orders)
+ ).scalar(),
+ 0,
+ )
class O2MCascadeTest(fixtures.MappedTest):
self.tables.users,
)
- sess = create_session()
- u = User(
- name="jack",
- orders=[
- Order(description="someorder"),
- Order(description="someotherorder"),
- ],
- )
- sess.add(u)
- sess.flush()
- eq_(select(func.count("*")).select_from(users).scalar(), 1)
- eq_(select(func.count("*")).select_from(orders).scalar(), 2)
+ with Session() as sess:
+ u = User(
+ name="jack",
+ orders=[
+ Order(description="someorder"),
+ Order(description="someotherorder"),
+ ],
+ )
+ sess.add(u)
+ sess.flush()
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 1,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(orders)
+ ).scalar(),
+ 2,
+ )
- del u.orders[0]
- sess.delete(u)
- sess.flush()
- eq_(select(func.count("*")).select_from(users).scalar(), 0)
- eq_(select(func.count("*")).select_from(orders).scalar(), 1)
+ del u.orders[0]
+ sess.delete(u)
+ sess.flush()
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(users)
+ ).scalar(),
+ 0,
+ )
+ eq_(
+ sess.execute(
+ select(func.count("*")).select_from(orders)
+ ).scalar(),
+ 1,
+ )
class O2OSingleParentTest(_fixtures.FixtureTest):
User, Address = self.classes.User, self.classes.Address
self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False)
- sess = Session()
- u1 = User(name="u1")
- sess.add(u1)
- sess.flush()
+ with Session() as sess:
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ a1 = Address(email_address="a1")
+ with testing.expect_deprecated(
+ '"Address" object is being merged into a Session along '
+ 'the backref cascade path for relationship "User.addresses"'
+ ):
+ a1.user = u1
+ sess.add(a1)
+ sess.expunge(u1)
+ assert u1 not in sess
+ assert a1 in sess
+ assert_raises_message(
+ sa_exc.SAWarning, "not in session", sess.flush
+ )
- a1 = Address(email_address="a1")
- a1.user = u1
- sess.add(a1)
- sess.expunge(u1)
- assert u1 not in sess
- assert a1 in sess
- assert_raises_message(sa_exc.SAWarning, "not in session", sess.flush)
+ def test_m2o_backref_future_child_expunged(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False)
+ with Session(testing.db, future=True) as sess:
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ a1 = Address(email_address="a1")
+ a1.user = u1
+ assert a1 not in sess
+ sess.add(a1)
+ sess.expunge(u1)
+ assert u1 not in sess
+ assert a1 in sess
+ assert_raises_message(
+ sa_exc.SAWarning, "not in session", sess.flush
+ )
def test_m2o_backref_child_pending_nochange(self):
User, Address = self.classes.User, self.classes.Address
User, Address = self.classes.User, self.classes.Address
self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False)
- sess = Session()
- u1 = User(name="u1")
- sess.add(u1)
- sess.flush()
- a1 = Address(email_address="a1")
- a1.user = u1
- sess.add(a1)
- sess.expunge(u1)
- assert u1 not in sess
- assert a1 in sess
+ with Session() as sess:
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ a1 = Address(email_address="a1")
+ with testing.expect_deprecated(
+ '"Address" object is being merged into a Session along the '
+ 'backref cascade path for relationship "User.addresses"'
+ ):
+ a1.user = u1
+ sess.add(a1)
+ sess.expunge(u1)
+ assert u1 not in sess
+ assert a1 in sess
+
+ @testing.emits_warning(r".*not in session")
+ def go():
+ sess.commit()
+
+ go()
+ # didn't get flushed
+ assert a1.user is None
+
+ def test_m2o_backref_future_child_expunged_nochange(self):
+ User, Address = self.classes.User, self.classes.Address
- @testing.emits_warning(r".*not in session")
- def go():
- sess.commit()
+ self._one_to_many_fixture(o2m=True, m2o=True, m2o_cascade=False)
- go()
- # didn't get flushed
- assert a1.user is None
+ with Session(testing.db, future=True) as sess:
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ a1 = Address(email_address="a1")
+ a1.user = u1
+ assert a1 not in sess
+ sess.add(a1)
+ sess.expunge(u1)
+ assert u1 not in sess
+ assert a1 in sess
+
+ @testing.emits_warning(r".*not in session")
+ def go():
+ sess.commit()
+
+ go()
+ # didn't get flushed
+ assert a1.user is None
def test_m2m_only_child_pending(self):
Item, Keyword = self.classes.Item, self.classes.Keyword
),
)
- sess = create_session()
+ sess = Session()
o1 = Order()
sess.add(o1)
)
mapper(User, users)
- sess = create_session()
+ sess = Session()
u1 = User()
sess.add(u1)
)
mapper(Keyword, keywords)
- sess = create_session()
+ sess = Session()
i1 = Item()
k1 = Keyword()
u1 = User(name="ed", pref=Pref(data="pref 1", extra=[Extra()]))
u2 = User(name="jack", pref=Pref(data="pref 2", extra=[Extra()]))
u3 = User(name="foo", pref=Pref(data="pref 3", extra=[Extra()]))
- sess = create_session(connection)
+ sess = Session(connection)
sess.add_all((u1, u2, u3))
sess.flush()
sess.close()
self.tables.extra,
)
- sess = create_session()
- eq_(select(func.count("*")).select_from(prefs).scalar(), 3)
- eq_(select(func.count("*")).select_from(extra).scalar(), 3)
+ sess = Session()
+ eq_(
+ sess.execute(select(func.count("*")).select_from(prefs)).scalar(),
+ 3,
+ )
+ eq_(
+ sess.execute(select(func.count("*")).select_from(extra)).scalar(),
+ 3,
+ )
jack = sess.query(User).filter_by(name="jack").one()
jack.pref = None
sess.flush()
- eq_(select(func.count("*")).select_from(prefs).scalar(), 2)
- eq_(select(func.count("*")).select_from(extra).scalar(), 2)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(prefs)).scalar(),
+ 2,
+ )
+ eq_(
+ sess.execute(select(func.count("*")).select_from(extra)).scalar(),
+ 2,
+ )
def test_cascade_on_deleted(self):
"""test a bug introduced by r6711"""
self.tables.extra,
)
- sess = create_session()
+ sess = Session()
jack = sess.query(User).filter_by(name="jack").one()
p = jack.pref
e = jack.pref.extra[0]
assert p in sess
assert e in sess
sess.flush()
- eq_(select(func.count("*")).select_from(prefs).scalar(), 2)
- eq_(select(func.count("*")).select_from(extra).scalar(), 2)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(prefs)).scalar(),
+ 2,
+ )
+ eq_(
+ sess.execute(select(func.count("*")).select_from(extra)).scalar(),
+ 2,
+ )
def test_pending_expunge(self):
Pref, User = self.classes.Pref, self.classes.User
- sess = create_session()
+ sess = Session()
someuser = User(name="someuser")
sess.add(someuser)
sess.flush()
Pref, User = self.classes.Pref, self.classes.User
- sess = create_session()
+ sess = Session()
jack = sess.query(User).filter_by(name="jack").one()
newpref = Pref(data="newpref")
def test_cascade_delete(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a")))
sess.add(x)
sess.flush()
def test_deletes_orphans_onelevel(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x2 = T1(data="t1b", t2=T2(data="t2b", t3=T3(data="t3b")))
sess.add(x2)
sess.flush()
def test_deletes_orphans_twolevel(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a")))
sess.add(x)
sess.flush()
def test_finds_orphans_twolevel(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a")))
sess.add(x)
sess.flush()
def test_cascade_delete(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a")))
sess.add(x)
sess.flush()
def test_cascade_delete_postappend_onelevel(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x1 = T1(data="t1")
x2 = T2(data="t2")
x3 = T3(data="t3")
def test_cascade_delete_postappend_twolevel(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x1 = T1(data="t1", t2=T2(data="t2"))
x3 = T3(data="t3")
sess.add_all((x1, x3))
def test_preserves_orphans_onelevel(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x2 = T1(data="t1b", t2=T2(data="t2b", t3=T3(data="t3b")))
sess.add(x2)
sess.flush()
def test_preserves_orphans_onelevel_postremove(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x2 = T1(data="t1b", t2=T2(data="t2b", t3=T3(data="t3b")))
sess.add(x2)
sess.flush()
def test_preserves_orphans_twolevel(self):
T2, T3, T1 = (self.classes.T2, self.classes.T3, self.classes.T1)
- sess = create_session()
+ sess = Session()
x = T1(data="t1a", t2=T2(data="t2a", t3=T3(data="t3a")))
sess.add(x)
sess.flush()
)
mapper(B, b)
- sess = create_session()
+ sess = Session()
b1 = B(data="b1")
a1 = A(data="a1", bs=[b1])
sess.add(a1)
a1.bs.remove(b1)
sess.flush()
- eq_(select(func.count("*")).select_from(atob).scalar(), 0)
- eq_(select(func.count("*")).select_from(b).scalar(), 0)
- eq_(select(func.count("*")).select_from(a).scalar(), 1)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0
+ )
+ eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0)
+ eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 1)
def test_delete_orphan_dynamic(self):
a, A, B, b, atob = (
# failed until [ticket:427] was fixed
mapper(B, b)
- sess = create_session()
+ sess = Session()
b1 = B(data="b1")
a1 = A(data="a1", bs=[b1])
sess.add(a1)
a1.bs.remove(b1)
sess.flush()
- eq_(select(func.count("*")).select_from(atob).scalar(), 0)
- eq_(select(func.count("*")).select_from(b).scalar(), 0)
- eq_(select(func.count("*")).select_from(a).scalar(), 1)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0
+ )
+ eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0)
+ eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 1)
def test_delete_orphan_cascades(self):
a, A, c, b, C, B, atob = (
)
mapper(C, c)
- sess = create_session()
+ sess = Session()
b1 = B(data="b1", cs=[C(data="c1")])
a1 = A(data="a1", bs=[b1])
sess.add(a1)
a1.bs.remove(b1)
sess.flush()
- eq_(select(func.count("*")).select_from(atob).scalar(), 0)
- eq_(select(func.count("*")).select_from(b).scalar(), 0)
- eq_(select(func.count("*")).select_from(a).scalar(), 1)
- eq_(select(func.count("*")).select_from(c).scalar(), 0)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0
+ )
+ eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0)
+ eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 1)
+ eq_(sess.execute(select(func.count("*")).select_from(c)).scalar(), 0)
def test_cascade_delete(self):
a, A, B, b, atob = (
)
mapper(B, b)
- sess = create_session()
+ sess = Session()
a1 = A(data="a1", bs=[B(data="b1")])
sess.add(a1)
sess.flush()
sess.delete(a1)
sess.flush()
- eq_(select(func.count("*")).select_from(atob).scalar(), 0)
- eq_(select(func.count("*")).select_from(b).scalar(), 0)
- eq_(select(func.count("*")).select_from(a).scalar(), 0)
+ eq_(
+ sess.execute(select(func.count("*")).select_from(atob)).scalar(), 0
+ )
+ eq_(sess.execute(select(func.count("*")).select_from(b)).scalar(), 0)
+ eq_(sess.execute(select(func.count("*")).select_from(a)).scalar(), 0)
def test_single_parent_error(self):
a, A, B, b, atob = (
sess.add(a1)
d1 = Dingaling()
- d1.address = a1
+ with testing.expect_deprecated(
+ '"Dingaling" object is being merged into a Session along the '
+ 'backref cascade path for relationship "Address.dingalings"'
+ ):
+ d1.address = a1
assert d1 in a1.dingalings
assert d1 in sess
sess.add(a1)
u1 = User(name="u1")
- u1.addresses.append(a1)
+ with testing.expect_deprecated(
+ '"User" object is being merged into a Session along the backref '
+ 'cascade path for relationship "Address.user"'
+ ):
+ u1.addresses.append(a1)
assert u1 in sess
def test_m2o_commit_warns(self):
)
),
)
- s = create_session()
+ s = Session()
u = User()
s.add(u)
)
),
)
- s = create_session()
+ s = Session()
u = User(name="u1", addresses=[Address(email_address="ad1")])
s.add(u)
a1 = u.addresses[0]
)
),
)
- s = create_session()
+ s = Session(expire_on_commit=False, autoflush=False)
a = Account(balance=0)
sr = SalesRep(name="John")
s.add_all((a, sr))
- s.flush()
+ s.commit()
c = Customer(name="Jane")
},
)
- session = create_session()
+ session = Session()
h1 = Home(description="home1", address=Address(street="address1"))
b1 = Business(
description="business1", address=Address(street="address2")
session.expunge_all()
eq_(
- session.query(Home).get(h1.id),
+ session.get(Home, h1.id),
Home(description="home1", address=Address(street="address1")),
)
eq_(
- session.query(Business).get(b1.id),
+ session.get(Business, b1.id),
Business(
description="business1", address=Address(street="address2")
),
)
},
)
- session = create_session()
+ session = Session()
a1 = Address()
session.add(a1)
session.flush()
a1 = A(name="a1", bs=[B(name="b1"), B(name="b2"), B(name="b3")])
- sess = create_session()
+ sess = Session()
sess.add(a1)
sess.flush()
sess.expunge_all()
eq_(
- sess.query(A).get(a1.id),
+ sess.get(A, a1.id),
A(name="a1", bs=[B(name="b1"), B(name="b2"), B(name="b3")]),
)
- a1 = sess.query(A).get(a1.id)
+ a1 = sess.get(A, a1.id)
assert not class_mapper(B)._is_orphan(
attributes.instance_state(a1.bs[0])
)
sess.expunge_all()
eq_(
- sess.query(A).get(a1.id),
+ sess.get(A, a1.id),
A(name="a1", bs=[B(name="b1"), B(name="b2"), B(name="b3")]),
)
def _do_move_test(self, delete_old):
Parent, Child = self.classes.Parent, self.classes.Child
- sess = create_session()
-
- p1, p2, c1 = Parent(), Parent(), Child()
- if Parent.child.property.uselist:
- p1.child.append(c1)
- else:
- p1.child = c1
- sess.add_all([p1, c1])
- sess.flush()
+ with Session(autoflush=False) as sess:
+ p1, p2, c1 = Parent(), Parent(), Child()
+ if Parent.child.property.uselist:
+ p1.child.append(c1)
+ else:
+ p1.child = c1
+ sess.add_all([p1, c1])
+ sess.flush()
- if delete_old:
- sess.delete(p1)
+ if delete_old:
+ sess.delete(p1)
- if Parent.child.property.uselist:
- p2.child.append(c1)
- else:
- p2.child = c1
- sess.add(p2)
+ if Parent.child.property.uselist:
+ p2.child.append(c1)
+ else:
+ p2.child = c1
+ sess.add(p2)
- sess.flush()
- eq_(sess.query(Child).filter(Child.parent_id == p2.id).all(), [c1])
+ sess.flush()
+ eq_(sess.query(Child).filter(Child.parent_id == p2.id).all(), [c1])
def test_o2o_delete_old(self):
Child, Parent, parent, child = (
Parent,
parent,
properties={
- "child": relationship(Child, uselist=False, backref="parent")
+ "child": relationship(
+ Child,
+ uselist=False,
+ backref=backref("parent", cascade_backrefs=False),
+ )
},
)
mapper(Child, child)
Child,
uselist=False,
cascade="all, delete, delete-orphan",
- backref="parent",
+ backref=backref("parent", cascade_backrefs=False),
)
},
)
single_parent=True,
backref=backref("child", uselist=False),
cascade="all,delete,delete-orphan",
+ cascade_backrefs=False,
)
},
)
single_parent=True,
backref=backref("child", uselist=True),
cascade="all,delete,delete-orphan",
+ cascade_backrefs=False,
)
},
)
)
mapper(Child, noninh_child)
- sess = create_session()
+ sess = Session()
c1, c2 = Child(), Child()
b1 = Base(descr="b1", children=[c1, c2])
assert c2 in sess and c2 not in sess.new
assert b1 in sess and b1 not in sess.new
- sess = create_session()
+ sess = Session()
c1, c2 = Child(), Child()
b1 = Base(descr="b1", children=[c1, c2])
sess.add(b1)
assert c2 in sess and c2 in sess.new
assert b1 in sess and b1 in sess.new
- sess = create_session()
+ sess = Session()
c1, c2 = Child(), Child()
b1 = Base(descr="b1", children=[c1, c2])
sess.add(b1)
mapper(Parent, parent, inherits=Base)
- sess = create_session()
+ sess = Session()
p1 = Parent()
c1, c2, c3 = Child(), Child(), Child()