]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Re-enabled on-disk collapsing of entries after fixing related code.
authorAlex Rousskov <rousskov@measurement-factory.com>
Mon, 29 Jul 2013 00:43:55 +0000 (18:43 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Mon, 29 Jul 2013 00:43:55 +0000 (18:43 -0600)
Since we started writing partial entries, we cannot rely on negative sidNext
marking the end of the slice/write sequence. Added a WriteRequest::eof field
to signal that end explicitly.

Do not leak db slices when write fails or IoState is closed before the write
succeeds.

Handle store client requesting an offset we have not stored yet. This might
happen for collapsed hits (and also if the client is buggy). May need more
work to slow the reader down.

Do not update various shared stats until the corresponding slot is written.

src/fs/rock/RockIoRequests.cc
src/fs/rock/RockIoRequests.h
src/fs/rock/RockIoState.cc
src/fs/rock/RockIoState.h
src/fs/rock/RockSwapDir.cc

index 25c08d451c38199932e553fe722050b2d9794552..dd33320e43a9117f86d764a225c08d9582c81204 100644 (file)
@@ -20,6 +20,7 @@ Rock::WriteRequest::WriteRequest(const ::WriteRequest &base,
         ::WriteRequest(base),
         sio(anSio),
         sidCurrent(-1),
-        sidNext(-1)
+        sidNext(-1),
+        eof(false)
 {
 }
index c985b4940b8a71d64bbebb93e4f9dacbf2c6c5a8..367f4092a3262373d4db28550139b8b7feec8fc8 100644 (file)
@@ -34,6 +34,9 @@ public:
     /// allocated next slot (negative if we are writing the last slot)
     SlotId sidNext;
 
+    /// whether this is the last request for the entry
+    bool eof;
+
 private:
     CBDATA_CLASS2(WriteRequest);
 };
index be0f4638b07155ae9f79c8ae4f8282bc78203441..7eabd7d03df3522e1a1a233ca2ab3218542f3a3a 100644 (file)
@@ -100,26 +100,44 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data
         objOffset = 0;
     }
 
-    while (coreOff >= objOffset + currentReadableSlice().size) {
+    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
         objOffset += currentReadableSlice().size;
         sidCurrent = currentReadableSlice().next;
-        assert(sidCurrent >= 0); // TODO: handle "read offset too big" error
     }
 
-    offset_ = coreOff;
-    len = min(len,
-        static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
-
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
+    // punt if read offset is too big (because of client bugs or collapsing)
+    if (sidCurrent < 0) {
+        debugs(79, 5, "no " << coreOff << " in " << *e);
+        callReaderBack(buf, 0);
+        return;
+    }
+
+    offset_ = coreOff;
+    len = min(len,
+        static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
     const uint64_t diskOffset = dir->diskOffset(sidCurrent);
     theFile->read(new ReadRequest(::ReadRequest(buf,
         diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
 }
 
+void
+Rock::IoState::callReaderBack(const char *buf, int rlen)
+{
+    debugs(79, 5, rlen << " bytes for " << *e);
+    StoreIOState::STRCB *callb = read.callback;
+    assert(callb);
+    read.callback = NULL;
+    void *cbdata;
+    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
+        callb(cbdata, buf, rlen, this);
+}
+
+
 /// wraps tryWrite() to handle deep write failures centrally and safely
 bool
 Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
@@ -177,9 +195,9 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
             const SlotId sidNext = reserveSlotForWriting(); // throws
             assert(sidNext >= 0);
             writeToDisk(sidNext);
-        } else if (false && Store::Root().transientReaders(*e)) {
+        } else if (Store::Root().transientReaders(*e)) {
             // write partial buffer for all remote hit readers to see
-            writeBufToDisk(-1);
+            writeBufToDisk(-1, false);
         }
     }
 
@@ -213,36 +231,35 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     assert(theFile != NULL);
     assert(theBuf.size >= sizeof(DbCellHeader));
 
-    if (sidNext < 0) { // we are writing the last slot
-        e->swap_file_sz = offset_;
-        writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
-    }
-
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
+    writeBufToDisk(sidNext, sidNext < 0);
+    theBuf.clear();
+
+    sidCurrent = sidNext;
+}
+
+/// creates and submits a request to write current slot buffer to disk
+/// eof is true if and only this is the last slot
+void
+Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+{
+    // no slots after the last/eof slot (but partial slots may have a nil next)
+    assert(!eof || sidNext < 0);
+
     // finalize db cell header
     DbCellHeader header;
     memcpy(header.key, e->key, sizeof(header.key));
     header.firstSlot = writeAnchor().start;
     header.nextSlot = sidNext;
     header.payloadSize = theBuf.size - sizeof(DbCellHeader);
-    header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
+    header.entrySize = e->swap_file_sz; // zero except for the very last write
     header.version = writeAnchor().basics.timestamp;
 
     // copy finalized db cell header into buffer
     memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
 
-    writeBufToDisk(sidNext);
-    theBuf.clear();
-
-    sidCurrent = sidNext;
-}
-
-/// Write header-less (ugh) or complete buffer to disk.
-void
-Rock::IoState::writeBufToDisk(const SlotId sidNext)
-{
     // and now allocate another buffer for the WriteRequest so that
     // we can support concurrent WriteRequests (and to ease cleaning)
     // TODO: should we limit the number of outstanding requests?
@@ -259,6 +276,7 @@ Rock::IoState::writeBufToDisk(const SlotId sidNext)
             memFreeBufFunc(wBufCap)), this);
     r->sidCurrent = sidCurrent;
     r->sidNext = sidNext;
