From: Alex Rousskov
Date: Wed, 5 Dec 2012 16:06:35 +0000 (-0700)
Subject: Initial Large Rock implementation.
X-Git-Tag: SQUID_3_5_0_1~444^2~97
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=93910d5cd4b588c9bf91af698e290ed45e303721;p=thirdparty%2Fsquid.git

Initial Large Rock implementation.

Needs polishing, a better replacement policy, and rebuild fixes.
---

diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index 444d35c505..1c361986d4 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -36,6 +36,7 @@ libufs_la_SOURCES = \
 	ufs/RebuildState.cc
 
 librock_la_SOURCES = \
+	rock/RockDbCell.cc \
 	rock/RockDbCell.h \
 	rock/RockIoState.cc \
 	rock/RockIoState.h \
diff --git a/src/fs/rock/RockDbCell.cc b/src/fs/rock/RockDbCell.cc
new file mode 100644
index 0000000000..31c3c009d4
--- /dev/null
+++ b/src/fs/rock/RockDbCell.cc
@@ -0,0 +1,18 @@
+/*
+ * DEBUG: section 79    Disk IO Routines
+ */
+
+#include "squid.h"
+#include "fs/rock/RockDbCell.h"
+#include "ipc/StoreMap.h"
+#include "tools.h"
+
+Rock::DbCellHeader::DbCellHeader(): firstSlot(0), nextSlot(0), version(0),
+        payloadSize(0) {
+    xmemset(&key, 0, sizeof(key));
+}
+
+bool
+Rock::DbCellHeader::sane() const {
+    return firstSlot > 0;
+}
diff --git a/src/fs/rock/RockDbCell.h b/src/fs/rock/RockDbCell.h
index a644a8f080..9830933b25 100644
--- a/src/fs/rock/RockDbCell.h
+++ b/src/fs/rock/RockDbCell.h
@@ -1,6 +1,11 @@
 #ifndef SQUID_FS_ROCK_DB_CELL_H
 #define SQUID_FS_ROCK_DB_CELL_H
 
+namespace Ipc
+{
+class StoreMapSlot;
+}
+
 namespace Rock
 {
 
@@ -11,13 +16,16 @@ namespace Rock
 class DbCellHeader
 {
 public:
-    DbCellHeader(): payloadSize(0), reserved(0) {}
+    DbCellHeader();
 
     /// whether the freshly loaded header fields make sense
-    bool sane() const { return payloadSize >= 0 && reserved == 0; }
+    bool sane() const;
 
-    int64_t payloadSize; ///< cell contents size excluding this header
-    int64_t reserved; ///< reserved for future use (next cell pointer?)
+    uint64_t key[2]; ///< StoreEntry key
+    uint32_t firstSlot; ///< first slot pointer in the entry chain
+    uint32_t nextSlot; ///< next slot pointer in the entry chain
+    uint32_t version; ///< entry chain version
+    uint32_t payloadSize; ///< cell contents size excluding this header
 };
 
 } // namespace Rock
diff --git a/src/fs/rock/RockIoRequests.cc b/src/fs/rock/RockIoRequests.cc
index be3742f4fb..cf84b7b6d5 100644
--- a/src/fs/rock/RockIoRequests.cc
+++ b/src/fs/rock/RockIoRequests.cc
@@ -16,8 +16,10 @@ Rock::ReadRequest::ReadRequest(const ::ReadRequest &base,
 }
 
 Rock::WriteRequest::WriteRequest(const ::WriteRequest &base,
-                                 const IoState::Pointer &anSio):
+                                 const IoState::Pointer &anSio,
+                                 const bool last):
     ::WriteRequest(base),
-    sio(anSio)
+    sio(anSio),
+    isLast(last)
 {
 }
diff --git a/src/fs/rock/RockIoRequests.h b/src/fs/rock/RockIoRequests.h
index 15e57601d8..29839fc84a 100644
--- a/src/fs/rock/RockIoRequests.h
+++ b/src/fs/rock/RockIoRequests.h
@@ -25,8 +25,9 @@ private:
 class WriteRequest: public ::WriteRequest
 {
 public:
-    WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio);
+    WriteRequest(const ::WriteRequest &base, const IoState::Pointer &anSio, const bool last);
     IoState::Pointer sio;
+    const bool isLast;
 
 private:
     CBDATA_CLASS2(WriteRequest);
diff --git a/src/fs/rock/RockIoState.cc b/src/fs/rock/RockIoState.cc
index 0b036b7230..2b36d0f44a 100644
--- a/src/fs/rock/RockIoState.cc
+++ b/src/fs/rock/RockIoState.cc
@@ -13,18 +13,18 @@
 #include "fs/rock/RockSwapDir.h"
 #include "globals.h"
 
-Rock::IoState::IoState(SwapDir *dir,
+Rock::IoState::IoState(SwapDir &aDir,
                        StoreEntry *anEntry,
                        StoreIOState::STFNCB *cbFile,
                        StoreIOState::STIOCB *cbIo,
                        void *data):
-    slotSize(0),
-    diskOffset(-1),
-    payloadEnd(-1)
+    dbSlot(NULL),
+    dir(aDir),
+    slotSize(dir.max_objsize),
+    objOffset(0)
 {
     e = anEntry;
-    // swap_filen, swap_dirn, diskOffset, and payloadEnd are set by the caller
-    slotSize = dir->max_objsize;
+    // swap_filen, swap_dirn and diskOffset are set by the caller
     file_callback = cbFile;
     callback = cbIo;
     callback_data = cbdataReference(data);
@@ -53,50 +53,85 @@ Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data
 {
     assert(theFile != NULL);
     assert(coreOff >= 0);
-    offset_ = coreOff;
 
-    // we skip our cell header; it is only read when building the map
-    const int64_t cellOffset = sizeof(DbCellHeader) +
-                               static_cast<int64_t>(coreOff);
-    assert(cellOffset <= payloadEnd);
+    Ipc::Mem::PageId pageId;
+    pageId.pool = dir.index;
+
+    if (coreOff < objOffset) { // rewind to the start of the chain
+        pageId.number = dbSlot->firstSlot;
+        dbSlot = &dir.dbSlot(pageId);
+        objOffset = 0;
+    }
+
+    while (coreOff >= objOffset + dbSlot->payloadSize) {
+        objOffset += dbSlot->payloadSize;
+        pageId.number = dbSlot->nextSlot;
+        assert(pageId); // XXX: should this be an error instead?
+        dbSlot = &dir.dbSlot(pageId);
+    }
+
+    if (pageId)
+        diskOffset = dir.diskOffset(pageId);
 
-    // Core specifies buffer length, but we must not exceed stored entry size
-    if (cellOffset + (int64_t)len > payloadEnd)
-        len = payloadEnd - cellOffset;
+    offset_ = coreOff;
+    len = min(len,
+              static_cast<size_t>(objOffset + dbSlot->payloadSize - coreOff));
 
     assert(read.callback == NULL);
     assert(read.callback_data == NULL);
     read.callback = cb;
     read.callback_data = cbdataReference(data);
 
-    theFile->read(new ReadRequest(
-                      ::ReadRequest(buf, diskOffset + cellOffset, len), this));
+    theFile->read(new ReadRequest(::ReadRequest(buf,
+        diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
 }
 
-// We only buffer data here; we actually write when close() is called.
+// We only write data when a full slot has been accumulated or when close()
+// is called.
 // We buffer, in part, to avoid forcing OS to _read_ old unwritten portions
 // of the slot when the write does not end at the page or sector boundary.
 void
 Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
 {
-    // TODO: move to create?
-    if (!coreOff) {
-        assert(theBuf.isNull());
-        assert(payloadEnd <= slotSize);
-        theBuf.init(min(payloadEnd, slotSize), slotSize);
-        // start with our header; TODO: consider making it a trailer
-        DbCellHeader header;
-        assert(static_cast<int64_t>(sizeof(header)) <= payloadEnd);
-        header.payloadSize = payloadEnd - sizeof(header);
-        theBuf.append(reinterpret_cast<const char*>(&header), sizeof(header));
-    } else {
-        // Core uses -1 offset as "append". Sigh.
-        assert(coreOff == -1);
-        assert(!theBuf.isNull());
+    assert(dbSlot);
+
+    if (theBuf.isNull()) {
+        theBuf.init(min(size + sizeof(DbCellHeader), slotSize), slotSize);
+        theBuf.appended(sizeof(DbCellHeader)); // doWrite() will fill the header
     }
 
-    theBuf.append(buf, size);
-    offset_ += size; // so that Core thinks we wrote it
+    if (size <= static_cast<size_t>(theBuf.spaceSize())) {
+        theBuf.append(buf, size);
+        offset_ += size; // so that Core thinks we wrote it
+    } else {
+        Ipc::Mem::PageId pageId;
+        if (!dir.popDbSlot(pageId)) {
+            debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << dir.path <<
+                   "' ran out of DB slots");
+            dir.writeError(swap_filen);
+            // XXX: do we need to destroy buf on error?
+            if (dtor)
+                (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
+            // XXX: do we need to call the callback on error?
+            callBack(DISK_ERROR);
+            return;
+        }
+
+        DbCellHeader &nextDbSlot = dir.dbSlot(pageId);
+        memcpy(nextDbSlot.key, dbSlot->key, sizeof(nextDbSlot.key));
+        nextDbSlot.firstSlot = dbSlot->firstSlot;
+        nextDbSlot.nextSlot = 0;
+        nextDbSlot.version = dbSlot->version;
+        nextDbSlot.payloadSize = 0;
+
+        dbSlot->nextSlot = pageId.number;
+
+        const size_t left = size - theBuf.spaceSize();
+        offset_ += theBuf.spaceSize(); // so that Core thinks we wrote it
+        theBuf.append(buf, theBuf.spaceSize());
+
+        doWrite();
+
+        dbSlot = &nextDbSlot;
+        diskOffset = dir.diskOffset(pageId);
+        theBuf.init(min(left + sizeof(DbCellHeader), slotSize), slotSize);
+        theBuf.appended(sizeof(DbCellHeader)); // reserve room for the header
+
+        write(buf + size - left, left, -1, NULL);
+    }
 
     if (dtor)
         (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
@@ -104,7 +139,7 @@ Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
 
 // write what was buffered during write() calls
 void
-Rock::IoState::startWriting()
+Rock::IoState::doWrite(const bool isLast)
 {
     assert(theFile != NULL);
     assert(!theBuf.isNull());
@@ -115,10 +150,15 @@ Rock::IoState::doWrite(const bool isLast)
     debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
            theBuf.contentSize());
 
-    assert(theBuf.contentSize() <= slotSize);
+    dbSlot->payloadSize = theBuf.contentSize() - sizeof(DbCellHeader);
+    memcpy(theBuf.content(), dbSlot, sizeof(DbCellHeader));
+
+    assert(static_cast<size_t>(theBuf.contentSize()) <= slotSize);
     // theFile->write may call writeCompleted immediately
-    theFile->write(new WriteRequest(::WriteRequest(theBuf.content(),
-                       diskOffset, theBuf.contentSize(), theBuf.freeFunc()), this));
+    WriteRequest *const r = new WriteRequest(
+        ::WriteRequest(theBuf.content(), diskOffset, theBuf.contentSize(),
+                       theBuf.freeFunc()), this, isLast);
+    theFile->write(r);
 }
 
@@ -135,7 +175,7 @@ Rock::IoState::close(int how)
     debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ <<
            " how=" << how);
     if (how == wroteAll && !theBuf.isNull())
-        startWriting();
+        doWrite(true);
     else
         callBack(how == writerGone ? DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE
 }
diff --git a/src/fs/rock/RockIoState.h b/src/fs/rock/RockIoState.h
index 11b3ddd628..191bf25c3c 100644
--- a/src/fs/rock/RockIoState.h
+++ b/src/fs/rock/RockIoState.h
@@ -9,6 +9,7 @@ class DiskFile;
 namespace Rock
 {
 
+class DbCellHeader;
 class SwapDir;
 
 /// \ingroup Rock
@@ -17,7 +18,7 @@ class IoState: public ::StoreIOState
 public:
     typedef RefCount<IoState> Pointer;
 
-    IoState(SwapDir *dir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
+    IoState(SwapDir &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data);
     virtual ~IoState();
 
     void file(const RefCount<DiskFile> &aFile);
@@ -27,22 +28,21 @@ public:
     virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func);
     virtual void close(int how);
 
     /// called by SwapDir when writing is done
     void finishedWriting(int errFlag);
 
-    int64_t slotSize; ///< db cell size
     int64_t diskOffset; ///< the start of this cell inside the db file
-
-    /// when reading: number of bytes previously written to the db cell;
-    /// when writing: maximum payload offset in a db cell
-    int64_t payloadEnd;
+    DbCellHeader *dbSlot; ///< current db slot, used for writing
 
     MEMPROXY_CLASS(IoState);
 
 private:
-    void startWriting();
+    void doWrite(const bool isLast = false);
     void callBack(int errflag);
 
+    SwapDir &dir; ///< our swap dir
+    const size_t slotSize; ///< db cell size
+    int64_t objOffset; ///< object offset for the current db slot
+
     RefCount<DiskFile> theFile; // "file" responsible for this I/O
     MemBuf theBuf; // used for write content accumulation only
 };
diff --git a/src/fs/rock/RockRebuild.cc b/src/fs/rock/RockRebuild.cc
index 8f07ded9d9..845d90662d 100644
--- a/src/fs/rock/RockRebuild.cc
+++ b/src/fs/rock/RockRebuild.cc
@@ -7,6 +7,7 @@
 #include "fs/rock/RockRebuild.h"
 #include "fs/rock/RockSwapDir.h"
 #include "fs/rock/RockDbCell.h"
+#include "ipc/StoreMap.h"
 #include "globals.h"
 #include "md5.h"
 #include "tools.h"
@@ -25,6 +26,7 @@ Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
     dbSize(0),
     dbEntrySize(0),
     dbEntryLimit(0),
+    dbSlot(0),
     fd(-1),
     dbOffset(0),
     filen(0)
@@ -34,6 +36,9 @@ Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"),
     dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
     dbEntrySize = sd->max_objsize;
     dbEntryLimit = sd->entryLimit();
+    loaded.reserve(dbSize);
+    for (int64_t i = 0; i < dbSize; ++i)
+        loaded.push_back(false);
 }
 
 Rock::Rebuild::~Rebuild()
 {
@@ -75,14 +80,19 @@ Rock::Rebuild::start()
 void
 Rock::Rebuild::checkpoint()
 {
-    if (!done())
+    if (dbOffset < dbSize)
         eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
+    else if (!doneAll()) {
+        eventAdd("Rock::Rebuild::Step2", Rock::Rebuild::Steps2, this, 0.01, 1,
+                 true);
+    }
 }
 
 bool
 Rock::Rebuild::doneAll() const
 {
-    return dbOffset >= dbSize && AsyncJob::doneAll();
+    return dbSlot >= dbSize && AsyncJob::doneAll();
 }
 
 void
@@ -92,6 +102,13 @@ Rock::Rebuild::Steps(void *data)
     CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
 }
 
+void
+Rock::Rebuild::Steps2(void *data)
+{
+    // use async call to enable job call protection that time events lack
+    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps2);
+}
+
 void
 Rock::Rebuild::steps()
 {
@@ -129,6 +146,39 @@ Rock::Rebuild::steps()
     checkpoint();
 }
 
+void
+Rock::Rebuild::steps2()
+{
+    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+           dbSlot << " <= " << dbSize);
+
+    // Balance our desire to maximize the number of slots processed at once
+    // (and, hence, minimize overheads and total rebuild time) with a
+    // requirement to also process Coordinator events, disk I/Os, etc.
+    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
+    const timeval loopStart = current_time;
+
+    int loaded = 0;
+    while (dbSlot < dbSize) {
+        doOneSlot();
+        ++dbSlot;
+        ++loaded;
+
+        if (opt_foreground_rebuild)
+            continue; // skip the "few slots at a time" check below
+
+        getCurrentTime();
+        const double elapsedMsec = tvSubMsec(loopStart, current_time);
+        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
+            debugs(47, 5, HERE << "pausing after " << loaded << " slots in " <<
+                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per slot");
+            break;
+        }
+    }
+
+    checkpoint();
+}
+
 void
 Rock::Rebuild::doOneEntry()
 {
@@ -141,17 +191,22 @@ Rock::Rebuild::doOneEntry()
         failure("cannot seek to db entry", errno);
 
     MemBuf buf;
-    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
+    buf.init(sizeof(DbCellHeader), sizeof(DbCellHeader));
 
     if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
         return;
 
     // get our header
-    DbCellHeader header;
+    Ipc::Mem::PageId pageId;
+    pageId.pool = sd->index;
+    pageId.number = filen + 1;
+    DbCellHeader &header = sd->dbSlot(pageId);
+    assert(!header.sane());
+
     if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring truncated cache entry meta data at " << dbOffset);
-        ++counts.invalid;
+        invalidSlot(pageId);
         return;
     }
     memcpy(&header, buf.content(), sizeof(header));
@@ -159,30 +214,32 @@ Rock::Rebuild::doOneEntry()
     if (!header.sane()) {
         debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
                "Ignoring malformed cache entry meta data at " << dbOffset);
-        ++counts.invalid;
-        return;
-    }
-    buf.consume(sizeof(header)); // optimize to avoid memmove()
-
-    cache_key key[SQUID_MD5_DIGEST_LENGTH];
-    StoreEntry loadedE;
-    if (!storeRebuildParseEntry(buf, loadedE, key, counts, header.payloadSize)) {
-        // skip empty slots
-        if (loadedE.swap_filen > 0 || loadedE.swap_file_sz > 0) {
-            ++counts.invalid;
-            //sd->unlink(filen); leave garbage on disk, it should not hurt
-        }
+        invalidSlot(pageId);
         return;
     }
+}
 
-    assert(loadedE.swap_filen < dbEntryLimit);
-    if (!storeRebuildKeepEntry(loadedE, key, counts))
+void
+Rock::Rebuild::doOneSlot()
+{
+    debugs(47,5, HERE << sd->index << " filen " << filen << " at " <<
+           dbSlot << " <= " << dbSize);
+
+    if (loaded[dbSlot])
         return;
 
-    ++counts.objcount;
-    // loadedE->dump(5);
+    Ipc::Mem::PageId pageId;
+    pageId.pool = sd->index;
+    pageId.number = dbSlot + 1;
+    const DbCellHeader &slot = sd->dbSlot(pageId);
+    assert(slot.sane());
+
+    pageId.number = slot.firstSlot;
+    //const DbCellHeader &firstChainSlot = sd->dbSlot(pageId);
 
-    sd->addEntry(filen, header, loadedE);
+    /* Process all not-yet-loaded slots and verify entry chains; if a chain
+       is valid, load the entry from its first slot, similar to small rock,
+       and call SwapDir::addEntry() (needs to be restored). */
 }
 
 void
@@ -208,3 +265,10 @@ Rock::Rebuild::failure(const char *msg, int errNo)
     fatalf("Rock cache_dir[%d] rebuild of %s failed: %s.",
            sd->index, sd->filePath, msg);
 }
+
+void
+Rock::Rebuild::invalidSlot(Ipc::Mem::PageId &pageId)
+{
+    ++counts.invalid;
+    loaded[pageId.number - 1] = true;
+    sd->dbSlotIndex->push(pageId);
+}
diff --git a/src/fs/rock/RockRebuild.h b/src/fs/rock/RockRebuild.h
index f4864633bd..787d5682cb 100644
--- a/src/fs/rock/RockRebuild.h
+++ b/src/fs/rock/RockRebuild.h
@@ -5,6 +5,14 @@
 #include "cbdata.h"
 #include "store_rebuild.h"
 
+namespace Ipc
+{
+namespace Mem
+{
+class PageId;
+}
+}
+
 namespace Rock
 {
 
@@ -27,22 +35,29 @@ protected:
 private:
     void checkpoint();
     void steps();
+    void steps2();
     void doOneEntry();
+    void doOneSlot();
     void failure(const char *msg, int errNo = 0);
+    void invalidSlot(Ipc::Mem::PageId &pageId);
 
     SwapDir *sd;
 
     int64_t dbSize;
     int dbEntrySize;
     int dbEntryLimit;
+    int dbSlot;
 
     int fd; // store db file descriptor
    int64_t dbOffset;
     int filen;
 
+    Vector<bool> loaded; ///< whether rebuild is done for a given slot
+
     StoreRebuildData counts;
 
     static void Steps(void *data);
+    static void Steps2(void *data);
 
     CBDATA_CLASS2(Rebuild);
 };
diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc
index 7f5de22ffc..e82898a52b 100644
--- a/src/fs/rock/RockSwapDir.cc
+++ b/src/fs/rock/RockSwapDir.cc
@@ -229,6 +229,10 @@ Rock::SwapDir::init()
     theFile->configure(fileConfig);
     theFile->open(O_RDWR, 0644, this);
 
+    dbSlotIndex = shm_old(Ipc::Mem::PageStack)(path);
+    dbSlots = new (reinterpret_cast<char*>(dbSlotIndex.getRaw()) +
+                   dbSlotIndex->stackSize()) DbCellHeader[entryLimitAllowed()];
+
     // Increment early. Otherwise, if one SwapDir finishes rebuild before
     // others start, storeRebuildComplete() will think the rebuild is over!
     // TODO: move store_dirs_rebuilding hack to store modules that need it.
@@ -436,28 +440,6 @@ Rock::SwapDir::rebuild()
     AsyncJob::Start(new Rebuild(this));
 }
 
-/* Add a new object to the cache with an empty memory copy and a pointer to
- * disk; used to rebuild the store from disk. Based on UFSSwapDir::addDiskRestore */
-bool
-Rock::SwapDir::addEntry(const int filen, const DbCellHeader &header, const StoreEntry &from)
-{
-    debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
-           ", filen="<< std::setfill('0') << std::hex << std::uppercase <<
-           std::setw(8) << filen);
-
-    sfileno newLocation = 0;
-    if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key*>(from.key), newLocation)) {
-        if (filen == newLocation) {
-            slot->set(from);
-            map->extras(filen) = header;
-        } // else some other, newer entry got into our cell
-        map->closeForWriting(newLocation, false);
-        return filen == newLocation;
-    }
-
-    return false;
-}
-
 bool
 Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) const
 {
@@ -470,6 +452,11 @@ Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load)
     if (!map)
         return false;
 
+    // TODO: consider DB slots freed when an older object is replaced
+    if (dbSlotIndex->size() <
+            static_cast<unsigned int>(max(entriesNeeded(diskSpaceNeeded), 1)))
+        return false;
+
     // Do not start I/O transaction if there are less than 10% free pages left.
     // TODO: reserve page instead
     if (needsDiskStrand() &&
@@ -493,16 +480,6 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
         return NULL;
     }
 
-    // compute payload size for our cell header, using StoreEntry info
-    // careful: e.objectLen() may still be negative here
-    const int64_t expectedReplySize = e.mem_obj->expectedReplySize();
-    assert(expectedReplySize >= 0); // must know to prevent cell overflows
-    assert(e.mem_obj->swap_hdr_sz > 0);
-    DbCellHeader header;
-    header.payloadSize = e.mem_obj->swap_hdr_sz + expectedReplySize;
-    const int64_t payloadEnd = sizeof(DbCellHeader) + header.payloadSize;
-    assert(payloadEnd <= max_objsize);
-
     sfileno filen;
     Ipc::StoreMapSlot *const slot =
         map->openForWriting(reinterpret_cast<const cache_key *>(e.key), filen);
@@ -510,25 +487,37 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
         debugs(47, 5, HERE << "map->add failed");
         return NULL;
     }
-    e.swap_file_sz = header.payloadSize; // and will be copied to the map
+
+    Ipc::Mem::PageId pageId;
+    if (!popDbSlot(pageId)) {
+        debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << filePath <<
+               "' ran out of DB slots");
+        map->free(filen);
+        return NULL;
+    }
+
     slot->set(e);
-    map->extras(filen) = header;
 
     // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
     // If that does not happen, the entry will not decrement the read level!
-    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+    IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = filen;
-    sio->payloadEnd = payloadEnd;
-    sio->diskOffset = diskOffset(sio->swap_filen);
+    sio->diskOffset = diskOffset(pageId);
+
+    DbCellHeader &firstDbSlot = dbSlot(pageId);
+    memcpy(firstDbSlot.key, e.key, sizeof(firstDbSlot.key));
+    firstDbSlot.firstSlot = pageId.number;
+    firstDbSlot.nextSlot = 0;
+    ++firstDbSlot.version;
+    firstDbSlot.payloadSize = 0;
+    sio->dbSlot = &firstDbSlot;
 
     debugs(47,5, HERE << "dir " << index << " created new filen " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
-           sio->swap_filen << std::dec << " at " << sio->diskOffset);
-
-    assert(sio->diskOffset + payloadEnd <= diskOffsetLimit());
+           sio->swap_filen << std::dec << " at " <<
+           diskOffset(sio->swap_filen));
 
     sio->file(theFile);
 
@@ -539,8 +528,15 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
 int64_t
 Rock::SwapDir::diskOffset(int filen) const
 {
-    assert(filen >= 0);
-    return HeaderSize + max_objsize*filen;
+    assert(filen >= 0);
+    return HeaderSize + max_objsize*filen;
+}
+
+int64_t
+Rock::SwapDir::diskOffset(const Ipc::Mem::PageId &pageId) const
+{
+    assert(pageId);
+    return diskOffset(pageId.number - 1);
 }
 
 int64_t
@@ -550,6 +546,56 @@ Rock::SwapDir::diskOffsetLimit() const
     return diskOffset(map->entryLimit());
 }
 
+int
+Rock::SwapDir::entryMaxPayloadSize() const
+{
+    return max_objsize - sizeof(DbCellHeader);
+}
+
+int
+Rock::SwapDir::entriesNeeded(const int64_t objSize) const
+{
+    return (objSize + entryMaxPayloadSize() - 1) / entryMaxPayloadSize();
+}
+
+bool
+Rock::SwapDir::popDbSlot(Ipc::Mem::PageId &pageId)
+{
+    return dbSlotIndex->pop(pageId);
+}
+
+Rock::DbCellHeader &
+Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId)
+{
+    const DbCellHeader &s =
+        const_cast<const SwapDir*>(this)->dbSlot(pageId);
+    return const_cast<DbCellHeader&>(s);
+}
+
+const Rock::DbCellHeader &
+Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) const
+{
+    assert(dbSlotIndex->pageIdIsValid(pageId));
+    return dbSlots[pageId.number - 1];
+}
+
+void
+Rock::SwapDir::cleanReadable(const sfileno fileno)
+{
+    Ipc::Mem::PageId pageId = map->extras(fileno).pageId;
+    Ipc::Mem::PageId nextPageId = pageId;
+    while (pageId) {
+        const DbCellHeader &curDbSlot = dbSlot(pageId);
+        nextPageId.number = curDbSlot.nextSlot;
+        const DbCellHeader &nextDbSlot = dbSlot(nextPageId);
+        const bool sameChain = memcmp(curDbSlot.key, nextDbSlot.key,
+                                      sizeof(curDbSlot.key)) == 0 &&
+                               curDbSlot.version == nextDbSlot.version;
+        dbSlotIndex->push(pageId);
+        if (sameChain)
+            pageId = nextPageId;
+        else
+            break;
+    }
+}
+
 // tries to open an old or being-written-to entry with swap_filen for reading
 StoreIOState::Pointer
 Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data)
@@ -579,12 +625,17 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
     if (!slot)
         return NULL; // we were writing after all
 
-    IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
+    IoState *sio = new IoState(*this, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = e.swap_filen;
-    sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
-    assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
+    sio->dbSlot = &dbSlot(map->extras(e.swap_filen).pageId);
+
+    const Ipc::Mem::PageId &pageId = map->extras(e.swap_filen).pageId;
+    sio->diskOffset = diskOffset(pageId);
+
+    DbCellHeader &firstDbSlot = dbSlot(map->extras(e.swap_filen).pageId);
+    assert(memcmp(firstDbSlot.key, e.key, sizeof(firstDbSlot.key)) == 0);
+    assert(firstDbSlot.firstSlot == pageId.number);
 
     debugs(47,5, HERE << "dir " << index << " has old filen: " <<
            std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
           sio->swap_filen);
 
     assert(slot->basics.swap_file_sz > 0);
     assert(slot->basics.swap_file_sz == e.swap_file_sz);
 
-    sio->diskOffset = diskOffset(sio->swap_filen);
-    assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
-
     sio->file(theFile);
     return sio;
 }
@@ -632,7 +680,6 @@ Rock::SwapDir::readCompleted(const char *buf, int rlen, int errflag, RefCount< ::ReadRequest > r)
     if (errflag == DISK_OK && rlen > 0)
         sio->offset_ += rlen;
-    assert(sio->diskOffset + sio->offset_ <= diskOffsetLimit()); // post-factum
 
     StoreIOState::STRCB *callback = sio->read.callback;
     assert(callback);
@@ -654,15 +701,19 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest > r)
         // close, assuming we only write once; the entry gets the read lock
         map->closeForWriting(sio.swap_filen, true);
         // do not increment sio.offset_ because we do it in sio->write()
-    } else {
-        // Do not abortWriting here. The entry should keep the write lock
-        // instead of losing association with the store and confusing core.
-        map->free(sio.swap_filen); // will mark as unusable, just in case
-    }
-
-    assert(sio.diskOffset + sio.offset_ <= diskOffsetLimit()); // post-factum
 
-    sio.finishedWriting(errflag);
+        if (request->isLast)
+            sio.finishedWriting(errflag);
+    } else
+        writeError(sio.swap_filen);
+}
+
+void
+Rock::SwapDir::writeError(const sfileno fileno)
+{
+    // Do not abortWriting here. The entry should keep the write lock
+    // instead of losing its association with the store and confusing core.
+    map->free(fileno); // will mark the entry as unusable, just in case
+    // XXX: should we call the IoState callback?
+}
 
 bool
@@ -827,18 +878,34 @@ RunnerRegistrationEntry(rrAfterConfig, SwapDirRr);
 void Rock::SwapDirRr::create(const RunnerRegistry &)
 {
-    Must(owners.empty());
+    Must(mapOwners.empty() && dbSlotsOwners.empty());
     for (int i = 0; i < Config.cacheSwap.n_configured; ++i) {
         if (const Rock::SwapDir *const sd = dynamic_cast<const Rock::SwapDir *>(INDEXSD(i))) {
-            Rock::SwapDir::DirMap::Owner *const owner =
-                Rock::SwapDir::DirMap::Init(sd->path, sd->entryLimitAllowed());
-            owners.push_back(owner);
+            const int64_t capacity = sd->entryLimitAllowed();
+
+            SwapDir::DirMap::Owner *const mapOwner =
+                SwapDir::DirMap::Init(sd->path, capacity);
+            mapOwners.push_back(mapOwner);
+
+            // XXX: remove pool id and counters from PageStack
+            Ipc::Mem::Owner<Ipc::Mem::PageStack> *const dbSlotsOwner =
+                shm_new(Ipc::Mem::PageStack)(sd->path, i, capacity,
+                                             sizeof(DbCellHeader));
+            dbSlotsOwners.push_back(dbSlotsOwner);
+
+            // XXX: add a method to initialize PageStack with no free pages
+            while (true) {
+                Ipc::Mem::PageId pageId;
+                if (!dbSlotsOwner->object()->pop(pageId))
+                    break;
+            }
         }
     }
 }
 
 Rock::SwapDirRr::~SwapDirRr()
 {
-    for (size_t i = 0; i < owners.size(); ++i)
-        delete owners[i];
+    for (size_t i = 0; i < mapOwners.size(); ++i) {
+        delete mapOwners[i];
+        delete dbSlotsOwners[i];
+    }
 }
diff --git a/src/fs/rock/RockSwapDir.h b/src/fs/rock/RockSwapDir.h
index 29576ed43e..bd2739d010 100644
--- a/src/fs/rock/RockSwapDir.h
+++ b/src/fs/rock/RockSwapDir.h
@@ -6,6 +6,8 @@
 #include "DiskIO/IORequestor.h"
 #include "fs/rock/RockDbCell.h"
 #include "ipc/StoreMap.h"
+#include "ipc/mem/Page.h"
+#include "ipc/mem/PageStack.h"
 
 class DiskIOStrategy;
 class ReadRequest;
@@ -17,7 +19,7 @@ namespace Rock
 class Rebuild;
 
 /// \ingroup Rock
-class SwapDir: public ::SwapDir, public IORequestor
+class SwapDir: public ::SwapDir, public IORequestor, public Ipc::StoreMapCleaner
 {
 public:
     SwapDir();
@@ -36,10 +38,26 @@ public:
     virtual void create();
     virtual void parse(int index, char *path);
 
+    // XXX: stop misusing max_objsize as the slot size
+    virtual int64_t maxObjectSize() const { return max_objsize * entryLimitAllowed(); }
+
     int64_t entryLimitHigh() const { return SwapFilenMax; } ///< Core limit
     int64_t entryLimitAllowed() const;
 
-    typedef Ipc::StoreMapWithExtras<DbCellHeader> DirMap;
+    bool popDbSlot(Ipc::Mem::PageId &pageId);
+    DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId);
+    const DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId) const;
+
+    int64_t diskOffset(const Ipc::Mem::PageId &pageId) const;
+
+    void writeError(const sfileno fileno);
+
+    virtual void cleanReadable(const sfileno fileno);
+
+    // TODO: merge with MemStoreMapExtras?
+    struct MapExtras {
+        Ipc::Mem::PageId pageId;
+    };
+    typedef Ipc::StoreMapWithExtras<MapExtras> DirMap;
 
 protected:
     /* protected ::SwapDir API */
@@ -72,8 +90,6 @@ protected:
     void dumpRateOption(StoreEntry * e) const;
 
     void rebuild(); ///< starts loading and validating stored entry metadata
-    ///< used to add entries successfully loaded during rebuild
-    bool addEntry(const int fileno, const DbCellHeader &header, const StoreEntry &from);
 
     bool full() const; ///< no more entries can be stored without purging
     void trackReferences(StoreEntry &e); ///< add to replacement policy scope
@@ -82,6 +98,8 @@ protected:
     int64_t diskOffset(int filen) const;
     int64_t diskOffsetLimit() const;
     int entryLimit() const { return map->entryLimit(); }
+    int entryMaxPayloadSize() const;
+    int entriesNeeded(const int64_t objSize) const;
 
     friend class Rebuild;
     const char *filePath; ///< location of cache storage file inside path/
@@ -92,6 +110,8 @@ private:
     DiskIOStrategy *io;
     RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
     DirMap *map;
+    DbCellHeader *dbSlots;
+    Ipc::Mem::Pointer<Ipc::Mem::PageStack> dbSlotIndex;
 
     /* configurable options */
     DiskFile::Config fileConfig; ///< file-level configuration options
@@ -111,7 +131,8 @@ protected:
     virtual void create(const RunnerRegistry &);
 
 private:
-    Vector<SwapDir::DirMap::Owner *> owners;
+    Vector<SwapDir::DirMap::Owner *> mapOwners;
+    Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> dbSlotsOwners;
 };
 
 } // namespace Rock
diff --git a/src/ipc/mem/Pointer.h b/src/ipc/mem/Pointer.h
index 55cb55b86b..f46092d08c 100644
--- a/src/ipc/mem/Pointer.h
+++ b/src/ipc/mem/Pointer.h
@@ -32,6 +32,8 @@ public:
 
     ~Owner();
 
+    Class *object() { return theObject; }
+
 private:
     Owner(const char *const id, const off_t sharedSize);
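
A note on the on-disk layout this patch introduces: each db slot begins with a
DbCellHeader, and an entry larger than one slot occupies a chain of slots linked
through nextSlot, with every header repeating the entry key, the firstSlot
pointer, and a chain version so that readers and the rebuild code can detect
stale links. The following standalone sketch is not Squid code (the struct
merely mirrors the fields added above); it shows how a chain can be walked and
validated with the same key/version test that cleanReadable() uses:

    // slot_chain_sketch.cc: standalone illustration of the slot-chain idea
    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    struct CellHeader { // mirrors Rock::DbCellHeader from this patch
        uint64_t key[2];      // entry key, repeated in every slot of a chain
        uint32_t firstSlot;   // 1-based number of the first slot in the chain
        uint32_t nextSlot;    // 1-based number of the next slot; 0 ends the chain
        uint32_t version;     // bumped when the chain's first slot is reused
        uint32_t payloadSize; // payload bytes stored in this slot
    };

    // Sum the payload of a chain, stopping at its end or at a stale link
    // (key or version mismatch means the next slot was reused by another entry).
    uint64_t chainPayload(const std::vector<CellHeader> &slots, uint32_t first)
    {
        uint64_t total = 0;
        for (uint32_t cur = first; cur != 0;) {
            const CellHeader &h = slots.at(cur - 1); // slot numbers are 1-based
            total += h.payloadSize;
            if (!h.nextSlot)
                break;
            const CellHeader &next = slots.at(h.nextSlot - 1);
            const bool sameChain =
                memcmp(h.key, next.key, sizeof(h.key)) == 0 &&
                h.version == next.version;
            if (!sameChain)
                break; // broken chain: stop, as the real code must
            cur = h.nextSlot;
        }
        return total;
    }

    int main()
    {
        std::vector<CellHeader> slots(2);
        slots[0] = CellHeader{{1, 2}, 1, 2, 7, 100};
        slots[1] = CellHeader{{1, 2}, 1, 0, 7, 42};
        assert(chainPayload(slots, 1) == 142);
        return 0;
    }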
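The write path in RockIoState.cc accumulates data in a slot-sized buffer and,
when the buffer fills, pops a fresh slot, links it into the chain, and flushes
the full slot to disk. A simplified single-process model of that
accumulate-then-spill loop, with hypothetical names and std::string/std::vector
standing in for the MemBuf and the db file:

    // write_spill_sketch.cc: accumulate a slot, spill when full
    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    static const size_t SlotSize = 16; // toy slot size (header ignored here)

    class Writer {
    public:
        // Buffer bytes; flush a full slot and start a new one as needed.
        void write(const char *buf, size_t size) {
            while (size > 0) {
                const size_t space = SlotSize - buffer.size();
                const size_t chunk = size < space ? size : space;
                buffer.append(buf, chunk);
                buf += chunk;
                size -= chunk;
                if (buffer.size() == SlotSize)
                    flush(); // slot full: write it out, continue in a new slot
            }
        }

        void close() {
            if (!buffer.empty())
                flush(); // the final slot may be short
        }

        std::vector<std::string> disk; // stands in for the db file

    private:
        void flush() {
            disk.push_back(buffer);
            buffer.clear();
        }

        std::string buffer; // stands in for the slot-sized MemBuf
    };

    int main()
    {
        Writer w;
        const std::string body(40, 'x');
        w.write(body.data(), body.size());
        w.close();
        std::cout << w.disk.size() << " slots written\n"; // 3: 16+16+8 bytes
        return 0;
    }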
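Free slots are tracked by reusing Ipc::Mem::PageStack as a shared stack of
1-based slot numbers: createStoreIO() and IoState::write() pop a slot, while
invalidSlot() and cleanReadable() push reclaimed slots back. The real class
lives in ipc/mem/PageStack.h and is shared across worker processes; the toy
below models only the pop/push contract this patch relies on:

    // free_slot_stack_sketch.cc: the pop/push contract used by this patch
    #include <cassert>
    #include <cstdint>
    #include <vector>

    class FreeSlotStack {
    public:
        explicit FreeSlotStack(uint32_t capacity) {
            // start "full": every slot number is free
            for (uint32_t n = capacity; n >= 1; --n)
                stack.push_back(n);
        }

        // pop a free slot number; false when no slots are left
        bool pop(uint32_t &n) {
            if (stack.empty())
                return false;
            n = stack.back();
            stack.pop_back();
            return true;
        }

        // return a no-longer-used slot number
        void push(uint32_t n) { stack.push_back(n); }

        size_t size() const { return stack.size(); } // free slots left

    private:
        std::vector<uint32_t> stack;
    };

    int main()
    {
        FreeSlotStack index(3);
        uint32_t n = 0;
        assert(index.pop(n) && n == 1);
        index.push(n); // reclaim, e.g., after an invalid rebuild slot
        assert(index.size() == 3);
        return 0;
    }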
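SwapDir::entriesNeeded() is plain ceiling division. Assuming, for illustration,
16 KB slots (max_objsize) and the 32-byte DbCellHeader defined above (16 bytes
of key plus four 32-bit fields), each slot holds 16352 payload bytes, so a
100 KB object needs ceil(102400 / 16352) = 7 slots:

    // entries_needed_sketch.cc: the ceiling division behind entriesNeeded()
    #include <cassert>
    #include <cstdint>

    static const int64_t slotSize = 16384;   // stands in for max_objsize
    static const int64_t headerSize = 32;    // sizeof(DbCellHeader): 16 + 4*4
    static const int64_t maxPayload = slotSize - headerSize; // 16352 bytes/slot

    int64_t entriesNeeded(const int64_t objSize)
    {
        return (objSize + maxPayload - 1) / maxPayload; // ceil(objSize/maxPayload)
    }

    int main()
    {
        assert(entriesNeeded(1) == 1);
        assert(entriesNeeded(maxPayload) == 1);
        assert(entriesNeeded(maxPayload + 1) == 2);
        assert(entriesNeeded(102400) == 7); // a 100 KB object spans 7 slots
        return 0;
    }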
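Finally, the rebuild now runs in two phases: steps() scans the db file and
registers each slot header, marking truncated or malformed slots as loaded and
returning them to the free-slot stack, and steps2() walks the remaining slots
to validate entry chains. The loaded bitmap is what keeps the second phase from
revisiting slots. A condensed model of that control flow, with hypothetical
stand-ins for the header scan and the chain validation:

    // rebuild_phases_sketch.cc: two-phase rebuild over a slot bitmap
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct SlotHeader { bool sane; };

    int main()
    {
        // phase 0: pretend we scanned the db file into slot headers
        std::vector<SlotHeader> slots = {{true}, {false}, {true}, {false}};
        std::vector<bool> loaded(slots.size(), false);
        std::vector<uint32_t> freeSlots; // stands in for the shared PageStack

        // phase 1 (steps/doOneEntry): invalid slots are marked and reclaimed
        for (uint32_t i = 0; i < slots.size(); ++i) {
            if (!slots[i].sane) {
                loaded[i] = true;           // do not revisit in phase 2
                freeSlots.push_back(i + 1); // slot numbers are 1-based
            }
        }

        // phase 2 (steps2/doOneSlot): only still-unloaded slots are examined
        int examined = 0;
        for (uint32_t i = 0; i < slots.size(); ++i) {
            if (loaded[i])
                continue;
            ++examined; // here the real code would validate the entry chain
            loaded[i] = true;
        }

        std::cout << examined << " slots examined, "
                  << freeSlots.size() << " reclaimed\n"; // 2 and 2
        return 0;
    }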