]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Various fixes related to overlapping and collapsed entry caching.
authorAlex Rousskov <rousskov@measurement-factory.com>
Tue, 25 Jun 2013 16:06:37 +0000 (10:06 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Tue, 25 Jun 2013 16:06:37 +0000 (10:06 -0600)
Wrote Transients description, replacing an irrelevant copy-pasted comment.
Maintain proper transient entry locks, distinguishing reading and writing
cases.

Fixed transients synchronization logic. Store::get() must not return
incomplete from-cache entries, except for local or transient ones. Otherwise,
the returned entry will not be updated when its remote writer makes changes.

Marked entries fully loaded from the shared memory cache as STORE_OK.

Avoid caching ENTRY_SPECIAL in the shared memory cache for now. This is not
strictly necessary, I think, but it simplifies shared caching log when
triaging start-test-analyze test cases. The restriction can be removed
when ENTRY_SPECIAL generation code becomes shared cache-aware, for example.

Fixed copy-paste error in Transients::disconnect().

Changed CollapsedForwarding::Broadcast() profile in preparation for excluding
broadcasts for entries without remote readers.

Do not purge entire cache entries just because we have to trim their RAM
footprint. The old code assumed that non-swappable entries may not have any
other stored content (which is no longer correct because they may still reside
in the shared memory cache) so it almost made sense to purge them, but it is
possible for clients to use partial in-RAM data when serving range requests,
so we should not be purging unless there are other reasons to do that. This
may expose client-side bugs if the hit validation code is not checking for RAM
entries being incomplete.

Allow MemObject::trimUnSwappable() to be called when there is nothing to trim.
This used to be a special case in StoreEntry::trimMemory(), but we do not need
it anymore after the above change.

Added transient and shared memory indexes to StoreEntry debugging summaries.

12 files changed:
src/CollapsedForwarding.cc
src/CollapsedForwarding.h
src/MemObject.cc
src/MemObject.h
src/MemStore.cc
src/Store.h
src/SwapDir.h
src/Transients.cc
src/Transients.h
src/fs/rock/RockSwapDir.cc
src/store.cc
src/store_dir.cc

index c50139369363217c95e9c47bf9808fdbc010c5ce..629327d5465024493ea8b704383ec6c80bb63654 100644 (file)
@@ -45,16 +45,17 @@ CollapsedForwarding::Init()
 }
 
 void
