]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/fs/rock/RockSwapDir.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
index 14ad992754fa2a63c3d23778a64cb9b1a6320545..ac324aac5fc0faa904f831fdb3fa7d20224cac3c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
@@ -16,6 +16,7 @@
 #include "DiskIO/DiskIOStrategy.h"
 #include "DiskIO/ReadRequest.h"
 #include "DiskIO/WriteRequest.h"
+#include "fs/rock/RockHeaderUpdater.h"
 #include "fs/rock/RockIoRequests.h"
 #include "fs/rock/RockIoState.h"
 #include "fs/rock/RockRebuild.h"
@@ -51,19 +52,6 @@ Rock::SwapDir::~SwapDir()
     safe_free(filePath);
 }
 
-StoreSearch *
-Rock::SwapDir::search(String const, HttpRequest *)
-{
-    assert(false);
-    return NULL; // XXX: implement
-}
-
-void
-Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
-{
-    ::SwapDir::get(key, cb, data);
-}
-
 // called when Squid core needs a StoreEntry with a given key
 StoreEntry *
 Rock::SwapDir::get(const cache_key *key)
@@ -78,90 +66,68 @@ Rock::SwapDir::get(const cache_key *key)
 
     // create a brand new store entry and initialize it with stored basics
     StoreEntry *e = new StoreEntry();
+    e->createMemObject();
     anchorEntry(*e, filen, *slot);
-
-    e->hashInsert(key);
     trackReferences(*e);
-
     return e;
-    // the disk entry remains open for reading, protected from modifications
 }
 
 bool
-Rock::SwapDir::anchorCollapsed(StoreEntry &collapsed, bool &inSync)
+Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
 {
     if (!map || !theFile || !theFile->canRead())
         return false;
 
     sfileno filen;
     const Ipc::StoreMapAnchor *const slot = map->openForReading(
-            reinterpret_cast<cache_key*>(collapsed.key), filen);
+            reinterpret_cast<cache_key*>(entry.key), filen);
     if (!slot)
         return false;
 
-    anchorEntry(collapsed, filen, *slot);
-    inSync = updateCollapsedWith(collapsed, *slot);
+    anchorEntry(entry, filen, *slot);
+    inSync = updateAnchoredWith(entry, *slot);
     return true; // even if inSync is false
 }
 
 bool
-Rock::SwapDir::updateCollapsed(StoreEntry &collapsed)
+Rock::SwapDir::updateAnchored(StoreEntry &entry)
 {
     if (!map || !theFile || !theFile->canRead())
         return false;
 
-    if (collapsed.swap_filen < 0) // no longer using a disk cache
-        return true;
-    assert(collapsed.swap_dirn == index);
+    assert(entry.hasDisk(index));
 
-    const Ipc::StoreMapAnchor &s = map->readableEntry(collapsed.swap_filen);
-    return updateCollapsedWith(collapsed, s);
+    const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
+    return updateAnchoredWith(entry, s);
 }
 
 bool
-Rock::SwapDir::updateCollapsedWith(StoreEntry &collapsed, const Ipc::StoreMapAnchor &anchor)
+Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
 {
-    collapsed.swap_file_sz = anchor.basics.swap_file_sz;
+    entry.swap_file_sz = anchor.basics.swap_file_sz;
     return true;
 }
 
 void
 Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
 {
-    const Ipc::StoreMapAnchor::Basics &basics = anchor.basics;
+    anchor.exportInto(e);
 
-    e.swap_file_sz = basics.swap_file_sz;
-    e.lastref = basics.lastref;
-    e.timestamp = basics.timestamp;
-    e.expires = basics.expires;
-    e.lastmod = basics.lastmod;
-    e.refcount = basics.refcount;
-    e.flags = basics.flags;
-
-    if (anchor.complete()) {
-        e.store_status = STORE_OK;
-        e.swap_status = SWAPOUT_DONE;
-    } else {
-        e.store_status = STORE_PENDING;
-        e.swap_status = SWAPOUT_WRITING; // even though another worker writes?
-    }
+    const bool complete = anchor.complete();
+    e.store_status = complete ? STORE_OK : STORE_PENDING;
+    // SWAPOUT_WRITING: even though another worker writes?
+    e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);
 
     e.ping_status = PING_NONE;
 
-    EBIT_CLR(e.flags, RELEASE_REQUEST);
-    EBIT_CLR(e.flags, KEY_PRIVATE);
     EBIT_SET(e.flags, ENTRY_VALIDATED);
