]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/MemStore.cc
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / MemStore.cc
index c3b18ecc1fca3bc8ebcddeda5d39b36bcd9e45c5..3e1983b25d74d1e29f742dcdf58d7cb634358986 100644 (file)
@@ -1,8 +1,13 @@
 /*
- * DEBUG: section 20    Memory Cache
+ * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
  *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
+/* DEBUG: section 20    Memory Cache */
+
 #include "squid.h"
 #include "base/RunnersRegistry.h"
 #include "CollapsedForwarding.h"
 #include "tools.h"
 
 /// shared memory segment path to use for MemStore maps
-static const char *MapLabel = "cache_mem_map";
+static const SBuf MapLabel("cache_mem_map");
 /// shared memory segment path to use for the free slices index
 static const char *SpaceLabel = "cache_mem_space";
+/// shared memory segment path to use for IDs of shared pages with slice data
+static const char *ExtrasLabel = "cache_mem_ex";
 // TODO: sync with Rock::SwapDir::*Path()
 
 // We store free slot IDs (i.e., "space") as Page objects so that we can use
@@ -28,6 +35,137 @@ static const char *SpaceLabel = "cache_mem_space";
 // used except for a positivity test. A unique value is handy for debugging.
 static const uint32_t SpacePoolId = 510716;
 
