]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Support "appending" read/write lock state that can be shared by readers
authorAlex Rousskov <rousskov@measurement-factory.com>
Fri, 7 Jun 2013 23:34:36 +0000 (17:34 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Fri, 7 Jun 2013 23:34:36 +0000 (17:34 -0600)
and writer. Writer promises not to update key metadata (except growing
object size and next pointers) and readers promise to be careful when
reading growing slices.

Support copying of partially cached entries from the shared memory cache to
local RAM. This is required for collapsed shared memory hits to receive new
data during broadcasted updates.

Properly unlock objects in the shared memory cache when their entries are
abandoned by a worker. This was not necessary before because we never locked
memory cache entries for more than a single method call. Now, with partially
cached entries support, the locks may persist much longer.

Properly delete objects from the shared memory cache when they are purged by a
worker. Before this change, locally purged objects may have stayed in the
shared memory cache.

Update disk cache index _after_ the changes are written to disk. Another
worker may be using that index and will expect to find the indexed slices on
disk. Disk queues are not FIFOs across workers.

Made CollapsedForwarding work better in non-SMP mode.

Polished broadcasting code. We need to broadcast entry key because the entry
may not have any other information (it may no longer be cached by the sender,
for example).

Implemented "anchoring" in-transit entries when the writer caches the
corresponding object. This allows the reader's entry object to reflect its
cached status and, hence, be able to ask for cached data during broadcasted
entry updates. Still need to handle the case where the writer does not cache
the object (by aborting collapsed hit).

21 files changed:
src/CollapsedForwarding.cc
src/CollapsedForwarding.h
src/MemObject.cc
src/MemObject.h
src/MemStore.cc
src/MemStore.h
src/Store.h
src/SwapDir.h
src/Transients.cc
src/fs/rock/RockIoRequests.cc
src/fs/rock/RockIoRequests.h
src/fs/rock/RockIoState.cc
src/fs/rock/RockIoState.h
src/fs/rock/RockSwapDir.cc
src/fs/rock/RockSwapDir.h
src/ipc/ReadWriteLock.cc
src/ipc/ReadWriteLock.h
src/ipc/StoreMap.cc
src/ipc/StoreMap.h
src/store.cc
src/store_dir.cc

index 657e6d6c0926edc4f75496ecfb9deeb249397e16..d1b108828dfbd207fa2486a8b5c14226bf663e03 100644 (file)
@@ -9,8 +9,10 @@
 #include "ipc/Port.h"
 #include "ipc/TypedMsgHdr.h"
 #include "CollapsedForwarding.h"
-#include "SquidConfig.h"
 #include "globals.h"
+#include "SquidConfig.h"
+#include "Store.h"
+#include "store_key_md5.h"
 #include "tools.h"
 
 /// shared memory segment path to use for CollapsedForwarding queue
@@ -25,11 +27,11 @@ std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
 class CollapsedForwardingMsg
 {
 public:
-    CollapsedForwardingMsg(): processId(-1) {}
+    CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; }
 
 public:
-    int processId; /// ID of sending process
-    // XXX: add entry info
+    int sender; /// kid ID of sending process
+    uint64_t key[2]; ///< StoreEntry key
 };
 
 // CollapsedForwarding
@@ -38,21 +40,26 @@ void
 CollapsedForwarding::Init()
 {
     Must(!queue.get());
-    queue.reset(new Queue(ShmLabel, KidIdentifier));
+    if (UsingSmp() && IamWorkerProcess())
+        queue.reset(new Queue(ShmLabel, KidIdentifier));
 }
 
 void
