objOffset = 0;
}
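+ // walk the slice chain to the slice that contains coreOff; if the chain
+ // ends first, sidCurrent goes negative and the bad offset is handled below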
- while (coreOff >= objOffset + currentReadableSlice().size) {
+ while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
objOffset += currentReadableSlice().size;
sidCurrent = currentReadableSlice().next;
- assert(sidCurrent >= 0); // TODO: handle "read offset too big" error
}
- offset_ = coreOff;
- len = min(len,
- static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
-
assert(read.callback == NULL);
assert(read.callback_data == NULL);
read.callback = cb;
read.callback_data = cbdataReference(data);
+ // punt if read offset is too big (because of client bugs or collapsing)
+ if (sidCurrent < 0) {
+ debugs(79, 5, "no " << coreOff << " in " << *e);
+ callReaderBack(buf, 0);
+ return;
+ }
+
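+ // we found the slice containing coreOff; clamp the read length so the
+ // request does not extend past the end of that slice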
+ offset_ = coreOff;
+ len = min(len,
+ static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
const uint64_t diskOffset = dir->diskOffset(sidCurrent);
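+ // skip the DbCellHeader and position the read at coreOff within the slot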
theFile->read(new ReadRequest(::ReadRequest(buf,
diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}
+void
+Rock::IoState::callReaderBack(const char *buf, int rlen)
+{
+ debugs(79, 5, rlen << " bytes for " << *e);
+ StoreIOState::STRCB *callb = read.callback;
+ assert(callb);
+ read.callback = NULL;
+ void *cbdata;
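+ // do not call back if the requester is already gone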
+ if (cbdataReferenceValidDone(read.callback_data, &cbdata))
+ callb(cbdata, buf, rlen, this);
+}
+
/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
const SlotId sidNext = reserveSlotForWriting(); // throws
assert(sidNext >= 0);
writeToDisk(sidNext);
- } else if (false && Store::Root().transientReaders(*e)) {
+ } else if (Store::Root().transientReaders(*e)) {
// write partial buffer for all remote hit readers to see
- writeBufToDisk(-1);
+ writeBufToDisk(-1, false);
}
}
assert(theFile != NULL);
assert(theBuf.size >= sizeof(DbCellHeader));
- if (sidNext < 0) { // we are writing the last slot
- e->swap_file_sz = offset_;
- writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
- }
-
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
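+ // in this context, a negative sidNext means we are writing the last (eof) slot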
+ writeBufToDisk(sidNext, sidNext < 0);
+ theBuf.clear();
+
+ sidCurrent = sidNext;
+}
+
+/// creates and submits a request to write the current slot buffer to disk
+/// eof is true if and only if this is the last slot
+void
+Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+{
+ // no slots after the last/eof slot (but partial slots may have a nil next)
+ assert(!eof || sidNext < 0);
+
// finalize db cell header
DbCellHeader header;
memcpy(header.key, e->key, sizeof(header.key));
header.firstSlot = writeAnchor().start;
header.nextSlot = sidNext;
header.payloadSize = theBuf.size - sizeof(DbCellHeader);
- header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
+ header.entrySize = e->swap_file_sz; // zero except for the very last write
header.version = writeAnchor().basics.timestamp;
// copy finalized db cell header into buffer
memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
- writeBufToDisk(sidNext);
- theBuf.clear();
-
- sidCurrent = sidNext;
-}
-
-/// Write header-less (ugh) or complete buffer to disk.
-void
-Rock::IoState::writeBufToDisk(const SlotId sidNext)
-{
// and now allocate another buffer for the WriteRequest so that
// we can support concurrent WriteRequests (and to ease cleaning)
// TODO: should we limit the number of outstanding requests?
memFreeBufFunc(wBufCap)), this);
r->sidCurrent = sidCurrent;
r->sidNext = sidNext;
+ r->eof = eof;
- // theFile->write may call writeCompleted immediatelly
+ // theFile->write may call writeCompleted immediately
theFile->write(r);
/// whether we are still waiting for the I/O results (i.e., not closed)
bool stillWaiting() const { return theFile != NULL; }
+ /// forwards read data to the reader that initiated this I/O
+ void callReaderBack(const char *buf, int rlen);
+
/// called by SwapDir::writeCompleted() after the last write and on error
void finishedWriting(const int errFlag);
void tryWrite(char const *buf, size_t size, off_t offset);
size_t writeToBuffer(char const *buf, size_t size);
void writeToDisk(const SlotId nextSlot);
- void writeBufToDisk(const SlotId nextSlot);
+ void writeBufToDisk(const SlotId nextSlot, const bool eof);
SlotId reserveSlotForWriting();
void callBack(int errflag);
bool
Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
{
- if (true || !map || !theFile || !theFile->canRead())
+ if (!map || !theFile || !theFile->canRead())
return false;
sfileno filen;
- // The are two ways an entry can get swap_filen: our get() locked it for
+ // There are two ways an entry can get swap_filen: our get() locked it for
// reading or our storeSwapOutStart() locked it for writing. Peeking at our
- // locked entry is safe, but no support for reading a filling entry.
+ // locked entry is safe, but no support for reading the entry we swap out.
const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
if (!slot)
- return NULL; // we were writing afterall
+ return NULL; // we were writing after all
sio->swap_filen);
assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
- assert(slot->basics.swap_file_sz > 0);
- // XXX: basics.swap_file_sz may grow for collapsed disk hits
- assert(slot->basics.swap_file_sz == e.swap_file_sz);
+ // For collapsed disk hits, e.swap_file_sz and slot->basics.swap_file_sz
+ // may still be zero, and basics.swap_file_sz may keep growing.
+ assert(slot->basics.swap_file_sz >= e.swap_file_sz);
return sio;
}
if (errflag == DISK_OK && rlen > 0)
sio->offset_ += rlen;
- StoreIOState::STRCB *callb = sio->read.callback;
- assert(callb);
- sio->read.callback = NULL;
- void *cbdata;
- if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
- callb(cbdata, r->buf, rlen, sio.getRaw());
+ sio->callReaderBack(r->buf, rlen);
}
void
// quit if somebody called IoState::close() while we were waiting
if (!sio.stillWaiting()) {
debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
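+ // release the slot (if any) that was reserved for the next write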
+ noteFreeMapSlice(request->sidNext);
return;
}
+ // TODO: Fail if disk dropped one of the previous write requests.
+
if (errflag == DISK_OK) {
// do not increment sio.offset_ because we do it in sio->write()
slice.size = request->len - sizeof(DbCellHeader);
slice.next = request->sidNext;
- if (request->sidNext < 0) {
+ if (request->eof) {
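+ // the entry size is final only now, after the last slot reached the disk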
+ assert(sio.e);
+ assert(sio.writeableAnchor_);
+ sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
+ sio.offset_;
+
// close, the entry gets the read lock
map->closeForWriting(sio.swap_filen, true);
sio.writeableAnchor_ = NULL;
sio.finishedWriting(errflag);
}
} else {
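+ // release the reserved next slot (if any); this swapout has failed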
+ noteFreeMapSlice(request->sidNext);
+
writeError(*sio.e);
sio.finishedWriting(errflag);
// and hope that Core will call disconnect() to close the map entry