From: Alex Rousskov Date: Fri, 2 Sep 2022 17:14:12 +0000 (+0000) Subject: Stop (ab)using Transient entry flags for CF requirement mngmt (#1127) X-Git-Tag: SQUID_6_0_1~118 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=9358e99f998ace9c4c7a21d510432dde5b7f9cca;p=thirdparty%2Fsquid.git Stop (ab)using Transient entry flags for CF requirement mngmt (#1127) The ENTRY_REQUIRES_COLLAPSING flag was used for relaying "hitting this entry carries Collapsed Forwarding risks right now" info to other workers. That flag is not necessary to relay that info because remote workers already know whether the entry is attached to one of the shared cache stores. Moreover, relaying that flag correctly is difficult and would require a different implementation: * The flag is changed in relatively high-level Store code, possibly _after_ we stopped writing, when the transients entry is already read-only. This bug leads to an isWriter() assertion in Transients::clearCollapsingRequirement() called via a quick or aborted StoreEntry::startWriting(). * Basics.flags is not an atomic field. It lacks an "[app]" mark -- its readers will not get a reliable value due to race conditions when Squid will need to update the Transient entry flags in "append" mode. * The flag is changed assuming that simply sending stuff to disk is sufficient for other workers to read it. Since 18102f7, this is inaccurate because the store index is updated only after the slot is written to disk. Remote workers now use Store attachment info to set the local StoreEntry object flag, ignoring the no-longer-used Transients entry flags. The ENTRY_REQUIRES_COLLAPSING flag is still used to tell transactions which StoreEntry objects are subject to CF risks. It would be nice to use the presence of MemObject::reply_ to relay the same info for store-unattached entries (and attachment info for others), but we could not find a way to do that (during commit d2a6dcb work and then again during this project) without a lot of essentially out of scope rewrites. Also removed misleading Ipc::StoreMapAnchor::exportInto() debugging. --- diff --git a/src/Transients.cc b/src/Transients.cc index 555144375a..a089dab6e7 100644 --- a/src/Transients.cc +++ b/src/Transients.cc @@ -164,15 +164,6 @@ Transients::get(const cache_key *key) return nullptr; } - // store hadWriter before checking ENTRY_REQUIRES_COLLAPSING to avoid racing - // the writer that clears that flag and then leaves - const auto hadWriter = map->peekAtWriter(index); - if (!hadWriter && EBIT_TEST(anchor->basics.flags, ENTRY_REQUIRES_COLLAPSING)) { - debugs(20, 3, "not joining abandoned entry " << index); - map->closeForReadingAndFreeIdle(index); - return nullptr; - } - StoreEntry *e = new StoreEntry(); e->createMemObject(); anchorEntry(*e, index, *anchor); @@ -197,21 +188,6 @@ Transients::findCollapsed(const sfileno index) return nullptr; } -void -Transients::clearCollapsingRequirement(const StoreEntry &e) -{ - assert(map); - assert(e.hasTransients()); - assert(isWriter(e)); - const auto idx = e.mem_obj->xitTable.index; - auto &anchor = map->writeableEntry(idx); - if (EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING)) { - // XXX: Non-atomic, [app]-unmarked flags are off limits in append mode. - EBIT_CLR(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING); - CollapsedForwarding::Broadcast(e); - } -} - void Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus direction) { @@ -292,11 +268,7 @@ Transients::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapA xitTable.index = index; xitTable.io = Store::ioReading; - const auto hadWriter = hasWriter(e); // before computing collapsingRequired anchor.exportInto(e); - const bool collapsingRequired = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING); - assert(!collapsingRequired || hadWriter); - e.setCollapsingRequirement(collapsingRequired); } bool @@ -323,7 +295,6 @@ Transients::status(const StoreEntry &entry, Transients::EntryStatus &entryStatus map->writeableEntry(idx) : map->readableEntry(idx); entryStatus.hasWriter = anchor.writing(); entryStatus.waitingToBeFreed = anchor.waitingToBeFreed; - entryStatus.collapsed = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING); } void diff --git a/src/Transients.h b/src/Transients.h index 7a2eff57d2..82a42ebdd3 100644 --- a/src/Transients.h +++ b/src/Transients.h @@ -33,7 +33,6 @@ public: public: bool hasWriter = false; ///< whether some worker is storing the entry bool waitingToBeFreed = false; ///< whether the entry was marked for deletion - bool collapsed = false; ///< whether the entry allows collapsing }; Transients(); @@ -42,9 +41,6 @@ public: /// return a local, previously collapsed entry StoreEntry *findCollapsed(const sfileno xitIndex); - /// removes collapsing requirement (for future hits) - void clearCollapsingRequirement(const StoreEntry &e); - /// start listening for remote DELETE requests targeting either a complete /// StoreEntry (ioReading) or a being-formed miss StoreEntry (ioWriting) void monitorIo(StoreEntry*, const cache_key*, const Store::IoStatus); diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index 2eaac13de0..59cee48770 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -985,12 +985,18 @@ Ipc::StoreMapAnchor::exportInto(StoreEntry &into) const into.lastModified(basics.lastmod); into.swap_file_sz = basics.swap_file_sz; into.refcount = basics.refcount; + + // Some basics.flags are not meaningful and should not be overwritten here. + // ENTRY_REQUIRES_COLLAPSING is one of them. TODO: check other flags. const bool collapsingRequired = into.hittingRequiresCollapsing(); into.flags = basics.flags; - // There are possibly several flags we do not need to overwrite, - // and ENTRY_REQUIRES_COLLAPSING is one of them. - // TODO: check for other flags. - into.setCollapsingRequirement(collapsingRequired); + // Avoid into.setCollapsingRequirement() here: We only restore the bit we + // just cleared in the assignment above, while that method debugging will + // falsely imply that the collapsing requirements have changed. + if (collapsingRequired) + EBIT_SET(into.flags, ENTRY_REQUIRES_COLLAPSING); + else + EBIT_CLR(into.flags, ENTRY_REQUIRES_COLLAPSING); } void diff --git a/src/store.cc b/src/store.cc index b6e808279f..175774ffe6 100644 --- a/src/store.cc +++ b/src/store.cc @@ -1704,15 +1704,13 @@ StoreEntry::startWriting() rep->packHeadersUsingSlowPacker(*this); mem_obj->markEndOfReplyHeaders(); + // Same-worker collapsing risks end with the receipt of the headers. + // SMP collapsing risks remain until the headers are actually cached, but + // that event is announced via CF-agnostic Store writing broadcasts. + setCollapsingRequirement(false); + rep->body.packInto(this); flush(); - - // The entry headers are written, new clients - // should not collapse anymore. - if (hittingRequiresCollapsing()) { - setCollapsingRequirement(false); - Store::Root().transientsClearCollapsingRequirement(*this); - } } char const * @@ -1981,6 +1979,10 @@ StoreEntry::describeTimestamps() const void StoreEntry::setCollapsingRequirement(const bool required) { + if (hittingRequiresCollapsing() == required) + return; // no change + + debugs(20, 5, (required ? "adding to " : "removing from ") << *this); if (required) EBIT_SET(flags, ENTRY_REQUIRES_COLLAPSING); else diff --git a/src/store/Controller.cc b/src/store/Controller.cc index 4309d8337d..ec48c8e48c 100644 --- a/src/store/Controller.cc +++ b/src/store/Controller.cc @@ -647,13 +647,6 @@ Store::Controller::transientsDisconnect(StoreEntry &e) transients->disconnect(e); } -void -Store::Controller::transientsClearCollapsingRequirement(StoreEntry &e) -{ - if (transients) - transients->clearCollapsingRequirement(e); -} - void Store::Controller::handleIdleEntry(StoreEntry &e) { @@ -803,11 +796,6 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) Transients::EntryStatus entryStatus; transients->status(*collapsed, entryStatus); - if (!entryStatus.collapsed) { - debugs(20, 5, "removing collapsing requirement for " << *collapsed << " since remote writer probably got headers"); - collapsed->setCollapsingRequirement(false); - } - if (entryStatus.waitingToBeFreed) { debugs(20, 3, "will release " << *collapsed << " due to waitingToBeFreed"); collapsed->release(true); // may already be marked @@ -818,12 +806,6 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) assert(transients->isReader(*collapsed)); - if (entryStatus.collapsed && !collapsed->hittingRequiresCollapsing()) { - debugs(20, 3, "aborting " << *collapsed << " due to writer/reader collapsing state mismatch"); - collapsed->abort(); - return; - } - bool found = false; bool inSync = false; if (sharedMemStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) { @@ -850,6 +832,8 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) if (inSync) { debugs(20, 5, "synced " << *collapsed); + assert(found); + collapsed->setCollapsingRequirement(false); collapsed->invokeHandlers(); return; } @@ -868,6 +852,7 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) // the entry is still not in one of the caches debugs(20, 7, "waiting " << *collapsed); + collapsed->setCollapsingRequirement(true); } /// Called for Transients entries that are not yet anchored to a cache. @@ -885,33 +870,37 @@ Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync) Transients::EntryStatus entryStatus; transients->status(entry, entryStatus); + inSync = false; bool found = false; if (sharedMemStore) found = sharedMemStore->anchorToCache(entry, inSync); if (!found && swapDir) found = swapDir->anchorToCache(entry, inSync); + if (inSync) { + debugs(20, 7, "anchored " << entry); + assert(found); + entry.setCollapsingRequirement(false); + return true; + } + if (found) { - if (inSync) - debugs(20, 7, "anchored " << entry); - else - debugs(20, 5, "failed to anchor " << entry); + debugs(20, 5, "failed to anchor " << entry); return true; } if (entryStatus.waitingToBeFreed) { debugs(20, 5, "failed on marked unattached " << entry); - inSync = false; return true; } if (!entryStatus.hasWriter) { debugs(20, 5, "failed on abandoned-by-writer " << entry); - inSync = false; return true; } debugs(20, 7, "skipping not yet cached " << entry); + entry.setCollapsingRequirement(true); return false; } diff --git a/src/store/Controller.h b/src/store/Controller.h index 0b8ec38ea4..ab3471cc3f 100644 --- a/src/store/Controller.h +++ b/src/store/Controller.h @@ -120,9 +120,6 @@ public: /// disassociates the entry from the intransit table void transientsDisconnect(StoreEntry &); - /// removes collapsing requirement (for future hits) - void transientsClearCollapsingRequirement(StoreEntry &e); - /// disassociates the entry from the memory cache, preserving cached data void memoryDisconnect(StoreEntry &); diff --git a/src/tests/stub_libstore.cc b/src/tests/stub_libstore.cc index faaf177931..7a49ee4abb 100644 --- a/src/tests/stub_libstore.cc +++ b/src/tests/stub_libstore.cc @@ -51,7 +51,6 @@ void Controller::syncCollapsed(const sfileno) STUB void Controller::noteStoppedSharedWriting(StoreEntry &) STUB int Controller::transientReaders(const StoreEntry &) const STUB_RETVAL(0) void Controller::transientsDisconnect(StoreEntry &) STUB -void Controller::transientsClearCollapsingRequirement(StoreEntry &) STUB void Controller::memoryDisconnect(StoreEntry &) STUB StoreSearch *Controller::search() STUB_RETVAL(nullptr) bool Controller::SmpAware() STUB_RETVAL(false)