From: Alex Rousskov Date: Mon, 7 Jan 2013 19:41:41 +0000 (-0700) Subject: Working Large Rock implementation, including concurrent store "rebuild" X-Git-Tag: SQUID_3_5_0_1~444^2~81 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=50dc81ecc5f3829fdb2ef389c4b5055934101bfe;p=thirdparty%2Fsquid.git Working Large Rock implementation, including concurrent store "rebuild" and a simple "scan" replacement policy. Needs polishing and possibly a swap.state equivalent for faster rebuild. --- diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index 1c361986d4..61ea8c2ef5 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -38,6 +38,7 @@ libufs_la_SOURCES = \ librock_la_SOURCES = \ rock/RockDbCell.cc \ rock/RockDbCell.h \ + rock/RockForward.h \ rock/RockIoState.cc \ rock/RockIoState.h \ rock/RockIoRequests.cc \ diff --git a/src/fs/rock/RockDbCell.cc b/src/fs/rock/RockDbCell.cc index a756f0ac8d..bde7cc8a45 100644 --- a/src/fs/rock/RockDbCell.cc +++ b/src/fs/rock/RockDbCell.cc @@ -4,15 +4,8 @@ #include "squid.h" #include "fs/rock/RockDbCell.h" -#include "ipc/StoreMap.h" -#include "tools.h" -Rock::DbCellHeader::DbCellHeader(): firstSlot(0), nextSlot(0), version(0), - payloadSize(0) { - memset(&key, 0, sizeof(key)); -} - -bool -Rock::DbCellHeader::sane() const { - return firstSlot > 0; +Rock::DbCellHeader::DbCellHeader() +{ + memset(this, 0, sizeof(*this)); } diff --git a/src/fs/rock/RockDbCell.h b/src/fs/rock/RockDbCell.h index 9830933b25..31fb250e52 100644 --- a/src/fs/rock/RockDbCell.h +++ b/src/fs/rock/RockDbCell.h @@ -1,16 +1,14 @@ #ifndef SQUID_FS_ROCK_DB_CELL_H #define SQUID_FS_ROCK_DB_CELL_H -namespace Ipc -{ -class StoreMapSlot; -} +#include "typedefs.h" namespace Rock { /** \ingroup Rock * Meta-information at the beginning of every db cell. + * Links multiple map slots belonging to the same entry into an entry chain. * Stored on disk and used as sizeof() argument so it must remain POD. 
*/ class DbCellHeader @@ -18,14 +16,21 @@ class DbCellHeader public: DbCellHeader(); - /// whether the freshly loaded header fields make sense - bool sane() const; + /// true iff no entry occupies this slot + bool empty() const { return !firstSlot && !nextSlot && !payloadSize; } + + /* members below are not meaningful if empty() */ + + /// whether this slot is not corrupted + bool sane() const { return firstSlot >= 0 && nextSlot >= -1 && + version > 0 && payloadSize > 0; } uint64_t key[2]; ///< StoreEntry key - uint32_t firstSlot; ///< first slot pointer in the entry chain - uint32_t nextSlot; ///< next slot pointer in the entry chain - uint32_t version; ///< entry chain version - uint32_t payloadSize; ///< cell contents size excluding this header + uint64_t entrySize; ///< total entry content size or zero if still unknown + uint32_t payloadSize; ///< slot contents size, always positive + uint32_t version; ///< detects conflicts among same-key entries + sfileno firstSlot; ///< slot ID of the first slot occupied by the entry + sfileno nextSlot; ///< slot ID of the next slot occupied by the entry }; } // namespace Rock diff --git a/src/fs/rock/RockForward.h b/src/fs/rock/RockForward.h new file mode 100644 index 0000000000..df44f331e9 --- /dev/null +++ b/src/fs/rock/RockForward.h @@ -0,0 +1,35 @@ +#ifndef SQUID_FS_ROCK_FORWARD_H +#define SQUID_FS_ROCK_FORWARD_H + +namespace Ipc +{ + +class StoreMapAnchor; +class StoreMapSlice; + +namespace Mem +{ +class PageId; +} + +} + + +namespace Rock +{ + +class SwapDir; + +/// db cell number, starting with cell 0 (always occupied by the db header) +typedef sfileno SlotId; + +class Rebuild; + +class IoState; + +class DbCellHeader; + +} + + +#endif /* SQUID_FS_ROCK_FORWARD_H */ diff --git a/src/fs/rock/RockIoState.cc b/src/fs/rock/RockIoState.cc index 9c2612eb4e..9fe072694e 100644 --- a/src/fs/rock/RockIoState.cc +++ b/src/fs/rock/RockIoState.cc @@ -3,8 +3,7 @@ */ #include "squid.h" -#include "MemObject.h" -#include "Parsing.h" +#include "base/TextException.h" #include "DiskIO/DiskIOModule.h" #include "DiskIO/DiskIOStrategy.h" #include "DiskIO/WriteRequest.h" @@ -12,19 +11,26 @@ #include "fs/rock/RockIoRequests.h" #include "fs/rock/RockSwapDir.h" #include "globals.h" +#include "MemObject.h" +#include "Mem.h" +#include "Parsing.h" -Rock::IoState::IoState(SwapDir &aDir, +Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir, StoreEntry *anEntry, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data): - dbSlot(NULL), + readableAnchor_(NULL), + writeableAnchor_(NULL), + sidCurrent(-1), dir(aDir), - slotSize(dir.slotSize), - objOffset(0) + slotSize(dir->slotSize), + objOffset(0), + theBuf(dir->slotSize) { e = anEntry; - // swap_filen, swap_dirn and diskOffset are set by the caller + e->lock("rock I/O"); + // anchor, swap_filen, and swap_dirn are set by the caller file_callback = cbFile; callback = cbIo; callback_data = cbdataReference(data); @@ -35,9 +41,17 @@ Rock::IoState::IoState(SwapDir &aDir, Rock::IoState::~IoState() { --store_open_disk_fd; + + // The dir map entry may still be open for reading at the point because + // the map entry lock is associated with StoreEntry, not IoState. 
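As a rough, self-contained illustration of the on-disk layout introduced above -- firstSlot, nextSlot, version, and payloadSize linking the slots of one entry into a chain -- the following sketch walks such a chain and totals its payload. The types here are simplified stand-ins for illustration only, not the Squid classes in this patch:

    #include <cassert>
    #include <cstdint>
    #include <vector>

    // Simplified stand-in for the db cell header; real fields are defined above.
    struct CellHeader {
        uint64_t key[2];       // StoreEntry key
        uint64_t entrySize;    // total entry size, or 0 while still unknown
        uint32_t payloadSize;  // content bytes stored in this slot
        uint32_t version;      // distinguishes chains reusing the same key
        int32_t firstSlot;     // slot ID of the chain's first slot
        int32_t nextSlot;      // next slot ID in the chain, or -1 at the end
    };

    // Walk one entry chain, starting at its first slot, and total its payload.
    // `slots` models the db: slot ID -> header stored in that slot.
    uint64_t chainPayload(const std::vector<CellHeader> &slots, const int32_t start)
    {
        uint64_t total = 0;
        for (int32_t sid = start; sid >= 0; sid = slots[sid].nextSlot) {
            const CellHeader &h = slots[sid];
            assert(h.firstSlot == start);              // every slot names the chain head
            assert(h.version == slots[start].version); // and carries the same version
            total += h.payloadSize;
        }
        return total;
    }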
+ // assert(!readableAnchor_); + assert(!writeableAnchor_); + if (callback_data) cbdataReferenceDone(callback_data); theFile = NULL; + + e->unlock("rock I/O"); } void @@ -48,136 +62,252 @@ Rock::IoState::file(const RefCount &aFile) theFile = aFile; } +const Ipc::StoreMapAnchor & +Rock::IoState::readAnchor() const +{ + assert(readableAnchor_); + return *readableAnchor_; +} + +Ipc::StoreMapAnchor & +Rock::IoState::writeAnchor() +{ + assert(writeableAnchor_); + return *writeableAnchor_; +} + +/// convenience wrapper returning the map slot we are reading now +const Ipc::StoreMapSlice & +Rock::IoState::currentReadableSlice() const +{ + return dir->map->readableSlice(swap_filen, sidCurrent); +} + void Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data) { + debugs(79, 7, swap_filen << " reads from " << coreOff); + assert(theFile != NULL); assert(coreOff >= 0); - Ipc::Mem::PageId pageId; - pageId.pool = dir.index; - if (coreOff < objOffset) { // rewind - pageId.number = dbSlot->firstSlot; - dbSlot = &dir.dbSlot(pageId); + // if we are dealing with the first read or + // if the offset went backwords, start searching from the beginning + if (sidCurrent < 0 || coreOff < objOffset) { + sidCurrent = readAnchor().start; objOffset = 0; } - while (coreOff >= objOffset + dbSlot->payloadSize) { - objOffset += dbSlot->payloadSize; - pageId.number = dbSlot->nextSlot; - assert(pageId); // XXX: should be an error? - dbSlot = &dir.dbSlot(pageId); + while (coreOff >= objOffset + currentReadableSlice().size) { + objOffset += currentReadableSlice().size; + sidCurrent = currentReadableSlice().next; + assert(sidCurrent >= 0); // XXX: handle "read offset too big" error } - if (pageId) - diskOffset = dir.diskOffset(pageId); offset_ = coreOff; len = min(len, - static_cast(objOffset + dbSlot->payloadSize - coreOff)); + static_cast(objOffset + currentReadableSlice().size - coreOff)); assert(read.callback == NULL); assert(read.callback_data == NULL); read.callback = cb; read.callback_data = cbdataReference(data); + const uint64_t diskOffset = dir->diskOffset(sidCurrent); theFile->read(new ReadRequest(::ReadRequest(buf, diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this)); } -// We only write data when full slot is accumulated or when close() is called. -// We buffer, in part, to avoid forcing OS to _read_ old unwritten portions -// of the slot when the write does not end at the page or sector boundary. -void +/// wraps tryWrite() to handle deep write failures centrally and safely +bool Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor) { - assert(dbSlot); - - if (theBuf.isNull()) { - theBuf.init(min(size + sizeof(DbCellHeader), slotSize), slotSize); - theBuf.appended(sizeof(DbCellHeader)); // will fill header in doWrite + bool success = false; + try { + tryWrite(buf, size, coreOff); + success = true; + } catch (const std::exception &e) { // TODO: should we catch ... as well? + debugs(79, 2, "db write error: " << e.what()); + dir->writeError(swap_filen); + finishedWriting(DISK_ERROR); + // 'this' might be gone beyond this point; fall through to free buf } - if (size <= static_cast(theBuf.spaceSize())) - theBuf.append(buf, size); - else { - Ipc::Mem::PageId pageId; - if (!dir.popDbSlot(pageId)) { - debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << dir.path << - "' run out of DB slots"); - dir.writeError(swap_filen); - // XXX: do we need to destroy buf on error? - if (dtor) - (dtor)(const_cast(buf)); // cast due to a broken API? 
- // XXX: do we need to call callback on error? - callBack(DISK_ERROR); - return; - } - DbCellHeader &nextDbSlot = dir.dbSlot(pageId); - memcpy(nextDbSlot.key, dbSlot->key, sizeof(nextDbSlot.key)); - nextDbSlot.firstSlot = dbSlot->firstSlot; - nextDbSlot.nextSlot = 0; - nextDbSlot.version = dbSlot->version; - nextDbSlot.payloadSize = 0; + // careful: 'this' might be gone here + + if (dtor) + (dtor)(const_cast(buf)); // cast due to a broken API? + + return success; +} + +/** We only write data when full slot is accumulated or when close() is called. + We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of + the slot when the write does not end at the page or sector boundary. */ +void +Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff) +{ + debugs(79, 7, swap_filen << " writes " << size << " more"); + + // either this is the first write or append; we do not support write gaps + assert(!coreOff || coreOff == -1); - dbSlot->nextSlot = pageId.number; + // allocate the first slice diring the first write + if (!coreOff) { + assert(sidCurrent < 0); + sidCurrent = reserveSlotForWriting(); // throws on failures + assert(sidCurrent >= 0); + writeAnchor().start = sidCurrent; + } - const size_t left = size - theBuf.spaceSize(); - offset_ += theBuf.spaceSize(); // so that Core thinks we wrote it - theBuf.append(buf, theBuf.spaceSize()); + // buffer incoming data in slot buffer and write overflowing or final slots + // quit when no data left or we stopped writing on reentrant error + while (size > 0 && theFile != NULL) { + assert(sidCurrent >= 0); + const size_t processed = writeToBuffer(buf, size); + buf += processed; + size -= processed; + const bool overflow = size > 0; + + // We do not write a full buffer without overflow because + // we would not yet know what to set the nextSlot to. + if (overflow) { + const SlotId sidNext = reserveSlotForWriting(); // throws + assert(sidNext >= 0); + writeToDisk(sidNext); + } + } +} - doWrite(); +/// Buffers incoming data for the current slot. +/// Returns the number of bytes buffered. +size_t +Rock::IoState::writeToBuffer(char const *buf, size_t size) +{ + // do not buffer a cell header for nothing + if (!size) + return 0; - dbSlot = &nextDbSlot; - diskOffset = dir.diskOffset(pageId); - theBuf.init(min(left, slotSize), slotSize); - write(buf + size - left, left, -1, NULL); + if (!theBuf.size) { + // will fill the header in writeToDisk when the next slot is known + theBuf.appended(sizeof(DbCellHeader)); } - if (dtor) - (dtor)(const_cast(buf)); // cast due to a broken API? + size_t forCurrentSlot = min(size, static_cast(theBuf.spaceSize())); + theBuf.append(buf, forCurrentSlot); + offset_ += forCurrentSlot; // so that Core thinks we wrote it + return forCurrentSlot; } -// write what was buffered during write() calls +/// write what was buffered during write() calls +/// negative sidNext means this is the last write request for this entry void -Rock::IoState::doWrite(const bool isLast) +Rock::IoState::writeToDisk(const SlotId sidNext) { assert(theFile != NULL); - assert(!theBuf.isNull()); + assert(theBuf.size >= sizeof(DbCellHeader)); + + if (sidNext < 0) { // we are writing the last slot + e->swap_file_sz = offset_; + writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right? 
+ } // TODO: if DiskIO module is mmap-based, we should be writing whole pages // to avoid triggering read-page;new_head+old_tail;write-page overheads + const uint64_t diskOffset = dir->diskOffset(sidCurrent); debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' << - theBuf.contentSize()); + theBuf.size); + + // finalize map slice + Ipc::StoreMap::Slice &slice = + dir->map->writeableSlice(swap_filen, sidCurrent); + slice.next = sidNext; + slice.size = theBuf.size - sizeof(DbCellHeader); + + // finalize db cell header + DbCellHeader header; + memcpy(header.key, e->key, sizeof(header.key)); + header.firstSlot = writeAnchor().start; + header.nextSlot = sidNext; + header.payloadSize = theBuf.size - sizeof(DbCellHeader); + header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0 + header.version = writeAnchor().basics.timestamp; + + // copy finalized db cell header into buffer + memcpy(theBuf.mem, &header, sizeof(DbCellHeader)); + + // and now allocate another buffer for the WriteRequest so that + // we can support concurrent WriteRequests (and to ease cleaning) + // TODO: should we limit the number of outstanding requests? + size_t wBufCap = 0; + void *wBuf = memAllocBuf(theBuf.size, &wBufCap); + memcpy(wBuf, theBuf.mem, theBuf.size); + + WriteRequest *const r = new WriteRequest( + ::WriteRequest(static_cast(wBuf), diskOffset, theBuf.size, + memFreeBufFunc(wBufCap)), this, sidNext < 0); + theBuf.clear(); - dbSlot->payloadSize = theBuf.contentSize() - sizeof(DbCellHeader); - memcpy(theBuf.content(), dbSlot, sizeof(DbCellHeader)); + sidCurrent = sidNext; - assert(static_cast(theBuf.contentSize()) <= slotSize); // theFile->write may call writeCompleted immediatelly - WriteRequest *const r = new WriteRequest( - ::WriteRequest(theBuf.content(), diskOffset, theBuf.contentSize(), - theBuf.freeFunc()), this, isLast); theFile->write(r); } -// +/// finds and returns a free db slot to fill or throws +Rock::SlotId +Rock::IoState::reserveSlotForWriting() +{ + Ipc::Mem::PageId pageId; + if (dir->useFreeSlot(pageId)) + return pageId.number-1; + + // This may happen when the number of available db slots is close to the + // number of concurrent requests reading or writing those slots, which may + // happen when the db is "small" compared to the request traffic OR when we + // are rebuilding and have not loaded "many" entries or empty slots yet. + throw TexcHere("ran out of free db slots"); +} + void Rock::IoState::finishedWriting(const int errFlag) { // we incremented offset_ while accumulating data in write() + writeableAnchor_ = NULL; callBack(errFlag); } void Rock::IoState::close(int how) { - debugs(79, 3, HERE << swap_filen << " accumulated: " << offset_ << - " how=" << how); - if (how == wroteAll && !theBuf.isNull()) - doWrite(true); - else - callBack(how == writerGone ? 
DISK_ERROR : 0); // TODO: add DISK_CALLER_GONE + debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how << + " buf: " << theBuf.size << " callback: " << callback); + + if (!theFile) { + debugs(79, 3, "I/O already canceled"); + assert(!callback); + assert(!writeableAnchor_); + assert(!readableAnchor_); + return; + } + + switch (how) { + case wroteAll: + assert(theBuf.size > 0); // we never flush last bytes on our own + writeToDisk(-1); // flush last, yet unwritten slot to disk + return; // writeCompleted() will callBack() + + case writerGone: + assert(writeableAnchor_); + dir->writeError(swap_filen); // abort a partially stored entry + finishedWriting(DISK_ERROR); + return; + + case readerDone: + callBack(0); + return; + } } /// close callback (STIOCB) dialer: breaks dependencies and diff --git a/src/fs/rock/RockIoState.h b/src/fs/rock/RockIoState.h index 191bf25c3c..8c4ff5313f 100644 --- a/src/fs/rock/RockIoState.h +++ b/src/fs/rock/RockIoState.h @@ -1,8 +1,8 @@ #ifndef SQUID_FS_ROCK_IO_STATE_H #define SQUID_FS_ROCK_IO_STATE_H -#include "MemBuf.h" -#include "SwapDir.h" +#include "fs/rock/RockSwapDir.h" +#include "MemBlob.h" class DiskFile; @@ -18,33 +18,48 @@ class IoState: public ::StoreIOState public: typedef RefCount Pointer; - IoState(SwapDir &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data); + IoState(Rock::SwapDir::Pointer &aDir, StoreEntry *e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data); virtual ~IoState(); void file(const RefCount &aFile); // ::StoreIOState API virtual void read_(char *buf, size_t size, off_t offset, STRCB * callback, void *callback_data); - virtual void write(char const *buf, size_t size, off_t offset, FREE * free_func); + virtual bool write(char const *buf, size_t size, off_t offset, FREE * free_func); virtual void close(int how); - void finishedWriting(int errFlag); + /// whether we are still waiting for the I/O results (i.e., not closed) + bool stillWaiting() const { return theFile != NULL; } - int64_t diskOffset; ///< the start of this cell inside the db file - DbCellHeader *dbSlot; ///< current db slot, used for writing + /// called by SwapDir::writeCompleted() after the last write and on error + void finishedWriting(const int errFlag); MEMPROXY_CLASS(IoState); + /* one and only one of these will be set and locked; access via *Anchor() */ + const Ipc::StoreMapAnchor *readableAnchor_; ///< starting point for reading + Ipc::StoreMapAnchor *writeableAnchor_; ///< starting point for writing + + SlotId sidCurrent; ///< ID of the db slot currently being read or written + private: - void doWrite(const bool isLast = false); + const Ipc::StoreMapAnchor &readAnchor() const; + Ipc::StoreMapAnchor &writeAnchor(); + const Ipc::StoreMapSlice ¤tReadableSlice() const; + + void tryWrite(char const *buf, size_t size, off_t offset); + size_t writeToBuffer(char const *buf, size_t size); + void writeToDisk(const SlotId nextSlot); + SlotId reserveSlotForWriting(); + void callBack(int errflag); - SwapDir &dir; ///< swap dir object + Rock::SwapDir::Pointer dir; ///< swap dir that initiated I/O const size_t slotSize; ///< db cell size int64_t objOffset; ///< object offset for current db slot RefCount theFile; // "file" responsible for this I/O - MemBuf theBuf; // use for write content accumulation only + MemBlob theBuf; // use for write content accumulation only }; MEMPROXY_CLASS_INLINE(IoState); diff --git a/src/fs/rock/RockRebuild.cc b/src/fs/rock/RockRebuild.cc index 59096d29ad..077b2ab36d 100644 
--- a/src/fs/rock/RockRebuild.cc +++ b/src/fs/rock/RockRebuild.cc @@ -21,30 +21,103 @@ CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild); +namespace Rock { + +/// maintains information about the store entry being loaded from disk +/// used for identifying partially stored/loaded entries +class LoadingEntry { +public: + LoadingEntry(): size(0), version(0), state(leEmpty), anchored(0), + mapped(0), freed(0), more(-1) {} + + /* store entry-level information indexed by sfileno */ + uint64_t size; ///< payload seen so far + uint32_t version; ///< DbCellHeader::version to distinguish same-URL chains + uint32_t state:3; ///< current entry state (one of the State values) + uint32_t anchored:1; ///< whether we loaded the inode slot for this entry + + /* db slot-level information indexed by slotId, starting with firstSlot */ + uint32_t mapped:1; ///< whether this slot was added to a mapped entry + uint32_t freed:1; ///< whether this slot was marked as free + sfileno more:25; ///< another slot in some entry chain (unordered) + bool used() const { return freed || mapped || more != -1; } + + /// possible entry states + typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State; +}; + +} /* namespace Rock */ + +/** + Several layers of information is manipualted during the rebuild: + + Store Entry: Response message plus all the metainformation associated with + it. Identified by store key. At any given time, from Squid point + of view, there is only one entry with a given key, but several + different entries with the same key can be observed in any historical + archive (such as an access log or a store database). + + Slot chain: A sequence of db slots representing a Store Entry state at + some point in time. Identified by key+version combination. Due to + transaction aborts, crashes, and idle periods, some chains may contain + incomplete or stale information. We assume that no two different chains + have the same key and version. If that assumption fails, we may serve a + hodgepodge entry during rebuild, until "extra" slots are loaded/noticed. + + Db slot: A db record containing a piece of a single store entry and linked + to other slots with the same key and version fields, forming a chain. + Slots are identified by their absolute position in the database file, + which is naturally unique. + + + Except for the "mapped", "freed", and "more" fields, LoadingEntry info is + entry-level and is stored at fileno position. In other words, the array of + LoadingEntries should be interpreted as two arrays, one that maps slot ID + to the LoadingEntry::mapped/free/more members, and the second one that maps + fileno to all other LoadingEntry members. StoreMap maps slot key to fileno. + + + When information from the newly loaded db slot contradicts the entry-level + information collected so far (e.g., the versions do not match or the total + chain size after the slot contribution exceeds the expected number), the + whole entry (and not just the chain or the slot!) is declared corrupted. + + Why invalidate the whole entry? Rock Store is written for high-load + environments with large caches, where there is usually very few idle slots + in the database. A space occupied by a purged entry is usually immediately + reclaimed. A Squid crash or a transaction abort is rather unlikely to + leave a relatively large number of stale slots in the database. Thus, the + number of potentially corrupted entries is relatively small. 
On the other + hand, the damage from serving a single hadgepodge entry may be significant + to the user. In such an environment, invalidating the whole entry has + negligible performance impact but saves us from high-damage bugs. +*/ + + Rock::Rebuild::Rebuild(SwapDir *dir): AsyncJob("Rock::Rebuild"), sd(dir), + entries(NULL), dbSize(0), dbEntrySize(0), dbEntryLimit(0), dbSlot(0), fd(-1), dbOffset(0), - filen(0) + slotPos(0), + validationPos(0) { assert(sd); memset(&counts, 0, sizeof(counts)); dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste dbEntrySize = sd->slotSize; dbEntryLimit = sd->entryLimit(); - processed.reserve(dbEntryLimit); - while (static_cast(processed.size()) < dbEntryLimit) - processed.push_back(false); } Rock::Rebuild::~Rebuild() { if (fd >= 0) file_close(fd); + delete[] entries; } /// prepares and initiates entry loading sequence @@ -66,12 +139,18 @@ Rock::Rebuild::start() if (fd < 0) failure("cannot open db", errno); - char buf[SwapDir::HeaderSize]; - if (read(fd, buf, sizeof(buf)) != SwapDir::HeaderSize) + char hdrBuf[SwapDir::HeaderSize]; + if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize) failure("cannot read db header", errno); + // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours + assert(sizeof(DbCellHeader) < SM_PAGE_SIZE); + buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE); + dbOffset = SwapDir::HeaderSize; - filen = 0; + slotPos = 0; + + entries = new LoadingEntry[dbEntryLimit]; checkpoint(); } @@ -80,19 +159,15 @@ Rock::Rebuild::start() void Rock::Rebuild::checkpoint() { - if (dbOffset < dbSize) + if (!done()) eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true); - else - if (!doneAll()) { - eventAdd("Rock::Rebuild::Step2", Rock::Rebuild::Steps2, this, 0.01, 1, - true); - } } bool Rock::Rebuild::doneAll() const { - return dbSlot >= dbSize && AsyncJob::doneAll(); + return dbOffset >= dbSize && validationPos >= dbEntryLimit && + AsyncJob::doneAll(); } void @@ -103,16 +178,20 @@ Rock::Rebuild::Steps(void *data) } void -Rock::Rebuild::Steps2(void *data) +Rock::Rebuild::steps() { - // use async call to enable job call protection that time events lack - CallJobHere(47, 5, static_cast(data), Rock::Rebuild, steps2); + if (dbOffset < dbSize) + loadingSteps(); + else + validationSteps(); + + checkpoint(); } void -Rock::Rebuild::steps() +Rock::Rebuild::loadingSteps() { - debugs(47,5, HERE << sd->index << " filen " << filen << " at " << + debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " << dbOffset << " <= " << dbSize); // Balance our desire to maximize the number of entries processed at once @@ -123,9 +202,9 @@ Rock::Rebuild::steps() int loaded = 0; while (loaded < dbEntryLimit && dbOffset < dbSize) { - doOneEntry(); + loadOneSlot(); dbOffset += dbEntrySize; - ++filen; + ++slotPos; ++loaded; if (counts.scancount % 1000 == 0) @@ -142,47 +221,12 @@ Rock::Rebuild::steps() break; } } - - checkpoint(); } void -Rock::Rebuild::steps2() +Rock::Rebuild::loadOneSlot() { - debugs(47,5, HERE << sd->index << " filen " << filen << " at " << - dbSlot << " <= " << dbSize); - - // Balance our desire to maximize the number of slots processed at once - // (and, hence, minimize overheads and total rebuild time) with a - // requirement to also process Coordinator events, disk I/Os, etc. 
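The time-budgeted stepping used by loadingSteps() above (and reused by validationSteps() below) amounts to the following minimal, framework-free sketch. It is an outline of the pacing idea only; the real code uses Squid's event scheduling and current_time rather than std::chrono:

    #include <chrono>
    #include <functional>

    // Process work items until a small time budget is exhausted, then return
    // so the caller can re-schedule the next step and let other events run.
    // processOne() returns false when no work is left.
    int stepWithBudget(const std::function<bool()> &processOne,
                       const std::chrono::milliseconds budget = std::chrono::milliseconds(50))
    {
        using Clock = std::chrono::steady_clock;
        const auto started = Clock::now();
        int done = 0;
        while (processOne()) {
            ++done;
            if (Clock::now() - started >= budget)
                break; // pause here; more steps will follow later
        }
        return done;
    }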
- const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms - const timeval loopStart = current_time; - - int loaded = 0; - while (dbSlot < dbSize) { - doOneSlot(); - ++dbSlot; - ++loaded; - - if (opt_foreground_rebuild) - continue; // skip "few entries at a time" check below - - getCurrentTime(); - const double elapsedMsec = tvSubMsec(loopStart, current_time); - if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) { - debugs(47, 5, HERE << "pausing after " << loaded << " slots in " << - elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per slot"); - break; - } - } - - checkpoint(); -} - -void -Rock::Rebuild::doOneEntry() -{ - debugs(47,5, HERE << sd->index << " filen " << filen << " at " << + debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " << dbOffset << " <= " << dbSize); ++counts.scancount; @@ -190,56 +234,144 @@ Rock::Rebuild::doOneEntry() if (lseek(fd, dbOffset, SEEK_SET) < 0) failure("cannot seek to db entry", errno); - MemBuf buf; - buf.init(sizeof(DbCellHeader), sizeof(DbCellHeader)); + buf.reset(); if (!storeRebuildLoadEntry(fd, sd->index, buf, counts)) return; - // get our header - Ipc::Mem::PageId pageId; - pageId.pool = sd->index; - pageId.number = filen + 1; - DbCellHeader &header = sd->dbSlot(pageId); - assert(!header.sane()); + const SlotId slotId = slotPos; + // get our header + DbCellHeader header; if (buf.contentSize() < static_cast(sizeof(header))) { debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " << "Ignoring truncated cache entry meta data at " << dbOffset); - invalidSlot(pageId); + freeSlotIfIdle(slotId, true); return; } - memcpy(&header, buf.content(), sizeof(header)); + memcpy(&header, buf.content(), sizeof(header)); + if (header.empty()) { + freeSlotIfIdle(slotId, false); + return; + } if (!header.sane()) { debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " << "Ignoring malformed cache entry meta data at " << dbOffset); - invalidSlot(pageId); + freeSlotIfIdle(slotId, true); return; } + buf.consume(sizeof(header)); // optimize to avoid memmove() + + useNewSlot(slotId, header); +} + +/// parse StoreEntry basics and add them to the map, returning true on success +bool +Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header) +{ + cache_key key[SQUID_MD5_DIGEST_LENGTH]; + StoreEntry loadedE; + if (!storeRebuildParseEntry(buf, loadedE, key, counts, 0)) + return false; + + const uint64_t knownSize = header.entrySize > 0 ? 
+ header.entrySize : anchor.basics.swap_file_sz; + if (!loadedE.swap_file_sz && knownSize) + loadedE.swap_file_sz = knownSize; + // the entry size may still be unknown at this time + + debugs(47, 8, "importing entry basics for " << fileno); + anchor.set(loadedE); + + // we have not validated whether all db cells for this entry were loaded + EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED); + + // loadedE->dump(5); + + return true; } void -Rock::Rebuild::doOneSlot() +Rock::Rebuild::validationSteps() { - debugs(47,5, HERE << sd->index << " filen " << filen << " at " << - dbSlot << " <= " << dbSize); + debugs(47, 5, sd->index << " validating from " << validationPos); - if (processed[dbSlot]) - return; + // see loadingSteps() for the rationale; TODO: avoid duplication + const int maxSpentMsec = 50; // keep small: validation does not do I/O + const timeval loopStart = current_time; - Ipc::Mem::PageId pageId; - pageId.pool = sd->index; - pageId.number = dbSlot + 1; - const DbCellHeader &dbSlot = sd->dbSlot(pageId); - assert(dbSlot.sane()); + int validated = 0; + while (validationPos < dbEntryLimit) { + validateOneEntry(); + ++validationPos; + ++validated; - pageId.number = dbSlot.firstSlot; - //const DbCellHeader &firstChainSlot = sd->dbSlot(pageId); + if (validationPos % 1000 == 0) + debugs(20, 2, "validated: " << validationPos); - /* Process all not yet loaded slots, verify entry chains, if chain - is valid, load entry from first slot similar to small rock, - call SwapDir::addEntry (needs to be restored). */ + if (opt_foreground_rebuild) + continue; // skip "few entries at a time" check below + + getCurrentTime(); + const double elapsedMsec = tvSubMsec(loopStart, current_time); + if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) { + debugs(47, 5, "pausing after " << validated << " entries in " << + elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry"); + break; + } + } +} + +void +Rock::Rebuild::validateOneEntry() +{ + LoadingEntry &e = entries[validationPos]; + switch (e.state) { + + case LoadingEntry::leEmpty: + break; // no entry hashed to this position + + case LoadingEntry::leLoading: + freeBadEntry(validationPos, "partially stored"); + break; + + case LoadingEntry::leLoaded: + break; // we have already unlocked this entry + + case LoadingEntry::leCorrupted: + break; // we have already removed this entry + } +} + +/// Marks remaining bad entry slots as free and unlocks the entry. The map +/// cannot do this because Loading entries may have holes in the slots chain. 
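The validateOneEntry() logic above, feeding the freeBadEntry() call defined next, reduces to a simple rule: once loading finishes, any entry still in the leLoading state never saw its complete chain and must be dropped. A simplified illustration (stand-in types, not the LoadingEntry bitfields of this patch):

    #include <vector>

    // Simplified stand-in for the per-entry loading state.
    enum class LoadState { Empty, Loading, Loaded, Corrupted, Ignored };

    // After all db slots have been scanned, an entry still marked Loading never
    // completed its chain; the rebuild treats it as bad and invalidates it.
    // Returns the number of such incomplete entries.
    int countIncomplete(std::vector<LoadState> &entries)
    {
        int dropped = 0;
        for (auto &state : entries) {
            if (state == LoadState::Loading) {
                state = LoadState::Corrupted; // the real code also frees its slots
                ++dropped;
            }
        }
        return dropped;
    }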
+void +Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription) +{ + debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription << + " entry " << fileno << " is ignored during rebuild"); + + Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno); + + bool freedSome = false; + // free all loaded non-anchor slots + SlotId slotId = entries[anchor.start].more; + while (slotId >= 0) { + const SlotId next = entries[slotId].more; + freeSlot(slotId, false); + slotId = next; + freedSome = true; + } + // free anchor slot if it was loaded + if (entries[fileno].anchored) { + freeSlot(anchor.start, false); + freedSome = true; + } + assert(freedSome); + + sd->map->forgetWritingEntry(fileno); + ++counts.invalid; } void @@ -254,7 +386,7 @@ Rock::Rebuild::swanSong() void Rock::Rebuild::failure(const char *msg, int errNo) { - debugs(47,5, HERE << sd->index << " filen " << filen << " at " << + debugs(47,5, HERE << sd->index << " slot " << slotPos << " at " << dbOffset << " <= " << dbSize); if (errNo) @@ -266,9 +398,285 @@ Rock::Rebuild::failure(const char *msg, int errNo) sd->index, sd->filePath, msg); } -void Rock::Rebuild::invalidSlot(Ipc::Mem::PageId &pageId) +/// adds slot to the free slot index +void +Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid) { - ++counts.invalid; - processed[pageId.number - 1] = true; - sd->dbSlotIndex->push(pageId); + debugs(47,5, sd->index << " frees slot " << slotId); + LoadingEntry &le = entries[slotId]; + assert(!le.freed); + le.freed = 1; + + if (invalid) { + ++counts.invalid; + //sd->unlink(fileno); leave garbage on disk, it should not hurt + } + + Ipc::Mem::PageId pageId; + pageId.pool = sd->index+1; + pageId.number = slotId+1; + sd->freeSlots->push(pageId); +} + +/// adds slot to the free slot index but only if the slot is unused +void +Rock::Rebuild::freeSlotIfIdle(const SlotId slotId, const bool invalid) +{ + const LoadingEntry &le = entries[slotId]; + + // mapped slots must be freed via freeBadEntry() to keep the map in sync + assert(!le.mapped); + + if (!le.used()) + freeSlot(slotId, invalid); +} + +/// adds slot to the entry chain in the map +void +Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header) +{ + LoadingEntry &le = entries[slotId]; + assert(!le.mapped); + assert(!le.freed); + le.mapped = 1; + + Ipc::StoreMapSlice slice; + slice.next = header.nextSlot; + slice.size = header.payloadSize; + sd->map->importSlice(slotId, slice); +} + +/// adds slot to an existing entry chain; caller must check that the slot +/// belongs to the chain it is being added to +void +Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) +{ + LoadingEntry &le = entries[fileno]; + Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno); + + assert(le.version == header.version); + + // mark anchor as loaded or add the secondary slot to the chain + LoadingEntry &inode = entries[header.firstSlot]; + if (header.firstSlot == slotId) { + debugs(47,5, "adding inode"); + assert(!inode.freed); + le.anchored = 1; + } else { + debugs(47,9, "linking " << slotId << " to " << inode.more); + // we do not need to preserve the order + LoadingEntry &slice = entries[slotId]; + assert(!slice.freed); + assert(slice.more < 0); + slice.more = inode.more; + inode.more = slotId; + } + + if (header.firstSlot == slotId && !importEntry(anchor, fileno, header)) { + le.state = LoadingEntry::leCorrupted; + freeBadEntry(fileno, "corrupted metainfo"); + return; + } + + // set total entry size and/or 
check it for consistency + uint64_t totalSize = header.entrySize; + assert(totalSize != static_cast(-1)); + if (!totalSize && anchor.basics.swap_file_sz) { + assert(anchor.basics.swap_file_sz != static_cast(-1)); + // perhaps we loaded a later slot (with entrySize) earlier + totalSize = anchor.basics.swap_file_sz; + } else + if (totalSize && !anchor.basics.swap_file_sz) { + anchor.basics.swap_file_sz = totalSize; + assert(anchor.basics.swap_file_sz != static_cast(-1)); + } else + if (totalSize != anchor.basics.swap_file_sz) { + le.state = LoadingEntry::leCorrupted; + freeBadEntry(fileno, "size mismatch"); + return; + } + + le.size += header.payloadSize; + + if (totalSize > 0 && le.size > totalSize) { // overflow + le.state = LoadingEntry::leCorrupted; + freeBadEntry(fileno, "overflowing"); + return; + } + + mapSlot(slotId, header); + if (totalSize > 0 && le.size == totalSize) { + // entry fully loaded, unlock it + // we have validated that all db cells for this entry were loaded + EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED); + le.state = LoadingEntry::leLoaded; + sd->map->closeForWriting(fileno, false); + ++counts.objcount; + } +} + +/// initialize housekeeping information for a newly accepted entry +void +Rock::Rebuild::primeNewEntry(Ipc::StoreMap::Anchor &anchor, const sfileno fileno, const DbCellHeader &header) +{ + anchor.setKey(reinterpret_cast(header.key)); + assert(header.firstSlot >= 0); + anchor.start = header.firstSlot; + + assert(anchor.basics.swap_file_sz != static_cast(-1)); + + LoadingEntry &le = entries[fileno]; + le.state = LoadingEntry::leLoading; + le.version = header.version; + le.size = 0; +} + +/// handle a slot from an entry that we have not seen before +void +Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) +{ + // If some other from-disk entry is/was using this slot as its inode OR + // if some other from-disk entry is/was using our inode slot, then the + // entries are conflicting. We cannot identify other entries, so we just + // remove ours and hope that the others were/will be handled correctly. + const LoadingEntry &slice = entries[slotId]; + const LoadingEntry &inode = entries[header.firstSlot]; + if (slice.used() || inode.used()) { + debugs(47,8, "slice/inode used: " << slice.used() << inode.used()); + LoadingEntry &le = entries[fileno]; + le.state = LoadingEntry::leCorrupted; + freeSlotIfIdle(slotId, slotId == header.firstSlot); + // if not idle, the other entry will handle its slice + ++counts.clashcount; + return; + } + + // A miss may have been stored at our fileno while we were loading other + // slots from disk. We ought to preserve that entry because it is fresher. + const bool overwriteExisting = false; + if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) { + primeNewEntry(*anchor, fileno, header); + addSlotToEntry(fileno, slotId, header); // may fail + assert(anchor->basics.swap_file_sz != static_cast(-1)); + } else { + // A new from-network entry is occupying our map slot; let it be, but + // save us from the trouble of going through the above motions again. + LoadingEntry &le = entries[fileno]; + le.state = LoadingEntry::leIgnored; + freeSlotIfIdle(slotId, false); + } +} + +/// does the header belong to the fileno entry being loaded? 
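The size bookkeeping in addSlotToEntry() above boils down to reconciling two possibly-unknown sizes: the size recorded for the entry so far and the entrySize carried by the newly loaded slot. A simplified restatement of that rule (illustrative helper only, not part of the patch):

    #include <cstdint>

    // `known` is the entry size recorded so far (0 while unknown); `fromSlot`
    // is the entrySize field of the newly loaded slot (0 while unknown).
    // Returns false when two definitive sizes disagree, i.e. the entry is corrupt.
    bool reconcileEntrySize(uint64_t &known, const uint64_t fromSlot)
    {
        if (!fromSlot)
            return true;           // the slot adds no size information
        if (!known) {
            known = fromSlot;      // first definitive size seen for this entry
            return true;
        }
        return known == fromSlot;  // both sides known: they must agree
    }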
+bool +Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const +{ + const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno); + const LoadingEntry &le = entries[fileno]; + // any order will work, but do fast comparisons first: + return le.version == header.version && + anchor.start == static_cast(header.firstSlot) && + anchor.sameKey(reinterpret_cast(header.key)); +} + +/// is the new header consistent with information already loaded? +bool +Rock::Rebuild::canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const +{ + if (!sameEntry(fileno, header)) { + debugs(79, 7, "cannot add; wrong entry"); + return false; + } + + const LoadingEntry &le = entries[slotId]; + // We cannot add a slot that was already declared free or mapped. + if (le.freed || le.mapped) { + debugs(79, 7, "cannot add; freed/mapped: " << le.freed << le.mapped); + return false; + } + + if (slotId == header.firstSlot) { + // If we are the inode, the anchored flag cannot be set yet. + if (entries[fileno].anchored) { + debugs(79, 7, "cannot add; extra anchor"); + return false; + } + + // And there should have been some other slot for this entry to exist. + if (le.more < 0) { + debugs(79, 7, "cannot add; missing slots"); + return false; + } + + return true; + } + + // We are the continuation slice so the more field is reserved for us. + if (le.more >= 0) { + debugs(79, 7, "cannot add; foreign slot"); + return false; + } + + return true; +} + +/// handle freshly loaded (and validated) db slot header +void +Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header) +{ + LoadingEntry &slice = entries[slotId]; + assert(!slice.freed); // we cannot free what was not loaded + + const cache_key *const key = + reinterpret_cast(header.key); + const sfileno fileno = sd->map->anchorIndexByKey(key); + assert(0 <= fileno && fileno < dbEntryLimit); + + LoadingEntry &le = entries[fileno]; + debugs(47,9, "entry " << fileno << " state: " << le.state << ", inode: " << + header.firstSlot << ", size: " << header.payloadSize); + + switch (le.state) { + + case LoadingEntry::leEmpty: { + startNewEntry(fileno, slotId, header); + break; + } + + case LoadingEntry::leLoading: { + if (canAdd(fileno, slotId, header)) { + addSlotToEntry(fileno, slotId, header); + } else { + // either the loading chain or this slot is stale; + // be conservative and ignore both (and any future ones) + le.state = LoadingEntry::leCorrupted; + freeBadEntry(fileno, "duplicated"); + freeSlotIfIdle(slotId, slotId == header.firstSlot); + ++counts.dupcount; + } + break; + } + + case LoadingEntry::leLoaded: { + // either the previously loaded chain or this slot is stale; + // be conservative and ignore both (and any future ones) + le.state = LoadingEntry::leCorrupted; + sd->map->freeEntry(fileno); // may not be immediately successful + freeSlotIfIdle(slotId, slotId == header.firstSlot); + ++counts.dupcount; + break; + } + + case LoadingEntry::leCorrupted: { + // previously seen slots messed things up so we must ignore this one + freeSlotIfIdle(slotId, false); + break; + } + + case LoadingEntry::leIgnored: { + // already replaced by a fresher or colliding from-network entry + freeSlotIfIdle(slotId, false); + break; + } + } } diff --git a/src/fs/rock/RockRebuild.h b/src/fs/rock/RockRebuild.h index e5386a8bac..d56182fc4f 100644 --- a/src/fs/rock/RockRebuild.h +++ b/src/fs/rock/RockRebuild.h @@ -3,20 +3,14 @@ #include "base/AsyncJob.h" #include "cbdata.h" +#include "fs/rock/RockForward.h" +#include "MemBuf.h" 
#include "store_rebuild.h" -namespace Ipc -{ -namespace Mem -{ -class PageId; -} -} - namespace Rock { -class SwapDir; +class LoadingEntry; /// \ingroup Rock /// manages store rebuild process: loading meta information from db on disk @@ -35,13 +29,31 @@ protected: private: void checkpoint(); void steps(); - void steps2(); - void doOneEntry(); - void doOneSlot(); + void loadingSteps(); + void validationSteps(); + void loadOneSlot(); + void validateOneEntry(); + bool importEntry(Ipc::StoreMapAnchor &anchor, const sfileno slotId, const DbCellHeader &header); + void freeBadEntry(const sfileno fileno, const char *eDescription); + void failure(const char *msg, int errNo = 0); - void invalidSlot(Ipc::Mem::PageId &pageId); + + void startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header); + void primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header); + void addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header); + void useNewSlot(const SlotId slotId, const DbCellHeader &header); + + void mapSlot(const SlotId slotId, const DbCellHeader &header); + void freeSlotIfIdle(const SlotId slotId, const bool invalid); + void freeBusySlot(const SlotId slotId, const bool invalid); + void freeSlot(const SlotId slotId, const bool invalid); + + bool canAdd(const sfileno fileno, const SlotId slotId, const DbCellHeader &header) const; + bool sameEntry(const sfileno fileno, const DbCellHeader &header) const; + SwapDir *sd; + LoadingEntry *entries; ///< store entries being loaded from disk int64_t dbSize; int dbEntrySize; @@ -50,15 +62,13 @@ private: int fd; // store db file descriptor int64_t dbOffset; - int filen; - - // TODO: use std::bitmap? - Vector processed; ///< true iff rebuilt is complete for a given slot + sfileno slotPos; + sfileno validationPos; + MemBuf buf; StoreRebuildData counts; static void Steps(void *data); - static void Steps2(void *data); CBDATA_CLASS2(Rebuild); }; diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index f9d6efbeea..2bc3fd3d2e 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -31,7 +31,8 @@ const int64_t Rock::SwapDir::HeaderSize = 16*1024; Rock::SwapDir::SwapDir(): ::SwapDir("rock"), - slotSize(HeaderSize), filePath(NULL), io(NULL), map(NULL), dbSlots(NULL) + slotSize(HeaderSize), filePath(NULL), map(NULL), io(NULL), allSlots(NULL), + waitingForPage(NULL) { } @@ -63,11 +64,11 @@ Rock::SwapDir::get(const cache_key *key) return NULL; sfileno filen; - const Ipc::StoreMapSlot *const slot = map->openForReading(key, filen); + const Ipc::StoreMapAnchor *const slot = map->openForReading(key, filen); if (!slot) return NULL; - const Ipc::StoreMapSlot::Basics &basics = slot->basics; + const Ipc::StoreMapAnchor::Basics &basics = slot->basics; // create a brand new store entry and initialize it with stored basics StoreEntry *e = new StoreEntry(); @@ -116,8 +117,8 @@ void Rock::SwapDir::disconnect(StoreEntry &e) uint64_t Rock::SwapDir::currentSize() const { - const uint64_t spaceSize = !dbSlotIndex ? - maxSize() : (slotSize * dbSlotIndex->size()); + const uint64_t spaceSize = !freeSlots ? + maxSize() : (slotSize * freeSlots->size()); // everything that is not free is in use return maxSize() - spaceSize; } @@ -217,6 +218,7 @@ Rock::SwapDir::init() Must(!map); map = new DirMap(inodeMapPath()); + map->cleaner = this; const char *ioModule = needsDiskStrand() ? 
"IpcIo" : "Blocking"; if (DiskIOModule *m = DiskIOModule::Find(ioModule)) { @@ -233,9 +235,7 @@ Rock::SwapDir::init() theFile->configure(fileConfig); theFile->open(O_RDWR, 0644, this); - dbSlotIndex = shm_old(Ipc::Mem::PageStack)(spaceIndexPath()); - dbSlots = new (reinterpret_cast(dbSlotIndex.getRaw()) + - dbSlotIndex->stackSize()) DbCellHeader[entryLimitAllowed()]; + freeSlots = shm_old(Ipc::Mem::PageStack)(freeSlotsPath()); // Increment early. Otherwise, if one SwapDir finishes rebuild before // others start, storeRebuildComplete() will think the rebuild is over! @@ -500,11 +500,6 @@ Rock::SwapDir::canStore(const StoreEntry &e, int64_t diskSpaceNeeded, int &load) if (!map) return false; - // TODO: consider DB slots freed when older object would be replaced - if (dbSlotIndex->size() < - static_cast(max(entriesNeeded(diskSpaceNeeded), 1))) - return false; - // Do not start I/O transaction if there are less than 10% free pages left. // TODO: reserve page instead if (needsDiskStrand() && @@ -529,42 +524,29 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI } sfileno filen; - Ipc::StoreMapSlot *const slot = + Ipc::StoreMapAnchor *const slot = map->openForWriting(reinterpret_cast(e.key), filen); if (!slot) { debugs(47, 5, HERE << "map->add failed"); return NULL; } - Ipc::Mem::PageId pageId; - if (!popDbSlot(pageId)) { - debugs(79, DBG_IMPORTANT, "WARNING: Rock cache_dir '" << filePath << - "' run out of DB slots"); - map->free(filen); - } - + assert(filen >= 0); slot->set(e); // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno. // If that does not happen, the entry will not decrement the read level! - IoState *sio = new IoState(*this, &e, cbFile, cbIo, data); + Rock::SwapDir::Pointer self(this); + IoState *sio = new IoState(self, &e, cbFile, cbIo, data); sio->swap_dirn = index; sio->swap_filen = filen; - sio->diskOffset = diskOffset(pageId); - - DbCellHeader &firstDbSlot = dbSlot(pageId); - memcpy(firstDbSlot.key, e.key, sizeof(firstDbSlot.key)); - firstDbSlot.firstSlot = pageId.number; - firstDbSlot.nextSlot = 0; - ++firstDbSlot.version; - firstDbSlot.payloadSize = 0; - sio->dbSlot = &firstDbSlot; + sio->writeableAnchor_ = slot; debugs(47,5, HERE << "dir " << index << " created new filen " << std::setfill('0') << std::hex << std::uppercase << std::setw(8) << - sio->swap_filen << std::dec << " at " << + sio->swap_filen << std::dec << " starting at " << diskOffset(sio->swap_filen)); sio->file(theFile); @@ -607,44 +589,50 @@ Rock::SwapDir::entriesNeeded(const int64_t objSize) const } bool -Rock::SwapDir::popDbSlot(Ipc::Mem::PageId &pageId) +Rock::SwapDir::useFreeSlot(Ipc::Mem::PageId &pageId) { - return dbSlotIndex->pop(pageId); -} + if (freeSlots->pop(pageId)) { + debugs(47, 5, "got a previously free slot: " << pageId); + return true; + } -Rock::DbCellHeader & -Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) -{ - const DbCellHeader &s = const_cast(this)->dbSlot(pageId); - return const_cast(s); + // catch free slots delivered to noteFreeMapSlice() + assert(!waitingForPage); + waitingForPage = &pageId; + if (map->purgeOne()) { + assert(!waitingForPage); // noteFreeMapSlice() should have cleared it + assert(pageId.set()); + debugs(47, 5, "got a previously busy slot: " << pageId); + return true; + } + assert(waitingForPage == &pageId); + waitingForPage = NULL; + + debugs(47, 3, "cannot get a slot; entries: " << map->entryCount()); + return false; } -const Rock::DbCellHeader & -Rock::SwapDir::dbSlot(const Ipc::Mem::PageId &pageId) const +bool 
+Rock::SwapDir::validSlotId(const SlotId slotId) const { - assert(dbSlotIndex->pageIdIsValid(pageId)); - return dbSlots[pageId.number - 1]; + return 0 <= slotId && slotId < entryLimitAllowed(); } void -Rock::SwapDir::cleanReadable(const sfileno fileno) -{ - Ipc::Mem::PageId pageId = map->extras(fileno).pageId; - Ipc::Mem::PageId nextPageId = pageId; - while (pageId) { - const DbCellHeader &curDbSlot = dbSlot(pageId); - nextPageId.number = curDbSlot.nextSlot; - const DbCellHeader &nextDbSlot = dbSlot(nextPageId); - const bool sameChain = memcmp(curDbSlot.key, nextDbSlot.key, - sizeof(curDbSlot.key)) == 0 && - curDbSlot.version == nextDbSlot.version; - dbSlotIndex->push(pageId); - if (sameChain) - pageId = nextPageId; +Rock::SwapDir::noteFreeMapSlice(const sfileno sliceId) +{ + Ipc::Mem::PageId pageId; + pageId.pool = index+1; + pageId.number = sliceId+1; + if (waitingForPage) { + *waitingForPage = pageId; + waitingForPage = NULL; + } else { + freeSlots->push(pageId); } } -// tries to open an old or being-written-to entry with swap_filen for reading +// tries to open an old entry with swap_filen for reading StoreIOState::Pointer Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOState::STIOCB *cbIo, void *data) { @@ -669,30 +657,26 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS // The are two ways an entry can get swap_filen: our get() locked it for // reading or our storeSwapOutStart() locked it for writing. Peeking at our // locked entry is safe, but no support for reading a filling entry. - const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen); + const Ipc::StoreMapAnchor *slot = map->peekAtReader(e.swap_filen); if (!slot) return NULL; // we were writing afterall - IoState *sio = new IoState(*this, &e, cbFile, cbIo, data); + Rock::SwapDir::Pointer self(this); + IoState *sio = new IoState(self, &e, cbFile, cbIo, data); sio->swap_dirn = index; sio->swap_filen = e.swap_filen; - sio->dbSlot = &dbSlot(map->extras(e.swap_filen).pageId); - - const Ipc::Mem::PageId &pageId = map->extras(e.swap_filen).pageId; - sio->diskOffset = diskOffset(pageId); - DbCellHeader &firstDbSlot = dbSlot(map->extras(e.swap_filen).pageId); - assert(memcmp(firstDbSlot.key, e.key, sizeof(firstDbSlot.key))); - assert(firstDbSlot.firstSlot == pageId.number); + sio->readableAnchor_ = slot; + sio->file(theFile); debugs(47,5, HERE << "dir " << index << " has old filen: " << std::setfill('0') << std::hex << std::uppercase << std::setw(8) << sio->swap_filen); + assert(slot->sameKey(static_cast(e.key))); assert(slot->basics.swap_file_sz > 0); assert(slot->basics.swap_file_sz == e.swap_file_sz); - sio->file(theFile); return sio; } @@ -745,14 +729,24 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest assert(request->sio != NULL); IoState &sio = *request->sio; + // quit if somebody called IoState::close() while we were waiting + if (!sio.stillWaiting()) { + debugs(79, 3, "ignoring closed entry " << sio.swap_filen); + return; + } + if (errflag == DISK_OK) { - // close, assuming we only write once; the entry gets the read lock - map->closeForWriting(sio.swap_filen, true); // do not increment sio.offset_ because we do it in sio->write() - if (request->isLast) + if (request->isLast) { + // close, the entry gets the read lock + map->closeForWriting(sio.swap_filen, true); sio.finishedWriting(errflag); - } else + } + } else { writeError(sio.swap_filen); + sio.finishedWriting(errflag); + // and hope that Core will call disconnect() 
to close the map entry + } } void @@ -760,14 +754,14 @@ Rock::SwapDir::writeError(const sfileno fileno) { // Do not abortWriting here. The entry should keep the write lock // instead of losing association with the store and confusing core. - map->free(fileno); // will mark as unusable, just in case - // XXX: should we call IoState callback? + map->freeEntry(fileno); // will mark as unusable, just in case + // All callers must also call IoState callback, to propagate the error. } bool Rock::SwapDir::full() const { - return map && map->full(); + return freeSlots != NULL && !freeSlots->size(); } // storeSwapOutFileClosed calls this nethod on DISK_NO_SPACE_LEFT, @@ -783,49 +777,10 @@ Rock::SwapDir::diskFull() void Rock::SwapDir::maintain() { - debugs(47,3, HERE << "cache_dir[" << index << "] guards: " << - !repl << !map << !full() << StoreController::store_dirs_rebuilding); - - if (!repl) - return; // no means (cannot find a victim) - - if (!map) - return; // no victims (yet) - - if (!full()) - return; // no need (to find a victim) - - // XXX: UFSSwapDir::maintain says we must quit during rebuild - if (StoreController::store_dirs_rebuilding) - return; - - debugs(47,3, HERE << "cache_dir[" << index << "] state: " << map->full() << - ' ' << currentSize() << " < " << diskOffsetLimit()); - - // Hopefully, we find a removable entry much sooner (TODO: use time?) - const int maxProbed = 10000; - RemovalPurgeWalker *walker = repl->PurgeInit(repl, maxProbed); - - // It really should not take that long, but this will stop "infinite" loops - const int maxFreed = 1000; - int freed = 0; - // TODO: should we purge more than needed to minimize overheads? - for (; freed < maxFreed && full(); ++freed) { - if (StoreEntry *e = walker->Next(walker)) - e->release(); // will call our unlink() method - else - break; // no more objects - } - - debugs(47,2, HERE << "Rock cache_dir[" << index << "] freed " << freed << - " scanned " << walker->scanned << '/' << walker->locked); - - walker->Done(walker); - - if (full()) { - debugs(47, DBG_CRITICAL, "ERROR: Rock cache_dir[" << index << "] " << - "is still full after freeing " << freed << " entries. A bug?"); - } + // The Store calls this to free some db space, but there is nothing wrong + // with a full() db, except when db has to shrink after reconfigure, and + // we do not support shrinking yet (it would have to purge specific slots). + // TODO: Disable maintain() requests when they are pointless. } void @@ -859,7 +814,7 @@ Rock::SwapDir::unlink(StoreEntry &e) { debugs(47, 5, HERE << e); ignoreReferences(e); - map->free(e.swap_filen); + map->freeEntry(e.swap_filen); disconnect(e); } @@ -896,6 +851,12 @@ Rock::SwapDir::statfs(StoreEntry &e) const storeAppendPrintf(&e, "Current entries: %9d %.2f%%\n", entryCount, (100.0 * entryCount / limit)); + const unsigned int slotsFree = !freeSlots ? 
0 : freeSlots->size(); + if (slotsFree <= static_cast(limit)) { + const int usedSlots = limit - static_cast(slotsFree); + storeAppendPrintf(&e, "Used slots: %9d %.2f%%\n", + usedSlots, (100.0 * usedSlots / limit)); + } if (limit < 100) { // XXX: otherwise too expensive to count Ipc::ReadWriteLockStats stats; map->updateStats(stats); @@ -928,7 +889,7 @@ Rock::SwapDir::inodeMapPath() const { } const char * -Rock::SwapDir::spaceIndexPath() const { +Rock::SwapDir::freeSlotsPath() const { static String spacesPath; spacesPath = path; spacesPath.append("_spaces"); @@ -942,7 +903,7 @@ RunnerRegistrationEntry(rrAfterConfig, SwapDirRr); void Rock::SwapDirRr::create(const RunnerRegistry &) { - Must(mapOwners.empty() && dbSlotsOwners.empty()); + Must(mapOwners.empty() && freeSlotsOwners.empty()); for (int i = 0; i < Config.cacheSwap.n_configured; ++i) { if (const Rock::SwapDir *const sd = dynamic_cast(INDEXSD(i))) { const int64_t capacity = sd->entryLimitAllowed(); @@ -952,16 +913,16 @@ void Rock::SwapDirRr::create(const RunnerRegistry &) mapOwners.push_back(mapOwner); // XXX: remove pool id and counters from PageStack - Ipc::Mem::Owner *const dbSlotsOwner = - shm_new(Ipc::Mem::PageStack)(sd->spaceIndexPath(), - i, capacity, + Ipc::Mem::Owner *const freeSlotsOwner = + shm_new(Ipc::Mem::PageStack)(sd->freeSlotsPath(), + i+1, capacity, sizeof(DbCellHeader)); - dbSlotsOwners.push_back(dbSlotsOwner); + freeSlotsOwners.push_back(freeSlotsOwner); // XXX: add method to initialize PageStack with no free pages while (true) { Ipc::Mem::PageId pageId; - if (!dbSlotsOwner->object()->pop(pageId)) + if (!freeSlotsOwner->object()->pop(pageId)) break; } } @@ -972,6 +933,6 @@ Rock::SwapDirRr::~SwapDirRr() { for (size_t i = 0; i < mapOwners.size(); ++i) { delete mapOwners[i]; - delete dbSlotsOwners[i]; + delete freeSlotsOwners[i]; } } diff --git a/src/fs/rock/RockSwapDir.h b/src/fs/rock/RockSwapDir.h index 60c5451733..1480601be0 100644 --- a/src/fs/rock/RockSwapDir.h +++ b/src/fs/rock/RockSwapDir.h @@ -5,6 +5,7 @@ #include "DiskIO/DiskFile.h" #include "DiskIO/IORequestor.h" #include "fs/rock/RockDbCell.h" +#include "fs/rock/RockForward.h" #include "ipc/StoreMap.h" #include "ipc/mem/Page.h" #include "ipc/mem/PageStack.h" @@ -16,12 +17,13 @@ class WriteRequest; namespace Rock { -class Rebuild; - /// \ingroup Rock class SwapDir: public ::SwapDir, public IORequestor, public Ipc::StoreMapCleaner { public: + typedef RefCount Pointer; + typedef Ipc::StoreMap DirMap; + SwapDir(); virtual ~SwapDir(); @@ -41,25 +43,24 @@ public: // temporary path to the shared memory map of first slots of cached entries const char *inodeMapPath() const; // temporary path to the shared memory stack of free slots - const char *spaceIndexPath() const; + const char *freeSlotsPath() const; int64_t entryLimitHigh() const { return SwapFilenMax; } ///< Core limit int64_t entryLimitAllowed() const; - bool popDbSlot(Ipc::Mem::PageId &pageId); - DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId); - const DbCellHeader &dbSlot(const Ipc::Mem::PageId &pageId) const; + /// removes a slot from a list of free slots or returns false + bool useFreeSlot(Ipc::Mem::PageId &pageId); + /// whether the given slot ID may point to a slot in this db + bool validSlotId(const SlotId slotId) const; + /// purges one or more entries to make full() false and free some slots + void purgeSome(); int64_t diskOffset(Ipc::Mem::PageId &pageId) const; + int64_t diskOffset(int filen) const; void writeError(const sfileno fileno); - virtual void cleanReadable(const sfileno fileno); - - 
// TODO: merge with MemStoreMapExtras? -    struct MapExtras { -        Ipc::Mem::PageId pageId; -    }; -    typedef Ipc::StoreMapWithExtras<MapExtras> DirMap; +    /* StoreMapCleaner API */ +    virtual void noteFreeMapSlice(const sfileno fileno); uint64_t slotSize; ///< all db slots are of this size @@ -101,23 +102,24 @@ protected: void trackReferences(StoreEntry &e); ///< add to replacement policy scope void ignoreReferences(StoreEntry &e); ///< delete from repl policy scope -    int64_t diskOffset(int filen) const; int64_t diskOffsetLimit() const; int entryLimit() const { return map->entryLimit(); } int entryMaxPayloadSize() const; int entriesNeeded(const int64_t objSize) const; friend class Rebuild; +    friend class IoState; const char *filePath; ///< location of cache storage file inside path/ +    DirMap *map; ///< entry key/sfileno to MaxExtras/inode mapping private: void createError(const char *const msg); DiskIOStrategy *io; RefCount<DiskFile> theFile; ///< cache storage for this cache_dir -    DirMap *map; -    DbCellHeader *dbSlots; -    Ipc::Mem::Pointer dbSlotIndex; +    DbCellHeader *allSlots; ///< SlotId to DbCellHeader mapping +    Ipc::Mem::Pointer<Ipc::Mem::PageStack> freeSlots; ///< free slots +    Ipc::Mem::PageId *waitingForPage; ///< one-page cache for a "hot" free slot /* configurable options */ DiskFile::Config fileConfig; ///< file-level configuration options @@ -138,7 +140,7 @@ protected: private: Vector mapOwners; -    Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> dbSlotsOwners; +    Vector< Ipc::Mem::Owner<Ipc::Mem::PageStack> *> freeSlotsOwners; }; } // namespace Rock diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index 3a08d6ea46..cf4932cc6d 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -21,6 +21,8 @@ libipc_la_SOURCES = \ StartListening.h \ StoreMap.cc \ StoreMap.h \ + StoreMapSlice.cc \ + StoreMapSlice.h \ StrandCoord.cc \ StrandCoord.h \ StrandCoords.h \ diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index bb365513fd..99a040c9c7 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -31,34 +31,90 @@ Ipc::StoreMap::StoreMap(const char *const aPath): cleaner(NULL), path(aPath), shared->limit); } -Ipc::StoreMap::Slot * +int +Ipc::StoreMap::compareVersions(const sfileno fileno, time_t newVersion) const +{ +    assert(valid(fileno)); +    Anchor &inode = shared->slots[fileno].anchor; + +    // note: we do not lock, so comparison may be inaccurate + +    if (inode.state == Anchor::Empty) +        return +2; + +    if (const time_t diff = newVersion - inode.basics.timestamp) +        return diff < 0 ? 
-1 : +1; + + return 0; +} + +void +Ipc::StoreMap::forgetWritingEntry(sfileno fileno) +{ + assert(valid(fileno)); + Anchor &inode = shared->slots[fileno].anchor; + + assert(inode.state == Anchor::Writeable); + + // we do not iterate slices because we were told to forget about + // them; the caller is responsible for freeing them (most likely + // our slice list is incomplete or has holes) + + inode.waitingToBeFreed = false; + inode.state = Anchor::Empty; + + inode.lock.unlockExclusive(); + --shared->count; + + debugs(54, 8, "closed entry " << fileno << " for writing " << path); +} + +Ipc::StoreMap::Anchor * Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno) { - debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key) + debugs(54, 5, "opening entry with key " << storeKeyText(key) << " for writing in map [" << path << ']'); - const int idx = slotIndexByKey(key); + const int idx = anchorIndexByKey(key); - Slot &s = shared->slots[idx]; + if (Anchor *anchor = openForWritingAt(idx)) { + fileno = idx; + return anchor; + } + + return NULL; +} + +Ipc::StoreMap::Anchor * +Ipc::StoreMap::openForWritingAt(const sfileno fileno, bool overwriteExisting) +{ + Anchor &s = shared->slots[fileno].anchor; ReadWriteLock &lock = s.lock; if (lock.lockExclusive()) { - assert(s.state != Slot::Writeable); // until we start breaking locks + assert(s.state != Anchor::Writeable); // until we start breaking locks + + // bail if we cannot empty this position + if (!s.waitingToBeFreed && s.state == Anchor::Readable && !overwriteExisting) { + lock.unlockExclusive(); + debugs(54, 5, "cannot open existing entry at " << fileno << + " for writing in map [" << path << ']'); + return NULL; + } // free if the entry was used, keeping the entry locked - if (s.waitingToBeFreed || s.state == Slot::Readable) - freeLocked(s, true); + if (s.waitingToBeFreed || s.state == Anchor::Readable) + freeChain(fileno, s, true); - assert(s.state == Slot::Empty); + assert(s.state == Anchor::Empty); ++shared->count; - s.state = Slot::Writeable; - fileno = idx; + s.state = Anchor::Writeable; + //s.setKey(key); // XXX: the caller should do that - debugs(54, 5, HERE << " opened slot at " << idx << - " for writing in map [" << path << ']'); + debugs(54, 5, "opened entry " << fileno << " for writing " << path); return &s; // and keep the entry locked - } + } - debugs(54, 5, HERE << " failed to open slot at " << idx << + debugs(54, 5, "cannot open busy entry at " << fileno << " for writing in map [" << path << ']'); return NULL; } @@ -66,57 +122,86 @@ Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno) void Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading) { - debugs(54, 5, HERE << " closing slot at " << fileno << " for writing and " - "openning for reading in map [" << path << ']'); assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - assert(s.state == Slot::Writeable); - s.state = Slot::Readable; - if (lockForReading) + Anchor &s = shared->slots[fileno].anchor; + assert(s.state == Anchor::Writeable); + s.state = Anchor::Readable; + if (lockForReading) { s.lock.switchExclusiveToShared(); - else + debugs(54, 5, "switched entry at " << fileno << + " from writing to reading in map [" << path << ']'); + } else { s.lock.unlockExclusive(); + debugs(54, 5, "closed entry " << fileno << " for writing " << path); + } +} + +Ipc::StoreMap::Slice & +Ipc::StoreMap::writeableSlice(const AnchorId anchorId, const SliceId sliceId) +{ + assert(valid(anchorId)); + 
assert(shared->slots[anchorId].anchor.state == Anchor::Writeable); + assert(valid(sliceId)); + return shared->slots[sliceId].slice; +} + +const Ipc::StoreMap::Slice & +Ipc::StoreMap::readableSlice(const AnchorId anchorId, const SliceId sliceId) const +{ + assert(valid(anchorId)); + assert(shared->slots[anchorId].anchor.state == Anchor::Readable); + assert(valid(sliceId)); + return shared->slots[sliceId].slice; +} + +Ipc::StoreMap::Anchor & +Ipc::StoreMap::writeableEntry(const AnchorId anchorId) +{ + assert(valid(anchorId)); + assert(shared->slots[anchorId].anchor.state == Anchor::Writeable); + return shared->slots[anchorId].anchor; } /// terminate writing the entry, freeing its slot for others to use void Ipc::StoreMap::abortWriting(const sfileno fileno) { - debugs(54, 5, HERE << " abort writing slot at " << fileno << - " in map [" << path << ']'); + debugs(54, 5, "abort entry at " << fileno << + " for writing in map [" << path << ']'); assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - assert(s.state == Slot::Writeable); - freeLocked(s, false); + Anchor &s = shared->slots[fileno].anchor; + assert(s.state == Anchor::Writeable); + freeChain(fileno, s, false); + debugs(54, 5, "closed entry " << fileno << " for writing " << path); } void Ipc::StoreMap::abortIo(const sfileno fileno) { - debugs(54, 5, HERE << " abort I/O for slot at " << fileno << - " in map [" << path << ']'); + debugs(54, 5, "abort entry at " << fileno << + " for I/O in map [" << path << ']'); assert(valid(fileno)); - Slot &s = shared->slots[fileno]; + Anchor &s = shared->slots[fileno].anchor; // The caller is a lock holder. Thus, if we are Writeable, then the // caller must be the writer; otherwise the caller must be the reader. - if (s.state == Slot::Writeable) + if (s.state == Anchor::Writeable) abortWriting(fileno); else closeForReading(fileno); } -const Ipc::StoreMap::Slot * +const Ipc::StoreMap::Anchor * Ipc::StoreMap::peekAtReader(const sfileno fileno) const { assert(valid(fileno)); - const Slot &s = shared->slots[fileno]; + const Anchor &s = shared->slots[fileno].anchor; switch (s.state) { - case Slot::Readable: + case Anchor::Readable: return &s; // immediate access by lock holder so no locking - case Slot::Writeable: + case Anchor::Writeable: return NULL; // cannot read the slot when it is being written - case Slot::Empty: + case Anchor::Empty: assert(false); // must be locked for reading or writing } assert(false); // not reachable @@ -124,85 +209,141 @@ Ipc::StoreMap::peekAtReader(const sfileno fileno) const } void -Ipc::StoreMap::free(const sfileno fileno) +Ipc::StoreMap::freeEntry(const sfileno fileno) { - debugs(54, 5, HERE << " marking slot at " << fileno << " to be freed in" + debugs(54, 5, HERE << " marking entry at " << fileno << " to be freed in" " map [" << path << ']'); assert(valid(fileno)); - Slot &s = shared->slots[fileno]; + Anchor &s = shared->slots[fileno].anchor; if (s.lock.lockExclusive()) - freeLocked(s, false); + freeChain(fileno, s, false); else s.waitingToBeFreed = true; // mark to free it later } -const Ipc::StoreMap::Slot * +/// unconditionally frees an already locked chain of slots, unlocking if needed +void +Ipc::StoreMap::freeChain(const sfileno fileno, Anchor &inode, const bool keepLocked) +{ + debugs(54, 7, "freeing " << inode.state << " entry " << fileno << + " in map [" << path << ']'); + if (inode.state == Anchor::Readable && cleaner) { + sfileno sliceId = inode.start; + debugs(54, 7, "first slice " << sliceId); + while (sliceId >= 0) { + const sfileno nextId = 
shared->slots[sliceId].slice.next; + cleaner->noteFreeMapSlice(sliceId); // might change slice state + sliceId = nextId; + } + } + + inode.waitingToBeFreed = false; + inode.state = Anchor::Empty; + + if (!keepLocked) + inode.lock.unlockExclusive(); + --shared->count; + debugs(54, 5, "freed entry " << fileno << + " in map [" << path << ']'); +} + +const Ipc::StoreMap::Anchor * Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno) { - debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key) + debugs(54, 5, "opening entry with key " << storeKeyText(key) << " for reading in map [" << path << ']'); - const int idx = slotIndexByKey(key); - if (const Slot *slot = openForReadingAt(idx)) { + const int idx = anchorIndexByKey(key); + if (const Anchor *slot = openForReadingAt(idx)) { if (slot->sameKey(key)) { fileno = idx; - debugs(54, 5, HERE << " opened slot at " << fileno << " for key " - << storeKeyText(key) << " for reading in map [" << path << - ']'); return slot; // locked for reading } slot->lock.unlockShared(); + debugs(54, 7, "closed entry " << idx << " for reading " << path); } - debugs(54, 5, HERE << " failed to open slot for key " << storeKeyText(key) - << " for reading in map [" << path << ']'); return NULL; } -const Ipc::StoreMap::Slot * +const Ipc::StoreMap::Anchor * Ipc::StoreMap::openForReadingAt(const sfileno fileno) { - debugs(54, 5, HERE << " trying to open slot at " << fileno << " for " - "reading in map [" << path << ']'); + debugs(54, 5, "opening entry at " << fileno << + " for reading in map [" << path << ']'); assert(valid(fileno)); - Slot &s = shared->slots[fileno]; + Anchor &s = shared->slots[fileno].anchor; if (!s.lock.lockShared()) { - debugs(54, 5, HERE << " failed to lock slot at " << fileno << " for " - "reading in map [" << path << ']'); + debugs(54, 5, "cannot open busy entry at " << fileno << + "for reading in map [" << path << ']'); return NULL; } - if (s.state == Slot::Empty) { + if (s.state == Anchor::Empty) { s.lock.unlockShared(); - debugs(54, 7, HERE << " empty slot at " << fileno << " for " - "reading in map [" << path << ']'); + debugs(54, 7, "cannot open empty entry at " << fileno << + " for reading in map [" << path << ']'); return NULL; } if (s.waitingToBeFreed) { s.lock.unlockShared(); - debugs(54, 7, HERE << " dirty slot at " << fileno << " for " - "reading in map [" << path << ']'); + debugs(54, 7, HERE << "cannot open marked entry at " << fileno << + " for reading in map [" << path << ']'); return NULL; } // cannot be Writing here if we got shared lock and checked Empty above - assert(s.state == Slot::Readable); - debugs(54, 5, HERE << " opened slot at " << fileno << " for reading in" - " map [" << path << ']'); + assert(s.state == Anchor::Readable); + debugs(54, 5, "opened entry " << fileno << " for reading " << path); return &s; } void Ipc::StoreMap::closeForReading(const sfileno fileno) { - debugs(54, 5, HERE << " closing slot at " << fileno << " for reading in " - "map [" << path << ']'); assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - assert(s.state == Slot::Readable); + Anchor &s = shared->slots[fileno].anchor; + assert(s.state == Anchor::Readable); s.lock.unlockShared(); + debugs(54, 5, "closed entry " << fileno << " for reading " << path); +} + +bool +Ipc::StoreMap::purgeOne() +{ + // Hopefully, we find a removable entry much sooner (TODO: use time?). + // The min() will protect us from division by zero inside the loop. 
+    const int searchLimit = min(10000, entryLimit()); +    int tries = 0; +    for (; tries < searchLimit; ++tries) { +        const sfileno fileno = shared->victim++ % shared->limit; +        assert(valid(fileno)); +        Anchor &s = shared->slots[fileno].anchor; +        if (s.lock.lockExclusive()) { +            if (s.state == Anchor::Readable) { // skip empties +                // this entry may be marked for deletion, and that is OK +                freeChain(fileno, s, false); +                debugs(54, 5, "purged entry at " << fileno); +                return true; +            } +            s.lock.unlockExclusive(); +        } +    } +    debugs(54, 5, "found no entries to purge; tried: " << tries); +    return false; +} + +void +Ipc::StoreMap::importSlice(const SliceId sliceId, const Slice &slice) +{ +    // Slices are imported into positions that should not be available via +    // "get free slice" API. This is not something we can double check +    // reliably because the anchor for the imported slice may not have been +    // imported yet. +    assert(valid(sliceId)); +    shared->slots[sliceId].slice = slice; } int @@ -227,7 +368,7 @@ void Ipc::StoreMap::updateStats(ReadWriteLockStats &stats) const { for (int i = 0; i < shared->limit; ++i) -        shared->slots[i].lock.updateStats(stats); +        shared->slots[i].anchor.lock.updateStats(stats); } bool @@ -236,62 +377,48 @@ Ipc::StoreMap::valid(const int pos) const return 0 <= pos && pos < entryLimit(); } -int -Ipc::StoreMap::slotIndexByKey(const cache_key *const key) const +sfileno +Ipc::StoreMap::anchorIndexByKey(const cache_key *const key) const { const uint64_t *const k = reinterpret_cast<const uint64_t *>(key); // TODO: use a better hash function return (k[0] + k[1]) % shared->limit; } -Ipc::StoreMap::Slot & -Ipc::StoreMap::slotByKey(const cache_key *const key) +Ipc::StoreMap::Anchor & +Ipc::StoreMap::anchorByKey(const cache_key *const key) { -    return shared->slots[slotIndexByKey(key)]; +    return shared->slots[anchorIndexByKey(key)].anchor; } -/// unconditionally frees the already exclusively locked slot and releases lock -void -Ipc::StoreMap::freeLocked(Slot &s, bool keepLocked) -{ -    if (s.state == Slot::Readable && cleaner) -        cleaner->cleanReadable(&s - shared->slots.raw()); - -    s.waitingToBeFreed = false; -    s.state = Slot::Empty; -    if (!keepLocked) -        s.lock.unlockExclusive(); -    --shared->count; -    debugs(54, 5, HERE << " freed slot at " << (&s - shared->slots.raw()) << -           " in map [" << path << ']'); -} -/* Ipc::StoreMapSlot */ +/* Ipc::StoreMapAnchor */ -Ipc::StoreMapSlot::StoreMapSlot(): state(Empty) +Ipc::StoreMapAnchor::StoreMapAnchor(): start(0), state(Empty) { memset(&key, 0, sizeof(key)); memset(&basics, 0, sizeof(basics)); +    // keep in sync with rewind() } void -Ipc::StoreMapSlot::setKey(const cache_key *const aKey) +Ipc::StoreMapAnchor::setKey(const cache_key *const aKey) { memcpy(key, aKey, sizeof(key)); } bool -Ipc::StoreMapSlot::sameKey(const cache_key *const aKey) const +Ipc::StoreMapAnchor::sameKey(const cache_key *const aKey) const { const uint64_t *const k = reinterpret_cast<const uint64_t *>(aKey); return k[0] == key[0] && k[1] == key[1]; } void -Ipc::StoreMapSlot::set(const StoreEntry &from) +Ipc::StoreMapAnchor::set(const StoreEntry &from) { +    assert(state == Writeable); memcpy(key, from.key, sizeof(key)); -    // XXX: header = aHeader; basics.timestamp = from.timestamp; basics.lastref = from.lastref; basics.expires = from.expires; @@ -301,10 +428,21 @@ Ipc::StoreMapSlot::set(const StoreEntry &from) basics.flags = from.flags; } +void +Ipc::StoreMapAnchor::rewind() +{ +    assert(state == Writeable); +    start = 0; +    memset(&key, 0, sizeof(key)); +    memset(&basics, 0, sizeof(basics)); +    // but keep the lock +} + +/* 
Ipc::StoreMap::Shared */ Ipc::StoreMap::Shared::Shared(const int aLimit, const size_t anExtrasSize): -    limit(aLimit), extrasSize(anExtrasSize), count(0), slots(aLimit) +    limit(aLimit), extrasSize(anExtrasSize), count(0), victim(0), +    slots(aLimit) { } @@ -317,6 +455,6 @@ Ipc::StoreMap::Shared::sharedMemorySize() const size_t Ipc::StoreMap::Shared::SharedMemorySize(const int limit, const size_t extrasSize) { -    return sizeof(Shared) + limit * (sizeof(Slot) + extrasSize); +    return sizeof(Shared) + limit * (sizeof(StoreMapSlot) + extrasSize); } diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h index 444073a359..bacdad0276 100644 --- a/src/ipc/StoreMap.h +++ b/src/ipc/StoreMap.h @@ -4,23 +4,29 @@ #include "ipc/ReadWriteLock.h" #include "ipc/mem/FlexibleArray.h" #include "ipc/mem/Pointer.h" -#include "typedefs.h" +#include "ipc/StoreMapSlice.h" namespace Ipc { -/// a StoreMap element, holding basic shareable StoreEntry info -class StoreMapSlot +/// Maintains shareable information about a StoreEntry as a whole. +/// An anchor points to one or more StoreEntry slices. This is the +/// only lockable part of shared StoreEntry information, providing +/// protection for all StoreEntry slices. +class StoreMapAnchor { public: -    StoreMapSlot(); +    StoreMapAnchor(); -    /// store StoreEntry key and basics +    /// store StoreEntry key and basics for an inode slot void set(const StoreEntry &anEntry); void setKey(const cache_key *const aKey); bool sameKey(const cache_key *const aKey) const; +    /// undo the effects of set(), setKey(), etc., but keep locks and state +    void rewind(); + public: mutable ReadWriteLock lock; ///< protects slot data below Atomic::WordT<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock @@ -38,6 +44,8 @@ public: uint16_t flags; } basics; +    StoreMapSliceId start; ///< where the chain of StoreEntry slices begins + /// possible persistent states typedef enum { Empty, ///< ready for writing, with nothing of value @@ -47,14 +55,24 @@ public: State state; ///< current state }; +/// XXX: a hack to allocate one shared array for both anchors and slices +class StoreMapSlot { +public: +    StoreMapAnchor anchor; +    StoreMapSlice slice; +}; + class StoreMapCleaner; -/// map of StoreMapSlots indexed by their keys, with read/write slot locking +/// map of StoreMapSlots indexed by their keys, with read/write slice locking /// kids extend to store custom data class StoreMap { public: -    typedef StoreMapSlot Slot; +    typedef StoreMapAnchor Anchor; +    typedef sfileno AnchorId; +    typedef StoreMapSlice Slice; +    typedef StoreMapSliceId SliceId; /// data shared across maps in different processes class Shared @@ -64,10 +82,11 @@ public: size_t sharedMemorySize() const; static size_t SharedMemorySize(const int limit, const size_t anExtrasSize); -        const int limit; ///< maximum number of map slots -        const size_t extrasSize; ///< size of slot extra data -        Atomic::Word count; ///< current number of map slots -        Ipc::Mem::FlexibleArray<Slot> slots; ///< slots storage +        const int limit; ///< maximum number of store entries +        const size_t extrasSize; ///< size of slice extra data +        Atomic::Word count; ///< current number of entries +        Atomic::WordT<sfileno> victim; ///< starting point for purge search +        Ipc::Mem::FlexibleArray<StoreMapSlot> slots; ///< storage }; public: @@ -78,31 +97,60 @@ public: StoreMap(const char *const aPath); -    /// finds, reservers space for writing a new entry or returns nil -    Slot *openForWriting(const cache_key *const key, sfileno &fileno); -    /// successfully finish writing the entry +    /// computes map entry position for a given 
entry key + sfileno anchorIndexByKey(const cache_key *const key) const; + + /// Like strcmp(mapped, new), but for store entry versions/timestamps. + /// Returns +2 if the mapped entry does not exist; -1/0/+1 otherwise. + /// Comparison may be inaccurate unless the caller is a lock holder. + int compareVersions(const sfileno oldFileno, time_t newVersion) const; + + /// finds, locks, and returns an anchor for an empty key position, + /// erasing the old entry (if any) + Anchor *openForWriting(const cache_key *const key, sfileno &fileno); + /// locks and returns an anchor for the empty fileno position; if + /// overwriteExisting is false and the position is not empty, returns nil + Anchor *openForWritingAt(sfileno fileno, bool overwriteExisting = true); + /// successfully finish creating or updating the entry at fileno pos void closeForWriting(const sfileno fileno, bool lockForReading = false); + /// unlock and "forget" openForWriting entry, making it Empty again + /// this call does not free entry slices so the caller has to do that + void forgetWritingEntry(const sfileno fileno); + + /// only works on locked entries; returns nil unless the slice is readable + const Anchor *peekAtReader(const sfileno fileno) const; + + /// if possible, free the entry and return true + /// otherwise mark it as waiting to be freed and return false + void freeEntry(const sfileno fileno); + + /// opens entry (identified by key) for reading, increments read level + const Anchor *openForReading(const cache_key *const key, sfileno &fileno); + /// opens entry (identified by sfileno) for reading, increments read level + const Anchor *openForReadingAt(const sfileno fileno); + /// closes open entry after reading, decrements read level + void closeForReading(const sfileno fileno); - /// only works on locked entries; returns nil unless the slot is readable - const Slot *peekAtReader(const sfileno fileno) const; + /// writeable slice within an entry chain created by openForWriting() + Slice &writeableSlice(const AnchorId anchorId, const SliceId sliceId); + /// readable slice within an entry chain opened by openForReading() + const Slice &readableSlice(const AnchorId anchorId, const SliceId sliceId) const; + /// writeable anchor for the entry created by openForWriting() + Anchor &writeableEntry(const AnchorId anchorId); - /// mark the slot as waiting to be freed and, if possible, free it - void free(const sfileno fileno); + /// called by lock holder to terminate either slice writing or reading + void abortIo(const sfileno fileno); - /// open slot for reading, increments read level - const Slot *openForReading(const cache_key *const key, sfileno &fileno); - /// open slot for reading, increments read level - const Slot *openForReadingAt(const sfileno fileno); - /// close slot after reading, decrements read level - void closeForReading(const sfileno fileno); + /// finds an unlocked entry and frees it or returns false + bool purgeOne(); - /// called by lock holder to terminate either slot writing or reading - void abortIo(const sfileno fileno); + /// copies slice to its designated position + void importSlice(const SliceId sliceId, const Slice &slice); - bool full() const; ///< there are no empty slots left - bool valid(const int n) const; ///< whether n is a valid slot coordinate - int entryCount() const; ///< number of used slots - int entryLimit() const; ///< maximum number of slots that can be used + bool full() const; ///< there are no empty slices left; XXX: remove as unused? 
+    bool valid(const int n) const; ///< whether n is a valid slice coordinate +    int entryCount() const; ///< number of writeable and readable entries +    int entryLimit() const; ///< maximum entryCount() possible /// adds approximate current stats to the supplied ones void updateStats(ReadWriteLockStats &stats) const; @@ -116,16 +164,15 @@ protected: Mem::Pointer<Shared> shared; private: -    int slotIndexByKey(const cache_key *const key) const; -    Slot &slotByKey(const cache_key *const key); +    Anchor &anchorByKey(const cache_key *const key); -    Slot *openForReading(Slot &s); +    Anchor *openForReading(Slice &s); void abortWriting(const sfileno fileno); -    void freeIfNeeded(Slot &s); -    void freeLocked(Slot &s, bool keepLocked); + +    void freeChain(const sfileno fileno, Anchor &inode, const bool keepLock); }; -/// StoreMap with extra slot data +/// StoreMap with extra slice data /// Note: ExtrasT must be POD, it is initialized with zeroes, no /// constructors or destructors are called template <class ExtrasT> @@ -149,14 +196,14 @@ protected: ExtrasT *sharedExtras; ///< pointer to extras in shared memory }; -/// API for adjusting external state when dirty map slot is being freed +/// API for adjusting external state when dirty map slice is being freed class StoreMapCleaner { public: virtual ~StoreMapCleaner() {} -    /// adjust slot-linked state before a locked Readable slot is erased -    virtual void cleanReadable(const sfileno fileno) = 0; +    /// adjust slice-linked state before a locked Readable slice is erased +    virtual void noteFreeMapSlice(const sfileno sliceId) = 0; }; // StoreMapWithExtras implementation diff --git a/src/ipc/StoreMapSlice.cc b/src/ipc/StoreMapSlice.cc new file mode 100644 index 0000000000..e6b4c1f73e --- /dev/null +++ b/src/ipc/StoreMapSlice.cc @@ -0,0 +1,12 @@ +/* + * DEBUG: section 54 Interprocess Communication + */ + +#include "squid.h" +#include "ipc/StoreMapSlice.h" +#include "tools.h" + +Ipc::StoreMapSlice::StoreMapSlice() +{ +    memset(this, 0, sizeof(*this)); +} diff --git a/src/ipc/StoreMapSlice.h b/src/ipc/StoreMapSlice.h new file mode 100644 index 0000000000..1ba775a4d1 --- /dev/null +++ b/src/ipc/StoreMapSlice.h @@ -0,0 +1,24 @@ +#ifndef SQUID_IPC_STORE_MAP_SLICE_H +#define SQUID_IPC_STORE_MAP_SLICE_H + +#include "typedefs.h" + +namespace Ipc +{ + +typedef uint32_t StoreMapSliceId; + +/// a piece of Store entry, linked to other pieces, forming a chain +class StoreMapSlice +{ +public: +    StoreMapSlice(): next(0), /* location(0), */ size(0) {} + +    StoreMapSliceId next; ///< ID of the next slice occupied by the entry +// uint32_t location; ///< slice contents location +    uint32_t size; ///< slice contents size +}; + +} // namespace Ipc + +#endif /* SQUID_IPC_STORE_MAP_SLICE_H */
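
For orientation, here is a minimal, hypothetical sketch of the caller-side sequence implied by the anchor/slice API introduced above: openForWriting() to writeableSlice() to closeForWriting() on the write side, and openForReading() to readableSlice() to closeForReading() on the read side. Only the Ipc::StoreMap calls come from this patch; the map instance, the StoreEntry, the payload size, and the allocateSliceId() helper (which would normally draw from a free-slot pool such as Rock::SwapDir::useFreeSlot()) are assumptions made for the example, not part of the patch.

#include "squid.h"
#include "ipc/StoreMap.h"

/// assumed helper: yields an unused slice position (e.g., from a free-slot pool)
static Ipc::StoreMap::SliceId allocateSliceId();

/// sketch: store a single-slice entry and publish it for readers
static void
writeSketch(Ipc::StoreMap &map, const cache_key *key, const StoreEntry &entry,
            const uint32_t payloadSize)
{
    sfileno fileno = -1;
    Ipc::StoreMap::Anchor *anchor = map.openForWriting(key, fileno);
    if (!anchor)
        return; // position is busy; a caller might retry or call purgeOne()

    anchor->set(entry); // copy the key and basic StoreEntry fields

    const Ipc::StoreMap::SliceId sliceId = allocateSliceId(); // assumed helper
    anchor->start = sliceId; // the chain begins (and ends) at this slice
    Ipc::StoreMap::Slice &slice = map.writeableSlice(fileno, sliceId);
    slice.size = payloadSize; // slice contents size
    // a multi-slice entry would set slice.next and continue the chain here

    map.closeForWriting(fileno); // the entry becomes Readable
}

/// sketch: read the first slice of a previously stored entry
static void
readSketch(Ipc::StoreMap &map, const cache_key *key)
{
    sfileno fileno = -1;
    if (const Ipc::StoreMap::Anchor *anchor = map.openForReading(key, fileno)) {
        const Ipc::StoreMap::Slice &slice = map.readableSlice(fileno, anchor->start);
        (void)slice.size; // placeholder for loading up to slice.size bytes from storage
        map.closeForReading(fileno); // drop the shared lock
    }
}

The Rock changes above follow the same pattern: SwapDir::useFreeSlot() supplies free slice positions, and noteFreeMapSlice() returns them to the free-slot pool when StoreMap::freeChain() walks an entry chain.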