dir(aDir),
slotSize(dir->slotSize),
objOffset(0),
+ sidFirst(-1),
+ sidPrevious(-1),
sidCurrent(-1),
+ sidNext(-1),
+ requestsSent(0),
+ repliesReceived(0),
theBuf(dir->slotSize)
{
e = anEntry;
// if we are dealing with the first read or
// if the offset went backwards, start searching from the beginning
if (sidCurrent < 0 || coreOff < objOffset) {
- sidCurrent = readAnchor().start;
+ // readers do not need sidFirst, but we set it for consistency/triage purposes
+ sidCurrent = sidFirst = readAnchor().start;
objOffset = 0;
}
len = min(len,
static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
const uint64_t diskOffset = dir->diskOffset(sidCurrent);
- theFile->read(new ReadRequest(::ReadRequest(buf,
- diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
+ const auto start = diskOffset + sizeof(DbCellHeader) + coreOff - objOffset;
+ const auto id = ++requestsSent;
+ const auto request = new ReadRequest(::ReadRequest(buf, start, len), this, id);
+ theFile->read(request);
}
+void
+Rock::IoState::handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
+{
+ if (errFlag != DISK_OK || rlen < 0) {
+ debugs(79, 3, errFlag << " failure for " << *e);
+ return callReaderBack(request.buf, -1);
+ }
+
+ if (!expectedReply(request.id))
+ return callReaderBack(request.buf, -1);
+
+ debugs(79, 5, '#' << request.id << " read " << rlen << " bytes at " << offset_ << " for " << *e);
+ offset_ += rlen;
+ callReaderBack(request.buf, rlen);
+}
+
+/// report (already sanitized/checked) I/O results to the read initiator
void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
- debugs(79, 5, rlen << " bytes for " << *e);
splicingPoint = rlen >= 0 ? sidCurrent : -1;
if (splicingPoint < 0)
staleSplicingPointNext = -1;
{
debugs(79, 7, swap_filen << " writes " << size << " more");
- // either this is the first write or append; we do not support write gaps
+ // either this is the first write or append;
+ // we do not support write gaps or rewrites
assert(!coreOff || coreOff == -1);
// throw if an accepted unknown-size entry grew too big or max-size changed
Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));
- // allocate the first slice during the first write
- if (!coreOff) {
- assert(sidCurrent < 0);
- sidCurrent = dir->reserveSlotForWriting(); // throws on failures
- assert(sidCurrent >= 0);
- writeAnchor().start = sidCurrent;
- }
-
// buffer incoming data in slot buffer and write overflowing or final slots
// quit when no data left or we stopped writing on reentrant error
while (size > 0 && theFile != NULL) {
- assert(sidCurrent >= 0);
const size_t processed = writeToBuffer(buf, size);
buf += processed;
size -= processed;
const bool overflow = size > 0;
// We do not write a full buffer without overflow because
- // we would not yet know what to set the nextSlot to.
+ // we do not want to risk writing a payload-free slot on EOF.
if (overflow) {
- const auto sidNext = dir->reserveSlotForWriting(); // throws
+ Must(sidNext < 0);
+ sidNext = dir->reserveSlotForWriting(); // throws on failures
assert(sidNext >= 0);
- writeToDisk(sidNext);
- // } else if (Store::Root().transientReaders(*e)) {
- // XXX: Partial writes cannot be read -- no map->startAppending()!
- // XXX: Partial writes confuse SwapDir::droppedEarlierRequest(),
- // resulting in released entries and, hence, misses and CF retries.
- // XXX: The effective benefit of partial writes is reduced by
- // doPages() buffering SM_PAGE_SIZE*n leftovers.
-
- // // write partial buffer for all remote hit readers to see
- // writeBufToDisk(-1, false, false);
+ writeToDisk();
+ Must(sidNext < 0); // short sidNext lifetime simplifies code logic
}
}
return 0;
if (!theBuf.size) {
- // will fill the header in writeToDisk when the next slot is known
+ // eventually, writeToDisk() will fill this header space
theBuf.appended(sizeof(DbCellHeader));
}
}
/// write what was buffered during write() calls
-/// negative sidNext means this is the last write request for this entry
void
-Rock::IoState::writeToDisk(const SlotId sidNextProposal)
+Rock::IoState::writeToDisk()
{
assert(theFile != NULL);
assert(theBuf.size >= sizeof(DbCellHeader));
- const bool lastWrite = sidNextProposal < 0;
+ assert((sidFirst < 0) == (sidCurrent < 0));
+ if (sidFirst < 0) // this is the first disk write
+ sidCurrent = sidFirst = dir->reserveSlotForWriting(); // throws on failures
+
+ // negative sidNext means this is the last write request for this entry
+ const bool lastWrite = sidNext < 0;
+ // here, eof means that we are writing the right-most entry slot
const bool eof = lastWrite &&
// either not updating or the updating reader has loaded everything
(touchingStoreEntry() || staleSplicingPointNext < 0);
- // approve sidNextProposal unless _updating_ the last slot
- const SlotId sidNext = (!touchingStoreEntry() && lastWrite) ?
- staleSplicingPointNext : sidNextProposal;
- debugs(79, 5, "sidNext:" << sidNextProposal << "=>" << sidNext << " eof=" << eof);
+ debugs(79, 5, "sidCurrent=" << sidCurrent << " sidNext=" << sidNext << " eof=" << eof);
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
- writeBufToDisk(sidNext, eof, lastWrite);
- theBuf.clear();
-
- sidCurrent = sidNext;
-}
-
-/// creates and submits a request to write current slot buffer to disk
-/// eof is true if and only this is the last slot
-void
-Rock::IoState::writeBufToDisk(const SlotId sidNext, const bool eof, const bool lastWrite)
-{
- // no slots after the last/eof slot (but partial slots may have a nil next)
- assert(!eof || sidNext < 0);
+ assert(!eof || sidNext < 0); // no slots after eof
// finalize db cell header
DbCellHeader header;
memcpy(header.key, e->key, sizeof(header.key));
- header.firstSlot = writeAnchor().start;
- header.nextSlot = sidNext;
+ header.firstSlot = sidFirst;
+
+ const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
+ assert(!lastUpdatingWrite || sidNext < 0);
+ header.nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;
+
header.payloadSize = theBuf.size - sizeof(DbCellHeader);
header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
header.version = writeAnchor().basics.timestamp;
const uint64_t diskOffset = dir->diskOffset(sidCurrent);
debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
theBuf.size);
-
+ const auto id = ++requestsSent;
WriteRequest *const r = new WriteRequest(
::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
- memFreeBufFunc(wBufCap)), this);
+ memFreeBufFunc(wBufCap)), this, id);
r->sidCurrent = sidCurrent;
- r->sidNext = sidNext;
+ r->sidPrevious = sidPrevious;
r->eof = lastWrite;
+ sidPrevious = sidCurrent;
+ sidCurrent = sidNext; // sidNext may be cleared/negative already
+ sidNext = -1;
+
+ theBuf.clear();
+
// theFile->write may call writeCompleted immediately
theFile->write(r);
}
+bool
+Rock::IoState::expectedReply(const IoXactionId receivedId)
+{
+ Must(requestsSent); // paranoid: we sent some requests
+ Must(receivedId); // paranoid: the request was sent by some sio
+ Must(receivedId <= requestsSent); // paranoid: within our range
+ ++repliesReceived;
+ const auto expectedId = repliesReceived;
+ if (receivedId == expectedId)
+ return true;
+
+ debugs(79, 3, "no; expected reply #" << expectedId <<
+ ", but got #" << receivedId);
+ return false;
+}
+
void
Rock::IoState::finishedWriting(const int errFlag)
{
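+ // free slots that were reserved for writing but never submitted to disk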
+ if (sidCurrent >= 0) {
+ dir->noteFreeMapSlice(sidCurrent);
+ sidCurrent = -1;
+ }
+ if (sidNext >= 0) {
+ dir->noteFreeMapSlice(sidNext);
+ sidNext = -1;
+ }
+
// we incremented offset_ while accumulating data in write()
// we do not reset writeableAnchor_ here because we still keep the lock
if (touchingStoreEntry())
Rock::IoState::close(int how)
{
debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
- " buf: " << theBuf.size << " callback: " << callback);
+ " leftovers: " << theBuf.size <<
+ " after " << requestsSent << '/' << repliesReceived <<
+ " callback: " << callback);
if (!theFile) {
debugs(79, 3, "I/O already canceled");
switch (how) {
case wroteAll:
assert(theBuf.size > 0); // we never flush last bytes on our own
- writeToDisk(-1); // flush last, yet unwritten slot to disk
- return; // writeCompleted() will callBack()
+ try {
+ writeToDisk(); // flush last, yet unwritten slot to disk
+ return; // writeCompleted() will callBack()
+ }
+ catch (...) {
+ debugs(79, 2, "db flush error: " << CurrentException);
+ // TODO: Move finishedWriting() into SwapDir::writeError().
+ dir->writeError(*this);
+ finishedWriting(DISK_ERROR);
+ }
+ return;
case writerGone:
dir->writeError(*this); // abort a partially stored entry
#ifndef SQUID_FS_ROCK_IO_STATE_H
#define SQUID_FS_ROCK_IO_STATE_H
+#include "fs/rock/forward.h"
#include "fs/rock/RockSwapDir.h"
#include "sbuf/MemBlob.h"
/// whether we are still waiting for the I/O results (i.e., not closed)
bool stillWaiting() const { return theFile != NULL; }
- /// forwards read data to the reader that initiated this I/O
- void callReaderBack(const char *buf, int rlen);
+ /// forwards read data (or an error) to the reader that initiated this I/O
+ void handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag);
/// called by SwapDir::writeCompleted() after the last write and on error
void finishedWriting(const int errFlag);
+ /// notes that the disker has satisfied the given I/O request
+ /// \returns whether all earlier I/O requests have been satisfied already
+ bool expectedReply(const IoXactionId receivedId);
+
/* one and only one of these will be set and locked; access via *Anchor() */
const Ipc::StoreMapAnchor *readableAnchor_; ///< starting point for reading
Ipc::StoreMapAnchor *writeableAnchor_; ///< starting point for writing
void tryWrite(char const *buf, size_t size, off_t offset);
size_t writeToBuffer(char const *buf, size_t size);
- void writeToDisk(const SlotId nextSlot);
- void writeBufToDisk(const SlotId nextSlot, const bool eof, const bool lastWrite);
+ void writeToDisk();
+ void callReaderBack(const char *buf, int rlen);
void callBack(int errflag);
Rock::SwapDir::Pointer dir; ///< swap dir that initiated I/O
const size_t slotSize; ///< db cell size
int64_t objOffset; ///< object offset for current db slot
- SlotId sidCurrent; ///< ID of the db slot currently being read or written
+
+ /// The very first entry slot. Usually the same as anchor.start,
+ /// but writers set anchor.start only after the first write is done.
+ SlotId sidFirst;
+
+ /// Unused by readers.
+ /// For writers, the slot pointing (via .next) to sidCurrent.
+ SlotId sidPrevious;
+
+ /// For readers, the db slot currently being read from disk.
+ /// For writers, the reserved db slot currently being filled (to be written).
+ SlotId sidCurrent;
+
+ /// Unused by readers.
+ /// For writers, the reserved db slot that sidCurrent.next will point to.
+ SlotId sidNext;
+
+ /// the number of read or write requests we sent to theFile
+ uint64_t requestsSent;
+
+ /// the number of successful responses we received from theFile
+ uint64_t repliesReceived;
RefCount<DiskFile> theFile; // "file" responsible for this I/O
MemBlob theBuf; // use for write content accumulation only
ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
assert(request);
IoState::Pointer sio = request->sio;
-
- if (errflag == DISK_OK && rlen > 0)
- sio->offset_ += rlen;
-
- sio->callReaderBack(r->buf, rlen);
+ sio->handleReadCompletion(*request, rlen, errflag);
}
void
Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
{
+ // TODO: Move details into IoState::handleWriteCompletion() after figuring
+ // out how to deal with map access. See readCompleted().
+
Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
assert(request);
assert(request->sio != NULL);
// quit if somebody called IoState::close() while we were waiting
if (!sio.stillWaiting()) {
debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
- noteFreeMapSlice(request->sidNext);
+ noteFreeMapSlice(request->sidCurrent);
return;
}
if (errflag != DISK_OK)
handleWriteCompletionProblem(errflag, *request);
- else if (droppedEarlierRequest(*request))
+ else if (!sio.expectedReply(request->id))
handleWriteCompletionProblem(DISK_ERROR, *request);
else
handleWriteCompletionSuccess(*request);
sio.splicingPoint = request.sidCurrent;
// do not increment sio.offset_ because we do it in sio->write()
- // finalize the shared slice info after writing slice contents to disk
+ assert(sio.writeableAnchor_);
+ if (sio.writeableAnchor_->start < 0) { // wrote the first slot
+ Must(request.sidPrevious < 0);
+ sio.writeableAnchor_->start = request.sidCurrent;
+ } else {
+ Must(request.sidPrevious >= 0);
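+ // link the previous slice in the entry chain to the slice we just wrote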
+ map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent;
+ }
+
+ // finalize the shared slice info after writing slice contents to disk;
+ // the chain gets possession of the slice we were writing
Ipc::StoreMap::Slice &slice =
map->writeableSlice(sio.swap_filen, request.sidCurrent);
slice.size = request.len - sizeof(DbCellHeader);
- slice.next = request.sidNext;
+ Must(slice.next < 0);
if (request.eof) {
assert(sio.e);
- assert(sio.writeableAnchor_);
if (sio.touchingStoreEntry()) {
sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
sio.offset_;
{
auto &sio = *request.sio;
- noteFreeMapSlice(request.sidNext);
+ noteFreeMapSlice(request.sidCurrent);
writeError(sio);
sio.finishedWriting(errflag);
// All callers must also call IoState callback, to propagate the error.
}
-/// whether the disk has dropped at least one of the previous write requests
-bool
-Rock::SwapDir::droppedEarlierRequest(const WriteRequest &request) const
-{
- const auto &sio = *request.sio;
- assert(sio.writeableAnchor_);
- const Ipc::StoreMapSliceId expectedSliceId = sio.splicingPoint < 0 ?
- sio.writeableAnchor_->start :
- map->writeableSlice(sio.swap_filen, sio.splicingPoint).next;
- if (expectedSliceId != request.sidCurrent) {
- debugs(79, 3, "yes; expected " << expectedSliceId << ", but got " << request.sidCurrent);
- return true;
- }
-
- return false;
-}
-
void
Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
{