]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Working Large Rock implementation, including concurrent store "rebuild"
authorAlex Rousskov <rousskov@measurement-factory.com>
Mon, 7 Jan 2013 19:41:41 +0000 (12:41 -0700)
committerAlex Rousskov <rousskov@measurement-factory.com>
Mon, 7 Jan 2013 19:41:41 +0000 (12:41 -0700)
and a simple "scan" replacement policy.

Needs polishing and possibly a swap.state equivalent for faster rebuild.

15 files changed:
src/fs/Makefile.am
src/fs/rock/RockDbCell.cc
src/fs/rock/RockDbCell.h
src/fs/rock/RockForward.h [new file with mode: 0644]
src/fs/rock/RockIoState.cc
src/fs/rock/RockIoState.h
src/fs/rock/RockRebuild.cc
src/fs/rock/RockRebuild.h
src/fs/rock/RockSwapDir.cc
src/fs/rock/RockSwapDir.h
src/ipc/Makefile.am
src/ipc/StoreMap.cc
src/ipc/StoreMap.h
src/ipc/StoreMapSlice.cc [new file with mode: 0644]
src/ipc/StoreMapSlice.h [new file with mode: 0644]

index 1c361986d4a76431ff7c53816eb9568e663ea0de..61ea8c2ef587b9bd970c7cb3669516ba1835979f 100644 (file)
@@ -38,6 +38,7 @@ libufs_la_SOURCES = \
 librock_la_SOURCES = \
        rock/RockDbCell.cc \
        rock/RockDbCell.h \
+       rock/RockForward.h \
        rock/RockIoState.cc \
        rock/RockIoState.h \
        rock/RockIoRequests.cc \
index a756f0ac8d90ffb0fc77c9b4a9985ecf415c0da8..bde7cc8a45f55e54ff34966f5fe172199f322e06 100644 (file)
@@ -4,15 +4,8 @@
 
 #include "squid.h"
 #include "fs/rock/RockDbCell.h"
-#include "ipc/StoreMap.h"
-#include "tools.h"
 
-Rock::DbCellHeader::DbCellHeader(): firstSlot(0), nextSlot(0), version(0),
-        payloadSize(0) {
-    memset(&key, 0, sizeof(key));
-}
-
-bool
-Rock::DbCellHeader::sane() const {
-    return firstSlot > 0;
+Rock::DbCellHeader::DbCellHeader()
+{
+    memset(this, 0, sizeof(*this));
 }
index 9830933b25bbf32c6703fe0695e76025bda61316..31fb250e523b30c2aa10b9d6acb635dfdcdbe263 100644 (file)
@@ -1,16 +1,14 @@
 #ifndef SQUID_FS_ROCK_DB_CELL_H
 #define SQUID_FS_ROCK_DB_CELL_H
 
-namespace Ipc
-{
-class StoreMapSlot;
-}
+#include "typedefs.h"
 
 namespace Rock
 {
 
 /** \ingroup Rock
  * Meta-information at the beginning of every db cell.
+ * Links multiple map slots belonging to the same entry into an entry chain.
  * Stored on disk and used as sizeof() argument so it must remain POD.
  */
 class DbCellHeader
@@ -18,14 +16,21 @@ class DbCellHeader
 public:
     DbCellHeader();
 
-    /// whether the freshly loaded header fields make sense
-    bool sane() const;
+    /// true iff no entry occupies this slot
+    bool empty() const { return !firstSlot && !nextSlot && !payloadSize; }
+
+    /* members below are not meaningful if empty() */
+
+    /// whether this slot is not corrupted
+    bool sane() const { return firstSlot >= 0 && nextSlot >= -1 &&
+       version > 0 && payloadSize > 0; }
 
     uint64_t key[2]; ///< StoreEntry key
-    uint32_t firstSlot; ///< first slot pointer in the entry chain
-    uint32_t nextSlot; ///< next slot pointer in the entry chain
-    uint32_t version; ///< entry chain version
-    uint32_t payloadSize; ///< cell contents size excluding this header
+    uint64_t entrySize; ///< total entry content size or zero if still unknown
+    uint32_t payloadSize; ///< slot contents size, always positive
+    uint32_t version;  ///< detects conflicts among same-key entries
+    sfileno firstSlot; ///< slot ID of the first slot occupied by the entry
+    sfileno nextSlot; ///< slot ID of the next slot occupied by the entry
 };
 
 } // namespace Rock
diff --git a/src/fs/rock/RockForward.h b/src/fs/rock/RockForward.h
new file mode 100644 (file)
index 0000000..df44f33
--- /dev/null
@@ -0,0 +1,35 @@
+#ifndef SQUID_FS_ROCK_FORWARD_H
+#define SQUID_FS_ROCK_FORWARD_H
+
+namespace Ipc
+{
+
+class StoreMapAnchor;
+class StoreMapSlice;
+
+namespace Mem
+{
+class PageId;
+}
+
+}
+
+
+namespace Rock
+{
+
+class SwapDir;
+
+/// db cell number, starting with cell 0 (always occupied by the db header)
+typedef sfileno SlotId;
+
+class Rebuild;
+
+class IoState;
+
+class DbCellHeader;
+
+}
+
+
+#endif /* SQUID_FS_ROCK_FORWARD_H */
index 9c2612eb4e50106a3be2d93dbc92d13d34ea23e9..9fe072694eb9237751c526728dfca1128ec78ab9 100644 (file)
@@ -3,8 +3,7 @@
  */
 
 #include "squid.h"
-#include "MemObject.h"
-#include "Parsing.h"
+#include "base/TextException.h"
 #include "DiskIO/DiskIOModule.h"
 #include "DiskIO/DiskIOStrategy.h"
 #include "DiskIO/WriteRequest.h"
 #include "fs/rock/RockIoRequests.h"
 #include "fs/rock/RockSwapDir.h"
 #include "globals.h"
+#include "MemObject.h"
+#include "Mem.h"
+#include "Parsing.h"
 
-Rock::IoState::IoState(SwapDir &aDir,
+Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                        StoreEntry *anEntry,
                        StoreIOState::STFNCB *cbFile,
                        StoreIOState::STIOCB *cbIo,
                        void *data):
-        dbSlot(NULL),
+        readableAnchor_(NULL),
+        writeableAnchor_(NULL),
+        sidCurrent(-1),
         dir(aDir),
