]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Initial Large Rock implmentation.
authorAlex Rousskov <rousskov@measurement-factory.com>
Wed, 5 Dec 2012 16:06:35 +0000 (09:06 -0700)
committerAlex Rousskov <rousskov@measurement-factory.com>
Wed, 5 Dec 2012 16:06:35 +0000 (09:06 -0700)
Needs polishing, a better replacement policy, and rebuild fixes.

12 files changed:
src/fs/Makefile.am
src/fs/rock/RockDbCell.cc [new file with mode: 0644]
src/fs/rock/RockDbCell.h
src/fs/rock/RockIoRequests.cc
src/fs/rock/RockIoRequests.h
src/fs/rock/RockIoState.cc
src/fs/rock/RockIoState.h
src/fs/rock/RockRebuild.cc
src/fs/rock/RockRebuild.h
src/fs/rock/RockSwapDir.cc
src/fs/rock/RockSwapDir.h
src/ipc/mem/Pointer.h

index 444d35c50563f70d54d3b48d5d0331eb908e0ad9..1c361986d4a76431ff7c53816eb9568e663ea0de 100644 (file)
@@ -36,6 +36,7 @@ libufs_la_SOURCES = \
        ufs/RebuildState.cc
 
 librock_la_SOURCES = \
+       rock/RockDbCell.cc \
        rock/RockDbCell.h \
        rock/RockIoState.cc \
        rock/RockIoState.h \
diff --git a/src/fs/rock/RockDbCell.cc b/src/fs/rock/RockDbCell.cc
new file mode 100644 (file)
index 0000000..31c3c00
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * DEBUG: section 79    Disk IO Routines
+ */
+
+#include "squid.h"
+#include "fs/rock/RockDbCell.h"
+#include "ipc/StoreMap.h"
+#include "tools.h"
+
+Rock::DbCellHeader::DbCellHeader(): firstSlot(0), nextSlot(0), version(0),
+        payloadSize(0) {
+    xmemset(&key, 0, sizeof(key));
+}
+
+bool
+Rock::DbCellHeader::sane() const {
+    return firstSlot > 0;
+}
index a644a8f080786243e424cd4b5c4e28aa45461c54..9830933b25bbf32c6703fe0695e76025bda61316 100644 (file)
@@ -1,6 +1,11 @@
 #ifndef SQUID_FS_ROCK_DB_CELL_H
 #define SQUID_FS_ROCK_DB_CELL_H
 
+namespace Ipc
+{
+class StoreMapSlot;
+}
+
 namespace Rock
 {
 
@@ -11,13 +16,16 @@ namespace Rock
 class DbCellHeader
 {
 public:
-    DbCellHeader(): payloadSize(0), reserved(0) {}
+    DbCellHeader();
 
     /// whether the freshly loaded header fields make sense
-    bool sane() const { return payloadSize >= 0 && reserved == 0; }
+    bool sane() const;
 
-    int64_t payloadSize; ///< cell contents size excluding this header
-    int64_t reserved; ///< reserved for future use (next cell pointer?)
+    uint64_t key[2]; ///< StoreEntry key
+    uint32_t firstSlot; ///< first slot pointer in the entry chain
+    uint32_t nextSlot; ///< next slot pointer in the entry chain
+    uint32_t version; ///< entry chain version
+    uint32_t payloadSize; ///< cell contents size excluding this header
 };
 
 } // namespace Rock
index be3742f4fba86ec3faa77485b41288b4c5a93233..cf84b7b6d5b1b2306e36cb416a8bc303da738b37 100644 (file)
@@ -16,8 +16,10 @@ Rock::ReadRequest::ReadRequest(const ::ReadRequest &base,
 }
 
 Rock::WriteRequest::WriteRequest(const ::WriteRequest &base,
-                                 const IoState::Pointer &anSio):
+                                 const IoState::Pointer &anSio,
+                                 const bool last):
         ::WriteRequest(base),
-        sio(anSio)
+        sio(anSio),
+        isLast(last)
 {
 }
index 15e57601d899a47f14ad369eb80f3c33db9ecb32..29839fc84acd5c732eea85e02b09ebd9cc436982 100644 (file)
@@ -25,8 +25,9 @@ private:
 class WriteRequest: public ::WriteRequest
 {
 public:
-    WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio);
+    WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio, const bool last);
     IoState::Pointer sio;
