]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/fs/rock/RockIoState.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / fs / rock / RockIoState.cc
index f8e461eb832ecc85f8f90f96e01d5ccddb176986..94c627a11dd191f7043925f6c970b6fc9246b2eb 100644 (file)
@@ -1,32 +1,39 @@
 /*
- * DEBUG: section 79    Disk IO Routines
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
+/* DEBUG: section 79    Disk IO Routines */
+
 #include "squid.h"
 #include "base/TextException.h"
+#include "CollapsedForwarding.h"
 #include "DiskIO/DiskIOModule.h"
 #include "DiskIO/DiskIOStrategy.h"
 #include "DiskIO/WriteRequest.h"
-#include "fs/rock/RockIoState.h"
 #include "fs/rock/RockIoRequests.h"
+#include "fs/rock/RockIoState.h"
 #include "fs/rock/RockSwapDir.h"
 #include "globals.h"
 #include "MemObject.h"
-#include "Mem.h"
 #include "Parsing.h"
+#include "Transients.h"
 
 Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                        StoreEntry *anEntry,
                        StoreIOState::STFNCB *cbFile,
                        StoreIOState::STIOCB *cbIo,
                        void *data):
-        readableAnchor_(NULL),
-        writeableAnchor_(NULL),
-        sidCurrent(-1),
-        dir(aDir),
-        slotSize(dir->slotSize),
-        objOffset(0),
-        theBuf(dir->slotSize)
+    readableAnchor_(NULL),
+    writeableAnchor_(NULL),
+    sidCurrent(-1),
+    dir(aDir),
+    slotSize(dir->slotSize),
+    objOffset(0),
+    theBuf(dir->slotSize)
 {
     e = anEntry;
     e->lock("rock I/O");
@@ -45,7 +52,7 @@ Rock::IoState::~IoState()
     // The dir map entry may still be open for reading at the point because
     // the map entry lock is associated with StoreEntry, not IoState.
     // assert(!readableAnchor_);
-    assert(!writeableAnchor_);
+    assert(shutting_down || !writeableAnchor_);
 
     if (callback_data)
         cbdataReferenceDone(callback_data);
@@ -98,24 +105,41 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data
         objOffset = 0;
     }
 
-    while (coreOff >= objOffset + currentReadableSlice().size) {
+    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
         objOffset += currentReadableSlice().size;
         sidCurrent = currentReadableSlice().next;
-        assert(sidCurrent >= 0); // XXX: handle "read offset too big" error
     }
 
