]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/fs/rock/RockSwapDir.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / fs / rock / RockSwapDir.cc
index 7f4526b367cbe82b83b383672cc616e033f1dbec..ac324aac5fc0faa904f831fdb3fa7d20224cac3c 100644 (file)
@@ -1,18 +1,26 @@
 /*
- * DEBUG: section 47    Store Directory Routines
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
+/* DEBUG: section 47    Store Directory Routines */
+
 #include "squid.h"
 #include "cache_cf.h"
+#include "CollapsedForwarding.h"
 #include "ConfigOption.h"
 #include "DiskIO/DiskIOModule.h"
 #include "DiskIO/DiskIOStrategy.h"
 #include "DiskIO/ReadRequest.h"
 #include "DiskIO/WriteRequest.h"
-#include "fs/rock/RockSwapDir.h"
-#include "fs/rock/RockIoState.h"
+#include "fs/rock/RockHeaderUpdater.h"
 #include "fs/rock/RockIoRequests.h"
+#include "fs/rock/RockIoState.h"
 #include "fs/rock/RockRebuild.h"
+#include "fs/rock/RockSwapDir.h"
 #include "globals.h"
 #include "ipc/mem/Pages.h"
 #include "MemObject.h"
@@ -23,6 +31,7 @@
 
 #include <cstdlib>
 #include <iomanip>
+#include <limits>
 
 #if HAVE_SYS_STAT_H
 #include <sys/stat.h>
@@ -30,7 +39,9 @@
 
 const int64_t Rock::SwapDir::HeaderSize = 16*1024;
 
-Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), map(NULL)
+Rock::SwapDir::SwapDir(): ::SwapDir("rock"),
+    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL),
+    waitingForPage(NULL)
 {
 }
 
@@ -41,19 +52,6 @@ Rock::SwapDir::~SwapDir()
     safe_free(filePath);
 }
 
-StoreSearch *
-Rock::SwapDir::search(String const url, HttpRequest *)
-{
-    assert(false);
-    return NULL; // XXX: implement
-}
-
-void
-Rock::SwapDir::get(String const key, STOREGETCLIENT cb, void *data)
-{
-    ::SwapDir::get(key, cb, data);
-}
-
 // called when Squid core needs a StoreEntry with a given key
 StoreEntry *
 Rock::SwapDir::get(const cache_key *key)
@@ -62,60 +60,101 @@ Rock::SwapDir::get(const cache_key *key)
         return NULL;
 
     sfileno filen;
-    const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen);
+    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
     if (!slot)
         return NULL;
 
-    const Ipc::StoreMapSlot::Basics &basics = slot->basics;
-
     // create a brand new store entry and initialize it with stored basics
     StoreEntry *e = new StoreEntry();
-    e->lock_count = 0;
-    e->swap_dirn = index;
-    e->swap_filen = filen;
-    e->swap_file_sz = basics.swap_file_sz;
-    e->lastref = basics.lastref;
-    e->timestamp = basics.timestamp;
-    e->expires = basics.expires;
-    e->lastmod = basics.lastmod;
-    e->refcount = basics.refcount;
-    e->flags = basics.flags;
-    e->store_status = STORE_OK;
-    e->setMemStatus(NOT_IN_MEMORY);
-    e->swap_status = SWAPOUT_DONE;
-    e->ping_status = PING_NONE;
-    EBIT_SET(e->flags, ENTRY_CACHABLE);
-    EBIT_CLR(e->flags, RELEASE_REQUEST);
-    EBIT_CLR(e->flags, KEY_PRIVATE);
-    EBIT_SET(e->flags, ENTRY_VALIDATED);
-    e->hashInsert(key);
+    e->createMemObject();
+    anchorEntry(*e, filen, *slot);
     trackReferences(*e);
-
     return e;
