]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Bug 7: Update cached entries on 304 responses.
authorAlex Rousskov <rousskov@measurement-factory.com>
Fri, 11 Mar 2016 18:00:51 +0000 (11:00 -0700)
committerAlex Rousskov <rousskov@measurement-factory.com>
Fri, 11 Mar 2016 18:00:51 +0000 (11:00 -0700)
New Store API to update entry metadata and headers on 304s.
Support entry updates in shared memory cache and rock cache_dirs.
No changes to ufs-based cache_dirs: Their entries are still not updated.

* Atomic StoreEntry metadata updating

   StoreEntry metadata (swap_file_sz, timestamps, etc.) is used
   throughout Squid code. Metadata cannot be updated atomically because
   it has many fields, but a partial update to those fields causes
   assertions. Still, we must update metadata when updating HTTP
   headers. Locking the entire entry for a rewrite does not work well
   because concurrent requests will attempt to download a new entry
   copy, defeating the very HTTP 304 optimization we want to support.

   Ipc::StoreMap index now uses an extra level of indirection (the
   StoreMap::fileNos index) which allows StoreMap control which
   anchor/fileno is associated with a given StoreEntry key. The entry
   updating code creates a disassociated (i.e., entry/key-less) anchor,
   writes new metadata and headers using that new anchor, and then
   _atomically_ switches the map to use that new anchor. This allows old
   readers to continue reading using the stale anchor/fileno as if
   nothing happened while a new reader gets the new anchor/fileno.

   Shared memory usage increase: 8 additional bytes per cache entry: 4
   for the extra level of indirection (StoreMapFileNos) plus 4 for
   splicing fresh chain prefix with the stale chain suffix
   (StoreMapAnchor::splicingPoint). However, if the updated headers are
   larger than the stale ones, Squid will allocate shared memory pages
   to accommodate for the increase, leading to shared memory
   fragmentation/waste for small increases.

* Revamped rock index rebuild process

   The index rebuild process had to be completely revamped because
   splicing fresh and stale entry slot chain segments implies tolerating
   multiple entry versions in a single chain and the old code was based
   on the assumption that different slot versions are incompatible. We
   were also uncomfortable with the old cavalier approach to accessing
   two differently indexed layers of information (entry vs. slot) using
   the same set of class fields, making it trivial to accidentally
   access entry data while using slot index.

   During the rewrite of the index rebuilding code, we also discovered a
   way to significantly reduce RAM usage for the index build map (a
   temporary object that is allocated in the beginning and freed at the
   end of the index build process). The savings depend on the cache
   size: A small cache saves about 30% (17 vs 24 bytes per entry/slot)
   while a 1TB cache_dir with 32KB slots (which implies uneven
   entry/slot indexes) saves more than 50% (~370MB vs. ~800MB).

   Adjusted how invalid slots are counted. The code was sometimes
   counting invalid entries and sometimes invalid entry slots. We should
   always count _slots_ now because progress is measured in the number
   of slots scanned, not entries loaded. This accounting change may
   surprise users with much higher "Invalid entries" count in cache.log
   upon startup, but at least the new reports are meaningful.

   This rewrite does not attempt to solve all rock index build problems.
   For example, the code still assumes that StoreEntry metadata fits a
   single slot which is not always true for very small slots.

28 files changed:
src/MemStore.cc
src/MemStore.h
src/StoreIOState.cc
src/StoreIOState.h
src/client_side_reply.cc
src/fs/Makefile.am
src/fs/rock/RockHeaderUpdater.cc [new file with mode: 0644]
src/fs/rock/RockHeaderUpdater.h [new file with mode: 0644]
src/fs/rock/RockIoState.cc
src/fs/rock/RockIoState.h
src/fs/rock/RockRebuild.cc
src/fs/rock/RockRebuild.h
src/fs/rock/RockSwapDir.cc
src/fs/rock/RockSwapDir.h
src/fs/rock/forward.h
src/http.cc
src/ipc/ReadWriteLock.cc
src/ipc/ReadWriteLock.h
src/ipc/StoreMap.cc
src/ipc/StoreMap.h
src/peer_digest.cc
src/store/Controlled.h
src/store/Controller.cc
src/store/Controller.h
src/store/Disks.cc
src/store/Disks.h
src/tests/stub_HttpReply.cc
src/tests/stub_MemStore.cc

index e7b2722c05aff552000287c56ae92159e50b9913..8393dda240f4d5e50390a73867290869aa017903 100644 (file)
@@ -35,6 +35,138 @@ static const char *ExtrasLabel = "cache_mem_ex";
 // used except for a positivity test. A unique value is handy for debugging.
 static const uint32_t SpacePoolId = 510716;
 
+/// Packs to shared memory, allocating new slots/pages as needed.
+/// Requires an Ipc::StoreMapAnchor locked for writing.
+class ShmWriter: public Packable
+{
+public:
+    ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice = -1);
+
+    /* Packable API */
+    virtual void append(const char *aBuf, int aSize) override;
+    virtual void vappendf(const char *fmt, va_list ap) override;
+
+public:
+    StoreEntry *entry; ///< the entry being updated
+
+    /// the slot keeping the first byte of the appended content (at least)
+    /// either set via constructor parameter or allocated by the first append
+    Ipc::StoreMapSliceId firstSlice;
+
+    /// the slot keeping the last byte of the appended content (at least)
+    Ipc::StoreMapSliceId lastSlice;
+
+    uint64_t totalWritten; ///< cumulative number of bytes appended so far
+
+protected:
+    void copyToShm();
+    void copyToShmSlice(Ipc::StoreMap::Slice &slice);
+
+private:
+    MemStore &store;
+    const sfileno fileNo;
+
+    /* set by (and only valid during) append calls */
+    const char *buf; ///< content being appended now
+    int bufSize; ///< buf size
+    int bufWritten; ///< buf bytes appended so far
+};
+
+/* ShmWriter */
+
+ShmWriter::ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice):
+    entry(anEntry),
+    firstSlice(aFirstSlice),
+    lastSlice(firstSlice),
+    totalWritten(0),
+    store(aStore),
+    fileNo(aFileNo),
+    buf(nullptr),
+    bufSize(0),
+    bufWritten(0)
+{
+    Must(entry);
+}
+
+void
+ShmWriter::append(const char *aBuf, int aBufSize)
+{
+    Must(!buf);
+    buf = aBuf;
+    bufSize = aBufSize;
+    if (bufSize) {
+        Must(buf);
+        bufWritten = 0;
+        copyToShm();
+    }
+    buf = nullptr;
+    bufSize = 0;
+    bufWritten = 0;
+}
+
+void
+ShmWriter::vappendf(const char *fmt, va_list ap)
+{
+    SBuf vaBuf;
+#if defined(VA_COPY)
+    va_list apCopy;
+    VA_COPY(apCopy, ap);
+    vaBuf.vappendf(fmt, apCopy);
+    va_end(apCopy);
+#else
+    vaBuf.vappendf(fmt, ap);
+#endif
+    append(vaBuf.rawContent(), vaBuf.length());
+}
+
+/// copies the entire buffer to shared memory
+void
+ShmWriter::copyToShm()
+{
+    Must(bufSize > 0); // do not use up shared memory pages for nothing
+    Must(firstSlice < 0 || lastSlice >= 0);
+
+    // fill, skip slices that are already full
+    while (bufWritten < bufSize) {
+        Ipc::StoreMap::Slice &slice = store.nextAppendableSlice(fileNo, lastSlice);
+        if (firstSlice < 0)
+            firstSlice = lastSlice;
+        copyToShmSlice(slice);
+    }
+
+    debugs(20, 7, "stored " << bufWritten << '/' << totalWritten << " header bytes of " << *entry);
+}
+
+/// copies at most one slice worth of buffer to shared memory
+void
+ShmWriter::copyToShmSlice(Ipc::StoreMap::Slice &slice)
+{
+    Ipc::Mem::PageId page = store.pageForSlice(lastSlice);
+    debugs(20, 7, "entry " << *entry << " slice " << lastSlice << " has " <<
+           page);
+
+    Must(bufWritten <= bufSize);
+    const int64_t writingDebt = bufSize - bufWritten;
+    const int64_t pageSize = Ipc::Mem::PageSize();
+    const int64_t sliceOffset = totalWritten % pageSize;
+    const int64_t copySize = std::min(writingDebt, pageSize - sliceOffset);
+    memcpy(static_cast<char*>(PagePointer(page)) + sliceOffset, buf + bufWritten,
+           copySize);
+
+    debugs(20, 7, "copied " << slice.size << '+' << copySize << " bytes of " <<
+           entry << " from " << sliceOffset << " in " << page);
+
+    slice.size += copySize;
+    bufWritten += copySize;
+    totalWritten += copySize;
+    // fresh anchor.basics.swap_file_sz is already set [to the stale value]
+
+    // either we wrote everything or we filled the entire slice
+    Must(bufWritten == bufSize || sliceOffset + copySize == pageSize);
+}
+
+/* MemStore */
+
 MemStore::MemStore(): map(NULL), lastWritingSlice(-1)
 {
 }
@@ -207,6 +339,69 @@ MemStore::get(const cache_key *key)
     return NULL;
 }
 
+void
+MemStore::updateHeaders(StoreEntry *updatedE)
+{
+    if (!map)
+        return;
+
+    Ipc::StoreMapUpdate update(updatedE);
+    assert(updatedE);
+    assert(updatedE->mem_obj);
+    if (!map->openForUpdating(update, updatedE->mem_obj->memCache.index))
+        return;
+
+    try {
+        updateHeadersOrThrow(update);
+    } catch (const std::exception &ex) {
+        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
+        map->abortUpdating(update);
+    }
+}
+
+void
+MemStore::updateHeadersOrThrow(Ipc::StoreMapUpdate &update)
+{
+    // our +/- hdr_sz math below does not work if the chains differ [in size]
+    Must(update.stale.anchor->basics.swap_file_sz == update.fresh.anchor->basics.swap_file_sz);
+
+    const HttpReply *rawReply = update.entry->getReply();
+    Must(rawReply);
+    const HttpReply &reply = *rawReply;
+    const uint64_t staleHdrSz = reply.hdr_sz;
+    debugs(20, 7, "stale hdr_sz: " << staleHdrSz);
+
+    /* we will need to copy same-slice payload after the stored headers later */
+    Must(staleHdrSz > 0);
+    update.stale.splicingPoint = map->sliceContaining(update.stale.fileNo, staleHdrSz);
+    Must(update.stale.splicingPoint >= 0);
+    Must(update.stale.anchor->basics.swap_file_sz >= staleHdrSz);
+
+    Must(update.stale.anchor);
+    ShmWriter writer(*this, update.entry, update.fresh.fileNo);
+    reply.packHeadersInto(&writer);
+    const uint64_t freshHdrSz = writer.totalWritten;
+    debugs(20, 7, "fresh hdr_sz: " << freshHdrSz << " diff: " << (freshHdrSz - staleHdrSz));
+
+    /* copy same-slice payload remaining after the stored headers */
+    const Ipc::StoreMapSlice &slice = map->readableSlice(update.stale.fileNo, update.stale.splicingPoint);
+    const Ipc::StoreMapSlice::Size sliceCapacity = Ipc::Mem::PageSize();
+    const Ipc::StoreMapSlice::Size headersInLastSlice = staleHdrSz % sliceCapacity;
+    Must(headersInLastSlice > 0); // or sliceContaining() would have stopped earlier
+    Must(slice.size >= headersInLastSlice);
+    const Ipc::StoreMapSlice::Size payloadInLastSlice = slice.size - headersInLastSlice;
+    const MemStoreMapExtras::Item &extra = extras->items[update.stale.splicingPoint];
+    char *page = static_cast<char*>(PagePointer(extra.page));
+    debugs(20, 5, "appending same-slice payload: " << payloadInLastSlice);
+    writer.append(page + headersInLastSlice, payloadInLastSlice);
+    update.fresh.splicingPoint = writer.lastSlice;
+
+    update.fresh.anchor->basics.swap_file_sz -= staleHdrSz;
+    update.fresh.anchor->basics.swap_file_sz += freshHdrSz;
+
+    map->closeForUpdating(update);
+}
+
 bool
 MemStore::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
 {
@@ -498,10 +693,6 @@ MemStore::copyToShm(StoreEntry &e)
     assert(map);
     assert(e.mem_obj);
 
-    const int32_t index = e.mem_obj->memCache.index;
-    assert(index >= 0);
-    Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
-
     const int64_t eSize = e.mem_obj->endOffset();
     if (e.mem_obj->memCache.offset >= eSize) {
         debugs(20, 5, "postponing copying " << e << " for lack of news: " <<
@@ -509,34 +700,19 @@ MemStore::copyToShm(StoreEntry &e)
         return; // nothing to do (yet)
     }
 
-    if (anchor.start < 0) { // must allocate the very first slot for e
-        Ipc::Mem::PageId page;
-        anchor.start = reserveSapForWriting(page); // throws
-        extras->items[anchor.start].page = page;
-    }
-
+    const int32_t index = e.mem_obj->memCache.index;
+    assert(index >= 0);
+    Ipc::StoreMapAnchor &anchor = map->writeableEntry(index);
     lastWritingSlice = anchor.start;
-    const size_t sliceCapacity = Ipc::Mem::PageSize();
 
     // fill, skip slices that are already full
     // Optimize: remember lastWritingSlice in e.mem_obj
     while (e.mem_obj->memCache.offset < eSize) {
-        Ipc::StoreMap::Slice &slice =
-            map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
-
-        if (slice.size >= sliceCapacity) {
-            if (slice.next >= 0) {
-                lastWritingSlice = slice.next;
-                continue;
-            }
-
-            Ipc::Mem::PageId page;
-            slice.next = lastWritingSlice = reserveSapForWriting(page);
-            extras->items[lastWritingSlice].page = page;
-            debugs(20, 7, "entry " << index << " new slice: " << lastWritingSlice);
-        }
-
-        copyToShmSlice(e, anchor);
+        Ipc::StoreMap::Slice &slice = nextAppendableSlice(
+                                          e.mem_obj->memCache.index, lastWritingSlice);
+        if (anchor.start < 0)
+            anchor.start = lastWritingSlice;
+        copyToShmSlice(e, anchor, slice);
     }
 
     debugs(20, 7, "mem-cached available " << eSize << " bytes of " << e);
@@ -544,13 +720,9 @@ MemStore::copyToShm(StoreEntry &e)
 
 /// copies at most one slice worth of local memory to shared memory
 void
-MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor)
+MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice)
 {
-    Ipc::StoreMap::Slice &slice =
-        map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice);
-
-    Ipc::Mem::PageId page = extras->items[lastWritingSlice].page;
-    assert(lastWritingSlice >= 0 && page);
+    Ipc::Mem::PageId page = pageForSlice(lastWritingSlice);
     debugs(20, 7, "entry " << e << " slice " << lastWritingSlice << " has " <<
            page);
 
