]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Stop (ab)using Transient entry flags for CF requirement mngmt (#1127)
authorAlex Rousskov <rousskov@measurement-factory.com>
Fri, 2 Sep 2022 17:14:12 +0000 (17:14 +0000)
committerSquid Anubis <squid-anubis@squid-cache.org>
Sat, 3 Sep 2022 18:41:32 +0000 (18:41 +0000)
The ENTRY_REQUIRES_COLLAPSING flag was used for relaying "hitting this
entry carries Collapsed Forwarding risks right now" info to other
workers. That flag is not necessary to relay that info because remote
workers already know whether the entry is attached to one of the shared
cache stores. Moreover, relaying that flag correctly is difficult and
would require a different implementation:

* The flag is changed in relatively high-level Store code, possibly
  _after_ we stopped writing, when the transients entry is already
  read-only. This bug leads to an isWriter() assertion in
  Transients::clearCollapsingRequirement() called via a quick or aborted
  StoreEntry::startWriting().

* Basics.flags is not an atomic field. It lacks an "[app]" mark -- its
  readers will not get a reliable value due to race conditions when
  Squid will need to update the Transient entry flags in "append" mode.

* The flag is changed assuming that simply sending stuff to disk is
  sufficient for other workers to read it. Since 18102f7, this is
  inaccurate because the store index is updated only after the slot is
  written to disk.

Remote workers now use Store attachment info to set the local StoreEntry
object flag, ignoring the no-longer-used Transients entry flags.

The ENTRY_REQUIRES_COLLAPSING flag is still used to tell transactions
which StoreEntry objects are subject to CF risks. It would be nice to
use the presence of MemObject::reply_ to relay the same info for
store-unattached entries (and attachment info for others), but we could
not find a way to do that (during commit d2a6dcb work and then again
during this project) without a lot of essentially out of scope rewrites.

Also removed misleading Ipc::StoreMapAnchor::exportInto() debugging.

src/Transients.cc
src/Transients.h
src/ipc/StoreMap.cc
src/store.cc
src/store/Controller.cc
src/store/Controller.h
src/tests/stub_libstore.cc

index 555144375ae569c3038b1dccf5f264432bfae7a5..a089dab6e76a670d6127a77f73ebe9b04af0e09a 100644 (file)
@@ -164,15 +164,6 @@ Transients::get(const cache_key *key)
         return nullptr;
     }
 
-    // store hadWriter before checking ENTRY_REQUIRES_COLLAPSING to avoid racing
-    // the writer that clears that flag and then leaves
-    const auto hadWriter = map->peekAtWriter(index);
-    if (!hadWriter && EBIT_TEST(anchor->basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
-        debugs(20, 3, "not joining abandoned entry " << index);
-        map->closeForReadingAndFreeIdle(index);
-        return nullptr;
-    }
-
     StoreEntry *e = new StoreEntry();
     e->createMemObject();
     anchorEntry(*e, index, *anchor);
@@ -197,21 +188,6 @@ Transients::findCollapsed(const sfileno index)
     return nullptr;
 }
 
-void
-Transients::clearCollapsingRequirement(const StoreEntry &e)
-{
-    assert(map);
-    assert(e.hasTransients());
-    assert(isWriter(e));
-    const auto idx = e.mem_obj->xitTable.index;
-    auto &anchor = map->writeableEntry(idx);
-    if (EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
-        // XXX: Non-atomic, [app]-unmarked flags are off limits in append mode.
-        EBIT_CLR(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
-        CollapsedForwarding::Broadcast(e);
-    }
-}
-
 void
 Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus direction)
 {
@@ -292,11 +268,7 @@ Transients::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapA
     xitTable.index = index;
     xitTable.io = Store::ioReading;
 
-    const auto hadWriter = hasWriter(e); // before computing collapsingRequired
     anchor.exportInto(e);
-    const bool collapsingRequired = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
-    assert(!collapsingRequired || hadWriter);
-    e.setCollapsingRequirement(collapsingRequired);
 }
 
 bool
@@ -323,7 +295,6 @@ Transients::status(const StoreEntry &entry, Transients::EntryStatus &entryStatus
                          map->writeableEntry(idx) : map->readableEntry(idx);
     entryStatus.hasWriter = anchor.writing();
     entryStatus.waitingToBeFreed = anchor.waitingToBeFreed;
-    entryStatus.collapsed = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
 }
 
 void