-
-    e.swap_dirn = index;
-    e.swap_filen = filen;
 }
 
 void Rock::SwapDir::disconnect(StoreEntry &e)
 {
-    assert(e.swap_dirn == index);
-    assert(e.swap_filen >= 0);
-    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
-    assert(e.swap_status != SWAPOUT_NONE);
+    assert(e.hasDisk(index));
+
+    ignoreReferences(e);
 
     // do not rely on e.swap_status here because there is an async delay
     // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
@@ -173,16 +139,12 @@ void Rock::SwapDir::disconnect(StoreEntry &e)
     if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
             dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
         map->abortWriting(e.swap_filen);
-        e.swap_dirn = -1;
-        e.swap_filen = -1;
-        e.swap_status = SWAPOUT_NONE;
+        e.detachFromDisk();
         dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
-        Store::Root().transientsAbandon(e); // broadcasts after the change
+        Store::Root().stopSharing(e); // broadcasts after the change
     } else {
         map->closeForReading(e.swap_filen);
-        e.swap_dirn = -1;
-        e.swap_filen = -1;
-        e.swap_status = SWAPOUT_NONE;
+        e.detachFromDisk();
     }
 }
 
@@ -210,9 +172,16 @@ Rock::SwapDir::doReportStat() const
 }
 
 void
-Rock::SwapDir::swappedOut(const StoreEntry &)
+Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
 {
-    // stats are not stored but computed when needed
+    // nothing to do
+}
+
+void
+Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
+{
+    debugs(47, 5, entry);
+    disconnect(entry); // calls abortWriting() to free the disk entry
 }
 
 int64_t
@@ -303,8 +272,9 @@ Rock::SwapDir::create()
 void
 Rock::SwapDir::createError(const char *const msg)
 {
+    int xerrno = errno; // XXX: where does errno come from?
     debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
-           filePath << "; " << msg << " error: " << xstrerror());
+           filePath << "; " << msg << " error: " << xstrerr(xerrno));
     fatal("Rock Store db creation error");
 }
 
@@ -441,14 +411,17 @@ Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconf
     else
         return false;
 
-    if (!value)
+    if (!value) {
         self_destruct();
+        return false;
+    }
 
     // TODO: handle time units and detect parsing errors better
     const int64_t parsedValue = strtoll(value, NULL, 10);
     if (parsedValue < 0) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
         self_destruct();
+        return false;
     }
 
     const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);
@@ -483,14 +456,17 @@ Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaRec
     else
         return false;
 
-    if (!value)
+    if (!value) {
         self_destruct();
+        return false;
+    }
 
     // TODO: handle time units and detect parsing errors better
     const int64_t parsedValue = strtoll(value, NULL, 10);
     if (parsedValue < 0) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
         self_destruct();
+        return false;
     }
 
     const int newRate = static_cast<int>(parsedValue);
@@ -498,6 +474,7 @@ Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaRec
     if (newRate < 0) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
         self_destruct();
+        return false;
     }
 
     if (!isaReconfig)
@@ -529,19 +506,23 @@ Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconf
     else
         return false;
 
-    if (!value)
+    if (!value) {
         self_destruct();
+        return false;
+    }
 
     // TODO: handle size units and detect parsing errors better
     const uint64_t newSize = strtoll(value, NULL, 10);
     if (newSize <= 0) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
         self_destruct();
+        return false;
     }
 
     if (newSize <= sizeof(DbCellHeader)) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
         self_destruct();
+        return false;
     }
 
     if (!reconfig)
@@ -615,7 +596,9 @@ Rock::SwapDir::rebuild()
 bool
 Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
 {
-    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
+    if (diskSpaceNeeded >= 0)
+        diskSpaceNeeded += sizeof(DbCellHeader);
+    if (!::SwapDir::canStore(e, diskSpaceNeeded, load))
         return false;
 
     if (!theFile || !theFile->canWrite())
@@ -679,6 +662,33 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
     return sio;
 }
 
+StoreIOState::Pointer
+Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
+{
+    if (!theFile || theFile->error()) {
+        debugs(47,4, theFile);
+        return nullptr;
+    }
+
+    Must(update.fresh);
+    Must(update.fresh.fileNo >= 0);
+
+    Rock::SwapDir::Pointer self(this);
+    IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);
+
+    sio->swap_dirn = index;
+    sio->swap_filen = update.fresh.fileNo;
+    sio->writeableAnchor_ = update.fresh.anchor;
+
+    debugs(47,5, "dir " << index << " updating filen " <<
+           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
+           sio->swap_filen << std::dec << " starting at " <<
+           diskOffset(sio->swap_filen));
+
+    sio->file(theFile);
+    return sio;
+}
+
 int64_t
 Rock::SwapDir::diskOffset(const SlotId sid) const
 {
@@ -700,12 +710,16 @@ Rock::SwapDir::diskOffsetLimit() const
     return diskOffset(map->sliceLimit());
 }
 
