--- /dev/null
+.. change::
+ :tags: bug, ext
+ :tickets: 4247
+
+ The horizontal sharding extension now makes use of the identity token
+ added to ORM identity keys as part of :ticket:`4137`, when an object
+ refresh or column-based deferred load or unexpiration operation occurs.
+ Since we know the "shard" that the object originated from, we make
+ use of this value when refreshing, thereby avoiding queries against
+ other shards that don't match this object's identity in any case.
\ No newline at end of file
self._params)
return self.instances(result, context)
- if self._shard_id is not None:
+ if context.identity_token is not None:
+ return iter_for_shard(context.identity_token)
+ elif self._shard_id is not None:
return iter_for_shard(self._shard_id)
else:
partial = []
if key is not None:
ident = key[1]
+ identity_token = key[2]
else:
- ident = None
+ ident = identity_token = None
return load_on_pk_identity(
query, ident, refresh_state=refresh_state,
with_for_update=with_for_update,
- only_load_props=only_load_props
+ only_load_props=only_load_props,
+ identity_token=identity_token
)
def load_on_pk_identity(query, primary_key_identity,
refresh_state=None, with_for_update=None,
- only_load_props=None):
+ only_load_props=None, identity_token=None):
"""Load the given primary key identity from the database."""
populate_existing=bool(refresh_state),
version_check=version_check,
only_load_props=only_load_props,
- refresh_state=refresh_state)
+ refresh_state=refresh_state,
+ identity_token=identity_token)
q._order_by = None
try:
_autoflush = True
_only_load_props = None
_refresh_state = None
+ _refresh_identity_token = None
_from_obj = ()
_join_entities = ()
_select_from_entity = None
def _get_options(self, populate_existing=None,
version_check=None,
only_load_props=None,
- refresh_state=None):
+ refresh_state=None,
+ identity_token=None):
if populate_existing:
self._populate_existing = populate_existing
if version_check:
self._refresh_state = refresh_state
if only_load_props:
self._only_load_props = set(only_load_props)
+ if identity_token:
+ self._refresh_identity_token = identity_token
return self
def _clone(self):
self.propagate_options = set(o for o in query._with_options if
o.propagate_to_loaders)
self.attributes = query._attributes.copy()
- self.identity_token = None
+ if self.refresh_state is not None:
+ self.identity_token = query._refresh_identity_token
+ else:
+ self.identity_token = None
class AliasOption(interfaces.MapperOption):
result = session.query(Book).options(selectinload('pages')).all()
eq_(result, [book])
+class RefreshDeferExpireTest(fixtures.DeclarativeMappedTest):
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class A(Base):
+ __tablename__ = 'a'
+ id = Column(Integer, primary_key=True)
+ data = Column(String(30))
+ deferred_data = deferred(Column(String(30)))
+
+ @classmethod
+ def insert_data(cls):
+ A = cls.classes.A
+ s = Session()
+ s.add(A(data='d1', deferred_data='d2'))
+ s.commit()
+
+ def _session_fixture(self):
+
+ return ShardedSession(
+ shards={
+ "main": testing.db,
+ },
+ shard_chooser=lambda *args: 'main',
+ id_chooser=lambda *args: ['fake', 'main'],
+ query_chooser=lambda *args: ['fake', 'main']
+ )
+
+ def test_refresh(self):
+ A = self.classes.A
+ session = self._session_fixture()
+ a1 = session.query(A).set_shard("main").first()
+
+ session.refresh(a1)
+
+ def test_deferred(self):
+ A = self.classes.A
+ session = self._session_fixture()
+ a1 = session.query(A).set_shard("main").first()
+
+ eq_(a1.deferred_data, "d2")
+
+ def test_unexpire(self):
+ A = self.classes.A
+ session = self._session_fixture()
+ a1 = session.query(A).set_shard("main").first()
+
+ session.expire(a1)
+ eq_(a1.data, "d1")
+
class LazyLoadFromIdentityMapTest(fixtures.DeclarativeMappedTest):
@classmethod