+    r->eof = eof;
 
     // theFile->write may call writeCompleted immediatelly
     theFile->write(r);
index 29e02f5d5782950a8b3e08faa4b5bda79fc43a5a..a44660753b0f3549f35a00e6315a5b63b5ff0765 100644 (file)
@@ -31,6 +31,9 @@ public:
     /// whether we are still waiting for the I/O results (i.e., not closed)
     bool stillWaiting() const { return theFile != NULL; }
 
+    /// forwards read data to the reader that initiated this I/O
+    void callReaderBack(const char *buf, int rlen);
+
     /// called by SwapDir::writeCompleted() after the last write and on error
     void finishedWriting(const int errFlag);
 
@@ -50,7 +53,7 @@ private:
     void tryWrite(char const *buf, size_t size, off_t offset);
     size_t writeToBuffer(char const *buf, size_t size);
     void writeToDisk(const SlotId nextSlot);
-    void writeBufToDisk(const SlotId nextSlot);
+    void writeBufToDisk(const SlotId nextSlot, const bool eof);
     SlotId reserveSlotForWriting();
     
     void callBack(int errflag);
index a81da20459e636c603af2004d636889156d58226..3047bdad92beb777ccdef1e98ad2e5373dda007a 100644 (file)
@@ -83,7 +83,7 @@ Rock::SwapDir::get(const cache_key *key)
 bool
 Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
 {
-    if (true || !map || !theFile || !theFile->canRead())
+    if (!map || !theFile || !theFile->canRead())
         return false;
 
     sfileno filen;
@@ -734,7 +734,7 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
 
     // The are two ways an entry can get swap_filen: our get() locked it for
     // reading or our storeSwapOutStart() locked it for writing. Peeking at our
-    // locked entry is safe, but no support for reading a filling entry.
+    // locked entry is safe, but no support for reading the entry we swap out.
     const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
     if (!slot)
         return NULL; // we were writing afterall
@@ -752,9 +752,9 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
            sio->swap_filen);
 
     assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
-    assert(slot->basics.swap_file_sz > 0);
-    // XXX: basics.swap_file_sz may grow for collapsed disk hits
-    assert(slot->basics.swap_file_sz == e.swap_file_sz);
+    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
+    // may still be zero and basics.swap_file_sz may grow.
+    assert(slot->basics.swap_file_sz >= e.swap_file_sz);
 
     return sio;
 }
@@ -792,12 +792,7 @@ Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< :
     if (errflag == DISK_OK && rlen > 0)
         sio->offset_ += rlen;
 
-    StoreIOState::STRCB *callb = sio->read.callback;
-    assert(callb);
-    sio->read.callback = NULL;
-    void *cbdata;
-    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
-        callb(cbdata, r->buf, rlen, sio.getRaw());
+    sio->callReaderBack(r->buf, rlen);
 }
 
 void
@@ -811,9 +806,12 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
     // quit if somebody called IoState::close() while we were waiting
     if (!sio.stillWaiting()) {
         debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
+        noteFreeMapSlice(request->sidNext);
         return;
     }
 
+    // TODO: Fail if disk dropped one of the previous write requests.
+
     if (errflag == DISK_OK) {
         // do not increment sio.offset_ because we do it in sio->write()
 
@@ -823,13 +821,20 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         slice.size = request->len - sizeof(DbCellHeader);
         slice.next = request->sidNext;
         
-        if (request->sidNext < 0) {
+        if (request->eof) {
+            assert(sio.e);
+            assert(sio.writeableAnchor_);
+            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
+                sio.offset_;
+
             // close, the entry gets the read lock
             map->closeForWriting(sio.swap_filen, true);
             sio.writeableAnchor_ = NULL;
             sio.finishedWriting(errflag);
         }
     } else {
+        noteFreeMapSlice(request->sidNext);
+
         writeError(*sio.e);
         sio.finishedWriting(errflag);
         // and hope that Core will call disconnect() to close the map entry