From 9a9954baee514e3e76f150be9da19d90e1d6cd69 Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Thu, 21 Feb 2013 11:04:49 -0700 Subject: [PATCH] Support discovery of collapsable entries in kids via the new Transients map. --- src/Makefile.am | 7 + src/Store.h | 5 + src/SwapDir.h | 9 ++ src/Transients.cc | 308 +++++++++++++++++++++++++++++++++++++++ src/Transients.h | 66 +++++++++ src/client_side_reply.cc | 18 +-- src/store_dir.cc | 31 +++- 7 files changed, 434 insertions(+), 10 deletions(-) create mode 100644 src/Transients.cc create mode 100644 src/Transients.h diff --git a/src/Makefile.am b/src/Makefile.am index f73c787555..5b6545929d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -537,6 +537,8 @@ squid_SOURCES = \ swap_log_op.h \ SwapDir.cc \ SwapDir.h \ + Transients.cc \ + Transients.h \ MemStore.cc \ MemStore.h \ time.cc \ @@ -1558,6 +1560,7 @@ tests_testCacheManager_SOURCES = \ StoreSwapLogData.cc \ tools.h \ tools.cc \ + Transients.cc \ tunnel.cc \ SwapDir.cc \ MemStore.cc \ @@ -1984,6 +1987,7 @@ tests_testEvent_SOURCES = \ time.cc \ tools.h \ tools.cc \ + Transients.cc \ tunnel.cc \ MemStore.cc \ $(UNLINKDSOURCE) \ @@ -2231,6 +2235,7 @@ tests_testEventLoop_SOURCES = \ time.cc \ tools.h \ tools.cc \ + Transients.cc \ tunnel.cc \ MemStore.cc \ $(UNLINKDSOURCE) \ @@ -2753,6 +2758,7 @@ tests_testHttpRequest_SOURCES = \ event.cc \ tools.h \ tools.cc \ + Transients.cc \ tunnel.cc \ SwapDir.cc \ MemStore.cc \ @@ -3720,6 +3726,7 @@ tests_testURL_SOURCES = \ StrList.h \ StrList.cc \ SwapDir.cc \ + Transients.cc \ MemStore.cc \ tests/stub_debug.cc \ tests/stub_DiskIOModule.cc \ diff --git a/src/Store.h b/src/Store.h index a2a1b9ffcb..f63e8f3a1d 100644 --- a/src/Store.h +++ b/src/Store.h @@ -369,6 +369,11 @@ public: /// called to get rid of no longer needed entry data in RAM, if any virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable) {} + // XXX: This method belongs to Store::Root/StoreController, but it is here + // to avoid casting Root() to StoreController until Root() API is fixed. + /// makes the entry available for collapsing future requests + virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) {} + private: static RefCount CurrentRoot; }; diff --git a/src/SwapDir.h b/src/SwapDir.h index 488cafca91..4a369c3598 100644 --- a/src/SwapDir.h +++ b/src/SwapDir.h @@ -37,6 +37,9 @@ /* forward decls */ class RemovalPolicy; class MemStore; +class Transients; +class RequestFlags; +class HttpRequestMethod; /* Store dir configuration routines */ /* SwapDir *sd, char *path ( + char *opt later when the strtok mess is gone) */ @@ -60,6 +63,7 @@ public: /* Store parent API */ virtual void handleIdleEntry(StoreEntry &e); virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable); + virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); virtual void init(); @@ -95,6 +99,11 @@ private: StorePointer swapDir; ///< summary view of all disk caches MemStore *memStore; ///< memory cache + + /// A shared table of public store entries that do not know whether they + /// will belong to a memory cache, a disk cache, or will be uncachable + /// when the response header comes. Used for SMP collapsed forwarding. + Transients *transients; }; /* migrating from the Config based list of swapdirs */ diff --git a/src/Transients.cc b/src/Transients.cc new file mode 100644 index 0000000000..de8344decb --- /dev/null +++ b/src/Transients.cc @@ -0,0 +1,308 @@ +/* + * DEBUG: section 20 Storage Manager + * + */ + +#include "squid.h" +#include "base/RunnersRegistry.h" +#include "HttpReply.h" +#include "ipc/mem/Page.h" +#include "ipc/mem/Pages.h" +#include "MemObject.h" +#include "Transients.h" +#include "mime_header.h" +#include "SquidConfig.h" +#include "SquidMath.h" +#include "StoreStats.h" +#include "tools.h" + +#if HAVE_LIMITS_H +#include +#endif + + +/// shared memory segment path to use for Transients maps +static const char *MapLabel = "transients_map"; + + +Transients::Transients(): map(NULL) +{ +debugs(0,0, "Transients::ctor"); +} + +Transients::~Transients() +{ + delete map; +} + +void +Transients::init() +{ + const int64_t entryLimit = EntryLimit(); + if (entryLimit <= 0) + return; // no SMP support or a misconfiguration + + Must(!map); + map = new TransientsMap(MapLabel); + map->cleaner = this; +} + +void +Transients::getStats(StoreInfoStats &stats) const +{ +#if TRANSIENT_STATS_SUPPORTED + const size_t pageSize = Ipc::Mem::PageSize(); + + stats.mem.shared = true; + stats.mem.capacity = + Ipc::Mem::PageLimit(Ipc::Mem::PageId::cachePage) * pageSize; + stats.mem.size = + Ipc::Mem::PageLevel(Ipc::Mem::PageId::cachePage) * pageSize; + stats.mem.count = currentCount(); +#endif +} + +void +Transients::stat(StoreEntry &e) const +{ + storeAppendPrintf(&e, "\n\nTransient Objects\n"); + + storeAppendPrintf(&e, "Maximum Size: %.0f KB\n", maxSize()/1024.0); + storeAppendPrintf(&e, "Current Size: %.2f KB %.2f%%\n", + currentSize() / 1024.0, + Math::doublePercent(currentSize(), maxSize())); + + if (map) { + const int limit = map->entryLimit(); + storeAppendPrintf(&e, "Maximum entries: %9d\n", limit); + if (limit > 0) { + storeAppendPrintf(&e, "Current entries: %" PRId64 " %.2f%%\n", + currentCount(), (100.0 * currentCount() / limit)); + } + } +} + +void +Transients::maintain() +{ +} + +uint64_t +Transients::minSize() const +{ + return 0; // XXX: irrelevant, but Store parent forces us to implement this +} + +uint64_t +Transients::maxSize() const +{ + // Squid currently does not limit the total size of all transient objects + return std::numeric_limits::max(); +} + +uint64_t +Transients::currentSize() const +{ + // TODO: we do not get enough information to calculate this + // StoreEntry should update associated stores when its size changes + return 0; +} + +uint64_t +Transients::currentCount() const +{ + return map ? map->entryCount() : 0; +} + +int64_t +Transients::maxObjectSize() const +{ + // Squid currently does not limit the size of a transient object + return std::numeric_limits::max(); +} + +void +Transients::reference(StoreEntry &) +{ +} + +bool +Transients::dereference(StoreEntry &, bool) +{ + // no need to keep e in the global store_table for us; we have our own map + return false; +} + +int +Transients::callback() +{ + return 0; +} + +StoreSearch * +Transients::search(String const, HttpRequest *) +{ + fatal("not implemented"); + return NULL; +} + +StoreEntry * +Transients::get(const cache_key *key) +{ + if (!map) + return NULL; + + sfileno index; + if (!map->openForReading(key, index)) + return NULL; + + const TransientsMap::Extras &extras = map->extras(index); + + // create a brand new store entry and initialize it with stored info + StoreEntry *e = storeCreateEntry(extras.url, extras.url, + extras.reqFlags, extras.reqMethod); + // XXX: overwriting storeCreateEntry() because we are expected to return an unlocked entry + // TODO: move locking from storeCreateEntry to callers as a mid-term solution + e->lock_count = 0; + + assert(e->mem_obj); + e->mem_obj->method = extras.reqMethod; + + // we copied everything we could to local memory; no more need to lock + map->closeForReading(index); + + // XXX: overwriting storeCreateEntry() which calls setPrivateKey() if + // neighbors_do_private_keys (which is true in most cases and by default). + // This is nothing but waste of CPU cycles. Need a better API to avoid it. + e->setPublicKey(); + + assert(e->next); // e->hashInsert(key) is done in setPublicKey() + return e; +} + +void +Transients::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData) +{ + // XXX: not needed but Store parent forces us to implement this + fatal("Transients::get(key,callback,data) should not be called"); +} + +void +Transients::put(StoreEntry *e, const RequestFlags &reqFlags, + const HttpRequestMethod &reqMethod) +{ + assert(e); + + if (!map) { + debugs(20, 5, "No map to add " << *e); + return; + } + + sfileno index = 0; + Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast(e->key), index); + if (!slot) { + debugs(20, 5, "No room in map to index " << *e); + return; + } + + try { + if (copyToShm(*e, index, reqFlags, reqMethod)) { + slot->set(*e); + map->closeForWriting(index, false); + return; + } + // fall through to the error handling code + } + catch (const std::exception &x) { // TODO: should we catch ... as well? + debugs(20, 2, "error keeping entry " << index << + ' ' << *e << ": " << x.what()); + // fall through to the error handling code + } + + map->abortIo(index); +} + + +/// copies all relevant local data to shared memory +bool +Transients::copyToShm(const StoreEntry &e, const sfileno index, + const RequestFlags &reqFlags, + const HttpRequestMethod &reqMethod) +{ + TransientsMap::Extras &extras = map->extras(index); + + const char *url = e.url(); + const size_t urlLen = strlen(url); + Must(urlLen < sizeof(extras.url)); // we have space to store it all, plus 0 + strncpy(extras.url, url, sizeof(extras.url)); + extras.url[urlLen] = '\0'; + + extras.reqFlags = reqFlags; + + + Must(reqMethod != Http::METHOD_OTHER); + extras.reqMethod = reqMethod.id(); + + return true; +} + +void +Transients::noteFreeMapSlice(const sfileno sliceId) +{ + // TODO: we should probably find the entry being deleted and abort it +} + +/// calculates maximum number of entries we need to store and map +int64_t +Transients::EntryLimit() +{ + // TODO: we should also check whether any SMP-aware caching is configured + if (!UsingSmp() || !Config.onoff.collapsed_forwarding) + return 0; // no SMP collapsed forwarding possible or needed + + return 16*1024; // XXX: make configurable +} + +/// initializes shared memory segment used by Transients +class TransientsRr: public Ipc::Mem::RegisteredRunner +{ +public: + /* RegisteredRunner API */ + TransientsRr(): mapOwner(NULL) {} + virtual void run(const RunnerRegistry &); + virtual ~TransientsRr(); + +protected: + virtual void create(const RunnerRegistry &); + +private: + TransientsMap::Owner *mapOwner; +}; + +RunnerRegistrationEntry(rrAfterConfig, TransientsRr); + +void TransientsRr::run(const RunnerRegistry &r) +{ + assert(Config.memShared.configured()); + Ipc::Mem::RegisteredRunner::run(r); +} + +void TransientsRr::create(const RunnerRegistry &) +{ +debugs(0,0, "TransientsRr::create1: " << Config.onoff.collapsed_forwarding); + if (!Config.onoff.collapsed_forwarding) + return; + + const int64_t entryLimit = Transients::EntryLimit(); +debugs(0,0, "TransientsRr::create2: " << entryLimit); + if (entryLimit <= 0) + return; // no SMP configured or a misconfiguration + + Must(!mapOwner); + mapOwner = TransientsMap::Init(MapLabel, entryLimit); +} + +TransientsRr::~TransientsRr() +{ + delete mapOwner; +} diff --git a/src/Transients.h b/src/Transients.h new file mode 100644 index 0000000000..8e31a770e7 --- /dev/null +++ b/src/Transients.h @@ -0,0 +1,66 @@ +#ifndef SQUID_TRANSIENTS_H +#define SQUID_TRANSIENTS_H + +#include "http/MethodType.h" +#include "ipc/mem/Page.h" +#include "ipc/mem/PageStack.h" +#include "ipc/StoreMap.h" +#include "Store.h" + +// StoreEntry restoration info not already stored by Ipc::StoreMap +struct TransientsMapExtras { + char url[MAX_URL+1]; ///< Request-URI; TODO: decrease MAX_URL by one + RequestFlags reqFlags; ///< request flags + Http::MethodType reqMethod; ///< request method; extensions are not supported +}; +typedef Ipc::StoreMapWithExtras TransientsMap; + +/// Stores HTTP entities in RAM. Current implementation uses shared memory. +/// Unlike a disk store (SwapDir), operations are synchronous (and fast). +class Transients: public Store, public Ipc::StoreMapCleaner +{ +public: + Transients(); + virtual ~Transients(); + + /// add an in-transit entry suitable for collapsing future requests + void put(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); + + /// cache the entry or forget about it until the next considerKeeping call + /// XXX: remove void considerKeeping(StoreEntry &e); + + /// whether e should be kept in local RAM for possible future caching + /// XXX: remove bool keepInLocalMemory(const StoreEntry &e) const; + + /* Store API */ + virtual int callback(); + virtual StoreEntry * get(const cache_key *); + virtual void get(String const key , STOREGETCLIENT callback, void *cbdata); + virtual void init(); + virtual uint64_t maxSize() const; + virtual uint64_t minSize() const; + virtual uint64_t currentSize() const; + virtual uint64_t currentCount() const; + virtual int64_t maxObjectSize() const; + virtual void getStats(StoreInfoStats &stats) const; + virtual void stat(StoreEntry &) const; + virtual StoreSearch *search(String const url, HttpRequest *); + virtual void reference(StoreEntry &); + virtual bool dereference(StoreEntry &, bool); + virtual void maintain(); + + static int64_t EntryLimit(); + +protected: + bool copyToShm(const StoreEntry &e, const sfileno index, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); + + // Ipc::StoreMapCleaner API + virtual void noteFreeMapSlice(const sfileno sliceId); + +private: + TransientsMap *map; ///< index of mem-cached entries +}; + +// TODO: Why use Store as a base? We are not really a cache. + +#endif /* SQUID_MEMSTORE_H */ diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc index c747e671f0..de278d7da9 100644 --- a/src/client_side_reply.cc +++ b/src/client_side_reply.cc @@ -2179,6 +2179,15 @@ clientReplyContext::createStoreEntry(const HttpRequestMethod& m, RequestFlags re StoreEntry *e = storeCreateEntry(http->uri, http->log_uri, reqFlags, m); + // Make entry collapsable ASAP, to increase collapsing chances for others. + // TODO: why is !.needValidation required here? + if (Config.onoff.collapsed_forwarding && reqFlags.cachable && + !reqFlags.needValidation && + (m == Http::METHOD_GET || m == Http::METHOD_HEAD)) { + // make the entry available for future requests now + Store::Root().allowCollapsing(e, reqFlags, m); + } + sc = storeClientListAdd(e, this); #if USE_DELAY_POOLS @@ -2200,15 +2209,6 @@ clientReplyContext::createStoreEntry(const HttpRequestMethod& m, RequestFlags re /* So, we mark the store logic as complete */ flags.storelogiccomplete = 1; - // TODO: why is !.needValidation required here? - if (Config.onoff.collapsed_forwarding && reqFlags.cachable && - !reqFlags.needValidation && - (m == Http::METHOD_GET || m == Http::METHOD_HEAD)) { - // make the entry available to others - debugs(88, 3, "allow collapsing: " << *e); - e->makePublic(); - } - /* and get the caller to request a read, from whereever they are */ /* NOTE: after ANY data flows down the pipe, even one step, * this function CAN NOT be used to manage errors diff --git a/src/store_dir.cc b/src/store_dir.cc index 002da183f9..3cc33763fa 100644 --- a/src/store_dir.cc +++ b/src/store_dir.cc @@ -46,6 +46,7 @@ #include "SwapDir.h" #include "swap_log_op.h" #include "tools.h" +#include "Transients.h" #if HAVE_STATVFS #if HAVE_SYS_STATVFS_H @@ -83,12 +84,13 @@ static STDIRSELECT storeDirSelectSwapDirLeastLoad; int StoreController::store_dirs_rebuilding = 1; StoreController::StoreController() : swapDir (new StoreHashIndex()) - , memStore(NULL) + , memStore(NULL), transients(NULL) {} StoreController::~StoreController() { delete memStore; + delete transients; } /* @@ -114,6 +116,11 @@ StoreController::init() storeDirSelectSwapDir = storeDirSelectSwapDirLeastLoad; debugs(47, DBG_IMPORTANT, "Using Least Load store dir selection"); } + + if (UsingSmp() && IamWorkerProcess() && Config.onoff.collapsed_forwarding) { + transients = new Transients; + transients->init(); + } } void @@ -781,6 +788,17 @@ StoreController::get(const cache_key *key) debugs(20, 4, HERE << "none of " << Config.cacheSwap.n_configured << " cache_dirs have " << storeKeyText(key)); + + // Last, check shared in-transit table if enabled. + // We speculate that collapsed forwarding hits are less frequent than + // proper cache hits checked above (the order does not matter for misses). + if (transients) { + if (StoreEntry *e = transients->get(key)) { + debugs(20, 3, "got shared in-transit entry: " << *e); + return e; + } + } + return NULL; } @@ -862,6 +880,17 @@ StoreController::handleIdleEntry(StoreEntry &e) } } +void +StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, + const HttpRequestMethod &reqMethod) +{ + e->makePublic(); // this is needed for both local and SMP collapsing + if (transients) + transients->put(e, reqFlags, reqMethod); + debugs(20, 3, "may " << (transients ? "SMP" : "") << " collapse " << *e); +} + + StoreHashIndex::StoreHashIndex() { if (store_table) -- 2.39.2