-        slotSize(dir.slotSize),
-        objOffset(0)
+        slotSize(dir->slotSize),
+        objOffset(0),
+        theBuf(dir->slotSize)
 {
     e = anEntry;
-    // swap_filen, swap_dirn and diskOffset are set by the caller
+    e->lock("rock I/O");
+    // anchor, swap_filen, and swap_dirn are set by the caller
     file_callback = cbFile;
     callback = cbIo;
     callback_data = cbdataReference(data);
@@ -35,9 +41,17 @@ Rock::IoState::IoState(SwapDir &aDir,
 Rock::IoState::~IoState()
 {
     --store_open_disk_fd;
+
+    // The dir map entry may still be open for reading at the point because
+    // the map entry lock is associated with StoreEntry, not IoState.
+    // assert(!readableAnchor_);
+    assert(!writeableAnchor_);
+
     if (callback_data)
         cbdataReferenceDone(callback_data);
     theFile = NULL;
+
+    e->unlock("rock I/O");
 }
 
 void
@@ -48,136 +62,252 @@ Rock::IoState::file(const RefCount<DiskFile> &aFile)
     theFile = aFile;
 }
 
+const Ipc::StoreMapAnchor &
+Rock::IoState::readAnchor() const
+{
+    assert(readableAnchor_);
+    return *readableAnchor_;
+}
+
+Ipc::StoreMapAnchor &
+Rock::IoState::writeAnchor()
+{
+    assert(writeableAnchor_);
+    return *writeableAnchor_;
+}
+
+/// convenience wrapper returning the map slot we are reading now
+const Ipc::StoreMapSlice &
+Rock::IoState::currentReadableSlice() const
+{
+    return dir->map->readableSlice(swap_filen, sidCurrent);
+}
+
 void
 Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
 {
+    debugs(79, 7, swap_filen << " reads from " << coreOff);
+
     assert(theFile != NULL);
     assert(coreOff >= 0);
 
-    Ipc::Mem::PageId pageId;
-    pageId.pool = dir.index;
-    if (coreOff < objOffset) { // rewind
-        pageId.number = dbSlot->firstSlot;
-        dbSlot = &dir.dbSlot(pageId);
+    // if we are dealing with the first read or
+    // if the offset went backwords, start searching from the beginning
+    if (sidCurrent < 0 || coreOff < objOffset) {
+        sidCurrent = readAnchor().start;
         objOffset = 0;
     }
 
-    while (coreOff >= objOffset + dbSlot->payloadSize) {
-        objOffset += dbSlot->payloadSize;
-        pageId.number = dbSlot->nextSlot;
-        assert(pageId); // XXX: should be an error?
-        dbSlot = &dir.dbSlot(pageId);
+    while (coreOff >= objOffset + currentReadableSlice().size) {
+        objOffset += currentReadableSlice().size;
+        sidCurrent = currentReadableSlice().next;
+        assert(sidCurrent >= 0); // XXX: handle "read offset too big" error
     }
-    if (pageId)
-        diskOffset = dir.diskOffset(pageId);
 
     offset_ = coreOff;
     len = min(len,
-        static_cast<size_t>(objOffset + dbSlot->payloadSize - coreOff));
+        static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
 
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
+    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
     theFile->read(new ReadRequest(::ReadRequest(buf,
         diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
 }
 
-// We only write data when full slot is accumulated or when close() is called.
-// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
-// of the slot when the write does not end at the page or sector boundary.
-void
+/// wraps tryWrite() to handle deep write failures centrally and safely
+bool
 Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
 {
-    assert(dbSlot);
-
-    if (theBuf.isNull()) {
-        theBuf.init(min(size + sizeof(DbCellHeader), slotSize), slotSize);
-        theBuf.appended(sizeof(DbCellHeader)); // will fill header in doWrite
+    bool success = false;
+    try {
+        tryWrite(buf, size, coreOff);
+        success = true;
+    } catch (const std::exception &e) { // TODO: should we catch ... as well?
+        debugs(79, 2, "db write error: " << e.what());
+        dir->writeError(swap_filen);
+        finishedWriting(DISK_ERROR);
+        // 'this' might be gone beyond this point; fall through to free buf
     }
 
-    if (size <= static_cast<size_t>(theBuf.spaceSize()))
-        theBuf.append(buf, size);
-    else {
-        Ipc::Mem::PageId pageId;
-        if (!dir.popDbSlot(pageId)) {
-            debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << dir.path <<
-                   "' run out of DB slots");
-            dir.writeError(swap_filen);
-            // XXX: do we need to destroy buf on error?
-            if (dtor)
-                (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
-            // XXX: do we need to call callback on error?
-            callBack(DISK_ERROR);
-            return;
-        }
-        DbCellHeader &nextDbSlot = dir.dbSlot(pageId);
-        memcpy(nextDbSlot.key, dbSlot->key, sizeof(nextDbSlot.key));
-        nextDbSlot.firstSlot = dbSlot->firstSlot;
-        nextDbSlot.nextSlot = 0;
-        nextDbSlot.version = dbSlot->version;
-        nextDbSlot.payloadSize = 0;
+    // careful: 'this' might be gone here
+    if (dtor)
+        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+
+    return success;
+}
+
+/** We only write data when full slot is accumulated or when close() is called.
+ We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
+ the slot when the write does not end at the page or sector boundary. */
+void
+Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
+{
+    debugs(79, 7, swap_filen << " writes " << size << " more");
+
+    // either this is the first write or append; we do not support write gaps
+    assert(!coreOff || coreOff == -1);
 
-        dbSlot->nextSlot = pageId.number;
+    // allocate the first slice diring the first write
+    if (!coreOff) {
+        assert(sidCurrent < 0);
+        sidCurrent = reserveSlotForWriting(); // throws on failures
+        assert(sidCurrent >= 0);
+        writeAnchor().start = sidCurrent;
+    }
 
-        const size_t left = size - theBuf.spaceSize();
-        offset_ += theBuf.spaceSize(); // so that Core thinks we wrote it
-        theBuf.append(buf, theBuf.spaceSize());
+    // buffer incoming data in slot buffer and write overflowing or final slots
+    // quit when no data left or we stopped writing on reentrant error
+    while (size > 0 && theFile != NULL) {
+        assert(sidCurrent >= 0);
+        const size_t processed = writeToBuffer(buf, size);
+        buf += processed;
+        size -= processed;
+        const bool overflow = size > 0;
+
+        // We do not write a full buffer without overflow because
+        // we would not yet know what to set the nextSlot to.
+        if (overflow) {
+            const SlotId sidNext = reserveSlotForWriting(); // throws
+            assert(sidNext >= 0);
+            writeToDisk(sidNext);
+        }
+    }
+}
 
-        doWrite();
+/// Buffers incoming data for the current slot.
+/// Returns the number of bytes buffered.
+size_t
+Rock::IoState::writeToBuffer(char const *buf, size_t size)
+{
+    // do not buffer a cell header for nothing
+    if (!size)
+        return 0;
 
-        dbSlot = &nextDbSlot;
-        diskOffset = dir.diskOffset(pageId);
-        theBuf.init(min(left, slotSize), slotSize);
-        write(buf + size - left, left, -1, NULL);
+    if (!theBuf.size) {
+        // will fill the header in writeToDisk when the next slot is known
+        theBuf.appended(sizeof(DbCellHeader));
     }
 
-    if (dtor)
-        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
+    theBuf.append(buf, forCurrentSlot);
+    offset_ += forCurrentSlot; // so that Core thinks we wrote it
+    return forCurrentSlot;
 }
 
-// write what was buffered during write() calls
+/// write what was buffered during write() calls
+/// negative sidNext means this is the last write request for this entry
 void
-Rock::IoState::doWrite(const bool isLast)
+Rock::IoState::writeToDisk(const SlotId sidNext)
 {
     assert(theFile != NULL);
-    assert(!theBuf.isNull());
+    assert(theBuf.size >= sizeof(DbCellHeader));
+
+    if (sidNext < 0) { // we are writing the last slot
+        e->swap_file_sz = offset_;
+        writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
+    }
 
     // TODO: if DiskIO module is mmap-based, we should be writing whole pages
     // to avoid triggering read-page;new_head+old_tail;write-page overheads
 
+    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
     debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
-           theBuf.contentSize());
+           theBuf.size);
+
+    // finalize map slice
+    Ipc::StoreMap::Slice &slice =
+        dir->map->writeableSlice(swap_filen, sidCurrent);
+    slice.next = sidNext;
+    slice.size = theBuf.size - sizeof(DbCellHeader);
+
+    // finalize db cell header
+    DbCellHeader header;
+    memcpy(header.key, e->key, sizeof(header.key));
+    header.firstSlot = writeAnchor().start;
+    header.nextSlot = sidNext;
+    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
+    header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
+    header.version = writeAnchor().basics.timestamp;
+
+    // copy finalized db cell header into buffer
+    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
+
+    // and now allocate another buffer for the WriteRequest so that
+    // we can support concurrent WriteRequests (and to ease cleaning)
+    // TODO: should we limit the number of outstanding requests?
+    size_t wBufCap = 0;
+    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
+    memcpy(wBuf, theBuf.mem, theBuf.size);
+
+    WriteRequest *const r = new WriteRequest(
+        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
+            memFreeBufFunc(wBufCap)), this, sidNext < 0);
+    theBuf.clear();
 
-    dbSlot->payloadSize = theBuf.contentSize() - sizeof(DbCellHeader);
-    memcpy(theBuf.content(), dbSlot, sizeof(DbCellHeader));
+    sidCurrent = sidNext;
 
-    assert(static_cast<size_t>(theBuf.contentSize()) <= slotSize);
     // theFile->write may call writeCompleted immediatelly
-    WriteRequest *const r = new WriteRequest(
-        ::WriteRequest(theBuf.content(), diskOffset, theBuf.contentSize(),
-                       theBuf.freeFunc()), this, isLast);
     theFile->write(r);
 }
 
-//
+/// finds and returns a free db slot to fill or throws
+Rock::SlotId
+Rock::IoState::reserveSlotForWriting()
+{
+    Ipc::Mem::PageId pageId;
+    if (dir->useFreeSlot(pageId))
+        return pageId.number-1;
+
+    // This may happen when the number of available db slots is close to the
+    // number of concurrent requests reading or writing those slots, which may
+    // happen when the db is "small" compared to the request traffic OR when we
+    // are rebuilding and have not loaded "many" entries or empty slots yet.
+    throw TexcHere("ran out of free db slots");
+}
+
 void
 Rock::IoState::finishedWriting(const int errFlag)
 {
     // we incremented offset_ while accumulating data in write()
+    writeableAnchor_ = NULL;
     callBack(errFlag);
 }
 
 void
 Rock::IoState::close(int how)
 {
-    debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
-           " how=" << how);
-    if (how == wroteAll && !theBuf.isNull())
-        doWrite(true);
-    else
-        callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
+    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
+           " buf: " << theBuf.size << " callback: " << callback);
+
+    if (!theFile) {
+        debugs(79, 3, "I/O already canceled");
+        assert(!callback);
+        assert(!writeableAnchor_);
+        assert(!readableAnchor_);
+        return;
+    }
+
+    switch (how) {
+    case wroteAll:
+        assert(theBuf.size > 0); // we never flush last bytes on our own
+        writeToDisk(-1); // flush last, yet unwritten slot to disk
+        return; // writeCompleted() will callBack()
+
+    case writerGone:
+        assert(writeableAnchor_);
+        dir->writeError(swap_filen); // abort a partially stored entry
+        finishedWriting(DISK_ERROR);
+        return;
+
+    case readerDone:
+        callBack(0);
+        return;
+    }
 }
 
 /// close callback (STIOCB) dialer: breaks dependencies and
index 191bf25c3c7fadebba783f3da4dff0550ffea2d9..8c4ff5313fb6f5887e9bdc1c706c020016d658f3 100644 (file)
@@ -1,8 +1,8 @@
 #ifndef SQUID_FS_ROCK_IO_STATE_H
 #define SQUID_FS_ROCK_IO_STATE_H
 
-#include "MemBuf.h"
-#include "SwapDir.h"
+#include "fs/rock/RockSwapDir.h"
+#include "MemBlob.h"
 
 class DiskFile;
 
@@ -18,33 +18,48 @@ class IoState: public ::StoreIOState
 public:
     typedef RefCount<IoState> Pointer;
 
-    IoState(SwapDir &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
+    IoState(Rock::SwapDir::Pointer &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
     virtual ~IoState();
 
     void file(const RefCount<DiskFile> &aFile);
 
     // ::StoreIOState API
     virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data);
-    virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func);
+    virtual bool write(char const *buf, size_t size, off_t offset, FREE * free_func);
     virtual void close(int how);
 
-    void finishedWriting(int errFlag);
+    /// whether we are still waiting for the I/O results (i.e., not closed)
+    bool stillWaiting() const { return theFile != NULL; }
 
-    int64_t diskOffset; ///< the start of this cell inside the db file
-    DbCellHeader *dbSlot; ///< current db slot, used for writing
+    /// called by SwapDir::writeCompleted() after the last write and on error
+    void finishedWriting(const int errFlag);
 
     MEMPROXY_CLASS(IoState);
 
+    /* one and only one of these will be set and locked; access via *Anchor() */
+    const Ipc::StoreMapAnchor *readableAnchor_; ///< starting point for reading
+    Ipc::StoreMapAnchor *writeableAnchor_; ///< starting point for writing
+
+    SlotId sidCurrent; ///< ID of the db slot currently being read or written
+
 private:
-    void doWrite(const bool isLast = false);
+    const Ipc::StoreMapAnchor &readAnchor() const;
+    Ipc::StoreMapAnchor &writeAnchor();
+    const Ipc::StoreMapSlice &currentReadableSlice() const;
+
+    void tryWrite(char const *buf, size_t size, off_t offset);
+    size_t writeToBuffer(char const *buf, size_t size);
+    void writeToDisk(const SlotId nextSlot);
+    SlotId reserveSlotForWriting();
+    
     void callBack(int errflag);
 
-    SwapDir &dir; ///< swap dir object
+    Rock::SwapDir::Pointer dir; ///< swap dir that initiated I/O
     const size_t slotSize; ///< db cell size
     int64_t objOffset; ///< object offset for current db slot
 
     RefCount<DiskFile> theFile; // "file" responsible for this I/O
-    MemBuf theBuf; // use for write content accumulation only
+    MemBlob theBuf; // use for write content accumulation only
 };
 
 MEMPROXY_CLASS_INLINE(IoState);
index 59096d29ad479d03d8654211fe46b4ab4ca381a7..077b2ab36d7c27dfb6632e4d653220b54db37791 100644 (file)
 
 CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);
 
+namespace Rock {
+
+/// maintains information about the store entry being loaded from disk
+/// used for identifying partially stored/loaded entries
+class LoadingEntry {
+public:
+    LoadingEntry(): size(0), version(0), state(leEmpty), anchored(0),
+        mapped(0), freed(0), more(-1) {}
+
+    /* store entry-level information indexed by sfileno */
+    uint64_t size; ///< payload seen so far
+    uint32_t version; ///< DbCellHeader::version to distinguish same-URL chains
+    uint32_t state:3;  ///< current entry state (one of the State values)
+    uint32_t anchored:1;  ///< whether we loaded the inode slot for this entry
+
+    /* db slot-level information indexed by slotId, starting with firstSlot */
+    uint32_t mapped:1;  ///< whether this slot was added to a mapped entry
+    uint32_t freed:1;  ///< whether this slot was marked as free
+    sfileno more:25; ///< another slot in some entry chain (unordered)
+    bool used() const { return freed || mapped || more != -1; }
+
+    /// possible entry states
+    typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;
+};
+
+} /* namespace Rock */
+
+/** 
+    Several layers of information is manipualted during the rebuild:
+
+    Store Entry: Response message plus all the metainformation associated with
+    it. Identified by store key. At any given time, from Squid point
+    of view, there is only one entry with a given key, but several
+    different entries with the same key can be observed in any historical
+    archive (such as an access log or a store database).
+
+    Slot chain: A sequence of db slots representing a Store Entry state at
+    some point in time. Identified by key+version combination. Due to
+    transaction aborts, crashes, and idle periods, some chains may contain
+    incomplete or stale information. We assume that no two different chains
+    have the same key and version. If that assumption fails, we may serve a
+    hodgepodge entry during rebuild, until "extra" slots are loaded/noticed.
+
+    Db slot: A db record containing a piece of a single store entry and linked
+    to other slots with the same key and version fields, forming a chain.
+    Slots are identified by their absolute position in the database file,
+    which is naturally unique.
+
+
+    Except for the "mapped", "freed", and "more" fields, LoadingEntry info is
+    entry-level and is stored at fileno position. In other words, the array of
+    LoadingEntries should be interpreted as two arrays, one that maps slot ID
+    to the LoadingEntry::mapped/free/more members, and the second one that maps
+    fileno to all other LoadingEntry members. StoreMap maps slot key to fileno.
+
+
+    When information from the newly loaded db slot contradicts the entry-level
+    information collected so far (e.g., the versions do not match or the total
+    chain size after the slot contribution exceeds the expected number), the
+    whole entry (and not just the chain or the slot!) is declared corrupted.
+
+    Why invalidate the whole entry? Rock Store is written for high-load
+    environments with large caches, where there is usually very few idle slots
+    in the database. A space occupied by a purged entry is usually immediately
+    reclaimed. A Squid crash or a transaction abort is rather unlikely to
+    leave a relatively large number of stale slots in the database. Thus, the
+    number of potentially corrupted entries is relatively small. On the other
+    hand, the damage from serving a single hadgepodge entry may be significant
+    to the user. In such an environment, invalidating the whole entry has
+    negligible performance impact but saves us from high-damage bugs.
+*/
+
+
 Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
         sd(dir),
+        entries(NULL),
         dbSize(0),
         dbEntrySize(0),
         dbEntryLimit(0),
         dbSlot(0),
         fd(-1),
         dbOffset(0),
-        filen(0)
+        slotPos(0),
+        validationPos(0)
 {
     assert(sd);
     memset(&counts, 0, sizeof(counts));
     dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
     dbEntrySize = sd->slotSize;
     dbEntryLimit = sd->entryLimit();
-    processed.reserve(dbEntryLimit);
-    while (static_cast<int>(processed.size()) < dbEntryLimit)
-        processed.push_back(false);
 }
 
 Rock::Rebuild::~Rebuild()
 {
     if (fd >= 0)
         file_close(fd);
+    delete[] entries;
 }
 
 /// prepares and initiates entry loading sequence
@@ -66,12 +139,18 @@ Rock::Rebuild::start()
     if (fd < 0)
         failure("cannot open db", errno);
 
-    char buf[SwapDir::HeaderSize];
-    if (read(fd, buf, sizeof(buf)) != SwapDir::HeaderSize)
+    char hdrBuf[SwapDir::HeaderSize];
+    if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
         failure("cannot read db header", errno);
 
+    // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours
+    assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
+    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+
     dbOffset = SwapDir::HeaderSize;
-    filen = 0;
+    slotPos = 0;
+
+    entries = new LoadingEntry[dbEntryLimit];
 
     checkpoint();
 }
