]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add Query.lazy_load_from attribute for sharding
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 28 May 2018 17:03:14 +0000 (13:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 4 Jun 2018 16:30:05 +0000 (12:30 -0400)
Added new attribute :attr:`.Query.lazy_loaded_from` which is populated
with an :class:`.InstanceState` that is using this :class:`.Query` in
order to lazy load a relationship.  The rationale for this is that
it serves as a hint for the horizontal sharding feature to use, such that
the identity token of the state can be used as the default identity token
to use for the query within id_chooser().

Also repaired an issue in the :meth:`.Result.with_post_criteria`
method added in I899808734458e25a023142c2c5bb37cbed869479
for :ticket:`4128` where the "unbake subquery loaders" version was calling
the post crtieria functions given the :class:`.Result` as the argument
rather than applying them to the :class:`.Query`.

Change-Id: I3c0919ce7fd151b80fe2f9b5f99f60df31c2d73d
Fixes: #4243
doc/build/changelog/unreleased_12/4243.rst [new file with mode: 0644]
examples/sharding/attribute_shard.py
lib/sqlalchemy/ext/baked.py
lib/sqlalchemy/ext/horizontal_shard.py
lib/sqlalchemy/orm/query.py
lib/sqlalchemy/orm/strategies.py
lib/sqlalchemy/testing/assertions.py
test/ext/test_horizontal_shard.py

diff --git a/doc/build/changelog/unreleased_12/4243.rst b/doc/build/changelog/unreleased_12/4243.rst
new file mode 100644 (file)
index 0000000..bd27d5c
--- /dev/null
@@ -0,0 +1,11 @@
+.. change::
+    :tags: feature, ext
+    :tickets: 4243
+    :versions: 1.3.0b1
+
+    Added new attribute :attr:`.Query.lazy_loaded_from` which is populated
+    with an :class:`.InstanceState` that is using this :class:`.Query` in
+    order to lazy load a relationship.  The rationale for this is that
+    it serves as a hint for the horizontal sharding feature to use, such that
+    the identity token of the state can be used as the default identity token
+    to use for the query within id_chooser().
index cd9b14d5e2a77679df7eaabac536345c2ef20433..0e19b69f369a55582048cbcc1fccd60a017a9d7f 100644 (file)
@@ -133,7 +133,13 @@ def id_chooser(query, ident):
     distributed among DBs.
 
     """
-    return ['north_america', 'asia', 'europe', 'south_america']
+    if query.lazy_loaded_from:
+        # if we are in a lazy load, we can look at the parent object
+        # and limit our search to that same shard, assuming that's how we've
+        # set things up.
+        return [query.lazy_loaded_from.identity_token]
+    else:
+        return ['north_america', 'asia', 'europe', 'south_america']
 
 
 def query_chooser(query):
index f4d71f4103bf2405f62378c7538dc91ef41982d9..79457e86eafe8a7fd9982eaa72ebb073e3b3f784 100644 (file)
@@ -253,7 +253,7 @@ class BakedQuery(object):
             bk._cache_key = cache_key
             q = bk.for_session(session)
             for fn in post_criteria:
-                q = fn(q)
+                q = q.with_post_criteria(fn)
             context.attributes[k] = q.params(**params)
 
 
index c7770d195abaeb41ee93649660588092a9d1e539..6ef4c56126f0c60def015957366a711ac452c6b4 100644 (file)
@@ -65,7 +65,8 @@ class ShardedQuery(Query):
             return iter(partial)
 
     def _identity_lookup(
-            self, mapper, primary_key_identity, identity_token=None, **kw):
+            self, mapper, primary_key_identity, identity_token=None,
+            lazy_loaded_from=None, **kw):
         """override the default Query._identity_lookup method so that we
         search for a given non-token primary key identity across all
         possible identity tokens (e.g. shard ids).
@@ -79,6 +80,8 @@ class ShardedQuery(Query):
             )
         else:
             q = self.session.query(mapper)
