From abf396ecb7fcdc3e156c73d3eb221c5576627689 Mon Sep 17 00:00:00 2001
From: Alex Rousskov
Date: Fri, 11 Mar 2016 11:00:51 -0700
Subject: [PATCH] Bug 7: Update cached entries on 304 responses.

New Store API to update entry metadata and headers on 304s. Support
entry updates in the shared memory cache and rock cache_dirs. No changes
to ufs-based cache_dirs: their entries are still not updated.

* Atomic StoreEntry metadata updating

StoreEntry metadata (swap_file_sz, timestamps, etc.) is used throughout
the Squid code. That metadata cannot be updated atomically because it
has many fields, but a partial update to those fields causes assertions.
Still, we must update metadata when updating HTTP headers. Locking the
entire entry for a rewrite does not work well because concurrent
requests would then attempt to download a new entry copy, defeating the
very HTTP 304 optimization we want to support.

The Ipc::StoreMap index now uses an extra level of indirection (the
StoreMap::fileNos index), which allows StoreMap to control which
anchor/fileno is associated with a given StoreEntry key. The entry
updating code creates a disassociated (i.e., entry/key-less) anchor,
writes new metadata and headers using that new anchor, and then
_atomically_ switches the map to use that new anchor. Old readers
continue reading via the stale anchor/fileno as if nothing happened,
while new readers get the new anchor/fileno.

Shared memory usage increases by 8 bytes per cache entry: 4 bytes for
the extra level of indirection (StoreMapFileNos) plus 4 bytes for
splicing the fresh chain prefix with the stale chain suffix
(StoreMapAnchor::splicingPoint). In addition, if the updated headers are
larger than the stale ones, Squid allocates shared memory pages to
accommodate the increase, leading to shared memory fragmentation/waste
for small increases.

* Revamped rock index rebuild process

The index rebuild process had to be completely revamped because splicing
fresh and stale entry slot chain segments implies tolerating multiple
entry versions in a single chain, while the old code assumed that
different slot versions are incompatible. We were also uncomfortable
with the old cavalier approach of accessing two differently indexed
layers of information (entry vs. slot) through the same set of class
fields, which made it trivial to accidentally access entry data while
using a slot index.

While rewriting the index rebuilding code, we also discovered a way to
significantly reduce RAM usage for the index build map (a temporary
object that is allocated at the beginning and freed at the end of the
index build process). The savings depend on the cache size: a small
cache saves about 30% (17 vs. 24 bytes per entry/slot), while a 1TB
cache_dir with 32KB slots (which implies uneven entry/slot indexes)
saves more than 50% (~370MB vs. ~800MB).

Adjusted how invalid slots are counted. The old code sometimes counted
invalid entries and sometimes invalid entry slots. We now always count
_slots_ because progress is measured in the number of slots scanned, not
entries loaded. This accounting change may surprise users with a much
higher "Invalid entries" count in cache.log upon startup, but at least
the new reports are meaningful.

This rewrite does not attempt to solve all rock index build problems.
For example, the code still assumes that StoreEntry metadata fits into a
single slot, which is not always true for very small slots.
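
Illustrative sketch (not part of the patch): a minimal, self-contained
C++ approximation of the fileNos indirection described above. All names
here (ToyStoreMap, Anchor, bucket, openForReading, closeForUpdating) are
hypothetical; the real Ipc::StoreMap lives in shared memory segments and
adds per-anchor read/write locks that this sketch omits.

// toy_store_map.cc -- hypothetical illustration only, not Squid code.
// A key bucket resolves to an anchor through an atomically updatable
// fileNos[] index, so a header update can publish a freshly written
// anchor with a single atomic store while old readers keep using the
// stale anchor they already resolved.
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

struct Anchor {
    long swapFileSz = 0; // entry metadata kept with the anchor
    int startSlice = -1; // first slice of the entry slot chain
};

class ToyStoreMap {
public:
    explicit ToyStoreMap(const std::size_t limit): fileNos(limit), anchors(limit) {
        for (auto &f : fileNos)
            f.store(-1, std::memory_order_relaxed); // no entry yet
    }

    // reader: key bucket -> current fileno -> anchor (may become stale later)
    const Anchor *openForReading(const std::size_t bucket) const {
        const int fileNo = fileNos[bucket % fileNos.size()].load(std::memory_order_acquire);
        return fileNo < 0 ? nullptr : &anchors[fileNo];
    }

    // updater: after filling anchors[freshFileNo] with the new headers and
    // metadata, switch the bucket to the fresh anchor in one atomic store;
    // readers still holding the stale anchor continue reading it undisturbed.
    void closeForUpdating(const std::size_t bucket, const int freshFileNo) {
        assert(freshFileNo >= 0 && static_cast<std::size_t>(freshFileNo) < anchors.size());
        fileNos[bucket % fileNos.size()].store(freshFileNo, std::memory_order_release);
    }

private:
    std::vector<std::atomic<int>> fileNos; // the extra 4-byte-per-entry indirection
    std::vector<Anchor> anchors;           // per-fileno entry metadata and chain start
};

A reader that resolved the stale fileno before closeForUpdating() keeps
a valid Anchor pointer, which is why old readers observe no change while
new readers pick up the fresh headers.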
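
Also for illustration only (not part of the patch), assuming typical
64-bit alignment: a sketch of why splitting entry-level and slot-level
rebuild bookkeeping into parallel arrays yields the 17 vs. 24 bytes per
entry/slot figure quoted above. The struct and field names are
hypothetical approximations of the patch's LoadingEntry/LoadingParts.

// rebuild_layout.cc -- hypothetical illustration only, not Squid code.
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

// old approach: one array element per db slot carries both entry-level
// and slot-level data, so every field is padded and paid for per slot
struct OldLoadingEntry {
    uint64_t size;    // entry-level: payload seen so far
    uint32_t version; // entry-level: chain version
    int32_t more;     // slot-level: link to another slot in the chain
    uint8_t flags;    // packed state/anchored/mapped/freed bits
};                    // 17 bytes of data, typically padded to 24

// new approach: parallel arrays, so nothing is padded and entry-level
// fields are allocated per entry rather than per slot
struct NewLoadingParts {
    std::vector<uint64_t> sizes;    // indexed by entry (fileno)
    std::vector<uint32_t> versions; // indexed by entry (fileno)
    std::vector<int32_t> mores;     // indexed by slot
    std::vector<uint8_t> flags;     // one byte serves both index spaces
};

int main() {
    const std::size_t entries = 1000 * 1000;
    const std::size_t slots = entries; // small cache: about one slot per entry
    const std::size_t oldBytes = slots * sizeof(OldLoadingEntry);
    const std::size_t newBytes = entries * (sizeof(uint64_t) + sizeof(uint32_t)) +
                                 slots * (sizeof(int32_t) + sizeof(uint8_t));
    std::printf("old: %zu bytes (%zu/slot), new: %zu bytes (%zu/slot)\n",
                oldBytes, oldBytes / slots, newBytes, newBytes / slots);
    // prints roughly 24 vs. 17 bytes per entry/slot, i.e., the ~30% savings;
    // when slots outnumber entries, the per-slot cost falls toward 5 bytes,
    // which is where the larger savings for big cache_dirs come from.
    return 0;
}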
--- src/MemStore.cc | 285 +++++++++++++++-- src/MemStore.h | 11 +- src/StoreIOState.cc | 6 + src/StoreIOState.h | 9 +- src/client_side_reply.cc | 5 +- src/fs/Makefile.am | 2 + src/fs/rock/RockHeaderUpdater.cc | 267 ++++++++++++++++ src/fs/rock/RockHeaderUpdater.h | 73 +++++ src/fs/rock/RockIoState.cc | 36 ++- src/fs/rock/RockIoState.h | 9 +- src/fs/rock/RockRebuild.cc | 517 +++++++++++++++++++------------ src/fs/rock/RockRebuild.h | 21 +- src/fs/rock/RockSwapDir.cc | 72 ++++- src/fs/rock/RockSwapDir.h | 7 +- src/fs/rock/forward.h | 2 + src/http.cc | 5 +- src/ipc/ReadWriteLock.cc | 28 ++ src/ipc/ReadWriteLock.h | 10 +- src/ipc/StoreMap.cc | 351 +++++++++++++++++++-- src/ipc/StoreMap.h | 81 ++++- src/peer_digest.cc | 6 +- src/store/Controlled.h | 3 + src/store/Controller.cc | 19 ++ src/store/Controller.h | 3 + src/store/Disks.cc | 7 + src/store/Disks.h | 1 + src/tests/stub_HttpReply.cc | 1 + src/tests/stub_MemStore.cc | 1 + 28 files changed, 1526 insertions(+), 312 deletions(-) create mode 100644 src/fs/rock/RockHeaderUpdater.cc create mode 100644 src/fs/rock/RockHeaderUpdater.h diff --git a/src/MemStore.cc b/src/MemStore.cc index e7b2722c05..8393dda240 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -35,6 +35,138 @@ static const char *ExtrasLabel = "cache_mem_ex"; // used except for a positivity test. A unique value is handy for debugging. static const uint32_t SpacePoolId = 510716; +/// Packs to shared memory, allocating new slots/pages as needed. +/// Requires an Ipc::StoreMapAnchor locked for writing. +class ShmWriter: public Packable +{ +public: + ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice = -1); + + /* Packable API */ + virtual void append(const char *aBuf, int aSize) override; + virtual void vappendf(const char *fmt, va_list ap) override; + +public: + StoreEntry *entry; ///< the entry being updated + + /// the slot keeping the first byte of the appended content (at least) + /// either set via constructor parameter or allocated by the first append + Ipc::StoreMapSliceId firstSlice; + + /// the slot keeping the last byte of the appended content (at least) + Ipc::StoreMapSliceId lastSlice; + + uint64_t totalWritten; ///< cumulative number of bytes appended so far + +protected: + void copyToShm(); + void copyToShmSlice(Ipc::StoreMap::Slice &slice); + +private: + MemStore &store; + const sfileno fileNo; + + /* set by (and only valid during) append calls */ + const char *buf; ///< content being appended now + int bufSize; ///< buf size + int bufWritten; ///< buf bytes appended so far +}; + +/* ShmWriter */ + +ShmWriter::ShmWriter(MemStore &aStore, StoreEntry *anEntry, const sfileno aFileNo, Ipc::StoreMapSliceId aFirstSlice): + entry(anEntry), + firstSlice(aFirstSlice), + lastSlice(firstSlice), + totalWritten(0), + store(aStore), + fileNo(aFileNo), + buf(nullptr), + bufSize(0), + bufWritten(0) +{ + Must(entry); +} + +void +ShmWriter::append(const char *aBuf, int aBufSize) +{ + Must(!buf); + buf = aBuf; + bufSize = aBufSize; + if (bufSize) { + Must(buf); + bufWritten = 0; + copyToShm(); + } + buf = nullptr; + bufSize = 0; + bufWritten = 0; +} + +void +ShmWriter::vappendf(const char *fmt, va_list ap) +{ + SBuf vaBuf; +#if defined(VA_COPY) + va_list apCopy; + VA_COPY(apCopy, ap); + vaBuf.vappendf(fmt, apCopy); + va_end(apCopy); +#else + vaBuf.vappendf(fmt, ap); +#endif + append(vaBuf.rawContent(), vaBuf.length()); +} + +/// copies the entire buffer to shared memory +void +ShmWriter::copyToShm() +{ + Must(bufSize > 0); // do not 
use up shared memory pages for nothing + Must(firstSlice < 0 || lastSlice >= 0); + + // fill, skip slices that are already full + while (bufWritten < bufSize) { + Ipc::StoreMap::Slice &slice = store.nextAppendableSlice(fileNo, lastSlice); + if (firstSlice < 0) + firstSlice = lastSlice; + copyToShmSlice(slice); + } + + debugs(20, 7, "stored " << bufWritten << '/' << totalWritten << " header bytes of " << *entry); +} + +/// copies at most one slice worth of buffer to shared memory +void +ShmWriter::copyToShmSlice(Ipc::StoreMap::Slice &slice) +{ + Ipc::Mem::PageId page = store.pageForSlice(lastSlice); + debugs(20, 7, "entry " << *entry << " slice " << lastSlice << " has " << + page); + + Must(bufWritten <= bufSize); + const int64_t writingDebt = bufSize - bufWritten; + const int64_t pageSize = Ipc::Mem::PageSize(); + const int64_t sliceOffset = totalWritten % pageSize; + const int64_t copySize = std::min(writingDebt, pageSize - sliceOffset); + memcpy(static_cast(PagePointer(page)) + sliceOffset, buf + bufWritten, + copySize); + + debugs(20, 7, "copied " << slice.size << '+' << copySize << " bytes of " << + entry << " from " << sliceOffset << " in " << page); + + slice.size += copySize; + bufWritten += copySize; + totalWritten += copySize; + // fresh anchor.basics.swap_file_sz is already set [to the stale value] + + // either we wrote everything or we filled the entire slice + Must(bufWritten == bufSize || sliceOffset + copySize == pageSize); +} + +/* MemStore */ + MemStore::MemStore(): map(NULL), lastWritingSlice(-1) { } @@ -207,6 +339,69 @@ MemStore::get(const cache_key *key) return NULL; } +void +MemStore::updateHeaders(StoreEntry *updatedE) +{ + if (!map) + return; + + Ipc::StoreMapUpdate update(updatedE); + assert(updatedE); + assert(updatedE->mem_obj); + if (!map->openForUpdating(update, updatedE->mem_obj->memCache.index)) + return; + + try { + updateHeadersOrThrow(update); + } catch (const std::exception &ex) { + debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what()); + map->abortUpdating(update); + } +} + +void +MemStore::updateHeadersOrThrow(Ipc::StoreMapUpdate &update) +{ + // our +/- hdr_sz math below does not work if the chains differ [in size] + Must(update.stale.anchor->basics.swap_file_sz == update.fresh.anchor->basics.swap_file_sz); + + const HttpReply *rawReply = update.entry->getReply(); + Must(rawReply); + const HttpReply &reply = *rawReply; + const uint64_t staleHdrSz = reply.hdr_sz; + debugs(20, 7, "stale hdr_sz: " << staleHdrSz); + + /* we will need to copy same-slice payload after the stored headers later */ + Must(staleHdrSz > 0); + update.stale.splicingPoint = map->sliceContaining(update.stale.fileNo, staleHdrSz); + Must(update.stale.splicingPoint >= 0); + Must(update.stale.anchor->basics.swap_file_sz >= staleHdrSz); + + Must(update.stale.anchor); + ShmWriter writer(*this, update.entry, update.fresh.fileNo); + reply.packHeadersInto(&writer); + const uint64_t freshHdrSz = writer.totalWritten; + debugs(20, 7, "fresh hdr_sz: " << freshHdrSz << " diff: " << (freshHdrSz - staleHdrSz)); + + /* copy same-slice payload remaining after the stored headers */ + const Ipc::StoreMapSlice &slice = map->readableSlice(update.stale.fileNo, update.stale.splicingPoint); + const Ipc::StoreMapSlice::Size sliceCapacity = Ipc::Mem::PageSize(); + const Ipc::StoreMapSlice::Size headersInLastSlice = staleHdrSz % sliceCapacity; + Must(headersInLastSlice > 0); // or sliceContaining() would have stopped earlier + Must(slice.size >= headersInLastSlice); + const 
Ipc::StoreMapSlice::Size payloadInLastSlice = slice.size - headersInLastSlice; + const MemStoreMapExtras::Item &extra = extras->items[update.stale.splicingPoint]; + char *page = static_cast(PagePointer(extra.page)); + debugs(20, 5, "appending same-slice payload: " << payloadInLastSlice); + writer.append(page + headersInLastSlice, payloadInLastSlice); + update.fresh.splicingPoint = writer.lastSlice; + + update.fresh.anchor->basics.swap_file_sz -= staleHdrSz; + update.fresh.anchor->basics.swap_file_sz += freshHdrSz; + + map->closeForUpdating(update); +} + bool MemStore::anchorCollapsed(StoreEntry &collapsed, bool &inSync) { @@ -498,10 +693,6 @@ MemStore::copyToShm(StoreEntry &e) assert(map); assert(e.mem_obj); - const int32_t index = e.mem_obj->memCache.index; - assert(index >= 0); - Ipc::StoreMapAnchor &anchor = map->writeableEntry(index); - const int64_t eSize = e.mem_obj->endOffset(); if (e.mem_obj->memCache.offset >= eSize) { debugs(20, 5, "postponing copying " << e << " for lack of news: " << @@ -509,34 +700,19 @@ MemStore::copyToShm(StoreEntry &e) return; // nothing to do (yet) } - if (anchor.start < 0) { // must allocate the very first slot for e - Ipc::Mem::PageId page; - anchor.start = reserveSapForWriting(page); // throws - extras->items[anchor.start].page = page; - } - + const int32_t index = e.mem_obj->memCache.index; + assert(index >= 0); + Ipc::StoreMapAnchor &anchor = map->writeableEntry(index); lastWritingSlice = anchor.start; - const size_t sliceCapacity = Ipc::Mem::PageSize(); // fill, skip slices that are already full // Optimize: remember lastWritingSlice in e.mem_obj while (e.mem_obj->memCache.offset < eSize) { - Ipc::StoreMap::Slice &slice = - map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice); - - if (slice.size >= sliceCapacity) { - if (slice.next >= 0) { - lastWritingSlice = slice.next; - continue; - } - - Ipc::Mem::PageId page; - slice.next = lastWritingSlice = reserveSapForWriting(page); - extras->items[lastWritingSlice].page = page; - debugs(20, 7, "entry " << index << " new slice: " << lastWritingSlice); - } - - copyToShmSlice(e, anchor); + Ipc::StoreMap::Slice &slice = nextAppendableSlice( + e.mem_obj->memCache.index, lastWritingSlice); + if (anchor.start < 0) + anchor.start = lastWritingSlice; + copyToShmSlice(e, anchor, slice); } debugs(20, 7, "mem-cached available " << eSize << " bytes of " << e); @@ -544,13 +720,9 @@ MemStore::copyToShm(StoreEntry &e) /// copies at most one slice worth of local memory to shared memory void -MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor) +MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice) { - Ipc::StoreMap::Slice &slice = - map->writeableSlice(e.mem_obj->memCache.index, lastWritingSlice); - - Ipc::Mem::PageId page = extras->items[lastWritingSlice].page; - assert(lastWritingSlice >= 0 && page); + Ipc::Mem::PageId page = pageForSlice(lastWritingSlice); debugs(20, 7, "entry " << e << " slice " << lastWritingSlice << " has " << page); @@ -576,6 +748,53 @@ MemStore::copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor) anchor.basics.swap_file_sz = e.mem_obj->memCache.offset; } +/// starts checking with the entry chain slice at a given offset and +/// returns a not-full (but not necessarily empty) slice, updating sliceOffset +Ipc::StoreMap::Slice & +MemStore::nextAppendableSlice(const sfileno fileNo, sfileno &sliceOffset) +{ + // allocate the very first slot for the entry if needed + if (sliceOffset < 0) { + Ipc::StoreMapAnchor &anchor = 
map->writeableEntry(fileNo); + Must(anchor.start < 0); + Ipc::Mem::PageId page; + sliceOffset = reserveSapForWriting(page); // throws + extras->items[sliceOffset].page = page; + anchor.start = sliceOffset; + } + + const size_t sliceCapacity = Ipc::Mem::PageSize(); + do { + Ipc::StoreMap::Slice &slice = map->writeableSlice(fileNo, sliceOffset); + + if (slice.size >= sliceCapacity) { + if (slice.next >= 0) { + sliceOffset = slice.next; + continue; + } + + Ipc::Mem::PageId page; + slice.next = sliceOffset = reserveSapForWriting(page); + extras->items[sliceOffset].page = page; + debugs(20, 7, "entry " << fileNo << " new slice: " << sliceOffset); + } + + return slice; + } while (true); + /* not reached */ +} + +/// safely returns a previously allocated memory page for the given entry slice +Ipc::Mem::PageId +MemStore::pageForSlice(Ipc::StoreMapSliceId sliceId) +{ + Must(extras); + Must(sliceId >= 0); + Ipc::Mem::PageId page = extras->items[sliceId].page; + Must(page); + return page; +} + /// finds a slot and a free page to fill or throws sfileno MemStore::reserveSapForWriting(Ipc::Mem::PageId &page) diff --git a/src/MemStore.h b/src/MemStore.h index 179195fc9d..6214b50e21 100644 --- a/src/MemStore.h +++ b/src/MemStore.h @@ -22,6 +22,8 @@ struct MemStoreMapExtraItem { typedef Ipc::StoreMapItems MemStoreMapExtras; typedef Ipc::StoreMap MemStoreMap; +class ShmWriter; + /// Stores HTTP entities in RAM. Current implementation uses shared memory. /// Unlike a disk store (SwapDir), operations are synchronous (and fast). class MemStore: public Store::Controlled, public Ipc::StoreMapCleaner @@ -55,6 +57,7 @@ public: virtual void stat(StoreEntry &e) const override; virtual void reference(StoreEntry &e) override; virtual bool dereference(StoreEntry &e) override; + virtual void updateHeaders(StoreEntry *e) override; virtual void maintain() override; virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override; virtual bool updateCollapsed(StoreEntry &e) override; @@ -64,17 +67,23 @@ public: static int64_t EntryLimit(); protected: + friend ShmWriter; + bool shouldCache(StoreEntry &e) const; bool startCaching(StoreEntry &e); void copyToShm(StoreEntry &e); - void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor); + void copyToShmSlice(StoreEntry &e, Ipc::StoreMapAnchor &anchor, Ipc::StoreMap::Slice &slice); bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor); bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof); + void updateHeadersOrThrow(Ipc::StoreMapUpdate &update); + void anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor); bool updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor); + Ipc::Mem::PageId pageForSlice(Ipc::StoreMapSliceId sliceId); + Ipc::StoreMap::Slice &nextAppendableSlice(const sfileno entryIndex, sfileno &sliceOffset); sfileno reserveSapForWriting(Ipc::Mem::PageId &page); // Ipc::StoreMapCleaner API diff --git a/src/StoreIOState.cc b/src/StoreIOState.cc index 8a89b807d0..e741921d51 100644 --- a/src/StoreIOState.cc +++ b/src/StoreIOState.cc @@ -11,6 +11,7 @@ #include "squid.h" #include "Debug.h" #include "defines.h" +#include "Store.h" #include "StoreIOState.h" void * @@ -52,3 +53,8 @@ StoreIOState::~StoreIOState() cbdataReferenceDone(callback_data); } +bool StoreIOState::touchingStoreEntry() const +{ + return e && e->swap_filen == swap_filen; +} + diff --git a/src/StoreIOState.h b/src/StoreIOState.h index b3f4c5f1f9..91779943b8 100644 --- 
a/src/StoreIOState.h +++ b/src/StoreIOState.h @@ -73,12 +73,19 @@ public: } CloseHow; virtual void close(int how) = 0; ///< finish or abort swapping per CloseHow + // Tests whether we are working with the primary/public StoreEntry chain. + // Reads start reading the primary chain, but it may become secondary. + // There are two store write kinds: + // * regular writes that change (usually append) the entry visible to all and + // * header updates that create a fresh chain (while keeping the stale one usable). + bool touchingStoreEntry() const; + sdirno swap_dirn; sfileno swap_filen; StoreEntry *e; /* Need this so the FS layers can play god */ mode_t mode; off_t offset_; ///< number of bytes written or read for this entry so far - STFNCB *file_callback; /* called on delayed sfileno assignments */ + STFNCB *file_callback; // XXX: Unused. TODO: Remove. STIOCB *callback; void *callback_data; diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc index f9ed177b45..32fb00055d 100644 --- a/src/client_side_reply.cc +++ b/src/client_side_reply.cc @@ -398,7 +398,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) sendClientOldEntry(); } - HttpReply *old_rep = (HttpReply *) old_entry->getReply(); + const HttpReply *old_rep = old_entry->getReply(); // origin replied 304 if (status == Http::scNotModified) { @@ -406,8 +406,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) http->request->flags.staleIfHit = false; // old_entry is no longer stale // update headers on existing entry - old_rep->updateOnNotModified(http->storeEntry()->getReply()); - old_entry->timestampsSet(); + Store::Root().updateOnNotModified(old_entry, *http->storeEntry()); // if client sent IMS diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index 5ba97cfc2d..1890b2050f 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -38,6 +38,8 @@ librock_la_SOURCES = \ rock/forward.h \ rock/RockDbCell.cc \ rock/RockDbCell.h \ + rock/RockHeaderUpdater.cc \ + rock/RockHeaderUpdater.h \ rock/RockIoState.cc \ rock/RockIoState.h \ rock/RockIoRequests.cc \ diff --git a/src/fs/rock/RockHeaderUpdater.cc b/src/fs/rock/RockHeaderUpdater.cc new file mode 100644 index 0000000000..caf5fc691e --- /dev/null +++ b/src/fs/rock/RockHeaderUpdater.cc @@ -0,0 +1,267 @@ +/* + * Copyright (C) 1996-2016 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#include "squid.h" +#include "base/AsyncJobCalls.h" +#include "Debug.h" +#include "fs/rock/RockHeaderUpdater.h" +#include "fs/rock/RockIoState.h" +#include "mime_header.h" +#include "Store.h" +#include "StoreMetaUnpacker.h" + +CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater); + +Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &anUpdate): + AsyncJob("Rock::HeaderUpdater"), + store(aStore), + update(anUpdate), + reader(), + writer(), + bytesRead(0), + staleSwapHeaderSize(0), + staleSplicingPointNext(-1) +{ + // TODO: Consider limiting the number of concurrent store updates. 
+} + +bool +Rock::HeaderUpdater::doneAll() const +{ + return !reader && !writer && AsyncJob::doneAll(); +} + +void +Rock::HeaderUpdater::swanSong() +{ + if (update.stale || update.fresh) + store->map->abortUpdating(update); + + if (reader) { + reader->close(StoreIOState::readerDone); + reader = nullptr; + } + + if (writer) { + writer->close(StoreIOState::writerGone); + // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for. + // Also required to avoid IoState destructor assertions. + // We can do this because we closed update earlier or aborted it above. + dynamic_cast(*writer).writeableAnchor_ = nullptr; + writer = nullptr; + } + + AsyncJob::swanSong(); +} + +void +Rock::HeaderUpdater::start() +{ + Must(update.entry); + Must(update.stale); + Must(update.fresh); + startReading(); +} + +void +Rock::HeaderUpdater::startReading() +{ + reader = store->openStoreIO( + *update.entry, + nullptr, // unused; see StoreIOState::file_callback + &NoteDoneReading, + this); + readMore("need swap entry metadata"); +} + +void +Rock::HeaderUpdater::stopReading(const char *why) +{ + debugs(47, 7, why); + + Must(reader); + const IoState &rockReader = dynamic_cast(*reader); + update.stale.splicingPoint = rockReader.splicingPoint; + staleSplicingPointNext = rockReader.staleSplicingPointNext; + debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint << + " body continues at " << staleSplicingPointNext); + + reader->close(StoreIOState::readerDone); // calls noteDoneReading(0) + reader = nullptr; // so that swanSong() does not try to close again +} + +void +Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer) +{ + // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free". + CallJobHere1(47, 7, + CbcPointer(static_cast(data)), + Rock::HeaderUpdater, + noteRead, + result); +} + +void +Rock::HeaderUpdater::noteRead(ssize_t result) +{ + debugs(47, 7, result); + if (!result) { // EOF + stopReading("eof"); + } else { + Must(result > 0); + bytesRead += result; + readerBuffer.forceSize(readerBuffer.length() + result); + exchangeBuffer.append(readerBuffer); + debugs(47, 7, "accumulated " << exchangeBuffer.length()); + } + + parseReadBytes(); +} + +void +Rock::HeaderUpdater::readMore(const char *why) +{ + debugs(47, 7, "from " << bytesRead << " because " << why); + Must(reader); + readerBuffer.clear(); + storeRead(reader, + readerBuffer.rawSpace(store->slotSize), + store->slotSize, + bytesRead, + &NoteRead, + this); +} + +void +Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Pointer) +{ + // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free". 
+ CallJobHere1(47, 7, + CbcPointer(static_cast(data)), + Rock::HeaderUpdater, + noteDoneReading, + errflag); +} + +void +Rock::HeaderUpdater::noteDoneReading(int errflag) +{ + debugs(47, 5, errflag << " writer=" << writer); + if (const bool weInitiatedThisClosure = !reader) { + Must(!errflag); // we only initiate successful closures + Must(writer); // otherwise we would be done() and would not be called + } else { + reader = nullptr; // we are done reading + Must(errflag); // any external closures ought to be errors + mustStop("read error"); + } +} + +void +Rock::HeaderUpdater::startWriting() +{ + writer = store->createUpdateIO( + update, + nullptr, // unused; see StoreIOState::file_callback + &NoteDoneWriting, + this); + Must(writer); + + IoState &rockWriter = dynamic_cast(*writer); + rockWriter.staleSplicingPointNext = staleSplicingPointNext; + + off_t offset = 0; // current writing offset (for debugging) + + { + debugs(20, 7, "fresh store meta for " << *update.entry); + const char *freshSwapHeader = update.entry->getSerialisedMetaData(); + const auto freshSwapHeaderSize = update.entry->mem_obj->swap_hdr_sz; + Must(freshSwapHeader); + writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr); + offset += freshSwapHeaderSize; + } + + { + debugs(20, 7, "fresh HTTP header @ " << offset); + MemBuf *httpHeader = update.entry->mem_obj->getReply()->pack(); + writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr); + offset += httpHeader->contentSize(); + delete httpHeader; + } + + { + debugs(20, 7, "moved HTTP body prefix @ " << offset); + writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr); + offset += exchangeBuffer.length(); + exchangeBuffer.clear(); + } + + debugs(20, 7, "wrote " << offset); + + writer->close(StoreIOState::wroteAll); // should call noteDoneWriting() +} + +void +Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Pointer) +{ + CallJobHere1(47, 7, + CbcPointer(static_cast(data)), + Rock::HeaderUpdater, + noteDoneWriting, + errflag); +} + +void +Rock::HeaderUpdater::noteDoneWriting(int errflag) +{ + debugs(47, 5, errflag << " reader=" << reader); + Must(!errflag); + Must(!reader); // if we wrote everything, then we must have read everything + + Must(writer); + IoState &rockWriter = dynamic_cast(*writer); + update.fresh.splicingPoint = rockWriter.splicingPoint; + debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint); + store->map->closeForUpdating(update); + rockWriter.writeableAnchor_ = nullptr; + writer = nullptr; // we are done writing + + Must(doneAll()); +} + +void +Rock::HeaderUpdater::parseReadBytes() +{ + if (!staleSwapHeaderSize) { + StoreMetaUnpacker aBuilder( + exchangeBuffer.rawContent(), + exchangeBuffer.length(), + &staleSwapHeaderSize); + // Squid assumes that metadata always fits into a single db slot + Must(aBuilder.isBufferSane()); // cannot update what we cannot parse + debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize); + Must(staleSwapHeaderSize > 0); + exchangeBuffer.consume(staleSwapHeaderSize); + } + + const size_t staleHttpHeaderSize = headersEnd( + exchangeBuffer.rawContent(), + exchangeBuffer.length()); + debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize); + if (!staleHttpHeaderSize) { + readMore("need more stale HTTP reply header data"); + return; + } + + exchangeBuffer.consume(staleHttpHeaderSize); + debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length()); + + stopReading("read the last HTTP header slot"); + startWriting(); +} + 
diff --git a/src/fs/rock/RockHeaderUpdater.h b/src/fs/rock/RockHeaderUpdater.h new file mode 100644 index 0000000000..704bb76c2b --- /dev/null +++ b/src/fs/rock/RockHeaderUpdater.h @@ -0,0 +1,73 @@ +/* + * Copyright (C) 1996-2016 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_FS_ROCK_HEADER_UPDATER_H +#define SQUID_FS_ROCK_HEADER_UPDATER_H + +#include "base/AsyncJob.h" +#include "cbdata.h" +#include "fs/rock/forward.h" +#include "fs/rock/RockSwapDir.h" +#include "ipc/StoreMap.h" + +namespace Rock +{ + +/// Updates HTTP headers of a single Rock store entry: +/// * reads old body data in the same slot as the last old headers slot, if any +/// * writes new headers (1+ slots) +/// * writes old data (0-2 slots) +/// * chains the new entry prefix (1+ slots) to the old entry suffix (0+ slots) +class HeaderUpdater: public AsyncJob +{ + CBDATA_CLASS(HeaderUpdater); + +public: + HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update); + virtual ~HeaderUpdater() override = default; + +protected: + /* AsyncJob API */ + virtual void start() override; + virtual bool doneAll() const override; + virtual void swanSong() override; + +private: + static StoreIOState::STRCB NoteRead; + static StoreIOState::STIOCB NoteDoneReading; + static StoreIOState::STIOCB NoteDoneWriting; + + void startReading(); + void stopReading(const char *why); + void readMore(const char *why); + void noteRead(ssize_t result); + void noteDoneReading(int errflag); + void parseReadBytes(); + + void startWriting(); + void noteDoneWriting(int errflag); + + Rock::SwapDir::Pointer store; ///< cache_dir where the entry is stored + Ipc::StoreMapUpdate update; ///< Ipc::StoreMap update reservation + + StoreIOState::Pointer reader; ///< reads old headers and old data + StoreIOState::Pointer writer; ///< writes new headers and old data + + SBuf readerBuffer; ///< I/O buffer for a single read operation + SBuf exchangeBuffer; ///< bytes read but not yet discarded or written + uint64_t bytesRead; ///< total entry bytes read from Store so far + + int staleSwapHeaderSize; ///< stored size of the stale entry metadata + + SlotId staleSplicingPointNext; ///< non-updatable old HTTP body suffix start +}; + +} // namespace Rock + +#endif /* SQUID_FS_ROCK_HEADER_UPDATER_H */ + diff --git a/src/fs/rock/RockIoState.cc b/src/fs/rock/RockIoState.cc index 72f49ac176..e43ea441a5 100644 --- a/src/fs/rock/RockIoState.cc +++ b/src/fs/rock/RockIoState.cc @@ -30,10 +30,12 @@ Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir, StoreIOState(cbFile, cbIo, data), readableAnchor_(NULL), writeableAnchor_(NULL), - sidCurrent(-1), + splicingPoint(-1), + staleSplicingPointNext(-1), dir(aDir), slotSize(dir->slotSize), objOffset(0), + sidCurrent(-1), theBuf(dir->slotSize) { e = anEntry; @@ -132,6 +134,11 @@ void Rock::IoState::callReaderBack(const char *buf, int rlen) { debugs(79, 5, rlen << " bytes for " << *e); + splicingPoint = rlen >= 0 ? sidCurrent : -1; + if (splicingPoint < 0) + staleSplicingPointNext = -1; + else + staleSplicingPointNext = currentReadableSlice().next; StoreIOState::STRCB *callb = read.callback; assert(callb); read.callback = NULL; @@ -150,7 +157,7 @@ Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor) success = true; } catch (const std::exception &ex) { // TODO: should we catch ... 
as well? debugs(79, 2, "db write error: " << ex.what()); - dir->writeError(*e); + dir->writeError(*this); finishedWriting(DISK_ERROR); // 'this' might be gone beyond this point; fall through to free buf } @@ -202,7 +209,7 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff) writeToDisk(sidNext); } else if (Store::Root().transientReaders(*e)) { // write partial buffer for all remote hit readers to see - writeBufToDisk(-1, false); + writeBufToDisk(-1, false, false); } } @@ -231,15 +238,24 @@ Rock::IoState::writeToBuffer(char const *buf, size_t size) /// write what was buffered during write() calls /// negative sidNext means this is the last write request for this entry void -Rock::IoState::writeToDisk(const SlotId sidNext) +Rock::IoState::writeToDisk(const SlotId sidNextProposal) { assert(theFile != NULL); assert(theBuf.size >= sizeof(DbCellHeader)); + const bool lastWrite = sidNextProposal < 0; + const bool eof = lastWrite && + // either not updating or the updating reader has loaded everything + (touchingStoreEntry() || staleSplicingPointNext < 0); + // approve sidNextProposal unless _updating_ the last slot + const SlotId sidNext = (!touchingStoreEntry() && lastWrite) ? + staleSplicingPointNext : sidNextProposal; + debugs(79, 5, "sidNext:" << sidNextProposal << "=>" << sidNext << " eof=" << eof); + // TODO: if DiskIO module is mmap-based, we should be writing whole pages // to avoid triggering read-page;new_head+old_tail;write-page overheads - writeBufToDisk(sidNext, sidNext < 0); + writeBufToDisk(sidNext, eof, lastWrite); theBuf.clear(); sidCurrent = sidNext; @@ -248,7 +264,7 @@ Rock::IoState::writeToDisk(const SlotId sidNext) /// creates and submits a request to write current slot buffer to disk /// eof is true if and only this is the last slot void -Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof) +Rock::IoState::writeBufToDisk(const SlotId sidNext, const bool eof, const bool lastWrite) { // no slots after the last/eof slot (but partial slots may have a nil next) assert(!eof || sidNext < 0); @@ -281,7 +297,7 @@ Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof) memFreeBufFunc(wBufCap)), this); r->sidCurrent = sidCurrent; r->sidNext = sidNext; - r->eof = eof; + r->eof = lastWrite; // theFile->write may call writeCompleted immediatelly theFile->write(r); @@ -307,7 +323,8 @@ Rock::IoState::finishedWriting(const int errFlag) { // we incremented offset_ while accumulating data in write() // we do not reset writeableAnchor_ here because we still keep the lock - CollapsedForwarding::Broadcast(*e); + if (touchingStoreEntry()) + CollapsedForwarding::Broadcast(*e); callBack(errFlag); } @@ -332,8 +349,7 @@ Rock::IoState::close(int how) return; // writeCompleted() will callBack() case writerGone: - assert(writeableAnchor_); - dir->writeError(*e); // abort a partially stored entry + dir->writeError(*this); // abort a partially stored entry finishedWriting(DISK_ERROR); return; diff --git a/src/fs/rock/RockIoState.h b/src/fs/rock/RockIoState.h index 6926206715..97e5ef79ed 100644 --- a/src/fs/rock/RockIoState.h +++ b/src/fs/rock/RockIoState.h @@ -51,7 +51,11 @@ public: const Ipc::StoreMapAnchor *readableAnchor_; ///< starting point for reading Ipc::StoreMapAnchor *writeableAnchor_; ///< starting point for writing - SlotId sidCurrent; ///< ID of the db slot currently being read or written + /// the last db slot successfully read or written + SlotId splicingPoint; + /// when reading, this is the next slot we are going to read (if asked) + /// when writing, this 
is the next slot to use after the last fresh slot + SlotId staleSplicingPointNext; private: const Ipc::StoreMapAnchor &readAnchor() const; @@ -61,7 +65,7 @@ private: void tryWrite(char const *buf, size_t size, off_t offset); size_t writeToBuffer(char const *buf, size_t size); void writeToDisk(const SlotId nextSlot); - void writeBufToDisk(const SlotId nextSlot, const bool eof); + void writeBufToDisk(const SlotId nextSlot, const bool eof, const bool lastWrite); SlotId reserveSlotForWriting(); void callBack(int errflag); @@ -69,6 +73,7 @@ private: Rock::SwapDir::Pointer dir; ///< swap dir that initiated I/O const size_t slotSize; ///< db cell size int64_t objOffset; ///< object offset for current db slot + SlotId sidCurrent; ///< ID of the db slot currently being read or written RefCount theFile; // "file" responsible for this I/O MemBlob theBuf; // use for write content accumulation only diff --git a/src/fs/rock/RockRebuild.cc b/src/fs/rock/RockRebuild.cc index 54cd18708f..fb5b157d8b 100644 --- a/src/fs/rock/RockRebuild.cc +++ b/src/fs/rock/RockRebuild.cc @@ -46,17 +46,15 @@ CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild); * have the same key and version. If that assumption fails, we may serve a * hodgepodge entry during rebuild, until "extra" slots are loaded/noticed. \par + * iNode: The very first db slot in an entry slot chain. This slot contains + * at least the beginning of Store Entry metadata, but most 32KB inodes contain + * the entire metadata, HTTP headers, and HTTP body. + \par * Db slot: A db record containing a piece of a single store entry and linked * to other slots with the same key and version fields, forming a chain. * Slots are identified by their absolute position in the database file, * which is naturally unique. \par - * Except for the "mapped", "freed", and "more" fields, LoadingEntry info is - * entry-level and is stored at fileno position. In other words, the array of - * LoadingEntries should be interpreted as two arrays, one that maps slot ID - * to the LoadingEntry::mapped/free/more members, and the second one that maps - * fileno to all other LoadingEntry members. StoreMap maps slot key to fileno. 
- \par * When information from the newly loaded db slot contradicts the entry-level * information collected so far (e.g., the versions do not match or the total * chain size after the slot contribution exceeds the expected number), the @@ -76,35 +74,135 @@ CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild); namespace Rock { -/// maintains information about the store entry being loaded from disk -/// used for identifying partially stored/loaded entries -class LoadingEntry +/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags +class LoadingFlags { public: - LoadingEntry(): size(0), version(0), state(leEmpty), anchored(0), - mapped(0), freed(0), more(-1) {} + LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {} - /* store entry-level information indexed by sfileno */ - uint64_t size; ///< payload seen so far - uint32_t version; ///< DbCellHeader::version to distinguish same-URL chains - uint8_t state:3; ///< current entry state (one of the State values) + /* for LoadingEntry */ + uint8_t state:3; ///< current entry state (one of the LoadingEntry::State values) uint8_t anchored:1; ///< whether we loaded the inode slot for this entry - /* db slot-level information indexed by slotId, starting with firstSlot */ - uint8_t mapped:1; ///< whether this slot was added to a mapped entry - uint8_t freed:1; ///< whether this slot was marked as free - Ipc::StoreMapSliceId more; ///< another slot in some entry chain (unordered) - bool used() const { return freed || mapped || more != -1; } + /* for LoadingSlot */ + uint8_t mapped:1; ///< whether the slot was added to a mapped entry + uint8_t finalized:1; ///< whether finalizeOrThrow() has scanned the slot + uint8_t freed:1; ///< whether the slot was given to the map as free space +}; + +/// smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays) +class LoadingEntry +{ +public: + LoadingEntry(const sfileno fileNo, LoadingParts &source); + + uint64_t &size; ///< payload seen so far + uint32_t &version; ///< DbCellHeader::version to distinguish same-URL chains - /// possible entry states + /// possible store entry states during index rebuild typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State; + + /* LoadingFlags::state */ + State state() const { return static_cast(flags.state); } + void state(State aState) const { flags.state = aState; } + + /* LoadingFlags::anchored */ + bool anchored() const { return flags.anchored; } + void anchored(const bool beAnchored) { flags.anchored = beAnchored; } + +private: + LoadingFlags &flags; ///< entry flags (see the above accessors) are ours +}; + +/// smart db slot-level info pointer (hides anti-padding LoadingParts arrays) +class LoadingSlot +{ +public: + LoadingSlot(const SlotId slotId, LoadingParts &source); + + /// another slot in some chain belonging to the same entry (unordered!) 
+ Ipc::StoreMapSliceId &more; + + /* LoadingFlags::mapped */ + bool mapped() const { return flags.mapped; } + void mapped(const bool beMapped) { flags.mapped = beMapped; } + + /* LoadingFlags::finalized */ + bool finalized() const { return flags.finalized; } + void finalized(const bool beFinalized) { flags.finalized = beFinalized; } + + /* LoadingFlags::freed */ + bool freed() const { return flags.freed; } + void freed(const bool beFreed) { flags.freed = beFreed; } + + bool used() const { return freed() || mapped() || more != -1; } + +private: + LoadingFlags &flags; ///< slot flags (see the above accessors) are ours +}; + +/// information about store entries being loaded from disk (and their slots) +/// used for identifying partially stored/loaded entries +class LoadingParts +{ +public: + LoadingParts(int dbSlotLimit, int dbEntryLimit); + LoadingParts(LoadingParts&&) = delete; // paranoid (often too huge to copy) + +private: + friend class LoadingEntry; + friend class LoadingSlot; + + /* Anti-padding storage. With millions of entries, padding matters! */ + + /* indexed by sfileno */ + std::vector sizes; ///< LoadingEntry::size for all entries + std::vector versions; ///< LoadingEntry::version for all entries + + /* indexed by SlotId */ + std::vector mores; ///< LoadingSlot::more for all slots + + /* entry flags are indexed by sfileno; slot flags -- by SlotId */ + std::vector flags; ///< all LoadingEntry and LoadingSlot flags }; } /* namespace Rock */ +/* LoadingEntry */ + +Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source): + size(source.sizes.at(fileNo)), + version(source.versions.at(fileNo)), + flags(source.flags.at(fileNo)) +{ +} + +/* LoadingSlot */ + +Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source): + more(source.mores.at(slotId)), + flags(source.flags.at(slotId)) +{ +} + +/* LoadingParts */ + +Rock::LoadingParts::LoadingParts(const int dbEntryLimit, const int dbSlotLimit): + sizes(dbEntryLimit, 0), + versions(dbEntryLimit, 0), + mores(dbSlotLimit, -1), + flags(dbSlotLimit) +{ + assert(sizes.size() == versions.size()); // every entry has both fields + assert(sizes.size() <= mores.size()); // every entry needs slot(s) + assert(mores.size() == flags.size()); // every slot needs a set of flags +} + +/* Rebuild */ + Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"), sd(dir), - entries(NULL), + parts(nullptr), dbSize(0), dbSlotSize(0), dbSlotLimit(0), @@ -127,7 +225,7 @@ Rock::Rebuild::~Rebuild() { if (fd >= 0) file_close(fd); - delete[] entries; + delete parts; } /// prepares and initiates entry loading sequence @@ -159,7 +257,7 @@ Rock::Rebuild::start() dbOffset = SwapDir::HeaderSize; - entries = new LoadingEntry[dbSlotLimit]; + parts = new LoadingParts(dbEntryLimit, dbSlotLimit); checkpoint(); } @@ -172,11 +270,24 @@ Rock::Rebuild::checkpoint() eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true); } +bool +Rock::Rebuild::doneLoading() const +{ + return loadingPos >= dbSlotLimit; +} + +bool +Rock::Rebuild::doneValidating() const +{ + // paranoid slot checking is only enabled with squid -S + return validationPos >= dbEntryLimit + + (opt_store_doublecheck ? 
dbSlotLimit : 0); +} + bool Rock::Rebuild::doneAll() const { - return loadingPos >= dbSlotLimit && validationPos >= dbSlotLimit && - AsyncJob::doneAll(); + return doneLoading() && doneValidating() && AsyncJob::doneAll(); } void @@ -189,7 +300,7 @@ Rock::Rebuild::Steps(void *data) void Rock::Rebuild::steps() { - if (loadingPos < dbSlotLimit) + if (!doneLoading()) loadingSteps(); else validationSteps(); @@ -210,7 +321,7 @@ Rock::Rebuild::loadingSteps() const timeval loopStart = current_time; int loaded = 0; - while (loadingPos < dbSlotLimit) { + while (!doneLoading()) { loadOneSlot(); dbOffset += dbSlotSize; ++loadingPos; @@ -232,6 +343,21 @@ Rock::Rebuild::loadingSteps() } } +Rock::LoadingEntry +Rock::Rebuild::loadingEntry(const sfileno fileNo) +{ + Must(0 <= fileNo && fileNo < dbEntryLimit); + return LoadingEntry(fileNo, *parts); +} + +Rock::LoadingSlot +Rock::Rebuild::loadingSlot(const SlotId slotId) +{ + Must(0 <= slotId && slotId < dbSlotLimit); + Must(slotId <= loadingPos); // cannot look ahead + return LoadingSlot(slotId, *parts); +} + void Rock::Rebuild::loadOneSlot() { @@ -256,18 +382,18 @@ Rock::Rebuild::loadOneSlot() debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " << "Ignoring truncated " << buf.contentSize() << "-byte " << "cache entry meta data at " << dbOffset); - freeSlotIfIdle(slotId, true); + freeUnusedSlot(slotId, true); return; } memcpy(&header, buf.content(), sizeof(header)); if (header.empty()) { - freeSlotIfIdle(slotId, false); + freeUnusedSlot(slotId, false); return; } if (!header.sane(dbSlotSize, dbSlotLimit)) { debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " << "Ignoring malformed cache entry meta data at " << dbOffset); - freeSlotIfIdle(slotId, true); + freeUnusedSlot(slotId, true); return; } buf.consume(sizeof(header)); // optimize to avoid memmove() @@ -286,9 +412,10 @@ Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, co if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize)) return false; - // the entry size may still be unknown at this time + // the entry size may be unknown, but if it is known, it is authoritative debugs(47, 8, "importing basics for entry " << fileno << + " inode.entrySize: " << header.entrySize << " swap_file_sz: " << loadedE.swap_file_sz); anchor.set(loadedE); @@ -310,8 +437,11 @@ Rock::Rebuild::validationSteps() const timeval loopStart = current_time; int validated = 0; - while (validationPos < dbSlotLimit) { - validateOneEntry(); + while (!doneValidating()) { + if (validationPos < dbEntryLimit) + validateOneEntry(validationPos); + else + validateOneSlot(validationPos - dbEntryLimit); ++validationPos; ++validated; @@ -331,27 +461,78 @@ Rock::Rebuild::validationSteps() } } +/// Either make the entry accessible to all or throw. +/// This method assumes it is called only when no more entry slots are expected. 
+void +Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le) +{ + // walk all map-linked slots, starting from inode, and mark each + Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo); + Must(le.size > 0); // paranoid + uint64_t mappedSize = 0; + SlotId slotId = anchor.start; + while (slotId >= 0 && mappedSize < le.size) { + LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot + Must(!slot.finalized()); // no loops or stealing from other entries + Must(slot.mapped()); // all our slots should be in the sd->map + Must(!slot.freed()); // all our slots should still be present + slot.finalized(true); + + Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId); + Must(mapSlice.size > 0); // paranoid + mappedSize += mapSlice.size; + slotId = mapSlice.next; + } + /* no hodgepodge entries: one entry - one full chain and no leftovers */ + Must(slotId < 0); + Must(mappedSize == le.size); + + if (!anchor.basics.swap_file_sz) + anchor.basics.swap_file_sz = le.size; + EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED); + le.state(LoadingEntry::leLoaded); + sd->map->closeForWriting(fileNo, false); + ++counts.objcount; +} + +/// Either make the entry accessible to all or free it. +/// This method must only be called when no more entry slots are expected. void -Rock::Rebuild::validateOneEntry() +Rock::Rebuild::finalizeOrFree(const sfileno fileNo, LoadingEntry &le) { - LoadingEntry &e = entries[validationPos]; - switch (e.state) { + try { + finalizeOrThrow(fileNo, le); + } catch (const std::exception &ex) { + freeBadEntry(fileNo, ex.what()); + } +} - case LoadingEntry::leEmpty: - break; // no entry hashed to this position +void +Rock::Rebuild::validateOneEntry(const sfileno fileNo) +{ + LoadingEntry entry = loadingEntry(fileNo); + switch (entry.state()) { case LoadingEntry::leLoading: - freeBadEntry(validationPos, "partially stored"); + finalizeOrFree(fileNo, entry); break; - case LoadingEntry::leLoaded: - break; // we have already unlocked this entry - - case LoadingEntry::leCorrupted: - break; // we have already removed this entry + case LoadingEntry::leEmpty: // no entry hashed to this position + case LoadingEntry::leLoaded: // we have already unlocked this entry + case LoadingEntry::leCorrupted: // we have already removed this entry + case LoadingEntry::leIgnored: // we have already discarded this entry + break; } } +void +Rock::Rebuild::validateOneSlot(const SlotId slotId) +{ + const LoadingSlot slot = loadingSlot(slotId); + // there should not be any unprocessed slots left + Must(slot.freed() || (slot.mapped() && slot.finalized())); +} + /// Marks remaining bad entry slots as free and unlocks the entry. The map /// cannot do this because Loading entries may have holes in the slots chain. 
void @@ -360,26 +541,18 @@ Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription) debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription << " entry " << fileno << " is ignored during rebuild"); - Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno); + LoadingEntry le = loadingEntry(fileno); + le.state(LoadingEntry::leCorrupted); - bool freedSome = false; - // free all loaded non-anchor slots - SlotId slotId = entries[anchor.start].more; - while (slotId >= 0) { - const SlotId next = entries[slotId].more; - freeSlot(slotId, false); + Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno); + assert(anchor.start < 0 || le.size > 0); + for (SlotId slotId = anchor.start; slotId >= 0;) { + const SlotId next = loadingSlot(slotId).more; + freeSlot(slotId, true); slotId = next; - freedSome = true; } - // free anchor slot if it was loaded - if (entries[fileno].anchored) { - freeSlot(anchor.start, false); - freedSome = true; - } - assert(freedSome); sd->map->forgetWritingEntry(fileno); - ++counts.invalid; } void @@ -411,9 +584,9 @@ void Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid) { debugs(47,5, sd->index << " frees slot " << slotId); - LoadingEntry &le = entries[slotId]; - assert(!le.freed); - le.freed = 1; + LoadingSlot slot = loadingSlot(slotId); + assert(!slot.freed()); + slot.freed(true); if (invalid) { ++counts.invalid; @@ -426,27 +599,24 @@ Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid) sd->freeSlots->push(pageId); } -/// adds slot to the free slot index but only if the slot is unused +/// freeSlot() for never-been-mapped slots void -Rock::Rebuild::freeSlotIfIdle(const SlotId slotId, const bool invalid) +Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid) { - const LoadingEntry &le = entries[slotId]; - + LoadingSlot slot = loadingSlot(slotId); // mapped slots must be freed via freeBadEntry() to keep the map in sync - assert(!le.mapped); - - if (!le.used()) - freeSlot(slotId, invalid); + assert(!slot.mapped()); + freeSlot(slotId, invalid); } /// adds slot to the entry chain in the map void Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header) { - LoadingEntry &le = entries[slotId]; - assert(!le.mapped); - assert(!le.freed); - le.mapped = 1; + LoadingSlot slot = loadingSlot(slotId); + assert(!slot.mapped()); + assert(!slot.freed()); + slot.mapped(true); Ipc::StoreMapSlice slice; slice.next = header.nextSlot; @@ -454,73 +624,75 @@ Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header) sd->map->importSlice(slotId, slice); } +template // accommodates atomic and simple SlotIds. 
+void +Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to) +{ + LoadingSlot slot = loadingSlot(to); + assert(slot.more < 0); + slot.more = from; // may still be unset + from = to; +} + /// adds slot to an existing entry chain; caller must check that the slot /// belongs to the chain it is being added to void Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) { - LoadingEntry &le = entries[fileno]; + LoadingEntry le = loadingEntry(fileno); Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno); - assert(le.version == header.version); - - // mark anchor as loaded or add the secondary slot to the chain - LoadingEntry &inode = entries[header.firstSlot]; - if (header.firstSlot == slotId) { - debugs(47,5, "adding inode"); - assert(!inode.freed); - le.anchored = 1; + debugs(47,9, "adding " << slotId << " to entry " << fileno); + // we do not need to preserve the order + if (le.anchored()) { + LoadingSlot inode = loadingSlot(anchor.start); + chainSlots(inode.more, slotId); } else { - debugs(47,9, "linking " << slotId << " to " << inode.more); - // we do not need to preserve the order - LoadingEntry &slice = entries[slotId]; - assert(!slice.freed); - assert(slice.more < 0); - slice.more = inode.more; - inode.more = slotId; + chainSlots(anchor.start, slotId); } - if (header.firstSlot == slotId && !importEntry(anchor, fileno, header)) { - le.state = LoadingEntry::leCorrupted; - freeBadEntry(fileno, "corrupted metainfo"); - return; - } + le.size += header.payloadSize; // must precede freeBadEntry() calls - // set total entry size and/or check it for consistency - debugs(47, 8, "header.entrySize: " << header.entrySize << " swap_file_sz: " << anchor.basics.swap_file_sz); - uint64_t totalSize = header.entrySize; - assert(totalSize != static_cast(-1)); - if (!totalSize && anchor.basics.swap_file_sz) { - assert(anchor.basics.swap_file_sz != static_cast(-1)); - // perhaps we loaded a later slot (with entrySize) earlier - totalSize = anchor.basics.swap_file_sz; - } else if (totalSize && !anchor.basics.swap_file_sz) { - anchor.basics.swap_file_sz = totalSize; - assert(anchor.basics.swap_file_sz != static_cast(-1)); - } else if (totalSize != anchor.basics.swap_file_sz) { - le.state = LoadingEntry::leCorrupted; - freeBadEntry(fileno, "size mismatch"); - return; + if (header.firstSlot == slotId) { + debugs(47,5, "added inode"); + + if (le.anchored()) { // we have already added another inode slot + freeBadEntry(fileno, "inode conflict"); + ++counts.clashcount; + return; + } + + le.anchored(true); + + if (!importEntry(anchor, fileno, header)) { + freeBadEntry(fileno, "corrupted metainfo"); + return; + } + + // set total entry size and/or check it for consistency + if (const uint64_t totalSize = header.entrySize) { + assert(totalSize != static_cast(-1)); + if (!anchor.basics.swap_file_sz) { + anchor.basics.swap_file_sz = totalSize; + assert(anchor.basics.swap_file_sz != static_cast(-1)); + } else if (totalSize != anchor.basics.swap_file_sz) { + freeBadEntry(fileno, "size mismatch"); + return; + } + } } - le.size += header.payloadSize; + const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown if (totalSize > 0 && le.size > totalSize) { // overflow debugs(47, 8, "overflow: " << le.size << " > " << totalSize); - le.state = LoadingEntry::leCorrupted; freeBadEntry(fileno, "overflowing"); return; } mapSlot(slotId, header); - if (totalSize > 0 && le.size == totalSize) { - // entry fully loaded, unlock it - // we have validated that all db 
cells for this entry were loaded - EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED); - le.state = LoadingEntry::leLoaded; - sd->map->closeForWriting(fileno, false); - ++counts.objcount; - } + if (totalSize > 0 && le.size == totalSize) + finalizeOrFree(fileno, le); // entry is probably fully loaded now } /// initialize housekeeping information for a newly accepted entry @@ -529,12 +701,12 @@ Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno { anchor.setKey(reinterpret_cast(header.key)); assert(header.firstSlot >= 0); - anchor.start = header.firstSlot; + anchor.start = -1; // addSlotToEntry() will set it assert(anchor.basics.swap_file_sz != static_cast(-1)); - LoadingEntry &le = entries[fileno]; - le.state = LoadingEntry::leLoading; + LoadingEntry le = loadingEntry(fileno); + le.state(LoadingEntry::leLoading); le.version = header.version; le.size = 0; } @@ -543,22 +715,6 @@ Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno void Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) { - // If some other from-disk entry is/was using this slot as its inode OR - // if some other from-disk entry is/was using our inode slot, then the - // entries are conflicting. We cannot identify other entries, so we just - // remove ours and hope that the others were/will be handled correctly. - const LoadingEntry &slice = entries[slotId]; - const LoadingEntry &inode = entries[header.firstSlot]; - if (slice.used() || inode.used()) { - debugs(47,8, "slice/inode used: " << slice.used() << inode.used()); - LoadingEntry &le = entries[fileno]; - le.state = LoadingEntry::leCorrupted; - freeSlotIfIdle(slotId, slotId == header.firstSlot); - // if not idle, the other entry will handle its slice - ++counts.clashcount; - return; - } - // A miss may have been stored at our fileno while we were loading other // slots from disk. We ought to preserve that entry because it is fresher. const bool overwriteExisting = false; @@ -569,9 +725,9 @@ Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const Db } else { // A new from-network entry is occupying our map slot; let it be, but // save us from the trouble of going through the above motions again. - LoadingEntry &le = entries[fileno]; - le.state = LoadingEntry::leIgnored; - freeSlotIfIdle(slotId, false); + LoadingEntry le = loadingEntry(fileno); + le.state(LoadingEntry::leIgnored); + freeUnusedSlot(slotId, false); } } @@ -579,72 +735,26 @@ Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const Db bool Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const { + // Header updates always result in multi-start chains and often + // result in multi-version chains so we can only compare the keys. const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno); - const LoadingEntry &le = entries[fileno]; - // any order will work, but do fast comparisons first: - return le.version == header.version && - anchor.start == static_cast(header.firstSlot) && - anchor.sameKey(reinterpret_cast(header.key)); -} - -/// is the new header consistent with information already loaded? -bool -Rock::Rebuild::canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const -{ - if (!sameEntry(fileno, header)) { - debugs(79, 7, "cannot add; wrong entry"); - return false; - } - - const LoadingEntry &le = entries[slotId]; - // We cannot add a slot that was already declared free or mapped. 
- if (le.freed || le.mapped) { - debugs(79, 7, "cannot add; freed/mapped: " << le.freed << le.mapped); - return false; - } - - if (slotId == header.firstSlot) { - // If we are the inode, the anchored flag cannot be set yet. - if (entries[fileno].anchored) { - debugs(79, 7, "cannot add; extra anchor"); - return false; - } - - // And there should have been some other slot for this entry to exist. - if (le.more < 0) { - debugs(79, 7, "cannot add; missing slots"); - return false; - } - - return true; - } - - // We are the continuation slice so the more field is reserved for us. - if (le.more >= 0) { - debugs(79, 7, "cannot add; foreign slot"); - return false; - } - - return true; + return anchor.sameKey(reinterpret_cast(header.key)); } /// handle freshly loaded (and validated) db slot header void Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header) { - LoadingEntry &slice = entries[slotId]; - assert(!slice.freed); // we cannot free what was not loaded - const cache_key *const key = reinterpret_cast(header.key); - const sfileno fileno = sd->map->anchorIndexByKey(key); + const sfileno fileno = sd->map->fileNoByKey(key); assert(0 <= fileno && fileno < dbEntryLimit); - LoadingEntry &le = entries[fileno]; - debugs(47,9, "entry " << fileno << " state: " << le.state << ", inode: " << + LoadingEntry le = loadingEntry(fileno); + debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " << header.firstSlot << ", size: " << header.payloadSize); - switch (le.state) { + switch (le.state()) { case LoadingEntry::leEmpty: { startNewEntry(fileno, slotId, header); @@ -652,14 +762,13 @@ Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header) } case LoadingEntry::leLoading: { - if (canAdd(fileno, slotId, header)) { - addSlotToEntry(fileno, slotId, header); + if (sameEntry(fileno, header)) { + addSlotToEntry(fileno, slotId, header); // may fail } else { // either the loading chain or this slot is stale; // be conservative and ignore both (and any future ones) - le.state = LoadingEntry::leCorrupted; freeBadEntry(fileno, "duplicated"); - freeSlotIfIdle(slotId, slotId == header.firstSlot); + freeUnusedSlot(slotId, true); ++counts.dupcount; } break; @@ -668,22 +777,22 @@ Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header) case LoadingEntry::leLoaded: { // either the previously loaded chain or this slot is stale; // be conservative and ignore both (and any future ones) - le.state = LoadingEntry::leCorrupted; + le.state(LoadingEntry::leCorrupted); sd->map->freeEntry(fileno); // may not be immediately successful - freeSlotIfIdle(slotId, slotId == header.firstSlot); + freeUnusedSlot(slotId, true); ++counts.dupcount; break; } case LoadingEntry::leCorrupted: { // previously seen slots messed things up so we must ignore this one - freeSlotIfIdle(slotId, false); + freeUnusedSlot(slotId, true); break; } case LoadingEntry::leIgnored: { // already replaced by a fresher or colliding from-network entry - freeSlotIfIdle(slotId, false); + freeUnusedSlot(slotId, false); break; } } diff --git a/src/fs/rock/RockRebuild.h b/src/fs/rock/RockRebuild.h index 3735f8a7d0..56febd0507 100644 --- a/src/fs/rock/RockRebuild.h +++ b/src/fs/rock/RockRebuild.h @@ -19,6 +19,8 @@ namespace Rock { class LoadingEntry; +class LoadingSlot; +class LoadingParts; /// \ingroup Rock /// manages store rebuild process: loading meta information from db on disk @@ -36,33 +38,42 @@ protected: virtual bool doneAll() const; virtual void swanSong(); + bool doneLoading() const; + bool 
doneValidating() const; + private: void checkpoint(); void steps(); void loadingSteps(); void validationSteps(); void loadOneSlot(); - void validateOneEntry(); + void validateOneEntry(const sfileno fileNo); + void validateOneSlot(const SlotId slotId); bool importEntry(Ipc::StoreMapAnchor &anchor, const sfileno slotId, const DbCellHeader &header); void freeBadEntry(const sfileno fileno, const char *eDescription); void failure(const char *msg, int errNo = 0); + LoadingEntry loadingEntry(const sfileno fileNo); void startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header); void primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header); + void finalizeOrFree(const sfileno fileNo, LoadingEntry &le); + void finalizeOrThrow(const sfileno fileNo, LoadingEntry &le); void addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header); void useNewSlot(const SlotId slotId, const DbCellHeader &header); + LoadingSlot loadingSlot(const SlotId slotId); void mapSlot(const SlotId slotId, const DbCellHeader &header); - void freeSlotIfIdle(const SlotId slotId, const bool invalid); - void freeBusySlot(const SlotId slotId, const bool invalid); + void freeUnusedSlot(const SlotId slotId, const bool invalid); void freeSlot(const SlotId slotId, const bool invalid); - bool canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const; + template + void chainSlots(SlotIdType &from, const SlotId to); + bool sameEntry(const sfileno fileno, const DbCellHeader &header) const; SwapDir *sd; - LoadingEntry *entries; ///< store entries being loaded from disk + LoadingParts *parts; ///< parts of store entries being loaded from disk int64_t dbSize; int dbSlotSize; ///< the size of a db cell, including the cell header diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index 1ae2f4b668..f5fac233cd 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -16,6 +16,7 @@ #include "DiskIO/DiskIOStrategy.h" #include "DiskIO/ReadRequest.h" #include "DiskIO/WriteRequest.h" +#include "fs/rock/RockHeaderUpdater.h" #include "fs/rock/RockIoRequests.h" #include "fs/rock/RockIoState.h" #include "fs/rock/RockRebuild.h" @@ -666,6 +667,33 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI return sio; } +StoreIOState::Pointer +Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data) +{ + if (!theFile || theFile->error()) { + debugs(47,4, theFile); + return nullptr; + } + + Must(update.fresh); + Must(update.fresh.fileNo >= 0); + + Rock::SwapDir::Pointer self(this); + IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data); + + sio->swap_dirn = index; + sio->swap_filen = update.fresh.fileNo; + sio->writeableAnchor_ = update.fresh.anchor; + + debugs(47,5, "dir " << index << " updating filen " << + std::setfill('0') << std::hex << std::uppercase << std::setw(8) << + sio->swap_filen << std::dec << " starting at " << + diskOffset(sio->swap_filen)); + + sio->file(theFile); + return sio; +} + int64_t Rock::SwapDir::diskOffset(const SlotId sid) const { @@ -832,6 +860,8 @@ Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r) return; } + debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof); + // TODO: Fail if disk dropped one of the previous write requests. 
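The revised RockRebuild.h interface replaces the single entries array with a LoadingParts object reached through the loadingEntry() and loadingSlot() accessors, so entry-indexed and slot-indexed state can no longer be mixed up. Below is a rough standalone sketch of that split; the types are stand-ins, not Squid's actual classes, and the real layout is more compact:

    // sketch: separate entry-indexed and slot-indexed rebuild state
    #include <cstdint>
    #include <vector>

    struct EntryState { uint64_t size; uint8_t state; bool anchored; };
    struct SlotState { int32_t more; bool mapped; bool freed; };

    class Parts // stand-in for Rock::LoadingParts
    {
    public:
        Parts(const size_t entryLimit, const size_t slotLimit):
            entries_(entryLimit, EntryState{0, 0, false}),
            slots_(slotLimit, SlotState{-1, false, false}) {}

        // only an entry index can reach entry state ...
        EntryState &entry(const int32_t fileno) { return entries_.at(fileno); }
        // ... and only a slot index can reach slot state
        SlotState &slot(const int32_t slotId) { return slots_.at(slotId); }

    private:
        std::vector<EntryState> entries_;
        std::vector<SlotState> slots_;
    };

    int main()
    {
        Parts parts(100, 1000);        // slot limit usually exceeds entry limit
        parts.entry(7).anchored = true;
        parts.slot(42).more = 43;
        return (parts.entry(7).anchored && parts.slot(42).more == 43) ? 0 : 1;
    }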
if (errflag == DISK_OK) { @@ -846,37 +876,61 @@ Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r) if (request->eof) { assert(sio.e); assert(sio.writeableAnchor_); - sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz = - sio.offset_; + if (sio.touchingStoreEntry()) { + sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz = + sio.offset_; - // close, the entry gets the read lock - map->closeForWriting(sio.swap_filen, true); + // close, the entry gets the read lock + map->closeForWriting(sio.swap_filen, true); + } sio.writeableAnchor_ = NULL; + sio.splicingPoint = request->sidCurrent; sio.finishedWriting(errflag); } } else { noteFreeMapSlice(request->sidNext); - writeError(*sio.e); + writeError(sio); sio.finishedWriting(errflag); // and hope that Core will call disconnect() to close the map entry } - CollapsedForwarding::Broadcast(*sio.e); + if (sio.touchingStoreEntry()) + CollapsedForwarding::Broadcast(*sio.e); } void -Rock::SwapDir::writeError(StoreEntry &e) +Rock::SwapDir::writeError(StoreIOState &sio) { // Do not abortWriting here. The entry should keep the write lock // instead of losing association with the store and confusing core. - map->freeEntry(e.swap_filen); // will mark as unusable, just in case + map->freeEntry(sio.swap_filen); // will mark as unusable, just in case - Store::Root().transientsAbandon(e); + if (sio.touchingStoreEntry()) + Store::Root().transientsAbandon(*sio.e); + // else noop: a fresh entry update error does not affect stale entry readers // All callers must also call IoState callback, to propagate the error. } +void +Rock::SwapDir::updateHeaders(StoreEntry *updatedE) +{ + if (!map) + return; + + Ipc::StoreMapUpdate update(updatedE); + if (!map->openForUpdating(update, updatedE->swap_filen)) + return; + + try { + AsyncJob::Start(new HeaderUpdater(this, update)); + } catch (const std::exception &ex) { + debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what()); + map->abortUpdating(update); + } +} + bool Rock::SwapDir::full() const { diff --git a/src/fs/rock/RockSwapDir.h b/src/fs/rock/RockSwapDir.h index 2be42dea85..7340ec7e4b 100644 --- a/src/fs/rock/RockSwapDir.h +++ b/src/fs/rock/RockSwapDir.h @@ -67,7 +67,7 @@ public: int64_t diskOffset(Ipc::Mem::PageId &pageId) const; int64_t diskOffset(int filen) const; - void writeError(StoreEntry &e); + void writeError(StoreIOState &sio); /* StoreMapCleaner API */ virtual void noteFreeMapSlice(const Ipc::StoreMapSliceId fileno); @@ -91,6 +91,7 @@ protected: virtual void diskFull(); virtual void reference(StoreEntry &e); virtual bool dereference(StoreEntry &e); + virtual void updateHeaders(StoreEntry *e); virtual bool unlinkdUseful() const; virtual void unlink(StoreEntry &e); virtual void statfs(StoreEntry &e) const; @@ -118,11 +119,15 @@ protected: int64_t diskOffsetLimit() const; + void updateHeadersOrThrow(Ipc::StoreMapUpdate &update); + StoreIOState::Pointer createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); + void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor); bool updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor); friend class Rebuild; friend class IoState; + friend class HeaderUpdater; const char *filePath; ///< location of cache storage file inside path/ DirMap *map; ///< entry key/sfileno to MaxExtras/inode mapping diff --git a/src/fs/rock/forward.h b/src/fs/rock/forward.h index 1a3e9a6458..6cbd1d0b52 100644 --- 
a/src/fs/rock/forward.h +++ b/src/fs/rock/forward.h @@ -36,6 +36,8 @@ class Rebuild; class IoState; +class HeaderUpdater; + class DbCellHeader; } diff --git a/src/http.cc b/src/http.cc index af712ed82a..a54804b28b 100644 --- a/src/http.cc +++ b/src/http.cc @@ -1448,8 +1448,11 @@ HttpStateData::processReplyBody() writeReplyBody(); } + // storing/sending methods like earlier adaptOrFinalizeReply() or + // above writeReplyBody() may release/abort the store entry. if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) { - // The above writeReplyBody() call may have aborted the store entry. + // TODO: In some cases (e.g., 304), we should keep persistent conn open. + // Detect end-of-reply (and, hence, pool our idle pconn) earlier (ASAP). abortTransaction("store entry aborted while storing reply"); return; } else diff --git a/src/ipc/ReadWriteLock.cc b/src/ipc/ReadWriteLock.cc index 813b89da79..2a3b7951e0 100644 --- a/src/ipc/ReadWriteLock.cc +++ b/src/ipc/ReadWriteLock.cc @@ -12,6 +12,14 @@ #include "ipc/ReadWriteLock.h" #include "Store.h" +void Ipc::AssertFlagIsSet(std::atomic_flag &flag) +{ + // If the flag was false, then we set it to true and assert. A true flag + // may help keep other processes away from this broken entry. + // Otherwise, we just set an already set flag, which is probably a no-op. + assert(flag.test_and_set(std::memory_order_relaxed)); +} + bool Ipc::ReadWriteLock::lockShared() { @@ -37,6 +45,18 @@ Ipc::ReadWriteLock::lockExclusive() return false; } +bool +Ipc::ReadWriteLock::lockHeaders() +{ + if (lockShared()) { + if (!updating.test_and_set(std::memory_order_acquire)) + return true; // we got here first + // the updating lock was already set by somebody else + unlockShared(); + } + return false; +} + void Ipc::ReadWriteLock::unlockShared() { @@ -54,6 +74,14 @@ Ipc::ReadWriteLock::unlockExclusive() --writeLevel; } +void +Ipc::ReadWriteLock::unlockHeaders() +{ + AssertFlagIsSet(updating); + updating.clear(std::memory_order_release); + unlockShared(); +} + void Ipc::ReadWriteLock::switchExclusiveToShared() { diff --git a/src/ipc/ReadWriteLock.h b/src/ipc/ReadWriteLock.h index 329be57255..59f71877ba 100644 --- a/src/ipc/ReadWriteLock.h +++ b/src/ipc/ReadWriteLock.h @@ -30,8 +30,10 @@ public: bool lockShared(); ///< lock for reading or return false bool lockExclusive(); ///< lock for modification or return false + bool lockHeaders(); ///< lock for [readable] metadata update or return false void unlockShared(); ///< undo successful sharedLock() void unlockExclusive(); ///< undo successful exclusiveLock() + void unlockHeaders(); ///< undo successful lockHeaders() void switchExclusiveToShared(); ///< stop writing, start reading void startAppending(); ///< writer keeps its lock but also allows reading @@ -42,7 +44,8 @@ public: public: mutable std::atomic readers; ///< number of reading users std::atomic writing; ///< there is a writing user (there can be at most 1) - std::atomic appending; ///< the writer has promissed to only append + std::atomic appending; ///< the writer has promised to only append + std::atomic_flag updating; ///< a reader is updating metadata/headers private: mutable std::atomic readLevel; ///< number of users reading (or trying to) @@ -66,6 +69,11 @@ public: int appenders; ///< number of appending writers }; +/// Same as assert(flag is set): The process assert()s if flag is not set. +/// Side effect: The unset flag becomes set just before we assert(). +/// Needed because atomic_flag cannot be compared with a boolean. 
+void AssertFlagIsSet(std::atomic_flag &flag); + } // namespace Ipc #endif /* SQUID_IPC_READ_WRITE_LOCK_H */ diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index 9c39f398e2..124c07261a 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -27,12 +27,19 @@ StoreMapAnchorsId(const SBuf &path) return Ipc::Mem::Segment::Name(path, "anchors"); } +static SBuf +StoreMapFileNosId(const SBuf &path) +{ + return Ipc::Mem::Segment::Name(path, "filenos"); +} + Ipc::StoreMap::Owner * Ipc::StoreMap::Init(const SBuf &path, const int sliceLimit) { assert(sliceLimit > 0); // we should not be created otherwise const int anchorLimit = min(sliceLimit, static_cast(SwapFilenMax)); Owner *owner = new Owner; + owner->fileNos = shm_new(FileNos)(StoreMapFileNosId(path).c_str(), anchorLimit); owner->anchors = shm_new(Anchors)(StoreMapAnchorsId(path).c_str(), anchorLimit); owner->slices = shm_new(Slices)(StoreMapSlicesId(path).c_str(), sliceLimit); debugs(54, 5, "created " << path << " with " << anchorLimit << '+' << sliceLimit); @@ -40,10 +47,12 @@ Ipc::StoreMap::Init(const SBuf &path, const int sliceLimit) } Ipc::StoreMap::StoreMap(const SBuf &aPath): cleaner(NULL), path(aPath), + fileNos(shm_old(FileNos)(StoreMapFileNosId(path).c_str())), anchors(shm_old(Anchors)(StoreMapAnchorsId(path).c_str())), slices(shm_old(Slices)(StoreMapSlicesId(path).c_str())) { debugs(54, 5, "attached " << path << " with " << + fileNos->capacity << '+' << anchors->capacity << '+' << slices->capacity); assert(entryLimit() > 0); // key-to-position mapping requires this assert(entryLimit() <= sliceLimit()); // at least one slice per entry @@ -76,7 +85,6 @@ Ipc::StoreMap::forgetWritingEntry(sfileno fileno) // them; the caller is responsible for freeing them (most likely // our slice list is incomplete or has holes) - inode.waitingToBeFreed = false; inode.rewind(); inode.lock.unlockExclusive(); @@ -90,7 +98,7 @@ Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno) { debugs(54, 5, "opening entry with key " << storeKeyText(key) << " for writing " << path); - const int idx = anchorIndexByKey(key); + const int idx = fileNoByKey(key); if (Anchor *anchor = openForWritingAt(idx)) { fileno = idx; @@ -123,6 +131,7 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting) assert(s.empty()); s.start = -1; // we have not allocated any slices yet + s.splicingPoint = -1; ++anchors->count; //s.setKey(key); // XXX: the caller should do that @@ -208,6 +217,24 @@ Ipc::StoreMap::abortWriting(const sfileno fileno) } } +void +Ipc::StoreMap::abortUpdating(Update &update) +{ + const sfileno fileno = update.stale.fileNo; + debugs(54, 5, "aborting entry " << fileno << " for updating " << path); + if (update.stale) { + AssertFlagIsSet(update.stale.anchor->lock.updating); + update.stale.anchor->lock.unlockHeaders(); + closeForReading(update.stale.fileNo); + update.stale = Update::Edition(); + } + if (update.fresh) { + abortWriting(update.fresh.fileNo); + update.fresh = Update::Edition(); + } + debugs(54, 5, "aborted entry " << fileno << " for updating " << path); +} + const Ipc::StoreMap::Anchor * Ipc::StoreMap::peekAtReader(const sfileno fileno) const { @@ -245,7 +272,7 @@ Ipc::StoreMap::freeEntryByKey(const cache_key *const key) debugs(54, 5, "marking entry with key " << storeKeyText(key) << " to be freed in " << path); - const int idx = anchorIndexByKey(key); + const int idx = fileNoByKey(key); Anchor &s = anchorAt(idx); if (s.lock.lockExclusive()) { if (s.sameKey(key)) @@ -269,21 +296,8 @@ 
Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLoc { debugs(54, 7, "freeing entry " << fileno << " in " << path); - if (!inode.empty()) { - sfileno sliceId = inode.start; - debugs(54, 8, "first slice " << sliceId); - while (sliceId >= 0) { - Slice &slice = sliceAt(sliceId); - const sfileno nextId = slice.next; - slice.size = 0; - slice.next = -1; - if (cleaner) - cleaner->noteFreeMapSlice(sliceId); // might change slice state - sliceId = nextId; - } - } - - inode.waitingToBeFreed = false; + if (!inode.empty()) + freeChainAt(inode.start, inode.splicingPoint); inode.rewind(); if (!keepLocked) @@ -292,19 +306,62 @@ Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLoc debugs(54, 5, "freed entry " << fileno << " in " << path); } +/// unconditionally frees an already locked chain of slots; no anchor maintenance +void +Ipc::StoreMap::freeChainAt(SliceId sliceId, const SliceId splicingPoint) +{ + static uint64_t ChainId = 0; // to pair freeing/freed calls in debugs() + const uint64_t chainId = ++ChainId; + debugs(54, 7, "freeing chain #" << chainId << " starting at " << sliceId << " in " << path); + while (sliceId >= 0) { + Slice &slice = sliceAt(sliceId); + const SliceId nextId = slice.next; + slice.size = 0; + slice.next = -1; + if (cleaner) + cleaner->noteFreeMapSlice(sliceId); // might change slice state + if (sliceId == splicingPoint) { + debugs(54, 5, "preserving chain #" << chainId << " in " << path << + " suffix after slice " << splicingPoint); + break; // do not free the rest of the chain + } + sliceId = nextId; + } + debugs(54, 7, "freed chain #" << chainId << " in " << path); +} + +Ipc::StoreMap::SliceId +Ipc::StoreMap::sliceContaining(const sfileno fileno, const uint64_t bytesNeeded) const +{ + const Anchor &anchor = anchorAt(fileno); + Must(anchor.reading()); + uint64_t bytesSeen = 0; + SliceId lastSlice = anchor.start; + while (lastSlice >= 0) { + const Slice &slice = sliceAt(lastSlice); + bytesSeen += slice.size; + if (bytesSeen >= bytesNeeded) + break; + lastSlice = slice.next; + } + debugs(54, 7, "entry " << fileno << " has " << bytesNeeded << '/' << bytesSeen << + " bytes at slice " << lastSlice << " in " << path); + return lastSlice; // may be negative +} + const Ipc::StoreMap::Anchor * Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno) { debugs(54, 5, "opening entry with key " << storeKeyText(key) << " for reading " << path); - const int idx = anchorIndexByKey(key); + const int idx = fileNoByKey(key); if (const Anchor *slot = openForReadingAt(idx)) { if (slot->sameKey(key)) { fileno = idx; return slot; // locked for reading } slot->lock.unlockShared(); - debugs(54, 7, "closed entry " << idx << " for reading " << path); + debugs(54, 7, "closed wrong-key entry " << idx << " for reading " << path); } return NULL; } @@ -349,14 +406,192 @@ Ipc::StoreMap::closeForReading(const sfileno fileno) } bool -Ipc::StoreMap::purgeOne() +Ipc::StoreMap::openForUpdating(Update &update, const sfileno fileNoHint) { - // Hopefully, we find a removable entry much sooner (TODO: use time?). 
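sliceContaining() and freeChainAt() are the two chain walks that make header splicing possible: one finds the slice holding the n-th stored byte, the other frees a chain but stops at a splicing point so that a suffix shared with another chain survives. A minimal standalone sketch of both walks, using stand-in types and simplified bookkeeping:

    // sketch: find the slice containing byte N; free a chain up to a splicing point
    #include <cstdint>
    #include <iostream>
    #include <vector>

    typedef int32_t SliceId;
    struct Slice { SliceId next; uint32_t size; };

    static SliceId sliceContaining(const std::vector<Slice> &slices, const SliceId start, const uint64_t bytesNeeded)
    {
        uint64_t bytesSeen = 0;
        SliceId id = start;
        while (id >= 0) {
            bytesSeen += slices[id].size;
            if (bytesSeen >= bytesNeeded)
                break;
            id = slices[id].next;
        }
        return id; // negative if the chain stores fewer than bytesNeeded bytes
    }

    static void freeChainAt(std::vector<Slice> &slices, SliceId id, const SliceId splicingPoint)
    {
        while (id >= 0) {
            const SliceId nextId = slices[id].next;
            slices[id].size = 0;   // "free" the slice in this model
            slices[id].next = -1;
            if (id == splicingPoint)
                break;             // preserve the suffix shared with another chain
            id = nextId;
        }
    }

    int main()
    {
        // chain: slice 0 (100 bytes) -> slice 1 (200) -> slice 2 (300)
        std::vector<Slice> slices = { {1, 100}, {2, 200}, {-1, 300} };
        std::cout << sliceContaining(slices, 0, 250) << '\n'; // prints 1
        freeChainAt(slices, 0, 1);            // frees slices 0 and 1 only
        std::cout << slices[2].size << '\n';  // prints 300: the suffix survived
        return 0;
    }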
+ Must(update.entry); + const StoreEntry &entry = *update.entry; + const cache_key *const key = reinterpret_cast(entry.key); + update.stale.name = nameByKey(key); + + if (!validEntry(fileNoHint)) { + debugs(54, 5, "opening entry with key " << storeKeyText(key) << + " for updating " << path); + update.stale.fileNo = fileNoByName(update.stale.name); + } else { + update.stale.fileNo = fileNoHint; + } + + debugs(54, 5, "opening entry " << update.stale.fileNo << " of " << entry << " for updating " << path); + + // Unreadable entries cannot (e.g., empty and otherwise problematic entries) + // or should not (e.g., entries still forming their metadata) be updated. + if (const Anchor *anchor = openForReadingAt(update.stale.fileNo)) { + if (!anchor->sameKey(key)) { + closeForReading(update.stale.fileNo); + debugs(54, 5, "cannot open wrong-key entry " << update.stale.fileNo << " for updating " << path); + return false; + } + } else { + debugs(54, 5, "cannot open unreadable entry " << update.stale.fileNo << " for updating " << path); + return false; + } + + update.stale.anchor = &anchorAt(update.stale.fileNo); + if (update.stale.anchor->writing()) { + // TODO: Support updating appending entries. + // For example, MemStore::updateHeaders() would not know how + // many old prefix body bytes to copy to the new prefix if the last old + // prefix slice has not been formed yet (i.e., still gets more bytes). + debugs(54, 5, "cannot open appending entry " << update.stale.fileNo << + " for updating " << path); + closeForReading(update.stale.fileNo); + return false; + } + + if (!update.stale.anchor->lock.lockHeaders()) { + debugs(54, 5, "cannot open updating entry " << update.stale.fileNo << + " for updating " << path); + closeForReading(update.stale.fileNo); + return false; + } + + /* stale anchor is properly locked; we can now use abortUpdating() if needed */ + + if (!openKeyless(update.fresh)) { + debugs(54, 5, "cannot open freshchainless entry " << update.stale.fileNo << + " for updating " << path); + abortUpdating(update); + return false; + } + + Must(update.stale); + Must(update.fresh); + update.fresh.anchor->set(entry); + debugs(54, 5, "opened entry " << update.stale.fileNo << " for updating " << path << + " using entry " << update.fresh.fileNo << " of " << entry); + + return true; +} + +/// finds an anchor that is currently not associated with any entry key and +/// locks it for writing so ensure exclusive access during updates +bool +Ipc::StoreMap::openKeyless(Update::Edition &edition) +{ + return visitVictims([&](const sfileno name) { + Update::Edition temp; + temp.name = name; + temp.fileNo = fileNoByName(temp.name); + if ((temp.anchor = openForWritingAt(temp.fileNo))) { + debugs(54, 5, "created entry " << temp.fileNo << + " for updating " << path); + Must(temp); + edition = temp; + return true; + } + return false; + }); +} + +void +Ipc::StoreMap::closeForUpdating(Update &update) +{ + Must(update.stale.anchor); + Must(update.fresh.anchor); + AssertFlagIsSet(update.stale.anchor->lock.updating); + Must(update.stale.splicingPoint >= 0); + Must(update.fresh.splicingPoint >= 0); + + /* the stale prefix cannot overlap with the fresh one (a weak check) */ + Must(update.stale.anchor->start != update.fresh.anchor->start); + Must(update.stale.anchor->start != update.fresh.splicingPoint); + Must(update.stale.splicingPoint != update.fresh.anchor->start); + Must(update.stale.splicingPoint != update.fresh.splicingPoint); + + /* the relative order of most operations is significant here */ + + /* splice the fresh 
chain prefix with the stale chain suffix */ + Slice &freshSplicingSlice = sliceAt(update.fresh.splicingPoint); + const SliceId suffixStart = sliceAt(update.stale.splicingPoint).next; // may be negative + // the fresh chain is either properly terminated or already spliced + if (freshSplicingSlice.next < 0) + freshSplicingSlice.next = suffixStart; + else + Must(freshSplicingSlice.next == suffixStart); + // either way, fresh chain uses the stale chain suffix now + + // make the fresh anchor/chain readable for everybody + update.fresh.anchor->lock.switchExclusiveToShared(); + // but the fresh anchor is still invisible to anybody but us + + // This freeEntry() code duplicates the code below to minimize the time when + // the freeEntry() race condition (see the Race: comment below) might occur. + if (update.stale.anchor->waitingToBeFreed) + freeEntry(update.fresh.fileNo); + + /* any external changes were applied to the stale anchor/chain until now */ + relocate(update.stale.name, update.fresh.fileNo); + /* any external changes will apply to the fresh anchor/chain from now on */ + + // Race: If the stale entry was deleted by some kid during the assignment, + // then we propagate that event to the fresh anchor and chain. Since this + // update is not atomically combined with the assignment above, another kid + // might get a fresh entry just before we have a chance to free it. However, + // such deletion races are always possible even without updates. + if (update.stale.anchor->waitingToBeFreed) + freeEntry(update.fresh.fileNo); + + /* free the stale chain prefix except for the shared suffix */ + update.stale.anchor->splicingPoint = update.stale.splicingPoint; + freeEntry(update.stale.fileNo); + + // make the stale anchor/chain reusable, reachable via its new location + relocate(update.fresh.name, update.stale.fileNo); + + const Update updateSaved = update; // for post-close debugging below + + /* unlock the stale anchor/chain */ + update.stale.anchor->lock.unlockHeaders(); + closeForReading(update.stale.fileNo); + update.stale = Update::Edition(); + + // finally, unlock the fresh entry + closeForReading(update.fresh.fileNo); + update.fresh = Update::Edition(); + + debugs(54, 5, "closed entry " << updateSaved.stale.fileNo << " of " << *updateSaved.entry << + " named " << updateSaved.stale.name << " for updating " << path << + " to fresh entry " << updateSaved.fresh.fileNo << " named " << updateSaved.fresh.name << + " with [" << updateSaved.fresh.anchor->start << ',' << updateSaved.fresh.splicingPoint << + "] prefix containing at least " << freshSplicingSlice.size << " bytes"); +} + +/// Visits entries until either +/// * the `visitor` returns true (indicating its satisfaction with the offer); +/// * we give up finding a suitable entry because it already took "too long"; or +/// * we have offered all entries. +bool +Ipc::StoreMap::visitVictims(const NameFilter visitor) +{ + // Hopefully, we find a usable entry much sooner (TODO: use time?). // The min() will protect us from division by zero inside the loop. 
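closeForUpdating() performs the actual splice: the last fresh slice that still holds headers is pointed at the first stale body slice, so the updated entry reuses the unchanged body without copying it, and only then does the entry name get relocated to the fresh anchor. A standalone sketch of just the splice step follows; it uses stand-in types and omits the locking, relocation, and freeEntry() race handling shown above:

    // sketch: splice the fresh header prefix with the stale body suffix
    #include <iostream>
    #include <vector>

    typedef int SliceId;
    struct Slice { SliceId next; const char *label; };

    int main()
    {
        std::vector<Slice> slices = {
            /* 0 */ {1, "stale headers"},  // stale chain: 0 -> 1 -> 2
            /* 1 */ {2, "body, part 1"},
            /* 2 */ {-1, "body, part 2"},
            /* 3 */ {-1, "fresh headers"}  // fresh chain so far: just slice 3
        };
        const SliceId staleSplicingPoint = 0; // last stale slice with headers
        const SliceId freshSplicingPoint = 3; // last fresh slice with headers

        // splice: the fresh prefix now continues into the stale body suffix
        slices[freshSplicingPoint].next = slices[staleSplicingPoint].next;

        // a reader starting at the fresh anchor sees new headers, old body
        for (SliceId id = 3 /* fresh anchor.start */; id >= 0; id = slices[id].next)
            std::cout << slices[id].label << '\n';
        return 0;
    }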
const int searchLimit = min(10000, entryLimit()); int tries = 0; for (; tries < searchLimit; ++tries) { - const sfileno fileno = static_cast(++anchors->victim % entryLimit()); + const sfileno name = static_cast(++anchors->victim % entryLimit()); + if (visitor(name)) + return true; + } + + debugs(54, 5, "no victims found in " << path << "; tried: " << tries); + return false; +} + +bool +Ipc::StoreMap::purgeOne() +{ + return visitVictims([&](const sfileno name) { + const sfileno fileno = fileNoByName(name); Anchor &s = anchorAt(fileno); if (s.lock.lockExclusive()) { // the caller wants a free slice; empty anchor is not enough @@ -368,9 +603,8 @@ Ipc::StoreMap::purgeOne() } s.lock.unlockExclusive(); } - } - debugs(54, 5, "no entries to purge from " << path << "; tried: " << tries); - return false; + return false; + }); } void @@ -435,17 +669,43 @@ Ipc::StoreMap::anchorAt(const sfileno fileno) const } sfileno -Ipc::StoreMap::anchorIndexByKey(const cache_key *const key) const +Ipc::StoreMap::nameByKey(const cache_key *const key) const { const uint64_t *const k = reinterpret_cast(key); // TODO: use a better hash function - return (k[0] + k[1]) % entryLimit(); + const int hash = (k[0] + k[1]) % entryLimit(); + return hash; +} + +sfileno +Ipc::StoreMap::fileNoByName(const sfileno name) const +{ + // fileNos->items are initialized to zero, which we treat as "name is fileno"; + // a positive value means the entry anchor got moved to a new fileNo + if (const int item = fileNos->items[name]) + return item-1; + return name; +} + +/// map `name` to `fileNo` +void +Ipc::StoreMap::relocate(const sfileno name, const sfileno fileno) +{ + // preserve special meaning for zero; see fileNoByName + fileNos->items[name] = fileno+1; +} + +sfileno +Ipc::StoreMap::fileNoByKey(const cache_key *const key) const +{ + const int name = nameByKey(key); + return fileNoByName(name); } Ipc::StoreMap::Anchor & Ipc::StoreMap::anchorByKey(const cache_key *const key) { - return anchorAt(anchorIndexByKey(key)); + return anchorAt(fileNoByKey(key)); } Ipc::StoreMap::Slice& @@ -463,7 +723,7 @@ Ipc::StoreMap::sliceAt(const SliceId sliceId) const /* Ipc::StoreMapAnchor */ -Ipc::StoreMapAnchor::StoreMapAnchor(): start(0) +Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), splicingPoint(-1) { memset(&key, 0, sizeof(key)); memset(&basics, 0, sizeof(basics)); @@ -502,17 +762,46 @@ Ipc::StoreMapAnchor::rewind() { assert(writing()); start = 0; + splicingPoint = -1; memset(&key, 0, sizeof(key)); memset(&basics, 0, sizeof(basics)); + waitingToBeFreed = false; // but keep the lock } -Ipc::StoreMap::Owner::Owner(): anchors(NULL), slices(NULL) +/* Ipc::StoreMapUpdate */ + +Ipc::StoreMapUpdate::StoreMapUpdate(StoreEntry *anEntry): + entry(anEntry) +{ + entry->lock("Ipc::StoreMapUpdate1"); +} + +Ipc::StoreMapUpdate::StoreMapUpdate(const StoreMapUpdate &other): + entry(other.entry), + stale(other.stale), + fresh(other.fresh) +{ + entry->lock("Ipc::StoreMapUpdate2"); +} + +Ipc::StoreMapUpdate::~StoreMapUpdate() +{ + entry->unlock("Ipc::StoreMapUpdate"); +} + +/* Ipc::StoreMap::Owner */ + +Ipc::StoreMap::Owner::Owner(): + fileNos(nullptr), + anchors(nullptr), + slices(nullptr) { } Ipc::StoreMap::Owner::~Owner() { + delete fileNos; delete anchors; delete slices; } diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h index c53d4209a2..2066b9efbd 100644 --- a/src/ipc/StoreMap.h +++ b/src/ipc/StoreMap.h @@ -16,6 +16,8 @@ #include "store/forward.h" #include "store_key_md5.h" +#include + namespace Ipc { @@ -74,6 +76,7 @@ public: std::atomic waitingToBeFreed; 
///< may be accessed w/o a lock // fields marked with [app] can be modified when appending-while-reading + // fields marked with [update] can be modified when updating-while-reading uint64_t key[2]; ///< StoreEntry key @@ -90,6 +93,10 @@ public: /// where the chain of StoreEntry slices begins [app] std::atomic start; + + /// where the updated chain prefix containing metadata/headers ends [update] + /// if unset, this anchor points to a chain that was never updated + std::atomic splicingPoint; }; /// an array of shareable Items @@ -113,7 +120,7 @@ public: /// StoreMapSlices indexed by their slice ID. typedef StoreMapItems StoreMapSlices; -/// StoreMapAnchors indexed by entry fileno plus +/// StoreMapAnchors (indexed by fileno) plus /// sharing-safe basic housekeeping info about Store entries class StoreMapAnchors { @@ -132,20 +139,58 @@ public: }; // TODO: Find an elegant way to use StoreMapItems in StoreMapAnchors +/// StoreMapAnchor positions, indexed by entry "name" (i.e., the entry key hash) +typedef StoreMapItems< std::atomic > StoreMapFileNos; + +/// Aggregates information required for updating entry metadata and headers. +class StoreMapUpdate +{ +public: + /// During an update, the stored entry has two editions: stale and fresh. + class Edition + { + public: + Edition(): anchor(nullptr), fileNo(-1), name(-1), splicingPoint(-1) {} + + /// whether this entry edition is currently used/initialized + explicit operator bool() const { return anchor; } + + StoreMapAnchor *anchor; ///< StoreMap::anchors[fileNo], for convenience/speed + sfileno fileNo; ///< StoreMap::fileNos[name], for convenience/speed + sfileno name; ///< StoreEntry position in StoreMap::fileNos, for swapping Editions + + /// the last slice in the chain still containing metadata/headers + StoreMapSliceId splicingPoint; + }; + + explicit StoreMapUpdate(StoreEntry *anEntry); + StoreMapUpdate(const StoreMapUpdate &other); + ~StoreMapUpdate(); + + StoreMapUpdate &operator =(const StoreMapUpdate &other) = delete; + + StoreEntry *entry; ///< the store entry being updated + Edition stale; ///< old anchor and chain being updated + Edition fresh; ///< new anchor and updated chain prefix +}; + class StoreMapCleaner; /// Manages shared Store index (e.g., locking/unlocking/freeing entries) using -/// StoreMapAnchors indexed by their keys and -/// StoreMapSlices indexed by their slide ID. +/// StoreMapFileNos indexed by hashed entry keys (a.k.a. entry names), +/// StoreMapAnchors indexed by fileno, and +/// StoreMapSlices indexed by slice ID. class StoreMap { public: + typedef StoreMapFileNos FileNos; typedef StoreMapAnchor Anchor; typedef StoreMapAnchors Anchors; typedef sfileno AnchorId; typedef StoreMapSlice Slice; typedef StoreMapSlices Slices; typedef StoreMapSliceId SliceId; + typedef StoreMapUpdate Update; public: /// aggregates anchor and slice owners for Init() caller convenience @@ -154,6 +199,7 @@ public: public: Owner(); ~Owner(); + FileNos::Owner *fileNos; Anchors::Owner *anchors; Slices::Owner *slices; private: @@ -166,8 +212,8 @@ public: StoreMap(const SBuf &aPath); - /// computes map entry position for a given entry key - sfileno anchorIndexByKey(const cache_key *const key) const; + /// computes map entry anchor position for a given entry key + sfileno fileNoByKey(const cache_key *const key) const; /// Like strcmp(mapped, new), but for store entry versions/timestamps. /// Returns +2 if the mapped entry does not exist; -1/0/+1 otherwise. 
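The StoreMapFileNos table declared here supplies the extra level of indirection that lets two anchors swap places: an entry's immutable name (its key hash) is looked up in fileNos to find whichever anchor it currently owns. A small standalone sketch of the encoding behind fileNoByName() and relocate(), where a zero cell means the name still maps to itself; the types are stand-ins, and Squid keeps the real table as atomics in a shared memory segment:

    // sketch: name-to-fileno indirection with "0 means not relocated"
    #include <cassert>
    #include <vector>

    typedef int sfileno;

    class FileNos // stand-in for Ipc::StoreMapFileNos plus its accessors
    {
    public:
        explicit FileNos(const size_t capacity): items(capacity, 0) {}

        sfileno fileNoByName(const sfileno name) const {
            if (const int item = items[name])
                return item - 1; // a header update moved this anchor
            return name;         // default: name and fileno coincide
        }

        void relocate(const sfileno name, const sfileno fileno) {
            items[name] = fileno + 1; // +1 preserves the zero "identity" value
        }

    private:
        std::vector<int> items; // the real table uses atomics in shared memory
    };

    int main()
    {
        FileNos fileNos(100);
        assert(fileNos.fileNoByName(7) == 7);  // never-updated entry
        fileNos.relocate(7, 42);               // fresh edition takes over name 7
        assert(fileNos.fileNoByName(7) == 42); // new readers follow the fresh anchor
        return 0;
    }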
@@ -188,6 +234,13 @@ public: /// this call does not free entry slices so the caller has to do that void forgetWritingEntry(const sfileno fileno); + /// finds and locks the Update entry for an exclusive metadata update + bool openForUpdating(Update &update, sfileno fileNoHint); + /// makes updated info available to others, unlocks, and cleans up + void closeForUpdating(Update &update); + /// undoes partial update, unlocks, and cleans up + void abortUpdating(Update &update); + /// only works on locked entries; returns nil unless the slice is readable const Anchor *peekAtReader(const sfileno fileno) const; @@ -216,6 +269,11 @@ public: /// readable anchor for the entry created by openForReading() const Anchor &readableEntry(const AnchorId anchorId) const; + /// Returns the ID of the entry slice containing n-th byte or + /// a negative ID if the entry does not store that many bytes (yet). + /// Requires a read lock. + SliceId sliceContaining(const sfileno fileno, const uint64_t nth) const; + /// stop writing the entry, freeing its slot for others to use if possible void abortWriting(const sfileno fileno); @@ -239,10 +297,17 @@ public: protected: const SBuf path; ///< cache_dir path or similar cache name; for logging + Mem::Pointer fileNos; ///< entry inodes (starting blocks) Mem::Pointer anchors; ///< entry inodes (starting blocks) Mem::Pointer slices; ///< chained entry pieces positions private: + /// computes entry name (i.e., key hash) for a given entry key + sfileno nameByKey(const cache_key *const key) const; + /// computes anchor position for a given entry name + sfileno fileNoByName(const sfileno name) const; + void relocate(const sfileno name, const sfileno fileno); + Anchor &anchorAt(const sfileno fileno); const Anchor &anchorAt(const sfileno fileno) const; Anchor &anchorByKey(const cache_key *const key); @@ -250,8 +315,14 @@ private: Slice &sliceAt(const SliceId sliceId); const Slice &sliceAt(const SliceId sliceId) const; Anchor *openForReading(Slice &s); + bool openKeyless(Update::Edition &edition); + void closeForUpdateFinal(Update &update); + + typedef std::function NameFilter; // a "name"-based test + bool visitVictims(const NameFilter filter); void freeChain(const sfileno fileno, Anchor &inode, const bool keepLock); + void freeChainAt(SliceId sliceId, const SliceId splicingPoint); }; /// API for adjusting external state when dirty map slice is being freed diff --git a/src/peer_digest.cc b/src/peer_digest.cc index 7f2c3b2ca9..9920581bfe 100644 --- a/src/peer_digest.cc +++ b/src/peer_digest.cc @@ -537,11 +537,7 @@ peerDigestFetchReply(void *data, char *buf, ssize_t size) assert(fetch->old_entry->mem_obj->request); - HttpReply *old_rep = (HttpReply *) fetch->old_entry->getReply(); - - old_rep->updateOnNotModified(reply); - - fetch->old_entry->timestampsSet(); + Store::Root().updateOnNotModified(fetch->old_entry, *fetch->entry); /* get rid of 304 reply */ storeUnregister(fetch->sc, fetch->entry, fetch); diff --git a/src/store/Controlled.h b/src/store/Controlled.h index 0c336aa052..38a7c299b6 100644 --- a/src/store/Controlled.h +++ b/src/store/Controlled.h @@ -25,6 +25,9 @@ public: /// return false iff the idle entry should be destroyed virtual bool dereference(StoreEntry &e) = 0; + /// make stored metadata and HTTP headers the same as in the given entry + virtual void updateHeaders(StoreEntry *) {} + /// If this storage cannot cache collapsed entries, return false. /// If the entry is not found, return false. 
Otherwise, return true after /// tying the entry to this cache and setting inSync to updateCollapsed(). diff --git a/src/store/Controller.cc b/src/store/Controller.cc index 1e9401418a..9af6e609a4 100644 --- a/src/store/Controller.cc +++ b/src/store/Controller.cc @@ -467,6 +467,25 @@ Store::Controller::handleIdleEntry(StoreEntry &e) } } +void +Store::Controller::updateOnNotModified(StoreEntry *old, const StoreEntry &newer) +{ + /* update the old entry object */ + Must(old); + HttpReply *oldReply = const_cast(old->getReply()); + Must(oldReply); + oldReply->updateOnNotModified(newer.getReply()); + old->timestampsSet(); + + /* update stored image of the old entry */ + + if (memStore && old->mem_status == IN_MEMORY && !EBIT_TEST(old->flags, ENTRY_SPECIAL)) + memStore->updateHeaders(old); + + if (old->swap_dirn > -1) + swapDir->updateHeaders(old); +} + void Store::Controller::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) diff --git a/src/store/Controller.h b/src/store/Controller.h index b38d4b75f2..65eb26ea8a 100644 --- a/src/store/Controller.h +++ b/src/store/Controller.h @@ -48,6 +48,9 @@ public: /// called to get rid of no longer needed entry data in RAM, if any void memoryOut(StoreEntry &, const bool preserveSwappable); + /// update old entry metadata and HTTP headers using a newer entry + void updateOnNotModified(StoreEntry *old, const StoreEntry &newer); + /// makes the entry available for collapsing future requests void allowCollapsing(StoreEntry *, const RequestFlags &, const HttpRequestMethod &); diff --git a/src/store/Disks.cc b/src/store/Disks.cc index 7f2a1e776b..f1bea0caef 100644 --- a/src/store/Disks.cc +++ b/src/store/Disks.cc @@ -381,6 +381,13 @@ Store::Disks::dereference(StoreEntry &e) return e.disk().dereference(e); } +void +Store::Disks::updateHeaders(StoreEntry *e) +{ + Must(e); + return e->disk().updateHeaders(e); +} + void Store::Disks::maintain() { diff --git a/src/store/Disks.h b/src/store/Disks.h index 49def69052..1203e1c815 100644 --- a/src/store/Disks.h +++ b/src/store/Disks.h @@ -32,6 +32,7 @@ public: virtual void sync() override; virtual void reference(StoreEntry &) override; virtual bool dereference(StoreEntry &e) override; + virtual void updateHeaders(StoreEntry *) override; virtual void maintain() override; virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override; virtual bool updateCollapsed(StoreEntry &e) override; diff --git a/src/tests/stub_HttpReply.cc b/src/tests/stub_HttpReply.cc index 07b1c4c43e..196ede285e 100644 --- a/src/tests/stub_HttpReply.cc +++ b/src/tests/stub_HttpReply.cc @@ -28,5 +28,6 @@ HttpReply::HttpReply() : HttpMsg(hoReply), date (0), last_modified (0), void HttpReply::hdrCacheInit() STUB HttpReply * HttpReply::clone() const STUB_RETVAL(NULL) bool HttpReply::inheritProperties(const HttpMsg *aMsg) STUB_RETVAL(false) + void HttpReply::updateOnNotModified(HttpReply const*) STUB int64_t HttpReply::bodySize(const HttpRequestMethod&) const STUB_RETVAL(0) diff --git a/src/tests/stub_MemStore.cc b/src/tests/stub_MemStore.cc index 1688e84e32..060188b43f 100644 --- a/src/tests/stub_MemStore.cc +++ b/src/tests/stub_MemStore.cc @@ -22,6 +22,7 @@ void MemStore::completeWriting(StoreEntry &e) STUB void MemStore::unlink(StoreEntry &e) STUB void MemStore::disconnect(StoreEntry &e) STUB void MemStore::reference(StoreEntry &) STUB +void MemStore::updateHeaders(StoreEntry *) STUB void MemStore::maintain() STUB void MemStore::noteFreeMapSlice(const Ipc::StoreMapSliceId) STUB void MemStore::init() 
STUB -- 2.39.5
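Taken together, the Store changes give callers a single entry point for folding a 304 into a cached object, as the peer_digest.cc hunk shows. The standalone sketch below models where Store::Controller::updateOnNotModified() propagates the refreshed headers; the types and function are simplified stand-ins for illustration only, not Squid's classes:

    // sketch: propagate refreshed headers to every cache holding a copy
    #include <iostream>

    struct Entry { bool inMemoryCache; bool onDisk; bool special; };

    struct MemCache { void updateHeaders(Entry &) { std::cout << "memory image updated\n"; } };
    struct DiskCache { void updateHeaders(Entry &) { std::cout << "disk image updated\n"; } };

    // stand-in for Store::Controller::updateOnNotModified(): after refreshing
    // the in-RAM reply and timestamps, push the new headers to each stored image
    static void updateOnNotModified(Entry &oldE, MemCache *memStore, DiskCache *swapDir)
    {
        if (memStore && oldE.inMemoryCache && !oldE.special)
            memStore->updateHeaders(oldE);
        if (swapDir && oldE.onDisk)
            swapDir->updateHeaders(oldE); // rock dirs update; ufs dirs ignore it
    }

    int main()
    {
        MemCache mem;
        DiskCache disk;
        Entry cached = {true, true, false}; // cached both in memory and on disk
        updateOnNotModified(cached, &mem, &disk); // updates both stored images
        return 0;
    }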