]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/fs/rock/RockIoState.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / fs / rock / RockIoState.cc
index 3f89ea63b8cae4bc592c031d4f15d9f5a951f1d3..94c627a11dd191f7043925f6c970b6fc9246b2eb 100644 (file)
@@ -1,29 +1,43 @@
 /*
- * $Id$
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
  *
- * DEBUG: section 79    Disk IO Routines
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
-#include "Parsing.h"
+/* DEBUG: section 79    Disk IO Routines */
+
+#include "squid.h"
+#include "base/TextException.h"
+#include "CollapsedForwarding.h"
 #include "DiskIO/DiskIOModule.h"
 #include "DiskIO/DiskIOStrategy.h"
 #include "DiskIO/WriteRequest.h"
-#include "fs/rock/RockIoState.h"
 #include "fs/rock/RockIoRequests.h"
+#include "fs/rock/RockIoState.h"
 #include "fs/rock/RockSwapDir.h"
-
-Rock::IoState::IoState(SwapDir *dir,
-    StoreEntry *anEntry,
-    StoreIOState::STFNCB *cbFile,
-    StoreIOState::STIOCB *cbIo,
-    void *data):
-    slotSize(0),
-    entrySize(0)
+#include "globals.h"
+#include "MemObject.h"
+#include "Parsing.h"
+#include "Transients.h"
+
+Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
+                       StoreEntry *anEntry,
+                       StoreIOState::STFNCB *cbFile,
+                       StoreIOState::STIOCB *cbIo,
+                       void *data):
+    readableAnchor_(NULL),
+    writeableAnchor_(NULL),
+    sidCurrent(-1),
+    dir(aDir),
+    slotSize(dir->slotSize),
+    objOffset(0),
+    theBuf(dir->slotSize)
 {
     e = anEntry;
-    swap_filen = e->swap_filen;
-    swap_dirn = dir->index;
-    slotSize = dir->max_objsize;
+    e->lock("rock I/O");
+    // anchor, swap_filen, and swap_dirn are set by the caller
     file_callback = cbFile;
     callback = cbIo;
     callback_data = cbdataReference(data);
@@ -34,9 +48,17 @@ Rock::IoState::IoState(SwapDir *dir,
 Rock::IoState::~IoState()
 {
     --store_open_disk_fd;
+
+    // The dir map entry may still be open for reading at the point because
+    // the map entry lock is associated with StoreEntry, not IoState.
+    // assert(!readableAnchor_);
+    assert(shutting_down || !writeableAnchor_);
+
     if (callback_data)
         cbdataReferenceDone(callback_data);
     theFile = NULL;
+
+    e->unlock("rock I/O");
 }
 
 void
@@ -47,91 +69,285 @@ Rock::IoState::file(const RefCount<DiskFile> &aFile)
     theFile = aFile;
 }
 
+const Ipc::StoreMapAnchor &
+Rock::IoState::readAnchor() const
+{
+    assert(readableAnchor_);
+    return *readableAnchor_;
+}
+
+Ipc::StoreMapAnchor &
+Rock::IoState::writeAnchor()
+{
+    assert(writeableAnchor_);
+    return *writeableAnchor_;
+}
+
+/// convenience wrapper returning the map slot we are reading now
+const Ipc::StoreMapSlice &
+Rock::IoState::currentReadableSlice() const
+{
+    return dir->map->readableSlice(swap_filen, sidCurrent);
+}
+
 void
-Rock::IoState::read_(char *buf, size_t len, off_t off, STRCB *cb, void *data)
+Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
 {
+    debugs(79, 7, swap_filen << " reads from " << coreOff);
+
     assert(theFile != NULL);
-    assert(theFile->canRead());
+    assert(coreOff >= 0);
 
-    // Core specifies buffer length, but we must not exceed stored entry size
-    assert(off >= 0);
-    assert(entrySize >= 0);
-    const int64_t offset = static_cast<int64_t>(off);
-    assert(offset <= entrySize);
-    if (offset + (int64_t)len > entrySize)
-        len = entrySize - offset;
+    // if we are dealing with the first read or
+    // if the offset went backwords, start searching from the beginning
+    if (sidCurrent < 0 || coreOff < objOffset) {
+        sidCurrent = readAnchor().start;
+        objOffset = 0;
+    }
+
+    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
+        objOffset += currentReadableSlice().size;
+        sidCurrent = currentReadableSlice().next;
+    }
 
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
-    theFile->read(new ReadRequest(::ReadRequest(buf, offset_ + offset, len), this));
+    // punt if read offset is too big (because of client bugs or collapsing)
+    if (sidCurrent < 0) {
+        debugs(79, 5, "no " << coreOff << " in " << *e);
+        callReaderBack(buf, 0);
+        return;
+    }
+
+    offset_ = coreOff;
+    len = min(len,
+              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
+    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
+    theFile->read(new ReadRequest(::ReadRequest(buf,
+                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
 }
 
-// We only buffer data here; we actually write when close() is called.
-// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
-// of the slot when the write does not end at the page or sector boundary.
 void
-Rock::IoState::write(char const *buf, size_t size, off_t offset, FREE *dtor)
-{
-    // TODO: move to create?
-    if (!offset) {
-        assert(theBuf.isNull());
-        assert(entrySize >= 0);
-        theBuf.init(min(entrySize, slotSize), slotSize);
-    } else {
-        // Core uses -1 offset as "append". Sigh.
-        assert(offset == -1);
-        assert(!theBuf.isNull());
+Rock::IoState::callReaderBack(const char *buf, int rlen)
+{
+    debugs(79, 5, rlen << " bytes for " << *e);
+    StoreIOState::STRCB *callb = read.callback;
+    assert(callb);
+    read.callback = NULL;
+    void *cbdata;
+    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
+        callb(cbdata, buf, rlen, this);
+}
+
+/// wraps tryWrite() to handle deep write failures centrally and safely
+bool
+Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
+{
+    bool success = false;
+    try {
+        tryWrite(buf, size, coreOff);
+        success = true;
+    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
+        debugs(79, 2, "db write error: " << ex.what());
+        dir->writeError(*e);
+        finishedWriting(DISK_ERROR);
+        // 'this' might be gone beyond this point; fall through to free buf
     }
 
-    theBuf.append(buf, size);
+    // careful: 'this' might be gone here
 
     if (dtor)
         (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+
+    return success;
+}
+
+/**
+ * Possibly send data to be written to disk:
+ * We only write data when full slot is accumulated or when close() is called.
+ * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
+ * the slot when the write does not end at the page or sector boundary.
+ */
+void
+Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
+{
+    debugs(79, 7, swap_filen << " writes " << size << " more");
+
+    // either this is the first write or append; we do not support write gaps
+    assert(!coreOff || coreOff == -1);
+
+    // allocate the first slice during the first write
+    if (!coreOff) {
+        assert(sidCurrent < 0);
+        sidCurrent = reserveSlotForWriting(); // throws on failures
+        assert(sidCurrent >= 0);
+        writeAnchor().start = sidCurrent;
+    }
+
+    // buffer incoming data in slot buffer and write overflowing or final slots
+    // quit when no data left or we stopped writing on reentrant error
+    while (size > 0 && theFile != NULL) {
+        assert(sidCurrent >= 0);
+        const size_t processed = writeToBuffer(buf, size);
+        buf += processed;
+        size -= processed;
+        const bool overflow = size > 0;
+
+        // We do not write a full buffer without overflow because
+        // we would not yet know what to set the nextSlot to.
+        if (overflow) {
+            const SlotId sidNext = reserveSlotForWriting(); // throws
+            assert(sidNext >= 0);
+            writeToDisk(sidNext);
+        } else if (Store::Root().transientReaders(*e)) {
+            // write partial buffer for all remote hit readers to see
+            writeBufToDisk(-1, false);
+        }
+    }
+
+}
+
+/// Buffers incoming data for the current slot.
+/// \return the number of bytes buffered
+size_t
+Rock::IoState::writeToBuffer(char const *buf, size_t size)
+{
+    // do not buffer a cell header for nothing
+    if (!size)
+        return 0;
+
+    if (!theBuf.size) {
+        // will fill the header in writeToDisk when the next slot is known
+        theBuf.appended(sizeof(DbCellHeader));
+    }
+
+    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
+    theBuf.append(buf, forCurrentSlot);
+    offset_ += forCurrentSlot; // so that Core thinks we wrote it
+    return forCurrentSlot;
 }
 
-// write what was buffered during write() calls
+/// write what was buffered during write() calls
+/// negative sidNext means this is the last write request for this entry
 void
-Rock::IoState::startWriting()
+Rock::IoState::writeToDisk(const SlotId sidNext)
 {
     assert(theFile != NULL);
-    assert(theFile->canWrite());
-    assert(!theBuf.isNull());
+    assert(theBuf.size >= sizeof(DbCellHeader));
 
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
-    debugs(79, 5, HERE << swap_filen << " at " << offset_ << '+' <<
-        theBuf.contentSize());
+    writeBufToDisk(sidNext, sidNext < 0);
+    theBuf.clear();
+
+    sidCurrent = sidNext;
+}
+
+/// creates and submits a request to write current slot buffer to disk
+/// eof is true if and only this is the last slot
+void
+Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+{
+    // no slots after the last/eof slot (but partial slots may have a nil next)
+    assert(!eof || sidNext < 0);
+
+    // finalize db cell header
+    DbCellHeader header;
+    memcpy(header.key, e->key, sizeof(header.key));
+    header.firstSlot = writeAnchor().start;
+    header.nextSlot = sidNext;
+    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
+    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
+    header.version = writeAnchor().basics.timestamp;
+
+    // copy finalized db cell header into buffer
+    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
+
+    // and now allocate another buffer for the WriteRequest so that
+    // we can support concurrent WriteRequests (and to ease cleaning)
+    // TODO: should we limit the number of outstanding requests?
+    size_t wBufCap = 0;
+    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
+    memcpy(wBuf, theBuf.mem, theBuf.size);
+
+    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
+    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
+           theBuf.size);
+
+    WriteRequest *const r = new WriteRequest(
+        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
+                       memFreeBufFunc(wBufCap)), this);
+    r->sidCurrent = sidCurrent;
+    r->sidNext = sidNext;
+    r->eof = eof;
 
-    assert(theBuf.contentSize() <= slotSize);
     // theFile->write may call writeCompleted immediatelly
-    theFile->write(new WriteRequest(::WriteRequest(theBuf.content(), offset_,
-        theBuf.contentSize(), theBuf.freeFunc()), this));
+    theFile->write(r);
+}
+
+/// finds and returns a free db slot to fill or throws
+Rock::SlotId
+Rock::IoState::reserveSlotForWriting()
+{
+    Ipc::Mem::PageId pageId;
+    if (dir->useFreeSlot(pageId))
+        return pageId.number-1;
+
+    // This may happen when the number of available db slots is close to the
+    // number of concurrent requests reading or writing those slots, which may
+    // happen when the db is "small" compared to the request traffic OR when we
+    // are rebuilding and have not loaded "many" entries or empty slots yet.
+    throw TexcHere("ran out of free db slots");
 }
 
-// 
 void
 Rock::IoState::finishedWriting(const int errFlag)
 {
+    // we incremented offset_ while accumulating data in write()
+    // we do not reset writeableAnchor_ here because we still keep the lock
+    CollapsedForwarding::Broadcast(*e);
     callBack(errFlag);
 }
 
 void
-Rock::IoState::close()
+Rock::IoState::close(int how)
 {
-    debugs(79, 3, HERE << swap_filen << " at " << offset_);
-    if (!theBuf.isNull())
-        startWriting();
-    else
+    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
+           " buf: " << theBuf.size << " callback: " << callback);
+
+    if (!theFile) {
+        debugs(79, 3, "I/O already canceled");
+        assert(!callback);
+        // We keep writeableAnchor_ after callBack() on I/O errors.
+        assert(!readableAnchor_);
+        return;
+    }
+
+    switch (how) {
+    case wroteAll:
+        assert(theBuf.size > 0); // we never flush last bytes on our own
+        writeToDisk(-1); // flush last, yet unwritten slot to disk
+        return; // writeCompleted() will callBack()
+
+    case writerGone:
+        assert(writeableAnchor_);
+        dir->writeError(*e); // abort a partially stored entry
+        finishedWriting(DISK_ERROR);
+        return;
+
+    case readerDone:
         callBack(0);
+        return;
+    }
 }
 
-/// close callback (STIOCB) dialer: breaks dependencies and 
+/// close callback (STIOCB) dialer: breaks dependencies and
 /// counts IOState concurrency level
-class StoreIOStateCb: public CallDialer 
+class StoreIOStateCb: public CallDialer
 {
 public:
     StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
@@ -158,13 +374,13 @@ public:
         cbdataReferenceDone(callback_data); // may be nil already
     }
 
-    void dial(AsyncCall &call) {
+    void dial(AsyncCall &) {
         void *cbd;
         if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
             callback(cbd, errflag, sio.getRaw());
     }
 
-    bool canDial(AsyncCall &call) const {
+    bool canDial(AsyncCall &) const {
         return cbdataReferenceValid(callback_data) && callback;
     }
 
@@ -173,7 +389,7 @@ public:
     }
 
 private:
-    StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined
+    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined
 
     StoreIOState::STIOCB *callback;
     void *callback_data;
@@ -181,7 +397,6 @@ private:
     Rock::IoState::Pointer sio;
 };
 
-
 void
 Rock::IoState::callBack(int errflag)
 {
@@ -189,7 +404,7 @@ Rock::IoState::callBack(int errflag)
     theFile = NULL;
 
     AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb",
-        StoreIOStateCb(callback, callback_data, errflag, this));
+                                        StoreIOStateCb(callback, callback_data, errflag, this));
     ScheduleCallHere(call);
 
     callback = NULL;