+            if lazy_loaded_from:
+                q = q._set_lazyload_from(lazy_loaded_from)
             for shard_id in self.id_chooser(q, primary_key_identity):
                 obj = super(ShardedQuery, self)._identity_lookup(
                     mapper, primary_key_identity, identity_token=shard_id, **kw
index 56e42a7029421389f4b92d789b00f62a680958a0..e7efc172a1fb1d0c44d82e079eb8946020f4d513 100644 (file)
@@ -112,6 +112,18 @@ class Query(object):
     _current_path = _path_registry
     _has_mapper_entities = False
 
+    lazy_loaded_from = None
+    """An :class:`.InstanceState` that is using this :class:`.Query` for a
+    lazy load operation.
+
+    This can be used for extensions like the horizontal sharding extension
+    as well as event handlers and custom mapper options to determine
+    when a query is being used to lazy load a relationship on an object.
+
+    .. versionadded:: 1.2.9
+
+    """
+
     def __init__(self, entities, session=None):
         """Construct a :class:`.Query` directly.
 
@@ -260,6 +272,10 @@ class Query(object):
             for o in cols
         ]
 
+    @_generative()
+    def _set_lazyload_from(self, state):
+        self.lazy_loaded_from = state
+
     @_generative()
     def _adapt_all_clauses(self):
         self._orm_only_adapt = False
@@ -887,8 +903,8 @@ class Query(object):
             ident, loading.load_on_pk_identity)
 
     def _identity_lookup(self, mapper, primary_key_identity,
-                         identity_token=None,
-                         passive=attributes.PASSIVE_OFF):
+                         identity_token=None, passive=attributes.PASSIVE_OFF,
+                         lazy_loaded_from=None):
         """Locate an object in the identity map.
 
         Given a primary key identity, constructs an identity key and then
@@ -913,6 +929,14 @@ class Query(object):
          :func:`.loading.get_from_identity`, which impacts the behavior if
          the object is found; the object may be validated and/or unexpired
          if the flag allows for SQL to be emitted.
+        :param lazy_loaded_from: an :class:`.InstanceState` that is
+         specifically asking for this identity as a related identity.  Used
+         for sharding schemes where there is a correspondence between an object
+         and a related object being lazy-loaded (or otherwise
+         relationship-loaded).
+
+         .. versionadded:: 1.2.9
+
         :return: None if the object is not found in the identity map, *or*
          if the object was unexpired and found to have been deleted.
          if passive flags disallow SQL and the object is expired, returns
index 93288c3d619e9b58edd9a9aa816fcad9cd517ddc..d7597d3b2aeb3625281352a30351d7f0cd6a2379 100644 (file)
@@ -617,7 +617,8 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots):
             # does this, including how it decides what the correct
             # identity_token would be for this identity.
             instance = session.query()._identity_lookup(
-                self.mapper, primary_key_identity, passive=passive
+                self.mapper, primary_key_identity, passive=passive,
+                lazy_loaded_from=state
             )
 
             if instance is not None:
@@ -715,8 +716,12 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots):
         if self.use_get:
             if self._raise_on_sql:
                 self._invoke_raise_load(state, passive, "raise_on_sql")
-            return q(session)._load_on_pk_identity(
-                session.query(self.mapper), primary_key_identity)
+
+            return q(session).\
+                with_post_criteria(lambda q: q._set_lazyload_from(state)).\
+                _load_on_pk_identity(
+                    session.query(self.mapper),
+                    primary_key_identity)
 
         if self.parent_property.order_by:
             q.add_criteria(
@@ -761,7 +766,9 @@ class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots):
             q._params = params
             return q
 
-        result = q(session).with_post_criteria(set_default_params).all()
+        result = q(session).\
+            with_post_criteria(lambda q: q._set_lazyload_from(state)).\
+            with_post_criteria(set_default_params).all()
         if self.uselist:
             return result
         else:
index 69d43c92f7634a8140a0f83445a28e76f1cdb1ed..e42376921aae645d729b58498ed74db44a3e13af 100644 (file)
@@ -518,6 +518,21 @@ class AssertsExecutionResults(object):
         self.assert_sql_execution(
             db, callable_, assertsql.CountStatements(count))
 
+    def assert_multiple_sql_count(self, dbs, callable_, counts):
+        recs = [
+            (self.sql_execution_asserter(db), db, count)
+            for (db, count) in zip(dbs, counts)
+        ]
+        asserters = []
+        for ctx, db, count in recs:
+            asserters.append(ctx.__enter__())
+        try:
+            return callable_()
+        finally:
+            for asserter, (ctx, db, count) in zip(asserters, recs):
+                ctx.__exit__(None, None, None)
+                asserter.assert_(assertsql.CountStatements(count))
+
     @contextlib.contextmanager
     def assert_execution(self, db, *rules):
         with self.sql_execution_asserter(db) as asserter:
index 4b37cbd16c9c5028c854631a66d9f29f5d12cd08..00dfeb29b64a62bae248ee510321c86c1293811f 100644 (file)
@@ -304,12 +304,13 @@ class DistinctEngineShardTest(ShardTest, fixtures.TestBase):
         db3 = testing_engine('sqlite:///shard3.db')
         db4 = testing_engine('sqlite:///shard4.db')
 
-        return db1, db2, db3, db4
+        self.dbs = [db1, db2, db3, db4]
+        return self.dbs
 
-    def tearDown(self):
+    def teardown(self):
         clear_mappers()
 
-        for db in (db1, db2, db3, db4):
+        for db in self.dbs:
             db.connect().invalidate()
         for i in range(1, 5):
             os.remove("shard%d.db" % i)
@@ -425,7 +426,25 @@ class RefreshDeferExpireTest(fixtures.DeclarativeMappedTest):
         eq_(a1.data, "d1")
 
 
