From: Alex Rousskov Date: Fri, 7 Jun 2013 23:34:36 +0000 (-0600) Subject: Support "appending" read/write lock state that can be shared by readers X-Git-Tag: SQUID_3_5_0_1~444^2~60 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=ce49546;p=thirdparty%2Fsquid.git Support "appending" read/write lock state that can be shared by readers and writer. Writer promises not to update key metadata (except growing object size and next pointers) and readers promise to be careful when reading growing slices. Support copying of partially cached entries from the shared memory cache to local RAM. This is required for collapsed shared memory hits to receive new data during broadcasted updates. Properly unlock objects in the shared memory cache when their entries are abandoned by a worker. This was not necessary before because we never locked memory cache entries for more than a single method call. Now, with partially cached entries support, the locks may persist much longer. Properly delete objects from the shared memory cache when they are purged by a worker. Before this change, locally purged objects may have stayed in the shared memory cache. Update disk cache index _after_ the changes are written to disk. Another worker may be using that index and will expect to find the indexed slices on disk. Disk queues are not FIFOs across workers. Made CollapsedForwarding work better in non-SMP mode. Polished broadcasting code. We need to broadcast entry key because the entry may not have any other information (it may no longer be cached by the sender, for example). Implemented "anchoring" in-transit entries when the writer caches the corresponding object. This allows the reader's entry object to reflect its cached status and, hence, be able to ask for cached data during broadcasted entry updates. Still need to handle the case where the writer does not cache the object (by aborting collapsed hit). --- diff --git a/src/CollapsedForwarding.cc b/src/CollapsedForwarding.cc index 657e6d6c09..d1b108828d 100644 --- a/src/CollapsedForwarding.cc +++ b/src/CollapsedForwarding.cc @@ -9,8 +9,10 @@ #include "ipc/Port.h" #include "ipc/TypedMsgHdr.h" #include "CollapsedForwarding.h" -#include "SquidConfig.h" #include "globals.h" +#include "SquidConfig.h" +#include "Store.h" +#include "store_key_md5.h" #include "tools.h" /// shared memory segment path to use for CollapsedForwarding queue @@ -25,11 +27,11 @@ std::auto_ptr CollapsedForwarding::queue; class CollapsedForwardingMsg { public: - CollapsedForwardingMsg(): processId(-1) {} + CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; } public: - int processId; /// ID of sending process - // XXX: add entry info + int sender; /// kid ID of sending process + uint64_t key[2]; ///< StoreEntry key }; // CollapsedForwarding @@ -38,21 +40,26 @@ void CollapsedForwarding::Init() { Must(!queue.get()); - queue.reset(new Queue(ShmLabel, KidIdentifier)); + if (UsingSmp() && IamWorkerProcess()) + queue.reset(new Queue(ShmLabel, KidIdentifier)); } void -CollapsedForwarding::NewData(const StoreIOState &sio) +CollapsedForwarding::Broadcast(const cache_key *key) { + if (!queue.get()) + return; + CollapsedForwardingMsg msg; - msg.processId = KidIdentifier; - // XXX: copy data from sio + msg.sender = KidIdentifier; + memcpy(msg.key, key, sizeof(msg.key)); + + debugs(17, 5, storeKeyText(key) << " to " << Config.workers << "-1 workers"); // TODO: send only to workers who are waiting for data - // XXX: does not work for non-daemon mode? 
for (int workerId = 1; workerId <= Config.workers; ++workerId) { try { - if (queue->push(workerId, msg)) + if (workerId != KidIdentifier && queue->push(workerId, msg)) Notify(workerId); } catch (const Queue::Full &) { debugs(17, DBG_IMPORTANT, "Worker collapsed forwarding push queue " @@ -66,7 +73,7 @@ void CollapsedForwarding::Notify(const int workerId) { // TODO: Count and report the total number of notifications, pops, pushes. - debugs(17, 7, HERE << "kid" << workerId); + debugs(17, 7, "to kid" << workerId); Ipc::TypedMsgHdr msg; // TODO: add proper message type? msg.setType(Ipc::mtCollapsedForwardingNotification); @@ -78,18 +85,19 @@ CollapsedForwarding::Notify(const int workerId) void CollapsedForwarding::HandleNewData(const char *const when) { - debugs(17, 4, HERE << "popping all " << when); + debugs(17, 4, "popping all " << when); CollapsedForwardingMsg msg; int workerId; int poppedCount = 0; while (queue->pop(workerId, msg)) { - debugs(17, 3, HERE << "collapsed forwarding data message from " << - workerId); - if (workerId != msg.processId) { - debugs(17, DBG_IMPORTANT, HERE << "mismatching IDs: " << workerId << - " != " << msg.processId); + debugs(17, 3, "message from kid" << workerId); + if (workerId != msg.sender) { + debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId << + " != " << msg.sender); } + Store::Root().syncCollapsed(reinterpret_cast(msg.key)); + // XXX: stop and schedule an async call to continue assert(++poppedCount < SQUID_MAXFD); } @@ -100,6 +108,7 @@ CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg) { const int from = msg.getInt(); debugs(17, 7, HERE << "from " << from); + assert(queue.get()); queue->clearReaderSignal(from); HandleNewData("after notification"); } @@ -132,8 +141,7 @@ void CollapsedForwardingRr::create(const RunnerRegistry &) void CollapsedForwardingRr::open(const RunnerRegistry &) { - if (IamWorkerProcess()) - CollapsedForwarding::Init(); + CollapsedForwarding::Init(); } CollapsedForwardingRr::~CollapsedForwardingRr() diff --git a/src/CollapsedForwarding.h b/src/CollapsedForwarding.h index e183802f44..6af07a82cf 100644 --- a/src/CollapsedForwarding.h +++ b/src/CollapsedForwarding.h @@ -8,6 +8,7 @@ #include "ipc/Queue.h" #include "ipc/forward.h" +#include "typedefs.h" #include @@ -20,9 +21,12 @@ public: /// open shared memory segment static void Init(); - /// notify other workers that new data is available + /// XXX: remove static void NewData(const StoreIOState &sio); + /// notify other workers that new data is available + static void Broadcast(const cache_key *key); + /// kick worker with empty IPC queue static void Notify(const int workerId); diff --git a/src/MemObject.cc b/src/MemObject.cc index 7e403697a4..81884c0b93 100644 --- a/src/MemObject.cc +++ b/src/MemObject.cc @@ -83,7 +83,7 @@ MemObject::resetUrls(char const *aUrl, char const *aLog_url) url = xstrdup(aUrl); } -MemObject::MemObject(char const *aUrl, char const *aLog_url) +MemObject::MemObject(char const *aUrl, char const *aLog_url): mem_index(-1) { debugs(20, 3, HERE << "new MemObject " << this); _reply = new HttpReply; @@ -115,6 +115,7 @@ MemObject::~MemObject() assert(chksum == url_checksum(url)); #endif + assert(mem_index < 0); if (!shutting_down) assert(swapout.sio == NULL); diff --git a/src/MemObject.h b/src/MemObject.h index a4548b0fb4..cee4c9e3e2 100644 --- a/src/MemObject.h +++ b/src/MemObject.h @@ -139,6 +139,7 @@ public: } abort; char *log_url; RemovalPolicyNode repl; + int32_t mem_index; ///< entry position inside the [shared] memory cache int 
id; int64_t object_sz; size_t swap_hdr_sz; diff --git a/src/MemStore.cc b/src/MemStore.cc index ea4ee2d3fc..8a9d96998f 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -5,6 +5,7 @@ #include "squid.h" #include "base/RunnersRegistry.h" +#include "CollapsedForwarding.h" #include "HttpReply.h" #include "ipc/mem/Page.h" #include "ipc/mem/Pages.h" @@ -186,34 +187,26 @@ MemStore::get(const cache_key *key) if (!slot) return NULL; - const Ipc::StoreMapAnchor::Basics &basics = slot->basics; - // create a brand new store entry and initialize it with stored info StoreEntry *e = new StoreEntry(); e->lock_count = 0; - e->swap_file_sz = basics.swap_file_sz; - e->lastref = basics.lastref; - e->timestamp = basics.timestamp; - e->expires = basics.expires; - e->lastmod = basics.lastmod; - e->refcount = basics.refcount; - e->flags = basics.flags; - - e->store_status = STORE_OK; - e->mem_status = IN_MEMORY; // setMemStatus(IN_MEMORY) requires mem_obj - //e->swap_status = set in StoreEntry constructor to SWAPOUT_NONE; - e->ping_status = PING_NONE; + // XXX: We do not know the URLs yet, only the key, but we need to parse and + // store the response for the Root().get() callers to be happy because they + // expect IN_MEMORY entries to already have the response headers and body. + // At least one caller calls createMemObject() if there is not one, so + // we hide the true object until that happens (to avoid leaking TBD URLs). + e->createMemObject("TBD", "TBD"); - EBIT_SET(e->flags, ENTRY_CACHABLE); - EBIT_CLR(e->flags, RELEASE_REQUEST); - EBIT_CLR(e->flags, KEY_PRIVATE); - EBIT_SET(e->flags, ENTRY_VALIDATED); + anchorEntry(*e, index, *slot); const bool copied = copyFromShm(*e, index, *slot); // we copied everything we could to local memory; no more need to lock map->closeForReading(index); + e->mem_obj->mem_index = -1; + + e->hideMemObject(); if (copied) { e->hashInsert(key); @@ -232,34 +225,129 @@ MemStore::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData) fatal("MemStore::get(key,callback,data) should not be called"); } +bool +MemStore::anchorCollapsed(StoreEntry &collapsed) +{ + if (!map) + return false; + + sfileno index; + const Ipc::StoreMapAnchor *const slot = map->openForReading( + reinterpret_cast(collapsed.key), index); + if (!slot) + return false; + + anchorEntry(collapsed, index, *slot); + return updateCollapsedWith(collapsed, index, *slot); +} + +bool +MemStore::updateCollapsed(StoreEntry &collapsed) +{ + if (!map) + return false; + + if (collapsed.mem_status != IN_MEMORY) // no longer using a memory cache + return false; + + const sfileno index = collapsed.mem_obj->mem_index; + + // already disconnected from the cache, no need to update + if (index < 0) + return true; + + const Ipc::StoreMapAnchor &anchor = map->readableEntry(index); + return updateCollapsedWith(collapsed, index, anchor); +} + +bool +MemStore::updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor) +{ + collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic + + const bool copied = copyFromShm(collapsed, index, anchor); + + return copied; // XXX: when do we unlock the map slot? 
+} + +/// anchors StoreEntry to an already locked map entry +void +MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor) +{ + const Ipc::StoreMapAnchor::Basics &basics = anchor.basics; + + e.swap_file_sz = basics.swap_file_sz; + e.lastref = basics.lastref; + e.timestamp = basics.timestamp; + e.expires = basics.expires; + e.lastmod = basics.lastmod; + e.refcount = basics.refcount; + e.flags = basics.flags; + + assert(e.mem_obj); + e.store_status = STORE_OK; + e.setMemStatus(IN_MEMORY); + e.mem_obj->mem_index = index; + assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor + e.ping_status = PING_NONE; + + EBIT_SET(e.flags, ENTRY_CACHABLE); + EBIT_CLR(e.flags, RELEASE_REQUEST); + EBIT_CLR(e.flags, KEY_PRIVATE); + EBIT_SET(e.flags, ENTRY_VALIDATED); +} + /// copies the entire entry from shared to local memory bool MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor) { debugs(20, 7, "mem-loading entry " << index << " from " << anchor.start); - // XXX: We do not know the URLs yet, only the key, but we need to parse and - // store the response for the Root().get() callers to be happy because they - // expect IN_MEMORY entries to already have the response headers and body. - // At least one caller calls createMemObject() if there is not one, so - // we hide the true object until that happens (to avoid leaking TBD URLs). - e.createMemObject("TBD", "TBD"); - // emulate the usual Store code but w/o inapplicable checks and callbacks: - Ipc::StoreMapSliceId sid = anchor.start; - int64_t offset = 0; + Ipc::StoreMapSliceId sid = anchor.start; // optimize: remember the last sid + bool wasEof = anchor.complete() && sid < 0; + int64_t sliceOffset = 0; while (sid >= 0) { const Ipc::StoreMapSlice &slice = map->readableSlice(index, sid); - const MemStoreMap::Extras &extras = map->extras(sid); - StoreIOBuffer sliceBuf(slice.size, offset, - static_cast(PagePointer(extras.page))); - if (!copyFromShmSlice(e, sliceBuf, slice.next < 0)) - return false; - debugs(20, 9, "entry " << index << " slice " << sid << " filled " << - extras.page); - offset += slice.size; - sid = slice.next; + // slice state may change during copying; take snapshots now + wasEof = anchor.complete() && slice.next < 0; + const Ipc::StoreMapSlice::Size wasSize = slice.size; + + if (e.mem_obj->endOffset() < sliceOffset + wasSize) { + // size of the slice data that we already copied + const size_t prefixSize = e.mem_obj->endOffset() - sliceOffset; + assert(prefixSize <= wasSize); + + const MemStoreMap::Extras &extras = map->extras(sid); + char *page = static_cast(PagePointer(extras.page)); + const StoreIOBuffer sliceBuf(wasSize - prefixSize, + e.mem_obj->endOffset(), + page + prefixSize); + if (!copyFromShmSlice(e, sliceBuf, wasEof)) + return false; + debugs(20, 9, "entry " << index << " copied slice " << sid << + " from " << extras.page << " +" << prefixSize); + } + // else skip a [possibly incomplete] slice that we copied earlier + + // careful: the slice may have grown _and_ gotten the next slice ID! 
+ if (slice.next >= 0) { + assert(!wasEof); + // here we know that slice.size may not change any more + if (wasSize >= slice.size) { // did not grow since we started copying + sliceOffset += wasSize; + sid = slice.next; + } + } else if (wasSize >= slice.size) { // did not grow + break; + } + } + + if (!wasEof) { + debugs(20, 7, "mem-loaded " << e.mem_obj->endOffset() << '/' << + anchor.basics.swap_file_sz << " bytes of " << e); + return true; } e.mem_obj->object_sz = e.mem_obj->endOffset(); // from StoreEntry::complete() @@ -269,15 +357,13 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc assert(static_cast(e.mem_obj->object_sz) == anchor.basics.swap_file_sz); // would be nice to call validLength() here, but it needs e.key - - e.hideMemObject(); - + // XXX: unlock acnhor here! return true; } /// imports one shared memory slice into local memory bool -MemStore::copyFromShmSlice(StoreEntry &e, StoreIOBuffer &buf, bool eof) +MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof) { debugs(20, 7, "buf: " << buf.offset << " + " << buf.length); @@ -392,6 +478,7 @@ MemStore::keep(StoreEntry &e) if (copyToShm(e, index, *slot)) { slot->set(e); map->closeForWriting(index, false); + CollapsedForwarding::Broadcast(static_cast(e.key)); return; } // fall through to the error handling code @@ -403,6 +490,7 @@ MemStore::keep(StoreEntry &e) } map->abortIo(index); + CollapsedForwarding::Broadcast(static_cast(e.key)); } /// copies all local data to shared memory @@ -534,6 +622,29 @@ MemStore::noteFreeMapSlice(const sfileno sliceId) } } +void +MemStore::unlink(StoreEntry &e) +{ + assert(e.mem_obj); + if (e.mem_obj->mem_index >= 0) { + map->freeEntry(e.mem_obj->mem_index); + disconnect(e); + } else { + map->freeEntryByKey(reinterpret_cast(e.key)); + } + e.destroyMemObject(); +} + +void +MemStore::disconnect(StoreEntry &e) +{ + assert(e.mem_obj); + if (e.mem_obj->mem_index >= 0) { + map->abortIo(e.mem_obj->mem_index); + e.mem_obj->mem_index = -1; + } +} + /// calculates maximum number of entries we need to store and map int64_t MemStore::EntryLimit() diff --git a/src/MemStore.h b/src/MemStore.h index 02fc2188c8..ed4d2a38f9 100644 --- a/src/MemStore.h +++ b/src/MemStore.h @@ -26,6 +26,12 @@ public: /// whether e should be kept in local RAM for possible future caching bool keepInLocalMemory(const StoreEntry &e) const; + /// remove from the cache + void unlink(StoreEntry &e); + + /// called when the entry is about to forget its association with mem cache + void disconnect(StoreEntry &e); + /* Store API */ virtual int callback(); virtual StoreEntry * get(const cache_key *); @@ -42,6 +48,8 @@ public: virtual void reference(StoreEntry &); virtual bool dereference(StoreEntry &, bool); virtual void maintain(); + virtual bool anchorCollapsed(StoreEntry &collapsed); + virtual bool updateCollapsed(StoreEntry &collapsed); static int64_t EntryLimit(); @@ -51,7 +59,10 @@ protected: bool copyToShm(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor); bool copyToShmSlice(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor, int64_t &offset); bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor); - bool copyFromShmSlice(StoreEntry &e, StoreIOBuffer &buf, bool eof); + bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof); + + void anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor); + bool updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const 
Ipc::StoreMapAnchor &anchor); sfileno reserveSapForWriting(Ipc::Mem::PageId &page); diff --git a/src/Store.h b/src/Store.h index f63e8f3a1d..53dbaa77ce 100644 --- a/src/Store.h +++ b/src/Store.h @@ -374,6 +374,22 @@ public: /// makes the entry available for collapsing future requests virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) {} + // XXX: This method belongs to Store::Root/StoreController, but it is here + // to avoid casting Root() to StoreController until Root() API is fixed. + /// Update local intransit entry after changes made by appending worker. + virtual void syncCollapsed(const cache_key *key) {} + + // XXX: This method belongs to Store::Root/StoreController, but it is here + // to avoid casting Root() to StoreController until Root() API is fixed. + /// removes the entry from the memory cache + virtual void memoryUnlink(StoreEntry &e) {} + + /// if the entry is found, tie it to this cache and call updateCollapsed() + virtual bool anchorCollapsed(StoreEntry &collapsed) { return false; } + + /// update a local collapsed entry with fresh info from this cache (if any) + virtual bool updateCollapsed(StoreEntry &collapsed) { return false; } + private: static RefCount CurrentRoot; }; diff --git a/src/SwapDir.h b/src/SwapDir.h index 2d93de43e4..ca151b5d58 100644 --- a/src/SwapDir.h +++ b/src/SwapDir.h @@ -63,7 +63,9 @@ public: /* Store parent API */ virtual void handleIdleEntry(StoreEntry &e); virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable); + virtual void memoryUnlink(StoreEntry &e); virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod); + virtual void syncCollapsed(const cache_key *key); virtual void init(); @@ -96,6 +98,7 @@ public: private: void createOneStore(Store &aStore); bool keepForLocalMemoryCache(const StoreEntry &e) const; + bool anchorCollapsedOnDisk(StoreEntry &collapsed); StorePointer swapDir; ///< summary view of all disk caches MemStore *memStore; ///< memory cache diff --git a/src/Transients.cc b/src/Transients.cc index de8344decb..0b1573a75a 100644 --- a/src/Transients.cc +++ b/src/Transients.cc @@ -175,8 +175,8 @@ Transients::get(const cache_key *key) // neighbors_do_private_keys (which is true in most cases and by default). // This is nothing but waste of CPU cycles. Need a better API to avoid it. 
e->setPublicKey(); + assert(e->key); - assert(e->next); // e->hashInsert(key) is done in setPublicKey() return e; } diff --git a/src/fs/rock/RockIoRequests.cc b/src/fs/rock/RockIoRequests.cc index cf84b7b6d5..25c08d451c 100644 --- a/src/fs/rock/RockIoRequests.cc +++ b/src/fs/rock/RockIoRequests.cc @@ -16,10 +16,10 @@ Rock::ReadRequest::ReadRequest(const ::ReadRequest &base, } Rock::WriteRequest::WriteRequest(const ::WriteRequest &base, - const IoState::Pointer &anSio, - const bool last): + const IoState::Pointer &anSio): ::WriteRequest(base), sio(anSio), - isLast(last) + sidCurrent(-1), + sidNext(-1) { } diff --git a/src/fs/rock/RockIoRequests.h b/src/fs/rock/RockIoRequests.h index 29839fc84a..c985b4940b 100644 --- a/src/fs/rock/RockIoRequests.h +++ b/src/fs/rock/RockIoRequests.h @@ -25,9 +25,14 @@ private: class WriteRequest: public ::WriteRequest { public: - WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio, const bool last); + WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio); IoState::Pointer sio; - const bool isLast; + + /// slot being written using this write request + SlotId sidCurrent; + + /// allocated next slot (negative if we are writing the last slot) + SlotId sidNext; private: CBDATA_CLASS2(WriteRequest); diff --git a/src/fs/rock/RockIoState.cc b/src/fs/rock/RockIoState.cc index c7ed4722c5..22c86dd9c8 100644 --- a/src/fs/rock/RockIoState.cc +++ b/src/fs/rock/RockIoState.cc @@ -179,7 +179,7 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff) // write partial buffer for all collapsed hit readers to see // XXX: can we check that this is needed w/o stalling readers // that appear right after our check? - writeBufToDisk(false); + writeBufToDisk(-1); } } @@ -221,12 +221,6 @@ Rock::IoState::writeToDisk(const SlotId sidNext) // TODO: if DiskIO module is mmap-based, we should be writing whole pages // to avoid triggering read-page;new_head+old_tail;write-page overheads - // finalize map slice - Ipc::StoreMap::Slice &slice = - dir->map->writeableSlice(swap_filen, sidCurrent); - slice.next = sidNext; - slice.size = theBuf.size - sizeof(DbCellHeader); - // finalize db cell header DbCellHeader header; memcpy(header.key, e->key, sizeof(header.key)); @@ -239,7 +233,7 @@ Rock::IoState::writeToDisk(const SlotId sidNext) // copy finalized db cell header into buffer memcpy(theBuf.mem, &header, sizeof(DbCellHeader)); - writeBufToDisk(sidNext < 0); + writeBufToDisk(sidNext); theBuf.clear(); sidCurrent = sidNext; @@ -247,7 +241,7 @@ Rock::IoState::writeToDisk(const SlotId sidNext) /// Write header-less (XXX) or complete buffer to disk. 
void -Rock::IoState::writeBufToDisk(const bool last) +Rock::IoState::writeBufToDisk(const SlotId sidNext) { // and now allocate another buffer for the WriteRequest so that // we can support concurrent WriteRequests (and to ease cleaning) @@ -262,7 +256,9 @@ Rock::IoState::writeBufToDisk(const bool last) WriteRequest *const r = new WriteRequest( ::WriteRequest(static_cast(wBuf), diskOffset, theBuf.size, - memFreeBufFunc(wBufCap)), this, last); + memFreeBufFunc(wBufCap)), this); + r->sidCurrent = sidCurrent; + r->sidNext = sidNext; // theFile->write may call writeCompleted immediatelly theFile->write(r); diff --git a/src/fs/rock/RockIoState.h b/src/fs/rock/RockIoState.h index f22a736e0a..29e02f5d57 100644 --- a/src/fs/rock/RockIoState.h +++ b/src/fs/rock/RockIoState.h @@ -50,7 +50,7 @@ private: void tryWrite(char const *buf, size_t size, off_t offset); size_t writeToBuffer(char const *buf, size_t size); void writeToDisk(const SlotId nextSlot); - void writeBufToDisk(const bool last); + void writeBufToDisk(const SlotId nextSlot); SlotId reserveSlotForWriting(); void callBack(int errflag); diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index a732232fd7..f7a840d776 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -69,28 +69,11 @@ Rock::SwapDir::get(const cache_key *key) if (!slot) return NULL; - const Ipc::StoreMapAnchor::Basics &basics = slot->basics; - // create a brand new store entry and initialize it with stored basics StoreEntry *e = new StoreEntry(); e->lock_count = 0; - e->swap_dirn = index; - e->swap_filen = filen; - e->swap_file_sz = basics.swap_file_sz; - e->lastref = basics.lastref; - e->timestamp = basics.timestamp; - e->expires = basics.expires; - e->lastmod = basics.lastmod; - e->refcount = basics.refcount; - e->flags = basics.flags; - e->store_status = STORE_OK; - e->setMemStatus(NOT_IN_MEMORY); - e->swap_status = SWAPOUT_DONE; - e->ping_status = PING_NONE; - EBIT_SET(e->flags, ENTRY_CACHABLE); - EBIT_CLR(e->flags, RELEASE_REQUEST); - EBIT_CLR(e->flags, KEY_PRIVATE); - EBIT_SET(e->flags, ENTRY_VALIDATED); + anchorEntry(*e, filen, *slot); + e->hashInsert(key); trackReferences(*e); @@ -98,6 +81,70 @@ Rock::SwapDir::get(const cache_key *key) // the disk entry remains open for reading, protected from modifications } +bool +Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed) +{ + if (!map || !theFile || !theFile->canRead()) + return false; + + sfileno filen; + const Ipc::StoreMapAnchor *const slot = map->openForReading( + reinterpret_cast(collapsed.key), filen); + if (!slot) + return false; + + anchorEntry(collapsed, filen, *slot); + return updateCollapsedWith(collapsed, *slot); +} + +bool +Rock::SwapDir::updateCollapsed(StoreEntry &collapsed) +{ + if (!map || !theFile || !theFile->canRead()) + return false; + + if (collapsed.swap_filen < 0) // no longer using a disk cache + return true; + assert(collapsed.swap_dirn == index); + + const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen); + return updateCollapsedWith(collapsed, s); +} + +bool +Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor) +{ + collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic + return true; +} + +void +Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor) +{ + const Ipc::StoreMapAnchor::Basics &basics = anchor.basics; + + e.swap_file_sz = basics.swap_file_sz; + e.swap_dirn = index; + e.swap_filen = filen; + e.lastref = basics.lastref; + 
e.timestamp = basics.timestamp; + e.expires = basics.expires; + e.lastmod = basics.lastmod; + e.refcount = basics.refcount; + e.flags = basics.flags; + + e.store_status = STORE_OK; + e.setMemStatus(NOT_IN_MEMORY); + e.swap_status = SWAPOUT_DONE; + e.ping_status = PING_NONE; + + EBIT_SET(e.flags, ENTRY_CACHABLE); + EBIT_CLR(e.flags, RELEASE_REQUEST); + EBIT_CLR(e.flags, KEY_PRIVATE); + EBIT_SET(e.flags, ENTRY_VALIDATED); +} + + void Rock::SwapDir::disconnect(StoreEntry &e) { assert(e.swap_dirn == index); @@ -748,14 +795,16 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest return; } - // XXX: can we check that this is needed w/o stalling readers - // that appear right after our check? - if (Config.onoff.collapsed_forwarding) - CollapsedForwarding::NewData(sio); - if (errflag == DISK_OK) { // do not increment sio.offset_ because we do it in sio->write() - if (request->isLast) { + + // finalize the shared slice info after writing slice contents to disk + Ipc::StoreMap::Slice &slice = + map->writeableSlice(sio.swap_filen, request->sidCurrent); + slice.size = request->len - sizeof(DbCellHeader); + slice.next = request->sidNext; + + if (request->sidNext < 0) { // close, the entry gets the read lock map->closeForWriting(sio.swap_filen, true); sio.finishedWriting(errflag); @@ -765,6 +814,8 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest sio.finishedWriting(errflag); // and hope that Core will call disconnect() to close the map entry } + + CollapsedForwarding::Broadcast(static_cast(sio.e->key)); } void diff --git a/src/fs/rock/RockSwapDir.h b/src/fs/rock/RockSwapDir.h index 301d0f410e..eba5ed6019 100644 --- a/src/fs/rock/RockSwapDir.h +++ b/src/fs/rock/RockSwapDir.h @@ -65,6 +65,10 @@ public: uint64_t slotSize; ///< all db slots are of this size protected: + /* Store API */ + virtual bool anchorCollapsed(StoreEntry &collapsed); + virtual bool updateCollapsed(StoreEntry &collapsed); + /* protected ::SwapDir API */ virtual bool needsDiskStrand() const; virtual void init(); @@ -107,6 +111,9 @@ protected: int entryMaxPayloadSize() const; int entriesNeeded(const int64_t objSize) const; + void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor); + bool updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor); + friend class Rebuild; friend class IoState; const char *filePath; ///< location of cache storage file inside path/ diff --git a/src/ipc/ReadWriteLock.cc b/src/ipc/ReadWriteLock.cc index 14ba1b4549..4732df4e2b 100644 --- a/src/ipc/ReadWriteLock.cc +++ b/src/ipc/ReadWriteLock.cc @@ -10,7 +10,7 @@ bool Ipc::ReadWriteLock::lockShared() { ++readers; // this locks "new" writers out - if (!writers) // there are no old writers + if (!writers || appending) // there are no old writers or sharing is OK return true; --readers; return false; @@ -36,6 +36,7 @@ Ipc::ReadWriteLock::unlockShared() void Ipc::ReadWriteLock::unlockExclusive() { + appending = 0; assert(writers-- > 0); } @@ -46,6 +47,13 @@ Ipc::ReadWriteLock::switchExclusiveToShared() unlockExclusive(); } +void +Ipc::ReadWriteLock::startAppending() +{ + assert(writers > 0); + appending++; +} + void Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const { @@ -55,6 +63,7 @@ Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const } else if (writers) { ++stats.writeable; stats.writers += writers; + stats.appenders += appending; } else { ++stats.idle; } @@ -87,7 +96,9 @@ Ipc::ReadWriteLockStats::dump(StoreEntry &e) 
const const int locked = readers + writers; storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n", readers, (100.0 * readers / locked)); - storeAppendPrintf(&e, "Writers: %9d %6.2f%%\n", - writers, (100.0 * writers / locked)); + const double appPerc = writers ? (100.0 * appenders / writers) : 0.0; + storeAppendPrintf(&e, "Writers: %9d %6.2f%% including Appenders: %9d %6.2f%%\n", + writers, (100.0 * writers / locked), + appenders, appPerc); } } diff --git a/src/ipc/ReadWriteLock.h b/src/ipc/ReadWriteLock.h index 925e02ca0d..5292cad251 100644 --- a/src/ipc/ReadWriteLock.h +++ b/src/ipc/ReadWriteLock.h @@ -11,6 +11,9 @@ namespace Ipc class ReadWriteLockStats; /// an atomic readers-writer or shared-exclusive lock suitable for maps/tables +/// Also supports reading-while-appending mode when readers and writer are +/// allowed to access the same locked object because the writer promisses +/// to only append new data and all size-related object properties are atomic. class ReadWriteLock { public: @@ -22,12 +25,15 @@ public: void unlockExclusive(); ///< undo successful exclusiveLock() void switchExclusiveToShared(); ///< stop writing, start reading + void startAppending(); ///< writer keeps its lock but also allows reading + /// adds approximate current stats to the supplied ones void updateStats(ReadWriteLockStats &stats) const; public: mutable Atomic::Word readers; ///< number of users trying to read Atomic::Word writers; ///< number of writers trying to modify protected data + Atomic::Word appending; ///< the writer has promissed to only append }; /// approximate stats of a set of ReadWriteLocks @@ -44,6 +50,7 @@ public: int idle; ///< number of unlocked locks int readers; ///< sum of lock.readers int writers; ///< sum of lock.writers + int appenders; ///< number of appending writers }; } // namespace Ipc diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index 4f992977e8..01a44ce9f3 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -39,7 +39,7 @@ Ipc::StoreMap::compareVersions(const sfileno fileno, time_t newVersion) const // note: we do not lock, so comparison may be inacurate - if (inode.state == Anchor::Empty) + if (inode.empty()) return +2; if (const time_t diff = newVersion - inode.basics.timestamp) @@ -54,14 +54,14 @@ Ipc::StoreMap::forgetWritingEntry(sfileno fileno) assert(valid(fileno)); Anchor &inode = shared->slots[fileno].anchor; - assert(inode.state == Anchor::Writeable); + assert(inode.writing()); // we do not iterate slices because we were told to forget about // them; the caller is responsible for freeing them (most likely // our slice list is incomplete or has holes) inode.waitingToBeFreed = false; - inode.state = Anchor::Empty; + inode.rewind(); inode.lock.unlockExclusive(); --shared->count; @@ -91,10 +91,10 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting) ReadWriteLock &lock = s.lock; if (lock.lockExclusive()) { - assert(s.state != Anchor::Writeable); // until we start breaking locks + assert(s.writing() && !s.reading()); // bail if we cannot empty this position - if (!s.waitingToBeFreed && s.state == Anchor::Readable && !overwriteExisting) { + if (!s.waitingToBeFreed && !s.empty() && !overwriteExisting) { lock.unlockExclusive(); debugs(54, 5, "cannot open existing entry " << fileno << " for writing " << path); @@ -102,12 +102,11 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting) } // free if the entry was used, keeping the entry locked - if (s.waitingToBeFreed || s.state == Anchor::Readable) + if 
(s.waitingToBeFreed || !s.empty()) freeChain(fileno, s, true); - assert(s.state == Anchor::Empty); + assert(s.empty()); ++shared->count; - s.state = Anchor::Writeable; //s.setKey(key); // XXX: the caller should do that debugs(54, 5, "opened entry " << fileno << " for writing " << path); @@ -119,20 +118,31 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting) return NULL; } +void +Ipc::StoreMap::startAppending(const sfileno fileno) +{ + assert(valid(fileno)); + Anchor &s = shared->slots[fileno].anchor; + assert(s.writing()); + s.lock.startAppending(); + debugs(54, 5, "restricted entry " << fileno << " to appending " << path); +} + void Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading) { assert(valid(fileno)); Anchor &s = shared->slots[fileno].anchor; - assert(s.state == Anchor::Writeable); - s.state = Anchor::Readable; + assert(s.writing()); if (lockForReading) { s.lock.switchExclusiveToShared(); debugs(54, 5, "switched entry " << fileno << " from writing to reading " << path); + assert(s.complete()); } else { s.lock.unlockExclusive(); debugs(54, 5, "closed entry " << fileno << " for writing " << path); + // cannot assert completeness here because we have no lock } } @@ -140,7 +150,7 @@ Ipc::StoreMap::Slice & Ipc::StoreMap::writeableSlice(const AnchorId anchorId, const SliceId sliceId) { assert(valid(anchorId)); - assert(shared->slots[anchorId].anchor.state == Anchor::Writeable); + assert(shared->slots[anchorId].anchor.writing()); assert(valid(sliceId)); return shared->slots[sliceId].slice; } @@ -149,7 +159,7 @@ const Ipc::StoreMap::Slice & Ipc::StoreMap::readableSlice(const AnchorId anchorId, const SliceId sliceId) const { assert(valid(anchorId)); - assert(shared->slots[anchorId].anchor.state == Anchor::Readable); + assert(shared->slots[anchorId].anchor.reading()); assert(valid(sliceId)); return shared->slots[sliceId].slice; } @@ -158,7 +168,15 @@ Ipc::StoreMap::Anchor & Ipc::StoreMap::writeableEntry(const AnchorId anchorId) { assert(valid(anchorId)); - assert(shared->slots[anchorId].anchor.state == Anchor::Writeable); + assert(shared->slots[anchorId].anchor.writing()); + return shared->slots[anchorId].anchor; +} + +const Ipc::StoreMap::Anchor & +Ipc::StoreMap::readableEntry(const AnchorId anchorId) const +{ + assert(valid(anchorId)); + assert(shared->slots[anchorId].anchor.reading()); return shared->slots[anchorId].anchor; } @@ -169,9 +187,17 @@ Ipc::StoreMap::abortWriting(const sfileno fileno) debugs(54, 5, "aborting entry " << fileno << " for writing " << path); assert(valid(fileno)); Anchor &s = shared->slots[fileno].anchor; - assert(s.state == Anchor::Writeable); - freeChain(fileno, s, false); - debugs(54, 5, "closed entry " << fileno << " for writing " << path); + assert(s.writing()); + s.lock.appending = false; // locks out any new readers + if (!s.lock.readers) { + freeChain(fileno, s, false); + debugs(54, 5, "closed clean entry " << fileno << " for writing " << path); + } else { + s.waitingToBeFreed = true; + // XXX: s.state &= !Anchor::Writeable; + s.lock.unlockExclusive(); + debugs(54, 5, "closed dirty entry " << fileno << " for writing " << path); + } } void @@ -183,7 +209,7 @@ Ipc::StoreMap::abortIo(const sfileno fileno) // The caller is a lock holder. Thus, if we are Writeable, then the // caller must be the writer; otherwise the caller must be the reader. 
- if (s.state == Anchor::Writeable) + if (s.writing()) abortWriting(fileno); else closeForReading(fileno); @@ -194,15 +220,11 @@ Ipc::StoreMap::peekAtReader(const sfileno fileno) const { assert(valid(fileno)); const Anchor &s = shared->slots[fileno].anchor; - switch (s.state) { - case Anchor::Readable: + if (s.reading()) return &s; // immediate access by lock holder so no locking - case Anchor::Writeable: - return NULL; // cannot read the slot when it is being written - case Anchor::Empty: - assert(false); // must be locked for reading or writing - } - assert(false); // not reachable + if (s.writing()) + return NULL; // the caller is not a read lock holder + assert(false); // must be locked for reading or writing return NULL; } @@ -220,13 +242,37 @@ Ipc::StoreMap::freeEntry(const sfileno fileno) s.waitingToBeFreed = true; // mark to free it later } +void +Ipc::StoreMap::freeEntryByKey(const cache_key *const key) +{ + debugs(54, 5, "marking entry with key " << storeKeyText(key) + << " to be freed in " << path); + + const int idx = anchorIndexByKey(key); + Anchor &s = shared->slots[idx].anchor; + if (s.lock.lockExclusive()) { + if (s.sameKey(key)) + freeChain(idx, s, true); + s.lock.unlockExclusive(); + } else if (s.lock.lockShared()) { + if (s.sameKey(key)) + s.waitingToBeFreed = true; // mark to free it later + s.lock.unlockShared(); + } else { + // we cannot be sure that the entry we found is ours because we do not + // have a lock on it, but we still check to minimize false deletions + if (s.sameKey(key)) + s.waitingToBeFreed = true; // mark to free it later + } +} + /// unconditionally frees an already locked chain of slots, unlocking if needed void Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked) { - debugs(54, 7, "freeing " << inode.state << " entry " << fileno << + debugs(54, 7, "freeing entry " << fileno << " in " << path); - if (inode.state != Anchor::Empty) { + if (!inode.empty()) { sfileno sliceId = inode.start; debugs(54, 8, "first slice " << sliceId); while (sliceId >= 0) { @@ -240,7 +286,7 @@ Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLoc } inode.waitingToBeFreed = false; - inode.state = Anchor::Empty; + inode.rewind(); if (!keepLocked) inode.lock.unlockExclusive(); @@ -278,7 +324,7 @@ Ipc::StoreMap::openForReadingAt(const sfileno fileno) return NULL; } - if (s.state == Anchor::Empty) { + if (s.empty()) { s.lock.unlockShared(); debugs(54, 7, "cannot open empty entry " << fileno << " for reading " << path); @@ -292,8 +338,6 @@ Ipc::StoreMap::openForReadingAt(const sfileno fileno) return NULL; } - // cannot be Writing here if we got shared lock and checked Empty above - assert(s.state == Anchor::Readable); debugs(54, 5, "opened entry " << fileno << " for reading " << path); return &s; } @@ -303,7 +347,7 @@ Ipc::StoreMap::closeForReading(const sfileno fileno) { assert(valid(fileno)); Anchor &s = shared->slots[fileno].anchor; - assert(s.state == Anchor::Readable); + assert(s.reading()); s.lock.unlockShared(); debugs(54, 5, "closed entry " << fileno << " for reading " << path); } @@ -320,7 +364,7 @@ Ipc::StoreMap::purgeOne() assert(valid(fileno)); Anchor &s = shared->slots[fileno].anchor; if (s.lock.lockExclusive()) { - if (s.state == Anchor::Readable) { // skip empties + if (!s.empty()) { // this entry may be marked for deletion, and that is OK freeChain(fileno, s, false); debugs(54, 5, "purged entry " << fileno << " from " << path); @@ -392,7 +436,7 @@ Ipc::StoreMap::anchorByKey(const cache_key *const key) /* 
Ipc::StoreMapAnchor */ -Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), state(Empty) +Ipc::StoreMapAnchor::StoreMapAnchor(): start(0) { memset(&key, 0, sizeof(key)); memset(&basics, 0, sizeof(basics)); @@ -415,7 +459,7 @@ Ipc::StoreMapAnchor::sameKey(const cache_key *const aKey) const void Ipc::StoreMapAnchor::set(const StoreEntry &from) { - assert(state == Writeable); + assert(writing() && !reading()); memcpy(key, from.key, sizeof(key)); basics.timestamp = from.timestamp; basics.lastref = from.lastref; @@ -429,7 +473,7 @@ Ipc::StoreMapAnchor::set(const StoreEntry &from) void Ipc::StoreMapAnchor::rewind() { - assert(state == Writeable); + assert(writing()); start = 0; memset(&key, 0, sizeof(key)); memset(&basics, 0, sizeof(basics)); diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h index 995176e421..3cb26d5599 100644 --- a/src/ipc/StoreMap.h +++ b/src/ipc/StoreMap.h @@ -12,13 +12,16 @@ namespace Ipc typedef int32_t StoreMapSliceId; /// a piece of Store entry, linked to other pieces, forming a chain +/// slices may be appended by writers while readers read the entry class StoreMapSlice { public: - StoreMapSlice(): next(-1), size(0) {} + typedef uint32_t Size; - StoreMapSliceId next; ///< ID of the next slice occupied by the entry - uint32_t size; ///< slice contents size + StoreMapSlice(): size(0), next(-1) {} + + Atomic::WordT size; ///< slice contents size + Atomic::WordT next; ///< ID of the next entry slice }; @@ -40,10 +43,19 @@ public: /// undo the effects of set(), setKey(), etc., but keep locks and state void rewind(); + /* entry state may change immediately after calling these methods unless + * the caller holds an appropriate lock */ + bool empty() const { return !key[0] && !key[1]; } + bool reading() const { return lock.readers; } + bool writing() const { return lock.writers; } + bool complete() const { return !empty() && !writing(); } + public: mutable ReadWriteLock lock; ///< protects slot data below Atomic::WordT waitingToBeFreed; ///< may be accessed w/o a lock + // fields marked with [app] can be modified when appending-while-reading + uint64_t key[2]; ///< StoreEntry key // STORE_META_STD TLV field from StoreEntry @@ -52,13 +64,15 @@ public: time_t lastref; time_t expires; time_t lastmod; - uint64_t swap_file_sz; + uint64_t swap_file_sz; // [app]; XXX: make atomic uint16_t refcount; uint16_t flags; } basics; - StoreMapSliceId start; ///< where the chain of StoreEntry slices begins + /// where the chain of StoreEntry slices begins [app]; XXX: make atomic + StoreMapSliceId start; +#if 0 /// possible persistent states typedef enum { Empty, ///< ready for writing, with nothing of value @@ -66,6 +80,7 @@ public: Readable, ///< ready for reading } State; State state; ///< current state +#endif }; /// A hack to allocate one shared array for both anchors and slices. 
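To make the reading-while-appending protocol above concrete, here is a minimal, self-contained sketch of the lock behavior that the new startAppending()/appending fields provide. It is illustrative only: RwAppendLock and Slice are hypothetical stand-ins that use std::atomic instead of Squid's Ipc::Atomic::WordT, not the actual Ipc::ReadWriteLock or StoreMapSlice classes changed by this patch.

    // rw_appending_sketch.cc -- illustrative stand-in, not Squid's Ipc::ReadWriteLock.
    // The writer keeps its exclusive lock but flips an "appending" flag, after
    // which new readers are admitted and may read the growing object.
    #include <atomic>
    #include <cassert>
    #include <cstdint>

    class RwAppendLock { // hypothetical name; std::atomic replaces Atomic::WordT
    public:
        bool lockShared() {
            ++readers;                          // locks "new" writers out
            if (writers == 0 || appending)      // no old writers, or sharing is OK
                return true;
            --readers;
            return false;
        }
        bool lockExclusive() {
            if (writers++ == 0 && readers == 0) // first writer and no old readers
                return true;
            --writers;                          // undo the failed claim
            return false;
        }
        void unlockShared() { --readers; }
        void unlockExclusive() {
            appending = false;                  // appending mode ends with the lock
            const int old = writers--;
            assert(old > 0);
        }
        void startAppending() {
            assert(writers > 0);                // only the current writer may call
            appending = true;
        }
    private:
        std::atomic<int> readers{0};
        std::atomic<int> writers{0};
        std::atomic<bool> appending{false};
    };

    // A growing slice as readers see it: the appender grows size and links next
    // only when the slice is final, so a careful reader snapshots both fields
    // before copying.
    struct Slice {
        std::atomic<uint32_t> size{0};
        std::atomic<int32_t> next{-1};
    };

    int main() {
        RwAppendLock lock;
        assert(lock.lockExclusive());   // writer opens the map entry
        assert(!lock.lockShared());     // a plain write lock still excludes readers
        lock.startAppending();          // writer promises append-only updates
        assert(lock.lockShared());      // a collapsed-hit reader may now join
        lock.unlockShared();
        lock.unlockExclusive();         // entry closed; appending mode is cleared
        return 0;
    }

The key invariant is that appending is only ever set by the lock holder and is cleared together with the exclusive lock, so a reader admitted during appending can rely on the writer doing nothing but growing the slice chain and the size-related anchor fields.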
@@ -126,6 +141,8 @@ public: /// locks and returns an anchor for the empty fileno position; if /// overwriteExisting is false and the position is not empty, returns nil Anchor *openForWritingAt(sfileno fileno, bool overwriteExisting = true); + /// restrict opened for writing entry to appending operations; allow reads + void startAppending(const sfileno fileno); /// successfully finish creating or updating the entry at fileno pos void closeForWriting(const sfileno fileno, bool lockForReading = false); /// unlock and "forget" openForWriting entry, making it Empty again @@ -135,9 +152,11 @@ public: /// only works on locked entries; returns nil unless the slice is readable const Anchor *peekAtReader(const sfileno fileno) const; - /// if possible, free the entry and return true - /// otherwise mark it as waiting to be freed and return false + /// free the entry if possible or mark it as waiting to be freed if not void freeEntry(const sfileno fileno); + /// free the entry if possible or mark it as waiting to be freed if not + /// does nothing if we cannot check that the key matches the cached entry + void freeEntryByKey(const cache_key *const key); /// opens entry (identified by key) for reading, increments read level const Anchor *openForReading(const cache_key *const key, sfileno &fileno); @@ -152,6 +171,8 @@ public: const Slice &readableSlice(const AnchorId anchorId, const SliceId sliceId) const; /// writeable anchor for the entry created by openForWriting() Anchor &writeableEntry(const AnchorId anchorId); + /// readable anchor for the entry created by openForReading() + const Anchor &readableEntry(const AnchorId anchorId) const; /// called by lock holder to terminate either slice writing or reading void abortIo(const sfileno fileno); diff --git a/src/store.cc b/src/store.cc index 7b86662a7d..24f707fee8 100644 --- a/src/store.cc +++ b/src/store.cc @@ -523,7 +523,7 @@ StoreEntry::purgeMem() debugs(20, 3, "StoreEntry::purgeMem: Freeing memory-copy of " << getMD5Text()); - destroyMemObject(); + Store::Root().memoryUnlink(*this); if (swap_status != SWAPOUT_DONE) release(); @@ -1279,12 +1279,11 @@ StoreEntry::release() return; } + Store::Root().memoryUnlink(*this); + if (StoreController::store_dirs_rebuilding && swap_filen > -1) { setPrivateKey(); - if (mem_obj) - destroyMemObject(); - if (swap_filen > -1) { /* * Fake a call to StoreEntry->lock() When rebuilding is done, @@ -1312,7 +1311,6 @@ StoreEntry::release() unlink(); } - setMemStatus(NOT_IN_MEMORY); destroyStoreEntry(static_cast(this)); PROF_stop(storeRelease); } diff --git a/src/store_dir.cc b/src/store_dir.cc index 6e564643d3..dccf60fcbb 100644 --- a/src/store_dir.cc +++ b/src/store_dir.cc @@ -811,6 +811,34 @@ StoreController::get(String const key, STOREGETCLIENT aCallback, void *aCallback fatal("not implemented"); } +/// updates the collapsed entry with the corresponding on-disk entry, if any +bool +StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed) +{ + // TODO: move this loop to StoreHashIndex, just like the one in get(). 
+ if (const int cacheDirs = Config.cacheSwap.n_configured) { + // ask each cache_dir until the entry is found; use static starting + // point to avoid asking the same subset of disks more often + // TODO: coordinate with put() to be able to guess the right disk often + static int idx = 0; + for (int n = 0; n < cacheDirs; ++n) { + idx = (idx + 1) % cacheDirs; + SwapDir *sd = dynamic_cast(INDEXSD(idx)); + if (!sd->active()) + continue; + + if (sd->anchorCollapsed(collapsed)) { + debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed); + return true; + } + } + } + + debugs(20, 4, "none of " << Config.cacheSwap.n_configured << + " cache_dirs have " << collapsed); + return false; +} + // move this into [non-shared] memory cache class when we have one /// whether e should be kept in local RAM for possible future caching bool @@ -845,6 +873,18 @@ StoreController::maybeTrimMemory(StoreEntry &e, const bool preserveSwappable) e.trimMemory(preserveSwappable); } +void +StoreController::memoryUnlink(StoreEntry &e) +{ + if (e.mem_status != IN_MEMORY) + return; + + if (memStore) + memStore->unlink(e); + else // TODO: move into [non-shared] memory cache class when we have one + e.destroyMemObject(); +} + void StoreController::handleIdleEntry(StoreEntry &e) { @@ -893,6 +933,33 @@ StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, debugs(20, 3, "may " << (transients ? "SMP" : "") << " collapse " << *e); } +void +StoreController::syncCollapsed(const cache_key *key) +{ + StoreEntry *collapsed = swapDir->get(key); + if (!collapsed) // the entry is no longer locally active, ignore the update + return; + + debugs(20, 7, "syncing " << *collapsed); + + bool inSync = false; + if (memStore && collapsed->mem_status == IN_MEMORY) + inSync = memStore->updateCollapsed(*collapsed); + else if (collapsed->swap_filen >= 0) + inSync = collapsed->store()->updateCollapsed(*collapsed); + else if (memStore && memStore->anchorCollapsed(*collapsed)) + inSync = true; // found in the memory cache + else if (anchorCollapsedOnDisk(*collapsed)) + inSync = true; // found in a disk cache + + if (inSync) { + debugs(20, 5, "synced " << *collapsed); + collapsed->invokeHandlers(); + } else { // the backing entry is no longer cached; abort this hit + debugs(20, 3, "aborting cacheless " << *collapsed); + collapsed->abort(); + } +} StoreHashIndex::StoreHashIndex() {
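To summarize the broadcast path added by this patch: when an appending worker stores another slice, it broadcasts only the entry key; each receiving worker looks the key up locally and, if it still has a collapsed entry for it, copies whatever new data the shared index now advertises. The sketch below models that handshake and is illustrative only — the queue, entry, and index types are simplified stand-ins (a std::deque instead of the shared-memory IPC queue, plain std::maps instead of MemStore or a SwapDir), and the function names merely mirror Broadcast(), syncCollapsed(), and HandleNewData().

    // collapsed_forwarding_sketch.cc -- a simplified, single-process model of the
    // Broadcast()/syncCollapsed() handshake added by this patch; not Squid code.
    #include <cstdint>
    #include <cstring>
    #include <deque>
    #include <iostream>
    #include <map>
    #include <string>

    // What travels through the queue: the sender's kid ID and the 16-byte entry
    // key (the entry itself may no longer be cached by the sender).
    struct CollapsedMsg {
        int sender = -1;
        uint64_t key[2] = {0, 0};
    };

    std::deque<CollapsedMsg> queue;                      // stand-in for the IPC queue
    std::map<std::string, uint64_t> sharedSize;          // shared index: key -> bytes stored
    struct CollapsedEntry { uint64_t copiedSoFar = 0; }; // local view of a partial entry
    std::map<std::string, CollapsedEntry> localEntries;  // this worker's collapsed hits

    static std::string keyText(const uint64_t key[2]) {
        return std::to_string(key[0]) + ':' + std::to_string(key[1]);
    }

    // Writer side: after appending and storing a slice, announce the key only.
    void Broadcast(const int kid, const uint64_t key[2], const uint64_t newSize) {
        sharedSize[keyText(key)] = newSize;
        CollapsedMsg msg;
        msg.sender = kid;
        std::memcpy(msg.key, key, sizeof(msg.key));
        queue.push_back(msg);            // the real code pushes to every other worker
    }

    // Reader side: copy whatever the writer has published since the last sync.
    void syncCollapsed(const uint64_t key[2]) {
        const auto it = localEntries.find(keyText(key));
        if (it == localEntries.end())
            return;                      // entry no longer locally active; ignore
        CollapsedEntry &e = it->second;
        const uint64_t avail = sharedSize[keyText(key)];
        if (avail > e.copiedSoFar) {
            std::cout << "copying bytes " << e.copiedSoFar << ".." << avail << "\n";
            e.copiedSoFar = avail;       // the real code copies slice by slice
        }
    }

    // What HandleNewData() does: pop every pending message and sync by key.
    void HandleNewData() {
        while (!queue.empty()) {
            const CollapsedMsg msg = queue.front();
            queue.pop_front();
            syncCollapsed(msg.key);
        }
    }

    int main() {
        const uint64_t key[2] = {0x1234, 0x5678};
        localEntries[keyText(key)] = CollapsedEntry();   // a waiting collapsed hit
        Broadcast(1, key, 4096);                         // writer appended the first 4 KB
        Broadcast(1, key, 8192);                         // ...and 4 KB more
        HandleNewData();                                 // the reader catches up in one pass
        return 0;
    }

In the real patch the receiver catches up through MemStore::updateCollapsed() or Rock::SwapDir::updateCollapsed() and then calls invokeHandlers(); the pattern is the same: the message carries only the key, because the entry may no longer be cached by the sender, and the receiver re-reads the shared index to learn how much it can copy.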