+/// Packs to shared memory, allocating new slots/pages as needed.
+/// Requires an Ipc::StoreMapAnchor locked for writing.
+class ShmWriter: public Packable
+{
+public:
+    ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice = -1);
+
+    /* Packable API */
+    virtual void append(const char *aBuf, int aSize) override;
+    virtual void vappendf(const char *fmt, va_list ap) override;
+
+public:
+    StoreEntry *entry; ///< the entry being updated
+
+    /// the slot keeping the first byte of the appended content (at least)
+    /// either set via constructor parameter or allocated by the first append
+    Ipc::StoreMapSliceId firstSlice;
+
+    /// the slot keeping the last byte of the appended content (at least)
+    Ipc::StoreMapSliceId lastSlice;
+
+    uint64_t totalWritten; ///< cumulative number of bytes appended so far
+
+protected:
+    void copyToShm();
+    void copyToShmSlice(Ipc::StoreMap::Slice &slice);
+
+private:
+    MemStore &store;
+    const sfileno fileNo;
+
+    /* set by (and only valid during) append calls */
+    const char *buf; ///< content being appended now
+    int bufSize; ///< buf size
+    int bufWritten; ///< buf bytes appended so far
+};
+
+/* ShmWriter */
+
+ShmWriter::ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice):
+    entry(anEntry),
+    firstSlice(aFirstSlice),
+    lastSlice(firstSlice),
+    totalWritten(0),
+    store(aStore),
+    fileNo(aFileNo),
+    buf(nullptr),
+    bufSize(0),
+    bufWritten(0)
+{
+    Must(entry);
+}
+
+void
+ShmWriter::append(const char *aBuf, int aBufSize)
+{
+    Must(!buf);
+    buf = aBuf;
+    bufSize = aBufSize;
+    if (bufSize) {
+        Must(buf);
+        bufWritten = 0;
+        copyToShm();
+    }
+    buf = nullptr;
+    bufSize = 0;
+    bufWritten = 0;
+}
+
+void
+ShmWriter::vappendf(const char *fmt, va_list ap)
+{
+    SBuf vaBuf;
+#if defined(VA_COPY)
+    va_list apCopy;
+    VA_COPY(apCopy, ap);
+    vaBuf.vappendf(fmt, apCopy);
+    va_end(apCopy);
+#else
+    vaBuf.vappendf(fmt, ap);
+#endif
+    append(vaBuf.rawContent(), vaBuf.length());
+}
+
+/// copies the entire buffer to shared memory
+void
+ShmWriter::copyToShm()
+{
+    Must(bufSize > 0); // do not use up shared memory pages for nothing
+    Must(firstSlice < 0 || lastSlice >= 0);
+
+    // fill, skip slices that are already full
+    while (bufWritten < bufSize) {
+        Ipc::StoreMap::Slice &slice = store.nextAppendableSlice(fileNo, lastSlice);
+        if (firstSlice < 0)
+            firstSlice = lastSlice;
+        copyToShmSlice(slice);
+    }
+
+    debugs(20, 7, "stored " << bufWritten << '/' << totalWritten << " header bytes of " << *entry);
+}
+
+/// copies at most one slice worth of buffer to shared memory
+void
+ShmWriter::copyToShmSlice(Ipc::StoreMap::Slice &slice)
+{
+    Ipc::Mem::PageId page = store.pageForSlice(lastSlice);
+    debugs(20, 7, "entry " << *entry << " slice " << lastSlice << " has " <<
+           page);
+
+    Must(bufWritten <= bufSize);
+    const int64_t writingDebt = bufSize - bufWritten;
+    const int64_t pageSize = Ipc::Mem::PageSize();
+    const int64_t sliceOffset = totalWritten % pageSize;
+    const int64_t copySize = std::min(writingDebt, pageSize - sliceOffset);
+    memcpy(static_cast<char*>(PagePointer(page)) + sliceOffset, buf + bufWritten,
+           copySize);
+
+    debugs(20, 7, "copied " << slice.size << '+' << copySize << " bytes of " <<
+           entry << " from " << sliceOffset << " in " << page);
+
+    slice.size += copySize;
+    bufWritten += copySize;
+    totalWritten += copySize;
+    // fresh anchor.basics.swap_file_sz is already set [to the stale value]
+
+    // either we wrote everything or we filled the entire slice
+    Must(bufWritten == bufSize || sliceOffset + copySize == pageSize);
+}
+
+/* MemStore */
 
 MemStore::MemStore(): map(NULL), lastWritingSlice(-1)
 {
@@ -47,21 +185,22 @@ MemStore::init()
 
     // check compatibility with the disk cache, if any
     if (Config.cacheSwap.n_configured > 0) {
-    const int64_t diskMaxSize = Store::Root().maxObjectSize();
-    const int64_t memMaxSize = maxObjectSize();
-    if (diskMaxSize == -1) {
-        debugs(20, DBG_IMPORTANT, "WARNING: disk-cache maximum object size "
-               "is unlimited but mem-cache maximum object size is " <<
-               memMaxSize / 1024.0 << " KB");
-    } else if (diskMaxSize > memMaxSize) {
-        debugs(20, DBG_IMPORTANT, "WARNING: disk-cache maximum object size "
-               "is too large for mem-cache: " <<
-               diskMaxSize / 1024.0 << " KB > " <<
-               memMaxSize / 1024.0 << " KB");
-    }
+        const int64_t diskMaxSize = Store::Root().maxObjectSize();
+        const int64_t memMaxSize = maxObjectSize();
+        if (diskMaxSize == -1) {
+            debugs(20, DBG_IMPORTANT, "WARNING: disk-cache maximum object size "
+                   "is unlimited but mem-cache maximum object size is " <<
+                   memMaxSize / 1024.0 << " KB");
+        } else if (diskMaxSize > memMaxSize) {
+            debugs(20, DBG_IMPORTANT, "WARNING: disk-cache maximum object size "
+                   "is too large for mem-cache: " <<
+                   diskMaxSize / 1024.0 << " KB > " <<
+                   memMaxSize / 1024.0 << " KB");
+        }
     }
 
     freeSlots = shm_old(Ipc::Mem::PageStack)(SpaceLabel);
+    extras = shm_old(Extras)(ExtrasLabel);
 
     Must(!map);
     map = new MemStoreMap(MapLabel);
@@ -92,21 +231,25 @@ MemStore::stat(StoreEntry &e) const
                       Math::doublePercent(currentSize(), maxSize()));
 
     if (map) {
-        const int limit = map->entryLimit();
-        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
-        if (limit > 0) {
+        const int entryLimit = map->entryLimit();
+        const int slotLimit = map->sliceLimit();
+        storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
+        if (entryLimit > 0) {
             storeAppendPrintf(&e, "Current entries: %" PRId64 " %.2f%%\n",
-                              currentCount(), (100.0 * currentCount() / limit));
+                              currentCount(), (100.0 * currentCount() / entryLimit));
+        }
 
+        storeAppendPrintf(&e, "Maximum slots:   %9d\n", slotLimit);
+        if (slotLimit > 0) {
             const unsigned int slotsFree =
                 Ipc::Mem::PagesAvailable(Ipc::Mem::PageId::cachePage);
-            if (slotsFree <= static_cast<const unsigned int>(limit)) {
-                const int usedSlots = limit - static_cast<const int>(slotsFree);
+            if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
+                const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
                 storeAppendPrintf(&e, "Used slots:      %9d %.2f%%\n",
-                                  usedSlots, (100.0 * usedSlots / limit));
+                                  usedSlots, (100.0 * usedSlots / slotLimit));
             }
 
-            if (limit < 100) { // XXX: otherwise too expensive to count
+            if (slotLimit < 100) { // XXX: otherwise too expensive to count
                 Ipc::ReadWriteLockStats stats;
                 map->updateStats(stats);
                 stats.dump(e);
@@ -136,7 +279,7 @@ uint64_t
 MemStore::currentSize() const
 {
     return Ipc::Mem::PageLevel(Ipc::Mem::PageId::cachePage) *
-        Ipc::Mem::PageSize();
+           Ipc::Mem::PageSize();
 }
 
 uint64_t
@@ -157,25 +300,12 @@ MemStore::reference(StoreEntry &)
 }
 
 bool
-MemStore::dereference(StoreEntry &, bool)
+MemStore::dereference(StoreEntry &)
 {
     // no need to keep e in the global store_table for us; we have our own map
     return false;
 }
 
-int
-MemStore::callback()
-{
-    return 0;
-}
-
-StoreSearch *
-MemStore::search(String const, HttpRequest *)
-{
-    fatal("not implemented");
-    return NULL;
-}
-
 StoreEntry *
 MemStore::get(const cache_key *key)
 {
@@ -193,7 +323,7 @@ MemStore::get(const cache_key *key)
     // XXX: We do not know the URLs yet, only the key, but we need to parse and
     // store the response for the Root().get() callers to be happy because they
     // expect IN_MEMORY entries to already have the response headers and body.
-    e->makeMemObject();
+    e->createMemObject();
 
     anchorEntry(*e, index, *slot);
 
@@ -210,10 +340,66 @@ MemStore::get(const cache_key *key)
 }
 
 void
-MemStore::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData)
+MemStore::updateHeaders(StoreEntry *updatedE)
 {
-    // XXX: not needed but Store parent forces us to implement this
-    fatal("MemStore::get(key,callback,data) should not be called");
+    if (!map)
+        return;
+
+    Ipc::StoreMapUpdate update(updatedE);
+    assert(updatedE);
+    assert(updatedE->mem_obj);
+    if (!map->openForUpdating(update, updatedE->mem_obj->memCache.index))
+        return;
+
+    try {
+        updateHeadersOrThrow(update);
+    } catch (const std::exception &ex) {
+        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
+        map->abortUpdating(update);
+    }
+}
+
+void
+MemStore::updateHeadersOrThrow(Ipc::StoreMapUpdate &update)
+{
+    // our +/- hdr_sz math below does not work if the chains differ [in size]
+    Must(update.stale.anchor->basics.swap_file_sz == update.fresh.anchor->basics.swap_file_sz);
+
+    const HttpReply *rawReply = update.entry->getReply();
+    Must(rawReply);
+    const HttpReply &reply = *rawReply;
+    const uint64_t staleHdrSz = reply.hdr_sz;
+    debugs(20, 7, "stale hdr_sz: " << staleHdrSz);
+
+    /* we will need to copy same-slice payload after the stored headers later */
+    Must(staleHdrSz > 0);
+    update.stale.splicingPoint = map->sliceContaining(update.stale.fileNo, staleHdrSz);
+    Must(update.stale.splicingPoint >= 0);
+    Must(update.stale.anchor->basics.swap_file_sz >= staleHdrSz);
+
+    Must(update.stale.anchor);
+    ShmWriter writer(*this, update.entry, update.fresh.fileNo);
+    reply.packHeadersInto(&writer);
+    const uint64_t freshHdrSz = writer.totalWritten;
+    debugs(20, 7, "fresh hdr_sz: " << freshHdrSz << " diff: " << (freshHdrSz - staleHdrSz));
+
+    /* copy same-slice payload remaining after the stored headers */
+    const Ipc::StoreMapSlice &slice = map->readableSlice(update.stale.fileNo, update.stale.splicingPoint);
+    const Ipc::StoreMapSlice::Size sliceCapacity = Ipc::Mem::PageSize();
+    const Ipc::StoreMapSlice::Size headersInLastSlice = staleHdrSz % sliceCapacity;
+    Must(headersInLastSlice > 0); // or sliceContaining() would have stopped earlier
+    Must(slice.size >= headersInLastSlice);
+    const Ipc::StoreMapSlice::Size payloadInLastSlice = slice.size - headersInLastSlice;
+    const MemStoreMapExtras::Item &extra = extras->items[update.stale.splicingPoint];
+    char *page = static_cast<char*>(PagePointer(extra.page));
+    debugs(20, 5, "appending same-slice payload: " << payloadInLastSlice);
+    writer.append(page + headersInLastSlice, payloadInLastSlice);
+    update.fresh.splicingPoint = writer.lastSlice;
+
+    update.fresh.anchor->basics.swap_file_sz -= staleHdrSz;
+    update.fresh.anchor->basics.swap_file_sz += freshHdrSz;
+
+    map->closeForUpdating(update);
 }
 
 bool
@@ -224,7 +410,7 @@ MemStore::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
 
     sfileno index;
     const Ipc::StoreMapAnchor *const slot = map->openForReading(
-        reinterpret_cast<cache_key*>(collapsed.key), index);
+            reinterpret_cast<cache_key*>(collapsed.key), index);
     if (!slot)
         return false;
 
