.. versionadded:: 1.2
-"Select IN" eager loading is provided using the ``"selectin"`` argument
-to :paramref:`.relationship.lazy` or by using the :func:`.selectinload` loader
-option. This style of loading emits a SELECT that refers to
-the primary key values of the parent object inside of an IN clause,
-in order to load related associations:
+"Select IN" eager loading is provided using the ``"selectin"`` argument to
+:paramref:`.relationship.lazy` or by using the :func:`.selectinload` loader
+option. This style of loading emits a SELECT that refers to the primary key
+values of the parent object, or in the case of a simple many-to-one
+relationship to those of the child objects, inside of an IN clause, in
+order to load related associations:
.. sourcecode:: python+sql
.. versionchanged:: 1.3 selectin loading can omit the JOIN for a simple
one-to-many collection.
-In the case where the primary key of the parent object isn't present in
-the related row, "selectin" loading will also JOIN to the parent table so that
-the parent primary key values are present:
+.. versionchanged:: 1.3.6 selectin loading can also omit the JOIN for a simple
+ many-to-one relationship.
+
+For collections, in the case where the primary key of the parent object isn't
+present in the related row, "selectin" loading will also JOIN to the parent
+table so that the parent primary key values are present. This also takes place
+for a non-collection, many-to-one load where the related column values are not
+loaded on the parent objects and would otherwise need to be loaded:
.. sourcecode:: python+sql
return the wrong result.
* "selectin" loading, unlike joined or subquery eager loading, always emits
- its SELECT in terms of the immediate parent objects just loaded, and
- not the original type of object at the top of the chain. So if eager loading
- many levels deep, "selectin" loading still uses no more than one JOIN,
- and usually no JOINs, in the statement. In comparison,
- joined and subquery eager loading always refer to multiple JOINs up to
- the original parent.
+ its SELECT in terms of the immediate parent objects just loaded, and not the
+ original type of object at the top of the chain. So if eager loading many
+ levels deep, "selectin" loading still uses no more than one JOIN, and usually
+ no JOINs, in the statement. In comparison, joined and subquery eager
+ loading always refer to multiple JOINs up to the original parent.
* "selectin" loading produces a SELECT statement of a predictable structure,
independent of that of the original query. As such, taking advantage of
"""sqlalchemy.orm.interfaces.LoaderStrategy
implementations, and related MapperOptions."""
+from __future__ import absolute_import
+import collections
import itertools
from . import attributes
"join_depth",
"omit_join",
"_parent_alias",
- "_in_expr",
- "_pk_cols",
- "_zero_idx",
+ "_query_info",
+ "_fallback_query_info",
"_bakery",
)
+ query_info = collections.namedtuple(
+ "queryinfo",
+ [
+ "load_only_child",
+ "load_with_join",
+ "in_expr",
+ "pk_cols",
+ "zero_idx",
+ "child_lookup_cols",
+ ],
+ )
+
_chunksize = 500
def __init__(self, parent, strategy_key):
super(SelectInLoader, self).__init__(parent, strategy_key)
self.join_depth = self.parent_property.join_depth
+ is_m2o = self.parent_property.direction is interfaces.MANYTOONE
if self.parent_property.omit_join is not None:
self.omit_join = self.parent_property.omit_join
lazyloader = self.parent_property._get_strategy(
(("lazy", "select"),)
)
- self.omit_join = self.parent._get_clause[0].compare(
- lazyloader._rev_lazywhere,
- use_proxies=True,
- equivalents=self.parent._equivalent_columns,
- )
+ if is_m2o:
+ self.omit_join = lazyloader.use_get
+ else:
+ self.omit_join = self.parent._get_clause[0].compare(
+ lazyloader._rev_lazywhere,
+ use_proxies=True,
+ equivalents=self.parent._equivalent_columns,
+ )
if self.omit_join:
- self._init_for_omit_join()
+ if is_m2o:
+ self._query_info = self._init_for_omit_join_m2o()
+ self._fallback_query_info = self._init_for_join()
+ else:
+ self._query_info = self._init_for_omit_join()
else:
- self._init_for_join()
+ self._query_info = self._init_for_join()
def _init_for_omit_join(self):
pk_to_fk = dict(
for equiv in self.parent._equivalent_columns.get(k, ())
)
- self._pk_cols = fk_cols = [
+ pk_cols = fk_cols = [
pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
]
if len(fk_cols) > 1:
- self._in_expr = sql.tuple_(*fk_cols)
- self._zero_idx = False
+ in_expr = sql.tuple_(*fk_cols)
+ zero_idx = False
else:
- self._in_expr = fk_cols[0]
- self._zero_idx = True
+ in_expr = fk_cols[0]
+ zero_idx = True
+
+ return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)
+
+ def _init_for_omit_join_m2o(self):
+ pk_cols = self.mapper.primary_key
+ if len(pk_cols) > 1:
+ in_expr = sql.tuple_(*pk_cols)
+ zero_idx = False
+ else:
+ in_expr = pk_cols[0]
+ zero_idx = True
+
+ lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
+ lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
+
+ return self.query_info(
+ True, False, in_expr, pk_cols, zero_idx, lookup_cols
+ )
def _init_for_join(self):
self._parent_alias = aliased(self.parent.class_)
pa_insp = inspect(self._parent_alias)
- self._pk_cols = pk_cols = [
+ pk_cols = [
pa_insp._adapt_element(col) for col in self.parent.primary_key
]
if len(pk_cols) > 1:
- self._in_expr = sql.tuple_(*pk_cols)
- self._zero_idx = False
+ in_expr = sql.tuple_(*pk_cols)
+ zero_idx = False
else:
- self._in_expr = pk_cols[0]
- self._zero_idx = True
+ in_expr = pk_cols[0]
+ zero_idx = True
+ return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)
def init_class_attribute(self, mapper):
self.parent_property._get_strategy(
if load_only and self.key not in load_only:
return
- our_states = [
- (state.key[1], state, overwrite) for state, overwrite in states
- ]
+ query_info = self._query_info
- pk_cols = self._pk_cols
- in_expr = self._in_expr
+ if query_info.load_only_child:
+ our_states = collections.defaultdict(list)
- if self.omit_join:
+ mapper = self.parent
+
+ for state, overwrite in states:
+ state_dict = state.dict
+ related_ident = tuple(
+ mapper._get_state_attr_by_column(
+ state,
+ state_dict,
+ lk,
+ passive=attributes.PASSIVE_NO_FETCH,
+ )
+ for lk in query_info.child_lookup_cols
+ )
+ # if the loaded parent objects do not have the foreign key
+ # to the related item loaded, then degrade into the joined
+ # version of selectinload
+ if attributes.PASSIVE_NO_RESULT in related_ident:
+ query_info = self._fallback_query_info
+ break
+ if None not in related_ident:
+ our_states[related_ident].append(
+ (state, state_dict, overwrite)
+ )
+
+ if not query_info.load_only_child:
+ our_states = [
+ (state.key[1], state, state.dict, overwrite)
+ for state, overwrite in states
+ ]
+
+ pk_cols = query_info.pk_cols
+ in_expr = query_info.in_expr
+
+ if not query_info.load_with_join:
# in "omit join" mode, the primary key column and the
# "in" expression are in terms of the related entity. So
# if the related entity is polymorphic or otherwise aliased,
- # we need to adapt our "_pk_cols" and "_in_expr" to that
+ # we need to adapt our "pk_cols" and "in_expr" to that
# entity. in non-"omit join" mode, these are against the
# parent entity and do not need adaption.
insp = inspect(effective_entity)
self,
)
- if self.omit_join:
+ if not query_info.load_with_join:
# the Bundle we have in the "omit_join" case is against raw, non
# annotated columns, so to ensure the Query knows its primary
# entity, we add it explictly. If we made the Bundle against
)
)
- q.add_criteria(
- lambda q: q.filter(
- in_expr.in_(sql.bindparam("primary_keys", expanding=True))
- ).order_by(*pk_cols)
- )
+ if query_info.load_only_child:
+ q.add_criteria(
+ lambda q: q.filter(
+ in_expr.in_(sql.bindparam("primary_keys", expanding=True))
+ )
+ )
+ else:
+ q.add_criteria(
+ lambda q: q.filter(
+ in_expr.in_(sql.bindparam("primary_keys", expanding=True))
+ ).order_by(*pk_cols)
+ )
orig_query = context.query
q.add_criteria(lambda q: q.populate_existing())
if self.parent_property.order_by:
- if self.omit_join:
+ if not query_info.load_with_join:
eager_order_by = self.parent_property.order_by
if insp.is_aliased_class:
eager_order_by = [
q.add_criteria(_setup_outermost_orderby)
+ if query_info.load_only_child:
+ self._load_via_child(our_states, query_info, q, context)
+ else:
+ self._load_via_parent(our_states, query_info, q, context)
+
+ def _load_via_child(self, our_states, query_info, q, context):
+ uselist = self.uselist
+
+ # this sort is really for the benefit of the unit tests
+ our_keys = sorted(our_states)
+ while our_keys:
+ chunk = our_keys[0 : self._chunksize]
+ our_keys = our_keys[self._chunksize :]
+
+ data = {
+ k: v
+ for k, v in q(context.session).params(
+ primary_keys=[
+ key[0] if query_info.zero_idx else key for key in chunk
+ ]
+ )
+ }
+
+ for key in chunk:
+ related_obj = data[key]
+ for state, dict_, overwrite in our_states[key]:
+ if not overwrite and self.key in dict_:
+ continue
+
+ state.get_impl(self.key).set_committed_value(
+ state,
+ dict_,
+ related_obj if not uselist else [related_obj],
+ )
+
+ def _load_via_parent(self, our_states, query_info, q, context):
uselist = self.uselist
_empty_result = () if uselist else None
chunk = our_states[0 : self._chunksize]
our_states = our_states[self._chunksize :]
+ primary_keys = [
+ key[0] if query_info.zero_idx else key
+ for key, state, state_dict, overwrite in chunk
+ ]
+
data = {
k: [vv[1] for vv in v]
for k, v in itertools.groupby(
- q(context.session).params(
- primary_keys=[
- key[0] if self._zero_idx else key
- for key, state, overwrite in chunk
- ]
- ),
+ q(context.session).params(primary_keys=primary_keys),
lambda x: x[0],
)
}
- for key, state, overwrite in chunk:
+ for key, state, state_dict, overwrite in chunk:
- if not overwrite and self.key in state.dict:
+ if not overwrite and self.key in state_dict:
continue
collection = data.get(key, _empty_result)
"attribute '%s' " % self
)
state.get_impl(self.key).set_committed_value(
- state, state.dict, collection[0]
+ state, state_dict, collection[0]
)
else:
state.get_impl(self.key).set_committed_value(
- state, state.dict, collection
+ state, state_dict, collection
)
import sqlalchemy as sa
from sqlalchemy import bindparam
from sqlalchemy import ForeignKey
+from sqlalchemy import ForeignKeyConstraint
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import create_session
from sqlalchemy.orm import defaultload
+from sqlalchemy.orm import defer
from sqlalchemy.orm import deferred
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import mapper
self.assert_sql_count(testing.db, go, 0)
+class TupleTest(fixtures.DeclarativeMappedTest):
+ __requires__ = ("tuple_in",)
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(fixtures.ComparableEntity, Base):
+ __tablename__ = "a"
+ id1 = Column(Integer, primary_key=True)
+ id2 = Column(Integer, primary_key=True)
+
+ bs = relationship("B", order_by="B.id", back_populates="a")
+
+ class B(fixtures.ComparableEntity, Base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ a_id1 = Column()
+ a_id2 = Column()
+
+ a = relationship("A", back_populates="bs")
+
+ __table_args__ = (
+ ForeignKeyConstraint(["a_id1", "a_id2"], ["a.id1", "a.id2"]),
+ )
+
+ @classmethod
+ def insert_data(cls):
+ A, B = cls.classes("A", "B")
+
+ session = Session()
+ session.add_all(
+ [
+ A(id1=i, id2=i + 2, bs=[B(id=(i * 6) + j) for j in range(6)])
+ for i in range(1, 20)
+ ]
+ )
+ session.commit()
+
+ def test_load_o2m(self):
+ A, B = self.classes("A", "B")
+
+ session = Session()
+
+ def go():
+ q = (
+ session.query(A)
+ .options(selectinload(A.bs))
+ .order_by(A.id1, A.id2)
+ )
+ return q.all()
+
+ result = self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL(
+ "SELECT a.id1 AS a_id1, a.id2 AS a_id2 "
+ "FROM a ORDER BY a.id1, a.id2",
+ {},
+ ),
+ CompiledSQL(
+ "SELECT b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2, b.id AS b_id "
+ "FROM b WHERE (b.a_id1, b.a_id2) IN "
+ "([EXPANDING_primary_keys]) ORDER BY b.a_id1, b.a_id2, b.id",
+ [{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
+ ),
+ )
+ eq_(
+ result,
+ [
+ A(id1=i, id2=i + 2, bs=[B(id=(i * 6) + j) for j in range(6)])
+ for i in range(1, 20)
+ ],
+ )
+
+ def test_load_m2o(self):
+ A, B = self.classes("A", "B")
+
+ session = Session()
+
+ def go():
+ q = session.query(B).options(selectinload(B.a)).order_by(B.id)
+ return q.all()
+
+ result = self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL(
+ "SELECT b.id AS b_id, b.a_id1 AS b_a_id1, b.a_id2 AS b_a_id2 "
+ "FROM b ORDER BY b.id",
+ {},
+ ),
+ CompiledSQL(
+ "SELECT a.id1 AS a_id1, a.id2 AS a_id2 FROM a "
+ "WHERE (a.id1, a.id2) IN ([EXPANDING_primary_keys])",
+ [{"primary_keys": [(i, i + 2) for i in range(1, 20)]}],
+ ),
+ )
+ as_ = [A(id1=i, id2=i + 2) for i in range(1, 20)]
+
+ eq_(
+ result,
+ [
+ B(id=(i * 6) + j, a=as_[i - 1])
+ for i in range(1, 20)
+ for j in range(6)
+ ],
+ )
+
+
class ChunkingTest(fixtures.DeclarativeMappedTest):
"""test IN chunking.
class A(fixtures.ComparableEntity, Base):
__tablename__ = "a"
id = Column(Integer, primary_key=True)
- bs = relationship("B", order_by="B.id")
+ bs = relationship("B", order_by="B.id", back_populates="a")
class B(fixtures.ComparableEntity, Base):
__tablename__ = "b"
id = Column(Integer, primary_key=True)
a_id = Column(ForeignKey("a.id"))
+ a = relationship("A", back_populates="bs")
@classmethod
def insert_data(cls):
# (if you enable subquery eager w/ yield_per)
self.assert_sql_count(testing.db, go, total_expected_statements)
+ def test_dont_emit_for_redundant_m2o(self):
+ A, B = self.classes("A", "B")
+
+ session = Session()
+
+ def go():
+ with mock.patch(
+ "sqlalchemy.orm.strategies.SelectInLoader._chunksize", 47
+ ):
+ q = session.query(B).options(selectinload(B.a)).order_by(B.id)
+
+ for b in q:
+ b.a
+
+ self.assert_sql_execution(
+ testing.db,
+ go,
+ CompiledSQL(
+ "SELECT b.id AS b_id, b.a_id AS b_a_id FROM b ORDER BY b.id",
+ {},
+ ),
+ # chunk size is 47. so first chunk are a 1->47...
+ CompiledSQL(
+ "SELECT a.id AS a_id FROM a WHERE a.id IN "
+ "([EXPANDING_primary_keys])",
+ {"primary_keys": list(range(1, 48))},
+ ),
+ # second chunk is a 48-94
+ CompiledSQL(
+ "SELECT a.id AS a_id FROM a WHERE a.id IN "
+ "([EXPANDING_primary_keys])",
+ {"primary_keys": list(range(48, 95))},
+ ),
+ # third and final chunk 95-100.
+ CompiledSQL(
+ "SELECT a.id AS a_id FROM a WHERE a.id IN "
+ "([EXPANDING_primary_keys])",
+ {"primary_keys": list(range(95, 101))},
+ ),
+ )
+
class SubRelationFromJoinedSubclassMultiLevelTest(_Polymorphic):
@classmethod
def setup_classes(cls):
Base = cls.DeclarativeBasic
- class Foo(Base):
+ class Foo(fixtures.ComparableEntity, Base):
__tablename__ = "foo"
id = Column(Integer, primary_key=True)
type = Column(String(50))
.filter(Foo.id == 2)
.options(selectinload(attr1).selectinload(attr2))
)
- self.assert_sql_execution(
+ results = self.assert_sql_execution(
testing.db,
q.all,
CompiledSQL(
[{"id_1": 2}],
),
CompiledSQL(
- "SELECT foo_1.id AS foo_1_id, foo_2.id AS foo_2_id, "
- "foo_2.type AS foo_2_type, foo_2.foo_id AS foo_2_foo_id "
- "FROM foo AS foo_1 JOIN foo AS foo_2 "
- "ON foo_2.id = foo_1.foo_id "
- "WHERE foo_1.id "
- "IN ([EXPANDING_primary_keys]) ORDER BY foo_1.id",
- {"primary_keys": [2]},
+ "SELECT foo_1.id AS foo_1_id, "
+ "foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
+ "FROM foo AS foo_1 "
+ "WHERE foo_1.id IN ([EXPANDING_primary_keys])",
+ {"primary_keys": [3]},
),
CompiledSQL(
- "SELECT foo_1.id AS foo_1_id, foo_2.id AS foo_2_id, "
- "foo_2.type AS foo_2_type, foo_2.foo_id AS foo_2_foo_id "
- "FROM foo AS foo_1 JOIN foo AS foo_2 "
- "ON foo_2.id = foo_1.foo_id "
- "WHERE foo_1.id IN ([EXPANDING_primary_keys]) "
- "ORDER BY foo_1.id",
- {"primary_keys": [3]},
+ "SELECT foo_1.id AS foo_1_id, "
+ "foo_1.type AS foo_1_type, foo_1.foo_id AS foo_1_foo_id "
+ "FROM foo AS foo_1 "
+ "WHERE foo_1.id IN ([EXPANDING_primary_keys])",
+ {"primary_keys": [1]},
),
)
+ eq_(results, [Bar(id=2, foo=Foo(id=3, foo=Bar(id=1)))])
class TestExistingRowPopulation(fixtures.DeclarativeMappedTest):
{"primary_keys": [1]},
),
)
+
+
+class M2OWDegradeTest(
+ fixtures.DeclarativeMappedTest, testing.AssertsExecutionResults
+):
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(fixtures.ComparableEntity, Base):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ b_id = Column(ForeignKey("b.id"))
+ b = relationship("B")
+ q = Column(Integer)
+
+ class B(fixtures.ComparableEntity, Base):
+ __tablename__ = "b"
+ id = Column(Integer, primary_key=True)
+ x = Column(Integer)
+ y = Column(Integer)
+
+ @classmethod
+ def insert_data(cls):
+ A, B = cls.classes("A", "B")
+
+ s = Session()
+ b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
+ s.add_all(
+ [
+ A(id=1, b=b1),
+ A(id=2, b=b2),
+ A(id=3, b=b2),
+ A(id=4, b=None),
+ A(id=5, b=b1),
+ ]
+ )
+ s.commit()
+
+ def test_use_join_parent_criteria(self):
+ A, B = self.classes("A", "B")
+ s = Session()
+ q = (
+ s.query(A)
+ .filter(A.id.in_([1, 3]))
+ .options(selectinload(A.b))
+ .order_by(A.id)
+ )
+ results = self.assert_sql_execution(
+ testing.db,
+ q.all,
+ CompiledSQL(
+ "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
+ "FROM a WHERE a.id IN (:id_1, :id_2) ORDER BY a.id",
+ [{"id_1": 1, "id_2": 3}],
+ ),
+ CompiledSQL(
+ "SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
+ "FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
+ [{"primary_keys": [1, 2]}],
+ ),
+ )
+
+ eq_(
+ results,
+ [A(id=1, b=B(id=1, x=5, y=9)), A(id=3, b=B(id=2, x=10, y=8))],
+ )
+
+ def test_use_join_parent_criteria_degrade_on_defer(self):
+ A, B = self.classes("A", "B")
+ s = Session()
+ q = (
+ s.query(A)
+ .filter(A.id.in_([1, 3]))
+ .options(defer(A.b_id), selectinload(A.b))
+ .order_by(A.id)
+ )
+ results = self.assert_sql_execution(
+ testing.db,
+ q.all,
+ CompiledSQL(
+ "SELECT a.id AS a_id, a.q AS a_q "
+ "FROM a WHERE a.id IN (:id_1, :id_2) ORDER BY a.id",
+ [{"id_1": 1, "id_2": 3}],
+ ),
+ # in the very unlikely case that the FK col on parent is
+ # deferred, we degrade to the JOIN version so that we don't need to
+ # emit either for each parent object individually, or as a second
+ # query for them.
+ CompiledSQL(
+ "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
+ "b.y AS b_y "
+ "FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
+ "WHERE a_1.id IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
+ [{"primary_keys": [1, 3]}],
+ ),
+ )
+
+ eq_(
+ results,
+ [A(id=1, b=B(id=1, x=5, y=9)), A(id=3, b=B(id=2, x=10, y=8))],
+ )
+
+ def test_use_join(self):
+ A, B = self.classes("A", "B")
+ s = Session()
+ q = s.query(A).options(selectinload(A.b)).order_by(A.id)
+ results = self.assert_sql_execution(
+ testing.db,
+ q.all,
+ CompiledSQL(
+ "SELECT a.id AS a_id, a.b_id AS a_b_id, a.q AS a_q "
+ "FROM a ORDER BY a.id",
+ [{}],
+ ),
+ CompiledSQL(
+ "SELECT b.id AS b_id, b.x AS b_x, b.y AS b_y "
+ "FROM b WHERE b.id IN ([EXPANDING_primary_keys])",
+ [{"primary_keys": [1, 2]}],
+ ),
+ )
+
+ b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
+ eq_(
+ results,
+ [
+ A(id=1, b=b1),
+ A(id=2, b=b2),
+ A(id=3, b=b2),
+ A(id=4, b=None),
+ A(id=5, b=b1),
+ ],
+ )
+
+ def test_use_join_parent_degrade_on_defer(self):
+ A, B = self.classes("A", "B")
+ s = Session()
+ q = s.query(A).options(defer(A.b_id), selectinload(A.b)).order_by(A.id)
+ results = self.assert_sql_execution(
+ testing.db,
+ q.all,
+ CompiledSQL(
+ "SELECT a.id AS a_id, a.q AS a_q " "FROM a ORDER BY a.id", [{}]
+ ),
+ # in the very unlikely case that the FK col on parent is
+ # deferred, we degrade to the JOIN version so that we don't need to
+ # emit either for each parent object individually, or as a second
+ # query for them.
+ CompiledSQL(
+ "SELECT a_1.id AS a_1_id, b.id AS b_id, b.x AS b_x, "
+ "b.y AS b_y "
+ "FROM a AS a_1 JOIN b ON b.id = a_1.b_id "
+ "WHERE a_1.id IN ([EXPANDING_primary_keys]) ORDER BY a_1.id",
+ [{"primary_keys": [1, 2, 3, 4, 5]}],
+ ),
+ )
+
+ b1, b2 = B(id=1, x=5, y=9), B(id=2, x=10, y=8)
+ eq_(
+ results,
+ [
+ A(id=1, b=b1),
+ A(id=2, b=b2),
+ A(id=3, b=b2),
+ A(id=4, b=None),
+ A(id=5, b=b1),
+ ],
+ )