index 7a2eff57d2aaad09e977fb2dd7b3983a42c62fcf..82a42ebdd38bcc31338ff463a65932d61234f846 100644 (file)
@@ -33,7 +33,6 @@ public:
     public:
         bool hasWriter = false; ///< whether some worker is storing the entry
         bool waitingToBeFreed = false; ///< whether the entry was marked for deletion
-        bool collapsed = false; ///< whether the entry allows collapsing
     };
 
     Transients();
@@ -42,9 +41,6 @@ public:
     /// return a local, previously collapsed entry
     StoreEntry *findCollapsed(const sfileno xitIndex);
 
-    /// removes collapsing requirement (for future hits)
-    void clearCollapsingRequirement(const StoreEntry &e);
-
     /// start listening for remote DELETE requests targeting either a complete
     /// StoreEntry (ioReading) or a being-formed miss StoreEntry (ioWriting)
     void monitorIo(StoreEntry*, const cache_key*, const Store::IoStatus);
index 2eaac13de034e59ec8c5df0c045f002fc25fb923..59cee4877097b388a6eccf5decf5858a50f1b01d 100644 (file)
@@ -985,12 +985,18 @@ Ipc::StoreMapAnchor::exportInto(StoreEntry &into) const
     into.lastModified(basics.lastmod);
     into.swap_file_sz = basics.swap_file_sz;
     into.refcount = basics.refcount;
+
+    // Some basics.flags are not meaningful and should not be overwritten here.
+    // ENTRY_REQUIRES_COLLAPSING is one of them. TODO: check other flags.
     const bool collapsingRequired = into.hittingRequiresCollapsing();
     into.flags = basics.flags;
-    // There are possibly several flags we do not need to overwrite,
-    // and ENTRY_REQUIRES_COLLAPSING is one of them.
-    // TODO: check for other flags.
-    into.setCollapsingRequirement(collapsingRequired);
+    // Avoid into.setCollapsingRequirement() here: We only restore the bit we
+    // just cleared in the assignment above, while that method debugging will
+    // falsely imply that the collapsing requirements have changed.
+    if (collapsingRequired)
+        EBIT_SET(into.flags, ENTRY_REQUIRES_COLLAPSING);
+    else
+        EBIT_CLR(into.flags, ENTRY_REQUIRES_COLLAPSING);
 }
 
 void
index b6e808279fc5864c3f18b58720f8716d1dc1b2ff..175774ffe6177a89963400df2572cf99d596e58d 100644 (file)
@@ -1704,15 +1704,13 @@ StoreEntry::startWriting()
     rep->packHeadersUsingSlowPacker(*this);
     mem_obj->markEndOfReplyHeaders();
 
+    // Same-worker collapsing risks end with the receipt of the headers.
+    // SMP collapsing risks remain until the headers are actually cached, but
+    // that event is announced via CF-agnostic Store writing broadcasts.
+    setCollapsingRequirement(false);
+
     rep->body.packInto(this);
     flush();
-
-    // The entry headers are written, new clients
-    // should not collapse anymore.
-    if (hittingRequiresCollapsing()) {
-        setCollapsingRequirement(false);
-        Store::Root().transientsClearCollapsingRequirement(*this);
-    }
 }
 
 char const *
@@ -1981,6 +1979,10 @@ StoreEntry::describeTimestamps() const
 void
 StoreEntry::setCollapsingRequirement(const bool required)
 {
+    if (hittingRequiresCollapsing() == required)
+        return; // no change
+
+    debugs(20, 5, (required ? "adding to " : "removing from ") << *this);
     if (required)
         EBIT_SET(flags, ENTRY_REQUIRES_COLLAPSING);
     else
index 4309d8337d4eb803647959b7ce3c44154850fd2d..ec48c8e48c92ae8c976953c3b2d4755c42a388ce 100644 (file)
@@ -647,13 +647,6 @@ Store::Controller::transientsDisconnect(StoreEntry &e)
         transients->disconnect(e);
 }
 