@@ -238,10 +424,10 @@ MemStore::updateCollapsed(StoreEntry &collapsed)
 {
     assert(collapsed.mem_obj);
 
-    const sfileno index = collapsed.mem_obj->memCache.index; 
+    const sfileno index = collapsed.mem_obj->memCache.index;
 
     // already disconnected from the cache, no need to update
-    if (index < 0) 
+    if (index < 0)
         return true;
 
     if (!map)
@@ -251,14 +437,13 @@ MemStore::updateCollapsed(StoreEntry &collapsed)
     return updateCollapsedWith(collapsed, index, anchor);
 }
 
+/// updates collapsed entry after its anchor has been located
 bool
 MemStore::updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor)
 {
-    collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic
-
+    collapsed.swap_file_sz = anchor.basics.swap_file_sz;
     const bool copied = copyFromShm(collapsed, index, anchor);
-
-    return copied; // XXX: when do we unlock the map slot?
+    return copied;
 }
 
 /// anchors StoreEntry to an already locked map entry
@@ -271,7 +456,7 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
     e.lastref = basics.lastref;
     e.timestamp = basics.timestamp;
     e.expires = basics.expires;
-    e.lastmod = basics.lastmod;
+    e.lastModified(basics.lastmod);
     e.refcount = basics.refcount;
     e.flags = basics.flags;
 