@@ -80,19 +159,15 @@ Rock::Rebuild::start()
 void
 Rock::Rebuild::checkpoint()
 {
-    if (dbOffset < dbSize)
+    if (!done())
         eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
-    else
-    if (!doneAll()) {
-        eventAdd("Rock::Rebuild::Step2", Rock::Rebuild::Steps2, this, 0.01, 1,
-                 true);
-    }
 }
 
 bool
 Rock::Rebuild::doneAll() const
 {
-    return dbSlot >= dbSize && AsyncJob::doneAll();
+    return dbOffset >= dbSize && validationPos >= dbEntryLimit &&
+        AsyncJob::doneAll();
 }
 
 void
@@ -103,16 +178,20 @@ Rock::Rebuild::Steps(void *data)
 }
 
 void
-Rock::Rebuild::Steps2(void *data)
+Rock::Rebuild::steps()
 {
-    // use async call to enable job call protection that time events lack
-    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps2);
+    if (dbOffset < dbSize)
+        loadingSteps();
+    else
+        validationSteps();
+
+    checkpoint();
 }
 
 void
-Rock::Rebuild::steps()
+Rock::Rebuild::loadingSteps()
 {
-    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+    debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " <<
            dbOffset << " <= " << dbSize);
 
     // Balance our desire to maximize the number of entries processed at once
@@ -123,9 +202,9 @@ Rock::Rebuild::steps()
 
     int loaded = 0;
     while (loaded < dbEntryLimit && dbOffset < dbSize) {
-        doOneEntry();
+        loadOneSlot();
         dbOffset += dbEntrySize;
-        ++filen;
+        ++slotPos;
         ++loaded;
 
         if (counts.scancount % 1000 == 0)
@@ -142,47 +221,12 @@ Rock::Rebuild::steps()
             break;
         }
     }
-
-    checkpoint();
 }
 
 void
-Rock::Rebuild::steps2()
+Rock::Rebuild::loadOneSlot()
 {
-    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
-           dbSlot << " <= " << dbSize);
-
-    // Balance our desire to maximize the number of slots processed at once
-    // (and, hence, minimize overheads and total rebuild time) with a
-    // requirement to also process Coordinator events, disk I/Os, etc.
-    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
-    const timeval loopStart = current_time;
-
-    int loaded = 0;
-    while (dbSlot < dbSize) {
-        doOneSlot();
-        ++dbSlot;
-        ++loaded;
-
-        if (opt_foreground_rebuild)
-            continue; // skip "few entries at a time" check below
-
-        getCurrentTime();
-        const double elapsedMsec = tvSubMsec(loopStart, current_time);
-        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
-            debugs(47, 5, HERE << "pausing after " << loaded << " slots in " <<
-                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per slot");
-            break;
-        }
-    }
-
-    checkpoint();
-}
-
-void
-Rock::Rebuild::doOneEntry()
-{
-    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+    debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " <<
            dbOffset << " <= " << dbSize);
 
     ++counts.scancount;
@@ -190,56 +234,144 @@ Rock::Rebuild::doOneEntry()
     if (lseek(fd, dbOffset, SEEK_SET) < 0)
         failure("cannot seek to db entry", errno);
 
-    MemBuf buf;
-    buf.init(sizeof(DbCellHeader), sizeof(DbCellHeader));
+    buf.reset();
 
     if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
         return;
 
-    // get our header
-    Ipc::Mem::PageId pageId;
-    pageId.pool = sd->index;
-    pageId.number = filen + 1;
-    DbCellHeader &header = sd->dbSlot(pageId);
-    assert(!header.sane());
+    const SlotId slotId = slotPos;
 
+    // get our header
+    DbCellHeader header;
     if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring truncated cache entry meta data at " << dbOffset);
-        invalidSlot(pageId);
+        freeSlotIfIdle(slotId, true);
         return;
     }
-    memcpy(&header, buf.content(), sizeof(header));
 
+    memcpy(&header, buf.content(), sizeof(header));
+    if (header.empty()) {
+        freeSlotIfIdle(slotId, false);
+        return;
+    }
     if (!header.sane()) {
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring malformed cache entry meta data at " << dbOffset);
-        invalidSlot(pageId);
+        freeSlotIfIdle(slotId, true);
         return;
     }
+    buf.consume(sizeof(header)); // optimize to avoid memmove()
+
+    useNewSlot(slotId, header);
+}
+
+/// parse StoreEntry basics and add them to the map, returning true on success
+bool
+Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
+{
+    cache_key key[SQUID_MD5_DIGEST_LENGTH];
+    StoreEntry loadedE;
+    if (!storeRebuildParseEntry(buf, loadedE, key, counts, 0))
+        return false;
+
+    const uint64_t knownSize = header.entrySize > 0 ?
+        header.entrySize : anchor.basics.swap_file_sz;
+    if (!loadedE.swap_file_sz && knownSize)
+        loadedE.swap_file_sz = knownSize;
+    // the entry size may still be unknown at this time
+
+    debugs(47, 8, "importing entry basics for " << fileno);
+    anchor.set(loadedE);
+
+    // we have not validated whether all db cells for this entry were loaded
+    EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED);
+
+    // loadedE->dump(5);
+
+    return true;
 }
 
 void
-Rock::Rebuild::doOneSlot()
+Rock::Rebuild::validationSteps()
 {
-    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
-           dbSlot << " <= " << dbSize);
+    debugs(47, 5, sd->index << " validating from " << validationPos);
 
-    if (processed[dbSlot])
-        return;
+    // see loadingSteps() for the rationale; TODO: avoid duplication
+    const int maxSpentMsec = 50; // keep small: validation does not do I/O
+    const timeval loopStart = current_time;
 
-    Ipc::Mem::PageId pageId;
-    pageId.pool = sd->index;
-    pageId.number = dbSlot + 1;
-    const DbCellHeader &dbSlot = sd->dbSlot(pageId);
-    assert(dbSlot.sane());
+    int validated = 0;
+    while (validationPos < dbEntryLimit) {
+        validateOneEntry();
+        ++validationPos;
+        ++validated;
 
-    pageId.number = dbSlot.firstSlot;
-    //const DbCellHeader &firstChainSlot = sd->dbSlot(pageId);
+        if (validationPos % 1000 == 0)
+            debugs(20, 2, "validated: " << validationPos);
 
-    /* Process all not yet loaded slots, verify entry chains, if chain
-       is valid, load entry from first slot similar to small rock,
-       call SwapDir::addEntry (needs to be restored). */
+        if (opt_foreground_rebuild)
+            continue; // skip "few entries at a time" check below
+
+        getCurrentTime();
+        const double elapsedMsec = tvSubMsec(loopStart, current_time);
+        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
+            debugs(47, 5, "pausing after " << validated << " entries in " <<
+                   elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
+            break;
+        }
+    }
+}
+
+void
+Rock::Rebuild::validateOneEntry()
+{
+    LoadingEntry &e = entries[validationPos];
+    switch (e.state) {
+
+    case LoadingEntry::leEmpty:
+        break; // no entry hashed to this position
+
+    case LoadingEntry::leLoading:
+        freeBadEntry(validationPos, "partially stored");
+        break;
+
+    case LoadingEntry::leLoaded:
+        break; // we have already unlocked this entry
+
+    case LoadingEntry::leCorrupted:
+        break; // we have already removed this entry
+    }
+}
+
+/// Marks remaining bad entry slots as free and unlocks the entry. The map
+/// cannot do this because Loading entries may have holes in the slots chain.
+void
+Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
+{
+    debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
+           " entry " << fileno << " is ignored during rebuild");
+
+    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
+
+    bool freedSome = false;
+    // free all loaded non-anchor slots
+    SlotId slotId = entries[anchor.start].more;
+    while (slotId >= 0) {
+        const SlotId next = entries[slotId].more;
+        freeSlot(slotId, false);
+        slotId = next;
+        freedSome = true;
+    }
+    // free anchor slot if it was loaded
+    if (entries[fileno].anchored) {
+        freeSlot(anchor.start, false);
+        freedSome = true;
+    }
+    assert(freedSome);
+
+    sd->map->forgetWritingEntry(fileno);
+    ++counts.invalid;
 }
 
 void
@@ -254,7 +386,7 @@ Rock::Rebuild::swanSong()
 void
 Rock::Rebuild::failure(const char *msg, int errNo)
 {
-    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+    debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " <<
            dbOffset << " <= " << dbSize);
 
     if (errNo)
@@ -266,9 +398,285 @@ Rock::Rebuild::failure(const char *msg, int errNo)
            sd->index, sd->filePath, msg);
 }
 
