From: Mike Bayer Date: Mon, 28 May 2018 17:03:14 +0000 (-0400) Subject: Add Query.lazy_load_from attribute for sharding X-Git-Tag: rel_1_3_0b1~172^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a574b409296ef793cec8e1d00f1f7be48f15325e;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add Query.lazy_load_from attribute for sharding Added new attribute :attr:`.Query.lazy_loaded_from` which is populated with an :class:`.InstanceState` that is using this :class:`.Query` in order to lazy load a relationship. The rationale for this is that it serves as a hint for the horizontal sharding feature to use, such that the identity token of the state can be used as the default identity token to use for the query within id_chooser(). Also repaired an issue in the :meth:`.Result.with_post_criteria` method added in I899808734458e25a023142c2c5bb37cbed869479 for :ticket:`4128` where the "unbake subquery loaders" version was calling the post crtieria functions given the :class:`.Result` as the argument rather than applying them to the :class:`.Query`. Change-Id: I3c0919ce7fd151b80fe2f9b5f99f60df31c2d73d Fixes: #4243 --- diff --git a/doc/build/changelog/unreleased_12/4243.rst b/doc/build/changelog/unreleased_12/4243.rst new file mode 100644 index 0000000000..bd27d5c051 --- /dev/null +++ b/doc/build/changelog/unreleased_12/4243.rst @@ -0,0 +1,11 @@ +.. change:: + :tags: feature, ext + :tickets: 4243 + :versions: 1.3.0b1 + + Added new attribute :attr:`.Query.lazy_loaded_from` which is populated + with an :class:`.InstanceState` that is using this :class:`.Query` in + order to lazy load a relationship. The rationale for this is that + it serves as a hint for the horizontal sharding feature to use, such that + the identity token of the state can be used as the default identity token + to use for the query within id_chooser(). diff --git a/examples/sharding/attribute_shard.py b/examples/sharding/attribute_shard.py index cd9b14d5e2..0e19b69f36 100644 --- a/examples/sharding/attribute_shard.py +++ b/examples/sharding/attribute_shard.py @@ -133,7 +133,13 @@ def id_chooser(query, ident): distributed among DBs. """ - return ['north_america', 'asia', 'europe', 'south_america'] + if query.lazy_loaded_from: + # if we are in a lazy load, we can look at the parent object + # and limit our search to that same shard, assuming that's how we've + # set things up. + return [query.lazy_loaded_from.identity_token] + else: + return ['north_america', 'asia', 'europe', 'south_america'] def query_chooser(query): diff --git a/lib/sqlalchemy/ext/baked.py b/lib/sqlalchemy/ext/baked.py index f4d71f4103..79457e86ea 100644 --- a/lib/sqlalchemy/ext/baked.py +++ b/lib/sqlalchemy/ext/baked.py @@ -253,7 +253,7 @@ class BakedQuery(object): bk._cache_key = cache_key q = bk.for_session(session) for fn in post_criteria: - q = fn(q) + q = q.with_post_criteria(fn) context.attributes[k] = q.params(**params) diff --git a/lib/sqlalchemy/ext/horizontal_shard.py b/lib/sqlalchemy/ext/horizontal_shard.py index c7770d195a..6ef4c56126 100644 --- a/lib/sqlalchemy/ext/horizontal_shard.py +++ b/lib/sqlalchemy/ext/horizontal_shard.py @@ -65,7 +65,8 @@ class ShardedQuery(Query): return iter(partial) def _identity_lookup( - self, mapper, primary_key_identity, identity_token=None, **kw): + self, mapper, primary_key_identity, identity_token=None, + lazy_loaded_from=None, **kw): """override the default Query._identity_lookup method so that we search for a given non-token primary key identity across all possible identity tokens (e.g. shard ids). @@ -79,6 +80,8 @@ class ShardedQuery(Query): ) else: q = self.session.query(mapper) + if lazy_loaded_from: + q = q._set_lazyload_from(lazy_loaded_from) for shard_id in self.id_chooser(q, primary_key_identity): obj = super(ShardedQuery, self)._identity_lookup( mapper, primary_key_identity, identity_token=shard_id, **kw diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 56e42a7029..e7efc172a1 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -112,6 +112,18 @@ class Query(object): _current_path = _path_registry _has_mapper_entities = False + lazy_loaded_from = None + """An :class:`.InstanceState` that is using this :class:`.Query` for a + lazy load operation. + + This can be used for extensions like the horizontal sharding extension + as well as event handlers and custom mapper options to determine + when a query is being used to lazy load a relationship on an object. + + .. versionadded:: 1.2.9 + + """ + def __init__(self, entities, session=None): """Construct a :class:`.Query` directly. @@ -260,6 +272,10 @@ class Query(object): for o in cols ] + @_generative() + def _set_lazyload_from(self, state): + self.lazy_loaded_from = state + @_generative() def _adapt_all_clauses(self): self._orm_only_adapt = False @@ -887,8 +903,8 @@ class Query(object): ident, loading.load_on_pk_identity) def _identity_lookup(self, mapper, primary_key_identity, - identity_token=None, - passive=attributes.PASSIVE_OFF): + identity_token=None, passive=attributes.PASSIVE_OFF, + lazy_loaded_from=None): """Locate an object in the identity map. Given a primary key identity, constructs an identity key and then @@ -913,6 +929,14 @@ class Query(object): :func:`.loading.get_from_identity`, which impacts the behavior if the object is found; the object may be validated and/or unexpired if the flag allows for SQL to be emitted. + :param lazy_loaded_from: an :class:`.InstanceState` that is + specifically asking for this identity as a related identity. Used + for sharding schemes where there is a correspondence between an object + and a related object being lazy-loaded (or otherwise + relationship-loaded). + + .. versionadded:: 1.2.9 + :return: None if the object is not found in the identity map, *or* if the object was unexpired and found to have been deleted. if passive flags disallow SQL and the object is expired, returns diff --git a/lib/sqlalchemy/orm/strategies.py b/lib/sqlalchemy/orm/strategies.py index 93288c3d61..d7597d3b2a 100644 --- a/lib/sqlalchemy/orm/strategies.py +++ b/lib/sqlalchemy/orm/strategies.py @@ -617,7 +617,8 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots): # does this, including how it decides what the correct # identity_token would be for this identity. instance = session.query()._identity_lookup( - self.mapper, primary_key_identity, passive=passive + self.mapper, primary_key_identity, passive=passive, + lazy_loaded_from=state ) if instance is not None: @@ -715,8 +716,12 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots): if self.use_get: if self._raise_on_sql: self._invoke_raise_load(state, passive, "raise_on_sql") - return q(session)._load_on_pk_identity( - session.query(self.mapper), primary_key_identity) + + return q(session).\ + with_post_criteria(lambda q: q._set_lazyload_from(state)).\ + _load_on_pk_identity( + session.query(self.mapper), + primary_key_identity) if self.parent_property.order_by: q.add_criteria( @@ -761,7 +766,9 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots): q._params = params return q - result = q(session).with_post_criteria(set_default_params).all() + result = q(session).\ + with_post_criteria(lambda q: q._set_lazyload_from(state)).\ + with_post_criteria(set_default_params).all() if self.uselist: return result else: diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 69d43c92f7..e42376921a 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -518,6 +518,21 @@ class AssertsExecutionResults(object): self.assert_sql_execution( db, callable_, assertsql.CountStatements(count)) + def assert_multiple_sql_count(self, dbs, callable_, counts): + recs = [ + (self.sql_execution_asserter(db), db, count) + for (db, count) in zip(dbs, counts) + ] + asserters = [] + for ctx, db, count in recs: + asserters.append(ctx.__enter__()) + try: + return callable_() + finally: + for asserter, (ctx, db, count) in zip(asserters, recs): + ctx.__exit__(None, None, None) + asserter.assert_(assertsql.CountStatements(count)) + @contextlib.contextmanager def assert_execution(self, db, *rules): with self.sql_execution_asserter(db) as asserter: diff --git a/test/ext/test_horizontal_shard.py b/test/ext/test_horizontal_shard.py index 4b37cbd16c..00dfeb29b6 100644 --- a/test/ext/test_horizontal_shard.py +++ b/test/ext/test_horizontal_shard.py @@ -304,12 +304,13 @@ class DistinctEngineShardTest(ShardTest, fixtures.TestBase): db3 = testing_engine('sqlite:///shard3.db') db4 = testing_engine('sqlite:///shard4.db') - return db1, db2, db3, db4 + self.dbs = [db1, db2, db3, db4] + return self.dbs - def tearDown(self): + def teardown(self): clear_mappers() - for db in (db1, db2, db3, db4): + for db in self.dbs: db.connect().invalidate() for i in range(1, 5): os.remove("shard%d.db" % i) @@ -425,7 +426,25 @@ class RefreshDeferExpireTest(fixtures.DeclarativeMappedTest): eq_(a1.data, "d1") -class LazyLoadFromIdentityMapTest(fixtures.DeclarativeMappedTest): +class LazyLoadIdentityKeyTest(fixtures.DeclarativeMappedTest): + def _init_dbs(self): + self.db1 = db1 = testing_engine('sqlite:///shard1.db', + options=dict(pool_threadlocal=True)) + self.db2 = db2 = testing_engine('sqlite:///shard2.db') + + for db in (db1, db2): + self.metadata.create_all(db) + + self.dbs = [db1, db2] + + return self.dbs + + def teardown(self): + for db in self.dbs: + db.connect().invalidate() + for i in range(1, 3): + os.remove("shard%d.db" % i) + @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic @@ -433,36 +452,127 @@ class LazyLoadFromIdentityMapTest(fixtures.DeclarativeMappedTest): class Book(Base): __tablename__ = 'book' id = Column(Integer, primary_key=True) + title = Column(String(50), nullable=False) pages = relationship('Page', backref='book') class Page(Base): __tablename__ = 'page' id = Column(Integer, primary_key=True) book_id = Column(ForeignKey('book.id')) + title = Column(String(50)) - def test_lazy_load_from_identity_map(self): + def _fixture(self, lazy_load_book=False, lazy_load_pages=False): + Book, Page = self.classes("Book", "Page") + + def shard_for_book(book): + if book.title == "title 1": + return "test" + elif book.title == "title 2": + return "test2" + else: + assert False + + def id_chooser(query, ident): + assert query.lazy_loaded_from + if isinstance(query.lazy_loaded_from.obj(), Book): + token = shard_for_book(query.lazy_loaded_from.obj()) + assert query.lazy_loaded_from.identity_token == token + + return [query.lazy_loaded_from.identity_token] + + def no_query_chooser(query): + if query.column_descriptions[0]['type'] is Book and lazy_load_book: + assert isinstance(query.lazy_loaded_from.obj(), Page) + elif query.column_descriptions[0]['type'] is Page and lazy_load_pages: + assert isinstance(query.lazy_loaded_from.obj(), Book) + + if query.lazy_loaded_from is None: + return ['test', 'test2'] + else: + return [query.lazy_loaded_from.identity_token] + + def shard_chooser(mapper, instance, **kw): + if isinstance(instance, Page): + return shard_for_book(instance.book) + else: + return shard_for_book(instance) + + db1, db2 = self._init_dbs() session = ShardedSession( - shards={"test": testing.db}, - shard_chooser=lambda *args: 'test', - id_chooser=lambda *args: ['test'], - query_chooser=lambda *args: ['test'] + shards={"test": db1, "test2": db2}, + shard_chooser=shard_chooser, + id_chooser=id_chooser, + query_chooser=no_query_chooser ) + return session + + def test_lazy_load_from_identity_map(self): + session = self._fixture() + Book, Page = self.classes("Book", "Page") - book = Book() + book = Book(title="title 1") book.pages.append(Page()) session.add(book) - session.commit() + session.flush() - book = session.query(Book).first() page = session.query(Page).first() + session.expire(page, ['book']) + def go(): eq_(page.book, book) # doesn't emit SQL - self.assert_sql_count( - testing.db, + self.assert_multiple_sql_count( + self.dbs, go, - 0) + [0, 0]) + + def test_lazy_load_from_db(self): + session = self._fixture(lazy_load_book=True) + + Book, Page = self.classes("Book", "Page") + book1 = Book(title="title 1") + book1.pages.append(Page(title="book 1 page 1")) + + session.add(book1) + session.flush() + + book1_id = inspect(book1).identity_key + session.expunge(book1) + + book1_page = session.query(Page).first() + session.expire(book1_page, ['book']) + + def go(): + eq_(inspect(book1_page.book).identity_key, book1_id) + + # emits one query + self.assert_multiple_sql_count( + self.dbs, + go, + [1, 0]) + + def test_lazy_load_no_baked_conflict(self): + session = self._fixture(lazy_load_pages=True) + + Book, Page = self.classes("Book", "Page") + book1 = Book(title="title 1") + book1.pages.append(Page(title="book 1 page 1")) + + book2 = Book(title="title 2") + book2.pages.append(Page(title="book 2 page 1")) + + session.add(book1) + session.add(book2) + session.flush() + + session.expire(book1, ['pages']) + session.expire(book2, ['pages']) + + eq_(book1.pages[0].title, "book 1 page 1") + + # second lazy load uses correct state + eq_(book2.pages[0].title, "book 2 page 1")