@@ -288,9 +473,8 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
     assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor
     e.ping_status = PING_NONE;
 
-    EBIT_SET(e.flags, ENTRY_CACHABLE);
     EBIT_CLR(e.flags, RELEASE_REQUEST);
-    EBIT_CLR(e.flags, KEY_PRIVATE);
+    e.clearPrivate();
     EBIT_SET(e.flags, ENTRY_VALIDATED);
 
     MemObject::MemCache &mc = e.mem_obj->memCache;
@@ -320,21 +504,22 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
                wasEof << " wasSize " << wasSize << " <= " <<
                anchor.basics.swap_file_sz << " sliceOffset " << sliceOffset <<
                " mem.endOffset " << e.mem_obj->endOffset());
-       if (e.mem_obj->endOffset() < sliceOffset + wasSize) {
+
+        if (e.mem_obj->endOffset() < sliceOffset + wasSize) {
             // size of the slice data that we already copied
             const size_t prefixSize = e.mem_obj->endOffset() - sliceOffset;
             assert(prefixSize <= wasSize);
 
-            const MemStoreMap::Extras &extras = map->extras(sid);
-            char *page = static_cast<char*>(PagePointer(extras.page));
+            const MemStoreMapExtras::Item &extra = extras->items[sid];
+
+            char *page = static_cast<char*>(PagePointer(extra.page));
             const StoreIOBuffer sliceBuf(wasSize - prefixSize,
                                          e.mem_obj->endOffset(),
                                          page + prefixSize);
             if (!copyFromShmSlice(e, sliceBuf, wasEof))
                 return false;
             debugs(20, 9, "entry " << index << " copied slice " << sid <<
-                   " from " << extras.page << " +" << prefixSize);
+                   " from " << extra.page << '+' << prefixSize);
         }
         // else skip a [possibly incomplete] slice that we copied earlier
 
@@ -345,7 +530,7 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
             if (wasSize >= slice.size) { // did not grow since we started copying
                 sliceOffset += wasSize;
                 sid = slice.next;
-                       }
+            }
         } else if (wasSize >= slice.size) { // did not grow
             break;
         }
