]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Various shared memory-based collapsed forwarding improvements and fixes.
authorAlex Rousskov <rousskov@measurement-factory.com>
Sat, 22 Jun 2013 15:24:34 +0000 (09:24 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Sat, 22 Jun 2013 15:24:34 +0000 (09:24 -0600)
Lock transient entries while in use. Transient entry presence is used
used to detect collapsed entry aborts for not-yet-cached entries.

Store current transient locks and memory cache entry state in MemObject. Why
not in StoreEntry like the disk cache does?  To avoid penalizing those Stores
that keep idle StoreEntries in RAM.

Mark collapsing entries specially (in MemObject) so that we can stop updating
(un-tie) local entries that tried to collapse but did not like the collapsed
hit object that they started to get from another worker. When this happens,
the client side creates a new StoreEntry, but without a flag Store cannot tell
whether that entry needs to be kept in sync with the collapsed writer because
both the old entry and the new one have the same key. We may eventually find
a better way to distinguish the two cases.

Do not require MemObjects to be disassociated from various caches during
shutdown because Squid is currently incapable of maintaining Store::Root()
during shutdown.

Support incremental shared memory caching. Maintain and honor the
ENTRY_FWD_HDR_WAIT flag. Maintain shared memory cache reading/writing states.

Better updates of collapsed entries. Detect aborted entries. Do not release
entries that are not yet cached anywhere at the update time.

Polished entry debugging.

Bug 3480 fix.

Added XXXs to mark new problems and old bugs.

17 files changed:
src/CollapsedForwarding.h
src/MemObject.cc
src/MemObject.h
src/MemStore.cc
src/MemStore.h
src/Store.h
src/SwapDir.h
src/Transients.cc
src/Transients.h
src/fs/rock/RockIoState.cc
src/fs/rock/RockSwapDir.cc
src/fs/rock/RockSwapDir.h
src/store.cc
src/store_client.cc
src/store_dir.cc
src/store_swapout.cc
src/tests/stub_MemObject.cc

index 6af07a82cf674594c50775c90824bedeb058e28f..f5e42757fb670d8fad8fbebb03c8e595f33d4619 100644 (file)
@@ -21,10 +21,7 @@ public:
     /// open shared memory segment
     static void Init();
 
-    /// XXX: remove
-    static void NewData(const StoreIOState &sio);
-
-    /// notify other workers that new data is available
+    /// notify other workers about changes in entry state (e.g., new data)
     static void Broadcast(const cache_key *key);
 
     /// kick worker with empty IPC queue
index 7620d1195eecfe72cf9bd4e6a4b378ecc29a004d..c25bf61284b11f7c137b731ecaa42af4d142ddfd 100644 (file)
@@ -83,7 +83,8 @@ MemObject::resetUrls(char const *aUrl, char const *aLog_url)
     url = xstrdup(aUrl);
 }
 
-MemObject::MemObject(char const *aUrl, char const *aLog_url): mem_index(-1)
+MemObject::MemObject(char const *aUrl, char const *aLog_url):
+    smpCollapsed(false)
 {
     debugs(20, 3, HERE << "new MemObject " << this);
     _reply = new HttpReply;
@@ -115,9 +116,19 @@ MemObject::~MemObject()
     assert(chksum == url_checksum(url));
 #endif
 
-    assert(mem_index < 0);
-    if (!shutting_down)
+    if (!shutting_down) { // Store::Root() is FATALly missing during shutdown
+        // TODO: Consider moving these to destroyMemoryObject while providing
+        // StoreEntry::memObjForDisconnect() or similar to get access to the
+        // hidden memory object
+        if (xitTable.index >= 0)
+            Store::Root().transientsDisconnect(*this);
+        if (memCache.index >= 0)
+            Store::Root().memoryDisconnect(*this);
+
+        assert(xitTable.index < 0);
+        assert(memCache.index < 0);
         assert(swapout.sio == NULL);
+    }
 
     data_hdr.freeContent();
 
index 5c9af1868b2864420e8dbeffc3e8e796901e25f7..7927825a3636986c2ceb73e480d032ae8c15e1e4 100644 (file)
@@ -125,6 +125,31 @@ public:
 
     SwapOut swapout;
 
+    /// State of an entry with regards to the [shared] in-transit table.
+    class XitTable {
+    public:
+        XitTable(): index(-1) {}
+
+        int32_t index; ///< entry position inside the in-transit table
+    };
+    XitTable xitTable; ///< current [shared] memory caching state for the entry
+
+    /// State of an entry with regards to the [shared] memory caching.
+    class MemCache {
+    public:
+        MemCache(): index(-1), offset(0), io(ioUndecided) {}
+
+        int32_t index; ///< entry position inside the memory cache
+        int64_t offset; ///< bytes written/read to/from the memory cache so far
+        
+        /// I/O direction and status
+        typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
+        Io io; ///< current I/O state
+    };
+    MemCache memCache; ///< current [shared] memory caching state for the entry
+
+    bool smpCollapsed; ///< whether this entry gets data from another worker
+
     /* Read only - this reply must be preserved by store clients */
     /* The original reply. possibly with updated metadata. */
     HttpRequest *request;
@@ -139,7 +164,6 @@ public:
     } abort;
     char *log_url;
     RemovalPolicyNode repl;
-    int32_t mem_index; ///< entry position inside the [shared] memory cache
     int id;
     int64_t object_sz;
     size_t swap_hdr_sz;
index 8a9d96998f5f2918f6e7494bb6cdea2f352fa0e5..176d4e673b9a998e4dae3beb9196ed286ab3727e 100644 (file)
@@ -204,7 +204,8 @@ MemStore::get(const cache_key *key)
 
     // we copied everything we could to local memory; no more need to lock
     map->closeForReading(index);
-    e->mem_obj->mem_index = -1;
+    e->mem_obj->memCache.index = -1;
+    e->mem_obj->memCache.io = MemObject::MemCache::ioDone;
 
     e->hideMemObject();
 
@@ -226,7 +227,7 @@ MemStore::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData)
 }
 
 bool