@@ -576,6 +748,53 @@ MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor)
     anchor.basics.swap_file_sz = e.mem_obj->memCache.offset;
 }
 
+/// starts checking with the entry chain slice at a given offset and
+/// returns a not-full (but not necessarily empty) slice, updating sliceOffset
+Ipc::StoreMap::Slice &
+MemStore::nextAppendableSlice(const sfileno fileNo, sfileno &sliceOffset)
+{
+    // allocate the very first slot for the entry if needed
+    if (sliceOffset < 0) {
+        Ipc::StoreMapAnchor &anchor = map->writeableEntry(fileNo);
+        Must(anchor.start < 0);
+        Ipc::Mem::PageId page;
+        sliceOffset = reserveSapForWriting(page); // throws
+        extras->items[sliceOffset].page = page;
+        anchor.start = sliceOffset;
+    }
+
+    const size_t sliceCapacity = Ipc::Mem::PageSize();
+    do {
+        Ipc::StoreMap::Slice &slice = map->writeableSlice(fileNo, sliceOffset);
+
+        if (slice.size >= sliceCapacity) {
+            if (slice.next >= 0) {
+                sliceOffset = slice.next;
+                continue;
+            }
+
+            Ipc::Mem::PageId page;
+            slice.next = sliceOffset = reserveSapForWriting(page);
+            extras->items[sliceOffset].page = page;
+            debugs(20, 7, "entry " << fileNo << " new slice: " << sliceOffset);
+        }
+
+        return slice;
+    } while (true);
+    /* not reached */
+}
+
+/// safely returns a previously allocated memory page for the given entry slice
+Ipc::Mem::PageId
+MemStore::pageForSlice(Ipc::StoreMapSliceId sliceId)
+{
+    Must(extras);
+    Must(sliceId >= 0);
+    Ipc::Mem::PageId page = extras->items[sliceId].page;
+    Must(page);
+    return page;
+}
+
 /// finds a slot and a free page to fill or throws
 sfileno
 MemStore::reserveSapForWriting(Ipc::Mem::PageId &page)
index 179195fc9d8af5fb8cc5034d99058fc0e03a1b76..6214b50e21be0099f96809ee4ecc762ed0939edc 100644 (file)
@@ -22,6 +22,8 @@ struct MemStoreMapExtraItem {
 typedef Ipc::StoreMapItems<MemStoreMapExtraItem> MemStoreMapExtras;
 typedef Ipc::StoreMap MemStoreMap;
 
+class ShmWriter;
+
 /// Stores HTTP entities in RAM. Current implementation uses shared memory.
 /// Unlike a disk store (SwapDir), operations are synchronous (and fast).
 class MemStore: public Store::Controlled, public Ipc::StoreMapCleaner
@@ -55,6 +57,7 @@ public:
     virtual void stat(StoreEntry &e) const override;
     virtual void reference(StoreEntry &e) override;
     virtual bool dereference(StoreEntry &e) override;
+    virtual void updateHeaders(StoreEntry *e) override;
     virtual void maintain() override;
     virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override;
     virtual bool updateCollapsed(StoreEntry &e) override;
@@ -64,17 +67,23 @@ public:
     static int64_t EntryLimit();
 
 protected:
+    friend ShmWriter;
+
     bool shouldCache(StoreEntry &e) const;
     bool startCaching(StoreEntry &e);
 
     void copyToShm(StoreEntry &e);
-    void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor);
+    void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice);
     bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
     bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof);
 
+    void updateHeadersOrThrow(Ipc::StoreMapUpdate &update);
+
     void anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
     bool updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor);
 
+    Ipc::Mem::PageId pageForSlice(Ipc::StoreMapSliceId sliceId);
+    Ipc::StoreMap::Slice &nextAppendableSlice(const sfileno entryIndex, sfileno &sliceOffset);
     sfileno reserveSapForWriting(Ipc::Mem::PageId &page);
 
     // Ipc::StoreMapCleaner API
index 8a89b807d025faa6de862c734f8b6e99234c37cd..e741921d51a4b71a591780fd7c940ee203220c69 100644 (file)
@@ -11,6 +11,7 @@
 #include "squid.h"
 #include "Debug.h"
 #include "defines.h"
+#include "Store.h"
 #include "StoreIOState.h"
 
 void *
@@ -52,3 +53,8 @@ StoreIOState::~StoreIOState()
         cbdataReferenceDone(callback_data);
 }
 
+bool StoreIOState::touchingStoreEntry() const
+{
+    return e && e->swap_filen == swap_filen;
+}
+
index b3f4c5f1f908ece9f36876d8646bbcfe2f56e14a..91779943b8c815f4b81498362e1054fb82e68288 100644 (file)
@@ -73,12 +73,19 @@ public:
     } CloseHow;
     virtual void close(int how) = 0; ///< finish or abort swapping per CloseHow
 
+    // Tests whether we are working with the primary/public StoreEntry chain.
+    // Reads start reading the primary chain, but it may become secondary.
+    // There are two store write kinds:
+    // * regular writes that change (usually append) the entry visible to all and
+    // * header updates that create a fresh chain (while keeping the stale one usable).
+    bool touchingStoreEntry() const;
+
     sdirno swap_dirn;
     sfileno swap_filen;
     StoreEntry *e;      /* Need this so the FS layers can play god */
     mode_t mode;
     off_t offset_; ///< number of bytes written or read for this entry so far
-    STFNCB *file_callback;  /* called on delayed sfileno assignments */
+    STFNCB *file_callback;  // XXX: Unused. TODO: Remove.
     STIOCB *callback;
     void *callback_data;
 
index f9ed177b45954170a8309cd702d46c99465849ed..32fb00055d783afc6c6c7cf1eb7c58decdf94967 100644 (file)
@@ -398,7 +398,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
         sendClientOldEntry();
     }
 
-    HttpReply *old_rep = (HttpReply *) old_entry->getReply();
+    const HttpReply *old_rep = old_entry->getReply();
 
     // origin replied 304
     if (status == Http::scNotModified) {
@@ -406,8 +406,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result)
         http->request->flags.staleIfHit = false; // old_entry is no longer stale
 
         // update headers on existing entry
-        old_rep->updateOnNotModified(http->storeEntry()->getReply());
-        old_entry->timestampsSet();
+        Store::Root().updateOnNotModified(old_entry, *http->storeEntry());
 
         // if client sent IMS
 
index 5ba97cfc2df2f9d7447b557fa2fec8758dfef14f..1890b2050fa720a6eef5b53dd62ff8c622f247b1 100644 (file)
@@ -38,6 +38,8 @@ librock_la_SOURCES = \
        rock/forward.h \
        rock/RockDbCell.cc \
        rock/RockDbCell.h \
+       rock/RockHeaderUpdater.cc \
+       rock/RockHeaderUpdater.h \
        rock/RockIoState.cc \
        rock/RockIoState.h \
        rock/RockIoRequests.cc \
diff --git a/src/fs/rock/RockHeaderUpdater.cc b/src/fs/rock/RockHeaderUpdater.cc
new file mode 100644 (file)
index 0000000..caf5fc6
--- /dev/null
@@ -0,0 +1,267 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "Debug.h"
+#include "fs/rock/RockHeaderUpdater.h"
+#include "fs/rock/RockIoState.h"
+#include "mime_header.h"
+#include "Store.h"
+#include "StoreMetaUnpacker.h"
+
+CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);
+
+Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &anUpdate):
+    AsyncJob("Rock::HeaderUpdater"),
+    store(aStore),
+    update(anUpdate),
+    reader(),
+    writer(),
+    bytesRead(0),
+    staleSwapHeaderSize(0),
+    staleSplicingPointNext(-1)
+{
+    // TODO: Consider limiting the number of concurrent store updates.
+}
+
+bool
+Rock::HeaderUpdater::doneAll() const
+{
+    return !reader && !writer && AsyncJob::doneAll();
+}
+
+void
+Rock::HeaderUpdater::swanSong()
+{
+    if (update.stale || update.fresh)
+        store->map->abortUpdating(update);
+
+    if (reader) {
+        reader->close(StoreIOState::readerDone);
+        reader = nullptr;
+    }
+
+    if (writer) {
+        writer->close(StoreIOState::writerGone);
+        // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
+        // Also required to avoid IoState destructor assertions.
+        // We can do this because we closed update earlier or aborted it above.
+        dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
+        writer = nullptr;
+    }
+
+    AsyncJob::swanSong();
+}
+
+void
+Rock::HeaderUpdater::start()
+{
+    Must(update.entry);
+    Must(update.stale);
+    Must(update.fresh);
+    startReading();
+}
+
+void
+Rock::HeaderUpdater::startReading()
+{
+    reader = store->openStoreIO(
+                 *update.entry,
+                 nullptr, // unused; see StoreIOState::file_callback
+                 &NoteDoneReading,
+                 this);
+    readMore("need swap entry metadata");
+}
+
+void
+Rock::HeaderUpdater::stopReading(const char *why)
+{
+    debugs(47, 7, why);
+
+    Must(reader);
+    const IoState &rockReader = dynamic_cast<IoState&>(*reader);
+    update.stale.splicingPoint = rockReader.splicingPoint;
+    staleSplicingPointNext = rockReader.staleSplicingPointNext;
+    debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
+           " body continues at " << staleSplicingPointNext);
+
+    reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
+    reader = nullptr; // so that swanSong() does not try to close again
+}
+
+void
+Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
+{
+    // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
+    CallJobHere1(47, 7,
+                 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
+                 Rock::HeaderUpdater,
+                 noteRead,
+                 result);
+}
+
+void
+Rock::HeaderUpdater::noteRead(ssize_t result)
+{
+    debugs(47, 7, result);
+    if (!result) { // EOF
+        stopReading("eof");
+    } else {
+        Must(result > 0);
+        bytesRead += result;
+        readerBuffer.forceSize(readerBuffer.length() + result);
+        exchangeBuffer.append(readerBuffer);
+        debugs(47, 7, "accumulated " << exchangeBuffer.length());
+    }
+
+    parseReadBytes();
+}
+
+void
+Rock::HeaderUpdater::readMore(const char *why)
+{
+    debugs(47, 7, "from " << bytesRead << " because " << why);
+    Must(reader);
+    readerBuffer.clear();
+    storeRead(reader,
+              readerBuffer.rawSpace(store->slotSize),
+              store->slotSize,
+              bytesRead,
+              &NoteRead,
+              this);
+}
+
+void
+Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Pointer)
+{
+    // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
+    CallJobHere1(47, 7,
+                 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
+                 Rock::HeaderUpdater,
+                 noteDoneReading,
+                 errflag);
+}
+
+void
+Rock::HeaderUpdater::noteDoneReading(int errflag)
+{
+    debugs(47, 5, errflag << " writer=" << writer);
+    if (const bool weInitiatedThisClosure = !reader) {
+        Must(!errflag); // we only initiate successful closures
+        Must(writer); // otherwise we would be done() and would not be called
+    } else {
+        reader = nullptr; // we are done reading
+        Must(errflag); // any external closures ought to be errors
+        mustStop("read error");
+    }
+}
+
+void
+Rock::HeaderUpdater::startWriting()
+{
+    writer = store->createUpdateIO(
+                 update,
+                 nullptr, // unused; see StoreIOState::file_callback
+                 &NoteDoneWriting,
+                 this);
+    Must(writer);
+
+    IoState &rockWriter = dynamic_cast<IoState&>(*writer);
+    rockWriter.staleSplicingPointNext = staleSplicingPointNext;
+
+    off_t offset = 0; // current writing offset (for debugging)
+
+    {
+        debugs(20, 7, "fresh store meta for " << *update.entry);
+        const char *freshSwapHeader = update.entry->getSerialisedMetaData();
+        const auto freshSwapHeaderSize = update.entry->mem_obj->swap_hdr_sz;
+        Must(freshSwapHeader);
+        writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
+        offset += freshSwapHeaderSize;
+    }
+
+    {
+        debugs(20, 7, "fresh HTTP header @ " << offset);
+        MemBuf *httpHeader = update.entry->mem_obj->getReply()->pack();
+        writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
+        offset += httpHeader->contentSize();
+        delete httpHeader;
+    }
+
+    {
+        debugs(20, 7, "moved HTTP body prefix @ " << offset);
+        writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
+        offset += exchangeBuffer.length();
+        exchangeBuffer.clear();
+    }
+
+    debugs(20, 7, "wrote " << offset);
+
+    writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
+}
+
+void
+Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Pointer)
+{
+    CallJobHere1(47, 7,
+                 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
+                 Rock::HeaderUpdater,
+                 noteDoneWriting,
+                 errflag);
+}
+
+void
+Rock::HeaderUpdater::noteDoneWriting(int errflag)
+{
+    debugs(47, 5, errflag << " reader=" << reader);
+    Must(!errflag);
+    Must(!reader); // if we wrote everything, then we must have read everything
+
+    Must(writer);
+    IoState &rockWriter = dynamic_cast<IoState&>(*writer);
+    update.fresh.splicingPoint = rockWriter.splicingPoint;
+    debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
+    store->map->closeForUpdating(update);
+    rockWriter.writeableAnchor_ = nullptr;
+    writer = nullptr; // we are done writing
+
+    Must(doneAll());
+}
+
+void
+Rock::HeaderUpdater::parseReadBytes()
+{
+    if (!staleSwapHeaderSize) {
+        StoreMetaUnpacker aBuilder(
+            exchangeBuffer.rawContent(),
+            exchangeBuffer.length(),
+            &staleSwapHeaderSize);
+        // Squid assumes that metadata always fits into a single db slot
+        Must(aBuilder.isBufferSane()); // cannot update what we cannot parse
+        debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
+        Must(staleSwapHeaderSize > 0);
+        exchangeBuffer.consume(staleSwapHeaderSize);
+    }
+
+    const size_t staleHttpHeaderSize = headersEnd(
+                                           exchangeBuffer.rawContent(),
+                                           exchangeBuffer.length());
+    debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
+    if (!staleHttpHeaderSize) {
+        readMore("need more stale HTTP reply header data");
+        return;
+    }
+
+    exchangeBuffer.consume(staleHttpHeaderSize);
+    debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
+
+    stopReading("read the last HTTP header slot");
+    startWriting();
+}
+
diff --git a/src/fs/rock/RockHeaderUpdater.h b/src/fs/rock/RockHeaderUpdater.h
new file mode 100644 (file)
index 0000000..704bb76
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_FS_ROCK_HEADER_UPDATER_H
+#define SQUID_FS_ROCK_HEADER_UPDATER_H
+
+#include "base/AsyncJob.h"
+#include "cbdata.h"
+#include "fs/rock/forward.h"
+#include "fs/rock/RockSwapDir.h"
+#include "ipc/StoreMap.h"
+
+namespace Rock
+{
+
+/// Updates HTTP headers of a single Rock store entry:
+/// * reads old body data in the same slot as the last old headers slot, if any
+/// * writes new headers (1+ slots)
+/// * writes old data (0-2 slots)
+/// * chains the new entry prefix (1+ slots) to the old entry suffix (0+ slots)
+class HeaderUpdater: public AsyncJob
+{
+    CBDATA_CLASS(HeaderUpdater);
+
+public:
+    HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update);
+    virtual ~HeaderUpdater() override = default;
+
+protected:
+    /* AsyncJob API */
+    virtual void start() override;
+    virtual bool doneAll() const override;
+    virtual void swanSong() override;
+
+private:
+    static StoreIOState::STRCB NoteRead;
+    static StoreIOState::STIOCB NoteDoneReading;
+    static StoreIOState::STIOCB NoteDoneWriting;
+
+    void startReading();
+    void stopReading(const char *why);
+    void readMore(const char *why);
+    void noteRead(ssize_t result);
+    void noteDoneReading(int errflag);
+    void parseReadBytes();
+
+    void startWriting();
+    void noteDoneWriting(int errflag);
+
+    Rock::SwapDir::Pointer store; ///< cache_dir where the entry is stored
+    Ipc::StoreMapUpdate update; ///< Ipc::StoreMap update reservation
+
+    StoreIOState::Pointer reader; ///< reads old headers and old data
+    StoreIOState::Pointer writer; ///< writes new headers and old data
+
+    SBuf readerBuffer; ///< I/O buffer for a single read operation
+    SBuf exchangeBuffer; ///< bytes read but not yet discarded or written
+    uint64_t bytesRead; ///< total entry bytes read from Store so far
+
+    int staleSwapHeaderSize; ///< stored size of the stale entry metadata
+
+    SlotId staleSplicingPointNext; ///< non-updatable old HTTP body suffix start
+};
+
+} // namespace Rock
+
+#endif /* SQUID_FS_ROCK_HEADER_UPDATER_H */
+
index 72f49ac1769d702d5041d1ec4a2fac3e9763c21a..e43ea441a5e571402b6682fd0af901d74ed46be5 100644 (file)
@@ -30,10 +30,12 @@ Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
     StoreIOState(cbFile, cbIo, data),
     readableAnchor_(NULL),
     writeableAnchor_(NULL),