@@ -370,7 +555,7 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
     // would be nice to call validLength() here, but it needs e.key
 
     // we read the entire response into the local memory; no more need to lock
-    disconnect(*e.mem_obj);
+    disconnect(e);
     return true;
 }
 
@@ -383,15 +568,15 @@ MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
     // from store_client::readBody()
     // parse headers if needed; they might span multiple slices!
     HttpReply *rep = (HttpReply *)e.getReply();
-    if (rep->pstate < psParsed) {
+    if (rep->pstate < Http::Message::psParsed) {
         // XXX: have to copy because httpMsgParseStep() requires 0-termination
         MemBuf mb;
         mb.init(buf.length+1, buf.length+1);
-        mb.append(buf.data, buf.length);        
+        mb.append(buf.data, buf.length);
         mb.terminate();
         const int result = rep->httpMsgParseStep(mb.buf, buf.length, eof);
         if (result > 0) {
-            assert(rep->pstate == psParsed);
+            assert(rep->pstate == Http::Message::psParsed);
             EBIT_CLR(e.flags, ENTRY_FWD_HDR_WAIT);
         } else if (result < 0) {
             debugs(20, DBG_IMPORTANT, "Corrupted mem-cached headers: " << e);
@@ -414,7 +599,7 @@ MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
 
 /// whether we should cache the entry
 bool
-MemStore::shouldCache(const StoreEntry &e) const
+MemStore::shouldCache(StoreEntry &e) const
 {
     if (e.mem_status == IN_MEMORY) {
         debugs(20, 5, "already loaded from mem-cache: " << e);
@@ -432,30 +617,34 @@ MemStore::shouldCache(const StoreEntry &e) const
     }
 
     assert(e.mem_obj);
-    const int64_t expectedSize = e.mem_obj->expectedReplySize(); // may be < 0
 
-    // objects of unknown size are not allowed into memory cache, for now
-    if (expectedSize < 0) {
-        debugs(20, 5, HERE << "Unknown expected size: " << e);
+    if (!e.mem_obj->vary_headers.isEmpty()) {
+        // XXX: We must store/load SerialisedMetaData to cache Vary in RAM
+        debugs(20, 5, "Vary not yet supported: " << e.mem_obj->vary_headers);
         return false;
     }
 
+    const int64_t expectedSize = e.mem_obj->expectedReplySize(); // may be < 0
     const int64_t loadedSize = e.mem_obj->endOffset();
     const int64_t ramSize = max(loadedSize, expectedSize);
-
     if (ramSize > maxObjectSize()) {
         debugs(20, 5, HERE << "Too big max(" <<
                loadedSize << ", " << expectedSize << "): " << e);
         return false; // will not cache due to cachable entry size limits
     }
 
+    if (!e.mem_obj->isContiguous()) {
+        debugs(20, 5, "not contiguous");
+        return false;
+    }
+
     if (!map) {
         debugs(20, 5, HERE << "No map to mem-cache " << e);
         return false;
     }
 
     if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) {
-        debugs(20, 5, HERE << "Not mem-caching ENTRY_SPECIAL " << e);
+        debugs(20, 5, "Not mem-caching ENTRY_SPECIAL " << e);
         return false;
     }
 
@@ -477,7 +666,11 @@ MemStore::startCaching(StoreEntry &e)
     e.mem_obj->memCache.index = index;
     e.mem_obj->memCache.io = MemObject::ioWriting;
     slot->set(e);
-    map->startAppending(index);
+    // Do not allow others to feed off an unknown-size entry because we will
+    // stop swapping it out if it grows too large.
+    if (e.mem_obj->expectedReplySize() >= 0)
+        map->startAppending(index);
+    e.memOutDecision(true);
     return true;
 }
 
@@ -489,16 +682,12 @@ MemStore::copyToShm(StoreEntry &e)
     // not knowing when the wait is over
     if (EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT)) {
         debugs(20, 5, "postponing copying " << e << " for ENTRY_FWD_HDR_WAIT");
-        return;         
+        return;
     }
 
     assert(map);
     assert(e.mem_obj);
 
-    const int32_t index = e.mem_obj->memCache.index;
-    assert(index >= 0);
-    Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
-
     const int64_t eSize = e.mem_obj->endOffset();
     if (e.mem_obj->memCache.offset >= eSize) {
         debugs(20, 5, "postponing copying " << e << " for lack of news: " <<
@@ -506,49 +695,32 @@ MemStore::copyToShm(StoreEntry &e)
         return; // nothing to do (yet)
     }
 
-    if (anchor.start < 0) { // must allocate the very first slot for e
-        Ipc::Mem::PageId page;
-        anchor.start = reserveSapForWriting(page); // throws
-        map->extras(anchor.start).page = page;
-    }
+    // throw if an accepted unknown-size entry grew too big or max-size changed
+    Must(eSize <= maxObjectSize());
 
+    const int32_t index = e.mem_obj->memCache.index;
+    assert(index >= 0);
+    Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
     lastWritingSlice = anchor.start;
-    const size_t sliceCapacity = Ipc::Mem::PageSize();
 
     // fill, skip slices that are already full
     // Optimize: remember lastWritingSlice in e.mem_obj
     while (e.mem_obj->memCache.offset < eSize) {
-        Ipc::StoreMap::Slice &slice =
-            map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
-
-        if (slice.size >= sliceCapacity) {
-            if (slice.next >= 0) {
-                lastWritingSlice = slice.next;
-                continue;
-            }
-
-            Ipc::Mem::PageId page;
-            slice.next = lastWritingSlice = reserveSapForWriting(page);
-            map->extras(lastWritingSlice).page = page;
-            debugs(20, 7, "entry " << index << " new slice: " << lastWritingSlice);
-         }
-
-         copyToShmSlice(e, anchor);
+        Ipc::StoreMap::Slice &slice = nextAppendableSlice(
+                                          e.mem_obj->memCache.index, lastWritingSlice);
+        if (anchor.start < 0)
+            anchor.start = lastWritingSlice;
+        copyToShmSlice(e, anchor, slice);
     }
 
-    anchor.basics.swap_file_sz = e.mem_obj->memCache.offset;
     debugs(20, 7, "mem-cached available " << eSize << " bytes of " << e);
 }
 
 /// copies at most one slice worth of local memory to shared memory
 void
-MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor)
+MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice)
 {
-    Ipc::StoreMap::Slice &slice =
-        map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
-
-    Ipc::Mem::PageId page = map->extras(lastWritingSlice).page;
-    assert(lastWritingSlice >= 0 && page);
+    Ipc::Mem::PageId page = pageForSlice(lastWritingSlice);
     debugs(20, 7, "entry " << e << " slice " << lastWritingSlice << " has " <<
            page);
 
@@ -571,6 +743,55 @@ MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor)
 
     slice.size += copied;
     e.mem_obj->memCache.offset += copied;