-    // the disk entry remains open for reading, protected from modifications
+}
+
+bool
+Rock::SwapDir::anchorToCache(StoreEntry &entry, bool &inSync)
+{
+    if (!map || !theFile || !theFile->canRead())
+        return false;
+
+    sfileno filen;
+    const Ipc::StoreMapAnchor *const slot = map->openForReading(
+            reinterpret_cast<cache_key*>(entry.key), filen);
+    if (!slot)
+        return false;
+
+    anchorEntry(entry, filen, *slot);
+    inSync = updateAnchoredWith(entry, *slot);
+    return true; // even if inSync is false
+}
+
+bool
+Rock::SwapDir::updateAnchored(StoreEntry &entry)
+{
+    if (!map || !theFile || !theFile->canRead())
+        return false;
+
+    assert(entry.hasDisk(index));
+
+    const Ipc::StoreMapAnchor &s = map->readableEntry(entry.swap_filen);
+    return updateAnchoredWith(entry, s);
+}
+
+bool
+Rock::SwapDir::updateAnchoredWith(StoreEntry &entry, const Ipc::StoreMapAnchor &anchor)
+{
+    entry.swap_file_sz = anchor.basics.swap_file_sz;
+    return true;
+}
+
+void
+Rock::SwapDir::anchorEntry(StoreEntry &e, const sfileno filen, const Ipc::StoreMapAnchor &anchor)
+{
+    anchor.exportInto(e);
+
+    const bool complete = anchor.complete();
+    e.store_status = complete ? STORE_OK : STORE_PENDING;
+    // SWAPOUT_WRITING: even though another worker writes?
+    e.attachToDisk(index, filen, complete ? SWAPOUT_DONE : SWAPOUT_WRITING);
+
+    e.ping_status = PING_NONE;
+
+    EBIT_SET(e.flags, ENTRY_VALIDATED);
 }
 
 void Rock::SwapDir::disconnect(StoreEntry &e)
 {
-    assert(e.swap_dirn == index);
-    assert(e.swap_filen >= 0);
-    // cannot have SWAPOUT_NONE entry with swap_filen >= 0
-    assert(e.swap_status != SWAPOUT_NONE);
+    assert(e.hasDisk(index));
+
+    ignoreReferences(e);
 
     // do not rely on e.swap_status here because there is an async delay
     // before it switches from SWAPOUT_WRITING to SWAPOUT_DONE.
 
-    // since e has swap_filen, its slot is locked for either reading or writing
-    map->abortIo(e.swap_filen);
-    e.swap_dirn = -1;
-    e.swap_filen = -1;
-    e.swap_status = SWAPOUT_NONE;
+    // since e has swap_filen, its slot is locked for reading and/or writing
+    // but it is difficult to know whether THIS worker is reading or writing e,
+    // especially since we may switch from writing to reading. This code relies
+    // on Rock::IoState::writeableAnchor_ being set when we locked for writing.
+    if (e.mem_obj && e.mem_obj->swapout.sio != NULL &&
+            dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_) {
+        map->abortWriting(e.swap_filen);
+        e.detachFromDisk();
+        dynamic_cast<IoState&>(*e.mem_obj->swapout.sio).writeableAnchor_ = NULL;
+        Store::Root().stopSharing(e); // broadcasts after the change
+    } else {
+        map->closeForReading(e.swap_filen);
+        e.detachFromDisk();
+    }
 }
 
 uint64_t
 Rock::SwapDir::currentSize() const
 {
-    return HeaderSize + max_objsize * currentCount();
+    const uint64_t spaceSize = !freeSlots ?
+                               maxSize() : (slotSize * freeSlots->size());
+    // everything that is not free is in use
+    return maxSize() - spaceSize;
 }
 
 uint64_t
@@ -133,17 +172,40 @@ Rock::SwapDir::doReportStat() const
 }
 
 void
-Rock::SwapDir::swappedOut(const StoreEntry &)
+Rock::SwapDir::finalizeSwapoutSuccess(const StoreEntry &)
+{
+    // nothing to do
+}
+
+void
+Rock::SwapDir::finalizeSwapoutFailure(StoreEntry &entry)
 {
-    // stats are not stored but computed when needed
+    debugs(47, 5, entry);
+    disconnect(entry); // calls abortWriting() to free the disk entry
 }
 
 int64_t
-Rock::SwapDir::entryLimitAllowed() const
+Rock::SwapDir::slotLimitAbsolute() const
 {
-    const int64_t eLimitLo = map ? map->entryLimit() : 0; // dynamic shrinking unsupported
-    const int64_t eWanted = (maxSize() - HeaderSize)/maxObjectSize();
-    return min(max(eLimitLo, eWanted), entryLimitHigh());
+    // the max value is an invalid one; all values must be below the limit
+    assert(std::numeric_limits<Ipc::StoreMapSliceId>::max() ==
+           std::numeric_limits<SlotId>::max());
+    return std::numeric_limits<SlotId>::max();
+}
+
+int64_t
+Rock::SwapDir::slotLimitActual() const
+{
+    const int64_t sWanted = (maxSize() - HeaderSize)/slotSize;
+    const int64_t sLimitLo = map ? map->sliceLimit() : 0; // dynamic shrinking unsupported
+    const int64_t sLimitHi = slotLimitAbsolute();
+    return min(max(sLimitLo, sWanted), sLimitHi);
+}
+
+int64_t
+Rock::SwapDir::entryLimitActual() const
+{
+    return min(slotLimitActual(), entryLimitAbsolute());
 }
 
 // TODO: encapsulate as a tool
@@ -175,51 +237,45 @@ Rock::SwapDir::create()
         // no need to distinguish ENOENT from other possible stat() errors.
         debugs (47, DBG_IMPORTANT, "Creating Rock db directory: " << path);
         const int res = mkdir(path, 0700);
-        if (res != 0) {
-            debugs(47, DBG_CRITICAL, "Failed to create Rock db dir " << path <<
-                   ": " << xstrerror());
-            fatal("Rock Store db creation error");
-        }
+        if (res != 0)
+            createError("mkdir");
     }
 
     debugs (47, DBG_IMPORTANT, "Creating Rock db: " << filePath);
+    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
+    if (swap < 0)
+        createError("create");
+
 #if SLOWLY_FILL_WITH_ZEROS
     char block[1024];
     Must(maxSize() % sizeof(block) == 0);
     memset(block, '\0', sizeof(block));
 
-    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
     for (off_t offset = 0; offset < maxSize(); offset += sizeof(block)) {
-        if (write(swap, block, sizeof(block)) != sizeof(block)) {
-            debugs(47, DBG_CRITICAL, "ERROR: Failed to create Rock Store db in " << filePath <<
-                   ": " << xstrerror());
-            fatal("Rock Store db creation error");
-        }
+        if (write(swap, block, sizeof(block)) != sizeof(block))
+            createError("write");
     }