+    const bool isLast;
 
 private:
     CBDATA_CLASS2(WriteRequest);
index 0b036b7230c932fc204c7eee818b05926a338c34..2b36d0f44ae5149c1c05dddaaeefe3222f684e93 100644 (file)
 #include "fs/rock/RockSwapDir.h"
 #include "globals.h"
 
-Rock::IoState::IoState(SwapDir *dir,
+Rock::IoState::IoState(SwapDir &aDir,
                        StoreEntry *anEntry,
                        StoreIOState::STFNCB *cbFile,
                        StoreIOState::STIOCB *cbIo,
                        void *data):
-        slotSize(0),
-        diskOffset(-1),
-        payloadEnd(-1)
+        dbSlot(NULL),
+        dir(aDir),
+        slotSize(dir.max_objsize),
+        objOffset(0)
 {
     e = anEntry;
-    // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller
-    slotSize = dir->max_objsize;
+    // swap_filen, swap_dirn and diskOffset are set by the caller
     file_callback = cbFile;
     callback = cbIo;
     callback_data = cbdataReference(data);
@@ -53,50 +53,85 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data
 {
     assert(theFile != NULL);
     assert(coreOff >= 0);
-    offset_ = coreOff;
 
-    // we skip our cell header; it is only read when building the map
-    const int64_t cellOffset = sizeof(DbCellHeader) +
-                               static_cast<int64_t>(coreOff);
-    assert(cellOffset <= payloadEnd);
+    Ipc::Mem::PageId pageId;
+    pageId.pool = dir.index;
+    if (coreOff < objOffset) { // rewind
+        pageId.number = dbSlot->firstSlot;
+        dbSlot = &dir.dbSlot(pageId);
+        objOffset = 0;
+    }
+
+    while (coreOff >= objOffset + dbSlot->payloadSize) {
+        objOffset += dbSlot->payloadSize;
+        pageId.number = dbSlot->nextSlot;
+        assert(pageId); // XXX: should be an error?
+        dbSlot = &dir.dbSlot(pageId);
+    }
+    if (pageId)
+        diskOffset = dir.diskOffset(pageId);
 
-    // Core specifies buffer length, but we must not exceed stored entry size
-    if (cellOffset + (int64_t)len > payloadEnd)
-        len = payloadEnd - cellOffset;
+    offset_ = coreOff;
+    len = min(len,
+        static_cast<size_t>(objOffset + dbSlot->payloadSize - coreOff));
 
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
-    theFile->read(new ReadRequest(
-                      ::ReadRequest(buf, diskOffset + cellOffset, len), this));
+    theFile->read(new ReadRequest(::ReadRequest(buf,
+        diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
 }
 
-// We only buffer data here; we actually write when close() is called.
+// We only write data when full slot is accumulated or when close() is called.
 // We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
 // of the slot when the write does not end at the page or sector boundary.
 void
 Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
 {
-    // TODO: move to create?
-    if (!coreOff) {
-        assert(theBuf.isNull());
-        assert(payloadEnd <= slotSize);
-        theBuf.init(min(payloadEnd, slotSize), slotSize);
-        // start with our header; TODO: consider making it a trailer
-        DbCellHeader header;
-        assert(static_cast<int64_t>(sizeof(header)) <= payloadEnd);
-        header.payloadSize = payloadEnd - sizeof(header);
-        theBuf.append(reinterpret_cast<const char*>(&header), sizeof(header));
-    } else {
-        // Core uses -1 offset as "append". Sigh.
-        assert(coreOff == -1);
-        assert(!theBuf.isNull());
+    assert(dbSlot);
+
+    if (theBuf.isNull()) {
+        theBuf.init(min(size + sizeof(DbCellHeader), slotSize), slotSize);
+        theBuf.appended(sizeof(DbCellHeader)); // will fill header in doWrite
     }
 
-    theBuf.append(buf, size);
-    offset_ += size; // so that Core thinks we wrote it
+    if (size <= static_cast<size_t>(theBuf.spaceSize()))
+        theBuf.append(buf, size);
+    else {
+        Ipc::Mem::PageId pageId;
+        if (!dir.popDbSlot(pageId)) {
+            debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << dir.path <<
+                   "' run out of DB slots");
+            dir.writeError(swap_filen);
+            // XXX: do we need to destroy buf on error?
+            if (dtor)
+                (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+            // XXX: do we need to call callback on error?
+            callBack(DISK_ERROR);
+            return;
+        }
+        DbCellHeader &nextDbSlot = dir.dbSlot(pageId);
+        memcpy(nextDbSlot.key, dbSlot->key, sizeof(nextDbSlot.key));
+        nextDbSlot.firstSlot = dbSlot->firstSlot;
+        nextDbSlot.nextSlot = 0;
+        nextDbSlot.version = dbSlot->version;
+        nextDbSlot.payloadSize = 0;
+
+        dbSlot->nextSlot = pageId.number;
+
+        const size_t left = size - theBuf.spaceSize();
+        offset_ += theBuf.spaceSize(); // so that Core thinks we wrote it
+        theBuf.append(buf, theBuf.spaceSize());
+
+        doWrite();
+
+        dbSlot = &nextDbSlot;
+        diskOffset = dir.diskOffset(pageId);
+        theBuf.init(min(left, slotSize), slotSize);
+        write(buf + size - left, left, -1, NULL);
+    }
 
     if (dtor)
         (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
@@ -104,7 +139,7 @@ Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
 
 // write what was buffered during write() calls
 void
-Rock::IoState::startWriting()
+Rock::IoState::doWrite(const bool isLast)
 {
     assert(theFile != NULL);
     assert(!theBuf.isNull());
@@ -115,10 +150,15 @@ Rock::IoState::startWriting()
     debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
            theBuf.contentSize());
 
-    assert(theBuf.contentSize() <= slotSize);
+    dbSlot->payloadSize = theBuf.contentSize() - sizeof(DbCellHeader);
+    memcpy(theBuf.content(), dbSlot, sizeof(DbCellHeader));
+
+    assert(static_cast<size_t>(theBuf.contentSize()) <= slotSize);
     // theFile->write may call writeCompleted immediatelly
-    theFile->write(new WriteRequest(::WriteRequest(theBuf.content(),
-                                    diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this));
+    WriteRequest *const r = new WriteRequest(
+        ::WriteRequest(theBuf.content(), diskOffset, theBuf.contentSize(),
+                       theBuf.freeFunc()), this, isLast);
+    theFile->write(r);
 }
 
 //
@@ -135,7 +175,7 @@ Rock::IoState::close(int how)
     debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
            " how=" << how);
     if (how == wroteAll && !theBuf.isNull())
-        startWriting();
+        doWrite(true);
     else
         callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
 }
index 11b3ddd628924db56e9ca9379892e828d782f8a8..191bf25c3c7fadebba783f3da4dff0550ffea2d9 100644 (file)
@@ -9,6 +9,7 @@ class DiskFile;
 namespace Rock
 {
 
+class DbCellHeader;
 class SwapDir;
 
 /// \ingroup Rock
@@ -17,7 +18,7 @@ class IoState: public ::StoreIOState
 public:
     typedef RefCount<IoState> Pointer;
 
-    IoState(SwapDir *dir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
+    IoState(SwapDir &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
     virtual ~IoState();
 
     void file(const RefCount<DiskFile> &aFile);
@@ -27,22 +28,21 @@ public:
     virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func);
     virtual void close(int how);
 
-    /// called by SwapDir when writing is done
     void finishedWriting(int errFlag);
 
-    int64_t slotSize; ///< db cell size
     int64_t diskOffset; ///< the start of this cell inside the db file
-
-    /// when reading: number of bytes previously written to the db cell;
-    /// when writing: maximum payload offset in a db cell
-    int64_t payloadEnd;
+    DbCellHeader *dbSlot; ///< current db slot, used for writing
 
     MEMPROXY_CLASS(IoState);
 
 private:
-    void startWriting();
+    void doWrite(const bool isLast = false);
     void callBack(int errflag);
 
+    SwapDir &dir; ///< swap dir object
+    const size_t slotSize; ///< db cell size
+    int64_t objOffset; ///< object offset for current db slot
+
     RefCount<DiskFile> theFile; // "file" responsible for this I/O
     MemBuf theBuf; // use for write content accumulation only
 };
index 8f07ded9d9faa8633720a6825baf969c428db3c7..845d90662d83db17a8e56f2ac1fbc1ffde8d1fff 100644 (file)
@@ -7,6 +7,7 @@
 #include "fs/rock/RockRebuild.h"
 #include "fs/rock/RockSwapDir.h"
 #include "fs/rock/RockDbCell.h"
+#include "ipc/StoreMap.h"
 #include "globals.h"
 #include "md5.h"
 #include "tools.h"
@@ -25,6 +26,7 @@ Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
         dbSize(0),
         dbEntrySize(0),
         dbEntryLimit(0),
+        dbSlot(0),
         fd(-1),
         dbOffset(0),
         filen(0)
@@ -34,6 +36,9 @@ Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
     dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
     dbEntrySize = sd->max_objsize;
     dbEntryLimit = sd->entryLimit();
+    loaded.reserve(dbSize);
+    for (size_t i = 0; i < loaded.size(); ++i)
+        loaded.push_back(false);
 }
 
 Rock::Rebuild::~Rebuild()
@@ -75,14 +80,19 @@ Rock::Rebuild::start()
 void
 Rock::Rebuild::checkpoint()
 {
-    if (!done())
+    if (dbOffset < dbSize)
         eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
+    else
+    if (!doneAll()) {
+        eventAdd("Rock::Rebuild::Step2", Rock::Rebuild::Steps2, this, 0.01, 1,
+                 true);
+    }
 }
 
 bool
 Rock::Rebuild::doneAll() const
 {
-    return dbOffset >= dbSize && AsyncJob::doneAll();
+    return dbSlot >= dbSize && AsyncJob::doneAll();
 }
 
 void
@@ -92,6 +102,13 @@ Rock::Rebuild::Steps(void *data)
     CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
 }
 
+void
+Rock::Rebuild::Steps2(void *data)
+{
+    // use async call to enable job call protection that time events lack
+    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps2);
+}
+
 void
 Rock::Rebuild::steps()
 {
@@ -129,6 +146,39 @@ Rock::Rebuild::steps()
     checkpoint();
 }
 
+void
+Rock::Rebuild::steps2()
+{
+    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+           dbSlot << " <= " << dbSize);
+
+    // Balance our desire to maximize the number of slots processed at once
+    // (and, hence, minimize overheads and total rebuild time) with a
+    // requirement to also process Coordinator events, disk I/Os, etc.
+    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
+    const timeval loopStart = current_time;
+
+    int loaded = 0;
+    while (dbSlot < dbSize) {
+        doOneSlot();
+        ++dbSlot;
+        ++loaded;
+
+        if (opt_foreground_rebuild)
+            continue; // skip "few entries at a time" check below
+
+        getCurrentTime();
+        const double elapsedMsec = tvSubMsec(loopStart, current_time);
+        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
+            debugs(47, 5, HERE << "pausing after " << loaded << " slots in " <<
+                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per slot");
+            break;
+        }
+    }
+
+    checkpoint();
+}
+
 void
 Rock::Rebuild::doOneEntry()
 {
@@ -141,17 +191,22 @@ Rock::Rebuild::doOneEntry()
         failure("cannot seek to db entry", errno);
 
     MemBuf buf;
-    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+    buf.init(sizeof(DbCellHeader), sizeof(DbCellHeader));
 
     if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
         return;
 
     // get our header
-    DbCellHeader header;
+    Ipc::Mem::PageId pageId;
+    pageId.pool = sd->index;
+    pageId.number = filen + 1;
+    DbCellHeader &header = sd->dbSlot(pageId);
+    assert(!header.sane());
+
     if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring truncated cache entry meta data at " << dbOffset);
-        ++counts.invalid;
+        invalidSlot(pageId);
         return;
     }
     memcpy(&header, buf.content(), sizeof(header));
@@ -159,30 +214,32 @@ Rock::Rebuild::doOneEntry()
     if (!header.sane()) {
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring malformed cache entry meta data at " << dbOffset);
-        ++counts.invalid;
-        return;
-    }
-    buf.consume(sizeof(header)); // optimize to avoid memmove()
-
-    cache_key key[SQUID_MD5_DIGEST_LENGTH];
-    StoreEntry loadedE;
-    if (!storeRebuildParseEntry(buf, loadedE, key, counts, header.payloadSize)) {
-        // skip empty slots
-        if (loadedE.swap_filen > 0 || loadedE.swap_file_sz > 0) {
-            ++counts.invalid;
-            //sd->unlink(filen); leave garbage on disk, it should not hurt
-        }
+        invalidSlot(pageId);
         return;
     }
+}
 
-    assert(loadedE.swap_filen < dbEntryLimit);
-    if (!storeRebuildKeepEntry(loadedE, key, counts))
+void
+Rock::Rebuild::doOneSlot()
+{
+    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+           dbSlot << " <= " << dbSize);
+
+    if (loaded[dbSlot])
         return;
 
-    ++counts.objcount;
-    // loadedE->dump(5);
+    Ipc::Mem::PageId pageId;
+    pageId.pool = sd->index;
+    pageId.number = dbSlot + 1;
+    const DbCellHeader &dbSlot = sd->dbSlot(pageId);
+    assert(dbSlot.sane());
+
+    pageId.number = dbSlot.firstSlot;
+    //const DbCellHeader &firstChainSlot = sd->dbSlot(pageId);
 
-    sd->addEntry(filen, header, loadedE);
+    /* Process all not yet loaded slots, verify entry chains, if chain
+       is valid, load entry from first slot similar to small rock,
+       call SwapDir::addEntry (needs to be restored). */
 }
 
 void
@@ -208,3 +265,10 @@ Rock::Rebuild::failure(const char *msg, int errNo)
     fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
            sd->index, sd->filePath, msg);
 }