+    anchor.basics.swap_file_sz = e.mem_obj->memCache.offset;
+}
+
+/// starts checking with the entry chain slice at a given offset and
+/// returns a not-full (but not necessarily empty) slice, updating sliceOffset
+Ipc::StoreMap::Slice &
+MemStore::nextAppendableSlice(const sfileno fileNo, sfileno &sliceOffset)
+{
+    // allocate the very first slot for the entry if needed
+    if (sliceOffset < 0) {
+        Ipc::StoreMapAnchor &anchor = map->writeableEntry(fileNo);
+        Must(anchor.start < 0);
+        Ipc::Mem::PageId page;
+        sliceOffset = reserveSapForWriting(page); // throws
+        extras->items[sliceOffset].page = page;
+        anchor.start = sliceOffset;
+    }
+
+    const size_t sliceCapacity = Ipc::Mem::PageSize();
+    do {
+        Ipc::StoreMap::Slice &slice = map->writeableSlice(fileNo, sliceOffset);
+
+        if (slice.size >= sliceCapacity) {
+            if (slice.next >= 0) {
+                sliceOffset = slice.next;
+                continue;
+            }
+
+            Ipc::Mem::PageId page;
+            slice.next = sliceOffset = reserveSapForWriting(page);
+            extras->items[sliceOffset].page = page;
+            debugs(20, 7, "entry " << fileNo << " new slice: " << sliceOffset);
+            continue; // to get and return the slice at the new sliceOffset
+        }
+
+        return slice;
+    } while (true);
+    /* not reached */
+}
+
+/// safely returns a previously allocated memory page for the given entry slice
+Ipc::Mem::PageId
+MemStore::pageForSlice(Ipc::StoreMapSliceId sliceId)
+{
+    Must(extras);
+    Must(sliceId >= 0);
+    Ipc::Mem::PageId page = extras->items[sliceId].page;
+    Must(page);
+    return page;
 }
 
 /// finds a slot and a free page to fill or throws
