while our_keys:
chunk = our_keys[0:chunksize]
our_keys = our_keys[chunksize:]
- data = {
- k: v
- for k, v in context.session.execute(
- q,
- params={
- "primary_keys": [
- key[0] if query_info.zero_idx else key
- for key in chunk
- ]
- },
- execution_options=execution_options,
- ).unique()
- }
+ result = context.session.execute(
+ q,
+ params={
+ "primary_keys": [
+ key[0] if query_info.zero_idx else key for key in chunk
+ ]
+ },
+ execution_options=execution_options,
+ )
+ if result.context is not None and result.context.requires_uniquing:
+ result = result.unique()
+ data = {k: v for k, v in result}
for key in chunk:
# for a real foreign key and no concurrent changes to the
for key, state, state_dict, overwrite in chunk
]
+ result = context.session.execute(
+ q,
+ params={"primary_keys": primary_keys},
+ execution_options=execution_options,
+ )
+ if result.context is not None and result.context.requires_uniquing:
+ result = result.unique()
data = collections.defaultdict(list)
- for k, v in itertools.groupby(
- context.session.execute(
- q,
- params={"primary_keys": primary_keys},
- execution_options=execution_options,
- ).unique(),
- lambda x: x[0],
- ):
+ for k, v in itertools.groupby(result, lambda x: x[0]):
data[k].extend(vv[1] for vv in v)
for key, state, state_dict, overwrite in chunk:
[3],
)
+ def test_m2m_selectin_no_duplicate_children(self, simple_m2m, connection):
+ """selectinload over m2m (omit_join fast path) must load the correct
+ collection members and must not produce duplicate items.
+
+ The simple_m2m fixture has a1.bs=[b1,b2] and a2.bs=[b1].
+ """
+ A = simple_m2m
+
+ with Session(connection) as session:
+
+ def go():
+ statement = (
+ select(A).options(selectinload(A.bs)).order_by(A.id)
+ )
+ return session.execute(statement).scalars().all()
+
+ results = self.assert_sql_execution(
+ connection,
+ go,
+ CompiledSQL("SELECT a.id FROM a ORDER BY a.id", {}),
+ CompiledSQL(
+ "SELECT a_b.a_id, b.id "
+ "FROM a_b JOIN b ON b.id = a_b.b_id "
+ "WHERE a_b.a_id IN "
+ "(__[POSTCOMPILE_primary_keys])",
+ {"primary_keys": [1, 2]},
+ ),
+ )
+
+ eq_(sorted(b.id for b in results[0].bs), [1, 2])
+ eq_(sorted(b.id for b in results[1].bs), [1])
+
+ for a in results:
+ b_ids = [b.id for b in a.bs]
+ eq_(len(b_ids), len(set(b_ids)))
+
class SameNamePolymorphicTest(fixtures.DeclarativeMappedTest):
@classmethod
self.assert_sql_count(testing.db, go, 2)
+class TestSelectinWithNestedJoinedCollectionDedup(
+ fixtures.DeclarativeMappedTest
+):
+ """Regression guard for selectinload(...).joinedload/selectinload(...) on a
+ collection.
+
+ When the inner selectin query uses joinedload on a collection, the result
+ rows are multiplied (one row per grandchild). The .unique() call in
+ _load_via_parent must deduplicate those rows so that each parent row ends
+ up with exactly one copy of each child object.
+
+ Also verifies that the joinedload is folded into the selectin query rather
+ than issuing a separate third query — the total SQL count must be 2 for
+ joinedload and 3 for nested selectinload.
+ """
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class User(ComparableEntity, Base):
+ __tablename__ = "sel_dedup_user"
+ id = Column(Integer, primary_key=True)
+ name = Column(String(50))
+ addresses = relationship(
+ "Address", back_populates="user", order_by="Address.id"
+ )
+
+ class Address(ComparableEntity, Base):
+ __tablename__ = "sel_dedup_address"
+ id = Column(Integer, primary_key=True)
+ user_id = Column(ForeignKey("sel_dedup_user.id"))
+ email = Column(String(50))
+ user = relationship("User", back_populates="addresses")
+ dingalings = relationship(
+ "Dingaling", back_populates="address", order_by="Dingaling.id"
+ )
+
+ class Dingaling(ComparableEntity, Base):
+ __tablename__ = "sel_dedup_dingaling"
+ id = Column(Integer, primary_key=True)
+ address_id = Column(ForeignKey("sel_dedup_address.id"))
+ data = Column(String(50))
+ address = relationship("Address", back_populates="dingalings")
+
+ @classmethod
+ def insert_data(cls, connection):
+ User, Address, Dingaling = cls.classes("User", "Address", "Dingaling")
+ sess = Session(connection)
+ # user 1: one address with TWO dingalings — this is the key fixture.
+ # When selectinload(User.addresses).joinedload(Address.dingalings) runs
+ # the inner selectin query, address 1 will appear in 2 result rows
+ # (one per dingaling). Without .unique(), loading.require_unique would
+ # raise InvalidRequestError: "The unique() method must be invoked on
+ # this Result".
+ sess.add(
+ User(
+ id=1,
+ name="u1",
+ addresses=[
+ Address(
+ id=1,
+ email="a1@example.com",
+ dingalings=[
+ Dingaling(id=1, data="d1"),
+ Dingaling(id=2, data="d2"),
+ ],
+ ),
+ Address(id=2, email="a2@example.com"),
+ ],
+ )
+ )
+ # user 2: one address, one dingaling — control case
+ sess.add(
+ User(
+ id=2,
+ name="u2",
+ addresses=[
+ Address(
+ id=3,
+ email="a3@example.com",
+ dingalings=[Dingaling(id=3, data="d3")],
+ )
+ ],
+ )
+ )
+ sess.commit()
+
+ @testing.combinations(
+ ("joinedload", 2),
+ ("selectinload", 3),
+ id_="sa",
+ argnames="inner_loader_name,expected_sql_count",
+ )
+ def test_selectin_with_nested_joined_collection_still_dedupes(
+ self, inner_loader_name, expected_sql_count
+ ):
+ """Regression: selectinload(...).joinedload/selectinload(...) on a
+ collection must continue to dedupe inner rows after the
+ conditional-unique optimization.
+ """
+ User, Address, Dingaling = self.classes("User", "Address", "Dingaling")
+ sess = fixture_session()
+
+ if inner_loader_name == "joinedload":
+ inner_opt = selectinload(User.addresses).joinedload(
+ Address.dingalings
+ )
+ else:
+ inner_opt = selectinload(User.addresses).selectinload(
+ Address.dingalings
+ )
+
+ def go():
+ # expunge so we get fresh queries on each call
+ sess.expunge_all()
+ return sess.query(User).options(inner_opt).order_by(User.id).all()
+
+ users = self.assert_sql_count(testing.db, go, expected_sql_count)
+
+ eq_(len(users), 2)
+
+ u1 = users[0]
+ # Address 1 has 2 dingalings; without .unique() the selectin result
+ # would produce 2 rows for address 1 and loading.require_unique would
+ # raise InvalidRequestError. The assertions below are belt-and-
+ # suspenders for if the conditional logic changes.
+ address_ids = [a.id for a in u1.addresses]
+ eq_(len(address_ids), len(set(address_ids)))
+ eq_(len(u1.addresses), 2)
+
+ # Also verify the grandchildren loaded correctly
+ target_address = next(a for a in u1.addresses if a.id == 1)
+ eq_(len(target_address.dingalings), 2)
+
+ u2 = users[1]
+ address_ids2 = [a.id for a in u2.addresses]
+ assert len(address_ids2) == len(
+ set(address_ids2)
+ ), f"user {u2.id} has duplicate addresses: {address_ids2}"
+ eq_(len(u2.addresses), 1)
+ eq_(len(u2.addresses[0].dingalings), 1)
+
+
class TestCompositePlusNonComposite(fixtures.DeclarativeMappedTest):
__requires__ = ("tuple_in",)