-void
-Store::Controller::transientsClearCollapsingRequirement(StoreEntry &e)
-{
-    if (transients)
-        transients->clearCollapsingRequirement(e);
-}
-
 void
 Store::Controller::handleIdleEntry(StoreEntry &e)
 {
@@ -803,11 +796,6 @@ Store::Controller::syncCollapsed(const sfileno xitIndex)
     Transients::EntryStatus entryStatus;
     transients->status(*collapsed, entryStatus);
 
-    if (!entryStatus.collapsed) {
-        debugs(20, 5, "removing collapsing requirement for " << *collapsed << " since remote writer probably got headers");
-        collapsed->setCollapsingRequirement(false);
-    }
-
     if (entryStatus.waitingToBeFreed) {
         debugs(20, 3, "will release " << *collapsed << " due to waitingToBeFreed");
         collapsed->release(true); // may already be marked
@@ -818,12 +806,6 @@ Store::Controller::syncCollapsed(const sfileno xitIndex)
 
     assert(transients->isReader(*collapsed));
 
-    if (entryStatus.collapsed && !collapsed->hittingRequiresCollapsing()) {
-        debugs(20, 3, "aborting " << *collapsed << " due to writer/reader collapsing state mismatch");
-        collapsed->abort();
-        return;
-    }
-
     bool found = false;
     bool inSync = false;
     if (sharedMemStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) {
@@ -850,6 +832,8 @@ Store::Controller::syncCollapsed(const sfileno xitIndex)
 
     if (inSync) {
         debugs(20, 5, "synced " << *collapsed);
+        assert(found);
+        collapsed->setCollapsingRequirement(false);
         collapsed->invokeHandlers();
         return;
     }
@@ -868,6 +852,7 @@ Store::Controller::syncCollapsed(const sfileno xitIndex)
 
     // the entry is still not in one of the caches
     debugs(20, 7, "waiting " << *collapsed);
+    collapsed->setCollapsingRequirement(true);
 }
 
 /// Called for Transients entries that are not yet anchored to a cache.
@@ -885,33 +870,37 @@ Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync)
     Transients::EntryStatus entryStatus;
     transients->status(entry, entryStatus);
 
+    inSync = false;
     bool found = false;
     if (sharedMemStore)
         found = sharedMemStore->anchorToCache(entry, inSync);
     if (!found && swapDir)
         found = swapDir->anchorToCache(entry, inSync);
 
+    if (inSync) {
+        debugs(20, 7, "anchored " << entry);
+        assert(found);
+        entry.setCollapsingRequirement(false);
+        return true;
+    }
+
     if (found) {
-        if (inSync)
-            debugs(20, 7, "anchored " << entry);
-        else
-            debugs(20, 5, "failed to anchor " << entry);
+        debugs(20, 5, "failed to anchor " << entry);
         return true;
     }
 
     if (entryStatus.waitingToBeFreed) {
         debugs(20, 5, "failed on marked unattached " << entry);
-        inSync = false;
         return true;
     }
 
     if (!entryStatus.hasWriter) {
         debugs(20, 5, "failed on abandoned-by-writer " << entry);
-        inSync = false;
         return true;
     }
 
     debugs(20, 7, "skipping not yet cached " << entry);
+    entry.setCollapsingRequirement(true);
     return false;
 }
 
index 0b8ec38ea4d029d48aa766d8d19e8ad16ca77d93..ab3471cc3f2bdf9391938bec624c06ee72008b81 100644 (file)
@@ -120,9 +120,6 @@ public:
     /// disassociates the entry from the intransit table
     void transientsDisconnect(StoreEntry &);
 
-    /// removes collapsing requirement (for future hits)
-    void transientsClearCollapsingRequirement(StoreEntry &e);
-
     /// disassociates the entry from the memory cache, preserving cached data
     void memoryDisconnect(StoreEntry &);
 
index faaf1779313c3a34cbec172c138009ef0ff39b9f..7a49ee4abb96723883752ee39b707823c7f4a4aa 100644 (file)
@@ -51,7 +51,6 @@ void Controller::syncCollapsed(const sfileno) STUB
 void Controller::noteStoppedSharedWriting(StoreEntry &) STUB
 int Controller::transientReaders(const StoreEntry &) const STUB_RETVAL(0)
 void Controller::transientsDisconnect(StoreEntry &) STUB
-void Controller::transientsClearCollapsingRequirement(StoreEntry &) STUB
 void Controller::memoryDisconnect(StoreEntry &) STUB
 StoreSearch *Controller::search() STUB_RETVAL(nullptr)
 bool Controller::SmpAware() STUB_RETVAL(false)