-MemStore::anchorCollapsed(StoreEntry &collapsed)
+MemStore::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
 {
     if (!map)
         return false;
@@ -238,24 +239,26 @@ MemStore::anchorCollapsed(StoreEntry &collapsed)
         return false;
 
     anchorEntry(collapsed, index, *slot);
-    return updateCollapsedWith(collapsed, index, *slot);
+    inSync = updateCollapsedWith(collapsed, index, *slot);
+    return true; // even if inSync is false
 }
 
 bool
 MemStore::updateCollapsed(StoreEntry &collapsed)
 {
-    if (!map)
-        return false;
+    assert(collapsed.mem_status == IN_MEMORY);
+    MemObject *mem_obj = collapsed.findMemObject();
+    assert(mem_obj);
 
-    if (collapsed.mem_status != IN_MEMORY) // no longer using a memory cache
-        return false;
-
-    const sfileno index = collapsed.mem_obj->mem_index; 
+    const sfileno index = mem_obj->memCache.index; 
 
     // already disconnected from the cache, no need to update
     if (index < 0) 
         return true;
 
+    if (!map)
+        return false;
+
     const Ipc::StoreMapAnchor &anchor = map->readableEntry(index);
     return updateCollapsedWith(collapsed, index, anchor);
 }
@@ -285,9 +288,14 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
     e.flags = basics.flags;
 
     assert(e.mem_obj);
-    e.store_status = STORE_OK;
+    if (anchor.complete()) {
+        e.store_status = STORE_OK;
+        e.mem_obj->object_sz = e.swap_file_sz;
+    } else {
+        e.store_status = STORE_PENDING;
+        assert(e.mem_obj->object_sz < 0);
+    }
     e.setMemStatus(IN_MEMORY);
-    e.mem_obj->mem_index = index;
     assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor
     e.ping_status = PING_NONE;
 
@@ -295,6 +303,10 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
     EBIT_CLR(e.flags, RELEASE_REQUEST);
     EBIT_CLR(e.flags, KEY_PRIVATE);
     EBIT_SET(e.flags, ENTRY_VALIDATED);
+
+    MemObject::MemCache &mc = e.mem_obj->memCache;
+    mc.index = index;
+    mc.io = MemObject::MemCache::ioReading;
 }
 
 /// copies the entire entry from shared to local memory
@@ -302,6 +314,7 @@ bool
 MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
 {
     debugs(20, 7, "mem-loading entry " << index << " from " << anchor.start);
+    assert(e.mem_obj);
 
     // emulate the usual Store code but w/o inapplicable checks and callbacks:
 
@@ -313,6 +326,11 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
         // slice state may change during copying; take snapshots now
         wasEof = anchor.complete() && slice.next < 0;
         const Ipc::StoreMapSlice::Size wasSize = slice.size;
+
+        debugs(20, 9, "entry " << index << " slice " << sid << " eof " <<
+               wasEof << " wasSize " << wasSize << " <= " <<
+               anchor.basics.swap_file_sz << " sliceOffset " << sliceOffset <<
+               " mem.endOffset " << e.mem_obj->endOffset());
  
        if (e.mem_obj->endOffset() < sliceOffset + wasSize) {
             // size of the slice data that we already copied
@@ -379,6 +397,7 @@ MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
         const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof);
         if (result > 0) {
             assert(rep->pstate == psParsed);
+            EBIT_CLR(e.flags, ENTRY_FWD_HDR_WAIT);
         } else if (result < 0) {
             debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e);
             return false;
@@ -398,17 +417,35 @@ MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
     return true;
 }
 
+/// whether we should cache the entry
 bool
-MemStore::keepInLocalMemory(const StoreEntry &e) const
+MemStore::shouldCache(const StoreEntry &e) const
 {
+    if (e.mem_status == IN_MEMORY) {
+        debugs(20, 5, "already loaded from mem-cache: " << e);
+        return false;
+    }
+
+    if (e.mem_obj && e.mem_obj->memCache.offset > 0) {
+        debugs(20, 5, "already written to mem-cache: " << e);
+        return false;
+    }
+
     if (!e.memoryCachable()) {
         debugs(20, 7, HERE << "Not memory cachable: " << e);
         return false; // will not cache due to entry state or properties
     }
 
     assert(e.mem_obj);
-    const int64_t loadedSize = e.mem_obj->endOffset();
     const int64_t expectedSize = e.mem_obj->expectedReplySize(); // may be < 0
+
+    // objects of unknown size are not allowed into memory cache, for now
+    if (expectedSize < 0) {
+        debugs(20, 5, HERE << "Unknown expected size: " << e);
+        return false;
+    }
+
+    const int64_t loadedSize = e.mem_obj->endOffset();
     const int64_t ramSize = max(loadedSize, expectedSize);
 
     if (ramSize > maxObjectSize()) {
@@ -417,151 +454,123 @@ MemStore::keepInLocalMemory(const StoreEntry &e) const
         return false; // will not cache due to cachable entry size limits
     }
 
+    if (!map) {
+        debugs(20, 5, HERE << "No map to mem-cache " << e);
+        return false;
+    }
+
     return true;
 }
 
-void
-MemStore::considerKeeping(StoreEntry &e)
+/// locks map anchor and preps to store the entry in shared memory
+bool
+MemStore::startCaching(StoreEntry &e)
 {
-    if (!keepInLocalMemory(e))
-        return;
-
-    // since we copy everything at once, we can only keep complete entries
-    if (e.store_status != STORE_OK) {
-        debugs(20, 7, HERE << "Incomplete: " << e);
-        return;
+    sfileno index = 0;
+    Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e.key), index);
+    if (!slot) {
+        debugs(20, 5, HERE << "No room in mem-cache map to index " << e);
+        return false;
     }
 
-    if (e.mem_status == IN_MEMORY) {
-        debugs(20, 5, "already mem-cached: " << e);
-        return;
+    assert(e.mem_obj);
+    e.mem_obj->memCache.index = index;
+    e.mem_obj->memCache.io = MemObject::MemCache::ioWriting;
+    slot->set(e);
+    map->startAppending(index);
+    return true;
+}
+
+/// copies all local data to shared memory
+void
+MemStore::copyToShm(StoreEntry &e)
+{
+    // prevents remote readers from getting ENTRY_FWD_HDR_WAIT entries and
+    // not knowing when the wait is over
+    if (EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT)) {
+        debugs(20, 5, "postponing copying " << e << " for ENTRY_FWD_HDR_WAIT");
+        return;         
     }
 