-CollapsedForwarding::Broadcast(const cache_key *key)
+CollapsedForwarding::Broadcast(const StoreEntry &e)
 {
     if (!queue.get())
         return;
 
     CollapsedForwardingMsg msg;
     msg.sender = KidIdentifier;
-    memcpy(msg.key, key, sizeof(msg.key));
+    memcpy(msg.key, e.key, sizeof(msg.key));
 
-    debugs(17, 5, storeKeyText(key) << " to " << Config.workers << "-1 workers");
+    debugs(17, 5, storeKeyText(static_cast<cache_key*>(e.key)) << " to " <<
+           Config.workers << "-1 workers");
 
     // TODO: send only to workers who are waiting for data
     for (int workerId = 1; workerId <= Config.workers; ++workerId) {
index f5e42757fb670d8fad8fbebb03c8e595f33d4619..88c4745126d8848aace767fa620887bb08ad816f 100644 (file)
@@ -12,7 +12,7 @@
 
 #include <memory>
 
-class StoreIOState;
+class StoreEntry;
 
 /// Sends and handles collapsed forwarding notifications.
 class CollapsedForwarding
@@ -22,7 +22,7 @@ public:
     static void Init();
 
     /// notify other workers about changes in entry state (e.g., new data)
-    static void Broadcast(const cache_key *key);
+    static void Broadcast(const StoreEntry &e);
 
     /// kick worker with empty IPC queue
     static void Notify(const int workerId);
index 0902ff820b4631c26bf71a84f9085bdeb0cd4411..219c447ec3180ef00666d9c9cdf9b012bc39a91c 100644 (file)
@@ -421,11 +421,11 @@ MemObject::trimSwappable()
 void
 MemObject::trimUnSwappable()
 {
-    int64_t new_mem_lo = policyLowestOffsetToKeep(0);
-    assert (new_mem_lo > 0);
-
-    data_hdr.freeDataUpto(new_mem_lo);
-    inmem_lo = new_mem_lo;
+    if (const int64_t new_mem_lo = policyLowestOffsetToKeep(false)) {
+        assert (new_mem_lo > 0);
+        data_hdr.freeDataUpto(new_mem_lo);
+        inmem_lo = new_mem_lo;
+    } // else we should not trim anything at this time
 }
 
 bool
index c35d602aa7e7f72796ec29421b8f1e5450abb51b..36d3390c3d5af6b6225476b6b9757d4c7fad7630 100644 (file)
@@ -139,12 +139,16 @@ public:
 
     SwapOut swapout;
 
+    /// cache "I/O" direction and status
+    typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
+
     /// State of an entry with regards to the [shared] in-transit table.
     class XitTable {
     public:
-        XitTable(): index(-1) {}
+        XitTable(): index(-1), io(ioUndecided) {}
 
         int32_t index; ///< entry position inside the in-transit table
+        Io io; ///< current I/O state
     };
     XitTable xitTable; ///< current [shared] memory caching state for the entry
 
@@ -156,8 +160,6 @@ public:
         int32_t index; ///< entry position inside the memory cache
         int64_t offset; ///< bytes written/read to/from the memory cache so far
         
-        /// I/O direction and status
-        typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
         Io io; ///< current I/O state
     };
     MemCache memCache; ///< current [shared] memory caching state for the entry
index 9c3d1c081ec9b20523fa258f96a590884448f4d6..cc1aa0b57ddde2d83133f518f02b9ca9b817900b 100644 (file)
@@ -200,11 +200,6 @@ MemStore::get(const cache_key *key)
 
     const bool copied = copyFromShm(*e, index, *slot);
 
-    // we copied everything we could to local memory; no more need to lock
-    map->closeForReading(index);
-    e->mem_obj->memCache.index = -1;
-    e->mem_obj->memCache.io = MemObject::MemCache::ioDone;
-
     if (copied) {
         e->hashInsert(key);
         return e;
@@ -301,7 +296,7 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
 
     MemObject::MemCache &mc = e.mem_obj->memCache;
     mc.index = index;
-    mc.io = MemObject::MemCache::ioReading;
+    mc.io = MemObject::ioReading;
 }
 
 /// copies the entire entry from shared to local memory
@@ -363,14 +358,19 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
         return true;
     }
 
-    e.mem_obj->object_sz = e.mem_obj->endOffset(); // from StoreEntry::complete()
     debugs(20, 7, "mem-loaded all " << e.mem_obj->object_sz << '/' <<
            anchor.basics.swap_file_sz << " bytes of " << e);
+
+    // from StoreEntry::complete()
+    e.mem_obj->object_sz = e.mem_obj->endOffset();
+    e.store_status = STORE_OK;
+
     assert(e.mem_obj->object_sz >= 0);
     assert(static_cast<uint64_t>(e.mem_obj->object_sz) == anchor.basics.swap_file_sz);
     // would be nice to call validLength() here, but it needs e.key
 
-    // XXX: unlock acnhor here!
+    // we read the entire response into the local memory; no more need to lock
+    disconnect(*e.mem_obj);
     return true;
 }
 
@@ -454,6 +454,11 @@ MemStore::shouldCache(const StoreEntry &e) const
         return false;
     }
 
+    if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) {
+        debugs(20, 5, HERE << "Not mem-caching ENTRY_SPECIAL " << e);
+        return false;
+    }
+
     return true;
 }
 
@@ -470,7 +475,7 @@ MemStore::startCaching(StoreEntry &e)
 
     assert(e.mem_obj);
     e.mem_obj->memCache.index = index;
-    e.mem_obj->memCache.io = MemObject::MemCache::ioWriting;
+    e.mem_obj->memCache.io = MemObject::ioWriting;
     slot->set(e);
     map->startAppending(index);
     return true;