-    sidCurrent(-1),
+    splicingPoint(-1),
+    staleSplicingPointNext(-1),
     dir(aDir),
     slotSize(dir->slotSize),
     objOffset(0),
+    sidCurrent(-1),
     theBuf(dir->slotSize)
 {
     e = anEntry;
@@ -132,6 +134,11 @@ void
 Rock::IoState::callReaderBack(const char *buf, int rlen)
 {
     debugs(79, 5, rlen << " bytes for " << *e);
+    splicingPoint = rlen >= 0 ? sidCurrent : -1;
+    if (splicingPoint < 0)
+        staleSplicingPointNext = -1;
+    else
+        staleSplicingPointNext = currentReadableSlice().next;
     StoreIOState::STRCB *callb = read.callback;
     assert(callb);
     read.callback = NULL;
@@ -150,7 +157,7 @@ Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
         success = true;
     } catch (const std::exception &ex) { // TODO: should we catch ... as well?
         debugs(79, 2, "db write error: " << ex.what());
-        dir->writeError(*e);
+        dir->writeError(*this);
         finishedWriting(DISK_ERROR);
         // 'this' might be gone beyond this point; fall through to free buf
     }
@@ -202,7 +209,7 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
             writeToDisk(sidNext);
         } else if (Store::Root().transientReaders(*e)) {
             // write partial buffer for all remote hit readers to see
-            writeBufToDisk(-1, false);
+            writeBufToDisk(-1, false, false);
         }
     }
 
@@ -231,15 +238,24 @@ Rock::IoState::writeToBuffer(char const *buf, size_t size)
 /// write what was buffered during write() calls
 /// negative sidNext means this is the last write request for this entry
 void
-Rock::IoState::writeToDisk(const SlotId sidNext)
+Rock::IoState::writeToDisk(const SlotId sidNextProposal)
 {
     assert(theFile != NULL);
     assert(theBuf.size >= sizeof(DbCellHeader));
 
+    const bool lastWrite = sidNextProposal < 0;
+    const bool eof = lastWrite &&
+                     // either not updating or the updating reader has loaded everything
+                     (touchingStoreEntry() || staleSplicingPointNext < 0);
+    // approve sidNextProposal unless _updating_ the last slot
+    const SlotId sidNext = (!touchingStoreEntry() && lastWrite) ?
+                           staleSplicingPointNext : sidNextProposal;
+    debugs(79, 5, "sidNext:" << sidNextProposal << "=>" << sidNext << " eof=" << eof);
+
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
-    writeBufToDisk(sidNext, sidNext < 0);
+    writeBufToDisk(sidNext, eof, lastWrite);
     theBuf.clear();
 
     sidCurrent = sidNext;
@@ -248,7 +264,7 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
 /// creates and submits a request to write current slot buffer to disk
 /// eof is true if and only this is the last slot
 void
-Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+Rock::IoState::writeBufToDisk(const SlotId sidNext, const bool eof, const bool lastWrite)
 {
     // no slots after the last/eof slot (but partial slots may have a nil next)
     assert(!eof || sidNext < 0);
@@ -281,7 +297,7 @@ Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
                        memFreeBufFunc(wBufCap)), this);
     r->sidCurrent = sidCurrent;
     r->sidNext = sidNext;
-    r->eof = eof;
+    r->eof = lastWrite;
 
     // theFile->write may call writeCompleted immediatelly
     theFile->write(r);
@@ -307,7 +323,8 @@ Rock::IoState::finishedWriting(const int errFlag)
 {
     // we incremented offset_ while accumulating data in write()
     // we do not reset writeableAnchor_ here because we still keep the lock
-    CollapsedForwarding::Broadcast(*e);
+    if (touchingStoreEntry())
+        CollapsedForwarding::Broadcast(*e);
     callBack(errFlag);
 }
 
@@ -332,8 +349,7 @@ Rock::IoState::close(int how)
         return; // writeCompleted() will callBack()
 
     case writerGone:
-        assert(writeableAnchor_);
-        dir->writeError(*e); // abort a partially stored entry
+        dir->writeError(*this); // abort a partially stored entry
         finishedWriting(DISK_ERROR);
         return;
 
index 69262067152542519c4e7a6de23a100b08f75267..97e5ef79ed7cd28bec0e27f44a3249f60110672f 100644 (file)
@@ -51,7 +51,11 @@ public:
     const Ipc::StoreMapAnchor *readableAnchor_; ///< starting point for reading
     Ipc::StoreMapAnchor *writeableAnchor_; ///< starting point for writing
 
-    SlotId sidCurrent; ///< ID of the db slot currently being read or written
+    /// the last db slot successfully read or written
+    SlotId splicingPoint;
+    /// when reading, this is the next slot we are going to read (if asked)
+    /// when writing, this is the next slot to use after the last fresh slot
+    SlotId staleSplicingPointNext;
 
 private:
     const Ipc::StoreMapAnchor &readAnchor() const;
@@ -61,7 +65,7 @@ private:
     void tryWrite(char const *buf, size_t size, off_t offset);
     size_t writeToBuffer(char const *buf, size_t size);
     void writeToDisk(const SlotId nextSlot);
-    void writeBufToDisk(const SlotId nextSlot, const bool eof);
+    void writeBufToDisk(const SlotId nextSlot, const bool eof, const bool lastWrite);
     SlotId reserveSlotForWriting();
 
     void callBack(int errflag);
@@ -69,6 +73,7 @@ private:
     Rock::SwapDir::Pointer dir; ///< swap dir that initiated I/O
     const size_t slotSize; ///< db cell size
     int64_t objOffset; ///< object offset for current db slot
+    SlotId sidCurrent; ///< ID of the db slot currently being read or written
 
     RefCount<DiskFile> theFile; // "file" responsible for this I/O
     MemBlob theBuf; // use for write content accumulation only
index 54cd18708ffc2b5afd35d53a3fca7cd9a6e14411..fb5b157d8b0bc8137f198b1eed3530dd7aaa7ab3 100644 (file)
@@ -46,17 +46,15 @@ CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
  *  have the same key and version. If that assumption fails, we may serve a
  *  hodgepodge entry during rebuild, until "extra" slots are loaded/noticed.
  \par
+ *  iNode: The very first db slot in an entry slot chain. This slot contains
+ *  at least the beginning of Store Entry metadata, but most 32KB inodes contain
+ *  the entire metadata, HTTP headers, and HTTP body.
+ \par
  *  Db slot: A db record containing a piece of a single store entry and linked
  *  to other slots with the same key and version fields, forming a chain.
  *  Slots are identified by their absolute position in the database file,
  *  which is naturally unique.
  \par
- *  Except for the "mapped", "freed", and "more" fields, LoadingEntry info is
- *  entry-level and is stored at fileno position. In other words, the array of
- *  LoadingEntries should be interpreted as two arrays, one that maps slot ID
- *  to the LoadingEntry::mapped/free/more members, and the second one that maps
- *  fileno to all other LoadingEntry members. StoreMap maps slot key to fileno.
- \par
  *  When information from the newly loaded db slot contradicts the entry-level
  *  information collected so far (e.g., the versions do not match or the total
  *  chain size after the slot contribution exceeds the expected number), the
