--- /dev/null
+.. change::
+ :tags: feature, ext
+ :tickets: 4243
+ :versions: 1.3.0b1
+
+ Added new attribute :attr:`.Query.lazy_loaded_from` which is populated
+ with an :class:`.InstanceState` that is using this :class:`.Query` in
+ order to lazy load a relationship. The rationale for this is that
+ it serves as a hint for the horizontal sharding feature to use, such that
+ the identity token of the state can be used as the default identity token
+ to use for the query within id_chooser().
distributed among DBs.
"""
- return ['north_america', 'asia', 'europe', 'south_america']
+ if query.lazy_loaded_from:
+ # if we are in a lazy load, we can look at the parent object
+ # and limit our search to that same shard, assuming that's how we've
+ # set things up.
+ return [query.lazy_loaded_from.identity_token]
+ else:
+ return ['north_america', 'asia', 'europe', 'south_america']
def query_chooser(query):
bk._cache_key = cache_key
q = bk.for_session(session)
for fn in post_criteria:
- q = fn(q)
+ q = q.with_post_criteria(fn)
context.attributes[k] = q.params(**params)
return iter(partial)
def _identity_lookup(
- self, mapper, primary_key_identity, identity_token=None, **kw):
+ self, mapper, primary_key_identity, identity_token=None,
+ lazy_loaded_from=None, **kw):
"""override the default Query._identity_lookup method so that we
search for a given non-token primary key identity across all
possible identity tokens (e.g. shard ids).
)
else:
q = self.session.query(mapper)
+ if lazy_loaded_from:
+ q = q._set_lazyload_from(lazy_loaded_from)
for shard_id in self.id_chooser(q, primary_key_identity):
obj = super(ShardedQuery, self)._identity_lookup(
mapper, primary_key_identity, identity_token=shard_id, **kw
_current_path = _path_registry
_has_mapper_entities = False
+ lazy_loaded_from = None
+ """An :class:`.InstanceState` that is using this :class:`.Query` for a
+ lazy load operation.
+
+ This can be used for extensions like the horizontal sharding extension
+ as well as event handlers and custom mapper options to determine
+ when a query is being used to lazy load a relationship on an object.
+
+ .. versionadded:: 1.2.9
+
+ """
+
def __init__(self, entities, session=None):
"""Construct a :class:`.Query` directly.
for o in cols
]
+ @_generative()
+ def _set_lazyload_from(self, state):
+ self.lazy_loaded_from = state
+
@_generative()
def _adapt_all_clauses(self):
self._orm_only_adapt = False
ident, loading.load_on_pk_identity)
def _identity_lookup(self, mapper, primary_key_identity,
- identity_token=None,
- passive=attributes.PASSIVE_OFF):
+ identity_token=None, passive=attributes.PASSIVE_OFF,
+ lazy_loaded_from=None):
"""Locate an object in the identity map.
Given a primary key identity, constructs an identity key and then
:func:`.loading.get_from_identity`, which impacts the behavior if
the object is found; the object may be validated and/or unexpired
if the flag allows for SQL to be emitted.
+ :param lazy_loaded_from: an :class:`.InstanceState` that is
+ specifically asking for this identity as a related identity. Used
+ for sharding schemes where there is a correspondence between an object
+ and a related object being lazy-loaded (or otherwise
+ relationship-loaded).
+
+ .. versionadded:: 1.2.9
+
:return: None if the object is not found in the identity map, *or*
if the object was unexpired and found to have been deleted.
if passive flags disallow SQL and the object is expired, returns
# does this, including how it decides what the correct
# identity_token would be for this identity.
instance = session.query()._identity_lookup(
- self.mapper, primary_key_identity, passive=passive
+ self.mapper, primary_key_identity, passive=passive,
+ lazy_loaded_from=state
)
if instance is not None:
if self.use_get:
if self._raise_on_sql:
self._invoke_raise_load(state, passive, "raise_on_sql")
- return q(session)._load_on_pk_identity(
- session.query(self.mapper), primary_key_identity)
+
+ return q(session).\
+ with_post_criteria(lambda q: q._set_lazyload_from(state)).\
+ _load_on_pk_identity(
+ session.query(self.mapper),
+ primary_key_identity)
if self.parent_property.order_by:
q.add_criteria(
q._params = params
return q
- result = q(session).with_post_criteria(set_default_params).all()
+ result = q(session).\
+ with_post_criteria(lambda q: q._set_lazyload_from(state)).\
+ with_post_criteria(set_default_params).all()
if self.uselist:
return result
else:
self.assert_sql_execution(
db, callable_, assertsql.CountStatements(count))
+ def assert_multiple_sql_count(self, dbs, callable_, counts):
+ recs = [
+ (self.sql_execution_asserter(db), db, count)
+ for (db, count) in zip(dbs, counts)
+ ]
+ asserters = []
+ for ctx, db, count in recs:
+ asserters.append(ctx.__enter__())
+ try:
+ return callable_()
+ finally:
+ for asserter, (ctx, db, count) in zip(asserters, recs):
+ ctx.__exit__(None, None, None)
+ asserter.assert_(assertsql.CountStatements(count))
+
@contextlib.contextmanager
def assert_execution(self, db, *rules):
with self.sql_execution_asserter(db) as asserter:
db3 = testing_engine('sqlite:///shard3.db')
db4 = testing_engine('sqlite:///shard4.db')
- return db1, db2, db3, db4
+ self.dbs = [db1, db2, db3, db4]
+ return self.dbs
- def tearDown(self):
+ def teardown(self):
clear_mappers()
- for db in (db1, db2, db3, db4):
+ for db in self.dbs:
db.connect().invalidate()
for i in range(1, 5):
os.remove("shard%d.db" % i)
eq_(a1.data, "d1")
-class LazyLoadFromIdentityMapTest(fixtures.DeclarativeMappedTest):
+class LazyLoadIdentityKeyTest(fixtures.DeclarativeMappedTest):
+ def _init_dbs(self):
+ self.db1 = db1 = testing_engine('sqlite:///shard1.db',
+ options=dict(pool_threadlocal=True))
+ self.db2 = db2 = testing_engine('sqlite:///shard2.db')
+
+ for db in (db1, db2):
+ self.metadata.create_all(db)
+
+ self.dbs = [db1, db2]
+
+ return self.dbs
+
+ def teardown(self):
+ for db in self.dbs:
+ db.connect().invalidate()
+ for i in range(1, 3):
+ os.remove("shard%d.db" % i)
+
@classmethod
def setup_classes(cls):
Base = cls.DeclarativeBasic
class Book(Base):
__tablename__ = 'book'
id = Column(Integer, primary_key=True)
+ title = Column(String(50), nullable=False)
pages = relationship('Page', backref='book')
class Page(Base):
__tablename__ = 'page'
id = Column(Integer, primary_key=True)
book_id = Column(ForeignKey('book.id'))
+ title = Column(String(50))
- def test_lazy_load_from_identity_map(self):
+ def _fixture(self, lazy_load_book=False, lazy_load_pages=False):
+ Book, Page = self.classes("Book", "Page")
+
+ def shard_for_book(book):
+ if book.title == "title 1":
+ return "test"
+ elif book.title == "title 2":
+ return "test2"
+ else:
+ assert False
+
+ def id_chooser(query, ident):
+ assert query.lazy_loaded_from
+ if isinstance(query.lazy_loaded_from.obj(), Book):
+ token = shard_for_book(query.lazy_loaded_from.obj())
+ assert query.lazy_loaded_from.identity_token == token
+
+ return [query.lazy_loaded_from.identity_token]
+
+ def no_query_chooser(query):
+ if query.column_descriptions[0]['type'] is Book and lazy_load_book:
+ assert isinstance(query.lazy_loaded_from.obj(), Page)
+ elif query.column_descriptions[0]['type'] is Page and lazy_load_pages:
+ assert isinstance(query.lazy_loaded_from.obj(), Book)
+
+ if query.lazy_loaded_from is None:
+ return ['test', 'test2']
+ else:
+ return [query.lazy_loaded_from.identity_token]
+
+ def shard_chooser(mapper, instance, **kw):
+ if isinstance(instance, Page):
+ return shard_for_book(instance.book)
+ else:
+ return shard_for_book(instance)
+
+ db1, db2 = self._init_dbs()
session = ShardedSession(
- shards={"test": testing.db},
- shard_chooser=lambda *args: 'test',
- id_chooser=lambda *args: ['test'],
- query_chooser=lambda *args: ['test']
+ shards={"test": db1, "test2": db2},
+ shard_chooser=shard_chooser,
+ id_chooser=id_chooser,
+ query_chooser=no_query_chooser
)
+ return session
+
+ def test_lazy_load_from_identity_map(self):
+ session = self._fixture()
+
Book, Page = self.classes("Book", "Page")
- book = Book()
+ book = Book(title="title 1")
book.pages.append(Page())
session.add(book)
- session.commit()
+ session.flush()
- book = session.query(Book).first()
page = session.query(Page).first()
+ session.expire(page, ['book'])
+
def go():
eq_(page.book, book)
# doesn't emit SQL
- self.assert_sql_count(
- testing.db,
+ self.assert_multiple_sql_count(
+ self.dbs,
go,
- 0)
+ [0, 0])
+
+ def test_lazy_load_from_db(self):
+ session = self._fixture(lazy_load_book=True)
+
+ Book, Page = self.classes("Book", "Page")
+ book1 = Book(title="title 1")
+ book1.pages.append(Page(title="book 1 page 1"))
+
+ session.add(book1)
+ session.flush()
+
+ book1_id = inspect(book1).identity_key
+ session.expunge(book1)
+
+ book1_page = session.query(Page).first()
+ session.expire(book1_page, ['book'])
+
+ def go():
+ eq_(inspect(book1_page.book).identity_key, book1_id)
+
+ # emits one query
+ self.assert_multiple_sql_count(
+ self.dbs,
+ go,
+ [1, 0])
+
+ def test_lazy_load_no_baked_conflict(self):
+ session = self._fixture(lazy_load_pages=True)
+
+ Book, Page = self.classes("Book", "Page")
+ book1 = Book(title="title 1")
+ book1.pages.append(Page(title="book 1 page 1"))
+
+ book2 = Book(title="title 2")
+ book2.pages.append(Page(title="book 2 page 1"))
+
+ session.add(book1)
+ session.add(book2)
+ session.flush()
+
+ session.expire(book1, ['pages'])
+ session.expire(book2, ['pages'])
+
+ eq_(book1.pages[0].title, "book 1 page 1")
+
+ # second lazy load uses correct state
+ eq_(book2.pages[0].title, "book 2 page 1")