raise
path = path[attr]
- elif _is_mapped_class(attr):
- # TODO: this does not appear to be a valid codepath. "attr"
- # would never be a mapper. This block is present in 1.2
- # as well however does not seem to be accessed in any tests.
- if not orm_util._entity_corresponds_to_use_path_impl(
- attr.parent, path[-1]
- ):
- if raiseerr:
- raise sa_exc.ArgumentError(
- "Attribute '%s' does not "
- "link from element '%s'" % (attr, path.entity)
- )
- else:
- return None
else:
- prop = found_property = attr.property
-
- if not orm_util._entity_corresponds_to_use_path_impl(
- attr.parent, path[-1]
- ):
- if raiseerr:
- raise sa_exc.ArgumentError(
- 'Attribute "%s" does not '
- 'link from element "%s".%s'
- % (
- attr,
- path.entity,
- (
- " Did you mean to use "
- "%s.of_type(%s)?"
- % (path[-2], attr.class_.__name__)
- if len(path) > 1
- and path.entity.is_mapper
- and attr.parent.is_aliased_class
- else ""
- ),
+ insp = inspect(attr)
+
+ if insp.is_mapper or insp.is_aliased_class:
+ # TODO: this does not appear to be a valid codepath. "attr"
+ # would never be a mapper. This block is present in 1.2
+ # as well however does not seem to be accessed in any tests.
+ if not orm_util._entity_corresponds_to_use_path_impl(
+ attr.parent, path[-1]
+ ):
+ if raiseerr:
+ raise sa_exc.ArgumentError(
+ "Attribute '%s' does not "
+ "link from element '%s'" % (attr, path.entity)
)
- )
- else:
- return None
+ else:
+ return None
+ elif insp.is_property:
+ prop = found_property = attr
+ path = path[prop]
+ elif insp.is_attribute:
+ prop = found_property = attr.property
- if attr._extra_criteria:
- self._extra_criteria = attr._extra_criteria
+ if not orm_util._entity_corresponds_to_use_path_impl(
+ attr.parent, path[-1]
+ ):
+ if raiseerr:
+ raise sa_exc.ArgumentError(
+ 'Attribute "%s" does not '
+ 'link from element "%s".%s'
+ % (
+ attr,
+ path.entity,
+ (
+ " Did you mean to use "
+ "%s.of_type(%s)?"
+ % (path[-2], attr.class_.__name__)
+ if len(path) > 1
+ and path.entity.is_mapper
+ and attr.parent.is_aliased_class
+ else ""
+ ),
+ )
+ )
+ else:
+ return None
- if getattr(attr, "_of_type", None):
- ac = attr._of_type
- ext_info = of_type_info = inspect(ac)
+ if attr._extra_criteria:
+ self._extra_criteria = attr._extra_criteria
- if polymorphic_entity_context is None:
- polymorphic_entity_context = self.context
+ if getattr(attr, "_of_type", None):
+ ac = attr._of_type
+ ext_info = of_type_info = inspect(ac)
- existing = path.entity_path[prop].get(
- polymorphic_entity_context, "path_with_polymorphic"
- )
+ if polymorphic_entity_context is None:
+ polymorphic_entity_context = self.context
- if not ext_info.is_aliased_class:
- ac = orm_util.with_polymorphic(
- ext_info.mapper.base_mapper,
- ext_info.mapper,
- aliased=True,
- _use_mapper_path=True,
- _existing_alias=inspect(existing)
- if existing is not None
- else None,
+ existing = path.entity_path[prop].get(
+ polymorphic_entity_context, "path_with_polymorphic"
)
- ext_info = inspect(ac)
+ if not ext_info.is_aliased_class:
+ ac = orm_util.with_polymorphic(
+ ext_info.mapper.base_mapper,
+ ext_info.mapper,
+ aliased=True,
+ _use_mapper_path=True,
+ _existing_alias=inspect(existing)
+ if existing is not None
+ else None,
+ )
- path.entity_path[prop].set(
- polymorphic_entity_context, "path_with_polymorphic", ac
- )
+ ext_info = inspect(ac)
- path = path[prop][ext_info]
+ path.entity_path[prop].set(
+ polymorphic_entity_context, "path_with_polymorphic", ac
+ )
- self._of_type = of_type_info
+ path = path[prop][ext_info]
- else:
- path = path[prop]
+ self._of_type = of_type_info
+
+ else:
+ path = path[prop]
if for_strategy is not None:
found_property._get_strategy(for_strategy)
"ORDER BY addresses_1.id",
)
+ def test_dotted_options(self):
+ User = self.classes.User
+
+ sess = fixture_session()
+
+ with testing.expect_deprecated_20(
+ "Using strings to indicate column or relationship "
+ "paths in loader options"
+ ):
+ q2 = (
+ sess.query(User)
+ .order_by(User.id)
+ .options(sa.orm.joinedload("orders"))
+ .options(sa.orm.joinedload("orders.items"))
+ .options(sa.orm.joinedload("orders.items.keywords"))
+ )
+ u = q2.all()
+
+ def go():
+ u[0].orders[1].items[0].keywords[1]
+
+ self.sql_count_(0, go)
+
def test_str_col_loader_opt(self):
User = self.classes.User
return testing.expect_deprecated_20(r"The Query.from_self\(\) method")
+class SelfReferentialEagerTest(fixtures.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "nodes",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("parent_id", Integer, ForeignKey("nodes.id")),
+ Column("data", String(30)),
+ )
+
+ def test_eager_loading_with_deferred(self):
+ nodes = self.tables.nodes
+
+ class Node(fixtures.ComparableEntity):
+ def append(self, node):
+ self.children.append(node)
+
+ mapper(
+ Node,
+ nodes,
+ properties={
+ "children": relationship(
+ Node, lazy="joined", join_depth=3, order_by=nodes.c.id
+ ),
+ "data": deferred(nodes.c.data),
+ },
+ )
+ sess = fixture_session()
+ n1 = Node(data="n1")
+ n1.append(Node(data="n11"))
+ n1.append(Node(data="n12"))
+ sess.add(n1)
+ sess.flush()
+ sess.expunge_all()
+
+ def go():
+ with assertions.expect_deprecated_20(
+ "Using strings to indicate column or relationship paths"
+ ):
+ eq_(
+ Node(
+ data="n1",
+ children=[Node(data="n11"), Node(data="n12")],
+ ),
+ sess.query(Node)
+ .options(undefer("data"), undefer("children.data"))
+ .first(),
+ )
+
+ self.assert_sql_count(testing.db, go, 1)
+
+
+class LazyLoadOptSpecificityTest(fixtures.DeclarativeMappedTest):
+ """test for [ticket:3963]"""
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ bs = relationship("B")
+
+ class B(Base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey("a.id"))
+ cs = relationship("C")
+
+ class C(Base):
+ __tablename__ = "c"
+ id = Column(Integer, primary_key=True)
+ b_id = Column(ForeignKey("b.id"))
+
+ @classmethod
+ def insert_data(cls, connection):
+ A, B, C = cls.classes("A", "B", "C")
+ s = Session(connection)
+ s.add(A(id=1, bs=[B(cs=[C()])]))
+ s.add(A(id=2))
+ s.commit()
+
+ def _run_tests(self, query, expected):
+ def go():
+ for a, _ in query:
+ for b in a.bs:
+ b.cs
+
+ self.assert_sql_count(testing.db, go, expected)
+
+ def test_string_options_aliased_whatever(self):
+ A, B, C = self.classes("A", "B", "C")
+ s = fixture_session()
+ aa = aliased(A)
+ q = (
+ s.query(aa, A)
+ .filter(aa.id == 1)
+ .filter(A.id == 2)
+ .filter(aa.id != A.id)
+ .options(joinedload("bs").joinedload("cs"))
+ )
+ with assertions.expect_deprecated_20(
+ "Using strings to indicate column or relationship paths"
+ ):
+ self._run_tests(q, 1)
+
+ def test_string_options_unaliased_whatever(self):
+ A, B, C = self.classes("A", "B", "C")
+ s = fixture_session()
+ aa = aliased(A)
+ q = (
+ s.query(A, aa)
+ .filter(aa.id == 2)
+ .filter(A.id == 1)
+ .filter(aa.id != A.id)
+ .options(joinedload("bs").joinedload("cs"))
+ )
+ with assertions.expect_deprecated_20(
+ "Using strings to indicate column or relationship paths"
+ ):
+ self._run_tests(q, 1)
+
+
class DynamicTest(_DynamicFixture, _fixtures.FixtureTest):
def test_negative_slice_access_raises(self):
User, Address = self._user_address_fixture()
)
],
sess.query(User)
- .options(joinedload("addresses"))
+ .options(joinedload(User.addresses))
.filter(User.id == 7)
.all(),
)
sess = fixture_session()
q = (
sess.query(User)
- .join("addresses")
- .options(joinedload("addresses"))
+ .join(User.addresses)
+ .options(joinedload(User.addresses))
.order_by("email_address")
)
q = (
sess.query(User)
- .options(joinedload("addresses"))
+ .options(joinedload(User.addresses))
.order_by("email_address")
)
for opt, count in [
((joinedload(User.orders, Order.items),), 10),
- ((joinedload("orders.items"),), 10),
(
(
joinedload(User.orders),
eq_(
self.static.item_keyword_result[0:2],
(
- q.options(joinedload("keywords"))
- .join("keywords")
+ q.options(joinedload(Item.keywords))
+ .join(Item.keywords)
.filter(keywords.c.name == "red")
)
.order_by(Item.id)
def go():
eq_(u1.addresses[0].user, u1)
- self.assert_sql_execution(
- testing.db,
- go,
- CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
- "addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE :param_1 = "
- "addresses.user_id",
- {"param_1": 8},
- ),
- )
+ with testing.expect_warnings(raise_on_any_unexpected=True):
+ self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL(
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
+ "addresses_user_id, addresses.email_address AS "
+ "addresses_email_address FROM addresses WHERE :param_1 = "
+ "addresses.user_id",
+ {"param_1": 8},
+ ),
+ )
def test_useget_cancels_eager_propagated_present(self):
"""test that a one to many lazyload cancels the unnecessary
def go():
eq_(u1.addresses[0].user, u1)
- self.assert_sql_execution(
- testing.db,
- go,
- CompiledSQL(
- "SELECT addresses.id AS addresses_id, addresses.user_id AS "
- "addresses_user_id, addresses.email_address AS "
- "addresses_email_address FROM addresses WHERE :param_1 = "
- "addresses.user_id",
- {"param_1": 8},
- ),
- )
+ with testing.expect_warnings(raise_on_any_unexpected=True):
+ self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL(
+ "SELECT addresses.id AS addresses_id, "
+ "addresses.user_id AS "
+ "addresses_user_id, addresses.email_address AS "
+ "addresses_email_address FROM addresses WHERE :param_1 = "
+ "addresses.user_id",
+ {"param_1": 8},
+ ),
+ )
def test_manytoone_limit(self):
"""test that the subquery wrapping only occurs with
self.assert_compile(
sess.query(User)
- .options(joinedload("orders").joinedload("address"))
+ .options(joinedload(User.orders).joinedload(Order.address))
.limit(10),
"SELECT anon_1.users_id AS anon_1_users_id, "
"anon_1.users_name AS anon_1_users_name, "
self.assert_compile(
sess.query(User).options(
- joinedload("orders").joinedload("items"),
- joinedload("orders").joinedload("address"),
+ joinedload(User.orders).joinedload(Order.items),
+ joinedload(User.orders).joinedload(Order.address),
),
"SELECT users.id AS users_id, users.name AS users_name, "
"items_1.id AS items_1_id, "
self.assert_compile(
sess.query(User)
.options(
- joinedload("orders"),
- joinedload("orders.address", innerjoin=True),
+ joinedload(User.orders),
+ joinedload(User.orders, Order.address, innerjoin=True),
)
.limit(10),
"SELECT anon_1.users_id AS anon_1_users_id, anon_1.users_name "
self.assert_compile(
sess.query(User)
.options(
- joinedload("orders", innerjoin=True),
- joinedload("orders.address", innerjoin=True),
+ joinedload(User.orders, innerjoin=True),
+ joinedload(User.orders, Order.address, innerjoin=True),
)
.limit(10),
"SELECT anon_1.users_id AS anon_1_users_id, "
def go():
o1 = (
sess.query(Order)
- .options(lazyload("address"))
+ .options(lazyload(Order.address))
.filter(Order.id == 5)
.one()
)
sess = fixture_session()
q = sess.query(User).options(
- joinedload("orders", innerjoin=False).joinedload(
- "items", innerjoin=True
+ joinedload(User.orders, innerjoin=False).joinedload(
+ Order.items, innerjoin=True
)
)
sess = fixture_session()
q = sess.query(User).options(
- joinedload("orders"), joinedload("addresses", innerjoin="unnested")
+ joinedload(User.orders),
+ joinedload(User.addresses, innerjoin="unnested"),
)
self.assert_compile(
sess = fixture_session()
q = sess.query(User).options(
- joinedload("orders"), joinedload("addresses", innerjoin=True)
+ joinedload(User.orders), joinedload(User.addresses, innerjoin=True)
)
self.assert_compile(
.join(User.orders)
.join(Order.items)
.options(
- joinedload("orders").joinedload("items").joinedload("keywords")
+ joinedload(User.orders)
+ .joinedload(Order.items)
+ .joinedload(Item.keywords)
)
)
sess = fixture_session()
if use_load:
- opt = Load(User).defaultload("orders").lazyload("*")
+ opt = Load(User).defaultload(User.orders).lazyload("*")
else:
- opt = defaultload("orders").lazyload("*")
+ opt = defaultload(User.orders).lazyload("*")
q = sess.query(User).filter(User.id == 7).options(opt)
def test_nested_innerjoin_propagation_multiple_paths_two(self):
# test #3447
- A = self.classes.A
+ A, B, C1 = (self.classes.A, self.classes.B, self.classes.C1)
s = fixture_session()
q = s.query(A).options(
- joinedload("bs"),
- joinedload("bs.c2s", innerjoin=True),
- joinedload("bs.c1s", innerjoin=True),
- joinedload("bs.c1s.d1s"),
+ joinedload(A.bs),
+ joinedload(A.bs, B.c2s, innerjoin=True),
+ joinedload(A.bs, B.c1s, innerjoin=True),
+ joinedload(A.bs, B.c1s, C1.d1s),
)
self.assert_compile(
q,
self._assert_result(q)
def test_multiple_splice_points(self):
- A = self.classes.A
+ A, B, C1, C2, D1 = (
+ self.classes.A,
+ self.classes.B,
+ self.classes.C1,
+ self.classes.C2,
+ self.classes.D1,
+ )
s = fixture_session()
q = s.query(A).options(
- joinedload("bs", innerjoin=False),
- joinedload("bs.c1s", innerjoin=True),
- joinedload("bs.c2s", innerjoin=True),
- joinedload("bs.c1s.d1s", innerjoin=False),
- joinedload("bs.c2s.d2s"),
- joinedload("bs.c1s.d1s.e1s", innerjoin=True),
+ joinedload(A.bs, innerjoin=False),
+ joinedload(A.bs, B.c1s, innerjoin=True),
+ joinedload(A.bs, B.c2s, innerjoin=True),
+ joinedload(A.bs, B.c1s, C1.d1s, innerjoin=False),
+ joinedload(A.bs, B.c2s, C2.d2s),
+ joinedload(A.bs, B.c1s, C1.d1s, D1.e1s, innerjoin=True),
)
self.assert_compile(
s = fixture_session()
- q = s.query(A).options(joinedload("bs_np", innerjoin=False))
+ q = s.query(A).options(joinedload(A.bs_np, innerjoin=False))
self.assert_compile(
q,
"SELECT a.id AS a_id, b_1.id AS b_1_id, b_1.a_id AS b_1_a_id, "
self.assert_sql_count(testing.db, go, 1)
def test_joined_across(self):
- A = self.classes.A
+ A, B, C = self.classes("A", "B", "C")
s = fixture_session()
q = s.query(A).options(
- joinedload("b")
- .joinedload("c", innerjoin=True)
- .joinedload("ds", innerjoin=True)
+ joinedload(A.b)
+ .joinedload(B.c, innerjoin=True)
+ .joinedload(C.ds, innerjoin=True)
)
self.assert_compile(
q,
self.assert_compile(
fixture_session()
.query(A)
- .options(joinedload("bs"))
+ .options(joinedload(A.bs))
.order_by(A.summation)
.limit(50),
"SELECT anon_1.anon_2 AS anon_1_anon_2, anon_1.a_id "
self.assert_compile(
fixture_session()
.query(A)
- .options(joinedload("bs"))
+ .options(joinedload(A.bs))
.order_by(A.summation.desc())
.limit(50),
"SELECT anon_1.anon_2 AS anon_1_anon_2, anon_1.a_id "
self.assert_compile(
fixture_session()
.query(A)
- .options(joinedload("bs"))
+ .options(joinedload(A.bs))
.order_by(A.summation)
.limit(50),
"SELECT anon_1.anon_2 AS anon_1_anon_2, anon_1.a_id "
self.assert_compile(
fixture_session()
.query(A)
- .options(joinedload("bs"))
+ .options(joinedload(A.bs))
.order_by(cp)
.limit(50),
"SELECT anon_1.a_id AS anon_1_a_id, anon_1.anon_2 "
self.assert_compile(
fixture_session()
.query(A)
- .options(joinedload("bs"))
+ .options(joinedload(A.bs))
.order_by(cp)
.limit(50),
"SELECT anon_1.a_id AS anon_1_a_id, anon_1.foo "
self.assert_compile(
fixture_session()
.query(A)
- .options(joinedload("bs"))
+ .options(joinedload(A.bs))
.order_by(~cp)
.limit(50),
"SELECT anon_1.a_id AS anon_1_a_id, anon_1.anon_2 "
a2 = u1.addresses[0]
a2.email_address = "foo"
sess.query(User).options(
- joinedload("addresses").joinedload("dingaling")
+ joinedload(User.addresses).joinedload(Address.dingaling)
).filter_by(id=8).all()
assert u1.addresses[-1] is a1
for a in u1.addresses:
o1 = Order()
u1.orders.append(o1)
sess.query(User).options(
- joinedload("orders").joinedload("items")
+ joinedload(User.orders).joinedload(Order.items)
).filter_by(id=7).all()
for o in u1.orders:
if o is not o1:
u1 = (
sess.query(User)
.filter_by(id=8)
- .options(joinedload("addresses"))
+ .options(joinedload(User.addresses))
.one()
)
sess.query(User).filter_by(id=8).options(
- joinedload("addresses").joinedload("dingaling")
+ joinedload(User.addresses).joinedload(Address.dingaling)
).first()
assert "dingaling" in u1.addresses[0].__dict__
u1 = (
sess.query(User)
.filter_by(id=7)
- .options(joinedload("orders"))
+ .options(joinedload(User.orders))
.one()
)
sess.query(User).filter_by(id=7).options(
- joinedload("orders").joinedload("items")
+ joinedload(User.orders).joinedload(Order.items)
).first()
assert "items" in u1.orders[0].__dict__
def go():
ret = (
sess.query(User, oalias)
- .options(joinedload("addresses"))
+ .options(joinedload(User.addresses))
.join(oalias, "orders")
.order_by(User.id, oalias.id)
.all()
def go():
ret = (
sess.query(User, oalias)
- .options(joinedload("addresses"), joinedload(oalias.items))
+ .options(joinedload(User.addresses), joinedload(oalias.items))
.join(oalias, "orders")
.order_by(User.id, oalias.id)
.all()
eq_(
Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
sess.query(Node)
- .options(undefer("data"))
+ .options(undefer(Node.data))
.order_by(Node.id)
.first(),
)
eq_(
Node(data="n1", children=[Node(data="n11"), Node(data="n12")]),
sess.query(Node)
- .options(undefer("data"), undefer("children.data"))
+ .options(
+ undefer(Node.data),
+ defaultload(Node.children).undefer(Node.data),
+ )
.first(),
)
sess.query(Node)
.filter_by(data="n1")
.order_by(Node.id)
- .options(joinedload("children.children"))
+ .options(joinedload(Node.children, Node.children))
.first()
)
eq_(
def go():
sess.query(Node).order_by(Node.id).filter_by(data="n1").options(
- joinedload("children.children")
+ joinedload(Node.children, Node.children)
).first()
# test that the query isn't wrapping the initial query for eager
eq_(
session.query(B)
.options(
- joinedload("parent_b1"),
- joinedload("parent_b2"),
- joinedload("parent_z"),
+ joinedload(B.parent_b1),
+ joinedload(B.parent_b2),
+ joinedload(B.parent_z),
)
.filter(B.id.in_([2, 8, 11]))
.order_by(B.id)
eq_(
sess.query(User)
.order_by(User.name)
- .options(joinedload("stuff"))
+ .options(joinedload(User.stuff))
.all(),
[
User(name="user1", stuff=[Stuff(id=2)]),
eq_(
sess.query(User)
.order_by(User.name)
- .options(joinedload("stuff"))
+ .options(joinedload(User.stuff))
.first(),
User(name="user1", stuff=[Stuff(id=2)]),
)
eq_(
sess.query(User)
.filter(User.id == 2)
- .options(joinedload("stuff"))
+ .options(joinedload(User.stuff))
.one(),
User(name="user2", stuff=[Stuff(id=4)]),
)
self.assert_compile(
s.query(Parent)
- .options(load_only("data"), joinedload(Parent.o2mchild))
+ .options(load_only(Parent.data), joinedload(Parent.o2mchild))
.limit(10),
"SELECT anon_1.parent_id AS anon_1_parent_id, "
"anon_1.parent_data AS anon_1_parent_data, "
self.assert_compile(
s.query(Parent)
- .options(load_only("data"), joinedload(Parent.m2mchild))
+ .options(load_only(Parent.data), joinedload(Parent.m2mchild))
.limit(10),
"SELECT anon_1.parent_id AS anon_1_parent_id, "
"anon_1.parent_data AS anon_1_parent_data, "
self.assert_compile(
s.query(Parent).options(
- load_only("data"), joinedload(Parent.o2mchild)
+ load_only(Parent.data), joinedload(Parent.o2mchild)
),
"SELECT parent.id AS parent_id, parent.data AS parent_data, "
"parent.arb AS parent_arb, o2mchild_1.id AS o2mchild_1_id, "
self.assert_compile(
s.query(Parent).options(
- load_only("data"), joinedload(Parent.m2mchild)
+ load_only(Parent.data), joinedload(Parent.m2mchild)
),
"SELECT parent.id AS parent_id, parent.data AS parent_data, "
"parent.arb AS parent_arb, m2mchild_1.id AS m2mchild_1_id "
# these paths don't work out correctly?
lz_test = (
s.query(LDA)
- .join("ld")
- .options(contains_eager("ld"))
+ .join(LDA.ld)
+ .options(contains_eager(LDA.ld))
.join("a", (l_ac, "ld"), (u_ac, "user"))
.options(
- contains_eager("a")
- .contains_eager("ld", alias=l_ac)
- .contains_eager("user", alias=u_ac)
+ contains_eager(LDA.a)
+ .contains_eager(A.ld, alias=l_ac)
+ .contains_eager(LD.user, alias=u_ac)
)
.first()
)
self.assert_sql_count(testing.db, go, expected)
- def test_string_options_aliased_whatever(self):
- A, B, C = self.classes("A", "B", "C")
- s = fixture_session()
- aa = aliased(A)
- q = (
- s.query(aa, A)
- .filter(aa.id == 1)
- .filter(A.id == 2)
- .filter(aa.id != A.id)
- .options(joinedload("bs").joinedload("cs"))
- )
- self._run_tests(q, 1)
-
- def test_string_options_unaliased_whatever(self):
- A, B, C = self.classes("A", "B", "C")
- s = fixture_session()
- aa = aliased(A)
- q = (
- s.query(A, aa)
- .filter(aa.id == 2)
- .filter(A.id == 1)
- .filter(aa.id != A.id)
- .options(joinedload("bs").joinedload("cs"))
- )
- self._run_tests(q, 1)
-
def test_lazyload_aliased_abs_bcs_one(self):
A, B, C = self.classes("A", "B", "C")
s = fixture_session()
def test_deep_options_2(self):
"""test (joined|subquery)load_all() options"""
- User = self.classes.User
+ User, Order, Item = self.classes("User", "Order", "Item")
sess = fixture_session()
sess.query(User)
.order_by(User.id)
.options(
- sa.orm.joinedload("orders")
- .joinedload("items")
- .joinedload("keywords")
+ sa.orm.joinedload(User.orders)
+ .joinedload(Order.items)
+ .joinedload(Item.keywords)
)
).all()
result = (
sess.query(User).options(
- sa.orm.subqueryload("orders")
- .subqueryload("items")
- .subqueryload("keywords")
+ sa.orm.subqueryload(User.orders)
+ .subqueryload(Order.items)
+ .subqueryload(Item.keywords)
)
).all()
self.sql_count_(0, go)
- def test_deep_options_3(self):
- User = self.classes.User
-
- sess = fixture_session()
-
- # same thing, with separate options calls
- q2 = (
- sess.query(User)
- .order_by(User.id)
- .options(sa.orm.joinedload("orders"))
- .options(sa.orm.joinedload("orders.items"))
- .options(sa.orm.joinedload("orders.items.keywords"))
- )
- u = q2.all()
-
- def go():
- u[0].orders[1].items[0].keywords[1]
-
- self.sql_count_(0, go)
-
def test_deep_options_4(self):
Item, User, Order = (
self.classes.Item,
q3 = (
sess.query(User)
.order_by(User.id)
- .options(sa.orm.joinedload("orders.items.keywords"))
+ .options(
+ sa.orm.defaultload(User.orders)
+ .defaultload(Order.items)
+ .joinedload(Item.keywords)
+ )
)
u = q3.all()
from .inheritance._poly_fixtures import Person
+def _deprecated_strings():
+ return testing.expect_deprecated_20("Using strings to indicate")
+
+
class QueryTest(_fixtures.FixtureTest):
run_setup_mappers = "once"
run_inserts = "once"
Address = self.classes.Address
result = Load(User)
- eq_(
- result._generate_path(
- inspect(User)._path_registry, "addresses", None, "relationship"
- ),
- self._make_path_registry([User, "addresses", Address]),
- )
+ with _deprecated_strings():
+ eq_(
+ result._generate_path(
+ inspect(User)._path_registry,
+ "addresses",
+ None,
+ "relationship",
+ ),
+ self._make_path_registry([User, "addresses", Address]),
+ )
def test_gen_path_string_column(self):
User = self.classes.User
result = Load(User)
- eq_(
- result._generate_path(
- inspect(User)._path_registry, "name", None, "column"
- ),
- self._make_path_registry([User, "name"]),
- )
+ with _deprecated_strings():
+ eq_(
+ result._generate_path(
+ inspect(User)._path_registry, "name", None, "column"
+ ),
+ self._make_path_registry([User, "name"]),
+ )
def test_gen_path_invalid_from_col(self):
User = self.classes.User
sess = fixture_session()
q = sess.query(OrderWProp).options(defer("some_attr"))
- assert_raises_message(
- sa.exc.ArgumentError,
- r"Expected attribute \"some_attr\" on mapped class "
- "OrderWProp->orders to be a mapped attribute; instead "
- "got .*property.* object.",
- q._compile_state,
- )
+ with _deprecated_strings():
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ r"Expected attribute \"some_attr\" on mapped class "
+ "OrderWProp->orders to be a mapped attribute; instead "
+ "got .*property.* object.",
+ q._compile_state,
+ )
def test_gen_path_attr_entity_invalid_noraiseerr(self):
User = self.classes.User
User = self.classes.User
l1 = Load(User)
- l2 = l1.joinedload("addresses")
+ l2 = l1.joinedload(User.addresses)
to_bind = list(l2.context.values())[0]
eq_(
l1.context,
User = self.classes.User
l1 = Load(User)
- l2 = l1.defer("name")
+ l2 = l1.defer(User.name)
l3 = list(l2.context.values())[0]
eq_(l1.context, {("loader", self._make_path([User, "name"])): l3})
eq_(attr[key].local_opts, expected)
def test_single_opt_only(self):
+ User = self.classes.User
+
opt = strategy_options._UnboundLoad().some_col_opt_only(
- "name", {"foo": "bar"}
+ User.name, {"foo": "bar"}
)
self._assert_attrs([opt], {"foo": "bar"})
def test_unbound_multiple_opt_only(self):
+ User = self.classes.User
opts = [
strategy_options._UnboundLoad().some_col_opt_only(
- "name", {"foo": "bar"}
+ User.name, {"foo": "bar"}
),
strategy_options._UnboundLoad().some_col_opt_only(
- "name", {"bat": "hoho"}
+ User.name, {"bat": "hoho"}
),
]
self._assert_attrs(opts, {"foo": "bar", "bat": "hoho"})
User = self.classes.User
opts = [
Load(User)
- .some_col_opt_only("name", {"foo": "bar"})
- .some_col_opt_only("name", {"bat": "hoho"})
+ .some_col_opt_only(User.name, {"foo": "bar"})
+ .some_col_opt_only(User.name, {"bat": "hoho"})
]
self._assert_attrs(opts, {"foo": "bar", "bat": "hoho"})
User = self.classes.User
opts = [
Load(User)
- .some_col_opt_only("name", {"foo": "bar"})
- .some_col_opt_strategy("name", {"bat": "hoho"})
+ .some_col_opt_only(User.name, {"foo": "bar"})
+ .some_col_opt_strategy(User.name, {"bat": "hoho"})
]
self._assert_attrs(opts, {"foo": "bar", "bat": "hoho"})
def test_unbound_strat_opt_recvs_from_optonly(self):
+ User = self.classes.User
opts = [
strategy_options._UnboundLoad().some_col_opt_only(
- "name", {"foo": "bar"}
+ User.name, {"foo": "bar"}
),
strategy_options._UnboundLoad().some_col_opt_strategy(
- "name", {"bat": "hoho"}
+ User.name, {"bat": "hoho"}
),
]
self._assert_attrs(opts, {"foo": "bar", "bat": "hoho"})
def test_unbound_opt_only_adds_to_strat(self):
+ User = self.classes.User
opts = [
strategy_options._UnboundLoad().some_col_opt_strategy(
- "name", {"bat": "hoho"}
+ User.name, {"bat": "hoho"}
),
strategy_options._UnboundLoad().some_col_opt_only(
- "name", {"foo": "bar"}
+ User.name, {"foo": "bar"}
),
]
self._assert_attrs(opts, {"foo": "bar", "bat": "hoho"})
User = self.classes.User
opts = [
Load(User)
- .some_col_opt_strategy("name", {"bat": "hoho"})
- .some_col_opt_only("name", {"foo": "bar"})
+ .some_col_opt_strategy(User.name, {"bat": "hoho"})
+ .some_col_opt_only(User.name, {"foo": "bar"})
]
self._assert_attrs(opts, {"foo": "bar", "bat": "hoho"})
result = (
sess.query(User)
.order_by(User.id)
- .options(sa.orm.joinedload("addresses"))
+ .options(sa.orm.joinedload(User.addresses))
).all()
def go():
sess = fixture_session()
u = (
sess.query(User)
- .options(sa.orm.joinedload("addresses"))
+ .options(sa.orm.joinedload(User.addresses))
.filter_by(id=8)
).one()
sess = fixture_session()
u = (
sess.query(User)
- .options(sa.orm.lazyload("addresses"))
+ .options(sa.orm.lazyload(User.addresses))
.filter_by(id=8)
).one()
result = (
sess.query(User)
.order_by(User.id)
- .options(sa.orm.lazyload("addresses"))
+ .options(sa.orm.lazyload(User.addresses))
).all()
def go():