From: Alex Rousskov Date: Mon, 1 Jul 2013 02:25:50 +0000 (-0600) Subject: Several fixes and improvements to help collapsed forwarding work reliably: X-Git-Tag: SQUID_3_5_0_1~444^2~42 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=6919be24f39ca407578092aed8fcd23b744f5c3c;p=thirdparty%2Fsquid.git Several fixes and improvements to help collapsed forwarding work reliably: Removed ENTRY_CACHABLE. AFAICT, it was just negating RELEASE_REQUEST. Broadcast transients index instead of key because key may become private. Squid uses private keys to resolve store_table collisions (among other things). Thus, a public entry may become private at any time, at any worker. Using keys results in collapsed entries getting stuck waiting for an update. The transients index remains constant and can be used for reliable synchronization. Using the transients index, however, requires storing a pointer to the transient entry corresponding to that index. Otherwise, there is no API to find the entry object when a notification comes: Store::Root().get() needs a key. Mark an entry for release when setting its key from public to private. The old code was only logging SWAP_LOG_DEL, but we now need to prevent requests in other workers from collapsing on top of a now-private cache entry. In many cases, such an entry is in trouble (but not all cases because private keys are also used for store_table collision resolution). Fixed syncing of abandoned entries. Prevent new requests from collapsing on writer-less transient entries. 
--- diff --git a/src/CollapsedForwarding.cc b/src/CollapsedForwarding.cc index 6cd2380170..e743fb4d19 100644 --- a/src/CollapsedForwarding.cc +++ b/src/CollapsedForwarding.cc @@ -28,11 +28,13 @@ std::auto_ptr CollapsedForwarding::queue; class CollapsedForwardingMsg { public: - CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; } + CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {} public: - int sender; /// kid ID of sending process - uint64_t key[2]; ///< StoreEntry key + int sender; ///< kid ID of sending process + + /// transients index, so that workers can find [private] entries to sync + sfileno xitIndex; }; // CollapsedForwarding @@ -59,10 +61,9 @@ CollapsedForwarding::Broadcast(const StoreEntry &e) CollapsedForwardingMsg msg; msg.sender = KidIdentifier; - memcpy(msg.key, e.key, sizeof(msg.key)); + msg.xitIndex = e.mem_obj->xitTable.index; - debugs(17, 5, storeKeyText(static_cast(e.key)) << " to " << - Config.workers << "-1 workers"); + debugs(17, 5, e << " to " << Config.workers << "-1 workers"); // TODO: send only to workers who are waiting for data for (int workerId = 1; workerId <= Config.workers; ++workerId) { @@ -105,10 +106,9 @@ CollapsedForwarding::HandleNewData(const char *const when) " != " << msg.sender); } - const cache_key *key = reinterpret_cast(msg.key); - debugs(17, 7, "hadling " << storeKeyText(key)); - Store::Root().syncCollapsed(key); - debugs(17, 7, "handled " << storeKeyText(key)); + debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map"); + Store::Root().syncCollapsed(msg.xitIndex); + debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map"); // XXX: stop and schedule an async call to continue assert(++poppedCount < SQUID_MAXFD); diff --git a/src/MemStore.cc b/src/MemStore.cc index c3b18ecc1f..bd52388303 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -288,7 +288,6 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc assert(e.swap_status == SWAPOUT_NONE); // 
set in StoreEntry constructor e.ping_status = PING_NONE; - EBIT_SET(e.flags, ENTRY_CACHABLE); EBIT_CLR(e.flags, RELEASE_REQUEST); EBIT_CLR(e.flags, KEY_PRIVATE); EBIT_SET(e.flags, ENTRY_VALIDATED); diff --git a/src/Store.h b/src/Store.h index 446894d3b3..4ce61544f0 100644 --- a/src/Store.h +++ b/src/Store.h @@ -385,7 +385,7 @@ public: // XXX: This method belongs to Store::Root/StoreController, but it is here // to avoid casting Root() to StoreController until Root() API is fixed. /// Update local intransit entry after changes made by appending worker. - virtual void syncCollapsed(const cache_key *key) {} + virtual void syncCollapsed(const sfileno xitIndex) {} // XXX: This method belongs to Store::Root/StoreController, but it is here // to avoid casting Root() to StoreController until Root() API is fixed. diff --git a/src/SwapDir.h b/src/SwapDir.h index fa08136f05..1dcae1198f 100644 --- a/src/SwapDir.h +++ b/src/SwapDir.h @@ -71,7 +71,7 @@ public: virtual void memoryUnlink(StoreEntry &e); virtual void memoryDisconnect(MemObject &mem_obj); virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); - virtual void syncCollapsed(const cache_key *key); + virtual void syncCollapsed(const sfileno xitIndex); virtual void init(); diff --git a/src/Transients.cc b/src/Transients.cc index 10a2480ec3..5dfaabc405 100644 --- a/src/Transients.cc +++ b/src/Transients.cc @@ -26,13 +26,14 @@ static const char *MapLabel = "transients_map"; -Transients::Transients(): map(NULL) +Transients::Transients(): map(NULL), locals(NULL) { } Transients::~Transients() { delete map; + delete locals; } void @@ -45,6 +46,8 @@ Transients::init() Must(!map); map = new TransientsMap(MapLabel); map->cleaner = this; + + locals = new Locals(entryLimit, NULL); } void @@ -157,15 +160,18 @@ Transients::get(const cache_key *key) if (!anchor) return NULL; - // Without a writer, either the response has been cached already or we will - // get stuck waiting for it 
to be cached (because nobody will cache it). - if (!anchor->writing()) { - debugs(20, 5, "ignoring writer-less entry " << index); - } else if (StoreEntry *e = copyFromShm(index)) { - return e; // keep read lock to receive updates from others + // If we already have a local entry, the store_table should have found it. + // Since it did not, the local entry key must have changed from public to + // private. We still need to keep the private entry around for syncing as + // its clients depend on it, but we should not allow new clients to join. + if (StoreEntry *oldE = locals->at(index)) { + debugs(20, 3, "not joining private " << *oldE); + assert(EBIT_TEST(oldE->flags, KEY_PRIVATE)); + } else if (StoreEntry *newE = copyFromShm(index)) { + return newE; // keep read lock to receive updates from others } - // missing writer or loading failure + // private entry or loading failure map->closeForReading(index); return NULL; } @@ -192,6 +198,10 @@ Transients::copyFromShm(const sfileno index) // TODO: Can we remove smpCollapsed by not syncing non-transient entries? e->mem_obj->smpCollapsed = true; + assert(!locals->at(index)); + // We do not lock e because we do not want to prevent its destruction; + // e is tied to us via mem_obj so we will know when it is destructed. 
+ locals->at(index) = e; return e; } @@ -202,6 +212,22 @@ Transients::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData) fatal("Transients::get(key,callback,data) should not be called"); } +StoreEntry * +Transients::findCollapsed(const sfileno index) +{ + if (!map) + return NULL; + + if (StoreEntry *oldE = locals->at(index)) { + debugs(20, 5, "found " << *oldE << " at " << index << " in " << MapLabel); + assert(oldE->mem_obj && oldE->mem_obj->xitTable.index == index); + return oldE; + } + + debugs(20, 3, "no entry at " << index << " in " << MapLabel); + return NULL; +} + void Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) @@ -302,6 +328,9 @@ Transients::completeWriting(const StoreEntry &e) { if (e.mem_obj && e.mem_obj->xitTable.index >= 0) { assert(e.mem_obj->xitTable.io == MemObject::ioWriting); + // there will be no more updates from us after this, so we must prevent + // future readers from joining + map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry map->closeForWriting(e.mem_obj->xitTable.index); e.mem_obj->xitTable.index = -1; e.mem_obj->xitTable.io = MemObject::ioDone; @@ -336,6 +365,7 @@ Transients::disconnect(MemObject &mem_obj) assert(mem_obj.xitTable.io == MemObject::ioReading); map->closeForReading(mem_obj.xitTable.index); } + locals->at(mem_obj.xitTable.index) = NULL; mem_obj.xitTable.index = -1; mem_obj.xitTable.io = MemObject::ioDone; } diff --git a/src/Transients.h b/src/Transients.h index 0204853015..307db40b66 100644 --- a/src/Transients.h +++ b/src/Transients.h @@ -6,6 +6,7 @@ #include "ipc/mem/PageStack.h" #include "ipc/StoreMap.h" #include "Store.h" +#include // StoreEntry restoration info not already stored by Ipc::StoreMap struct TransientsMapExtras { @@ -24,6 +25,9 @@ public: Transients(); virtual ~Transients(); + /// return a local, previously collapsed entry + StoreEntry *findCollapsed(const sfileno xitIndex); + /// add an in-transit entry 
suitable for collapsing future requests void startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); @@ -72,7 +76,12 @@ protected: virtual void noteFreeMapSlice(const sfileno sliceId); private: - TransientsMap *map; ///< index of mem-cached entries + /// shared packed info indexed by Store keys, for creating new StoreEntries + TransientsMap *map; + + typedef std::vector Locals; + /// local collapsed entries indexed by transient ID, for syncing old StoreEntries + Locals *locals; }; // TODO: Why use Store as a base? We are not really a cache. diff --git a/src/enums.h b/src/enums.h index 01310216b6..c810cdf0f6 100644 --- a/src/enums.h +++ b/src/enums.h @@ -122,7 +122,7 @@ enum { DELAY_SENDING, RELEASE_REQUEST, REFRESH_REQUEST, - ENTRY_CACHABLE, + ENTRY_CACHABLE_RESERVED_FOR_FUTURE_USE, ENTRY_DISPATCHED, KEY_PRIVATE, ENTRY_FWD_HDR_WAIT, diff --git a/src/fs/coss/store_dir_coss.cc b/src/fs/coss/store_dir_coss.cc index 72a592ced3..dc61b00987 100644 --- a/src/fs/coss/store_dir_coss.cc +++ b/src/fs/coss/store_dir_coss.cc @@ -526,7 +526,6 @@ CossSwapDir::addDiskRestore(const cache_key *const key, e->lastmod = lastmod; e->refcount = refcount; e->flags = flags; - EBIT_SET(e->flags, ENTRY_CACHABLE); EBIT_CLR(e->flags, RELEASE_REQUEST); EBIT_CLR(e->flags, KEY_PRIVATE); e->ping_status = PING_NONE; diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index a992b17d6b..7cfc82a554 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -138,7 +138,6 @@ Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreM e.swap_status = SWAPOUT_DONE; e.ping_status = PING_NONE; - EBIT_SET(e.flags, ENTRY_CACHABLE); EBIT_CLR(e.flags, RELEASE_REQUEST); EBIT_CLR(e.flags, KEY_PRIVATE); EBIT_SET(e.flags, ENTRY_VALIDATED); diff --git a/src/fs/ufs/UFSSwapDir.cc b/src/fs/ufs/UFSSwapDir.cc index 599b8ad2be..2392738467 100644 --- a/src/fs/ufs/UFSSwapDir.cc +++ b/src/fs/ufs/UFSSwapDir.cc @@ -760,7 +760,6 @@ 
Fs::Ufs::UFSSwapDir::addDiskRestore(const cache_key * key, e->lastmod = lastmod; e->refcount = refcount; e->flags = newFlags; - EBIT_SET(e->flags, ENTRY_CACHABLE); EBIT_CLR(e->flags, RELEASE_REQUEST); EBIT_CLR(e->flags, KEY_PRIVATE); e->ping_status = PING_NONE; diff --git a/src/ftp.cc b/src/ftp.cc index 757db4e015..373899704f 100644 --- a/src/ftp.cc +++ b/src/ftp.cc @@ -3704,7 +3704,7 @@ FtpStateData::haveParsedReplyHeaders() * Authenticated requests can't be cached. */ e->release(); - } else if (EBIT_TEST(e->flags, ENTRY_CACHABLE) && !getCurrentOffset()) { + } else if (!EBIT_TEST(e->flags, RELEASE_REQUEST) && !getCurrentOffset()) { e->setPublicKey(); } else { e->release(); diff --git a/src/gopher.cc b/src/gopher.cc index 63241914ff..0cf485c2ba 100644 --- a/src/gopher.cc +++ b/src/gopher.cc @@ -945,8 +945,7 @@ gopherSendRequest(int fd, void *data) CommIoCbPtrFun(gopherSendComplete, gopherState)); Comm::Write(gopherState->serverConn, buf, strlen(buf), call, NULL); - if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE)) - gopherState->entry->setPublicKey(); /* Make it public */ + gopherState->entry->makePublic(); } /// \ingroup ServerProtocolGopherInternal diff --git a/src/http.cc b/src/http.cc index a08b9de126..9fa1efb5b4 100644 --- a/src/http.cc +++ b/src/http.cc @@ -345,6 +345,11 @@ HttpStateData::cacheableReply() #define REFRESH_OVERRIDE(flag) 0 #endif + if (EBIT_TEST(entry->flags, RELEASE_REQUEST)) { + debugs(22, 3, "NO because " << *entry << " has been released."); + return 0; + } + // Check for Surrogate/1.0 protocol conditions // NP: reverse-proxy traffic our parent server has instructed us never to cache if (surrogateNoStore) { diff --git a/src/stat.cc b/src/stat.cc index 715bef8310..4ef301884b 100644 --- a/src/stat.cc +++ b/src/stat.cc @@ -320,9 +320,6 @@ storeEntryFlags(const StoreEntry * entry) if (EBIT_TEST(flags, REFRESH_REQUEST)) strcat(buf, "REFRESH_REQUEST,"); - if (EBIT_TEST(flags, ENTRY_CACHABLE)) - strcat(buf, "CACHABLE,"); - if 
(EBIT_TEST(flags, ENTRY_DISPATCHED)) strcat(buf, "DISPATCHED,"); diff --git a/src/store.cc b/src/store.cc index 35f483f997..d32f7c0ede 100644 --- a/src/store.cc +++ b/src/store.cc @@ -191,7 +191,7 @@ StoreEntry::makePublic() { /* This object can be cached for a long time */ - if (EBIT_TEST(flags, ENTRY_CACHABLE)) + if (!EBIT_TEST(flags, RELEASE_REQUEST)) setPublicKey(); } @@ -201,7 +201,6 @@ StoreEntry::makePrivate() /* This object should never be cached at all */ expireNow(); releaseRequest(); /* delete object when not used */ - /* releaseRequest clears ENTRY_CACHABLE flag */ } void @@ -209,9 +208,7 @@ StoreEntry::cacheNegatively() { /* This object may be negatively cached */ negativeCache(); - - if (EBIT_TEST(flags, ENTRY_CACHABLE)) - setPublicKey(); + makePublic(); } size_t @@ -523,14 +520,7 @@ StoreEntry::releaseRequest() if (EBIT_TEST(flags, RELEASE_REQUEST)) return; - setReleaseFlag(); - - /* - * Clear cachable flag here because we might get called before - * anyone else even looks at the cachability flag. Also, this - * prevents httpMakePublic from really setting a public key. - */ - EBIT_CLR(flags, ENTRY_CACHABLE); + setReleaseFlag(); // makes validToSend() false, preventing future hits setPrivateKey(); } @@ -651,6 +641,9 @@ StoreEntry::setPrivateKey() return; /* is already private */ if (key) { + setReleaseFlag(); // will markForUnlink(); all caches/workers will know + + // TODO: move into SwapDir::markForUnlink() already called by Root() if (swap_filen > -1) storeDirSwapLog(this, SWAP_LOG_DEL); @@ -686,8 +679,7 @@ StoreEntry::setPublicKey() * store clients won't be able to access object data which has * been freed from memory. * - * If RELEASE_REQUEST is set, then ENTRY_CACHABLE should not - * be set, and StoreEntry::setPublicKey() should not be called. + * If RELEASE_REQUEST is set, setPublicKey() should not be called. 
*/ #if MORE_DEBUG_OUTPUT @@ -798,10 +790,8 @@ storeCreatePureEntry(const char *url, const char *log_url, const RequestFlags &f e->mem_obj->setUris(url, log_url, method); if (flags.cachable) { - EBIT_SET(e->flags, ENTRY_CACHABLE); EBIT_CLR(e->flags, RELEASE_REQUEST); } else { - /* StoreEntry::releaseRequest() clears ENTRY_CACHABLE */ e->releaseRequest(); } @@ -958,9 +948,9 @@ StoreEntry::checkCachable() if (store_status == STORE_OK && EBIT_TEST(flags, ENTRY_BAD_LENGTH)) { debugs(20, 2, "StoreEntry::checkCachable: NO: wrong content-length"); ++store_check_cachable_hist.no.wrong_content_length; - } else if (!EBIT_TEST(flags, ENTRY_CACHABLE)) { + } else if (EBIT_TEST(flags, RELEASE_REQUEST)) { debugs(20, 2, "StoreEntry::checkCachable: NO: not cachable"); - ++store_check_cachable_hist.no.not_entry_cachable; + ++store_check_cachable_hist.no.not_entry_cachable; // TODO: rename? } else if (EBIT_TEST(flags, ENTRY_NEGCACHED)) { debugs(20, 3, "StoreEntry::checkCachable: NO: negative cached"); ++store_check_cachable_hist.no.negative_cached; @@ -995,7 +985,6 @@ StoreEntry::checkCachable() } releaseRequest(); - /* StoreEntry::releaseRequest() cleared ENTRY_CACHABLE */ return 0; } @@ -2010,7 +1999,6 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e) if (EBIT_TEST(e.flags, DELAY_SENDING)) os << 'P'; if (EBIT_TEST(e.flags, RELEASE_REQUEST)) os << 'X'; if (EBIT_TEST(e.flags, REFRESH_REQUEST)) os << 'F'; - if (EBIT_TEST(e.flags, ENTRY_CACHABLE)) os << 'C'; if (EBIT_TEST(e.flags, ENTRY_DISPATCHED)) os << 'D'; if (EBIT_TEST(e.flags, KEY_PRIVATE)) os << 'I'; if (EBIT_TEST(e.flags, ENTRY_FWD_HDR_WAIT)) os << 'W'; diff --git a/src/store_digest.cc b/src/store_digest.cc index f8b2cca2e0..1a636252f9 100644 --- a/src/store_digest.cc +++ b/src/store_digest.cc @@ -224,11 +224,6 @@ storeDigestAddable(const StoreEntry * e) /* check various entry flags (mimics StoreEntry::checkCachable XXX) */ - if (!EBIT_TEST(e->flags, ENTRY_CACHABLE)) { - debugs(71, 6, "storeDigestAddable: NO: 
not cachable"); - return 0; - } - if (EBIT_TEST(e->flags, KEY_PRIVATE)) { debugs(71, 6, "storeDigestAddable: NO: private key"); return 0; diff --git a/src/store_dir.cc b/src/store_dir.cc index 74e336dcef..b362896ad9 100644 --- a/src/store_dir.cc +++ b/src/store_dir.cc @@ -1001,43 +1001,21 @@ StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, } void -StoreController::syncCollapsed(const cache_key *key) +StoreController::syncCollapsed(const sfileno xitIndex) { - StoreEntry *collapsed = swapDir->get(key); - if (!collapsed) { // the entry is no longer locally active, ignore update - debugs(20, 7, "not SMP-syncing not-local " << storeKeyText(key)); - return; - } - - if (!collapsed->mem_obj) { - // without mem_obj, the entry cannot be transient so we ignore it - debugs(20, 7, "not SMP-syncing not-shared " << *collapsed); - return; - } - - if (collapsed->mem_obj->xitTable.index < 0) { - debugs(20, 7, "not SMP-syncing not-transient " << *collapsed); - return; - } - - // this must be done before the abandoned() check because we may be looking - // at an entry we are writing while abandoned() requires a reading lock. 
- if (!collapsed->mem_obj->smpCollapsed) { - // this happens, e.g., when we tried collapsing but rejected the hit - // or a stale notification was received (and we are now the writer) - debugs(20, 7, "not SMP-syncing not-SMP-collapsed " << *collapsed); - return; - } - assert(transients); - if (transients->abandoned(*collapsed)) { - debugs(20, 3, "aborting abandoned " << *collapsed); - collapsed->abort(); + + StoreEntry *collapsed = transients->findCollapsed(xitIndex); + if (!collapsed) { // the entry is no longer locally active, ignore update + debugs(20, 7, "not SMP-syncing not-transient " << xitIndex); return; } + assert(collapsed->mem_obj); + assert(collapsed->mem_obj->smpCollapsed); debugs(20, 7, "syncing " << *collapsed); + bool abandoned = transients->abandoned(*collapsed); bool found = false; bool inSync = false; if (memStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) { @@ -1054,6 +1032,12 @@ StoreController::syncCollapsed(const cache_key *key) found = anchorCollapsed(*collapsed, inSync); } + if (abandoned && collapsed->store_status == STORE_PENDING) { + debugs(20, 3, "aborting abandoned but STORE_PENDING " << *collapsed); + collapsed->abort(); + return; + } + if (inSync) { debugs(20, 5, "synced " << *collapsed); collapsed->invokeHandlers(); diff --git a/src/tests/testStoreController.cc b/src/tests/testStoreController.cc index 21e62cece9..eaf263925c 100644 --- a/src/tests/testStoreController.cc +++ b/src/tests/testStoreController.cc @@ -105,7 +105,6 @@ addedEntry(StorePointer hashStore, e->expires = squid_curtime; e->lastmod = squid_curtime; e->refcount = 1; - EBIT_SET(e->flags, ENTRY_CACHABLE); EBIT_CLR(e->flags, RELEASE_REQUEST); EBIT_CLR(e->flags, KEY_PRIVATE); e->ping_status = PING_NONE; diff --git a/src/tests/testStoreHashIndex.cc b/src/tests/testStoreHashIndex.cc index 9d320794f4..87f6b955bc 100644 --- a/src/tests/testStoreHashIndex.cc +++ b/src/tests/testStoreHashIndex.cc @@ -86,7 +86,6 @@ addedEntry(StorePointer hashStore, e->expires = 
squid_curtime; e->lastmod = squid_curtime; e->refcount = 1; - EBIT_SET(e->flags, ENTRY_CACHABLE); EBIT_CLR(e->flags, RELEASE_REQUEST); EBIT_CLR(e->flags, KEY_PRIVATE); e->ping_status = PING_NONE; diff --git a/src/whois.cc b/src/whois.cc index a3b51d30ea..43f41930cf 100644 --- a/src/whois.cc +++ b/src/whois.cc @@ -192,8 +192,7 @@ WhoisState::readReply(const Comm::ConnectionPointer &conn, char *aBuffer, size_t entry->timestampsSet(); entry->flush(); - if (!EBIT_TEST(entry->flags, RELEASE_REQUEST)) - entry->setPublicKey(); + entry->makePublic(); fwd->complete(); debugs(75, 3, "whoisReadReply: Done: " << entry->url());