The ENTRY_REQUIRES_COLLAPSING flag was used for relaying "hitting this
entry carries Collapsed Forwarding risks right now" info to other
workers. That flag is not necessary to relay that info because remote
workers already know whether the entry is attached to one of the shared
cache stores. Moreover, relaying that flag correctly is difficult and
would require a different implementation:
* The flag is changed in relatively high-level Store code, possibly
_after_ we stopped writing, when the transients entry is already
read-only. This bug leads to an isWriter() assertion in
Transients::clearCollapsingRequirement() called via a quick or aborted
StoreEntry::startWriting().
* Basics.flags is not an atomic field. It lacks an "[app]" mark -- its
readers will not get a reliable value due to race conditions when
Squid will need to update the Transient entry flags in "append" mode.
* The flag is changed assuming that simply sending stuff to disk is
sufficient for other workers to read it. Since
18102f7, this is
inaccurate because the store index is updated only after the slot is
written to disk.
Remote workers now use Store attachment info to set the local StoreEntry
object flag, ignoring the no-longer-used Transients entry flags.
The ENTRY_REQUIRES_COLLAPSING flag is still used to tell transactions
which StoreEntry objects are subject to CF risks. It would be nice to
use the presence of MemObject::reply_ to relay the same info for
store-unattached entries (and attachment info for others), but we could
not find a way to do that (during commit
d2a6dcb work and then again
during this project) without a lot of essentially out of scope rewrites.
Also removed misleading Ipc::StoreMapAnchor::exportInto() debugging.
return nullptr;
}
- // store hadWriter before checking ENTRY_REQUIRES_COLLAPSING to avoid racing
- // the writer that clears that flag and then leaves
- const auto hadWriter = map->peekAtWriter(index);
- if (!hadWriter && EBIT_TEST(anchor->basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
- debugs(20, 3, "not joining abandoned entry " << index);
- map->closeForReadingAndFreeIdle(index);
- return nullptr;
- }
-
StoreEntry *e = new StoreEntry();
e->createMemObject();
anchorEntry(*e, index, *anchor);
return nullptr;
}
-void
-Transients::clearCollapsingRequirement(const StoreEntry &e)
-{
- assert(map);
- assert(e.hasTransients());
- assert(isWriter(e));
- const auto idx = e.mem_obj->xitTable.index;
- auto &anchor = map->writeableEntry(idx);
- if (EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
- // XXX: Non-atomic, [app]-unmarked flags are off limits in append mode.
- EBIT_CLR(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
- CollapsedForwarding::Broadcast(e);
- }
-}
-
void
Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus direction)
{
xitTable.index = index;
xitTable.io = Store::ioReading;
- const auto hadWriter = hasWriter(e); // before computing collapsingRequired
anchor.exportInto(e);
- const bool collapsingRequired = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
- assert(!collapsingRequired || hadWriter);
- e.setCollapsingRequirement(collapsingRequired);
}
bool
map->writeableEntry(idx) : map->readableEntry(idx);
entryStatus.hasWriter = anchor.writing();
entryStatus.waitingToBeFreed = anchor.waitingToBeFreed;
- entryStatus.collapsed = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
}
void
public:
bool hasWriter = false; ///< whether some worker is storing the entry
bool waitingToBeFreed = false; ///< whether the entry was marked for deletion
- bool collapsed = false; ///< whether the entry allows collapsing
};
Transients();
/// return a local, previously collapsed entry
StoreEntry *findCollapsed(const sfileno xitIndex);
- /// removes collapsing requirement (for future hits)
- void clearCollapsingRequirement(const StoreEntry &e);
-
/// start listening for remote DELETE requests targeting either a complete
/// StoreEntry (ioReading) or a being-formed miss StoreEntry (ioWriting)
void monitorIo(StoreEntry*, const cache_key*, const Store::IoStatus);
into.lastModified(basics.lastmod);
into.swap_file_sz = basics.swap_file_sz;
into.refcount = basics.refcount;
+
+ // Some basics.flags are not meaningful and should not be overwritten here.
+ // ENTRY_REQUIRES_COLLAPSING is one of them. TODO: check other flags.
const bool collapsingRequired = into.hittingRequiresCollapsing();
into.flags = basics.flags;
- // There are possibly several flags we do not need to overwrite,
- // and ENTRY_REQUIRES_COLLAPSING is one of them.
- // TODO: check for other flags.
- into.setCollapsingRequirement(collapsingRequired);
+ // Avoid into.setCollapsingRequirement() here: We only restore the bit we
+ // just cleared in the assignment above, while that method debugging will
+ // falsely imply that the collapsing requirements have changed.
+ if (collapsingRequired)
+ EBIT_SET(into.flags, ENTRY_REQUIRES_COLLAPSING);
+ else
+ EBIT_CLR(into.flags, ENTRY_REQUIRES_COLLAPSING);
}
void
rep->packHeadersUsingSlowPacker(*this);
mem_obj->markEndOfReplyHeaders();
+ // Same-worker collapsing risks end with the receipt of the headers.
+ // SMP collapsing risks remain until the headers are actually cached, but
+ // that event is announced via CF-agnostic Store writing broadcasts.
+ setCollapsingRequirement(false);
+
rep->body.packInto(this);
flush();
-
- // The entry headers are written, new clients
- // should not collapse anymore.
- if (hittingRequiresCollapsing()) {
- setCollapsingRequirement(false);
- Store::Root().transientsClearCollapsingRequirement(*this);
- }
}
char const *
void
StoreEntry::setCollapsingRequirement(const bool required)
{
+ if (hittingRequiresCollapsing() == required)
+ return; // no change
+
+ debugs(20, 5, (required ? "adding to " : "removing from ") << *this);
if (required)
EBIT_SET(flags, ENTRY_REQUIRES_COLLAPSING);
else
transients->disconnect(e);
}
-void
-Store::Controller::transientsClearCollapsingRequirement(StoreEntry &e)
-{
- if (transients)
- transients->clearCollapsingRequirement(e);
-}
-
void
Store::Controller::handleIdleEntry(StoreEntry &e)
{
Transients::EntryStatus entryStatus;
transients->status(*collapsed, entryStatus);
- if (!entryStatus.collapsed) {
- debugs(20, 5, "removing collapsing requirement for " << *collapsed << " since remote writer probably got headers");
- collapsed->setCollapsingRequirement(false);
- }
-
if (entryStatus.waitingToBeFreed) {
debugs(20, 3, "will release " << *collapsed << " due to waitingToBeFreed");
collapsed->release(true); // may already be marked
assert(transients->isReader(*collapsed));
- if (entryStatus.collapsed && !collapsed->hittingRequiresCollapsing()) {
- debugs(20, 3, "aborting " << *collapsed << " due to writer/reader collapsing state mismatch");
- collapsed->abort();
- return;
- }
-
bool found = false;
bool inSync = false;
if (sharedMemStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) {
if (inSync) {
debugs(20, 5, "synced " << *collapsed);
+ assert(found);
+ collapsed->setCollapsingRequirement(false);
collapsed->invokeHandlers();
return;
}
// the entry is still not in one of the caches
debugs(20, 7, "waiting " << *collapsed);
+ collapsed->setCollapsingRequirement(true);
}
/// Called for Transients entries that are not yet anchored to a cache.
Transients::EntryStatus entryStatus;
transients->status(entry, entryStatus);
+ inSync = false;
bool found = false;
if (sharedMemStore)
found = sharedMemStore->anchorToCache(entry, inSync);
if (!found && swapDir)
found = swapDir->anchorToCache(entry, inSync);
+ if (inSync) {
+ debugs(20, 7, "anchored " << entry);
+ assert(found);
+ entry.setCollapsingRequirement(false);
+ return true;
+ }
+
if (found) {
- if (inSync)
- debugs(20, 7, "anchored " << entry);
- else
- debugs(20, 5, "failed to anchor " << entry);
+ debugs(20, 5, "failed to anchor " << entry);
return true;
}
if (entryStatus.waitingToBeFreed) {
debugs(20, 5, "failed on marked unattached " << entry);
- inSync = false;
return true;
}
if (!entryStatus.hasWriter) {
debugs(20, 5, "failed on abandoned-by-writer " << entry);
- inSync = false;
return true;
}
debugs(20, 7, "skipping not yet cached " << entry);
+ entry.setCollapsingRequirement(true);
return false;
}
/// disassociates the entry from the intransit table
void transientsDisconnect(StoreEntry &);
- /// removes collapsing requirement (for future hits)
- void transientsClearCollapsingRequirement(StoreEntry &e);
-
/// disassociates the entry from the memory cache, preserving cached data
void memoryDisconnect(StoreEntry &);
void Controller::noteStoppedSharedWriting(StoreEntry &) STUB
int Controller::transientReaders(const StoreEntry &) const STUB_RETVAL(0)
void Controller::transientsDisconnect(StoreEntry &) STUB
-void Controller::transientsClearCollapsingRequirement(StoreEntry &) STUB
void Controller::memoryDisconnect(StoreEntry &) STUB
StoreSearch *Controller::search() STUB_RETVAL(nullptr)
bool Controller::SmpAware() STUB_RETVAL(false)