From 4310f8b00dd574542dcec4208112bb89ef403528 Mon Sep 17 00:00:00 2001 From: Eduard Bagdasaryan Date: Fri, 2 Feb 2018 23:41:40 +0300 Subject: [PATCH] Bug 4505: SMP caches sometimes do not purge entries (#46) When Squid finds a requested entry in the memory cache, it does not check whether the same entry is also stored in a cache_dir. The StoreEntry object may become associated with its store entry in the memory cache but not with its store entry on disk. This inconsistency causes two known problems: 1. Squid may needlessly swap out the memory hit to disk, either overwriting an existing (and identical) disk entry or, worse, creating a duplicate entry on another disk. In the second case, the two disk entries are not synchronized and may eventually start to differ if one of them is removed or updated. 2. Squid may not delete a stale disk entry when needed, violating various HTTP MUSTs, and eventually serving stale [disk] cache entries to clients. Another purging problem is not caused by the above inconsistency: 3. A DELETE request or equivalent may come for the entry which is still locked for writing. Squid fails to get a lock for such an entry (in order to purge it) and the entry remains in disk and/or memory cache. To solve the first two problems: * StoreEntry::mayStartSwapout() now avoids needless swapouts by checking whether StoreEntry was fully loaded, is being loaded, or could have been loaded from disk. To be able to reject swapouts in the last case, we now require that the newer (disk) entries explicitly delete their older variants instead of relying on the Store to overwrite the older (unlocked) variant. That explicit delete should already be happening in higher-level code (that knows which entry is newer and must mark any stale entries for deletion anyway). To fix problem #3: * A new Store::Controller::evictIfFound(key) method purges (or marks for deletion if purging is impossible) all the matching store entries, without loading the StoreEntry information from stores. Avoiding StoreEntry creation reduces waste of resources (the StoreEntry object would have to be deleted anyway) _and_ allows us to mark being-created entries (that are locked for writing and, hence, cannot be loaded into a StoreEntry object). XXX: SMP cache purges may continue to malfunction when the Transients table is missing. Currently, Transients are created only when the collapsed_forwarding is on. After Squid bug 4579 is fixed, every public StoreEntry will have the corresponding Transients entry and vice versa, extending these fixes to all SMP environments. Note that even if Squid properly avoids storing duplicate disk entries, some cache_dir manipulations by humans and Squid crashes may lead to such duplicates being present. This patch leaves dealing with potential duplicates out of scope except it guarantees that if an entry is deleted, then all [possible] duplicates are deleted as well. Fixing the above problems required (and/or benefited from) many related improvements, including some Store API changes. It is impractical to detail each change here, but several are highlighted below. To propagate DELETEs across workers, every public StoreEntry now has a Transients entry. Prevented concurrent cache readers from aborting when their entry is release()d. Unlike abort, release should not affect current readers. Fixed store.log code to avoid "Bug: Missing MemObject::storeId value". Removed Transients extras used to initialize MemObject StoreID/method in StoreEntry objects created by Transients::get() for collapsed requests. Controlled::get() and related Controller APIs do not _require_ setting those MemObject details: get() methods for all cache stores return StoreEntry objects without them (because entry basics lack Store ID and request method). The caller is responsible for cache key collision detection. Controlled::get() parameters could include Store ID and request method for early cache key collision detection, but adding a StoreQuery class and improving collision detection code is outside this project scope (and requires many changes). Found more cases where release() should not prevent sharing. Remaining cases need further analysis as discussed in master 39fe14b2. Greatly simplified UFS store rebuilding, possibly fixing subtle bug(s). Clarified RELEASE_REQUEST flag meaning, becoming 'a private StoreEntry which can't become public anymore'. Refactored the related code, combining two related notions: 'a private entry' and 'an entry marked for removal'. Do not abort collapsed StoreEntries during syncing just because the corresponding being stored shared entry was marked for deletion. Abort them if the shared entry has been also aborted. Added StoreEntry helper methods to prevent direct manipulation of individual disk-related data members (swap_dirn, swap_filen, and swap_status). These methods help keep these related data members in a coherent state and minimize code duplication. --- src/CollapsedForwarding.cc | 23 +- src/CollapsedForwarding.h | 7 +- src/MemObject.cc | 3 - src/MemObject.h | 11 +- src/MemStore.cc | 91 +++--- src/MemStore.h | 10 +- src/Store.h | 111 +++++++- src/Transients.cc | 236 +++++++--------- src/Transients.h | 57 ++-- src/client_side_reply.cc | 36 +-- src/clients/FtpGateway.cc | 12 +- src/enums.h | 9 +- src/fs/rock/RockSwapDir.cc | 112 ++++---- src/fs/rock/RockSwapDir.h | 14 +- src/fs/ufs/RebuildState.cc | 224 +++++---------- src/fs/ufs/RebuildState.h | 15 +- src/fs/ufs/UFSSwapDir.cc | 60 ++-- src/fs/ufs/UFSSwapDir.h | 12 +- src/gopher.cc | 3 +- src/http.cc | 23 +- src/ipc/StoreMap.cc | 62 +++- src/ipc/StoreMap.h | 16 +- src/mime.cc | 16 +- src/neighbors.cc | 6 +- src/peer_digest.cc | 15 +- src/store.cc | 370 ++++++++++++------------ src/store/Controlled.h | 20 +- src/store/Controller.cc | 390 ++++++++++++++++++++------ src/store/Controller.h | 80 +++++- src/store/Disk.cc | 4 +- src/store/Disk.h | 9 +- src/store/Disks.cc | 51 +++- src/store/Disks.h | 10 +- src/store/Storage.h | 16 +- src/store/forward.h | 4 + src/store_client.cc | 8 +- src/store_log.cc | 5 +- src/store_rebuild.cc | 47 +--- src/store_rebuild.h | 2 - src/store_swapin.cc | 14 +- src/store_swapout.cc | 44 +-- src/tests/TestSwapDir.h | 8 +- src/tests/stub_CollapsedForwarding.cc | 3 +- src/tests/stub_MemStore.cc | 8 +- src/tests/stub_store.cc | 14 +- src/tests/stub_store_rebuild.cc | 1 - src/tests/testRock.cc | 22 +- src/tests/testStoreController.cc | 2 - src/tests/testStoreHashIndex.cc | 2 - src/whois.cc | 3 +- 50 files changed, 1320 insertions(+), 1001 deletions(-) diff --git a/src/CollapsedForwarding.cc b/src/CollapsedForwarding.cc index c0abf2d49a..f5713be6bb 100644 --- a/src/CollapsedForwarding.cc +++ b/src/CollapsedForwarding.cc @@ -53,27 +53,34 @@ CollapsedForwarding::Init() } void -CollapsedForwarding::Broadcast(const StoreEntry &e) +CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker) { - if (!queue.get()) - return; - - if (!e.mem_obj || e.mem_obj->xitTable.index < 0 || + if (!e.hasTransients() || !Store::Root().transientReaders(e)) { debugs(17, 7, "nobody reads " << e); return; } + debugs(17, 5, e); + Broadcast(e.mem_obj->xitTable.index, includingThisWorker); +} + +void +CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker) +{ + if (!queue.get()) + return; + CollapsedForwardingMsg msg; msg.sender = KidIdentifier; - msg.xitIndex = e.mem_obj->xitTable.index; + msg.xitIndex = index; - debugs(17, 5, e << " to " << Config.workers << "-1 workers"); + debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers"); // TODO: send only to workers who are waiting for data for (int workerId = 1; workerId <= Config.workers; ++workerId) { try { - if (workerId != KidIdentifier && queue->push(workerId, msg)) + if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg)) Notify(workerId); } catch (const Queue::Full &) { debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " << diff --git a/src/CollapsedForwarding.h b/src/CollapsedForwarding.h index e067ba456c..9eea7f59e5 100644 --- a/src/CollapsedForwarding.h +++ b/src/CollapsedForwarding.h @@ -13,6 +13,7 @@ #include "ipc/forward.h" #include "ipc/Queue.h" +#include "store/forward.h" #include @@ -26,7 +27,11 @@ public: static void Init(); /// notify other workers about changes in entry state (e.g., new data) - static void Broadcast(const StoreEntry &e); + static void Broadcast(const StoreEntry &e, const bool includingThisWorker = false); + + /// notify other workers about state changes in Transient entry at the given xitTable.index + /// use Broadcast(StoreEntry) variant if you have a StoreEntry object + static void Broadcast(const sfileno index, const bool includingThisWorker); /// kick worker with empty IPC queue static void Notify(const int workerId); diff --git a/src/MemObject.cc b/src/MemObject.cc index 1e9a6bef58..2be8d5628e 100644 --- a/src/MemObject.cc +++ b/src/MemObject.cc @@ -98,7 +98,6 @@ MemObject::setUris(char const *aStoreId, char const *aLogUri, const HttpRequestM MemObject::MemObject() : inmem_lo(0), nclients(0), - smpCollapsed(false), ping_reply_callback(nullptr), ircb_data(nullptr), id(0), @@ -220,8 +219,6 @@ MemObject::stat(MemBuf * mb) const mb->appendf("\tmem-cache index: %d state: %d offset: %" PRId64 "\n", memCache.index, memCache.io, memCache.offset); if (object_sz >= 0) mb->appendf("\tobject_sz: %" PRId64 "\n", object_sz); - if (smpCollapsed) - mb->appendf("\tsmp-collapsed\n"); StoreClientStats statsVisitor(mb); diff --git a/src/MemObject.h b/src/MemObject.h index 4cd2d752ee..ff9d2cc71f 100644 --- a/src/MemObject.h +++ b/src/MemObject.h @@ -15,6 +15,7 @@ #include "RemovalPolicy.h" #include "SquidString.h" #include "stmem.h" +#include "store/forward.h" #include "StoreIOBuffer.h" #include "StoreIOState.h" #include "typedefs.h" //for IRCB @@ -123,8 +124,12 @@ public: SwapOut swapout; - /// cache "I/O" direction and status - typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io; + /* TODO: Remove this change-minimizing hack */ + using Io = Store::IoStatus; + static constexpr Io ioUndecided = Store::ioUndecided; + static constexpr Io ioReading = Store::ioReading; + static constexpr Io ioWriting = Store::ioWriting; + static constexpr Io ioDone = Store::ioDone; /// State of an entry with regards to the [shared] in-transit table. class XitTable @@ -150,8 +155,6 @@ public: }; MemCache memCache; ///< current [shared] memory caching state for the entry - bool smpCollapsed; ///< whether this entry gets data from another worker - /* Read only - this reply must be preserved by store clients */ /* The original reply. possibly with updated metadata. */ HttpRequestPointer request; diff --git a/src/MemStore.cc b/src/MemStore.cc index 3e1983b25d..64835ef13d 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -321,7 +321,7 @@ MemStore::get(const cache_key *key) StoreEntry *e = new StoreEntry(); // XXX: We do not know the URLs yet, only the key, but we need to parse and - // store the response for the Root().get() callers to be happy because they + // store the response for the Root().find() callers to be happy because they // expect IN_MEMORY entries to already have the response headers and body. e->createMemObject(); @@ -329,13 +329,12 @@ MemStore::get(const cache_key *key) const bool copied = copyFromShm(*e, index, *slot); - if (copied) { - e->hashInsert(key); + if (copied) return e; - } - debugs(20, 3, HERE << "mem-loading failed; freeing " << index); + debugs(20, 3, "failed for " << *e); map->freeEntry(index); // do not let others into the same trap + destroyStoreEntry(static_cast(e)); return NULL; } @@ -403,46 +402,41 @@ MemStore::updateHeadersOrThrow(Ipc::StoreMapUpdate &update) } bool -MemStore::anchorCollapsed(StoreEntry &collapsed, bool &inSync) +MemStore::anchorToCache(StoreEntry &entry, bool &inSync) { if (!map) return false; sfileno index; const Ipc::StoreMapAnchor *const slot = map->openForReading( - reinterpret_cast(collapsed.key), index); + reinterpret_cast(entry.key), index); if (!slot) return false; - anchorEntry(collapsed, index, *slot); - inSync = updateCollapsedWith(collapsed, index, *slot); + anchorEntry(entry, index, *slot); + inSync = updateAnchoredWith(entry, index, *slot); return true; // even if inSync is false } bool -MemStore::updateCollapsed(StoreEntry &collapsed) +MemStore::updateAnchored(StoreEntry &entry) { - assert(collapsed.mem_obj); - - const sfileno index = collapsed.mem_obj->memCache.index; - - // already disconnected from the cache, no need to update - if (index < 0) - return true; - if (!map) return false; + assert(entry.mem_obj); + assert(entry.hasMemStore()); + const sfileno index = entry.mem_obj->memCache.index; const Ipc::StoreMapAnchor &anchor = map->readableEntry(index); - return updateCollapsedWith(collapsed, index, anchor); + return updateAnchoredWith(entry, index, anchor); } -/// updates collapsed entry after its anchor has been located +/// updates Transients entry after its anchor has been located bool -MemStore::updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor) +MemStore::updateAnchoredWith(StoreEntry &entry, const sfileno index, const Ipc::StoreMapAnchor &anchor) { - collapsed.swap_file_sz = anchor.basics.swap_file_sz; - const bool copied = copyFromShm(collapsed, index, anchor); + entry.swap_file_sz = anchor.basics.swap_file_sz; + const bool copied = copyFromShm(entry, index, anchor); return copied; } @@ -450,15 +444,8 @@ MemStore::updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const void MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor) { - const Ipc::StoreMapAnchor::Basics &basics = anchor.basics; - - e.swap_file_sz = basics.swap_file_sz; - e.lastref = basics.lastref; - e.timestamp = basics.timestamp; - e.expires = basics.expires; - e.lastModified(basics.lastmod); - e.refcount = basics.refcount; - e.flags = basics.flags; + assert(!e.hasDisk()); // no conflict with disk entry basics + anchor.exportInto(e); assert(e.mem_obj); if (anchor.complete()) { @@ -470,11 +457,7 @@ MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc assert(e.mem_obj->object_sz < 0); e.setMemStatus(NOT_IN_MEMORY); } - assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor - e.ping_status = PING_NONE; - EBIT_CLR(e.flags, RELEASE_REQUEST); - e.clearPrivate(); EBIT_SET(e.flags, ENTRY_VALIDATED); MemObject::MemCache &mc = e.mem_obj->memCache; @@ -910,25 +893,29 @@ MemStore::completeWriting(StoreEntry &e) } void -MemStore::markForUnlink(StoreEntry &e) +MemStore::evictCached(StoreEntry &e) { - assert(e.mem_obj); - if (e.mem_obj->memCache.index >= 0) - map->freeEntry(e.mem_obj->memCache.index); + debugs(47, 5, e); + if (e.hasMemStore()) { + if (map->freeEntry(e.mem_obj->memCache.index)) + CollapsedForwarding::Broadcast(e); + if (!e.locked()) { + disconnect(e); + e.destroyMemObject(); + } + } else if (const auto key = e.publicKey()) { + // the entry may have been loaded and then disconnected from the cache + evictIfFound(key); + if (!e.locked()) + e.destroyMemObject(); + } } void -MemStore::unlink(StoreEntry &e) +MemStore::evictIfFound(const cache_key *key) { - if (e.mem_obj && e.mem_obj->memCache.index >= 0) { - map->freeEntry(e.mem_obj->memCache.index); - disconnect(e); - } else if (map) { - // the entry may have been loaded and then disconnected from the cache - map->freeEntryByKey(reinterpret_cast(e.key)); - } - - e.destroyMemObject(); // XXX: but it may contain useful info such as a client list. The old code used to do that though, right? + if (map) + map->freeEntryByKey(key); } void @@ -936,12 +923,12 @@ MemStore::disconnect(StoreEntry &e) { assert(e.mem_obj); MemObject &mem_obj = *e.mem_obj; - if (mem_obj.memCache.index >= 0) { + if (e.hasMemStore()) { if (mem_obj.memCache.io == MemObject::ioWriting) { map->abortWriting(mem_obj.memCache.index); mem_obj.memCache.index = -1; mem_obj.memCache.io = MemObject::ioDone; - Store::Root().transientsAbandon(e); // broadcasts after the change + Store::Root().stopSharing(e); // broadcasts after the change } else { assert(mem_obj.memCache.io == MemObject::ioReading); map->closeForReading(mem_obj.memCache.index); diff --git a/src/MemStore.h b/src/MemStore.h index 65669dcf52..8d771e33a7 100644 --- a/src/MemStore.h +++ b/src/MemStore.h @@ -59,10 +59,10 @@ public: virtual bool dereference(StoreEntry &e) override; virtual void updateHeaders(StoreEntry *e) override; virtual void maintain() override; - virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override; - virtual bool updateCollapsed(StoreEntry &e) override; - virtual void markForUnlink(StoreEntry &) override; - virtual void unlink(StoreEntry &e) override; + virtual bool anchorToCache(StoreEntry &e, bool &inSync) override; + virtual bool updateAnchored(StoreEntry &) override; + virtual void evictCached(StoreEntry &) override; + virtual void evictIfFound(const cache_key *) override; virtual bool smpAware() const override { return true; } static int64_t EntryLimit(); @@ -81,7 +81,7 @@ protected: void updateHeadersOrThrow(Ipc::StoreMapUpdate &update); void anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor); - bool updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor); + bool updateAnchoredWith(StoreEntry &, const sfileno, const Ipc::StoreMapAnchor &); Ipc::Mem::PageId pageForSlice(Ipc::StoreMapSliceId sliceId); Ipc::StoreMap::Slice &nextAppendableSlice(const sfileno entryIndex, sfileno &sliceOffset); diff --git a/src/Store.h b/src/Store.h index fced940ad2..cf33b5aedb 100644 --- a/src/Store.h +++ b/src/Store.h @@ -82,28 +82,43 @@ public: void swapOutDecision(const MemObject::SwapOut::Decision &decision); void abort(); - void makePublic(const KeyScope keyScope = ksDefault); + bool makePublic(const KeyScope keyScope = ksDefault); void makePrivate(const bool shareable); /// A low-level method just resetting "private key" flags. /// To avoid key inconsistency please use forcePublicKey() /// or similar instead. void clearPrivate(); - void setPublicKey(const KeyScope keyScope = ksDefault); + bool setPublicKey(const KeyScope keyScope = ksDefault); /// Resets existing public key to a public key with default scope, /// releasing the old default-scope entry (if any). /// Does nothing if the existing public key already has default scope. void clearPublicKeyScope(); - void setPrivateKey(const bool shareable); + + /// \returns public key (if the entry has it) or nil (otherwise) + const cache_key *publicKey() const { + return (!EBIT_TEST(flags, KEY_PRIVATE)) ? + reinterpret_cast(key): // may be nil + nullptr; + } + + /// Either fills this entry with private key or changes the existing key + /// from public to private. + /// \param permanent whether this entry should be private forever. + void setPrivateKey(const bool shareable, const bool permanent); + void expireNow(); + /// Makes the StoreEntry private and marks the corresponding entry + /// for eventual removal from the Store. void releaseRequest(const bool shareable = false); void negativeCache(); - void cacheNegatively(); /** \todo argh, why both? */ + bool cacheNegatively(); /** \todo argh, why both? */ void invokeHandlers(); - void purgeMem(); void cacheInMemory(); ///< start or continue storing in memory cache void swapOut(); /// whether we are in the process of writing this entry to disk bool swappingOut() const { return swap_status == SWAPOUT_WRITING; } + /// whether the entire entry is now on disk (possibly marked for deletion) + bool swappedOut() const { return swap_status == SWAPOUT_DONE; } void swapOutFileClose(int how); const char *url() const; /// Satisfies cachability requirements shared among disk and RAM caches. @@ -158,6 +173,20 @@ public: /// the disk this entry is [being] cached on; asserts for entries w/o a disk Store::Disk &disk() const; + /// whether one of this StoreEntry owners has locked the corresponding + /// disk entry (at the specified disk entry coordinates, if any) + bool hasDisk(const sdirno dirn = -1, const sfileno filen = -1) const; + /// Makes hasDisk(dirn, filn) true. The caller should have locked + /// the corresponding disk store entry for reading or writing. + void attachToDisk(const sdirno, const sfileno, const swap_status_t); + /// Makes hasDisk() false. The caller should have unlocked + /// the corresponding disk store entry. + void detachFromDisk(); + + /// whether there is a corresponding locked transients table entry + bool hasTransients() const { return mem_obj && mem_obj->xitTable.index >= 0; } + /// whether there is a corresponding locked shared memory table entry + bool hasMemStore() const { return mem_obj && mem_obj->memCache.index >= 0; } MemObject *mem_obj; RemovalPolicyNode repl; @@ -198,7 +227,6 @@ public: void *operator new(size_t byteCount); void operator delete(void *address); - void setReleaseFlag(); #if USE_SQUID_ESI ESIElement::Pointer cachedESITree; @@ -220,8 +248,19 @@ public: /// update last reference timestamp and related Store metadata void touch(); + /// One of the three methods to get rid of an unlocked StoreEntry object. + /// Removes all unlocked (and marks for eventual removal all locked) Store + /// entries, including attached and unattached entries that have our key. + /// Also destroys us if we are unlocked or makes us private otherwise. + /// TODO: remove virtual. virtual void release(const bool shareable = false); + /// One of the three methods to get rid of an unlocked StoreEntry object. + /// May destroy this object if it is unlocked; does nothing otherwise. + /// Unlike release(), may not trigger eviction of underlying store entries, + /// but, unlike destroyStoreEntry(), does honor an earlier release request. + void abandon(const char *context) { if (!locked()) doAbandon(context); } + /// May the caller commit to treating this [previously locked] /// entry as a cache hit? bool mayStartHitting() const { @@ -242,12 +281,17 @@ public: virtual void flush(); protected: + typedef Store::EntryGuard EntryGuard; + void transientsAbandonmentCheck(); + /// does nothing except throwing if disk-associated data members are inconsistent + void checkDisk() const; private: + void doAbandon(const char *context); bool checkTooBig() const; void forcePublicKey(const cache_key *newkey); - void adjustVary(); + StoreEntry *adjustVary(); const cache_key *calcPublicKey(const KeyScope keyScope); static MemAllocator *pool; @@ -310,9 +354,53 @@ private: typedef void (*STOREGETCLIENT) (StoreEntry *, void *cbdata); namespace Store { + +/// a smart pointer similar to std::unique_ptr<> that automatically +/// release()s and unlock()s the guarded Entry on stack-unwinding failures +class EntryGuard { +public: + /// \param entry either nil or a locked Entry to manage + /// \param context default unlock() message + EntryGuard(Entry *entry, const char *context): + entry_(entry), context_(context) { + assert(!entry_ || entry_->locked()); + } + + ~EntryGuard() { + if (entry_) { + // something went wrong -- the caller did not unlockAndReset() us + onException(); + } + } + + EntryGuard(EntryGuard &&) = delete; // no copying or moving (for now) + + /// like std::unique_ptr::get() + /// \returns nil or the guarded (locked) entry + Entry *get() { + return entry_; + } + + /// like std::unique_ptr::reset() + /// stops guarding the entry + /// unlocks the entry (which may destroy it) + void unlockAndReset(const char *resetContext = nullptr) { + if (entry_) { + entry_->unlock(resetContext ? resetContext : context_); + entry_ = nullptr; + } + } + +private: + void onException() noexcept; + + Entry *entry_; ///< the guarded Entry or nil + const char *context_; ///< default unlock() message +}; + void Stats(StoreEntry *output); void Maintain(void *unused); -}; +}; // namespace Store /// \ingroup StoreAPI size_t storeEntryInUse(); @@ -338,7 +426,7 @@ StoreEntry *storeCreateEntry(const char *, const char *, const RequestFlags &, c /// \ingroup StoreAPI /// Creates a new StoreEntry with mem_obj and sets initial flags/states. -StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const RequestFlags &, const HttpRequestMethod&); +StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const HttpRequestMethod&); /// \ingroup StoreAPI void storeInit(void); @@ -376,7 +464,10 @@ void storeFsDone(void); /// \ingroup StoreAPI void storeReplAdd(const char *, REMOVALPOLICYCREATE *); -/// \ingroup StoreAPI +/// One of the three methods to get rid of an unlocked StoreEntry object. +/// This low-level method ignores lock()ing and release() promises. It never +/// leaves the entry in the local store_table. +/// TODO: Hide by moving its functionality into the StoreEntry destructor. extern FREE destroyStoreEntry; /// \ingroup StoreAPI diff --git a/src/Transients.cc b/src/Transients.cc index dacd2d4dcc..f1baa7a75e 100644 --- a/src/Transients.cc +++ b/src/Transients.cc @@ -26,8 +26,6 @@ /// shared memory segment path to use for Transients map static const SBuf MapLabel("transients_map"); -/// shared memory segment path to use for Transients map extras -static const char *ExtrasLabel = "transients_ex"; Transients::Transients(): map(NULL), locals(NULL) { @@ -50,8 +48,6 @@ Transients::init() map = new TransientsMap(MapLabel); map->cleaner = this; - extras = shm_old(TransientsMapExtras)(ExtrasLabel); - locals = new Locals(entryLimit, 0); } @@ -161,42 +157,16 @@ Transients::get(const cache_key *key) if (StoreEntry *oldE = locals->at(index)) { debugs(20, 3, "not joining private " << *oldE); assert(EBIT_TEST(oldE->flags, KEY_PRIVATE)); - } else if (StoreEntry *newE = copyFromShm(index)) { - return newE; // keep read lock to receive updates from others + map->closeForReading(index); + return nullptr; } - // private entry or loading failure - map->closeForReading(index); - return NULL; -} - -StoreEntry * -Transients::copyFromShm(const sfileno index) -{ - const TransientsMapExtras::Item &extra = extras->items[index]; - - // create a brand new store entry and initialize it with stored info - StoreEntry *e = storeCreatePureEntry(extra.url, extra.url, - extra.reqFlags, extra.reqMethod); - - assert(e->mem_obj); - e->mem_obj->method = extra.reqMethod; - e->mem_obj->xitTable.io = MemObject::ioReading; + StoreEntry *e = new StoreEntry(); + e->createMemObject(); e->mem_obj->xitTable.index = index; - - // TODO: Support collapsed revalidation for SMP-aware caches. - e->setPublicKey(ksDefault); - assert(e->key); - - // How do we know its SMP- and not just locally-collapsed? A worker gets - // locally-collapsed entries from the local store_table, not Transients. - // TODO: Can we remove smpCollapsed by not syncing non-transient entries? - e->mem_obj->smpCollapsed = true; - - assert(!locals->at(index)); - // We do not lock e because we do not want to prevent its destruction; - // e is tied to us via mem_obj so we will know when it is destructed. - locals->at(index) = e; + e->mem_obj->xitTable.io = Store::ioReading; + anchor->exportInto(*e); + // keep read lock to receive updates from others return e; } @@ -217,64 +187,49 @@ Transients::findCollapsed(const sfileno index) } void -Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags, - const HttpRequestMethod &reqMethod) +Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus direction) { - assert(e); - assert(e->mem_obj); - assert(e->mem_obj->xitTable.index < 0); + assert(direction == Store::ioReading || direction == Store::ioWriting); - if (!map) { - debugs(20, 5, "No map to add " << *e); - return; + if (!e->hasTransients()) { + addEntry(e, key, direction); + e->mem_obj->xitTable.io = direction; } - sfileno index = 0; - Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast(e->key), index); - if (!slot) { - debugs(20, 5, "collision registering " << *e); - return; + assert(e->hasTransients()); + const auto index = e->mem_obj->xitTable.index; + if (const auto old = locals->at(index)) { + assert(old == e); + } else { + // We do not lock e because we do not want to prevent its destruction; + // e is tied to us via mem_obj so we will know when it is destructed. + locals->at(index) = e; } - - try { - if (copyToShm(*e, index, reqFlags, reqMethod)) { - slot->set(*e); - e->mem_obj->xitTable.io = MemObject::ioWriting; - e->mem_obj->xitTable.index = index; - map->startAppending(index); - // keep write lock -- we will be supplying others with updates - return; - } - // fall through to the error handling code - } catch (const std::exception &x) { // TODO: should we catch ... as well? - debugs(20, 2, "error keeping entry " << index << - ' ' << *e << ": " << x.what()); - // fall through to the error handling code - } - - map->abortWriting(index); } -/// copies all relevant local data to shared memory -bool -Transients::copyToShm(const StoreEntry &e, const sfileno index, - const RequestFlags &reqFlags, - const HttpRequestMethod &reqMethod) +/// creates a new Transients entry or throws +void +Transients::addEntry(StoreEntry *e, const cache_key *key, const Store::IoStatus direction) { - TransientsMapExtras::Item &extra = extras->items[index]; - - const char *url = e.url(); - const size_t urlLen = strlen(url); - Must(urlLen < sizeof(extra.url)); // we have space to store it all, plus 0 - strncpy(extra.url, url, sizeof(extra.url)); - extra.url[urlLen] = '\0'; + assert(e); + assert(e->mem_obj); + assert(!e->hasTransients()); - extra.reqFlags = reqFlags; + Must(map); // configured to track transients - Must(reqMethod != Http::METHOD_OTHER); - extra.reqMethod = reqMethod.id(); + sfileno index = 0; + Ipc::StoreMapAnchor *slot = map->openForWriting(key, index); + Must(slot); // no writer collisions - return true; + slot->set(*e, key); + e->mem_obj->xitTable.index = index; + if (direction == Store::ioWriting) { + // keep write lock; the caller will decide what to do with it + map->startAppending(e->mem_obj->xitTable.index); + } else { + // keep the entry locked (for reading) to receive remote DELETE events + map->closeForWriting(e->mem_obj->xitTable.index); + } } void @@ -284,49 +239,30 @@ Transients::noteFreeMapSlice(const Ipc::StoreMapSliceId) } void -Transients::abandon(const StoreEntry &e) -{ - assert(e.mem_obj && map); - map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry - CollapsedForwarding::Broadcast(e); - // We do not unlock the entry now because the problem is most likely with - // the server resource rather than a specific cache writer, so we want to - // prevent other readers from collapsing requests for that resource. -} - -bool -Transients::abandoned(const StoreEntry &e) const -{ - assert(e.mem_obj); - return abandonedAt(e.mem_obj->xitTable.index); -} - -/// whether an in-transit entry at the index is now abandoned by its writer -bool -Transients::abandonedAt(const sfileno index) const +Transients::status(const StoreEntry &entry, bool &aborted, bool &waitingToBeFreed) const { assert(map); - return map->readableEntry(index).waitingToBeFreed; + assert(entry.hasTransients()); + const auto idx = entry.mem_obj->xitTable.index; + const auto &anchor = isWriter(entry) ? + map->writeableEntry(idx) : map->readableEntry(idx); + aborted = anchor.writerHalted; + waitingToBeFreed = anchor.waitingToBeFreed; } void Transients::completeWriting(const StoreEntry &e) { - if (e.mem_obj && e.mem_obj->xitTable.index >= 0) { - assert(e.mem_obj->xitTable.io == MemObject::ioWriting); - // there will be no more updates from us after this, so we must prevent - // future readers from joining - map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry - map->closeForWriting(e.mem_obj->xitTable.index); - e.mem_obj->xitTable.index = -1; - e.mem_obj->xitTable.io = MemObject::ioDone; - } + assert(e.hasTransients()); + assert(isWriter(e)); + map->closeForWriting(e.mem_obj->xitTable.index, true); + e.mem_obj->xitTable.io = Store::ioReading; } int Transients::readers(const StoreEntry &e) const { - if (e.mem_obj && e.mem_obj->xitTable.index >= 0) { + if (e.hasTransients()) { assert(map); return map->peekAtEntry(e.mem_obj->xitTable.index).lock.readers; } @@ -334,32 +270,46 @@ Transients::readers(const StoreEntry &e) const } void -Transients::markForUnlink(StoreEntry &e) -{ - unlink(e); +Transients::evictCached(StoreEntry &e) +{ + debugs(20, 5, e); + if (e.hasTransients()) { + const auto index = e.mem_obj->xitTable.index; + if (map->freeEntry(index)) { + // Delay syncCollapsed(index) which may end `e` wait for updates. + // Calling it directly/here creates complex reentrant call chains. + CollapsedForwarding::Broadcast(e, true); + } + } // else nothing to do because e must be private } void -Transients::unlink(StoreEntry &e) +Transients::evictIfFound(const cache_key *key) { - if (e.mem_obj && e.mem_obj->xitTable.io == MemObject::ioWriting) - abandon(e); + if (!map) + return; + + const sfileno index = map->fileNoByKey(key); + if (map->freeEntry(index)) + CollapsedForwarding::Broadcast(index, true); } void -Transients::disconnect(MemObject &mem_obj) +Transients::disconnect(StoreEntry &entry) { - if (mem_obj.xitTable.index >= 0) { + debugs(20, 5, entry); + if (entry.hasTransients()) { + auto &xitTable = entry.mem_obj->xitTable; assert(map); - if (mem_obj.xitTable.io == MemObject::ioWriting) { - map->abortWriting(mem_obj.xitTable.index); + if (isWriter(entry)) { + map->abortWriting(xitTable.index); } else { - assert(mem_obj.xitTable.io == MemObject::ioReading); - map->closeForReading(mem_obj.xitTable.index); + assert(isReader(entry)); + map->closeForReading(xitTable.index); } - locals->at(mem_obj.xitTable.index) = NULL; - mem_obj.xitTable.index = -1; - mem_obj.xitTable.io = MemObject::ioDone; + locals->at(xitTable.index) = nullptr; + xitTable.index = -1; + xitTable.io = Store::ioDone; } } @@ -374,12 +324,30 @@ Transients::EntryLimit() return Config.collapsed_forwarding_shared_entries_limit; } +bool +Transients::markedForDeletion(const cache_key *key) const +{ + assert(map); + return map->markedForDeletion(key); +} + +bool +Transients::isReader(const StoreEntry &e) const +{ + return e.mem_obj && e.mem_obj->xitTable.io == Store::ioReading; +} + +bool +Transients::isWriter(const StoreEntry &e) const +{ + return e.mem_obj && e.mem_obj->xitTable.io == Store::ioWriting; +} + /// initializes shared memory segment used by Transients class TransientsRr: public Ipc::Mem::RegisteredRunner { public: /* RegisteredRunner API */ - TransientsRr(): mapOwner(NULL), extrasOwner(NULL) {} virtual void useConfig(); virtual ~TransientsRr(); @@ -387,8 +355,7 @@ protected: virtual void create(); private: - TransientsMap::Owner *mapOwner; - Ipc::Mem::Owner *extrasOwner; + TransientsMap::Owner *mapOwner = nullptr; }; RunnerRegistrationEntry(TransientsRr); @@ -412,13 +379,10 @@ TransientsRr::create() Must(!mapOwner); mapOwner = TransientsMap::Init(MapLabel, entryLimit); - Must(!extrasOwner); - extrasOwner = shm_new(TransientsMapExtras)(ExtrasLabel, entryLimit); } TransientsRr::~TransientsRr() { - delete extrasOwner; delete mapOwner; } diff --git a/src/Transients.h b/src/Transients.h index 003edb2c13..8479f20074 100644 --- a/src/Transients.h +++ b/src/Transients.h @@ -9,26 +9,20 @@ #ifndef SQUID_TRANSIENTS_H #define SQUID_TRANSIENTS_H -#include "http/MethodType.h" #include "ipc/mem/Page.h" #include "ipc/mem/PageStack.h" #include "ipc/StoreMap.h" #include "Store.h" #include "store/Controlled.h" +#include "store/forward.h" #include -// StoreEntry restoration info not already stored by Ipc::StoreMap -struct TransientsMapExtraItem { - char url[MAX_URL+1]; ///< Request-URI; TODO: decrease MAX_URL by one - RequestFlags reqFlags; ///< request flags - Http::MethodType reqMethod; ///< request method; extensions are not supported -}; -typedef Ipc::StoreMapItems TransientsMapExtras; typedef Ipc::StoreMap TransientsMap; /// Keeps track of store entries being delivered to clients that arrived before -/// those entries were [fully] cached. This shared table is necessary to sync -/// the entry-writing worker with entry-reading worker(s). +/// those entries were [fully] cached. This SMP-shared table is necessary to +/// * sync an entry-writing worker with entry-reading worker(s); and +/// * sync an entry-deleting worker with both entry-reading/writing workers. class Transients: public Store::Controlled, public Ipc::StoreMapCleaner { public: @@ -38,23 +32,23 @@ public: /// return a local, previously collapsed entry StoreEntry *findCollapsed(const sfileno xitIndex); - /// add an in-transit entry suitable for collapsing future requests - void startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); + /// start listening for remote DELETE requests targeting either a complete + /// StoreEntry (ioReading) or a being-formed miss StoreEntry (ioWriting) + void monitorIo(StoreEntry*, const cache_key*, const Store::IoStatus); /// called when the in-transit entry has been successfully cached void completeWriting(const StoreEntry &e); - /// the calling entry writer no longer expects to cache this entry - void abandon(const StoreEntry &e); - - /// whether an in-transit entry is now abandoned by its writer - bool abandoned(const StoreEntry &e) const; + /// copies current shared entry metadata into parameters + /// \param aborted whether the entry was aborted + /// \param waitingToBeFreed whether the entry was marked for deletion + void status(const StoreEntry &e, bool &aborted, bool &waitingToBeFreed) const; /// number of entry readers some time ago int readers(const StoreEntry &e) const; - /// the caller is done writing or reading this entry - void disconnect(MemObject &mem_obj); + /// the caller is done writing or reading the given entry + void disconnect(StoreEntry &); /* Store API */ virtual StoreEntry *get(const cache_key *) override; @@ -69,18 +63,24 @@ public: virtual void stat(StoreEntry &e) const override; virtual void reference(StoreEntry &e) override; virtual bool dereference(StoreEntry &e) override; - virtual void markForUnlink(StoreEntry &e) override; - virtual void unlink(StoreEntry &e) override; + virtual void evictCached(StoreEntry &) override; + virtual void evictIfFound(const cache_key *) override; virtual void maintain() override; virtual bool smpAware() const override { return true; } + /// Whether an entry with the given public key exists and (but) was + /// marked for removal some time ago; get(key) returns nil in such cases. + bool markedForDeletion(const cache_key *) const; + + /// whether the entry is in "reading from Transients" I/O state + bool isReader(const StoreEntry &) const; + /// whether the entry is in "writing to Transients" I/O state + bool isWriter(const StoreEntry &) const; + static int64_t EntryLimit(); protected: - StoreEntry *copyFromShm(const sfileno index); - bool copyToShm(const StoreEntry &e, const sfileno index, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); - - bool abandonedAt(const sfileno index) const; + void addEntry(StoreEntry*, const cache_key *, const Store::IoStatus); // Ipc::StoreMapCleaner API virtual void noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId) override; @@ -89,12 +89,9 @@ private: /// shared packed info indexed by Store keys, for creating new StoreEntries TransientsMap *map; - /// shared packed info that standard StoreMap does not store for us - typedef TransientsMapExtras Extras; - Ipc::Mem::Pointer extras; - typedef std::vector Locals; - /// local collapsed entries indexed by transient ID, for syncing old StoreEntries + /// local collapsed reader and writer entries, indexed by transient ID, + /// for syncing old StoreEntries Locals *locals; }; diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc index 649dbda6b7..2eb1dfbdf9 100644 --- a/src/client_side_reply.cc +++ b/src/client_side_reply.cc @@ -316,10 +316,8 @@ clientReplyContext::processExpired() http->log_uri, http->request->flags, http->request->method); /* NOTE, don't call StoreEntry->lock(), storeCreateEntry() does it */ - if (collapsingAllowed) { + if (collapsingAllowed && Store::Root().allowCollapsing(entry, http->request->flags, http->request->method)) { debugs(88, 5, "allow other revalidation requests to collapse on " << *entry); - Store::Root().allowCollapsing(entry, http->request->flags, - http->request->method); collapsedRevalidation = crInitiator; } else { collapsedRevalidation = crNone; @@ -905,30 +903,16 @@ clientReplyContext::purgeRequestFindObjectToPurge() void purgeEntriesByUrl(HttpRequest * req, const char *url) { -#if USE_HTCP - bool get_or_head_sent = false; -#endif - for (HttpRequestMethod m(Http::METHOD_NONE); m != Http::METHOD_ENUM_END; ++m) { if (m.respMaybeCacheable()) { - if (StoreEntry *entry = storeGetPublic(url, m)) { - debugs(88, 5, "purging " << *entry << ' ' << m << ' ' << url); + const cache_key *key = storeKeyPublic(url, m); + debugs(88, 5, m << ' ' << url << ' ' << storeKeyText(key)); #if USE_HTCP - neighborsHtcpClear(entry, url, req, m, HTCP_CLR_INVALIDATION); - if (m == Http::METHOD_GET || m == Http::METHOD_HEAD) { - get_or_head_sent = true; - } + neighborsHtcpClear(nullptr, url, req, m, HTCP_CLR_INVALIDATION); #endif - entry->release(); - } + Store::Root().evictIfFound(key); } } - -#if USE_HTCP - if (!get_or_head_sent) { - neighborsHtcpClear(NULL, url, req, HttpRequestMethod(Http::METHOD_GET), HTCP_CLR_INVALIDATION); - } -#endif } void @@ -1059,7 +1043,7 @@ clientReplyContext::purgeDoPurgeGet(StoreEntry *newEntry) #if USE_HTCP neighborsHtcpClear(newEntry, NULL, http->request, HttpRequestMethod(Http::METHOD_GET), HTCP_CLR_PURGE); #endif - newEntry->release(); + newEntry->release(true); purgeStatus = Http::scOkay; } @@ -1075,7 +1059,7 @@ clientReplyContext::purgeDoPurgeHead(StoreEntry *newEntry) #if USE_HTCP neighborsHtcpClear(newEntry, NULL, http->request, HttpRequestMethod(Http::METHOD_HEAD), HTCP_CLR_PURGE); #endif - newEntry->release(); + newEntry->release(true); purgeStatus = Http::scOkay; } @@ -1091,7 +1075,7 @@ clientReplyContext::purgeDoPurgeHead(StoreEntry *newEntry) #if USE_HTCP neighborsHtcpClear(entry, NULL, http->request, HttpRequestMethod(Http::METHOD_GET), HTCP_CLR_PURGE); #endif - entry->release(); + entry->release(true); purgeStatus = Http::scOkay; } @@ -1102,7 +1086,7 @@ clientReplyContext::purgeDoPurgeHead(StoreEntry *newEntry) #if USE_HTCP neighborsHtcpClear(entry, NULL, http->request, HttpRequestMethod(Http::METHOD_HEAD), HTCP_CLR_PURGE); #endif - entry->release(); + entry->release(true); purgeStatus = Http::scOkay; } } @@ -2297,7 +2281,7 @@ clientReplyContext::createStoreEntry(const HttpRequestMethod& m, RequestFlags re !reqFlags.needValidation && (m == Http::METHOD_GET || m == Http::METHOD_HEAD)) { // make the entry available for future requests now - Store::Root().allowCollapsing(e, reqFlags, m); + (void)Store::Root().allowCollapsing(e, reqFlags, m); } sc = storeClientListAdd(e, this); diff --git a/src/clients/FtpGateway.cc b/src/clients/FtpGateway.cc index 86c6a699cc..f9cb8321a5 100644 --- a/src/clients/FtpGateway.cc +++ b/src/clients/FtpGateway.cc @@ -2640,14 +2640,10 @@ Ftp::Gateway::haveParsedReplyHeaders() e->timestampsSet(); - if (flags.authenticated) { - /* - * Authenticated requests can't be cached. - */ - e->release(); - } else if (!EBIT_TEST(e->flags, RELEASE_REQUEST) && !getCurrentOffset()) { - e->setPublicKey(); - } else { + // makePublic() if allowed/possible or release() otherwise + if (flags.authenticated || // authenticated requests can't be cached + getCurrentOffset() || + !e->makePublic()) { e->release(); } } diff --git a/src/enums.h b/src/enums.h index e0dff7566f..bc320f367e 100644 --- a/src/enums.h +++ b/src/enums.h @@ -47,9 +47,16 @@ typedef enum { STORE_PENDING } store_status_t; +/// StoreEntry relationship with a disk cache typedef enum { + /// StoreEntry is currently not associated with any disk store entry. + /// Does not guarantee (or preclude!) a matching disk store entry existence. SWAPOUT_NONE, + /// StoreEntry is being swapped out to the associated disk store entry. + /// Guarantees the disk store entry existence. SWAPOUT_WRITING, + /// StoreEntry is associated with a complete (i.e., fully swapped out) disk store entry. + /// Guarantees the disk store entry existence. SWAPOUT_DONE } swap_status_t; @@ -69,7 +76,7 @@ enum { ENTRY_SPECIAL, ENTRY_REVALIDATE_ALWAYS, DELAY_SENDING, - RELEASE_REQUEST, + RELEASE_REQUEST, ///< prohibits making the key public REFRESH_REQUEST, ENTRY_REVALIDATE_STALE, ENTRY_DISPATCHED, diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index f89aed50bb..8f36cf29e5 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -66,90 +66,68 @@ Rock::SwapDir::get(const cache_key *key) // create a brand new store entry and initialize it with stored basics StoreEntry *e = new StoreEntry(); + e->createMemObject(); anchorEntry(*e, filen, *slot); - - e->hashInsert(key); trackReferences(*e); - return e; - // the disk entry remains open for reading, protected from modifications } bool -Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync) +Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync) { if (!map || !theFile || !theFile->canRead()) return false; sfileno filen; const Ipc::StoreMapAnchor *const slot = map->openForReading( - reinterpret_cast(collapsed.key), filen); + reinterpret_cast(entry.key), filen); if (!slot) return false; - anchorEntry(collapsed, filen, *slot); - inSync = updateCollapsedWith(collapsed, *slot); + anchorEntry(entry, filen, *slot); + inSync = updateAnchoredWith(entry, *slot); return true; // even if inSync is false } bool -Rock::SwapDir::updateCollapsed(StoreEntry &collapsed) +Rock::SwapDir::updateAnchored(StoreEntry &entry) { if (!map || !theFile || !theFile->canRead()) return false; - if (collapsed.swap_filen < 0) // no longer using a disk cache - return true; - assert(collapsed.swap_dirn == index); + assert(entry.hasDisk(index)); - const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen); - return updateCollapsedWith(collapsed, s); + const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen); + return updateAnchoredWith(entry, s); } bool -Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor) +Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor) { - collapsed.swap_file_sz = anchor.basics.swap_file_sz; + entry.swap_file_sz = anchor.basics.swap_file_sz; return true; } void Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor) { - const Ipc::StoreMapAnchor::Basics &basics = anchor.basics; + anchor.exportInto(e); - e.swap_file_sz = basics.swap_file_sz; - e.lastref = basics.lastref; - e.timestamp = basics.timestamp; - e.expires = basics.expires; - e.lastModified(basics.lastmod); - e.refcount = basics.refcount; - e.flags = basics.flags; - - if (anchor.complete()) { - e.store_status = STORE_OK; - e.swap_status = SWAPOUT_DONE; - } else { - e.store_status = STORE_PENDING; - e.swap_status = SWAPOUT_WRITING; // even though another worker writes? - } + const bool complete = anchor.complete(); + e.store_status = complete ? STORE_OK : STORE_PENDING; + // SWAPOUT_WRITING: even though another worker writes? + e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING); e.ping_status = PING_NONE; - EBIT_CLR(e.flags, RELEASE_REQUEST); - e.clearPrivate(); EBIT_SET(e.flags, ENTRY_VALIDATED); - - e.swap_dirn = index; - e.swap_filen = filen; } void Rock::SwapDir::disconnect(StoreEntry &e) { - assert(e.swap_dirn == index); - assert(e.swap_filen >= 0); - // cannot have SWAPOUT_NONE entry with swap_filen >= 0 - assert(e.swap_status != SWAPOUT_NONE); + assert(e.hasDisk(index)); + + ignoreReferences(e); // do not rely on e.swap_status here because there is an async delay // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE. @@ -161,16 +139,12 @@ void Rock::SwapDir::disconnect(StoreEntry &e) if (e.mem_obj && e.mem_obj->swapout.sio != NULL && dynamic_cast(*e.mem_obj->swapout.sio).writeableAnchor_) { map->abortWriting(e.swap_filen); - e.swap_dirn = -1; - e.swap_filen = -1; - e.swap_status = SWAPOUT_NONE; + e.detachFromDisk(); dynamic_cast(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL; - Store::Root().transientsAbandon(e); // broadcasts after the change + Store::Root().stopSharing(e); // broadcasts after the change } else { map->closeForReading(e.swap_filen); - e.swap_dirn = -1; - e.swap_filen = -1; - e.swap_status = SWAPOUT_NONE; + e.detachFromDisk(); } } @@ -198,9 +172,16 @@ Rock::SwapDir::doReportStat() const } void -Rock::SwapDir::swappedOut(const StoreEntry &) +Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &) +{ + // nothing to do +} + +void +Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry) { - // stats are not stored but computed when needed + debugs(47, 5, entry); + disconnect(entry); // calls abortWriting() to free the disk entry } int64_t @@ -782,7 +763,7 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS return NULL; } - if (e.swap_filen < 0) { + if (!e.hasDisk()) { debugs(47,4, HERE << e); return NULL; } @@ -908,7 +889,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r) writeError(sio); sio.finishedWriting(errflag); - // and hope that Core will call disconnect() to close the map entry + // and wait for the finalizeSwapoutFailure() call to close the map entry } if (sio.touchingStoreEntry()) @@ -923,7 +904,7 @@ Rock::SwapDir::writeError(StoreIOState &sio) map->freeEntry(sio.swap_filen); // will mark as unusable, just in case if (sio.touchingStoreEntry()) - Store::Root().transientsAbandon(*sio.e); + Store::Root().stopSharing(*sio.e); // else noop: a fresh entry update error does not affect stale entry readers // All callers must also call IoState callback, to propagate the error. @@ -999,19 +980,24 @@ Rock::SwapDir::unlinkdUseful() const } void -Rock::SwapDir::unlink(StoreEntry &e) +Rock::SwapDir::evictIfFound(const cache_key *key) { - debugs(47, 5, HERE << e); - ignoreReferences(e); - map->freeEntry(e.swap_filen); - disconnect(e); + if (map) + map->freeEntryByKey(key); // may not be there } void -Rock::SwapDir::markForUnlink(StoreEntry &e) +Rock::SwapDir::evictCached(StoreEntry &e) { debugs(47, 5, e); - map->freeEntry(e.swap_filen); + if (e.hasDisk(index)) { + if (map->freeEntry(e.swap_filen)) + CollapsedForwarding::Broadcast(e); + if (!e.locked()) + disconnect(e); + } else if (const auto key = e.publicKey()) { + evictIfFound(key); + } } void @@ -1093,6 +1079,12 @@ Rock::SwapDir::freeSlotsPath() const return spacesPath.termedBuf(); } +bool +Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const +{ + return map->hasReadableEntry(reinterpret_cast(e.key)); +} + namespace Rock { RunnerRegistrationEntry(SwapDirRr); diff --git a/src/fs/rock/RockSwapDir.h b/src/fs/rock/RockSwapDir.h index c51fa45777..879bd45ed0 100644 --- a/src/fs/rock/RockSwapDir.h +++ b/src/fs/rock/RockSwapDir.h @@ -39,15 +39,18 @@ public: /* public ::SwapDir API */ virtual void reconfigure(); virtual StoreEntry *get(const cache_key *key); - virtual void markForUnlink(StoreEntry &e); + virtual void evictCached(StoreEntry &); + virtual void evictIfFound(const cache_key *); virtual void disconnect(StoreEntry &e); virtual uint64_t currentSize() const; virtual uint64_t currentCount() const; virtual bool doReportStat() const; - virtual void swappedOut(const StoreEntry &e); + virtual void finalizeSwapoutSuccess(const StoreEntry &); + virtual void finalizeSwapoutFailure(StoreEntry &); virtual void create(); virtual void parse(int index, char *path); virtual bool smpAware() const { return true; } + virtual bool hasReadableEntry(const StoreEntry &) const; // temporary path to the shared memory map of first slots of cached entries SBuf inodeMapPath() const; @@ -77,8 +80,8 @@ public: protected: /* Store API */ - virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync); - virtual bool updateCollapsed(StoreEntry &collapsed); + virtual bool anchorToCache(StoreEntry &entry, bool &inSync); + virtual bool updateAnchored(StoreEntry &); /* protected ::SwapDir API */ virtual bool needsDiskStrand() const; @@ -94,7 +97,6 @@ protected: virtual bool dereference(StoreEntry &e); virtual void updateHeaders(StoreEntry *e); virtual bool unlinkdUseful() const; - virtual void unlink(StoreEntry &e); virtual void statfs(StoreEntry &e) const; /* IORequestor API */ @@ -124,7 +126,7 @@ protected: StoreIOState::Pointer createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *); void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor); - bool updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor); + bool updateAnchoredWith(StoreEntry &, const Ipc::StoreMapAnchor &); friend class Rebuild; friend class IoState; diff --git a/src/fs/ufs/RebuildState.cc b/src/fs/ufs/RebuildState.cc index ae01af1f11..c1915e27ce 100644 --- a/src/fs/ufs/RebuildState.cc +++ b/src/fs/ufs/RebuildState.cc @@ -40,7 +40,6 @@ Fs::Ufs::RebuildState::RebuildState(RefCount aSwapDir) : fn(0), entry(NULL), td(NULL), - e(NULL), fromLog(true), _done(false), cbdata(NULL) @@ -108,8 +107,6 @@ Fs::Ufs::RebuildState::RebuildStep(void *data) void Fs::Ufs::RebuildState::rebuildStep() { - currentEntry(NULL); - // Balance our desire to maximize the number of entries processed at once // (and, hence, minimize overheads and total rebuild time) with a // requirement to also process Coordinator events, disk I/Os, etc. @@ -208,34 +205,65 @@ Fs::Ufs::RebuildState::rebuildFromDirectory() return; } - if (!storeRebuildKeepEntry(tmpe, key, counts)) + addIfFresh(key, + filn, + tmpe.swap_file_sz, + tmpe.expires, + tmpe.timestamp, + tmpe.lastref, + tmpe.lastModified(), + tmpe.refcount, + tmpe.flags); +} + +/// if the loaded entry metadata is still relevant, indexes the entry +void +Fs::Ufs::RebuildState::addIfFresh(const cache_key *key, + sfileno file_number, + uint64_t swap_file_sz, + time_t expires, + time_t timestamp, + time_t lastref, + time_t lastmod, + uint32_t refcount, + uint16_t newFlags) +{ + if (!evictStaleAndContinue(key, lastref, counts.dupcount)) return; ++counts.objcount; - // tmpe.dump(5); - currentEntry(sd->addDiskRestore(key, - filn, - tmpe.swap_file_sz, - tmpe.expires, - tmpe.timestamp, - tmpe.lastref, - tmpe.lastModified(), - tmpe.refcount, /* refcount */ - tmpe.flags, /* flags */ - (int) flags.clean)); - storeDirSwapLog(currentEntry(), SWAP_LOG_ADD); + const auto addedEntry = sd->addDiskRestore(key, + file_number, + swap_file_sz, + expires, + timestamp, + lastref, + lastmod, + refcount, + newFlags, + 0 /* XXX: unused */); + storeDirSwapLog(addedEntry, SWAP_LOG_ADD); } -StoreEntry * -Fs::Ufs::RebuildState::currentEntry() const +/// Evicts a matching entry if it was last touched before caller's maxRef. +/// \returns false only if the matching entry was touched at or after maxRef, +/// indicating that the caller has supplied outdated maxRef. +bool +Fs::Ufs::RebuildState::evictStaleAndContinue(const cache_key *candidateKey, const time_t maxRef, int &staleCount) { - return e; -} + if (auto *indexedEntry = Store::Root().peek(candidateKey)) { -void -Fs::Ufs::RebuildState::currentEntry(StoreEntry *newValue) -{ - e = newValue; + if (indexedEntry->lastref >= maxRef) { + indexedEntry->abandon("Fs::Ufs::RebuildState::evictStaleAndContinue"); + ++counts.clashcount; + return false; + } + + ++staleCount; + indexedEntry->release(true); // evict previously indexedEntry + } + + return true; } /// process one swap log entry @@ -278,20 +306,8 @@ Fs::Ufs::RebuildState::rebuildFromSwapLog() if (swapData.op == SWAP_LOG_ADD) { (void) 0; } else if (swapData.op == SWAP_LOG_DEL) { - /* Delete unless we already have a newer copy anywhere in any store */ - /* this needs to become - * 1) unpack url - * 2) make synthetic request with headers ?? or otherwise search - * for a matching object in the store - * TODO FIXME change to new async api - */ - currentEntry (Store::Root().get(swapData.key)); - - if (currentEntry() != NULL && swapData.lastref >= e->lastref) { - undoAdd(); - --counts.objcount; - ++counts.cancelcount; - } + // remove any older or same-age entry; +1 covers same-age entries + (void)evictStaleAndContinue(swapData.key, swapData.lastref+1, counts.cancelcount); return; } else { const double @@ -317,123 +333,23 @@ Fs::Ufs::RebuildState::rebuildFromSwapLog() return; } - /* this needs to become - * 1) unpack url - * 2) make synthetic request with headers ?? or otherwise search - * for a matching object in the store - * TODO FIXME change to new async api - */ - currentEntry (Store::Root().get(swapData.key)); - - int used; /* is swapfile already in use? */ - - used = sd->mapBitTest(swapData.swap_filen); - - /* If this URL already exists in the cache, does the swap log - * appear to have a newer entry? Compare 'lastref' from the - * swap log to e->lastref. */ - /* is the log entry newer than current entry? */ - int disk_entry_newer = currentEntry() ? (swapData.lastref > currentEntry()->lastref ? 1 : 0) : 0; - - if (used && !disk_entry_newer) { - /* log entry is old, ignore it */ + if (sd->mapBitTest(swapData.swap_filen)) { + // While we were scanning the logs, some (unrelated) entry was added to + // our disk using our logged swap_filen. We could change our swap_filen + // and move the store file, but there is no Store API to do that (TODO). ++counts.clashcount; return; - } else if (used && currentEntry() && currentEntry()->swap_filen == swapData.swap_filen && currentEntry()->swap_dirn == sd->index) { - /* swapfile taken, same URL, newer, update meta */ - - if (currentEntry()->store_status == STORE_OK) { - currentEntry()->lastref = swapData.timestamp; - currentEntry()->timestamp = swapData.timestamp; - currentEntry()->expires = swapData.expires; - currentEntry()->lastModified(swapData.lastmod); - currentEntry()->flags = swapData.flags; - currentEntry()->refcount += swapData.refcount; - sd->dereference(*currentEntry()); - } else { - debug_trap("commonUfsDirRebuildFromSwapLog: bad condition"); - debugs(47, DBG_IMPORTANT, HERE << "bad condition"); - } - return; - } else if (used) { - /* swapfile in use, not by this URL, log entry is newer */ - /* This is sorta bad: the log entry should NOT be newer at this - * point. If the log is dirty, the filesize check should have - * caught this. If the log is clean, there should never be a - * newer entry. */ - debugs(47, DBG_IMPORTANT, "WARNING: newer swaplog entry for dirno " << - sd->index << ", fileno "<< std::setfill('0') << std::hex << - std::uppercase << std::setw(8) << swapData.swap_filen); - - /* I'm tempted to remove the swapfile here just to be safe, - * but there is a bad race condition in the NOVM version if - * the swapfile has recently been opened for writing, but - * not yet opened for reading. Because we can't map - * swapfiles back to StoreEntrys, we don't know the state - * of the entry using that file. */ - /* We'll assume the existing entry is valid, probably because - * were in a slow rebuild and the the swap file number got taken - * and the validation procedure hasn't run. */ - assert(flags.need_to_validate); - ++counts.clashcount; - return; - } else if (currentEntry() && !disk_entry_newer) { - /* key already exists, current entry is newer */ - /* keep old, ignore new */ - ++counts.dupcount; - return; - } else if (currentEntry()) { - /* key already exists, this swapfile not being used */ - /* junk old, load new */ - undoAdd(); - --counts.objcount; - ++counts.dupcount; - } else { - /* URL doesnt exist, swapfile not in use */ - /* load new */ - (void) 0; - } - - ++counts.objcount; - - currentEntry(sd->addDiskRestore(swapData.key, - swapData.swap_filen, - swapData.swap_file_sz, - swapData.expires, - swapData.timestamp, - swapData.lastref, - swapData.lastmod, - swapData.refcount, - swapData.flags, - (int) flags.clean)); - - storeDirSwapLog(currentEntry(), SWAP_LOG_ADD); -} - -/// undo the effects of adding an entry in rebuildFromSwapLog() -void -Fs::Ufs::RebuildState::undoAdd() -{ - StoreEntry *added = currentEntry(); - assert(added); - currentEntry(NULL); - - // TODO: Why bother with these two if we are going to release?! - added->expireNow(); - added->releaseRequest(); - - if (added->swap_filen > -1) { - SwapDir *someDir = INDEXSD(added->swap_dirn); - assert(someDir); - if (UFSSwapDir *ufsDir = dynamic_cast(someDir)) - ufsDir->undoAddDiskRestore(added); - // else the entry was loaded from and/or is currently in a non-UFS dir - // Thus, there is no use in preserving its disk file (the only purpose - // of undoAddDiskRestore!), even if we could. Instead, we release the - // the entry and [eventually] unlink its disk file or free its slot. } - added->release(); + addIfFresh(swapData.key, + swapData.swap_filen, + swapData.swap_file_sz, + swapData.expires, + swapData.timestamp, + swapData.lastref, + swapData.lastmod, + swapData.refcount, + swapData.flags); } int @@ -559,9 +475,3 @@ Fs::Ufs::RebuildState::isDone() const return _done; } -StoreEntry * -Fs::Ufs::RebuildState::currentItem() -{ - return currentEntry(); -} - diff --git a/src/fs/ufs/RebuildState.h b/src/fs/ufs/RebuildState.h index dff093ddb3..f41b64582c 100644 --- a/src/fs/ufs/RebuildState.h +++ b/src/fs/ufs/RebuildState.h @@ -33,7 +33,6 @@ public: virtual bool error() const; virtual bool isDone() const; - virtual StoreEntry *currentItem(); RefCount sd; int n_read; @@ -63,11 +62,17 @@ private: void rebuildFromDirectory(); void rebuildFromSwapLog(); void rebuildStep(); - void undoAdd(); + void addIfFresh(const cache_key *key, + sfileno file_number, + uint64_t swap_file_sz, + time_t expires, + time_t timestamp, + time_t lastref, + time_t lastmod, + uint32_t refcount, + uint16_t flags); + bool evictStaleAndContinue(const cache_key *candidateKey, const time_t maxRef, int &staleCount); int getNextFile(sfileno *, int *size); - StoreEntry *currentEntry() const; - void currentEntry(StoreEntry *); - StoreEntry *e; bool fromLog; bool _done; /// \bug (callback) should be hidden behind a proper human readable name diff --git a/src/fs/ufs/UFSSwapDir.cc b/src/fs/ufs/UFSSwapDir.cc index a615c90d66..22261d5c82 100644 --- a/src/fs/ufs/UFSSwapDir.cc +++ b/src/fs/ufs/UFSSwapDir.cc @@ -513,7 +513,7 @@ Fs::Ufs::UFSSwapDir::maintain() ++removed; - e->release(); + e->release(true); } walker->Done(walker); @@ -805,9 +805,7 @@ Fs::Ufs::UFSSwapDir::addDiskRestore(const cache_key * key, e = new StoreEntry(); e->store_status = STORE_OK; e->setMemStatus(NOT_IN_MEMORY); - e->swap_status = SWAPOUT_DONE; - e->swap_filen = file_number; - e->swap_dirn = index; + e->attachToDisk(index, file_number, SWAPOUT_DONE); e->swap_file_sz = swap_file_sz; e->lastref = lastref; e->timestamp = timestamp; @@ -815,31 +813,16 @@ Fs::Ufs::UFSSwapDir::addDiskRestore(const cache_key * key, e->lastModified(lastmod); e->refcount = refcount; e->flags = newFlags; - EBIT_CLR(e->flags, RELEASE_REQUEST); - e->clearPrivate(); e->ping_status = PING_NONE; EBIT_CLR(e->flags, ENTRY_VALIDATED); mapBitSet(e->swap_filen); cur_size += fs.blksize * sizeInBlocks(e->swap_file_sz); ++n_disk_objects; - e->hashInsert(key); /* do it after we clear KEY_PRIVATE */ + e->hashInsert(key); replacementAdd (e); return e; } -void -Fs::Ufs::UFSSwapDir::undoAddDiskRestore(StoreEntry *e) -{ - debugs(47, 5, HERE << *e); - replacementRemove(e); // checks swap_dirn so do it before we invalidate it - // Do not unlink the file as it might be used by a subsequent entry. - mapBitReset(e->swap_filen); - e->swap_filen = -1; - e->swap_dirn = -1; - cur_size -= fs.blksize * sizeInBlocks(e->swap_file_sz); - --n_disk_objects; -} - void Fs::Ufs::UFSSwapDir::rebuild() { @@ -1216,20 +1199,30 @@ Fs::Ufs::UFSSwapDir::unlinkdUseful() const } void -Fs::Ufs::UFSSwapDir::unlink(StoreEntry & e) +Fs::Ufs::UFSSwapDir::evictCached(StoreEntry & e) { - debugs(79, 3, HERE << "dirno " << index << ", fileno "<< - std::setfill('0') << std::hex << std::uppercase << std::setw(8) << e.swap_filen); - if (e.swap_status == SWAPOUT_DONE) { + debugs(79, 3, e); + if (e.locked()) // somebody else may still be using this file + return; // nothing to do: our get() always returns nil + + if (!e.hasDisk()) + return; // see evictIfFound() + + if (e.swappedOut()) { cur_size -= fs.blksize * sizeInBlocks(e.swap_file_sz); --n_disk_objects; } replacementRemove(&e); mapBitReset(e.swap_filen); UFSSwapDir::unlinkFile(e.swap_filen); - e.swap_filen = -1; - e.swap_dirn = -1; - e.swap_status = SWAPOUT_NONE; + e.detachFromDisk(); +} + +void +Fs::Ufs::UFSSwapDir::evictIfFound(const cache_key *) +{ + // UFS disk entries always have (attached) StoreEntries so if we got here, + // the entry is not cached on disk and there is nothing for us to do. } void @@ -1242,8 +1235,7 @@ Fs::Ufs::UFSSwapDir::replacementAdd(StoreEntry * e) void Fs::Ufs::UFSSwapDir::replacementRemove(StoreEntry * e) { - if (e->swap_dirn < 0) - return; + assert(e->hasDisk()); SwapDirPointer SD = INDEXSD(e->swap_dirn); @@ -1295,12 +1287,20 @@ Fs::Ufs::UFSSwapDir::sync() } void -Fs::Ufs::UFSSwapDir::swappedOut(const StoreEntry &e) +Fs::Ufs::UFSSwapDir::finalizeSwapoutSuccess(const StoreEntry &e) { cur_size += fs.blksize * sizeInBlocks(e.swap_file_sz); ++n_disk_objects; } +void +Fs::Ufs::UFSSwapDir::finalizeSwapoutFailure(StoreEntry &entry) +{ + debugs(47, 5, entry); + // rely on the expected subsequent StoreEntry::release(), evictCached(), or + // a similar call to call unlink(), detachFromDisk(), etc. for the entry. +} + void Fs::Ufs::UFSSwapDir::logEntry(const StoreEntry & e, int op) const { diff --git a/src/fs/ufs/UFSSwapDir.h b/src/fs/ufs/UFSSwapDir.h index e3afa9cb6d..f129c44729 100644 --- a/src/fs/ufs/UFSSwapDir.h +++ b/src/fs/ufs/UFSSwapDir.h @@ -49,10 +49,10 @@ public: virtual void dump(StoreEntry &) const override; virtual bool doubleCheck(StoreEntry &) override; virtual bool unlinkdUseful() const override; - virtual void unlink(StoreEntry &) override; virtual void statfs(StoreEntry &) const override; virtual void maintain() override; - virtual void markForUnlink(StoreEntry &) override {} + virtual void evictCached(StoreEntry &) override; + virtual void evictIfFound(const cache_key *) override; virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const override; virtual void reference(StoreEntry &) override; virtual bool dereference(StoreEntry &) override; @@ -67,11 +67,15 @@ public: virtual void reconfigure() override; virtual int callback() override; virtual void sync() override; - virtual void swappedOut(const StoreEntry &e) override; + virtual void finalizeSwapoutSuccess(const StoreEntry &) override; + virtual void finalizeSwapoutFailure(StoreEntry &) override; virtual uint64_t currentSize() const override { return cur_size; } virtual uint64_t currentCount() const override { return n_disk_objects; } virtual ConfigOption *getOptionTree() const override; virtual bool smpAware() const override { return false; } + /// as long as ufs relies on the global store_table to index entries, + /// it is wrong to ask individual ufs cache_dirs whether they have an entry + virtual bool hasReadableEntry(const StoreEntry &) const override { return false; } void unlinkFile(sfileno f); // move down when unlink is a virtual method @@ -99,8 +103,6 @@ public: uint32_t refcount, uint16_t flags, int clean); - /// Undo the effects of UFSSwapDir::addDiskRestore(). - void undoAddDiskRestore(StoreEntry *e); int validFileno(sfileno filn, int flag) const; int mapBitAllocate(); diff --git a/src/gopher.cc b/src/gopher.cc index 65124821b3..d5fd18a8aa 100644 --- a/src/gopher.cc +++ b/src/gopher.cc @@ -922,7 +922,8 @@ gopherSendRequest(int, void *data) CommIoCbPtrFun(gopherSendComplete, gopherState)); Comm::Write(gopherState->serverConn, &mb, call); - gopherState->entry->makePublic(); + if (!gopherState->entry->makePublic()) + gopherState->entry->makePrivate(true); } void diff --git a/src/http.cc b/src/http.cc index 1750087b5e..d0909f8a53 100644 --- a/src/http.cc +++ b/src/http.cc @@ -108,7 +108,7 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : * for example, the request to this neighbor fails. */ if (_peer->options.proxy_only) - entry->releaseRequest(); + entry->releaseRequest(true); #if USE_DELAY_POOLS entry->setNoDelay(_peer->options.no_delay); @@ -252,7 +252,7 @@ httpMaybeRemovePublic(StoreEntry * e, Http::StatusCode status) #if USE_HTCP neighborsHtcpClear(e, nullptr, e->mem_obj->request.getRaw(), e->mem_obj->method, HTCP_CLR_INVALIDATION); #endif - pe->release(); + pe->release(true); } /** \par @@ -269,7 +269,7 @@ httpMaybeRemovePublic(StoreEntry * e, Http::StatusCode status) #if USE_HTCP neighborsHtcpClear(e, nullptr, e->mem_obj->request.getRaw(), HttpRequestMethod(Http::METHOD_HEAD), HTCP_CLR_INVALIDATION); #endif - pe->release(); + pe->release(true); } } @@ -334,7 +334,7 @@ HttpStateData::reusableReply(HttpStateData::ReuseDecision &decision) #endif if (EBIT_TEST(entry->flags, RELEASE_REQUEST)) - return decision.make(ReuseDecision::reuseNot, "the entry has been released"); + return decision.make(ReuseDecision::doNotCacheButShare, "the entry has been released"); // RFC 7234 section 4: a cache MUST use the most recent response // (as determined by the Date header field) @@ -905,8 +905,11 @@ HttpStateData::haveParsedReplyHeaders() /* Check if object is cacheable or not based on reply code */ debugs(11, 3, "HTTP CODE: " << statusCode); - if (const StoreEntry *oldEntry = findPreviouslyCachedEntry(entry)) + if (StoreEntry *oldEntry = findPreviouslyCachedEntry(entry)) { + oldEntry->lock("HttpStateData::haveParsedReplyHeaders"); sawDateGoBack = rep->olderThan(oldEntry->getReply()); + oldEntry->unlock("HttpStateData::haveParsedReplyHeaders"); + } if (neighbors_do_private_keys && !sawDateGoBack) httpMaybeRemovePublic(entry, rep->sline.status()); @@ -954,11 +957,17 @@ HttpStateData::haveParsedReplyHeaders() break; case ReuseDecision::cachePositively: - entry->makePublic(); + if (!entry->makePublic()) { + decision.make(ReuseDecision::doNotCacheButShare, "public key creation error"); + entry->makePrivate(true); + } break; case ReuseDecision::cacheNegatively: - entry->cacheNegatively(); + if (!entry->cacheNegatively()) { + decision.make(ReuseDecision::doNotCacheButShare, "public key creation error"); + entry->makePrivate(true); + } break; case ReuseDecision::doNotCacheButShare: diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index f7320d435a..e474e45878 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -12,6 +12,7 @@ #include "ipc/StoreMap.h" #include "sbuf/SBuf.h" #include "Store.h" +#include "store/Controller.h" #include "store_key_md5.h" #include "tools.h" @@ -212,6 +213,7 @@ Ipc::StoreMap::abortWriting(const sfileno fileno) debugs(54, 5, "closed clean entry " << fileno << " for writing " << path); } else { s.waitingToBeFreed = true; + s.writerHalted = true; s.lock.unlockExclusive(); debugs(54, 5, "closed dirty entry " << fileno << " for writing " << path); } @@ -253,17 +255,22 @@ Ipc::StoreMap::peekAtEntry(const sfileno fileno) const return anchorAt(fileno); } -void +bool Ipc::StoreMap::freeEntry(const sfileno fileno) { debugs(54, 5, "marking entry " << fileno << " to be freed in " << path); Anchor &s = anchorAt(fileno); - if (s.lock.lockExclusive()) + if (s.lock.lockExclusive()) { + const bool result = !s.waitingToBeFreed && !s.empty(); freeChain(fileno, s, false); - else - s.waitingToBeFreed = true; // mark to free it later + return result; + } + + uint8_t expected = false; + // mark to free the locked entry later (if not already marked) + return s.waitingToBeFreed.compare_exchange_strong(expected, true); } void @@ -290,6 +297,25 @@ Ipc::StoreMap::freeEntryByKey(const cache_key *const key) } } +bool +Ipc::StoreMap::markedForDeletion(const cache_key *const key) +{ + const int idx = fileNoByKey(key); + const Anchor &s = anchorAt(idx); + return s.sameKey(key) ? bool(s.waitingToBeFreed) : false; +} + +bool +Ipc::StoreMap::hasReadableEntry(const cache_key *const key) +{ + sfileno index; + if (openForReading(reinterpret_cast(key), index)) { + closeForReading(index); + return true; + } + return false; +} + /// unconditionally frees an already locked chain of slots, unlocking if needed void Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked) @@ -671,6 +697,7 @@ Ipc::StoreMap::anchorAt(const sfileno fileno) const sfileno Ipc::StoreMap::nameByKey(const cache_key *const key) const { + assert(key); const uint64_t *const k = reinterpret_cast(key); // TODO: use a better hash function const int hash = (k[0] + k[1]) % entryLimit(); @@ -734,6 +761,7 @@ void Ipc::StoreMapAnchor::setKey(const cache_key *const aKey) { memcpy(key, aKey, sizeof(key)); + waitingToBeFreed = Store::Root().markedForDeletion(aKey); } bool @@ -744,17 +772,36 @@ Ipc::StoreMapAnchor::sameKey(const cache_key *const aKey) const } void -Ipc::StoreMapAnchor::set(const StoreEntry &from) +Ipc::StoreMapAnchor::set(const StoreEntry &from, const cache_key *aKey) { assert(writing() && !reading()); - memcpy(key, from.key, sizeof(key)); + setKey(reinterpret_cast(aKey ? aKey : from.key)); basics.timestamp = from.timestamp; basics.lastref = from.lastref; basics.expires = from.expires; basics.lastmod = from.lastModified(); basics.swap_file_sz = from.swap_file_sz; basics.refcount = from.refcount; - basics.flags = from.flags; + + // do not copy key bit if we are not using from.key + // TODO: Replace KEY_PRIVATE with a nil StoreEntry::key! + uint16_t cleanFlags = from.flags; + if (aKey) + EBIT_CLR(cleanFlags, KEY_PRIVATE); + basics.flags = cleanFlags; +} + +void +Ipc::StoreMapAnchor::exportInto(StoreEntry &into) const +{ + assert(reading()); + into.timestamp = basics.timestamp; + into.lastref = basics.lastref; + into.expires = basics.expires; + into.lastModified(basics.lastmod); + into.swap_file_sz = basics.swap_file_sz; + into.refcount = basics.refcount; + into.flags = basics.flags; } void @@ -766,6 +813,7 @@ Ipc::StoreMapAnchor::rewind() memset(&key, 0, sizeof(key)); memset(&basics, 0, sizeof(basics)); waitingToBeFreed = false; + writerHalted = false; // but keep the lock } diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h index b205aad28b..bda1f28fea 100644 --- a/src/ipc/StoreMap.h +++ b/src/ipc/StoreMap.h @@ -56,7 +56,9 @@ public: StoreMapAnchor(); /// store StoreEntry key and basics for an inode slot - void set(const StoreEntry &anEntry); + void set(const StoreEntry &anEntry, const cache_key *aKey = nullptr); + /// load StoreEntry basics that were previously stored with set() + void exportInto(StoreEntry &) const; void setKey(const cache_key *const aKey); bool sameKey(const cache_key *const aKey) const; @@ -74,6 +76,8 @@ public: public: mutable ReadWriteLock lock; ///< protects slot data below std::atomic waitingToBeFreed; ///< may be accessed w/o a lock + /// whether StoreMap::abortWriting() was called for a read-locked entry + std::atomic writerHalted; // fields marked with [app] can be modified when appending-while-reading // fields marked with [update] can be modified when updating-while-reading @@ -248,11 +252,19 @@ public: const Anchor &peekAtEntry(const sfileno fileno) const; /// free the entry if possible or mark it as waiting to be freed if not - void freeEntry(const sfileno fileno); + /// \returns whether the entry was neither empty nor marked + bool freeEntry(const sfileno); /// free the entry if possible or mark it as waiting to be freed if not /// does nothing if we cannot check that the key matches the cached entry void freeEntryByKey(const cache_key *const key); + /// whether the entry with the given key exists and was marked as + /// "waiting to be freed" some time ago + bool markedForDeletion(const cache_key *const); + + /// whether the index contains a valid readable entry with the given key + bool hasReadableEntry(const cache_key *const); + /// opens entry (identified by key) for reading, increments read level const Anchor *openForReading(const cache_key *const key, sfileno &fileno); /// opens entry (identified by sfileno) for reading, increments read level diff --git a/src/mime.cc b/src/mime.cc index da86073fe9..0e9c3a4ad6 100644 --- a/src/mime.cc +++ b/src/mime.cc @@ -19,7 +19,6 @@ #include "MemBuf.h" #include "MemObject.h" #include "mime.h" -#include "RequestFlags.h" #include "SquidConfig.h" #include "Store.h" #include "StoreClient.h" @@ -394,18 +393,19 @@ MimeIcon::created(StoreEntry *newEntry) status = Http::scNoContent; } + StoreEntry *e = storeCreatePureEntry(url_, url_, Http::METHOD_GET); + e->lock("MimeIcon::created"); + EBIT_SET(e->flags, ENTRY_SPECIAL); + const auto madePublic = e->setPublicKey(); + assert(madePublic); // nothing can block ENTRY_SPECIAL from becoming public + + /* fill `e` with a canned 2xx response object */ + const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initIcon); HttpRequestPointer r(HttpRequest::FromUrl(url_, mx)); if (!r) fatalf("mimeLoadIcon: cannot parse internal URL: %s", url_); - // fill newEntry with a canned 2xx response object - RequestFlags flags; - flags.cachable = true; - StoreEntry *e = storeCreateEntry(url_,url_,flags,Http::METHOD_GET); - assert(e != NULL); - EBIT_SET(e->flags, ENTRY_SPECIAL); - e->setPublicKey(); e->buffer(); e->mem_obj->request = r; diff --git a/src/neighbors.cc b/src/neighbors.cc index 92f9fdbc60..c80902801a 100644 --- a/src/neighbors.cc +++ b/src/neighbors.cc @@ -593,7 +593,7 @@ neighborsUdpPing(HttpRequest * request, if (Config.peers == NULL) return 0; - assert(entry->swap_status == SWAPOUT_NONE); + assert(!entry->hasDisk()); mem->start_ping = current_time; @@ -984,7 +984,7 @@ neighborsUdpAck(const cache_key * key, icp_common_t * header, const Ip::Address debugs(15, 6, "neighborsUdpAck: opcode " << opcode << " '" << storeKeyText(key) << "'"); - if (NULL != (entry = Store::Root().get(key))) + if ((entry = Store::Root().findCallback(key))) mem = entry->mem_obj; if ((p = whichPeer(from))) @@ -1694,7 +1694,7 @@ dump_peers(StoreEntry * sentry, CachePeer * peers) void neighborsHtcpReply(const cache_key * key, HtcpReplyData * htcp, const Ip::Address &from) { - StoreEntry *e = Store::Root().get(key); + StoreEntry *e = Store::Root().findCallback(key); MemObject *mem = NULL; CachePeer *p; peer_t ntype = PEER_NONE; diff --git a/src/peer_digest.cc b/src/peer_digest.cc index a8d9093f7a..70d682df8b 100644 --- a/src/peer_digest.cc +++ b/src/peer_digest.cc @@ -305,7 +305,6 @@ peerDigestRequest(PeerDigest * pd) CachePeer *p = pd->peer; StoreEntry *e, *old_e; char *url = NULL; - const cache_key *key; HttpRequest *req; StoreIOBuffer tempBuffer; @@ -318,16 +317,13 @@ peerDigestRequest(PeerDigest * pd) url = xstrdup(p->digest_url); else url = xstrdup(internalRemoteUri(p->host, p->http_port, "/squid-internal-periodic/", SBuf(StoreDigestFileName))); + debugs(72, 2, url); const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initCacheDigest); req = HttpRequest::FromUrl(url, mx); assert(req); - key = storeKeyPublicByRequest(req); - - debugs(72, 2, "peerDigestRequest: " << url << " key: " << storeKeyText(key)); - /* add custom headers */ assert(!req->header.len); @@ -351,13 +347,13 @@ peerDigestRequest(PeerDigest * pd) pd_last_req_time = squid_curtime; req->flags.cachable = true; - /* the rest is based on clientProcessExpired() */ + /* the rest is based on clientReplyContext::processExpired() */ req->flags.refresh = true; - old_e = fetch->old_entry = Store::Root().get(key); + old_e = fetch->old_entry = storeGetPublicByRequest(req); if (old_e) { - debugs(72, 5, "peerDigestRequest: found old entry"); + debugs(72, 5, "found old " << *old_e); old_e->lock("peerDigestRequest"); old_e->ensureMemObject(url, url, req->method); @@ -366,6 +362,7 @@ peerDigestRequest(PeerDigest * pd) } e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method); + debugs(72, 5, "created " << *e); assert(EBIT_TEST(e->flags, KEY_PRIVATE)); fetch->sc = storeClientListAdd(e, fetch); /* set lastmod to trigger IMS request if possible */ @@ -374,8 +371,6 @@ peerDigestRequest(PeerDigest * pd) e->lastModified(old_e->lastModified()); /* push towards peer cache */ - debugs(72, 3, "peerDigestRequest: forwarding to fwdStart..."); - FwdState::fwdStart(Comm::ConnectionPointer(), e, req); tempBuffer.offset = 0; diff --git a/src/store.cc b/src/store.cc index 2c8c9fc7be..f07bd99c39 100644 --- a/src/store.cc +++ b/src/store.cc @@ -9,6 +9,7 @@ /* DEBUG: section 20 Storage Manager */ #include "squid.h" +#include "base/TextException.h" #include "CacheDigest.h" #include "CacheManager.h" #include "comm/Connection.h" @@ -138,35 +139,36 @@ StoreEntry::operator delete (void *address) pool->freeOne(address); } -void +bool StoreEntry::makePublic(const KeyScope scope) { /* This object can be cached for a long time */ - if (!EBIT_TEST(flags, RELEASE_REQUEST)) - setPublicKey(scope); + return !EBIT_TEST(flags, RELEASE_REQUEST) && setPublicKey(scope); } void StoreEntry::makePrivate(const bool shareable) { - /* This object should never be cached at all */ - expireNow(); releaseRequest(shareable); /* delete object when not used */ } void StoreEntry::clearPrivate() { + assert(!EBIT_TEST(flags, RELEASE_REQUEST)); EBIT_CLR(flags, KEY_PRIVATE); shareableWhenPrivate = false; } -void +bool StoreEntry::cacheNegatively() { /* This object may be negatively cached */ - negativeCache(); - makePublic(); + if (makePublic()) { + negativeCache(); + return true; + } + return false; } size_t @@ -282,7 +284,7 @@ StoreEntry::storeClientType() const /* the object has completed. */ if (mem_obj->inmem_lo == 0 && !isEmpty()) { - if (swap_status == SWAPOUT_DONE) { + if (swappedOut()) { debugs(20,7, HERE << mem_obj << " lo: " << mem_obj->inmem_lo << " hi: " << mem_obj->endOffset() << " size: " << mem_obj->object_sz); if (mem_obj->endOffset() == mem_obj->object_sz) { /* hot object fully swapped in (XXX: or swapped out?) */ @@ -369,15 +371,15 @@ StoreEntry::kickProducer() void StoreEntry::destroyMemObject() { - debugs(20, 3, HERE << "destroyMemObject " << mem_obj); + debugs(20, 3, mem_obj << " in " << *this); - if (MemObject *mem = mem_obj) { - // Store::Root() is FATALly missing during shutdown - if (mem->xitTable.index >= 0 && !shutting_down) - Store::Root().transientsDisconnect(*mem); - if (mem->memCache.index >= 0 && !shutting_down) - Store::Root().memoryDisconnect(*this); + // Store::Root() is FATALly missing during shutdown + if (hasTransients() && !shutting_down) + Store::Root().transientsDisconnect(*this); + if (hasMemStore() && !shutting_down) + Store::Root().memoryDisconnect(*this); + if (MemObject *mem = mem_obj) { setMemStatus(NOT_IN_MEMORY); mem_obj = NULL; delete mem; @@ -395,7 +397,7 @@ destroyStoreEntry(void *data) return; // Store::Root() is FATALly missing during shutdown - if (e->swap_filen >= 0 && !shutting_down) + if (e->hasDisk() && !shutting_down) e->disk().disconnect(*e); e->destroyMemObject(); @@ -413,6 +415,7 @@ void StoreEntry::hashInsert(const cache_key * someKey) { debugs(20, 3, "StoreEntry::hashInsert: Inserting Entry " << *this << " key '" << storeKeyText(someKey) << "'"); + assert(!key); key = storeKeyDup(someKey); hash_join(store_table, this); } @@ -429,21 +432,6 @@ StoreEntry::hashDelete() /* -------------------------------------------------------------------------- */ -/* get rid of memory copy of the object */ -void -StoreEntry::purgeMem() -{ - if (mem_obj == NULL) - return; - - debugs(20, 3, "StoreEntry::purgeMem: Freeing memory-copy of " << getMD5Text()); - - Store::Root().memoryUnlink(*this); - - if (swap_status != SWAPOUT_DONE) - release(); -} - void StoreEntry::lock(const char *context) { @@ -457,28 +445,16 @@ StoreEntry::touch() lastref = squid_curtime; } -void -StoreEntry::setReleaseFlag() -{ - if (EBIT_TEST(flags, RELEASE_REQUEST)) - return; - - debugs(20, 3, "StoreEntry::setReleaseFlag: '" << getMD5Text() << "'"); - - EBIT_SET(flags, RELEASE_REQUEST); - - Store::Root().markForUnlink(*this); -} - void StoreEntry::releaseRequest(const bool shareable) { + debugs(20, 3, shareable << ' ' << *this); + if (!shareable) + shareableWhenPrivate = false; // may already be false if (EBIT_TEST(flags, RELEASE_REQUEST)) return; - setReleaseFlag(); // makes validToSend() false, preventing future hits - - setPrivateKey(shareable); + setPrivateKey(shareable, true); } int @@ -492,21 +468,31 @@ StoreEntry::unlock(const char *context) if (lock_count) return (int) lock_count; - if (store_status == STORE_PENDING) - setReleaseFlag(); + abandon(context); + return 0; +} +/// keep the unlocked StoreEntry object in the local store_table (if needed) or +/// delete it (otherwise) +void +StoreEntry::doAbandon(const char *context) +{ + debugs(20, 5, *this << " via " << (context ? context : "somebody")); + assert(!locked()); assert(storePendingNClients(this) == 0); - if (EBIT_TEST(flags, RELEASE_REQUEST)) { + // Both aborted local writers and aborted local readers (of remote writers) + // are STORE_PENDING, but aborted readers should never release(). + if (EBIT_TEST(flags, RELEASE_REQUEST) || + (store_status == STORE_PENDING && !Store::Root().transientsReader(*this))) { this->release(); - return 0; + return; } if (EBIT_TEST(flags, KEY_PRIVATE)) debugs(20, DBG_IMPORTANT, "WARNING: " << __FILE__ << ":" << __LINE__ << ": found KEY_PRIVATE"); Store::Root().handleIdleEntry(*this); // may delete us - return 0; } void @@ -548,13 +534,13 @@ StoreEntry::getPublic (StoreClient *aClient, const char *uri, const HttpRequestM StoreEntry * storeGetPublic(const char *uri, const HttpRequestMethod& method) { - return Store::Root().get(storeKeyPublic(uri, method)); + return Store::Root().find(storeKeyPublic(uri, method)); } StoreEntry * storeGetPublicByRequestMethod(HttpRequest * req, const HttpRequestMethod& method, const KeyScope keyScope) { - return Store::Root().get(storeKeyPublicByRequestMethod(req, method, keyScope)); + return Store::Root().find(storeKeyPublicByRequestMethod(req, method, keyScope)); } StoreEntry * @@ -590,22 +576,19 @@ getKeyCounter(void) * concept'. */ void -StoreEntry::setPrivateKey(const bool shareable) +StoreEntry::setPrivateKey(const bool shareable, const bool permanent) { - if (key && EBIT_TEST(flags, KEY_PRIVATE)) { - // The entry is already private, but it may be still shareable. - if (!shareable) - shareableWhenPrivate = false; + debugs(20, 3, shareable << permanent << ' ' << *this); + if (permanent) + EBIT_SET(flags, RELEASE_REQUEST); // may already be set + if (!shareable) + shareableWhenPrivate = false; // may already be false + + if (EBIT_TEST(flags, KEY_PRIVATE)) return; - } if (key) { - setReleaseFlag(); // will markForUnlink(); all caches/workers will know - - // TODO: move into SwapDir::markForUnlink() already called by Root() - if (swap_filen > -1) - storeDirSwapLog(this, SWAP_LOG_DEL); - + Store::Root().evictCached(*this); // all caches/workers will know hashDelete(); } @@ -619,11 +602,12 @@ StoreEntry::setPrivateKey(const bool shareable) hashInsert(newkey); } -void +bool StoreEntry::setPublicKey(const KeyScope scope) { + debugs(20, 3, *this); if (key && !EBIT_TEST(flags, KEY_PRIVATE)) - return; /* is already public */ + return true; // already public assert(mem_obj); @@ -645,8 +629,17 @@ StoreEntry::setPublicKey(const KeyScope scope) assert(!EBIT_TEST(flags, RELEASE_REQUEST)); - adjustVary(); - forcePublicKey(calcPublicKey(scope)); + try { + EntryGuard newVaryMarker(adjustVary(), "setPublicKey+failure"); + const cache_key *pubKey = calcPublicKey(scope); + Store::Root().addWriting(this, pubKey); + forcePublicKey(pubKey); + newVaryMarker.unlockAndReset("setPublicKey+success"); + return true; + } catch (const std::exception &ex) { + debugs(20, 2, "for " << *this << " failed: " << ex.what()); + } + return false; } void @@ -669,14 +662,13 @@ StoreEntry::clearPublicKeyScope() void StoreEntry::forcePublicKey(const cache_key *newkey) { + debugs(20, 3, storeKeyText(newkey) << " for " << *this); + assert(mem_obj); + if (StoreEntry *e2 = (StoreEntry *)hash_lookup(store_table, newkey)) { assert(e2 != this); - debugs(20, 3, "Making old " << *e2 << " private."); - - // TODO: check whether there is any sense in keeping old entry - // shareable here. Leaving it non-shareable for now. - e2->setPrivateKey(false); - e2->release(false); + debugs(20, 3, "releasing clashing " << *e2); + e2->release(true); } if (key) @@ -684,9 +676,10 @@ StoreEntry::forcePublicKey(const cache_key *newkey) clearPrivate(); + assert(mem_obj->hasUris()); hashInsert(newkey); - if (swap_filen > -1) + if (hasDisk()) storeDirSwapLog(this, SWAP_LOG_ADD); } @@ -703,13 +696,15 @@ StoreEntry::calcPublicKey(const KeyScope keyScope) /// Updates mem_obj->request->vary_headers to reflect the current Vary. /// The vary_headers field is used to calculate the Vary marker key. /// Releases the old Vary marker with an outdated key (if any). -void +/// \returns new (locked) Vary marker StoreEntry or, if none was needed, nil +/// \throws std::exception on failures +StoreEntry * StoreEntry::adjustVary() { assert(mem_obj); if (!mem_obj->request) - return; + return nullptr; HttpRequestPointer request(mem_obj->request); @@ -723,7 +718,7 @@ StoreEntry::adjustVary() */ request->vary_headers.clear(); /* free old "bad" variance key */ if (StoreEntry *pe = storeGetPublic(mem_obj->storeId(), mem_obj->method)) - pe->release(); + pe->release(true); } /* Make sure the request knows the variance status */ @@ -735,12 +730,19 @@ StoreEntry::adjustVary() // We should add/use storeHas() API or lock/unlock those entries. if (!mem_obj->vary_headers.isEmpty() && !storeGetPublic(mem_obj->storeId(), mem_obj->method)) { /* Create "vary" base object */ - String vary; StoreEntry *pe = storeCreateEntry(mem_obj->storeId(), mem_obj->logUri(), request->flags, request->method); + // XXX: storeCreateEntry() already tries to make `pe` public under + // certain conditions. If those conditions do not apply to Vary markers, + // then refactor to call storeCreatePureEntry() above. Otherwise, + // refactor to simply check whether `pe` is already public below. + if (!pe->makePublic()) { + pe->unlock("StoreEntry::adjustVary+failed_makePublic"); + throw TexcHere("failed to make Vary marker public"); + } /* We are allowed to do this typecast */ HttpReply *rep = new HttpReply; rep->setHeaders(Http::scOkay, "Internal marker object", "x-squid-internal/vary", -1, -1, squid_curtime + 100000); - vary = mem_obj->getReply()->header.getList(Http::HdrType::VARY); + String vary = mem_obj->getReply()->header.getList(Http::HdrType::VARY); if (vary.size()) { /* Again, we own this structure layout */ @@ -758,22 +760,21 @@ StoreEntry::adjustVary() } #endif - pe->replaceHttpReply(rep, false); // no write until key is public + pe->replaceHttpReply(rep, false); // no write until timestampsSet() pe->timestampsSet(); - pe->makePublic(); - - pe->startWriting(); // after makePublic() + pe->startWriting(); // after timestampsSet() pe->complete(); - pe->unlock("StoreEntry::forcePublicKey+Vary"); + return pe; } + return nullptr; } StoreEntry * -storeCreatePureEntry(const char *url, const char *log_url, const RequestFlags &flags, const HttpRequestMethod& method) +storeCreatePureEntry(const char *url, const char *log_url, const HttpRequestMethod& method) { StoreEntry *e = NULL; debugs(20, 3, "storeCreateEntry: '" << url << "'"); @@ -781,12 +782,6 @@ storeCreatePureEntry(const char *url, const char *log_url, const RequestFlags &f e = new StoreEntry(); e->createMemObject(url, log_url, method); - if (flags.cachable) { - EBIT_CLR(e->flags, RELEASE_REQUEST); - } else { - e->releaseRequest(); - } - e->store_status = STORE_PENDING; e->refcount = 0; e->lastref = squid_curtime; @@ -799,14 +794,13 @@ storeCreatePureEntry(const char *url, const char *log_url, const RequestFlags &f StoreEntry * storeCreateEntry(const char *url, const char *logUrl, const RequestFlags &flags, const HttpRequestMethod& method) { - StoreEntry *e = storeCreatePureEntry(url, logUrl, flags, method); + StoreEntry *e = storeCreatePureEntry(url, logUrl, method); e->lock("storeCreateEntry"); - if (neighbors_do_private_keys || !flags.hierarchical) - e->setPrivateKey(false); - else - e->setPublicKey(); + if (!neighbors_do_private_keys && flags.hierarchical && flags.cachable && e->setPublicKey()) + return e; + e->setPrivateKey(false, !flags.cachable); return e; } @@ -1021,11 +1015,10 @@ StoreEntry::checkCachable() } else if (EBIT_TEST(flags, KEY_PRIVATE)) { debugs(20, 3, "StoreEntry::checkCachable: NO: private key"); ++store_check_cachable_hist.no.private_key; - } else if (swap_status != SWAPOUT_NONE) { + } else if (hasDisk()) { /* - * here we checked the swap_status because the remaining - * cases are only relevant only if we haven't started swapping - * out the object yet. + * the remaining cases are only relevant if we haven't + * started swapping out the object yet. */ return 1; } else if (storeTooManyDiskFilesOpen()) { @@ -1185,44 +1178,8 @@ void storeGetMemSpace(int size) { PROF_start(storeGetMemSpace); - StoreEntry *e = NULL; - int released = 0; - static time_t last_check = 0; - size_t pages_needed; - RemovalPurgeWalker *walker; - - if (squid_curtime == last_check) { - PROF_stop(storeGetMemSpace); - return; - } - - last_check = squid_curtime; - - pages_needed = (size + SM_PAGE_SIZE-1) / SM_PAGE_SIZE; - - if (mem_node::InUseCount() + pages_needed < store_pages_max) { - PROF_stop(storeGetMemSpace); - return; - } - - debugs(20, 2, "storeGetMemSpace: Starting, need " << pages_needed << - " pages"); - - /* XXX what to set as max_scan here? */ - walker = mem_policy->PurgeInit(mem_policy, 100000); - - while ((e = walker->Next(walker))) { - e->purgeMem(); - ++released; - - if (mem_node::InUseCount() + pages_needed < store_pages_max) - break; - } - - walker->Done(walker); - debugs(20, 3, "storeGetMemSpace stats:"); - debugs(20, 3, " " << std::setw(6) << hot_obj_count << " HOT objects"); - debugs(20, 3, " " << std::setw(6) << released << " were released"); + if (!shutting_down) // Store::Root() is FATALly missing during shutdown + Store::Root().freeMemorySpace(size); PROF_stop(storeGetMemSpace); } @@ -1245,44 +1202,32 @@ Store::Maintain(void *) #define MAINTAIN_MAX_SCAN 1024 #define MAINTAIN_MAX_REMOVE 64 -/* release an object from a cache */ void StoreEntry::release(const bool shareable) { PROF_start(storeRelease); - debugs(20, 3, "releasing " << *this << ' ' << getMD5Text()); + debugs(20, 3, shareable << ' ' << *this << ' ' << getMD5Text()); /* If, for any reason we can't discard this object because of an * outstanding request, mark it for pending release */ if (locked()) { - expireNow(); - debugs(20, 3, "storeRelease: Only setting RELEASE_REQUEST bit"); releaseRequest(shareable); PROF_stop(storeRelease); return; } - if (Store::Controller::store_dirs_rebuilding && swap_filen > -1) { + if (Store::Controller::store_dirs_rebuilding && hasDisk()) { /* TODO: Teach disk stores to handle releases during rebuild instead. */ - Store::Root().memoryUnlink(*this); - - setPrivateKey(shareable); - // lock the entry until rebuilding is done lock("storeLateRelease"); - setReleaseFlag(); + releaseRequest(shareable); LateReleaseStack.push(this); return; } storeLog(STORE_LOG_RELEASE, this); - if (swap_filen > -1 && !EBIT_TEST(flags, KEY_PRIVATE)) { - // log before unlink() below clears swap_filen - storeDirSwapLog(this, SWAP_LOG_DEL); - } - - Store::Root().unlink(*this); + Store::Root().evictCached(*this); destroyStoreEntry(static_cast(this)); PROF_stop(storeRelease); } @@ -1421,7 +1366,7 @@ StoreEntry::memoryCachable() if (mem_obj->inmem_lo != 0) return 0; - if (!Config.onoff.memory_cache_first && swap_status == SWAPOUT_DONE && refcount == 1) + if (!Config.onoff.memory_cache_first && swappedOut() && refcount == 1) return 0; return 1; @@ -1495,7 +1440,7 @@ StoreEntry::validToSend() const return 0; // now check that the entry has a cache backing or is collapsed - if (swap_filen > -1) // backed by a disk cache + if (hasDisk()) // backed by a disk cache return 1; if (swappingOut()) // will be backed by a disk cache @@ -1841,7 +1786,7 @@ StoreEntry::storeErrorResponse(HttpReply *reply) flush(); complete(); negativeCache(); - releaseRequest(); + releaseRequest(false); // if it is safe to negatively cache, sharing is OK unlock("StoreEntry::storeErrorResponse"); } @@ -1908,13 +1853,13 @@ StoreEntry::getSerialisedMetaData() void StoreEntry::transientsAbandonmentCheck() { - if (mem_obj && !mem_obj->smpCollapsed && // this worker is responsible - mem_obj->xitTable.index >= 0 && // other workers may be interested - mem_obj->memCache.index < 0 && // rejected by the shared memory cache + if (mem_obj && !Store::Root().transientsReader(*this) && // this worker is responsible + hasTransients() && // other workers may be interested + !hasMemStore() && // rejected by the shared memory cache mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) { debugs(20, 7, "cannot be shared: " << *this); if (!shutting_down) // Store::Root() is FATALly missing during shutdown - Store::Root().transientsAbandon(*this); + Store::Root().stopSharing(*this); } } @@ -2053,12 +1998,58 @@ StoreEntry::hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) con Store::Disk & StoreEntry::disk() const { - assert(0 <= swap_dirn && swap_dirn < Config.cacheSwap.n_configured); + assert(hasDisk()); const RefCount &sd = INDEXSD(swap_dirn); assert(sd); return *sd; } +bool +StoreEntry::hasDisk(const sdirno dirn, const sfileno filen) const +{ + checkDisk(); + if (dirn < 0 && filen < 0) + return swap_dirn >= 0; + Must(dirn >= 0); + const bool matchingDisk = (swap_dirn == dirn); + return filen < 0 ? matchingDisk : (matchingDisk && swap_filen == filen); +} + +void +StoreEntry::attachToDisk(const sdirno dirn, const sfileno fno, const swap_status_t status) +{ + debugs(88, 3, "attaching entry with key " << getMD5Text() << " : " << + swapStatusStr[status] << " " << dirn << " " << + std::hex << std::setw(8) << std::setfill('0') << + std::uppercase << fno); + checkDisk(); + swap_dirn = dirn; + swap_filen = fno; + swap_status = status; + checkDisk(); +} + +void +StoreEntry::detachFromDisk() +{ + swap_dirn = -1; + swap_filen = -1; + swap_status = SWAPOUT_NONE; +} + +void +StoreEntry::checkDisk() const +{ + const bool ok = (swap_dirn < 0) == (swap_filen < 0) && + (swap_dirn < 0) == (swap_status == SWAPOUT_NONE) && + (swap_dirn < 0 || swap_dirn < Config.cacheSwap.n_configured); + + if (!ok) { + debugs(88, DBG_IMPORTANT, "ERROR: inconsistent disk entry state " << *this); + throw std::runtime_error("inconsistent disk entry state "); + } +} + /* * return true if the entry is in a state where * it can accept more data (ie with write() method) @@ -2087,16 +2078,41 @@ StoreEntry::describeTimestamps() const return buf; } +static std::ostream & +operator <<(std::ostream &os, const Store::IoStatus &io) +{ + switch (io) { + case Store::ioUndecided: + os << 'u'; + break; + case Store::ioReading: + os << 'r'; + break; + case Store::ioWriting: + os << 'w'; + break; + case Store::ioDone: + os << 'o'; + break; + } + return os; +} + std::ostream &operator <<(std::ostream &os, const StoreEntry &e) { os << "e:"; - if (e.mem_obj) { - if (e.mem_obj->xitTable.index > -1) - os << 't' << e.mem_obj->xitTable.index; - if (e.mem_obj->memCache.index > -1) - os << 'm' << e.mem_obj->memCache.index; + if (e.hasTransients()) { + const auto &xitTable = e.mem_obj->xitTable; + os << 't' << xitTable.io << xitTable.index; + } + + if (e.hasMemStore()) { + const auto &memCache = e.mem_obj->memCache; + os << 'm' << memCache.io << memCache.index << '@' << memCache.offset; } + + // Do not use e.hasDisk() here because its checkDisk() call may calls us. if (e.swap_filen > -1 || e.swap_dirn > -1) os << 'd' << e.swap_filen << '@' << e.swap_dirn; @@ -2134,9 +2150,6 @@ std::ostream &operator <<(std::ostream &os, const StoreEntry &e) if (EBIT_TEST(e.flags, ENTRY_ABORTED)) os << 'A'; } - if (e.mem_obj && e.mem_obj->smpCollapsed) - os << 'O'; - return os << '/' << &e << '*' << e.locks(); } @@ -2168,3 +2181,12 @@ NullStoreEntry::getSerialisedMetaData() return NULL; } +void +Store::EntryGuard::onException() noexcept +{ + SWALLOW_EXCEPTIONS({ + entry_->releaseRequest(false); + entry_->unlock(context_); + }); +} + diff --git a/src/store/Controlled.h b/src/store/Controlled.h index 1c3a8cf995..184d271f7c 100644 --- a/src/store/Controlled.h +++ b/src/store/Controlled.h @@ -18,6 +18,12 @@ namespace Store { class Controlled: public Storage { public: + /// \returns a possibly unlocked/unregistered stored entry with key (or nil) + /// The returned entry might not match the caller's Store ID or method. The + /// caller must abandon()/release() the entry or register it with Root(). + /// This method must not trigger slow I/O operations (e.g., disk swap in). + virtual StoreEntry *get(const cache_key *) = 0; + /// somebody needs this entry (many cache replacement policies need to know) virtual void reference(StoreEntry &e) = 0; @@ -28,15 +34,15 @@ public: /// make stored metadata and HTTP headers the same as in the given entry virtual void updateHeaders(StoreEntry *) {} - /// If this storage cannot cache collapsed entries, return false. + /// If Transients entry cannot be attached to this storage, return false. /// If the entry is not found, return false. Otherwise, return true after - /// tying the entry to this cache and setting inSync to updateCollapsed(). - virtual bool anchorCollapsed(StoreEntry &, bool &/*inSync*/) { return false; } + /// tying the entry to this cache and setting inSync to updateAnchored(). + virtual bool anchorToCache(StoreEntry &, bool &/*inSync*/) { return false; } - /// Update a local collapsed entry with fresh info from this cache (if any). - /// Return true iff the cache supports collapsed entries and - /// the given local collapsed entry is now in sync with this storage. - virtual bool updateCollapsed(StoreEntry &) { return false; } + /// Update a local Transients entry with fresh info from this cache (if any). + /// Return true iff the cache supports Transients entries and + /// the given local Transients entry is now in sync with this storage. + virtual bool updateAnchored(StoreEntry &) { return false; } }; } // namespace Store diff --git a/src/store/Controller.cc b/src/store/Controller.cc index fe79f8400f..0affe2edb7 100644 --- a/src/store/Controller.cc +++ b/src/store/Controller.cc @@ -242,7 +242,7 @@ Store::Controller::referenceBusy(StoreEntry &e) /* Notify the fs that we're referencing this object again */ - if (e.swap_dirn > -1) + if (e.hasDisk()) swapDir->reference(e); // Notify the memory cache that we're referencing this object again @@ -267,7 +267,7 @@ Store::Controller::dereferenceIdle(StoreEntry &e, bool wantsLocalMemory) /* Notify the fs that we're not referencing this object any more */ - if (e.swap_filen > -1) + if (e.hasDisk()) keepInStoreTable = swapDir->dereference(e) || keepInStoreTable; // Notify the memory cache that we're not referencing this object any more @@ -286,29 +286,107 @@ Store::Controller::dereferenceIdle(StoreEntry &e, bool wantsLocalMemory) return keepInStoreTable; } +bool +Store::Controller::markedForDeletion(const cache_key *key) const +{ + // assuming a public key, checking Transients should cover all cases. + return transients && transients->markedForDeletion(key); +} + +bool +Store::Controller::markedForDeletionAndAbandoned(const StoreEntry &e) const +{ + // The opposite check order could miss a reader that has arrived after the + // !readers() and before the markedForDeletion() check. + return markedForDeletion(reinterpret_cast(e.key)) && + transients && !transients->readers(e); +} + +bool +Store::Controller::hasReadableDiskEntry(const StoreEntry &e) const +{ + return swapDir->hasReadableEntry(e); +} + StoreEntry * -Store::Controller::get(const cache_key *key) +Store::Controller::find(const cache_key *key) { - if (StoreEntry *e = find(key)) { - // this is not very precise: some get()s are not initiated by clients - e->touch(); - referenceBusy(*e); - return e; + if (const auto entry = peek(key)) { + try { + if (!entry->key) + allowSharing(*entry, key); + checkTransients(*entry); + entry->touch(); + referenceBusy(*entry); + return entry; + } catch (const std::exception &ex) { + debugs(20, 2, "failed with " << *entry << ": " << ex.what()); + entry->release(true); + // fall through + } } return NULL; } -/// Internal method to implements the guts of the Store::get() API: -/// returns an in-transit or cached object with a given key, if any. +/// indexes and adds SMP-tracking for an ephemeral peek() result +void +Store::Controller::allowSharing(StoreEntry &entry, const cache_key *key) +{ + // TODO: refactor to throw on anchorToCache() inSync errors! + + // anchorToCache() below and many find() callers expect a registered entry + addReading(&entry, key); + + if (entry.hasTransients()) { + bool inSync = false; + const bool found = anchorToCache(entry, inSync); + if (found && !inSync) + throw TexcHere("cannot sync"); + } +} + StoreEntry * -Store::Controller::find(const cache_key *key) +Store::Controller::findCallback(const cache_key *key) { - debugs(20, 3, storeKeyText(key)); + // We could check for mem_obj presence (and more), moving and merging some + // of the duplicated neighborsUdpAck() and neighborsHtcpReply() code here, + // but that would mean polluting Store with HTCP/ICP code. Instead, we + // should encapsulate callback-related data in a protocol-neutral MemObject + // member or use an HTCP/ICP-specific index rather than store_table. + return peekAtLocal(key); +} +/// \returns either an existing local reusable StoreEntry object or nil +/// To treat remotely marked entries specially, +/// callers ought to check markedForDeletion() first! +StoreEntry * +Store::Controller::peekAtLocal(const cache_key *key) +{ if (StoreEntry *e = static_cast(hash_lookup(store_table, key))) { + // callers must only search for public entries + assert(!EBIT_TEST(e->flags, KEY_PRIVATE)); + assert(e->publicKey()); + checkTransients(*e); + // TODO: ignore and maybe handleIdleEntry() unlocked intransit entries // because their backing store slot may be gone already. - debugs(20, 3, HERE << "got in-transit entry: " << *e); + return e; + } + return nullptr; +} + +StoreEntry * +Store::Controller::peek(const cache_key *key) +{ + debugs(20, 3, storeKeyText(key)); + + if (markedForDeletion(key)) { + debugs(20, 3, "ignoring marked in-transit " << storeKeyText(key)); + return nullptr; + } + + if (StoreEntry *e = peekAtLocal(key)) { + debugs(20, 3, "got local in-transit entry: " << *e); return e; } @@ -316,13 +394,7 @@ Store::Controller::find(const cache_key *key) if (transients) { if (StoreEntry *e = transients->get(key)) { debugs(20, 3, "got shared in-transit entry: " << *e); - bool inSync = false; - const bool found = anchorCollapsed(*e, inSync); - if (!found || inSync) - return e; - assert(!e->locked()); // ensure release will destroyStoreEntry() - e->release(); // do not let others into the same trap - return NULL; + return e; } } @@ -344,6 +416,18 @@ Store::Controller::find(const cache_key *key) return nullptr; } +bool +Store::Controller::transientsReader(const StoreEntry &e) const +{ + return transients && e.hasTransients() && transients->isReader(e); +} + +bool +Store::Controller::transientsWriter(const StoreEntry &e) const +{ + return transients && e.hasTransients() && transients->isWriter(e); +} + int64_t Store::Controller::accumulateMore(StoreEntry &entry) const { @@ -351,23 +435,87 @@ Store::Controller::accumulateMore(StoreEntry &entry) const // The memory cache should not influence for-swapout accumulation decision. } +// Must be called from StoreEntry::release() or releaseRequest() because +// those methods currently manage local indexing of StoreEntry objects. +// TODO: Replace StoreEntry::release*() with Root().evictCached(). +void +Store::Controller::evictCached(StoreEntry &e) +{ + debugs(20, 7, e); + if (transients) + transients->evictCached(e); + memoryEvictCached(e); + if (swapDir) + swapDir->evictCached(e); +} + void -Store::Controller::markForUnlink(StoreEntry &e) +Store::Controller::evictIfFound(const cache_key *key) +{ + debugs(20, 7, storeKeyText(key)); + + if (StoreEntry *entry = peekAtLocal(key)) { + debugs(20, 5, "marking local in-transit " << *entry); + entry->release(true); + return; + } + + if (memStore) + memStore->evictIfFound(key); + if (swapDir) + swapDir->evictIfFound(key); + if (transients) + transients->evictIfFound(key); +} + +/// whether the memory cache is allowed to store that many additional pages +bool +Store::Controller::memoryCacheHasSpaceFor(const int pagesRequired) const { - if (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0) - transients->markForUnlink(e); - if (memStore && e.mem_obj && e.mem_obj->memCache.index >= 0) - memStore->markForUnlink(e); - if (swapDir && e.swap_filen >= 0) - swapDir->markForUnlink(e); + // XXX: We count mem_nodes but may free shared memory pages instead. + const auto fits = mem_node::InUseCount() + pagesRequired <= store_pages_max; + debugs(20, 7, fits << ": " << mem_node::InUseCount() << '+' << pagesRequired << '?' << store_pages_max); + return fits; } void -Store::Controller::unlink(StoreEntry &e) +Store::Controller::freeMemorySpace(const int bytesRequired) { - memoryUnlink(e); - if (swapDir && e.swap_filen >= 0) - swapDir->unlink(e); + const auto pagesRequired = (bytesRequired + SM_PAGE_SIZE-1) / SM_PAGE_SIZE; + + if (memoryCacheHasSpaceFor(pagesRequired)) + return; + + // XXX: When store_pages_max is smaller than pagesRequired, we should not + // look for more space (but we do because we want to abandon idle entries?). + + // limit our performance impact to one walk per second + static time_t lastWalk = 0; + if (lastWalk == squid_curtime) + return; + lastWalk = squid_curtime; + + debugs(20, 2, "need " << pagesRequired << " pages"); + + // let abandon()/handleIdleEntry() know about the impeding memory shortage + memoryPagesDebt_ = pagesRequired; + + // XXX: SMP-unaware: Walkers should iterate memory cache, not store_table. + // XXX: Limit iterations by time, not arbitrary count. + const auto walker = mem_policy->PurgeInit(mem_policy, 100000); + int removed = 0; + while (const auto entry = walker->Next(walker)) { + // Abandoned memory cache entries are purged during memory shortage. + entry->abandon(__FUNCTION__); // may delete entry + ++removed; + + if (memoryCacheHasSpaceFor(pagesRequired)) + break; + } + // TODO: Move to RemovalPolicyWalker::Done() that has more/better details. + debugs(20, 3, "removed " << removed << " out of " << hot_obj_count << " memory-cached entries"); + walker->Done(walker); + memoryPagesDebt_ = 0; } // move this into [non-shared] memory cache class when we have one @@ -404,13 +552,17 @@ Store::Controller::memoryOut(StoreEntry &e, const bool preserveSwappable) e.trimMemory(preserveSwappable); } +/// removes the entry from the memory cache +/// XXX: Dangerous side effect: Unlocked entries lose their mem_obj. void -Store::Controller::memoryUnlink(StoreEntry &e) +Store::Controller::memoryEvictCached(StoreEntry &e) { + // TODO: Untangle memory caching from mem_obj. if (memStore) - memStore->unlink(e); + memStore->evictCached(e); else // TODO: move into [non-shared] memory cache class when we have one - e.destroyMemObject(); + if (!e.locked()) + e.destroyMemObject(); } void @@ -422,37 +574,38 @@ Store::Controller::memoryDisconnect(StoreEntry &e) } void -Store::Controller::transientsAbandon(StoreEntry &e) +Store::Controller::stopSharing(StoreEntry &e) { - if (transients) { - assert(e.mem_obj); - if (e.mem_obj->xitTable.index >= 0) - transients->abandon(e); - } + // Marking the transients entry is sufficient to prevent new readers from + // starting to wait for `e` updates and to inform the current readers (and, + // hence, Broadcast() recipients) about the underlying Store problems. + if (transients && e.hasTransients()) + transients->evictCached(e); } void Store::Controller::transientsCompleteWriting(StoreEntry &e) { - if (transients) { - assert(e.mem_obj); - if (e.mem_obj->xitTable.index >= 0) - transients->completeWriting(e); - } + // transients->isWriter(e) is false if `e` is writing to its second store + // after finishing writing to its first store: At the end of the first swap + // out, the transients writer becomes a reader and (XXX) we never switch + // back to writing, even if we start swapping out again (to another store). + if (transients && e.hasTransients() && transients->isWriter(e)) + transients->completeWriting(e); } int Store::Controller::transientReaders(const StoreEntry &e) const { - return (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0) ? + return (transients && e.hasTransients()) ? transients->readers(e) : 0; } void -Store::Controller::transientsDisconnect(MemObject &mem_obj) +Store::Controller::transientsDisconnect(StoreEntry &e) { if (transients) - transients->disconnect(mem_obj); + transients->disconnect(e); } void @@ -470,7 +623,7 @@ Store::Controller::handleIdleEntry(StoreEntry &e) } else { keepInLocalMemory = keepForLocalMemoryCache(e) && // in good shape and // the local memory cache is not overflowing - (mem_node::InUseCount() <= store_pages_max); + memoryCacheHasSpaceFor(memoryPagesDebt_); } // An idle, unlocked entry that only belongs to a SwapDir which controls @@ -487,9 +640,18 @@ Store::Controller::handleIdleEntry(StoreEntry &e) if (keepInLocalMemory) { e.setMemStatus(IN_MEMORY); e.mem_obj->unlinkRequest(); - } else { - e.purgeMem(); // may free e + return; + } + + // We know the in-memory data will be gone. Get rid of the entire entry if + // it has nothing worth preserving on disk either. + if (!e.swappedOut()) { + e.release(); // deletes e + return; } + + memoryEvictCached(e); // may already be gone + // and keep the entry in store_table for its on-disk data } void @@ -509,20 +671,41 @@ Store::Controller::updateOnNotModified(StoreEntry *old, const StoreEntry &newer) if (memStore && old->mem_status == IN_MEMORY && !EBIT_TEST(old->flags, ENTRY_SPECIAL)) memStore->updateHeaders(old); - if (old->swap_dirn > -1) + if (old->hasDisk()) swapDir->updateHeaders(old); } -void +bool Store::Controller::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) { const KeyScope keyScope = reqFlags.refresh ? ksRevalidation : ksDefault; - e->makePublic(keyScope); // this is needed for both local and SMP collapsing + if (e->makePublic(keyScope)) { // this is needed for both local and SMP collapsing + debugs(20, 3, "may " << (transients && e->hasTransients() ? + "SMP-" : "locally-") << "collapse " << *e); + return true; + } + return false; +} + +void +Store::Controller::addReading(StoreEntry *e, const cache_key *key) +{ + if (transients) + transients->monitorIo(e, key, Store::ioReading); + e->hashInsert(key); +} + +void +Store::Controller::addWriting(StoreEntry *e, const cache_key *key) +{ + assert(e); + if (EBIT_TEST(e->flags, ENTRY_SPECIAL)) + return; // constant memory-resident entries do not need transients + if (transients) - transients->startWriting(e, reqFlags, reqMethod); - debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ? - "SMP-" : "locally-") << "collapse " << *e); + transients->monitorIo(e, key, Store::ioWriting); + // else: non-SMP configurations do not need transients } void @@ -531,34 +714,66 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) assert(transients); StoreEntry *collapsed = transients->findCollapsed(xitIndex); - if (!collapsed) { // the entry is no longer locally active, ignore update + if (!collapsed) { // the entry is no longer active, ignore update debugs(20, 7, "not SMP-syncing not-transient " << xitIndex); return; } + + if (!collapsed->locked()) { + debugs(20, 3, "skipping (and may destroy) unlocked " << *collapsed); + handleIdleEntry(*collapsed); + return; + } + assert(collapsed->mem_obj); - assert(collapsed->mem_obj->smpCollapsed); + + if (EBIT_TEST(collapsed->flags, ENTRY_ABORTED)) { + debugs(20, 3, "skipping already aborted " << *collapsed); + return; + } debugs(20, 7, "syncing " << *collapsed); - bool abandoned = transients->abandoned(*collapsed); + bool abortedByWriter = false; + bool waitingToBeFreed = false; + transients->status(*collapsed, abortedByWriter, waitingToBeFreed); + + if (waitingToBeFreed) { + debugs(20, 3, "will release " << *collapsed << " due to waitingToBeFreed"); + collapsed->release(true); // may already be marked + } + + if (transients->isWriter(*collapsed)) + return; // readers can only change our waitingToBeFreed flag + + assert(transients->isReader(*collapsed)); + + if (abortedByWriter) { + debugs(20, 3, "aborting " << *collapsed << " because its writer has aborted"); + collapsed->abort(); + return; + } + bool found = false; bool inSync = false; if (memStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) { found = true; inSync = true; debugs(20, 7, "fully mem-loaded " << *collapsed); - } else if (memStore && collapsed->mem_obj->memCache.index >= 0) { + } else if (memStore && collapsed->hasMemStore()) { found = true; - inSync = memStore->updateCollapsed(*collapsed); - } else if (swapDir && collapsed->swap_filen >= 0) { + inSync = memStore->updateAnchored(*collapsed); + // TODO: handle entries attached to both memory and disk + } else if (swapDir && collapsed->hasDisk()) { found = true; - inSync = swapDir->updateCollapsed(*collapsed); + inSync = swapDir->updateAnchored(*collapsed); } else { - found = anchorCollapsed(*collapsed, inSync); + found = anchorToCache(*collapsed, inSync); } - if (abandoned && collapsed->store_status == STORE_PENDING) { - debugs(20, 3, "aborting abandoned but STORE_PENDING " << *collapsed); + if (waitingToBeFreed && !found) { + debugs(20, 3, "aborting unattached " << *collapsed << + " because it was marked for deletion before we could attach it"); collapsed->abort(); return; } @@ -566,40 +781,43 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) if (inSync) { debugs(20, 5, "synced " << *collapsed); collapsed->invokeHandlers(); - } else if (found) { // unrecoverable problem syncing this entry + return; + } + + if (found) { // unrecoverable problem syncing this entry debugs(20, 3, "aborting unsyncable " << *collapsed); collapsed->abort(); - } else { // the entry is still not in one of the caches - debugs(20, 7, "waiting " << *collapsed); + return; } + + // the entry is still not in one of the caches + debugs(20, 7, "waiting " << *collapsed); } -/// Called for in-transit entries that are not yet anchored to a cache. +/// Called for Transients entries that are not yet anchored to a cache. /// For cached entries, return true after synchronizing them with their cache /// (making inSync true on success). For not-yet-cached entries, return false. bool -Store::Controller::anchorCollapsed(StoreEntry &collapsed, bool &inSync) +Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync) { - // this method is designed to work with collapsed transients only - assert(collapsed.mem_obj); - assert(collapsed.mem_obj->xitTable.index >= 0); - assert(collapsed.mem_obj->smpCollapsed); + assert(entry.hasTransients()); + assert(transientsReader(entry)); - debugs(20, 7, "anchoring " << collapsed); + debugs(20, 7, "anchoring " << entry); bool found = false; if (memStore) - found = memStore->anchorCollapsed(collapsed, inSync); + found = memStore->anchorToCache(entry, inSync); if (!found && swapDir) - found = swapDir->anchorCollapsed(collapsed, inSync); + found = swapDir->anchorToCache(entry, inSync); if (found) { if (inSync) - debugs(20, 7, "anchored " << collapsed); + debugs(20, 7, "anchored " << entry); else - debugs(20, 5, "failed to anchor " << collapsed); + debugs(20, 5, "failed to anchor " << entry); } else { - debugs(20, 7, "skipping not yet cached " << collapsed); + debugs(20, 7, "skipping not yet cached " << entry); } return found; @@ -611,6 +829,14 @@ Store::Controller::smpAware() const return memStore || (swapDir && swapDir->smpAware()); } +void +Store::Controller::checkTransients(const StoreEntry &e) const +{ + if (EBIT_TEST(e.flags, ENTRY_SPECIAL)) + return; + assert(!transients || e.hasTransients()); +} + namespace Store { static RefCount TheRoot; } diff --git a/src/store/Controller.h b/src/store/Controller.h index 74e13da1e5..1bce65e8f7 100644 --- a/src/store/Controller.h +++ b/src/store/Controller.h @@ -28,7 +28,6 @@ public: /* Storage API */ virtual void create() override; virtual void init() override; - virtual StoreEntry *get(const cache_key *) override; virtual uint64_t maxSize() const override; virtual uint64_t minSize() const override; virtual uint64_t currentSize() const override; @@ -38,11 +37,40 @@ public: virtual void stat(StoreEntry &) const override; virtual void sync() override; virtual void maintain() override; - virtual void markForUnlink(StoreEntry &) override; - virtual void unlink(StoreEntry &) override; + virtual void evictCached(StoreEntry &) override; + virtual void evictIfFound(const cache_key *) override; virtual int callback() override; virtual bool smpAware() const override; + /// \returns a locally indexed and SMP-tracked matching StoreEntry (or nil) + /// Slower than peek() but does not restrict StoreEntry use and storage. + /// Counts as an entry reference from the removal policy p.o.v. + StoreEntry *find(const cache_key *); + + /// \returns a matching StoreEntry not suitable for long-term use (or nil) + /// Faster than find() but the returned entry may not receive updates, may + /// lack information from some of the Stores, and should not be updated + /// except that purging peek()ed entries is supported. + /// Does not count as an entry reference from the removal policy p.o.v. + StoreEntry *peek(const cache_key *); + + /// \returns matching StoreEntry associated with local ICP/HTCP transaction + /// Warning: The returned StoreEntry is not synced and may be marked for + /// deletion. Use it only for extracting transaction callback details. + /// TODO: Group and return just that callback-related data instead? + StoreEntry *findCallback(const cache_key *); + + /// Whether a transient entry with the given public key exists and (but) was + /// marked for removal some time ago; get(key) returns nil in such cases. + bool markedForDeletion(const cache_key *key) const; + + /// markedForDeletion() with no readers + /// this is one method because the two conditions must be checked in the right order + bool markedForDeletionAndAbandoned(const StoreEntry &) const; + + /// whether there is a disk entry with e.key + bool hasReadableDiskEntry(const StoreEntry &) const; + /// Additional unknown-size entry bytes required by Store in order to /// reduce the risk of selecting the wrong disk cache for the growing entry. int64_t accumulateMore(StoreEntry &) const; @@ -53,14 +81,33 @@ public: /// called when the entry is no longer needed by any transaction void handleIdleEntry(StoreEntry &); + /// Evict memory cache entries to free at least `spaceRequired` bytes. + /// Should be called via storeGetMemSpace(). + /// Unreliable: Fails if enough victims cannot be found fast enough. + void freeMemorySpace(const int spaceRequired); + /// called to get rid of no longer needed entry data in RAM, if any void memoryOut(StoreEntry &, const bool preserveSwappable); /// update old entry metadata and HTTP headers using a newer entry void updateOnNotModified(StoreEntry *old, const StoreEntry &newer); - /// makes the entry available for collapsing future requests - void allowCollapsing(StoreEntry *, const RequestFlags &, const HttpRequestMethod &); + /// tries to make the entry available for collapsing future requests + bool allowCollapsing(StoreEntry *, const RequestFlags &, const HttpRequestMethod &); + + /// register a being-read StoreEntry (to optimize concurrent cache reads + /// and to receive remote DELETE events) + void addReading(StoreEntry *, const cache_key *); + + /// register a being-written StoreEntry (to support concurrent cache reads + /// and to receive remote DELETE events) + void addWriting(StoreEntry *, const cache_key *); + + /// whether the entry is in "reading from Transients" I/O state + bool transientsReader(const StoreEntry &) const; + + /// whether the entry is in "writing to Transients" I/O state + bool transientsWriter(const StoreEntry &) const; /// marks the entry completed for collapsed requests void transientsCompleteWriting(StoreEntry &); @@ -68,17 +115,14 @@ public: /// Update local intransit entry after changes made by appending worker. void syncCollapsed(const sfileno); - /// calls Root().transients->abandon() if transients are tracked - void transientsAbandon(StoreEntry &); + /// stop any current (and prevent any future) SMP sharing of the given entry + void stopSharing(StoreEntry &); /// number of the transient entry readers some time ago int transientReaders(const StoreEntry &) const; /// disassociates the entry from the intransit table - void transientsDisconnect(MemObject &); - - /// removes the entry from the memory cache - void memoryUnlink(StoreEntry &); + void transientsDisconnect(StoreEntry &); /// disassociates the entry from the memory cache, preserving cached data void memoryDisconnect(StoreEntry &); @@ -90,14 +134,21 @@ public: static int store_dirs_rebuilding; private: + bool memoryCacheHasSpaceFor(const int pagesRequired) const; + /// update reference counters of the recently touched entry void referenceBusy(StoreEntry &e); /// dereference() an idle entry and return true if the entry should be deleted bool dereferenceIdle(StoreEntry &, bool wantsLocalMemory); - StoreEntry *find(const cache_key *key); + void allowSharing(StoreEntry &, const cache_key *); + StoreEntry *peekAtLocal(const cache_key *); + + void memoryEvictCached(StoreEntry &); + void transientsUnlinkByKeyIfFound(const cache_key *); bool keepForLocalMemoryCache(StoreEntry &e) const; - bool anchorCollapsed(StoreEntry &, bool &inSync); + bool anchorToCache(StoreEntry &e, bool &inSync); + void checkTransients(const StoreEntry &) const; Disks *swapDir; ///< summary view of all disk caches Memory *memStore; ///< memory cache @@ -106,6 +157,9 @@ private: /// will belong to a memory cache, a disk cache, or will be uncachable /// when the response header comes. Used for SMP collapsed forwarding. Transients *transients; + + /// Hack: Relays page shortage from freeMemorySpace() to handleIdleEntry(). + int memoryPagesDebt_ = 0; }; /// safely access controller singleton diff --git a/src/store/Disk.cc b/src/store/Disk.cc index 670191a5e4..e2d17dd3a8 100644 --- a/src/store/Disk.cc +++ b/src/store/Disk.cc @@ -187,10 +187,10 @@ Store::Disk::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) c bool Store::Disk::canLog(StoreEntry const &e)const { - if (e.swap_filen < 0) + if (e.hasDisk()) return false; - if (e.swap_status != SWAPOUT_DONE) + if (!e.swappedOut()) return false; if (e.swap_file_sz <= 0) diff --git a/src/store/Disk.h b/src/store/Disk.h index 302bf22081..416839b387 100644 --- a/src/store/Disk.h +++ b/src/store/Disk.h @@ -69,8 +69,13 @@ public: /// called when the entry is about to forget its association with cache_dir virtual void disconnect(StoreEntry &) {} - /// called when entry swap out is complete - virtual void swappedOut(const StoreEntry &e) = 0; + /// finalize the successful swapout that has been already noticed by Store + virtual void finalizeSwapoutSuccess(const StoreEntry &) = 0; + /// abort the failed swapout that has been already noticed by Store + virtual void finalizeSwapoutFailure(StoreEntry &) = 0; + + /// whether this cache dir has an entry with `e.key` + virtual bool hasReadableEntry(const StoreEntry &e) const = 0; protected: void parseOptions(int reconfiguring); diff --git a/src/store/Disks.cc b/src/store/Disks.cc index eb8f254cb8..b543d84402 100644 --- a/src/store/Disks.cc +++ b/src/store/Disks.cc @@ -486,19 +486,33 @@ Store::Disks::sync() } void -Store::Disks::markForUnlink(StoreEntry &e) { - if (e.swap_filen >= 0) - store(e.swap_dirn)->markForUnlink(e); +Store::Disks::evictCached(StoreEntry &e) { + if (e.hasDisk()) { + // TODO: move into Fs::Ufs::UFSSwapDir::evictCached() + if (!EBIT_TEST(e.flags, KEY_PRIVATE)) { + // log before evictCached() below may clear hasDisk() + storeDirSwapLog(&e, SWAP_LOG_DEL); + } + + e.disk().evictCached(e); + return; + } + + if (const auto key = e.publicKey()) + evictIfFound(key); } void -Store::Disks::unlink(StoreEntry &e) { - if (e.swap_filen >= 0) - store(e.swap_dirn)->unlink(e); +Store::Disks::evictIfFound(const cache_key *key) +{ + for (int i = 0; i < Config.cacheSwap.n_configured; ++i) { + if (dir(i).active()) + dir(i).evictIfFound(key); + } } bool -Store::Disks::anchorCollapsed(StoreEntry &collapsed, bool &inSync) +Store::Disks::anchorToCache(StoreEntry &entry, bool &inSync) { if (const int cacheDirs = Config.cacheSwap.n_configured) { // ask each cache_dir until the entry is found; use static starting @@ -511,23 +525,23 @@ Store::Disks::anchorCollapsed(StoreEntry &collapsed, bool &inSync) if (!sd.active()) continue; - if (sd.anchorCollapsed(collapsed, inSync)) { - debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed); + if (sd.anchorToCache(entry, inSync)) { + debugs(20, 3, "cache_dir " << idx << " anchors " << entry); return true; } } } debugs(20, 4, "none of " << Config.cacheSwap.n_configured << - " cache_dirs have " << collapsed); + " cache_dirs have " << entry); return false; } bool -Store::Disks::updateCollapsed(StoreEntry &collapsed) +Store::Disks::updateAnchored(StoreEntry &entry) { - return collapsed.swap_filen >= 0 && - dir(collapsed.swap_dirn).updateCollapsed(collapsed); + return entry.hasDisk() && + dir(entry.swap_dirn).updateAnchored(entry); } bool @@ -543,7 +557,14 @@ Store::Disks::smpAware() const return false; } -/* Store::Disks globals that should be converted to use RegisteredRunner */ +bool +Store::Disks::hasReadableEntry(const StoreEntry &e) const +{ + for (int i = 0; i < Config.cacheSwap.n_configured; ++i) + if (dir(i).active() && dir(i).hasReadableEntry(e)) + return true; + return false; +} void storeDirOpenSwapLogs() @@ -713,7 +734,7 @@ storeDirSwapLog(const StoreEntry * e, int op) { assert (e); assert(!EBIT_TEST(e->flags, KEY_PRIVATE)); - assert(e->swap_filen >= 0); + assert(e->hasDisk()); /* * icons and such; don't write them to the swap log */ diff --git a/src/store/Disks.h b/src/store/Disks.h index 8d89b32a18..7a6a12ca88 100644 --- a/src/store/Disks.h +++ b/src/store/Disks.h @@ -36,10 +36,10 @@ public: virtual bool dereference(StoreEntry &e) override; virtual void updateHeaders(StoreEntry *) override; virtual void maintain() override; - virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override; - virtual bool updateCollapsed(StoreEntry &e) override; - virtual void markForUnlink(StoreEntry &) override; - virtual void unlink(StoreEntry &) override; + virtual bool anchorToCache(StoreEntry &e, bool &inSync) override; + virtual bool updateAnchored(StoreEntry &) override; + virtual void evictCached(StoreEntry &) override; + virtual void evictIfFound(const cache_key *) override; virtual int callback() override; /// slowly calculate (and cache) hi/lo watermarks and similar limits @@ -49,6 +49,8 @@ public: /// reduce the risk of selecting the wrong disk cache for the growing entry. int64_t accumulateMore(const StoreEntry&) const; virtual bool smpAware() const override; + /// whether any of disk caches has entry with e.key + bool hasReadableEntry(const StoreEntry &) const; private: /* migration logic */ diff --git a/src/store/Storage.h b/src/store/Storage.h index 67440343b1..5dd9410649 100644 --- a/src/store/Storage.h +++ b/src/store/Storage.h @@ -10,7 +10,9 @@ #define SQUID_STORE_STORAGE_H #include "base/RefCount.h" +#include "http/RequestMethod.h" #include "store/forward.h" +#include "store_key_md5.h" class StoreInfoStats; @@ -30,9 +32,6 @@ public: /// use readable() and writable() methods. virtual void init() = 0; - /// Retrieve a store entry from the store (blocking) - virtual StoreEntry *get(const cache_key *) = 0; - /** * The maximum size the store will support in normal use. Inaccuracy is * permitted, but may throw estimates for memory etc out of whack. @@ -60,11 +59,14 @@ public: */ virtual void stat(StoreEntry &e) const = 0; - /// expect an unlink() call after the entry becomes idle - virtual void markForUnlink(StoreEntry &e) = 0; + /// Prevent new get() calls from returning the matching entry. + /// If the matching entry is unused, it may be removed from the store now. + /// The store entry is matched using either `e` attachment info or `e.key`. + virtual void evictCached(StoreEntry &e) = 0; - /// remove the entry from the store - virtual void unlink(StoreEntry &e) = 0; + /// An evictCached() equivalent for callers that did not get() a StoreEntry. + /// Callers with StoreEntry objects must use evictCached() instead. + virtual void evictIfFound(const cache_key *) = 0; /// called once every main loop iteration; TODO: Move to UFS code. virtual int callback() { return 0; } diff --git a/src/store/forward.h b/src/store/forward.h index 781420de69..75f3445b8e 100644 --- a/src/store/forward.h +++ b/src/store/forward.h @@ -36,12 +36,16 @@ class Transients; namespace Store { +/// cache "I/O" direction and status +enum IoStatus { ioUndecided, ioWriting, ioReading, ioDone }; + class Storage; class Controller; class Controlled; class Disks; class Disk; class DiskConfig; +class EntryGuard; typedef ::StoreEntry Entry; typedef ::MemStore Memory; diff --git a/src/store_client.cc b/src/store_client.cc index 0705aba430..95b0011721 100644 --- a/src/store_client.cc +++ b/src/store_client.cc @@ -162,7 +162,7 @@ store_client::store_client(StoreEntry *e) : if (getType() == STORE_DISK_CLIENT) { /* assert we'll be able to get the data we want */ /* maybe we should open swapin_sio here */ - assert(entry->swap_filen > -1 || entry->swappingOut()); + assert(entry->hasDisk() || entry->swappingOut()); } } @@ -251,7 +251,7 @@ store_client::moreToSend() const const int64_t len = entry->objectLen(); // If we do not know the entry length, then we have to open the swap file. - const bool canSwapIn = entry->swap_filen >= 0; + const bool canSwapIn = entry->hasDisk(); if (len < 0) return canSwapIn; @@ -441,7 +441,7 @@ store_client::fileRead() flags.disk_io_pending = true; if (mem->swap_hdr_sz != 0) - if (entry->swap_status == SWAPOUT_WRITING) + if (entry->swappingOut()) assert(mem->swapout.sio->offset() > copyInto.offset + (int64_t)mem->swap_hdr_sz); storeRead(swapin_sio, @@ -667,7 +667,7 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data) dlinkDelete(&sc->node, &mem->clients); -- mem->nclients; - if (e->store_status == STORE_OK && e->swap_status != SWAPOUT_DONE) + if (e->store_status == STORE_OK && !e->swappedOut()) e->swapOut(); if (sc->swapin_sio != NULL) { diff --git a/src/store_log.cc b/src/store_log.cc index 5606016bfa..deab800696 100644 --- a/src/store_log.cc +++ b/src/store_log.cc @@ -57,6 +57,9 @@ storeLog(int tag, const StoreEntry * e) String ctype=(reply->content_type.size() ? reply->content_type.termedBuf() : str_unknown); + // mem_obj may still lack logging details; especially in RELEASE cases + const char *logUri = mem->hasUris() ? mem->logUri() : "?"; + logfileLineStart(storelog); logfilePrintf(storelog, "%9d.%03d %-7s %02d %08X %s %4d %9d %9d %9d " SQUIDSTRINGPH " %" PRId64 "/%" PRId64 " " SQUIDSBUFPH " %s\n", (int) current_time.tv_sec, @@ -73,7 +76,7 @@ storeLog(int tag, const StoreEntry * e) reply->content_length, e->contentLen(), SQUIDSBUFPRINT(mem->method.image()), - mem->logUri()); + logUri); logfileLineEnd(storelog); } else { /* no mem object. Most RELEASE cases */ diff --git a/src/store_rebuild.cc b/src/store_rebuild.cc index 008b8a153d..d6d0bf9462 100644 --- a/src/store_rebuild.cc +++ b/src/store_rebuild.cc @@ -75,7 +75,7 @@ storeCleanup(void *) * Calling StoreEntry->release() has no effect because we're * still in 'store_rebuilding' state */ - if (e->swap_filen < 0) + if (!e->hasDisk()) continue; if (opt_store_doublecheck) @@ -359,48 +359,3 @@ storeRebuildParseEntry(MemBuf &buf, StoreEntry &tmpe, cache_key *key, return true; } -bool -storeRebuildKeepEntry(const StoreEntry &tmpe, const cache_key *key, StoreRebuildData &stats) -{ - /* this needs to become - * 1) unpack url - * 2) make synthetic request with headers ?? or otherwise search - * for a matching object in the store - * TODO FIXME change to new async api - * TODO FIXME I think there is a race condition here with the - * async api : - * store A reads in object foo, searchs for it, and finds nothing. - * store B reads in object foo, searchs for it, finds nothing. - * store A gets called back with nothing, so registers the object - * store B gets called back with nothing, so registers the object, - * which will conflict when the in core index gets around to scanning - * store B. - * - * this suggests that rather than searching for duplicates, the - * index rebuild should just assume its the most recent accurate - * store entry and whoever indexes the stores handles duplicates. - */ - if (StoreEntry *e = Store::Root().get(key)) { - - if (e->lastref >= tmpe.lastref) { - /* key already exists, old entry is newer */ - /* keep old, ignore new */ - ++stats.dupcount; - - // For some stores, get() creates/unpacks a store entry. Signal - // such stores that we will no longer use the get() result: - e->lock("storeRebuildKeepEntry"); - e->unlock("storeRebuildKeepEntry"); - - return false; - } else { - /* URL already exists, this swapfile not being used */ - /* junk old, load new */ - e->release(); /* release old entry */ - ++stats.dupcount; - } - } - - return true; -} - diff --git a/src/store_rebuild.h b/src/store_rebuild.h index 3fd9191e6f..de3056edf7 100644 --- a/src/store_rebuild.h +++ b/src/store_rebuild.h @@ -42,8 +42,6 @@ void storeRebuildProgress(int sd_index, int total, int sofar); bool storeRebuildLoadEntry(int fd, int diskIndex, MemBuf &buf, StoreRebuildData &counts); /// parses entry buffer and validates entry metadata; fills e on success bool storeRebuildParseEntry(MemBuf &buf, StoreEntry &e, cache_key *key, StoreRebuildData &counts, uint64_t expectedSize); -/// checks whether the loaded entry should be kept; updates counters -bool storeRebuildKeepEntry(const StoreEntry &e, const cache_key *key, StoreRebuildData &counts); #endif /* SQUID_STORE_REBUILD_H_ */ diff --git a/src/store_swapin.cc b/src/store_swapin.cc index 193e51d216..0b7f204380 100644 --- a/src/store_swapin.cc +++ b/src/store_swapin.cc @@ -31,22 +31,14 @@ storeSwapInStart(store_client * sc) if (e->mem_status != NOT_IN_MEMORY) debugs(20, 3, HERE << "already IN_MEMORY"); - debugs(20, 3, "storeSwapInStart: called for : " << e->swap_dirn << " " << - std::hex << std::setw(8) << std::setfill('0') << std::uppercase << - e->swap_filen << " " << e->getMD5Text()); + debugs(20, 3, *e << " " << e->getMD5Text()); - if (e->swap_status != SWAPOUT_WRITING && e->swap_status != SWAPOUT_DONE) { - debugs(20, DBG_IMPORTANT, "storeSwapInStart: bad swap_status (" << swapStatusStr[e->swap_status] << ")"); - return; - } - - if (e->swap_filen < 0) { - debugs(20, DBG_IMPORTANT, "storeSwapInStart: swap_filen < 0"); + if (!e->hasDisk()) { + debugs(20, DBG_IMPORTANT, "BUG: Attempt to swap in a not-stored entry " << *e << ". Salvaged."); return; } assert(e->mem_obj != NULL); - debugs(20, 3, "storeSwapInStart: Opening fileno " << std::hex << std::setw(8) << std::setfill('0') << std::uppercase << e->swap_filen); sc->swapin_sio = storeOpen(e, storeSwapInFileNotify, storeSwapInFileClosed, sc); } diff --git a/src/store_swapout.cc b/src/store_swapout.cc index 7eab4641fb..08038fe72c 100644 --- a/src/store_swapout.cc +++ b/src/store_swapout.cc @@ -10,6 +10,7 @@ #include "squid.h" #include "cbdata.h" +#include "CollapsedForwarding.h" #include "globals.h" #include "Store.h" #include "StoreClient.h" @@ -46,7 +47,6 @@ storeSwapOutStart(StoreEntry * e) debugs(20, 5, "storeSwapOutStart: Begin SwapOut '" << e->url() << "' to dirno " << e->swap_dirn << ", fileno " << std::hex << std::setw(8) << std::setfill('0') << std::uppercase << e->swap_filen); - e->swap_status = SWAPOUT_WRITING; e->swapOutDecision(MemObject::SwapOut::swStarted); /* If we start swapping out objects with OutOfBand Metadata, * then this code needs changing @@ -65,6 +65,7 @@ storeSwapOutStart(StoreEntry * e) sio = storeCreate(e, storeSwapOutFileNotify, storeSwapOutFileClosed, c); if (sio == NULL) { + assert(!e->hasDisk()); e->swap_status = SWAPOUT_NONE; e->swapOutDecision(MemObject::SwapOut::swImpossible); delete c; @@ -79,14 +80,13 @@ storeSwapOutStart(StoreEntry * e) e->lock("storeSwapOutStart"); /* Pick up the file number if it was assigned immediately */ - e->swap_filen = mem->swapout.sio->swap_filen; - - e->swap_dirn = mem->swapout.sio->swap_dirn; + e->attachToDisk(mem->swapout.sio->swap_dirn, mem->swapout.sio->swap_filen, SWAPOUT_WRITING); /* write out the swap metadata */ storeIOWrite(mem->swapout.sio, buf, mem->swap_hdr_sz, 0, xfree_cppwrapper); } +/// XXX: unused, see a related StoreIOState::file_callback static void storeSwapOutFileNotify(void *data, int errflag, StoreIOState::Pointer self) { @@ -94,11 +94,11 @@ storeSwapOutFileNotify(void *data, int errflag, StoreIOState::Pointer self) static_cast(data)->unwrap(&e); MemObject *mem = e->mem_obj; - assert(e->swap_status == SWAPOUT_WRITING); + assert(e->swappingOut()); assert(mem); assert(mem->swapout.sio == self); assert(errflag == 0); - assert(e->swap_filen < 0); // if this fails, call SwapDir::disconnect(e) + assert(!e->hasDisk()); // if this fails, call SwapDir::disconnect(e) e->swap_filen = mem->swapout.sio->swap_filen; e->swap_dirn = mem->swapout.sio->swap_dirn; } @@ -146,7 +146,7 @@ doPages(StoreEntry *anEntry) -1, memNodeWriteComplete); - if (!ok || anEntry->swap_status != SWAPOUT_WRITING) + if (!ok || !anEntry->swappingOut()) return false; int64_t swapout_size = mem->endOffset() - mem->swapout.queue_offset; @@ -210,7 +210,7 @@ StoreEntry::swapOut() } #endif - if (swap_status == SWAPOUT_WRITING) + if (swappingOut()) assert(mem_obj->inmem_lo <= mem_obj->objectBytesOnDisk() ); // buffered bytes we have not swapped out yet @@ -242,7 +242,7 @@ StoreEntry::swapOut() } /* Ok, we have stuff to swap out. Is there a swapout.sio open? */ - if (swap_status == SWAPOUT_NONE) { + if (!hasDisk()) { assert(mem_obj->swapout.sio == NULL); assert(mem_obj->inmem_lo == 0); storeSwapOutStart(this); // sets SwapOut::swImpossible on failures @@ -288,7 +288,7 @@ storeSwapOutFileClosed(void *data, int errflag, StoreIOState::Pointer self) MemObject *mem = e->mem_obj; assert(mem->swapout.sio == self); - assert(e->swap_status == SWAPOUT_WRITING); + assert(e->swappingOut()); // if object_size is still unknown, the entry was probably aborted if (errflag || e->objectLen() < 0) { @@ -304,12 +304,8 @@ storeSwapOutFileClosed(void *data, int errflag, StoreIOState::Pointer self) storeConfigure(); } - if (e->swap_filen >= 0) - e->disk().unlink(*e); - - assert(e->swap_status == SWAPOUT_NONE); - - e->releaseRequest(); + e->disk().finalizeSwapoutFailure(*e); + e->releaseRequest(); // TODO: Keep the memory entry (if any) } else { /* swapping complete */ debugs(20, 3, "storeSwapOutFileClosed: SwapOut complete: '" << e->url() << "' to " << @@ -320,7 +316,7 @@ storeSwapOutFileClosed(void *data, int errflag, StoreIOState::Pointer self) e->swap_file_sz = e->objectLen() + mem->swap_hdr_sz; e->swap_status = SWAPOUT_DONE; - e->disk().swappedOut(*e); + e->disk().finalizeSwapoutSuccess(*e); // XXX: For some Stores, it is pointless to re-check cachability here // and it leads to double counts in store_check_cachable_hist. We need @@ -334,6 +330,7 @@ storeSwapOutFileClosed(void *data, int errflag, StoreIOState::Pointer self) ++statCounter.swap.outs; } + Store::Root().transientsCompleteWriting(*e); debugs(20, 3, "storeSwapOutFileClosed: " << __FILE__ << ":" << __LINE__); mem->swapout.sio = NULL; e->unlock("storeSwapOutFileClosed"); @@ -358,20 +355,27 @@ StoreEntry::mayStartSwapOut() return false; } - // if we swapped out already, do not start over - if (swap_status == SWAPOUT_DONE) { + // if we are swapping out or swapped out already, do not start over + if (hasDisk() || Store::Root().hasReadableDiskEntry(*this)) { debugs(20, 3, "already did"); swapOutDecision(MemObject::SwapOut::swImpossible); return false; } - // if we stared swapping out already, do not start over + // if we have just stared swapping out (attachToDisk() has not been + // called), do not start over if (decision == MemObject::SwapOut::swStarted) { debugs(20, 3, "already started"); swapOutDecision(MemObject::SwapOut::swImpossible); return false; } + if (Store::Root().markedForDeletionAndAbandoned(*this)) { + debugs(20, 3, "marked for deletion and abandoned"); + swapOutDecision(MemObject::SwapOut::swImpossible); + return false; + } + // if we decided that swapout is possible, do not repeat same checks if (decision == MemObject::SwapOut::swPossible) { debugs(20, 3, "already allowed"); diff --git a/src/tests/TestSwapDir.h b/src/tests/TestSwapDir.h index a82649db8b..a1b0869563 100644 --- a/src/tests/TestSwapDir.h +++ b/src/tests/TestSwapDir.h @@ -24,7 +24,8 @@ public: virtual uint64_t currentSize() const override; virtual uint64_t currentCount() const override; virtual void stat(StoreEntry &) const override; - virtual void swappedOut(const StoreEntry &e) override {} + virtual void finalizeSwapoutSuccess(const StoreEntry &) override {} + virtual void finalizeSwapoutFailure(StoreEntry &) override {} virtual void reconfigure() override; virtual void init() override; virtual bool unlinkdUseful() const override; @@ -32,8 +33,9 @@ public: virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) override; virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) override; virtual void parse(int, char*) override; - virtual void markForUnlink(StoreEntry &) override {} - virtual void unlink(StoreEntry &) override {} + virtual void evictCached(StoreEntry &) override {} + virtual void evictIfFound(const cache_key *) override {} + virtual bool hasReadableEntry(const StoreEntry &) const override { return false; } }; typedef RefCount TestSwapDirPointer; diff --git a/src/tests/stub_CollapsedForwarding.cc b/src/tests/stub_CollapsedForwarding.cc index 5fcf772f89..ff400b8037 100644 --- a/src/tests/stub_CollapsedForwarding.cc +++ b/src/tests/stub_CollapsedForwarding.cc @@ -12,5 +12,6 @@ #define STUB_API "CollapsedForwarding.cc" #include "tests/STUB.h" -void CollapsedForwarding::Broadcast(StoreEntry const&) STUB +void CollapsedForwarding::Broadcast(StoreEntry const&, const bool) STUB +void CollapsedForwarding::Broadcast(const sfileno, const bool) STUB diff --git a/src/tests/stub_MemStore.cc b/src/tests/stub_MemStore.cc index ca76a209a1..10d9de7078 100644 --- a/src/tests/stub_MemStore.cc +++ b/src/tests/stub_MemStore.cc @@ -19,7 +19,6 @@ MemStore::~MemStore() STUB bool MemStore::keepInLocalMemory(const StoreEntry &) const STUB_RETVAL(false) void MemStore::write(StoreEntry &e) STUB void MemStore::completeWriting(StoreEntry &e) STUB -void MemStore::unlink(StoreEntry &e) STUB void MemStore::disconnect(StoreEntry &e) STUB void MemStore::reference(StoreEntry &) STUB void MemStore::updateHeaders(StoreEntry *) STUB @@ -35,7 +34,8 @@ uint64_t MemStore::currentSize() const STUB_RETVAL(0) uint64_t MemStore::currentCount() const STUB_RETVAL(0) int64_t MemStore::maxObjectSize() const STUB_RETVAL(0) bool MemStore::dereference(StoreEntry &) STUB_RETVAL(false) -void MemStore::markForUnlink(StoreEntry&) STUB -bool MemStore::anchorCollapsed(StoreEntry&, bool&) STUB_RETVAL(false) -bool MemStore::updateCollapsed(StoreEntry&) STUB_RETVAL(false) +void MemStore::evictCached(StoreEntry&) STUB +void MemStore::evictIfFound(const cache_key *) STUB +bool MemStore::anchorToCache(StoreEntry&, bool&) STUB_RETVAL(false) +bool MemStore::updateAnchored(StoreEntry&) STUB_RETVAL(false) diff --git a/src/tests/stub_store.cc b/src/tests/stub_store.cc index 9ce5941d4f..a5e0ee67b3 100644 --- a/src/tests/stub_store.cc +++ b/src/tests/stub_store.cc @@ -37,15 +37,14 @@ void StoreEntry::replaceHttpReply(HttpReply *, bool andStartWriting) STUB bool StoreEntry::mayStartSwapOut() STUB_RETVAL(false) void StoreEntry::trimMemory(const bool preserveSwappable) STUB void StoreEntry::abort() STUB -void StoreEntry::makePublic(const KeyScope scope) STUB +bool StoreEntry::makePublic(const KeyScope scope) STUB void StoreEntry::makePrivate(const bool shareable) STUB -void StoreEntry::setPublicKey(const KeyScope scope) STUB -void StoreEntry::setPrivateKey(const bool shareable) STUB +bool StoreEntry::setPublicKey(const KeyScope scope) STUB +void StoreEntry::setPrivateKey(const bool, const bool) STUB void StoreEntry::expireNow() STUB void StoreEntry::releaseRequest(const bool shareable) STUB void StoreEntry::negativeCache() STUB -void StoreEntry::cacheNegatively() STUB -void StoreEntry::purgeMem() STUB +bool StoreEntry::cacheNegatively() STUB void StoreEntry::swapOut() STUB void StoreEntry::swapOutFileClose(int how) STUB const char *StoreEntry::url() const STUB_RETVAL(NULL) @@ -83,7 +82,6 @@ void *StoreEntry::operator new(size_t byteCount) return new StoreEntry(); } void StoreEntry::operator delete(void *address) STUB -void StoreEntry::setReleaseFlag() STUB //#if USE_SQUID_ESI //ESIElement::Pointer StoreEntry::cachedESITree STUB_RETVAL(NULL) //#endif @@ -112,6 +110,8 @@ void Store::Maintain(void *unused) STUB int Store::Controller::store_dirs_rebuilding = 0; StoreSearch *Store::Controller::search() STUB_RETVAL(NULL) void Store::Controller::maintain() STUB +bool Store::Controller::markedForDeletion(const cache_key *) const STUB_RETVAL(false) +void Store::Controller::freeMemorySpace(const int) STUB std::ostream &operator <<(std::ostream &os, const StoreEntry &) { @@ -125,7 +125,7 @@ StoreEntry *storeGetPublic(const char *uri, const HttpRequestMethod& method) STU StoreEntry *storeGetPublicByRequest(HttpRequest * request, const KeyScope scope) STUB_RETVAL(NULL) StoreEntry *storeGetPublicByRequestMethod(HttpRequest * request, const HttpRequestMethod& method, const KeyScope scope) STUB_RETVAL(NULL) StoreEntry *storeCreateEntry(const char *, const char *, const RequestFlags &, const HttpRequestMethod&) STUB_RETVAL(NULL) -StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const RequestFlags &, const HttpRequestMethod&) STUB_RETVAL(NULL) +StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const HttpRequestMethod&) STUB_RETVAL(nullptr) void storeConfigure(void) STUB int expiresMoreThan(time_t, time_t) STUB_RETVAL(0) void storeAppendPrintf(StoreEntry *, const char *,...) STUB diff --git a/src/tests/stub_store_rebuild.cc b/src/tests/stub_store_rebuild.cc index 9b2c0d9de3..909b965f34 100644 --- a/src/tests/stub_store_rebuild.cc +++ b/src/tests/stub_store_rebuild.cc @@ -19,7 +19,6 @@ #include "tests/STUB.h" void storeRebuildProgress(int sd_index, int total, int sofar) STUB -bool storeRebuildKeepEntry(const StoreEntry &tmpe, const cache_key *key, StoreRebuildData &counts) STUB_RETVAL(false) bool storeRebuildParseEntry(MemBuf &, StoreEntry &, cache_key *, StoreRebuildData &, uint64_t) STUB_RETVAL(false) void storeRebuildComplete(StoreRebuildData *) diff --git a/src/tests/testRock.cc b/src/tests/testRock.cc index e53da2c037..4eb90940f9 100644 --- a/src/tests/testRock.cc +++ b/src/tests/testRock.cc @@ -259,16 +259,28 @@ testRock::testRockSwapOut() // try to swap out entry to a used unlocked slot { - StoreEntry *const pe = addEntry(4); + // without marking the old entry as deleted + StoreEntry *const pe = addEntry(3); - CPPUNIT_ASSERT_EQUAL(SWAPOUT_WRITING, pe->swap_status); - CPPUNIT_ASSERT_EQUAL(0, pe->swap_dirn); - CPPUNIT_ASSERT(pe->swap_filen >= 0); + CPPUNIT_ASSERT_EQUAL(SWAPOUT_NONE, pe->swap_status); + CPPUNIT_ASSERT_EQUAL(-1, pe->swap_dirn); + CPPUNIT_ASSERT_EQUAL(-1, pe->swap_filen); + pe->unlock("testRock::testRockSwapOut e#3"); + + // after marking the old entry as deleted + StoreEntry *const pe2 = getEntry(4); + CPPUNIT_ASSERT(pe2 != nullptr); + pe2->release(); + + StoreEntry *const pe3 = addEntry(4); + CPPUNIT_ASSERT_EQUAL(SWAPOUT_WRITING, pe3->swap_status); + CPPUNIT_ASSERT_EQUAL(0, pe3->swap_dirn); + CPPUNIT_ASSERT(pe3->swap_filen >= 0); StockEventLoop loop; loop.run(); - CPPUNIT_ASSERT_EQUAL(SWAPOUT_DONE, pe->swap_status); + CPPUNIT_ASSERT_EQUAL(SWAPOUT_DONE, pe3->swap_status); pe->unlock("testRock::testRockSwapOut e#4"); } diff --git a/src/tests/testStoreController.cc b/src/tests/testStoreController.cc index 76c65fce25..8b3d3ca649 100644 --- a/src/tests/testStoreController.cc +++ b/src/tests/testStoreController.cc @@ -111,8 +111,6 @@ addedEntry(Store::Disk *aStore, e->expires = squid_curtime; e->lastModified(squid_curtime); e->refcount = 1; - EBIT_CLR(e->flags, RELEASE_REQUEST); - e->clearPrivate(); e->ping_status = PING_NONE; EBIT_CLR(e->flags, ENTRY_VALIDATED); e->hashInsert((const cache_key *)name.termedBuf()); /* do it after we clear KEY_PRIVATE */ diff --git a/src/tests/testStoreHashIndex.cc b/src/tests/testStoreHashIndex.cc index 866b09d8a3..c5380851f3 100644 --- a/src/tests/testStoreHashIndex.cc +++ b/src/tests/testStoreHashIndex.cc @@ -89,8 +89,6 @@ addedEntry(Store::Disk *aStore, e->expires = squid_curtime; e->lastModified(squid_curtime); e->refcount = 1; - EBIT_CLR(e->flags, RELEASE_REQUEST); - e->clearPrivate(); e->ping_status = PING_NONE; EBIT_CLR(e->flags, ENTRY_VALIDATED); e->hashInsert((const cache_key *)name.termedBuf()); /* do it after we clear KEY_PRIVATE */ diff --git a/src/whois.cc b/src/whois.cc index 3e1dc8c0de..3dd028f084 100644 --- a/src/whois.cc +++ b/src/whois.cc @@ -158,7 +158,8 @@ WhoisState::readReply(const Comm::ConnectionPointer &conn, char *aBuffer, size_t entry->timestampsSet(); entry->flush(); - entry->makePublic(); + if (!entry->makePublic()) + entry->makePrivate(true); fwd->complete(); debugs(75, 3, "whoisReadReply: Done: " << entry->url()); -- 2.39.5