-CollapsedForwarding::NewData(const StoreIOState &sio)
+CollapsedForwarding::Broadcast(const cache_key *key)
 {
+    if (!queue.get())
+        return;
+
     CollapsedForwardingMsg msg;
-    msg.processId = KidIdentifier;
-    // XXX: copy data from sio
+    msg.sender = KidIdentifier;
+    memcpy(msg.key, key, sizeof(msg.key));
+
+    debugs(17, 5, storeKeyText(key) << " to " << Config.workers << "-1 workers");
 
     // TODO: send only to workers who are waiting for data
-    // XXX: does not work for non-daemon mode?
     for (int workerId = 1; workerId <= Config.workers; ++workerId) {
         try {
-            if (queue->push(workerId, msg))
+            if (workerId != KidIdentifier && queue->push(workerId, msg))
                 Notify(workerId);
         } catch (const Queue::Full &) {
             debugs(17, DBG_IMPORTANT, "Worker collapsed forwarding push queue "
@@ -66,7 +73,7 @@ void
 CollapsedForwarding::Notify(const int workerId)
 {
     // TODO: Count and report the total number of notifications, pops, pushes.
-    debugs(17, 7, HERE << "kid" << workerId);
+    debugs(17, 7, "to kid" << workerId);
     Ipc::TypedMsgHdr msg;
     // TODO: add proper message type?
     msg.setType(Ipc::mtCollapsedForwardingNotification);
@@ -78,18 +85,19 @@ CollapsedForwarding::Notify(const int workerId)
 void
 CollapsedForwarding::HandleNewData(const char *const when)
 {
-    debugs(17, 4, HERE << "popping all " << when);
+    debugs(17, 4, "popping all " << when);
     CollapsedForwardingMsg msg;
     int workerId;
     int poppedCount = 0;
     while (queue->pop(workerId, msg)) {
-        debugs(17, 3, HERE << "collapsed forwarding data message from " <<
-               workerId);
-        if (workerId != msg.processId) {
-            debugs(17, DBG_IMPORTANT, HERE << "mismatching IDs: " << workerId <<
-                   " != " << msg.processId);
+        debugs(17, 3, "message from kid" << workerId);
+        if (workerId != msg.sender) {
+            debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
+                   " != " << msg.sender);
         }
 
+        Store::Root().syncCollapsed(reinterpret_cast<const cache_key*>(msg.key));
+
         // XXX: stop and schedule an async call to continue
         assert(++poppedCount < SQUID_MAXFD);
     }
@@ -100,6 +108,7 @@ CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
 {
     const int from = msg.getInt();
     debugs(17, 7, HERE << "from " << from);
+    assert(queue.get());
     queue->clearReaderSignal(from);
     HandleNewData("after notification");
 }
@@ -132,8 +141,7 @@ void CollapsedForwardingRr::create(const RunnerRegistry &)
 
 void CollapsedForwardingRr::open(const RunnerRegistry &)
 {
-    if (IamWorkerProcess())
-        CollapsedForwarding::Init();
+    CollapsedForwarding::Init();
 }
 
 CollapsedForwardingRr::~CollapsedForwardingRr()
index e183802f444ad7b5aee3683000b7c18f837fe029..6af07a82cf674594c50775c90824bedeb058e28f 100644 (file)
@@ -8,6 +8,7 @@
 
 #include "ipc/Queue.h"
 #include "ipc/forward.h"
+#include "typedefs.h"
 
 #include <memory>
 
@@ -20,9 +21,12 @@ public:
     /// open shared memory segment
     static void Init();
 
-    /// notify other workers that new data is available
+    /// XXX: remove
     static void NewData(const StoreIOState &sio);
 
+    /// notify other workers that new data is available
+    static void Broadcast(const cache_key *key);
+
     /// kick worker with empty IPC queue
     static void Notify(const int workerId);
 
index 7e403697a4d693561de3c1fa50ab12ba97136c30..81884c0b93ade7536dae7ae1040dc19e389cdcb1 100644 (file)
@@ -83,7 +83,7 @@ MemObject::resetUrls(char const *aUrl, char const *aLog_url)
     url = xstrdup(aUrl);
 }
 
-MemObject::MemObject(char const *aUrl, char const *aLog_url)
+MemObject::MemObject(char const *aUrl, char const *aLog_url): mem_index(-1)
 {
     debugs(20, 3, HERE << "new MemObject " << this);
     _reply = new HttpReply;
@@ -115,6 +115,7 @@ MemObject::~MemObject()
     assert(chksum == url_checksum(url));
 #endif
 
+    assert(mem_index < 0);
     if (!shutting_down)
         assert(swapout.sio == NULL);
 
index a4548b0fb4ae302d482f20f83d38eeedc87cd53a..cee4c9e3e270edead6fb4f90a02a05c3f47f9b4c 100644 (file)
@@ -139,6 +139,7 @@ public:
     } abort;
     char *log_url;
     RemovalPolicyNode repl;
+    int32_t mem_index; ///< entry position inside the [shared] memory cache
     int id;
     int64_t object_sz;
     size_t swap_hdr_sz;
index ea4ee2d3fcd63c8b7cab0e8252e796a2a33f176b..8a9d96998f5f2918f6e7494bb6cdea2f352fa0e5 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "squid.h"
 #include "base/RunnersRegistry.h"
+#include "CollapsedForwarding.h"
 #include "HttpReply.h"
 #include "ipc/mem/Page.h"
 #include "ipc/mem/Pages.h"
@@ -186,34 +187,26 @@ MemStore::get(const cache_key *key)
     if (!slot)
         return NULL;
 
-    const Ipc::StoreMapAnchor::Basics &basics = slot->basics;
-
     // create a brand new store entry and initialize it with stored info
     StoreEntry *e = new StoreEntry();
     e->lock_count = 0;
 
-    e->swap_file_sz = basics.swap_file_sz;
-    e->lastref = basics.lastref;
-    e->timestamp = basics.timestamp;
-    e->expires = basics.expires;
-    e->lastmod = basics.lastmod;
-    e->refcount = basics.refcount;
-    e->flags = basics.flags;
-
-    e->store_status = STORE_OK;
-    e->mem_status = IN_MEMORY; // setMemStatus(IN_MEMORY) requires mem_obj
-    //e->swap_status = set in StoreEntry constructor to SWAPOUT_NONE;
-    e->ping_status = PING_NONE;
+    // XXX: We do not know the URLs yet, only the key, but we need to parse and
+    // store the response for the Root().get() callers to be happy because they
+    // expect IN_MEMORY entries to already have the response headers and body.
+    // At least one caller calls createMemObject() if there is not one, so
+    // we hide the true object until that happens (to avoid leaking TBD URLs).
+    e->createMemObject("TBD", "TBD");
 
-    EBIT_SET(e->flags, ENTRY_CACHABLE);
-    EBIT_CLR(e->flags, RELEASE_REQUEST);
-    EBIT_CLR(e->flags, KEY_PRIVATE);
-    EBIT_SET(e->flags, ENTRY_VALIDATED);
+    anchorEntry(*e, index, *slot);
 
     const bool copied = copyFromShm(*e, index, *slot);
 
     // we copied everything we could to local memory; no more need to lock
     map->closeForReading(index);
+    e->mem_obj->mem_index = -1;
+
+    e->hideMemObject();
 
     if (copied) {
         e->hashInsert(key);
@@ -232,34 +225,129 @@ MemStore::get(String const key, STOREGETCLIENT aCallback, void *aCallbackData)
     fatal("MemStore::get(key,callback,data) should not be called");
 }
 
+bool
+MemStore::anchorCollapsed(StoreEntry &collapsed)
+{
+    if (!map)
+        return false;
+
+    sfileno index;
+    const Ipc::StoreMapAnchor *const slot = map->openForReading(
+        reinterpret_cast<cache_key*>(collapsed.key), index);
+    if (!slot)
+        return false;
+
+    anchorEntry(collapsed, index, *slot);
+    return updateCollapsedWith(collapsed, index, *slot);
+}
+
+bool
+MemStore::updateCollapsed(StoreEntry &collapsed)
+{
+    if (!map)
+        return false;
+
+    if (collapsed.mem_status != IN_MEMORY) // no longer using a memory cache
+        return false;
+
+    const sfileno index = collapsed.mem_obj->mem_index; 
+
+    // already disconnected from the cache, no need to update
+    if (index < 0) 
+        return true;
+
+    const Ipc::StoreMapAnchor &anchor = map->readableEntry(index);
+    return updateCollapsedWith(collapsed, index, anchor);
+}
+
+bool
+MemStore::updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor)
+{
+    collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic
+
+    const bool copied = copyFromShm(collapsed, index, anchor);
+
+    return copied; // XXX: when do we unlock the map slot?
+}
+
+/// anchors StoreEntry to an already locked map entry
+void
+MemStore::anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
+{
+    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
+
+    e.swap_file_sz = basics.swap_file_sz;
+    e.lastref = basics.lastref;
+    e.timestamp = basics.timestamp;
+    e.expires = basics.expires;
+    e.lastmod = basics.lastmod;
+    e.refcount = basics.refcount;
+    e.flags = basics.flags;
+
+    assert(e.mem_obj);
+    e.store_status = STORE_OK;
+    e.setMemStatus(IN_MEMORY);
+    e.mem_obj->mem_index = index;
+    assert(e.swap_status == SWAPOUT_NONE); // set in StoreEntry constructor
+    e.ping_status = PING_NONE;
+
+    EBIT_SET(e.flags, ENTRY_CACHABLE);
+    EBIT_CLR(e.flags, RELEASE_REQUEST);
+    EBIT_CLR(e.flags, KEY_PRIVATE);
+    EBIT_SET(e.flags, ENTRY_VALIDATED);
+}
+
 /// copies the entire entry from shared to local memory
 bool
 MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor)
 {
     debugs(20, 7, "mem-loading entry " << index << " from " << anchor.start);
 
-    // XXX: We do not know the URLs yet, only the key, but we need to parse and
-    // store the response for the Root().get() callers to be happy because they
-    // expect IN_MEMORY entries to already have the response headers and body.
-    // At least one caller calls createMemObject() if there is not one, so
-    // we hide the true object until that happens (to avoid leaking TBD URLs).
-    e.createMemObject("TBD", "TBD");
-
     // emulate the usual Store code but w/o inapplicable checks and callbacks:
 
-    Ipc::StoreMapSliceId sid = anchor.start;
-    int64_t offset = 0;
+    Ipc::StoreMapSliceId sid = anchor.start; // optimize: remember the last sid
+    bool wasEof = anchor.complete() && sid < 0;
+    int64_t sliceOffset = 0;
     while (sid >= 0) {
         const Ipc::StoreMapSlice &slice = map->readableSlice(index, sid);
-        const MemStoreMap::Extras &extras = map->extras(sid);
-        StoreIOBuffer sliceBuf(slice.size, offset,
-                               static_cast<char*>(PagePointer(extras.page)));
-        if (!copyFromShmSlice(e, sliceBuf, slice.next < 0))
-            return false;
-        debugs(20, 9, "entry " << index << " slice " << sid << " filled " <<
-               extras.page);
-        offset += slice.size;
-        sid = slice.next;
+        // slice state may change during copying; take snapshots now
+        wasEof = anchor.complete() && slice.next < 0;
+        const Ipc::StoreMapSlice::Size wasSize = slice.size;
+       if (e.mem_obj->endOffset() < sliceOffset + wasSize) {
+            // size of the slice data that we already copied
+            const size_t prefixSize = e.mem_obj->endOffset() - sliceOffset;
+            assert(prefixSize <= wasSize);
+
+            const MemStoreMap::Extras &extras = map->extras(sid);
+            char *page = static_cast<char*>(PagePointer(extras.page));
+            const StoreIOBuffer sliceBuf(wasSize - prefixSize,
+                                         e.mem_obj->endOffset(),
+                                         page + prefixSize);
+            if (!copyFromShmSlice(e, sliceBuf, wasEof))
+                return false;
+            debugs(20, 9, "entry " << index << " copied slice " << sid <<
+                   " from " << extras.page << " +" << prefixSize);
+        }
+        // else skip a [possibly incomplete] slice that we copied earlier
+
+        // careful: the slice may have grown _and_ gotten the next slice ID!
+        if (slice.next >= 0) {
+            assert(!wasEof);
+            // here we know that slice.size may not change any more
+            if (wasSize >= slice.size) { // did not grow since we started copying
+                sliceOffset += wasSize;
+                sid = slice.next;
+                       }
+        } else if (wasSize >= slice.size) { // did not grow
+            break;
+        }
+    }
+
+    if (!wasEof) {
+        debugs(20, 7, "mem-loaded " << e.mem_obj->endOffset() << '/' <<
+               anchor.basics.swap_file_sz << " bytes of " << e);
+        return true;
     }
 
     e.mem_obj->object_sz = e.mem_obj->endOffset(); // from StoreEntry::complete()
@@ -269,15 +357,13 @@ MemStore::copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnc
     assert(static_cast<uint64_t>(e.mem_obj->object_sz) == anchor.basics.swap_file_sz);
     // would be nice to call validLength() here, but it needs e.key
 
-
-    e.hideMemObject();
-
+    // XXX: unlock acnhor here!
     return true;
 }
 
 /// imports one shared memory slice into local memory
 bool
-MemStore::copyFromShmSlice(StoreEntry &e, StoreIOBuffer &buf, bool eof)
+MemStore::copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof)
 {
     debugs(20, 7, "buf: " << buf.offset << " + " << buf.length);
 
@@ -392,6 +478,7 @@ MemStore::keep(StoreEntry &e)
         if (copyToShm(e, index, *slot)) {
             slot->set(e);
             map->closeForWriting(index, false);
+            CollapsedForwarding::Broadcast(static_cast<const cache_key*>(e.key));
             return;
         }
         // fall through to the error handling code
@@ -403,6 +490,7 @@ MemStore::keep(StoreEntry &e)
     }
 
     map->abortIo(index);
+    CollapsedForwarding::Broadcast(static_cast<cache_key*>(e.key));
 }
 
 /// copies all local data to shared memory
@@ -534,6 +622,29 @@ MemStore::noteFreeMapSlice(const sfileno sliceId)
     }
 }
 