-    offset_ = coreOff;
-    len = min(len,
-        static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
-
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
+    // punt if read offset is too big (because of client bugs or collapsing)
+    if (sidCurrent < 0) {
+        debugs(79, 5, "no " << coreOff << " in " << *e);
+        callReaderBack(buf, 0);
+        return;
+    }
+
+    offset_ = coreOff;
+    len = min(len,
+              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
     const uint64_t diskOffset = dir->diskOffset(sidCurrent);
     theFile->read(new ReadRequest(::ReadRequest(buf,
-        diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
+                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
+}
+
+void
+Rock::IoState::callReaderBack(const char *buf, int rlen)
+{
+    debugs(79, 5, rlen << " bytes for " << *e);
+    StoreIOState::STRCB *callb = read.callback;
+    assert(callb);
+    read.callback = NULL;
+    void *cbdata;
+    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
+        callb(cbdata, buf, rlen, this);
 }
 
 /// wraps tryWrite() to handle deep write failures centrally and safely
@@ -128,22 +152,25 @@ Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
         success = true;
     } catch (const std::exception &ex) { // TODO: should we catch ... as well?
         debugs(79, 2, "db write error: " << ex.what());
-        dir->writeError(swap_filen);
+        dir->writeError(*e);
         finishedWriting(DISK_ERROR);
         // 'this' might be gone beyond this point; fall through to free buf
     }
 
     // careful: 'this' might be gone here
+
     if (dtor)
         (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
 
     return success;
 }
 
-/** We only write data when full slot is accumulated or when close() is called.
- We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
- the slot when the write does not end at the page or sector boundary. */
+/**
+ * Possibly send data to be written to disk:
+ * We only write data when full slot is accumulated or when close() is called.
+ * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
+ * the slot when the write does not end at the page or sector boundary.
+ */
 void
 Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
 {
@@ -152,7 +179,7 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
     // either this is the first write or append; we do not support write gaps
     assert(!coreOff || coreOff == -1);
 
-    // allocate the first slice diring the first write
+    // allocate the first slice during the first write
     if (!coreOff) {
         assert(sidCurrent < 0);
         sidCurrent = reserveSlotForWriting(); // throws on failures
@@ -175,15 +202,16 @@ Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
             const SlotId sidNext = reserveSlotForWriting(); // throws
             assert(sidNext >= 0);
             writeToDisk(sidNext);
+        } else if (Store::Root().transientReaders(*e)) {
+            // write partial buffer for all remote hit readers to see
+            writeBufToDisk(-1, false);
         }
     }
 
-    // XXX: check that there are workers waiting for data, i.e. readers > 0
-    writeBufToDisk();
 }
 
 /// Buffers incoming data for the current slot.
-/// Returns the number of bytes buffered.
+/// \return the number of bytes buffered
 size_t
 Rock::IoState::writeToBuffer(char const *buf, size_t size)
 {
@@ -210,19 +238,22 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     assert(theFile != NULL);
     assert(theBuf.size >= sizeof(DbCellHeader));
 
-    if (sidNext < 0) { // we are writing the last slot
-        e->swap_file_sz = offset_;
-        writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
-    }
-
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
-    // finalize map slice
-    Ipc::StoreMap::Slice &slice =
-        dir->map->writeableSlice(swap_filen, sidCurrent);
-    slice.next = sidNext;
-    slice.size = theBuf.size - sizeof(DbCellHeader);
+    writeBufToDisk(sidNext, sidNext < 0);
+    theBuf.clear();
+
+    sidCurrent = sidNext;
+}
+
+/// creates and submits a request to write current slot buffer to disk
+/// eof is true if and only this is the last slot
+void
+Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
+{
+    // no slots after the last/eof slot (but partial slots may have a nil next)
+    assert(!eof || sidNext < 0);
 
     // finalize db cell header
     DbCellHeader header;
@@ -230,21 +261,12 @@ Rock::IoState::writeToDisk(const SlotId sidNext)
     header.firstSlot = writeAnchor().start;
     header.nextSlot = sidNext;
     header.payloadSize = theBuf.size - sizeof(DbCellHeader);
-    header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
+    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
     header.version = writeAnchor().basics.timestamp;
 
     // copy finalized db cell header into buffer
     memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
 
-    writeBufToDisk(sidNext < 0);
-    theBuf.clear();
-
-    sidCurrent = sidNext;
-}
-
-void
-Rock::IoState::writeBufToDisk(const bool last)
-{
     // and now allocate another buffer for the WriteRequest so that
     // we can support concurrent WriteRequests (and to ease cleaning)
     // TODO: should we limit the number of outstanding requests?
@@ -258,7 +280,10 @@ Rock::IoState::writeBufToDisk(const bool last)
 
     WriteRequest *const r = new WriteRequest(
         ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
-            memFreeBufFunc(wBufCap)), this, last);
+                       memFreeBufFunc(wBufCap)), this);
+    r->sidCurrent = sidCurrent;
+    r->sidNext = sidNext;
+    r->eof = eof;
 
     // theFile->write may call writeCompleted immediatelly
     theFile->write(r);
@@ -283,7 +308,8 @@ void
 Rock::IoState::finishedWriting(const int errFlag)
 {
     // we incremented offset_ while accumulating data in write()
-    writeableAnchor_ = NULL;
+    // we do not reset writeableAnchor_ here because we still keep the lock
+    CollapsedForwarding::Broadcast(*e);
     callBack(errFlag);
 }
 
@@ -296,7 +322,7 @@ Rock::IoState::close(int how)
     if (!theFile) {
         debugs(79, 3, "I/O already canceled");
         assert(!callback);
-        assert(!writeableAnchor_);
+        // We keep writeableAnchor_ after callBack() on I/O errors.
         assert(!readableAnchor_);
         return;
     }
@@ -309,7 +335,7 @@ Rock::IoState::close(int how)
 
     case writerGone:
         assert(writeableAnchor_);
-        dir->writeError(swap_filen); // abort a partially stored entry
+        dir->writeError(*e); // abort a partially stored entry
         finishedWriting(DISK_ERROR);
         return;
 
@@ -325,20 +351,20 @@ class StoreIOStateCb: public CallDialer
 {
 public:
     StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
-            callback(NULL),
-            callback_data(NULL),
-            errflag(err),
-            sio(anSio) {
+        callback(NULL),
+        callback_data(NULL),
+        errflag(err),
+        sio(anSio) {
 
         callback = cb;
         callback_data = cbdataReference(data);
     }
 
     StoreIOStateCb(const StoreIOStateCb &cb):
-            callback(NULL),
-            callback_data(NULL),
-            errflag(cb.errflag),
-            sio(cb.sio) {
+        callback(NULL),
+        callback_data(NULL),
+        errflag(cb.errflag),
+        sio(cb.sio) {
 
         callback = cb.callback;
         callback_data = cbdataReference(cb.callback_data);
@@ -348,13 +374,13 @@ public:
         cbdataReferenceDone(callback_data); // may be nil already
     }
 
-    void dial(AsyncCall &call) {
+    void dial(AsyncCall &) {
         void *cbd;
         if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
             callback(cbd, errflag, sio.getRaw());
     }
 
-    bool canDial(AsyncCall &call) const {
+    bool canDial(AsyncCall &) const {
         return cbdataReferenceValid(callback_data) && callback;
     }
 
@@ -363,7 +389,7 @@ public:
     }
 
 private:
-    StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined
+    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined
 
     StoreIOState::STIOCB *callback;
     void *callback_data;