@@ -589,7 +810,7 @@ MemStore::reserveSapForWriting(Ipc::Mem::PageId &page)
             freeSlots->push(slot);
         }
     }
-        
+
     // catch free slots delivered to noteFreeMapSlice()
     assert(!waitingFor);
     waitingFor.slot = &slot;
@@ -610,9 +831,9 @@ MemStore::reserveSapForWriting(Ipc::Mem::PageId &page)
 }
 
 void
-MemStore::noteFreeMapSlice(const sfileno sliceId)
+MemStore::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
 {
-    Ipc::Mem::PageId &pageId = map->extras(sliceId).page;
+    Ipc::Mem::PageId &pageId = extras->items[sliceId].page;
     debugs(20, 9, "slice " << sliceId << " freed " << pageId);
     assert(pageId);
     Ipc::Mem::PageId slotId;
@@ -642,12 +863,11 @@ MemStore::write(StoreEntry &e)
     case MemObject::ioUndecided:
         if (!shouldCache(e) || !startCaching(e)) {
             e.mem_obj->memCache.io = MemObject::ioDone;
-            Store::Root().transientsAbandon(e);
-            CollapsedForwarding::Broadcast(e);
+            e.memOutDecision(false);
             return;
         }
         break;
-  
+
     case MemObject::ioDone:
     case MemObject::ioReading:
         return; // we should not write in all of the above cases
@@ -663,15 +883,12 @@ MemStore::write(StoreEntry &e)
         else
             CollapsedForwarding::Broadcast(e);
         return;
-    }
-    catch (const std::exception &x) { // TODO: should we catch ... as well?
+    } catch (const std::exception &x) { // TODO: should we catch ... as well?
         debugs(20, 2, "mem-caching error writing entry " << e << ": " << x.what());
         // fall through to the error handling code
     }
 
-    Store::Root().transientsAbandon(e);
-    disconnect(*e.mem_obj);
-    CollapsedForwarding::Broadcast(e);
+    disconnect(e);
 }
 
 void
@@ -703,30 +920,34 @@ MemStore::markForUnlink(StoreEntry &e)
 void
 MemStore::unlink(StoreEntry &e)
 {
-    assert(e.mem_obj);
-    if (e.mem_obj->memCache.index >= 0) {
+    if (e.mem_obj && e.mem_obj->memCache.index >= 0) {
         map->freeEntry(e.mem_obj->memCache.index);
-        disconnect(*e.mem_obj);
-    } else {
-        // the entry may bave been loaded and then disconnected from the cache
+        disconnect(e);
+    } else if (map) {
+        // the entry may have been loaded and then disconnected from the cache
         map->freeEntryByKey(reinterpret_cast<cache_key*>(e.key));
     }
-        
+
     e.destroyMemObject(); // XXX: but it may contain useful info such as a client list. The old code used to do that though, right?
 }
 
 void
-MemStore::disconnect(MemObject &mem_obj)
+MemStore::disconnect(StoreEntry &e)
 {
+    assert(e.mem_obj);
+    MemObject &mem_obj = *e.mem_obj;
     if (mem_obj.memCache.index >= 0) {
         if (mem_obj.memCache.io == MemObject::ioWriting) {
             map->abortWriting(mem_obj.memCache.index);
+            mem_obj.memCache.index = -1;
+            mem_obj.memCache.io = MemObject::ioDone;
+            Store::Root().transientsAbandon(e); // broadcasts after the change
         } else {
             assert(mem_obj.memCache.io == MemObject::ioReading);
             map->closeForReading(mem_obj.memCache.index);
+            mem_obj.memCache.index = -1;
+            mem_obj.memCache.io = MemObject::ioDone;
         }
-        mem_obj.memCache.index = -1;
-        mem_obj.memCache.io = MemObject::ioDone;
     }
 }
 
@@ -742,42 +963,44 @@ MemStore::EntryLimit()
     return entryLimit;
 }
 
