/*
- * $Id$
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
*
- * DEBUG: section 79 Disk IO Routines
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
-#include "config.h"
-#include "MemObject.h"
-#include "Parsing.h"
+/* DEBUG: section 79 Disk IO Routines */
+
+#include "squid.h"
+#include "base/TextException.h"
+#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
-#include "fs/rock/RockIoState.h"
#include "fs/rock/RockIoRequests.h"
+#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
+#include "globals.h"
+#include "MemObject.h"
+#include "Parsing.h"
+#include "Transients.h"
-Rock::IoState::IoState(SwapDir *dir,
+Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
StoreEntry *anEntry,
StoreIOState::STFNCB *cbFile,
StoreIOState::STIOCB *cbIo,
void *data):
- slotSize(0),
- diskOffset(-1),
- payloadEnd(-1)
+ readableAnchor_(NULL),
+ writeableAnchor_(NULL),
+ sidCurrent(-1),
+ dir(aDir),
+ slotSize(dir->slotSize),
+ objOffset(0),
+ theBuf(dir->slotSize)
{
e = anEntry;
- // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller
- slotSize = dir->max_objsize;
+ e->lock("rock I/O");
+ // anchor, swap_filen, and swap_dirn are set by the caller
file_callback = cbFile;
callback = cbIo;
callback_data = cbdataReference(data);
Rock::IoState::~IoState()
{
--store_open_disk_fd;
+
+ // The dir map entry may still be open for reading at the point because
+ // the map entry lock is associated with StoreEntry, not IoState.
+ // assert(!readableAnchor_);
+ assert(shutting_down || !writeableAnchor_);
+
if (callback_data)
cbdataReferenceDone(callback_data);
theFile = NULL;
+
+ e->unlock("rock I/O");
}
void
theFile = aFile;
}
+const Ipc::StoreMapAnchor &
+Rock::IoState::readAnchor() const
+{
+ assert(readableAnchor_);
+ return *readableAnchor_;
+}
+
+Ipc::StoreMapAnchor &
+Rock::IoState::writeAnchor()
+{
+ assert(writeableAnchor_);
+ return *writeableAnchor_;
+}
+
+/// convenience wrapper returning the map slot we are reading now
+const Ipc::StoreMapSlice &
+Rock::IoState::currentReadableSlice() const
+{
+ return dir->map->readableSlice(swap_filen, sidCurrent);
+}
+
void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
+ debugs(79, 7, swap_filen << " reads from " << coreOff);
+
assert(theFile != NULL);
assert(coreOff >= 0);
- offset_ = coreOff;
- // we skip our cell header; it is only read when building the map
- const int64_t cellOffset = sizeof(DbCellHeader) +
- static_cast<int64_t>(coreOff);
- assert(cellOffset <= payloadEnd);
+ // if we are dealing with the first read or
+ // if the offset went backwords, start searching from the beginning
+ if (sidCurrent < 0 || coreOff < objOffset) {
+ sidCurrent = readAnchor().start;
+ objOffset = 0;
+ }
- // Core specifies buffer length, but we must not exceed stored entry size
- if (cellOffset + (int64_t)len > payloadEnd)
- len = payloadEnd - cellOffset;
+ while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
+ objOffset += currentReadableSlice().size;
+ sidCurrent = currentReadableSlice().next;
+ }
assert(read.callback == NULL);
assert(read.callback_data == NULL);
read.callback = cb;
read.callback_data = cbdataReference(data);
- theFile->read(new ReadRequest(
- ::ReadRequest(buf, diskOffset + cellOffset, len), this));
+ // punt if read offset is too big (because of client bugs or collapsing)
+ if (sidCurrent < 0) {
+ debugs(79, 5, "no " << coreOff << " in " << *e);
+ callReaderBack(buf, 0);
+ return;
+ }
+
+ offset_ = coreOff;
+ len = min(len,
+ static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
+ const uint64_t diskOffset = dir->diskOffset(sidCurrent);
+ theFile->read(new ReadRequest(::ReadRequest(buf,
+ diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}
-// We only buffer data here; we actually write when close() is called.
-// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
-// of the slot when the write does not end at the page or sector boundary.
void
+Rock::IoState::callReaderBack(const char *buf, int rlen)
+{
+ debugs(79, 5, rlen << " bytes for " << *e);
+ StoreIOState::STRCB *callb = read.callback;
+ assert(callb);
+ read.callback = NULL;
+ void *cbdata;
+ if (cbdataReferenceValidDone(read.callback_data, &cbdata))
+ callb(cbdata, buf, rlen, this);
+}
+
+/// wraps tryWrite() to handle deep write failures centrally and safely
+bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
- // TODO: move to create?
- if (!coreOff) {
- assert(theBuf.isNull());
- assert(payloadEnd <= slotSize);
- theBuf.init(min(payloadEnd, slotSize), slotSize);
- // start with our header; TODO: consider making it a trailer
- DbCellHeader header;
- assert(static_cast<int64_t>(sizeof(header)) <= payloadEnd);
- header.payloadSize = payloadEnd - sizeof(header);
- theBuf.append(reinterpret_cast<const char*>(&header), sizeof(header));
- } else {
- // Core uses -1 offset as "append". Sigh.
- assert(coreOff == -1);
- assert(!theBuf.isNull());
+ bool success = false;
+ try {
+ tryWrite(buf, size, coreOff);
+ success = true;
+ } catch (const std::exception &ex) { // TODO: should we catch ... as well?
+ debugs(79, 2, "db write error: " << ex.what());
+ dir->writeError(*e);
+ finishedWriting(DISK_ERROR);
+ // 'this' might be gone beyond this point; fall through to free buf
}
- theBuf.append(buf, size);
- offset_ += size; // so that Core thinks we wrote it
+ // careful: 'this' might be gone here
if (dtor)
(dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+
+ return success;
}
-// write what was buffered during write() calls
+/**
+ * Possibly send data to be written to disk:
+ * We only write data when full slot is accumulated or when close() is called.
+ * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
+ * the slot when the write does not end at the page or sector boundary.
+ */
void
-Rock::IoState::startWriting()
+Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
+{
+ debugs(79, 7, swap_filen << " writes " << size << " more");
+
+ // either this is the first write or append; we do not support write gaps
+ assert(!coreOff || coreOff == -1);
+
+ // allocate the first slice during the first write
+ if (!coreOff) {
+ assert(sidCurrent < 0);
+ sidCurrent = reserveSlotForWriting(); // throws on failures
+ assert(sidCurrent >= 0);
+ writeAnchor().start = sidCurrent;
+ }
+
+ // buffer incoming data in slot buffer and write overflowing or final slots
+ // quit when no data left or we stopped writing on reentrant error
+ while (size > 0 && theFile != NULL) {
+ assert(sidCurrent >= 0);
+ const size_t processed = writeToBuffer(buf, size);
+ buf += processed;
+ size -= processed;
+ const bool overflow = size > 0;
+
+ // We do not write a full buffer without overflow because
+ // we would not yet know what to set the nextSlot to.
+ if (overflow) {
+ const SlotId sidNext = reserveSlotForWriting(); // throws
+ assert(sidNext >= 0);
+ writeToDisk(sidNext);
+ } else if (Store::Root().transientReaders(*e)) {
+ // write partial buffer for all remote hit readers to see
+ writeBufToDisk(-1, false);
+ }
+ }
+
+}
+
+/// Buffers incoming data for the current slot.
+/// \return the number of bytes buffered
+size_t
+Rock::IoState::writeToBuffer(char const *buf, size_t size)
+{
+ // do not buffer a cell header for nothing
+ if (!size)
+ return 0;
+
+ if (!theBuf.size) {
+ // will fill the header in writeToDisk when the next slot is known
+ theBuf.appended(sizeof(DbCellHeader));
+ }
+
+ size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
+ theBuf.append(buf, forCurrentSlot);
+ offset_ += forCurrentSlot; // so that Core thinks we wrote it
+ return forCurrentSlot;
+}
+
+/// write what was buffered during write() calls
+/// negative sidNext means this is the last write request for this entry
+void
+Rock::IoState::writeToDisk(const SlotId sidNext)
{
assert(theFile != NULL);
- assert(!theBuf.isNull());
+ assert(theBuf.size >= sizeof(DbCellHeader));
// TODO: if DiskIO module is mmap-based, we should be writing whole pages
// to avoid triggering read-page;new_head+old_tail;write-page overheads
+ writeBufToDisk(sidNext, sidNext < 0);
+ theBuf.clear();
+
+ sidCurrent = sidNext;
+}
+
+/// creates and submits a request to write current slot buffer to disk
+/// eof is true if and only this is the last slot
+void
+Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+{
+ // no slots after the last/eof slot (but partial slots may have a nil next)
+ assert(!eof || sidNext < 0);
+
+ // finalize db cell header
+ DbCellHeader header;
+ memcpy(header.key, e->key, sizeof(header.key));
+ header.firstSlot = writeAnchor().start;
+ header.nextSlot = sidNext;
+ header.payloadSize = theBuf.size - sizeof(DbCellHeader);
+ header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
+ header.version = writeAnchor().basics.timestamp;
+
+ // copy finalized db cell header into buffer
+ memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
+
+ // and now allocate another buffer for the WriteRequest so that
+ // we can support concurrent WriteRequests (and to ease cleaning)
+ // TODO: should we limit the number of outstanding requests?
+ size_t wBufCap = 0;
+ void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
+ memcpy(wBuf, theBuf.mem, theBuf.size);
+
+ const uint64_t diskOffset = dir->diskOffset(sidCurrent);
debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
- theBuf.contentSize());
+ theBuf.size);
+
+ WriteRequest *const r = new WriteRequest(
+ ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
+ memFreeBufFunc(wBufCap)), this);
+ r->sidCurrent = sidCurrent;
+ r->sidNext = sidNext;
+ r->eof = eof;
- assert(theBuf.contentSize() <= slotSize);
// theFile->write may call writeCompleted immediatelly
- theFile->write(new WriteRequest(::WriteRequest(theBuf.content(),
- diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this));
+ theFile->write(r);
+}
+
+/// finds and returns a free db slot to fill or throws
+Rock::SlotId
+Rock::IoState::reserveSlotForWriting()
+{
+ Ipc::Mem::PageId pageId;
+ if (dir->useFreeSlot(pageId))
+ return pageId.number-1;
+
+ // This may happen when the number of available db slots is close to the
+ // number of concurrent requests reading or writing those slots, which may
+ // happen when the db is "small" compared to the request traffic OR when we
+ // are rebuilding and have not loaded "many" entries or empty slots yet.
+ throw TexcHere("ran out of free db slots");
}
-//
void
Rock::IoState::finishedWriting(const int errFlag)
{
// we incremented offset_ while accumulating data in write()
+ // we do not reset writeableAnchor_ here because we still keep the lock
+ CollapsedForwarding::Broadcast(*e);
callBack(errFlag);
}
void
Rock::IoState::close(int how)
{
- debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
- " how=" << how);
- if (how == wroteAll && !theBuf.isNull())
- startWriting();
- else
- callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
+ debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
+ " buf: " << theBuf.size << " callback: " << callback);
+
+ if (!theFile) {
+ debugs(79, 3, "I/O already canceled");
+ assert(!callback);
+ // We keep writeableAnchor_ after callBack() on I/O errors.
+ assert(!readableAnchor_);
+ return;
+ }
+
+ switch (how) {
+ case wroteAll:
+ assert(theBuf.size > 0); // we never flush last bytes on our own
+ writeToDisk(-1); // flush last, yet unwritten slot to disk
+ return; // writeCompleted() will callBack()
+
+ case writerGone:
+ assert(writeableAnchor_);
+ dir->writeError(*e); // abort a partially stored entry
+ finishedWriting(DISK_ERROR);
+ return;
+
+ case readerDone:
+ callBack(0);
+ return;
+ }
}
/// close callback (STIOCB) dialer: breaks dependencies and
{
public:
StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
- callback(NULL),
- callback_data(NULL),
- errflag(err),
- sio(anSio) {
+ callback(NULL),
+ callback_data(NULL),
+ errflag(err),
+ sio(anSio) {
callback = cb;
callback_data = cbdataReference(data);
}
StoreIOStateCb(const StoreIOStateCb &cb):
- callback(NULL),
- callback_data(NULL),
- errflag(cb.errflag),
- sio(cb.sio) {
+ callback(NULL),
+ callback_data(NULL),
+ errflag(cb.errflag),
+ sio(cb.sio) {
callback = cb.callback;
callback_data = cbdataReference(cb.callback_data);
cbdataReferenceDone(callback_data); // may be nil already
}
- void dial(AsyncCall &call) {
+ void dial(AsyncCall &) {
void *cbd;
if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
callback(cbd, errflag, sio.getRaw());
}
- bool canDial(AsyncCall &call) const {
+ bool canDial(AsyncCall &) const {
return cbdataReferenceValid(callback_data) && callback;
}
}
private:
- StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined
+ StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined
StoreIOState::STIOCB *callback;
void *callback_data;
Rock::IoState::Pointer sio;
};
-
void
Rock::IoState::callBack(int errflag)
{