+void
+MemStore::unlink(StoreEntry &e)
+{
+    assert(e.mem_obj);
+    if (e.mem_obj->mem_index >= 0) {
+        map->freeEntry(e.mem_obj->mem_index);
+        disconnect(e);
+    } else {
+        map->freeEntryByKey(reinterpret_cast<cache_key*>(e.key));
+    }
+    e.destroyMemObject();
+}
+
+void
+MemStore::disconnect(StoreEntry &e)
+{
+    assert(e.mem_obj);
+    if (e.mem_obj->mem_index >= 0) {
+        map->abortIo(e.mem_obj->mem_index);
+        e.mem_obj->mem_index = -1;
+    }
+}
+
 /// calculates maximum number of entries we need to store and map
 int64_t
 MemStore::EntryLimit()
index 02fc2188c8ebd351a0d819cb2c14894ec24ff4d0..ed4d2a38f9830a82edd05072ea63820ee567c071 100644 (file)
@@ -26,6 +26,12 @@ public:
     /// whether e should be kept in local RAM for possible future caching
     bool keepInLocalMemory(const StoreEntry &e) const;
 
+    /// remove from the cache
+    void unlink(StoreEntry &e);
+
+    /// called when the entry is about to forget its association with mem cache
+    void disconnect(StoreEntry &e);
+
     /* Store API */
     virtual int callback();
     virtual StoreEntry * get(const cache_key *);
@@ -42,6 +48,8 @@ public:
     virtual void reference(StoreEntry &);
     virtual bool dereference(StoreEntry &, bool);
     virtual void maintain();
+    virtual bool anchorCollapsed(StoreEntry &collapsed);
+    virtual bool updateCollapsed(StoreEntry &collapsed);
 
     static int64_t EntryLimit();
 
@@ -51,7 +59,10 @@ protected:
     bool copyToShm(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor);
     bool copyToShmSlice(StoreEntry &e, const sfileno index, Ipc::StoreMapAnchor &anchor, int64_t &offset);
     bool copyFromShm(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
-    bool copyFromShmSlice(StoreEntry &e, StoreIOBuffer &buf, bool eof);
+    bool copyFromShmSlice(StoreEntry &e, const StoreIOBuffer &buf, bool eof);
+
+    void anchorEntry(StoreEntry &e, const sfileno index, const Ipc::StoreMapAnchor &anchor);
+    bool updateCollapsedWith(StoreEntry &collapsed, const sfileno index, const Ipc::StoreMapAnchor &anchor);
 
     sfileno reserveSapForWriting(Ipc::Mem::PageId &page);
 
index f63e8f3a1d8616a84e791a241e14b1c609bde883..53dbaa77cedd59364ed5847cbf79126b5dbe388d 100644 (file)
@@ -374,6 +374,22 @@ public:
     /// makes the entry available for collapsing future requests
     virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) {}
 
