.. sourcecode:: python+sql
# load Jack by primary key
- {sql}>>> jack = session.query(User).get(5)
+ {sql}>>> jack = session.get(User, 5)
BEGIN (implicit)
SELECT users.id AS users_id,
users.name AS users_name,
mapper = inspect(entity)
+ if not mapper or not mapper.is_mapper:
+ raise sa_exc.ArgumentError(
+ "Expected mapped class or mapper, got: %r" % entity
+ )
+
is_dict = isinstance(primary_key_identity, dict)
if not is_dict:
primary_key_identity = util.to_list(
if len(primary_key_identity) != len(mapper.primary_key):
raise sa_exc.InvalidRequestError(
"Incorrect number of values in identifier to formulate "
- "primary key for query.get(); primary key columns are %s"
- % ",".join("'%s'" % c for c in mapper.primary_key)
+ "primary key for session.get(); primary key columns "
+ "are %s" % ",".join("'%s'" % c for c in mapper.primary_key)
)
if is_dict:
util.raise_(
sa_exc.InvalidRequestError(
"Incorrect names of values in identifier to formulate "
- "primary key for query.get(); primary key attribute "
+ "primary key for session.get(); primary key attribute "
"names are %s"
% ",".join(
"'%s'" % prop.key
#
# ORM Query
#
- r"The Query\.get\(\) method",
r"The Query.with_polymorphic\(\) method is considered "
"legacy as of the 1.x series",
#
sess = fixture_session()
sess2 = fixture_session()
- p1 = sess.query(Parent).get(1)
+ p1 = sess.get(Parent, 1)
p1.children
# down from 185 on this this is a small slice of a usually
sess = fixture_session()
sess2 = fixture_session()
- p1 = sess.query(Parent).get(1)
+ p1 = sess.get(Parent, 1)
p1.children
# preloading of collection took this down from 1728 to 1192
self.session.flush()
id_, type_ = obj.id, type(obj)
self.session.expunge_all()
- return self.session.query(type_).get(id_)
+ return self.session.get(type_, id_)
def _test_sequence_ops(self):
Parent, Child = self.classes("Parent", "Child")
session.flush()
id_, type_ = obj.id, type(obj)
session.expunge_all()
- return session.query(type_).get(id_)
+ return session.get(type_, id_)
p = Parent("p")
self.session.flush()
id_, type_ = obj.id, type(obj)
self.session.expunge_all()
- return self.session.query(type_).get(id_)
+ return self.session.get(type_, id_)
def test_lazy_list(self):
Parent, Child = self.classes("Parent", "Child")
self.assert_sql_count(testing.db, go, 1)
- u1 = sess.query(User).get(7) # noqa
+ u1 = sess.get(User, 7) # noqa
def go():
u2 = bq(sess).get(7)
self.assert_sql_count(testing.db, go, 1)
- u1 = sess.query(AddressUser).get((10, None)) # noqa
+ u1 = sess.get(AddressUser, (10, None)) # noqa
def go():
u2 = bq(sess).get((10, None))
def test_get(self):
sess = self._fixture_data()
- tokyo = sess.query(WeatherLocation).get(1)
+ tokyo = sess.get(WeatherLocation, 1)
eq_(tokyo.city, "Tokyo")
- newyork = sess.query(WeatherLocation).get(2)
+ newyork = sess.get(WeatherLocation, 2)
eq_(newyork.city, "New York")
- t2 = sess.query(WeatherLocation).get(1)
+ t2 = sess.get(WeatherLocation, 1)
is_(t2, tokyo)
def test_get_explicit_shard(self):
sess = self._fixture_data()
- tokyo = sess.query(WeatherLocation).set_shard("europe").get(1)
+ tokyo = (
+ sess.query(WeatherLocation)
+ .set_shard("europe")
+ .where(WeatherLocation.id == 1)
+ .first()
+ )
is_(tokyo, None)
- newyork = sess.query(WeatherLocation).set_shard("north_america").get(2)
+ newyork = (
+ sess.query(WeatherLocation)
+ .set_shard("north_america")
+ .where(WeatherLocation.id == 2)
+ .first()
+ )
eq_(newyork.city, "New York")
# now it found it
- t2 = sess.query(WeatherLocation).get(1)
+ t2 = sess.get(WeatherLocation, 1)
eq_(t2.city, "Tokyo")
def test_query_explicit_shard_via_bind_opts(self):
tokyo.city # reload 'city' attribute on tokyo
sess.expire_all()
- t = sess.query(WeatherLocation).get(tokyo.id)
+ t = sess.get(WeatherLocation, tokyo.id)
eq_(t.city, tokyo.city)
eq_(t.reports[0].temperature, 80.0)
north_american_cities = sess.query(WeatherLocation).filter(
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.first_name: "moonbeam"}, synchronize_session="evaluate"
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.fname2: "moonbeam"}, synchronize_session="evaluate"
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.fname: "moonbeam"}, synchronize_session="evaluate"
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.fname2: "moonbeam"}, synchronize_session="fetch"
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.fname: "moonbeam"}, synchronize_session="fetch"
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.name: "moonbeam sunshine"}, synchronize_session="evaluate"
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.name: "moonbeam sunshine"}, synchronize_session="fetch"
Person = self.classes.Person
s = fixture_session()
- jill = s.query(Person).get(3)
+ jill = s.get(Person, 3)
s.query(Person).update(
{Person.uname: "moonbeam sunshine"}, synchronize_session="evaluate"
session.expunge_all()
del s1
- srt = session.query(Slide).get(id_)
+ srt = session.get(Slide, id_)
self.assert_(srt.bullets)
self.assert_(len(srt.bullets) == 4)
session.expunge_all()
del s1
- srt = session.query(Slide).get(id_)
+ srt = session.get(Slide, id_)
self.assert_(srt.bullets)
self.assert_(len(srt.bullets) == 5)
session.flush()
session.expunge_all()
- srt = session.query(Slide).get(id_)
+ srt = session.get(Slide, id_)
titles = ["s1/b1", "s1/b2", "s1/b100", "s1/b4", "raw", "raw2"]
found = [b.text for b in srt.bullets]
eq_(titles, found)
session.expunge_all()
del s1
- srt = session.query(Slide).get(id_)
+ srt = session.get(Slide, id_)
self.assert_(srt.bullets)
self.assert_(len(srt.bullets) == 6)
session.expunge_all()
del s1
- srt = session.query(Slide).get(id_)
+ srt = session.get(Slide, id_)
self.assert_(srt.bullets)
self.assert_(len(srt.bullets) == 3)
session.flush()
session.expunge_all()
- srt = session.query(Slide).get(id_)
+ srt = session.get(Slide, id_)
self.assert_(srt.bullets)
self.assert_(len(srt.bullets) == 3)
.scalar(),
1,
)
- u1 = Session.query(User).get(8)
+ u1 = Session.get(User, 8)
q = (
Session.query(Address)
.filter(Address.user == u1)
# this eager load sets up an AliasedClauses for the "comment"
# relationship, then stores it in clauses_by_lead_mapper[mapper for
# Derived]
- d = sess.query(Derived).get("uid1")
+ d = sess.get(Derived, "uid1")
sess.expunge_all()
assert len([c for c in d.comments]) == 1
# relationship, and should store it in clauses_by_lead_mapper[mapper
# for DerivedII]. the bug was that the previous AliasedClause create
# prevented this population from occurring.
- d2 = sess.query(DerivedII).get("uid2")
+ d2 = sess.get(DerivedII, "uid2")
sess.expunge_all()
# object is not in the session; therefore the lazy load cant trigger
sess.add(d)
sess.flush()
sess.expunge_all()
- x = sess.query(Design).get(1)
+ x = sess.get(Design, 1)
x.inheritedParts
invoice_id = i1.invoice_id
session.expunge_all()
- c = session.query(Company).get(company_id)
+ c = session.get(Company, company_id)
session.expunge_all()
- i = session.query(Invoice).get(invoice_id)
+ i = session.get(Invoice, invoice_id)
def go():
eq_(c, i.company)
f = Foo()
sess.add(f)
sess.flush()
- assert sess.query(Foo).get(f.id) is f
+ assert sess.get(Foo, f.id) is f
finally:
if hasattr(bind, "close"):
bind.close()
s.add(Person(id=5, personname="thename"))
s.commit()
- p = s.query(Person).get(5)
+ p = s.get(Person, 5)
with self.sql_execution_asserter(testing.db) as asserter:
p.personname = "newname"
s.bulk_save_objects([p])
pid = p.id
session.expunge_all()
- p = session.query(Parent).get(pid)
+ p = session.get(Parent, pid)
eq_(set(p.children.keys()), set(["foo", "bar"]))
cid = p.children["foo"].id
session.flush()
session.expunge_all()
- p = session.query(Parent).get(pid)
+ p = session.get(Parent, pid)
self.assert_(set(p.children.keys()) == set(["foo", "bar"]))
self.assert_(p.children["foo"].id != cid)
session.flush()
session.expunge_all()
- p = session.query(Parent).get(pid)
+ p = session.get(Parent, pid)
self.assert_(
len(list(collections.collection_adapter(p.children))) == 2
)
session.flush()
session.expunge_all()
- p = session.query(Parent).get(pid)
+ p = session.get(Parent, pid)
self.assert_(
len(list(collections.collection_adapter(p.children))) == 1
)
session.flush()
session.expunge_all()
- p = session.query(Parent).get(pid)
+ p = session.get(Parent, pid)
self.assert_(
len(list(collections.collection_adapter(p.children))) == 0
)
pid = p.id
session.expunge_all()
- p = session.query(Parent).get(pid)
+ p = session.get(Parent, pid)
self.assert_(
set(p.children.keys()) == set([("foo", "1"), ("foo", "2")])
session.flush()
session.expunge_all()
- p = session.query(Parent).get(pid)
+ p = session.get(Parent, pid)
self.assert_(
set(p.children.keys()) == set([("foo", "1"), ("foo", "2")])
sess.add(f)
sess.flush()
sess.expunge_all()
- f = sess.query(Foo).get(f.col1)
+ f = sess.get(Foo, f.col1)
assert len(list(f.bars)) == 2
f.bars.clear()
sess.add(f)
sess.flush()
sess.expunge_all()
- f = sess.query(Foo).get(f.col1)
+ f = sess.get(Foo, f.col1)
assert len(list(f.bars)) == 2
f.bars.clear()
sess.add(f)
sess.flush()
sess.expunge_all()
- f = sess.query(Foo).get(f.col1)
+ f = sess.get(Foo, f.col1)
assert len(list(f.bars)) == 2
strongref = list(f.bars.values())
f.bars["a"] = Bar("a")
sess.flush()
sess.expunge_all()
- f = sess.query(Foo).get(f.col1)
+ f = sess.get(Foo, f.col1)
assert len(list(f.bars)) == 2
replaced = set([id(b) for b in list(f.bars.values())])
sess.flush()
sess.expunge_all()
- p2 = sess.query(Parent).get(p1.col1)
+ p2 = sess.get(Parent, p1.col1)
o = list(p2.children)
assert len(o) == 3
},
)
sess = Session(connection)
- assert sess.query(Node).get(1).names == []
+ assert sess.get(Node, 1).names == []
def test_conflicting_backref_two(self):
meta = MetaData()
g1 = sess.query(Graph).first()
sess.close()
- g = sess.query(Graph).get(g1.id)
+ g = sess.get(Graph, g1.id)
eq_(
[(e.start, e.end) for e in g.edges],
[(Point(3, 4), Point(5, 6)), (Point(14, 5), Point(2, 7))],
g.edges[1].end = Point(18, 4)
sess.commit()
- e = sess.query(Edge).get(g.edges[1].id)
+ e = sess.get(Edge, g.edges[1].id)
eq_(e.end, Point(18, 4))
def test_not_none(self):
sess.close()
def go():
- g2 = (
- sess.query(Graph)
- .options(sa.orm.joinedload(Graph.edges))
- .get(g.id)
+ g2 = sess.get(
+ Graph, g.id, options=[sa.orm.joinedload(Graph.edges)]
)
eq_(
sess.add(g)
sess.commit()
- g2 = sess.query(Graph).get(1)
+ g2 = sess.get(Graph, 1)
assert g2.edges[-1].start.x is None
assert g2.edges[-1].start.y is None
sess = self._fixture()
g = sess.query(Graph).first()
- g2 = sess.query(Graph).get([g.id, g.version_id])
+ g2 = sess.get(Graph, [g.id, g.version_id])
eq_(g.version, g2.version)
def test_get_by_composite(self):
sess = self._fixture()
g = sess.query(Graph).first()
- g2 = sess.query(Graph).get(Version(g.id, g.version_id))
+ g2 = sess.get(Graph, Version(g.id, g.version_id))
eq_(g.version, g2.version)
def test_pk_mutation(self):
g.version = Version(2, 1)
sess.commit()
- g2 = sess.query(Graph).get(Version(2, 1))
+ g2 = sess.get(Graph, Version(2, 1))
eq_(g.version, g2.version)
@testing.fails_on_everything_except("sqlite")
sess.add(c1)
sess.flush()
sess.expunge_all()
- c1 = sess.query(C1).get(c1.c1)
+ c1 = sess.get(C1, c1.c1)
c2 = C1()
c2.parent = c1
sess.add(c2)
session.flush()
session.expunge_all()
- f1 = session.query(A).get(f1.id)
- f2 = session.query(A).get(f2.id)
+ f1 = session.get(A, f1.id)
+ f2 = session.get(A, f2.id)
assert f2.foo is f1
session.commit()
with fixture_session() as session:
- a = session.query(Account).get(42)
+ a = session.get(Account, 42)
self.check_data_fixture(a)
def test_appending_to_relationship(self):
account.add_widget(Widget("Xyzzy"))
with Session(testing.db) as session:
- a = session.query(Account).get(42)
+ a = session.get(Account, 42)
eq_(a.widget_count, 3)
eq_(len(a.widgets), 3)
)
sess = fixture_session()
- o1 = sess.query(Order).get(1)
+ o1 = sess.get(Order, 1)
eq_(o1.description, "order 1")
def test_unsaved_2(self):
)
sess = fixture_session()
- o2 = sess.query(Order).get(2)
+ o2 = sess.get(Order, 2)
o2.isopen = 1
sess.flush()
},
)
sess = fixture_session(autoflush=False)
- o = sess.query(Order).get(3)
+ o = sess.get(Order, 3)
assert "userident" not in o.__dict__
o.description = "somenewdescription"
eq_(o.description, "somenewdescription")
)
sess = fixture_session()
- o2 = sess.query(Order).get(3)
+ o2 = sess.get(Order, 3)
# this will load the group of attributes
eq_(o2.description, "order 3")
r"The Query.with_parent\(\) method is considered legacy as of the 1.x"
)
+query_get_dep = r"The Query.get\(\) method is considered legacy as of the 1.x"
+
sef_dep = (
r"The Query.select_entity_from\(\) method is considered "
"legacy as of the 1.x"
)
+class GetTest(QueryTest):
+ def test_get(self):
+ User = self.classes.User
+
+ s = fixture_session()
+ with assertions.expect_deprecated_20(query_get_dep):
+ assert s.query(User).get(19) is None
+ with assertions.expect_deprecated_20(query_get_dep):
+ u = s.query(User).get(7)
+ with assertions.expect_deprecated_20(query_get_dep):
+ u2 = s.query(User).get(7)
+ assert u is u2
+ s.expunge_all()
+ with assertions.expect_deprecated_20(query_get_dep):
+ u2 = s.query(User).get(7)
+ assert u is not u2
+
+ def test_loader_options(self):
+ User = self.classes.User
+
+ s = fixture_session()
+
+ with assertions.expect_deprecated_20(query_get_dep):
+ u1 = s.query(User).options(joinedload(User.addresses)).get(8)
+ eq_(len(u1.__dict__["addresses"]), 3)
+
+ def test_no_criterion_when_already_loaded(self):
+ """test that get()/load() does not use preexisting filter/etc.
+ criterion, even when we're only using the identity map."""
+
+ User, Address = self.classes.User, self.classes.Address
+
+ s = fixture_session()
+
+ s.get(User, 7)
+
+ q = s.query(User).join(User.addresses).filter(Address.user_id == 8)
+ with assertions.expect_deprecated_20(query_get_dep):
+ with assertions.expect_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Query.get\(\) being called on a Query with existing "
+ "criterion.",
+ ):
+ q.get(7)
+
+ def test_no_criterion(self):
+ """test that get()/load() does not use preexisting filter/etc.
+ criterion"""
+
+ User, Address = self.classes.User, self.classes.Address
+
+ s = fixture_session()
+
+ q = s.query(User).join(User.addresses).filter(Address.user_id == 8)
+
+ with assertions.expect_deprecated_20(query_get_dep):
+ with assertions.expect_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Query.get\(\) being called on a Query with existing "
+ "criterion.",
+ ):
+ q.get(7)
+
+ with assertions.expect_deprecated_20(query_get_dep):
+ with assertions.expect_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Query.get\(\) being called on a Query with existing "
+ "criterion.",
+ ):
+ s.query(User).filter(User.id == 7).get(19)
+
+ # order_by()/get() doesn't raise
+ with assertions.expect_deprecated_20(query_get_dep):
+ s.query(User).order_by(User.id).get(8)
+
+ def test_get_against_col(self):
+ User = self.classes.User
+
+ s = fixture_session()
+
+ with assertions.expect_deprecated_20(query_get_dep):
+ with assertions.expect_raises_message(
+ sa_exc.InvalidRequestError,
+ r"get\(\) can only be used against a single mapped class.",
+ ):
+ s.query(User.id).get(5)
+
+ def test_only_full_mapper_zero(self):
+ User, Address = self.classes.User, self.classes.Address
+
+ s = fixture_session()
+ q = s.query(User, Address)
+
+ with assertions.expect_deprecated_20(query_get_dep):
+ with assertions.expect_raises_message(
+ sa_exc.InvalidRequestError,
+ r"get\(\) can only be used against a single mapped class.",
+ ):
+ q.get(5)
+
+
class CustomJoinTest(QueryTest):
run_setup_mappers = None
m.add_property("name", synonym("_name"))
sess = fixture_session(autocommit=False)
- assert sess.query(User).get(7)
+ assert sess.get(User, 7)
u = sess.query(User).filter_by(name="jack").one()
self.assert_sql_count(testing.db, go, count)
sess = fixture_session()
- user = sess.query(User).get(7)
+ user = sess.get(User, 7)
closed_mapper = User.closed_orders.entity
open_mapper = User.open_orders.entity
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
with assertions.expect_deprecated_20(query_wparent_dep):
q = sess.query(Address).select_from(Address).with_parent(u1)
self.assert_compile(
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
with assertions.expect_deprecated_20(query_wparent_dep):
q = sess.query(User, Address).with_parent(
u1, User.addresses, from_entity=Address
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
a1 = aliased(Address)
with assertions.expect_deprecated_20(query_wparent_dep):
q = sess.query(a1).with_parent(u1)
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
a1 = aliased(Address)
with assertions.expect_deprecated_20(query_wparent_dep):
q = sess.query(a1).with_parent(u1, User.addresses)
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
a1 = aliased(Address)
a2 = aliased(Address)
with assertions.expect_deprecated_20(query_wparent_dep):
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
a1 = aliased(Address)
a2 = aliased(Address)
with assertions.expect_deprecated_20(query_wparent_dep):
User, Address = self._user_address_fixture()
sess = fixture_session()
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
sess.expunge(u)
assert_raises(
orm_exc.DetachedInstanceError,
User, Address = self._user_address_fixture()
sess = fixture_session()
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
sess.expunge(u)
with testing.expect_warnings(
def test_order_by(self):
User, Address = self._user_address_fixture()
sess = fixture_session()
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
eq_(
list(u.addresses.order_by(desc(Address.email_address))),
[
)
sess = fixture_session()
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
with self.sql_execution_asserter() as asserter:
for i in range(3):
)
sess = fixture_session()
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
eq_(
list(u.addresses),
[
self.mapper_registry.map_imperatively(User, users)
sess = fixture_session()
- ad = sess.query(Address).get(1)
+ ad = sess.get(Address, 1)
def go():
ad.user = None
self.assert_sql_count(testing.db, go, 0)
sess.flush()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
assert ad not in u.addresses
def test_no_count(self):
), # noqa
0,
)
- u1 = sess.query(User).get(u1.id)
+ u1 = sess.get(User, u1.id)
u1.addresses.append(a1)
sess.flush()
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session()
- user = sess.query(User).get(7)
+ user = sess.get(User, 7)
assert getattr(User, "addresses").hasparent(
sa.orm.attributes.instance_state(user.addresses[0]),
optimistic=True,
eq_(q.all(), [User(id=7, addresses=[Address(id=1)])])
sess.expunge_all()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
def go():
eq_(u.addresses[0].user_id, 7)
sess.expunge_all()
def go():
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
eq_(
User(
id=8,
def go():
a = q.filter(addresses.c.id == 1).one()
is_not(a.user, None)
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
is_(a.user, u1)
self.assert_sql_count(testing.db, go, 1)
def test_runs_query_on_refresh(self):
User, Address, sess = self._eager_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
assert "addresses" in u1.__dict__
sess.expire(u1)
)
sess = fixture_session(autoflush=False)
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
assert "addresses" in u1.__dict__
sess.expire(u1)
# out an existing collection to function correctly with
# populate_existing.
User, Address, sess = self._eager_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
u1.addresses[2].email_address = "foofoo"
del u1.addresses[1]
u1 = sess.query(User).populate_existing().filter_by(id=8).one()
def test_loads_second_level_collection_to_scalar(self):
User, Address, Dingaling, sess = self._collection_to_scalar_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
a1 = Address()
u1.addresses.append(a1)
a2 = u1.addresses[0]
def test_loads_second_level_collection_to_collection(self):
User, Order, Item, sess = self._collection_to_collection_fixture()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
u1.orders
o1 = Order()
u1.orders.append(o1)
sess.add(u)
sess.flush()
sess.expire(u)
- u = sess.query(User).get(u.id)
+ u = sess.get(User, u.id)
sess.expunge_all()
- u = sess.query(User).get(u.id)
+ u = sess.get(User, u.id)
u.name = "u1 changed"
sess.flush()
sess.delete(u)
am = AdminUser(name="au1", email_address="au1@e1")
sess.add(am)
sess.flush()
- am = sess.query(AdminUser).populate_existing().get(am.id)
+ am = sess.get(AdminUser, am.id, populate_existing=True)
sess.expunge_all()
- am = sess.query(AdminUser).get(am.id)
+ am = sess.get(AdminUser, am.id)
am.name = "au1 changed"
sess.flush()
sess.delete(am)
am = AdminUser(name="au1", email_address="au1@e1")
sess.add(am)
sess.flush()
- am = sess.query(AdminUser).populate_existing().get(am.id)
+ am = sess.get(AdminUser, am.id, populate_existing=True)
sess.expunge_all()
- am = sess.query(AdminUser).get(am.id)
+ am = sess.get(AdminUser, am.id)
am.name = "au1 changed"
sess.flush()
sess.delete(am)
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
assert len(u.addresses) == 1
u.name = "foo"
del u.addresses[0]
s = fixture_session()
- a1 = s.query(Address).get(2)
- u1 = s.query(User).get(7)
+ a1 = s.get(Address, 2)
+ u1 = s.get(User, 7)
a1.user = u1
s.expire(a1, ["user_id"])
self.mapper_registry.map_imperatively(User, users)
s = fixture_session()
- u = s.query(User).get(7)
+ u = s.get(User, 7)
s.expunge_all()
assert_raises_message(
self.mapper_registry.map_imperatively(User, users)
s = fixture_session(autocommit=False)
- u = s.query(User).get(10)
+ u = s.get(User, 10)
s.expire_all()
def go():
- s.query(User).get(10) # get() refreshes
+ s.get(User, 10) # get() refreshes
self.assert_sql_count(testing.db, go, 1)
self.assert_sql_count(testing.db, go, 0)
def go():
- s.query(User).get(10) # expire flag reset, so not expired
+ s.get(User, 10) # expire flag reset, so not expired
self.assert_sql_count(testing.db, go, 0)
self.mapper_registry.map_imperatively(User, users)
s = fixture_session(autocommit=False)
- u = s.query(User).get(10)
+ u = s.get(User, 10)
s.expire_all()
s.execute(users.delete().where(User.id == 10))
# object is gone, get() returns None, removes u from session
assert u in s
- assert s.query(User).get(10) is None
+ assert s.get(User, 10) is None
assert u not in s # and expunges
def test_refresh_on_deleted_raises(self):
self.mapper_registry.map_imperatively(User, users)
s = fixture_session(autocommit=False)
- u = s.query(User).get(10)
+ u = s.get(User, 10)
s.expire_all()
s.expire_all()
self.mapper_registry.map_imperatively(User, users)
s = fixture_session(autocommit=False)
- u = s.query(User).get(10)
+ u = s.get(User, 10)
s.expire_all()
s.execute(users.delete().where(User.id == 10))
# do a get()/remove u from session
- assert s.query(User).get(10) is None
+ assert s.get(User, 10) is None
assert u not in s
s.rollback()
User, users, properties={"name": deferred(users.c.name)}
)
s = fixture_session(autocommit=False)
- u = s.query(User).get(10)
+ u = s.get(User, 10)
assert "name" not in u.__dict__
s.execute(users.delete().where(User.id == 10))
)
self.mapper_registry.map_imperatively(Address, addresses)
s = fixture_session(autoflush=True, autocommit=False)
- u = s.query(User).get(8)
+ u = s.get(User, 8)
adlist = u.addresses
eq_(
adlist,
)
self.mapper_registry.map_imperatively(Address, addresses)
s = fixture_session(autoflush=True, autocommit=False)
- u = s.query(User).get(8)
+ u = s.get(User, 8)
assert_raises_message(
sa_exc.InvalidRequestError,
"properties specified for refresh",
self.mapper_registry.map_imperatively(User, users)
s = fixture_session()
- u = s.query(User).get(7)
+ u = s.get(User, 7)
s.expire(u)
s.refresh(u)
def go():
- u = s.query(User).get(7)
+ u = s.get(User, 7)
eq_(u.name, "jack")
self.assert_sql_count(testing.db, go, 0)
self.mapper_registry.map_imperatively(User, users)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
sess.expire(u, attribute_names=["name"])
self.assert_sql_count(testing.db, go, 0)
sess.flush()
sess.expunge_all()
- assert sess.query(User).get(7).name == "somenewname"
+ assert sess.get(User, 7).name == "somenewname"
def test_no_session(self):
users, User = self.tables.users, self.classes.User
self.mapper_registry.map_imperatively(User, users)
sess = fixture_session()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
sess.expire(u, attribute_names=["name"])
sess.expunge(u)
# occur during a flush() on an instance that was just inserted
self.mapper_registry.map_imperatively(User, users)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
sess.expire(u, attribute_names=["name"])
sess.expunge(u)
# are absent. ensure an error is raised.
self.mapper_registry.map_imperatively(User, users)
sess = fixture_session()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
sess.expire(u, attribute_names=["name", "id"])
sess.expunge(u)
self.mapper_registry.map_imperatively(Order, orders)
sess = fixture_session(autoflush=False)
- o = sess.query(Order).get(3)
+ o = sess.get(Order, 3)
sess.expire(o)
o.description = "order 3 modified"
self.mapper_registry.map_imperatively(Order, orders)
sess = fixture_session(autoflush=False)
- o = sess.query(Order).get(3)
+ o = sess.get(Order, 3)
sess.expire(o)
sess.execute(orders.update(), dict(description="order 3 modified"))
)
self.mapper_registry.map_imperatively(Address, addresses)
s = fixture_session(autoflush=False)
- u = s.query(User).get(8)
+ u = s.get(User, 8)
assert u.addresses[0].email_address == "ed@wood.com"
u.addresses[0].email_address = "someotheraddress"
)
self.mapper_registry.map_imperatively(Address, addresses)
s = fixture_session(autoflush=False)
- u = s.query(User).get(8)
+ u = s.get(User, 8)
assert u.addresses[0].email_address == "ed@wood.com"
u.addresses[0].email_address = "someotheraddress"
self.mapper_registry.map_imperatively(Address, addresses)
s = fixture_session(autoflush=False)
- u = s.query(User).get(8)
+ u = s.get(User, 8)
a = Address(id=12, email_address="foobar")
u.addresses.append(a)
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
sess.expire(u)
assert "name" not in u.__dict__
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
sess.expire(u)
assert "name" not in u.__dict__
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
a1 = u.addresses[0]
eq_(a1.email_address, "jack@bean.com")
)
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
sess.expire(u, ["name", "addresses"])
u.addresses
assert "name" not in u.__dict__
)
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session()
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
sess.expire(u)
u.id
)
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session()
- u = sess.query(User).options(joinedload(User.addresses)).get(8)
+ u = sess.get(User, 8, options=[joinedload(User.addresses)])
sess.expire(u)
u.id
assert "addresses" in u.__dict__
)
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session()
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
sess.expire(u)
# here, the lazy loader will encounter the attribute already
)
sess = fixture_session()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
assert "name" in u.__dict__
assert u.uname == u.name
self.mapper_registry.map_imperatively(Order, orders)
sess = fixture_session(autoflush=False)
- o = sess.query(Order).get(3)
+ o = sess.get(Order, 3)
sess.expire(o, attribute_names=["description"])
assert "id" in o.__dict__
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
sess.expire(u, ["name", "addresses"])
assert "name" not in u.__dict__
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
sess.expire(u, ["name", "addresses"])
assert "name" not in u.__dict__
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
assert "name" in u.__dict__
u.addresses
assert "addresses" in u.__dict__
)
sess = fixture_session(autoflush=False)
- o = sess.query(Order).get(3)
+ o = sess.get(Order, 3)
sess.expire(o, ["description", "isopen"])
assert "isopen" not in o.__dict__
assert "description" not in o.__dict__
sess.expunge_all()
# same tests, using deferred at the options level
- o = sess.query(Order).options(sa.orm.defer(Order.description)).get(3)
+ o = sess.get(Order, 3, options=[sa.orm.defer(Order.description)])
assert "description" not in o.__dict__
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session(autoflush=False)
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
assert len(u.addresses) == 3
sess.expire(u)
assert "addresses" not in u.__dict__
Engineer = self.classes.Engineer
sess = fixture_session(autoflush=False)
- e1 = sess.query(Engineer).get(2)
+ e1 = sess.get(Engineer, 2)
sess.expire(e1, attribute_names=["name"])
sess.expunge(e1)
# same as test_no_instance_key, but the PK columns
# are absent. ensure an error is raised.
sess = fixture_session(autoflush=False)
- e1 = sess.query(Engineer).get(2)
+ e1 = sess.get(Engineer, 2)
sess.expire(e1, attribute_names=["name", "person_id"])
sess.expunge(e1)
},
)
s = fixture_session(autoflush=False)
- u = s.query(User).get(7)
+ u = s.get(User, 7)
u.name = "foo"
a = Address()
assert sa.orm.object_session(a) is None
self.mapper_registry.map_imperatively(User, users)
s = fixture_session()
- u = s.query(User).get(7)
+ u = s.get(User, 7)
s.expunge_all()
assert_raises_message(
sa_exc.InvalidRequestError,
s = fixture_session()
- a1 = s.query(Address).get(2)
- u1 = s.query(User).get(7)
+ a1 = s.get(Address, 2)
+ u1 = s.get(User, 7)
a1.user = u1
s.refresh(a1, ["user_id"])
self.mapper_registry.map_imperatively(User, users)
s = fixture_session()
- u = s.query(User).get(7)
+ u = s.get(User, 7)
s.expire(u)
assert "name" not in u.__dict__
s.refresh(u)
)
s = fixture_session()
- u = s.query(User).get(8)
+ u = s.get(User, 8)
assert len(u.addresses) == 3
s.refresh(u)
assert len(u.addresses) == 3
s = fixture_session()
- u = s.query(User).get(8)
+ u = s.get(User, 8)
assert len(u.addresses) == 3
s.expire(u)
assert len(u.addresses) == 3
self.mapper_registry.map_imperatively(
Order, orders, properties={"address": relationship(Address)}
) # m2o
+ configure_mappers()
sess = fixture_session()
def go():
- o1 = (
- sess.query(Order)
- .options(joinedload(Order.address).joinedload(Address.user))
- .get(1)
+ o1 = sess.get(
+ Order,
+ 1,
+ options=[joinedload(Order.address).joinedload(Address.user)],
)
eq_(o1.address.user.count, 1)
s.flush()
insp = inspect(u1)
eq_(insp.identity, (u1.id,))
- is_(s.query(User).get(insp.identity), u1)
+ is_(s.get(User, insp.identity), u1)
def test_is_instance(self):
User = self.classes.User
self.mapper_registry.map_imperatively(Address, addresses)
sess = fixture_session()
- user = sess.query(User).get(7)
+ user = sess.get(User, 7)
assert getattr(User, "addresses").hasparent(
attributes.instance_state(user.addresses[0]), optimistic=True
)
self.assert_sql_count(testing.db, go, 15)
sess = fixture_session()
- user = sess.query(User).get(7)
+ user = sess.get(User, 7)
closed_mapper = User.closed_orders.entity
open_mapper = User.open_orders.entity
)
# load user that is attached to the address
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
def go():
# lazy load of a1.user should get it from the session
)
# load user that is attached to the address
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
def go():
# lazy load of a1.user should get it from the session
assert a.user is not None
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
assert a.user is u1
sess.flush()
sess.expunge_all()
- ad2 = sess.query(Address).get(1)
- ad3 = sess.query(Address).get(ad1.id)
+ ad2 = sess.get(Address, 1)
+ ad3 = sess.get(Address, ad1.id)
def go():
# one lazy load
Order(id=5, address=None),
],
),
- s.query(User).populate_existing().get(7),
+ s.get(User, 7, populate_existing=True),
)
self.assert_sql_count(testing.db, go, 1)
)
s = fixture_session()
- u = s.query(User).get(7)
+ u = s.get(User, 7)
eq_(u._name, "jack")
eq_(u._id, 7)
u2 = s.query(User).filter_by(user_name="jack").one()
m.add_property("addresses", relationship(Address))
sess = fixture_session(autocommit=False)
- assert sess.query(User).get(7)
+ assert sess.get(User, 7)
u = sess.query(User).filter_by(name="jack").one()
)
sess = fixture_session()
- u1 = sess.query(User).get(7)
- u2 = sess.query(User).get(8)
+ u1 = sess.get(User, 7)
+ u2 = sess.get(User, 8)
# comparaison ops need to work
a1 = sess.query(Address).filter(Address.user == u1).one()
eq_(a1.id, 1)
s = Session(testing.db)
s.add(u1)
s.commit()
- assert s.query(NoBoolAllowed).get(u1.id) is u1
+ assert s.get(NoBoolAllowed, u1.id) is u1
def test_we_dont_call_eq(self):
class NoEqAllowed(object):
sess.flush()
sess.expunge_all()
- u2 = sess.query(User).get(7)
+ u2 = sess.get(User, 7)
eq_(
u2,
# assert data was saved
sess2 = fixture_session()
- u2 = sess2.query(User).get(7)
+ u2 = sess2.get(User, 7)
eq_(
u2,
User(
# assert modified/merged data was saved
with fixture_session() as sess:
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
eq_(
u,
User(
eq_(load.called, 18)
with fixture_session(expire_on_commit=False) as sess5:
- u2 = sess5.query(User).get(u.id)
+ u2 = sess5.get(User, u.id)
eq_(u2.name, "fred2")
eq_(u2.addresses[1].email_address, "afafds")
eq_(load.called, 21)
eq_(load.called, 0)
sess2 = fixture_session()
- u2 = sess2.query(User).get(u.id)
+ u2 = sess2.get(User, u.id)
eq_(load.called, 1)
u.addresses[1].email_address = "addr 2 modified"
eq_(load.called, 3)
sess3 = fixture_session()
- u3 = sess3.query(User).get(u.id)
+ u3 = sess3.get(User, u.id)
eq_(load.called, 4)
u.name = "also fred"
eq_(load.called, 0)
with fixture_session(expire_on_commit=False) as sess2:
- o2 = sess2.query(Order).get(o.id)
+ o2 = sess2.get(Order, o.id)
eq_(load.called, 1)
o.items[1].description = "item 2 modified"
eq_(load.called, 3)
with fixture_session(expire_on_commit=False) as sess3:
- o3 = sess3.query(Order).get(o.id)
+ o3 = sess3.get(Order, o.id)
eq_(load.called, 4)
o.description = "desc modified"
eq_(load.called, 0)
sess2 = fixture_session()
- u2 = sess2.query(User).get(7)
+ u2 = sess2.get(User, 7)
eq_(load.called, 1)
u2.name = "fred2"
u2.address.email_address = "hoho@lalala.com"
sess.commit()
sess2 = fixture_session()
- u2 = (
- sess2.query(User).options(sa.orm.joinedload(User.addresses)).get(7)
- )
+ u2 = sess2.get(User, 7, options=[sa.orm.joinedload(User.addresses)])
sess3 = fixture_session()
u3 = sess3.merge(u2, load=False) # noqa
"mapped instances before merging with load=False." in str(e)
)
- u2 = sess2.query(User).get(7)
+ u2 = sess2.get(User, 7)
sess3 = fixture_session()
u3 = sess3.merge(u2, load=False) # noqa
with fixture_session() as sess2:
eq_(
- sess2.query(User).get(u2.id).addresses[0].email_address,
+ sess2.get(User, u2.id).addresses[0].email_address,
"somenewaddress",
)
sess2 = fixture_session()
sess = fixture_session()
- u = sess.query(User).get(7)
+ u = sess.get(User, 7)
u.addresses.append(Address())
sess2 = fixture_session()
try:
sess2.flush()
sess2.expunge_all()
eq_(
- sess2.query(User).get(u2.id).addresses[0].email_address,
+ sess2.get(User, u2.id).addresses[0].email_address,
"somenewaddress",
)
except sa.exc.InvalidRequestError as e:
sess.add(u1)
sess.flush()
- assert sess.query(User).get("jack") is u1
+ assert sess.get(User, "jack") is u1
u1.username = "ed"
sess.flush()
def go():
- assert sess.query(User).get("ed") is u1
+ assert sess.get(User, "ed") is u1
self.assert_sql_count(testing.db, go, 0)
- assert sess.query(User).get("jack") is None
+ assert sess.get(User, "jack") is None
sess.expunge_all()
- u1 = sess.query(User).get("ed")
+ u1 = sess.get(User, "ed")
eq_(User(username="ed", fullname="jack"), u1)
def test_load_after_expire(self):
sess.add(u1)
sess.flush()
- assert sess.query(User).get("jack") is u1
+ assert sess.get(User, "jack") is u1
sess.execute(
users.update().values({User.username: "jack"}), dict(username="ed")
assert_raises(sa.orm.exc.ObjectDeletedError, getattr, u1, "username")
sess.expunge_all()
- assert sess.query(User).get("jack") is None
- assert sess.query(User).get("ed").fullname == "jack"
+ assert sess.get(User, "jack") is None
+ assert sess.get(User, "ed").fullname == "jack"
@testing.requires.returning
def test_update_to_sql_expr(self):
sess.add(u1)
sess.flush()
- assert sess.query(User).get("jack") is u1
+ assert sess.get(User, "jack") is u1
sess.expire(u1)
u1.username = "ed"
sess.flush()
sess.expunge_all()
- assert sess.query(User).get("ed").fullname == "jack"
+ assert sess.get(User, "ed").fullname == "jack"
@testing.requires.on_update_cascade
def test_onetomany_passive(self):
sess.add(u1)
sess.flush()
- assert sess.query(Address).get("jack1") is u1.addresses[0]
+ assert sess.get(Address, "jack1") is u1.addresses[0]
u1.username = "ed"
sess.flush()
sess.query(Address).all(),
)
- u1 = sess.query(User).get("ed")
+ u1 = sess.get(User, "ed")
u1.username = "jack"
def go():
username="jack",
addresses=[Address(username="jack"), Address(username="jack")],
)
- == sess.query(User).get("jack")
+ == sess.get(User, "jack")
)
- u1 = sess.query(User).get("jack")
+ u1 = sess.get(User, "jack")
u1.addresses = []
u1.username = "fred"
sess.flush()
sess.expunge_all()
- assert sess.query(Address).get("jack1").username is None
- u1 = sess.query(User).get("fred")
+ assert sess.get(Address, "jack1").username is None
+ u1 = sess.get(User, "fred")
eq_(User(username="fred", fullname="jack"), u1)
@testing.requires.on_update_cascade
sess.query(Address).all(),
)
- u1 = sess.query(User).get("ed")
+ u1 = sess.get(User, "ed")
assert len(u1.addresses) == 2 # load addresses
u1.username = "fred"
eq_(["ed", "jack"], sorted([u.username for u in r[1].users]))
sess.expunge_all()
- u2 = sess.query(User).get(u2.username)
+ u2 = sess.get(User, u2.username)
u2.username = "wendy"
sess.flush()
r = sess.query(Item).filter(with_parent(u2, User.items)).all()
a_editable.status = PUBLISHED
session.commit()
- assert session.query(User).get([1, PUBLISHED]) is a_editable
- assert session.query(User).get([1, ARCHIVED]) is a_published
+ assert session.get(User, [1, PUBLISHED]) is a_editable
+ assert session.get(User, [1, ARCHIVED]) is a_published
a_published.status = PUBLISHED
a_editable.status = EDITABLE
session.commit()
- assert session.query(User).get([1, PUBLISHED]) is a_published
- assert session.query(User).get([1, EDITABLE]) is a_editable
+ assert session.get(User, [1, PUBLISHED]) is a_published
+ assert session.get(User, [1, EDITABLE]) is a_editable
@testing.requires.savepoints
def test_reverse_savepoint(self):
[("jack",), ("jack",)],
)
- assert sess.query(Address).get(a1.id) is u1.addresses[0]
+ assert sess.get(Address, a1.id) is u1.addresses[0]
u1.username = "ed"
sess.flush()
sess.query(Address).all(),
)
- u1 = sess.query(User).get(u1.id)
+ u1 = sess.get(User, u1.id)
u1.username = "jack"
def go():
username="jack",
addresses=[Address(username="jack"), Address(username="jack")],
)
- == sess.query(User).get(u1.id)
+ == sess.get(User, u1.id)
)
sess.expunge_all()
- u1 = sess.query(User).get(u1.id)
+ u1 = sess.get(User, u1.id)
u1.addresses = []
u1.username = "fred"
sess.flush()
sess.expunge_all()
- a1 = sess.query(Address).get(a1.id)
+ a1 = sess.get(Address, a1.id)
eq_(a1.username, None)
eq_(
[(None,), (None,)],
)
- u1 = sess.query(User).get(u1.id)
+ u1 = sess.get(User, u1.id)
eq_(User(username="fred", fullname="jack"), u1)
jid = j.id
pid = p.id
- j = session.query(Jack).get(jid)
- p = session.query(Port).get(pid)
+ j = session.get(Jack, jid)
+ p = session.get(Port, pid)
assert p.jack is not None
assert p.jack is j
assert j.port is not None
session.expunge_all()
- j = session.query(Jack).get(jid)
- p = session.query(Port).get(pid)
+ j = session.get(Jack, jid)
+ p = session.get(Port, pid)
j.port = None
sess.expunge_all()
- eq_(u1, sess.query(User).get(u2.id))
+ eq_(u1, sess.get(User, u2.id))
def test_no_mappers(self):
users = self.tables.users
sess.commit()
with fixture_session() as sess:
- u1 = sess.query(User).get(u1.id)
+ u1 = sess.get(User, u1.id)
assert "name" not in u1.__dict__
assert "addresses" not in u1.__dict__
sess.commit()
with fixture_session(expire_on_commit=False) as sess:
- u1 = (
- sess.query(User)
- .options(
+ u1 = sess.get(
+ User,
+ u1.id,
+ options=[
sa.orm.defer(User.name),
sa.orm.defaultload(User.addresses).defer(
Address.email_address
),
- )
- .get(u1.id)
+ ],
)
assert "name" not in u1.__dict__
assert "addresses" not in u1.__dict__
s = fixture_session()
- u1 = s.query(User).options(joinedload(User.addresses)).get(8)
- eq_(len(u1.__dict__["addresses"]), 3)
-
- def test_loader_options_future(self):
- User = self.classes.User
-
- s = fixture_session()
-
u1 = s.get(User, 8, options=[joinedload(User.addresses)])
eq_(len(u1.__dict__["addresses"]), 3)
CompositePk = self.classes.CompositePk
s = fixture_session()
- is_(s.query(CompositePk).get({"i": 100, "j": 100}), None)
+ is_(s.get(CompositePk, {"i": 100, "j": 100}), None)
def test_get_composite_pk_keyword_based_result(self):
CompositePk = self.classes.CompositePk
s = fixture_session()
- one_two = s.query(CompositePk).get({"i": 1, "j": 2})
+ one_two = s.get(CompositePk, {"i": 1, "j": 2})
eq_(one_two.i, 1)
eq_(one_two.j, 2)
eq_(one_two.k, 3)
CompositePk = self.classes.CompositePk
s = fixture_session()
- q = s.query(CompositePk)
- assert_raises(sa_exc.InvalidRequestError, q.get, {"i": 1, "k": 2})
+ assert_raises(
+ sa_exc.InvalidRequestError, s.get, CompositePk, {"i": 1, "k": 2}
+ )
def test_get_composite_pk_keyword_based_too_few_keys(self):
CompositePk = self.classes.CompositePk
s = fixture_session()
- q = s.query(CompositePk)
- assert_raises(sa_exc.InvalidRequestError, q.get, {"i": 1})
+ assert_raises(sa_exc.InvalidRequestError, s.get, CompositePk, {"i": 1})
def test_get_composite_pk_keyword_based_too_many_keys(self):
CompositePk = self.classes.CompositePk
s = fixture_session()
- q = s.query(CompositePk)
assert_raises(
- sa_exc.InvalidRequestError, q.get, {"i": 1, "j": "2", "k": 3}
+ sa_exc.InvalidRequestError,
+ s.get,
+ CompositePk,
+ {"i": 1, "j": "2", "k": 3},
)
def test_get(self):
User = self.classes.User
- s = fixture_session()
- assert s.query(User).get(19) is None
- u = s.query(User).get(7)
- u2 = s.query(User).get(7)
- assert u is u2
- s.expunge_all()
- u2 = s.query(User).get(7)
- assert u is not u2
-
- def test_get_future(self):
- User = self.classes.User
-
s = fixture_session()
assert s.get(User, 19) is None
u = s.get(User, 7)
CompositePk = self.classes.CompositePk
s = fixture_session()
- assert s.query(CompositePk).get((100, 100)) is None
+ assert s.get(CompositePk, (100, 100)) is None
def test_get_composite_pk_result(self):
CompositePk = self.classes.CompositePk
s = fixture_session()
- one_two = s.query(CompositePk).get((1, 2))
+ one_two = s.get(CompositePk, (1, 2))
assert one_two.i == 1
assert one_two.j == 2
assert one_two.k == 3
CompositePk = self.classes.CompositePk
s = fixture_session()
- q = s.query(CompositePk)
- assert_raises(sa_exc.InvalidRequestError, q.get, 7)
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Incorrect number of values in identifier to formulate "
+ r"primary key for session.get\(\); ",
+ s.get,
+ CompositePk,
+ 7,
+ )
def test_get_too_few_params_tuple(self):
CompositePk = self.classes.CompositePk
s = fixture_session()
- q = s.query(CompositePk)
- assert_raises(sa_exc.InvalidRequestError, q.get, (7,))
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Incorrect number of values in identifier to formulate "
+ r"primary key for session.get\(\); ",
+ s.get,
+ CompositePk,
+ (7,),
+ )
def test_get_too_many_params(self):
CompositePk = self.classes.CompositePk
s = fixture_session()
- q = s.query(CompositePk)
- assert_raises(sa_exc.InvalidRequestError, q.get, (7, 10, 100))
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Incorrect number of values in identifier to formulate "
+ r"primary key for session.get\(\); ",
+ s.get,
+ CompositePk,
+ (7, 10, 100),
+ )
def test_get_against_col(self):
User = self.classes.User
s = fixture_session()
- q = s.query(User.id)
- assert_raises(sa_exc.InvalidRequestError, q.get, (5,))
+ assert_raises_message(
+ sa_exc.ArgumentError,
+ r"Expected mapped class or mapper, got: .*Instrumented",
+ s.get,
+ User.id,
+ (5,),
+ )
@testing.fixture
def outerjoin_mapping(self, registry):
UserThing = outerjoin_mapping
sess = fixture_session()
- u10 = sess.query(UserThing).get((10, None))
+ u10 = sess.get(UserThing, (10, None))
eq_(u10, UserThing(id=10))
def test_get_fully_null_pk(self):
User = self.classes.User
s = fixture_session()
- q = s.query(User)
assert_raises_message(
sa_exc.SAWarning,
r"fully NULL primary key identity cannot load any object. "
"This condition may raise an error in a future release.",
- q.get,
+ s.get,
+ User,
None,
)
UserThing = outerjoin_mapping
s = fixture_session()
- q = s.query(UserThing)
assert_raises_message(
sa_exc.SAWarning,
r"fully NULL primary key identity cannot load any object. "
"This condition may raise an error in a future release.",
- q.get,
+ s.get,
+ UserThing,
(None, None),
)
- def test_no_criterion(self):
- """test that get()/load() does not use preexisting filter/etc.
- criterion"""
-
- User, Address = self.classes.User, self.classes.Address
-
- s = fixture_session()
-
- q = s.query(User).join(User.addresses).filter(Address.user_id == 8)
- assert_raises(sa_exc.InvalidRequestError, q.get, 7)
- assert_raises(
- sa_exc.InvalidRequestError,
- s.query(User).filter(User.id == 7).get,
- 19,
- )
-
- # order_by()/get() doesn't raise
- s.query(User).order_by(User.id).get(8)
-
- def test_no_criterion_when_already_loaded(self):
- """test that get()/load() does not use preexisting filter/etc.
- criterion, even when we're only using the identity map."""
-
- User, Address = self.classes.User, self.classes.Address
-
- s = fixture_session()
-
- s.query(User).get(7)
-
- q = s.query(User).join(User.addresses).filter(Address.user_id == 8)
- assert_raises(sa_exc.InvalidRequestError, q.get, 7)
-
def test_unique_param_names(self):
users = self.tables.users
assert s.primary_key == m.primary_key
sess = fixture_session()
- assert sess.query(SomeUser).get(7).name == "jack"
+ assert sess.get(SomeUser, 7).name == "jack"
def test_load(self):
User, Address = self.classes.User, self.classes.Address
s = fixture_session(autoflush=False)
- assert s.query(User).populate_existing().get(19) is None
+ assert s.get(User, 19, populate_existing=True) is None
- u = s.query(User).populate_existing().get(7)
- u2 = s.query(User).populate_existing().get(7)
+ u = s.get(User, 7, populate_existing=True)
+ u2 = s.get(User, 7, populate_existing=True)
assert u is u2
s.expunge_all()
- u2 = s.query(User).populate_existing().get(7)
+ u2 = s.get(User, 7, populate_existing=True)
assert u is not u2
u2.name = "some name"
assert u2 in s.dirty
assert a in u2.addresses
- s.query(User).populate_existing().get(7)
+ s.get(User, 7, populate_existing=True)
assert u2 not in s.dirty
assert u2.name == "jack"
LocalFoo(id=ustring, data=ustring),
)
- def test_populate_existing(self):
- User, Address = self.classes.User, self.classes.Address
- Order = self.classes.Order
-
- s = fixture_session(autoflush=False)
-
- userlist = s.query(User).all()
-
- u = userlist[0]
- u.name = "foo"
- a = Address(name="ed")
- u.addresses.append(a)
-
- self.assert_(a in u.addresses)
-
- s.query(User).populate_existing().all()
-
- self.assert_(u not in s.dirty)
-
- self.assert_(u.name == "jack")
-
- self.assert_(a not in u.addresses)
-
- u.addresses[0].email_address = "lala"
- u.orders[1].items[2].description = "item 12"
- # test that lazy load doesn't change child items
- s.query(User).populate_existing().all()
- assert u.addresses[0].email_address == "lala"
- assert u.orders[1].items[2].description == "item 12"
-
- # eager load does
- s.query(User).options(
- joinedload(User.addresses),
- joinedload(User.orders).joinedload(Order.items),
- ).populate_existing().all()
- assert u.addresses[0].email_address == "jack@bean.com"
- assert u.orders[1].items[2].description == "item 5"
-
- def test_populate_existing_future(self):
- User, Address = self.classes.User, self.classes.Address
- Order = self.classes.Order
-
- s = fixture_session(autoflush=False)
-
- userlist = s.query(User).all()
-
- u = userlist[0]
- u.name = "foo"
- a = Address(name="ed")
- u.addresses.append(a)
-
- self.assert_(a in u.addresses)
-
- stmt = select(User).execution_options(populate_existing=True)
-
- s.execute(
- stmt,
- ).scalars().all()
-
- self.assert_(u not in s.dirty)
-
- self.assert_(u.name == "jack")
-
- self.assert_(a not in u.addresses)
-
- u.addresses[0].email_address = "lala"
- u.orders[1].items[2].description = "item 12"
- # test that lazy load doesn't change child items
- s.query(User).populate_existing().all()
- assert u.addresses[0].email_address == "lala"
- assert u.orders[1].items[2].description == "item 12"
-
- # eager load does
-
- stmt = (
- select(User)
- .options(
- joinedload(User.addresses),
- joinedload(User.orders).joinedload(Order.items),
- )
- .execution_options(populate_existing=True)
- )
-
- s.execute(stmt).unique().scalars().all()
-
- assert u.addresses[0].email_address == "jack@bean.com"
- assert u.orders[1].items[2].description == "item 5"
-
- def test_option_transfer_future(self):
- User = self.classes.User
- stmt = select(User).execution_options(
- populate_existing=True, autoflush=False, yield_per=10
- )
- s = fixture_session()
-
- m1 = mock.Mock()
-
- event.listen(s, "do_orm_execute", m1)
-
- s.execute(stmt)
-
- eq_(
- m1.mock_calls[0][1][0].load_options,
- QueryContext.default_load_options(
- _autoflush=False, _populate_existing=True, _yield_per=10
- ),
- )
-
class InvalidGenerationsTest(QueryTest, AssertsCompiledSQL):
@testing.combinations(
)
assert_raises(sa_exc.InvalidRequestError, q.with_polymorphic, User)
- def test_only_full_mapper_zero(self):
- User, Address = self.classes.User, self.classes.Address
-
- s = fixture_session()
-
- q = s.query(User, Address)
- assert_raises(sa_exc.InvalidRequestError, q.get, 5)
-
def test_entity_or_mapper_zero_from_context(self):
User, Address = self.classes.User, self.classes.Address
s = fixture_session()
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- address = sess.query(Address).get(3)
+ address = sess.get(Address, 3)
assert [User(id=8)] == sess.query(User).filter(
User.addresses.contains(address)
).all()
Address.id
).all()
- dingaling = sess.query(Dingaling).get(2)
+ dingaling = sess.get(Dingaling, 2)
assert [User(id=9)] == sess.query(User).filter(
User.addresses.any(Address.dingaling == dingaling)
).all()
Item, Order = self.classes.Item, self.classes.Order
sess = fixture_session()
- item = sess.query(Item).get(3)
+ item = sess.get(Item, 3)
eq_(
sess.query(Order)
[Order(id=4), Order(id=5)],
)
- item2 = sess.query(Item).get(5)
+ item2 = sess.get(Item, 5)
eq_(
sess.query(Order)
.filter(Order.items.contains(item))
)
sess = fixture_session()
- user = sess.query(User).get(8)
+ user = sess.get(User, 8)
assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
Address
).filter(Address.user == user).all()
).all() # noqa
# o2o
- dingaling = sess.query(Dingaling).get(2)
+ dingaling = sess.get(Dingaling, 2)
assert [Address(id=5)] == sess.query(Address).filter(
Address.dingaling == dingaling
).all()
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- user = sess.query(User).get(8)
+ user = sess.get(User, 8)
assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
Address
).filter_by(user=user).all()
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
q = (
sess.query(Address)
.select_from(Address)
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
q = sess.query(User, Address).filter(
with_parent(u1, User.addresses, from_entity=Address)
)
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
a1 = aliased(Address)
q = sess.query(a1).filter(with_parent(u1, User.addresses.of_type(a1)))
self.assert_compile(
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
a1 = aliased(Address)
a2 = aliased(Address)
q = sess.query(a1, a2).filter(
User, Address = self.classes.User, self.classes.Address
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
a1 = aliased(Address)
a2 = aliased(Address)
q = sess.query(a1, a2).filter(
class ExecutionOptionsTest(QueryTest):
+ def test_populate_existing(self):
+ User, Address = self.classes.User, self.classes.Address
+ Order = self.classes.Order
+
+ s = fixture_session(autoflush=False)
+
+ userlist = s.query(User).all()
+
+ u = userlist[0]
+ u.name = "foo"
+ a = Address(name="ed")
+ u.addresses.append(a)
+
+ self.assert_(a in u.addresses)
+
+ s.query(User).populate_existing().all()
+
+ self.assert_(u not in s.dirty)
+
+ self.assert_(u.name == "jack")
+
+ self.assert_(a not in u.addresses)
+
+ u.addresses[0].email_address = "lala"
+ u.orders[1].items[2].description = "item 12"
+ # test that lazy load doesn't change child items
+ s.query(User).populate_existing().all()
+ assert u.addresses[0].email_address == "lala"
+ assert u.orders[1].items[2].description == "item 12"
+
+ # eager load does
+ s.query(User).options(
+ joinedload(User.addresses),
+ joinedload(User.orders).joinedload(Order.items),
+ ).populate_existing().all()
+ assert u.addresses[0].email_address == "jack@bean.com"
+ assert u.orders[1].items[2].description == "item 5"
+
+ def test_populate_existing_future(self):
+ User, Address = self.classes.User, self.classes.Address
+ Order = self.classes.Order
+
+ s = fixture_session(autoflush=False)
+
+ userlist = s.query(User).all()
+
+ u = userlist[0]
+ u.name = "foo"
+ a = Address(name="ed")
+ u.addresses.append(a)
+
+ self.assert_(a in u.addresses)
+
+ stmt = select(User).execution_options(populate_existing=True)
+
+ s.execute(
+ stmt,
+ ).scalars().all()
+
+ self.assert_(u not in s.dirty)
+
+ self.assert_(u.name == "jack")
+
+ self.assert_(a not in u.addresses)
+
+ u.addresses[0].email_address = "lala"
+ u.orders[1].items[2].description = "item 12"
+ # test that lazy load doesn't change child items
+ s.query(User).populate_existing().all()
+ assert u.addresses[0].email_address == "lala"
+ assert u.orders[1].items[2].description == "item 12"
+
+ # eager load does
+
+ stmt = (
+ select(User)
+ .options(
+ joinedload(User.addresses),
+ joinedload(User.orders).joinedload(Order.items),
+ )
+ .execution_options(populate_existing=True)
+ )
+
+ s.execute(stmt).unique().scalars().all()
+
+ assert u.addresses[0].email_address == "jack@bean.com"
+ assert u.orders[1].items[2].description == "item 5"
+
+ def test_option_transfer_future(self):
+ User = self.classes.User
+ stmt = select(User).execution_options(
+ populate_existing=True, autoflush=False, yield_per=10
+ )
+ s = fixture_session()
+
+ m1 = mock.Mock()
+
+ event.listen(s, "do_orm_execute", m1)
+
+ s.execute(stmt)
+
+ eq_(
+ m1.mock_calls[0][1][0].load_options,
+ QueryContext.default_load_options(
+ _autoflush=False, _populate_existing=True, _yield_per=10
+ ),
+ )
+
def test_option_building(self):
User = self.classes.User
s = fixture_session(query_cls=fixture())
- assert s.query(User).get(19) is None
- u = s.query(User).get(7)
- u2 = s.query(User).get(7)
+ assert s.get(User, 19) is None
+ u = s.get(User, 7)
+ u2 = s.get(User, 7)
assert u is u2
def _test_o2m_lazyload(self, fixture):
sess.flush()
sess.expunge_all()
- c1 = sess.query(C1).get(c1.id)
+ c1 = sess.get(C1, c1.id)
assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
assert set([x.id for x in c1.t2_view]) == set([c2b.id])
sess.flush()
sess.expunge_all()
- c1 = sess.query(C1).get(c1.t1id)
+ c1 = sess.get(C1, c1.t1id)
assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])
)
sess = fixture_session()
- q = sess.query(User).options(selectinload(User.addresses))
-
def go():
eq_(
User(
id=7,
addresses=[Address(id=1, email_address="jack@bean.com")],
),
- q.get(7),
+ sess.get(User, 7, options=[selectinload(User.addresses)]),
)
self.assert_sql_count(testing.db, go, 2)
def go():
a = q.filter(addresses.c.id == 1).one()
is_not(a.user, None)
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
is_(a.user, u1)
self.assert_sql_count(testing.db, go, 2)
def test_runs_query_on_refresh(self):
User, Address, sess = self._eager_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
assert "addresses" in u1.__dict__
sess.expire(u1)
def test_no_query_on_deferred(self):
User, Address, sess = self._deferred_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
assert "addresses" in u1.__dict__
sess.expire(u1, ["addresses"])
def test_populate_existing_propagate(self):
User, Address, sess = self._eager_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
u1.addresses[2].email_address = "foofoo"
del u1.addresses[1]
u1 = sess.query(User).populate_existing().filter_by(id=8).one()
def test_loads_second_level_collection_to_scalar(self):
User, Address, Dingaling, sess = self._collection_to_scalar_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
a1 = Address()
u1.addresses.append(a1)
a2 = u1.addresses[0]
def test_loads_second_level_collection_to_collection(self):
User, Order, Item, sess = self._collection_to_collection_fixture()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
u1.orders
o1 = Order()
u1.orders.append(o1)
s2.delete,
user,
)
- u2 = s2.query(User).get(user.id)
+ u2 = s2.get(User, user.id)
s2.expunge(u2)
assert_raises_message(
sa.exc.InvalidRequestError,
assert u1 not in sess
assert Session.object_session(u1) is None
- u2 = sess.query(User).get(u1.id)
+ u2 = sess.get(User, u1.id)
assert u2 is not None and u2 is not u1
assert u2 in sess
sess.expunge_all()
- u3 = sess.query(User).get(u1.id)
+ u3 = sess.get(User, u1.id)
assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
def test_no_double_save(self):
sess.commit()
sess.close()
- u = sess.query(User).get(u.id)
+ u = sess.get(User, u.id)
q = sess.query(Address).filter(Address.user == u)
del u
gc_collect()
)
sess = fixture_session(autocommit=False, autoflush=True)
- u = sess.query(User).get(8)
+ u = sess.get(User, 8)
newad = Address(email_address="a new address")
u.addresses.append(newad)
u.name = "some new name"
)
sess = fixture_session()
- q = sess.query(User).options(subqueryload(User.addresses))
-
def go():
eq_(
User(
id=7,
addresses=[Address(id=1, email_address="jack@bean.com")],
),
- q.get(7),
+ sess.get(User, 7, options=[subqueryload(User.addresses)]),
)
self.assert_sql_count(testing.db, go, 2)
def go():
a = q.filter(addresses.c.id == 1).one()
is_not(a.user, None)
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
is_(a.user, u1)
self.assert_sql_count(testing.db, go, 2)
def test_runs_query_on_refresh(self):
User, Address, sess = self._eager_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
assert "addresses" in u1.__dict__
sess.expire(u1)
def test_no_query_on_deferred(self):
User, Address, sess = self._deferred_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
assert "addresses" in u1.__dict__
sess.expire(u1, ["addresses"])
def test_populate_existing_propagate(self):
User, Address, sess = self._eager_config_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
u1.addresses[2].email_address = "foofoo"
del u1.addresses[1]
u1 = sess.query(User).populate_existing().filter_by(id=8).one()
def test_loads_second_level_collection_to_scalar(self):
User, Address, Dingaling, sess = self._collection_to_scalar_fixture()
- u1 = sess.query(User).get(8)
+ u1 = sess.get(User, 8)
a1 = Address()
u1.addresses.append(a1)
a2 = u1.addresses[0]
def test_loads_second_level_collection_to_collection(self):
User, Order, Item, sess = self._collection_to_collection_fixture()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
u1.orders
o1 = Order()
u1.orders.append(o1)
def test_attrs_on_rollback(self):
User = self.classes.User
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
u1.name = "ed"
sess.rollback()
eq_(u1.name, "jack")
def test_commit_persistent(self):
User = self.classes.User
sess = fixture_session()
- u1 = sess.query(User).get(7)
+ u1 = sess.get(User, 7)
u1.name = "ed"
sess.flush()
sess.commit()
def test_concurrent_commit_persistent(self):
User = self.classes.User
s1 = fixture_session()
- u1 = s1.query(User).get(7)
+ u1 = s1.get(User, 7)
u1.name = "ed"
s1.commit()
s2 = fixture_session()
- u2 = s2.query(User).get(7)
+ u2 = s2.get(User, 7)
assert u2.name == "ed"
u2.name = "will"
s2.commit()
s.flush()
s.execute(users.insert().values(name="u1"))
- u2 = s.query(User).get("u1")
+ u2 = s.get(User, "u1")
assert u1 not in s
s.rollback()
session.flush()
session.expunge_all()
- e2 = session.query(Entry).get((e.multi_id, 2))
+ e2 = session.get(Entry, (e.multi_id, 2))
self.assert_(e is not e2)
state = sa.orm.attributes.instance_state(e)
state2 = sa.orm.attributes.instance_state(e2)
self.sql_count_(1, go)
session.expunge_all()
- u = session.query(User).get(u.id)
+ u = session.get(User, u.id)
eq_(u.name, "test2")
eq_(u.counter, 2)
conn.scalar(select(func.count("*")).select_from(myothertable)),
4,
)
- mc = session.query(MyClass).get(mc.id)
+ mc = session.get(MyClass, mc.id)
session.delete(mc)
session.flush()
conn.scalar(select(func.count("*")).select_from(myothertable)),
4,
)
- mc = session.query(MyClass).get(mc.id)
+ mc = session.get(MyClass, mc.id)
session.delete(mc)
assert_raises(sa.exc.DBAPIError, session.flush)
1,
)
- mc = session.query(MyClass).get(mc.id)
+ mc = session.get(MyClass, mc.id)
session.delete(mc)
mc.children[0].data = "some new data"
assert_raises(sa.exc.DBAPIError, session.flush)
session.expunge_all()
eq_(
- session.query(Hoho).get(h1.id),
+ session.get(Hoho, h1.id),
Hoho(hoho=hohoval, secondaries=[Secondary(data="s1")]),
)
- h1 = session.query(Hoho).get(h1.id)
+ h1 = session.get(Hoho, h1.id)
h1.secondaries.append(Secondary(data="s2"))
session.flush()
session.expunge_all()
eq_(
- session.query(Hoho).get(h1.id),
+ session.get(Hoho, h1.id),
Hoho(
hoho=hohoval,
secondaries=[Secondary(data="s1"), Secondary(data="s2")],
session.flush()
session.expunge_all()
- u2 = session.query(User).get(u2.id)
+ u2 = session.get(User, u2.id)
eq_(len(u2.addresses), 1)
def test_child_move_2(self):
session.flush()
session.expunge_all()
- u2 = session.query(User).get(u2.id)
+ u2 = session.get(User, u2.id)
eq_(len(u2.addresses), 1)
def test_o2m_delete_parent(self):
session.flush()
# assert the first one retrieves the same from the identity map
- nu = session.query(m).get(u.id)
+ nu = session.get(m, u.id)
assert u is nu
# clear out the identity map, so next get forces a SELECT
session.expunge_all()
# check it again, identity should be different but ids the same
- nu = session.query(m).get(u.id)
+ nu = session.get(m, u.id)
assert u is not nu and u.id == nu.id and nu.name == "savetester"
session.commit()
session.flush()
session.expunge_all()
- u = session.query(User).get(u.id)
+ u = session.get(User, u.id)
u.name = ""
self.sql_count_(0, session.flush)
id_ = m.primary_key_from_instance(u)
- u = session.query(User).get(id_)
+ u = session.get(User, id_)
assert u.name == "multitester"
conn = session.connection()
eq_(list(address_rows[0]), [u.id, u.foo_id, "lala@hey.com"])
session.expunge_all()
- u = session.query(User).get(id_)
+ u = session.get(User, id_)
assert u.name == "imnew"
def test_history_get(self):
session.flush()
session.expunge_all()
- u = session.query(User).get(u.id)
+ u = session.get(User, u.id)
session.delete(u)
session.flush()
eq_(
session.flush()
session.expunge_all()
- a1 = session.query(Address).get(a1.id)
- u1 = session.query(User).get(u1.id)
+ a1 = session.get(Address, a1.id)
+ u1 = session.get(User, u1.id)
assert a1.user is u1
a1.user = None
session.flush()
session.expunge_all()
- a1 = session.query(Address).get(a1.id)
- u1 = session.query(User).get(u1.id)
+ a1 = session.get(Address, a1.id)
+ u1 = session.get(User, u1.id)
assert a1.user is None
def test_many_to_one_2(self):
session.flush()
session.expunge_all()
- a1 = session.query(Address).get(a1.id)
- a2 = session.query(Address).get(a2.id)
- u1 = session.query(User).get(u1.id)
+ a1 = session.get(Address, a1.id)
+ a2 = session.get(Address, a2.id)
+ u1 = session.get(User, u1.id)
assert a1.user is u1
a1.user = None
session.flush()
session.expunge_all()
- a1 = session.query(Address).get(a1.id)
- a2 = session.query(Address).get(a2.id)
- u1 = session.query(User).get(u1.id)
+ a1 = session.get(Address, a1.id)
+ a2 = session.get(Address, a2.id)
+ u1 = session.get(User, u1.id)
assert a1.user is None
assert a2.user is u1
session.flush()
session.expunge_all()
- a1 = session.query(Address).get(a1.id)
- u1 = session.query(User).get(u1.id)
- u2 = session.query(User).get(u2.id)
+ a1 = session.get(Address, a1.id)
+ u1 = session.get(User, u1.id)
+ u2 = session.get(User, u2.id)
assert a1.user is u1
a1.user = u2
session.flush()
session.expunge_all()
- a1 = session.query(Address).get(a1.id)
- u1 = session.query(User).get(u1.id)
- u2 = session.query(User).get(u2.id)
+ a1 = session.get(Address, a1.id)
+ u1 = session.get(User, u1.id)
+ u2 = session.get(User, u2.id)
assert a1.user is u2
def test_bidirectional_no_load(self):
session.flush()
session.expunge_all()
- a1 = session.query(Address).get(a1.id)
+ a1 = session.get(Address, a1.id)
a1.user = None
session.flush()
session.expunge_all()
- assert session.query(Address).get(a1.id).user is None
- assert session.query(User).get(u1.id).addresses == []
+ assert session.get(Address, a1.id).user is None
+ assert session.get(User, u1.id).addresses == []
class ManyToManyTest(_fixtures.FixtureTest):
session.flush()
session.expunge_all()
- item = session.query(Item).get(item.id)
+ item = session.get(Item, item.id)
eq_(item.keywords, [k1, k2])
def test_association(self):
[T(value=False, name="t2")],
)
- t2 = sess.query(T).get(t2.id)
+ t2 = sess.get(T, t2.id)
t2.value = True
sess.flush()
eq_(
sess.expunge_all()
# lookup an address and move it to the other user
- a1 = sess.query(Address).get(a1.id)
+ a1 = sess.get(Address, a1.id)
# move address to another user's fk
assert a1.user_id == u1.id
sess.commit()
sess.expunge_all()
- u1 = sess.query(User).get(u1.id)
+ u1 = sess.get(User, u1.id)
u1.id = 2
try:
sess.flush()
return User.name.__clause_element__()
s = fixture_session()
- jill = s.query(User).get(3)
+ jill = s.get(User, 3)
s.query(User).update(
{Thing(): "moonbeam"}, synchronize_session="evaluate"
)
User = self.classes.User
s = fixture_session()
- jill = s.query(User).get(3)
+ jill = s.get(User, 3)
s.query(User).update(
{column("name"): "moonbeam"}, synchronize_session="evaluate"
)
)
s = fixture_session()
- jill = s.query(Foo).get(3)
+ jill = s.get(Foo, 3)
s.query(Foo).update(
{"uname": "moonbeam"}, synchronize_session="evaluate"
)
)
s = fixture_session()
- jill = s.query(Foo).get(3)
+ jill = s.get(Foo, 3)
s.query(Foo).update(
{Foo.uname: "moonbeam"}, synchronize_session="evaluate"
)
)
s = fixture_session()
- jill = s.query(Foo).get(3)
+ jill = s.get(Foo, 3)
s.query(Foo).update(
{Foo.ufoo: "moonbeam"}, synchronize_session="evaluate"
)
s1.commit()
s2 = fixture_session(autocommit=False)
- f1_s = s2.query(Foo).get(f1.id)
+ f1_s = s2.get(Foo, f1.id)
f1_s.value = "f1rev3"
with conditional_sane_rowcount_warnings(
update=True, only_returning=True
s1.commit()
# new in 0.5 ! don't need to close the session
- f1 = s1.query(Foo).get(f1.id)
- f2 = s1.query(Foo).get(f2.id)
+ f1 = s1.get(Foo, f1.id)
+ f2 = s1.get(Foo, f2.id)
f1_s.value = "f1rev4"
with conditional_sane_rowcount_warnings(
s1.commit()
s2 = fixture_session(autocommit=False)
- f1s2 = s2.query(Foo).get(f1s1.id)
+ f1s2 = s2.get(Foo, f1s1.id)
f1s2.value = "f1 new value"
with conditional_sane_rowcount_warnings(
update=True, only_returning=True
sa.orm.exc.StaleDataError,
r"Instance .* has version id '\d+' which does not "
r"match database-loaded version id '\d+'",
- s1.query(Foo).with_for_update(read=True).get,
+ s1.get,
+ Foo,
f1s1.id,
+ with_for_update=dict(read=True),
)
# reload it - this expires the old version first
s1.refresh(f1s1, with_for_update={"read": True})
# now assert version OK
- s1.query(Foo).with_for_update(read=True).get(f1s1.id)
+ s1.get(Foo, f1s1.id, with_for_update=dict(read=True))
# assert brand new load is OK too
s1.close()
- s1.query(Foo).with_for_update(read=True).get(f1s1.id)
+ s1.get(Foo, f1s1.id, with_for_update=dict(read=True))
def test_versioncheck_not_versioned(self):
"""ensure the versioncheck logic skips if there isn't a
f1s1 = Foo(value="f1 value", version_id=1)
s1.add(f1s1)
s1.commit()
- s1.query(Foo).with_for_update(read=True).get(f1s1.id)
+ s1.query(Foo).with_for_update(read=True).where(Foo.id == f1s1.id).one()
@engines.close_open_connections
@testing.requires.update_nowait
s1.commit()
s2 = fixture_session(autocommit=False)
- f1s2 = s2.query(Foo).get(f1s1.id)
+ f1s2 = s2.get(Foo, f1s1.id)
# not sure if I like this API
s2.refresh(f1s2, with_for_update=True)
f1s2.value = "f1 new value"
s1.commit()
s2 = fixture_session(autocommit=False)
- f1s2 = s2.query(Foo).with_for_update(read=True).get(f1s1.id)
+ f1s2 = (
+ s2.query(Foo)
+ .with_for_update(read=True)
+ .where(Foo.id == f1s1.id)
+ .one()
+ )
assert f1s2.id == f1s1.id
assert f1s2.value == f1s1.value