-/// reports our needs for shared memory pages to Ipc::Mem::Pages
-class MemStoreClaimMemoryNeedsRr: public RegisteredRunner
+/// reports our needs for shared memory pages to Ipc::Mem::Pages;
+/// decides whether to use a shared memory cache or checks its configuration;
+/// and initializes shared memory segments used by MemStore
+class MemStoreRr: public Ipc::Mem::RegisteredRunner
 {
 public:
     /* RegisteredRunner API */
-    virtual void run(const RunnerRegistry &r);
+    MemStoreRr(): spaceOwner(NULL), mapOwner(NULL), extrasOwner(NULL) {}
+    virtual void finalizeConfig();
+    virtual void claimMemoryNeeds();
+    virtual void useConfig();
+    virtual ~MemStoreRr();
+
+protected:
+    /* Ipc::Mem::RegisteredRunner API */
+    virtual void create();
+
+private:
+    Ipc::Mem::Owner<Ipc::Mem::PageStack> *spaceOwner; ///< free slices Owner
+    MemStoreMap::Owner *mapOwner; ///< primary map Owner
+    Ipc::Mem::Owner<MemStoreMapExtras> *extrasOwner; ///< PageIds Owner
 };
 
-RunnerRegistrationEntry(rrClaimMemoryNeeds, MemStoreClaimMemoryNeedsRr);
+RunnerRegistrationEntry(MemStoreRr);
 
 void
-MemStoreClaimMemoryNeedsRr::run(const RunnerRegistry &)
+MemStoreRr::claimMemoryNeeds()
 {
     Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::cachePage, MemStore::EntryLimit());
 }
 
-/// decides whether to use a shared memory cache or checks its configuration
-class MemStoreCfgRr: public ::RegisteredRunner
-{
-public:
-    /* RegisteredRunner API */
-    virtual void run(const RunnerRegistry &);
-};
-
-RunnerRegistrationEntry(rrFinalizeConfig, MemStoreCfgRr);
-
-void MemStoreCfgRr::run(const RunnerRegistry &r)
+void
+MemStoreRr::finalizeConfig()
 {
     // decide whether to use a shared memory cache if the user did not specify
     if (!Config.memShared.configured()) {
-        Config.memShared.configure(Ipc::Atomic::Enabled() &&
-                                   Ipc::Mem::Segment::Enabled() && UsingSmp() &&
+        Config.memShared.configure(Ipc::Mem::Segment::Enabled() && UsingSmp() &&
                                    Config.memMaxSize > 0);
-    } else if (Config.memShared && !Ipc::Atomic::Enabled()) {
-        // bail if the user wants shared memory cache but we cannot support it
-        fatal("memory_cache_shared is on, but no support for atomic operations detected");
     } else if (Config.memShared && !Ipc::Mem::Segment::Enabled()) {
         fatal("memory_cache_shared is on, but no support for shared memory detected");
     } else if (Config.memShared && !UsingSmp()) {
@@ -786,32 +1009,15 @@ void MemStoreCfgRr::run(const RunnerRegistry &r)
     }
 }
 
-/// initializes shared memory segments used by MemStore
-class MemStoreRr: public Ipc::Mem::RegisteredRunner
-{
-public:
-    /* RegisteredRunner API */
-    MemStoreRr(): spaceOwner(NULL), mapOwner(NULL) {}
-    virtual void run(const RunnerRegistry &);
-    virtual ~MemStoreRr();
-
-protected:
-    virtual void create(const RunnerRegistry &);
-
-private:
-    Ipc::Mem::Owner<Ipc::Mem::PageStack> *spaceOwner; ///< free slices Owner
-    MemStoreMap::Owner *mapOwner; ///< primary map Owner
-};
-
-RunnerRegistrationEntry(rrAfterConfig, MemStoreRr);
-
-void MemStoreRr::run(const RunnerRegistry &r)
+void
+MemStoreRr::useConfig()
 {
     assert(Config.memShared.configured());
-    Ipc::Mem::RegisteredRunner::run(r);
+    Ipc::Mem::RegisteredRunner::useConfig();
 }
 
-void MemStoreRr::create(const RunnerRegistry &)
+void
+MemStoreRr::create()
 {
     if (!Config.memShared)
         return;
@@ -828,14 +1034,17 @@ void MemStoreRr::create(const RunnerRegistry &)
 
     Must(!spaceOwner);
     spaceOwner = shm_new(Ipc::Mem::PageStack)(SpaceLabel, SpacePoolId,
-                                              entryLimit,
-                                              sizeof(Ipc::Mem::PageId));
+                 entryLimit, 0);
     Must(!mapOwner);
     mapOwner = MemStoreMap::Init(MapLabel, entryLimit);
+    Must(!extrasOwner);
+    extrasOwner = shm_new(MemStoreMapExtras)(ExtrasLabel, entryLimit);
 }
 
 MemStoreRr::~MemStoreRr()
 {
+    delete extrasOwner;
     delete mapOwner;
     delete spaceOwner;
 }
+