+    // XXX: This method belongs to Store::Root/StoreController, but it is here
+    // to avoid casting Root() to StoreController until Root() API is fixed.
+    /// Update local intransit entry after changes made by appending worker.
+    virtual void syncCollapsed(const cache_key *key) {}
+
+    // XXX: This method belongs to Store::Root/StoreController, but it is here
+    // to avoid casting Root() to StoreController until Root() API is fixed.
+    /// removes the entry from the memory cache
+    virtual void memoryUnlink(StoreEntry &e) {}
+
+    /// if the entry is found, tie it to this cache and call updateCollapsed()
+    virtual bool anchorCollapsed(StoreEntry &collapsed) { return false; }
+
+    /// update a local collapsed entry with fresh info from this cache (if any)
+    virtual bool updateCollapsed(StoreEntry &collapsed) { return false; }
+
 private:
     static RefCount<Store> CurrentRoot;
 };
index 2d93de43e46bc0026419c0d57d8068c3c9b93db1..ca151b5d58c7a45a0a7d07a31f61a3e433a35102 100644 (file)
@@ -63,7 +63,9 @@ public:
     /* Store parent API */
     virtual void handleIdleEntry(StoreEntry &e);
     virtual void maybeTrimMemory(StoreEntry &e, const bool preserveSwappable);
+    virtual void memoryUnlink(StoreEntry &e);
     virtual void allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod);
+    virtual void syncCollapsed(const cache_key *key);
 
     virtual void init();
 
@@ -96,6 +98,7 @@ public:
 private:
     void createOneStore(Store &aStore);
     bool keepForLocalMemoryCache(const StoreEntry &e) const;
+    bool anchorCollapsedOnDisk(StoreEntry &collapsed);
 
     StorePointer swapDir; ///< summary view of all disk caches
     MemStore *memStore; ///< memory cache
index de8344decb5745faaead959abb67bd1dfa1adac4..0b1573a75aa4da02ad18d0e30cc59e1461941a9c 100644 (file)
@@ -175,8 +175,8 @@ Transients::get(const cache_key *key)
     // neighbors_do_private_keys (which is true in most cases and by default).
     // This is nothing but waste of CPU cycles. Need a better API to avoid it.
     e->setPublicKey();
+    assert(e->key);
 
-    assert(e->next); // e->hashInsert(key) is done in setPublicKey()
     return e;
 }
 
index cf84b7b6d5b1b2306e36cb416a8bc303da738b37..25c08d451c38199932e553fe722050b2d9794552 100644 (file)
@@ -16,10 +16,10 @@ Rock::ReadRequest::ReadRequest(const ::ReadRequest &base,
 }
 
 Rock::WriteRequest::WriteRequest(const ::WriteRequest &base,
-                                 const IoState::Pointer &anSio,
-                                 const bool last):
+                                 const IoState::Pointer &anSio):
         ::WriteRequest(base),
         sio(anSio),
-        isLast(last)
+        sidCurrent(-1),
+        sidNext(-1)
 {
 }
index 29839fc84acd5c732eea85e02b09ebd9cc436982..c985b4940b8a71d64bbebb93e4f9dacbf2c6c5a8 100644 (file)
@@ -25,9 +25,14 @@ private:
 class WriteRequest: public ::WriteRequest
 {
 public:
-    WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio, const bool last);
+    WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio);
     IoState::Pointer sio;
-    const bool isLast;
+
+    /// slot being written using this write request
+    SlotId sidCurrent;
+
+    /// allocated next slot (negative if we are writing the last slot)
+    SlotId sidNext;
 
 private:
     CBDATA_CLASS2(WriteRequest);
index c7ed4722c5b74aa1d3225d04655654e8993b4f6e..22c86dd9c8efe7b8187ada136def91a8d1db3925 100644 (file)
@@ -179,7 +179,7 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
             // write partial buffer for all collapsed hit readers to see
             // XXX: can we check that this is needed w/o stalling readers
             // that appear right after our check?
-            writeBufToDisk(false);
+            writeBufToDisk(-1);
         }
     }
 
@@ -221,12 +221,6 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
-    // finalize map slice
-    Ipc::StoreMap::Slice &slice =
-        dir->map->writeableSlice(swap_filen, sidCurrent);
-    slice.next = sidNext;
-    slice.size = theBuf.size - sizeof(DbCellHeader);
-
     // finalize db cell header
     DbCellHeader header;
     memcpy(header.key, e->key, sizeof(header.key));
@@ -239,7 +233,7 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     // copy finalized db cell header into buffer
     memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
 
-    writeBufToDisk(sidNext < 0);
+    writeBufToDisk(sidNext);
     theBuf.clear();
 
     sidCurrent = sidNext;
@@ -247,7 +241,7 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
 
 /// Write header-less (XXX) or complete buffer to disk.
 void
-Rock::IoState::writeBufToDisk(const bool last)
+Rock::IoState::writeBufToDisk(const SlotId sidNext)
 {
     // and now allocate another buffer for the WriteRequest so that
     // we can support concurrent WriteRequests (and to ease cleaning)
@@ -262,7 +256,9 @@ Rock::IoState::writeBufToDisk(const bool last)
 
     WriteRequest *const r = new WriteRequest(
         ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
-            memFreeBufFunc(wBufCap)), this, last);
+            memFreeBufFunc(wBufCap)), this);
+    r->sidCurrent = sidCurrent;
+    r->sidNext = sidNext;
 
     // theFile->write may call writeCompleted immediatelly
     theFile->write(r);
index f22a736e0aeec8de58825aeec898615bc3c7c38a..29e02f5d5782950a8b3e08faa4b5bda79fc43a5a 100644 (file)
@@ -50,7 +50,7 @@ private:
     void tryWrite(char const *buf, size_t size, off_t offset);
     size_t writeToBuffer(char const *buf, size_t size);
     void writeToDisk(const SlotId nextSlot);
-    void writeBufToDisk(const bool last);
+    void writeBufToDisk(const SlotId nextSlot);
     SlotId reserveSlotForWriting();
     
     void callBack(int errflag);
index a732232fd718c11680aedbd1475a405d0104f687..f7a840d776b44a0600f2e963a344ac01548c59da 100644 (file)
@@ -69,28 +69,11 @@ Rock::SwapDir::get(const cache_key *key)
     if (!slot)
         return NULL;
 
-    const Ipc::StoreMapAnchor::Basics &basics = slot->basics;
-
     // create a brand new store entry and initialize it with stored basics
     StoreEntry *e = new StoreEntry();
     e->lock_count = 0;
-    e->swap_dirn = index;
-    e->swap_filen = filen;
-    e->swap_file_sz = basics.swap_file_sz;
-    e->lastref = basics.lastref;
-    e->timestamp = basics.timestamp;
-    e->expires = basics.expires;
-    e->lastmod = basics.lastmod;
-    e->refcount = basics.refcount;
-    e->flags = basics.flags;
-    e->store_status = STORE_OK;
-    e->setMemStatus(NOT_IN_MEMORY);
-    e->swap_status = SWAPOUT_DONE;
-    e->ping_status = PING_NONE;
-    EBIT_SET(e->flags, ENTRY_CACHABLE);
-    EBIT_CLR(e->flags, RELEASE_REQUEST);
-    EBIT_CLR(e->flags, KEY_PRIVATE);
-    EBIT_SET(e->flags, ENTRY_VALIDATED);
+    anchorEntry(*e, filen, *slot);
+
     e->hashInsert(key);
     trackReferences(*e);
 