+
+void Rock::Rebuild::invalidSlot(Ipc::Mem::PageId &pageId)
+{
+    ++counts.invalid;
+    loaded[pageId.number - 1] = true;
+    sd->dbSlotIndex->push(pageId);
+}
index f4864633bd40e33235cd774430d06032ec390ba6..787d5682cb8a617824ea3b444647769eaa6fd2ce 100644 (file)
@@ -5,6 +5,14 @@
 #include "cbdata.h"
 #include "store_rebuild.h"
 
+namespace Ipc
+{
+namespace Mem
+{
+class PageId;
+}
+}
+
 namespace Rock
 {
 
@@ -27,22 +35,29 @@ protected:
 private:
     void checkpoint();
     void steps();
+    void steps2();
     void doOneEntry();
+    void doOneSlot();
     void failure(const char *msg, int errNo = 0);
+    void invalidSlot(Ipc::Mem::PageId &pageId);
 
     SwapDir *sd;
 
     int64_t dbSize;
     int dbEntrySize;
     int dbEntryLimit;
+    int dbSlot;
 
     int fd; // store db file descriptor
     int64_t dbOffset;
     int filen;
 
+    Vector<bool> loaded; ///< true iff rebuilt is complete for a given slot
+
     StoreRebuildData counts;
 
     static void Steps(void *data);
+    static void Steps2(void *data);
 
     CBDATA_CLASS2(Rebuild);
 };