@@ -634,20 +639,20 @@ MemStore::write(StoreEntry &e)
     debugs(20, 7, "entry " << e);
 
     switch (e.mem_obj->memCache.io) {
-    case MemObject::MemCache::ioUndecided:
+    case MemObject::ioUndecided:
         if (!shouldCache(e) || !startCaching(e)) {
-            e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+            e.mem_obj->memCache.io = MemObject::ioDone;
             Store::Root().transientsAbandon(e);
-            CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+            CollapsedForwarding::Broadcast(e);
             return;
         }
         break;
   
-    case MemObject::MemCache::ioDone:
-    case MemObject::MemCache::ioReading:
+    case MemObject::ioDone:
+    case MemObject::ioReading:
         return; // we should not write in all of the above cases
 
-    case MemObject::MemCache::ioWriting:
+    case MemObject::ioWriting:
         break; // already decided to write and still writing
     }
 
@@ -655,7 +660,8 @@ MemStore::write(StoreEntry &e)
         copyToShm(e);
         if (e.store_status == STORE_OK) // done receiving new content
             completeWriting(e);
-        CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+        else
+            CollapsedForwarding::Broadcast(e);
         return;
     }
     catch (const std::exception &x) { // TODO: should we catch ... as well?
@@ -665,7 +671,7 @@ MemStore::write(StoreEntry &e)
 
     Store::Root().transientsAbandon(e);
     disconnect(*e.mem_obj);
-    CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+    CollapsedForwarding::Broadcast(e);
 }
 
 void
@@ -679,8 +685,11 @@ MemStore::completeWriting(StoreEntry &e)
     debugs(20, 5, "mem-cached all " << e.mem_obj->memCache.offset << " bytes of " << e);
 
     e.mem_obj->memCache.index = -1;
-    e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+    e.mem_obj->memCache.io = MemObject::ioDone;
     map->closeForWriting(index, false);
+
+    CollapsedForwarding::Broadcast(e); // before we close our transient entry!
+    Store::Root().transientsCompleteWriting(e);
 }
 
 void
@@ -703,14 +712,14 @@ void
 MemStore::disconnect(MemObject &mem_obj)
 {
     if (mem_obj.memCache.index >= 0) {
-        if (mem_obj.memCache.io == MemObject::MemCache::ioWriting) {
+        if (mem_obj.memCache.io == MemObject::ioWriting) {
             map->abortWriting(mem_obj.memCache.index);
         } else {
-            assert(mem_obj.memCache.io == MemObject::MemCache::ioReading);
+            assert(mem_obj.memCache.io == MemObject::ioReading);
             map->closeForReading(mem_obj.memCache.index);
         }
         mem_obj.memCache.index = -1;
-        mem_obj.memCache.io = MemObject::MemCache::ioDone;
+        mem_obj.memCache.io = MemObject::ioDone;
     }
 }
 
index 27b2df58dab04d2784f88e3dc59673e5afc3ef8c..d2131fe38805ac75ef83299172b3c8a2680d9b4f 100644 (file)
@@ -377,6 +377,11 @@ public:
     /// makes the entry available for collapsing future requests
     virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) {}
 
+    // XXX: This method belongs to Store::Root/StoreController, but it is here
+    // to avoid casting Root() to StoreController until Root() API is fixed.
+    /// marks the entry completed for collapsed requests
+    virtual void transientsCompleteWriting(StoreEntry &e) {}
+
     // XXX: This method belongs to Store::Root/StoreController, but it is here
     // to avoid casting Root() to StoreController until Root() API is fixed.
     /// Update local intransit entry after changes made by appending worker.
index 9e1c89108446cc1852f8d18d28c35a4bfdfecc31..fc2ed8f74936019771ea6ad288a5ea58f1fffbe8 100644 (file)
@@ -62,6 +62,7 @@ public:
 
     /* Store parent API */
     virtual void handleIdleEntry(StoreEntry &e);
+    virtual void transientsCompleteWriting(StoreEntry &e);
     virtual void transientsAbandon(StoreEntry &e);
     virtual void transientsDisconnect(MemObject &mem_obj);
     virtual void memoryOut(StoreEntry &e, const bool preserveSwappable);
@@ -101,6 +102,7 @@ public:
 private:
     void createOneStore(Store &aStore);
     bool keepForLocalMemoryCache(const StoreEntry &e) const;
+    bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
     bool anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync);
 
     StorePointer swapDir; ///< summary view of all disk caches