-bool
-Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
+Rock::SlotId
+Rock::SwapDir::reserveSlotForWriting()
 {
+    Ipc::Mem::PageId pageId;
+
     if (freeSlots->pop(pageId)) {
-        debugs(47, 5, "got a previously free slot: " << pageId);
-        return true;
+        const auto slotId = pageId.number - 1;
+        debugs(47, 5, "got a previously free slot: " << slotId);
+        map->prepFreeSlice(slotId);
+        return slotId;
     }
 
     // catch free slots delivered to noteFreeMapSlice()
@@ -714,14 +728,20 @@ Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
     if (map->purgeOne()) {
         assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
         assert(pageId.set());
-        debugs(47, 5, "got a previously busy slot: " << pageId);
-        return true;
+        const auto slotId = pageId.number - 1;
+        debugs(47, 5, "got a previously busy slot: " << slotId);
+        map->prepFreeSlice(slotId);
+        return slotId;
     }
     assert(waitingForPage == &pageId);
     waitingForPage = NULL;
 
+    // This may happen when the number of available db slots is close to the
+    // number of concurrent requests reading or writing those slots, which may
+    // happen when the db is "small" compared to the request traffic OR when we
+    // are rebuilding and have not loaded "many" entries or empty slots yet.
     debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
-    return false;
+    throw TexcHere("ran out of free db slots");
 }
 
 bool
@@ -734,7 +754,7 @@ void
 Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
 {
     Ipc::Mem::PageId pageId;
-    pageId.pool = index+1;
+    pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(index);
     pageId.number = sliceId+1;
     if (waitingForPage) {
         *waitingForPage = pageId;
@@ -753,7 +773,7 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
         return NULL;
     }
 
-    if (e.swap_filen < 0) {
+    if (!e.hasDisk()) {
         debugs(47,4, HERE << e);
         return NULL;
     }
@@ -799,9 +819,11 @@ Rock::SwapDir::ioCompletedNotification()
     if (!theFile)
         fatalf("Rock cache_dir failed to initialize db file: %s", filePath);
 
-    if (theFile->error())
+    if (theFile->error()) {
+        int xerrno = errno; // XXX: where does errno come from
         fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
-               xstrerror());
+               xstrerr(xerrno));
+    }
 
     debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
            std::setw(12) << maxSize() << " disk bytes, " <<
@@ -823,16 +845,15 @@ Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::Re
     ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
     assert(request);
     IoState::Pointer sio = request->sio;
-
-    if (errflag == DISK_OK && rlen > 0)
-        sio->offset_ += rlen;
-
-    sio->callReaderBack(r->buf, rlen);
+    sio->handleReadCompletion(*request, rlen, errflag);
 }
 
 void
 Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
 {
+    // TODO: Move details into IoState::handleWriteCompletion() after figuring
+    // out how to deal with map access. See readCompleted().
+
     Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
     assert(request);
     assert(request->sio !=  NULL);
@@ -841,55 +862,106 @@ Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
     // quit if somebody called IoState::close() while we were waiting
     if (!sio.stillWaiting()) {
         debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
-        noteFreeMapSlice(request->sidNext);
+        noteFreeMapSlice(request->sidCurrent);
         return;
     }
 
-    // TODO: Fail if disk dropped one of the previous write requests.
+    debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);
 
-    if (errflag == DISK_OK) {
-        // do not increment sio.offset_ because we do it in sio->write()
+    if (errflag != DISK_OK)
+        handleWriteCompletionProblem(errflag, *request);
+    else if (!sio.expectedReply(request->id))
+        handleWriteCompletionProblem(DISK_ERROR, *request);
+    else
+        handleWriteCompletionSuccess(*request);
+
+    if (sio.touchingStoreEntry())
+        CollapsedForwarding::Broadcast(*sio.e);
+}
 
