Wrote Transients description, replacing an irrelevant copy-pasted comment.
Maintain proper transient entry locks, distinguishing reading and writing
cases.
Fixed transients synchronization logic. Store::get() must not return
incomplete from-cache entries, except for local or transient ones. Otherwise,
the returned entry will not be updated when its remote writer makes changes.
Marked entries fully loaded from the shared memory cache as STORE_OK.
Avoid caching ENTRY_SPECIAL in the shared memory cache for now. This is not
strictly necessary, I think, but it simplifies the shared caching log when
triaging start-test-analyze test cases. The restriction can be removed
when ENTRY_SPECIAL generation code becomes shared cache-aware, for example.
Fixed copy-paste error in Transients::disconnect().
Changed CollapsedForwarding::Broadcast() profile in preparation for excluding
broadcasts for entries without remote readers.
Do not purge entire cache entries just because we have to trim their RAM
footprint. The old code assumed that non-swappable entries may not have any
other stored content (which is no longer correct because they may still reside
in the shared memory cache) so it almost made sense to purge them, but it is
possible for clients to use partial in-RAM data when serving range requests,
so we should not be purging unless there are other reasons to do that. This
may expose client-side bugs if the hit validation code is not checking for RAM
entries being incomplete.
Allow MemObject::trimUnSwappable() to be called when there is nothing to trim.
This used to be a special case in StoreEntry::trimMemory(), but we do not need
it anymore after the above change.
Added transient and shared memory indexes to StoreEntry debugging summaries.
}
void
-CollapsedForwarding::Broadcast(const cache_key *key)
+CollapsedForwarding::Broadcast(const StoreEntry &e)
{
if (!queue.get())
return;
CollapsedForwardingMsg msg;
msg.sender = KidIdentifier;
- memcpy(msg.key, key, sizeof(msg.key));
+ memcpy(msg.key, e.key, sizeof(msg.key));
- debugs(17, 5, storeKeyText(key) << " to " << Config.workers << "-1 workers");
+ debugs(17, 5, storeKeyText(static_cast<cache_key*>(e.key)) << " to " <<
+ Config.workers << "-1 workers");
// TODO: send only to workers who are waiting for data
for (int workerId = 1; workerId <= Config.workers; ++workerId) {
#include <memory>
-class StoreIOState;
+class StoreEntry;
/// Sends and handles collapsed forwarding notifications.
class CollapsedForwarding
static void Init();
/// notify other workers about changes in entry state (e.g., new data)
- static void Broadcast(const cache_key *key);
+ static void Broadcast(const StoreEntry &e);
/// kick worker with empty IPC queue
static void Notify(const int workerId);
void
MemObject::trimUnSwappable()
{
- int64_t new_mem_lo = policyLowestOffsetToKeep(0);
- assert (new_mem_lo > 0);
-
- data_hdr.freeDataUpto(new_mem_lo);
- inmem_lo = new_mem_lo;
+ if (const int64_t new_mem_lo = policyLowestOffsetToKeep(false)) {
+ assert (new_mem_lo > 0);
+ data_hdr.freeDataUpto(new_mem_lo);
+ inmem_lo = new_mem_lo;
+ } // else we should not trim anything at this time
}
bool
SwapOut swapout;
+ /// cache "I/O" direction and status
+ typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
+
/// State of an entry with regards to the [shared] in-transit table.
class XitTable {
public:
- XitTable(): index(-1) {}
+ XitTable(): index(-1), io(ioUndecided) {}
int32_t index; ///< entry position inside the in-transit table
+ Io io; ///< current I/O state
};
XitTable xitTable; ///< current [shared] memory caching state for the entry
int32_t index; ///< entry position inside the memory cache
int64_t offset; ///< bytes written/read to/from the memory cache so far
- /// I/O direction and status
- typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
Io io; ///< current I/O state
};
MemCache memCache; ///< current [shared] memory caching state for the entry
const bool copied = copyFromShm(*e, index, *slot);
- // we copied everything we could to local memory; no more need to lock
- map->closeForReading(index);
- e->mem_obj->memCache.index = -1;
- e->mem_obj->memCache.io = MemObject::MemCache::ioDone;
-
if (copied) {
e->hashInsert(key);
return e;
MemObject::MemCache &mc = e.mem_obj->memCache;
mc.index = index;
- mc.io = MemObject::MemCache::ioReading;
+ mc.io = MemObject::ioReading;
}
/// copies the entire entry from shared to local memory
return true;
}
- e.mem_obj->object_sz = e.mem_obj->endOffset(); // from StoreEntry::complete()
debugs(20, 7, "mem-loaded all " << e.mem_obj->object_sz << '/' <<
anchor.basics.swap_file_sz << " bytes of " << e);
+
+ // from StoreEntry::complete()
+ e.mem_obj->object_sz = e.mem_obj->endOffset();
+ e.store_status = STORE_OK;
+
assert(e.mem_obj->object_sz >= 0);
assert(static_cast<uint64_t>(e.mem_obj->object_sz) == anchor.basics.swap_file_sz);
// would be nice to call validLength() here, but it needs e.key
- // XXX: unlock acnhor here!
+ // we read the entire response into the local memory; no more need to lock
+ disconnect(*e.mem_obj);
return true;
}
return false;
}
+ if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) {
+ debugs(20, 5, HERE << "Not mem-caching ENTRY_SPECIAL " << e);
+ return false;
+ }
+
return true;
}
assert(e.mem_obj);
e.mem_obj->memCache.index = index;
- e.mem_obj->memCache.io = MemObject::MemCache::ioWriting;
+ e.mem_obj->memCache.io = MemObject::ioWriting;
slot->set(e);
map->startAppending(index);
return true;
debugs(20, 7, "entry " << e);
switch (e.mem_obj->memCache.io) {
- case MemObject::MemCache::ioUndecided:
+ case MemObject::ioUndecided:
if (!shouldCache(e) || !startCaching(e)) {
- e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+ e.mem_obj->memCache.io = MemObject::ioDone;
Store::Root().transientsAbandon(e);
- CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+ CollapsedForwarding::Broadcast(e);
return;
}
break;
- case MemObject::MemCache::ioDone:
- case MemObject::MemCache::ioReading:
+ case MemObject::ioDone:
+ case MemObject::ioReading:
return; // we should not write in all of the above cases
- case MemObject::MemCache::ioWriting:
+ case MemObject::ioWriting:
break; // already decided to write and still writing
}
copyToShm(e);
if (e.store_status == STORE_OK) // done receiving new content
completeWriting(e);
- CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+ else
+ CollapsedForwarding::Broadcast(e);
return;
}
catch (const std::exception &x) { // TODO: should we catch ... as well?
Store::Root().transientsAbandon(e);
disconnect(*e.mem_obj);
- CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
+ CollapsedForwarding::Broadcast(e);
}
void
debugs(20, 5, "mem-cached all " << e.mem_obj->memCache.offset << " bytes of " << e);
e.mem_obj->memCache.index = -1;
- e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+ e.mem_obj->memCache.io = MemObject::ioDone;
map->closeForWriting(index, false);
+
+ CollapsedForwarding::Broadcast(e); // before we close our transient entry!
+ Store::Root().transientsCompleteWriting(e);
}
void
MemStore::disconnect(MemObject &mem_obj)
{
if (mem_obj.memCache.index >= 0) {
- if (mem_obj.memCache.io == MemObject::MemCache::ioWriting) {
+ if (mem_obj.memCache.io == MemObject::ioWriting) {
map->abortWriting(mem_obj.memCache.index);
} else {
- assert(mem_obj.memCache.io == MemObject::MemCache::ioReading);
+ assert(mem_obj.memCache.io == MemObject::ioReading);
map->closeForReading(mem_obj.memCache.index);
}
mem_obj.memCache.index = -1;
- mem_obj.memCache.io = MemObject::MemCache::ioDone;
+ mem_obj.memCache.io = MemObject::ioDone;
}
}
/// makes the entry available for collapsing future requests
virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) {}
+ // XXX: This method belongs to Store::Root/StoreController, but it is here
+ // to avoid casting Root() to StoreController until Root() API is fixed.
+ /// marks the entry completed for collapsed requests
+ virtual void transientsCompleteWriting(StoreEntry &e) {}
+
// XXX: This method belongs to Store::Root/StoreController, but it is here
// to avoid casting Root() to StoreController until Root() API is fixed.
/// Update local intransit entry after changes made by appending worker.
/* Store parent API */
virtual void handleIdleEntry(StoreEntry &e);
+ virtual void transientsCompleteWriting(StoreEntry &e);
virtual void transientsAbandon(StoreEntry &e);
virtual void transientsDisconnect(MemObject &mem_obj);
virtual void memoryOut(StoreEntry &e, const bool preserveSwappable);
private:
void createOneStore(Store &aStore);
bool keepForLocalMemoryCache(const StoreEntry &e) const;
+ bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
bool anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync);
StorePointer swapDir; ///< summary view of all disk caches
assert(e->mem_obj);
e->mem_obj->method = extras.reqMethod;
+ e->mem_obj->xitTable.io = MemObject::ioReading;
e->mem_obj->xitTable.index = index;
// XXX: overwriting storeCreateEntry() which calls setPrivateKey() if
}
void
-Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
+Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags,
const HttpRequestMethod &reqMethod)
{
assert(e);
try {
if (copyToShm(*e, index, reqFlags, reqMethod)) {
slot->set(*e);
+ e->mem_obj->xitTable.io = MemObject::ioWriting;
e->mem_obj->xitTable.index = index;
map->startAppending(index);
// keep write lock -- we will be supplying others with updates
return map->readableEntry(index).waitingToBeFreed;
}
+void
+Transients::completeWriting(const StoreEntry &e)
+{
+ if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
+ assert(e.mem_obj->xitTable.io == MemObject::ioWriting);
+ map->closeForWriting(e.mem_obj->xitTable.index);
+ e.mem_obj->xitTable.index = -1;
+ e.mem_obj->xitTable.io = MemObject::ioDone;
+ }
+}
+
void
Transients::disconnect(MemObject &mem_obj)
{
- assert(mem_obj.xitTable.index >= 0 && map);
- map->freeEntry(mem_obj.xitTable.index); // just marks the locked entry
- mem_obj.xitTable.index = -1;
- // We do not unlock the entry now because the problem is most likely with
- // the server resource rather than a specific cache writer, so we want to
- // prevent other readers from collapsing requests for that resource.
+ if (mem_obj.xitTable.index >= 0) {
+ assert(map);
+ if (mem_obj.xitTable.io == MemObject::ioWriting) {
+ map->abortWriting(mem_obj.xitTable.index);
+ } else {
+ assert(mem_obj.xitTable.io == MemObject::ioReading);
+ map->closeForReading(mem_obj.xitTable.index);
+ }
+ mem_obj.xitTable.index = -1;
+ mem_obj.xitTable.io = MemObject::ioDone;
+ }
}
/// calculates maximum number of entries we need to store and map
};
typedef Ipc::StoreMapWithExtras<TransientsMapExtras> TransientsMap;
-/// Stores HTTP entities in RAM. Current implementation uses shared memory.
-/// Unlike a disk store (SwapDir), operations are synchronous (and fast).
+/// Keeps track of hits being delivered to clients that arrived before those
+/// hits were [fully] cached. This shared table is necessary to synchronize hit
+/// caching (writing) workers with other workers serving (reading) those hits.
class Transients: public Store, public Ipc::StoreMapCleaner
{
public:
virtual ~Transients();
/// add an in-transit entry suitable for collapsing future requests
- void put(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+ void startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+
+ /// called when the in-transit entry has been successfully cached
+ void completeWriting(const StoreEntry &e);
/// the calling entry writer no longer expects to cache this entry
void abandon(const StoreEntry &e);
/// whether an in-transit entry is now abandoned by its writer
bool abandoned(const StoreEntry &e) const;
- /// the calling entry writer no longer expects to cache this entry
+ /// the caller is done writing or reading this entry
void disconnect(MemObject &mem_obj);
/* Store API */
// and hope that Core will call disconnect() to close the map entry
}
- CollapsedForwarding::Broadcast(static_cast<const cache_key*>(sio.e->key));
+ CollapsedForwarding::Broadcast(*sio.e);
}
void
if (EBIT_TEST(flags, ENTRY_SPECIAL))
return; // cannot trim because we do not load them again
- if (!preserveSwappable) {
- if (mem_obj->policyLowestOffsetToKeep(0) == 0) {
- /* Nothing to do */
- return;
- }
- /*
- * Its not swap-able, and we're about to delete a chunk,
- * so we must make it PRIVATE. This is tricky/ugly because
- * for the most part, we treat swapable == cachable here.
- */
- releaseRequest();
- mem_obj->trimUnSwappable ();
- } else {
- mem_obj->trimSwappable ();
- }
+ if (preserveSwappable)
+ mem_obj->trimSwappable();
+ else
+ mem_obj->trimUnSwappable();
+
+ debugs(88, 7, *this << " inmem_lo=" << mem_obj->inmem_lo);
}
bool
{
os << "e:";
+ if (e.mem_obj) {
+ if (e.mem_obj->xitTable.index > -1)
+ os << 't' << e.mem_obj->xitTable.index;
+ if (e.mem_obj->memCache.index > -1)
+ os << 'm' << e.mem_obj->memCache.index;
+ }
if (e.swap_filen > -1 || e.swap_dirn > -1)
- os << e.swap_filen << '@' << e.swap_dirn;
+ os << 'd' << e.swap_filen << '@' << e.swap_dirn;
os << '=';
if (e.flags) {
if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) os << 'S';
if (EBIT_TEST(e.flags, ENTRY_REVALIDATE)) os << 'R';
- if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'T';
+ if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'P';
if (EBIT_TEST(e.flags, RELEASE_REQUEST)) os << 'X';
if (EBIT_TEST(e.flags, REFRESH_REQUEST)) os << 'F';
if (EBIT_TEST(e.flags, ENTRY_CACHABLE)) os << 'C';
return e;
}
+ // Must search transients before caches because we must sync those we find.
+ if (transients) {
+ if (StoreEntry *e = transients->get(key)) {
+ debugs(20, 3, "got shared in-transit entry: " << *e);
+ bool inSync = false;
+ const bool found = anchorCollapsed(*e, inSync);
+ if (!found || inSync)
+ return e;
+ assert(!e->locked()); // ensure release will destroyStoreEntry()
+ e->release(); // do not let others into the same trap
+ return NULL;
+ }
+ }
+
if (memStore) {
if (StoreEntry *e = memStore->get(key)) {
debugs(20, 3, HERE << "got mem-cached entry: " << *e);
debugs(20, 4, HERE << "none of " << Config.cacheSwap.n_configured <<
" cache_dirs have " << storeKeyText(key));
- // Last, check shared in-transit table if enabled.
- // We speculate that collapsed forwarding hits are less frequent than
- // proper cache hits checked above (the order does not matter for misses).
- if (transients) {
- if (StoreEntry *e = transients->get(key)) {
- debugs(20, 3, "got shared in-transit entry: " << *e);
- return e;
- }
- }
-
return NULL;
}
}
}
+void
+StoreController::transientsCompleteWriting(StoreEntry &e)
+{
+ if (transients) {
+ assert(e.mem_obj);
+ if (e.mem_obj->xitTable.index >= 0)
+ transients->completeWriting(e);
+ }
+}
+
void
StoreController::transientsDisconnect(MemObject &mem_obj)
{
{
e->makePublic(); // this is needed for both local and SMP collapsing
if (transients)
- transients->put(e, reqFlags, reqMethod);
+ transients->startWriting(e, reqFlags, reqMethod);
debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ?
"SMP-" : "locally-") << "collapse " << *e);
}
StoreController::syncCollapsed(const cache_key *key)
{
StoreEntry *collapsed = swapDir->get(key);
- if (!collapsed) // the entry is no longer locally active, ignore the update
+ if (!collapsed) { // the entry is no longer locally active, ignore update
+ debugs(20, 7, "not SMP-syncing not-local " << storeKeyText(key));
return;
+ }
+
+ if (!collapsed->mem_obj) {
+ // without mem_obj, the entry cannot be transient so we ignore it
+ debugs(20, 7, "not SMP-syncing not-shared " << *collapsed);
+ return;
+ }
+
+ if (collapsed->mem_obj->xitTable.index < 0) {
+ debugs(20, 7, "not SMP-syncing not-transient " << *collapsed);
+ return;
+ }
+
+ assert(transients);
+ if (transients->abandoned(*collapsed)) {
+ debugs(20, 3, "aborting abandoned " << *collapsed);
+ collapsed->abort();
+ return;
+ }
- if (collapsed->mem_obj && !collapsed->mem_obj->smpCollapsed) {
+ if (!collapsed->mem_obj->smpCollapsed) {
// this happens, e.g., when we tried collapsing but rejected the hit
- debugs(20, 7, "not SMP-syncing not SMP-collapsed " << *collapsed);
+ debugs(20, 7, "not SMP-syncing not-SMP-collapsed " << *collapsed);
return;
}
- // XXX: collapsed->mem_obj may be still hidden here
debugs(20, 7, "syncing " << *collapsed);
found = true;
inSync = collapsed->store()->updateCollapsed(*collapsed);
} else {
- if (memStore)
- found = memStore->anchorCollapsed(*collapsed, inSync);
- else if (Config.cacheSwap.n_configured)
- found = anchorCollapsedOnDisk(*collapsed, inSync);
+ found = anchorCollapsed(*collapsed, inSync);
}
if (inSync) {
debugs(20, 5, "synced " << *collapsed);
collapsed->invokeHandlers();
} else if (found) { // unrecoverable problem syncing this entry
- debugs(20, 3, "aborting " << *collapsed);
+ debugs(20, 3, "aborting unsyncable " << *collapsed);
collapsed->abort();
} else { // the entry is still not in one of the caches
debugs(20, 7, "waiting " << *collapsed);
}
}
+/// Called for in-transit entries that are not yet anchored to a cache.
+/// For cached entries, return true after synchronizing them with their cache
+/// (making inSync true on success). For not-yet-cached entries, return false.
+bool
+StoreController::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+{
+ // this method is designed to work with collapsed transients only
+ assert(collapsed.mem_obj);
+ assert(collapsed.mem_obj->xitTable.index >= 0);
+ assert(collapsed.mem_obj->smpCollapsed);
+
+ debugs(20, 7, "anchoring " << collapsed);
+
+ bool found = false;
+ if (memStore)
+ found = memStore->anchorCollapsed(collapsed, inSync);
+ else if (Config.cacheSwap.n_configured)
+ found = anchorCollapsedOnDisk(collapsed, inSync);
+
+ if (found) {
+ if (inSync)
+ debugs(20, 7, "anchored " << collapsed);
+ else
+ debugs(20, 5, "failed to anchor " << collapsed);
+ } else {
+ debugs(20, 7, "skipping not yet cached " << collapsed);
+ }
+
+ return found;
+}
+
StoreHashIndex::StoreHashIndex()
{
if (store_table)