_chunksize = 500
+ @classmethod
+ def _set_chunksize(cls, loadopt) -> int:
+ if loadopt is None or hasattr(loadopt, "local_opts") is None:
+ return cls._chunksize
+
+ user_input = loadopt.local_opts.get("chunksize", None)
+ if user_input is None:
+ return cls._chunksize
+ elif not isinstance(user_input, int) or user_input < 1:
+ raise sa_exc.ArgumentError(
+ f"'chunksize={user_input}' is not an appropriate input, "
+ f"please use a positive non-zero integer."
+ )
+ return user_input
+
def __init__(self, parent, strategy_key):
super().__init__(parent, strategy_key)
self.join_depth = self.parent_property.join_depth
_setup_outermost_orderby, self.parent_property
)
+ chunksize = self._set_chunksize(loadopt)
+
if query_info.load_only_child:
self._load_via_child(
our_states,
q,
context,
execution_options,
+ chunksize,
)
else:
self._load_via_parent(
- our_states, query_info, q, context, execution_options
+ our_states,
+ query_info,
+ q,
+ context,
+ execution_options,
+ chunksize,
)
def _load_via_child(
q,
context,
execution_options,
+ chunksize,
):
uselist = self.uselist
# this sort is really for the benefit of the unit tests
our_keys = sorted(our_states)
while our_keys:
- chunk = our_keys[0 : self._chunksize]
- our_keys = our_keys[self._chunksize :]
+ chunk = our_keys[0:chunksize]
+ our_keys = our_keys[chunksize:]
data = {
k: v
for k, v in context.session.execute(
state.get_impl(self.key).set_committed_value(state, dict_, None)
def _load_via_parent(
- self, our_states, query_info, q, context, execution_options
+ self, our_states, query_info, q, context, execution_options, chunksize
):
uselist = self.uselist
_empty_result = () if uselist else None
while our_states:
- chunk = our_states[0 : self._chunksize]
- our_states = our_states[self._chunksize :]
+ chunk = our_states[0:chunksize]
+ our_states = our_states[chunksize:]
primary_keys = [
key[0] if query_info.zero_idx else key
)
session.commit()
- def test_odd_number_chunks(self):
+ @testing.combinations(
+ (None, (1, 101)),
+ (47, (1, 48, 95, 101)),
+ (50, (1, 51, 101)),
+ (99, (1, 100, 101)),
+ (108, (1, 101)),
+ argnames="chunksize, expected_range",
+ )
+ @testing.variation("chunksize_spec", ["monkeypatch", "parameter"])
+ def test_odd_number_chunks(
+ self, chunksize, expected_range, chunksize_spec
+ ):
A, B = self.classes("A", "B")
session = fixture_session()
def go():
- with mock.patch(
- "sqlalchemy.orm.strategies._SelectInLoader._chunksize", 47
- ):
- q = session.query(A).options(selectinload(A.bs)).order_by(A.id)
+ if chunksize_spec.monkeypatch:
+ if chunksize is None:
+ statement = (
+ select(A).options(selectinload(A.bs)).order_by(A.id)
+ )
+
+ session.scalars(statement).all()
+ else:
+ with mock.patch(
+ "sqlalchemy.orm.strategies._SelectInLoader._chunksize",
+ chunksize,
+ ):
+
+ statement = (
+ select(A)
+ .options(selectinload(A.bs))
+ .order_by(A.id)
+ )
- for a in q:
- a.bs
+ session.scalars(statement).all()
+ else:
+
+ statement = (
+ select(A)
+ .options(selectinload(A.bs, chunksize=chunksize))
+ .order_by(A.id)
+ )
+
+ session.scalars(statement).all()
self.assert_sql_execution(
testing.db,
go,
- CompiledSQL("SELECT a.id AS a_id FROM a ORDER BY a.id", {}),
- CompiledSQL(
- "SELECT b.a_id, b.id "
- "FROM b WHERE b.a_id IN "
- "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
- {"primary_keys": list(range(1, 48))},
- ),
- CompiledSQL(
- "SELECT b.a_id, b.id "
- "FROM b WHERE b.a_id IN "
- "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
- {"primary_keys": list(range(48, 95))},
- ),
- CompiledSQL(
- "SELECT b.a_id, b.id "
- "FROM b WHERE b.a_id IN "
- "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
- {"primary_keys": list(range(95, 101))},
- ),
+ CompiledSQL("SELECT a.id FROM a ORDER BY a.id", {}),
+ *[
+ CompiledSQL(
+ "SELECT b.a_id, b.id "
+ "FROM b WHERE b.a_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
+ {"primary_keys": list(range(a, b))},
+ )
+ for a, b in zip(expected_range, expected_range[1:])
+ ],
)
+ @testing.combinations(-250, "a", 0)
+ def test_chunksize_value_error(self, chunksize):
+ A, B = self.classes("A", "B")
+
+ def go():
+ with testing.expect_raises_message(
+ sa.exc.ArgumentError,
+ ".*please use a positive non-zero integer.*",
+ ):
+ select(A).options(
+ selectinload(A.bs, chunksize=chunksize)
+ ).order_by(A.id)
+
@testing.requires.independent_cursors
def test_yield_per(self):
# the docs make a lot of guarantees about yield_per
)
+class ChainedChunkingTest(fixtures.DeclarativeMappedTest):
+ @classmethod
+ def setup_mappers(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(ComparableEntity, Base):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ bs = relationship("B", order_by="B.id", back_populates="a")
+
+ class B(ComparableEntity, Base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id = Column(ForeignKey("a.id"))
+ a = relationship("A", back_populates="bs")
+ cs = relationship("C", order_by="C.id", back_populates="b")
+
+ class C(ComparableEntity, Base):
+ __tablename__ = "c"
+ id = Column(Integer, primary_key=True)
+ b_id = Column(ForeignKey("b.id"))
+ b = relationship("B", back_populates="cs")
+
+ @classmethod
+ def insert_data(cls, connection):
+ A, B, C = cls.classes("A", "B", "C")
+
+ session = Session(connection)
+
+ for i in range(1, 6):
+ b_list = []
+ for j in range(1, 4):
+ b_id = (i * 6) + j
+ c_id = b_id + 1
+
+ b_list.append(B(id=b_id, cs=[C(id=c_id)]))
+ session.add(A(id=i, bs=b_list))
+ session.commit()
+
+ def test_chained_selectinload_with_two_custom_chunksize(self):
+ A, B, C = self.classes("A", "B", "C")
+
+ b_list = [7, 8, 9, 13, 14, 15, 19, 20, 21, 25, 26, 27, 31, 32, 33]
+
+ session = fixture_session()
+
+ def go():
+ statement = (
+ select(A)
+ .options(
+ selectinload(A.bs, chunksize=3).selectinload(
+ B.cs, chunksize=4
+ )
+ )
+ .order_by(A.id)
+ )
+
+ session.scalars(statement).all()
+
+ self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL("SELECT a.id FROM a ORDER BY a.id", {}),
+ CompiledSQL(
+ "SELECT b.a_id, b.id "
+ "FROM b WHERE b.a_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
+ {"primary_keys": list(range(1, 4))},
+ ),
+ CompiledSQL(
+ "SELECT b.a_id, b.id "
+ "FROM b WHERE b.a_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
+ {"primary_keys": list(range(4, 6))},
+ ),
+ CompiledSQL(
+ "SELECT c.b_id, c.id "
+ "FROM c WHERE c.b_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY c.id",
+ {"primary_keys": b_list[0:4]},
+ ),
+ CompiledSQL(
+ "SELECT c.b_id, c.id "
+ "FROM c WHERE c.b_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY c.id",
+ {"primary_keys": b_list[4:8]},
+ ),
+ CompiledSQL(
+ "SELECT c.b_id, c.id "
+ "FROM c WHERE c.b_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY c.id",
+ {"primary_keys": b_list[8:12]},
+ ),
+ CompiledSQL(
+ "SELECT c.b_id, c.id "
+ "FROM c WHERE c.b_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY c.id",
+ {"primary_keys": b_list[12:]},
+ ),
+ )
+
+ def test_chained_selectinload_with_one_chunksize(self):
+ """
+ This test is to make sure that a previous custom chunksize doesn't
+ effect chunksize in remaining selectinload
+ """
+
+ A, B, C = self.classes("A", "B", "C")
+
+ b_list = [7, 8, 9, 13, 14, 15, 19, 20, 21, 25, 26, 27, 31, 32, 33]
+
+ session = fixture_session()
+
+ def go():
+ statement = (
+ select(A)
+ .options(selectinload(A.bs, chunksize=3).selectinload(B.cs))
+ .order_by(A.id)
+ )
+
+ session.scalars(statement).all()
+
+ self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL("SELECT a.id FROM a ORDER BY a.id", {}),
+ CompiledSQL(
+ "SELECT b.a_id, b.id "
+ "FROM b WHERE b.a_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
+ {"primary_keys": list(range(1, 4))},
+ ),
+ CompiledSQL(
+ "SELECT b.a_id, b.id "
+ "FROM b WHERE b.a_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY b.id",
+ {"primary_keys": list(range(4, 6))},
+ ),
+ CompiledSQL(
+ "SELECT c.b_id, c.id "
+ "FROM c WHERE c.b_id IN "
+ "(__[POSTCOMPILE_primary_keys]) ORDER BY c.id",
+ {"primary_keys": b_list},
+ ),
+ )
+
+
class SubRelationFromJoinedSubclassMultiLevelTest(_Polymorphic):
@classmethod
def define_tables(cls, metadata):