@@ -98,6 +81,70 @@ Rock::SwapDir::get(const cache_key *key)
     // the disk entry remains open for reading, protected from modifications
 }
 
+bool
+Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed)
+{
+    if (!map || !theFile || !theFile->canRead())
+        return false;
+
+    sfileno filen;
+    const Ipc::StoreMapAnchor *const slot = map->openForReading(
+        reinterpret_cast<cache_key*>(collapsed.key), filen);
+    if (!slot)
+        return false;
+
+    anchorEntry(collapsed, filen, *slot);
+    return updateCollapsedWith(collapsed, *slot);
+}
+
+bool
+Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
+{
+    if (!map || !theFile || !theFile->canRead())
+        return false;
+
+    if (collapsed.swap_filen < 0) // no longer using a disk cache
+        return true;
+    assert(collapsed.swap_dirn == index);
+
+    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
+    return updateCollapsedWith(collapsed, s);
+}
+
+bool
+Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
+{
+    collapsed.swap_file_sz = anchor.basics.swap_file_sz; // XXX: make atomic
+    return true;
+}
+
+void
+Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
+{
+    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
+
+    e.swap_file_sz = basics.swap_file_sz;
+    e.swap_dirn = index;
+    e.swap_filen = filen;
+    e.lastref = basics.lastref;
+    e.timestamp = basics.timestamp;
+    e.expires = basics.expires;
+    e.lastmod = basics.lastmod;
+    e.refcount = basics.refcount;
+    e.flags = basics.flags;
+
+    e.store_status = STORE_OK;
+    e.setMemStatus(NOT_IN_MEMORY);
+    e.swap_status = SWAPOUT_DONE;
+    e.ping_status = PING_NONE;
+
+    EBIT_SET(e.flags, ENTRY_CACHABLE);
+    EBIT_CLR(e.flags, RELEASE_REQUEST);
+    EBIT_CLR(e.flags, KEY_PRIVATE);
+    EBIT_SET(e.flags, ENTRY_VALIDATED);
+}
+
+
 void Rock::SwapDir::disconnect(StoreEntry &e)
 {
     assert(e.swap_dirn == index);
@@ -748,14 +795,16 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         return;
     }
 
-    // XXX: can we check that this is needed w/o stalling readers
-    // that appear right after our check?
-    if (Config.onoff.collapsed_forwarding)
-        CollapsedForwarding::NewData(sio);
-
     if (errflag == DISK_OK) {
         // do not increment sio.offset_ because we do it in sio->write()
-        if (request->isLast) {
+
+        // finalize the shared slice info after writing slice contents to disk
+        Ipc::StoreMap::Slice &slice =
+            map->writeableSlice(sio.swap_filen, request->sidCurrent);
+        slice.size = request->len - sizeof(DbCellHeader);
+        slice.next = request->sidNext;
+        
+        if (request->sidNext < 0) {
             // close, the entry gets the read lock
             map->closeForWriting(sio.swap_filen, true);
             sio.finishedWriting(errflag);
@@ -765,6 +814,8 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         sio.finishedWriting(errflag);
         // and hope that Core will call disconnect() to close the map entry
     }
+
+    CollapsedForwarding::Broadcast(static_cast<const cache_key*>(sio.e->key));
 }
 
 void
index 301d0f410e2d42f3871c4c370e9180cf08294c99..eba5ed6019b17ac099f15bf1d1f57684eea6393c 100644 (file)
@@ -65,6 +65,10 @@ public:
     uint64_t slotSize; ///< all db slots are of this size
 
 protected:
+    /* Store API */
+    virtual bool anchorCollapsed(StoreEntry &collapsed);
+    virtual bool updateCollapsed(StoreEntry &collapsed);
+
     /* protected ::SwapDir API */
     virtual bool needsDiskStrand() const;
     virtual void init();
@@ -107,6 +111,9 @@ protected:
     int entryMaxPayloadSize() const;
     int entriesNeeded(const int64_t objSize) const;
 
+    void anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor);
+    bool updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor);
+
     friend class Rebuild;
     friend class IoState;
     const char *filePath; ///< location of cache storage file inside path/
index 14ba1b45498b0d93c951d61ced8ed6d08444279b..4732df4e2b03abca4443ff33f6b34253a8adbfd1 100644 (file)
@@ -10,7 +10,7 @@ bool
 Ipc::ReadWriteLock::lockShared()
 {
     ++readers; // this locks "new" writers out
-    if (!writers) // there are no old writers
+    if (!writers || appending) // there are no old writers or sharing is OK
         return true;
     --readers;
     return false;
@@ -36,6 +36,7 @@ Ipc::ReadWriteLock::unlockShared()
 void
 Ipc::ReadWriteLock::unlockExclusive()
 {
+    appending = 0;
     assert(writers-- > 0);
 }
 
@@ -46,6 +47,13 @@ Ipc::ReadWriteLock::switchExclusiveToShared()
     unlockExclusive();
 }
 
+void
+Ipc::ReadWriteLock::startAppending()
+{
+    assert(writers > 0);
+    appending++;
+}
+
 void
 Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const
 {
@@ -55,6 +63,7 @@ Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const
     } else if (writers) {
         ++stats.writeable;
         stats.writers += writers;
+        stats.appenders += appending;
     } else {
         ++stats.idle;
     }
@@ -87,7 +96,9 @@ Ipc::ReadWriteLockStats::dump(StoreEntry &e) const
         const int locked = readers + writers;
         storeAppendPrintf(&e, "Readers:         %9d %6.2f%%\n",
                           readers, (100.0 * readers / locked));
-        storeAppendPrintf(&e, "Writers:         %9d %6.2f%%\n",
-                          writers, (100.0 * writers / locked));
+        const double appPerc = writers ? (100.0 * appenders / writers) : 0.0;
+        storeAppendPrintf(&e, "Writers:         %9d %6.2f%% including Appenders: %9d %6.2f%%\n",
+                          writers, (100.0 * writers / locked),
+                          appenders, appPerc);
     }
 }
index 925e02ca0df6eca8f089aa48afbfa75b99f20b77..5292cad2518026c0b9f17d84c1599a18f63b6ea9 100644 (file)
@@ -11,6 +11,9 @@ namespace Ipc
 class ReadWriteLockStats;
 
 /// an atomic readers-writer or shared-exclusive lock suitable for maps/tables