-class LazyLoadFromIdentityMapTest(fixtures.DeclarativeMappedTest):
+class LazyLoadIdentityKeyTest(fixtures.DeclarativeMappedTest):
+    def _init_dbs(self):
+        self.db1 = db1 = testing_engine('sqlite:///shard1.db',
+                             options=dict(pool_threadlocal=True))
+        self.db2 = db2 = testing_engine('sqlite:///shard2.db')
+
+        for db in (db1, db2):
+            self.metadata.create_all(db)
+
+        self.dbs = [db1, db2]
+
+        return self.dbs
+
+    def teardown(self):
+        for db in self.dbs:
+            db.connect().invalidate()
+        for i in range(1, 3):
+            os.remove("shard%d.db" % i)
+
     @classmethod
     def setup_classes(cls):
         Base = cls.DeclarativeBasic
@@ -433,36 +452,127 @@ class LazyLoadFromIdentityMapTest(fixtures.DeclarativeMappedTest):
         class Book(Base):
             __tablename__ = 'book'
             id = Column(Integer, primary_key=True)
+            title = Column(String(50), nullable=False)
             pages = relationship('Page', backref='book')
 
         class Page(Base):
             __tablename__ = 'page'
             id = Column(Integer, primary_key=True)
             book_id = Column(ForeignKey('book.id'))
+            title = Column(String(50))
 
-    def test_lazy_load_from_identity_map(self):
+    def _fixture(self, lazy_load_book=False, lazy_load_pages=False):
+        Book, Page = self.classes("Book", "Page")
+
+        def shard_for_book(book):
+            if book.title == "title 1":
+                return "test"
+            elif book.title == "title 2":
+                return "test2"
+            else:
+                assert False
+
+        def id_chooser(query, ident):
+            assert query.lazy_loaded_from
+            if isinstance(query.lazy_loaded_from.obj(), Book):
+                token = shard_for_book(query.lazy_loaded_from.obj())
+                assert query.lazy_loaded_from.identity_token == token
+
+            return [query.lazy_loaded_from.identity_token]
+
+        def no_query_chooser(query):
+            if query.column_descriptions[0]['type'] is Book and lazy_load_book:
+                assert isinstance(query.lazy_loaded_from.obj(), Page)
+            elif query.column_descriptions[0]['type'] is Page and lazy_load_pages:
+                assert isinstance(query.lazy_loaded_from.obj(), Book)
+
+            if query.lazy_loaded_from is None:
+                return ['test', 'test2']
+            else:
+                return [query.lazy_loaded_from.identity_token]
+
+        def shard_chooser(mapper, instance, **kw):
+            if isinstance(instance, Page):
+                return shard_for_book(instance.book)
+            else:
+                return shard_for_book(instance)
+
+        db1, db2 = self._init_dbs()
         session = ShardedSession(
-            shards={"test": testing.db},
-            shard_chooser=lambda *args: 'test',
-            id_chooser=lambda *args: ['test'],
-            query_chooser=lambda *args: ['test']
+            shards={"test": db1, "test2": db2},
+            shard_chooser=shard_chooser,
+            id_chooser=id_chooser,
+            query_chooser=no_query_chooser
         )
 
+        return session
+
+    def test_lazy_load_from_identity_map(self):
+        session = self._fixture()
+
         Book, Page = self.classes("Book", "Page")
-        book = Book()
+        book = Book(title="title 1")
         book.pages.append(Page())
 
         session.add(book)
-        session.commit()
+        session.flush()
 
-        book = session.query(Book).first()
         page = session.query(Page).first()
 
+        session.expire(page, ['book'])
+
         def go():
             eq_(page.book, book)
 
         # doesn't emit SQL
-        self.assert_sql_count(
-            testing.db,
+        self.assert_multiple_sql_count(
+            self.dbs,
             go,
-            0)
+            [0, 0])
+
+    def test_lazy_load_from_db(self):
+        session = self._fixture(lazy_load_book=True)
+
+        Book, Page = self.classes("Book", "Page")
+        book1 = Book(title="title 1")
+        book1.pages.append(Page(title="book 1 page 1"))
+
+        session.add(book1)
+        session.flush()
+
+        book1_id = inspect(book1).identity_key
+        session.expunge(book1)
+
+        book1_page = session.query(Page).first()
+        session.expire(book1_page, ['book'])
+
+        def go():
+            eq_(inspect(book1_page.book).identity_key, book1_id)
+
+        # emits one query
+        self.assert_multiple_sql_count(
+            self.dbs,
+            go,
+            [1, 0])
+
+    def test_lazy_load_no_baked_conflict(self):
+        session = self._fixture(lazy_load_pages=True)
+
+        Book, Page = self.classes("Book", "Page")
+        book1 = Book(title="title 1")
+        book1.pages.append(Page(title="book 1 page 1"))
+
+        book2 = Book(title="title 2")
+        book2.pages.append(Page(title="book 2 page 1"))
+
+        session.add(book1)
+        session.add(book2)
+        session.flush()
+
+        session.expire(book1, ['pages'])
+        session.expire(book2, ['pages'])
+
+        eq_(book1.pages[0].title, "book 1 page 1")
+
+        # second lazy load uses correct state
+        eq_(book2.pages[0].title, "book 2 page 1")