+    assert(map);
     assert(e.mem_obj);
 
-    const int64_t loadedSize = e.mem_obj->endOffset();
-    const int64_t expectedSize = e.mem_obj->expectedReplySize();
+    const int32_t index = e.mem_obj->memCache.index;
+    assert(index >= 0);
+    Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
 
-    // objects of unknown size are not allowed into memory cache, for now
-    if (expectedSize < 0) {
-        debugs(20, 5, HERE << "Unknown expected size: " << e);
-        return;
+    const int64_t eSize = e.mem_obj->endOffset();
+    if (e.mem_obj->memCache.offset >= eSize) {
+        debugs(20, 5, "postponing copying " << e << " for lack of news: " <<
+               e.mem_obj->memCache.offset << " >= " << eSize);
+        return; // nothing to do (yet)
     }
 
-    // since we copy everything at once, we can only keep fully loaded entries
-    if (loadedSize != expectedSize) {
-        debugs(20, 7, HERE << "partially loaded: " << loadedSize << " != " <<
-               expectedSize);
-        return;
+    if (anchor.start < 0) { // must allocate the very first slot for e
+        Ipc::Mem::PageId page;
+        anchor.start = reserveSapForWriting(page); // throws
+        map->extras(anchor.start).page = page;
     }
 
-    keep(e); // may still fail
-}
+    lastWritingSlice = anchor.start;
+    const size_t sliceCapacity = Ipc::Mem::PageSize();
 
-/// locks map anchor and calls copyToShm to store the entry in shared memory
-void
-MemStore::keep(StoreEntry &e)
-{
-    if (!map) {
-        debugs(20, 5, HERE << "No map to mem-cache " << e);
-        return;
-    }
+    // fill, skip slices that are already full
+    // Optimize: remember lastWritingSlice in e.mem_obj
+    while (e.mem_obj->memCache.offset < eSize) {
+        Ipc::StoreMap::Slice &slice =
+            map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
 
-    sfileno index = 0;
-    Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e.key), index);
-    if (!slot) {
-        debugs(20, 5, HERE << "No room in mem-cache map to index " << e);
-        return;
-    }
+        if (slice.size >= sliceCapacity) {
+            if (slice.next >= 0) {
+                lastWritingSlice = slice.next;
+                continue;
+            }
 
-    try {
-        if (copyToShm(e, index, *slot)) {
-            slot->set(e);
-            map->closeForWriting(index, false);
-            CollapsedForwarding::Broadcast(static_cast<const cache_key*>(e.key));
-            return;
-        }
-        // fall through to the error handling code
-    } 
-    catch (const std::exception &x) { // TODO: should we catch ... as well?
-        debugs(20, 2, "mem-caching error writing entry " << index <<
-               ' ' << e << ": " << x.what());
-        // fall through to the error handling code
+            Ipc::Mem::PageId page;
+            slice.next = lastWritingSlice = reserveSapForWriting(page);
+            map->extras(lastWritingSlice).page = page;
+            debugs(20, 7, "entry " << index << " new slice: " << lastWritingSlice);
+         }
+
+         copyToShmSlice(e, anchor);
     }
 
-    map->abortIo(index);
-    CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+    anchor.basics.swap_file_sz = e.mem_obj->memCache.offset;
+    debugs(20, 7, "mem-cached available " << eSize << " bytes of " << e);
 }
 
-/// copies all local data to shared memory
-bool
-MemStore::copyToShm(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor)
+/// copies at most one slice worth of local memory to shared memory
+void
+MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor)
 {
-    const int64_t eSize = e.mem_obj->endOffset();
-    int64_t offset = 0;
-    lastWritingSlice = -1;
-    while (offset < eSize) {
-        if (!copyToShmSlice(e, index, anchor, offset))
-            return false;
-    }
-
-    // check that we kept everything or purge incomplete/sparse cached entry
-    if (eSize != offset) {
-        debugs(20, 2, "Failed to mem-cache " << e << ": " <<
-               eSize << " != " << offset);
-        return false;
-    }
+    Ipc::StoreMap::Slice &slice =
+        map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
 
-    debugs(20, 7, "mem-cached all " << eSize << " bytes of " << e);
-    e.swap_file_sz = eSize;
-
-    return true;
-}
-
-/// copies one slice worth of local memory to shared memory
-bool
-MemStore::copyToShmSlice(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor, int64_t &offset)
-{
-    Ipc::Mem::PageId page;
-    Ipc::StoreMapSliceId sid = reserveSapForWriting(page); // throws
-    assert(sid >= 0 && page);
-    map->extras(sid).page = page; // remember the page location for cleanup
-    debugs(20, 7, "entry " << index << " slice " << sid << " has " << page);
-
-    // link this slice with other entry slices to form a store entry chain
-    if (!offset) {
-        assert(lastWritingSlice < 0);
-        anchor.start = sid;
-        debugs(20, 7, "entry " << index << " starts at slice " << sid);
-    } else {
-        assert(lastWritingSlice >= 0);
-        map->writeableSlice(index, lastWritingSlice).next = sid;
-        debugs(20, 7, "entry " << index << " slice " << lastWritingSlice <<
-               " followed by slice " << sid);
-    }
-    lastWritingSlice = sid;
+    Ipc::Mem::PageId page = map->extras(lastWritingSlice).page;
+    assert(lastWritingSlice >= 0 && page);
+    debugs(20, 7, "entry " << e << " slice " << lastWritingSlice << " has " <<
+           page);
 
     const int64_t bufSize = Ipc::Mem::PageSize();
-    StoreIOBuffer sharedSpace(bufSize, offset,
-                              static_cast<char*>(PagePointer(page)));
+    const int64_t sliceOffset = e.mem_obj->memCache.offset % bufSize;
+    StoreIOBuffer sharedSpace(bufSize - sliceOffset, e.mem_obj->memCache.offset,
+                              static_cast<char*>(PagePointer(page)) + sliceOffset);
 
     // check that we kept everything or purge incomplete/sparse cached entry
     const ssize_t copied = e.mem_obj->data_hdr.copy(sharedSpace);
     if (copied <= 0) {
-        debugs(20, 2, "Failed to mem-cache " << e << " using " <<
-               bufSize << " bytes from " << offset << " in " << page);
-        return false;
+        debugs(20, 2, "Failed to mem-cache " << (bufSize - sliceOffset) <<
+               " bytes of " << e << " from " << e.mem_obj->memCache.offset <<
+               " in " << page);
+        throw TexcHere("data_hdr.copy failure");
     }
 
     debugs(20, 7, "mem-cached " << copied << " bytes of " << e <<
-           " from " << offset << " to " << page);
+           " from " << e.mem_obj->memCache.offset << " in " << page);
 