@@ -76,35 +74,135 @@ CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
 namespace Rock
 {
 
-/// maintains information about the store entry being loaded from disk
-/// used for identifying partially stored/loaded entries
-class LoadingEntry
+/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
+class LoadingFlags
 {
 public:
-    LoadingEntry(): size(0), version(0), state(leEmpty), anchored(0),
-        mapped(0), freed(0), more(-1) {}
+    LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {}
 
-    /* store entry-level information indexed by sfileno */
-    uint64_t size; ///< payload seen so far
-    uint32_t version; ///< DbCellHeader::version to distinguish same-URL chains
-    uint8_t state:3;  ///< current entry state (one of the State values)
+    /* for LoadingEntry */
+    uint8_t state:3;  ///< current entry state (one of the LoadingEntry::State values)
     uint8_t anchored:1;  ///< whether we loaded the inode slot for this entry
 
-    /* db slot-level information indexed by slotId, starting with firstSlot */
-    uint8_t mapped:1;  ///< whether this slot was added to a mapped entry
-    uint8_t freed:1;  ///< whether this slot was marked as free
-    Ipc::StoreMapSliceId more; ///< another slot in some entry chain (unordered)
-    bool used() const { return freed || mapped || more != -1; }
+    /* for LoadingSlot */
+    uint8_t mapped:1;  ///< whether the slot was added to a mapped entry
+    uint8_t finalized:1;  ///< whether finalizeOrThrow() has scanned the slot
+    uint8_t freed:1;  ///< whether the slot was given to the map as free space
+};
+
+/// smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
+class LoadingEntry
+{
+public:
+    LoadingEntry(const sfileno fileNo, LoadingParts &source);
+
+    uint64_t &size; ///< payload seen so far
+    uint32_t &version; ///< DbCellHeader::version to distinguish same-URL chains
 
-    /// possible entry states
+    /// possible store entry states during index rebuild
     typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;
+
+    /* LoadingFlags::state */
+    State state() const { return static_cast<State>(flags.state); }
+    void state(State aState) const { flags.state = aState; }
+
+    /* LoadingFlags::anchored */
+    bool anchored() const { return flags.anchored; }
+    void anchored(const bool beAnchored) { flags.anchored = beAnchored; }
+
+private:
+    LoadingFlags &flags; ///< entry flags (see the above accessors) are ours
+};
+
+/// smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
+class LoadingSlot
+{
+public:
+    LoadingSlot(const SlotId slotId, LoadingParts &source);
+
+    /// another slot in some chain belonging to the same entry (unordered!)
+    Ipc::StoreMapSliceId &more;
+
+    /* LoadingFlags::mapped */
+    bool mapped() const { return flags.mapped; }
+    void mapped(const bool beMapped) { flags.mapped = beMapped; }
+
+    /* LoadingFlags::finalized */
+    bool finalized() const { return flags.finalized; }
+    void finalized(const bool beFinalized) { flags.finalized = beFinalized; }
+
+    /* LoadingFlags::freed */
+    bool freed() const { return flags.freed; }
+    void freed(const bool beFreed) { flags.freed = beFreed; }
+
+    bool used() const { return freed() || mapped() || more != -1; }
+
+private:
+    LoadingFlags &flags; ///< slot flags (see the above accessors) are ours
+};
+
+/// information about store entries being loaded from disk (and their slots)
+/// used for identifying partially stored/loaded entries
+class LoadingParts
+{
+public:
+    LoadingParts(int dbSlotLimit, int dbEntryLimit);
+    LoadingParts(LoadingParts&&) = delete; // paranoid (often too huge to copy)
+
+private:
+    friend class LoadingEntry;
+    friend class LoadingSlot;
+
+    /* Anti-padding storage. With millions of entries, padding matters! */
+
+    /* indexed by sfileno */
+    std::vector<uint64_t> sizes; ///< LoadingEntry::size for all entries
+    std::vector<uint32_t> versions; ///< LoadingEntry::version for all entries
+
+    /* indexed by SlotId */
+    std::vector<Ipc::StoreMapSliceId> mores; ///< LoadingSlot::more for all slots
+
+    /* entry flags are indexed by sfileno; slot flags -- by SlotId */
+    std::vector<LoadingFlags> flags; ///< all LoadingEntry and LoadingSlot flags
 };
 
 } /* namespace Rock */
 
+/* LoadingEntry */
+
+Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
+    size(source.sizes.at(fileNo)),
+    version(source.versions.at(fileNo)),
+    flags(source.flags.at(fileNo))
+{
+}
+
+/* LoadingSlot */
+
+Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
+    more(source.mores.at(slotId)),
+    flags(source.flags.at(slotId))
+{
+}
+
+/* LoadingParts */
+
+Rock::LoadingParts::LoadingParts(const int dbEntryLimit, const int dbSlotLimit):
+    sizes(dbEntryLimit, 0),
+    versions(dbEntryLimit, 0),
+    mores(dbSlotLimit, -1),
+    flags(dbSlotLimit)
+{
+    assert(sizes.size() == versions.size()); // every entry has both fields
+    assert(sizes.size() <= mores.size()); // every entry needs slot(s)
+    assert(mores.size() == flags.size()); // every slot needs a set of flags
+}
+
+/* Rebuild */
+
 Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
     sd(dir),
-    entries(NULL),
+    parts(nullptr),
     dbSize(0),
     dbSlotSize(0),
     dbSlotLimit(0),
@@ -127,7 +225,7 @@ Rock::Rebuild::~Rebuild()
 {
     if (fd >= 0)
         file_close(fd);
-    delete[] entries;
+    delete parts;
 }
 
 /// prepares and initiates entry loading sequence
@@ -159,7 +257,7 @@ Rock::Rebuild::start()
 
     dbOffset = SwapDir::HeaderSize;
 
-    entries = new LoadingEntry[dbSlotLimit];
+    parts = new LoadingParts(dbEntryLimit, dbSlotLimit);
 
     checkpoint();
 }
@@ -172,11 +270,24 @@ Rock::Rebuild::checkpoint()
         eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
 }
 
+bool
+Rock::Rebuild::doneLoading() const
+{
+    return loadingPos >= dbSlotLimit;
+}
+
+bool
+Rock::Rebuild::doneValidating() const
+{
+    // paranoid slot checking is only enabled with squid -S
+    return validationPos >= dbEntryLimit +
+           (opt_store_doublecheck ? dbSlotLimit : 0);
+}
+
 bool
 Rock::Rebuild::doneAll() const
 {
-    return loadingPos >= dbSlotLimit && validationPos >= dbSlotLimit &&
-           AsyncJob::doneAll();
+    return doneLoading() && doneValidating() && AsyncJob::doneAll();
 }
 
 void
@@ -189,7 +300,7 @@ Rock::Rebuild::Steps(void *data)
 void
 Rock::Rebuild::steps()
 {
-    if (loadingPos < dbSlotLimit)
+    if (!doneLoading())
         loadingSteps();
     else
         validationSteps();
@@ -210,7 +321,7 @@ Rock::Rebuild::loadingSteps()
     const timeval loopStart = current_time;
 
     int loaded = 0;
-    while (loadingPos < dbSlotLimit) {
+    while (!doneLoading()) {
         loadOneSlot();
         dbOffset += dbSlotSize;
         ++loadingPos;
@@ -232,6 +343,21 @@ Rock::Rebuild::loadingSteps()
     }
 }
 
+Rock::LoadingEntry
+Rock::Rebuild::loadingEntry(const sfileno fileNo)
+{
+    Must(0 <= fileNo && fileNo < dbEntryLimit);
+    return LoadingEntry(fileNo, *parts);
+}
+
+Rock::LoadingSlot
+Rock::Rebuild::loadingSlot(const SlotId slotId)
+{
+    Must(0 <= slotId && slotId < dbSlotLimit);
+    Must(slotId <= loadingPos); // cannot look ahead
+    return LoadingSlot(slotId, *parts);
+}
+
 void
 Rock::Rebuild::loadOneSlot()
 {
@@ -256,18 +382,18 @@ Rock::Rebuild::loadOneSlot()
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring truncated " << buf.contentSize() << "-byte " <<
                "cache entry meta data at " << dbOffset);
-        freeSlotIfIdle(slotId, true);
+        freeUnusedSlot(slotId, true);
         return;
     }
     memcpy(&header, buf.content(), sizeof(header));
     if (header.empty()) {
-        freeSlotIfIdle(slotId, false);
+        freeUnusedSlot(slotId, false);
         return;
     }
     if (!header.sane(dbSlotSize, dbSlotLimit)) {
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring malformed cache entry meta data at " << dbOffset);
-        freeSlotIfIdle(slotId, true);
+        freeUnusedSlot(slotId, true);
         return;
     }
     buf.consume(sizeof(header)); // optimize to avoid memmove()
@@ -286,9 +412,10 @@ Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, co
     if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
         return false;
 
-    // the entry size may still be unknown at this time
+    // the entry size may be unknown, but if it is known, it is authoritative
 
     debugs(47, 8, "importing basics for entry " << fileno <<
+           " inode.entrySize: " << header.entrySize <<
            " swap_file_sz: " << loadedE.swap_file_sz);
     anchor.set(loadedE);
 
@@ -310,8 +437,11 @@ Rock::Rebuild::validationSteps()
     const timeval loopStart = current_time;
 
     int validated = 0;
-    while (validationPos < dbSlotLimit) {
-        validateOneEntry();
+    while (!doneValidating()) {
+        if (validationPos < dbEntryLimit)
+            validateOneEntry(validationPos);
+        else
+            validateOneSlot(validationPos - dbEntryLimit);
         ++validationPos;
         ++validated;
 
@@ -331,27 +461,78 @@ Rock::Rebuild::validationSteps()
     }
 }
 
+/// Either make the entry accessible to all or throw.
+/// This method assumes it is called only when no more entry slots are expected.
+void
+Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
+{
+    // walk all map-linked slots, starting from inode, and mark each
+    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo);
+    Must(le.size > 0); // paranoid
+    uint64_t mappedSize = 0;
+    SlotId slotId = anchor.start;
+    while (slotId >= 0 && mappedSize < le.size) {
+        LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot
+        Must(!slot.finalized()); // no loops or stealing from other entries
+        Must(slot.mapped()); // all our slots should be in the sd->map
+        Must(!slot.freed()); // all our slots should still be present
+        slot.finalized(true);
+
+        Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId);
+        Must(mapSlice.size > 0); // paranoid
+        mappedSize += mapSlice.size;
+        slotId = mapSlice.next;
+    }
+    /* no hodgepodge entries: one entry - one full chain and no leftovers */
+    Must(slotId < 0);
+    Must(mappedSize == le.size);
+
+    if (!anchor.basics.swap_file_sz)
+        anchor.basics.swap_file_sz = le.size;
+    EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
+    le.state(LoadingEntry::leLoaded);
+    sd->map->closeForWriting(fileNo, false);
+    ++counts.objcount;
+}
+
+/// Either make the entry accessible to all or free it.
+/// This method must only be called when no more entry slots are expected.
 void
-Rock::Rebuild::validateOneEntry()
+Rock::Rebuild::finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
 {
-    LoadingEntry &e = entries[validationPos];
-    switch (e.state) {
+    try {
+        finalizeOrThrow(fileNo, le);
+    } catch (const std::exception &ex) {
+        freeBadEntry(fileNo, ex.what());
+    }
+}
 
-    case LoadingEntry::leEmpty:
-        break; // no entry hashed to this position
+void
+Rock::Rebuild::validateOneEntry(const sfileno fileNo)
+{
+    LoadingEntry entry = loadingEntry(fileNo);
+    switch (entry.state()) {
 
     case LoadingEntry::leLoading:
-        freeBadEntry(validationPos, "partially stored");
+        finalizeOrFree(fileNo, entry);
         break;
 
-    case LoadingEntry::leLoaded:
-        break; // we have already unlocked this entry
-
-    case LoadingEntry::leCorrupted:
-        break; // we have already removed this entry
+    case LoadingEntry::leEmpty: // no entry hashed to this position
+    case LoadingEntry::leLoaded: // we have already unlocked this entry
+    case LoadingEntry::leCorrupted: // we have already removed this entry
+    case LoadingEntry::leIgnored: // we have already discarded this entry
+        break;
     }
 }
 
+void
+Rock::Rebuild::validateOneSlot(const SlotId slotId)
+{
+    const LoadingSlot slot = loadingSlot(slotId);
+    // there should not be any unprocessed slots left
+    Must(slot.freed() || (slot.mapped() && slot.finalized()));
+}
+
 /// Marks remaining bad entry slots as free and unlocks the entry. The map
 /// cannot do this because Loading entries may have holes in the slots chain.
 void
@@ -360,26 +541,18 @@ Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
     debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
            " entry " << fileno << " is ignored during rebuild");
 
-    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
+    LoadingEntry le = loadingEntry(fileno);
+    le.state(LoadingEntry::leCorrupted);
 
-    bool freedSome = false;
-    // free all loaded non-anchor slots
-    SlotId slotId = entries[anchor.start].more;
-    while (slotId >= 0) {
-        const SlotId next = entries[slotId].more;
-        freeSlot(slotId, false);
+    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
+    assert(anchor.start < 0 || le.size > 0);
+    for (SlotId slotId = anchor.start; slotId >= 0;) {
+        const SlotId next = loadingSlot(slotId).more;
+        freeSlot(slotId, true);
         slotId = next;
-        freedSome = true;
     }
-    // free anchor slot if it was loaded
-    if (entries[fileno].anchored) {
-        freeSlot(anchor.start, false);
-        freedSome = true;
-    }
-    assert(freedSome);
 
     sd->map->forgetWritingEntry(fileno);
-    ++counts.invalid;
 }
 
 void
@@ -411,9 +584,9 @@ void
 Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
 {
     debugs(47,5, sd->index << " frees slot " << slotId);
-    LoadingEntry &le = entries[slotId];
-    assert(!le.freed);
-    le.freed = 1;
+    LoadingSlot slot = loadingSlot(slotId);
+    assert(!slot.freed());
+    slot.freed(true);
 
     if (invalid) {
         ++counts.invalid;
@@ -426,27 +599,24 @@ Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
     sd->freeSlots->push(pageId);
 }
 
-/// adds slot to the free slot index but only if the slot is unused
+/// freeSlot() for never-been-mapped slots
 void
-Rock::Rebuild::freeSlotIfIdle(const SlotId slotId, const bool invalid)
+Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid)
 {
-    const LoadingEntry &le = entries[slotId];
-
+    LoadingSlot slot = loadingSlot(slotId);
     // mapped slots must be freed via freeBadEntry() to keep the map in sync
-    assert(!le.mapped);
-
-    if (!le.used())
-        freeSlot(slotId, invalid);
+    assert(!slot.mapped());
+    freeSlot(slotId, invalid);
 }
 
 /// adds slot to the entry chain in the map
 void
 Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
 {
-    LoadingEntry &le = entries[slotId];
-    assert(!le.mapped);
-    assert(!le.freed);
-    le.mapped = 1;
+    LoadingSlot slot = loadingSlot(slotId);
+    assert(!slot.mapped());
+    assert(!slot.freed());
+    slot.mapped(true);
 
     Ipc::StoreMapSlice slice;
     slice.next = header.nextSlot;
@@ -454,73 +624,75 @@ Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
     sd->map->importSlice(slotId, slice);
 }
 
