return true;
}
+ if (anchor.writerHalted) {
+ debugs(20, 5, "mem-loaded aborted " << e.mem_obj->endOffset() << '/' <<
+ anchor.basics.swap_file_sz << " bytes of " << e);
+ return false;
+ }
+
debugs(20, 5, "mem-loaded all " << e.mem_obj->endOffset() << '/' <<
anchor.basics.swap_file_sz << " bytes of " << e);
return false;
}
+ // Store::Root() in the next check below is FATALly missing during shutdown
+ if (shutting_down) {
+ debugs(20, 5, "yield to shutdown: " << e);
+ return false;
+ }
+
+ // To avoid SMP workers releasing each other caching attempts, restrict disk
+ // caching to StoreEntry publisher. This check goes before memoryCachable()
+ // that may incorrectly release() publisher's entry via checkCachable().
+ if (Store::Root().transientsReader(e)) {
+ debugs(20, 5, "yield to entry publisher: " << e);
+ return false;
+ }
+
if (!e.memoryCachable()) {
debugs(20, 7, "Not memory cachable: " << e);
return false; // will not cache due to entry state or properties
e.mem_obj->memCache.io = MemObject::ioDone;
map->closeForWriting(index);
- CollapsedForwarding::Broadcast(e); // before we close our transient entry!
- Store::Root().transientsCompleteWriting(e);
+ CollapsedForwarding::Broadcast(e);
+ e.storeWriterDone();
}
void
map->abortWriting(mem_obj.memCache.index);
mem_obj.memCache.index = -1;
mem_obj.memCache.io = MemObject::ioDone;
- Store::Root().stopSharing(e); // broadcasts after the change
+ CollapsedForwarding::Broadcast(e);
+ e.storeWriterDone();
} else {
assert(mem_obj.memCache.io == MemObject::ioReading);
map->closeForReading(mem_obj.memCache.index);
void memOutDecision(const bool willCacheInRam);
// called when a decision to cache on disk has been made
void swapOutDecision(const MemObject::SwapOut::Decision &decision);
+ /// called when a store writer ends its work (successfully or not)
+ void storeWriterDone();
void abort();
bool makePublic(const KeyScope keyScope = ksDefault);
protected:
typedef Store::EntryGuard EntryGuard;
- void transientsAbandonmentCheck();
+ void storeWritingCheckpoint();
/// does nothing except throwing if disk-associated data members are inconsistent
void checkDisk() const;
const auto idx = e.mem_obj->xitTable.index;
auto &anchor = map->writeableEntry(idx);
if (EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING)) {
+ // XXX: Non-atomic, [app]-unmarked flags are off limits in append mode.
EBIT_CLR(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
CollapsedForwarding::Broadcast(e);
}
const auto idx = entry.mem_obj->xitTable.index;
const auto &anchor = isWriter(entry) ?
map->writeableEntry(idx) : map->readableEntry(idx);
- entryStatus.abortedByWriter = anchor.writerHalted;
+ entryStatus.hasWriter = anchor.writing();
entryStatus.waitingToBeFreed = anchor.waitingToBeFreed;
entryStatus.collapsed = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING);
}
void
Transients::completeWriting(const StoreEntry &e)
{
+ debugs(20, 5, e);
assert(e.hasTransients());
assert(isWriter(e));
map->switchWritingToReading(e.mem_obj->xitTable.index);
e.mem_obj->xitTable.io = Store::ioReading;
+ CollapsedForwarding::Broadcast(e);
}
int
auto &xitTable = entry.mem_obj->xitTable;
assert(map);
if (isWriter(entry)) {
- map->abortWriting(xitTable.index);
+ // completeWriting() was not called, so there could be an active
+ // Store writer out there, but we should not abortWriting() here
+ // because another writer may have succeeded, making readers happy.
+ // If none succeeded, the readers will notice the lack of writers.
+ map->closeForWriting(xitTable.index);
+ CollapsedForwarding::Broadcast(entry);
} else {
assert(isReader(entry));
map->closeForReadingAndFreeIdle(xitTable.index);
typedef Ipc::StoreMap TransientsMap;
-/// Keeps track of store entries being delivered to clients that arrived before
-/// those entries were [fully] cached. This SMP-shared table is necessary to
-/// * sync an entry-writing worker with entry-reading worker(s); and
-/// * sync an entry-deleting worker with both entry-reading/writing workers.
+/// A Transients entry allows workers to Broadcast() DELETE requests and swapout
+/// progress updates. In a collapsed forwarding context, it also represents a CF
+/// initiating worker promise to either cache the response or inform the waiting
+/// slaves (via false EntryStatus::hasWriter) that caching will not happen. A
+/// Transients entry itself does not carry response- or Store-specific metadata.
class Transients: public Store::Controlled, public Ipc::StoreMapCleaner
{
public:
class EntryStatus
{
public:
- bool abortedByWriter = false; ///< whether the entry was aborted
+ bool hasWriter = false; ///< whether some worker is storing the entry
bool waitingToBeFreed = false; ///< whether the entry was marked for deletion
bool collapsed = false; ///< whether the entry allows collapsing
};
assert(theFile != nullptr);
assert(coreOff >= 0);
+ bool writerLeft = readAnchor().writerHalted; // before the sidCurrent change
+
// if we are dealing with the first read or
// if the offset went backwords, start searching from the beginning
if (sidCurrent < 0 || coreOff < objOffset) {
}
while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
+ writerLeft = readAnchor().writerHalted; // before the sidCurrent change
objOffset += currentReadableSlice().size;
sidCurrent = currentReadableSlice().next;
}
read.callback = cb;
read.callback_data = cbdataReference(data);
+ // quit if we cannot read what they want, and the writer cannot add more
+ if (sidCurrent < 0 && writerLeft) {
+ debugs(79, 5, "quitting at " << coreOff << " in " << *e);
+ callReaderBack(buf, -1);
+ return;
+ }
+
// punt if read offset is too big (because of client bugs or collapsing)
if (sidCurrent < 0) {
debugs(79, 5, "no " << coreOff << " in " << *e);
map->abortWriting(e.swap_filen);
e.detachFromDisk();
dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = nullptr;
- Store::Root().stopSharing(e); // broadcasts after the change
+ CollapsedForwarding::Broadcast(e);
+ e.storeWriterDone();
} else {
map->closeForReading(e.swap_filen);
e.detachFromDisk();
}
void
-Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
+Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &e)
{
- // nothing to do
+ // nothing to do; handleWriteCompletionSuccess() did everything for us
+ assert(!e.mem_obj ||
+ !e.mem_obj->swapout.sio ||
+ !dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_);
}
void
map->switchWritingToReading(sio.swap_filen);
// sio.e keeps the (now read) lock on the anchor
+ // storeSwapOutFileClosed() sets swap_status and calls storeWriterDone()
}
sio.writeableAnchor_ = nullptr;
sio.finishedWriting(DISK_OK);
map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
if (sio.touchingStoreEntry())
- Store::Root().stopSharing(*sio.e);
+ CollapsedForwarding::Broadcast(*sio.e);
// else noop: a fresh entry update error does not affect stale entry readers
// All callers must also call IoState callback, to propagate the error.
}
/**
- * Abandon the transient entry our worker has created if neither the shared
- * memory cache nor the disk cache wants to store it. Collapsed requests, if
- * any, should notice and use Plan B instead of getting stuck waiting for us
- * to start swapping the entry out.
+ * If needed, signal transient entry readers that no more cache changes are
+ * expected and, hence, they should switch to Plan B instead of getting stuck
+ * waiting for us to start or finish storing the entry.
*/
void
-StoreEntry::transientsAbandonmentCheck()
-{
- if (mem_obj && !Store::Root().transientsReader(*this) && // this worker is responsible
- hasTransients() && // other workers may be interested
- !hasMemStore() && // rejected by the shared memory cache
- mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) {
- debugs(20, 7, "cannot be shared: " << *this);
- if (!shutting_down) // Store::Root() is FATALly missing during shutdown
- Store::Root().stopSharing(*this);
+StoreEntry::storeWritingCheckpoint()
+{
+ if (!hasTransients())
+ return; // no SMP complications
+
+ // writers become readers but only after completeWriting() which we trigger
+ if (Store::Root().transientsReader(*this))
+ return; // readers do not need to inform
+
+ assert(mem_obj);
+ if (mem_obj->memCache.io != Store::ioDone) {
+ debugs(20, 7, "not done with mem-caching " << *this);
+ return;
+ }
+
+ const auto doneWithDiskCache =
+ // will not start
+ (mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) ||
+ // or has started but finished already
+ (mem_obj->swapout.decision == MemObject::SwapOut::swStarted && !swappingOut());
+ if (!doneWithDiskCache) {
+ debugs(20, 7, "not done with disk-caching " << *this);
+ return;
}
+
+ debugs(20, 7, "done with writing " << *this);
+ if (!shutting_down) // Store::Root() is FATALly missing during shutdown
+ Store::Root().noteStoppedSharedWriting(*this);
}
void
-StoreEntry::memOutDecision(const bool)
+StoreEntry::memOutDecision(const bool willCacheInRam)
{
- transientsAbandonmentCheck();
+ if (!willCacheInRam)
+ return storeWritingCheckpoint();
+ assert(mem_obj->memCache.io != Store::ioDone);
+ // and wait for storeWriterDone()
}
void
StoreEntry::swapOutDecision(const MemObject::SwapOut::Decision &decision)
{
- // Abandon our transient entry if neither shared memory nor disk wants it.
assert(mem_obj);
mem_obj->swapout.decision = decision;
- transientsAbandonmentCheck();
+ storeWritingCheckpoint();
+}
+
+void
+StoreEntry::storeWriterDone()
+{
+ storeWritingCheckpoint();
}
void
}
void
-Store::Controller::stopSharing(StoreEntry &e)
+Store::Controller::noteStoppedSharedWriting(StoreEntry &e)
{
- // Marking the transients entry is sufficient to prevent new readers from
- // starting to wait for `e` updates and to inform the current readers (and,
- // hence, Broadcast() recipients) about the underlying Store problems.
- if (transients && e.hasTransients())
- transients->evictCached(e);
-}
-
-void
-Store::Controller::transientsCompleteWriting(StoreEntry &e)
-{
- // transients->isWriter(e) is false if `e` is writing to its second store
- // after finishing writing to its first store: At the end of the first swap
- // out, the transients writer becomes a reader and (XXX) we never switch
- // back to writing, even if we start swapping out again (to another store).
- if (transients && e.hasTransients() && transients->isWriter(e))
+ if (transients && e.hasTransients()) // paranoid: the caller should check
transients->completeWriting(e);
}
if (e->makePublic(keyScope)) { // this is needed for both local and SMP collapsing
debugs(20, 3, "may " << (transients && e->hasTransients() ?
"SMP-" : "locally-") << "collapse " << *e);
+ assert(e->hittingRequiresCollapsing());
return true;
}
// paranoid cleanup; the flag is meaningless for private entries
assert(transients->isReader(*collapsed));
- if (entryStatus.abortedByWriter) {
- debugs(20, 3, "aborting " << *collapsed << " because its writer has aborted");
- collapsed->abort();
- return;
- }
-
if (entryStatus.collapsed && !collapsed->hittingRequiresCollapsing()) {
debugs(20, 3, "aborting " << *collapsed << " due to writer/reader collapsing state mismatch");
collapsed->abort();
return;
}
+ if (!entryStatus.hasWriter) {
+ debugs(20, 3, "aborting abandoned-by-writer " << *collapsed);
+ collapsed->abort();
+ return;
+ }
+
// the entry is still not in one of the caches
debugs(20, 7, "waiting " << *collapsed);
}
/// Called for Transients entries that are not yet anchored to a cache.
-/// For cached entries, return true after synchronizing them with their cache
-/// (making inSync true on success). For not-yet-cached entries, return false.
+/// \returns false for not-yet-cached entries that we may attach later
+/// \returns true for other entries after synchronizing them with their store
+/// and setting inSync to reflect that synchronization outcome.
bool
Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync)
{
debugs(20, 7, "anchoring " << entry);
+ Transients::EntryStatus entryStatus;
+ transients->status(entry, entryStatus);
+
bool found = false;
if (sharedMemStore)
found = sharedMemStore->anchorToCache(entry, inSync);
debugs(20, 7, "anchored " << entry);
else
debugs(20, 5, "failed to anchor " << entry);
- } else {
- debugs(20, 7, "skipping not yet cached " << entry);
+ return true;
+ }
+
+ if (entryStatus.waitingToBeFreed) {
+ debugs(20, 5, "failed on marked unattached " << entry);
+ inSync = false;
+ return true;
+ }
+
+ if (!entryStatus.hasWriter) {
+ debugs(20, 5, "failed on abandoned-by-writer " << entry);
+ inSync = false;
+ return true;
}
- return found;
+ debugs(20, 7, "skipping not yet cached " << entry);
+ return false;
}
bool
/// whether the entry is in "writing to Transients" I/O state
bool transientsWriter(const StoreEntry &) const;
- /// marks the entry completed for collapsed requests
- void transientsCompleteWriting(StoreEntry &);
-
/// Update local intransit entry after changes made by appending worker.
void syncCollapsed(const sfileno);
- /// stop any current (and prevent any future) SMP sharing of the given entry
- void stopSharing(StoreEntry &);
+ /// adjust shared state after this worker stopped changing the entry
+ void noteStoppedSharedWriting(StoreEntry &);
/// number of the transient entry readers some time ago
int transientReaders(const StoreEntry &) const;
debugs(20, 5, "storeSwapOutStart: Begin SwapOut '" << e->url() << "' to dirno " <<
e->swap_dirn << ", fileno " << std::hex << std::setw(8) << std::setfill('0') <<
std::uppercase << e->swap_filen);
- e->swapOutDecision(MemObject::SwapOut::swStarted);
/* If we start swapping out objects with OutOfBand Metadata,
* then this code needs changing
*/
/* Pick up the file number if it was assigned immediately */
e->attachToDisk(mem->swapout.sio->swap_dirn, mem->swapout.sio->swap_filen, SWAPOUT_WRITING);
+ e->swapOutDecision(MemObject::SwapOut::swStarted); // after SWAPOUT_WRITING
+
/* write out the swap metadata */
storeIOWrite(mem->swapout.sio, buf, mem->swap_hdr_sz, 0, xfree_cppwrapper);
}
void
StoreEntry::swapOut()
{
+ // Store::Root() in many swapout checks is FATALly missing during shutdown
+ if (shutting_down)
+ return;
+
if (!mem_obj)
return;
++statCounter.swap.outs;
}
- Store::Root().transientsCompleteWriting(*e);
debugs(20, 3, "storeSwapOutFileClosed: " << __FILE__ << ":" << __LINE__);
mem->swapout.sio = nullptr;
+ e->storeWriterDone(); // after updating swap_status
e->unlock("storeSwapOutFileClosed");
}
return false;
}
- // if we are swapping out or swapped out already, do not start over
- if (hasDisk() || Store::Root().hasReadableDiskEntry(*this)) {
- debugs(20, 3, "already did");
- swapOutDecision(MemObject::SwapOut::swImpossible);
+ // If we have started swapping out, do not start over. Most likely, we have
+ // finished swapping out by now because we are not currently swappingOut().
+ if (decision == MemObject::SwapOut::swStarted) {
+ debugs(20, 3, "already started");
return false;
}
- // if we have just stared swapping out (attachToDisk() has not been
- // called), do not start over
- if (decision == MemObject::SwapOut::swStarted) {
- debugs(20, 3, "already started");
+ // if there is a usable disk entry already, do not start over
+ if (hasDisk() || Store::Root().hasReadableDiskEntry(*this)) {
+ debugs(20, 3, "already did"); // we or somebody else created that entry
swapOutDecision(MemObject::SwapOut::swImpossible);
return false;
}
return true;
}
+ // To avoid SMP workers releasing each other caching attempts, restrict disk
+ // caching to StoreEntry publisher. This check goes before checkCachable()
+ // that may incorrectly release() publisher's entry.
+ if (Store::Root().transientsReader(*this)) {
+ debugs(20, 5, "yield to entry publisher");
+ swapOutDecision(MemObject::SwapOut::swImpossible);
+ return false;
+ }
+
if (!checkCachable()) {
debugs(20, 3, "not cachable");
swapOutDecision(MemObject::SwapOut::swImpossible);
void Controller::addWriting(StoreEntry *, const cache_key *) STUB
bool Controller::transientsReader(const StoreEntry &) const STUB_RETVAL(false)
bool Controller::transientsWriter(const StoreEntry &) const STUB_RETVAL(false)
-void Controller::transientsCompleteWriting(StoreEntry &) STUB
void Controller::syncCollapsed(const sfileno) STUB
-void Controller::stopSharing(StoreEntry &) STUB
+void Controller::noteStoppedSharedWriting(StoreEntry &) STUB
int Controller::transientReaders(const StoreEntry &) const STUB_RETVAL(0)
void Controller::transientsDisconnect(StoreEntry &) STUB
void Controller::transientsClearCollapsingRequirement(StoreEntry &) STUB