-    Ipc::StoreMapSlice &slice = map->writeableSlice(index, sid);
-    slice.next = -1;
-    slice.size = copied;
-
-    offset += copied;
-    return true;
+    slice.size += copied;
+    e.mem_obj->memCache.offset += copied;
 }
 
 /// finds a slot and a free page to fill or throws
@@ -623,25 +632,91 @@ MemStore::noteFreeMapSlice(const sfileno sliceId)
 }
 
 void
-MemStore::unlink(StoreEntry &e)
+MemStore::write(StoreEntry &e)
 {
     assert(e.mem_obj);
-    if (e.mem_obj->mem_index >= 0) {
-        map->freeEntry(e.mem_obj->mem_index);
-        disconnect(e);
+
+    debugs(20, 7, "entry " << e);
+
+    switch (e.mem_obj->memCache.io) {
+    case MemObject::MemCache::ioUndecided:
+        if (!shouldCache(e) || !startCaching(e)) {
+            e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+            Store::Root().transientsAbandon(e);
+            CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+            return;
+        }
+        break;
+  
+    case MemObject::MemCache::ioDone:
+    case MemObject::MemCache::ioReading:
+        return; // we should not write in all of the above cases
+
+    case MemObject::MemCache::ioWriting:
+        break; // already decided to write and still writing
+    }
+
+    try {
+        copyToShm(e);
+        if (e.store_status == STORE_OK) // done receiving new content
+            completeWriting(e);
+        CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+        return;
+    }
+    catch (const std::exception &x) { // TODO: should we catch ... as well?
+        debugs(20, 2, "mem-caching error writing entry " << e << ": " << x.what());
+        // fall through to the error handling code
+    }
+
+    Store::Root().transientsAbandon(e);
+    disconnect(*e.mem_obj);
+    CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+}
+
+void
+MemStore::completeWriting(StoreEntry &e)
+{
+    assert(e.mem_obj);
+    const int32_t index = e.mem_obj->memCache.index;
+    assert(index >= 0);
+    assert(map);
+
+    debugs(20, 5, "mem-cached all " << e.mem_obj->memCache.offset << " bytes of " << e);
+
+    e.mem_obj->memCache.index = -1;
+    e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+    map->closeForWriting(index, false);
+}
+
+void
+MemStore::unlink(StoreEntry &e)
+{
+    assert(e.mem_status == IN_MEMORY);
+    MemObject *mem_obj = e.findMemObject();
+    assert(mem_obj);
+    if (mem_obj->memCache.index >= 0) {
+        map->freeEntry(mem_obj->memCache.index);
+        disconnect(*mem_obj);
     } else {
+        // the entry was loaded and then disconnected from the memory cache
         map->freeEntryByKey(reinterpret_cast<cache_key*>(e.key));
     }
-    e.destroyMemObject();
+        
+    e.destroyMemObject(); // XXX: but it may contain useful info such as a client list. The old code used to do that though, right?
 }
 
 void
-MemStore::disconnect(StoreEntry &e)
+MemStore::disconnect(MemObject &mem_obj)
 {
-    assert(e.mem_obj);
-    if (e.mem_obj->mem_index >= 0) {
-        map->abortIo(e.mem_obj->mem_index);
-        e.mem_obj->mem_index = -1;
+    if (mem_obj.memCache.index >= 0) {
+        if (mem_obj.memCache.io == MemObject::MemCache::ioWriting) {
+            map->abortWriting(mem_obj.memCache.index);
+        } else {
+            assert(mem_obj.memCache.io == MemObject::MemCache::ioReading);
+            map->closeForReading(mem_obj.memCache.index);
+        }
+        mem_obj.memCache.index = -1;
+        mem_obj.memCache.io = MemObject::MemCache::ioDone;
     }
 }
 
index ed4d2a38f9830a82edd05072ea63820ee567c071..c875095ca19c4e977a6d9f4a9d5b003c09f78252 100644 (file)
@@ -20,17 +20,20 @@ public:
     MemStore();
     virtual ~MemStore();
 
-    /// cache the entry or forget about it until the next considerKeeping call
-    void considerKeeping(StoreEntry &e);
-
     /// whether e should be kept in local RAM for possible future caching
     bool keepInLocalMemory(const StoreEntry &e) const;
 
+    /// copy non-shared entry data of the being-cached entry to our cache
+    void write(StoreEntry &e);
+
+    /// all data has been received; there will be no more write() calls
+    void completeWriting(StoreEntry &e);
+
     /// remove from the cache
     void unlink(StoreEntry &e);
 
     /// called when the entry is about to forget its association with mem cache
-    void disconnect(StoreEntry &e);
+    void disconnect(MemObject &mem_obj);
 
     /* Store API */
     virtual int callback();
@@ -48,16 +51,17 @@ public:
     virtual void reference(StoreEntry &);
     virtual bool dereference(StoreEntry &, bool);
     virtual void maintain();
-    virtual bool anchorCollapsed(StoreEntry &collapsed);
+    virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
     virtual bool updateCollapsed(StoreEntry &collapsed);
 
     static int64_t EntryLimit();
 
 protected:
-    void keep(StoreEntry &e);
+    bool shouldCache(const StoreEntry &e) const;
+    bool startCaching(StoreEntry &e);
 
-    bool copyToShm(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor);
-    bool copyToShmSlice(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor, int64_t &offset);
+    void copyToShm(StoreEntry &e);
+    void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor);
     bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
     bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof);
 