+template <class SlotIdType> // accommodates atomic and simple SlotIds.
+void
+Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to)
+{
+    LoadingSlot slot = loadingSlot(to);
+    assert(slot.more < 0);
+    slot.more = from; // may still be unset
+    from = to;
+}
+
 /// adds slot to an existing entry chain; caller must check that the slot
 /// belongs to the chain it is being added to
 void
 Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
 {
-    LoadingEntry &le = entries[fileno];
+    LoadingEntry le = loadingEntry(fileno);
     Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
 
-    assert(le.version == header.version);
-
-    // mark anchor as loaded or add the secondary slot to the chain
-    LoadingEntry &inode = entries[header.firstSlot];
-    if (header.firstSlot == slotId) {
-        debugs(47,5, "adding inode");
-        assert(!inode.freed);
-        le.anchored = 1;
+    debugs(47,9, "adding " << slotId << " to entry " << fileno);
+    // we do not need to preserve the order
+    if (le.anchored()) {
+        LoadingSlot inode = loadingSlot(anchor.start);
+        chainSlots(inode.more, slotId);
     } else {
-        debugs(47,9, "linking " << slotId << " to " << inode.more);
-        // we do not need to preserve the order
-        LoadingEntry &slice = entries[slotId];
-        assert(!slice.freed);
-        assert(slice.more < 0);
-        slice.more = inode.more;
-        inode.more = slotId;
+        chainSlots(anchor.start, slotId);
     }
 
-    if (header.firstSlot == slotId && !importEntry(anchor, fileno, header)) {
-        le.state = LoadingEntry::leCorrupted;
-        freeBadEntry(fileno, "corrupted metainfo");
-        return;
-    }
+    le.size += header.payloadSize; // must precede freeBadEntry() calls
 
-    // set total entry size and/or check it for consistency
-    debugs(47, 8, "header.entrySize: " << header.entrySize << " swap_file_sz: " << anchor.basics.swap_file_sz);
-    uint64_t totalSize = header.entrySize;
-    assert(totalSize != static_cast<uint64_t>(-1));
-    if (!totalSize && anchor.basics.swap_file_sz) {
-        assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
-        // perhaps we loaded a later slot (with entrySize) earlier
-        totalSize = anchor.basics.swap_file_sz;
-    } else if (totalSize && !anchor.basics.swap_file_sz) {
-        anchor.basics.swap_file_sz = totalSize;
-        assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
-    } else if (totalSize != anchor.basics.swap_file_sz) {
-        le.state = LoadingEntry::leCorrupted;
-        freeBadEntry(fileno, "size mismatch");
-        return;
+    if (header.firstSlot == slotId) {
+        debugs(47,5, "added inode");
+
+        if (le.anchored()) { // we have already added another inode slot
+            freeBadEntry(fileno, "inode conflict");
+            ++counts.clashcount;
+            return;
+        }
+
+        le.anchored(true);
+
+        if (!importEntry(anchor, fileno, header)) {
+            freeBadEntry(fileno, "corrupted metainfo");
+            return;
+        }
+
+        // set total entry size and/or check it for consistency
+        if (const uint64_t totalSize = header.entrySize) {
+            assert(totalSize != static_cast<uint64_t>(-1));
+            if (!anchor.basics.swap_file_sz) {
+                anchor.basics.swap_file_sz = totalSize;
+                assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
+            } else if (totalSize != anchor.basics.swap_file_sz) {
+                freeBadEntry(fileno, "size mismatch");
+                return;
+            }
+        }
     }
 
-    le.size += header.payloadSize;
+    const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown
 
     if (totalSize > 0 && le.size > totalSize) { // overflow
         debugs(47, 8, "overflow: " << le.size << " > " << totalSize);
-        le.state = LoadingEntry::leCorrupted;
         freeBadEntry(fileno, "overflowing");
         return;
     }
 
     mapSlot(slotId, header);
-    if (totalSize > 0 && le.size == totalSize) {
-        // entry fully loaded, unlock it
-        // we have validated that all db cells for this entry were loaded
-        EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
-        le.state = LoadingEntry::leLoaded;
-        sd->map->closeForWriting(fileno, false);
-        ++counts.objcount;
-    }
+    if (totalSize > 0 && le.size == totalSize)
+        finalizeOrFree(fileno, le); // entry is probably fully loaded now
 }
 
 /// initialize housekeeping information for a newly accepted entry
@@ -529,12 +701,12 @@ Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno
 {
     anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
     assert(header.firstSlot >= 0);
-    anchor.start = header.firstSlot;
+    anchor.start = -1; // addSlotToEntry() will set it
 
     assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
 
-    LoadingEntry &le = entries[fileno];
-    le.state = LoadingEntry::leLoading;
+    LoadingEntry le = loadingEntry(fileno);
+    le.state(LoadingEntry::leLoading);
     le.version = header.version;
     le.size = 0;
 }
@@ -543,22 +715,6 @@ Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno
 void
 Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
 {
-    // If some other from-disk entry is/was using this slot as its inode OR
-    // if some other from-disk entry is/was using our inode slot, then the
-    // entries are conflicting. We cannot identify other entries, so we just
-    // remove ours and hope that the others were/will be handled correctly.
-    const LoadingEntry &slice = entries[slotId];
-    const LoadingEntry &inode = entries[header.firstSlot];
-    if (slice.used() || inode.used()) {
-        debugs(47,8, "slice/inode used: " << slice.used() << inode.used());
-        LoadingEntry &le = entries[fileno];
-        le.state = LoadingEntry::leCorrupted;
-        freeSlotIfIdle(slotId, slotId == header.firstSlot);
-        // if not idle, the other entry will handle its slice
-        ++counts.clashcount;
-        return;
-    }
-
     // A miss may have been stored at our fileno while we were loading other
     // slots from disk. We ought to preserve that entry because it is fresher.
     const bool overwriteExisting = false;
@@ -569,9 +725,9 @@ Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const Db
     } else {
         // A new from-network entry is occupying our map slot; let it be, but
         // save us from the trouble of going through the above motions again.
-        LoadingEntry &le = entries[fileno];
-        le.state = LoadingEntry::leIgnored;
-        freeSlotIfIdle(slotId, false);
+        LoadingEntry le = loadingEntry(fileno);
+        le.state(LoadingEntry::leIgnored);
+        freeUnusedSlot(slotId, false);
     }
 }
 
@@ -579,72 +735,26 @@ Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const Db
 bool
 Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
 {
+    // Header updates always result in multi-start chains and often
+    // result in multi-version chains so we can only compare the keys.
     const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
-    const LoadingEntry &le = entries[fileno];
-    // any order will work, but do fast comparisons first:
-    return le.version == header.version &&
-           anchor.start == static_cast<Ipc::StoreMapSliceId>(header.firstSlot) &&
-           anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
-}
-
-/// is the new header consistent with information already loaded?
-bool
-Rock::Rebuild::canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const
-{
-    if (!sameEntry(fileno, header)) {
-        debugs(79, 7, "cannot add; wrong entry");
-        return false;
-    }
-
-    const LoadingEntry &le = entries[slotId];
-    // We cannot add a slot that was already declared free or mapped.
-    if (le.freed || le.mapped) {
-        debugs(79, 7, "cannot add; freed/mapped: " << le.freed << le.mapped);
-        return false;
-    }
-
-    if (slotId == header.firstSlot) {
-        // If we are the inode, the anchored flag cannot be set yet.
-        if (entries[fileno].anchored) {
-            debugs(79, 7, "cannot add; extra anchor");
-            return false;
-        }
-
-        // And there should have been some other slot for this entry to exist.
-        if (le.more < 0) {
-            debugs(79, 7, "cannot add; missing slots");
-            return false;
-        }
-
-        return true;
-    }
-
-    // We are the continuation slice so the more field is reserved for us.
-    if (le.more >= 0) {
-        debugs(79, 7, "cannot add; foreign slot");
-        return false;
-    }
-
-    return true;
+    return anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
 }
 
 /// handle freshly loaded (and validated) db slot header
 void
 Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
 {
-    LoadingEntry &slice = entries[slotId];
-    assert(!slice.freed); // we cannot free what was not loaded
-
     const cache_key *const key =
         reinterpret_cast<const cache_key*>(header.key);
-    const sfileno fileno = sd->map->anchorIndexByKey(key);
+    const sfileno fileno = sd->map->fileNoByKey(key);
     assert(0 <= fileno && fileno < dbEntryLimit);
 
-    LoadingEntry &le = entries[fileno];
-    debugs(47,9, "entry " << fileno << " state: " << le.state << ", inode: " <<
+    LoadingEntry le = loadingEntry(fileno);
+    debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " <<
            header.firstSlot << ", size: " << header.payloadSize);
 
-    switch (le.state) {
+    switch (le.state()) {
 
     case LoadingEntry::leEmpty: {
         startNewEntry(fileno, slotId, header);
@@ -652,14 +762,13 @@ Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
     }
 
     case LoadingEntry::leLoading: {
-        if (canAdd(fileno, slotId, header)) {
-            addSlotToEntry(fileno, slotId, header);
+        if (sameEntry(fileno, header)) {
+            addSlotToEntry(fileno, slotId, header); // may fail
         } else {
             // either the loading chain or this slot is stale;
             // be conservative and ignore both (and any future ones)
-            le.state = LoadingEntry::leCorrupted;
             freeBadEntry(fileno, "duplicated");
-            freeSlotIfIdle(slotId, slotId == header.firstSlot);
+            freeUnusedSlot(slotId, true);
             ++counts.dupcount;
         }
         break;
@@ -668,22 +777,22 @@ Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
     case LoadingEntry::leLoaded: {
         // either the previously loaded chain or this slot is stale;
         // be conservative and ignore both (and any future ones)
-        le.state = LoadingEntry::leCorrupted;
+        le.state(LoadingEntry::leCorrupted);
         sd->map->freeEntry(fileno); // may not be immediately successful
-        freeSlotIfIdle(slotId, slotId == header.firstSlot);
+        freeUnusedSlot(slotId, true);
         ++counts.dupcount;
         break;
     }
 
     case LoadingEntry::leCorrupted: {
         // previously seen slots messed things up so we must ignore this one
-        freeSlotIfIdle(slotId, false);
+        freeUnusedSlot(slotId, true);
         break;
     }
 
     case LoadingEntry::leIgnored: {
         // already replaced by a fresher or colliding from-network entry
-        freeSlotIfIdle(slotId, false);
+        freeUnusedSlot(slotId, false);
         break;
     }
     }
index 3735f8a7d0ef780d64157b4d7e0b316e6b321249..56febd0507292a7de0c8f8d661184f441932e499 100644 (file)
@@ -19,6 +19,8 @@ namespace Rock
 {
 
 class LoadingEntry;
+class LoadingSlot;
+class LoadingParts;
 
 /// \ingroup Rock
 /// manages store rebuild process: loading meta information from db on disk
@@ -36,33 +38,42 @@ protected:
     virtual bool doneAll() const;
     virtual void swanSong();
 
+    bool doneLoading() const;
+    bool doneValidating() const;
+
 private:
     void checkpoint();
     void steps();
     void loadingSteps();
     void validationSteps();
     void loadOneSlot();
-    void validateOneEntry();
+    void validateOneEntry(const sfileno fileNo);
+    void validateOneSlot(const SlotId slotId);
     bool importEntry(Ipc::StoreMapAnchor &anchor, const sfileno slotId, const DbCellHeader &header);
     void freeBadEntry(const sfileno fileno, const char *eDescription);
 
     void failure(const char *msg, int errNo = 0);
 
+    LoadingEntry loadingEntry(const sfileno fileNo);
     void startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header);
     void primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header);
+    void finalizeOrFree(const sfileno fileNo, LoadingEntry &le);
+    void finalizeOrThrow(const sfileno fileNo, LoadingEntry &le);
     void addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header);
     void useNewSlot(const SlotId slotId, const DbCellHeader &header);
 
+    LoadingSlot loadingSlot(const SlotId slotId);
     void mapSlot(const SlotId slotId, const DbCellHeader &header);
-    void freeSlotIfIdle(const SlotId slotId, const bool invalid);
-    void freeBusySlot(const SlotId slotId, const bool invalid);
+    void freeUnusedSlot(const SlotId slotId, const bool invalid);
     void freeSlot(const SlotId slotId, const bool invalid);
 
-    bool canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const;
+    template <class SlotIdType>
+    void chainSlots(SlotIdType &from, const SlotId to);
+
     bool sameEntry(const sfileno fileno, const DbCellHeader &header) const;
 
     SwapDir *sd;
-    LoadingEntry *entries; ///< store entries being loaded from disk
+    LoadingParts *parts; ///< parts of store entries being loaded from disk
 
     int64_t dbSize;
     int dbSlotSize; ///< the size of a db cell, including the cell header
index 1ae2f4b66892ff6d77b430790898befcb284fb75..f5fac233cdd4d3d49762ccb5bd55cbf8221bab86 100644 (file)
@@ -16,6 +16,7 @@
 #include "DiskIO/DiskIOStrategy.h"
 #include "DiskIO/ReadRequest.h"
 #include "DiskIO/WriteRequest.h"
+#include "fs/rock/RockHeaderUpdater.h"
 #include "fs/rock/RockIoRequests.h"
 #include "fs/rock/RockIoState.h"
 #include "fs/rock/RockRebuild.h"
@@ -666,6 +667,33 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
     return sio;
 }
 
+StoreIOState::Pointer
+Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
+{
+    if (!theFile || theFile->error()) {
+        debugs(47,4, theFile);
+        return nullptr;
+    }
+
+    Must(update.fresh);
+    Must(update.fresh.fileNo >= 0);
+
+    Rock::SwapDir::Pointer self(this);
+    IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);
+
+    sio->swap_dirn = index;
+    sio->swap_filen = update.fresh.fileNo;
+    sio->writeableAnchor_ = update.fresh.anchor;
+
+    debugs(47,5, "dir " << index << " updating filen " <<
+           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
+           sio->swap_filen << std::dec << " starting at " <<
+           diskOffset(sio->swap_filen));
+
+    sio->file(theFile);
+    return sio;
+}
+
 int64_t
 Rock::SwapDir::diskOffset(const SlotId sid) const
 {
@@ -832,6 +860,8 @@ Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
         return;
     }
 
+    debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);
+
     // TODO: Fail if disk dropped one of the previous write requests.
 
     if (errflag == DISK_OK) {
@@ -846,37 +876,61 @@ Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
         if (request->eof) {
             assert(sio.e);
             assert(sio.writeableAnchor_);
-            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
-                                      sio.offset_;
+            if (sio.touchingStoreEntry()) {
+                sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
+                                          sio.offset_;
 
-            // close, the entry gets the read lock
-            map->closeForWriting(sio.swap_filen, true);
+                // close, the entry gets the read lock
+                map->closeForWriting(sio.swap_filen, true);
+            }
             sio.writeableAnchor_ = NULL;
+            sio.splicingPoint = request->sidCurrent;
             sio.finishedWriting(errflag);
         }
     } else {
         noteFreeMapSlice(request->sidNext);
 
-        writeError(*sio.e);
+        writeError(sio);
         sio.finishedWriting(errflag);
         // and hope that Core will call disconnect() to close the map entry
     }
 