-void Rock::Rebuild::invalidSlot(Ipc::Mem::PageId &pageId)
+/// adds slot to the free slot index
+void
+Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
 {
-    ++counts.invalid;
-    processed[pageId.number - 1] = true;
-    sd->dbSlotIndex->push(pageId);
+    debugs(47,5, sd->index << " frees slot " << slotId);
+    LoadingEntry &le = entries[slotId];
+    assert(!le.freed);
+    le.freed = 1;
+
+    if (invalid) {
+        ++counts.invalid;
+        //sd->unlink(fileno); leave garbage on disk, it should not hurt
+    }
+
+    Ipc::Mem::PageId pageId;
+    pageId.pool = sd->index+1;
+    pageId.number = slotId+1;
+    sd->freeSlots->push(pageId);
+}
+
+/// adds slot to the free slot index but only if the slot is unused
+void
+Rock::Rebuild::freeSlotIfIdle(const SlotId slotId, const bool invalid)
+{
+    const LoadingEntry &le = entries[slotId];
+
+    // mapped slots must be freed via freeBadEntry() to keep the map in sync
+    assert(!le.mapped);
+
+    if (!le.used())
+        freeSlot(slotId, invalid);
+}
+
+/// adds slot to the entry chain in the map
+void
+Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
+{
+    LoadingEntry &le = entries[slotId];
+    assert(!le.mapped);
+    assert(!le.freed);
+    le.mapped = 1;
+
+    Ipc::StoreMapSlice slice;
+    slice.next = header.nextSlot;
+    slice.size = header.payloadSize;
+    sd->map->importSlice(slotId, slice);
+}
+
+/// adds slot to an existing entry chain; caller must check that the slot
+/// belongs to the chain it is being added to
+void
+Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
+{
+    LoadingEntry &le = entries[fileno];
+    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
+
+    assert(le.version == header.version);
+
+    // mark anchor as loaded or add the secondary slot to the chain
+    LoadingEntry &inode = entries[header.firstSlot];
+    if (header.firstSlot == slotId) {
+        debugs(47,5, "adding inode");
+        assert(!inode.freed);
+        le.anchored = 1;
+    } else {
+        debugs(47,9, "linking " << slotId << " to " << inode.more);
+        // we do not need to preserve the order
+        LoadingEntry &slice = entries[slotId];
+        assert(!slice.freed);
+        assert(slice.more < 0);
+        slice.more = inode.more;
+        inode.more = slotId;
+    }
+
+    if (header.firstSlot == slotId && !importEntry(anchor, fileno, header)) {
+        le.state = LoadingEntry::leCorrupted;
+        freeBadEntry(fileno, "corrupted metainfo");
+        return;
+    }
+
+    // set total entry size and/or check it for consistency
+    uint64_t totalSize = header.entrySize;
+    assert(totalSize != static_cast<uint64_t>(-1));
+    if (!totalSize && anchor.basics.swap_file_sz) {
+        assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
+        // perhaps we loaded a later slot (with entrySize) earlier
+        totalSize = anchor.basics.swap_file_sz;
+    } else
+    if (totalSize && !anchor.basics.swap_file_sz) {
+        anchor.basics.swap_file_sz = totalSize;
+        assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
+    } else
+    if (totalSize != anchor.basics.swap_file_sz) {
+        le.state = LoadingEntry::leCorrupted;
+        freeBadEntry(fileno, "size mismatch");
+        return;
+    }
+
+    le.size += header.payloadSize;
+
+    if (totalSize > 0 && le.size > totalSize) { // overflow
+        le.state = LoadingEntry::leCorrupted;
+        freeBadEntry(fileno, "overflowing");
+        return;
+    }
+
+    mapSlot(slotId, header);
+    if (totalSize > 0 && le.size == totalSize) {
+        // entry fully loaded, unlock it
+        // we have validated that all db cells for this entry were loaded
+        EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
+        le.state = LoadingEntry::leLoaded;
+        sd->map->closeForWriting(fileno, false);
+        ++counts.objcount;
+    }
+}
+
+/// initialize housekeeping information for a newly accepted entry
+void
+Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno, const DbCellHeader &header)
+{
+    anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
+    assert(header.firstSlot >= 0);
+    anchor.start = header.firstSlot;
+
+    assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
+
+    LoadingEntry &le = entries[fileno];
+    le.state = LoadingEntry::leLoading;
+    le.version = header.version;
+    le.size = 0;
+}
+
+/// handle a slot from an entry that we have not seen before
+void
+Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
+{
+    // If some other from-disk entry is/was using this slot as its inode OR
+    // if some other from-disk entry is/was using our inode slot, then the 
+    // entries are conflicting. We cannot identify other entries, so we just
+    // remove ours and hope that the others were/will be handled correctly.
+    const LoadingEntry &slice = entries[slotId];
+    const LoadingEntry &inode = entries[header.firstSlot];
+    if (slice.used() || inode.used()) {
+        debugs(47,8, "slice/inode used: " << slice.used() << inode.used());
+        LoadingEntry &le = entries[fileno];
+        le.state = LoadingEntry::leCorrupted;
+        freeSlotIfIdle(slotId, slotId == header.firstSlot);
+        // if not idle, the other entry will handle its slice
+        ++counts.clashcount;
+        return;
+    }
+
+    // A miss may have been stored at our fileno while we were loading other
+    // slots from disk. We ought to preserve that entry because it is fresher.
+    const bool overwriteExisting = false;
+    if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
+        primeNewEntry(*anchor, fileno, header);
+        addSlotToEntry(fileno, slotId, header); // may fail
+        assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
+    } else {
+        // A new from-network entry is occupying our map slot; let it be, but
+        // save us from the trouble of going through the above motions again.
+        LoadingEntry &le = entries[fileno];
+        le.state = LoadingEntry::leIgnored;
+        freeSlotIfIdle(slotId, false);
+    }
+}
+
+/// does the header belong to the fileno entry being loaded?
+bool
+Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
+{
+    const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
+    const LoadingEntry &le = entries[fileno];
+    // any order will work, but do fast comparisons first:
+    return le.version == header.version &&
+        anchor.start == static_cast<Ipc::StoreMapSliceId>(header.firstSlot) &&
+        anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
+}
+
+/// is the new header consistent with information already loaded?
+bool
+Rock::Rebuild::canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const
+{
+    if (!sameEntry(fileno, header)) {
+        debugs(79, 7, "cannot add; wrong entry");
+        return false;
+    }
+
+    const LoadingEntry &le = entries[slotId];
+    // We cannot add a slot that was already declared free or mapped.
+    if (le.freed || le.mapped) {
+        debugs(79, 7, "cannot add; freed/mapped: " << le.freed << le.mapped);
+        return false;
+    }
+
+    if (slotId == header.firstSlot) {
+        // If we are the inode, the anchored flag cannot be set yet.
+        if (entries[fileno].anchored) {
+            debugs(79, 7, "cannot add; extra anchor");
+            return false;
+        }
+
+        // And there should have been some other slot for this entry to exist.
+        if (le.more < 0) {
+            debugs(79, 7, "cannot add; missing slots");
+            return false;
+        }
+
+        return true;
+    }
+
+    // We are the continuation slice so the more field is reserved for us.
+    if (le.more >= 0) {
+        debugs(79, 7, "cannot add; foreign slot");
+        return false;
+    }
+
+    return true;
+}
+
+/// handle freshly loaded (and validated) db slot header
+void
+Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
+{
+    LoadingEntry &slice = entries[slotId];
+    assert(!slice.freed); // we cannot free what was not loaded
+
+    const cache_key *const key =
+        reinterpret_cast<const cache_key*>(header.key);
+    const sfileno fileno = sd->map->anchorIndexByKey(key);
+    assert(0 <= fileno && fileno < dbEntryLimit);
+
+    LoadingEntry &le = entries[fileno];
+    debugs(47,9, "entry " << fileno << " state: " << le.state << ", inode: " <<
+            header.firstSlot << ", size: " << header.payloadSize);
+
+    switch (le.state) {
+
+    case LoadingEntry::leEmpty: {
+        startNewEntry(fileno, slotId, header);
+        break;
+    }
+
+    case LoadingEntry::leLoading: {
+        if (canAdd(fileno, slotId, header)) {
+            addSlotToEntry(fileno, slotId, header);
+        } else {
+            // either the loading chain or this slot is stale;
+            // be conservative and ignore both (and any future ones)
+            le.state = LoadingEntry::leCorrupted;
+            freeBadEntry(fileno, "duplicated");
+            freeSlotIfIdle(slotId, slotId == header.firstSlot);
+            ++counts.dupcount;
+        }
+        break;
+    }
+
+    case LoadingEntry::leLoaded: {
+        // either the previously loaded chain or this slot is stale;
+        // be conservative and ignore both (and any future ones)
+        le.state = LoadingEntry::leCorrupted;
+        sd->map->freeEntry(fileno); // may not be immediately successful
+        freeSlotIfIdle(slotId, slotId == header.firstSlot);
+        ++counts.dupcount;
+        break;
+    }
+
+    case LoadingEntry::leCorrupted: {
+        // previously seen slots messed things up so we must ignore this one
+        freeSlotIfIdle(slotId, false);
+        break;
+    }
+
+    case LoadingEntry::leIgnored: {
+        // already replaced by a fresher or colliding from-network entry
+        freeSlotIfIdle(slotId, false);
+        break;
+    }
+    }
 }
index e5386a8bac1191a658c703977fb88ae51400fc67..d56182fc4feddee2d2dbf8506dbf85956b25d70d 100644 (file)
@@ -3,20 +3,14 @@
 
 #include "base/AsyncJob.h"
 #include "cbdata.h"
+#include "fs/rock/RockForward.h"
+#include "MemBuf.h"
 #include "store_rebuild.h"
 
-namespace Ipc
-{
-namespace Mem
-{
-class PageId;
-}
-}
-
 namespace Rock
 {
 
-class SwapDir;
+class LoadingEntry;
 
 /// \ingroup Rock
 /// manages store rebuild process: loading meta information from db on disk
@@ -35,13 +29,31 @@ protected:
 private:
     void checkpoint();
     void steps();
-    void steps2();
-    void doOneEntry();
-    void doOneSlot();
+    void loadingSteps();
+    void validationSteps();
+    void loadOneSlot();
+    void validateOneEntry();
+    bool importEntry(Ipc::StoreMapAnchor &anchor, const sfileno slotId, const DbCellHeader &header);
+    void freeBadEntry(const sfileno fileno, const char *eDescription);
+
     void failure(const char *msg, int errNo = 0);
-    void invalidSlot(Ipc::Mem::PageId &pageId);
+
+    void startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header);
+    void primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header);
+    void addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header);
+    void useNewSlot(const SlotId slotId, const DbCellHeader &header);
+
+    void mapSlot(const SlotId slotId, const DbCellHeader &header);
+    void freeSlotIfIdle(const SlotId slotId, const bool invalid);
+    void freeBusySlot(const SlotId slotId, const bool invalid);
+    void freeSlot(const SlotId slotId, const bool invalid);
+
+    bool canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const;
+    bool sameEntry(const sfileno fileno, const DbCellHeader &header) const;
+
 
     SwapDir *sd;
+    LoadingEntry *entries; ///< store entries being loaded from disk
 
     int64_t dbSize;
     int dbEntrySize;
@@ -50,15 +62,13 @@ private:
 
     int fd; // store db file descriptor
     int64_t dbOffset;
-    int filen;
-
-    // TODO: use std::bitmap?
-    Vector<bool> processed; ///< true iff rebuilt is complete for a given slot
+    sfileno slotPos;
+    sfileno validationPos;
+    MemBuf buf;
 
     StoreRebuildData counts;
 
     static void Steps(void *data);
-    static void Steps2(void *data);
 
     CBDATA_CLASS2(Rebuild);
 };
index f9d6efbeeafb3cb97a3ba969189ce36ff30501a7..2bc3fd3d2e3ee41f861502e417fdfb8b1d6b5654 100644 (file)
@@ -31,7 +31,8 @@
 const int64_t Rock::SwapDir::HeaderSize = 16*1024;
 
 Rock::SwapDir::SwapDir(): ::SwapDir("rock"), 
-    slotSize(HeaderSize), filePath(NULL), io(NULL), map(NULL), dbSlots(NULL)
+    slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL), allSlots(NULL),
+    waitingForPage(NULL)
 {
 }
 
@@ -63,11 +64,11 @@ Rock::SwapDir::get(const cache_key *key)
         return NULL;
 
     sfileno filen;