-    close(swap);
 #else
-    const int swap = open(filePath, O_WRONLY|O_CREAT|O_TRUNC|O_BINARY, 0600);
-    if (swap < 0) {
-        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
-               "; create error: " << xstrerror());
-        fatal("Rock Store db creation error");
-    }
-
-    if (ftruncate(swap, maxSize()) != 0) {
-        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
-               "; truncate error: " << xstrerror());
-        fatal("Rock Store db creation error");
-    }
+    if (ftruncate(swap, maxSize()) != 0)
+        createError("truncate");
 
     char header[HeaderSize];
     memset(header, '\0', sizeof(header));
-    if (write(swap, header, sizeof(header)) != sizeof(header)) {
-        debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " << filePath <<
-               "; write error: " << xstrerror());
-        fatal("Rock Store db initialization error");
-    }
-    close(swap);
+    if (write(swap, header, sizeof(header)) != sizeof(header))
+        createError("write");
 #endif
+
+    close(swap);
+}
+
+// report Rock DB creation error and exit
+void
+Rock::SwapDir::createError(const char *const msg)
+{
+    int xerrno = errno; // XXX: where does errno come from?
+    debugs(47, DBG_CRITICAL, "ERROR: Failed to initialize Rock Store db in " <<
+           filePath << "; " << msg << " error: " << xstrerr(xerrno));
+    fatal("Rock Store db creation error");
 }
 
 void
@@ -231,8 +287,11 @@ Rock::SwapDir::init()
     // are refcounted. We up our count once to avoid implicit delete's.
     lock();
 
+    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());
+
     Must(!map);
-    map = new DirMap(path);
+    map = new DirMap(inodeMapPath());
+    map->cleaner = this;
 
     const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
     if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
@@ -316,17 +375,26 @@ Rock::SwapDir::parseSize(const bool reconfig)
 ConfigOption *
 Rock::SwapDir::getOptionTree() const
 {
-    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(::SwapDir::getOptionTree());
-    assert(vector);
-    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
-    vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
-    return vector;
+    ConfigOption *copt = ::SwapDir::getOptionTree();
+    ConfigOptionVector *vector = dynamic_cast<ConfigOptionVector*>(copt);
+    if (vector) {
+        // if copt is actually a ConfigOptionVector
+        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseSizeOption, &SwapDir::dumpSizeOption));
+        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseTimeOption, &SwapDir::dumpTimeOption));
+        vector->options.push_back(new ConfigOptionAdapter<SwapDir>(*const_cast<SwapDir *>(this), &SwapDir::parseRateOption, &SwapDir::dumpRateOption));
+    } else {
+        // we don't know how to handle copt, as it's not a ConfigOptionVector.
+        // free it (and return nullptr)
+        delete copt;
+        copt = nullptr;
+    }
+    return copt;
 }
 
 bool
 Rock::SwapDir::allowOptionReconfigure(const char *const option) const
 {
-    return strcmp(option, "max-size") != 0 &&
+    return strcmp(option, "slot-size") != 0 &&
            ::SwapDir::allowOptionReconfigure(option);
 }
 
@@ -335,7 +403,7 @@ bool
 Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconfig)
 {
     // TODO: ::SwapDir or, better, Config should provide time-parsing routines,
-    // including time unit handling. Same for size.
+    // including time unit handling. Same for size and rate.
 
     time_msec_t *storedTime;
     if (strcmp(option, "swap-timeout") == 0)
@@ -343,14 +411,17 @@ Rock::SwapDir::parseTimeOption(char const *option, const char *value, int reconf
     else
         return false;
 
-    if (!value)
+    if (!value) {
         self_destruct();
+        return false;
+    }
 
     // TODO: handle time units and detect parsing errors better
     const int64_t parsedValue = strtoll(value, NULL, 10);
     if (parsedValue < 0) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
         self_destruct();
+        return false;
     }
 
     const time_msec_t newTime = static_cast<time_msec_t>(parsedValue);
@@ -385,14 +456,17 @@ Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaRec
     else
         return false;
 
-    if (!value)
+    if (!value) {
         self_destruct();
+        return false;
+    }
 
     // TODO: handle time units and detect parsing errors better
     const int64_t parsedValue = strtoll(value, NULL, 10);
     if (parsedValue < 0) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << parsedValue);
         self_destruct();
+        return false;
     }
 
     const int newRate = static_cast<int>(parsedValue);
@@ -400,6 +474,7 @@ Rock::SwapDir::parseRateOption(char const *option, const char *value, int isaRec
     if (newRate < 0) {
         debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must not be negative but is: " << newRate);
         self_destruct();
+        return false;
     }
 
     if (!isaReconfig)
@@ -421,31 +496,93 @@ Rock::SwapDir::dumpRateOption(StoreEntry * e) const
         storeAppendPrintf(e, " max-swap-rate=%d", fileConfig.ioRate);
 }
 