index 7f5de22ffc8ec712a8bb8ec189f35f729d772996..e82898a52b758e4eb2b5b45aa637a6e74095251c 100644 (file)
@@ -229,6 +229,10 @@ Rock::SwapDir::init()
     theFile->configure(fileConfig);
     theFile->open(O_RDWR, 0644, this);
 
+    dbSlotIndex = shm_old(Ipc::Mem::PageStack)(path);
+    dbSlots = new (reinterpret_cast<char *>(dbSlotIndex.getRaw()) +
+                   dbSlotIndex->stackSize()) DbCellHeader[entryLimitAllowed()];
+
     // Increment early. Otherwise, if one SwapDir finishes rebuild before
     // others start, storeRebuildComplete() will think the rebuild is over!
     // TODO: move store_dirs_rebuilding hack to store modules that need it.
@@ -436,28 +440,6 @@ Rock::SwapDir::rebuild()
     AsyncJob::Start(new Rebuild(this));
 }
 
-/* Add a new object to the cache with empty memory copy and pointer to disk
- * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
-bool
-Rock::SwapDir::addEntry(const int filen, const DbCellHeader &header, const StoreEntry &from)
-{
-    debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
-           ", filen="<< std::setfill('0') << std::hex << std::uppercase <<
-           std::setw(8) << filen);
-
-    sfileno newLocation = 0;
-    if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
-        if (filen == newLocation) {
-            slot->set(from);
-            map->extras(filen) = header;
-        } // else some other, newer entry got into our cell
-        map->closeForWriting(newLocation, false);
-        return filen == newLocation;
-    }
-
-    return false;
-}
-
 bool
 Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
 {
@@ -470,6 +452,11 @@ Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load)
     if (!map)
         return false;
 
+    // TODO: consider DB slots freed when older object would be replaced
+    if (dbSlotIndex->size() <
+        static_cast<unsigned int>(max(entriesNeeded(diskSpaceNeeded), 1)))
+        return false;
+
     // Do not start I/O transaction if there are less than 10% free pages left.
     // TODO: reserve page instead
     if (needsDiskStrand() &&
@@ -493,16 +480,6 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
         return NULL;
     }
 
-    // compute payload size for our cell header, using StoreEntry info
-    // careful: e.objectLen() may still be negative here
-    const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
-    assert(expectedReplySize >= 0); // must know to prevent cell overflows
-    assert(e.mem_obj->swap_hdr_sz > 0);
-    DbCellHeader header;
-    header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
-    const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
-    assert(payloadEnd <= max_objsize);
-
     sfileno filen;
     Ipc::StoreMapSlot *const slot =
         map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
@@ -510,25 +487,37 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
         debugs(47, 5, HERE << "map->add failed");
         return NULL;
     }
-    e.swap_file_sz = header.payloadSize; // and will be copied to the map
+
+    Ipc::Mem::PageId pageId;
+    if (!popDbSlot(pageId)) {
+        debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << filePath <<
+               "' run out of DB slots");
+        map->free(filen);
+    }
+
     slot->set(e);
-    map->extras(filen) = header;
 
     // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
     // If that does not happen, the entry will not decrement the read level!
 
-    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+    IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = filen;
-    sio->payloadEnd = payloadEnd;
-    sio->diskOffset = diskOffset(sio->swap_filen);
+    sio->diskOffset = diskOffset(pageId);
+
+    DbCellHeader &firstDbSlot = dbSlot(pageId);
+    memcpy(firstDbSlot.key, e.key, sizeof(firstDbSlot.key));
+    firstDbSlot.firstSlot = pageId.number;
+    firstDbSlot.nextSlot = 0;
+    ++firstDbSlot.version;
+    firstDbSlot.payloadSize = 0;
+    sio->dbSlot = &firstDbSlot;
 
     debugs(47,5, HERE << "dir " << index << " created new filen " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
-           sio->swap_filen << std::dec << " at " << sio->diskOffset);
-
-    assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());
+           sio->swap_filen << std::dec << " at " <<
+           diskOffset(sio->swap_filen));
 
     sio->file(theFile);
 
@@ -539,8 +528,15 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
 int64_t
 Rock::SwapDir::diskOffset(int filen) const
 {
-    assert(filen >= 0);
-    return HeaderSize + max_objsize*filen;
+     assert(filen >= 0);
+     return HeaderSize + max_objsize*filen;
+}
+
+int64_t
+Rock::SwapDir::diskOffset(Ipc::Mem::PageId &pageId) const
+{
+    assert(pageId);
+    return diskOffset(pageId.number - 1);
 }
 
 int64_t
@@ -550,6 +546,56 @@ Rock::SwapDir::diskOffsetLimit() const
     return diskOffset(map->entryLimit());
 }
 
+int
+Rock::SwapDir::entryMaxPayloadSize() const
+{
+    return max_objsize - sizeof(DbCellHeader);
+}
+
+int
+Rock::SwapDir::entriesNeeded(const int64_t objSize) const
+{
+    return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
+}
+
+bool
+Rock::SwapDir::popDbSlot(Ipc::Mem::PageId &pageId)
+{
+    return dbSlotIndex->pop(pageId);
+}
+
+Rock::DbCellHeader &
+Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId)
+{
+    const DbCellHeader &s = const_cast<const SwapDir *>(this)->dbSlot(pageId);
+    return const_cast<DbCellHeader &>(s);
+}
+
+const Rock::DbCellHeader &
+Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) const
+{
+    assert(dbSlotIndex->pageIdIsValid(pageId));
+    return dbSlots[pageId.number - 1];
+}
+
+void
+Rock::SwapDir::cleanReadable(const sfileno fileno)
+{
+    Ipc::Mem::PageId pageId = map->extras(fileno).pageId;
+    Ipc::Mem::PageId nextPageId = pageId;
+    while (pageId) {
+        const DbCellHeader &curDbSlot = dbSlot(pageId);
+        nextPageId.number = curDbSlot.nextSlot;
+        const DbCellHeader &nextDbSlot = dbSlot(nextPageId);
+        const bool sameChain = memcmp(curDbSlot.key, nextDbSlot.key,
+                                      sizeof(curDbSlot.key)) == 0 &&
+            curDbSlot.version == nextDbSlot.version;
+        dbSlotIndex->push(pageId);
+        if (sameChain)
+            pageId = nextPageId;
+    }
+}
+
 // tries to open an old or being-written-to entry with swap_filen for reading
 StoreIOState::Pointer
 Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
@@ -579,12 +625,17 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
     if (!slot)
         return NULL; // we were writing afterall
 
-    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+    IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = e.swap_filen;
-    sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
-    assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
+    sio->dbSlot = &dbSlot(map->extras(e.swap_filen).pageId);
+
+    const Ipc::Mem::PageId &pageId = map->extras(e.swap_filen).pageId;
+    sio->diskOffset = diskOffset(pageId);
+    DbCellHeader &firstDbSlot = dbSlot(map->extras(e.swap_filen).pageId);
+    assert(memcmp(firstDbSlot.key, e.key, sizeof(firstDbSlot.key)));
+    assert(firstDbSlot.firstSlot == pageId.number);
 
     debugs(47,5, HERE << "dir " << index << " has old filen: " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
@@ -593,9 +644,6 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
     assert(slot->basics.swap_file_sz > 0);
     assert(slot->basics.swap_file_sz == e.swap_file_sz);
 
-    sio->diskOffset = diskOffset(sio->swap_filen);
-    assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
-
     sio->file(theFile);
     return sio;
 }
@@ -632,7 +680,6 @@ Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< :
 
     if (errflag == DISK_OK && rlen > 0)
         sio->offset_ += rlen;
-    assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum
 
     StoreIOState::STRCB *callback = sio->read.callback;
     assert(callback);
@@ -654,15 +701,19 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
         // close, assuming we only write once; the entry gets the read lock
         map->closeForWriting(sio.swap_filen, true);
         // do not increment sio.offset_ because we do it in sio->write()
-    } else {
-        // Do not abortWriting here. The entry should keep the write lock
-        // instead of losing association with the store and confusing core.
-        map->free(sio.swap_filen); // will mark as unusable, just in case
-    }
-
-    assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum
+        if (request->isLast)
+            sio.finishedWriting(errflag);
+    } else
+        writeError(sio.swap_filen);
+}
 
-    sio.finishedWriting(errflag);
+void
+Rock::SwapDir::writeError(const sfileno fileno)
+{
+    // Do not abortWriting here. The entry should keep the write lock
+    // instead of losing association with the store and confusing core.
+    map->free(fileno); // will mark as unusable, just in case
+    // XXX: should we call IoState callback?
 }
 
 bool
@@ -827,18 +878,34 @@ RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
 
 void Rock::SwapDirRr::create(const RunnerRegistry &)
 {
-    Must(owners.empty());
+    Must(mapOwners.empty() && dbSlotsOwners.empty());
     for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
         if (const Rock::SwapDir *const sd = dynamic_cast<Rock::SwapDir *>(INDEXSD(i))) {
-            Rock::SwapDir::DirMap::Owner *const owner =
-                Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed());
-            owners.push_back(owner);
+            const int64_t capacity = sd->entryLimitAllowed();
+            SwapDir::DirMap::Owner *const mapOwner =
+                SwapDir::DirMap::Init(sd->path, capacity);
+            mapOwners.push_back(mapOwner);
+
+            // XXX: remove pool id and counters from PageStack
+            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const dbSlotsOwner =
+                shm_new(Ipc::Mem::PageStack)(sd->path, i, capacity,
+                                             sizeof(DbCellHeader));
+            dbSlotsOwners.push_back(dbSlotsOwner);
+
+            // XXX: add method to initialize PageStack with no free pages
+            while (true) {
+                Ipc::Mem::PageId pageId;
+                if (!dbSlotsOwner->object()->pop(pageId))
+                    break;
+            }
         }
     }
 }
 
 Rock::SwapDirRr::~SwapDirRr()
 {
-    for (size_t i = 0; i < owners.size(); ++i)
-        delete owners[i];
+    for (size_t i = 0; i < mapOwners.size(); ++i) {
+        delete mapOwners[i];
+        delete dbSlotsOwners[i];
+    }
 }
index 29576ed43e7d87eab568c64147e378bafe85c15e..bd2739d010cfa69ab0f226cd71dda1742aedad21 100644 (file)
@@ -6,6 +6,8 @@
 #include "DiskIO/IORequestor.h"
 #include "fs/rock/RockDbCell.h"
 #include "ipc/StoreMap.h"
+#include "ipc/mem/Page.h"
+#include "ipc/mem/PageStack.h"
 
 class DiskIOStrategy;
 class ReadRequest;
@@ -17,7 +19,7 @@ namespace Rock
 class Rebuild;
 
 /// \ingroup Rock
-class SwapDir: public ::SwapDir, public IORequestor
+class SwapDir: public ::SwapDir, public IORequestor, public Ipc::StoreMapCleaner
 {
 public:
     SwapDir();
@@ -36,10 +38,26 @@ public:
     virtual void create();
     virtual void parse(int index, char *path);
 
+    // XXX: stop misusing max_objsize as slot size
+    virtual int64_t maxObjectSize() const { return max_objsize * entryLimitAllowed(); }
+
     int64_t entryLimitHigh() const { return SwapFilenMax; } ///< Core limit
     int64_t entryLimitAllowed() const;
 
-    typedef Ipc::StoreMapWithExtras<DbCellHeader> DirMap;
+    bool popDbSlot(Ipc::Mem::PageId &pageId);
+    DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId);
+    const DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId) const;
+
+    int64_t diskOffset(Ipc::Mem::PageId &pageId) const;
+    void writeError(const sfileno fileno);
+
+    virtual void cleanReadable(const sfileno fileno);
+
+    // TODO: merge with MemStoreMapExtras?
+    struct MapExtras {
+        Ipc::Mem::PageId pageId;
+    };
+    typedef Ipc::StoreMapWithExtras<MapExtras> DirMap;
 
 protected:
     /* protected ::SwapDir API */