-    const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen);
+    const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen);
     if (!slot)
         return NULL;
 
-    const Ipc::StoreMapSlot::Basics &basics = slot->basics;
+    const Ipc::StoreMapAnchor::Basics &basics = slot->basics;
 
     // create a brand new store entry and initialize it with stored basics
     StoreEntry *e = new StoreEntry();
@@ -116,8 +117,8 @@ void Rock::SwapDir::disconnect(StoreEntry &e)
 uint64_t
 Rock::SwapDir::currentSize() const
 {
-    const uint64_t spaceSize = !dbSlotIndex ?
-        maxSize() : (slotSize * dbSlotIndex->size());
+    const uint64_t spaceSize = !freeSlots ?
+        maxSize() : (slotSize * freeSlots->size());
     // everything that is not free is in use
     return maxSize() - spaceSize;
 }
@@ -217,6 +218,7 @@ Rock::SwapDir::init()
 
     Must(!map);
     map = new DirMap(inodeMapPath());
+    map->cleaner = this;
 
     const char *ioModule = needsDiskStrand() ? "IpcIo" : "Blocking";
     if (DiskIOModule *m = DiskIOModule::Find(ioModule)) {
@@ -233,9 +235,7 @@ Rock::SwapDir::init()
     theFile->configure(fileConfig);
     theFile->open(O_RDWR, 0644, this);
 
-    dbSlotIndex = shm_old(Ipc::Mem::PageStack)(spaceIndexPath());
-    dbSlots = new (reinterpret_cast<char *>(dbSlotIndex.getRaw()) +
-                   dbSlotIndex->stackSize()) DbCellHeader[entryLimitAllowed()];
+    freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath());
 
     // Increment early. Otherwise, if one SwapDir finishes rebuild before
     // others start, storeRebuildComplete() will think the rebuild is over!
@@ -500,11 +500,6 @@ Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load)
     if (!map)
         return false;
 
-    // TODO: consider DB slots freed when older object would be replaced
-    if (dbSlotIndex->size() <
-        static_cast<unsigned int>(max(entriesNeeded(diskSpaceNeeded), 1)))
-        return false;
-
     // Do not start I/O transaction if there are less than 10% free pages left.
     // TODO: reserve page instead
     if (needsDiskStrand() &&
@@ -529,42 +524,29 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
     }
 
     sfileno filen;
-    Ipc::StoreMapSlot *const slot =
+    Ipc::StoreMapAnchor *const slot =
         map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
     if (!slot) {
         debugs(47, 5, HERE << "map->add failed");
         return NULL;
     }
 
-    Ipc::Mem::PageId pageId;
-    if (!popDbSlot(pageId)) {
-        debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << filePath <<
-               "' run out of DB slots");
-        map->free(filen);
-    }
-
+    assert(filen >= 0);
     slot->set(e);
 
     // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
     // If that does not happen, the entry will not decrement the read level!
 
-    IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
+    Rock::SwapDir::Pointer self(this);
+    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = filen;
-    sio->diskOffset = diskOffset(pageId);
-
-    DbCellHeader &firstDbSlot = dbSlot(pageId);
-    memcpy(firstDbSlot.key, e.key, sizeof(firstDbSlot.key));
-    firstDbSlot.firstSlot = pageId.number;
-    firstDbSlot.nextSlot = 0;
-    ++firstDbSlot.version;
-    firstDbSlot.payloadSize = 0;
-    sio->dbSlot = &firstDbSlot;
+    sio->writeableAnchor_ = slot;
 
     debugs(47,5, HERE << "dir " << index << " created new filen " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
-           sio->swap_filen << std::dec << " at " <<
+           sio->swap_filen << std::dec << " starting at " <<
            diskOffset(sio->swap_filen));
 
     sio->file(theFile);
@@ -607,44 +589,50 @@ Rock::SwapDir::entriesNeeded(const int64_t objSize) const
 }
 
 bool
-Rock::SwapDir::popDbSlot(Ipc::Mem::PageId &pageId)
+Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId)
 {
-    return dbSlotIndex->pop(pageId);
-}
+    if (freeSlots->pop(pageId)) {
+        debugs(47, 5, "got a previously free slot: " << pageId);
+        return true;
+    }
 
-Rock::DbCellHeader &
-Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId)
-{
-    const DbCellHeader &s = const_cast<const SwapDir *>(this)->dbSlot(pageId);
-    return const_cast<DbCellHeader &>(s);
+    // catch free slots delivered to noteFreeMapSlice()
+    assert(!waitingForPage);
+    waitingForPage = &pageId;
+    if (map->purgeOne()) {
+        assert(!waitingForPage); // noteFreeMapSlice() should have cleared it
+        assert(pageId.set());
+        debugs(47, 5, "got a previously busy slot: " << pageId);
+        return true;
+    }
+    assert(waitingForPage == &pageId);
+    waitingForPage = NULL;
+
+    debugs(47, 3, "cannot get a slot; entries: " << map->entryCount());
+    return false;
 }
 
-const Rock::DbCellHeader &
-Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) const
+bool
+Rock::SwapDir::validSlotId(const SlotId slotId) const
 {
-    assert(dbSlotIndex->pageIdIsValid(pageId));
-    return dbSlots[pageId.number - 1];
+    return 0 <= slotId && slotId < entryLimitAllowed();
 }
 
 void
-Rock::SwapDir::cleanReadable(const sfileno fileno)
-{
-    Ipc::Mem::PageId pageId = map->extras(fileno).pageId;
-    Ipc::Mem::PageId nextPageId = pageId;
-    while (pageId) {
-        const DbCellHeader &curDbSlot = dbSlot(pageId);
-        nextPageId.number = curDbSlot.nextSlot;
-        const DbCellHeader &nextDbSlot = dbSlot(nextPageId);
-        const bool sameChain = memcmp(curDbSlot.key, nextDbSlot.key,
-                                      sizeof(curDbSlot.key)) == 0 &&
-            curDbSlot.version == nextDbSlot.version;
-        dbSlotIndex->push(pageId);
-        if (sameChain)
-            pageId = nextPageId;
+Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId)
+{
+    Ipc::Mem::PageId pageId;
+    pageId.pool = index+1;
+    pageId.number = sliceId+1;
+    if (waitingForPage) {
+        *waitingForPage = pageId;
+        waitingForPage = NULL;
+    } else {
+        freeSlots->push(pageId);
     }
 }
 
-// tries to open an old or being-written-to entry with swap_filen for reading
+// tries to open an old entry with swap_filen for reading
 StoreIOState::Pointer
 Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
 {
@@ -669,30 +657,26 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
     // The are two ways an entry can get swap_filen: our get() locked it for
     // reading or our storeSwapOutStart() locked it for writing. Peeking at our
     // locked entry is safe, but no support for reading a filling entry.
-    const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
+    const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen);
     if (!slot)
         return NULL; // we were writing afterall
 
-    IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
+    Rock::SwapDir::Pointer self(this);
+    IoState *sio = new IoState(self, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = e.swap_filen;
-    sio->dbSlot = &dbSlot(map->extras(e.swap_filen).pageId);
-
-    const Ipc::Mem::PageId &pageId = map->extras(e.swap_filen).pageId;
-    sio->diskOffset = diskOffset(pageId);
-    DbCellHeader &firstDbSlot = dbSlot(map->extras(e.swap_filen).pageId);
-    assert(memcmp(firstDbSlot.key, e.key, sizeof(firstDbSlot.key)));
-    assert(firstDbSlot.firstSlot == pageId.number);
+    sio->readableAnchor_ = slot;
+    sio->file(theFile);
 
     debugs(47,5, HERE << "dir " << index << " has old filen: " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
            sio->swap_filen);
 
+    assert(slot->sameKey(static_cast<const cache_key*>(e.key)));
     assert(slot->basics.swap_file_sz > 0);
     assert(slot->basics.swap_file_sz == e.swap_file_sz);
 
-    sio->file(theFile);
     return sio;
 }
 
@@ -745,14 +729,24 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
     assert(request->sio !=  NULL);
     IoState &sio = *request->sio;
 
+    // quit if somebody called IoState::close() while we were waiting
+    if (!sio.stillWaiting()) {
+        debugs(79, 3, "ignoring closed entry " << sio.swap_filen);
+        return;
+    }
+
     if (errflag == DISK_OK) {
-        // close, assuming we only write once; the entry gets the read lock
-        map->closeForWriting(sio.swap_filen, true);
         // do not increment sio.offset_ because we do it in sio->write()
-        if (request->isLast)
+        if (request->isLast) {
+            // close, the entry gets the read lock
+            map->closeForWriting(sio.swap_filen, true);
             sio.finishedWriting(errflag);
-    } else
+        }
+    } else {
         writeError(sio.swap_filen);
+        sio.finishedWriting(errflag);
+        // and hope that Core will call disconnect() to close the map entry
+    }
 }
 
 void
@@ -760,14 +754,14 @@ Rock::SwapDir::writeError(const sfileno fileno)
 {
     // Do not abortWriting here. The entry should keep the write lock
     // instead of losing association with the store and confusing core.
-    map->free(fileno); // will mark as unusable, just in case
-    // XXX: should we call IoState callback?
+    map->freeEntry(fileno); // will mark as unusable, just in case
+    // All callers must also call IoState callback, to propagate the error.
 }
 
 bool
 Rock::SwapDir::full() const
 {
-    return map && map->full();
+    return freeSlots != NULL && !freeSlots->size();
 }
 
 // storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT,
@@ -783,49 +777,10 @@ Rock::SwapDir::diskFull()
 void
 Rock::SwapDir::maintain()
 {
-    debugs(47,3, HERE << "cache_dir[" << index << "] guards: " <<
-           !repl << !map << !full() << StoreController::store_dirs_rebuilding);
-
-    if (!repl)
-        return; // no means (cannot find a victim)
-
-    if (!map)
-        return; // no victims (yet)
-
-    if (!full())
-        return; // no need (to find a victim)
-
-    // XXX: UFSSwapDir::maintain says we must quit during rebuild
-    if (StoreController::store_dirs_rebuilding)
-        return;
-
-    debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() <<
-           ' ' << currentSize() << " < " << diskOffsetLimit());
-
-    // Hopefully, we find a removable entry much sooner (TODO: use time?)
-    const int maxProbed = 10000;
-    RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed);
-
-    // It really should not take that long, but this will stop "infinite" loops
-    const int maxFreed = 1000;
-    int freed = 0;
-    // TODO: should we purge more than needed to minimize overheads?
-    for (; freed < maxFreed && full(); ++freed) {
-        if (StoreEntry *e = walker->Next(walker))
-            e->release(); // will call our unlink() method
-        else
-            break; // no more objects
-    }
-
-    debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed <<
-           " scanned " << walker->scanned << '/' << walker->locked);
-
-    walker->Done(walker);
-
-    if (full()) {
-        debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " <<
-               "is still full after freeing " << freed << " entries. A bug?");
-    }
+    // The Store calls this to free some db space, but there is nothing wrong
+    // with a full() db, except when db has to shrink after reconfigure, and
+    // we do not support shrinking yet (it would have to purge specific slots).
+    // TODO: Disable maintain() requests when they are pointless.
 }
 
 void
@@ -859,7 +814,7 @@ Rock::SwapDir::unlink(StoreEntry &e)
 {
     debugs(47, 5, HERE << e);
     ignoreReferences(e);
-    map->free(e.swap_filen);
+    map->freeEntry(e.swap_filen);
     disconnect(e);
 }
 
@@ -896,6 +851,12 @@ Rock::SwapDir::statfs(StoreEntry &e) const
             storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n",
                               entryCount, (100.0 * entryCount / limit));
 
