From: Alex Rousskov Date: Sat, 9 Apr 2011 04:24:06 +0000 (-0600) Subject: Split Rock-only Rock::DirMap into Rock::DirMap and reusable Ipc pieces X-Git-Tag: take06~46 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=44c95fcf593ff4c2e27107d0f6d4091840737032;p=thirdparty%2Fsquid.git Split Rock-only Rock::DirMap into Rock::DirMap and reusable Ipc pieces which a shared memory cache implementation can use: Ipc::StoreMap is responsible for maintaining a collection of lockable slots, each with readable/writeable/free state and a "waiting to be free" flag. Kids of this class can add more metadata (in parallel structures using the same index as primary slots). I tried extending the slots themselves, but that turned out to be more complex/messy. Ipc::ReadWriteLock is a basic multiple readers, single writer lock. Its earlier implementation inside Rock::DirMap mixed slot locking and slot state/flags. That simplified the caller code a little, but the current simpler class is easier to understand and reuse. Rock::DirMap now just adds Rock::DbCellHeader metadata to Ipc::StoreMap slots. Simplified mapping API by reducing the number of similar-but-different methods. For example, instead of putAt, the caller can use an openForWriting/closeForWriting pair. This helps with moving custom metadata manipulations outside of the reusable Ipc::StoreMap. It would be possible to split Ipc::StoreMap further by moving Store-specific bits outside of its slots. Currently, there is no need for that though. --- diff --git a/src/fs/rock/RockDirMap.cc b/src/fs/rock/RockDirMap.cc index 64c4458e49..160afd73eb 100644 --- a/src/fs/rock/RockDirMap.cc +++ b/src/fs/rock/RockDirMap.cc @@ -10,242 +10,34 @@ #include "fs/rock/RockDirMap.h" Rock::DirMap::DirMap(const char *const aPath, const int limit): - path(aPath), shm(aPath) + Ipc::StoreMap(aPath, limit, Shared::MemSize(limit)) { - shm.create(SharedSize(limit)); assert(shm.mem()); - shared = new (shm.mem()) Shared(limit); - debugs(79, 5, HERE << "] new map [" << path << "] created using a new " - "shared memory segment for cache_dir '" << path << "' with limit=" << - entryLimit()); + shared = new (shm.reserve(Shared::MemSize(limit))) Shared; } Rock::DirMap::DirMap(const char *const aPath): - path(aPath), shm(aPath) + Ipc::StoreMap(aPath) { - shm.open(); + const int limit = entryLimit(); assert(shm.mem()); - shared = reinterpret_cast(shm.mem()); - debugs(79, 5, HERE << "] new map [" << path << "] created using existing " - "shared memory segment for cache_dir '" << path << "' with limit=" << - entryLimit()); + shared = reinterpret_cast(shm.reserve(Shared::MemSize(limit))); } -Rock::StoreEntryBasics * -Rock::DirMap::openForWriting(const cache_key *const key, sfileno &fileno) +Rock::DbCellHeader & +Rock::DirMap::header(const sfileno fileno) { - debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key) - << " for writing in map [" << path << ']'); - const int idx = slotIdx(key); - Slot &s = shared->slots[idx]; - - if (s.exclusiveLock()) { - assert(s.state != Slot::Writeable); // until we start breaking locks - if (s.state == Slot::Empty) // we may also overwrite a Readable slot - ++shared->count; - s.state = Slot::Writeable; - s.setKey(key); - fileno = idx; - debugs(79, 5, HERE << " opened slot at " << fileno << " for key " << - storeKeyText(key) << " for writing in map [" << path << ']'); - return &s.seBasics; // and keep the entry locked - } - - debugs(79, 5, HERE << " failed to open slot at " << idx << " for key " << storeKeyText(key) - << " for writing in map 
[" << path << ']'); - return NULL; -} - -void -Rock::DirMap::closeForWriting(const sfileno fileno) -{ - debugs(79, 5, HERE << " closing slot at " << fileno << " for writing and " - "openning for reading in map [" << path << ']'); - assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - assert(s.state.swap_if(Slot::Writeable, Slot::Readable)); - s.switchExclusiveToSharedLock(); -} - -/// terminate writing the entry, freeing its slot for others to use -void -Rock::DirMap::abortWriting(const sfileno fileno) -{ - debugs(79, 5, HERE << " abort writing slot at " << fileno << - " in map [" << path << ']'); - assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - assert(s.state == Slot::Writeable); - freeLocked(s); -} - -void -Rock::DirMap::abortIo(const sfileno fileno) -{ - debugs(79, 5, HERE << " abort I/O for slot at " << fileno << - " in map [" << path << ']'); - assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - - // The caller is a lock holder. Thus, if we are Writeable, then the - // caller must be the writer; otherwise the caller must be the reader. - if (s.state == Slot::Writeable) - abortWriting(fileno); - else - closeForReading(fileno); -} - -const Rock::StoreEntryBasics * -Rock::DirMap::peekAtReader(const sfileno fileno) const -{ - assert(valid(fileno)); - const Slot &s = shared->slots[fileno]; - switch (s.state) { - case Slot::Readable: - return &s.seBasics; // immediate access by lock holder so no locking - case Slot::Writeable: - return NULL; // cannot read the slot when it is being written - case Slot::Empty: - assert(false); // must be locked for reading or writing - } - assert(false); // not reachable - return NULL; -} - -bool -Rock::DirMap::putAt(const DbCellHeader &header, const StoreEntry &e, - const sfileno fileno) -{ - const cache_key *key = static_cast(e.key); - debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key) - << " for putting in map [" << path << ']'); - if (!valid(fileno)) { - debugs(79, 5, HERE << "failure: bad fileno: " << fileno); - return false; - } - - const int idx = slotIdx(key); - if (fileno != idx) { - debugs(79, 5, HERE << "failure: hash changed: " << idx << " vs. 
" << - fileno); - return false; - } - - Slot &s = shared->slots[idx]; - - if (s.exclusiveLock()) { - assert(s.state != Slot::Writeable); // until we start breaking locks - if (s.state == Slot::Empty) // we may also overwrite a Readable slot - ++shared->count; - s.setKey(static_cast(e.key)); - s.seBasics.set(header, e); - s.state = Slot::Readable; - s.releaseExclusiveLock(); - debugs(79, 5, HERE << " put slot at " << fileno << " for key " << - storeKeyText(key) << " in map [" << path << ']'); - return true; - } - - debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key) - << " for putting in map [" << path << ']'); - return false; -} - -void -Rock::DirMap::free(const sfileno fileno) -{ - debugs(79, 5, HERE << " marking slot at " << fileno << " to be freed in" - " map [" << path << ']'); - - assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - s.waitingToBeFreed = true; // mark, regardless of whether we can free - freeIfNeeded(s); -} - -const Rock::StoreEntryBasics * -Rock::DirMap::openForReading(const cache_key *const key, sfileno &fileno) -{ - debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key) - << " for reading in map [" << path << ']'); - const int idx = slotIdx(key); - if (const StoreEntryBasics *const seBasics = openForReadingAt(idx)) { - Slot &s = shared->slots[idx]; - if (s.checkKey(key)) { - fileno = idx; - debugs(79, 5, HERE << " opened slot at " << fileno << " for key " - << storeKeyText(key) << " for reading in map [" << path << - ']'); - return seBasics; - } - s.releaseSharedLock(); - freeIfNeeded(s); - } - debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key) - << " for reading in map [" << path << ']'); - return 0; + assert(0 <= fileno && fileno < entryLimit()); + assert(shared); + return shared->headers[fileno]; } -const Rock::StoreEntryBasics * -Rock::DirMap::openForReadingAt(const sfileno fileno) +const Rock::DbCellHeader & +Rock::DirMap::header(const sfileno fileno) const { - debugs(79, 5, HERE << " trying to open slot at " << fileno << " for " - "reading in map [" << path << ']'); - assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - if (s.sharedLock()) { - debugs(79, 5, HERE << " opened slot at " << fileno << " for reading in" - " map [" << path << ']'); - return &s.seBasics; - } - freeIfNeeded(s); - debugs(79, 5, HERE << " failed to open slot at " << fileno << " for " - "reading in map [" << path << ']'); - return 0; -} - -void -Rock::DirMap::closeForReading(const sfileno fileno) -{ - debugs(79, 5, HERE << " closing slot at " << fileno << " for reading in " - "map [" << path << ']'); - assert(valid(fileno)); - Slot &s = shared->slots[fileno]; - assert(s.state == Slot::Readable); - s.releaseSharedLock(); - freeIfNeeded(s); -} - -int -Rock::DirMap::entryLimit() const -{ - return shared->limit; -} - -int -Rock::DirMap::entryCount() const -{ - return shared->count; -} - -bool -Rock::DirMap::full() const -{ - return entryCount() >= entryLimit(); -} - -void -Rock::DirMap::updateStats(MapStats &stats) const -{ - stats.capacity += shared->limit; - for (int i = 0; i < shared->limit; ++i) - shared->slots[i].updateStats(stats); -} - -bool -Rock::DirMap::valid(const int pos) const -{ - return 0 <= pos && pos < entryLimit(); + assert(0 <= fileno && fileno < entryLimit()); + assert(shared); + return shared->headers[fileno]; } int @@ -255,184 +47,8 @@ Rock::DirMap::AbsoluteEntryLimit() return sfilenoMax; } -int -Rock::DirMap::slotIdx(const cache_key *const key) const -{ - const uint64_t *const k = 
reinterpret_cast(key); - // TODO: use a better hash function - return (k[0] + k[1]) % shared->limit; -} - -Rock::Slot & -Rock::DirMap::slot(const cache_key *const key) -{ - return shared->slots[slotIdx(key)]; -} - -/// frees the slot if (b) it is waiting to be freed and (a) we can lock it -void -Rock::DirMap::freeIfNeeded(Slot &s) +size_t +Rock::DirMap::Shared::MemSize(int limit) { - if (s.exclusiveLock()) { - if (s.waitingToBeFreed == true) - freeLocked(s); - else - s.releaseExclusiveLock(); - } -} - -/// unconditionally frees the already exclusively locked slot and releases lock -void -Rock::DirMap::freeLocked(Slot &s) -{ - memset(s.key_, 0, sizeof(s.key_)); - memset(&s.seBasics, 0, sizeof(s.seBasics)); - s.waitingToBeFreed = false; - s.state = Slot::Empty; - s.releaseExclusiveLock(); - --shared->count; - debugs(79, 5, HERE << " freed slot at " << (&s - shared->slots) << - " in map [" << path << ']'); -} - -int -Rock::DirMap::SharedSize(const int limit) -{ - return sizeof(Shared) + limit * sizeof(Slot); -} - - -/* Rock::Slot */ - -void -Rock::Slot::setKey(const cache_key *const aKey) -{ - memcpy(key_, aKey, sizeof(key_)); -} - -bool -Rock::Slot::checkKey(const cache_key *const aKey) const -{ - const uint32_t *const k = reinterpret_cast(aKey); - return k[0] == key_[0] && k[1] == key_[1] && - k[2] == key_[2] && k[3] == key_[3]; -} - - -bool -Rock::Slot::sharedLock() const -{ - ++readers; // this locks new writers out - if (state == Readable && !writers && !waitingToBeFreed) - return true; - --readers; - return false; -} - -bool -Rock::Slot::exclusiveLock() -{ - if (!writers++) { // we are the first writer (this locks new readers out) - if (!readers) // there are no old readers - return true; - } - --writers; - return false; -} - -void -Rock::Slot::releaseSharedLock() const -{ - assert(readers-- > 0); -} - -void -Rock::Slot::releaseExclusiveLock() -{ - assert(writers-- > 0); -} - -void -Rock::Slot::switchExclusiveToSharedLock() -{ - ++readers; // must be done before we release exclusive control - releaseExclusiveLock(); -} - -void -Rock::Slot::updateStats(MapStats &stats) const -{ - switch (state) { - case Readable: - ++stats.readable; - stats.readers += readers; - break; - case Writeable: - ++stats.writeable; - stats.writers += writers; - break; - case Empty: - ++stats.empty; - break; - } - - if (waitingToBeFreed) - ++stats.marked; -} - - -Rock::DirMap::Shared::Shared(const int aLimit): limit(aLimit), count(0) -{ -} - -void -Rock::StoreEntryBasics::set(const DbCellHeader &aHeader, const StoreEntry &from) -{ - assert(from.swap_file_sz > 0); - memset(this, 0, sizeof(*this)); - header = aHeader; - timestamp = from.timestamp; - lastref = from.lastref; - expires = from.expires; - lastmod = from.lastmod; - swap_file_sz = from.swap_file_sz; - refcount = from.refcount; - flags = from.flags; -} - - -/* MapStats */ - -Rock::MapStats::MapStats() -{ - memset(this, 0, sizeof(*this)); -} - -void -Rock::MapStats::dump(StoreEntry &e) const -{ - storeAppendPrintf(&e, "Available slots: %9d\n", capacity); - - if (!capacity) - return; - - storeAppendPrintf(&e, "Readable slots: %9d %6.2f%%\n", - readable, (100.0 * readable / capacity)); - storeAppendPrintf(&e, "Filling slots: %9d %6.2f%%\n", - writeable, (100.0 * writeable / capacity)); - storeAppendPrintf(&e, "Empty slots: %9d %6.2f%%\n", - empty, (100.0 * empty / capacity)); - - if (readers || writers) { - const int locks = readers + writers; - storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n", - readers, (100.0 * readers / locks)); - storeAppendPrintf(&e, 
"Writers: %9d %6.2f%%\n", - writers, (100.0 * writers / locks)); - } - - if (readable + writeable) { - storeAppendPrintf(&e, "Marked slots: %9d %6.2f%%\n", - marked, (100.0 * marked / (readable + writeable))); - } + return sizeof(Shared) + limit*sizeof(DbCellHeader); } diff --git a/src/fs/rock/RockDirMap.h b/src/fs/rock/RockDirMap.h index 6317e61aa9..cbba2f3773 100644 --- a/src/fs/rock/RockDirMap.h +++ b/src/fs/rock/RockDirMap.h @@ -2,142 +2,33 @@ #define SQUID_FS_ROCK_DIR_MAP_H #include "fs/rock/RockFile.h" -#include "ipc/AtomicWord.h" -#include "ipc/mem/Segment.h" +#include "ipc/StoreMap.h" namespace Rock { -class StoreEntryBasics { -public: - void set(const DbCellHeader &aHeader, const StoreEntry &anEntry); - - DbCellHeader header; ///< rock-specific entry metadata - - /* START OF ON-DISK STORE_META_STD TLV field */ - time_t timestamp; - time_t lastref; - time_t expires; - time_t lastmod; - uint64_t swap_file_sz; - u_short refcount; - u_short flags; - /* END OF ON-DISK STORE_META_STD */ -}; - -/// aggregates basic map performance measurements; all numbers are approximate -class MapStats { -public: - MapStats(); - - void dump(StoreEntry &e) const; - - int capacity; ///< the total number of slots in the map - int readable; ///< number of slots in Readable state - int writeable; ///< number of slots in Writeable state - int empty; ///< number of slots in Empty state - int readers; ///< sum of slot.readers - int writers; ///< sum of slot.writers - int marked; ///< number of slots marked for freeing -}; - -/// DirMap entry -class Slot { -public: - /// possible persistent states - typedef enum { - Empty, ///< ready for writing, with nothing of value - Writeable, ///< transitions from Empty to Readable - Readable, ///< ready for reading - } State; - - void setKey(const cache_key *const aKey); - bool checkKey(const cache_key *const aKey) const; - - bool sharedLock() const; ///< lock for reading or return false - bool exclusiveLock(); ///< lock for modification or return false - void releaseSharedLock() const; ///< undo successful sharedLock() - void releaseExclusiveLock(); ///< undo successful exclusiveLock() - void switchExclusiveToSharedLock(); ///< trade exclusive for shared access - - /// adds approximate current stats to the supplied ones - void updateStats(MapStats &stats) const; - -public: - // we want two uint64_t, but older GCCs lack __sync_fetch_and_add_8 - AtomicWordT key_[4]; ///< MD5 entry key - StoreEntryBasics seBasics; ///< basic store entry data - AtomicWordT state; ///< current state - AtomicWordT waitingToBeFreed; ///< a state-independent mark - -private: - mutable AtomicWord readers; ///< number of users trying to read - AtomicWord writers; ///< number of writers trying to modify the slot -}; - /// \ingroup Rock /// map of used db slots indexed by sfileno -class DirMap +class DirMap: public Ipc::StoreMap { public: DirMap(const char *const aPath, const int limit); ///< create a new shared DirMap DirMap(const char *const aPath); ///< open an existing shared DirMap - /// finds/reservers space for writing a new entry or returns nil - StoreEntryBasics *openForWriting(const cache_key *const key, sfileno &fileno); - /// successfully finish writing the entry, leaving it opened for reading - void closeForWriting(const sfileno fileno); - - /// stores entry info at the requested slot or returns false - bool putAt(const DbCellHeader &header, const StoreEntry &e, const sfileno fileno); - - /// only works on locked entries; returns nil unless the slot is readable - const StoreEntryBasics 
*peekAtReader(const sfileno fileno) const; - - /// mark the slot as waiting to be freed and, if possible, free it - void free(const sfileno fileno); - - /// open slot for reading, increments read level - const StoreEntryBasics *openForReading(const cache_key *const key, sfileno &fileno); - /// open slot for reading, increments read level - const StoreEntryBasics *openForReadingAt(const sfileno fileno); - /// close slot after reading, decrements read level - void closeForReading(const sfileno fileno); - - /// called by lock holder to terminate either slot writing or reading - void abortIo(const sfileno fileno); - - bool full() const; ///< there are no empty slots left - bool valid(int n) const; ///< whether n is a valid slot coordinate - int entryCount() const; ///< number of used slots - int entryLimit() const; ///< maximum number of slots that can be used - - /// adds approximate current stats to the supplied ones - void updateStats(MapStats &stats) const; + /// write access to the cell header; call openForWriting() first! + DbCellHeader &header(const sfileno fileno); + /// read-only access to the cell header; call openForReading() first! + const DbCellHeader &header(const sfileno fileno) const; static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible private: - struct Shared { - Shared(const int aLimit); - - const AtomicWord limit; ///< maximum number of map slots - AtomicWord count; ///< current number of map slots - - Slot slots[]; ///< slots storage + /// data shared by all DirMaps with the same path + class Shared { + public: + static size_t MemSize(int limit); + DbCellHeader headers[]; ///< DbCellHeaders storage }; - int slotIdx(const cache_key *const key) const; - Slot &slot(const cache_key *const key); - const StoreEntryBasics *openForReading(Slot &s); - void abortWriting(const sfileno fileno); - void freeIfNeeded(Slot &s); - void freeLocked(Slot &s); - String sharedMemoryName(); - - static int SharedSize(const int limit); - - const String path; ///< cache_dir path, used for logging - Ipc::Mem::Segment shm; ///< shared memory segment Shared *shared; ///< pointer to shared memory }; diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index 988bce78f2..49cef04cb4 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -45,22 +45,24 @@ Rock::SwapDir::get(const cache_key *key) return NULL; sfileno fileno; - const StoreEntryBasics *const basics = map->openForReading(key, fileno); - if (!basics) + const Ipc::StoreMapSlot *const slot = map->openForReading(key, fileno); + if (!slot) return NULL; + const Ipc::StoreMapSlot::Basics &basics = slot->basics; + // create a brand new store entry and initialize it with stored basics StoreEntry *e = new StoreEntry(); e->lock_count = 0; e->swap_dirn = index; e->swap_filen = fileno; - e->swap_file_sz = basics->swap_file_sz; - e->lastref = basics->lastref; - e->timestamp = basics->timestamp; - e->expires = basics->expires; - e->lastmod = basics->lastmod; - e->refcount = basics->refcount; - e->flags = basics->flags; + e->swap_file_sz = basics.swap_file_sz; + e->lastref = basics.lastref; + e->timestamp = basics.timestamp; + e->expires = basics.expires; + e->lastmod = basics.lastmod; + e->refcount = basics.refcount; + e->flags = basics.flags; e->store_status = STORE_OK; e->setMemStatus(NOT_IN_MEMORY); e->swap_status = SWAPOUT_DONE; @@ -302,10 +304,16 @@ Rock::SwapDir::addEntry(const int fileno, const DbCellHeader &header, const Stor ", fileno="<< std::setfill('0') << std::hex << std::uppercase << 
std::setw(8) << fileno); - if (map->putAt(header, from, fileno)) { - // we do not add this entry to store_table so core will not updateSize - updateSize(from.swap_file_sz, +1); - return true; + sfileno newLocation = 0; + if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast(from.key), newLocation)) { + if (fileno == newLocation) { + slot->set(from); + map->header(fileno) = header; + // core will not updateSize: we do not add the entry to store_table + updateSize(from.swap_file_sz, +1); + } // else some other, newer entry got into our cell + map->closeForWriting(newLocation, false); + return fileno == newLocation; } return false; @@ -350,14 +358,15 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI assert(payloadEnd <= max_objsize); sfileno fileno; - StoreEntryBasics *const basics = + Ipc::StoreMapSlot *const slot = map->openForWriting(reinterpret_cast(e.key), fileno); - if (!basics) { + if (!slot) { debugs(47, 5, HERE << "Rock::SwapDir::createStoreIO: map->add failed"); return NULL; } e.swap_file_sz = header.payloadSize; // and will be copied to the map - basics->set(header, e); + slot->set(e); + map->header(fileno) = header; // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno. // If that does not happen, the entry will not decrement the read level! @@ -412,23 +421,23 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS // The are two ways an entry can get swap_filen: our get() locked it for // reading or our storeSwapOutStart() locked it for writing. Peeking at our // locked entry is safe, but no support for reading a filling entry. - const StoreEntryBasics *basics = map->peekAtReader(e.swap_filen); - if (!basics) + const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen); + if (!slot) return NULL; // we were writing afterall IoState *sio = new IoState(this, &e, cbFile, cbIo, data); sio->swap_dirn = index; sio->swap_filen = e.swap_filen; - sio->payloadEnd = sizeof(DbCellHeader) + basics->header.payloadSize; + sio->payloadEnd = sizeof(DbCellHeader) + map->header(e.swap_filen).payloadSize; assert(sio->payloadEnd <= max_objsize); // the payload fits the slot debugs(47,5, HERE << "dir " << index << " has old fileno: " << std::setfill('0') << std::hex << std::uppercase << std::setw(8) << sio->swap_filen); - assert(basics->swap_file_sz > 0); - assert(basics->swap_file_sz == e.swap_file_sz); + assert(slot->basics.swap_file_sz > 0); + assert(slot->basics.swap_file_sz == e.swap_file_sz); sio->diskOffset = diskOffset(sio->swap_filen); assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit()); @@ -499,7 +508,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest if (errflag == DISK_OK) { // close, assuming we only write once; the entry gets the read lock - map->closeForWriting(sio.swap_filen); + map->closeForWriting(sio.swap_filen, true); // do not increment sio.offset_ because we do it in sio->write() } else { // Do not abortWriting here. 
The entry should keep the write lock @@ -650,7 +659,7 @@ Rock::SwapDir::statfs(StoreEntry &e) const entryCount, (100.0 * entryCount / limit)); if (limit < 100) { // XXX: otherwise too expensive to count - MapStats stats; + Ipc::ReadWriteLockStats stats; map->updateStats(stats); stats.dump(e); } diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index 43b61e4c91..147917690e 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -14,8 +14,12 @@ libipc_la_SOURCES = \ Messages.h \ Queue.cc \ Queue.h \ + ReadWriteLock.cc \ + ReadWriteLock.h \ StartListening.cc \ StartListening.h \ + StoreMap.cc \ + StoreMap.h \ StrandCoord.cc \ StrandCoord.h \ StrandCoords.h \ diff --git a/src/ipc/ReadWriteLock.cc b/src/ipc/ReadWriteLock.cc new file mode 100644 index 0000000000..9fef9c1706 --- /dev/null +++ b/src/ipc/ReadWriteLock.cc @@ -0,0 +1,97 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + */ + +#include "squid.h" + +#include "Store.h" +#include "ipc/ReadWriteLock.h" + +bool +Ipc::ReadWriteLock::lockShared() +{ + ++readers; // this locks "new" writers out + if (!writers) // there are no old writers + return true; + --readers; + return false; +} + +bool +Ipc::ReadWriteLock::lockExclusive() +{ + if (!writers++) { // we are the first writer + this locks "new" readers out + if (!readers) // there are no old readers + return true; + } + --writers; + return false; +} + +void +Ipc::ReadWriteLock::unlockShared() +{ + assert(readers-- > 0); +} + +void +Ipc::ReadWriteLock::unlockExclusive() +{ + assert(writers-- > 0); +} + +void +Ipc::ReadWriteLock::switchExclusiveToShared() +{ + ++readers; // must be done before we release exclusive control + unlockExclusive(); +} + +void +Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const +{ + if (readers) { + ++stats.readable; + stats.readers += readers; + } else if (writers) { + ++stats.writeable; + stats.writers += writers; + } else { + ++stats.idle; + } + ++stats.count; +} + + +/* Ipc::ReadWriteLockStats */ + +Ipc::ReadWriteLockStats::ReadWriteLockStats() +{ + memset(this, 0, sizeof(*this)); +} + +void +Ipc::ReadWriteLockStats::dump(StoreEntry &e) const +{ + storeAppendPrintf(&e, "Available locks: %9d\n", count); + + if (!count) + return; + + storeAppendPrintf(&e, "Reading: %9d %6.2f%%\n", + readable, (100.0 * readable / count)); + storeAppendPrintf(&e, "Writing: %9d %6.2f%%\n", + writeable, (100.0 * writeable / count)); + storeAppendPrintf(&e, "Idle: %9d %6.2f%%\n", + idle, (100.0 * idle / count)); + + if (readers || writers) { + const int locked = readers + writers; + storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n", + readers, (100.0 * readers / locked)); + storeAppendPrintf(&e, "Writers: %9d %6.2f%%\n", + writers, (100.0 * writers / locked)); + } +} diff --git a/src/ipc/ReadWriteLock.h b/src/ipc/ReadWriteLock.h new file mode 100644 index 0000000000..91507dee46 --- /dev/null +++ b/src/ipc/ReadWriteLock.h @@ -0,0 +1,49 @@ +#ifndef SQUID_IPC_READ_WRITE_LOCK_H +#define SQUID_IPC_READ_WRITE_LOCK_H + +#include "ipc/AtomicWord.h" + +class StoreEntry; + +namespace Ipc { + +class ReadWriteLockStats; + +/// an atomic readers-writer or shared-exclusive lock suitable for maps/tables +class ReadWriteLock { +public: + // default constructor is OK because of shared memory zero-initialization + + bool lockShared(); ///< lock for reading or return false + bool lockExclusive(); ///< lock for modification or return false + void unlockShared(); ///< undo successful sharedLock() + void unlockExclusive(); ///< undo successful exclusiveLock() + void 
switchExclusiveToShared(); ///< stop writing, start reading + + /// adds approximate current stats to the supplied ones + void updateStats(ReadWriteLockStats &stats) const; + +public: + mutable AtomicWord readers; ///< number of users trying to read + AtomicWord writers; ///< number of writers trying to modify protected data +}; + + +/// approximate stats of a set of ReadWriteLocks +class ReadWriteLockStats { +public: + ReadWriteLockStats(); + + void dump(StoreEntry &e) const; + + int count; ///< the total number of locks + int readable; ///< number of locks locked for reading + int writeable; ///< number of locks locked for writing + int idle; ///< number of unlocked locks + int readers; ///< sum of lock.readers + int writers; ///< sum of lock.writers +}; + +} // namespace Ipc + +#endif /* SQUID_IPC_READ_WRITE_LOCK_H */ diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc new file mode 100644 index 0000000000..1103477595 --- /dev/null +++ b/src/ipc/StoreMap.cc @@ -0,0 +1,312 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + */ + +#include "squid.h" + +#include "Store.h" +#include "ipc/StoreMap.h" + +Ipc::StoreMap::StoreMap(const char *const aPath, const int limit, + size_t sharedSizeExtra): + path(aPath), shm(aPath), shared(NULL) +{ + const size_t mySharedSize = Shared::MemSize(limit); + shm.create(mySharedSize + sharedSizeExtra); + shared = new (shm.reserve(mySharedSize)) Shared(limit); + debugs(54, 5, HERE << "new map [" << path << "] created"); +} + +Ipc::StoreMap::StoreMap(const char *const aPath): + path(aPath), shm(aPath), shared(NULL) +{ + shm.open(); + assert(shm.mem()); + shared = reinterpret_cast(shm.mem()); + // check that nobody used our segment chunk and that shared->limit is sane + assert(shared == reinterpret_cast(shm.reserve(Shared::MemSize(shared->limit)))); + debugs(54, 5, HERE << "attached map [" << path << "] created"); +} + +Ipc::StoreMap::Slot * +Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno) +{ + debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key) + << " for writing in map [" << path << ']'); + const int idx = slotIndexByKey(key); + + Slot &s = shared->slots[idx]; + ReadWriteLock &lock = s.lock; + + if (lock.lockExclusive()) { + assert(s.state != Slot::Writeable); // until we start breaking locks + + // free if the entry was dirty, keeping the entry locked + if (s.waitingToBeFreed == true) + freeLocked(s, true); + + if (s.state == Slot::Empty) // we may also overwrite a Readable slot + ++shared->count; + s.state = Slot::Writeable; + fileno = idx; + //s.setKey(key); // XXX: the caller should do that + debugs(54, 5, HERE << " opened slot at " << idx << + " for writing in map [" << path << ']'); + return &s; // and keep the entry locked + } + + debugs(54, 5, HERE << " failed to open slot at " << idx << + " for writing in map [" << path << ']'); + return NULL; +} + +void +Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading) +{ + debugs(54, 5, HERE << " closing slot at " << fileno << " for writing and " + "openning for reading in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + assert(s.state == Slot::Writeable); + s.state = Slot::Readable; + if (lockForReading) + s.lock.switchExclusiveToShared(); + else + s.lock.unlockExclusive(); +} + +/// terminate writing the entry, freeing its slot for others to use +void +Ipc::StoreMap::abortWriting(const sfileno fileno) +{ + debugs(54, 5, HERE << " abort writing slot at " << fileno << + " in map [" << 
path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + assert(s.state == Slot::Writeable); + freeLocked(s, false); +} + +void +Ipc::StoreMap::abortIo(const sfileno fileno) +{ + debugs(54, 5, HERE << " abort I/O for slot at " << fileno << + " in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + + // The caller is a lock holder. Thus, if we are Writeable, then the + // caller must be the writer; otherwise the caller must be the reader. + if (s.state == Slot::Writeable) + abortWriting(fileno); + else + closeForReading(fileno); +} + +const Ipc::StoreMap::Slot * +Ipc::StoreMap::peekAtReader(const sfileno fileno) const +{ + assert(valid(fileno)); + const Slot &s = shared->slots[fileno]; + switch (s.state) { + case Slot::Readable: + return &s; // immediate access by lock holder so no locking + case Slot::Writeable: + return NULL; // cannot read the slot when it is being written + case Slot::Empty: + assert(false); // must be locked for reading or writing + } + assert(false); // not reachable + return NULL; +} + +void +Ipc::StoreMap::free(const sfileno fileno) +{ + debugs(54, 5, HERE << " marking slot at " << fileno << " to be freed in" + " map [" << path << ']'); + + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + + if (s.lock.lockExclusive()) + freeLocked(s, false); + else + s.waitingToBeFreed = true; // mark to free it later +} + +const Ipc::StoreMap::Slot * +Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno) +{ + debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key) + << " for reading in map [" << path << ']'); + const int idx = slotIndexByKey(key); + if (const Slot *slot = openForReadingAt(idx)) { + if (slot->sameKey(key)) { + fileno = idx; + debugs(54, 5, HERE << " opened slot at " << fileno << " for key " + << storeKeyText(key) << " for reading in map [" << path << + ']'); + return slot; // locked for reading + } + slot->lock.unlockShared(); + } + debugs(54, 5, HERE << " failed to open slot for key " << storeKeyText(key) + << " for reading in map [" << path << ']'); + return NULL; +} + +const Ipc::StoreMap::Slot * +Ipc::StoreMap::openForReadingAt(const sfileno fileno) +{ + debugs(54, 5, HERE << " trying to open slot at " << fileno << " for " + "reading in map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + + if (!s.lock.lockShared()) { + debugs(54, 5, HERE << " failed to lock slot at " << fileno << " for " + "reading in map [" << path << ']'); + return NULL; + } + + if (s.state == Slot::Empty) { + s.lock.unlockShared(); + debugs(54, 7, HERE << " empty slot at " << fileno << " for " + "reading in map [" << path << ']'); + return NULL; + } + + if (s.waitingToBeFreed) { + s.lock.unlockShared(); + debugs(54, 7, HERE << " dirty slot at " << fileno << " for " + "reading in map [" << path << ']'); + return NULL; + } + + // cannot be Writing here if we got shared lock and checked Empty above + assert(s.state == Slot::Readable); + debugs(54, 5, HERE << " opened slot at " << fileno << " for reading in" + " map [" << path << ']'); + return &s; +} + +void +Ipc::StoreMap::closeForReading(const sfileno fileno) +{ + debugs(54, 5, HERE << " closing slot at " << fileno << " for reading in " + "map [" << path << ']'); + assert(valid(fileno)); + Slot &s = shared->slots[fileno]; + assert(s.state == Slot::Readable); + s.lock.unlockShared(); +} + +int +Ipc::StoreMap::entryLimit() const +{ + return shared->limit; +} + +int +Ipc::StoreMap::entryCount() const +{ + 
return shared->count; +} + +bool +Ipc::StoreMap::full() const +{ + return entryCount() >= entryLimit(); +} + +void +Ipc::StoreMap::updateStats(ReadWriteLockStats &stats) const +{ + for (int i = 0; i < shared->limit; ++i) + shared->slots[i].lock.updateStats(stats); +} + +bool +Ipc::StoreMap::valid(const int pos) const +{ + return 0 <= pos && pos < entryLimit(); +} + +int +Ipc::StoreMap::slotIndexByKey(const cache_key *const key) const +{ + const uint64_t *const k = reinterpret_cast(key); + // TODO: use a better hash function + return (k[0] + k[1]) % shared->limit; +} + +Ipc::StoreMap::Slot & +Ipc::StoreMap::slotByKey(const cache_key *const key) +{ + return shared->slots[slotIndexByKey(key)]; +} + +/// unconditionally frees the already exclusively locked slot and releases lock +void +Ipc::StoreMap::freeLocked(Slot &s, bool keepLocked) +{ + s.waitingToBeFreed = false; + s.state = Slot::Empty; + if (!keepLocked) + s.lock.unlockExclusive(); + --shared->count; + debugs(54, 5, HERE << " freed slot at " << (&s - shared->slots) << + " in map [" << path << ']'); +} + + +/* Ipc::StoreMapSlot */ + +Ipc::StoreMapSlot::StoreMapSlot(): state(Empty) +{ + xmemset(&key, 0, sizeof(key)); + xmemset(&basics, 0, sizeof(basics)); +} + +void +Ipc::StoreMapSlot::setKey(const cache_key *const aKey) +{ + memcpy(key, aKey, sizeof(key)); +} + +bool +Ipc::StoreMapSlot::sameKey(const cache_key *const aKey) const +{ + const uint64_t *const k = reinterpret_cast(aKey); + return k[0] == key[0] && k[1] == key[1]; +} + +void +Ipc::StoreMapSlot::set(const StoreEntry &from) +{ + memcpy(key, from.key, sizeof(key)); + // XXX: header = aHeader; + basics.timestamp = from.timestamp; + basics.lastref = from.lastref; + basics.expires = from.expires; + basics.lastmod = from.lastmod; + basics.swap_file_sz = from.swap_file_sz; + basics.refcount = from.refcount; + basics.flags = from.flags; +} + +/* Ipc::StoreMap::Shared */ + +Ipc::StoreMap::Shared::Shared(const int aLimit): limit(aLimit), count(0) +{ +} + +size_t +Ipc::StoreMap::Shared::MemSize(int limit) +{ + return sizeof(Shared) + limit * sizeof(Slot); +} + diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h new file mode 100644 index 0000000000..3489d2891f --- /dev/null +++ b/src/ipc/StoreMap.h @@ -0,0 +1,120 @@ +#ifndef SQUID_IPC_STORE_MAP_H +#define SQUID_IPC_STORE_MAP_H + +#include "ipc/ReadWriteLock.h" +#include "ipc/mem/Segment.h" + +namespace Ipc { + +/// a StoreMap element, holding basic shareable StoreEntry info +class StoreMapSlot { +public: + StoreMapSlot(); + + /// store StoreEntry key and basics + void set(const StoreEntry &anEntry); + + void setKey(const cache_key *const aKey); + bool sameKey(const cache_key *const aKey) const; + +public: + mutable ReadWriteLock lock; ///< protects slot data below + AtomicWordT waitingToBeFreed; ///< may be accessed w/o a lock + + uint64_t key[2]; ///< StoreEntry key + + // STORE_META_STD TLV field from StoreEntry + struct Basics { + time_t timestamp; + time_t lastref; + time_t expires; + time_t lastmod; + uint64_t swap_file_sz; + u_short refcount; + u_short flags; + } basics; + + /// possible persistent states + typedef enum { + Empty, ///< ready for writing, with nothing of value + Writeable, ///< transitions from Empty to Readable + Readable, ///< ready for reading + } State; + State state; ///< current state +}; + +/// map of StoreMapSlots indexed by their keys, with read/write slot locking +/// kids extend to store custom data +class StoreMap +{ +public: + typedef StoreMapSlot Slot; + + StoreMap(const char *const aPath, const int 
limit, size_t sharedSizeExtra); ///< create a new shared StoreMap + explicit StoreMap(const char *const aPath); ///< open an existing shared StoreMap + + /// finds, reservers space for writing a new entry or returns nil + Slot *openForWriting(const cache_key *const key, sfileno &fileno); + /// successfully finish writing the entry + void closeForWriting(const sfileno fileno, bool lockForReading = false); + + /// only works on locked entries; returns nil unless the slot is readable + const Slot *peekAtReader(const sfileno fileno) const; + + /// mark the slot as waiting to be freed and, if possible, free it + void free(const sfileno fileno); + + /// open slot for reading, increments read level + const Slot *openForReading(const cache_key *const key, sfileno &fileno); + /// open slot for reading, increments read level + const Slot *openForReadingAt(const sfileno fileno); + /// close slot after reading, decrements read level + void closeForReading(const sfileno fileno); + + /// called by lock holder to terminate either slot writing or reading + void abortIo(const sfileno fileno); + + bool full() const; ///< there are no empty slots left + bool valid(int n) const; ///< whether n is a valid slot coordinate + int entryCount() const; ///< number of used slots + int entryLimit() const; ///< maximum number of slots that can be used + + /// adds approximate current stats to the supplied ones + void updateStats(ReadWriteLockStats &stats) const; + +protected: + class Shared { + public: + static size_t MemSize(int limit); + + Shared(const int aLimit); + + const AtomicWord limit; ///< maximum number of map slots + AtomicWord count; ///< current number of map slots + + Slot slots[]; ///< slots storage + }; + +protected: + const String path; ///< cache_dir path, used for logging + Ipc::Mem::Segment shm; ///< shared memory segment + +private: + int slotIndexByKey(const cache_key *const key) const; + Slot &slotByKey(const cache_key *const key); + + Slot *openForReading(Slot &s); + void abortWriting(const sfileno fileno); + void freeIfNeeded(Slot &s); + void freeLocked(Slot &s, bool keepLocked); + String sharedMemoryName(); + + Shared *shared; ///< pointer to shared memory +}; + +} // namespace Ipc + +// We do not reuse struct _fileMap because we cannot control its size, +// resulting in sfilenos that are pointing beyond the database. + +#endif /* SQUID_IPC_STORE_MAP_H */
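
For illustration only (not part of the patch): a minimal sketch of how a cache_dir caller might drive the simplified Ipc::StoreMap API introduced here, with the openForWriting()/closeForWriting() pair replacing the old putAt() and the Rock-specific DbCellHeader handled through Rock::DirMap::header(), as the commit message and the SwapDir::addEntry() hunk above describe. The function name tryStoreAndReload() and the map/storeEntry/dbHeader arguments are hypothetical placeholders; the sketch assumes Squid's internal headers and an already-constructed DirMap.

    #include "Store.h"
    #include "fs/rock/RockDirMap.h"

    // Hypothetical helper: store an entry, then read its basics back.
    static bool tryStoreAndReload(Rock::DirMap &map, const StoreEntry &storeEntry,
                                  const Rock::DbCellHeader &dbHeader)
    {
        const cache_key *key = reinterpret_cast<const cache_key *>(storeEntry.key);

        // writer side: replaces the old putAt() call
        sfileno fileno;
        Ipc::StoreMapSlot *slot = map.openForWriting(key, fileno);
        if (!slot)
            return false;              // slot is locked by somebody else
        slot->set(storeEntry);         // copies the key and STORE_META_STD basics
        map.header(fileno) = dbHeader; // kid-specific metadata stays outside StoreMap
        map.closeForWriting(fileno);   // done writing; no read lock kept

        // reader side: shared lock while the basics are examined
        sfileno readFileno;
        if (const Ipc::StoreMapSlot *reader = map.openForReading(key, readFileno)) {
            const bool sizeMatches =
                reader->basics.swap_file_sz == storeEntry.swap_file_sz;
            map.closeForReading(readFileno);
            return sizeMatches;
        }
        return false;
    }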
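
Also for illustration only: the non-blocking protocol of the new Ipc::ReadWriteLock, the locking code extracted from the old Rock::Slot. A caller that fails to acquire the lock simply backs off; nothing blocks or spins. The readValue()/writeValue() helpers and protectedValue are hypothetical placeholders, shown here as ordinary statics for brevity; in Squid both the lock and the data it protects live in a zero-initialized shared memory segment.

    #include "ipc/ReadWriteLock.h"

    // Hypothetical example data guarded by the lock.
    static Ipc::ReadWriteLock lock;
    static int protectedValue = 0;

    // lockShared() fails while a writer holds or is acquiring the lock.
    static bool readValue(int &result)
    {
        if (!lock.lockShared())
            return false; // writer active; the caller decides whether to retry
        result = protectedValue;
        lock.unlockShared();
        return true;
    }

    // lockExclusive() fails while any reader or another writer is active.
    static bool writeValue(const int newValue)
    {
        if (!lock.lockExclusive())
            return false;
        protectedValue = newValue;
        lock.unlockExclusive();
        return true;
    }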