+/// parses size-specific options; mimics ::SwapDir::optionObjectSizeParse()
+bool
+Rock::SwapDir::parseSizeOption(char const *option, const char *value, int reconfig)
+{
+    uint64_t *storedSize;
+    if (strcmp(option, "slot-size") == 0)
+        storedSize = &slotSize;
+    else
+        return false;
+
+    if (!value) {
+        self_destruct();
+        return false;
+    }
+
+    // TODO: handle size units and detect parsing errors better
+    const uint64_t newSize = strtoll(value, NULL, 10);
+    if (newSize <= 0) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must be positive; got: " << newSize);
+        self_destruct();
+        return false;
+    }
+
+    if (newSize <= sizeof(DbCellHeader)) {
+        debugs(3, DBG_CRITICAL, "FATAL: cache_dir " << path << ' ' << option << " must exceed " << sizeof(DbCellHeader) << "; got: " << newSize);
+        self_destruct();
+        return false;
+    }
+
+    if (!reconfig)
+        *storedSize = newSize;
+    else if (*storedSize != newSize) {
+        debugs(3, DBG_IMPORTANT, "WARNING: cache_dir " << path << ' ' << option
+               << " cannot be changed dynamically, value left unchanged: " <<
+               *storedSize);
+    }
+
+    return true;
+}
+
+/// reports size-specific options; mimics ::SwapDir::optionObjectSizeDump()
+void
+Rock::SwapDir::dumpSizeOption(StoreEntry * e) const
+{
+    storeAppendPrintf(e, " slot-size=%" PRId64, slotSize);
+}
+
 /// check the results of the configuration; only level-0 debugging works here
 void
 Rock::SwapDir::validateOptions()
 {
-    if (max_objsize <= 0)
-        fatal("Rock store requires a positive max-size");
+    if (slotSize <= 0)
+        fatal("Rock store requires a positive slot-size");
 
     const int64_t maxSizeRoundingWaste = 1024 * 1024; // size is configured in MB
-    const int64_t maxObjectSizeRoundingWaste = maxObjectSize();
+    const int64_t slotSizeRoundingWaste = slotSize;
     const int64_t maxRoundingWaste =
-        max(maxSizeRoundingWaste, maxObjectSizeRoundingWaste);
-    const int64_t usableDiskSize = diskOffset(entryLimitAllowed());
-    const int64_t diskWasteSize = maxSize() - usableDiskSize;
-    Must(diskWasteSize >= 0);
+        max(maxSizeRoundingWaste, slotSizeRoundingWaste);
+
+    // an entry consumes at least one slot; round up to reduce false warnings
+    const int64_t blockSize = static_cast<int64_t>(slotSize);
+    const int64_t maxObjSize = max(blockSize,
+                                   ((maxObjectSize()+blockSize-1)/blockSize)*blockSize);
+
+    // Does the "sfileno*max-size" limit match configured db capacity?
+    const double entriesMayOccupy = entryLimitAbsolute()*static_cast<double>(maxObjSize);
+    if (entriesMayOccupy + maxRoundingWaste < maxSize()) {
+        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
+        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to entry limits:" <<
+               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
+               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
+               "\n\tconfigured maximum entry size: " << maxObjectSize() << " bytes" <<
+               "\n\tmaximum number of cache_dir entries supported by Squid: " << entryLimitAbsolute() <<
+               "\n\tdisk space all entries may use: " << entriesMayOccupy << " bytes" <<
+               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
+    }
 
-    // warn if maximum db size is not reachable due to sfileno limit
-    if (entryLimitAllowed() == entryLimitHigh() &&
-            diskWasteSize >= maxRoundingWaste) {
-        debugs(47, DBG_CRITICAL, "Rock store cache_dir[" << index << "] '" << path << "':");
-        debugs(47, DBG_CRITICAL, "\tmaximum number of entries: " << entryLimitAllowed());
-        debugs(47, DBG_CRITICAL, "\tmaximum object size: " << maxObjectSize() << " Bytes");
-        debugs(47, DBG_CRITICAL, "\tmaximum db size: " << maxSize() << " Bytes");
-        debugs(47, DBG_CRITICAL, "\tusable db size:  " << usableDiskSize << " Bytes");
-        debugs(47, DBG_CRITICAL, "\tdisk space waste: " << diskWasteSize << " Bytes");
-        debugs(47, DBG_CRITICAL, "WARNING: Rock store config wastes space.");
+    // Does the "absolute slot count" limit match configured db capacity?
+    const double slotsMayOccupy = slotLimitAbsolute()*static_cast<double>(slotSize);
+    if (slotsMayOccupy + maxRoundingWaste < maxSize()) {
+        const int64_t diskWasteSize = maxSize() - static_cast<int64_t>(entriesMayOccupy);
+        debugs(47, DBG_CRITICAL, "WARNING: Rock cache_dir " << path << " wastes disk space due to slot limits:" <<
+               "\n\tconfigured db capacity: " << maxSize() << " bytes" <<
+               "\n\tconfigured db slot size: " << slotSize << " bytes" <<
+               "\n\tmaximum number of rock cache_dir slots supported by Squid: " << slotLimitAbsolute() <<
+               "\n\tdisk space all slots may use: " << slotsMayOccupy << " bytes" <<
+               "\n\tdisk space wasted: " << diskWasteSize << " bytes");
     }
 }
 
@@ -456,32 +593,12 @@ Rock::SwapDir::rebuild()
     AsyncJob::Start(new Rebuild(this));
 }
 