-    CollapsedForwarding::Broadcast(*sio.e);
+    if (sio.touchingStoreEntry())
+        CollapsedForwarding::Broadcast(*sio.e);
 }
 
 void
-Rock::SwapDir::writeError(StoreEntry &e)
+Rock::SwapDir::writeError(StoreIOState &sio)
 {
     // Do not abortWriting here. The entry should keep the write lock
     // instead of losing association with the store and confusing core.
-    map->freeEntry(e.swap_filen); // will mark as unusable, just in case
+    map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
 
-    Store::Root().transientsAbandon(e);
+    if (sio.touchingStoreEntry())
+        Store::Root().transientsAbandon(*sio.e);
+    // else noop: a fresh entry update error does not affect stale entry readers
 
     // All callers must also call IoState callback, to propagate the error.
 }
 
+void
+Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
+{
+    if (!map)
+        return;
+
+    Ipc::StoreMapUpdate update(updatedE);
+    if (!map->openForUpdating(update, updatedE->swap_filen))
+        return;
+
+    try {
+        AsyncJob::Start(new HeaderUpdater(this, update));
+    } catch (const std::exception &ex) {
+        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
+        map->abortUpdating(update);
+    }
+}
+
 bool
 Rock::SwapDir::full() const
 {
index 2be42dea85ecf151486bc45472a868b306d00fb0..7340ec7e4becae87cb261b57dc1770ffcd7342e3 100644 (file)
@@ -67,7 +67,7 @@ public:
 
     int64_t diskOffset(Ipc::Mem::PageId &pageId) const;
     int64_t diskOffset(int filen) const;
-    void writeError(StoreEntry &e);
+    void writeError(StoreIOState &sio);
 
     /* StoreMapCleaner API */
     virtual void noteFreeMapSlice(const Ipc::StoreMapSliceId fileno);
@@ -91,6 +91,7 @@ protected:
     virtual void diskFull();
     virtual void reference(StoreEntry &e);
     virtual bool dereference(StoreEntry &e);
+    virtual void updateHeaders(StoreEntry *e);
     virtual bool unlinkdUseful() const;
     virtual void unlink(StoreEntry &e);
     virtual void statfs(StoreEntry &e) const;
@@ -118,11 +119,15 @@ protected:
 
     int64_t diskOffsetLimit() const;
 
+    void updateHeadersOrThrow(Ipc::StoreMapUpdate &update);
+    StoreIOState::Pointer createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
+
     void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor);
     bool updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor);
 
     friend class Rebuild;
     friend class IoState;
+    friend class HeaderUpdater;
     const char *filePath; ///< location of cache storage file inside path/
     DirMap *map; ///< entry key/sfileno to MaxExtras/inode mapping
 
index 1a3e9a6458de78c28aac6c9daf3260b5a1624452..6cbd1d0b5267d21da0e1c3a689a833afb70256c3 100644 (file)
@@ -36,6 +36,8 @@ class Rebuild;
 
 class IoState;
 
+class HeaderUpdater;
+
 class DbCellHeader;
 
 }
index af712ed82a5a960290cde47b84ba362780b310b3..a54804b28bc84edab41fc9ec75d4f8cfebaff74f 100644 (file)
@@ -1448,8 +1448,11 @@ HttpStateData::processReplyBody()
             writeReplyBody();
     }
 
+    // storing/sending methods like earlier adaptOrFinalizeReply() or
+    // above writeReplyBody() may release/abort the store entry.
     if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
-        // The above writeReplyBody() call may have aborted the store entry.
+        // TODO: In some cases (e.g., 304), we should keep persistent conn open.
+        // Detect end-of-reply (and, hence, pool our idle pconn) earlier (ASAP).
         abortTransaction("store entry aborted while storing reply");
         return;
     } else
index 813b89da799745198a9ce31dd7e7b3b056366249..2a3b7951e04b96941a20caf6910f8dfb63a57839 100644 (file)
 #include "ipc/ReadWriteLock.h"
 #include "Store.h"
 
+void Ipc::AssertFlagIsSet(std::atomic_flag &flag)
+{
+    // If the flag was false, then we set it to true and assert. A true flag
+    // may help keep other processes away from this broken entry.
+    // Otherwise, we just set an already set flag, which is probably a no-op.
+    assert(flag.test_and_set(std::memory_order_relaxed));
+}
+
 bool
 Ipc::ReadWriteLock::lockShared()
 {
@@ -37,6 +45,18 @@ Ipc::ReadWriteLock::lockExclusive()
     return false;
 }
 
+bool
+Ipc::ReadWriteLock::lockHeaders()
+{
+    if (lockShared()) {
+        if (!updating.test_and_set(std::memory_order_acquire))
+            return true; // we got here first
+        // the updating lock was already set by somebody else
+        unlockShared();
+    }
+    return false;
+}
+
 void
 Ipc::ReadWriteLock::unlockShared()
 {
@@ -54,6 +74,14 @@ Ipc::ReadWriteLock::unlockExclusive()
     --writeLevel;
 }
 
+void
+Ipc::ReadWriteLock::unlockHeaders()
+{
+    AssertFlagIsSet(updating);
+    updating.clear(std::memory_order_release);
+    unlockShared();
+}
+
 void
 Ipc::ReadWriteLock::switchExclusiveToShared()
 {
index 329be57255c695578f0303992253d4491ac4a443..59f71877bad322ddfc99882e81f9951af0990d45 100644 (file)
@@ -30,8 +30,10 @@ public:
 
     bool lockShared(); ///< lock for reading or return false
     bool lockExclusive(); ///< lock for modification or return false
+    bool lockHeaders(); ///< lock for [readable] metadata update or return false
     void unlockShared(); ///< undo successful sharedLock()
     void unlockExclusive(); ///< undo successful exclusiveLock()
+    void unlockHeaders(); ///< undo successful lockHeaders()
     void switchExclusiveToShared(); ///< stop writing, start reading
 
     void startAppending(); ///< writer keeps its lock but also allows reading
@@ -42,7 +44,8 @@ public:
 public:
     mutable std::atomic<uint32_t> readers; ///< number of reading users
     std::atomic<bool> writing; ///< there is a writing user (there can be at most 1)
-    std::atomic<bool> appending; ///< the writer has promissed to only append
+    std::atomic<bool> appending; ///< the writer has promised to only append
+    std::atomic_flag updating; ///< a reader is updating metadata/headers
 
 private:
     mutable std::atomic<uint32_t> readLevel; ///< number of users reading (or trying to)
@@ -66,6 +69,11 @@ public:
     int appenders; ///< number of appending writers
 };
 
+/// Same as assert(flag is set): The process assert()s if flag is not set.
+/// Side effect: The unset flag becomes set just before we assert().
+/// Needed because atomic_flag cannot be compared with a boolean.
+void AssertFlagIsSet(std::atomic_flag &flag);
+
 } // namespace Ipc
 
 #endif /* SQUID_IPC_READ_WRITE_LOCK_H */
index 9c39f398e26a9a957859d482e9f2e13ad00ec842..124c07261abf1713b2dd6cd3d79edfafa5ac07ea 100644 (file)
@@ -27,12 +27,19 @@ StoreMapAnchorsId(const SBuf &path)
     return Ipc::Mem::Segment::Name(path, "anchors");
 }
 
+static SBuf
+StoreMapFileNosId(const SBuf &path)
+{
+    return Ipc::Mem::Segment::Name(path, "filenos");
+}
+
 Ipc::StoreMap::Owner *
 Ipc::StoreMap::Init(const SBuf &path, const int sliceLimit)
 {
     assert(sliceLimit > 0); // we should not be created otherwise
     const int anchorLimit = min(sliceLimit, static_cast<int>(SwapFilenMax));
     Owner *owner = new Owner;
+    owner->fileNos = shm_new(FileNos)(StoreMapFileNosId(path).c_str(), anchorLimit);
     owner->anchors = shm_new(Anchors)(StoreMapAnchorsId(path).c_str(), anchorLimit);
     owner->slices = shm_new(Slices)(StoreMapSlicesId(path).c_str(), sliceLimit);
     debugs(54, 5, "created " << path << " with " << anchorLimit << '+' << sliceLimit);
@@ -40,10 +47,12 @@ Ipc::StoreMap::Init(const SBuf &path, const int sliceLimit)
 }
 
 Ipc::StoreMap::StoreMap(const SBuf &aPath): cleaner(NULL), path(aPath),
+    fileNos(shm_old(FileNos)(StoreMapFileNosId(path).c_str())),
     anchors(shm_old(Anchors)(StoreMapAnchorsId(path).c_str())),
     slices(shm_old(Slices)(StoreMapSlicesId(path).c_str()))
 {
     debugs(54, 5, "attached " << path << " with " <<
+           fileNos->capacity << '+' <<
            anchors->capacity << '+' << slices->capacity);
     assert(entryLimit() > 0); // key-to-position mapping requires this
     assert(entryLimit() <= sliceLimit()); // at least one slice per entry
@@ -76,7 +85,6 @@ Ipc::StoreMap::forgetWritingEntry(sfileno fileno)
     // them; the caller is responsible for freeing them (most likely
     // our slice list is incomplete or has holes)
 
-    inode.waitingToBeFreed = false;
     inode.rewind();
 
     inode.lock.unlockExclusive();
@@ -90,7 +98,7 @@ Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno)
 {
     debugs(54, 5, "opening entry with key " << storeKeyText(key)
            << " for writing " << path);
-    const int idx = anchorIndexByKey(key);
+    const int idx = fileNoByKey(key);
 
     if (Anchor *anchor = openForWritingAt(idx)) {
         fileno = idx;
@@ -123,6 +131,7 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting)
 
         assert(s.empty());
         s.start = -1; // we have not allocated any slices yet
+        s.splicingPoint = -1;
         ++anchors->count;
 
         //s.setKey(key); // XXX: the caller should do that
@@ -208,6 +217,24 @@ Ipc::StoreMap::abortWriting(const sfileno fileno)
     }
 }
 