index f891430aa3f80885094c252d6828aa0ab98578c0..340f2ee599f2e40a3ac9ca75b3f7f559a6423dcf 100644 (file)
@@ -177,6 +177,7 @@ Transients::copyFromShm(const sfileno index)
 
     assert(e->mem_obj);
     e->mem_obj->method = extras.reqMethod;
+    e->mem_obj->xitTable.io = MemObject::ioReading;
     e->mem_obj->xitTable.index = index;
 
     // XXX: overwriting storeCreateEntry() which calls setPrivateKey() if
@@ -201,7 +202,7 @@ Transients::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData)
 }
 
 void
-Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
+Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags,
                 const HttpRequestMethod &reqMethod)
 {
     assert(e);
@@ -223,6 +224,7 @@ Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
     try {
         if (copyToShm(*e, index, reqFlags, reqMethod)) {
             slot->set(*e);
+            e->mem_obj->xitTable.io = MemObject::ioWriting;
             e->mem_obj->xitTable.index = index;
             map->startAppending(index);
             // keep write lock -- we will be supplying others with updates
@@ -293,15 +295,31 @@ Transients::abandonedAt(const sfileno index) const
     return map->readableEntry(index).waitingToBeFreed;
 }
 
+void
+Transients::completeWriting(const StoreEntry &e)
+{
+    if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
+        assert(e.mem_obj->xitTable.io == MemObject::ioWriting);
+        map->closeForWriting(e.mem_obj->xitTable.index);
+        e.mem_obj->xitTable.index = -1;
+        e.mem_obj->xitTable.io = MemObject::ioDone;
+    }
+}
+
 void
 Transients::disconnect(MemObject &mem_obj)
 {
-    assert(mem_obj.xitTable.index >= 0 && map);
-    map->freeEntry(mem_obj.xitTable.index); // just marks the locked entry
-    mem_obj.xitTable.index = -1;
-    // We do not unlock the entry now because the problem is most likely with
-    // the server resource rather than a specific cache writer, so we want to
-    // prevent other readers from collapsing requests for that resource.
+    if (mem_obj.xitTable.index >= 0) {
+        assert(map);
+        if (mem_obj.xitTable.io == MemObject::ioWriting) {
+            map->abortWriting(mem_obj.xitTable.index);
+        } else {
+            assert(mem_obj.xitTable.io == MemObject::ioReading);
+            map->closeForReading(mem_obj.xitTable.index);
+        }
+        mem_obj.xitTable.index = -1;
+        mem_obj.xitTable.io = MemObject::ioDone;
+    }
 }
 
 /// calculates maximum number of entries we need to store and map
index 29a40fe34966454c87b7b117ada266772228552b..12d784d2b5d2e0b5781eb456f382af1c3f6aeb28 100644 (file)
@@ -15,8 +15,9 @@ struct TransientsMapExtras {
 };
 typedef Ipc::StoreMapWithExtras<TransientsMapExtras> TransientsMap;
 
-/// Stores HTTP entities in RAM. Current implementation uses shared memory.
-/// Unlike a disk store (SwapDir), operations are synchronous (and fast).
+/// Keeps track of hits being delivered to clients that arrived before those
+/// hits were [fully] cached. This shared table is necessary to synchronize hit
+/// caching (writing) workers with other workers serving (reading) those hits.
 class Transients: public Store, public Ipc::StoreMapCleaner
 {
 public:
@@ -24,7 +25,10 @@ public:
     virtual ~Transients();
 
     /// add an in-transit entry suitable for collapsing future requests
-    void put(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+    void startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+
+    /// called when the in-transit entry has been successfully cached
+    void completeWriting(const StoreEntry &e);
 
     /// the calling entry writer no longer expects to cache this entry
     void abandon(const StoreEntry &e);
@@ -32,7 +36,7 @@ public:
     /// whether an in-transit entry is now abandoned by its writer
     bool abandoned(const StoreEntry &e) const;
 
-    /// the calling entry writer no longer expects to cache this entry
+    /// the caller is done writing or reading this entry
     void disconnect(MemObject &mem_obj);
 
     /* Store API */
index 9eadf82616a3d01024b7f3052179c23e4c27ccc9..60f41c7271b6ceccb581b14a1334061ba5206a20 100644 (file)
@@ -821,7 +821,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         // and hope that Core will call disconnect() to close the map entry
     }
 
-    CollapsedForwarding::Broadcast(static_cast<const cache_key*>(sio.e->key));
+    CollapsedForwarding::Broadcast(*sio.e);
 }
 
 void
index 321fecc30bc48e7a1f734fd0b20aa1f164ad6ff3..d4bbef8d9ebf13264c811964daea7f48fa53a413 100644 (file)
@@ -1848,21 +1848,12 @@ StoreEntry::trimMemory(const bool preserveSwappable)
     if (EBIT_TEST(flags, ENTRY_SPECIAL))
         return; // cannot trim because we do not load them again
 
-    if (!preserveSwappable) {
-        if (mem_obj->policyLowestOffsetToKeep(0) == 0) {
-            /* Nothing to do */
-            return;
-        }
-        /*
-         * Its not swap-able, and we're about to delete a chunk,
-         * so we must make it PRIVATE.  This is tricky/ugly because
-         * for the most part, we treat swapable == cachable here.
-         */
-        releaseRequest();
-        mem_obj->trimUnSwappable ();
-    } else {
-        mem_obj->trimSwappable ();
-    }
+    if (preserveSwappable)
+        mem_obj->trimSwappable();
+    else
+        mem_obj->trimUnSwappable();
+
+    debugs(88, 7, *this << " inmem_lo=" << mem_obj->inmem_lo);
 }
 
 bool
