#include "swap_log_op.h"
#include "SwapDir.h"
#include "tools.h"
+#include "Transients.h"
#if HAVE_STATVFS
#if HAVE_SYS_STATVFS_H
int StoreController::store_dirs_rebuilding = 1;
StoreController::StoreController() : swapDir (new StoreHashIndex())
- , memStore(NULL)
+ , memStore(NULL), transients(NULL)
{}
StoreController::~StoreController()
{
delete memStore;
+ delete transients;
}
/*
storeDirSelectSwapDir = storeDirSelectSwapDirLeastLoad;
debugs(47, DBG_IMPORTANT, "Using Least Load store dir selection");
}
+
+ if (UsingSmp() && IamWorkerProcess() && Config.onoff.collapsed_forwarding) {
+ transients = new Transients;
+ transients->init();
+ }
}
void
StoreEntry *
StoreController::get(const cache_key *key)
+{
+ if (StoreEntry *e = find(key)) {
+ // this is not very precise: some get()s are not initiated by clients
+ e->touch();
+ return e;
+ }
+ return NULL;
+}
+
+/// Internal method to implements the guts of the Store::get() API:
+/// returns an in-transit or cached object with a given key, if any.
+StoreEntry *
+StoreController::find(const cache_key *key)
{
if (StoreEntry *e = swapDir->get(key)) {
// TODO: ignore and maybe handleIdleEntry() unlocked intransit entries
return e;
}
+ // Must search transients before caches because we must sync those we find.
+ if (transients) {
+ if (StoreEntry *e = transients->get(key)) {
+ debugs(20, 3, "got shared in-transit entry: " << *e);
+ bool inSync = false;
+ const bool found = anchorCollapsed(*e, inSync);
+ if (!found || inSync)
+ return e;
+ assert(!e->locked()); // ensure release will destroyStoreEntry()
+ e->release(); // do not let others into the same trap
+ return NULL;
+ }
+ }
+
if (memStore) {
if (StoreEntry *e = memStore->get(key)) {
debugs(20, 3, HERE << "got mem-cached entry: " << *e);
debugs(20, 4, HERE << "none of " << Config.cacheSwap.n_configured <<
" cache_dirs have " << storeKeyText(key));
+
return NULL;
}
fatal("not implemented");
}
+/// updates the collapsed entry with the corresponding on-disk entry, if any
+/// In other words, the SwapDir::anchorCollapsed() API applied to all disks.
+bool
+StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync)
+{
+ // TODO: move this loop to StoreHashIndex, just like the one in get().
+ if (const int cacheDirs = Config.cacheSwap.n_configured) {
+ // ask each cache_dir until the entry is found; use static starting
+ // point to avoid asking the same subset of disks more often
+ // TODO: coordinate with put() to be able to guess the right disk often
+ static int idx = 0;
+ for (int n = 0; n < cacheDirs; ++n) {
+ idx = (idx + 1) % cacheDirs;
+ SwapDir *sd = dynamic_cast<SwapDir*>(INDEXSD(idx));
+ if (!sd->active())
+ continue;
+
+ if (sd->anchorCollapsed(collapsed, inSync)) {
+ debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed);
+ return true;
+ }
+ }
+ }
+
+ debugs(20, 4, "none of " << Config.cacheSwap.n_configured <<
+ " cache_dirs have " << collapsed);
+ return false;
+}
+
+void StoreController::markForUnlink(StoreEntry &e)
+{
+ if (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0)
+ transients->markForUnlink(e);
+ if (memStore && e.mem_obj && e.mem_obj->memCache.index >= 0)
+ memStore->markForUnlink(e);
+ if (e.swap_filen >= 0)
+ e.store()->markForUnlink(e);
+}
+
// move this into [non-shared] memory cache class when we have one
/// whether e should be kept in local RAM for possible future caching
bool
}
void
-StoreController::maybeTrimMemory(StoreEntry &e, const bool preserveSwappable)
+StoreController::memoryOut(StoreEntry &e, const bool preserveSwappable)
{
bool keepInLocalMemory = false;
if (memStore)
- keepInLocalMemory = memStore->keepInLocalMemory(e);
+ memStore->write(e); // leave keepInLocalMemory false
else
keepInLocalMemory = keepForLocalMemoryCache(e);
e.trimMemory(preserveSwappable);
}
+void
+StoreController::memoryUnlink(StoreEntry &e)
+{
+ if (memStore)
+ memStore->unlink(e);
+ else // TODO: move into [non-shared] memory cache class when we have one
+ e.destroyMemObject();
+}
+
+void
+StoreController::memoryDisconnect(StoreEntry &e)
+{
+ if (memStore)
+ memStore->disconnect(e);
+ // else nothing to do for non-shared memory cache
+}
+
+void
+StoreController::transientsAbandon(StoreEntry &e)
+{
+ if (transients) {
+ assert(e.mem_obj);
+ if (e.mem_obj->xitTable.index >= 0)
+ transients->abandon(e);
+ }
+}
+
+void
+StoreController::transientsCompleteWriting(StoreEntry &e)
+{
+ if (transients) {
+ assert(e.mem_obj);
+ if (e.mem_obj->xitTable.index >= 0)
+ transients->completeWriting(e);
+ }
+}
+
+int
+StoreController::transientReaders(const StoreEntry &e) const
+{
+ return (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0) ?
+ transients->readers(e) : 0;
+}
+
+void
+StoreController::transientsDisconnect(MemObject &mem_obj)
+{
+ if (transients)
+ transients->disconnect(mem_obj);
+}
+
void
StoreController::handleIdleEntry(StoreEntry &e)
{
// They are not managed [well] by any specific Store handled below.
keepInLocalMemory = true;
} else if (memStore) {
- memStore->considerKeeping(e);
// leave keepInLocalMemory false; memStore maintains its own cache
} else {
keepInLocalMemory = keepForLocalMemoryCache(e) && // in good shape and
}
}
+void
+StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
+ const HttpRequestMethod &reqMethod)
+{
+ e->makePublic(); // this is needed for both local and SMP collapsing
+ if (transients)
+ transients->startWriting(e, reqFlags, reqMethod);
+ debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ?
+ "SMP-" : "locally-") << "collapse " << *e);
+}
+
+void
+StoreController::syncCollapsed(const sfileno xitIndex)
+{
+ assert(transients);
+
+ StoreEntry *collapsed = transients->findCollapsed(xitIndex);
+ if (!collapsed) { // the entry is no longer locally active, ignore update
+ debugs(20, 7, "not SMP-syncing not-transient " << xitIndex);
+ return;
+ }
+ assert(collapsed->mem_obj);
+ assert(collapsed->mem_obj->smpCollapsed);
+
+ debugs(20, 7, "syncing " << *collapsed);
+
+ bool abandoned = transients->abandoned(*collapsed);
+ bool found = false;
+ bool inSync = false;
+ if (memStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) {
+ found = true;
+ inSync = true;
+ debugs(20, 7, "fully mem-loaded " << *collapsed);
+ } else if (memStore && collapsed->mem_obj->memCache.index >= 0) {
+ found = true;
+ inSync = memStore->updateCollapsed(*collapsed);
+ } else if (collapsed->swap_filen >= 0) {
+ found = true;
+ inSync = collapsed->store()->updateCollapsed(*collapsed);
+ } else {
+ found = anchorCollapsed(*collapsed, inSync);
+ }
+
+ if (abandoned && collapsed->store_status == STORE_PENDING) {
+ debugs(20, 3, "aborting abandoned but STORE_PENDING " << *collapsed);
+ collapsed->abort();
+ return;
+ }
+
+ if (inSync) {
+ debugs(20, 5, "synced " << *collapsed);
+ collapsed->invokeHandlers();
+ } else if (found) { // unrecoverable problem syncing this entry
+ debugs(20, 3, "aborting unsyncable " << *collapsed);
+ collapsed->abort();
+ } else { // the entry is still not in one of the caches
+ debugs(20, 7, "waiting " << *collapsed);
+ }
+}
+
+/// Called for in-transit entries that are not yet anchored to a cache.
+/// For cached entries, return true after synchronizing them with their cache
+/// (making inSync true on success). For not-yet-cached entries, return false.
+bool
+StoreController::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+{
+ // this method is designed to work with collapsed transients only
+ assert(collapsed.mem_obj);
+ assert(collapsed.mem_obj->xitTable.index >= 0);
+ assert(collapsed.mem_obj->smpCollapsed);
+
+ debugs(20, 7, "anchoring " << collapsed);
+
+ bool found = false;
+ if (memStore)
+ found = memStore->anchorCollapsed(collapsed, inSync);
+ else if (Config.cacheSwap.n_configured)
+ found = anchorCollapsedOnDisk(collapsed, inSync);
+
+ if (found) {
+ if (inSync)
+ debugs(20, 7, "anchored " << collapsed);
+ else
+ debugs(20, 5, "failed to anchor " << collapsed);
+ } else {
+ debugs(20, 7, "skipping not yet cached " << collapsed);
+ }
+
+ return found;
+}
+
StoreHashIndex::StoreHashIndex()
{
if (store_table)