+            const unsigned int slotsFree = !freeSlots ? 0 : freeSlots->size();
+            if (slotsFree <= static_cast<const unsigned int>(limit)) {
+                const int usedSlots = limit - static_cast<const int>(slotsFree);
+                storeAppendPrintf(&e, "Used slots:      %9d %.2f%%\n",
+                                  usedSlots, (100.0 * usedSlots / limit));
+            }
             if (limit < 100) { // XXX: otherwise too expensive to count
                 Ipc::ReadWriteLockStats stats;
                 map->updateStats(stats);
@@ -928,7 +889,7 @@ Rock::SwapDir::inodeMapPath() const {
 }
 
 const char *
-Rock::SwapDir::spaceIndexPath() const {
+Rock::SwapDir::freeSlotsPath() const {
     static String spacesPath;
     spacesPath = path;
     spacesPath.append("_spaces");
@@ -942,7 +903,7 @@ RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
 
 void Rock::SwapDirRr::create(const RunnerRegistry &)
 {
-    Must(mapOwners.empty() && dbSlotsOwners.empty());
+    Must(mapOwners.empty() && freeSlotsOwners.empty());
     for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
         if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
             const int64_t capacity = sd->entryLimitAllowed();
@@ -952,16 +913,16 @@ void Rock::SwapDirRr::create(const RunnerRegistry &)
             mapOwners.push_back(mapOwner);
 
             // XXX: remove pool id and counters from PageStack
-            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const dbSlotsOwner =
-                shm_new(Ipc::Mem::PageStack)(sd->spaceIndexPath(),
-                                             i, capacity,
+            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const freeSlotsOwner =
+                shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(),
+                                             i+1, capacity,
                                              sizeof(DbCellHeader));
-            dbSlotsOwners.push_back(dbSlotsOwner);
+            freeSlotsOwners.push_back(freeSlotsOwner);
 
             // XXX: add method to initialize PageStack with no free pages
             while (true) {
                 Ipc::Mem::PageId pageId;
-                if (!dbSlotsOwner->object()->pop(pageId))
+                if (!freeSlotsOwner->object()->pop(pageId))
                     break;
             }
         }
@@ -972,6 +933,6 @@ Rock::SwapDirRr::~SwapDirRr()
 {
     for (size_t i = 0; i < mapOwners.size(); ++i) {
         delete mapOwners[i];
-        delete dbSlotsOwners[i];
+        delete freeSlotsOwners[i];
     }
 }
index 60c54517332836b7d0af35a70225515518ce6ad4..1480601be0683e562c344173a5455736baf570cc 100644 (file)
@@ -5,6 +5,7 @@
 #include "DiskIO/DiskFile.h"
 #include "DiskIO/IORequestor.h"
 #include "fs/rock/RockDbCell.h"
+#include "fs/rock/RockForward.h"
 #include "ipc/StoreMap.h"
 #include "ipc/mem/Page.h"
 #include "ipc/mem/PageStack.h"
@@ -16,12 +17,13 @@ class WriteRequest;
 namespace Rock
 {
 
-class Rebuild;
-
 /// \ingroup Rock
 class SwapDir: public ::SwapDir, public IORequestor, public Ipc::StoreMapCleaner
 {
 public:
+    typedef RefCount<SwapDir> Pointer;
+    typedef Ipc::StoreMap DirMap;
+
     SwapDir();
     virtual ~SwapDir();
 
@@ -41,25 +43,24 @@ public:
     // temporary path to the shared memory map of first slots of cached entries
     const char *inodeMapPath() const;
     // temporary path to the shared memory stack of free slots
-    const char *spaceIndexPath() const;
+    const char *freeSlotsPath() const;
 
     int64_t entryLimitHigh() const { return SwapFilenMax; } ///< Core limit
     int64_t entryLimitAllowed() const;
 
-    bool popDbSlot(Ipc::Mem::PageId &pageId);
-    DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId);
-    const DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId) const;
+    /// removes a slot from a list of free slots or returns false
+    bool useFreeSlot(Ipc::Mem::PageId &pageId);
+    /// whether the given slot ID may point to a slot in this db
+    bool validSlotId(const SlotId slotId) const;
+    /// purges one or more entries to make full() false and free some slots
+    void purgeSome();
 
     int64_t diskOffset(Ipc::Mem::PageId &pageId) const;
+    int64_t diskOffset(int filen) const;
     void writeError(const sfileno fileno);
 
-    virtual void cleanReadable(const sfileno fileno);
-
-    // TODO: merge with MemStoreMapExtras?
-    struct MapExtras {
-        Ipc::Mem::PageId pageId;
-    };
-    typedef Ipc::StoreMapWithExtras<MapExtras> DirMap;
+    /* StoreMapCleaner API */
+    virtual void noteFreeMapSlice(const sfileno fileno);
 
     uint64_t slotSize; ///< all db slots are of this size
 
@@ -101,23 +102,24 @@ protected:
     void trackReferences(StoreEntry &e); ///< add to replacement policy scope
     void ignoreReferences(StoreEntry &e); ///< delete from repl policy scope
 
-    int64_t diskOffset(int filen) const;
     int64_t diskOffsetLimit() const;
     int entryLimit() const { return map->entryLimit(); }
     int entryMaxPayloadSize() const;
     int entriesNeeded(const int64_t objSize) const;
 
     friend class Rebuild;
+    friend class IoState;
     const char *filePath; ///< location of cache storage file inside path/
+    DirMap *map; ///< entry key/sfileno to MaxExtras/inode mapping
 
 private:
     void createError(const char *const msg);
 
     DiskIOStrategy *io;
     RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
-    DirMap *map;
-    DbCellHeader *dbSlots;
-    Ipc::Mem::Pointer<Ipc::Mem::PageStack> dbSlotIndex;
+    DbCellHeader *allSlots; ///< SlotId to DbCellHeader mapping
+    Ipc::Mem::Pointer<Ipc::Mem::PageStack> freeSlots; ///< free slots
+       Ipc::Mem::PageId *waitingForPage; ///< one-page cache for a "hot" free slot
 
     /* configurable options */
     DiskFile::Config fileConfig; ///< file-level configuration options
@@ -138,7 +140,7 @@ protected:
 
 private:
     Vector<SwapDir::DirMap::Owner *> mapOwners;
-    Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> dbSlotsOwners;
+    Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> freeSlotsOwners;
 };
 
 } // namespace Rock
index 3a08d6ea465c09260dd00793edba435d26b0cb2c..cf4932cc6d234ba115c53cfb10892da80c50d835 100644 (file)
@@ -21,6 +21,8 @@ libipc_la_SOURCES = \
        StartListening.h \
        StoreMap.cc \
        StoreMap.h \
+       StoreMapSlice.cc \
+       StoreMapSlice.h \
        StrandCoord.cc \
        StrandCoord.h \
        StrandCoords.h \
index bb365513fd974de0d3d0d0a7c305d88779f98ad8..99a040c9c7bc6a108e925f46b28a7f5f4e2a290a 100644 (file)
@@ -31,34 +31,90 @@ Ipc::StoreMap::StoreMap(const char *const aPath): cleaner(NULL), path(aPath),
            shared->limit);
 }
 
-Ipc::StoreMap::Slot *
+int
+Ipc::StoreMap::compareVersions(const sfileno fileno, time_t newVersion) const
+{
+    assert(valid(fileno));
+    Anchor &inode = shared->slots[fileno].anchor;
+
+    // note: we do not lock, so comparison may be inacurate
+
+    if (inode.state == Anchor::Empty)
+        return +2;
+
+    if (const time_t diff = newVersion - inode.basics.timestamp)
+        return diff < 0 ? -1 : +1;
+    return 0;
+}
+
+void
+Ipc::StoreMap::forgetWritingEntry(sfileno fileno)
+{
+    assert(valid(fileno));
+    Anchor &inode = shared->slots[fileno].anchor;
+
+    assert(inode.state == Anchor::Writeable);
+
+    // we do not iterate slices because we were told to forget about
+    // them; the caller is responsible for freeing them (most likely
+    // our slice list is incomplete or has holes)
+
+    inode.waitingToBeFreed = false;
+    inode.state = Anchor::Empty;
+
+    inode.lock.unlockExclusive();
+    --shared->count;
+
+    debugs(54, 8, "closed entry " << fileno << " for writing " << path);
+}
+
+Ipc::StoreMap::Anchor *
 Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno)
 {
-    debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+    debugs(54, 5, "opening entry with key " << storeKeyText(key)
            << " for writing in map [" << path << ']');
-    const int idx = slotIndexByKey(key);
+    const int idx = anchorIndexByKey(key);
 
-    Slot &s = shared->slots[idx];
+    if (Anchor *anchor = openForWritingAt(idx)) {
+        fileno = idx;
+        return anchor;
+    }
+
+    return NULL;
+}
+
+Ipc::StoreMap::Anchor *
+Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting)
+{
+    Anchor &s = shared->slots[fileno].anchor;
     ReadWriteLock &lock = s.lock;
 
     if (lock.lockExclusive()) {
-        assert(s.state != Slot::Writeable); // until we start breaking locks
+        assert(s.state != Anchor::Writeable); // until we start breaking locks
+
+        // bail if we cannot empty this position
+        if (!s.waitingToBeFreed && s.state == Anchor::Readable && !overwriteExisting) {
+            lock.unlockExclusive();
+            debugs(54, 5, "cannot open existing entry at " << fileno <<
+                   " for writing in map [" << path << ']');
+            return NULL;
+        }
 
         // free if the entry was used, keeping the entry locked
-        if (s.waitingToBeFreed || s.state == Slot::Readable)
-            freeLocked(s, true);
+        if (s.waitingToBeFreed || s.state == Anchor::Readable)
+            freeChain(fileno, s, true);
 
-        assert(s.state == Slot::Empty);
+        assert(s.state == Anchor::Empty);
         ++shared->count;
-        s.state = Slot::Writeable;
-        fileno = idx;
+        s.state = Anchor::Writeable;
+
         //s.setKey(key); // XXX: the caller should do that
-        debugs(54, 5, HERE << " opened slot at " << idx <<
-               " for writing in map [" << path << ']');
+        debugs(54, 5, "opened entry " << fileno << " for writing " << path);
         return &s; // and keep the entry locked
-    }
+       }
 
-    debugs(54, 5, HERE << " failed to open slot at " << idx <<
+    debugs(54, 5, "cannot open busy entry at " << fileno <<
            " for writing in map [" << path << ']');
     return NULL;
 }