-/* Add a new object to the cache with empty memory copy and pointer to disk
- * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
-bool
-Rock::SwapDir::addEntry(const int filen, const DbCellHeader &header, const StoreEntry &from)
-{
-    debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
-           ", filen="<< std::setfill('0') << std::hex << std::uppercase <<
-           std::setw(8) << filen);
-
-    sfileno newLocation = 0;
-    if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
-        if (filen == newLocation) {
-            slot->set(from);
-            map->extras(filen) = header;
-        } // else some other, newer entry got into our cell
-        map->closeForWriting(newLocation, false);
-        return filen == newLocation;
-    }
-
-    return false;
-}
-
 bool
 Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
 {
-    if (!::SwapDir::canStore(e, sizeof(DbCellHeader)+diskSpaceNeeded, load))
+    if (diskSpaceNeeded >= 0)
+        diskSpaceNeeded += sizeof(DbCellHeader);
+    if (!::SwapDir::canStore(e, diskSpaceNeeded, load))
         return false;
 
     if (!theFile || !theFile->canWrite())
@@ -513,42 +630,31 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
         return NULL;
     }
 
-    // compute payload size for our cell header, using StoreEntry info
-    // careful: e.objectLen() may still be negative here
-    const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
-    assert(expectedReplySize >= 0); // must know to prevent cell overflows
-    assert(e.mem_obj->swap_hdr_sz > 0);
-    DbCellHeader header;
-    header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
-    const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
-    assert(payloadEnd <= max_objsize);
-
     sfileno filen;
-    Ipc::StoreMapSlot *const slot =
+    Ipc::StoreMapAnchor *const slot =
         map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
     if (!slot) {
         debugs(47, 5, HERE << "map->add failed");
         return NULL;
     }
-    e.swap_file_sz = header.payloadSize; // and will be copied to the map
+
+    assert(filen >= 0);
     slot->set(e);
-    map->extras(filen) = header;
 
     // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
     // If that does not happen, the entry will not decrement the read level!
 
-    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+    Rock::SwapDir::Pointer self(this);
+    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = filen;
-    sio->payloadEnd = payloadEnd;
-    sio->diskOffset = diskOffset(sio->swap_filen);
+    sio->writeableAnchor_ = slot;
 
     debugs(47,5, HERE << "dir " << index << " created new filen " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
-           sio->swap_filen << std::dec << " at " << sio->diskOffset);
-
-    assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());
+           sio->swap_filen << std::dec << " starting at " <<
+           diskOffset(sio->swap_filen));
 
     sio->file(theFile);
 
@@ -556,21 +662,109 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
     return sio;
 }
 
+StoreIOState::Pointer
+Rock::SwapDir::createUpdateIO(const Ipc::StoreMapUpdate &update, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
+{
+    if (!theFile || theFile->error()) {
+        debugs(47,4, theFile);
+        return nullptr;
+    }
+
+    Must(update.fresh);
+    Must(update.fresh.fileNo >= 0);
+
+    Rock::SwapDir::Pointer self(this);
+    IoState *sio = new IoState(self, update.entry, cbFile, cbIo, data);
+
+    sio->swap_dirn = index;
+    sio->swap_filen = update.fresh.fileNo;
+    sio->writeableAnchor_ = update.fresh.anchor;
+
+    debugs(47,5, "dir " << index << " updating filen " <<
+           std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
+           sio->swap_filen << std::dec << " starting at " <<
+           diskOffset(sio->swap_filen));
+
+    sio->file(theFile);
+    return sio;
+}
+
 int64_t
-Rock::SwapDir::diskOffset(int filen) const
+Rock::SwapDir::diskOffset(const SlotId sid) const
 {
-    assert(filen >= 0);
-    return HeaderSize + max_objsize*filen;
+    assert(sid >= 0);
+    return HeaderSize + slotSize*sid;
+}
+
+int64_t
+Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
+{
+    assert(pageId);
+    return diskOffset(pageId.number - 1);
 }
 
 int64_t
 Rock::SwapDir::diskOffsetLimit() const
 {
     assert(map);
-    return diskOffset(map->entryLimit());
+    return diskOffset(map->sliceLimit());
+}
+
+Rock::SlotId
+Rock::SwapDir::reserveSlotForWriting()
+{
+    Ipc::Mem::PageId pageId;
+
+    if (freeSlots->pop(pageId)) {
+        const auto slotId = pageId.number - 1;
+        debugs(47, 5, "got a previously free slot: " << slotId);
+        map->prepFreeSlice(slotId);
+        return slotId;
+    }
+
+    // catch free slots delivered to noteFreeMapSlice()
+    assert(!waitingForPage);
+    waitingForPage = &pageId;
+    if (map->purgeOne()) {
+        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
+        assert(pageId.set());
+        const auto slotId = pageId.number - 1;
+        debugs(47, 5, "got a previously busy slot: " << slotId);
+        map->prepFreeSlice(slotId);
+        return slotId;
+    }
+    assert(waitingForPage == &pageId);
+    waitingForPage = NULL;
+
+    // This may happen when the number of available db slots is close to the
+    // number of concurrent requests reading or writing those slots, which may
+    // happen when the db is "small" compared to the request traffic OR when we
+    // are rebuilding and have not loaded "many" entries or empty slots yet.
+    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
+    throw TexcHere("ran out of free db slots");
+}
+
+bool
+Rock::SwapDir::validSlotId(const SlotId slotId) const
+{
+    return 0 <= slotId && slotId < slotLimitActual();
 }
 
-// tries to open an old or being-written-to entry with swap_filen for reading
+void
+Rock::SwapDir::noteFreeMapSlice(const Ipc::StoreMapSliceId sliceId)
+{
+    Ipc::Mem::PageId pageId;
+    pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(index);
+    pageId.number = sliceId+1;
+    if (waitingForPage) {
+        *waitingForPage = pageId;
+        waitingForPage = NULL;
+    } else {
+        freeSlots->push(pageId);
+    }
+}
+
+// tries to open an old entry with swap_filen for reading
 StoreIOState::Pointer
 Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
 {
@@ -579,7 +773,7 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
         return NULL;
     }
 
-    if (e.swap_filen < 0) {
+    if (!e.hasDisk()) {
         debugs(47,4, HERE << e);
         return NULL;
     }
@@ -594,29 +788,28 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
 
     // The are two ways an entry can get swap_filen: our get() locked it for
     // reading or our storeSwapOutStart() locked it for writing. Peeking at our
-    // locked entry is safe, but no support for reading a filling entry.
-    const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
+    // locked entry is safe, but no support for reading the entry we swap out.
+    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
     if (!slot)
         return NULL; // we were writing afterall
 
-    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+    Rock::SwapDir::Pointer self(this);
+    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = e.swap_filen;
-    sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
-    assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
+    sio->readableAnchor_ = slot;
+    sio->file(theFile);
 
     debugs(47,5, HERE << "dir " << index << " has old filen: " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
            sio->swap_filen);
 
-    assert(slot->basics.swap_file_sz > 0);
-    assert(slot->basics.swap_file_sz == e.swap_file_sz);
-
-    sio->diskOffset = diskOffset(sio->swap_filen);
-    assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
+    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
+    // For collapsed disk hits: e.swap_file_sz and slot->basics.swap_file_sz
+    // may still be zero and basics.swap_file_sz may grow.
+    assert(slot->basics.swap_file_sz >= e.swap_file_sz);
 
-    sio->file(theFile);
     return sio;
 }
 
@@ -626,13 +819,16 @@ Rock::SwapDir::ioCompletedNotification()
     if (!theFile)
         fatalf("Rock cache_dir failed to initialize db file: %s", filePath);
 
-    if (theFile->error())
+    if (theFile->error()) {
+        int xerrno = errno; // XXX: where does errno come from
         fatalf("Rock cache_dir at %s failed to open db file: %s", filePath,
-               xstrerror());
+               xstrerr(xerrno));
+    }
 
     debugs(47, 2, "Rock cache_dir[" << index << "] limits: " <<
-           std::setw(12) << maxSize() << " disk bytes and " <<
-           std::setw(7) << map->entryLimit() << " entries");
+           std::setw(12) << maxSize() << " disk bytes, " <<
+           std::setw(7) << map->entryLimit() << " entries, and " <<
+           std::setw(7) << map->sliceLimit() << " slots");
 
     rebuild();
 }
@@ -644,51 +840,132 @@ Rock::SwapDir::closeCompleted()
 }
 
 void
-Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest> r)
+Rock::SwapDir::readCompleted(const char *, int rlen, int errflag, RefCount< ::ReadRequest> r)
 {
     ReadRequest *request = dynamic_cast<Rock::ReadRequest*>(r.getRaw());
     assert(request);
     IoState::Pointer sio = request->sio;
-
-    if (errflag == DISK_OK && rlen > 0)
-        sio->offset_ += rlen;
-    assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum
-
-    StoreIOState::STRCB *callb = sio->read.callback;
-    assert(callb);
-    sio->read.callback = NULL;
-    void *cbdata;
-    if (cbdataReferenceValidDone(sio->read.callback_data, &cbdata))
-        callb(cbdata, r->buf, rlen, sio.getRaw());
+    sio->handleReadCompletion(*request, rlen, errflag);
 }
 
 void
-Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest> r)
+Rock::SwapDir::writeCompleted(int errflag, size_t, RefCount< ::WriteRequest> r)
 {
+    // TODO: Move details into IoState::handleWriteCompletion() after figuring
+    // out how to deal with map access. See readCompleted().
+
     Rock::WriteRequest *request = dynamic_cast<Rock::WriteRequest*>(r.getRaw());
     assert(request);
     assert(request->sio !=  NULL);
     IoState &sio = *request->sio;
 
-    if (errflag == DISK_OK) {
-        // close, assuming we only write once; the entry gets the read lock
-        map->closeForWriting(sio.swap_filen, true);
-        // do not increment sio.offset_ because we do it in sio->write()
+    // quit if somebody called IoState::close() while we were waiting
+    if (!sio.stillWaiting()) {
+        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
+        noteFreeMapSlice(request->sidCurrent);
+        return;
+    }
+
+    debugs(79, 7, "errflag=" << errflag << " rlen=" << request->len << " eof=" << request->eof);
+
+    if (errflag != DISK_OK)
+        handleWriteCompletionProblem(errflag, *request);
+    else if (!sio.expectedReply(request->id))
+        handleWriteCompletionProblem(DISK_ERROR, *request);
+    else
+        handleWriteCompletionSuccess(*request);
+
+    if (sio.touchingStoreEntry())
+        CollapsedForwarding::Broadcast(*sio.e);
+}
+
+/// code shared by writeCompleted() success handling cases
+void
+Rock::SwapDir::handleWriteCompletionSuccess(const WriteRequest &request)
+{
+    auto &sio = *(request.sio);
+    sio.splicingPoint = request.sidCurrent;
+    // do not increment sio.offset_ because we do it in sio->write()
+
+    assert(sio.writeableAnchor_);
+    if (sio.writeableAnchor_->start < 0) { // wrote the first slot
+        Must(request.sidPrevious < 0);
+        sio.writeableAnchor_->start = request.sidCurrent;
     } else {
-        // Do not abortWriting here. The entry should keep the write lock
-        // instead of losing association with the store and confusing core.
-        map->free(sio.swap_filen); // will mark as unusable, just in case
+        Must(request.sidPrevious >= 0);
+        map->writeableSlice(sio.swap_filen, request.sidPrevious).next = request.sidCurrent;
     }
 
-    assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum
+    // finalize the shared slice info after writing slice contents to disk;
+    // the chain gets possession of the slice we were writing
+    Ipc::StoreMap::Slice &slice =
+        map->writeableSlice(sio.swap_filen, request.sidCurrent);
+    slice.size = request.len - sizeof(DbCellHeader);
+    Must(slice.next < 0);
+
+    if (request.eof) {
+        assert(sio.e);
+        if (sio.touchingStoreEntry()) {
+            sio.e->swap_file_sz = sio.writeableAnchor_->basics.swap_file_sz =
+                                      sio.offset_;
+
+            map->switchWritingToReading(sio.swap_filen);
+            // sio.e keeps the (now read) lock on the anchor
+        }
+        sio.writeableAnchor_ = NULL;
+        sio.finishedWriting(DISK_OK);
+    }
+}
+
+/// code shared by writeCompleted() error handling cases
+void
+Rock::SwapDir::handleWriteCompletionProblem(const int errflag, const WriteRequest &request)
+{
+    auto &sio = *request.sio;
 
+    noteFreeMapSlice(request.sidCurrent);
+
+    writeError(sio);
     sio.finishedWriting(errflag);
+    // and hope that Core will call disconnect() to close the map entry
+}
+
+void
+Rock::SwapDir::writeError(StoreIOState &sio)
+{
+    // Do not abortWriting here. The entry should keep the write lock
+    // instead of losing association with the store and confusing core.
+    map->freeEntry(sio.swap_filen); // will mark as unusable, just in case
+
+    if (sio.touchingStoreEntry())
+        Store::Root().stopSharing(*sio.e);
+    // else noop: a fresh entry update error does not affect stale entry readers
+
+    // All callers must also call IoState callback, to propagate the error.
+}
+
+void
+Rock::SwapDir::updateHeaders(StoreEntry *updatedE)
+{
+    if (!map)
+        return;
+
+    Ipc::StoreMapUpdate update(updatedE);
+    if (!map->openForUpdating(update, updatedE->swap_filen))
+        return;
+
+    try {
+        AsyncJob::Start(new HeaderUpdater(this, update));
+    } catch (const std::exception &ex) {
+        debugs(20, 2, "error starting to update entry " << *updatedE << ": " << ex.what());
+        map->abortUpdating(update);
+    }
 }
 
 bool
 Rock::SwapDir::full() const
 {
-    return map && map->full();
+    return freeSlots != NULL && !freeSlots->size();
 }
 
 // storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT,
@@ -704,49 +981,10 @@ Rock::SwapDir::diskFull()
 void
 Rock::SwapDir::maintain()
 {
-    debugs(47,3, HERE << "cache_dir[" << index << "] guards: " <<
-           !repl << !map << !full() << StoreController::store_dirs_rebuilding);
-
-    if (!repl)
-        return; // no means (cannot find a victim)
-
-    if (!map)
-        return; // no victims (yet)
-
-    if (!full())
-        return; // no need (to find a victim)
-
-    // XXX: UFSSwapDir::maintain says we must quit during rebuild
-    if (StoreController::store_dirs_rebuilding)
-        return;
-
-    debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() <<
-           ' ' << currentSize() << " < " << diskOffsetLimit());
-
-    // Hopefully, we find a removable entry much sooner (TODO: use time?)
-    const int maxProbed = 10000;
-    RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed);
-
-    // It really should not take that long, but this will stop "infinite" loops
-    const int maxFreed = 1000;
-    int freed = 0;
-    // TODO: should we purge more than needed to minimize overheads?
-    for (; freed < maxFreed && full(); ++freed) {
-        if (StoreEntry *e = walker->Next(walker))
-            e->release(); // will call our unlink() method
-        else
-            break; // no more objects
-    }
-
-    debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed <<
-           " scanned " << walker->scanned << '/' << walker->locked);
-
-    walker->Done(walker);
-
-    if (full()) {
-        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " <<
-               "is still full after freeing " << freed << " entries. A bug?");
-    }
+    // The Store calls this to free some db space, but there is nothing wrong
+    // with a full() db, except when db has to shrink after reconfigure, and
+    // we do not support shrinking yet (it would have to purge specific slots).
+    // TODO: Disable maintain() requests when they are pointless.
 }
 
 void
@@ -758,7 +996,7 @@ Rock::SwapDir::reference(StoreEntry &e)
 }
 
 bool
-Rock::SwapDir::dereference(StoreEntry &e, bool)
+Rock::SwapDir::dereference(StoreEntry &e)
 {
     debugs(47, 5, HERE << &e << ' ' << e.swap_dirn << ' ' << e.swap_filen);
     if (repl && repl->Dereferenced)
@@ -776,12 +1014,24 @@ Rock::SwapDir::unlinkdUseful() const
 }
 
 void
-Rock::SwapDir::unlink(StoreEntry &e)
+Rock::SwapDir::evictIfFound(const cache_key *key)
 {
-    debugs(47, 5, HERE << e);
-    ignoreReferences(e);
-    map->free(e.swap_filen);
-    disconnect(e);
+    if (map)
+        map->freeEntryByKey(key); // may not be there
+}
+
+void
+Rock::SwapDir::evictCached(StoreEntry &e)
+{
+    debugs(47, 5, e);
+    if (e.hasDisk(index)) {
+        if (map->freeEntry(e.swap_filen))
+            CollapsedForwarding::Broadcast(e);
+        if (!e.locked())
+            disconnect(e);
+    } else if (const auto key = e.publicKey()) {
+        evictIfFound(key);
+    }
 }
 
 void
@@ -809,19 +1059,27 @@ Rock::SwapDir::statfs(StoreEntry &e) const
                       currentSize() / 1024.0,
                       Math::doublePercent(currentSize(), maxSize()));
 
-    if (map) {
-        const int limit = map->entryLimit();
-        storeAppendPrintf(&e, "Maximum entries: %9d\n", limit);
-        if (limit > 0) {
-            const int entryCount = map->entryCount();
-            storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
-                              entryCount, (100.0 * entryCount / limit));
-
-            if (limit < 100) { // XXX: otherwise too expensive to count
-                Ipc::ReadWriteLockStats stats;
-                map->updateStats(stats);
-                stats.dump(e);
-            }
+    const int entryLimit = entryLimitActual();
+    const int slotLimit = slotLimitActual();
+    storeAppendPrintf(&e, "Maximum entries: %9d\n", entryLimit);
+    if (map && entryLimit > 0) {
+        const int entryCount = map->entryCount();
+        storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
+                          entryCount, (100.0 * entryCount / entryLimit));
+    }
+
+    storeAppendPrintf(&e, "Maximum slots:   %9d\n", slotLimit);
+    if (map && slotLimit > 0) {
+        const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
+        if (slotsFree <= static_cast<const unsigned int>(slotLimit)) {
+            const int usedSlots = slotLimit - static_cast<const int>(slotsFree);
+            storeAppendPrintf(&e, "Used slots:      %9d %.2f%%\n",
+                              usedSlots, (100.0 * usedSlots / slotLimit));
+        }
+        if (slotLimit < 100) { // XXX: otherwise too expensive to count
+            Ipc::ReadWriteLockStats stats;
+            map->updateStats(stats);
+            stats.dump(e);
         }
     }
 
@@ -840,25 +1098,66 @@ Rock::SwapDir::statfs(StoreEntry &e) const
 
 }
 
+SBuf
+Rock::SwapDir::inodeMapPath() const
+{
+    return Ipc::Mem::Segment::Name(SBuf(path), "map");
+}
+
+const char *
+Rock::SwapDir::freeSlotsPath() const
+{
+    static String spacesPath;
+    spacesPath = path;
+    spacesPath.append("_spaces");
+    return spacesPath.termedBuf();
+}
+
+bool
+Rock::SwapDir::hasReadableEntry(const StoreEntry &e) const
+{
+    return map->hasReadableEntry(reinterpret_cast<const cache_key*>(e.key));
+}
+
 namespace Rock
 {
-RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
+RunnerRegistrationEntry(SwapDirRr);
 }
 
-void Rock::SwapDirRr::create(const RunnerRegistry &)
+void Rock::SwapDirRr::create()
 {
-    Must(owners.empty());
+    Must(mapOwners.empty() && freeSlotsOwners.empty());
     for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
         if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
-            Rock::SwapDir::DirMap::Owner *const owner =
-                Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed());
-            owners.push_back(owner);
+            const int64_t capacity = sd->slotLimitActual();
+
+            SwapDir::DirMap::Owner *const mapOwner =
+                SwapDir::DirMap::Init(sd->inodeMapPath(), capacity);
+            mapOwners.push_back(mapOwner);
+
+            // TODO: somehow remove pool id and counters from PageStack?
+            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
+                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
+                                             Ipc::Mem::PageStack::IdForSwapDirSpace(i),
+                                             capacity,
+                                             0);
+            freeSlotsOwners.push_back(freeSlotsOwner);
+
+            // TODO: add method to initialize PageStack with no free pages
+            while (true) {
+                Ipc::Mem::PageId pageId;
+                if (!freeSlotsOwner->object()->pop(pageId))
+                    break;
+            }
         }
     }
 }
 
 Rock::SwapDirRr::~SwapDirRr()
 {
-    for (size_t i = 0; i < owners.size(); ++i)
-        delete owners[i];
+    for (size_t i = 0; i < mapOwners.size(); ++i) {
+        delete mapOwners[i];
+        delete freeSlotsOwners[i];
+    }
 }
+