/*
- * DEBUG: section 79 Disk IO Routines
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
+/* DEBUG: section 79 Disk IO Routines */
+
#include "squid.h"
#include "base/TextException.h"
+#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
-#include "fs/rock/RockIoState.h"
#include "fs/rock/RockIoRequests.h"
+#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
-#include "Mem.h"
#include "Parsing.h"
+#include "Transients.h"
Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
StoreEntry *anEntry,
StoreIOState::STFNCB *cbFile,
StoreIOState::STIOCB *cbIo,
void *data):
- readableAnchor_(NULL),
- writeableAnchor_(NULL),
- sidCurrent(-1),
- dir(aDir),
- slotSize(dir->slotSize),
- objOffset(0),
- theBuf(dir->slotSize)
+ readableAnchor_(NULL),
+ writeableAnchor_(NULL),
+ sidCurrent(-1),
+ dir(aDir),
+ slotSize(dir->slotSize),
+ objOffset(0),
+ theBuf(dir->slotSize)
{
e = anEntry;
e->lock("rock I/O");
// The dir map entry may still be open for reading at this point because
// the map entry lock is associated with StoreEntry, not IoState.
// assert(!readableAnchor_);
- assert(!writeableAnchor_);
+ assert(shutting_down || !writeableAnchor_);
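+    // presumably, shutdown may interrupt cleanup before the anchor is reset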
if (callback_data)
cbdataReferenceDone(callback_data);
objOffset = 0;
}
- while (coreOff >= objOffset + currentReadableSlice().size) {
+ while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
objOffset += currentReadableSlice().size;
sidCurrent = currentReadableSlice().next;
- assert(sidCurrent >= 0); // XXX: handle "read offset too big" error
}
- offset_ = coreOff;
- len = min(len,
- static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
-
assert(read.callback == NULL);
assert(read.callback_data == NULL);
read.callback = cb;
read.callback_data = cbdataReference(data);
+ // punt if read offset is too big (because of client bugs or collapsing)
+ if (sidCurrent < 0) {
+ debugs(79, 5, "no " << coreOff << " in " << *e);
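+        // a zero-length read tells the caller the data is not available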
+ callReaderBack(buf, 0);
+ return;
+ }
+
+ offset_ = coreOff;
+ len = min(len,
+ static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
const uint64_t diskOffset = dir->diskOffset(sidCurrent);
theFile->read(new ReadRequest(::ReadRequest(buf,
- diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
+ diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
+}
+
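+/// reports the read result (buf, rlen) back to the registered read callback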
+void
+Rock::IoState::callReaderBack(const char *buf, int rlen)
+{
+ debugs(79, 5, rlen << " bytes for " << *e);
+ StoreIOState::STRCB *callb = read.callback;
+ assert(callb);
+ read.callback = NULL;
+ void *cbdata;
+ if (cbdataReferenceValidDone(read.callback_data, &cbdata))
+ callb(cbdata, buf, rlen, this);
}
/// wraps tryWrite() to handle deep write failures centrally and safely
success = true;
} catch (const std::exception &ex) { // TODO: should we catch ... as well?
debugs(79, 2, "db write error: " << ex.what());
- dir->writeError(swap_filen);
+ dir->writeError(*e);
finishedWriting(DISK_ERROR);
// 'this' might be gone beyond this point; fall through to free buf
}
// careful: 'this' might be gone here
-
+
if (dtor)
(dtor)(const_cast<char*>(buf)); // cast due to a broken API?
return success;
}
-/** We only write data when full slot is accumulated or when close() is called.
- We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
- the slot when the write does not end at the page or sector boundary. */
+/**
+ * Possibly send data to be written to disk:
+ * We only write data when a full slot is accumulated or when close() is
+ * called. We buffer, in part, to avoid forcing the OS to _read_ old unwritten
+ * portions of the slot when the write does not end at a page or sector
+ * boundary.
+ */
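+/*
+ * For example, with 4 KB slots, a 10 KB entry fills and flushes two slots as
+ * data arrives and keeps the 2 KB tail buffered until close() writes the
+ * final, partial slot (DbCellHeader overhead is ignored in this sketch).
+ */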
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
// either this is the first write or an append; we do not support write gaps
assert(!coreOff || coreOff == -1);
- // allocate the first slice diring the first write
+ // allocate the first slice during the first write
if (!coreOff) {
assert(sidCurrent < 0);
sidCurrent = reserveSlotForWriting(); // throws on failures
const SlotId sidNext = reserveSlotForWriting(); // throws
assert(sidNext >= 0);
writeToDisk(sidNext);
+ } else if (Store::Root().transientReaders(*e)) {
+ // write partial buffer for all remote hit readers to see
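+        // the slot is unfinished: no known successor (-1) and not eof (false)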
+ writeBufToDisk(-1, false);
}
}
- // XXX: check that there are workers waiting for data, i.e. readers > 0
- writeBufToDisk();
}
/// Buffers incoming data for the current slot.
-/// Returns the number of bytes buffered.
+/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
assert(theFile != NULL);
assert(theBuf.size >= sizeof(DbCellHeader));
- if (sidNext < 0) { // we are writing the last slot
- e->swap_file_sz = offset_;
- writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
- }
-
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
- // finalize map slice
- Ipc::StoreMap::Slice &slice =
- dir->map->writeableSlice(swap_filen, sidCurrent);
- slice.next = sidNext;
- slice.size = theBuf.size - sizeof(DbCellHeader);
+ writeBufToDisk(sidNext, sidNext < 0);
+ theBuf.clear();
+
+ sidCurrent = sidNext;
+}
+
+/// creates and submits a request to write the current slot buffer to disk
+/// eof is true if and only if this is the last slot
+void
+Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+{
+ // no slots after the last/eof slot (but partial slots may have a nil next)
+ assert(!eof || sidNext < 0);
// finalize db cell header
DbCellHeader header;
header.firstSlot = writeAnchor().start;
header.nextSlot = sidNext;
header.payloadSize = theBuf.size - sizeof(DbCellHeader);
- header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
+ header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
header.version = writeAnchor().basics.timestamp;
// copy finalized db cell header into buffer
memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
- writeBufToDisk(sidNext < 0);
- theBuf.clear();
-
- sidCurrent = sidNext;
-}
-
-void
-Rock::IoState::writeBufToDisk(const bool last)
-{
// and now allocate another buffer for the WriteRequest so that
// we can support concurrent WriteRequests (and to ease cleaning)
// TODO: should we limit the number of outstanding requests?
WriteRequest *const r = new WriteRequest(
::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
- memFreeBufFunc(wBufCap)), this, last);
+ memFreeBufFunc(wBufCap)), this);
+ r->sidCurrent = sidCurrent;
+ r->sidNext = sidNext;
+ r->eof = eof;
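+    // carried so the completion handler can finalize the map slice only
+    // after this slot's data safely reaches the disk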
// theFile->write may call writeCompleted immediately
theFile->write(r);
Rock::IoState::finishedWriting(const int errFlag)
{
// we incremented offset_ while accumulating data in write()
- writeableAnchor_ = NULL;
+ // we do not reset writeableAnchor_ here because we still keep the lock
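+    // wake up readers waiting for more data, possibly in other workers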
+ CollapsedForwarding::Broadcast(*e);
callBack(errFlag);
}
if (!theFile) {
debugs(79, 3, "I/O already canceled");
assert(!callback);
- assert(!writeableAnchor_);
+ // We keep writeableAnchor_ after callBack() on I/O errors.
assert(!readableAnchor_);
return;
}
case writerGone:
assert(writeableAnchor_);
- dir->writeError(swap_filen); // abort a partially stored entry
+ dir->writeError(*e); // abort a partially stored entry
finishedWriting(DISK_ERROR);
return;
{
public:
StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
- callback(NULL),
- callback_data(NULL),
- errflag(err),
- sio(anSio) {
+ callback(NULL),
+ callback_data(NULL),
+ errflag(err),
+ sio(anSio) {
callback = cb;
callback_data = cbdataReference(data);
}
StoreIOStateCb(const StoreIOStateCb &cb):
- callback(NULL),
- callback_data(NULL),
- errflag(cb.errflag),
- sio(cb.sio) {
+ callback(NULL),
+ callback_data(NULL),
+ errflag(cb.errflag),
+ sio(cb.sio) {
callback = cb.callback;
callback_data = cbdataReference(cb.callback_data);
cbdataReferenceDone(callback_data); // may be nil already
}
- void dial(AsyncCall &call) {
+ void dial(AsyncCall &) {
void *cbd;
if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
callback(cbd, errflag, sio.getRaw());
}
- bool canDial(AsyncCall &call) const {
+ bool canDial(AsyncCall &) const {
return cbdataReferenceValid(callback_data) && callback;
}
}
private:
- StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined
+ StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined
StoreIOState::STIOCB *callback;
void *callback_data;