@@ -1986,8 +1977,14 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
 {
     os << "e:";
 
+    if (e.mem_obj) {
+        if (e.mem_obj->xitTable.index > -1)
+            os << 't' << e.mem_obj->xitTable.index;
+        if (e.mem_obj->memCache.index > -1)
+            os << 'm' << e.mem_obj->memCache.index;
+    }
     if (e.swap_filen > -1 || e.swap_dirn > -1)
-        os << e.swap_filen << '@' << e.swap_dirn;
+        os << 'd' << e.swap_filen << '@' << e.swap_dirn;
 
     os << '=';
 
@@ -2006,7 +2003,7 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
     if (e.flags) {
         if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) os << 'S';
         if (EBIT_TEST(e.flags, ENTRY_REVALIDATE)) os << 'R';
-        if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'T';
+        if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'P';
         if (EBIT_TEST(e.flags, RELEASE_REQUEST)) os << 'X';
         if (EBIT_TEST(e.flags, REFRESH_REQUEST)) os << 'F';
         if (EBIT_TEST(e.flags, ENTRY_CACHABLE)) os << 'C';
index 0835d80f28f3415833f196b304a68f5024b42469..f1197c2ca30fd0aac91696cadd9f9592d2910ecd 100644 (file)
@@ -761,6 +761,20 @@ StoreController::get(const cache_key *key)
         return e;
     }
 
+    // Must search transients before caches because we must sync those we find.
+    if (transients) {
+        if (StoreEntry *e = transients->get(key)) {
+            debugs(20, 3, "got shared in-transit entry: " << *e);
+            bool inSync = false;
+            const bool found = anchorCollapsed(*e, inSync);
+            if (!found || inSync)
+                return e;
+            assert(!e->locked()); // ensure release will destroyStoreEntry()
+            e->release(); // do not let others into the same trap
+            return NULL;
+        }
+    }
+
     if (memStore) {
         if (StoreEntry *e = memStore->get(key)) {
             debugs(20, 3, HERE << "got mem-cached entry: " << *e);
@@ -792,16 +806,6 @@ StoreController::get(const cache_key *key)
     debugs(20, 4, HERE << "none of " << Config.cacheSwap.n_configured <<
            " cache_dirs have " << storeKeyText(key));
 
-    // Last, check shared in-transit table if enabled.
-    // We speculate that collapsed forwarding hits are less frequent than
-    // proper cache hits checked above (the order does not matter for misses).
-    if (transients) {
-        if (StoreEntry *e = transients->get(key)) {
-            debugs(20, 3, "got shared in-transit entry: " << *e);
-            return e;
-        }
-    }
-
     return NULL;
 }
 
@@ -904,6 +908,16 @@ StoreController::transientsAbandon(StoreEntry &e)
     }
 }
 
