From: Alex Rousskov
Date: Sun, 21 Aug 2022 05:30:25 +0000 (+0000)
Subject: Fixed StoreMap.cc "anchorAt(anchorId).reading()" assertions (#1117)
X-Git-Tag: SQUID_6_0_1~126
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=24c937808c28bc755cdce682d624bc7c4225a227;p=thirdparty%2Fsquid.git

Fixed StoreMap.cc "anchorAt(anchorId).reading()" assertions (#1117)

Squid asserted because the code could switch a transient entry from
writing to reading while the corresponding store entry was still being
(or could still be) written to the shared memory cache. For example:

1. We start as a collapsed writer.
2. We get a response and start writing to disk and shared memory caches.
3. Disk swapout fails (for any reason, including out-of-slots errors).
4. storeSwapOutFileClosed() calls transientsCompleteWriting().
5. transientsCompleteWriting() switches the entry into reading mode...
   but we are still writing to the memory cache!

There was a somewhat related XXX in transientsCompleteWriting(), but
what that comment did not say is that if we are writing to two stores in
parallel, then the first transientsCompleteWriting() call (made when one
of the two swapouts ends) makes us a reader prematurely. An incorrect
reader status allows a Controller::syncCollapsed() notification (to,
say, a shared memory cache writer) to slip past the
transients->isWriter() guard, eventually reaching the reading()
assertion.

Properly detecting the end of all store writing is difficult because the
two mostly independent writing "threads" may start/finish at seemingly
random times, in many code places, and even in different workers. To
simplify this task, Squid now limits cache writing to the worker
transaction that made the StoreEntry object public. That transaction
creates a writing Transients entry. Other transactions start as
Transients readers. The writer switches to reader when neither memory
nor disk caching can start or continue.

Also simplified the Transients entry state. Squid relayed swapout errors
via the Transients entries themselves, but that is not necessary (and it
prevented the cache entry from recovering by writing to another store).
Each store entry can take care of its own swapout status/results. The
readers just need to know whether somebody may start (or is still)
writing, and we relay that info by keeping the Transients entry locked
for writing (appending, to be precise) while that condition is true.

Also fixed shared caches to recognize that no more data will be coming
in because the remote entry writer is gone. Readers still try to deliver
what they have, even if they know that the response will be truncated.

Also tried to follow the "broadcast after change, in the same context as
the change" principle in the modified code instead of relying on the
caller to broadcast after all changes. This approach may increase the
number of broadcasts, but it reduces the probability that we will miss
an important Broadcast() call. We can (and should) optimize repeated,
useless broadcasts, but that work is outside this project's scope.

Also improved StoreEntry::swapOut() shutdown safety and the order and
descriptions of the mayStartSwapOut() checks.

Also added an out-of-scope XXX.
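For illustration, here is a minimal, self-contained C++ sketch of the
writer-to-reader checkpoint described above. It is simplified from the
StoreEntry::storeWritingCheckpoint() code in the diff below; the
EntryModel type, its fields, and doneStoring() are illustrative
stand-ins for MemObject state, not real Squid types:

    // Simplified model of the writer-to-reader checkpoint this patch
    // introduces (see StoreEntry::storeWritingCheckpoint() below).
    #include <iostream>

    enum class SwapOutDecision { swNone, swPossible, swImpossible, swStarted };
    enum class IoState { ioUndecided, ioWriting, ioDone };

    struct EntryModel {
        IoState memCacheIo = IoState::ioUndecided; // memory cache progress
        SwapOutDecision swapDecision = SwapOutDecision::swNone; // disk decision
        bool swappingOut = false; // disk I/O still in flight
    };

    // Returns true when neither memory nor disk caching can start or
    // continue, i.e. when the Transients writer may become a reader.
    bool doneStoring(const EntryModel &e)
    {
        if (e.memCacheIo != IoState::ioDone)
            return false; // memory caching may still start or continue

        const bool doneWithDiskCache =
            // will not start
            e.swapDecision == SwapOutDecision::swImpossible ||
            // or has started but finished already
            (e.swapDecision == SwapOutDecision::swStarted && !e.swappingOut);
        return doneWithDiskCache;
    }

    int main()
    {
        EntryModel e;
        e.memCacheIo = IoState::ioDone;
        e.swapDecision = SwapOutDecision::swStarted;
        e.swappingOut = true; // disk swapout still running
        std::cout << doneStoring(e) << '\n'; // 0: must remain a writer
        e.swappingOut = false; // disk swapout completed
        std::cout << doneStoring(e) << '\n'; // 1: may switch to reading
        return 0;
    }

The real checkpoint additionally skips entries without Transients,
returns early for Transients readers, and calls
Store::Root().noteStoppedSharedWriting() once both conditions hold.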
---

diff --git a/src/MemStore.cc b/src/MemStore.cc
index a756c0b4f2..5f2ae2904c 100644
--- a/src/MemStore.cc
+++ b/src/MemStore.cc
@@ -513,6 +513,12 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
         return true;
     }
 
+    if (anchor.writerHalted) {
+        debugs(20, 5, "mem-loaded aborted " << e.mem_obj->endOffset() << '/' <<
+               anchor.basics.swap_file_sz << " bytes of " << e);
+        return false;
+    }
+
     debugs(20, 5, "mem-loaded all " << e.mem_obj->endOffset() << '/' <<
            anchor.basics.swap_file_sz << " bytes of " << e);
 
@@ -581,6 +587,20 @@ MemStore::shouldCache(StoreEntry &e) const
         return false;
     }
 
+    // Store::Root() in the next check below is FATALly missing during shutdown
+    if (shutting_down) {
+        debugs(20, 5, "yield to shutdown: " << e);
+        return false;
+    }
+
+    // To avoid SMP workers releasing each other's caching attempts, restrict
+    // memory caching to the StoreEntry publisher. This check goes before
+    // memoryCachable() that may incorrectly release() the publisher's entry via checkCachable().
+    if (Store::Root().transientsReader(e)) {
+        debugs(20, 5, "yield to entry publisher: " << e);
+        return false;
+    }
+
     if (!e.memoryCachable()) {
         debugs(20, 7, "Not memory cachable: " << e);
         return false; // will not cache due to entry state or properties
@@ -873,8 +893,8 @@ MemStore::completeWriting(StoreEntry &e)
     e.mem_obj->memCache.io = MemObject::ioDone;
     map->closeForWriting(index);
 
-    CollapsedForwarding::Broadcast(e); // before we close our transient entry!
-    Store::Root().transientsCompleteWriting(e);
+    CollapsedForwarding::Broadcast(e);
+    e.storeWriterDone();
 }
 
 void
@@ -913,7 +933,8 @@ MemStore::disconnect(StoreEntry &e)
         map->abortWriting(mem_obj.memCache.index);
         mem_obj.memCache.index = -1;
         mem_obj.memCache.io = MemObject::ioDone;
-        Store::Root().stopSharing(e); // broadcasts after the change
+        CollapsedForwarding::Broadcast(e);
+        e.storeWriterDone();
     } else {
         assert(mem_obj.memCache.io == MemObject::ioReading);
         map->closeForReading(mem_obj.memCache.index);
diff --git a/src/Store.h b/src/Store.h
index f407fb80e3..a8e686af0b 100644
--- a/src/Store.h
+++ b/src/Store.h
@@ -93,6 +93,8 @@ public:
     void memOutDecision(const bool willCacheInRam);
     // called when a decision to cache on disk has been made
     void swapOutDecision(const MemObject::SwapOut::Decision &decision);
+    /// called when a store writer ends its work (successfully or not)
+    void storeWriterDone();
     void abort();
     bool makePublic(const KeyScope keyScope = ksDefault);
@@ -303,7 +305,7 @@ public:
 protected:
     typedef Store::EntryGuard EntryGuard;
 
-    void transientsAbandonmentCheck();
+    void storeWritingCheckpoint();
 
     /// does nothing except throwing if disk-associated data members are inconsistent
     void checkDisk() const;
diff --git a/src/Transients.cc b/src/Transients.cc
index 5bfcd7664d..555144375a 100644
--- a/src/Transients.cc
+++ b/src/Transients.cc
@@ -206,6 +206,7 @@ Transients::clearCollapsingRequirement(const StoreEntry &e)
     const auto idx = e.mem_obj->xitTable.index;
     auto &anchor = map->writeableEntry(idx);
     if (EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
+        // XXX: Non-atomic, [app]-unmarked flags are off limits in append mode.
         EBIT_CLR(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
         CollapsedForwarding::Broadcast(e);
     }
@@ -320,7 +321,7 @@ Transients::status(const StoreEntry &entry, Transients::EntryStatus &entryStatus
     const auto idx = entry.mem_obj->xitTable.index;
     const auto &anchor = isWriter(entry) ?
                          map->writeableEntry(idx) : map->readableEntry(idx);
-    entryStatus.abortedByWriter = anchor.writerHalted;
+    entryStatus.hasWriter = anchor.writing();
     entryStatus.waitingToBeFreed = anchor.waitingToBeFreed;
     entryStatus.collapsed = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
 }
@@ -328,10 +329,12 @@ Transients::status(const StoreEntry &entry, Transients::EntryStatus &entryStatus
 void
 Transients::completeWriting(const StoreEntry &e)
 {
+    debugs(20, 5, e);
     assert(e.hasTransients());
     assert(isWriter(e));
     map->switchWritingToReading(e.mem_obj->xitTable.index);
     e.mem_obj->xitTable.io = Store::ioReading;
+    CollapsedForwarding::Broadcast(e);
 }
 
 int
@@ -377,7 +380,12 @@ Transients::disconnect(StoreEntry &entry)
         auto &xitTable = entry.mem_obj->xitTable;
         assert(map);
         if (isWriter(entry)) {
-            map->abortWriting(xitTable.index);
+            // completeWriting() was not called, so there could be an active
+            // Store writer out there, but we should not abortWriting() here
+            // because another writer may have succeeded, making readers happy.
+            // If none succeeded, the readers will notice the lack of writers.
+            map->closeForWriting(xitTable.index);
+            CollapsedForwarding::Broadcast(entry);
         } else {
             assert(isReader(entry));
             map->closeForReadingAndFreeIdle(xitTable.index);
diff --git a/src/Transients.h b/src/Transients.h
index 1d2e823d09..7a2eff57d2 100644
--- a/src/Transients.h
+++ b/src/Transients.h
@@ -19,10 +19,11 @@
 typedef Ipc::StoreMap TransientsMap;
 
-/// Keeps track of store entries being delivered to clients that arrived before
-/// those entries were [fully] cached. This SMP-shared table is necessary to
-/// * sync an entry-writing worker with entry-reading worker(s); and
-/// * sync an entry-deleting worker with both entry-reading/writing workers.
+/// A Transients entry allows workers to Broadcast() DELETE requests and swapout
+/// progress updates. In a collapsed forwarding context, it also represents a
+/// CF-initiating worker's promise to either cache the response or inform the
+/// waiting slaves (via false EntryStatus::hasWriter) that caching will not
+/// happen. A Transients entry itself does not carry response- or Store-specific metadata.
 class Transients: public Store::Controlled, public Ipc::StoreMapCleaner
 {
 public:
@@ -30,7 +31,7 @@ public:
     class EntryStatus
     {
     public:
-        bool abortedByWriter = false; ///< whether the entry was aborted
+        bool hasWriter = false; ///< whether some worker is storing the entry
         bool waitingToBeFreed = false; ///< whether the entry was marked for deletion
         bool collapsed = false; ///< whether the entry allows collapsing
     };
diff --git a/src/fs/rock/RockIoState.cc b/src/fs/rock/RockIoState.cc
index 14749002f2..c51da8b88b 100644
--- a/src/fs/rock/RockIoState.cc
+++ b/src/fs/rock/RockIoState.cc
@@ -103,6 +103,8 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data
     assert(theFile != nullptr);
     assert(coreOff >= 0);
 
+    bool writerLeft = readAnchor().writerHalted; // before the sidCurrent change
+
     // if we are dealing with the first read or
     // if the offset went backwords, start searching from the beginning
     if (sidCurrent < 0 || coreOff < objOffset) {
@@ -112,6 +114,7 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data
     }
 
     while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
+        writerLeft = readAnchor().writerHalted; // before the sidCurrent change
         objOffset += currentReadableSlice().size;
         sidCurrent = currentReadableSlice().next;
     }
@@ -121,6 +124,13 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
+    // quit if we cannot read what they want, and the writer cannot add more
+    if (sidCurrent < 0 && writerLeft) {
+        debugs(79, 5, "quitting at " << coreOff << " in " << *e);
+        callReaderBack(buf, -1);
+        return;
+    }
+
     // punt if read offset is too big (because of client bugs or collapsing)
     if (sidCurrent < 0) {
         debugs(79, 5, "no " << coreOff << " in " << *e);
diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc
index 64128cdfcf..c3d7c79d67 100644
--- a/src/fs/rock/RockSwapDir.cc
+++ b/src/fs/rock/RockSwapDir.cc
@@ -140,7 +140,8 @@ void Rock::SwapDir::disconnect(StoreEntry &e)
         map->abortWriting(e.swap_filen);
         e.detachFromDisk();
         dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = nullptr;
-        Store::Root().stopSharing(e); // broadcasts after the change
+        CollapsedForwarding::Broadcast(e);
+        e.storeWriterDone();
     } else {
         map->closeForReading(e.swap_filen);
         e.detachFromDisk();
@@ -171,9 +172,12 @@ Rock::SwapDir::doReportStat() const
 }
 
 void
-Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
+Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &e)
 {
-    // nothing to do
+    // nothing to do; handleWriteCompletionSuccess() did everything for us
+    assert(!e.mem_obj ||
+           !e.mem_obj->swapout.sio ||
+           !dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_);
 }
 
 void
@@ -899,6 +903,7 @@ Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
 
         map->switchWritingToReading(sio.swap_filen);
         // sio.e keeps the (now read) lock on the anchor
+        // storeSwapOutFileClosed() sets swap_status and calls storeWriterDone()
     }
     sio.writeableAnchor_ = nullptr;
     sio.finishedWriting(DISK_OK);
@@ -926,7 +931,7 @@ Rock::SwapDir::writeError(StoreIOState &sio)
     map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
 
     if (sio.touchingStoreEntry())
-        Store::Root().stopSharing(*sio.e);
+        CollapsedForwarding::Broadcast(*sio.e);
     // else noop: a fresh entry update error does not affect stale entry readers
 
     // All callers must also call IoState callback, to propagate the error.
diff --git a/src/store.cc b/src/store.cc
index 37e0cb3601..ab732e7b7d 100644
--- a/src/store.cc
+++ b/src/store.cc
@@ -1732,37 +1732,62 @@ StoreEntry::getSerialisedMetaData(size_t &length) const
 }
 
 /**
- * Abandon the transient entry our worker has created if neither the shared
- * memory cache nor the disk cache wants to store it. Collapsed requests, if
- * any, should notice and use Plan B instead of getting stuck waiting for us
- * to start swapping the entry out.
+ * If needed, signal transient entry readers that no more cache changes are
+ * expected and, hence, they should switch to Plan B instead of getting stuck
+ * waiting for us to start or finish storing the entry.
  */
 void
-StoreEntry::transientsAbandonmentCheck()
-{
-    if (mem_obj && !Store::Root().transientsReader(*this) && // this worker is responsible
-            hasTransients() && // other workers may be interested
-            !hasMemStore() && // rejected by the shared memory cache
-            mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) {
-        debugs(20, 7, "cannot be shared: " << *this);
-        if (!shutting_down) // Store::Root() is FATALly missing during shutdown
-            Store::Root().stopSharing(*this);
+StoreEntry::storeWritingCheckpoint()
+{
+    if (!hasTransients())
+        return; // no SMP complications
+
+    // writers become readers but only after completeWriting() which we trigger
+    if (Store::Root().transientsReader(*this))
+        return; // readers do not need to inform
+
+    assert(mem_obj);
+    if (mem_obj->memCache.io != Store::ioDone) {
+        debugs(20, 7, "not done with mem-caching " << *this);
+        return;
+    }
+
+    const auto doneWithDiskCache =
+        // will not start
+        (mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) ||
+        // or has started but finished already
+        (mem_obj->swapout.decision == MemObject::SwapOut::swStarted && !swappingOut());
+    if (!doneWithDiskCache) {
+        debugs(20, 7, "not done with disk-caching " << *this);
+        return;
     }
+
+    debugs(20, 7, "done with writing " << *this);
+    if (!shutting_down) // Store::Root() is FATALly missing during shutdown
+        Store::Root().noteStoppedSharedWriting(*this);
 }
 
 void
-StoreEntry::memOutDecision(const bool)
+StoreEntry::memOutDecision(const bool willCacheInRam)
 {
-    transientsAbandonmentCheck();
+    if (!willCacheInRam)
+        return storeWritingCheckpoint();
+    assert(mem_obj->memCache.io != Store::ioDone);
+    // and wait for storeWriterDone()
 }
 
 void
 StoreEntry::swapOutDecision(const MemObject::SwapOut::Decision &decision)
 {
-    // Abandon our transient entry if neither shared memory nor disk wants it.
     assert(mem_obj);
     mem_obj->swapout.decision = decision;
-    transientsAbandonmentCheck();
+    storeWritingCheckpoint();
+}
+
+void
+StoreEntry::storeWriterDone()
+{
+    storeWritingCheckpoint();
 }
 
 void
diff --git a/src/store/Controller.cc b/src/store/Controller.cc
index ff863c5d63..4309d8337d 100644
--- a/src/store/Controller.cc
+++ b/src/store/Controller.cc
@@ -627,23 +627,9 @@ Store::Controller::memoryDisconnect(StoreEntry &e)
 }
 
 void
-Store::Controller::stopSharing(StoreEntry &e)
+Store::Controller::noteStoppedSharedWriting(StoreEntry &e)
 {
-    // Marking the transients entry is sufficient to prevent new readers from
-    // starting to wait for `e` updates and to inform the current readers (and,
-    // hence, Broadcast() recipients) about the underlying Store problems.
-    if (transients && e.hasTransients())
-        transients->evictCached(e);
-}
-
-void
-Store::Controller::transientsCompleteWriting(StoreEntry &e)
-{
-    // transients->isWriter(e) is false if `e` is writing to its second store
-    // after finishing writing to its first store: At the end of the first swap
-    // out, the transients writer becomes a reader and (XXX) we never switch
-    // back to writing, even if we start swapping out again (to another store).
-    if (transients && e.hasTransients() && transients->isWriter(e))
+    if (transients && e.hasTransients()) // paranoid: the caller should check
         transients->completeWriting(e);
 }
 
@@ -760,6 +746,7 @@ Store::Controller::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
     if (e->makePublic(keyScope)) { // this is needed for both local and SMP collapsing
         debugs(20, 3, "may " << (transients && e->hasTransients() ?
                                  "SMP-" : "locally-") << "collapse " << *e);
+        assert(e->hittingRequiresCollapsing());
         return true;
     }
     // paranoid cleanup; the flag is meaningless for private entries
@@ -831,12 +818,6 @@ Store::Controller::syncCollapsed(const sfileno xitIndex)
 
     assert(transients->isReader(*collapsed));
 
-    if (entryStatus.abortedByWriter) {
-        debugs(20, 3, "aborting " << *collapsed << " because its writer has aborted");
-        collapsed->abort();
-        return;
-    }
-
     if (entryStatus.collapsed && !collapsed->hittingRequiresCollapsing()) {
         debugs(20, 3, "aborting " << *collapsed << " due to writer/reader collapsing state mismatch");
         collapsed->abort();
@@ -879,13 +860,20 @@ Store::Controller::syncCollapsed(const sfileno xitIndex)
         return;
     }
 
+    if (!entryStatus.hasWriter) {
+        debugs(20, 3, "aborting abandoned-by-writer " << *collapsed);
+        collapsed->abort();
+        return;
+    }
+
     // the entry is still not in one of the caches
     debugs(20, 7, "waiting " << *collapsed);
 }
 
 /// Called for Transients entries that are not yet anchored to a cache.
-/// For cached entries, return true after synchronizing them with their cache
-/// (making inSync true on success). For not-yet-cached entries, return false.
+/// \returns false for not-yet-cached entries that we may attach later
+/// \returns true for other entries after synchronizing them with their store
+/// and setting inSync to reflect that synchronization outcome.
 bool
 Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync)
 {
@@ -894,6 +882,9 @@ Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync)
 
     debugs(20, 7, "anchoring " << entry);
 
+    Transients::EntryStatus entryStatus;
+    transients->status(entry, entryStatus);
+
     bool found = false;
     if (sharedMemStore)
         found = sharedMemStore->anchorToCache(entry, inSync);
@@ -905,11 +896,23 @@ Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync)
             debugs(20, 7, "anchored " << entry);
         else
             debugs(20, 5, "failed to anchor " << entry);
-    } else {
-        debugs(20, 7, "skipping not yet cached " << entry);
+        return true;
+    }
+
+    if (entryStatus.waitingToBeFreed) {
+        debugs(20, 5, "failed on marked unattached " << entry);
+        inSync = false;
+        return true;
+    }
+
+    if (!entryStatus.hasWriter) {
+        debugs(20, 5, "failed on abandoned-by-writer " << entry);
+        inSync = false;
+        return true;
     }
 
-    return found;
+    debugs(20, 7, "skipping not yet cached " << entry);
+    return false;
 }
 
 bool
diff --git a/src/store/Controller.h b/src/store/Controller.h
index 4fb4b3ff76..0b8ec38ea4 100644
--- a/src/store/Controller.h
+++ b/src/store/Controller.h
@@ -108,14 +108,11 @@ public:
     /// whether the entry is in "writing to Transients" I/O state
     bool transientsWriter(const StoreEntry &) const;
 
-    /// marks the entry completed for collapsed requests
-    void transientsCompleteWriting(StoreEntry &);
-
     /// Update local intransit entry after changes made by appending worker.
     void syncCollapsed(const sfileno);
 
-    /// stop any current (and prevent any future) SMP sharing of the given entry
-    void stopSharing(StoreEntry &);
+    /// adjust shared state after this worker stopped changing the entry
+    void noteStoppedSharedWriting(StoreEntry &);
 
     /// number of the transient entry readers some time ago
     int transientReaders(const StoreEntry &) const;
diff --git a/src/store_swapout.cc b/src/store_swapout.cc
index 29348290e0..9d4de299b9 100644
--- a/src/store_swapout.cc
+++ b/src/store_swapout.cc
@@ -47,7 +47,6 @@ storeSwapOutStart(StoreEntry * e)
     debugs(20, 5, "storeSwapOutStart: Begin SwapOut '" << e->url() << "' to dirno " <<
            e->swap_dirn << ", fileno " << std::hex << std::setw(8) << std::setfill('0') <<
            std::uppercase << e->swap_filen);
-    e->swapOutDecision(MemObject::SwapOut::swStarted);
     /* If we start swapping out objects with OutOfBand Metadata,
      * then this code needs changing
      */
@@ -82,6 +81,8 @@ storeSwapOutStart(StoreEntry * e)
     /* Pick up the file number if it was assigned immediately */
     e->attachToDisk(mem->swapout.sio->swap_dirn, mem->swapout.sio->swap_filen, SWAPOUT_WRITING);
 
+    e->swapOutDecision(MemObject::SwapOut::swStarted); // after SWAPOUT_WRITING
+
     /* write out the swap metadata */
     storeIOWrite(mem->swapout.sio, buf, mem->swap_hdr_sz, 0, xfree_cppwrapper);
 }
@@ -159,6 +160,10 @@ doPages(StoreEntry *anEntry)
 void
 StoreEntry::swapOut()
 {
+    // Store::Root() in many swapout checks is FATALly missing during shutdown
+    if (shutting_down)
+        return;
+
     if (!mem_obj)
         return;
 
@@ -326,9 +331,9 @@ storeSwapOutFileClosed(void *data, int errflag, StoreIOState::Pointer self)
         ++statCounter.swap.outs;
     }
 
-    Store::Root().transientsCompleteWriting(*e);
     debugs(20, 3, "storeSwapOutFileClosed: " << __FILE__ << ":" << __LINE__);
     mem->swapout.sio = nullptr;
+    e->storeWriterDone(); // after updating swap_status
     e->unlock("storeSwapOutFileClosed");
 }
 
@@ -351,17 +356,16 @@ StoreEntry::mayStartSwapOut()
         return false;
     }
 
-    // if we are swapping out or swapped out already, do not start over
-    if (hasDisk() || Store::Root().hasReadableDiskEntry(*this)) {
-        debugs(20, 3, "already did");
-        swapOutDecision(MemObject::SwapOut::swImpossible);
+    // If we have started swapping out, do not start over. Most likely, we have
+    // finished swapping out by now because we are not currently swappingOut().
+    if (decision == MemObject::SwapOut::swStarted) {
+        debugs(20, 3, "already started");
         return false;
     }
 
-    // if we have just stared swapping out (attachToDisk() has not been
-    // called), do not start over
-    if (decision == MemObject::SwapOut::swStarted) {
-        debugs(20, 3, "already started");
+    // if there is a usable disk entry already, do not start over
+    if (hasDisk() || Store::Root().hasReadableDiskEntry(*this)) {
+        debugs(20, 3, "already did"); // we or somebody else created that entry
         swapOutDecision(MemObject::SwapOut::swImpossible);
         return false;
     }
@@ -378,6 +382,15 @@ StoreEntry::mayStartSwapOut()
         return true;
     }
 
+    // To avoid SMP workers releasing each other's caching attempts, restrict
+    // disk caching to the StoreEntry publisher. This check goes before
+    // checkCachable() that may incorrectly release() the publisher's entry.
+    if (Store::Root().transientsReader(*this)) {
+        debugs(20, 5, "yield to entry publisher");
+        swapOutDecision(MemObject::SwapOut::swImpossible);
+        return false;
+    }
+
     if (!checkCachable()) {
         debugs(20, 3, "not cachable");
         swapOutDecision(MemObject::SwapOut::swImpossible);
diff --git a/src/tests/stub_libstore.cc b/src/tests/stub_libstore.cc
index 66fce2d89f..faaf177931 100644
--- a/src/tests/stub_libstore.cc
+++ b/src/tests/stub_libstore.cc
@@ -47,9 +47,8 @@ void Controller::addReading(StoreEntry *, const cache_key *) STUB
 void Controller::addWriting(StoreEntry *, const cache_key *) STUB
 bool Controller::transientsReader(const StoreEntry &) const STUB_RETVAL(false)
 bool Controller::transientsWriter(const StoreEntry &) const STUB_RETVAL(false)
-void Controller::transientsCompleteWriting(StoreEntry &) STUB
 void Controller::syncCollapsed(const sfileno) STUB
-void Controller::stopSharing(StoreEntry &) STUB
+void Controller::noteStoppedSharedWriting(StoreEntry &) STUB
 int Controller::transientReaders(const StoreEntry &) const STUB_RETVAL(0)
 void Controller::transientsDisconnect(StoreEntry &) STUB
 void Controller::transientsClearCollapsingRequirement(StoreEntry &) STUB