@@ -72,8 +90,6 @@ protected:
     void dumpRateOption(StoreEntry * e) const;
 
     void rebuild(); ///< starts loading and validating stored entry metadata
-    ///< used to add entries successfully loaded during rebuild
-    bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from);
 
     bool full() const; ///< no more entries can be stored without purging
     void trackReferences(StoreEntry &e); ///< add to replacement policy scope
@@ -82,6 +98,8 @@ protected:
     int64_t diskOffset(int filen) const;
     int64_t diskOffsetLimit() const;
     int entryLimit() const { return map->entryLimit(); }
+    int entryMaxPayloadSize() const;
+    int entriesNeeded(const int64_t objSize) const;
 
     friend class Rebuild;
     const char *filePath; ///< location of cache storage file inside path/
@@ -92,6 +110,8 @@ private:
     DiskIOStrategy *io;
     RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
     DirMap *map;
+    DbCellHeader *dbSlots;
+    Ipc::Mem::Pointer<Ipc::Mem::PageStack> dbSlotIndex;
 
     /* configurable options */
     DiskFile::Config fileConfig; ///< file-level configuration options
@@ -111,7 +131,8 @@ protected:
     virtual void create(const RunnerRegistry &);
 
 private:
-    Vector<SwapDir::DirMap::Owner *> owners;
+    Vector<SwapDir::DirMap::Owner *> mapOwners;
+    Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> dbSlotsOwners;
 };
 
 } // namespace Rock
index 55cb55b86b63c4e022c6e0bae1a8370ec843a1ac..f46092d08c1977b04dab5374b05caab9cf35fe76 100644 (file)
@@ -32,6 +32,8 @@ public:
 
     ~Owner();
 
+    Class *object() { return theObject; }
+
 private:
     Owner(const char *const id, const off_t sharedSize);