@@ -66,57 +122,86 @@ Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno)
 void
 Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading)
 {
-    debugs(54, 5, HERE << " closing slot at " << fileno << " for writing and "
-           "openning for reading in map [" << path << ']');
     assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    assert(s.state == Slot::Writeable);
-    s.state = Slot::Readable;
-    if (lockForReading)
+    Anchor &s = shared->slots[fileno].anchor;
+    assert(s.state == Anchor::Writeable);
+    s.state = Anchor::Readable;
+    if (lockForReading) {
         s.lock.switchExclusiveToShared();
-    else
+        debugs(54, 5, "switched entry at " << fileno <<
+               " from writing to reading in map [" << path << ']');
+    } else {
         s.lock.unlockExclusive();
+        debugs(54, 5, "closed entry " << fileno << " for writing " << path);
+    }
+}
+
+Ipc::StoreMap::Slice &
+Ipc::StoreMap::writeableSlice(const AnchorId anchorId, const SliceId sliceId)
+{
+    assert(valid(anchorId));
+    assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+    assert(valid(sliceId));
+       return shared->slots[sliceId].slice;
+}
+
+const Ipc::StoreMap::Slice &
+Ipc::StoreMap::readableSlice(const AnchorId anchorId, const SliceId sliceId) const
+{
+    assert(valid(anchorId));
+    assert(shared->slots[anchorId].anchor.state == Anchor::Readable);
+    assert(valid(sliceId));
+       return shared->slots[sliceId].slice;
+}
+
+Ipc::StoreMap::Anchor &
+Ipc::StoreMap::writeableEntry(const AnchorId anchorId)
+{
+    assert(valid(anchorId));
+    assert(shared->slots[anchorId].anchor.state == Anchor::Writeable);
+    return shared->slots[anchorId].anchor;
 }
 
 /// terminate writing the entry, freeing its slot for others to use
 void
 Ipc::StoreMap::abortWriting(const sfileno fileno)
 {
-    debugs(54, 5, HERE << " abort writing slot at " << fileno <<
-           " in map [" << path << ']');
+    debugs(54, 5, "abort entry at " << fileno <<
+           " for writing in map [" << path << ']');
     assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    assert(s.state == Slot::Writeable);
-    freeLocked(s, false);
+    Anchor &s = shared->slots[fileno].anchor;
+    assert(s.state == Anchor::Writeable);
+    freeChain(fileno, s, false);
+    debugs(54, 5, "closed entry " << fileno << " for writing " << path);
 }
 
 void
 Ipc::StoreMap::abortIo(const sfileno fileno)
 {
-    debugs(54, 5, HERE << " abort I/O for slot at " << fileno <<
-           " in map [" << path << ']');
+    debugs(54, 5, "abort entry at " << fileno <<
+           " for I/O in map [" << path << ']');
     assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
+    Anchor &s = shared->slots[fileno].anchor;
 
     // The caller is a lock holder. Thus, if we are Writeable, then the
     // caller must be the writer; otherwise the caller must be the reader.
-    if (s.state == Slot::Writeable)
+    if (s.state == Anchor::Writeable)
         abortWriting(fileno);
     else
         closeForReading(fileno);
 }
 
-const Ipc::StoreMap::Slot *
+const Ipc::StoreMap::Anchor *
 Ipc::StoreMap::peekAtReader(const sfileno fileno) const
 {
     assert(valid(fileno));
-    const Slot &s = shared->slots[fileno];
+    const Anchor &s = shared->slots[fileno].anchor;
     switch (s.state) {
-    case Slot::Readable:
+    case Anchor::Readable:
         return &s; // immediate access by lock holder so no locking
-    case Slot::Writeable:
+    case Anchor::Writeable:
         return NULL; // cannot read the slot when it is being written
-    case Slot::Empty:
+    case Anchor::Empty:
         assert(false); // must be locked for reading or writing
     }
     assert(false); // not reachable
@@ -124,85 +209,141 @@ Ipc::StoreMap::peekAtReader(const sfileno fileno) const
 }
 
 void
-Ipc::StoreMap::free(const sfileno fileno)
+Ipc::StoreMap::freeEntry(const sfileno fileno)
 {
-    debugs(54, 5, HERE << " marking slot at " << fileno << " to be freed in"
+    debugs(54, 5, HERE << " marking entry at " << fileno << " to be freed in"
            " map [" << path << ']');
 
     assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
+    Anchor &s = shared->slots[fileno].anchor;
 
     if (s.lock.lockExclusive())
-        freeLocked(s, false);
+        freeChain(fileno, s, false);
     else
         s.waitingToBeFreed = true; // mark to free it later
 }
 
-const Ipc::StoreMap::Slot *
+/// unconditionally frees an already locked chain of slots, unlocking if needed
+void
+Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked)
+{
+    debugs(54, 7, "freeing " << inode.state << " entry " << fileno <<
+           " in map [" << path << ']');
+    if (inode.state == Anchor::Readable && cleaner) {
+        sfileno sliceId = inode.start;
+        debugs(54, 7, "first slice " << sliceId);
+        while (sliceId >= 0) {
+                       const sfileno nextId = shared->slots[sliceId].slice.next;
+            cleaner->noteFreeMapSlice(sliceId); // might change slice state
+            sliceId = nextId;
+        }
+    }
+
+    inode.waitingToBeFreed = false;
+    inode.state = Anchor::Empty;
+
+    if (!keepLocked)
+        inode.lock.unlockExclusive();
+    --shared->count;
+    debugs(54, 5, "freed entry " << fileno <<
+           " in map [" << path << ']');
+}
+
+const Ipc::StoreMap::Anchor *
 Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno)
 {
-    debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+    debugs(54, 5, "opening entry with key " << storeKeyText(key)
            << " for reading in map [" << path << ']');
-    const int idx = slotIndexByKey(key);
-    if (const Slot *slot = openForReadingAt(idx)) {
+    const int idx = anchorIndexByKey(key);
+    if (const Anchor *slot = openForReadingAt(idx)) {
         if (slot->sameKey(key)) {
             fileno = idx;
-            debugs(54, 5, HERE << " opened slot at " << fileno << " for key "
-                   << storeKeyText(key) << " for reading in map [" << path <<
-                   ']');
             return slot; // locked for reading
         }
         slot->lock.unlockShared();
+        debugs(54, 7, "closed entry " << idx << " for reading " << path);
     }
-    debugs(54, 5, HERE << " failed to open slot for key " << storeKeyText(key)
-           << " for reading in map [" << path << ']');
     return NULL;
 }
 
-const Ipc::StoreMap::Slot *
+const Ipc::StoreMap::Anchor *
 Ipc::StoreMap::openForReadingAt(const sfileno fileno)
 {
-    debugs(54, 5, HERE << " trying to open slot at " << fileno << " for "
-           "reading in map [" << path << ']');
+    debugs(54, 5, "opening entry at " << fileno <<
+           " for reading in map [" << path << ']');
     assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
+    Anchor &s = shared->slots[fileno].anchor;
 
     if (!s.lock.lockShared()) {
-        debugs(54, 5, HERE << " failed to lock slot at " << fileno << " for "
-               "reading in map [" << path << ']');
+        debugs(54, 5, "cannot open busy entry at " << fileno <<
+               "for reading in map [" << path << ']');
         return NULL;
     }
 
-    if (s.state == Slot::Empty) {
+    if (s.state == Anchor::Empty) {
         s.lock.unlockShared();
-        debugs(54, 7, HERE << " empty slot at " << fileno << " for "
-               "reading in map [" << path << ']');
+        debugs(54, 7, "cannot open empty entry at " << fileno <<
+               " for reading in map [" << path << ']');
         return NULL;
     }
 
     if (s.waitingToBeFreed) {
         s.lock.unlockShared();
-        debugs(54, 7, HERE << " dirty slot at " << fileno << " for "
-               "reading in map [" << path << ']');
+        debugs(54, 7, HERE << "cannot open marked entry at " << fileno <<
+               " for reading in map [" << path << ']');
         return NULL;
     }
 
     // cannot be Writing here if we got shared lock and checked Empty above
-    assert(s.state == Slot::Readable);
-    debugs(54, 5, HERE << " opened slot at " << fileno << " for reading in"
-           " map [" << path << ']');
+    assert(s.state == Anchor::Readable);
+    debugs(54, 5, "opened entry " << fileno << " for reading " << path);
     return &s;
 }
 
 void
 Ipc::StoreMap::closeForReading(const sfileno fileno)
 {
-    debugs(54, 5, HERE << " closing slot at " << fileno << " for reading in "
-           "map [" << path << ']');
     assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    assert(s.state == Slot::Readable);
+    Anchor &s = shared->slots[fileno].anchor;
+    assert(s.state == Anchor::Readable);
     s.lock.unlockShared();
+    debugs(54, 5, "closed entry " << fileno << " for reading " << path);
+}
+
+bool
+Ipc::StoreMap::purgeOne()
+{
+    // Hopefully, we find a removable entry much sooner (TODO: use time?).
+    // The min() will protect us from division by zero inside the loop.
+    const int searchLimit = min(10000, entryLimit());
+    int tries = 0;
+    for (; tries < searchLimit; ++tries) {
+               const sfileno fileno = shared->victim++ % shared->limit;
+        assert(valid(fileno));
+        Anchor &s = shared->slots[fileno].anchor;
+        if (s.lock.lockExclusive()) {
+            if (s.state == Anchor::Readable) { // skip empties
+                // this entry may be marked for deletion, and that is OK
+                freeChain(fileno, s, false);
+                debugs(54, 5, "purged entry at " << fileno);
+                return true;
+                       }
+            s.lock.unlockExclusive();
+        }
+       }
+    debugs(54, 5, "found no entries to purge; tried: " << tries);
+    return false;
+}
+
+void
+Ipc::StoreMap::importSlice(const SliceId sliceId, const Slice &slice)
+{
+    // Slices are imported into positions that should not be available via
+    // "get free slice" API. This is not something we can double check
+    // reliably because the anchor for the imported slice may not have been
+    // imported yet.
+    assert(valid(sliceId));
+    shared->slots[sliceId].slice = slice;
 }
 
 int
@@ -227,7 +368,7 @@ void
 Ipc::StoreMap::updateStats(ReadWriteLockStats &stats) const
 {
     for (int i = 0; i < shared->limit; ++i)
-        shared->slots[i].lock.updateStats(stats);
+        shared->slots[i].anchor.lock.updateStats(stats);
 }
 
 bool
@@ -236,62 +377,48 @@ Ipc::StoreMap::valid(const int pos) const
     return 0 <= pos && pos < entryLimit();
 }
 
-int
-Ipc::StoreMap::slotIndexByKey(const cache_key *const key) const
+sfileno
+Ipc::StoreMap::anchorIndexByKey(const cache_key *const key) const
 {
     const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
     // TODO: use a better hash function
     return (k[0] + k[1]) % shared->limit;
 }
 
-Ipc::StoreMap::Slot &
-Ipc::StoreMap::slotByKey(const cache_key *const key)
+Ipc::StoreMap::Anchor &
+Ipc::StoreMap::anchorByKey(const cache_key *const key)
 {
-    return shared->slots[slotIndexByKey(key)];
+    return shared->slots[anchorIndexByKey(key)].anchor;
 }
 
-/// unconditionally frees the already exclusively locked slot and releases lock
-void
-Ipc::StoreMap::freeLocked(Slot &s, bool keepLocked)
-{
-    if (s.state == Slot::Readable && cleaner)
-        cleaner->cleanReadable(&s - shared->slots.raw());
-
-    s.waitingToBeFreed = false;
-    s.state = Slot::Empty;
-    if (!keepLocked)
-        s.lock.unlockExclusive();
-    --shared->count;
-    debugs(54, 5, HERE << " freed slot at " << (&s - shared->slots.raw()) <<
-           " in map [" << path << ']');
-}
 
-/* Ipc::StoreMapSlot */
+/* Ipc::StoreMapAnchor */
 
-Ipc::StoreMapSlot::StoreMapSlot(): state(Empty)
+Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), state(Empty)
 {
     memset(&key, 0, sizeof(key));
     memset(&basics, 0, sizeof(basics));
+    // keep in sync with rewind()
 }
 
 void
-Ipc::StoreMapSlot::setKey(const cache_key *const aKey)
+Ipc::StoreMapAnchor::setKey(const cache_key *const aKey)
 {
     memcpy(key, aKey, sizeof(key));
 }
 
 bool
-Ipc::StoreMapSlot::sameKey(const cache_key *const aKey) const
+Ipc::StoreMapAnchor::sameKey(const cache_key *const aKey) const
 {
     const uint64_t *const k = reinterpret_cast<const uint64_t *>(aKey);
     return k[0] == key[0] && k[1] == key[1];
 }
 
 void