-        // finalize the shared slice info after writing slice contents to disk
-        Ipc::StoreMap::Slice &slice =
-            map->writeableSlice(sio.swap_filen, request->sidCurrent);
-        slice.size = request->len - sizeof(DbCellHeader);
-        slice.next = request->sidNext;
+/// code shared by writeCompleted() success handling cases
+void
+Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
+{
+    auto &sio = *(request.sio);
+    sio.splicingPoint = request.sidCurrent;
+    // do not increment sio.offset_ because we do it in sio->write()
 
-        if (request->eof) {
-            assert(sio.e);
-            assert(sio.writeableAnchor_);
+    assert(sio.writeableAnchor_);
+    if (sio.writeableAnchor_->start < 0) { // wrote the first slot
+        Must(request.sidPrevious < 0);
+        sio.writeableAnchor_->start = request.sidCurrent;
+    } else {
+        Must(request.sidPrevious >= 0);
+        map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent;
+    }
+
+    // finalize the shared slice info after writing slice contents to disk;
+    // the chain gets possession of the slice we were writing
+    Ipc::StoreMap::Slice &slice =
+        map->writeableSlice(sio.swap_filen, request.sidCurrent);
+    slice.size = request.len - sizeof(DbCellHeader);
+    Must(slice.next < 0);
+
+    if (request.eof) {
+        assert(sio.e);
+        if (sio.touchingStoreEntry()) {
             sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
                                       sio.offset_;
 
-            // close, the entry gets the read lock
-            map->closeForWriting(sio.swap_filen, true);
-            sio.writeableAnchor_ = NULL;
-            sio.finishedWriting(errflag);
+            map->switchWritingToReading(sio.swap_filen);
+            // sio.e keeps the (now read) lock on the anchor
         }
-    } else {
-        noteFreeMapSlice(request->sidNext);
-
-        writeError(*sio.e);
-        sio.finishedWriting(errflag);
-        // and hope that Core will call disconnect() to close the map entry
+        sio.writeableAnchor_ = NULL;
+        sio.finishedWriting(DISK_OK);
     }
+}
+
+/// code shared by writeCompleted() error handling cases
+void
+Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
+{
+    auto &sio = *request.sio;
+
+    noteFreeMapSlice(request.sidCurrent);
 
-    CollapsedForwarding::Broadcast(*sio.e);
+    writeError(sio);
+    sio.finishedWriting(errflag);
+    // and hope that Core will call disconnect() to close the map entry
 }
 
 void
-Rock::SwapDir::writeError(StoreEntry &e)
+Rock::SwapDir::writeError(StoreIOState &sio)
 {
     // Do not abortWriting here. The entry should keep the write lock
     // instead of losing association with the store and confusing core.
-    map->freeEntry(e.swap_filen); // will mark as unusable, just in case
+    map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
 
-    Store::Root().transientsAbandon(e);
+    if (sio.touchingStoreEntry())
+        Store::Root().stopSharing(*sio.e);
+    // else noop: a fresh entry update error does not affect stale entry readers
 
     // All callers must also call IoState callback, to propagate the error.
 }
 
+void
+Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
+{
+    if (!map)
+        return;
+
+    Ipc::StoreMapUpdate update(updatedE);
+    if (!map->openForUpdating(update, updatedE->swap_filen))
+        return;
+
+    try {
+        AsyncJob::Start(new HeaderUpdater(this, update));
+    } catch (const std::exception &ex) {
+        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
+        map->abortUpdating(update);
+    }
+}
+
 bool
 Rock::SwapDir::full() const
 {
@@ -924,7 +996,7 @@ Rock::SwapDir::reference(StoreEntry &e)
 }
 
 bool
-Rock::SwapDir::dereference(StoreEntry &e, bool)
+Rock::SwapDir::dereference(StoreEntry &e)
 {
     debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
     if (repl && repl->Dereferenced)
@@ -942,19 +1014,24 @@ Rock::SwapDir::unlinkdUseful() const
 }
 
 void
-Rock::SwapDir::unlink(StoreEntry &e)
+Rock::SwapDir::evictIfFound(const cache_key *key)
 {
-    debugs(47, 5, HERE << e);
-    ignoreReferences(e);
-    map->freeEntry(e.swap_filen);
-    disconnect(e);
+    if (map)
+        map->freeEntryByKey(key); // may not be there
 }
 
 void
-Rock::SwapDir::markForUnlink(StoreEntry &e)
+Rock::SwapDir::evictCached(StoreEntry &e)
 {
     debugs(47, 5, e);
-    map->freeEntry(e.swap_filen);
+    if (e.hasDisk(index)) {
+        if (map->freeEntry(e.swap_filen))
+            CollapsedForwarding::Broadcast(e);
+        if (!e.locked())
+            disconnect(e);
+    } else if (const auto key = e.publicKey()) {
+        evictIfFound(key);
+    }
 }
 
 void
@@ -1036,6 +1113,12 @@ Rock::SwapDir::freeSlotsPath() const
     return spacesPath.termedBuf();
 }
 
+bool
+Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
+{
+    return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));
+}
+
 namespace Rock
 {
 RunnerRegistrationEntry(SwapDirRr);
@@ -1055,7 +1138,9 @@ void Rock::SwapDirRr::create()
             // TODO: somehow remove pool id and counters from PageStack?
             Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
                 shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
-                                             i+1, capacity, 0);
+                                             Ipc::Mem::PageStack::IdForSwapDirSpace(i),
+                                             capacity,
+                                             0);
             freeSlotsOwners.push_back(freeSlotsOwner);
 
             // TODO: add method to initialize PageStack with no free pages