From 5296bbd9b4eaee1522df1ff0e734c09eab532402 Mon Sep 17 00:00:00 2001
From: Alex Rousskov
Date: Sun, 28 Jul 2013 18:43:55 -0600
Subject: [PATCH] Re-enabled on-disk collapsing of entries after fixing related code.

Since we started writing partial entries, we cannot rely on negative
sidNext marking the end of the slice/write sequence. Added a
WriteRequest::eof field to signal that end explicitly.

Do not leak db slices when write fails or IoState is closed before the
write succeeds.

Handle store client requesting an offset we have not stored yet. This
might happen for collapsed hits (and also if the client is buggy). May
need more work to slow the reader down.

Do not update various shared stats until the corresponding slot is
written.
---
 src/fs/rock/RockIoRequests.cc |  3 +-
 src/fs/rock/RockIoRequests.h  |  3 ++
 src/fs/rock/RockIoState.cc    | 66 ++++++++++++++++++++++-------------
 src/fs/rock/RockIoState.h     |  5 ++-
 src/fs/rock/RockSwapDir.cc    | 29 ++++++++-------
 5 files changed, 68 insertions(+), 38 deletions(-)

diff --git a/src/fs/rock/RockIoRequests.cc b/src/fs/rock/RockIoRequests.cc
index 25c08d451c..dd33320e43 100644
--- a/src/fs/rock/RockIoRequests.cc
+++ b/src/fs/rock/RockIoRequests.cc
@@ -20,6 +20,7 @@ Rock::WriteRequest::WriteRequest(const ::WriteRequest &base,
         ::WriteRequest(base),
         sio(anSio),
         sidCurrent(-1),
-        sidNext(-1)
+        sidNext(-1),
+        eof(false)
 {
 }
diff --git a/src/fs/rock/RockIoRequests.h b/src/fs/rock/RockIoRequests.h
index c985b4940b..367f4092a3 100644
--- a/src/fs/rock/RockIoRequests.h
+++ b/src/fs/rock/RockIoRequests.h
@@ -34,6 +34,9 @@ public:
     /// allocated next slot (negative if we are writing the last slot)
     SlotId sidNext;
 
+    /// whether this is the last request for the entry
+    bool eof;
+
 private:
     CBDATA_CLASS2(WriteRequest);
 };
diff --git a/src/fs/rock/RockIoState.cc b/src/fs/rock/RockIoState.cc
index be0f4638b0..7eabd7d03d 100644
--- a/src/fs/rock/RockIoState.cc
+++ b/src/fs/rock/RockIoState.cc
@@ -100,26 +100,44 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
         objOffset = 0;
     }
 
-    while (coreOff >= objOffset + currentReadableSlice().size) {
+    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
         objOffset += currentReadableSlice().size;
         sidCurrent = currentReadableSlice().next;
-        assert(sidCurrent >= 0); // TODO: handle "read offset too big" error
     }
 
-    offset_ = coreOff;
-    len = min(len,
-              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
-
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
+    // punt if read offset is too big (because of client bugs or collapsing)
+    if (sidCurrent < 0) {
+        debugs(79, 5, "no " << coreOff << " in " << *e);
+        callReaderBack(buf, 0);
+        return;
+    }
+
+    offset_ = coreOff;
+    len = min(len,
+              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
     const uint64_t diskOffset = dir->diskOffset(sidCurrent);
     theFile->read(new ReadRequest(::ReadRequest(buf,
         diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
 }
 
+void
+Rock::IoState::callReaderBack(const char *buf, int rlen)
+{
+    debugs(79, 5, rlen << " bytes for " << *e);
+    StoreIOState::STRCB *callb = read.callback;
+    assert(callb);
+    read.callback = NULL;
+    void *cbdata;
+    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
+        callb(cbdata, buf, rlen, this);
+}
+
+
 /// wraps tryWrite() to handle deep write failures centrally and safely
 bool
 Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
@@ -177,9 +195,9 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
         const SlotId sidNext = reserveSlotForWriting(); // throws
         assert(sidNext >= 0);
         writeToDisk(sidNext);
-    } else if (false && Store::Root().transientReaders(*e)) {
+    } else if (Store::Root().transientReaders(*e)) {
         // write partial buffer for all remote hit readers to see
-        writeBufToDisk(-1);
+        writeBufToDisk(-1, false);
     }
 }
 
@@ -213,36 +231,35 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     assert(theFile != NULL);
     assert(theBuf.size >= sizeof(DbCellHeader));
 
-    if (sidNext < 0) { // we are writing the last slot
-        e->swap_file_sz = offset_;
-        writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
-    }
-
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
+    writeBufToDisk(sidNext, sidNext < 0);
+    theBuf.clear();
+
+    sidCurrent = sidNext;
+}
+
+/// creates and submits a request to write current slot buffer to disk
+/// eof is true if and only if this is the last slot
+void
+Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+{
+    // no slots after the last/eof slot (but partial slots may have a nil next)
+    assert(!eof || sidNext < 0);
+
     // finalize db cell header
     DbCellHeader header;
     memcpy(header.key, e->key, sizeof(header.key));
     header.firstSlot = writeAnchor().start;
     header.nextSlot = sidNext;
     header.payloadSize = theBuf.size - sizeof(DbCellHeader);
-    header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
+    header.entrySize = e->swap_file_sz; // zero except for the very last write
     header.version = writeAnchor().basics.timestamp;
 
     // copy finalized db cell header into buffer
     memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
 
-    writeBufToDisk(sidNext);
-    theBuf.clear();
-
-    sidCurrent = sidNext;
-}
-
-/// Write header-less (ugh) or complete buffer to disk.
-void
-Rock::IoState::writeBufToDisk(const SlotId sidNext)
-{
     // and now allocate another buffer for the WriteRequest so that
     // we can support concurrent WriteRequests (and to ease cleaning)
     // TODO: should we limit the number of outstanding requests?
@@ -259,6 +276,7 @@ Rock::IoState::writeBufToDisk(const SlotId sidNext)
                        memFreeBufFunc(wBufCap)), this);
     r->sidCurrent = sidCurrent;
     r->sidNext = sidNext;
+    r->eof = eof;
 
     // theFile->write may call writeCompleted immediatelly
     theFile->write(r);
diff --git a/src/fs/rock/RockIoState.h b/src/fs/rock/RockIoState.h
index 29e02f5d57..a44660753b 100644
--- a/src/fs/rock/RockIoState.h
+++ b/src/fs/rock/RockIoState.h
@@ -31,6 +31,9 @@ public:
     /// whether we are still waiting for the I/O results (i.e., not closed)
     bool stillWaiting() const { return theFile != NULL; }
 
+    /// forwards read data to the reader that initiated this I/O
+    void callReaderBack(const char *buf, int rlen);
+
     /// called by SwapDir::writeCompleted() after the last write and on error
     void finishedWriting(const int errFlag);
 
@@ -50,7 +53,7 @@ private:
     void tryWrite(char const *buf, size_t size, off_t offset);
     size_t writeToBuffer(char const *buf, size_t size);
     void writeToDisk(const SlotId nextSlot);
-    void writeBufToDisk(const SlotId nextSlot);
+    void writeBufToDisk(const SlotId nextSlot, const bool eof);
     SlotId reserveSlotForWriting();
 
     void callBack(int errflag);
diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc
index a81da20459..3047bdad92 100644
--- a/src/fs/rock/RockSwapDir.cc
+++ b/src/fs/rock/RockSwapDir.cc
@@ -83,7 +83,7 @@ Rock::SwapDir::get(const cache_key *key)
 bool
 Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
 {
-    if (true || !map || !theFile || !theFile->canRead())
+    if (!map || !theFile || !theFile->canRead())
         return false;
 
     sfileno filen;
@@ -734,7 +734,7 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
 
     // The are two ways an entry can get swap_filen: our get() locked it for
     // reading or our storeSwapOutStart() locked it for writing. Peeking at our
-    // locked entry is safe, but no support for reading a filling entry.
+    // locked entry is safe, but no support for reading the entry we swap out.
     const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
     if (!slot)
         return NULL; // we were writing afterall
@@ -752,9 +752,9 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
            sio->swap_filen);
 
     assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
-    assert(slot->basics.swap_file_sz > 0);
-    // XXX: basics.swap_file_sz may grow for collapsed disk hits
-    assert(slot->basics.swap_file_sz == e.swap_file_sz);
+    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
+    // may still be zero and basics.swap_file_sz may grow.
+    assert(slot->basics.swap_file_sz >= e.swap_file_sz);
 
     return sio;
 }
@@ -792,12 +792,7 @@ Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
     if (errflag == DISK_OK && rlen > 0)
         sio->offset_ += rlen;
 
-    StoreIOState::STRCB *callb = sio->read.callback;
-    assert(callb);
-    sio->read.callback = NULL;
-    void *cbdata;
-    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
-        callb(cbdata, r->buf, rlen, sio.getRaw());
+    sio->callReaderBack(r->buf, rlen);
 }
 
 void
@@ -811,9 +806,12 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
     // quit if somebody called IoState::close() while we were waiting
     if (!sio.stillWaiting()) {
         debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
+        noteFreeMapSlice(request->sidNext);
         return;
     }
 
+    // TODO: Fail if disk dropped one of the previous write requests.
+
     if (errflag == DISK_OK) {
         // do not increment sio.offset_ because we do it in sio->write()
 
@@ -823,13 +821,20 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
         slice.size = request->len - sizeof(DbCellHeader);
         slice.next = request->sidNext;
 
-        if (request->sidNext < 0) {
+        if (request->eof) {
+            assert(sio.e);
+            assert(sio.writeableAnchor_);
+            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
+                                  sio.offset_;
+
             // close, the entry gets the read lock
             map->closeForWriting(sio.swap_filen, true);
             sio.writeableAnchor_ = NULL;
             sio.finishedWriting(errflag);
         }
     } else {
+        noteFreeMapSlice(request->sidNext);
+
         writeError(*sio.e);
         sio.finishedWriting(errflag);
         // and hope that Core will call disconnect() to close the map entry
-- 
2.39.5
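
For readers new to the rock slice chain, the standalone sketch below (not Squid code; all names are illustrative) shows why a negative next-slot id alone can no longer mark the end of an entry once partial buffers are flushed for collapsed readers, and how an explicit eof flag, as in WriteRequest::eof above, disambiguates the two cases.

```cpp
// Standalone illustration; not part of the patch. Type and function names
// are hypothetical and only mirror the idea behind WriteRequest::eof.
#include <cassert>
#include <iostream>

struct FakeWriteRequest {
    int sidNext = -1; // negative: no next slot reserved (yet)
    bool eof = false; // true only for the entry's final slot
};

// Before the patch, "last write" was inferred from sidNext < 0. A partial
// buffer flushed early for remote readers also has sidNext < 0, so that
// inference would wrongly close the entry.
bool lastWriteOldWay(const FakeWriteRequest &r) { return r.sidNext < 0; }

// After the patch, the writer states the end explicitly.
bool lastWriteNewWay(const FakeWriteRequest &r) { return r.eof; }

int main()
{
    FakeWriteRequest partial; // early flush for collapsed hit readers
    FakeWriteRequest last;    // genuine end of the entry
    last.eof = true;

    assert(lastWriteOldWay(partial));  // false positive: looks like the end
    assert(!lastWriteNewWay(partial)); // correctly still "more to come"
    assert(lastWriteNewWay(last));

    std::cout << "eof flag distinguishes a partial flush from the final slot\n";
}
```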
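
A second standalone sketch (again with hypothetical names, not Squid's API) of the leak-avoidance rule the patch applies in writeCompleted(): a slot reserved for the next slice must be released on every path that does not link it into the chain, including the "entry already closed" and write-error paths.

```cpp
// Simplified illustration of "release the reserved slot on every failure
// path"; SlotPool and its methods are hypothetical stand-ins, with release()
// playing the role of noteFreeMapSlice().
#include <iostream>
#include <set>

struct SlotPool {
    std::set<int> reserved; // slot ids handed out but not yet linked
    int next = 0;

    int reserve() { reserved.insert(next); return next++; }
    void release(int sid) { if (sid >= 0) reserved.erase(sid); }
    void linkIntoChain(int sid) { if (sid >= 0) reserved.erase(sid); }
};

// Mimics the post-patch writeCompleted() control flow: whichever branch we
// take, a reserved-but-unlinked next slot must not leak.
void onWriteCompleted(SlotPool &pool, int sidNext, bool ioOk, bool stillOpen)
{
    if (!stillOpen) {            // IoState::close() happened while we waited
        pool.release(sidNext);
        return;
    }
    if (ioOk)
        pool.linkIntoChain(sidNext); // slice.next = sidNext in the real code
    else
        pool.release(sidNext);       // write failed; free the slot
}

int main()
{
    SlotPool pool;
    onWriteCompleted(pool, pool.reserve(), true, true);  // success: linked
    onWriteCompleted(pool, pool.reserve(), false, true); // I/O error: freed
    onWriteCompleted(pool, pool.reserve(), true, false); // closed early: freed
    std::cout << "leaked slots: " << pool.reserved.size() << "\n"; // expect 0
}
```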