index 53dbaa77cedd59364ed5847cbf79126b5dbe388d..5d0c65f439c9a9cd758cc932fdb6e00ab8759563 100644 (file)
@@ -121,6 +121,11 @@ public:
     bool memoryCachable() const; ///< may be cached in memory
     void createMemObject(const char *, const char *);
     void hideMemObject(); ///< no mem_obj for callers until createMemObject
+
+    /// Memory cache needs this to get access to memCache.index of entries for
+    /// which the caller did not call StoreEntry::createMemObject().
+    MemObject *findMemObject() { return mem_obj ? mem_obj : hidden_mem_obj; }
+
     void dump(int debug_lvl) const;
     void hashDelete();
     void hashInsert(const cache_key *);
@@ -367,7 +372,7 @@ public:
     // XXX: This method belongs to Store::Root/StoreController, but it is here
     // because test cases use non-StoreController derivatives as Root
     /// called to get rid of no longer needed entry data in RAM, if any
-    virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable) {}
+    virtual void memoryOut(StoreEntry &e, const bool preserveSwappable) {}
 
     // XXX: This method belongs to Store::Root/StoreController, but it is here
     // to avoid casting Root() to StoreController until Root() API is fixed.
@@ -379,13 +384,29 @@ public:
     /// Update local intransit entry after changes made by appending worker.
     virtual void syncCollapsed(const cache_key *key) {}
 
+    // XXX: This method belongs to Store::Root/StoreController, but it is here
+    // to avoid casting Root() to StoreController until Root() API is fixed.
+    /// calls Root().transients->abandon() if transients are tracked
+    virtual void transientsAbandon(StoreEntry &e) {}
+
+    // XXX: This method belongs to Store::Root/StoreController, but it is here
+    // to avoid casting Root() to StoreController until Root() API is fixed.
+    /// disassociates the entry from the intransit table
+    virtual void transientsDisconnect(MemObject &mem_obj) {}
+
     // XXX: This method belongs to Store::Root/StoreController, but it is here
     // to avoid casting Root() to StoreController until Root() API is fixed.
     /// removes the entry from the memory cache
     virtual void memoryUnlink(StoreEntry &e) {}
 
-    /// if the entry is found, tie it to this cache and call updateCollapsed()
-    virtual bool anchorCollapsed(StoreEntry &collapsed) { return false; }
+    // XXX: This method belongs to Store::Root/StoreController, but it is here
+    // to avoid casting Root() to StoreController until Root() API is fixed.
+    /// disassociates the entry from the memory cache, preserving cached data
+    virtual void memoryDisconnect(MemObject &mem_obj) {}
+
+    /// If the entry is not found, return false. Otherwise, return true after
+    /// tying the entry to this cache and setting inSync to updateCollapsed().
+    virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync) { return false; }
 
     /// update a local collapsed entry with fresh info from this cache (if any)
     virtual bool updateCollapsed(StoreEntry &collapsed) { return false; }
index ca151b5d58c7a45a0a7d07a31f61a3e433a35102..9e1c89108446cc1852f8d18d28c35a4bfdfecc31 100644 (file)
@@ -62,8 +62,11 @@ public:
 
     /* Store parent API */
     virtual void handleIdleEntry(StoreEntry &e);
-    virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable);
+    virtual void transientsAbandon(StoreEntry &e);
+    virtual void transientsDisconnect(MemObject &mem_obj);
+    virtual void memoryOut(StoreEntry &e, const bool preserveSwappable);
     virtual void memoryUnlink(StoreEntry &e);
+    virtual void memoryDisconnect(MemObject &mem_obj);
     virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
     virtual void syncCollapsed(const cache_key *key);
 
@@ -98,7 +101,7 @@ public:
 private:
     void createOneStore(Store &aStore);
     bool keepForLocalMemoryCache(const StoreEntry &e) const;
-    bool anchorCollapsedOnDisk(StoreEntry &collapsed);
+    bool anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync);
 
     StorePointer swapDir; ///< summary view of all disk caches
     MemStore *memStore; ///< memory cache
index 0b1573a75aa4da02ad18d0e30cc59e1461941a9c..f891430aa3f80885094c252d6828aa0ab98578c0 100644 (file)
@@ -27,7 +27,6 @@ static const char *MapLabel = "transients_map";
 
 Transients::Transients(): map(NULL)
 {
-debugs(0,0, "Transients::ctor");
 }
 
 Transients::~Transients()
@@ -156,6 +155,17 @@ Transients::get(const cache_key *key)
     if (!map->openForReading(key, index))
         return NULL;
 
+    if (StoreEntry *e = copyFromShm(index))
+        return e; // keep read lock to receive updates from others
+
+    // loading failure
+    map->closeForReading(index);
+    return NULL;
+}
+
+StoreEntry *
+Transients::copyFromShm(const sfileno index)
+{
     const TransientsMap::Extras &extras = map->extras(index);
 
     // create a brand new store entry and initialize it with stored info
@@ -167,9 +177,7 @@ Transients::get(const cache_key *key)
 
     assert(e->mem_obj);
     e->mem_obj->method = extras.reqMethod;
-
-    // we copied everything we could to local memory; no more need to lock
-    map->closeForReading(index);
+    e->mem_obj->xitTable.index = index;
 
     // XXX: overwriting storeCreateEntry() which calls setPrivateKey() if
     // neighbors_do_private_keys (which is true in most cases and by default).
@@ -177,6 +185,11 @@ Transients::get(const cache_key *key)
     e->setPublicKey();
     assert(e->key);
 
+    // How do we know its SMP- and not just locally-collapsed? A worker gets
+    // locally-collapsed entries from the local store_table, not Transients.
+    // TODO: Can we remove smpCollapsed by not syncing non-transient entries?
+    e->mem_obj->smpCollapsed = true;
+
     return e;
 }
 
@@ -192,6 +205,8 @@ Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
                 const HttpRequestMethod &reqMethod)
 {
     assert(e);
+    assert(e->mem_obj);
+    assert(e->mem_obj->xitTable.index < 0);
 
     if (!map) {
         debugs(20, 5, "No map to add " << *e);
@@ -201,14 +216,16 @@ Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
     sfileno index = 0;
     Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e->key), index);
     if (!slot) {
-        debugs(20, 5, "No room in map to index " << *e);
+        debugs(20, 5, "collision registering " << *e);
         return;
        }
 
     try {
         if (copyToShm(*e, index, reqFlags, reqMethod)) {
             slot->set(*e);
-            map->closeForWriting(index, false);
+            e->mem_obj->xitTable.index = index;
+            map->startAppending(index);
+            // keep write lock -- we will be supplying others with updates
             return;
                }
         // fall through to the error handling code
@@ -219,7 +236,7 @@ Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
         // fall through to the error handling code
        }
 