+/// Also supports reading-while-appending mode when readers and writer are
+/// allowed to access the same locked object because the writer promisses
+/// to only append new data and all size-related object properties are atomic.
 class ReadWriteLock
 {
 public:
@@ -22,12 +25,15 @@ public:
     void unlockExclusive(); ///< undo successful exclusiveLock()
     void switchExclusiveToShared(); ///< stop writing, start reading
 
+    void startAppending(); ///< writer keeps its lock but also allows reading
+
     /// adds approximate current stats to the supplied ones
     void updateStats(ReadWriteLockStats &stats) const;
 
 public:
     mutable Atomic::Word readers; ///< number of users trying to read
     Atomic::Word writers; ///< number of writers trying to modify protected data
+    Atomic::Word appending; ///< the writer has promissed to only append
 };
 
 /// approximate stats of a set of ReadWriteLocks
@@ -44,6 +50,7 @@ public:
     int idle; ///< number of unlocked locks
     int readers; ///< sum of lock.readers
     int writers; ///< sum of lock.writers
+    int appenders; ///< number of appending writers
 };
 
 } // namespace Ipc
index 4f992977e8344c0d1abfaa3827ee0c822ce78f68..01a44ce9f3d03691da1c29d64a111d9da67847b0 100644 (file)
@@ -39,7 +39,7 @@ Ipc::StoreMap::compareVersions(const sfileno fileno, time_t newVersion) const
 
     // note: we do not lock, so comparison may be inacurate
 
-    if (inode.state == Anchor::Empty)
+    if (inode.empty())
         return +2;
 
     if (const time_t diff = newVersion - inode.basics.timestamp)
@@ -54,14 +54,14 @@ Ipc::StoreMap::forgetWritingEntry(sfileno fileno)
     assert(valid(fileno));
     Anchor &inode = shared->slots[fileno].anchor;
 
-    assert(inode.state == Anchor::Writeable);
+    assert(inode.writing());
 
     // we do not iterate slices because we were told to forget about
     // them; the caller is responsible for freeing them (most likely
     // our slice list is incomplete or has holes)
 
     inode.waitingToBeFreed = false;
-    inode.state = Anchor::Empty;
+    inode.rewind();
 
     inode.lock.unlockExclusive();
     --shared->count;
@@ -91,10 +91,10 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting)
     ReadWriteLock &lock = s.lock;
 
     if (lock.lockExclusive()) {
-        assert(s.state != Anchor::Writeable); // until we start breaking locks
+        assert(s.writing() && !s.reading());
 
         // bail if we cannot empty this position
-        if (!s.waitingToBeFreed && s.state == Anchor::Readable && !overwriteExisting) {
+        if (!s.waitingToBeFreed && !s.empty() && !overwriteExisting) {
             lock.unlockExclusive();
             debugs(54, 5, "cannot open existing entry " << fileno <<
                    " for writing " << path);
@@ -102,12 +102,11 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting)
         }
 
         // free if the entry was used, keeping the entry locked
-        if (s.waitingToBeFreed || s.state == Anchor::Readable)
+        if (s.waitingToBeFreed || !s.empty())
             freeChain(fileno, s, true);
 
-        assert(s.state == Anchor::Empty);
+        assert(s.empty());
         ++shared->count;
-        s.state = Anchor::Writeable;
 
         //s.setKey(key); // XXX: the caller should do that
         debugs(54, 5, "opened entry " << fileno << " for writing " << path);
@@ -119,20 +118,31 @@ Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting)
     return NULL;
 }
 
+void
+Ipc::StoreMap::startAppending(const sfileno fileno)
+{
+    assert(valid(fileno));
+    Anchor &s = shared->slots[fileno].anchor;
+    assert(s.writing());
+    s.lock.startAppending();
+    debugs(54, 5, "restricted entry " << fileno << " to appending " << path);
+}
+
 void
 Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading)
 {
     assert(valid(fileno));
     Anchor &s = shared->slots[fileno].anchor;
-    assert(s.state == Anchor::Writeable);
-    s.state = Anchor::Readable;
+    assert(s.writing());
     if (lockForReading) {
         s.lock.switchExclusiveToShared();
         debugs(54, 5, "switched entry " << fileno <<
                " from writing to reading " << path);
+        assert(s.complete());
     } else {
         s.lock.unlockExclusive();
         debugs(54, 5, "closed entry " << fileno << " for writing " << path);
+        // cannot assert completeness here because we have no lock
     }
 }
 
@@ -140,7 +150,7 @@ Ipc::StoreMap::Slice &
 Ipc::StoreMap::writeableSlice(const AnchorId anchorId, const SliceId sliceId)
 {
     assert(valid(anchorId));
-    assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+    assert(shared->slots[anchorId].anchor.writing());
     assert(valid(sliceId));
     return shared->slots[sliceId].slice;
 }
@@ -149,7 +159,7 @@ const Ipc::StoreMap::Slice &
 Ipc::StoreMap::readableSlice(const AnchorId anchorId, const SliceId sliceId) const
 {
     assert(valid(anchorId));
-    assert(shared->slots[anchorId].anchor.state == Anchor::Readable);
+    assert(shared->slots[anchorId].anchor.reading());
     assert(valid(sliceId));
     return shared->slots[sliceId].slice;
 }
@@ -158,7 +168,15 @@ Ipc::StoreMap::Anchor &
 Ipc::StoreMap::writeableEntry(const AnchorId anchorId)
 {
     assert(valid(anchorId));
-    assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+    assert(shared->slots[anchorId].anchor.writing());
+    return shared->slots[anchorId].anchor;
+}
+
+const Ipc::StoreMap::Anchor &
+Ipc::StoreMap::readableEntry(const AnchorId anchorId) const
+{
+    assert(valid(anchorId));
+    assert(shared->slots[anchorId].anchor.reading());
     return shared->slots[anchorId].anchor;
 }
 
@@ -169,9 +187,17 @@ Ipc::StoreMap::abortWriting(const sfileno fileno)
     debugs(54, 5, "aborting entry " << fileno << " for writing " << path);
     assert(valid(fileno));
     Anchor &s = shared->slots[fileno].anchor;
-    assert(s.state == Anchor::Writeable);
-    freeChain(fileno, s, false);
-    debugs(54, 5, "closed entry " << fileno << " for writing " << path);
+    assert(s.writing());
+    s.lock.appending = false; // locks out any new readers
+    if (!s.lock.readers) {
+        freeChain(fileno, s, false);
+        debugs(54, 5, "closed clean entry " << fileno << " for writing " << path);
+    } else {
+        s.waitingToBeFreed = true;
+        // XXX: s.state &= !Anchor::Writeable;
+        s.lock.unlockExclusive();
+        debugs(54, 5, "closed dirty entry " << fileno << " for writing " << path);
+       }
 }
 
 void
@@ -183,7 +209,7 @@ Ipc::StoreMap::abortIo(const sfileno fileno)
 
     // The caller is a lock holder. Thus, if we are Writeable, then the
     // caller must be the writer; otherwise the caller must be the reader.
-    if (s.state == Anchor::Writeable)
+    if (s.writing())
         abortWriting(fileno);
     else
         closeForReading(fileno);
