]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/store_dir.cc
Merged from trunk 13172.
[thirdparty/squid.git] / src / store_dir.cc
index cade545757edb10547f340169b5bdcfb64297768..009f0424ecb18788ef9bbac62c85cde3c739abc3 100644 (file)
@@ -46,6 +46,7 @@
 #include "swap_log_op.h"
 #include "SwapDir.h"
 #include "tools.h"
+#include "Transients.h"
 
 #if HAVE_STATVFS
 #if HAVE_SYS_STATVFS_H
@@ -86,12 +87,13 @@ static STDIRSELECT storeDirSelectSwapDirLeastLoad;
 int StoreController::store_dirs_rebuilding = 1;
 
 StoreController::StoreController() : swapDir (new StoreHashIndex())
-        , memStore(NULL)
+        , memStore(NULL), transients(NULL)
 {}
 
 StoreController::~StoreController()
 {
     delete memStore;
+    delete transients;
 }
 
 /*
@@ -117,6 +119,11 @@ StoreController::init()
         storeDirSelectSwapDir = storeDirSelectSwapDirLeastLoad;
         debugs(47, DBG_IMPORTANT, "Using Least Load store dir selection");
     }
+
+    if (UsingSmp() && IamWorkerProcess() && Config.onoff.collapsed_forwarding) {
+        transients = new Transients;
+        transients->init();
+    }
 }
 
 void
@@ -746,6 +753,19 @@ StoreController::dereference(StoreEntry &e, bool wantsLocalMemory)
 
 StoreEntry *
 StoreController::get(const cache_key *key)
+{
+    if (StoreEntry *e = find(key)) {
+        // this is not very precise: some get()s are not initiated by clients
+        e->touch(); 
+        return e;
+    }
+    return NULL;
+}
+
+/// Internal method to implements the guts of the Store::get() API:
+/// returns an in-transit or cached object with a given key, if any.
+StoreEntry *
+StoreController::find(const cache_key *key)
 {
     if (StoreEntry *e = swapDir->get(key)) {
         // TODO: ignore and maybe handleIdleEntry() unlocked intransit entries
@@ -754,6 +774,20 @@ StoreController::get(const cache_key *key)
         return e;
     }
 
+    // Must search transients before caches because we must sync those we find.
+    if (transients) {
+        if (StoreEntry *e = transients->get(key)) {
+            debugs(20, 3, "got shared in-transit entry: " << *e);
+            bool inSync = false;
+            const bool found = anchorCollapsed(*e, inSync);
+            if (!found || inSync)
+                return e;
+            assert(!e->locked()); // ensure release will destroyStoreEntry()
+            e->release(); // do not let others into the same trap
+            return NULL;
+        }
+    }
+
     if (memStore) {
         if (StoreEntry *e = memStore->get(key)) {
             debugs(20, 3, HERE << "got mem-cached entry: " << *e);
@@ -784,6 +818,7 @@ StoreController::get(const cache_key *key)
 
     debugs(20, 4, HERE << "none of " << Config.cacheSwap.n_configured <<
            " cache_dirs have " << storeKeyText(key));
+
     return NULL;
 }
 
@@ -793,6 +828,45 @@ StoreController::get(String const key, STOREGETCLIENT aCallback, void *aCallback
     fatal("not implemented");
 }
 
+/// updates the collapsed entry with the corresponding on-disk entry, if any
+/// In other words, the SwapDir::anchorCollapsed() API applied to all disks.
+bool
+StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync)
+{
+    // TODO: move this loop to StoreHashIndex, just like the one in get().
+    if (const int cacheDirs = Config.cacheSwap.n_configured) {
+        // ask each cache_dir until the entry is found; use static starting
+        // point to avoid asking the same subset of disks more often
+        // TODO: coordinate with put() to be able to guess the right disk often
+        static int idx = 0;
+        for (int n = 0; n < cacheDirs; ++n) {
+            idx = (idx + 1) % cacheDirs;
+            SwapDir *sd = dynamic_cast<SwapDir*>(INDEXSD(idx));
+            if (!sd->active())
+                continue;
+
+            if (sd->anchorCollapsed(collapsed, inSync)) {
+                debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed);
+                return true;
+            }
+        }
+    }
+
+    debugs(20, 4, "none of " << Config.cacheSwap.n_configured <<
+           " cache_dirs have " << collapsed);
+    return false;
+}
+
+void StoreController::markForUnlink(StoreEntry &e)
+{
+    if (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0)
+        transients->markForUnlink(e);
+    if (memStore && e.mem_obj && e.mem_obj->memCache.index >= 0)
+        memStore->markForUnlink(e);
+    if (e.swap_filen >= 0)
+        e.store()->markForUnlink(e);
+}
+
 // move this into [non-shared] memory cache class when we have one
 /// whether e should be kept in local RAM for possible future caching
 bool
@@ -813,11 +887,11 @@ StoreController::keepForLocalMemoryCache(const StoreEntry &e) const
 }
 
 void
-StoreController::maybeTrimMemory(StoreEntry &e, const bool preserveSwappable)
+StoreController::memoryOut(StoreEntry &e, const bool preserveSwappable)
 {
     bool keepInLocalMemory = false;
     if (memStore)
-        keepInLocalMemory = memStore->keepInLocalMemory(e);
+        memStore->write(e); // leave keepInLocalMemory false
     else
         keepInLocalMemory = keepForLocalMemoryCache(e);
 
@@ -827,6 +901,57 @@ StoreController::maybeTrimMemory(StoreEntry &e, const bool preserveSwappable)
         e.trimMemory(preserveSwappable);
 }
 
+void
+StoreController::memoryUnlink(StoreEntry &e)
+{
+    if (memStore)
+        memStore->unlink(e);
+    else // TODO: move into [non-shared] memory cache class when we have one
+        e.destroyMemObject();
+}
+
+void
+StoreController::memoryDisconnect(StoreEntry &e)
+{
+    if (memStore)
+        memStore->disconnect(e);
+    // else nothing to do for non-shared memory cache
+}
+
+void
+StoreController::transientsAbandon(StoreEntry &e)
+{
+    if (transients) {
+        assert(e.mem_obj);
+        if (e.mem_obj->xitTable.index >= 0)
+            transients->abandon(e);
+    }
+}
+
+void
+StoreController::transientsCompleteWriting(StoreEntry &e)
+{
+    if (transients) {
+        assert(e.mem_obj);
+        if (e.mem_obj->xitTable.index >= 0)
+            transients->completeWriting(e);
+    }
+}
+
+int
+StoreController::transientReaders(const StoreEntry &e) const
+{
+    return (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0) ?
+        transients->readers(e) : 0;
+}
+
+void
+StoreController::transientsDisconnect(MemObject &mem_obj)
+{
+    if (transients)
+        transients->disconnect(mem_obj);
+}
+
 void
 StoreController::handleIdleEntry(StoreEntry &e)
 {
@@ -838,7 +963,6 @@ StoreController::handleIdleEntry(StoreEntry &e)
         // They are not managed [well] by any specific Store handled below.
         keepInLocalMemory = true;
     } else if (memStore) {
-        memStore->considerKeeping(e);
         // leave keepInLocalMemory false; memStore maintains its own cache
     } else {
         keepInLocalMemory = keepForLocalMemoryCache(e) && // in good shape and
@@ -865,6 +989,97 @@ StoreController::handleIdleEntry(StoreEntry &e)
     }
 }
 
+void
+StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
+                                 const HttpRequestMethod &reqMethod)
+{
+    e->makePublic(); // this is needed for both local and SMP collapsing
+    if (transients)
+        transients->startWriting(e, reqFlags, reqMethod);
+    debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ?
+           "SMP-" : "locally-") << "collapse " << *e);
+}
+
+void
+StoreController::syncCollapsed(const sfileno xitIndex)
+{
+    assert(transients);
+
+    StoreEntry *collapsed = transients->findCollapsed(xitIndex);
+    if (!collapsed) { // the entry is no longer locally active, ignore update
+        debugs(20, 7, "not SMP-syncing not-transient " << xitIndex);
+        return;
+    }
+    assert(collapsed->mem_obj);
+    assert(collapsed->mem_obj->smpCollapsed);
+
+    debugs(20, 7, "syncing " << *collapsed);
+
+    bool abandoned = transients->abandoned(*collapsed);
+    bool found = false;
+    bool inSync = false;
+    if (memStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) {
+        found = true;
+        inSync = true;
+        debugs(20, 7, "fully mem-loaded " << *collapsed);
+    } else if (memStore && collapsed->mem_obj->memCache.index >= 0) {
+        found = true;
+        inSync = memStore->updateCollapsed(*collapsed);
+    } else if (collapsed->swap_filen >= 0) {
+        found = true;
+        inSync = collapsed->store()->updateCollapsed(*collapsed);
+    } else {
+        found = anchorCollapsed(*collapsed, inSync);
+    }
+
+    if (abandoned && collapsed->store_status == STORE_PENDING) {
+        debugs(20, 3, "aborting abandoned but STORE_PENDING " << *collapsed);
+        collapsed->abort();
+        return;
+    }
+
+    if (inSync) {
+        debugs(20, 5, "synced " << *collapsed);
+        collapsed->invokeHandlers();
+    } else if (found) { // unrecoverable problem syncing this entry
+        debugs(20, 3, "aborting unsyncable " << *collapsed);
+        collapsed->abort();
+    } else { // the entry is still not in one of the caches
+        debugs(20, 7, "waiting " << *collapsed);
+    }
+}
+
+/// Called for in-transit entries that are not yet anchored to a cache.
+/// For cached entries, return true after synchronizing them with their cache
+/// (making inSync true on success). For not-yet-cached entries, return false.
+bool
+StoreController::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+{
+    // this method is designed to work with collapsed transients only
+    assert(collapsed.mem_obj);
+    assert(collapsed.mem_obj->xitTable.index >= 0);
+    assert(collapsed.mem_obj->smpCollapsed);
+
+    debugs(20, 7, "anchoring " << collapsed);
+
+    bool found = false;
+    if (memStore)
+        found = memStore->anchorCollapsed(collapsed, inSync);
+    else if (Config.cacheSwap.n_configured)
+        found = anchorCollapsedOnDisk(collapsed, inSync);
+
+    if (found) {
+        if (inSync)
+            debugs(20, 7, "anchored " << collapsed);
+        else
+            debugs(20, 5, "failed to anchor " << collapsed);
+    } else {
+        debugs(20, 7, "skipping not yet cached " << collapsed);
+    }
+
+    return found;
+}
+
 StoreHashIndex::StoreHashIndex()
 {
     if (store_table)