-    map->abortIo(index);
+    map->abortWriting(index);
 }
 
 
@@ -238,7 +255,6 @@ Transients::copyToShm(const StoreEntry &e, const sfileno index,
        extras.url[urlLen] = '\0';
 
     extras.reqFlags = reqFlags;
-    
 
     Must(reqMethod != Http::METHOD_OTHER);
     extras.reqMethod = reqMethod.id();
@@ -252,6 +268,42 @@ Transients::noteFreeMapSlice(const sfileno sliceId)
     // TODO: we should probably find the entry being deleted and abort it
 }
 
+void
+Transients::abandon(const StoreEntry &e)
+{
+    assert(e.mem_obj && map);
+    map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry
+    // We do not unlock the entry now because the problem is most likely with
+    // the server resource rather than a specific cache writer, so we want to
+    // prevent other readers from collapsing requests for that resource.
+}
+
+bool
+Transients::abandoned(const StoreEntry &e) const
+{
+    assert(e.mem_obj);
+    return abandonedAt(e.mem_obj->xitTable.index);
+}
+
+/// whether an in-transit entry at the index is now abandoned by its writer
+bool
+Transients::abandonedAt(const sfileno index) const
+{
+    assert(map);
+    return map->readableEntry(index).waitingToBeFreed;
+}
+
+void
+Transients::disconnect(MemObject &mem_obj)
+{
+    assert(mem_obj.xitTable.index >= 0 && map);
+    map->freeEntry(mem_obj.xitTable.index); // just marks the locked entry
+    mem_obj.xitTable.index = -1;
+    // We do not unlock the entry now because the problem is most likely with
+    // the server resource rather than a specific cache writer, so we want to
+    // prevent other readers from collapsing requests for that resource.
+}
+
 /// calculates maximum number of entries we need to store and map
 int64_t
 Transients::EntryLimit()
@@ -289,12 +341,10 @@ void TransientsRr::run(const RunnerRegistry &r)
 
 void TransientsRr::create(const RunnerRegistry &)
 {
-debugs(0,0, "TransientsRr::create1: " << Config.onoff.collapsed_forwarding);
     if (!Config.onoff.collapsed_forwarding)
         return;
 
     const int64_t entryLimit = Transients::EntryLimit();
-debugs(0,0, "TransientsRr::create2: " << entryLimit);
     if (entryLimit <= 0)
         return; // no SMP configured or a misconfiguration
 
index 8e31a770e7c9bd2bce8c11055a158750e0aa7fc7..29a40fe34966454c87b7b117ada266772228552b 100644 (file)
@@ -26,11 +26,14 @@ public:
     /// add an in-transit entry suitable for collapsing future requests
     void put(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
 
-    /// cache the entry or forget about it until the next considerKeeping call
-    /// XXX: remove void considerKeeping(StoreEntry &e);
+    /// the calling entry writer no longer expects to cache this entry
+    void abandon(const StoreEntry &e);
 
-    /// whether e should be kept in local RAM for possible future caching
-    /// XXX: remove bool keepInLocalMemory(const StoreEntry &e) const;
+    /// whether an in-transit entry is now abandoned by its writer
+    bool abandoned(const StoreEntry &e) const;
+
+    /// the calling entry writer no longer expects to cache this entry
+    void disconnect(MemObject &mem_obj);
 
     /* Store API */
     virtual int callback();
@@ -52,8 +55,11 @@ public:
     static int64_t EntryLimit();
 
 protected:
+    StoreEntry *copyFromShm(const sfileno index);
     bool copyToShm(const StoreEntry &e, const sfileno index, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
 
+    bool abandonedAt(const sfileno index) const;
+
     // Ipc::StoreMapCleaner API
     virtual void noteFreeMapSlice(const sfileno sliceId);
 
index 22c86dd9c8efe7b8187ada136def91a8d1db3925..01ec497f86e12e50a8ac1af7a3281fb223c81c9f 100644 (file)
@@ -128,7 +128,7 @@ Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
         success = true;
     } catch (const std::exception &ex) { // TODO: should we catch ... as well?
         debugs(79, 2, "db write error: " << ex.what());
-        dir->writeError(swap_filen);
+        dir->writeError(*e);
         finishedWriting(DISK_ERROR);
         // 'this' might be gone beyond this point; fall through to free buf
     }
@@ -309,7 +309,7 @@ Rock::IoState::close(int how)
 
     case writerGone:
         assert(writeableAnchor_);
-        dir->writeError(swap_filen); // abort a partially stored entry
+        dir->writeError(*e); // abort a partially stored entry
         finishedWriting(DISK_ERROR);
         return;
 
index f7a840d776b44a0600f2e963a344ac01548c59da..9eadf82616a3d01024b7f3052179c23e4c27ccc9 100644 (file)
@@ -82,7 +82,7 @@ Rock::SwapDir::get(const cache_key *key)
 }
 
 bool
-Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed)
+Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
 {
     if (!map || !theFile || !theFile->canRead())
         return false;
@@ -94,7 +94,8 @@ Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed)
         return false;
 
     anchorEntry(collapsed, filen, *slot);
-    return updateCollapsedWith(collapsed, *slot);
+    inSync = updateCollapsedWith(collapsed, *slot);
+    return false;
 }
 
 bool
@@ -155,8 +156,13 @@ void Rock::SwapDir::disconnect(StoreEntry &e)
     // do not rely on e.swap_status here because there is an async delay
     // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
 
-    // since e has swap_filen, its slot is locked for either reading or writing
-    map->abortIo(e.swap_filen);
+    // since e has swap_filen, its slot is locked for reading and/or writing
+    // but it is difficult to know whether THIS worker is reading or writing e
+    if (e.swap_status == SWAPOUT_WRITING ||
+        (e.mem_obj && e.mem_obj->swapout.sio != NULL))
+        map->abortWriting(e.swap_filen);
+    else
+        map->closeForReading(e.swap_filen);
     e.swap_dirn = -1;
     e.swap_filen = -1;
     e.swap_status = SWAPOUT_NONE;
