}
void
-CollapsedForwarding::Broadcast(const StoreEntry &e)
+CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker)
{
- if (!queue.get())
- return;
-
- if (!e.mem_obj || e.mem_obj->xitTable.index < 0 ||
+ if (!e.hasTransients() ||
!Store::Root().transientReaders(e)) {
debugs(17, 7, "nobody reads " << e);
return;
}
+ debugs(17, 5, e);
+ Broadcast(e.mem_obj->xitTable.index, includingThisWorker);
+}
+
+void
+CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker)
+{
+ if (!queue.get())
+ return;
+
CollapsedForwardingMsg msg;
msg.sender = KidIdentifier;
- msg.xitIndex = e.mem_obj->xitTable.index;
+ msg.xitIndex = index;
- debugs(17, 5, e << " to " << Config.workers << "-1 workers");
+ debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers");
// TODO: send only to workers who are waiting for data
for (int workerId = 1; workerId <= Config.workers; ++workerId) {
try {
- if (workerId != KidIdentifier && queue->push(workerId, msg))
+ if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg))
Notify(workerId);
} catch (const Queue::Full &) {
debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
#include "ipc/forward.h"
#include "ipc/Queue.h"
+#include "store/forward.h"
#include <memory>
static void Init();
/// notify other workers about changes in entry state (e.g., new data)
- static void Broadcast(const StoreEntry &e);
+ static void Broadcast(const StoreEntry &e, const bool includingThisWorker = false);
+
+ /// notify other workers about state changes in the Transients entry at the given xitTable.index
+ /// use Broadcast(StoreEntry) variant if you have a StoreEntry object
+ static void Broadcast(const sfileno index, const bool includingThisWorker);
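+
+ // Broadcast-by-index sketch (mirrors Transients::evictIfFound() in this
+ // patch), for callers that have a transients index but no StoreEntry object:
+ //     CollapsedForwarding::Broadcast(index, true);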
/// kick worker with empty IPC queue
static void Notify(const int workerId);
MemObject::MemObject() :
inmem_lo(0),
nclients(0),
- smpCollapsed(false),
ping_reply_callback(nullptr),
ircb_data(nullptr),
id(0),
mb->appendf("\tmem-cache index: %d state: %d offset: %" PRId64 "\n", memCache.index, memCache.io, memCache.offset);
if (object_sz >= 0)
mb->appendf("\tobject_sz: %" PRId64 "\n", object_sz);
- if (smpCollapsed)
- mb->appendf("\tsmp-collapsed\n");
StoreClientStats statsVisitor(mb);
#include "RemovalPolicy.h"
#include "SquidString.h"
#include "stmem.h"
+#include "store/forward.h"
#include "StoreIOBuffer.h"
#include "StoreIOState.h"
#include "typedefs.h" //for IRCB
SwapOut swapout;
- /// cache "I/O" direction and status
- typedef enum { ioUndecided, ioWriting, ioReading, ioDone } Io;
+ /* TODO: Remove this change-minimizing hack */
+ using Io = Store::IoStatus;
+ static constexpr Io ioUndecided = Store::ioUndecided;
+ static constexpr Io ioReading = Store::ioReading;
+ static constexpr Io ioWriting = Store::ioWriting;
+ static constexpr Io ioDone = Store::ioDone;
/// State of an entry with regards to the [shared] in-transit table.
class XitTable
};
MemCache memCache; ///< current [shared] memory caching state for the entry
- bool smpCollapsed; ///< whether this entry gets data from another worker
-
/* Read only - this reply must be preserved by store clients */
/* The original reply. possibly with updated metadata. */
HttpRequestPointer request;
StoreEntry *e = new StoreEntry();
// XXX: We do not know the URLs yet, only the key, but we need to parse and
- // store the response for the Root().get() callers to be happy because they
+ // store the response for the Root().find() callers to be happy because they
// expect IN_MEMORY entries to already have the response headers and body.
e->createMemObject();
const bool copied = copyFromShm(*e, index, *slot);
- if (copied) {
- e->hashInsert(key);
+ if (copied)
return e;
- }
- debugs(20, 3, HERE << "mem-loading failed; freeing " << index);
+ debugs(20, 3, "failed for " << *e);
map->freeEntry(index); // do not let others into the same trap
+ destroyStoreEntry(static_cast<hash_link *>(e));
return NULL;
}
}
bool
-MemStore::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+MemStore::anchorToCache(StoreEntry &entry, bool &inSync)
{
if (!map)
return false;
sfileno index;
const Ipc::StoreMapAnchor *const slot = map->openForReading(
- reinterpret_cast<cache_key*>(collapsed.key), index);
+ reinterpret_cast<cache_key*>(entry.key), index);
if (!slot)
return false;
- anchorEntry(collapsed, index, *slot);
- inSync = updateCollapsedWith(collapsed, index, *slot);
+ anchorEntry(entry, index, *slot);
+ inSync = updateAnchoredWith(entry, index, *slot);
return true; // even if inSync is false
}
bool
-MemStore::updateCollapsed(StoreEntry &collapsed)
+MemStore::updateAnchored(StoreEntry &entry)
{
- assert(collapsed.mem_obj);
-
- const sfileno index = collapsed.mem_obj->memCache.index;
-
- // already disconnected from the cache, no need to update
- if (index < 0)
- return true;
-
if (!map)
return false;
+ assert(entry.mem_obj);
+ assert(entry.hasMemStore());
+ const sfileno index = entry.mem_obj->memCache.index;
const Ipc::StoreMapAnchor &anchor = map->readableEntry(index);
- return updateCollapsedWith(collapsed, index, anchor);
+ return updateAnchoredWith(entry, index, anchor);
}
-/// updates collapsed entry after its anchor has been located
+/// updates Transients entry after its anchor has been located
bool
-MemStore::updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor)
+MemStore::updateAnchoredWith(StoreEntry &entry, const sfileno index, const Ipc::StoreMapAnchor &anchor)
{
- collapsed.swap_file_sz = anchor.basics.swap_file_sz;
- const bool copied = copyFromShm(collapsed, index, anchor);
+ entry.swap_file_sz = anchor.basics.swap_file_sz;
+ const bool copied = copyFromShm(entry, index, anchor);
return copied;
}
void
MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
{
- const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
-
- e.swap_file_sz = basics.swap_file_sz;
- e.lastref = basics.lastref;
- e.timestamp = basics.timestamp;
- e.expires = basics.expires;
- e.lastModified(basics.lastmod);
- e.refcount = basics.refcount;
- e.flags = basics.flags;
+ assert(!e.hasDisk()); // no conflict with disk entry basics
+ anchor.exportInto(e);
assert(e.mem_obj);
if (anchor.complete()) {
assert(e.mem_obj->object_sz < 0);
e.setMemStatus(NOT_IN_MEMORY);
}
- assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor
- e.ping_status = PING_NONE;
- EBIT_CLR(e.flags, RELEASE_REQUEST);
- e.clearPrivate();
EBIT_SET(e.flags, ENTRY_VALIDATED);
MemObject::MemCache &mc = e.mem_obj->memCache;
}
void
-MemStore::markForUnlink(StoreEntry &e)
+MemStore::evictCached(StoreEntry &e)
{
- assert(e.mem_obj);
- if (e.mem_obj->memCache.index >= 0)
- map->freeEntry(e.mem_obj->memCache.index);
+ debugs(47, 5, e);
+ if (e.hasMemStore()) {
+ if (map->freeEntry(e.mem_obj->memCache.index))
+ CollapsedForwarding::Broadcast(e);
+ if (!e.locked()) {
+ disconnect(e);
+ e.destroyMemObject();
+ }
+ } else if (const auto key = e.publicKey()) {
+ // the entry may have been loaded and then disconnected from the cache
+ evictIfFound(key);
+ if (!e.locked())
+ e.destroyMemObject();
+ }
}
void
-MemStore::unlink(StoreEntry &e)
+MemStore::evictIfFound(const cache_key *key)
{
- if (e.mem_obj && e.mem_obj->memCache.index >= 0) {
- map->freeEntry(e.mem_obj->memCache.index);
- disconnect(e);
- } else if (map) {
- // the entry may have been loaded and then disconnected from the cache
- map->freeEntryByKey(reinterpret_cast<cache_key*>(e.key));
- }
-
- e.destroyMemObject(); // XXX: but it may contain useful info such as a client list. The old code used to do that though, right?
+ if (map)
+ map->freeEntryByKey(key);
}
void
{
assert(e.mem_obj);
MemObject &mem_obj = *e.mem_obj;
- if (mem_obj.memCache.index >= 0) {
+ if (e.hasMemStore()) {
if (mem_obj.memCache.io == MemObject::ioWriting) {
map->abortWriting(mem_obj.memCache.index);
mem_obj.memCache.index = -1;
mem_obj.memCache.io = MemObject::ioDone;
- Store::Root().transientsAbandon(e); // broadcasts after the change
+ Store::Root().stopSharing(e); // broadcasts after the change
} else {
assert(mem_obj.memCache.io == MemObject::ioReading);
map->closeForReading(mem_obj.memCache.index);
virtual bool dereference(StoreEntry &e) override;
virtual void updateHeaders(StoreEntry *e) override;
virtual void maintain() override;
- virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override;
- virtual bool updateCollapsed(StoreEntry &e) override;
- virtual void markForUnlink(StoreEntry &) override;
- virtual void unlink(StoreEntry &e) override;
+ virtual bool anchorToCache(StoreEntry &e, bool &inSync) override;
+ virtual bool updateAnchored(StoreEntry &) override;
+ virtual void evictCached(StoreEntry &) override;
+ virtual void evictIfFound(const cache_key *) override;
virtual bool smpAware() const override { return true; }
static int64_t EntryLimit();
void updateHeadersOrThrow(Ipc::StoreMapUpdate &update);
void anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
- bool updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor);
+ bool updateAnchoredWith(StoreEntry &, const sfileno, const Ipc::StoreMapAnchor &);
Ipc::Mem::PageId pageForSlice(Ipc::StoreMapSliceId sliceId);
Ipc::StoreMap::Slice &nextAppendableSlice(const sfileno entryIndex, sfileno &sliceOffset);
void swapOutDecision(const MemObject::SwapOut::Decision &decision);
void abort();
- void makePublic(const KeyScope keyScope = ksDefault);
+ bool makePublic(const KeyScope keyScope = ksDefault);
void makePrivate(const bool shareable);
/// A low-level method just resetting "private key" flags.
/// To avoid key inconsistency please use forcePublicKey()
/// or similar instead.
void clearPrivate();
- void setPublicKey(const KeyScope keyScope = ksDefault);
+ bool setPublicKey(const KeyScope keyScope = ksDefault);
/// Resets existing public key to a public key with default scope,
/// releasing the old default-scope entry (if any).
/// Does nothing if the existing public key already has default scope.
void clearPublicKeyScope();
- void setPrivateKey(const bool shareable);
+
+ /// \returns public key (if the entry has it) or nil (otherwise)
+ const cache_key *publicKey() const {
+ return (!EBIT_TEST(flags, KEY_PRIVATE)) ?
+ reinterpret_cast<const cache_key*>(key): // may be nil
+ nullptr;
+ }
+
+ /// Either fills this entry with a private key or changes the existing key
+ /// from public to private.
+ /// \param permanent whether this entry should be private forever.
+ void setPrivateKey(const bool shareable, const bool permanent);
+
void expireNow();
+ /// Makes the StoreEntry private and marks the corresponding entry
+ /// for eventual removal from the Store.
void releaseRequest(const bool shareable = false);
void negativeCache();
- void cacheNegatively(); /** \todo argh, why both? */
+ bool cacheNegatively(); /** \todo argh, why both? */
void invokeHandlers();
- void purgeMem();
void cacheInMemory(); ///< start or continue storing in memory cache
void swapOut();
/// whether we are in the process of writing this entry to disk
bool swappingOut() const { return swap_status == SWAPOUT_WRITING; }
+ /// whether the entire entry is now on disk (possibly marked for deletion)
+ bool swappedOut() const { return swap_status == SWAPOUT_DONE; }
void swapOutFileClose(int how);
const char *url() const;
/// Satisfies cachability requirements shared among disk and RAM caches.
/// the disk this entry is [being] cached on; asserts for entries w/o a disk
Store::Disk &disk() const;
+ /// whether one of this StoreEntry's owners has locked the corresponding
+ /// disk entry (at the specified disk entry coordinates, if any)
+ bool hasDisk(const sdirno dirn = -1, const sfileno filen = -1) const;
+ /// Makes hasDisk(dirn, filen) true. The caller should have locked
+ /// the corresponding disk store entry for reading or writing.
+ void attachToDisk(const sdirno, const sfileno, const swap_status_t);
+ /// Makes hasDisk() false. The caller should have unlocked
+ /// the corresponding disk store entry.
+ void detachFromDisk();
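+
+ // Attach/detach sketch (based on the Rock::SwapDir changes in this patch):
+ // after locking a disk map entry for reading,
+ //     e.attachToDisk(dirn, filen, SWAPOUT_DONE); // hasDisk() becomes true
+ //     ... read the swapped-out object ...
+ //     map->closeForReading(e.swap_filen);
+ //     e.detachFromDisk(); // hasDisk() becomes false again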
+
+ /// whether there is a corresponding locked transients table entry
+ bool hasTransients() const { return mem_obj && mem_obj->xitTable.index >= 0; }
+ /// whether there is a corresponding locked shared memory table entry
+ bool hasMemStore() const { return mem_obj && mem_obj->memCache.index >= 0; }
MemObject *mem_obj;
RemovalPolicyNode repl;
void *operator new(size_t byteCount);
void operator delete(void *address);
- void setReleaseFlag();
#if USE_SQUID_ESI
ESIElement::Pointer cachedESITree;
/// update last reference timestamp and related Store metadata
void touch();
+ /// One of the three methods to get rid of an unlocked StoreEntry object.
+ /// Removes all unlocked (and marks for eventual removal all locked) Store
+ /// entries, including attached and unattached entries that have our key.
+ /// Also destroys us if we are unlocked, or makes us private otherwise.
+ /// TODO: remove virtual.
virtual void release(const bool shareable = false);
+ /// One of the three methods to get rid of an unlocked StoreEntry object.
+ /// May destroy this object if it is unlocked; does nothing otherwise.
+ /// Unlike release(), may not trigger eviction of underlying store entries,
+ /// but, unlike destroyStoreEntry(), does honor an earlier release request.
+ void abandon(const char *context) { if (!locked()) doAbandon(context); }
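+
+ // Disposal sketch contrasting the three methods mentioned above:
+ // release() also evicts matching store entries by key; abandon() only
+ // destroys this unlocked object; destroyStoreEntry() (documented below)
+ // ignores locks and release promises entirely.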
+
/// May the caller commit to treating this [previously locked]
/// entry as a cache hit?
bool mayStartHitting() const {
virtual void flush();
protected:
+ typedef Store::EntryGuard EntryGuard;
+
void transientsAbandonmentCheck();
+ /// does nothing except throwing if disk-associated data members are inconsistent
+ void checkDisk() const;
private:
+ void doAbandon(const char *context);
bool checkTooBig() const;
void forcePublicKey(const cache_key *newkey);
- void adjustVary();
+ StoreEntry *adjustVary();
const cache_key *calcPublicKey(const KeyScope keyScope);
static MemAllocator *pool;
typedef void (*STOREGETCLIENT) (StoreEntry *, void *cbdata);
namespace Store {
+
+/// a smart pointer similar to std::unique_ptr<> that automatically
+/// release()s and unlock()s the guarded Entry on stack-unwinding failures
+class EntryGuard {
+public:
+ /// \param entry either nil or a locked Entry to manage
+ /// \param context default unlock() message
+ EntryGuard(Entry *entry, const char *context):
+ entry_(entry), context_(context) {
+ assert(!entry_ || entry_->locked());
+ }
+
+ ~EntryGuard() {
+ if (entry_) {
+ // something went wrong -- the caller did not unlockAndReset() us
+ onException();
+ }
+ }
+
+ EntryGuard(EntryGuard &&) = delete; // no copying or moving (for now)
+
+ /// like std::unique_ptr::get()
+ /// \returns nil or the guarded (locked) entry
+ Entry *get() {
+ return entry_;
+ }
+
+ /// like std::unique_ptr::reset()
+ /// stops guarding the entry
+ /// unlocks the entry (which may destroy it)
+ void unlockAndReset(const char *resetContext = nullptr) {
+ if (entry_) {
+ entry_->unlock(resetContext ? resetContext : context_);
+ entry_ = nullptr;
+ }
+ }
+
+private:
+ void onException() noexcept;
+
+ Entry *entry_; ///< the guarded Entry or nil
+ const char *context_; ///< default unlock() message
+};
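+
+/// Usage sketch (hypothetical caller; the function name and context labels
+/// are assumptions, not part of this patch):
+/// \code
+/// Store::EntryGuard guard(entry, "loadEntry"); // entry is locked (or nil)
+/// doWorkThatMayThrow(); // on throw, ~EntryGuard releases and unlocks
+/// guard.unlockAndReset("loadEntry success"); // normal-path cleanup
+/// \endcode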
+
void Stats(StoreEntry *output);
void Maintain(void *unused);
-};
+}; // namespace Store
/// \ingroup StoreAPI
size_t storeEntryInUse();
/// \ingroup StoreAPI
/// Creates a new StoreEntry with mem_obj and sets initial flags/states.
-StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const RequestFlags &, const HttpRequestMethod&);
+StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const HttpRequestMethod&);
/// \ingroup StoreAPI
void storeInit(void);
/// \ingroup StoreAPI
void storeReplAdd(const char *, REMOVALPOLICYCREATE *);
-/// \ingroup StoreAPI
+/// One of the three methods to get rid of an unlocked StoreEntry object.
+/// This low-level method ignores lock()ing and release() promises. It never
+/// leaves the entry in the local store_table.
+/// TODO: Hide by moving its functionality into the StoreEntry destructor.
extern FREE destroyStoreEntry;
/// \ingroup StoreAPI
/// shared memory segment path to use for Transients map
static const SBuf MapLabel("transients_map");
-/// shared memory segment path to use for Transients map extras
-static const char *ExtrasLabel = "transients_ex";
Transients::Transients(): map(NULL), locals(NULL)
{
map = new TransientsMap(MapLabel);
map->cleaner = this;
- extras = shm_old(TransientsMapExtras)(ExtrasLabel);
-
locals = new Locals(entryLimit, 0);
}
if (StoreEntry *oldE = locals->at(index)) {
debugs(20, 3, "not joining private " << *oldE);
assert(EBIT_TEST(oldE->flags, KEY_PRIVATE));
- } else if (StoreEntry *newE = copyFromShm(index)) {
- return newE; // keep read lock to receive updates from others
+ map->closeForReading(index);
+ return nullptr;
}
- // private entry or loading failure
- map->closeForReading(index);
- return NULL;
-}
-
-StoreEntry *
-Transients::copyFromShm(const sfileno index)
-{
- const TransientsMapExtras::Item &extra = extras->items[index];
-
- // create a brand new store entry and initialize it with stored info
- StoreEntry *e = storeCreatePureEntry(extra.url, extra.url,
- extra.reqFlags, extra.reqMethod);
-
- assert(e->mem_obj);
- e->mem_obj->method = extra.reqMethod;
- e->mem_obj->xitTable.io = MemObject::ioReading;
+ StoreEntry *e = new StoreEntry();
+ e->createMemObject();
e->mem_obj->xitTable.index = index;
-
- // TODO: Support collapsed revalidation for SMP-aware caches.
- e->setPublicKey(ksDefault);
- assert(e->key);
-
- // How do we know its SMP- and not just locally-collapsed? A worker gets
- // locally-collapsed entries from the local store_table, not Transients.
- // TODO: Can we remove smpCollapsed by not syncing non-transient entries?
- e->mem_obj->smpCollapsed = true;
-
- assert(!locals->at(index));
- // We do not lock e because we do not want to prevent its destruction;
- // e is tied to us via mem_obj so we will know when it is destructed.
- locals->at(index) = e;
+ e->mem_obj->xitTable.io = Store::ioReading;
+ anchor->exportInto(*e);
+ // keep read lock to receive updates from others
return e;
}
}
void
-Transients::startWriting(StoreEntry *e, const RequestFlags &reqFlags,
- const HttpRequestMethod &reqMethod)
+Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus direction)
{
- assert(e);
- assert(e->mem_obj);
- assert(e->mem_obj->xitTable.index < 0);
+ assert(direction == Store::ioReading || direction == Store::ioWriting);
- if (!map) {
- debugs(20, 5, "No map to add " << *e);
- return;
+ if (!e->hasTransients()) {
+ addEntry(e, key, direction);
+ e->mem_obj->xitTable.io = direction;
}
- sfileno index = 0;
- Ipc::StoreMapAnchor *slot = map->openForWriting(reinterpret_cast<const cache_key *>(e->key), index);
- if (!slot) {
- debugs(20, 5, "collision registering " << *e);
- return;
+ assert(e->hasTransients());
+ const auto index = e->mem_obj->xitTable.index;
+ if (const auto old = locals->at(index)) {
+ assert(old == e);
+ } else {
+ // We do not lock e because we do not want to prevent its destruction;
+ // e is tied to us via mem_obj so we will know when it is destructed.
+ locals->at(index) = e;
}
-
- try {
- if (copyToShm(*e, index, reqFlags, reqMethod)) {
- slot->set(*e);
- e->mem_obj->xitTable.io = MemObject::ioWriting;
- e->mem_obj->xitTable.index = index;
- map->startAppending(index);
- // keep write lock -- we will be supplying others with updates
- return;
- }
- // fall through to the error handling code
- } catch (const std::exception &x) { // TODO: should we catch ... as well?
- debugs(20, 2, "error keeping entry " << index <<
- ' ' << *e << ": " << x.what());
- // fall through to the error handling code
- }
-
- map->abortWriting(index);
}
-/// copies all relevant local data to shared memory
-bool
-Transients::copyToShm(const StoreEntry &e, const sfileno index,
- const RequestFlags &reqFlags,
- const HttpRequestMethod &reqMethod)
+/// creates a new Transients entry or throws
+void
+Transients::addEntry(StoreEntry *e, const cache_key *key, const Store::IoStatus direction)
{
- TransientsMapExtras::Item &extra = extras->items[index];
-
- const char *url = e.url();
- const size_t urlLen = strlen(url);
- Must(urlLen < sizeof(extra.url)); // we have space to store it all, plus 0
- strncpy(extra.url, url, sizeof(extra.url));
- extra.url[urlLen] = '\0';
+ assert(e);
+ assert(e->mem_obj);
+ assert(!e->hasTransients());
- extra.reqFlags = reqFlags;
+ Must(map); // configured to track transients
- Must(reqMethod != Http::METHOD_OTHER);
- extra.reqMethod = reqMethod.id();
+ sfileno index = 0;
+ Ipc::StoreMapAnchor *slot = map->openForWriting(key, index);
+ Must(slot); // no writer collisions
- return true;
+ slot->set(*e, key);
+ e->mem_obj->xitTable.index = index;
+ if (direction == Store::ioWriting) {
+ // keep write lock; the caller will decide what to do with it
+ map->startAppending(e->mem_obj->xitTable.index);
+ } else {
+ // keep the entry locked (for reading) to receive remote DELETE events
+ map->closeForWriting(e->mem_obj->xitTable.index);
+ }
}
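+
+// Call-sequence sketch (hypothetical caller; assumes `transients` and `key`
+// come from surrounding code):
+//     transients->monitorIo(e, key, Store::ioWriting); // collapsed-miss writer
+//     ... cache the response ...
+//     transients->completeWriting(*e); // downgrade to reader (Store::ioReading)
+// A pure reader instead passes Store::ioReading; addEntry() then releases
+// the write lock immediately via closeForWriting().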
void
}
void
-Transients::abandon(const StoreEntry &e)
-{
- assert(e.mem_obj && map);
- map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry
- CollapsedForwarding::Broadcast(e);
- // We do not unlock the entry now because the problem is most likely with
- // the server resource rather than a specific cache writer, so we want to
- // prevent other readers from collapsing requests for that resource.
-}
-
-bool
-Transients::abandoned(const StoreEntry &e) const
-{
- assert(e.mem_obj);
- return abandonedAt(e.mem_obj->xitTable.index);
-}
-
-/// whether an in-transit entry at the index is now abandoned by its writer
-bool
-Transients::abandonedAt(const sfileno index) const
+Transients::status(const StoreEntry &entry, bool &aborted, bool &waitingToBeFreed) const
{
assert(map);
- return map->readableEntry(index).waitingToBeFreed;
+ assert(entry.hasTransients());
+ const auto idx = entry.mem_obj->xitTable.index;
+ const auto &anchor = isWriter(entry) ?
+ map->writeableEntry(idx) : map->readableEntry(idx);
+ aborted = anchor.writerHalted;
+ waitingToBeFreed = anchor.waitingToBeFreed;
}
void
Transients::completeWriting(const StoreEntry &e)
{
- if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
- assert(e.mem_obj->xitTable.io == MemObject::ioWriting);
- // there will be no more updates from us after this, so we must prevent
- // future readers from joining
- map->freeEntry(e.mem_obj->xitTable.index); // just marks the locked entry
- map->closeForWriting(e.mem_obj->xitTable.index);
- e.mem_obj->xitTable.index = -1;
- e.mem_obj->xitTable.io = MemObject::ioDone;
- }
+ assert(e.hasTransients());
+ assert(isWriter(e));
+ map->closeForWriting(e.mem_obj->xitTable.index, true);
+ e.mem_obj->xitTable.io = Store::ioReading;
}
int
Transients::readers(const StoreEntry &e) const
{
- if (e.mem_obj && e.mem_obj->xitTable.index >= 0) {
+ if (e.hasTransients()) {
assert(map);
return map->peekAtEntry(e.mem_obj->xitTable.index).lock.readers;
}
}
void
-Transients::markForUnlink(StoreEntry &e)
-{
- unlink(e);
+Transients::evictCached(StoreEntry &e)
+{
+ debugs(20, 5, e);
+ if (e.hasTransients()) {
+ const auto index = e.mem_obj->xitTable.index;
+ if (map->freeEntry(index)) {
+ // Delay syncCollapsed(index), which may end `e`'s wait for updates.
+ // Calling it directly/here creates complex reentrant call chains.
+ CollapsedForwarding::Broadcast(e, true);
+ }
+ } // else nothing to do because e must be private
}
void
-Transients::unlink(StoreEntry &e)
+Transients::evictIfFound(const cache_key *key)
{
- if (e.mem_obj && e.mem_obj->xitTable.io == MemObject::ioWriting)
- abandon(e);
+ if (!map)
+ return;
+
+ const sfileno index = map->fileNoByKey(key);
+ if (map->freeEntry(index))
+ CollapsedForwarding::Broadcast(index, true);
}
void
-Transients::disconnect(MemObject &mem_obj)
+Transients::disconnect(StoreEntry &entry)
{
- if (mem_obj.xitTable.index >= 0) {
+ debugs(20, 5, entry);
+ if (entry.hasTransients()) {
+ auto &xitTable = entry.mem_obj->xitTable;
assert(map);
- if (mem_obj.xitTable.io == MemObject::ioWriting) {
- map->abortWriting(mem_obj.xitTable.index);
+ if (isWriter(entry)) {
+ map->abortWriting(xitTable.index);
} else {
- assert(mem_obj.xitTable.io == MemObject::ioReading);
- map->closeForReading(mem_obj.xitTable.index);
+ assert(isReader(entry));
+ map->closeForReading(xitTable.index);
}
- locals->at(mem_obj.xitTable.index) = NULL;
- mem_obj.xitTable.index = -1;
- mem_obj.xitTable.io = MemObject::ioDone;
+ locals->at(xitTable.index) = nullptr;
+ xitTable.index = -1;
+ xitTable.io = Store::ioDone;
}
}
return Config.collapsed_forwarding_shared_entries_limit;
}
+bool
+Transients::markedForDeletion(const cache_key *key) const
+{
+ assert(map);
+ return map->markedForDeletion(key);
+}
+
+bool
+Transients::isReader(const StoreEntry &e) const
+{
+ return e.mem_obj && e.mem_obj->xitTable.io == Store::ioReading;
+}
+
+bool
+Transients::isWriter(const StoreEntry &e) const
+{
+ return e.mem_obj && e.mem_obj->xitTable.io == Store::ioWriting;
+}
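+
+// Note: an entry may be neither reader nor writer, e.g. after disconnect()
+// above sets xitTable.io to Store::ioDone.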
+
/// initializes shared memory segment used by Transients
class TransientsRr: public Ipc::Mem::RegisteredRunner
{
public:
/* RegisteredRunner API */
- TransientsRr(): mapOwner(NULL), extrasOwner(NULL) {}
virtual void useConfig();
virtual ~TransientsRr();
virtual void create();
private:
- TransientsMap::Owner *mapOwner;
- Ipc::Mem::Owner<TransientsMapExtras> *extrasOwner;
+ TransientsMap::Owner *mapOwner = nullptr;
};
RunnerRegistrationEntry(TransientsRr);
Must(!mapOwner);
mapOwner = TransientsMap::Init(MapLabel, entryLimit);
- Must(!extrasOwner);
- extrasOwner = shm_new(TransientsMapExtras)(ExtrasLabel, entryLimit);
}
TransientsRr::~TransientsRr()
{
- delete extrasOwner;
delete mapOwner;
}
#ifndef SQUID_TRANSIENTS_H
#define SQUID_TRANSIENTS_H
-#include "http/MethodType.h"
#include "ipc/mem/Page.h"
#include "ipc/mem/PageStack.h"
#include "ipc/StoreMap.h"
#include "Store.h"
#include "store/Controlled.h"
+#include "store/forward.h"
#include <vector>
-// StoreEntry restoration info not already stored by Ipc::StoreMap
-struct TransientsMapExtraItem {
- char url[MAX_URL+1]; ///< Request-URI; TODO: decrease MAX_URL by one
- RequestFlags reqFlags; ///< request flags
- Http::MethodType reqMethod; ///< request method; extensions are not supported
-};
-typedef Ipc::StoreMapItems<TransientsMapExtraItem> TransientsMapExtras;
typedef Ipc::StoreMap TransientsMap;
/// Keeps track of store entries being delivered to clients that arrived before
-/// those entries were [fully] cached. This shared table is necessary to sync
-/// the entry-writing worker with entry-reading worker(s).
+/// those entries were [fully] cached. This SMP-shared table is necessary to
+/// * sync an entry-writing worker with entry-reading worker(s); and
+/// * sync an entry-deleting worker with both entry-reading/writing workers.
class Transients: public Store::Controlled, public Ipc::StoreMapCleaner
{
public:
/// return a local, previously collapsed entry
StoreEntry *findCollapsed(const sfileno xitIndex);
- /// add an in-transit entry suitable for collapsing future requests
- void startWriting(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+ /// start listening for remote DELETE requests targeting either a complete
+ /// StoreEntry (ioReading) or a being-formed miss StoreEntry (ioWriting)
+ void monitorIo(StoreEntry*, const cache_key*, const Store::IoStatus);
/// called when the in-transit entry has been successfully cached
void completeWriting(const StoreEntry &e);
- /// the calling entry writer no longer expects to cache this entry
- void abandon(const StoreEntry &e);
-
- /// whether an in-transit entry is now abandoned by its writer
- bool abandoned(const StoreEntry &e) const;
+ /// copies current shared entry metadata into parameters
+ /// \param aborted whether the entry was aborted
+ /// \param waitingToBeFreed whether the entry was marked for deletion
+ void status(const StoreEntry &e, bool &aborted, bool &waitingToBeFreed) const;
/// number of entry readers some time ago
int readers(const StoreEntry &e) const;
- /// the caller is done writing or reading this entry
- void disconnect(MemObject &mem_obj);
+ /// the caller is done writing or reading the given entry
+ void disconnect(StoreEntry &);
/* Store API */
virtual StoreEntry *get(const cache_key *) override;
virtual void stat(StoreEntry &e) const override;
virtual void reference(StoreEntry &e) override;
virtual bool dereference(StoreEntry &e) override;
- virtual void markForUnlink(StoreEntry &e) override;
- virtual void unlink(StoreEntry &e) override;
+ virtual void evictCached(StoreEntry &) override;
+ virtual void evictIfFound(const cache_key *) override;
virtual void maintain() override;
virtual bool smpAware() const override { return true; }
+ /// Whether an entry with the given public key exists but was already
+ /// marked for removal some time ago; get(key) returns nil in such cases.
+ bool markedForDeletion(const cache_key *) const;
+
+ /// whether the entry is in "reading from Transients" I/O state
+ bool isReader(const StoreEntry &) const;
+ /// whether the entry is in "writing to Transients" I/O state
+ bool isWriter(const StoreEntry &) const;
+
static int64_t EntryLimit();
protected:
- StoreEntry *copyFromShm(const sfileno index);
- bool copyToShm(const StoreEntry &e, const sfileno index, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
-
- bool abandonedAt(const sfileno index) const;
+ void addEntry(StoreEntry*, const cache_key *, const Store::IoStatus);
// Ipc::StoreMapCleaner API
virtual void noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId) override;
/// shared packed info indexed by Store keys, for creating new StoreEntries
TransientsMap *map;
- /// shared packed info that standard StoreMap does not store for us
- typedef TransientsMapExtras Extras;
- Ipc::Mem::Pointer<Extras> extras;
-
typedef std::vector<StoreEntry*> Locals;
- /// local collapsed entries indexed by transient ID, for syncing old StoreEntries
+ /// local collapsed reader and writer entries, indexed by transient ID,
+ /// for syncing old StoreEntries
Locals *locals;
};
http->log_uri, http->request->flags, http->request->method);
/* NOTE, don't call StoreEntry->lock(), storeCreateEntry() does it */
- if (collapsingAllowed) {
+ if (collapsingAllowed && Store::Root().allowCollapsing(entry, http->request->flags, http->request->method)) {
debugs(88, 5, "allow other revalidation requests to collapse on " << *entry);
- Store::Root().allowCollapsing(entry, http->request->flags,
- http->request->method);
collapsedRevalidation = crInitiator;
} else {
collapsedRevalidation = crNone;
void
purgeEntriesByUrl(HttpRequest * req, const char *url)
{
-#if USE_HTCP
- bool get_or_head_sent = false;
-#endif
-
for (HttpRequestMethod m(Http::METHOD_NONE); m != Http::METHOD_ENUM_END; ++m) {
if (m.respMaybeCacheable()) {
- if (StoreEntry *entry = storeGetPublic(url, m)) {
- debugs(88, 5, "purging " << *entry << ' ' << m << ' ' << url);
+ const cache_key *key = storeKeyPublic(url, m);
+ debugs(88, 5, m << ' ' << url << ' ' << storeKeyText(key));
#if USE_HTCP
- neighborsHtcpClear(entry, url, req, m, HTCP_CLR_INVALIDATION);
- if (m == Http::METHOD_GET || m == Http::METHOD_HEAD) {
- get_or_head_sent = true;
- }
+ neighborsHtcpClear(nullptr, url, req, m, HTCP_CLR_INVALIDATION);
#endif
- entry->release();
- }
+ Store::Root().evictIfFound(key);
}
}
-
-#if USE_HTCP
- if (!get_or_head_sent) {
- neighborsHtcpClear(NULL, url, req, HttpRequestMethod(Http::METHOD_GET), HTCP_CLR_INVALIDATION);
- }
-#endif
}
void
#if USE_HTCP
neighborsHtcpClear(newEntry, NULL, http->request, HttpRequestMethod(Http::METHOD_GET), HTCP_CLR_PURGE);
#endif
- newEntry->release();
+ newEntry->release(true);
purgeStatus = Http::scOkay;
}
#if USE_HTCP
neighborsHtcpClear(newEntry, NULL, http->request, HttpRequestMethod(Http::METHOD_HEAD), HTCP_CLR_PURGE);
#endif
- newEntry->release();
+ newEntry->release(true);
purgeStatus = Http::scOkay;
}
#if USE_HTCP
neighborsHtcpClear(entry, NULL, http->request, HttpRequestMethod(Http::METHOD_GET), HTCP_CLR_PURGE);
#endif
- entry->release();
+ entry->release(true);
purgeStatus = Http::scOkay;
}
#if USE_HTCP
neighborsHtcpClear(entry, NULL, http->request, HttpRequestMethod(Http::METHOD_HEAD), HTCP_CLR_PURGE);
#endif
- entry->release();
+ entry->release(true);
purgeStatus = Http::scOkay;
}
}
!reqFlags.needValidation &&
(m == Http::METHOD_GET || m == Http::METHOD_HEAD)) {
// make the entry available for future requests now
- Store::Root().allowCollapsing(e, reqFlags, m);
+ (void)Store::Root().allowCollapsing(e, reqFlags, m);
}
sc = storeClientListAdd(e, this);
e->timestampsSet();
- if (flags.authenticated) {
- /*
- * Authenticated requests can't be cached.
- */
- e->release();
- } else if (!EBIT_TEST(e->flags, RELEASE_REQUEST) && !getCurrentOffset()) {
- e->setPublicKey();
- } else {
+ // makePublic() if allowed/possible or release() otherwise
+ if (flags.authenticated || // authenticated requests can't be cached
+ getCurrentOffset() ||
+ !e->makePublic()) {
e->release();
}
}
STORE_PENDING
} store_status_t;
+/// StoreEntry relationship with a disk cache
typedef enum {
+ /// StoreEntry is currently not associated with any disk store entry.
+ /// Does not guarantee (or preclude!) a matching disk store entry existence.
SWAPOUT_NONE,
+ /// StoreEntry is being swapped out to the associated disk store entry.
+ /// Guarantees the disk store entry existence.
SWAPOUT_WRITING,
+ /// StoreEntry is associated with a complete (i.e., fully swapped out) disk store entry.
+ /// Guarantees the disk store entry existence.
SWAPOUT_DONE
} swap_status_t;
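+
+// Transition sketch derived from this patch: attachToDisk() moves an entry
+// from SWAPOUT_NONE to SWAPOUT_WRITING (or straight to SWAPOUT_DONE when
+// anchoring an already-complete disk entry), while detachFromDisk() returns
+// it to SWAPOUT_NONE without touching the disk store entry itself.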
ENTRY_SPECIAL,
ENTRY_REVALIDATE_ALWAYS,
DELAY_SENDING,
- RELEASE_REQUEST,
+ RELEASE_REQUEST, ///< prohibits making the key public
REFRESH_REQUEST,
ENTRY_REVALIDATE_STALE,
ENTRY_DISPATCHED,
// create a brand new store entry and initialize it with stored basics
StoreEntry *e = new StoreEntry();
+ e->createMemObject();
anchorEntry(*e, filen, *slot);
-
- e->hashInsert(key);
trackReferences(*e);
-
return e;
- // the disk entry remains open for reading, protected from modifications
}
bool
-Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
{
if (!map || !theFile || !theFile->canRead())
return false;
sfileno filen;
const Ipc::StoreMapAnchor *const slot = map->openForReading(
- reinterpret_cast<cache_key*>(collapsed.key), filen);
+ reinterpret_cast<cache_key*>(entry.key), filen);
if (!slot)
return false;
- anchorEntry(collapsed, filen, *slot);
- inSync = updateCollapsedWith(collapsed, *slot);
+ anchorEntry(entry, filen, *slot);
+ inSync = updateAnchoredWith(entry, *slot);
return true; // even if inSync is false
}
bool
-Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
+Rock::SwapDir::updateAnchored(StoreEntry &entry)
{
if (!map || !theFile || !theFile->canRead())
return false;
- if (collapsed.swap_filen < 0) // no longer using a disk cache
- return true;
- assert(collapsed.swap_dirn == index);
+ assert(entry.hasDisk(index));
- const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
- return updateCollapsedWith(collapsed, s);
+ const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
+ return updateAnchoredWith(entry, s);
}
bool
-Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
+Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
{
- collapsed.swap_file_sz = anchor.basics.swap_file_sz;
+ entry.swap_file_sz = anchor.basics.swap_file_sz;
return true;
}
void
Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
{
- const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
+ anchor.exportInto(e);
- e.swap_file_sz = basics.swap_file_sz;
- e.lastref = basics.lastref;
- e.timestamp = basics.timestamp;
- e.expires = basics.expires;
- e.lastModified(basics.lastmod);
- e.refcount = basics.refcount;
- e.flags = basics.flags;
-
- if (anchor.complete()) {
- e.store_status = STORE_OK;
- e.swap_status = SWAPOUT_DONE;
- } else {
- e.store_status = STORE_PENDING;
- e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
- }
+ const bool complete = anchor.complete();
+ e.store_status = complete ? STORE_OK : STORE_PENDING;
+ // SWAPOUT_WRITING: even though another worker writes?
+ e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);
e.ping_status = PING_NONE;
- EBIT_CLR(e.flags, RELEASE_REQUEST);
- e.clearPrivate();
EBIT_SET(e.flags, ENTRY_VALIDATED);
-
- e.swap_dirn = index;
- e.swap_filen = filen;
}
void Rock::SwapDir::disconnect(StoreEntry &e)
{
- assert(e.swap_dirn == index);
- assert(e.swap_filen >= 0);
- // cannot have SWAPOUT_NONE entry with swap_filen >= 0
- assert(e.swap_status != SWAPOUT_NONE);
+ assert(e.hasDisk(index));
+
+ ignoreReferences(e);
// do not rely on e.swap_status here because there is an async delay
// before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
map->abortWriting(e.swap_filen);
- e.swap_dirn = -1;
- e.swap_filen = -1;
- e.swap_status = SWAPOUT_NONE;
+ e.detachFromDisk();
dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
- Store::Root().transientsAbandon(e); // broadcasts after the change
+ Store::Root().stopSharing(e); // broadcasts after the change
} else {
map->closeForReading(e.swap_filen);
- e.swap_dirn = -1;
- e.swap_filen = -1;
- e.swap_status = SWAPOUT_NONE;
+ e.detachFromDisk();
}
}
}
void
-Rock::SwapDir::swappedOut(const StoreEntry &)
+Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
+{
+ // nothing to do
+}
+
+void
+Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
{
- // stats are not stored but computed when needed
+ debugs(47, 5, entry);
+ disconnect(entry); // calls abortWriting() to free the disk entry
}
int64_t
return NULL;
}
- if (e.swap_filen < 0) {
+ if (!e.hasDisk()) {
debugs(47,4, HERE << e);
return NULL;
}
writeError(sio);
sio.finishedWriting(errflag);
- // and hope that Core will call disconnect() to close the map entry
+ // and wait for the finalizeSwapoutFailure() call to close the map entry
}
if (sio.touchingStoreEntry())
map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
if (sio.touchingStoreEntry())
- Store::Root().transientsAbandon(*sio.e);
+ Store::Root().stopSharing(*sio.e);
// else noop: a fresh entry update error does not affect stale entry readers
// All callers must also call IoState callback, to propagate the error.
}
void
-Rock::SwapDir::unlink(StoreEntry &e)
+Rock::SwapDir::evictIfFound(const cache_key *key)
{
- debugs(47, 5, HERE << e);
- ignoreReferences(e);
- map->freeEntry(e.swap_filen);
- disconnect(e);
+ if (map)
+ map->freeEntryByKey(key); // may not be there
}
void
-Rock::SwapDir::markForUnlink(StoreEntry &e)
+Rock::SwapDir::evictCached(StoreEntry &e)
{
debugs(47, 5, e);
- map->freeEntry(e.swap_filen);
+ if (e.hasDisk(index)) {
+ if (map->freeEntry(e.swap_filen))
+ CollapsedForwarding::Broadcast(e);
+ if (!e.locked())
+ disconnect(e);
+ } else if (const auto key = e.publicKey()) {
+ evictIfFound(key);
+ }
}
void
return spacesPath.termedBuf();
}
+bool
+Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
+{
+ return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));
+}
+
namespace Rock
{
RunnerRegistrationEntry(SwapDirRr);
/* public ::SwapDir API */
virtual void reconfigure();
virtual StoreEntry *get(const cache_key *key);
- virtual void markForUnlink(StoreEntry &e);
+ virtual void evictCached(StoreEntry &);
+ virtual void evictIfFound(const cache_key *);
virtual void disconnect(StoreEntry &e);
virtual uint64_t currentSize() const;
virtual uint64_t currentCount() const;
virtual bool doReportStat() const;
- virtual void swappedOut(const StoreEntry &e);
+ virtual void finalizeSwapoutSuccess(const StoreEntry &);
+ virtual void finalizeSwapoutFailure(StoreEntry &);
virtual void create();
virtual void parse(int index, char *path);
virtual bool smpAware() const { return true; }
+ virtual bool hasReadableEntry(const StoreEntry &) const;
// temporary path to the shared memory map of first slots of cached entries
SBuf inodeMapPath() const;
protected:
/* Store API */
- virtual bool anchorCollapsed(StoreEntry &collapsed, bool &inSync);
- virtual bool updateCollapsed(StoreEntry &collapsed);
+ virtual bool anchorToCache(StoreEntry &entry, bool &inSync);
+ virtual bool updateAnchored(StoreEntry &);
/* protected ::SwapDir API */
virtual bool needsDiskStrand() const;
virtual bool dereference(StoreEntry &e);
virtual void updateHeaders(StoreEntry *e);
virtual bool unlinkdUseful() const;
- virtual void unlink(StoreEntry &e);
virtual void statfs(StoreEntry &e) const;
/* IORequestor API */
StoreIOState::Pointer createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *);
void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor);
- bool updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor);
+ bool updateAnchoredWith(StoreEntry &, const Ipc::StoreMapAnchor &);
friend class Rebuild;
friend class IoState;
fn(0),
entry(NULL),
td(NULL),
- e(NULL),
fromLog(true),
_done(false),
cbdata(NULL)
void
Fs::Ufs::RebuildState::rebuildStep()
{
- currentEntry(NULL);
-
// Balance our desire to maximize the number of entries processed at once
// (and, hence, minimize overheads and total rebuild time) with a
// requirement to also process Coordinator events, disk I/Os, etc.
return;
}
- if (!storeRebuildKeepEntry(tmpe, key, counts))
+ addIfFresh(key,
+ filn,
+ tmpe.swap_file_sz,
+ tmpe.expires,
+ tmpe.timestamp,
+ tmpe.lastref,
+ tmpe.lastModified(),
+ tmpe.refcount,
+ tmpe.flags);
+}
+
+/// if the loaded entry metadata is still relevant, indexes the entry
+void
+Fs::Ufs::RebuildState::addIfFresh(const cache_key *key,
+ sfileno file_number,
+ uint64_t swap_file_sz,
+ time_t expires,
+ time_t timestamp,
+ time_t lastref,
+ time_t lastmod,
+ uint32_t refcount,
+ uint16_t newFlags)
+{
+ if (!evictStaleAndContinue(key, lastref, counts.dupcount))
return;
++counts.objcount;
- // tmpe.dump(5);
- currentEntry(sd->addDiskRestore(key,
- filn,
- tmpe.swap_file_sz,
- tmpe.expires,
- tmpe.timestamp,
- tmpe.lastref,
- tmpe.lastModified(),
- tmpe.refcount, /* refcount */
- tmpe.flags, /* flags */
- (int) flags.clean));
- storeDirSwapLog(currentEntry(), SWAP_LOG_ADD);
+ const auto addedEntry = sd->addDiskRestore(key,
+ file_number,
+ swap_file_sz,
+ expires,
+ timestamp,
+ lastref,
+ lastmod,
+ refcount,
+ newFlags,
+ 0 /* XXX: unused */);
+ storeDirSwapLog(addedEntry, SWAP_LOG_ADD);
}
-StoreEntry *
-Fs::Ufs::RebuildState::currentEntry() const
+/// Evicts a matching entry if it was last touched before the caller's maxRef.
+/// \returns false only if the matching entry was touched at or after maxRef,
+/// indicating that the caller has supplied an outdated maxRef.
+bool
+Fs::Ufs::RebuildState::evictStaleAndContinue(const cache_key *candidateKey, const time_t maxRef, int &staleCount)
{
- return e;
-}
+ if (auto *indexedEntry = Store::Root().peek(candidateKey)) {
-void
-Fs::Ufs::RebuildState::currentEntry(StoreEntry *newValue)
-{
- e = newValue;
+ if (indexedEntry->lastref >= maxRef) {
+ indexedEntry->abandon("Fs::Ufs::RebuildState::evictStaleAndContinue");
+ ++counts.clashcount;
+ return false;
+ }
+
+ ++staleCount;
+ indexedEntry->release(true); // evict the previously indexed entry
+ }
+
+ return true;
}
/// process one swap log entry
if (swapData.op == SWAP_LOG_ADD) {
(void) 0;
} else if (swapData.op == SWAP_LOG_DEL) {
- /* Delete unless we already have a newer copy anywhere in any store */
- /* this needs to become
- * 1) unpack url
- * 2) make synthetic request with headers ?? or otherwise search
- * for a matching object in the store
- * TODO FIXME change to new async api
- */
- currentEntry (Store::Root().get(swapData.key));
-
- if (currentEntry() != NULL && swapData.lastref >= e->lastref) {
- undoAdd();
- --counts.objcount;
- ++counts.cancelcount;
- }
+ // remove any older or same-age entry; +1 covers same-age entries
+ (void)evictStaleAndContinue(swapData.key, swapData.lastref+1, counts.cancelcount);
return;
} else {
const double
return;
}
- /* this needs to become
- * 1) unpack url
- * 2) make synthetic request with headers ?? or otherwise search
- * for a matching object in the store
- * TODO FIXME change to new async api
- */
- currentEntry (Store::Root().get(swapData.key));
-
- int used; /* is swapfile already in use? */
-
- used = sd->mapBitTest(swapData.swap_filen);
-
- /* If this URL already exists in the cache, does the swap log
- * appear to have a newer entry? Compare 'lastref' from the
- * swap log to e->lastref. */
- /* is the log entry newer than current entry? */
- int disk_entry_newer = currentEntry() ? (swapData.lastref > currentEntry()->lastref ? 1 : 0) : 0;
-
- if (used && !disk_entry_newer) {
- /* log entry is old, ignore it */
+ if (sd->mapBitTest(swapData.swap_filen)) {
+ // While we were scanning the logs, some (unrelated) entry was added to
+ // our disk using our logged swap_filen. We could change our swap_filen
+ // and move the store file, but there is no Store API to do that (TODO).
++counts.clashcount;
return;
- } else if (used && currentEntry() && currentEntry()->swap_filen == swapData.swap_filen && currentEntry()->swap_dirn == sd->index) {
- /* swapfile taken, same URL, newer, update meta */
-
- if (currentEntry()->store_status == STORE_OK) {
- currentEntry()->lastref = swapData.timestamp;
- currentEntry()->timestamp = swapData.timestamp;
- currentEntry()->expires = swapData.expires;
- currentEntry()->lastModified(swapData.lastmod);
- currentEntry()->flags = swapData.flags;
- currentEntry()->refcount += swapData.refcount;
- sd->dereference(*currentEntry());
- } else {
- debug_trap("commonUfsDirRebuildFromSwapLog: bad condition");
- debugs(47, DBG_IMPORTANT, HERE << "bad condition");
- }
- return;
- } else if (used) {
- /* swapfile in use, not by this URL, log entry is newer */
- /* This is sorta bad: the log entry should NOT be newer at this
- * point. If the log is dirty, the filesize check should have
- * caught this. If the log is clean, there should never be a
- * newer entry. */
- debugs(47, DBG_IMPORTANT, "WARNING: newer swaplog entry for dirno " <<
- sd->index << ", fileno "<< std::setfill('0') << std::hex <<
- std::uppercase << std::setw(8) << swapData.swap_filen);
-
- /* I'm tempted to remove the swapfile here just to be safe,
- * but there is a bad race condition in the NOVM version if
- * the swapfile has recently been opened for writing, but
- * not yet opened for reading. Because we can't map
- * swapfiles back to StoreEntrys, we don't know the state
- * of the entry using that file. */
- /* We'll assume the existing entry is valid, probably because
- * were in a slow rebuild and the the swap file number got taken
- * and the validation procedure hasn't run. */
- assert(flags.need_to_validate);
- ++counts.clashcount;
- return;
- } else if (currentEntry() && !disk_entry_newer) {
- /* key already exists, current entry is newer */
- /* keep old, ignore new */
- ++counts.dupcount;
- return;
- } else if (currentEntry()) {
- /* key already exists, this swapfile not being used */
- /* junk old, load new */
- undoAdd();
- --counts.objcount;
- ++counts.dupcount;
- } else {
- /* URL doesnt exist, swapfile not in use */
- /* load new */
- (void) 0;
- }
-
- ++counts.objcount;
-
- currentEntry(sd->addDiskRestore(swapData.key,
- swapData.swap_filen,
- swapData.swap_file_sz,
- swapData.expires,
- swapData.timestamp,
- swapData.lastref,
- swapData.lastmod,
- swapData.refcount,
- swapData.flags,
- (int) flags.clean));
-
- storeDirSwapLog(currentEntry(), SWAP_LOG_ADD);
-}
-
-/// undo the effects of adding an entry in rebuildFromSwapLog()
-void
-Fs::Ufs::RebuildState::undoAdd()
-{
- StoreEntry *added = currentEntry();
- assert(added);
- currentEntry(NULL);
-
- // TODO: Why bother with these two if we are going to release?!
- added->expireNow();
- added->releaseRequest();
-
- if (added->swap_filen > -1) {
- SwapDir *someDir = INDEXSD(added->swap_dirn);
- assert(someDir);
- if (UFSSwapDir *ufsDir = dynamic_cast<UFSSwapDir*>(someDir))
- ufsDir->undoAddDiskRestore(added);
- // else the entry was loaded from and/or is currently in a non-UFS dir
- // Thus, there is no use in preserving its disk file (the only purpose
- // of undoAddDiskRestore!), even if we could. Instead, we release the
- // the entry and [eventually] unlink its disk file or free its slot.
}
- added->release();
+ addIfFresh(swapData.key,
+ swapData.swap_filen,
+ swapData.swap_file_sz,
+ swapData.expires,
+ swapData.timestamp,
+ swapData.lastref,
+ swapData.lastmod,
+ swapData.refcount,
+ swapData.flags);
}
int
return _done;
}
-StoreEntry *
-Fs::Ufs::RebuildState::currentItem()
-{
- return currentEntry();
-}
-
virtual bool error() const;
virtual bool isDone() const;
- virtual StoreEntry *currentItem();
RefCount<UFSSwapDir> sd;
int n_read;
void rebuildFromDirectory();
void rebuildFromSwapLog();
void rebuildStep();
- void undoAdd();
+ void addIfFresh(const cache_key *key,
+ sfileno file_number,
+ uint64_t swap_file_sz,
+ time_t expires,
+ time_t timestamp,
+ time_t lastref,
+ time_t lastmod,
+ uint32_t refcount,
+ uint16_t flags);
+ bool evictStaleAndContinue(const cache_key *candidateKey, const time_t maxRef, int &staleCount);
int getNextFile(sfileno *, int *size);
- StoreEntry *currentEntry() const;
- void currentEntry(StoreEntry *);
- StoreEntry *e;
bool fromLog;
bool _done;
/// \bug (callback) should be hidden behind a proper human readable name
++removed;
- e->release();
+ e->release(true);
}
walker->Done(walker);
e = new StoreEntry();
e->store_status = STORE_OK;
e->setMemStatus(NOT_IN_MEMORY);
- e->swap_status = SWAPOUT_DONE;
- e->swap_filen = file_number;
- e->swap_dirn = index;
+ e->attachToDisk(index, file_number, SWAPOUT_DONE);
e->swap_file_sz = swap_file_sz;
e->lastref = lastref;
e->timestamp = timestamp;
e->lastModified(lastmod);
e->refcount = refcount;
e->flags = newFlags;
- EBIT_CLR(e->flags, RELEASE_REQUEST);
- e->clearPrivate();
e->ping_status = PING_NONE;
EBIT_CLR(e->flags, ENTRY_VALIDATED);
mapBitSet(e->swap_filen);
cur_size += fs.blksize * sizeInBlocks(e->swap_file_sz);
++n_disk_objects;
- e->hashInsert(key); /* do it after we clear KEY_PRIVATE */
+ e->hashInsert(key);
replacementAdd (e);
return e;
}
-void
-Fs::Ufs::UFSSwapDir::undoAddDiskRestore(StoreEntry *e)
-{
- debugs(47, 5, HERE << *e);
- replacementRemove(e); // checks swap_dirn so do it before we invalidate it
- // Do not unlink the file as it might be used by a subsequent entry.
- mapBitReset(e->swap_filen);
- e->swap_filen = -1;
- e->swap_dirn = -1;
- cur_size -= fs.blksize * sizeInBlocks(e->swap_file_sz);
- --n_disk_objects;
-}
-
void
Fs::Ufs::UFSSwapDir::rebuild()
{
}
void
-Fs::Ufs::UFSSwapDir::unlink(StoreEntry & e)
+Fs::Ufs::UFSSwapDir::evictCached(StoreEntry & e)
{
- debugs(79, 3, HERE << "dirno " << index << ", fileno "<<
- std::setfill('0') << std::hex << std::uppercase << std::setw(8) << e.swap_filen);
- if (e.swap_status == SWAPOUT_DONE) {
+ debugs(79, 3, e);
+ if (e.locked()) // somebody else may still be using this file
+ return; // nothing to do: our get() always returns nil
+
+ if (!e.hasDisk())
+ return; // see evictIfFound()
+
+ if (e.swappedOut()) {
cur_size -= fs.blksize * sizeInBlocks(e.swap_file_sz);
--n_disk_objects;
}
replacementRemove(&e);
mapBitReset(e.swap_filen);
UFSSwapDir::unlinkFile(e.swap_filen);
- e.swap_filen = -1;
- e.swap_dirn = -1;
- e.swap_status = SWAPOUT_NONE;
+ e.detachFromDisk();
+}
+
+void
+Fs::Ufs::UFSSwapDir::evictIfFound(const cache_key *)
+{
+ // UFS disk entries always have (attached) StoreEntries so if we got here,
+ // the entry is not cached on disk and there is nothing for us to do.
}
void
void
Fs::Ufs::UFSSwapDir::replacementRemove(StoreEntry * e)
{
- if (e->swap_dirn < 0)
- return;
+ assert(e->hasDisk());
SwapDirPointer SD = INDEXSD(e->swap_dirn);
}
void
-Fs::Ufs::UFSSwapDir::swappedOut(const StoreEntry &e)
+Fs::Ufs::UFSSwapDir::finalizeSwapoutSuccess(const StoreEntry &e)
{
cur_size += fs.blksize * sizeInBlocks(e.swap_file_sz);
++n_disk_objects;
}
+void
+Fs::Ufs::UFSSwapDir::finalizeSwapoutFailure(StoreEntry &entry)
+{
+ debugs(47, 5, entry);
+ // rely on the expected subsequent StoreEntry::release(), evictCached(), or
+ // a similar call to perform the unlink(), detachFromDisk(), etc. for the entry.
+}
+
void
Fs::Ufs::UFSSwapDir::logEntry(const StoreEntry & e, int op) const
{
virtual void dump(StoreEntry &) const override;
virtual bool doubleCheck(StoreEntry &) override;
virtual bool unlinkdUseful() const override;
- virtual void unlink(StoreEntry &) override;
virtual void statfs(StoreEntry &) const override;
virtual void maintain() override;
- virtual void markForUnlink(StoreEntry &) override {}
+ virtual void evictCached(StoreEntry &) override;
+ virtual void evictIfFound(const cache_key *) override;
virtual bool canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const override;
virtual void reference(StoreEntry &) override;
virtual bool dereference(StoreEntry &) override;
virtual void reconfigure() override;
virtual int callback() override;
virtual void sync() override;
- virtual void swappedOut(const StoreEntry &e) override;
+ virtual void finalizeSwapoutSuccess(const StoreEntry &) override;
+ virtual void finalizeSwapoutFailure(StoreEntry &) override;
virtual uint64_t currentSize() const override { return cur_size; }
virtual uint64_t currentCount() const override { return n_disk_objects; }
virtual ConfigOption *getOptionTree() const override;
virtual bool smpAware() const override { return false; }
+ /// as long as ufs relies on the global store_table to index entries,
+ /// it is wrong to ask individual ufs cache_dirs whether they have an entry
+ virtual bool hasReadableEntry(const StoreEntry &) const override { return false; }
void unlinkFile(sfileno f);
// move down when unlink is a virtual method
uint32_t refcount,
uint16_t flags,
int clean);
- /// Undo the effects of UFSSwapDir::addDiskRestore().
- void undoAddDiskRestore(StoreEntry *e);
int validFileno(sfileno filn, int flag) const;
int mapBitAllocate();
CommIoCbPtrFun(gopherSendComplete, gopherState));
Comm::Write(gopherState->serverConn, &mb, call);
- gopherState->entry->makePublic();
+ if (!gopherState->entry->makePublic())
+ gopherState->entry->makePrivate(true);
}
void
* for example, the request to this neighbor fails.
*/
if (_peer->options.proxy_only)
- entry->releaseRequest();
+ entry->releaseRequest(true);
#if USE_DELAY_POOLS
entry->setNoDelay(_peer->options.no_delay);
#if USE_HTCP
neighborsHtcpClear(e, nullptr, e->mem_obj->request.getRaw(), e->mem_obj->method, HTCP_CLR_INVALIDATION);
#endif
- pe->release();
+ pe->release(true);
}
/** \par
#if USE_HTCP
neighborsHtcpClear(e, nullptr, e->mem_obj->request.getRaw(), HttpRequestMethod(Http::METHOD_HEAD), HTCP_CLR_INVALIDATION);
#endif
- pe->release();
+ pe->release(true);
}
}
#endif
if (EBIT_TEST(entry->flags, RELEASE_REQUEST))
- return decision.make(ReuseDecision::reuseNot, "the entry has been released");
+ return decision.make(ReuseDecision::doNotCacheButShare, "the entry has been released");
// RFC 7234 section 4: a cache MUST use the most recent response
// (as determined by the Date header field)
/* Check if object is cacheable or not based on reply code */
debugs(11, 3, "HTTP CODE: " << statusCode);
- if (const StoreEntry *oldEntry = findPreviouslyCachedEntry(entry))
+ if (StoreEntry *oldEntry = findPreviouslyCachedEntry(entry)) {
+ oldEntry->lock("HttpStateData::haveParsedReplyHeaders");
sawDateGoBack = rep->olderThan(oldEntry->getReply());
+ oldEntry->unlock("HttpStateData::haveParsedReplyHeaders");
+ }
if (neighbors_do_private_keys && !sawDateGoBack)
httpMaybeRemovePublic(entry, rep->sline.status());
break;
case ReuseDecision::cachePositively:
- entry->makePublic();
+ if (!entry->makePublic()) {
+ decision.make(ReuseDecision::doNotCacheButShare, "public key creation error");
+ entry->makePrivate(true);
+ }
break;
case ReuseDecision::cacheNegatively:
- entry->cacheNegatively();
+ if (!entry->cacheNegatively()) {
+ decision.make(ReuseDecision::doNotCacheButShare, "public key creation error");
+ entry->makePrivate(true);
+ }
break;
case ReuseDecision::doNotCacheButShare:
#include "ipc/StoreMap.h"
#include "sbuf/SBuf.h"
#include "Store.h"
+#include "store/Controller.h"
#include "store_key_md5.h"
#include "tools.h"
debugs(54, 5, "closed clean entry " << fileno << " for writing " << path);
} else {
s.waitingToBeFreed = true;
+ s.writerHalted = true;
s.lock.unlockExclusive();
debugs(54, 5, "closed dirty entry " << fileno << " for writing " << path);
}
return anchorAt(fileno);
}
-void
+bool
Ipc::StoreMap::freeEntry(const sfileno fileno)
{
debugs(54, 5, "marking entry " << fileno << " to be freed in " << path);
Anchor &s = anchorAt(fileno);
- if (s.lock.lockExclusive())
+ if (s.lock.lockExclusive()) {
+ const bool result = !s.waitingToBeFreed && !s.empty();
freeChain(fileno, s, false);
- else
- s.waitingToBeFreed = true; // mark to free it later
+ return result;
+ }
+
+ uint8_t expected = false;
+ // mark to free the locked entry later (if not already marked)
+ return s.waitingToBeFreed.compare_exchange_strong(expected, true);
}
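
The fallback branch relies on compare-and-swap semantics: when several workers race to mark a busy entry, exactly one of them observes the false-to-true transition and gets true back. A self-contained sketch of the idiom (markOnce is a hypothetical name):

    #include <atomic>
    #include <cstdint>

    // true only for the single caller that flips the flag; racing callers
    // find `expected` rewritten to true and receive false
    static bool markOnce(std::atomic<uint8_t> &waitingToBeFreed)
    {
        uint8_t expected = false;
        return waitingToBeFreed.compare_exchange_strong(expected, true);
    }
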
void
}
}
+bool
+Ipc::StoreMap::markedForDeletion(const cache_key *const key)
+{
+ const int idx = fileNoByKey(key);
+ const Anchor &s = anchorAt(idx);
+ return s.sameKey(key) ? bool(s.waitingToBeFreed) : false;
+}
+
+bool
+Ipc::StoreMap::hasReadableEntry(const cache_key *const key)
+{
+ sfileno index;
+ if (openForReading(key, index)) {
+ closeForReading(index);
+ return true;
+ }
+ return false;
+}
+
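
A hedged usage sketch for the two helpers above; `map` and `key` are hypothetical caller-side names:

    // check a key's shared-map status without holding the entry open
    if (map->markedForDeletion(key)) {
        // some worker asked to free this entry; treat the lookup as a miss
    } else if (map->hasReadableEntry(key)) {
        // a complete, readable entry with this key exists right now
    }
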
/// unconditionally frees an already locked chain of slots, unlocking if needed
void
Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked)
sfileno
Ipc::StoreMap::nameByKey(const cache_key *const key) const
{
+ assert(key);
const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
// TODO: use a better hash function
const int hash = (k[0] + k[1]) % entryLimit();
Ipc::StoreMapAnchor::setKey(const cache_key *const aKey)
{
memcpy(key, aKey, sizeof(key));
+ waitingToBeFreed = Store::Root().markedForDeletion(aKey);
}
bool
}
void
-Ipc::StoreMapAnchor::set(const StoreEntry &from)
+Ipc::StoreMapAnchor::set(const StoreEntry &from, const cache_key *aKey)
{
assert(writing() && !reading());
- memcpy(key, from.key, sizeof(key));
+ setKey(reinterpret_cast<const cache_key*>(aKey ? aKey : from.key));
basics.timestamp = from.timestamp;
basics.lastref = from.lastref;
basics.expires = from.expires;
basics.lastmod = from.lastModified();
basics.swap_file_sz = from.swap_file_sz;
basics.refcount = from.refcount;
- basics.flags = from.flags;
+
+ // do not copy the KEY_PRIVATE bit if we are not using from.key
+ // TODO: Replace KEY_PRIVATE with a nil StoreEntry::key!
+ uint16_t cleanFlags = from.flags;
+ if (aKey)
+ EBIT_CLR(cleanFlags, KEY_PRIVATE);
+ basics.flags = cleanFlags;
+}
+
+void
+Ipc::StoreMapAnchor::exportInto(StoreEntry &into) const
+{
+ assert(reading());
+ into.timestamp = basics.timestamp;
+ into.lastref = basics.lastref;
+ into.expires = basics.expires;
+ into.lastModified(basics.lastmod);
+ into.swap_file_sz = basics.swap_file_sz;
+ into.refcount = basics.refcount;
+ into.flags = basics.flags;
}
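
set() and exportInto() are intended to mirror each other across worker processes; a minimal sketch, assuming the caller already holds the appropriate anchor lock:

    // writer side (exclusive lock): publish entry basics into shared memory
    anchor.set(entry);               // or set(entry, key) to force a clean key

    // reader side (read lock, possibly another worker): restore the basics
    anchor.exportInto(localEntry);   // asserts reading()
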
void
memset(&key, 0, sizeof(key));
memset(&basics, 0, sizeof(basics));
waitingToBeFreed = false;
+ writerHalted = false;
// but keep the lock
}
StoreMapAnchor();
/// store StoreEntry key and basics for an inode slot
- void set(const StoreEntry &anEntry);
+ void set(const StoreEntry &anEntry, const cache_key *aKey = nullptr);
+ /// load StoreEntry basics that were previously stored with set()
+ void exportInto(StoreEntry &) const;
void setKey(const cache_key *const aKey);
bool sameKey(const cache_key *const aKey) const;
public:
mutable ReadWriteLock lock; ///< protects slot data below
std::atomic<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
+ /// whether StoreMap::abortWriting() was called for a read-locked entry
+ std::atomic<uint8_t> writerHalted;
// fields marked with [app] can be modified when appending-while-reading
// fields marked with [update] can be modified when updating-while-reading
const Anchor &peekAtEntry(const sfileno fileno) const;
/// free the entry if possible or mark it as waiting to be freed if not
- void freeEntry(const sfileno fileno);
+ /// \returns whether the entry was neither empty nor already marked for freeing
+ bool freeEntry(const sfileno);
/// free the entry if possible or mark it as waiting to be freed if not
/// does nothing if we cannot check that the key matches the cached entry
void freeEntryByKey(const cache_key *const key);
+ /// whether the entry with the given key exists and was marked as
+ /// "waiting to be freed" some time ago
+ bool markedForDeletion(const cache_key *const);
+
+ /// whether the index contains a valid readable entry with the given key
+ bool hasReadableEntry(const cache_key *const);
+
/// opens entry (identified by key) for reading, increments read level
const Anchor *openForReading(const cache_key *const key, sfileno &fileno);
/// opens entry (identified by sfileno) for reading, increments read level
#include "MemBuf.h"
#include "MemObject.h"
#include "mime.h"
-#include "RequestFlags.h"
#include "SquidConfig.h"
#include "Store.h"
#include "StoreClient.h"
status = Http::scNoContent;
}
+ StoreEntry *e = storeCreatePureEntry(url_, url_, Http::METHOD_GET);
+ e->lock("MimeIcon::created");
+ EBIT_SET(e->flags, ENTRY_SPECIAL);
+ const auto madePublic = e->setPublicKey();
+ assert(madePublic); // nothing can block ENTRY_SPECIAL from becoming public
+
+ /* fill `e` with a canned 2xx response object */
+
const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initIcon);
HttpRequestPointer r(HttpRequest::FromUrl(url_, mx));
if (!r)
fatalf("mimeLoadIcon: cannot parse internal URL: %s", url_);
- // fill newEntry with a canned 2xx response object
- RequestFlags flags;
- flags.cachable = true;
- StoreEntry *e = storeCreateEntry(url_,url_,flags,Http::METHOD_GET);
- assert(e != NULL);
- EBIT_SET(e->flags, ENTRY_SPECIAL);
- e->setPublicKey();
e->buffer();
e->mem_obj->request = r;
if (Config.peers == NULL)
return 0;
- assert(entry->swap_status == SWAPOUT_NONE);
+ assert(!entry->hasDisk());
mem->start_ping = current_time;
debugs(15, 6, "neighborsUdpAck: opcode " << opcode << " '" << storeKeyText(key) << "'");
- if (NULL != (entry = Store::Root().get(key)))
+ if ((entry = Store::Root().findCallback(key)))
mem = entry->mem_obj;
if ((p = whichPeer(from)))
void
neighborsHtcpReply(const cache_key * key, HtcpReplyData * htcp, const Ip::Address &from)
{
- StoreEntry *e = Store::Root().get(key);
+ StoreEntry *e = Store::Root().findCallback(key);
MemObject *mem = NULL;
CachePeer *p;
peer_t ntype = PEER_NONE;
CachePeer *p = pd->peer;
StoreEntry *e, *old_e;
char *url = NULL;
- const cache_key *key;
HttpRequest *req;
StoreIOBuffer tempBuffer;
url = xstrdup(p->digest_url);
else
url = xstrdup(internalRemoteUri(p->host, p->http_port, "/squid-internal-periodic/", SBuf(StoreDigestFileName)));
+ debugs(72, 2, url);
const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initCacheDigest);
req = HttpRequest::FromUrl(url, mx);
assert(req);
- key = storeKeyPublicByRequest(req);
-
- debugs(72, 2, "peerDigestRequest: " << url << " key: " << storeKeyText(key));
-
/* add custom headers */
assert(!req->header.len);
pd_last_req_time = squid_curtime;
req->flags.cachable = true;
- /* the rest is based on clientProcessExpired() */
+ /* the rest is based on clientReplyContext::processExpired() */
req->flags.refresh = true;
- old_e = fetch->old_entry = Store::Root().get(key);
+ old_e = fetch->old_entry = storeGetPublicByRequest(req);
if (old_e) {
- debugs(72, 5, "peerDigestRequest: found old entry");
+ debugs(72, 5, "found old " << *old_e);
old_e->lock("peerDigestRequest");
old_e->ensureMemObject(url, url, req->method);
}
e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
+ debugs(72, 5, "created " << *e);
assert(EBIT_TEST(e->flags, KEY_PRIVATE));
fetch->sc = storeClientListAdd(e, fetch);
/* set lastmod to trigger IMS request if possible */
e->lastModified(old_e->lastModified());
/* push towards peer cache */
- debugs(72, 3, "peerDigestRequest: forwarding to fwdStart...");
-
FwdState::fwdStart(Comm::ConnectionPointer(), e, req);
tempBuffer.offset = 0;
/* DEBUG: section 20 Storage Manager */
#include "squid.h"
+#include "base/TextException.h"
#include "CacheDigest.h"
#include "CacheManager.h"
#include "comm/Connection.h"
pool->freeOne(address);
}
-void
+bool
StoreEntry::makePublic(const KeyScope scope)
{
/* This object can be cached for a long time */
- if (!EBIT_TEST(flags, RELEASE_REQUEST))
- setPublicKey(scope);
+ return !EBIT_TEST(flags, RELEASE_REQUEST) && setPublicKey(scope);
}
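
makePublic() now reports whether a public key was actually created, and callers throughout this patch follow the same fallback contract; a minimal sketch:

    // on failure (e.g., no Transients slot could be allocated), keep the
    // entry private but shareable so collapsed readers can still be served
    if (!entry->makePublic())
        entry->makePrivate(true);
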
void
StoreEntry::makePrivate(const bool shareable)
{
- /* This object should never be cached at all */
- expireNow();
releaseRequest(shareable); /* delete object when not used */
}
void
StoreEntry::clearPrivate()
{
+ assert(!EBIT_TEST(flags, RELEASE_REQUEST));
EBIT_CLR(flags, KEY_PRIVATE);
shareableWhenPrivate = false;
}
-void
+bool
StoreEntry::cacheNegatively()
{
/* This object may be negatively cached */
- negativeCache();
- makePublic();
+ if (makePublic()) {
+ negativeCache();
+ return true;
+ }
+ return false;
}
size_t
/* the object has completed. */
if (mem_obj->inmem_lo == 0 && !isEmpty()) {
- if (swap_status == SWAPOUT_DONE) {
+ if (swappedOut()) {
debugs(20,7, HERE << mem_obj << " lo: " << mem_obj->inmem_lo << " hi: " << mem_obj->endOffset() << " size: " << mem_obj->object_sz);
if (mem_obj->endOffset() == mem_obj->object_sz) {
/* hot object fully swapped in (XXX: or swapped out?) */
void
StoreEntry::destroyMemObject()
{
- debugs(20, 3, HERE << "destroyMemObject " << mem_obj);
+ debugs(20, 3, mem_obj << " in " << *this);
- if (MemObject *mem = mem_obj) {
- // Store::Root() is FATALly missing during shutdown
- if (mem->xitTable.index >= 0 && !shutting_down)
- Store::Root().transientsDisconnect(*mem);
- if (mem->memCache.index >= 0 && !shutting_down)
- Store::Root().memoryDisconnect(*this);
+ // Store::Root() is FATALly missing during shutdown
+ if (hasTransients() && !shutting_down)
+ Store::Root().transientsDisconnect(*this);
+ if (hasMemStore() && !shutting_down)
+ Store::Root().memoryDisconnect(*this);
+ if (MemObject *mem = mem_obj) {
setMemStatus(NOT_IN_MEMORY);
mem_obj = NULL;
delete mem;
return;
// Store::Root() is FATALly missing during shutdown
- if (e->swap_filen >= 0 && !shutting_down)
+ if (e->hasDisk() && !shutting_down)
e->disk().disconnect(*e);
e->destroyMemObject();
StoreEntry::hashInsert(const cache_key * someKey)
{
debugs(20, 3, "StoreEntry::hashInsert: Inserting Entry " << *this << " key '" << storeKeyText(someKey) << "'");
+ assert(!key);
key = storeKeyDup(someKey);
hash_join(store_table, this);
}
/* -------------------------------------------------------------------------- */
-/* get rid of memory copy of the object */
-void
-StoreEntry::purgeMem()
-{
- if (mem_obj == NULL)
- return;
-
- debugs(20, 3, "StoreEntry::purgeMem: Freeing memory-copy of " << getMD5Text());
-
- Store::Root().memoryUnlink(*this);
-
- if (swap_status != SWAPOUT_DONE)
- release();
-}
-
void
StoreEntry::lock(const char *context)
{
lastref = squid_curtime;
}
-void
-StoreEntry::setReleaseFlag()
-{
- if (EBIT_TEST(flags, RELEASE_REQUEST))
- return;
-
- debugs(20, 3, "StoreEntry::setReleaseFlag: '" << getMD5Text() << "'");
-
- EBIT_SET(flags, RELEASE_REQUEST);
-
- Store::Root().markForUnlink(*this);
-}
-
void
StoreEntry::releaseRequest(const bool shareable)
{
+ debugs(20, 3, shareable << ' ' << *this);
+ if (!shareable)
+ shareableWhenPrivate = false; // may already be false
if (EBIT_TEST(flags, RELEASE_REQUEST))
return;
- setReleaseFlag(); // makes validToSend() false, preventing future hits
-
- setPrivateKey(shareable);
+ setPrivateKey(shareable, true);
}
int
if (lock_count)
return (int) lock_count;
- if (store_status == STORE_PENDING)
- setReleaseFlag();
+ abandon(context);
+ return 0;
+}
+/// keep the unlocked StoreEntry object in the local store_table (if needed) or
+/// delete it (otherwise)
+void
+StoreEntry::doAbandon(const char *context)
+{
+ debugs(20, 5, *this << " via " << (context ? context : "somebody"));
+ assert(!locked());
assert(storePendingNClients(this) == 0);
- if (EBIT_TEST(flags, RELEASE_REQUEST)) {
+ // Both aborted local writers and aborted local readers (of remote writers)
+ // are STORE_PENDING, but aborted readers should never release().
+ if (EBIT_TEST(flags, RELEASE_REQUEST) ||
+ (store_status == STORE_PENDING && !Store::Root().transientsReader(*this))) {
this->release();
- return 0;
+ return;
}
if (EBIT_TEST(flags, KEY_PRIVATE))
debugs(20, DBG_IMPORTANT, "WARNING: " << __FILE__ << ":" << __LINE__ << ": found KEY_PRIVATE");
Store::Root().handleIdleEntry(*this); // may delete us
- return 0;
}
void
StoreEntry *
storeGetPublic(const char *uri, const HttpRequestMethod& method)
{
- return Store::Root().get(storeKeyPublic(uri, method));
+ return Store::Root().find(storeKeyPublic(uri, method));
}
StoreEntry *
storeGetPublicByRequestMethod(HttpRequest * req, const HttpRequestMethod& method, const KeyScope keyScope)
{
- return Store::Root().get(storeKeyPublicByRequestMethod(req, method, keyScope));
+ return Store::Root().find(storeKeyPublicByRequestMethod(req, method, keyScope));
}
StoreEntry *
* concept'.
*/
void
-StoreEntry::setPrivateKey(const bool shareable)
+StoreEntry::setPrivateKey(const bool shareable, const bool permanent)
{
- if (key && EBIT_TEST(flags, KEY_PRIVATE)) {
- // The entry is already private, but it may be still shareable.
- if (!shareable)
- shareableWhenPrivate = false;
+ debugs(20, 3, shareable << permanent << ' ' << *this);
+ if (permanent)
+ EBIT_SET(flags, RELEASE_REQUEST); // may already be set
+ if (!shareable)
+ shareableWhenPrivate = false; // may already be false
+
+ if (EBIT_TEST(flags, KEY_PRIVATE))
return;
- }
if (key) {
- setReleaseFlag(); // will markForUnlink(); all caches/workers will know
-
- // TODO: move into SwapDir::markForUnlink() already called by Root()
- if (swap_filen > -1)
- storeDirSwapLog(this, SWAP_LOG_DEL);
-
+ Store::Root().evictCached(*this); // all caches/workers will know
hashDelete();
}
hashInsert(newkey);
}
-void
+bool
StoreEntry::setPublicKey(const KeyScope scope)
{
+ debugs(20, 3, *this);
if (key && !EBIT_TEST(flags, KEY_PRIVATE))
- return; /* is already public */
+ return true; // already public
assert(mem_obj);
assert(!EBIT_TEST(flags, RELEASE_REQUEST));
- adjustVary();
- forcePublicKey(calcPublicKey(scope));
+ try {
+ EntryGuard newVaryMarker(adjustVary(), "setPublicKey+failure");
+ const cache_key *pubKey = calcPublicKey(scope);
+ Store::Root().addWriting(this, pubKey);
+ forcePublicKey(pubKey);
+ newVaryMarker.unlockAndReset("setPublicKey+success");
+ return true;
+ } catch (const std::exception &ex) {
+ debugs(20, 2, "for " << *this << " failed: " << ex.what());
+ }
+ return false;
}
void
void
StoreEntry::forcePublicKey(const cache_key *newkey)
{
+ debugs(20, 3, storeKeyText(newkey) << " for " << *this);
+ assert(mem_obj);
+
if (StoreEntry *e2 = (StoreEntry *)hash_lookup(store_table, newkey)) {
assert(e2 != this);
- debugs(20, 3, "Making old " << *e2 << " private.");
-
- // TODO: check whether there is any sense in keeping old entry
- // shareable here. Leaving it non-shareable for now.
- e2->setPrivateKey(false);
- e2->release(false);
+ debugs(20, 3, "releasing clashing " << *e2);
+ e2->release(true);
}
if (key)
clearPrivate();
+ assert(mem_obj->hasUris());
hashInsert(newkey);
- if (swap_filen > -1)
+ if (hasDisk())
storeDirSwapLog(this, SWAP_LOG_ADD);
}
/// Updates mem_obj->request->vary_headers to reflect the current Vary.
/// The vary_headers field is used to calculate the Vary marker key.
/// Releases the old Vary marker with an outdated key (if any).
-void
+/// \returns new (locked) Vary marker StoreEntry or, if none was needed, nil
+/// \throws std::exception on failures
+StoreEntry *
StoreEntry::adjustVary()
{
assert(mem_obj);
if (!mem_obj->request)
- return;
+ return nullptr;
HttpRequestPointer request(mem_obj->request);
*/
request->vary_headers.clear(); /* free old "bad" variance key */
if (StoreEntry *pe = storeGetPublic(mem_obj->storeId(), mem_obj->method))
- pe->release();
+ pe->release(true);
}
/* Make sure the request knows the variance status */
// We should add/use storeHas() API or lock/unlock those entries.
if (!mem_obj->vary_headers.isEmpty() && !storeGetPublic(mem_obj->storeId(), mem_obj->method)) {
/* Create "vary" base object */
- String vary;
StoreEntry *pe = storeCreateEntry(mem_obj->storeId(), mem_obj->logUri(), request->flags, request->method);
+ // XXX: storeCreateEntry() already tries to make `pe` public under
+ // certain conditions. If those conditions do not apply to Vary markers,
+ // then refactor to call storeCreatePureEntry() above. Otherwise,
+ // refactor to simply check whether `pe` is already public below.
+ if (!pe->makePublic()) {
+ pe->unlock("StoreEntry::adjustVary+failed_makePublic");
+ throw TexcHere("failed to make Vary marker public");
+ }
/* We are allowed to do this typecast */
HttpReply *rep = new HttpReply;
rep->setHeaders(Http::scOkay, "Internal marker object", "x-squid-internal/vary", -1, -1, squid_curtime + 100000);
- vary = mem_obj->getReply()->header.getList(Http::HdrType::VARY);
+ String vary = mem_obj->getReply()->header.getList(Http::HdrType::VARY);
if (vary.size()) {
/* Again, we own this structure layout */
}
#endif
- pe->replaceHttpReply(rep, false); // no write until key is public
+ pe->replaceHttpReply(rep, false); // no write until timestampsSet()
pe->timestampsSet();
- pe->makePublic();
-
- pe->startWriting(); // after makePublic()
+ pe->startWriting(); // after timestampsSet()
pe->complete();
- pe->unlock("StoreEntry::forcePublicKey+Vary");
+ return pe;
}
+ return nullptr;
}
StoreEntry *
-storeCreatePureEntry(const char *url, const char *log_url, const RequestFlags &flags, const HttpRequestMethod& method)
+storeCreatePureEntry(const char *url, const char *log_url, const HttpRequestMethod& method)
{
StoreEntry *e = NULL;
debugs(20, 3, "storeCreateEntry: '" << url << "'");
e = new StoreEntry();
e->createMemObject(url, log_url, method);
- if (flags.cachable) {
- EBIT_CLR(e->flags, RELEASE_REQUEST);
- } else {
- e->releaseRequest();
- }
-
e->store_status = STORE_PENDING;
e->refcount = 0;
e->lastref = squid_curtime;
StoreEntry *
storeCreateEntry(const char *url, const char *logUrl, const RequestFlags &flags, const HttpRequestMethod& method)
{
- StoreEntry *e = storeCreatePureEntry(url, logUrl, flags, method);
+ StoreEntry *e = storeCreatePureEntry(url, logUrl, method);
e->lock("storeCreateEntry");
- if (neighbors_do_private_keys || !flags.hierarchical)
- e->setPrivateKey(false);
- else
- e->setPublicKey();
+ if (!neighbors_do_private_keys && flags.hierarchical && flags.cachable && e->setPublicKey())
+ return e;
+ e->setPrivateKey(false, !flags.cachable);
return e;
}
} else if (EBIT_TEST(flags, KEY_PRIVATE)) {
debugs(20, 3, "StoreEntry::checkCachable: NO: private key");
++store_check_cachable_hist.no.private_key;
- } else if (swap_status != SWAPOUT_NONE) {
+ } else if (hasDisk()) {
/*
- * here we checked the swap_status because the remaining
- * cases are only relevant only if we haven't started swapping
- * out the object yet.
+ * the remaining cases are only relevant if we haven't
+ * started swapping out the object yet.
*/
return 1;
} else if (storeTooManyDiskFilesOpen()) {
storeGetMemSpace(int size)
{
PROF_start(storeGetMemSpace);
- StoreEntry *e = NULL;
- int released = 0;
- static time_t last_check = 0;
- size_t pages_needed;
- RemovalPurgeWalker *walker;
-
- if (squid_curtime == last_check) {
- PROF_stop(storeGetMemSpace);
- return;
- }
-
- last_check = squid_curtime;
-
- pages_needed = (size + SM_PAGE_SIZE-1) / SM_PAGE_SIZE;
-
- if (mem_node::InUseCount() + pages_needed < store_pages_max) {
- PROF_stop(storeGetMemSpace);
- return;
- }
-
- debugs(20, 2, "storeGetMemSpace: Starting, need " << pages_needed <<
- " pages");
-
- /* XXX what to set as max_scan here? */
- walker = mem_policy->PurgeInit(mem_policy, 100000);
-
- while ((e = walker->Next(walker))) {
- e->purgeMem();
- ++released;
-
- if (mem_node::InUseCount() + pages_needed < store_pages_max)
- break;
- }
-
- walker->Done(walker);
- debugs(20, 3, "storeGetMemSpace stats:");
- debugs(20, 3, " " << std::setw(6) << hot_obj_count << " HOT objects");
- debugs(20, 3, " " << std::setw(6) << released << " were released");
+ if (!shutting_down) // Store::Root() is FATALly missing during shutdown
+ Store::Root().freeMemorySpace(size);
PROF_stop(storeGetMemSpace);
}
#define MAINTAIN_MAX_SCAN 1024
#define MAINTAIN_MAX_REMOVE 64
-/* release an object from a cache */
void
StoreEntry::release(const bool shareable)
{
PROF_start(storeRelease);
- debugs(20, 3, "releasing " << *this << ' ' << getMD5Text());
+ debugs(20, 3, shareable << ' ' << *this << ' ' << getMD5Text());
/* If, for any reason we can't discard this object because of an
* outstanding request, mark it for pending release */
if (locked()) {
- expireNow();
- debugs(20, 3, "storeRelease: Only setting RELEASE_REQUEST bit");
releaseRequest(shareable);
PROF_stop(storeRelease);
return;
}
- if (Store::Controller::store_dirs_rebuilding && swap_filen > -1) {
+ if (Store::Controller::store_dirs_rebuilding && hasDisk()) {
/* TODO: Teach disk stores to handle releases during rebuild instead. */
- Store::Root().memoryUnlink(*this);
-
- setPrivateKey(shareable);
-
// lock the entry until rebuilding is done
lock("storeLateRelease");
- setReleaseFlag();
+ releaseRequest(shareable);
LateReleaseStack.push(this);
return;
}
storeLog(STORE_LOG_RELEASE, this);
- if (swap_filen > -1 && !EBIT_TEST(flags, KEY_PRIVATE)) {
- // log before unlink() below clears swap_filen
- storeDirSwapLog(this, SWAP_LOG_DEL);
- }
-
- Store::Root().unlink(*this);
+ Store::Root().evictCached(*this);
destroyStoreEntry(static_cast<hash_link *>(this));
PROF_stop(storeRelease);
}
if (mem_obj->inmem_lo != 0)
return 0;
- if (!Config.onoff.memory_cache_first && swap_status == SWAPOUT_DONE && refcount == 1)
+ if (!Config.onoff.memory_cache_first && swappedOut() && refcount == 1)
return 0;
return 1;
return 0;
// now check that the entry has a cache backing or is collapsed
- if (swap_filen > -1) // backed by a disk cache
+ if (hasDisk()) // backed by a disk cache
return 1;
if (swappingOut()) // will be backed by a disk cache
flush();
complete();
negativeCache();
- releaseRequest();
+ releaseRequest(false); // if it is safe to negatively cache, sharing is OK
unlock("StoreEntry::storeErrorResponse");
}
void
StoreEntry::transientsAbandonmentCheck()
{
- if (mem_obj && !mem_obj->smpCollapsed && // this worker is responsible
- mem_obj->xitTable.index >= 0 && // other workers may be interested
- mem_obj->memCache.index < 0 && // rejected by the shared memory cache
+ if (mem_obj && !Store::Root().transientsReader(*this) && // this worker is responsible
+ hasTransients() && // other workers may be interested
+ !hasMemStore() && // rejected by the shared memory cache
mem_obj->swapout.decision == MemObject::SwapOut::swImpossible) {
debugs(20, 7, "cannot be shared: " << *this);
if (!shutting_down) // Store::Root() is FATALly missing during shutdown
- Store::Root().transientsAbandon(*this);
+ Store::Root().stopSharing(*this);
}
}
Store::Disk &
StoreEntry::disk() const
{
- assert(0 <= swap_dirn && swap_dirn < Config.cacheSwap.n_configured);
+ assert(hasDisk());
const RefCount<Store::Disk> &sd = INDEXSD(swap_dirn);
assert(sd);
return *sd;
}
+bool
+StoreEntry::hasDisk(const sdirno dirn, const sfileno filen) const
+{
+ checkDisk();
+ if (dirn < 0 && filen < 0)
+ return swap_dirn >= 0;
+ Must(dirn >= 0);
+ const bool matchingDisk = (swap_dirn == dirn);
+ return filen < 0 ? matchingDisk : (matchingDisk && swap_filen == filen);
+}
+
+void
+StoreEntry::attachToDisk(const sdirno dirn, const sfileno fno, const swap_status_t status)
+{
+ debugs(88, 3, "attaching entry with key " << getMD5Text() << " : " <<
+ swapStatusStr[status] << " " << dirn << " " <<
+ std::hex << std::setw(8) << std::setfill('0') <<
+ std::uppercase << fno);
+ checkDisk();
+ swap_dirn = dirn;
+ swap_filen = fno;
+ swap_status = status;
+ checkDisk();
+}
+
+void
+StoreEntry::detachFromDisk()
+{
+ swap_dirn = -1;
+ swap_filen = -1;
+ swap_status = SWAPOUT_NONE;
+}
+
+void
+StoreEntry::checkDisk() const
+{
+ const bool ok = (swap_dirn < 0) == (swap_filen < 0) &&
+ (swap_dirn < 0) == (swap_status == SWAPOUT_NONE) &&
+ (swap_dirn < 0 || swap_dirn < Config.cacheSwap.n_configured);
+
+ if (!ok) {
+ debugs(88, DBG_IMPORTANT, "ERROR: inconsistent disk entry state " << *this);
+ throw std::runtime_error("inconsistent disk entry state");
+ }
+}
+
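
attachToDisk() and detachFromDisk() must keep the three swap fields moving in lockstep; checkDisk() verifies exactly that invariant, summarized here:

    // detached: swap_dirn == -1, swap_filen == -1, swap_status == SWAPOUT_NONE
    // attached: 0 <= swap_dirn < Config.cacheSwap.n_configured,
    //           swap_filen >= 0, swap_status != SWAPOUT_NONE
    // any other combination logs an ERROR and throws
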
/*
* return true if the entry is in a state where
* it can accept more data (ie with write() method)
return buf;
}
+static std::ostream &
+operator <<(std::ostream &os, const Store::IoStatus &io)
+{
+ switch (io) {
+ case Store::ioUndecided:
+ os << 'u';
+ break;
+ case Store::ioReading:
+ os << 'r';
+ break;
+ case Store::ioWriting:
+ os << 'w';
+ break;
+ case Store::ioDone:
+ os << 'o';
+ break;
+ }
+ return os;
+}
+
std::ostream &operator <<(std::ostream &os, const StoreEntry &e)
{
os << "e:";
- if (e.mem_obj) {
- if (e.mem_obj->xitTable.index > -1)
- os << 't' << e.mem_obj->xitTable.index;
- if (e.mem_obj->memCache.index > -1)
- os << 'm' << e.mem_obj->memCache.index;
+ if (e.hasTransients()) {
+ const auto &xitTable = e.mem_obj->xitTable;
+ os << 't' << xitTable.io << xitTable.index;
+ }
+
+ if (e.hasMemStore()) {
+ const auto &memCache = e.mem_obj->memCache;
+ os << 'm' << memCache.io << memCache.index << '@' << memCache.offset;
}
+
+ // Do not use e.hasDisk() here because its checkDisk() call may call us back.
if (e.swap_filen > -1 || e.swap_dirn > -1)
os << 'd' << e.swap_filen << '@' << e.swap_dirn;
if (EBIT_TEST(e.flags, ENTRY_ABORTED)) os << 'A';
}
- if (e.mem_obj && e.mem_obj->smpCollapsed)
- os << 'O';
-
return os << '/' << &e << '*' << e.locks();
}
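
With the single-character I/O codes above, the new rendering packs all attachment state into one token; an illustrative (made-up) example:

    // e:tr5mw7@0d3@1/0x55d9a8c04e20*2
    //   tr5       -- Transients: reading, slot 5
    //   mw7@0     -- shared memory cache: writing, slot 7, offset 0
    //   d3@1      -- disk: swap_filen 3 in cache_dir 1
    //   /0x...*2  -- object address and lock count
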
return NULL;
}
+void
+Store::EntryGuard::onException() noexcept
+{
+ SWALLOW_EXCEPTIONS({
+ entry_->releaseRequest(false);
+ entry_->unlock(context_);
+ });
+}
+
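
EntryGuard's declaration is outside this excerpt; judging from its use in setPublicKey(), it is an RAII holder for a locked (possibly nil) entry: unlockAndReset() disarms it on success, while destroying an armed guard, e.g. during stack unwinding, ends up in onException() above. A hedged sketch:

    {
        Store::EntryGuard guard(entry, "context"); // entry may be nil
        stepsThatMayThrow(); // hypothetical; on a throw, onException()
                             // releaseRequest()s and unlock()s the entry
        guard.unlockAndReset("context"); // success: unlock, keep the entry
    }
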
class Controlled: public Storage
{
public:
+ /// \returns a possibly unlocked/unregistered stored entry with key (or nil)
+ /// The returned entry might not match the caller's Store ID or method. The
+ /// caller must abandon()/release() the entry or register it with Root().
+ /// This method must not trigger slow I/O operations (e.g., disk swap in).
+ virtual StoreEntry *get(const cache_key *) = 0;
+
/// somebody needs this entry (many cache replacement policies need to know)
virtual void reference(StoreEntry &e) = 0;
/// make stored metadata and HTTP headers the same as in the given entry
virtual void updateHeaders(StoreEntry *) {}
- /// If this storage cannot cache collapsed entries, return false.
+ /// If Transients entry cannot be attached to this storage, return false.
/// If the entry is not found, return false. Otherwise, return true after
- /// tying the entry to this cache and setting inSync to updateCollapsed().
- virtual bool anchorCollapsed(StoreEntry &, bool &/*inSync*/) { return false; }
+ /// tying the entry to this cache and setting inSync to updateAnchored().
+ virtual bool anchorToCache(StoreEntry &, bool &/*inSync*/) { return false; }
- /// Update a local collapsed entry with fresh info from this cache (if any).
- /// Return true iff the cache supports collapsed entries and
- /// the given local collapsed entry is now in sync with this storage.
- virtual bool updateCollapsed(StoreEntry &) { return false; }
+ /// Update a local Transients entry with fresh info from this cache (if any).
+ /// Return true iff the cache supports Transients entries and
+ /// the given local Transients entry is now in sync with this storage.
+ virtual bool updateAnchored(StoreEntry &) { return false; }
};
} // namespace Store
/* Notify the fs that we're referencing this object again */
- if (e.swap_dirn > -1)
+ if (e.hasDisk())
swapDir->reference(e);
// Notify the memory cache that we're referencing this object again
/* Notify the fs that we're not referencing this object any more */
- if (e.swap_filen > -1)
+ if (e.hasDisk())
keepInStoreTable = swapDir->dereference(e) || keepInStoreTable;
// Notify the memory cache that we're not referencing this object any more
return keepInStoreTable;
}
+bool
+Store::Controller::markedForDeletion(const cache_key *key) const
+{
+ // assuming a public key, checking Transients should cover all cases.
+ return transients && transients->markedForDeletion(key);
+}
+
+bool
+Store::Controller::markedForDeletionAndAbandoned(const StoreEntry &e) const
+{
+ // The opposite check order could miss a reader that has arrived after the
+ // !readers() and before the markedForDeletion() check.
+ return markedForDeletion(reinterpret_cast<const cache_key*>(e.key)) &&
+ transients && !transients->readers(e);
+}
+
+bool
+Store::Controller::hasReadableDiskEntry(const StoreEntry &e) const
+{
+ return swapDir->hasReadableEntry(e);
+}
+
StoreEntry *
-Store::Controller::get(const cache_key *key)
+Store::Controller::find(const cache_key *key)
{
- if (StoreEntry *e = find(key)) {
- // this is not very precise: some get()s are not initiated by clients
- e->touch();
- referenceBusy(*e);
- return e;
+ if (const auto entry = peek(key)) {
+ try {
+ if (!entry->key)
+ allowSharing(*entry, key);
+ checkTransients(*entry);
+ entry->touch();
+ referenceBusy(*entry);
+ return entry;
+ } catch (const std::exception &ex) {
+ debugs(20, 2, "failed with " << *entry << ": " << ex.what());
+ entry->release(true);
+ // fall through
+ }
}
return NULL;
}
-/// Internal method to implements the guts of the Store::get() API:
-/// returns an in-transit or cached object with a given key, if any.
+/// indexes and adds SMP-tracking for an ephemeral peek() result
+void
+Store::Controller::allowSharing(StoreEntry &entry, const cache_key *key)
+{
+ // TODO: refactor to throw on anchorToCache() inSync errors!
+
+ // anchorToCache() below and many find() callers expect a registered entry
+ addReading(&entry, key);
+
+ if (entry.hasTransients()) {
+ bool inSync = false;
+ const bool found = anchorToCache(entry, inSync);
+ if (found && !inSync)
+ throw TexcHere("cannot sync");
+ }
+}
+
StoreEntry *
-Store::Controller::find(const cache_key *key)
+Store::Controller::findCallback(const cache_key *key)
{
- debugs(20, 3, storeKeyText(key));
+ // We could check for mem_obj presence (and more), moving and merging some
+ // of the duplicated neighborsUdpAck() and neighborsHtcpReply() code here,
+ // but that would mean polluting Store with HTCP/ICP code. Instead, we
+ // should encapsulate callback-related data in a protocol-neutral MemObject
+ // member or use an HTCP/ICP-specific index rather than store_table.
+ return peekAtLocal(key);
+}
+/// \returns either an existing local reusable StoreEntry object or nil
+/// To treat remotely marked entries specially,
+/// callers ought to check markedForDeletion() first!
+StoreEntry *
+Store::Controller::peekAtLocal(const cache_key *key)
+{
if (StoreEntry *e = static_cast<StoreEntry*>(hash_lookup(store_table, key))) {
+ // callers must only search for public entries
+ assert(!EBIT_TEST(e->flags, KEY_PRIVATE));
+ assert(e->publicKey());
+ checkTransients(*e);
+
// TODO: ignore and maybe handleIdleEntry() unlocked intransit entries
// because their backing store slot may be gone already.
- debugs(20, 3, HERE << "got in-transit entry: " << *e);
+ return e;
+ }
+ return nullptr;
+}
+
+StoreEntry *
+Store::Controller::peek(const cache_key *key)
+{
+ debugs(20, 3, storeKeyText(key));
+
+ if (markedForDeletion(key)) {
+ debugs(20, 3, "ignoring marked in-transit " << storeKeyText(key));
+ return nullptr;
+ }
+
+ if (StoreEntry *e = peekAtLocal(key)) {
+ debugs(20, 3, "got local in-transit entry: " << *e);
return e;
}
if (transients) {
if (StoreEntry *e = transients->get(key)) {
debugs(20, 3, "got shared in-transit entry: " << *e);
- bool inSync = false;
- const bool found = anchorCollapsed(*e, inSync);
- if (!found || inSync)
- return e;
- assert(!e->locked()); // ensure release will destroyStoreEntry()
- e->release(); // do not let others into the same trap
- return NULL;
+ return e;
}
}
return nullptr;
}
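
The old monolithic get() is now split three ways; a hedged caller's-eye contrast based on the Controller API documented later in this patch:

    // find(): registered, SMP-tracked result; counts as a policy reference
    if (StoreEntry *e = Store::Root().find(key)) {
        // safe for long-term use; entry was synced and touch()ed
    }

    // peek(): fast, ephemeral result; may be unregistered and out of sync
    if (StoreEntry *e = Store::Root().peek(key)) {
        // inspect only; purging the entry is the one allowed modification
    }

    // findCallback(): local-only lookup for ICP/HTCP callback details
    StoreEntry *cbe = Store::Root().findCallback(key);
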
+bool
+Store::Controller::transientsReader(const StoreEntry &e) const
+{
+ return transients && e.hasTransients() && transients->isReader(e);
+}
+
+bool
+Store::Controller::transientsWriter(const StoreEntry &e) const
+{
+ return transients && e.hasTransients() && transients->isWriter(e);
+}
+
int64_t
Store::Controller::accumulateMore(StoreEntry &entry) const
{
// The memory cache should not influence for-swapout accumulation decision.
}
+// Must be called from StoreEntry::release() or releaseRequest() because
+// those methods currently manage local indexing of StoreEntry objects.
+// TODO: Replace StoreEntry::release*() with Root().evictCached().
+void
+Store::Controller::evictCached(StoreEntry &e)
+{
+ debugs(20, 7, e);
+ if (transients)
+ transients->evictCached(e);
+ memoryEvictCached(e);
+ if (swapDir)
+ swapDir->evictCached(e);
+}
+
void
-Store::Controller::markForUnlink(StoreEntry &e)
+Store::Controller::evictIfFound(const cache_key *key)
+{
+ debugs(20, 7, storeKeyText(key));
+
+ if (StoreEntry *entry = peekAtLocal(key)) {
+ debugs(20, 5, "marking local in-transit " << *entry);
+ entry->release(true);
+ return;
+ }
+
+ if (memStore)
+ memStore->evictIfFound(key);
+ if (swapDir)
+ swapDir->evictIfFound(key);
+ if (transients)
+ transients->evictIfFound(key);
+}
+
+/// whether the memory cache is allowed to store that many additional pages
+bool
+Store::Controller::memoryCacheHasSpaceFor(const int pagesRequired) const
{
- if (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0)
- transients->markForUnlink(e);
- if (memStore && e.mem_obj && e.mem_obj->memCache.index >= 0)
- memStore->markForUnlink(e);
- if (swapDir && e.swap_filen >= 0)
- swapDir->markForUnlink(e);
+ // XXX: We count mem_nodes but may free shared memory pages instead.
+ const auto fits = mem_node::InUseCount() + pagesRequired <= store_pages_max;
+ debugs(20, 7, fits << ": " << mem_node::InUseCount() << '+' << pagesRequired << '?' << store_pages_max);
+ return fits;
}
void
-Store::Controller::unlink(StoreEntry &e)
+Store::Controller::freeMemorySpace(const int bytesRequired)
{
- memoryUnlink(e);
- if (swapDir && e.swap_filen >= 0)
- swapDir->unlink(e);
+ const auto pagesRequired = (bytesRequired + SM_PAGE_SIZE-1) / SM_PAGE_SIZE;
+
+ if (memoryCacheHasSpaceFor(pagesRequired))
+ return;
+
+ // XXX: When store_pages_max is smaller than pagesRequired, we should not
+ // look for more space (but we do because we want to abandon idle entries?).
+
+ // limit our performance impact to one walk per second
+ static time_t lastWalk = 0;
+ if (lastWalk == squid_curtime)
+ return;
+ lastWalk = squid_curtime;
+
+ debugs(20, 2, "need " << pagesRequired << " pages");
+
+ // let abandon()/handleIdleEntry() know about the impending memory shortage
+ memoryPagesDebt_ = pagesRequired;
+
+ // XXX: SMP-unaware: Walkers should iterate memory cache, not store_table.
+ // XXX: Limit iterations by time, not arbitrary count.
+ const auto walker = mem_policy->PurgeInit(mem_policy, 100000);
+ int removed = 0;
+ while (const auto entry = walker->Next(walker)) {
+ // Abandoned memory cache entries are purged during memory shortage.
+ entry->abandon(__FUNCTION__); // may delete entry
+ ++removed;
+
+ if (memoryCacheHasSpaceFor(pagesRequired))
+ break;
+ }
+ // TODO: Move to RemovalPolicyWalker::Done() that has more/better details.
+ debugs(20, 3, "removed " << removed << " out of " << hot_obj_count << " memory-cached entries");
+ walker->Done(walker);
+ memoryPagesDebt_ = 0;
}
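
A worked example of the arithmetic above, assuming the usual 4096-byte SM_PAGE_SIZE:

    // bytesRequired = 10000
    // pagesRequired = (10000 + 4096 - 1) / 4096 = 3   (integer division)
    // nothing happens while mem_node::InUseCount() + 3 <= store_pages_max;
    // otherwise at most one policy walk per second purges idle entries
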
// move this into [non-shared] memory cache class when we have one
e.trimMemory(preserveSwappable);
}
+/// removes the entry from the memory cache
+/// XXX: Dangerous side effect: Unlocked entries lose their mem_obj.
void
-Store::Controller::memoryUnlink(StoreEntry &e)
+Store::Controller::memoryEvictCached(StoreEntry &e)
{
+ // TODO: Untangle memory caching from mem_obj.
if (memStore)
- memStore->unlink(e);
+ memStore->evictCached(e);
else // TODO: move into [non-shared] memory cache class when we have one
- e.destroyMemObject();
+ if (!e.locked())
+ e.destroyMemObject();
}
void
}
void
-Store::Controller::transientsAbandon(StoreEntry &e)
+Store::Controller::stopSharing(StoreEntry &e)
{
- if (transients) {
- assert(e.mem_obj);
- if (e.mem_obj->xitTable.index >= 0)
- transients->abandon(e);
- }
+ // Marking the transients entry is sufficient to prevent new readers from
+ // starting to wait for `e` updates and to inform the current readers (and,
+ // hence, Broadcast() recipients) about the underlying Store problems.
+ if (transients && e.hasTransients())
+ transients->evictCached(e);
}
void
Store::Controller::transientsCompleteWriting(StoreEntry &e)
{
- if (transients) {
- assert(e.mem_obj);
- if (e.mem_obj->xitTable.index >= 0)
- transients->completeWriting(e);
- }
+ // transients->isWriter(e) is false if `e` is writing to its second store
+ // after finishing writing to its first store: At the end of the first swap
+ // out, the transients writer becomes a reader and (XXX) we never switch
+ // back to writing, even if we start swapping out again (to another store).
+ if (transients && e.hasTransients() && transients->isWriter(e))
+ transients->completeWriting(e);
}
int
Store::Controller::transientReaders(const StoreEntry &e) const
{
- return (transients && e.mem_obj && e.mem_obj->xitTable.index >= 0) ?
+ return (transients && e.hasTransients()) ?
transients->readers(e) : 0;
}
void
-Store::Controller::transientsDisconnect(MemObject &mem_obj)
+Store::Controller::transientsDisconnect(StoreEntry &e)
{
if (transients)
- transients->disconnect(mem_obj);
+ transients->disconnect(e);
}
void
} else {
keepInLocalMemory = keepForLocalMemoryCache(e) && // in good shape and
// the local memory cache is not overflowing
- (mem_node::InUseCount() <= store_pages_max);
+ memoryCacheHasSpaceFor(memoryPagesDebt_);
}
// An idle, unlocked entry that only belongs to a SwapDir which controls
if (keepInLocalMemory) {
e.setMemStatus(IN_MEMORY);
e.mem_obj->unlinkRequest();
- } else {
- e.purgeMem(); // may free e
+ return;
+ }
+
+ // We know the in-memory data will be gone. Get rid of the entire entry if
+ // it has nothing worth preserving on disk either.
+ if (!e.swappedOut()) {
+ e.release(); // deletes e
+ return;
}
+
+ memoryEvictCached(e); // may already be gone
+ // and keep the entry in store_table for its on-disk data
}
void
if (memStore && old->mem_status == IN_MEMORY && !EBIT_TEST(old->flags, ENTRY_SPECIAL))
memStore->updateHeaders(old);
- if (old->swap_dirn > -1)
+ if (old->hasDisk())
swapDir->updateHeaders(old);
}
-void
+bool
Store::Controller::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
const HttpRequestMethod &reqMethod)
{
const KeyScope keyScope = reqFlags.refresh ? ksRevalidation : ksDefault;
- e->makePublic(keyScope); // this is needed for both local and SMP collapsing
+ if (e->makePublic(keyScope)) { // this is needed for both local and SMP collapsing
+ debugs(20, 3, "may " << (transients && e->hasTransients() ?
+ "SMP-" : "locally-") << "collapse " << *e);
+ return true;
+ }
+ return false;
+}
+
+void
+Store::Controller::addReading(StoreEntry *e, const cache_key *key)
+{
+ if (transients)
+ transients->monitorIo(e, key, Store::ioReading);
+ e->hashInsert(key);
+}
+
+void
+Store::Controller::addWriting(StoreEntry *e, const cache_key *key)
+{
+ assert(e);
+ if (EBIT_TEST(e->flags, ENTRY_SPECIAL))
+ return; // constant memory-resident entries do not need transients
+
if (transients)
- transients->startWriting(e, reqFlags, reqMethod);
- debugs(20, 3, "may " << (transients && e->mem_obj->xitTable.index >= 0 ?
- "SMP-" : "locally-") << "collapse " << *e);
+ transients->monitorIo(e, key, Store::ioWriting);
+ // else: non-SMP configurations do not need transients
}
void
assert(transients);
StoreEntry *collapsed = transients->findCollapsed(xitIndex);
- if (!collapsed) { // the entry is no longer locally active, ignore update
+ if (!collapsed) { // the entry is no longer active, ignore update
debugs(20, 7, "not SMP-syncing not-transient " << xitIndex);
return;
}
+
+ if (!collapsed->locked()) {
+ debugs(20, 3, "skipping (and may destroy) unlocked " << *collapsed);
+ handleIdleEntry(*collapsed);
+ return;
+ }
+
assert(collapsed->mem_obj);
- assert(collapsed->mem_obj->smpCollapsed);
+
+ if (EBIT_TEST(collapsed->flags, ENTRY_ABORTED)) {
+ debugs(20, 3, "skipping already aborted " << *collapsed);
+ return;
+ }
debugs(20, 7, "syncing " << *collapsed);
- bool abandoned = transients->abandoned(*collapsed);
+ bool abortedByWriter = false;
+ bool waitingToBeFreed = false;
+ transients->status(*collapsed, abortedByWriter, waitingToBeFreed);
+
+ if (waitingToBeFreed) {
+ debugs(20, 3, "will release " << *collapsed << " due to waitingToBeFreed");
+ collapsed->release(true); // may already be marked
+ }
+
+ if (transients->isWriter(*collapsed))
+ return; // readers can only change our waitingToBeFreed flag
+
+ assert(transients->isReader(*collapsed));
+
+ if (abortedByWriter) {
+ debugs(20, 3, "aborting " << *collapsed << " because its writer has aborted");
+ collapsed->abort();
+ return;
+ }
+
bool found = false;
bool inSync = false;
if (memStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) {
found = true;
inSync = true;
debugs(20, 7, "fully mem-loaded " << *collapsed);
- } else if (memStore && collapsed->mem_obj->memCache.index >= 0) {
+ } else if (memStore && collapsed->hasMemStore()) {
found = true;
- inSync = memStore->updateCollapsed(*collapsed);
- } else if (swapDir && collapsed->swap_filen >= 0) {
+ inSync = memStore->updateAnchored(*collapsed);
+ // TODO: handle entries attached to both memory and disk
+ } else if (swapDir && collapsed->hasDisk()) {
found = true;
- inSync = swapDir->updateCollapsed(*collapsed);
+ inSync = swapDir->updateAnchored(*collapsed);
} else {
- found = anchorCollapsed(*collapsed, inSync);
+ found = anchorToCache(*collapsed, inSync);
}
- if (abandoned && collapsed->store_status == STORE_PENDING) {
- debugs(20, 3, "aborting abandoned but STORE_PENDING " << *collapsed);
+ if (waitingToBeFreed && !found) {
+ debugs(20, 3, "aborting unattached " << *collapsed <<
+ " because it was marked for deletion before we could attach it");
collapsed->abort();
return;
}
if (inSync) {
debugs(20, 5, "synced " << *collapsed);
collapsed->invokeHandlers();
- } else if (found) { // unrecoverable problem syncing this entry
+ return;
+ }
+
+ if (found) { // unrecoverable problem syncing this entry
debugs(20, 3, "aborting unsyncable " << *collapsed);
collapsed->abort();
- } else { // the entry is still not in one of the caches
- debugs(20, 7, "waiting " << *collapsed);
+ return;
}
+
+ // the entry is still not in one of the caches
+ debugs(20, 7, "waiting " << *collapsed);
}
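
Condensing the control flow above into a checklist (each step matches a branch in syncCollapsed()):

    // 1. no local entry for xitIndex        -> ignore the broadcast
    // 2. entry exists but is unlocked       -> handleIdleEntry() (may delete)
    // 3. entry already ENTRY_ABORTED        -> ignore
    // 4. Transients says waitingToBeFreed   -> release(true)
    // 5. this worker is still the writer    -> stop; only the flag matters
    // 6. the remote writer aborted          -> abort() this reader
    // 7. sync via memory cache, disk, or a fresh anchor:
    //      still unattached but marked for deletion -> abort()
    //      in sync                                  -> invokeHandlers()
    //      found but unsyncable                     -> abort()
    //      not cached anywhere yet                  -> keep waiting
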
-/// Called for in-transit entries that are not yet anchored to a cache.
+/// Called for Transients entries that are not yet anchored to a cache.
/// For cached entries, return true after synchronizing them with their cache
/// (making inSync true on success). For not-yet-cached entries, return false.
bool
-Store::Controller::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+Store::Controller::anchorToCache(StoreEntry &entry, bool &inSync)
{
- // this method is designed to work with collapsed transients only
- assert(collapsed.mem_obj);
- assert(collapsed.mem_obj->xitTable.index >= 0);
- assert(collapsed.mem_obj->smpCollapsed);
+ assert(entry.hasTransients());
+ assert(transientsReader(entry));
- debugs(20, 7, "anchoring " << collapsed);
+ debugs(20, 7, "anchoring " << entry);
bool found = false;
if (memStore)
- found = memStore->anchorCollapsed(collapsed, inSync);
+ found = memStore->anchorToCache(entry, inSync);
if (!found && swapDir)
- found = swapDir->anchorCollapsed(collapsed, inSync);
+ found = swapDir->anchorToCache(entry, inSync);
if (found) {
if (inSync)
- debugs(20, 7, "anchored " << collapsed);
+ debugs(20, 7, "anchored " << entry);
else
- debugs(20, 5, "failed to anchor " << collapsed);
+ debugs(20, 5, "failed to anchor " << entry);
} else {
- debugs(20, 7, "skipping not yet cached " << collapsed);
+ debugs(20, 7, "skipping not yet cached " << entry);
}
return found;
return memStore || (swapDir && swapDir->smpAware());
}
+void
+Store::Controller::checkTransients(const StoreEntry &e) const
+{
+ if (EBIT_TEST(e.flags, ENTRY_SPECIAL))
+ return;
+ assert(!transients || e.hasTransients());
+}
+
namespace Store {
static RefCount<Controller> TheRoot;
}
/* Storage API */
virtual void create() override;
virtual void init() override;
- virtual StoreEntry *get(const cache_key *) override;
virtual uint64_t maxSize() const override;
virtual uint64_t minSize() const override;
virtual uint64_t currentSize() const override;
virtual void stat(StoreEntry &) const override;
virtual void sync() override;
virtual void maintain() override;
- virtual void markForUnlink(StoreEntry &) override;
- virtual void unlink(StoreEntry &) override;
+ virtual void evictCached(StoreEntry &) override;
+ virtual void evictIfFound(const cache_key *) override;
virtual int callback() override;
virtual bool smpAware() const override;
+ /// \returns a locally indexed and SMP-tracked matching StoreEntry (or nil)
+ /// Slower than peek() but does not restrict StoreEntry use and storage.
+ /// Counts as an entry reference from the removal policy p.o.v.
+ StoreEntry *find(const cache_key *);
+
+ /// \returns a matching StoreEntry not suitable for long-term use (or nil)
+ /// Faster than find() but the returned entry may not receive updates, may
+ /// lack information from some of the Stores, and should not be updated
+ /// except that purging peek()ed entries is supported.
+ /// Does not count as an entry reference from the removal policy p.o.v.
+ StoreEntry *peek(const cache_key *);
+
+ /// \returns matching StoreEntry associated with local ICP/HTCP transaction
+ /// Warning: The returned StoreEntry is not synced and may be marked for
+ /// deletion. Use it only for extracting transaction callback details.
+ /// TODO: Group and return just that callback-related data instead?
+ StoreEntry *findCallback(const cache_key *);
+
+ /// Whether a transient entry with the given public key exists but was
+ /// marked for removal some time ago; peek(key) returns nil in such cases.
+ bool markedForDeletion(const cache_key *key) const;
+
+ /// markedForDeletion() with no readers
+ /// this is one method because the two conditions must be checked in the right order
+ bool markedForDeletionAndAbandoned(const StoreEntry &) const;
+
+ /// whether there is a disk entry with e.key
+ bool hasReadableDiskEntry(const StoreEntry &) const;
+
/// Additional unknown-size entry bytes required by Store in order to
/// reduce the risk of selecting the wrong disk cache for the growing entry.
int64_t accumulateMore(StoreEntry &) const;
/// called when the entry is no longer needed by any transaction
void handleIdleEntry(StoreEntry &);
+ /// Evict memory cache entries to free at least `bytesRequired` bytes.
+ /// Should be called via storeGetMemSpace().
+ /// Unreliable: Fails if enough victims cannot be found fast enough.
+ void freeMemorySpace(const int bytesRequired);
+
/// called to get rid of no longer needed entry data in RAM, if any
void memoryOut(StoreEntry &, const bool preserveSwappable);
/// update old entry metadata and HTTP headers using a newer entry
void updateOnNotModified(StoreEntry *old, const StoreEntry &newer);
- /// makes the entry available for collapsing future requests
- void allowCollapsing(StoreEntry *, const RequestFlags &, const HttpRequestMethod &);
+ /// tries to make the entry available for collapsing future requests
+ bool allowCollapsing(StoreEntry *, const RequestFlags &, const HttpRequestMethod &);
+
+ /// register a being-read StoreEntry (to optimize concurrent cache reads
+ /// and to receive remote DELETE events)
+ void addReading(StoreEntry *, const cache_key *);
+
+ /// register a being-written StoreEntry (to support concurrent cache reads
+ /// and to receive remote DELETE events)
+ void addWriting(StoreEntry *, const cache_key *);
+
+ /// whether the entry is in "reading from Transients" I/O state
+ bool transientsReader(const StoreEntry &) const;
+
+ /// whether the entry is in "writing to Transients" I/O state
+ bool transientsWriter(const StoreEntry &) const;
/// marks the entry completed for collapsed requests
void transientsCompleteWriting(StoreEntry &);
/// Update local intransit entry after changes made by appending worker.
void syncCollapsed(const sfileno);
- /// calls Root().transients->abandon() if transients are tracked
- void transientsAbandon(StoreEntry &);
+ /// stop any current (and prevent any future) SMP sharing of the given entry
+ void stopSharing(StoreEntry &);
/// number of the transient entry readers some time ago
int transientReaders(const StoreEntry &) const;
/// disassociates the entry from the intransit table
- void transientsDisconnect(MemObject &);
-
- /// removes the entry from the memory cache
- void memoryUnlink(StoreEntry &);
+ void transientsDisconnect(StoreEntry &);
/// disassociates the entry from the memory cache, preserving cached data
void memoryDisconnect(StoreEntry &);
static int store_dirs_rebuilding;
private:
+ bool memoryCacheHasSpaceFor(const int pagesRequired) const;
+
/// update reference counters of the recently touched entry
void referenceBusy(StoreEntry &e);
/// dereference() an idle entry and return true if the entry should be deleted
bool dereferenceIdle(StoreEntry &, bool wantsLocalMemory);
- StoreEntry *find(const cache_key *key);
+ void allowSharing(StoreEntry &, const cache_key *);
+ StoreEntry *peekAtLocal(const cache_key *);
+
+ void memoryEvictCached(StoreEntry &);
+ void transientsUnlinkByKeyIfFound(const cache_key *);
bool keepForLocalMemoryCache(StoreEntry &e) const;
- bool anchorCollapsed(StoreEntry &, bool &inSync);
+ bool anchorToCache(StoreEntry &e, bool &inSync);
+ void checkTransients(const StoreEntry &) const;
Disks *swapDir; ///< summary view of all disk caches
Memory *memStore; ///< memory cache
/// will belong to a memory cache, a disk cache, or will be uncachable
/// when the response header comes. Used for SMP collapsed forwarding.
Transients *transients;
+
+ /// Hack: Relays page shortage from freeMemorySpace() to handleIdleEntry().
+ int memoryPagesDebt_ = 0;
};
/// safely access controller singleton
bool
Store::Disk::canLog(StoreEntry const &e)const
{
- if (e.swap_filen < 0)
+ if (!e.hasDisk())
return false;
- if (e.swap_status != SWAPOUT_DONE)
+ if (!e.swappedOut())
return false;
if (e.swap_file_sz <= 0)
/// called when the entry is about to forget its association with cache_dir
virtual void disconnect(StoreEntry &) {}
- /// called when entry swap out is complete
- virtual void swappedOut(const StoreEntry &e) = 0;
+ /// finalize the successful swapout that has been already noticed by Store
+ virtual void finalizeSwapoutSuccess(const StoreEntry &) = 0;
+ /// abort the failed swapout that has been already noticed by Store
+ virtual void finalizeSwapoutFailure(StoreEntry &) = 0;
+
+ /// whether this cache dir has an entry with `e.key`
+ virtual bool hasReadableEntry(const StoreEntry &e) const = 0;
protected:
void parseOptions(int reconfiguring);
}
void
-Store::Disks::markForUnlink(StoreEntry &e) {
- if (e.swap_filen >= 0)
- store(e.swap_dirn)->markForUnlink(e);
+Store::Disks::evictCached(StoreEntry &e) {
+ if (e.hasDisk()) {
+ // TODO: move into Fs::Ufs::UFSSwapDir::evictCached()
+ if (!EBIT_TEST(e.flags, KEY_PRIVATE)) {
+ // log before evictCached() below may clear hasDisk()
+ storeDirSwapLog(&e, SWAP_LOG_DEL);
+ }
+
+ e.disk().evictCached(e);
+ return;
+ }
+
+ if (const auto key = e.publicKey())
+ evictIfFound(key);
}
void
-Store::Disks::unlink(StoreEntry &e) {
- if (e.swap_filen >= 0)
- store(e.swap_dirn)->unlink(e);
+Store::Disks::evictIfFound(const cache_key *key)
+{
+ for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
+ if (dir(i).active())
+ dir(i).evictIfFound(key);
+ }
}
bool
-Store::Disks::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+Store::Disks::anchorToCache(StoreEntry &entry, bool &inSync)
{
if (const int cacheDirs = Config.cacheSwap.n_configured) {
// ask each cache_dir until the entry is found; use static starting
if (!sd.active())
continue;
- if (sd.anchorCollapsed(collapsed, inSync)) {
- debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed);
+ if (sd.anchorToCache(entry, inSync)) {
+ debugs(20, 3, "cache_dir " << idx << " anchors " << entry);
return true;
}
}
}
debugs(20, 4, "none of " << Config.cacheSwap.n_configured <<
- " cache_dirs have " << collapsed);
+ " cache_dirs have " << entry);
return false;
}
bool
-Store::Disks::updateCollapsed(StoreEntry &collapsed)
+Store::Disks::updateAnchored(StoreEntry &entry)
{
- return collapsed.swap_filen >= 0 &&
- dir(collapsed.swap_dirn).updateCollapsed(collapsed);
+ return entry.hasDisk() &&
+ dir(entry.swap_dirn).updateAnchored(entry);
}
bool
return false;
}
-/* Store::Disks globals that should be converted to use RegisteredRunner */
+bool
+Store::Disks::hasReadableEntry(const StoreEntry &e) const
+{
+ for (int i = 0; i < Config.cacheSwap.n_configured; ++i)
+ if (dir(i).active() && dir(i).hasReadableEntry(e))
+ return true;
+ return false;
+}
void
storeDirOpenSwapLogs()
{
assert (e);
assert(!EBIT_TEST(e->flags, KEY_PRIVATE));
- assert(e->swap_filen >= 0);
+ assert(e->hasDisk());
/*
* icons and such; don't write them to the swap log
*/
virtual bool dereference(StoreEntry &e) override;
virtual void updateHeaders(StoreEntry *) override;
virtual void maintain() override;
- virtual bool anchorCollapsed(StoreEntry &e, bool &inSync) override;
- virtual bool updateCollapsed(StoreEntry &e) override;
- virtual void markForUnlink(StoreEntry &) override;
- virtual void unlink(StoreEntry &) override;
+ virtual bool anchorToCache(StoreEntry &e, bool &inSync) override;
+ virtual bool updateAnchored(StoreEntry &) override;
+ virtual void evictCached(StoreEntry &) override;
+ virtual void evictIfFound(const cache_key *) override;
virtual int callback() override;
/// slowly calculate (and cache) hi/lo watermarks and similar limits
/// reduce the risk of selecting the wrong disk cache for the growing entry.
int64_t accumulateMore(const StoreEntry&) const;
virtual bool smpAware() const override;
+ /// whether any of the disk caches has an entry with e.key
+ bool hasReadableEntry(const StoreEntry &) const;
private:
/* migration logic */
#define SQUID_STORE_STORAGE_H
#include "base/RefCount.h"
+#include "http/RequestMethod.h"
#include "store/forward.h"
+#include "store_key_md5.h"
class StoreInfoStats;
/// use readable() and writable() methods.
virtual void init() = 0;
- /// Retrieve a store entry from the store (blocking)
- virtual StoreEntry *get(const cache_key *) = 0;
-
/**
* The maximum size the store will support in normal use. Inaccuracy is
* permitted, but may throw estimates for memory etc out of whack.
*/
virtual void stat(StoreEntry &e) const = 0;
- /// expect an unlink() call after the entry becomes idle
- virtual void markForUnlink(StoreEntry &e) = 0;
+ /// Prevent new get() calls from returning the matching entry.
+ /// If the matching entry is unused, it may be removed from the store now.
+ /// The store entry is matched using either `e` attachment info or `e.key`.
+ virtual void evictCached(StoreEntry &e) = 0;
- /// remove the entry from the store
- virtual void unlink(StoreEntry &e) = 0;
+ /// An evictCached() equivalent for callers that did not get() a StoreEntry.
+ /// Callers with StoreEntry objects must use evictCached() instead.
+ virtual void evictIfFound(const cache_key *) = 0;
/// called once every main loop iteration; TODO: Move to UFS code.
virtual int callback() { return 0; }
namespace Store
{
+/// cache "I/O" direction and status
+enum IoStatus { ioUndecided, ioWriting, ioReading, ioDone };
+
class Storage;
class Controller;
class Controlled;
class Disks;
class Disk;
class DiskConfig;
+class EntryGuard;
typedef ::StoreEntry Entry;
typedef ::MemStore Memory;
if (getType() == STORE_DISK_CLIENT) {
/* assert we'll be able to get the data we want */
/* maybe we should open swapin_sio here */
- assert(entry->swap_filen > -1 || entry->swappingOut());
+ assert(entry->hasDisk() || entry->swappingOut());
}
}
const int64_t len = entry->objectLen();
// If we do not know the entry length, then we have to open the swap file.
- const bool canSwapIn = entry->swap_filen >= 0;
+ const bool canSwapIn = entry->hasDisk();
if (len < 0)
return canSwapIn;
flags.disk_io_pending = true;
if (mem->swap_hdr_sz != 0)
- if (entry->swap_status == SWAPOUT_WRITING)
+ if (entry->swappingOut())
assert(mem->swapout.sio->offset() > copyInto.offset + (int64_t)mem->swap_hdr_sz);
storeRead(swapin_sio,
dlinkDelete(&sc->node, &mem->clients);
-- mem->nclients;
- if (e->store_status == STORE_OK && e->swap_status != SWAPOUT_DONE)
+ if (e->store_status == STORE_OK && !e->swappedOut())
e->swapOut();
if (sc->swapin_sio != NULL) {
String ctype=(reply->content_type.size() ? reply->content_type.termedBuf() : str_unknown);
+ // mem_obj may still lack logging details, especially in RELEASE cases
+ const char *logUri = mem->hasUris() ? mem->logUri() : "?";
+
logfileLineStart(storelog);
logfilePrintf(storelog, "%9d.%03d %-7s %02d %08X %s %4d %9d %9d %9d " SQUIDSTRINGPH " %" PRId64 "/%" PRId64 " " SQUIDSBUFPH " %s\n",
(int) current_time.tv_sec,
reply->content_length,
e->contentLen(),
SQUIDSBUFPRINT(mem->method.image()),
- mem->logUri());
+ logUri);
logfileLineEnd(storelog);
} else {
/* no mem object. Most RELEASE cases */
* Calling StoreEntry->release() has no effect because we're
* still in 'store_rebuilding' state
*/
- if (e->swap_filen < 0)
+ if (!e->hasDisk())
continue;
if (opt_store_doublecheck)
return true;
}
-bool
-storeRebuildKeepEntry(const StoreEntry &tmpe, const cache_key *key, StoreRebuildData &stats)
-{
- /* this needs to become
- * 1) unpack url
- * 2) make synthetic request with headers ?? or otherwise search
- * for a matching object in the store
- * TODO FIXME change to new async api
- * TODO FIXME I think there is a race condition here with the
- * async api :
- * store A reads in object foo, searchs for it, and finds nothing.
- * store B reads in object foo, searchs for it, finds nothing.
- * store A gets called back with nothing, so registers the object
- * store B gets called back with nothing, so registers the object,
- * which will conflict when the in core index gets around to scanning
- * store B.
- *
- * this suggests that rather than searching for duplicates, the
- * index rebuild should just assume its the most recent accurate
- * store entry and whoever indexes the stores handles duplicates.
- */
- if (StoreEntry *e = Store::Root().get(key)) {
-
- if (e->lastref >= tmpe.lastref) {
- /* key already exists, old entry is newer */
- /* keep old, ignore new */
- ++stats.dupcount;
-
- // For some stores, get() creates/unpacks a store entry. Signal
- // such stores that we will no longer use the get() result:
- e->lock("storeRebuildKeepEntry");
- e->unlock("storeRebuildKeepEntry");
-
- return false;
- } else {
- /* URL already exists, this swapfile not being used */
- /* junk old, load new */
- e->release(); /* release old entry */
- ++stats.dupcount;
- }
- }
-
- return true;
-}
-
bool storeRebuildLoadEntry(int fd, int diskIndex, MemBuf &buf, StoreRebuildData &counts);
/// parses entry buffer and validates entry metadata; fills e on success
bool storeRebuildParseEntry(MemBuf &buf, StoreEntry &e, cache_key *key, StoreRebuildData &counts, uint64_t expectedSize);
-/// checks whether the loaded entry should be kept; updates counters
-bool storeRebuildKeepEntry(const StoreEntry &e, const cache_key *key, StoreRebuildData &counts);
#endif /* SQUID_STORE_REBUILD_H_ */
if (e->mem_status != NOT_IN_MEMORY)
debugs(20, 3, HERE << "already IN_MEMORY");
- debugs(20, 3, "storeSwapInStart: called for : " << e->swap_dirn << " " <<
- std::hex << std::setw(8) << std::setfill('0') << std::uppercase <<
- e->swap_filen << " " << e->getMD5Text());
+ debugs(20, 3, *e << " " << e->getMD5Text());
- if (e->swap_status != SWAPOUT_WRITING && e->swap_status != SWAPOUT_DONE) {
- debugs(20, DBG_IMPORTANT, "storeSwapInStart: bad swap_status (" << swapStatusStr[e->swap_status] << ")");
- return;
- }
-
- if (e->swap_filen < 0) {
- debugs(20, DBG_IMPORTANT, "storeSwapInStart: swap_filen < 0");
+ if (!e->hasDisk()) {
+ debugs(20, DBG_IMPORTANT, "BUG: Attempt to swap in a not-stored entry " << *e << ". Salvaged.");
return;
}
assert(e->mem_obj != NULL);
- debugs(20, 3, "storeSwapInStart: Opening fileno " << std::hex << std::setw(8) << std::setfill('0') << std::uppercase << e->swap_filen);
sc->swapin_sio = storeOpen(e, storeSwapInFileNotify, storeSwapInFileClosed, sc);
}
#include "squid.h"
#include "cbdata.h"
+#include "CollapsedForwarding.h"
#include "globals.h"
#include "Store.h"
#include "StoreClient.h"
debugs(20, 5, "storeSwapOutStart: Begin SwapOut '" << e->url() << "' to dirno " <<
e->swap_dirn << ", fileno " << std::hex << std::setw(8) << std::setfill('0') <<
std::uppercase << e->swap_filen);
- e->swap_status = SWAPOUT_WRITING;
e->swapOutDecision(MemObject::SwapOut::swStarted);
/* If we start swapping out objects with OutOfBand Metadata,
* then this code needs changing
sio = storeCreate(e, storeSwapOutFileNotify, storeSwapOutFileClosed, c);
if (sio == NULL) {
+ assert(!e->hasDisk());
e->swap_status = SWAPOUT_NONE;
e->swapOutDecision(MemObject::SwapOut::swImpossible);
delete c;
e->lock("storeSwapOutStart");
/* Pick up the file number if it was assigned immediately */
- e->swap_filen = mem->swapout.sio->swap_filen;
-
- e->swap_dirn = mem->swapout.sio->swap_dirn;
+ e->attachToDisk(mem->swapout.sio->swap_dirn, mem->swapout.sio->swap_filen, SWAPOUT_WRITING);
/* write out the swap metadata */
storeIOWrite(mem->swapout.sio, buf, mem->swap_hdr_sz, 0, xfree_cppwrapper);
}
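attachToDisk() bundles the three fields that the old code set one by one, so the dirn/filen/status triplet can no longer drift out of sync. A minimal sketch of the assumed helper (the real method presumably also asserts and logs):

    void
    StoreEntry::attachToDisk(const sdirno dirn, const sfileno filen, const swap_status_t status)
    {
        // update all disk coordinates together, from the caller's viewpoint
        swap_dirn = dirn;
        swap_filen = filen;
        swap_status = status;
    }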
+/// XXX: unused; see the related StoreIOState::file_callback
static void
storeSwapOutFileNotify(void *data, int errflag, StoreIOState::Pointer self)
{
static_cast<generic_cbdata *>(data)->unwrap(&e);
MemObject *mem = e->mem_obj;
- assert(e->swap_status == SWAPOUT_WRITING);
+ assert(e->swappingOut());
assert(mem);
assert(mem->swapout.sio == self);
assert(errflag == 0);
- assert(e->swap_filen < 0); // if this fails, call SwapDir::disconnect(e)
+ assert(!e->hasDisk()); // if this fails, call SwapDir::disconnect(e)
e->swap_filen = mem->swapout.sio->swap_filen;
e->swap_dirn = mem->swapout.sio->swap_dirn;
}
-1,
memNodeWriteComplete);
- if (!ok || anEntry->swap_status != SWAPOUT_WRITING)
+ if (!ok || !anEntry->swappingOut())
return false;
int64_t swapout_size = mem->endOffset() - mem->swapout.queue_offset;
}
#endif
- if (swap_status == SWAPOUT_WRITING)
+ if (swappingOut())
assert(mem_obj->inmem_lo <= mem_obj->objectBytesOnDisk() );
// buffered bytes we have not swapped out yet
}
/* Ok, we have stuff to swap out. Is there a swapout.sio open? */
- if (swap_status == SWAPOUT_NONE) {
+ if (!hasDisk()) {
assert(mem_obj->swapout.sio == NULL);
assert(mem_obj->inmem_lo == 0);
storeSwapOutStart(this); // sets SwapOut::swImpossible on failures
MemObject *mem = e->mem_obj;
assert(mem->swapout.sio == self);
- assert(e->swap_status == SWAPOUT_WRITING);
+ assert(e->swappingOut());
// if object_size is still unknown, the entry was probably aborted
if (errflag || e->objectLen() < 0) {
storeConfigure();
}
- if (e->swap_filen >= 0)
- e->disk().unlink(*e);
-
- assert(e->swap_status == SWAPOUT_NONE);
-
- e->releaseRequest();
+ e->disk().finalizeSwapoutFailure(*e);
+ e->releaseRequest(); // TODO: Keep the memory entry (if any)
} else {
/* swapping complete */
debugs(20, 3, "storeSwapOutFileClosed: SwapOut complete: '" << e->url() << "' to " <<
e->swap_file_sz = e->objectLen() + mem->swap_hdr_sz;
e->swap_status = SWAPOUT_DONE;
- e->disk().swappedOut(*e);
+ e->disk().finalizeSwapoutSuccess(*e);
// XXX: For some Stores, it is pointless to re-check cachability here
// and it leads to double counts in store_check_cachable_hist. We need
++statCounter.swap.outs;
}
+ Store::Root().transientsCompleteWriting(*e);
debugs(20, 3, "storeSwapOutFileClosed: " << __FILE__ << ":" << __LINE__);
mem->swapout.sio = NULL;
e->unlock("storeSwapOutFileClosed");
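transientsCompleteWriting() is what lets collapsed readers in other workers stop waiting once the swapout outcome is known. A plausible sketch of the Controller-level wiring (assumed, for illustration; readers then learn about the change via Broadcast()):

    void
    Store::Controller::transientsCompleteWriting(StoreEntry &e)
    {
        // mark the shared in-transit entry as no longer being written
        if (transients && e.hasTransients())
            transients->completeWriting(e);
    }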
return false;
}
- // if we swapped out already, do not start over
- if (swap_status == SWAPOUT_DONE) {
+ // if we are swapping out or swapped out already, do not start over
+ if (hasDisk() || Store::Root().hasReadableDiskEntry(*this)) {
debugs(20, 3, "already did");
swapOutDecision(MemObject::SwapOut::swImpossible);
return false;
}
- // if we stared swapping out already, do not start over
+ // if we have just started swapping out (attachToDisk() has not been
+ // called), do not start over
if (decision == MemObject::SwapOut::swStarted) {
debugs(20, 3, "already started");
swapOutDecision(MemObject::SwapOut::swImpossible);
return false;
}
+ if (Store::Root().markedForDeletionAndAbandoned(*this)) {
+ debugs(20, 3, "marked for deletion and abandoned");
+ swapOutDecision(MemObject::SwapOut::swImpossible);
+ return false;
+ }
+
// if we decided that swapout is possible, do not repeat same checks
if (decision == MemObject::SwapOut::swPossible) {
debugs(20, 3, "already allowed");
virtual uint64_t currentSize() const override;
virtual uint64_t currentCount() const override;
virtual void stat(StoreEntry &) const override;
- virtual void swappedOut(const StoreEntry &e) override {}
+ virtual void finalizeSwapoutSuccess(const StoreEntry &) override {}
+ virtual void finalizeSwapoutFailure(StoreEntry &) override {}
virtual void reconfigure() override;
virtual void init() override;
virtual bool unlinkdUseful() const override;
virtual StoreIOState::Pointer createStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) override;
virtual StoreIOState::Pointer openStoreIO(StoreEntry &, StoreIOState::STFNCB *, StoreIOState::STIOCB *, void *) override;
virtual void parse(int, char*) override;
- virtual void markForUnlink(StoreEntry &) override {}
- virtual void unlink(StoreEntry &) override {}
+ virtual void evictCached(StoreEntry &) override {}
+ virtual void evictIfFound(const cache_key *) override {}
+ virtual bool hasReadableEntry(const StoreEntry &) const override { return false; }
};
typedef RefCount<TestSwapDir> TestSwapDirPointer;
#define STUB_API "CollapsedForwarding.cc"
#include "tests/STUB.h"
-void CollapsedForwarding::Broadcast(StoreEntry const&) STUB
+void CollapsedForwarding::Broadcast(StoreEntry const&, const bool) STUB
+void CollapsedForwarding::Broadcast(const sfileno, const bool) STUB
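Usage note for the widened Broadcast() API (illustrative calls, not from the patch): the new flag lets a worker wake its own collapsed readers too, which matters when the notifying code runs outside the normal store-write path:

    CollapsedForwarding::Broadcast(*entry, true); // notify all kids, ourselves included
    CollapsedForwarding::Broadcast(*entry); // default: notify the other kids only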
bool MemStore::keepInLocalMemory(const StoreEntry &) const STUB_RETVAL(false)
void MemStore::write(StoreEntry &e) STUB
void MemStore::completeWriting(StoreEntry &e) STUB
-void MemStore::unlink(StoreEntry &e) STUB
void MemStore::disconnect(StoreEntry &e) STUB
void MemStore::reference(StoreEntry &) STUB
void MemStore::updateHeaders(StoreEntry *) STUB
uint64_t MemStore::currentCount() const STUB_RETVAL(0)
int64_t MemStore::maxObjectSize() const STUB_RETVAL(0)
bool MemStore::dereference(StoreEntry &) STUB_RETVAL(false)
-void MemStore::markForUnlink(StoreEntry&) STUB
-bool MemStore::anchorCollapsed(StoreEntry&, bool&) STUB_RETVAL(false)
-bool MemStore::updateCollapsed(StoreEntry&) STUB_RETVAL(false)
+void MemStore::evictCached(StoreEntry&) STUB
+void MemStore::evictIfFound(const cache_key *) STUB
+bool MemStore::anchorToCache(StoreEntry&, bool&) STUB_RETVAL(false)
+bool MemStore::updateAnchored(StoreEntry&) STUB_RETVAL(false)
bool StoreEntry::mayStartSwapOut() STUB_RETVAL(false)
void StoreEntry::trimMemory(const bool preserveSwappable) STUB
void StoreEntry::abort() STUB
-void StoreEntry::makePublic(const KeyScope scope) STUB
+bool StoreEntry::makePublic(const KeyScope scope) STUB_RETVAL(false)
void StoreEntry::makePrivate(const bool shareable) STUB
-void StoreEntry::setPublicKey(const KeyScope scope) STUB
-void StoreEntry::setPrivateKey(const bool shareable) STUB
+bool StoreEntry::setPublicKey(const KeyScope scope) STUB_RETVAL(false)
+void StoreEntry::setPrivateKey(const bool, const bool) STUB
void StoreEntry::expireNow() STUB
void StoreEntry::releaseRequest(const bool shareable) STUB
void StoreEntry::negativeCache() STUB
-void StoreEntry::cacheNegatively() STUB
-void StoreEntry::purgeMem() STUB
+bool StoreEntry::cacheNegatively() STUB_RETVAL(false)
void StoreEntry::swapOut() STUB
void StoreEntry::swapOutFileClose(int how) STUB
const char *StoreEntry::url() const STUB_RETVAL(NULL)
return new StoreEntry();
}
void StoreEntry::operator delete(void *address) STUB
-void StoreEntry::setReleaseFlag() STUB
//#if USE_SQUID_ESI
//ESIElement::Pointer StoreEntry::cachedESITree STUB_RETVAL(NULL)
//#endif
int Store::Controller::store_dirs_rebuilding = 0;
StoreSearch *Store::Controller::search() STUB_RETVAL(NULL)
void Store::Controller::maintain() STUB
+bool Store::Controller::markedForDeletion(const cache_key *) const STUB_RETVAL(false)
+void Store::Controller::freeMemorySpace(const int) STUB
std::ostream &operator <<(std::ostream &os, const StoreEntry &)
{
StoreEntry *storeGetPublicByRequest(HttpRequest * request, const KeyScope scope) STUB_RETVAL(NULL)
StoreEntry *storeGetPublicByRequestMethod(HttpRequest * request, const HttpRequestMethod& method, const KeyScope scope) STUB_RETVAL(NULL)
StoreEntry *storeCreateEntry(const char *, const char *, const RequestFlags &, const HttpRequestMethod&) STUB_RETVAL(NULL)
-StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const RequestFlags &, const HttpRequestMethod&) STUB_RETVAL(NULL)
+StoreEntry *storeCreatePureEntry(const char *storeId, const char *logUrl, const HttpRequestMethod&) STUB_RETVAL(nullptr)
void storeConfigure(void) STUB
int expiresMoreThan(time_t, time_t) STUB_RETVAL(0)
void storeAppendPrintf(StoreEntry *, const char *,...) STUB
#include "tests/STUB.h"
void storeRebuildProgress(int sd_index, int total, int sofar) STUB
-bool storeRebuildKeepEntry(const StoreEntry &tmpe, const cache_key *key, StoreRebuildData &counts) STUB_RETVAL(false)
bool storeRebuildParseEntry(MemBuf &, StoreEntry &, cache_key *, StoreRebuildData &, uint64_t) STUB_RETVAL(false)
void storeRebuildComplete(StoreRebuildData *)
// try to swap out entry to a used unlocked slot
{
- StoreEntry *const pe = addEntry(4);
+ // without marking the old entry as deleted
+ StoreEntry *const pe = addEntry(3);
- CPPUNIT_ASSERT_EQUAL(SWAPOUT_WRITING, pe->swap_status);
- CPPUNIT_ASSERT_EQUAL(0, pe->swap_dirn);
- CPPUNIT_ASSERT(pe->swap_filen >= 0);
+ CPPUNIT_ASSERT_EQUAL(SWAPOUT_NONE, pe->swap_status);
+ CPPUNIT_ASSERT_EQUAL(-1, pe->swap_dirn);
+ CPPUNIT_ASSERT_EQUAL(-1, pe->swap_filen);
+ pe->unlock("testRock::testRockSwapOut e#3");
+
+ // after marking the old entry as deleted
+ StoreEntry *const pe2 = getEntry(4);
+ CPPUNIT_ASSERT(pe2 != nullptr);
+ pe2->release();
+
+ StoreEntry *const pe3 = addEntry(4);
+ CPPUNIT_ASSERT_EQUAL(SWAPOUT_WRITING, pe3->swap_status);
+ CPPUNIT_ASSERT_EQUAL(0, pe3->swap_dirn);
+ CPPUNIT_ASSERT(pe3->swap_filen >= 0);
StockEventLoop loop;
loop.run();
- CPPUNIT_ASSERT_EQUAL(SWAPOUT_DONE, pe->swap_status);
+ CPPUNIT_ASSERT_EQUAL(SWAPOUT_DONE, pe3->swap_status);
- pe->unlock("testRock::testRockSwapOut e#4");
+ pe3->unlock("testRock::testRockSwapOut e#4");
}
e->expires = squid_curtime;
e->lastModified(squid_curtime);
e->refcount = 1;
- EBIT_CLR(e->flags, RELEASE_REQUEST);
- e->clearPrivate();
e->ping_status = PING_NONE;
EBIT_CLR(e->flags, ENTRY_VALIDATED);
e->hashInsert((const cache_key *)name.termedBuf()); /* do it after we clear KEY_PRIVATE */
entry->timestampsSet();
entry->flush();
- entry->makePublic();
+ if (!entry->makePublic())
+ entry->makePrivate(true);
fwd->complete();
debugs(75, 3, "whoisReadReply: Done: " << entry->url());
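The whois change above shows the new caller contract: makePublic() may now refuse. A sketch of the assumed new shape, based on this patch's semantics rather than a verbatim copy:

    bool
    StoreEntry::makePublic(const KeyScope scope)
    {
        // publication fails for entries already marked for release; callers
        // such as whoisReadReply() then fall back to a shareable private
        // entry so that collapsed requesters in this worker are still served
        return !EBIT_TEST(flags, RELEASE_REQUEST) && setPublicKey(scope);
    }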