+void
+StoreController::transientsCompleteWriting(StoreEntry &e)
+{
+    if (transients) {
+        assert(e.mem_obj);
+        if (e.mem_obj->xitTable.index >= 0)
+            transients->completeWriting(e);
+    }
+}
+
 void
 StoreController::transientsDisconnect(MemObject &mem_obj)
 {
@@ -954,7 +968,7 @@ StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
 {
     e->makePublic(); // this is needed for both local and SMP collapsing
     if (transients)
-        transients->put(e, reqFlags, reqMethod);
+        transients->startWriting(e, reqFlags, reqMethod);
     debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ?
            "SMP-" : "locally-") << "collapse " << *e);
 }
@@ -963,15 +977,34 @@ void
 StoreController::syncCollapsed(const cache_key *key)
 {
     StoreEntry *collapsed = swapDir->get(key);
-    if (!collapsed) // the entry is no longer locally active, ignore the update
+    if (!collapsed) { // the entry is no longer locally active, ignore update
+        debugs(20, 7, "not SMP-syncing not-local " << storeKeyText(key));
         return;
+    }
+
+    if (!collapsed->mem_obj) {
+        // without mem_obj, the entry cannot be transient so we ignore it
+        debugs(20, 7, "not SMP-syncing not-shared " << *collapsed);
+        return;
+    }
+
+    if (collapsed->mem_obj->xitTable.index < 0) {
+        debugs(20, 7, "not SMP-syncing not-transient " << *collapsed);
+        return;
+    }
+
+    assert(transients);
+    if (transients->abandoned(*collapsed)) {
+        debugs(20, 3, "aborting abandoned " << *collapsed);
+        collapsed->abort();
+        return;
+    }
 
-    if (collapsed->mem_obj && !collapsed->mem_obj->smpCollapsed) {
+    if (!collapsed->mem_obj->smpCollapsed) {
         // this happens, e.g., when we tried collapsing but rejected the hit
-        debugs(20, 7, "not SMP-syncing not SMP-collapsed " << *collapsed);
+        debugs(20, 7, "not SMP-syncing not-SMP-collapsed " << *collapsed);
         return;
     }
-    // XXX: collapsed->mem_obj may be still hidden here
 
     debugs(20, 7, "syncing " << *collapsed);
 
@@ -984,23 +1017,51 @@ StoreController::syncCollapsed(const cache_key *key)
         found = true;
         inSync = collapsed->store()->updateCollapsed(*collapsed);
     } else {
-        if (memStore)
-            found = memStore->anchorCollapsed(*collapsed, inSync);
-        else if (Config.cacheSwap.n_configured)
-            found = anchorCollapsedOnDisk(*collapsed, inSync);
+        found = anchorCollapsed(*collapsed, inSync);
     }
 
     if (inSync) {
         debugs(20, 5, "synced " << *collapsed);
         collapsed->invokeHandlers();
     } else if (found) { // unrecoverable problem syncing this entry
-        debugs(20, 3, "aborting " << *collapsed);
+        debugs(20, 3, "aborting unsyncable " << *collapsed);
         collapsed->abort();
     } else { // the entry is still not in one of the caches
         debugs(20, 7, "waiting " << *collapsed);
     }
 }
 
+/// Called for in-transit entries that are not yet anchored to a cache.
+/// For cached entries, return true after synchronizing them with their cache
+/// (making inSync true on success). For not-yet-cached entries, return false.
+bool
+StoreController::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+{
+    // this method is designed to work with collapsed transients only
+    assert(collapsed.mem_obj);
+    assert(collapsed.mem_obj->xitTable.index >= 0);
+    assert(collapsed.mem_obj->smpCollapsed);
+
+    debugs(20, 7, "anchoring " << collapsed);
+
+    bool found = false;
+    if (memStore)
+        found = memStore->anchorCollapsed(collapsed, inSync);
+    else if (Config.cacheSwap.n_configured)
+        found = anchorCollapsedOnDisk(collapsed, inSync);
+
+    if (found) {
+        if (inSync)
+            debugs(20, 7, "anchored " << collapsed);
+        else
+            debugs(20, 5, "failed to anchor " << collapsed);
+    } else {
+        debugs(20, 7, "skipping not yet cached " << collapsed);
+    }
+
+    return found;
+}
+
 StoreHashIndex::StoreHashIndex()
 {
     if (store_table)