@@ -810,7 +816,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
             sio.finishedWriting(errflag);
         }
     } else {
-        writeError(sio.swap_filen);
+        writeError(*sio.e);
         sio.finishedWriting(errflag);
         // and hope that Core will call disconnect() to close the map entry
     }
@@ -819,11 +825,14 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
 }
 
 void
-Rock::SwapDir::writeError(const sfileno fileno)
+Rock::SwapDir::writeError(StoreEntry &e)
 {
     // Do not abortWriting here. The entry should keep the write lock
     // instead of losing association with the store and confusing core.
-    map->freeEntry(fileno); // will mark as unusable, just in case
+    map->freeEntry(e.swap_filen); // will mark as unusable, just in case
+
+    Store::Root().transientsAbandon(e);
+
     // All callers must also call IoState callback, to propagate the error.
 }
 
index eba5ed6019b17ac099f15bf1d1f57684eea6393c..a9eb2a26fd15b2c9ae242927694799be47d18089 100644 (file)
@@ -57,7 +57,7 @@ public:
 
     int64_t diskOffset(Ipc::Mem::PageId &pageId) const;
     int64_t diskOffset(int filen) const;
-    void writeError(const sfileno fileno);
+    void writeError(StoreEntry &e);
 
     /* StoreMapCleaner API */
     virtual void noteFreeMapSlice(const sfileno fileno);
@@ -66,7 +66,7 @@ public:
 
 protected:
     /* Store API */
-    virtual bool anchorCollapsed(StoreEntry &collapsed);
+    virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
     virtual bool updateCollapsed(StoreEntry &collapsed);
 
     /* protected ::SwapDir API */
index fccf0e01a8399e6098c1516eb6107b135c867538..99465fe13e485da8770bcee4964cd6a974c03f6c 100644 (file)
@@ -497,7 +497,7 @@ destroyStoreEntry(void *data)
 void
 StoreEntry::hashInsert(const cache_key * someKey)
 {
-    debugs(20, 3, "StoreEntry::hashInsert: Inserting Entry " << this << " key '" << storeKeyText(someKey) << "'");
+    debugs(20, 3, "StoreEntry::hashInsert: Inserting Entry " << *this << " key '" << storeKeyText(someKey) << "'");
     key = storeKeyDup(someKey);
     hash_join(store_table, this);
 }
@@ -1625,8 +1625,6 @@ StoreEntry::setMemStatus(mem_status_t new_status)
 
     // are we using a shared memory cache?
     if (Config.memShared && IamWorkerProcess()) {
-        // enumerate calling cases if shared memory is enabled
-        assert(new_status != IN_MEMORY || EBIT_TEST(flags, ENTRY_SPECIAL));
         // This method was designed to update replacement policy, not to
         // actually purge something from the memory cache (TODO: rename?).
         // Shared memory cache does not have a policy that needs updates.
@@ -1861,6 +1859,7 @@ StoreEntry::startWriting()
 
     rep->packHeadersInto(&p);
     mem_obj->markEndOfReplyHeaders();
+    EBIT_CLR(flags, ENTRY_FWD_HDR_WAIT);
 
     rep->body.packInto(&p);
 
@@ -2032,7 +2031,9 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
     os << "e:";
 
     if (e.swap_filen > -1 || e.swap_dirn > -1)
-        os << e.swap_filen << '@' << e.swap_dirn << '=';
+        os << e.swap_filen << '@' << e.swap_dirn;
+
+    os << '=';
 
     // print only non-default status values, using unique letters
     if (e.mem_status != NOT_IN_MEMORY ||
@@ -2043,7 +2044,6 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
         if (e.store_status != STORE_PENDING) os << 's';
         if (e.swap_status != SWAPOUT_NONE) os << 'w' << e.swap_status;
         if (e.ping_status != PING_NONE) os << 'p' << e.ping_status;
-        os << '.';
     }
 
     // print only set flags, using unique letters
@@ -2061,10 +2061,12 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
         if (EBIT_TEST(e.flags, ENTRY_VALIDATED)) os << 'V';
         if (EBIT_TEST(e.flags, ENTRY_BAD_LENGTH)) os << 'L';
         if (EBIT_TEST(e.flags, ENTRY_ABORTED)) os << 'A';
-        os << '/';
     }
 
-    return os << &e << '*' << e.lock_count;
+    if (e.mem_obj && e.mem_obj->smpCollapsed)
+        os << 'O';
+
+    return os << '/' << &e << '*' << e.lock_count;
 }
 
 /* NullStoreEntry */