@@ -194,15 +220,11 @@ Ipc::StoreMap::peekAtReader(const sfileno fileno) const
 {
     assert(valid(fileno));
     const Anchor &s = shared->slots[fileno].anchor;
-    switch (s.state) {
-    case Anchor::Readable:
+    if (s.reading())
         return &s; // immediate access by lock holder so no locking
-    case Anchor::Writeable:
-        return NULL; // cannot read the slot when it is being written
-    case Anchor::Empty:
-        assert(false); // must be locked for reading or writing
-    }
-    assert(false); // not reachable
+    if (s.writing())
+        return NULL; // the caller is not a read lock holder
+    assert(false); // must be locked for reading or writing
     return NULL;
 }
 
@@ -220,13 +242,37 @@ Ipc::StoreMap::freeEntry(const sfileno fileno)
         s.waitingToBeFreed = true; // mark to free it later
 }
 
+void
+Ipc::StoreMap::freeEntryByKey(const cache_key *const key)
+{
+    debugs(54, 5, "marking entry with key " << storeKeyText(key)
+           << " to be freed in " << path);
+
+    const int idx = anchorIndexByKey(key);
+    Anchor &s = shared->slots[idx].anchor;
+    if (s.lock.lockExclusive()) {
+        if (s.sameKey(key))
+            freeChain(idx, s, true);
+        s.lock.unlockExclusive();
+       } else if (s.lock.lockShared()) {
+        if (s.sameKey(key))
+            s.waitingToBeFreed = true; // mark to free it later
+        s.lock.unlockShared();
+    } else {
+       // we cannot be sure that the entry we found is ours because we do not
+       // have a lock on it, but we still check to minimize false deletions
+       if (s.sameKey(key))
+           s.waitingToBeFreed = true; // mark to free it later
+    }
+}
+
 /// unconditionally frees an already locked chain of slots, unlocking if needed
 void
 Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked)
 {
-    debugs(54, 7, "freeing " << inode.state << " entry " << fileno <<
+    debugs(54, 7, "freeing entry " << fileno <<
            " in " << path);
-    if (inode.state != Anchor::Empty) {
+    if (!inode.empty()) {
         sfileno sliceId = inode.start;
         debugs(54, 8, "first slice " << sliceId);
         while (sliceId >= 0) {
@@ -240,7 +286,7 @@ Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLoc
     }
 
     inode.waitingToBeFreed = false;
-    inode.state = Anchor::Empty;
+    inode.rewind();
 
     if (!keepLocked)
         inode.lock.unlockExclusive();
@@ -278,7 +324,7 @@ Ipc::StoreMap::openForReadingAt(const sfileno fileno)
         return NULL;
     }
 
-    if (s.state == Anchor::Empty) {
+    if (s.empty()) {
         s.lock.unlockShared();
         debugs(54, 7, "cannot open empty entry " << fileno <<
                " for reading " << path);
@@ -292,8 +338,6 @@ Ipc::StoreMap::openForReadingAt(const sfileno fileno)
         return NULL;
     }
 
-    // cannot be Writing here if we got shared lock and checked Empty above
-    assert(s.state == Anchor::Readable);
     debugs(54, 5, "opened entry " << fileno << " for reading " << path);
     return &s;
 }
@@ -303,7 +347,7 @@ Ipc::StoreMap::closeForReading(const sfileno fileno)
 {
     assert(valid(fileno));
     Anchor &s = shared->slots[fileno].anchor;
-    assert(s.state == Anchor::Readable);
+    assert(s.reading());
     s.lock.unlockShared();
     debugs(54, 5, "closed entry " << fileno << " for reading " << path);
 }
@@ -320,7 +364,7 @@ Ipc::StoreMap::purgeOne()
         assert(valid(fileno));
         Anchor &s = shared->slots[fileno].anchor;
         if (s.lock.lockExclusive()) {
-            if (s.state == Anchor::Readable) { // skip empties
+            if (!s.empty()) {
                 // this entry may be marked for deletion, and that is OK
                 freeChain(fileno, s, false);
                 debugs(54, 5, "purged entry " << fileno << " from " << path);
@@ -392,7 +436,7 @@ Ipc::StoreMap::anchorByKey(const cache_key *const key)
 
 /* Ipc::StoreMapAnchor */
 
-Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), state(Empty)
+Ipc::StoreMapAnchor::StoreMapAnchor(): start(0)
 {
     memset(&key, 0, sizeof(key));
     memset(&basics, 0, sizeof(basics));
@@ -415,7 +459,7 @@ Ipc::StoreMapAnchor::sameKey(const cache_key *const aKey) const
 void
 Ipc::StoreMapAnchor::set(const StoreEntry &from)
 {
-    assert(state == Writeable);
+    assert(writing() && !reading());
     memcpy(key, from.key, sizeof(key));
     basics.timestamp = from.timestamp;
     basics.lastref = from.lastref;
@@ -429,7 +473,7 @@ Ipc::StoreMapAnchor::set(const StoreEntry &from)
 void
 Ipc::StoreMapAnchor::rewind()
 {
-    assert(state == Writeable);
+    assert(writing());
     start = 0;
     memset(&key, 0, sizeof(key));
     memset(&basics, 0, sizeof(basics));
index 995176e42178a1402be89ec99c0c6d75a66f83d8..3cb26d559910332e4043985a8b1f12f781b2d638 100644 (file)
@@ -12,13 +12,16 @@ namespace Ipc
 typedef int32_t StoreMapSliceId;
 
 /// a piece of Store entry, linked to other pieces, forming a chain
+/// slices may be appended by writers while readers read the entry
 class StoreMapSlice
 {
 public:
-    StoreMapSlice(): next(-1), size(0) {}
+    typedef uint32_t Size;
 
-    StoreMapSliceId next; ///< ID of the next slice occupied by the entry
-    uint32_t size; ///< slice contents size
+    StoreMapSlice(): size(0), next(-1) {}
+
+    Atomic::WordT<Size> size; ///< slice contents size
+    Atomic::WordT<StoreMapSliceId> next; ///< ID of the next entry slice
 };
 
 
@@ -40,10 +43,19 @@ public:
     /// undo the effects of set(), setKey(), etc., but keep locks and state
     void rewind();
 
+    /* entry state may change immediately after calling these methods unless
+     * the caller holds an appropriate lock */
+    bool empty() const { return !key[0] && !key[1]; }
+    bool reading() const { return lock.readers; }
+    bool writing() const { return lock.writers; }
+    bool complete() const { return !empty() && !writing(); }
+
 public:
     mutable ReadWriteLock lock; ///< protects slot data below
     Atomic::WordT<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
 
+    // fields marked with [app] can be modified when appending-while-reading
+
     uint64_t key[2]; ///< StoreEntry key
 
     // STORE_META_STD TLV field from StoreEntry
@@ -52,13 +64,15 @@ public:
         time_t lastref;
         time_t expires;
         time_t lastmod;
-        uint64_t swap_file_sz;
+        uint64_t swap_file_sz; // [app]; XXX: make atomic
         uint16_t refcount;
         uint16_t flags;
     } basics;
 
-    StoreMapSliceId start; ///< where the chain of StoreEntry slices begins
+    /// where the chain of StoreEntry slices begins [app]; XXX: make atomic
+    StoreMapSliceId start; 
 
+#if 0
     /// possible persistent states
     typedef enum {
         Empty, ///< ready for writing, with nothing of value
@@ -66,6 +80,7 @@ public:
         Readable, ///< ready for reading
     } State;
     State state; ///< current state
+#endif
 };
 
 /// A hack to allocate one shared array for both anchors and slices.
@@ -126,6 +141,8 @@ public:
     /// locks and returns an anchor for the empty fileno position; if
     /// overwriteExisting is false and the position is not empty, returns nil
     Anchor *openForWritingAt(sfileno fileno, bool overwriteExisting = true);
+    /// restrict opened for writing entry to appending operations; allow reads
+    void startAppending(const sfileno fileno);
     /// successfully finish creating or updating the entry at fileno pos
     void closeForWriting(const sfileno fileno, bool lockForReading = false);
     /// unlock and "forget" openForWriting entry, making it Empty again
@@ -135,9 +152,11 @@ public:
     /// only works on locked entries; returns nil unless the slice is readable
     const Anchor *peekAtReader(const sfileno fileno) const;
 
-    /// if possible, free the entry and return true
-    /// otherwise mark it as waiting to be freed and return false
+    /// free the entry if possible or mark it as waiting to be freed if not
     void freeEntry(const sfileno fileno);
+    /// free the entry if possible or mark it as waiting to be freed if not
+    /// does nothing if we cannot check that the key matches the cached entry
+    void freeEntryByKey(const cache_key *const key);
 
     /// opens entry (identified by key) for reading, increments read level
     const Anchor *openForReading(const cache_key *const key, sfileno &fileno);
@@ -152,6 +171,8 @@ public:
     const Slice &readableSlice(const AnchorId anchorId, const SliceId sliceId) const;
     /// writeable anchor for the entry created by openForWriting()
     Anchor &writeableEntry(const AnchorId anchorId);
+    /// readable anchor for the entry created by openForReading()
+    const Anchor &readableEntry(const AnchorId anchorId) const;
 
     /// called by lock holder to terminate either slice writing or reading
     void abortIo(const sfileno fileno);
index 7b86662a7d4669b2efff612f5f732bac0c4f310e..24f707fee8680020db5b44d80282f64f050462a1 100644 (file)
@@ -523,7 +523,7 @@ StoreEntry::purgeMem()
 
     debugs(20, 3, "StoreEntry::purgeMem: Freeing memory-copy of " << getMD5Text());
 
-    destroyMemObject();
+    Store::Root().memoryUnlink(*this);
 
     if (swap_status != SWAPOUT_DONE)
         release();
@@ -1279,12 +1279,11 @@ StoreEntry::release()
         return;
     }
 
+    Store::Root().memoryUnlink(*this);
+
     if (StoreController::store_dirs_rebuilding && swap_filen > -1) {
         setPrivateKey();
 
-        if (mem_obj)
-            destroyMemObject();
-
         if (swap_filen > -1) {
             /*
              * Fake a call to StoreEntry->lock()  When rebuilding is done,
@@ -1312,7 +1311,6 @@ StoreEntry::release()
         unlink();
     }
 
-    setMemStatus(NOT_IN_MEMORY);
     destroyStoreEntry(static_cast<hash_link *>(this));
     PROF_stop(storeRelease);
 }
index 6e564643d32a04f36fb0b2e1b0c061dd32ccb0ac..dccf60fcbb8296c20737e1e9c6de15aa22c0888c 100644 (file)
@@ -811,6 +811,34 @@ StoreController::get(String const key, STOREGETCLIENT aCallback, void *aCallback
     fatal("not implemented");
 }
 
+/// updates the collapsed entry with the corresponding on-disk entry, if any
+bool
+StoreController::anchorCollapsedOnDisk(StoreEntry &collapsed)
+{
+    // TODO: move this loop to StoreHashIndex, just like the one in get().
+    if (const int cacheDirs = Config.cacheSwap.n_configured) {
+        // ask each cache_dir until the entry is found; use static starting
+        // point to avoid asking the same subset of disks more often
+        // TODO: coordinate with put() to be able to guess the right disk often
+        static int idx = 0;
+        for (int n = 0; n < cacheDirs; ++n) {
+            idx = (idx + 1) % cacheDirs;
+            SwapDir *sd = dynamic_cast<SwapDir*>(INDEXSD(idx));
+            if (!sd->active())
+                continue;
+
+            if (sd->anchorCollapsed(collapsed)) {
+                debugs(20, 3, "cache_dir " << idx << " anchors " << collapsed);
+                return true;
+            }
+        }
+    }
+
+    debugs(20, 4, "none of " << Config.cacheSwap.n_configured <<
+           " cache_dirs have " << collapsed);
+    return false;
+}
+
 // move this into [non-shared] memory cache class when we have one
 /// whether e should be kept in local RAM for possible future caching
 bool
@@ -845,6 +873,18 @@ StoreController::maybeTrimMemory(StoreEntry &e, const bool preserveSwappable)
         e.trimMemory(preserveSwappable);
 }
 
+void
+StoreController::memoryUnlink(StoreEntry &e)
+{
+    if (e.mem_status != IN_MEMORY)
+        return;
+
+    if (memStore)
+        memStore->unlink(e);
+    else // TODO: move into [non-shared] memory cache class when we have one
+        e.destroyMemObject();
+}
+
 void
 StoreController::handleIdleEntry(StoreEntry &e)
 {
@@ -893,6 +933,33 @@ StoreController::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags,
     debugs(20, 3, "may " << (transients ? "SMP" : "") << " collapse " << *e);
 }
 
+void
+StoreController::syncCollapsed(const cache_key *key)
+{
+    StoreEntry *collapsed = swapDir->get(key);
+    if (!collapsed) // the entry is no longer locally active, ignore the update
+        return;
+
+    debugs(20, 7, "syncing " << *collapsed);
+
+    bool inSync = false;
+    if (memStore && collapsed->mem_status == IN_MEMORY)
+        inSync = memStore->updateCollapsed(*collapsed);
+    else if (collapsed->swap_filen >= 0)
+        inSync = collapsed->store()->updateCollapsed(*collapsed);
+    else if (memStore && memStore->anchorCollapsed(*collapsed))
+        inSync = true; // found in the memory cache
+    else if (anchorCollapsedOnDisk(*collapsed))
+        inSync = true; // found in a disk cache
+
+    if (inSync) {
+        debugs(20, 5, "synced " << *collapsed);
+        collapsed->invokeHandlers();
+    } else { // the backing entry is no longer cached; abort this hit
+        debugs(20, 3, "aborting cacheless " << *collapsed);
+        collapsed->abort();
+    }
+}
 
 StoreHashIndex::StoreHashIndex()
 {