+void
+Ipc::StoreMap::abortUpdating(Update &update)
+{
+    const sfileno fileno = update.stale.fileNo;
+    debugs(54, 5, "aborting entry " << fileno << " for updating " << path);
+    if (update.stale) {
+        AssertFlagIsSet(update.stale.anchor->lock.updating);
+        update.stale.anchor->lock.unlockHeaders();
+        closeForReading(update.stale.fileNo);
+        update.stale = Update::Edition();
+    }
+    if (update.fresh) {
+        abortWriting(update.fresh.fileNo);
+        update.fresh = Update::Edition();
+    }
+    debugs(54, 5, "aborted entry " << fileno << " for updating " << path);
+}
+
 const Ipc::StoreMap::Anchor *
 Ipc::StoreMap::peekAtReader(const sfileno fileno) const
 {
@@ -245,7 +272,7 @@ Ipc::StoreMap::freeEntryByKey(const cache_key *const key)
     debugs(54, 5, "marking entry with key " << storeKeyText(key)
            << " to be freed in " << path);
 
-    const int idx = anchorIndexByKey(key);
+    const int idx = fileNoByKey(key);
     Anchor &s = anchorAt(idx);
     if (s.lock.lockExclusive()) {
         if (s.sameKey(key))
@@ -269,21 +296,8 @@ Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLoc
 {
     debugs(54, 7, "freeing entry " << fileno <<
            " in " << path);
-    if (!inode.empty()) {
-        sfileno sliceId = inode.start;
-        debugs(54, 8, "first slice " << sliceId);
-        while (sliceId >= 0) {
-            Slice &slice = sliceAt(sliceId);
-            const sfileno nextId = slice.next;
-            slice.size = 0;
-            slice.next = -1;
-            if (cleaner)
-                cleaner->noteFreeMapSlice(sliceId); // might change slice state
-            sliceId = nextId;
-        }
-    }
-
-    inode.waitingToBeFreed = false;
+    if (!inode.empty())
+        freeChainAt(inode.start, inode.splicingPoint);
     inode.rewind();
 
     if (!keepLocked)
@@ -292,19 +306,62 @@ Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLoc
     debugs(54, 5, "freed entry " << fileno << " in " << path);
 }
 
+/// unconditionally frees an already locked chain of slots; no anchor maintenance
+void
+Ipc::StoreMap::freeChainAt(SliceId sliceId, const SliceId splicingPoint)
+{
+    static uint64_t ChainId = 0; // to pair freeing/freed calls in debugs()
+    const uint64_t chainId = ++ChainId;
+    debugs(54, 7, "freeing chain #" << chainId << " starting at " << sliceId << " in " << path);
+    while (sliceId >= 0) {
+        Slice &slice = sliceAt(sliceId);
+        const SliceId nextId = slice.next;
+        slice.size = 0;
+        slice.next = -1;
+        if (cleaner)
+            cleaner->noteFreeMapSlice(sliceId); // might change slice state
+        if (sliceId == splicingPoint) {
+            debugs(54, 5, "preserving chain #" << chainId << " in " << path <<
+                   " suffix after slice " << splicingPoint);
+            break; // do not free the rest of the chain
+        }
+        sliceId = nextId;
+    }
+    debugs(54, 7, "freed chain #" << chainId << " in " << path);
+}
+
+Ipc::StoreMap::SliceId
+Ipc::StoreMap::sliceContaining(const sfileno fileno, const uint64_t bytesNeeded) const
+{
+    const Anchor &anchor = anchorAt(fileno);
+    Must(anchor.reading());
+    uint64_t bytesSeen = 0;
+    SliceId lastSlice = anchor.start;
+    while (lastSlice >= 0) {
+        const Slice &slice = sliceAt(lastSlice);
+        bytesSeen += slice.size;
+        if (bytesSeen >= bytesNeeded)
+            break;
+        lastSlice = slice.next;
+    }
+    debugs(54, 7, "entry " << fileno << " has " << bytesNeeded << '/' << bytesSeen <<
+           " bytes at slice " << lastSlice << " in " << path);
+    return lastSlice; // may be negative
+}
+
 const Ipc::StoreMap::Anchor *
 Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno)
 {
     debugs(54, 5, "opening entry with key " << storeKeyText(key)
            << " for reading " << path);
-    const int idx = anchorIndexByKey(key);
+    const int idx = fileNoByKey(key);
     if (const Anchor *slot = openForReadingAt(idx)) {
         if (slot->sameKey(key)) {
             fileno = idx;
             return slot; // locked for reading
         }
         slot->lock.unlockShared();
-        debugs(54, 7, "closed entry " << idx << " for reading " << path);
+        debugs(54, 7, "closed wrong-key entry " << idx << " for reading " << path);
     }
     return NULL;
 }
@@ -349,14 +406,192 @@ Ipc::StoreMap::closeForReading(const sfileno fileno)
 }
 
 bool
-Ipc::StoreMap::purgeOne()
+Ipc::StoreMap::openForUpdating(Update &update, const sfileno fileNoHint)
 {
-    // Hopefully, we find a removable entry much sooner (TODO: use time?).
+    Must(update.entry);
+    const StoreEntry &entry = *update.entry;
+    const cache_key *const key = reinterpret_cast<const cache_key*>(entry.key);
+    update.stale.name = nameByKey(key);
+
+    if (!validEntry(fileNoHint)) {
+        debugs(54, 5, "opening entry with key " << storeKeyText(key) <<
+               " for updating " << path);
+        update.stale.fileNo = fileNoByName(update.stale.name);
+    } else {
+        update.stale.fileNo = fileNoHint;
+    }
+
+    debugs(54, 5, "opening entry " << update.stale.fileNo << " of " << entry << " for updating " << path);
+
+    // Unreadable entries cannot (e.g., empty and otherwise problematic entries)
+    // or should not (e.g., entries still forming their metadata) be updated.
+    if (const Anchor *anchor = openForReadingAt(update.stale.fileNo)) {
+        if (!anchor->sameKey(key)) {
+            closeForReading(update.stale.fileNo);
+            debugs(54, 5, "cannot open wrong-key entry " << update.stale.fileNo << " for updating " << path);
+            return false;
+        }
+    } else {
+        debugs(54, 5, "cannot open unreadable entry " << update.stale.fileNo << " for updating " << path);
+        return false;
+    }
+
+    update.stale.anchor = &anchorAt(update.stale.fileNo);
+    if (update.stale.anchor->writing()) {
+        // TODO: Support updating appending entries.
+        // For example, MemStore::updateHeaders() would not know how
+        // many old prefix body bytes to copy to the new prefix if the last old
+        // prefix slice has not been formed yet (i.e., still gets more bytes).
+        debugs(54, 5, "cannot open appending entry " << update.stale.fileNo <<
+               " for updating " << path);
+        closeForReading(update.stale.fileNo);
+        return false;
+    }
+
+    if (!update.stale.anchor->lock.lockHeaders()) {
+        debugs(54, 5, "cannot open updating entry " << update.stale.fileNo <<
+               " for updating " << path);
+        closeForReading(update.stale.fileNo);
+        return false;
+    }
+
+    /* stale anchor is properly locked; we can now use abortUpdating() if needed */
+
+    if (!openKeyless(update.fresh)) {
+        debugs(54, 5, "cannot open freshchainless entry " << update.stale.fileNo <<
+               " for updating " << path);
+        abortUpdating(update);
+        return false;
+    }
+
+    Must(update.stale);
+    Must(update.fresh);
+    update.fresh.anchor->set(entry);
+    debugs(54, 5, "opened entry " << update.stale.fileNo << " for updating " << path <<
+           " using entry " << update.fresh.fileNo << " of " << entry);
+
+    return true;
+}
+
+/// finds an anchor that is currently not associated with any entry key and
+/// locks it for writing so ensure exclusive access during updates
+bool
+Ipc::StoreMap::openKeyless(Update::Edition &edition)
+{
+    return visitVictims([&](const sfileno name) {
+        Update::Edition temp;
+        temp.name = name;
+        temp.fileNo = fileNoByName(temp.name);
+        if ((temp.anchor = openForWritingAt(temp.fileNo))) {
+            debugs(54, 5, "created entry " << temp.fileNo <<
+                   " for updating " << path);
+            Must(temp);
+            edition = temp;
+            return true;
+        }
+        return false;
+    });
+}
+
+void
+Ipc::StoreMap::closeForUpdating(Update &update)
+{
+    Must(update.stale.anchor);
+    Must(update.fresh.anchor);
+    AssertFlagIsSet(update.stale.anchor->lock.updating);
+    Must(update.stale.splicingPoint >= 0);
+    Must(update.fresh.splicingPoint >= 0);
+
+    /* the stale prefix cannot overlap with the fresh one (a weak check) */
+    Must(update.stale.anchor->start != update.fresh.anchor->start);
+    Must(update.stale.anchor->start != update.fresh.splicingPoint);
+    Must(update.stale.splicingPoint != update.fresh.anchor->start);
+    Must(update.stale.splicingPoint != update.fresh.splicingPoint);
+
+    /* the relative order of most operations is significant here */
+
+    /* splice the fresh chain prefix with the stale chain suffix */
+    Slice &freshSplicingSlice = sliceAt(update.fresh.splicingPoint);
+    const SliceId suffixStart = sliceAt(update.stale.splicingPoint).next; // may be negative
+    // the fresh chain is either properly terminated or already spliced
+    if (freshSplicingSlice.next < 0)
+        freshSplicingSlice.next = suffixStart;
+    else
+        Must(freshSplicingSlice.next == suffixStart);
+    // either way, fresh chain uses the stale chain suffix now
+
+    // make the fresh anchor/chain readable for everybody
+    update.fresh.anchor->lock.switchExclusiveToShared();
+    // but the fresh anchor is still invisible to anybody but us
+
+    // This freeEntry() code duplicates the code below to minimize the time when
+    // the freeEntry() race condition (see the Race: comment below) might occur.
+    if (update.stale.anchor->waitingToBeFreed)
+        freeEntry(update.fresh.fileNo);
+
+    /* any external changes were applied to the stale anchor/chain until now */
+    relocate(update.stale.name, update.fresh.fileNo);
+    /* any external changes will apply to the fresh anchor/chain from now on */
+
+    // Race: If the stale entry was deleted by some kid during the assignment,
+    // then we propagate that event to the fresh anchor and chain. Since this
+    // update is not atomically combined with the assignment above, another kid
+    // might get a fresh entry just before we have a chance to free it. However,
+    // such deletion races are always possible even without updates.
+    if (update.stale.anchor->waitingToBeFreed)
+        freeEntry(update.fresh.fileNo);
+
+    /* free the stale chain prefix except for the shared suffix */
+    update.stale.anchor->splicingPoint = update.stale.splicingPoint;
+    freeEntry(update.stale.fileNo);
+
+    // make the stale anchor/chain reusable, reachable via its new location
+    relocate(update.fresh.name, update.stale.fileNo);
+
+    const Update updateSaved = update; // for post-close debugging below
+
+    /* unlock the stale anchor/chain */
+    update.stale.anchor->lock.unlockHeaders();
+    closeForReading(update.stale.fileNo);
+    update.stale = Update::Edition();
+
+    // finally, unlock the fresh entry
+    closeForReading(update.fresh.fileNo);
+    update.fresh = Update::Edition();
+
+    debugs(54, 5, "closed entry " << updateSaved.stale.fileNo << " of " << *updateSaved.entry <<
+           " named " << updateSaved.stale.name << " for updating " << path <<
+           " to fresh entry " << updateSaved.fresh.fileNo << " named " << updateSaved.fresh.name <<
+           " with [" << updateSaved.fresh.anchor->start << ',' << updateSaved.fresh.splicingPoint <<
+           "] prefix containing at least " << freshSplicingSlice.size << " bytes");
+}
+
+/// Visits entries until either
+/// * the `visitor` returns true (indicating its satisfaction with the offer);
+/// * we give up finding a suitable entry because it already took "too long"; or
+/// * we have offered all entries.
+bool
+Ipc::StoreMap::visitVictims(const NameFilter visitor)
+{
+    // Hopefully, we find a usable entry much sooner (TODO: use time?).
     // The min() will protect us from division by zero inside the loop.
     const int searchLimit = min(10000, entryLimit());
     int tries = 0;
     for (; tries < searchLimit; ++tries) {
-        const sfileno fileno = static_cast<sfileno>(++anchors->victim % entryLimit());
+        const sfileno name = static_cast<sfileno>(++anchors->victim % entryLimit());
+        if (visitor(name))
+            return true;
+    }
+
+    debugs(54, 5, "no victims found in " << path << "; tried: " << tries);
+    return false;
+}
+
+bool
+Ipc::StoreMap::purgeOne()
+{
+    return visitVictims([&](const sfileno name) {
+        const sfileno fileno = fileNoByName(name);
         Anchor &s = anchorAt(fileno);
         if (s.lock.lockExclusive()) {
             // the caller wants a free slice; empty anchor is not enough
@@ -368,9 +603,8 @@ Ipc::StoreMap::purgeOne()
             }
             s.lock.unlockExclusive();
         }
-    }
-    debugs(54, 5, "no entries to purge from " << path << "; tried: " << tries);
-    return false;
+        return false;
+    });
 }
 
 void
@@ -435,17 +669,43 @@ Ipc::StoreMap::anchorAt(const sfileno fileno) const
 }
 
 sfileno
-Ipc::StoreMap::anchorIndexByKey(const cache_key *const key) const
+Ipc::StoreMap::nameByKey(const cache_key *const key) const
 {
     const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
     // TODO: use a better hash function
-    return (k[0] + k[1]) % entryLimit();
+    const int hash = (k[0] + k[1]) % entryLimit();
+    return hash;
+}
+
+sfileno
+Ipc::StoreMap::fileNoByName(const sfileno name) const
+{
+    // fileNos->items are initialized to zero, which we treat as "name is fileno";
+    // a positive value means the entry anchor got moved to a new fileNo
+    if (const int item = fileNos->items[name])
+        return item-1;
+    return name;
+}
+
+/// map `name` to `fileNo`
+void
+Ipc::StoreMap::relocate(const sfileno name, const sfileno fileno)
+{
+    // preserve special meaning for zero; see fileNoByName
+    fileNos->items[name] = fileno+1;
+}
+
+sfileno
+Ipc::StoreMap::fileNoByKey(const cache_key *const key) const
+{
+    const int name = nameByKey(key);
+    return fileNoByName(name);
 }
 
 Ipc::StoreMap::Anchor &
 Ipc::StoreMap::anchorByKey(const cache_key *const key)
 {
-    return anchorAt(anchorIndexByKey(key));
+    return anchorAt(fileNoByKey(key));
 }
 
 Ipc::StoreMap::Slice&
@@ -463,7 +723,7 @@ Ipc::StoreMap::sliceAt(const SliceId sliceId) const
 
 /* Ipc::StoreMapAnchor */
 