index 540b8286cbe90c0ccf7049c59641eb0f866c17b3..127072d3685103ef5e9a25a27c78dcac587aee21 100644 (file)
@@ -264,12 +264,17 @@ store_client::copy(StoreEntry * anEntry,
     PROF_stop(storeClient_kickReads);
     copying = false;
 
+    anEntry->lock("store_client::copy"); // see deletion note below
+
     storeClientCopy2(entry, this);
 
+    // Bug 3480: This store_client object may be deleted now if, for example,
+    // the client rejects the hit response copied above. Use on-stack pointers!
+
 #if USE_ADAPTATION
-    if (entry)
-        entry->kickProducer();
+    anEntry->kickProducer();
 #endif
+    anEntry->unlock("store_client::copy");
 }
 
 /*
@@ -764,6 +769,8 @@ StoreEntry::invokeHandlers()
     PROF_stop(InvokeHandlers);
 }
 
+// XXX: Does not account for remote readers of local writers, causing
+// premature StoreEntry aborts.
 int
 storePendingNClients(const StoreEntry * e)
 {
index dccf60fcbb8296c20737e1e9c6de15aa22c0888c..0835d80f28f3415833f196b304a68f5024b42469 100644 (file)
@@ -812,8 +812,9 @@ StoreController::get(String const key, STOREGETCLIENT aCallback, void *aCallback
 }
 
 /// updates the collapsed entry with the corresponding on-disk entry, if any
+/// In other words, the SwapDir::anchorCollapsed() API applied to all disks.
 bool
-StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed)
+StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync)
 {
     // TODO: move this loop to StoreHashIndex, just like the one in get().
     if (const int cacheDirs = Config.cacheSwap.n_configured) {
@@ -827,7 +828,7 @@ StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed)
             if (!sd->active())
                 continue;
 
-            if (sd->anchorCollapsed(collapsed)) {
+            if (sd->anchorCollapsed(collapsed, inSync)) {
                 debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed);
                 return true;
             }
@@ -859,11 +860,11 @@ StoreController::keepForLocalMemoryCache(const StoreEntry &e) const
 }
 
 void
-StoreController::maybeTrimMemory(StoreEntry &e, const bool preserveSwappable)
+StoreController::memoryOut(StoreEntry &e, const bool preserveSwappable)
 {
     bool keepInLocalMemory = false;
     if (memStore)
-        keepInLocalMemory = memStore->keepInLocalMemory(e);
+        memStore->write(e); // leave keepInLocalMemory false
     else
         keepInLocalMemory = keepForLocalMemoryCache(e);
 
@@ -885,6 +886,31 @@ StoreController::memoryUnlink(StoreEntry &e)
         e.destroyMemObject();
 }
 
+void
+StoreController::memoryDisconnect(MemObject &mem_obj)
+{
+    if (memStore)
+        memStore->disconnect(mem_obj);
+    // else nothing to do for non-shared memory cache
+}
+
+void
+StoreController::transientsAbandon(StoreEntry &e)
+{
+    if (transients) {
+        assert(e.mem_obj);
+        if (e.mem_obj->xitTable.index >= 0)
+            transients->abandon(e);
+    }
+}
+
+void
+StoreController::transientsDisconnect(MemObject &mem_obj)
+{
+    if (transients)
+        transients->disconnect(mem_obj);
+}
+
 void
 StoreController::handleIdleEntry(StoreEntry &e)
 {
@@ -896,7 +922,6 @@ StoreController::handleIdleEntry(StoreEntry &e)
         // They are not managed [well] by any specific Store handled below.
         keepInLocalMemory = true;
     } else if (memStore) {
-        memStore->considerKeeping(e);
         // leave keepInLocalMemory false; memStore maintains its own cache
     } else {
         keepInLocalMemory = keepForLocalMemoryCache(e) && // in good shape and
@@ -930,7 +955,8 @@ StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
     e->makePublic(); // this is needed for both local and SMP collapsing
     if (transients)
         transients->put(e, reqFlags, reqMethod);
-    debugs(20, 3, "may " << (transients ? "SMP" : "") << " collapse " << *e);
+    debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ?
+           "SMP-" : "locally-") << "collapse " << *e);
 }
 
 void
@@ -940,24 +966,38 @@ StoreController::syncCollapsed(const cache_key *key)
     if (!collapsed) // the entry is no longer locally active, ignore the update
         return;
 
+    if (collapsed->mem_obj && !collapsed->mem_obj->smpCollapsed) {
+        // this happens, e.g., when we tried collapsing but rejected the hit
+        debugs(20, 7, "not SMP-syncing not SMP-collapsed " << *collapsed);
+        return;
+    }
+    // XXX: collapsed->mem_obj may be still hidden here
+
     debugs(20, 7, "syncing " << *collapsed);
 
+    bool found = false;
     bool inSync = false;
-    if (memStore && collapsed->mem_status == IN_MEMORY)
+    if (memStore && collapsed->mem_status == IN_MEMORY) {
+        found = true;
         inSync = memStore->updateCollapsed(*collapsed);
-    else if (collapsed->swap_filen >= 0)
+    } else if (collapsed->swap_filen >= 0) {
+        found = true;
         inSync = collapsed->store()->updateCollapsed(*collapsed);
-    else if (memStore && memStore->anchorCollapsed(*collapsed))
-        inSync = true; // found in the memory cache
-    else if (anchorCollapsedOnDisk(*collapsed))
-        inSync = true; // found in a disk cache
+    } else {
+        if (memStore)
+            found = memStore->anchorCollapsed(*collapsed, inSync);
+        else if (Config.cacheSwap.n_configured)
+            found = anchorCollapsedOnDisk(*collapsed, inSync);
+    }
 
     if (inSync) {
         debugs(20, 5, "synced " << *collapsed);
         collapsed->invokeHandlers();
-    } else { // the backing entry is no longer cached; abort this hit
-        debugs(20, 3, "aborting cacheless " << *collapsed);
+    } else if (found) { // unrecoverable problem syncing this entry
+        debugs(20, 3, "aborting " << *collapsed);
         collapsed->abort();
+    } else { // the entry is still not in one of the caches
+        debugs(20, 7, "waiting " << *collapsed);
     }
 }
 
index c8b700686e50abca39941ac86bf055e2253ea91a..765aac2d3aecd575a9f5f4dd2d281478acbc549c 100644 (file)
@@ -202,7 +202,7 @@ StoreEntry::swapOut()
 
     const bool weAreOrMayBeSwappingOut = swappingOut() || mayStartSwapOut();
 
-    Store::Root().maybeTrimMemory(*this, weAreOrMayBeSwappingOut);
+    Store::Root().memoryOut(*this, weAreOrMayBeSwappingOut);
 
     if (mem_obj->swapout.decision != MemObject::SwapOut::swPossible)
         return; // nothing else to do
index 309e8cd7ffb8fc29701b49985224d3d2d5d8ee26..33dd231a9d872c2d11f7df91fdcf10a0975eb784 100644 (file)
@@ -55,7 +55,7 @@ int MemObject::mostBytesWanted(int max, bool ignoreDelayPools) const STUB_RETVAL
 DelayId MemObject::mostBytesAllowed() const STUB_RETVAL(DelayId())
 #endif
 void MemObject::unlinkRequest() STUB
-void MemObject::write(StoreIOBuffer writeBuffer, STMCB *callback, void *callbackData) STUB
+void MemObject::write(const StoreIOBuffer &writeBuffer) STUB
 void MemObject::replaceHttpReply(HttpReply *newrep) STUB
 int64_t MemObject::lowestMemReaderOffset() const STUB_RETVAL(0)
 void MemObject::kickReads() STUB