From: Alex Rousskov
Date: Tue, 25 Jun 2013 16:06:37 +0000 (-0600)
Subject: Various fixes related to overlapping and collapsed entry caching.
X-Git-Tag: SQUID_3_5_0_1~444^2~48
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=99921d9d8301ad62d0a1075c556d2cb14d8147dd;p=thirdparty%2Fsquid.git

Various fixes related to overlapping and collapsed entry caching.

Wrote a Transients class description, replacing an irrelevant copy-pasted
comment.

Maintain proper transient entry locks, distinguishing the reading and writing
cases. Fixed transients synchronization logic.

Store::get() must not return incomplete from-cache entries, except for local
or transient ones. Otherwise, the returned entry will not be updated when its
remote writer makes changes.

Marked entries fully loaded from the shared memory cache as STORE_OK.

Avoid caching ENTRY_SPECIAL entries in the shared memory cache for now. This
is not strictly necessary, I think, but it simplifies the shared caching log
when triaging start-test-analyze test cases. The restriction can be removed
when, for example, the ENTRY_SPECIAL generation code becomes shared
cache-aware.

Fixed a copy-paste error in Transients::disconnect().

Changed the CollapsedForwarding::Broadcast() profile in preparation for
excluding broadcasts for entries without remote readers.

Do not purge entire cache entries just because we have to trim their RAM
footprint. The old code assumed that non-swappable entries cannot have any
other stored content, so purging them almost made sense. That assumption is
no longer correct: such entries may still reside in the shared memory cache,
and clients can use partial in-RAM data when serving range requests. We
should not purge unless there are other reasons to do so. This change may
expose client-side bugs where hit validation code does not check whether a
RAM entry is incomplete.

Allow MemObject::trimUnSwappable() to be called when there is nothing to
trim. This used to be a special case in StoreEntry::trimMemory(), but we do
not need it anymore after the above change.

Added transient and shared memory cache indexes to StoreEntry debugging
summaries.
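To make the lock bookkeeping easier to follow, here is a minimal,
self-contained C++ sketch of the reader/writer state machine this patch
attaches to each shared table slot. All names below (Table, the free
functions, the printed lock calls) are illustrative stand-ins, not Squid
code: the real state lives in MemObject::XitTable and MemObject::MemCache,
and the real locks are managed by Ipc::StoreMap.

    #include <cassert>
    #include <iostream>

    // The four I/O states this patch hoists into MemObject::Io.
    enum Io { ioUndecided, ioWriting, ioReading, ioDone };

    // Illustrative stand-in for MemObject::XitTable / MemObject::MemCache.
    struct Table {
        int index = -1;      // slot in the shared map; -1 when disconnected
        Io io = ioUndecided; // current I/O direction and status
    };

    // A writer that finished successfully releases its *write* lock.
    void completeWriting(Table &t) {
        assert(t.io == ioWriting);
        std::cout << "closeForWriting(" << t.index << ")\n";
        t.index = -1;
        t.io = ioDone;
    }

    // Disconnecting releases whichever lock matches the I/O direction; the
    // old Transients::disconnect() wrongly assumed every caller was a writer.
    void disconnect(Table &t) {
        if (t.index >= 0) {
            if (t.io == ioWriting) {
                std::cout << "abortWriting(" << t.index << ")\n";
            } else {
                assert(t.io == ioReading);
                std::cout << "closeForReading(" << t.index << ")\n";
            }
            t.index = -1;
            t.io = ioDone;
        }
    }

    int main() {
        Table writer;
        writer.index = 7;
        writer.io = ioWriting;
        completeWriting(writer); // clean finish: close, not abort

        Table reader;
        reader.index = 7;
        reader.io = ioReading;
        disconnect(reader); // releases the read lock, not a write lock
        return 0;
    }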
---

diff --git a/src/CollapsedForwarding.cc b/src/CollapsedForwarding.cc
index c501393693..629327d546 100644
--- a/src/CollapsedForwarding.cc
+++ b/src/CollapsedForwarding.cc
@@ -45,16 +45,17 @@ CollapsedForwarding::Init()
 }
 
 void
-CollapsedForwarding::Broadcast(const cache_key *key)
+CollapsedForwarding::Broadcast(const StoreEntry &e)
 {
     if (!queue.get())
         return;
 
     CollapsedForwardingMsg msg;
     msg.sender = KidIdentifier;
-    memcpy(msg.key, key, sizeof(msg.key));
+    memcpy(msg.key, e.key, sizeof(msg.key));
 
-    debugs(17, 5, storeKeyText(key) << " to " << Config.workers << "-1 workers");
+    debugs(17, 5, storeKeyText(static_cast<const cache_key*>(e.key)) << " to " <<
+           Config.workers << "-1 workers");
 
     // TODO: send only to workers who are waiting for data
     for (int workerId = 1; workerId <= Config.workers; ++workerId) {
diff --git a/src/CollapsedForwarding.h b/src/CollapsedForwarding.h
index f5e42757fb..88c4745126 100644
--- a/src/CollapsedForwarding.h
+++ b/src/CollapsedForwarding.h
@@ -12,7 +12,7 @@
 
 #include
 
-class StoreIOState;
+class StoreEntry;
 
 /// Sends and handles collapsed forwarding notifications.
 class CollapsedForwarding
@@ -22,7 +22,7 @@ public:
     static void Init();
 
     /// notify other workers about changes in entry state (e.g., new data)
-    static void Broadcast(const cache_key *key);
+    static void Broadcast(const StoreEntry &e);
 
     /// kick worker with empty IPC queue
     static void Notify(const int workerId);
diff --git a/src/MemObject.cc b/src/MemObject.cc
index 0902ff820b..219c447ec3 100644
--- a/src/MemObject.cc
+++ b/src/MemObject.cc
@@ -421,11 +421,11 @@ MemObject::trimSwappable()
 void
 MemObject::trimUnSwappable()
 {
-    int64_t new_mem_lo = policyLowestOffsetToKeep(0);
-    assert (new_mem_lo > 0);
-
-    data_hdr.freeDataUpto(new_mem_lo);
-    inmem_lo = new_mem_lo;
+    if (const int64_t new_mem_lo = policyLowestOffsetToKeep(false)) {
+        assert (new_mem_lo > 0);
+        data_hdr.freeDataUpto(new_mem_lo);
+        inmem_lo = new_mem_lo;
+    } // else we should not trim anything at this time
 }
 
 bool
diff --git a/src/MemObject.h b/src/MemObject.h
index c35d602aa7..36d3390c3d 100644
--- a/src/MemObject.h
+++ b/src/MemObject.h
@@ -139,12 +139,16 @@ public:
 
     SwapOut swapout;
 
+    /// cache "I/O" direction and status
+    typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
+
     /// State of an entry with regards to the [shared] in-transit table.
     class XitTable
    {
    public:
-        XitTable(): index(-1) {}
+        XitTable(): index(-1), io(ioUndecided) {}
 
        int32_t index; ///< entry position inside the in-transit table
+        Io io; ///< current I/O state
    };
    XitTable xitTable; ///< current [shared] memory caching state for the entry
@@ -156,8 +160,6 @@ public:
        int32_t index; ///< entry position inside the memory cache
        int64_t offset; ///< bytes written/read to/from the memory cache so far
 
-        /// I/O direction and status
-        typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
        Io io; ///< current I/O state
    };
    MemCache memCache; ///< current [shared] memory caching state for the entry
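The trimUnSwappable() rewrite above leans on the if-with-declaration idiom:
the body runs only when the policy offset is nonzero, so a zero offset now
simply means "nothing to trim" instead of tripping an assertion. A
self-contained sketch of that guard, where policyLowestOffsetToKeep() is a
made-up stand-in for MemObject's real method:

    #include <cassert>
    #include <cstdint>
    #include <iostream>

    // Stand-in: pretend the removal policy found nothing worth trimming.
    static int64_t policyLowestOffsetToKeep(bool swappable) {
        return swappable ? 4096 : 0;
    }

    static void trimUnSwappable() {
        // The body is skipped entirely when the offset is zero.
        if (const int64_t new_mem_lo = policyLowestOffsetToKeep(false)) {
            assert(new_mem_lo > 0); // a negative offset would be a policy bug
            std::cout << "free data up to offset " << new_mem_lo << "\n";
        } // else nothing to trim at this time
    }

    int main() {
        trimUnSwappable(); // prints nothing: offset 0 means "nothing to trim"
        return 0;
    }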
diff --git a/src/MemStore.cc b/src/MemStore.cc
index 9c3d1c081e..cc1aa0b57d 100644
--- a/src/MemStore.cc
+++ b/src/MemStore.cc
@@ -200,11 +200,6 @@ MemStore::get(const cache_key *key)
 
     const bool copied = copyFromShm(*e, index, *slot);
 
-    // we copied everything we could to local memory; no more need to lock
-    map->closeForReading(index);
-    e->mem_obj->memCache.index = -1;
-    e->mem_obj->memCache.io = MemObject::MemCache::ioDone;
-
     if (copied) {
         e->hashInsert(key);
         return e;
@@ -301,7 +296,7 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
 
     MemObject::MemCache &mc = e.mem_obj->memCache;
     mc.index = index;
-    mc.io = MemObject::MemCache::ioReading;
+    mc.io = MemObject::ioReading;
 }
 
 /// copies the entire entry from shared to local memory
@@ -363,14 +358,19 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
         return true;
     }
 
-    e.mem_obj->object_sz = e.mem_obj->endOffset(); // from StoreEntry::complete()
     debugs(20, 7, "mem-loaded all " << e.mem_obj->object_sz << '/' <<
            anchor.basics.swap_file_sz << " bytes of " << e);
+
+    // from StoreEntry::complete()
+    e.mem_obj->object_sz = e.mem_obj->endOffset();
+    e.store_status = STORE_OK;
+
     assert(e.mem_obj->object_sz >= 0);
     assert(static_cast<uint64_t>(e.mem_obj->object_sz) == anchor.basics.swap_file_sz);
     // would be nice to call validLength() here, but it needs e.key
 
-    // XXX: unlock acnhor here!
+    // we read the entire response into the local memory; no more need to lock
+    disconnect(*e.mem_obj);
     return true;
 }
@@ -454,6 +454,11 @@ MemStore::shouldCache(const StoreEntry &e) const
         return false;
     }
 
+    if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) {
+        debugs(20, 5, HERE << "Not mem-caching ENTRY_SPECIAL " << e);
+        return false;
+    }
+
     return true;
 }
@@ -470,7 +475,7 @@ MemStore::startCaching(StoreEntry &e)
 
     assert(e.mem_obj);
     e.mem_obj->memCache.index = index;
-    e.mem_obj->memCache.io = MemObject::MemCache::ioWriting;
+    e.mem_obj->memCache.io = MemObject::ioWriting;
     slot->set(e);
     map->startAppending(index);
     return true;
@@ -634,20 +639,20 @@ MemStore::write(StoreEntry &e)
     debugs(20, 7, "entry " << e);
 
     switch (e.mem_obj->memCache.io) {
-    case MemObject::MemCache::ioUndecided:
+    case MemObject::ioUndecided:
         if (!shouldCache(e) || !startCaching(e)) {
-            e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+            e.mem_obj->memCache.io = MemObject::ioDone;
             Store::Root().transientsAbandon(e);
-            CollapsedForwarding::Broadcast(static_cast<const cache_key*>(e.key));
+            CollapsedForwarding::Broadcast(e);
             return;
         }
         break;
 
-    case MemObject::MemCache::ioDone:
-    case MemObject::MemCache::ioReading:
+    case MemObject::ioDone:
+    case MemObject::ioReading:
         return; // we should not write in all of the above cases
 
-    case MemObject::MemCache::ioWriting:
+    case MemObject::ioWriting:
         break; // already decided to write and still writing
     }
@@ -655,7 +660,8 @@ MemStore::write(StoreEntry &e)
         copyToShm(e);
         if (e.store_status == STORE_OK) // done receiving new content
             completeWriting(e);
-        CollapsedForwarding::Broadcast(static_cast<const cache_key*>(e.key));
+        else
+            CollapsedForwarding::Broadcast(e);
         return;
     } catch (const std::exception &x) { // TODO: should we catch ... as well?
@@ -665,7 +671,7 @@ MemStore::write(StoreEntry &e)
 
     Store::Root().transientsAbandon(e);
     disconnect(*e.mem_obj);
-    CollapsedForwarding::Broadcast(static_cast<const cache_key*>(e.key));
+    CollapsedForwarding::Broadcast(e);
 }
 
 void
@@ -679,8 +685,11 @@ MemStore::completeWriting(StoreEntry &e)
     debugs(20, 5, "mem-cached all " << e.mem_obj->memCache.offset << " bytes of " << e);
 
     e.mem_obj->memCache.index = -1;
-    e.mem_obj->memCache.io = MemObject::MemCache::ioDone;
+    e.mem_obj->memCache.io = MemObject::ioDone;
     map->closeForWriting(index, false);
+
+    CollapsedForwarding::Broadcast(e); // before we close our transient entry!
+    Store::Root().transientsCompleteWriting(e);
 }
 
 void
@@ -703,14 +712,14 @@ MemStore::disconnect(MemObject &mem_obj)
 {
     if (mem_obj.memCache.index >= 0) {
-        if (mem_obj.memCache.io == MemObject::MemCache::ioWriting) {
+        if (mem_obj.memCache.io == MemObject::ioWriting) {
            map->abortWriting(mem_obj.memCache.index);
        } else {
-            assert(mem_obj.memCache.io == MemObject::MemCache::ioReading);
+            assert(mem_obj.memCache.io == MemObject::ioReading);
            map->closeForReading(mem_obj.memCache.index);
        }
        mem_obj.memCache.index = -1;
-        mem_obj.memCache.io = MemObject::MemCache::ioDone;
+        mem_obj.memCache.io = MemObject::ioDone;
    }
 }
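For orientation, here is a compact, standalone model of the write-side
dispatch that MemStore::write() implements above. shouldCache() and
startCaching() are trivial stand-ins, not Squid's methods; the point is the
state transitions and when collapsed readers get notified.

    #include <iostream>

    enum Io { ioUndecided, ioWriting, ioReading, ioDone };

    static bool shouldCache() { return true; }  // stand-in for the real check
    static bool startCaching() { return true; } // stand-in for slot setup

    static void write(Io &io) {
        switch (io) {
        case ioUndecided:
            if (!shouldCache() || !startCaching()) {
                io = ioDone; // give up; tell readers to look elsewhere
                std::cout << "abandon transient entry + broadcast\n";
                return;
            }
            io = ioWriting;
            break;
        case ioDone:
        case ioReading:
            return; // we should not write in these states
        case ioWriting:
            break; // already decided to write and still writing
        }
        std::cout << "copy new content to shared memory\n";
    }

    int main() {
        Io io = ioUndecided;
        write(io); // decides to cache and writes
        write(io); // keeps writing on subsequent calls
        return 0;
    }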
diff --git a/src/Store.h b/src/Store.h
index 27b2df58da..d2131fe388 100644
--- a/src/Store.h
+++ b/src/Store.h
@@ -377,6 +377,11 @@ public:
     /// makes the entry available for collapsing future requests
     virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) {}
 
+    // XXX: This method belongs to Store::Root/StoreController, but it is here
+    // to avoid casting Root() to StoreController until Root() API is fixed.
+    /// marks the entry completed for collapsed requests
+    virtual void transientsCompleteWriting(StoreEntry &e) {}
+
     // XXX: This method belongs to Store::Root/StoreController, but it is here
     // to avoid casting Root() to StoreController until Root() API is fixed.
     /// Update local intransit entry after changes made by appending worker.
diff --git a/src/SwapDir.h b/src/SwapDir.h
index 9e1c891084..fc2ed8f749 100644
--- a/src/SwapDir.h
+++ b/src/SwapDir.h
@@ -62,6 +62,7 @@ public:
 
     /* Store parent API */
     virtual void handleIdleEntry(StoreEntry &e);
+    virtual void transientsCompleteWriting(StoreEntry &e);
     virtual void transientsAbandon(StoreEntry &e);
     virtual void transientsDisconnect(MemObject &mem_obj);
     virtual void memoryOut(StoreEntry &e, const bool preserveSwappable);
@@ -101,6 +102,7 @@ public:
 private:
     void createOneStore(Store &aStore);
     bool keepForLocalMemoryCache(const StoreEntry &e) const;
+    bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
     bool anchorCollapsedOnDisk(StoreEntry &collapsed, bool &inSync);
 
     StorePointer swapDir; ///< summary view of all disk caches
diff --git a/src/Transients.cc b/src/Transients.cc
index f891430aa3..340f2ee599 100644
--- a/src/Transients.cc
+++ b/src/Transients.cc
@@ -177,6 +177,7 @@ Transients::copyFromShm(const sfileno index)
 
     assert(e->mem_obj);
     e->mem_obj->method = extras.reqMethod;
+    e->mem_obj->xitTable.io = MemObject::ioReading;
     e->mem_obj->xitTable.index = index;
 
     // XXX: overwriting storeCreateEntry() which calls setPrivateKey() if
@@ -201,7 +202,7 @@ Transients::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData)
 }
 
 void
-Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
+Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags,
                 const HttpRequestMethod &reqMethod)
 {
     assert(e);
@@ -223,6 +224,7 @@ Transients::put(StoreEntry *e, const RequestFlags &reqFlags,
     try {
         if (copyToShm(*e, index, reqFlags, reqMethod)) {
             slot->set(*e);
+            e->mem_obj->xitTable.io = MemObject::ioWriting;
             e->mem_obj->xitTable.index = index;
             map->startAppending(index);
             // keep write lock -- we will be supplying others with updates
@@ -293,15 +295,31 @@ Transients::abandonedAt(const sfileno index) const
     return map->readableEntry(index).waitingToBeFreed;
 }
 
+void
+Transients::completeWriting(const StoreEntry &e)
+{
+    if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
+        assert(e.mem_obj->xitTable.io == MemObject::ioWriting);
+        map->closeForWriting(e.mem_obj->xitTable.index);
+        e.mem_obj->xitTable.index = -1;
+        e.mem_obj->xitTable.io = MemObject::ioDone;
+    }
+}
+
 void
 Transients::disconnect(MemObject &mem_obj)
 {
-    assert(mem_obj.xitTable.index >= 0 && map);
-    map->freeEntry(mem_obj.xitTable.index); // just marks the locked entry
-    mem_obj.xitTable.index = -1;
-    // We do not unlock the entry now because the problem is most likely with
-    // the server resource rather than a specific cache writer, so we want to
-    // prevent other readers from collapsing requests for that resource.
+    if (mem_obj.xitTable.index >= 0) {
+        assert(map);
+        if (mem_obj.xitTable.io == MemObject::ioWriting) {
+            map->abortWriting(mem_obj.xitTable.index);
+        } else {
+            assert(mem_obj.xitTable.io == MemObject::ioReading);
+            map->closeForReading(mem_obj.xitTable.index);
+        }
+        mem_obj.xitTable.index = -1;
+        mem_obj.xitTable.io = MemObject::ioDone;
+    }
 }
 
 /// calculates maximum number of entries we need to store and map
diff --git a/src/Transients.h b/src/Transients.h
index 29a40fe349..12d784d2b5 100644
--- a/src/Transients.h
+++ b/src/Transients.h
@@ -15,8 +15,9 @@ struct TransientsMapExtras {
 };
 typedef Ipc::StoreMapWithExtras<TransientsMapExtras> TransientsMap;
 
-/// Stores HTTP entities in RAM. Current implementation uses shared memory.
-/// Unlike a disk store (SwapDir), operations are synchronous (and fast).
+/// Keeps track of hits being delivered to clients that arrived before those
+/// hits were [fully] cached. This shared table is necessary to synchronize hit
+/// caching (writing) workers with other workers serving (reading) those hits.
 class Transients: public Store, public Ipc::StoreMapCleaner
 {
 public:
@@ -24,7 +25,10 @@ public:
     virtual ~Transients();
 
     /// add an in-transit entry suitable for collapsing future requests
-    void put(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+    void startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+
+    /// called when the in-transit entry has been successfully cached
+    void completeWriting(const StoreEntry &e);
 
     /// the calling entry writer no longer expects to cache this entry
     void abandon(const StoreEntry &e);
@@ -32,7 +36,7 @@ public:
     /// whether an in-transit entry is now abandoned by its writer
     bool abandoned(const StoreEntry &e) const;
 
-    /// the calling entry writer no longer expects to cache this entry
+    /// the caller is done writing or reading this entry
     void disconnect(MemObject &mem_obj);
 
     /* Store API */
diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc
index 9eadf82616..60f41c7271 100644
--- a/src/fs/rock/RockSwapDir.cc
+++ b/src/fs/rock/RockSwapDir.cc
@@ -821,7 +821,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         // and hope that Core will call disconnect() to close the map entry
     }
 
-    CollapsedForwarding::Broadcast(static_cast<const cache_key*>(sio.e->key));
+    CollapsedForwarding::Broadcast(*sio.e);
 }
 
 void
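Putting the renamed Transients API in sequence: a caching worker now walks
startWriting() to completeWriting(), while abandon() and disconnect() cover
the failure and detach paths. The trace harness below is invented for this
example (TransientsModel and its messages are not Squid code); it only
records which shared-map lock operations each step would perform.

    #include <iostream>
    #include <string>

    // Invented stand-in that logs the lock calls each API step would make.
    struct TransientsModel {
        void startWriting()    { log("openForWriting + startAppending (keep write lock)"); }
        void completeWriting() { log("closeForWriting (readers may now rely on the caches)"); }
        void abandon()         { log("mark waitingToBeFreed (readers will abort)"); }
        void log(const std::string &s) { std::cout << s << "\n"; }
    };

    int main() {
        TransientsModel t;
        t.startWriting();    // from StoreController::allowCollapsing()
        t.completeWriting(); // from MemStore::completeWriting(), after broadcasting
        return 0;
    }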
diff --git a/src/store.cc b/src/store.cc
index 321fecc30b..d4bbef8d9e 100644
--- a/src/store.cc
+++ b/src/store.cc
@@ -1848,21 +1848,12 @@ StoreEntry::trimMemory(const bool preserveSwappable)
     if (EBIT_TEST(flags, ENTRY_SPECIAL))
         return; // cannot trim because we do not load them again
 
-    if (!preserveSwappable) {
-        if (mem_obj->policyLowestOffsetToKeep(0) == 0) {
-            /* Nothing to do */
-            return;
-        }
-        /*
-         * Its not swap-able, and we're about to delete a chunk,
-         * so we must make it PRIVATE.  This is tricky/ugly because
-         * for the most part, we treat swapable == cachable here.
-         */
-        releaseRequest();
-        mem_obj->trimUnSwappable ();
-    } else {
-        mem_obj->trimSwappable ();
-    }
+    if (preserveSwappable)
+        mem_obj->trimSwappable();
+    else
+        mem_obj->trimUnSwappable();
+
+    debugs(88, 7, *this << " inmem_lo=" << mem_obj->inmem_lo);
 }
 
 bool
@@ -1986,8 +1977,14 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
 {
     os << "e:";
 
+    if (e.mem_obj) {
+        if (e.mem_obj->xitTable.index > -1)
+            os << 't' << e.mem_obj->xitTable.index;
+        if (e.mem_obj->memCache.index > -1)
+            os << 'm' << e.mem_obj->memCache.index;
+    }
     if (e.swap_filen > -1 || e.swap_dirn > -1)
-        os << e.swap_filen << '@' << e.swap_dirn;
+        os << 'd' << e.swap_filen << '@' << e.swap_dirn;
 
     os << '=';
 
@@ -2006,7 +2003,7 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
     if (e.flags) {
         if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) os << 'S';
         if (EBIT_TEST(e.flags, ENTRY_REVALIDATE)) os << 'R';
-        if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'T';
+        if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'P';
         if (EBIT_TEST(e.flags, RELEASE_REQUEST)) os << 'X';
         if (EBIT_TEST(e.flags, REFRESH_REQUEST)) os << 'F';
         if (EBIT_TEST(e.flags, ENTRY_CACHABLE)) os << 'C';
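With the operator<< changes above, an entry's debugging summary now encodes
each location with a letter prefix: 't' for the transient-table index, 'm'
for the shared-memory-cache index, and 'd' for the disk slot. A tiny
standalone demonstration of the resulting format (all index values invented):

    #include <iostream>

    int main() {
        // Pretend the entry sits in transient slot 5, memory-cache slot 3,
        // and disk slot 17 of cache_dir 1.
        const int xitIndex = 5, memIndex = 3, swapFilen = 17, swapDirn = 1;

        std::cout << "e:";
        if (xitIndex > -1)
            std::cout << 't' << xitIndex;
        if (memIndex > -1)
            std::cout << 'm' << memIndex;
        if (swapFilen > -1 || swapDirn > -1)
            std::cout << 'd' << swapFilen << '@' << swapDirn;
        std::cout << "=..." << std::endl; // prints "e:t5m3d17@1=..."
        return 0;
    }

Note that the DELAY_SENDING flag letter also changes from 'T' to 'P' in the
diff above, apparently to keep the summary unambiguous now that 't' marks a
transient index.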
"SMP-" : "locally-") << "collapse " << *e); } @@ -963,15 +977,34 @@ void StoreController::syncCollapsed(const cache_key *key) { StoreEntry *collapsed = swapDir->get(key); - if (!collapsed) // the entry is no longer locally active, ignore the update + if (!collapsed) { // the entry is no longer locally active, ignore update + debugs(20, 7, "not SMP-syncing not-local " << storeKeyText(key)); return; + } + + if (!collapsed->mem_obj) { + // without mem_obj, the entry cannot be transient so we ignore it + debugs(20, 7, "not SMP-syncing not-shared " << *collapsed); + return; + } + + if (collapsed->mem_obj->xitTable.index < 0) { + debugs(20, 7, "not SMP-syncing not-transient " << *collapsed); + return; + } + + assert(transients); + if (transients->abandoned(*collapsed)) { + debugs(20, 3, "aborting abandoned " << *collapsed); + collapsed->abort(); + return; + } - if (collapsed->mem_obj && !collapsed->mem_obj->smpCollapsed) { + if (!collapsed->mem_obj->smpCollapsed) { // this happens, e.g., when we tried collapsing but rejected the hit - debugs(20, 7, "not SMP-syncing not SMP-collapsed " << *collapsed); + debugs(20, 7, "not SMP-syncing not-SMP-collapsed " << *collapsed); return; } - // XXX: collapsed->mem_obj may be still hidden here debugs(20, 7, "syncing " << *collapsed); @@ -984,23 +1017,51 @@ StoreController::syncCollapsed(const cache_key *key) found = true; inSync = collapsed->store()->updateCollapsed(*collapsed); } else { - if (memStore) - found = memStore->anchorCollapsed(*collapsed, inSync); - else if (Config.cacheSwap.n_configured) - found = anchorCollapsedOnDisk(*collapsed, inSync); + found = anchorCollapsed(*collapsed, inSync); } if (inSync) { debugs(20, 5, "synced " << *collapsed); collapsed->invokeHandlers(); } else if (found) { // unrecoverable problem syncing this entry - debugs(20, 3, "aborting " << *collapsed); + debugs(20, 3, "aborting unsyncable " << *collapsed); collapsed->abort(); } else { // the entry is still not in one of the caches debugs(20, 7, "waiting " << *collapsed); } } +/// Called for in-transit entries that are not yet anchored to a cache. +/// For cached entries, return true after synchronizing them with their cache +/// (making inSync true on success). For not-yet-cached entries, return false. +bool +StoreController::anchorCollapsed(StoreEntry &collapsed, bool &inSync) +{ + // this method is designed to work with collapsed transients only + assert(collapsed.mem_obj); + assert(collapsed.mem_obj->xitTable.index >= 0); + assert(collapsed.mem_obj->smpCollapsed); + + debugs(20, 7, "anchoring " << collapsed); + + bool found = false; + if (memStore) + found = memStore->anchorCollapsed(collapsed, inSync); + else if (Config.cacheSwap.n_configured) + found = anchorCollapsedOnDisk(collapsed, inSync); + + if (found) { + if (inSync) + debugs(20, 7, "anchored " << collapsed); + else + debugs(20, 5, "failed to anchor " << collapsed); + } else { + debugs(20, 7, "skipping not yet cached " << collapsed); + } + + return found; +} + StoreHashIndex::StoreHashIndex() { if (store_table)