-Ipc::StoreMapAnchor::StoreMapAnchor(): start(0)
+Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), splicingPoint(-1)
 {
     memset(&key, 0, sizeof(key));
     memset(&basics, 0, sizeof(basics));
@@ -502,17 +762,46 @@ Ipc::StoreMapAnchor::rewind()
 {
     assert(writing());
     start = 0;
+    splicingPoint = -1;
     memset(&key, 0, sizeof(key));
     memset(&basics, 0, sizeof(basics));
+    waitingToBeFreed = false;
     // but keep the lock
 }
 
-Ipc::StoreMap::Owner::Owner(): anchors(NULL), slices(NULL)
+/* Ipc::StoreMapUpdate */
+
+Ipc::StoreMapUpdate::StoreMapUpdate(StoreEntry *anEntry):
+    entry(anEntry)
+{
+    entry->lock("Ipc::StoreMapUpdate1");
+}
+
+Ipc::StoreMapUpdate::StoreMapUpdate(const StoreMapUpdate &other):
+    entry(other.entry),
+    stale(other.stale),
+    fresh(other.fresh)
+{
+    entry->lock("Ipc::StoreMapUpdate2");
+}
+
+Ipc::StoreMapUpdate::~StoreMapUpdate()
+{
+    entry->unlock("Ipc::StoreMapUpdate");
+}
+
+/* Ipc::StoreMap::Owner */
+
+Ipc::StoreMap::Owner::Owner():
+    fileNos(nullptr),
+    anchors(nullptr),
+    slices(nullptr)
 {
 }
 
 Ipc::StoreMap::Owner::~Owner()
 {
+    delete fileNos;
     delete anchors;
     delete slices;
 }
index c53d4209a283883244b25286407b6575d8f52884..2066b9efbd0026c18b94d3539bac6fd06231f050 100644 (file)
@@ -16,6 +16,8 @@
 #include "store/forward.h"
 #include "store_key_md5.h"
 
+#include <functional>
+
 namespace Ipc
 {
 
@@ -74,6 +76,7 @@ public:
     std::atomic<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
 
     // fields marked with [app] can be modified when appending-while-reading
+    // fields marked with [update] can be modified when updating-while-reading
 
     uint64_t key[2]; ///< StoreEntry key
 
@@ -90,6 +93,10 @@ public:
 
     /// where the chain of StoreEntry slices begins [app]
     std::atomic<StoreMapSliceId> start;
+
+    /// where the updated chain prefix containing metadata/headers ends [update]
+    /// if unset, this anchor points to a chain that was never updated
+    std::atomic<StoreMapSliceId> splicingPoint;
 };
 
 /// an array of shareable Items
@@ -113,7 +120,7 @@ public:
 /// StoreMapSlices indexed by their slice ID.
 typedef StoreMapItems<StoreMapSlice> StoreMapSlices;
 
-/// StoreMapAnchors indexed by entry fileno plus
+/// StoreMapAnchors (indexed by fileno) plus
 /// sharing-safe basic housekeeping info about Store entries
 class StoreMapAnchors
 {
@@ -132,20 +139,58 @@ public:
 };
 // TODO: Find an elegant way to use StoreMapItems in StoreMapAnchors
 
+/// StoreMapAnchor positions, indexed by entry "name" (i.e., the entry key hash)
+typedef StoreMapItems< std::atomic<sfileno> > StoreMapFileNos;
+
+/// Aggregates information required for updating entry metadata and headers.
+class StoreMapUpdate
+{
+public:
+    /// During an update, the stored entry has two editions: stale and fresh.
+    class Edition
+    {
+    public:
+        Edition(): anchor(nullptr), fileNo(-1), name(-1), splicingPoint(-1) {}
+
+        /// whether this entry edition is currently used/initialized
+        explicit operator bool() const { return anchor; }
+
+        StoreMapAnchor *anchor; ///< StoreMap::anchors[fileNo], for convenience/speed
+        sfileno fileNo; ///< StoreMap::fileNos[name], for convenience/speed
+        sfileno name; ///< StoreEntry position in StoreMap::fileNos, for swapping Editions
+
+        /// the last slice in the chain still containing metadata/headers
+        StoreMapSliceId splicingPoint;
+    };
+
+    explicit StoreMapUpdate(StoreEntry *anEntry);
+    StoreMapUpdate(const StoreMapUpdate &other);
+    ~StoreMapUpdate();
+
+    StoreMapUpdate &operator =(const StoreMapUpdate &other) = delete;
+
+    StoreEntry *entry; ///< the store entry being updated
+    Edition stale; ///< old anchor and chain being updated
+    Edition fresh; ///< new anchor and updated chain prefix
+};
+
 class StoreMapCleaner;
 
 /// Manages shared Store index (e.g., locking/unlocking/freeing entries) using
-/// StoreMapAnchors indexed by their keys and
-/// StoreMapSlices indexed by their slide ID.
+/// StoreMapFileNos indexed by hashed entry keys (a.k.a. entry names),
+/// StoreMapAnchors indexed by fileno, and
+/// StoreMapSlices indexed by slice ID.
 class StoreMap
 {
 public:
+    typedef StoreMapFileNos FileNos;
     typedef StoreMapAnchor Anchor;
     typedef StoreMapAnchors Anchors;
     typedef sfileno AnchorId;
     typedef StoreMapSlice Slice;
     typedef StoreMapSlices Slices;
     typedef StoreMapSliceId SliceId;
+    typedef StoreMapUpdate Update;
 
 public:
     /// aggregates anchor and slice owners for Init() caller convenience
@@ -154,6 +199,7 @@ public:
     public:
         Owner();
         ~Owner();
+        FileNos::Owner *fileNos;
         Anchors::Owner *anchors;
         Slices::Owner *slices;
     private:
@@ -166,8 +212,8 @@ public:
 
     StoreMap(const SBuf &aPath);
 
-    /// computes map entry position for a given entry key
-    sfileno anchorIndexByKey(const cache_key *const key) const;
+    /// computes map entry anchor position for a given entry key
+    sfileno fileNoByKey(const cache_key *const key) const;
 
     /// Like strcmp(mapped, new), but for store entry versions/timestamps.
     /// Returns +2 if the mapped entry does not exist; -1/0/+1 otherwise.
@@ -188,6 +234,13 @@ public:
     /// this call does not free entry slices so the caller has to do that
     void forgetWritingEntry(const sfileno fileno);
 
+    /// finds and locks the Update entry for an exclusive metadata update
+    bool openForUpdating(Update &update, sfileno fileNoHint);
+    /// makes updated info available to others, unlocks, and cleans up
+    void closeForUpdating(Update &update);
+    /// undoes partial update, unlocks, and cleans up
+    void abortUpdating(Update &update);
+
     /// only works on locked entries; returns nil unless the slice is readable
     const Anchor *peekAtReader(const sfileno fileno) const;
 
@@ -216,6 +269,11 @@ public:
     /// readable anchor for the entry created by openForReading()
     const Anchor &readableEntry(const AnchorId anchorId) const;
 
+    /// Returns the ID of the entry slice containing n-th byte or
+    /// a negative ID if the entry does not store that many bytes (yet).
+    /// Requires a read lock.
+    SliceId sliceContaining(const sfileno fileno, const uint64_t nth) const;
+
     /// stop writing the entry, freeing its slot for others to use if possible
     void abortWriting(const sfileno fileno);
 
@@ -239,10 +297,17 @@ public:
 
 protected:
     const SBuf path; ///< cache_dir path or similar cache name; for logging
+    Mem::Pointer<StoreMapFileNos> fileNos; ///< entry inodes (starting blocks)
     Mem::Pointer<StoreMapAnchors> anchors; ///< entry inodes (starting blocks)
     Mem::Pointer<StoreMapSlices> slices; ///< chained entry pieces positions
 
 private:
+    /// computes entry name (i.e., key hash) for a given entry key
+    sfileno nameByKey(const cache_key *const key) const;
+    /// computes anchor position for a given entry name
+    sfileno fileNoByName(const sfileno name) const;
+    void relocate(const sfileno name, const sfileno fileno);
+
     Anchor &anchorAt(const sfileno fileno);
     const Anchor &anchorAt(const sfileno fileno) const;
     Anchor &anchorByKey(const cache_key *const key);
@@ -250,8 +315,14 @@ private:
     Slice &sliceAt(const SliceId sliceId);
     const Slice &sliceAt(const SliceId sliceId) const;
     Anchor *openForReading(Slice &s);
+    bool openKeyless(Update::Edition &edition);
+    void closeForUpdateFinal(Update &update);
+
+    typedef std::function<bool (const sfileno name)> NameFilter; // a "name"-based test
+    bool visitVictims(const NameFilter filter);
 
     void freeChain(const sfileno fileno, Anchor &inode, const bool keepLock);
+    void freeChainAt(SliceId sliceId, const SliceId splicingPoint);
 };
 
 /// API for adjusting external state when dirty map slice is being freed
index 7f2c3b2ca95b32beeca530e49bec25d517d471f6..9920581bfe559b036b335d5f470ad2b18c66e652 100644 (file)
@@ -537,11 +537,7 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size)
 
             assert(fetch->old_entry->mem_obj->request);
 
-            HttpReply *old_rep = (HttpReply *) fetch->old_entry->getReply();
-
-            old_rep->updateOnNotModified(reply);
-
-            fetch->old_entry->timestampsSet();
+            Store::Root().updateOnNotModified(fetch->old_entry, *fetch->entry);
 
             /* get rid of 304 reply */
             storeUnregister(fetch->sc, fetch->entry, fetch);
index 0c336aa05247ba28c98a9b136919d251cd5affd1..38a7c299b6111e584483b0d5a76f84cfd5166bda 100644 (file)
@@ -25,6 +25,9 @@ public:
     /// return false iff the idle entry should be destroyed
     virtual bool dereference(StoreEntry &e) = 0;
 
+    /// make stored metadata and HTTP headers the same as in the given entry
+    virtual void updateHeaders(StoreEntry *) {}
+
     /// If this storage cannot cache collapsed entries, return false.
     /// If the entry is not found, return false. Otherwise, return true after
     /// tying the entry to this cache and setting inSync to updateCollapsed().
index 1e9401418aec2484e0d1d4c2f9e916e7a90d9069..9af6e609a4928fc86a0a6d9a84c3183039ef850c 100644 (file)
@@ -467,6 +467,25 @@ Store::Controller::handleIdleEntry(StoreEntry &e)
     }
 }
 
+void
+Store::Controller::updateOnNotModified(StoreEntry *old, const StoreEntry &newer)
+{
+    /* update the old entry object */
+    Must(old);
+    HttpReply *oldReply = const_cast<HttpReply*>(old->getReply());
+    Must(oldReply);
+    oldReply->updateOnNotModified(newer.getReply());
+    old->timestampsSet();
+
+    /* update stored image of the old entry */
+
+    if (memStore && old->mem_status == IN_MEMORY && !EBIT_TEST(old->flags, ENTRY_SPECIAL))
+        memStore->updateHeaders(old);
+
+    if (old->swap_dirn > -1)
+        swapDir->updateHeaders(old);
+}
+
 void
 Store::Controller::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
                                    const HttpRequestMethod &reqMethod)
index b38d4b75f24461509c64e55700086e51bbcc473f..65eb26ea8a41069baca510a5db95520f374bf4aa 100644 (file)
@@ -48,6 +48,9 @@ public:
     /// called to get rid of no longer needed entry data in RAM, if any
     void memoryOut(StoreEntry &, const bool preserveSwappable);
 
+    /// update old entry metadata and HTTP headers using a newer entry
+    void updateOnNotModified(StoreEntry *old, const StoreEntry &newer);
+
     /// makes the entry available for collapsing future requests
     void allowCollapsing(StoreEntry *, const RequestFlags &, const HttpRequestMethod &);
 
index 7f2a1e776bd3357c36f727c16f98f5136811bfcf..f1bea0caef10f3f29396877cc35140016d89497c 100644 (file)
@@ -381,6 +381,13 @@ Store::Disks::dereference(StoreEntry &e)
     return e.disk().dereference(e);
 }
 
+void
+Store::Disks::updateHeaders(StoreEntry *e)
+{
+    Must(e);
+    return e->disk().updateHeaders(e);
+}
+
 void
 Store::Disks::maintain()
 {
index 49def690529a14e50bca1296fef9c87718c3ffbd..1203e1c815f4f1b61a3eaa3d81a66a5fac45a940 100644 (file)
@@ -32,6 +32,7 @@ public:
     virtual void sync() override;
     virtual void reference(StoreEntry &) override;
     virtual bool dereference(StoreEntry &e) override;
+    virtual void updateHeaders(StoreEntry *) override;
     virtual void maintain() override;
     virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override;
     virtual bool updateCollapsed(StoreEntry &e) override;
index 07b1c4c43eb09e55469ff1335725125a0b035c2d..196ede285efe571f825bc6005b55fe4b171e4305 100644 (file)
@@ -28,5 +28,6 @@ HttpReply::HttpReply() : HttpMsg(hoReply), date (0), last_modified (0),
     void HttpReply::hdrCacheInit() STUB
     HttpReply * HttpReply::clone() const STUB_RETVAL(NULL)
     bool HttpReply::inheritProperties(const HttpMsg *aMsg) STUB_RETVAL(false)
+    void HttpReply::updateOnNotModified(HttpReply const*) STUB
     int64_t HttpReply::bodySize(const HttpRequestMethod&) const STUB_RETVAL(0)
 
index 1688e84e32046aff17b97d3f4b8f7de990561162..060188b43f39d35973a64360949d3f3d7fdf1114 100644 (file)
@@ -22,6 +22,7 @@ void MemStore::completeWriting(StoreEntry &e) STUB
 void MemStore::unlink(StoreEntry &e) STUB
 void MemStore::disconnect(StoreEntry &e) STUB
 void MemStore::reference(StoreEntry &) STUB
+void MemStore::updateHeaders(StoreEntry *) STUB
 void MemStore::maintain() STUB
 void MemStore::noteFreeMapSlice(const Ipc::StoreMapSliceId) STUB
 void MemStore::init() STUB