-Ipc::StoreMapSlot::set(const StoreEntry &from)
+Ipc::StoreMapAnchor::set(const StoreEntry &from)
 {
+    assert(state == Writeable);
     memcpy(key, from.key, sizeof(key));
-    // XXX: header = aHeader;
     basics.timestamp = from.timestamp;
     basics.lastref = from.lastref;
     basics.expires = from.expires;
@@ -301,10 +428,21 @@ Ipc::StoreMapSlot::set(const StoreEntry &from)
     basics.flags = from.flags;
 }
 
+void
+Ipc::StoreMapAnchor::rewind()
+{
+    assert(state == Writeable);
+    start = 0;
+    memset(&key, 0, sizeof(key));
+    memset(&basics, 0, sizeof(basics));
+    // but keep the lock
+}
+
 /* Ipc::StoreMap::Shared */
 
 Ipc::StoreMap::Shared::Shared(const int aLimit, const size_t anExtrasSize):
-        limit(aLimit), extrasSize(anExtrasSize), count(0), slots(aLimit)
+        limit(aLimit), extrasSize(anExtrasSize), count(0), victim(0),
+        slots(aLimit)
 {
 }
 
@@ -317,6 +455,6 @@ Ipc::StoreMap::Shared::sharedMemorySize() const
 size_t
 Ipc::StoreMap::Shared::SharedMemorySize(const int limit, const size_t extrasSize)
 {
-    return sizeof(Shared) + limit * (sizeof(Slot) + extrasSize);
+    return sizeof(Shared) + limit * (sizeof(StoreMapSlot) + extrasSize);
 }
 
index 444073a3595de9abfd68a810c206693a37d57163..bacdad0276b69cb56369aba1d91b24a1c4fc5a7c 100644 (file)
@@ -4,23 +4,29 @@
 #include "ipc/ReadWriteLock.h"
 #include "ipc/mem/FlexibleArray.h"
 #include "ipc/mem/Pointer.h"
-#include "typedefs.h"
+#include "ipc/StoreMapSlice.h"
 
 namespace Ipc
 {
 
-/// a StoreMap element, holding basic shareable StoreEntry info
-class StoreMapSlot
+/// Maintains shareable information about a StoreEntry as a whole.
+/// An anchor points to one or more StoreEntry slices. This is the
+/// only lockable part of shared StoreEntry information, providing
+/// protection for all StoreEntry slices.
+class StoreMapAnchor
 {
 public:
-    StoreMapSlot();
+    StoreMapAnchor();
 
-    /// store StoreEntry key and basics
+    /// store StoreEntry key and basics for an inode slot
     void set(const StoreEntry &anEntry);
 
     void setKey(const cache_key *const aKey);
     bool sameKey(const cache_key *const aKey) const;
 
+    /// undo the effects of set(), setKey(), etc., but keep locks and state
+    void rewind();
+
 public:
     mutable ReadWriteLock lock; ///< protects slot data below
     Atomic::WordT<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
@@ -38,6 +44,8 @@ public:
         uint16_t flags;
     } basics;
 
+    StoreMapSliceId start; ///< where the chain of StoreEntry slices begins
+
     /// possible persistent states
     typedef enum {
         Empty, ///< ready for writing, with nothing of value
@@ -47,14 +55,24 @@ public:
     State state; ///< current state
 };
 
+/// XXX: a hack to allocate one shared array for both anchors and slices
+class StoreMapSlot {
+public:
+    StoreMapAnchor anchor;
+    StoreMapSlice slice;
+};
+
 class StoreMapCleaner;
 
-/// map of StoreMapSlots indexed by their keys, with read/write slot locking
+/// map of StoreMapSlots indexed by their keys, with read/write slice locking
 /// kids extend to store custom data
 class StoreMap
 {
 public:
-    typedef StoreMapSlot Slot;
+    typedef StoreMapAnchor Anchor;
+    typedef sfileno AnchorId;
+    typedef StoreMapSlice Slice;
+    typedef StoreMapSliceId SliceId;
 
     /// data shared across maps in different processes
     class Shared
@@ -64,10 +82,11 @@ public:
         size_t sharedMemorySize() const;
         static size_t SharedMemorySize(const int limit, const size_t anExtrasSize);
 
-        const int limit; ///< maximum number of map slots
-        const size_t extrasSize; ///< size of slot extra data
-        Atomic::Word count; ///< current number of map slots
-        Ipc::Mem::FlexibleArray<Slot> slots; ///< slots storage
+        const int limit; ///< maximum number of store entries
+        const size_t extrasSize; ///< size of slice extra data
+        Atomic::Word count; ///< current number of entries
+        Atomic::WordT<sfileno> victim; ///< starting point for purge search
+        Ipc::Mem::FlexibleArray<StoreMapSlot> slots; ///< storage
     };
 
 public:
@@ -78,31 +97,60 @@ public:
 
     StoreMap(const char *const aPath);
 
-    /// finds, reservers space for writing a new entry or returns nil
-    Slot *openForWriting(const cache_key *const key, sfileno &fileno);
-    /// successfully finish writing the entry
+    /// computes map entry position for a given entry key
+    sfileno anchorIndexByKey(const cache_key *const key) const;
+
+    /// Like strcmp(mapped, new), but for store entry versions/timestamps.
+    /// Returns +2 if the mapped entry does not exist; -1/0/+1 otherwise.
+    /// Comparison may be inaccurate unless the caller is a lock holder.
+    int compareVersions(const sfileno oldFileno, time_t newVersion) const;
+
+    /// finds, locks, and returns an anchor for an empty key position,
+    /// erasing the old entry (if any)
+    Anchor *openForWriting(const cache_key *const key, sfileno &fileno);
+    /// locks and returns an anchor for the empty fileno position; if
+    /// overwriteExisting is false and the position is not empty, returns nil
+    Anchor *openForWritingAt(sfileno fileno, bool overwriteExisting = true);
+    /// successfully finish creating or updating the entry at fileno pos
     void closeForWriting(const sfileno fileno, bool lockForReading = false);
+    /// unlock and "forget" openForWriting entry, making it Empty again
+    /// this call does not free entry slices so the caller has to do that
+    void forgetWritingEntry(const sfileno fileno);
+
+    /// only works on locked entries; returns nil unless the slice is readable
+    const Anchor *peekAtReader(const sfileno fileno) const;
+
+    /// if possible, free the entry and return true
+    /// otherwise mark it as waiting to be freed and return false
+    void freeEntry(const sfileno fileno);
+
+    /// opens entry (identified by key) for reading, increments read level
+    const Anchor *openForReading(const cache_key *const key, sfileno &fileno);
+    /// opens entry (identified by sfileno) for reading, increments read level
+    const Anchor *openForReadingAt(const sfileno fileno);
+    /// closes open entry after reading, decrements read level
+    void closeForReading(const sfileno fileno);
 
-    /// only works on locked entries; returns nil unless the slot is readable
-    const Slot *peekAtReader(const sfileno fileno) const;
+    /// writeable slice within an entry chain created by openForWriting()
+    Slice &writeableSlice(const AnchorId anchorId, const SliceId sliceId);
+    /// readable slice within an entry chain opened by openForReading()
+    const Slice &readableSlice(const AnchorId anchorId, const SliceId sliceId) const;
+    /// writeable anchor for the entry created by openForWriting()
+    Anchor &writeableEntry(const AnchorId anchorId);
 
-    /// mark the slot as waiting to be freed and, if possible, free it
-    void free(const sfileno fileno);
+    /// called by lock holder to terminate either slice writing or reading
+    void abortIo(const sfileno fileno);
 
-    /// open slot for reading, increments read level
-    const Slot *openForReading(const cache_key *const key, sfileno &fileno);
-    /// open slot for reading, increments read level
-    const Slot *openForReadingAt(const sfileno fileno);
-    /// close slot after reading, decrements read level
-    void closeForReading(const sfileno fileno);
+    /// finds an unlocked entry and frees it or returns false
+    bool purgeOne();
 
-    /// called by lock holder to terminate either slot writing or reading
-    void abortIo(const sfileno fileno);
+    /// copies slice to its designated position
+    void importSlice(const SliceId sliceId, const Slice &slice);
 
-    bool full() const; ///< there are no empty slots left
-    bool valid(const int n) const; ///< whether n is a valid slot coordinate
-    int entryCount() const; ///< number of used slots
-    int entryLimit() const; ///< maximum number of slots that can be used
+    bool full() const; ///< there are no empty slices left; XXX: remove as unused?
+    bool valid(const int n) const; ///< whether n is a valid slice coordinate
+    int entryCount() const; ///< number of writeable and readable entries
+    int entryLimit() const; ///< maximum entryCount() possible
 
     /// adds approximate current stats to the supplied ones
     void updateStats(ReadWriteLockStats &stats) const;
@@ -116,16 +164,15 @@ protected:
     Mem::Pointer<Shared> shared;
 
 private:
-    int slotIndexByKey(const cache_key *const key) const;
-    Slot &slotByKey(const cache_key *const key);
+    Anchor &anchorByKey(const cache_key *const key);
 
-    Slot *openForReading(Slot &s);
+    Anchor *openForReading(Slice &s);
     void abortWriting(const sfileno fileno);
-    void freeIfNeeded(Slot &s);
-    void freeLocked(Slot &s, bool keepLocked);
+
+    void freeChain(const sfileno fileno, Anchor &inode, const bool keepLock);
 };
 
-/// StoreMap with extra slot data
+/// StoreMap with extra slice data
 /// Note: ExtrasT must be POD, it is initialized with zeroes, no
 /// constructors or destructors are called
 template <class ExtrasT>
@@ -149,14 +196,14 @@ protected:
     ExtrasT *sharedExtras; ///< pointer to extras in shared memory
 };
 
-/// API for adjusting external state when dirty map slot is being freed
+/// API for adjusting external state when dirty map slice is being freed
 class StoreMapCleaner
 {
 public:
     virtual ~StoreMapCleaner() {}
 
-    /// adjust slot-linked state before a locked Readable slot is erased
-    virtual void cleanReadable(const sfileno fileno) = 0;
+    /// adjust slice-linked state before a locked Readable slice is erased
+    virtual void noteFreeMapSlice(const sfileno sliceId) = 0;
 };
 
 // StoreMapWithExtras implementation
diff --git a/src/ipc/StoreMapSlice.cc b/src/ipc/StoreMapSlice.cc
new file mode 100644 (file)
index 0000000..e6b4c1f
--- /dev/null
@@ -0,0 +1,12 @@
+/*
+ * DEBUG: section 54    Interprocess Communication
+ */
+
+#include "squid.h"
+#include "ipc/StoreMapSlice.h"
+#include "tools.h"
+
+Ipc::StoreMapSlice::StoreMapSlice()
+{
+    memset(this, 0, sizeof(*this));
+}
diff --git a/src/ipc/StoreMapSlice.h b/src/ipc/StoreMapSlice.h
new file mode 100644 (file)
index 0000000..1ba775a
--- /dev/null
@@ -0,0 +1,24 @@
+#ifndef SQUID_IPC_STORE_MAP_SLICE_H
+#define SQUID_IPC_STORE_MAP_SLICE_H
+
+#include "typedefs.h"
+
+namespace Ipc
+{
+
+typedef uint32_t StoreMapSliceId;
+
+/// a piece of Store entry, linked to other pieces, forming a chain
+class StoreMapSlice
+{
+public:
+    StoreMapSlice(): next(0), /* location(0), */ size(0) {}
+
+    StoreMapSliceId next; ///< ID of the next slice occupied by the entry
+//    uint32_t location; ///< slice contents location
+    uint32_t size; ///< slice contents size
+};
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_STORE_MAP_SLICE_H */