#include "fs/rock/RockDirMap.h"
Rock::DirMap::DirMap(const char *const aPath, const int limit):
- path(aPath), shm(aPath)
+ Ipc::StoreMap(aPath, limit, Shared::MemSize(limit))
{
- shm.create(SharedSize(limit));
assert(shm.mem());
- shared = new (shm.mem()) Shared(limit);
- debugs(79, 5, HERE << "] new map [" << path << "] created using a new "
- "shared memory segment for cache_dir '" << path << "' with limit=" <<
- entryLimit());
+ shared = new (shm.reserve(Shared::MemSize(limit))) Shared;
}
Rock::DirMap::DirMap(const char *const aPath):
- path(aPath), shm(aPath)
+ Ipc::StoreMap(aPath)
{
- shm.open();
+ const int limit = entryLimit();
assert(shm.mem());
- shared = reinterpret_cast<Shared *>(shm.mem());
- debugs(79, 5, HERE << "] new map [" << path << "] created using existing "
- "shared memory segment for cache_dir '" << path << "' with limit=" <<
- entryLimit());
+ shared = reinterpret_cast<Shared *>(shm.reserve(Shared::MemSize(limit)));
}
-Rock::StoreEntryBasics *
-Rock::DirMap::openForWriting(const cache_key *const key, sfileno &fileno)
+Rock::DbCellHeader &
+Rock::DirMap::header(const sfileno fileno)
{
- debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
- << " for writing in map [" << path << ']');
- const int idx = slotIdx(key);
- Slot &s = shared->slots[idx];
-
- if (s.exclusiveLock()) {
- assert(s.state != Slot::Writeable); // until we start breaking locks
- if (s.state == Slot::Empty) // we may also overwrite a Readable slot
- ++shared->count;
- s.state = Slot::Writeable;
- s.setKey(key);
- fileno = idx;
- debugs(79, 5, HERE << " opened slot at " << fileno << " for key " <<
- storeKeyText(key) << " for writing in map [" << path << ']');
- return &s.seBasics; // and keep the entry locked
- }
-
- debugs(79, 5, HERE << " failed to open slot at " << idx << " for key " << storeKeyText(key)
- << " for writing in map [" << path << ']');
- return NULL;
-}
-
-void
-Rock::DirMap::closeForWriting(const sfileno fileno)
-{
- debugs(79, 5, HERE << " closing slot at " << fileno << " for writing and "
- "openning for reading in map [" << path << ']');
- assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- assert(s.state.swap_if(Slot::Writeable, Slot::Readable));
- s.switchExclusiveToSharedLock();
-}
-
-/// terminate writing the entry, freeing its slot for others to use
-void
-Rock::DirMap::abortWriting(const sfileno fileno)
-{
- debugs(79, 5, HERE << " abort writing slot at " << fileno <<
- " in map [" << path << ']');
- assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- assert(s.state == Slot::Writeable);
- freeLocked(s);
-}
-
-void
-Rock::DirMap::abortIo(const sfileno fileno)
-{
- debugs(79, 5, HERE << " abort I/O for slot at " << fileno <<
- " in map [" << path << ']');
- assert(valid(fileno));
- Slot &s = shared->slots[fileno];
-
- // The caller is a lock holder. Thus, if we are Writeable, then the
- // caller must be the writer; otherwise the caller must be the reader.
- if (s.state == Slot::Writeable)
- abortWriting(fileno);
- else
- closeForReading(fileno);
-}
-
-const Rock::StoreEntryBasics *
-Rock::DirMap::peekAtReader(const sfileno fileno) const
-{
- assert(valid(fileno));
- const Slot &s = shared->slots[fileno];
- switch (s.state) {
- case Slot::Readable:
- return &s.seBasics; // immediate access by lock holder so no locking
- case Slot::Writeable:
- return NULL; // cannot read the slot when it is being written
- case Slot::Empty:
- assert(false); // must be locked for reading or writing
- }
- assert(false); // not reachable
- return NULL;
-}
-
-bool
-Rock::DirMap::putAt(const DbCellHeader &header, const StoreEntry &e,
- const sfileno fileno)
-{
- const cache_key *key = static_cast<const cache_key*>(e.key);
- debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
- << " for putting in map [" << path << ']');
- if (!valid(fileno)) {
- debugs(79, 5, HERE << "failure: bad fileno: " << fileno);
- return false;
- }
-
- const int idx = slotIdx(key);
- if (fileno != idx) {
- debugs(79, 5, HERE << "failure: hash changed: " << idx << " vs. " <<
- fileno);
- return false;
- }
-
- Slot &s = shared->slots[idx];
-
- if (s.exclusiveLock()) {
- assert(s.state != Slot::Writeable); // until we start breaking locks
- if (s.state == Slot::Empty) // we may also overwrite a Readable slot
- ++shared->count;
- s.setKey(static_cast<const cache_key*>(e.key));
- s.seBasics.set(header, e);
- s.state = Slot::Readable;
- s.releaseExclusiveLock();
- debugs(79, 5, HERE << " put slot at " << fileno << " for key " <<
- storeKeyText(key) << " in map [" << path << ']');
- return true;
- }
-
- debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key)
- << " for putting in map [" << path << ']');
- return false;
-}
-
-void
-Rock::DirMap::free(const sfileno fileno)
-{
- debugs(79, 5, HERE << " marking slot at " << fileno << " to be freed in"
- " map [" << path << ']');
-
- assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- s.waitingToBeFreed = true; // mark, regardless of whether we can free
- freeIfNeeded(s);
-}
-
-const Rock::StoreEntryBasics *
-Rock::DirMap::openForReading(const cache_key *const key, sfileno &fileno)
-{
- debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
- << " for reading in map [" << path << ']');
- const int idx = slotIdx(key);
- if (const StoreEntryBasics *const seBasics = openForReadingAt(idx)) {
- Slot &s = shared->slots[idx];
- if (s.checkKey(key)) {
- fileno = idx;
- debugs(79, 5, HERE << " opened slot at " << fileno << " for key "
- << storeKeyText(key) << " for reading in map [" << path <<
- ']');
- return seBasics;
- }
- s.releaseSharedLock();
- freeIfNeeded(s);
- }
- debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key)
- << " for reading in map [" << path << ']');
- return 0;
+ assert(0 <= fileno && fileno < entryLimit());
+ assert(shared);
+ return shared->headers[fileno];
}
-const Rock::StoreEntryBasics *
-Rock::DirMap::openForReadingAt(const sfileno fileno)
+const Rock::DbCellHeader &
+Rock::DirMap::header(const sfileno fileno) const
{
- debugs(79, 5, HERE << " trying to open slot at " << fileno << " for "
- "reading in map [" << path << ']');
- assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- if (s.sharedLock()) {
- debugs(79, 5, HERE << " opened slot at " << fileno << " for reading in"
- " map [" << path << ']');
- return &s.seBasics;
- }
- freeIfNeeded(s);
- debugs(79, 5, HERE << " failed to open slot at " << fileno << " for "
- "reading in map [" << path << ']');
- return 0;
-}
-
-void
-Rock::DirMap::closeForReading(const sfileno fileno)
-{
- debugs(79, 5, HERE << " closing slot at " << fileno << " for reading in "
- "map [" << path << ']');
- assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- assert(s.state == Slot::Readable);
- s.releaseSharedLock();
- freeIfNeeded(s);
-}
-
-int
-Rock::DirMap::entryLimit() const
-{
- return shared->limit;
-}
-
-int
-Rock::DirMap::entryCount() const
-{
- return shared->count;
-}
-
-bool
-Rock::DirMap::full() const
-{
- return entryCount() >= entryLimit();
-}
-
-void
-Rock::DirMap::updateStats(MapStats &stats) const
-{
- stats.capacity += shared->limit;
- for (int i = 0; i < shared->limit; ++i)
- shared->slots[i].updateStats(stats);
-}
-
-bool
-Rock::DirMap::valid(const int pos) const
-{
- return 0 <= pos && pos < entryLimit();
+ assert(0 <= fileno && fileno < entryLimit());
+ assert(shared);
+ return shared->headers[fileno];
}
int
return sfilenoMax;
}
-int
-Rock::DirMap::slotIdx(const cache_key *const key) const
-{
- const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
- // TODO: use a better hash function
- return (k[0] + k[1]) % shared->limit;
-}
-
-Rock::Slot &
-Rock::DirMap::slot(const cache_key *const key)
-{
- return shared->slots[slotIdx(key)];
-}
-
-/// frees the slot if (b) it is waiting to be freed and (a) we can lock it
-void
-Rock::DirMap::freeIfNeeded(Slot &s)
+size_t
+Rock::DirMap::Shared::MemSize(int limit)
{
- if (s.exclusiveLock()) {
- if (s.waitingToBeFreed == true)
- freeLocked(s);
- else
- s.releaseExclusiveLock();
- }
-}
-
-/// unconditionally frees the already exclusively locked slot and releases lock
-void
-Rock::DirMap::freeLocked(Slot &s)
-{
- memset(s.key_, 0, sizeof(s.key_));
- memset(&s.seBasics, 0, sizeof(s.seBasics));
- s.waitingToBeFreed = false;
- s.state = Slot::Empty;
- s.releaseExclusiveLock();
- --shared->count;
- debugs(79, 5, HERE << " freed slot at " << (&s - shared->slots) <<
- " in map [" << path << ']');
-}
-
-int
-Rock::DirMap::SharedSize(const int limit)
-{
- return sizeof(Shared) + limit * sizeof(Slot);
-}
-
-
-/* Rock::Slot */
-
-void
-Rock::Slot::setKey(const cache_key *const aKey)
-{
- memcpy(key_, aKey, sizeof(key_));
-}
-
-bool
-Rock::Slot::checkKey(const cache_key *const aKey) const
-{
- const uint32_t *const k = reinterpret_cast<const uint32_t *>(aKey);
- return k[0] == key_[0] && k[1] == key_[1] &&
- k[2] == key_[2] && k[3] == key_[3];
-}
-
-
-bool
-Rock::Slot::sharedLock() const
-{
- ++readers; // this locks new writers out
- if (state == Readable && !writers && !waitingToBeFreed)
- return true;
- --readers;
- return false;
-}
-
-bool
-Rock::Slot::exclusiveLock()
-{
- if (!writers++) { // we are the first writer (this locks new readers out)
- if (!readers) // there are no old readers
- return true;
- }
- --writers;
- return false;
-}
-
-void
-Rock::Slot::releaseSharedLock() const
-{
- assert(readers-- > 0);
-}
-
-void
-Rock::Slot::releaseExclusiveLock()
-{
- assert(writers-- > 0);
-}
-
-void
-Rock::Slot::switchExclusiveToSharedLock()
-{
- ++readers; // must be done before we release exclusive control
- releaseExclusiveLock();
-}
-
-void
-Rock::Slot::updateStats(MapStats &stats) const
-{
- switch (state) {
- case Readable:
- ++stats.readable;
- stats.readers += readers;
- break;
- case Writeable:
- ++stats.writeable;
- stats.writers += writers;
- break;
- case Empty:
- ++stats.empty;
- break;
- }
-
- if (waitingToBeFreed)
- ++stats.marked;
-}
-
-
-Rock::DirMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
-{
-}
-
-void
-Rock::StoreEntryBasics::set(const DbCellHeader &aHeader, const StoreEntry &from)
-{
- assert(from.swap_file_sz > 0);
- memset(this, 0, sizeof(*this));
- header = aHeader;
- timestamp = from.timestamp;
- lastref = from.lastref;
- expires = from.expires;
- lastmod = from.lastmod;
- swap_file_sz = from.swap_file_sz;
- refcount = from.refcount;
- flags = from.flags;
-}
-
-
-/* MapStats */
-
-Rock::MapStats::MapStats()
-{
- memset(this, 0, sizeof(*this));
-}
-
-void
-Rock::MapStats::dump(StoreEntry &e) const
-{
- storeAppendPrintf(&e, "Available slots: %9d\n", capacity);
-
- if (!capacity)
- return;
-
- storeAppendPrintf(&e, "Readable slots: %9d %6.2f%%\n",
- readable, (100.0 * readable / capacity));
- storeAppendPrintf(&e, "Filling slots: %9d %6.2f%%\n",
- writeable, (100.0 * writeable / capacity));
- storeAppendPrintf(&e, "Empty slots: %9d %6.2f%%\n",
- empty, (100.0 * empty / capacity));
-
- if (readers || writers) {
- const int locks = readers + writers;
- storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n",
- readers, (100.0 * readers / locks));
- storeAppendPrintf(&e, "Writers: %9d %6.2f%%\n",
- writers, (100.0 * writers / locks));
- }
-
- if (readable + writeable) {
- storeAppendPrintf(&e, "Marked slots: %9d %6.2f%%\n",
- marked, (100.0 * marked / (readable + writeable)));
- }
+ return sizeof(Shared) + limit*sizeof(DbCellHeader);
}
#define SQUID_FS_ROCK_DIR_MAP_H
#include "fs/rock/RockFile.h"
-#include "ipc/AtomicWord.h"
-#include "ipc/mem/Segment.h"
+#include "ipc/StoreMap.h"
namespace Rock {
-class StoreEntryBasics {
-public:
- void set(const DbCellHeader &aHeader, const StoreEntry &anEntry);
-
- DbCellHeader header; ///< rock-specific entry metadata
-
- /* START OF ON-DISK STORE_META_STD TLV field */
- time_t timestamp;
- time_t lastref;
- time_t expires;
- time_t lastmod;
- uint64_t swap_file_sz;
- u_short refcount;
- u_short flags;
- /* END OF ON-DISK STORE_META_STD */
-};
-
-/// aggregates basic map performance measurements; all numbers are approximate
-class MapStats {
-public:
- MapStats();
-
- void dump(StoreEntry &e) const;
-
- int capacity; ///< the total number of slots in the map
- int readable; ///< number of slots in Readable state
- int writeable; ///< number of slots in Writeable state
- int empty; ///< number of slots in Empty state
- int readers; ///< sum of slot.readers
- int writers; ///< sum of slot.writers
- int marked; ///< number of slots marked for freeing
-};
-
-/// DirMap entry
-class Slot {
-public:
- /// possible persistent states
- typedef enum {
- Empty, ///< ready for writing, with nothing of value
- Writeable, ///< transitions from Empty to Readable
- Readable, ///< ready for reading
- } State;
-
- void setKey(const cache_key *const aKey);
- bool checkKey(const cache_key *const aKey) const;
-
- bool sharedLock() const; ///< lock for reading or return false
- bool exclusiveLock(); ///< lock for modification or return false
- void releaseSharedLock() const; ///< undo successful sharedLock()
- void releaseExclusiveLock(); ///< undo successful exclusiveLock()
- void switchExclusiveToSharedLock(); ///< trade exclusive for shared access
-
- /// adds approximate current stats to the supplied ones
- void updateStats(MapStats &stats) const;
-
-public:
- // we want two uint64_t, but older GCCs lack __sync_fetch_and_add_8
- AtomicWordT<uint32_t> key_[4]; ///< MD5 entry key
- StoreEntryBasics seBasics; ///< basic store entry data
- AtomicWordT<uint8_t> state; ///< current state
- AtomicWordT<uint8_t> waitingToBeFreed; ///< a state-independent mark
-
-private:
- mutable AtomicWord readers; ///< number of users trying to read
- AtomicWord writers; ///< number of writers trying to modify the slot
-};
-
/// \ingroup Rock
/// map of used db slots indexed by sfileno
-class DirMap
+class DirMap: public Ipc::StoreMap
{
public:
DirMap(const char *const aPath, const int limit); ///< create a new shared DirMap
DirMap(const char *const aPath); ///< open an existing shared DirMap
- /// finds/reservers space for writing a new entry or returns nil
- StoreEntryBasics *openForWriting(const cache_key *const key, sfileno &fileno);
- /// successfully finish writing the entry, leaving it opened for reading
- void closeForWriting(const sfileno fileno);
-
- /// stores entry info at the requested slot or returns false
- bool putAt(const DbCellHeader &header, const StoreEntry &e, const sfileno fileno);
-
- /// only works on locked entries; returns nil unless the slot is readable
- const StoreEntryBasics *peekAtReader(const sfileno fileno) const;
-
- /// mark the slot as waiting to be freed and, if possible, free it
- void free(const sfileno fileno);
-
- /// open slot for reading, increments read level
- const StoreEntryBasics *openForReading(const cache_key *const key, sfileno &fileno);
- /// open slot for reading, increments read level
- const StoreEntryBasics *openForReadingAt(const sfileno fileno);
- /// close slot after reading, decrements read level
- void closeForReading(const sfileno fileno);
-
- /// called by lock holder to terminate either slot writing or reading
- void abortIo(const sfileno fileno);
-
- bool full() const; ///< there are no empty slots left
- bool valid(int n) const; ///< whether n is a valid slot coordinate
- int entryCount() const; ///< number of used slots
- int entryLimit() const; ///< maximum number of slots that can be used
-
- /// adds approximate current stats to the supplied ones
- void updateStats(MapStats &stats) const;
+ /// write access to the cell header; call openForWriting() first!
+ DbCellHeader &header(const sfileno fileno);
+ /// read-only access to the cell header; call openForReading() first!
+ const DbCellHeader &header(const sfileno fileno) const;
static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible
private:
- struct Shared {
- Shared(const int aLimit);
-
- const AtomicWord limit; ///< maximum number of map slots
- AtomicWord count; ///< current number of map slots
-
- Slot slots[]; ///< slots storage
+ /// data shared by all DirMaps with the same path
+ class Shared {
+ public:
+ static size_t MemSize(int limit);
+ DbCellHeader headers[]; ///< DbCellHeaders storage
};
- int slotIdx(const cache_key *const key) const;
- Slot &slot(const cache_key *const key);
- const StoreEntryBasics *openForReading(Slot &s);
- void abortWriting(const sfileno fileno);
- void freeIfNeeded(Slot &s);
- void freeLocked(Slot &s);
- String sharedMemoryName();
-
- static int SharedSize(const int limit);
-
- const String path; ///< cache_dir path, used for logging
- Ipc::Mem::Segment shm; ///< shared memory segment
Shared *shared; ///< pointer to shared memory
};
return NULL;
sfileno fileno;
- const StoreEntryBasics *const basics = map->openForReading(key, fileno);
- if (!basics)
+ const Ipc::StoreMapSlot *const slot = map->openForReading(key, fileno);
+ if (!slot)
return NULL;
+ const Ipc::StoreMapSlot::Basics &basics = slot->basics;
+
// create a brand new store entry and initialize it with stored basics
StoreEntry *e = new StoreEntry();
e->lock_count = 0;
e->swap_dirn = index;
e->swap_filen = fileno;
- e->swap_file_sz = basics->swap_file_sz;
- e->lastref = basics->lastref;
- e->timestamp = basics->timestamp;
- e->expires = basics->expires;
- e->lastmod = basics->lastmod;
- e->refcount = basics->refcount;
- e->flags = basics->flags;
+ e->swap_file_sz = basics.swap_file_sz;
+ e->lastref = basics.lastref;
+ e->timestamp = basics.timestamp;
+ e->expires = basics.expires;
+ e->lastmod = basics.lastmod;
+ e->refcount = basics.refcount;
+ e->flags = basics.flags;
e->store_status = STORE_OK;
e->setMemStatus(NOT_IN_MEMORY);
e->swap_status = SWAPOUT_DONE;
", fileno="<< std::setfill('0') << std::hex << std::uppercase <<
std::setw(8) << fileno);
- if (map->putAt(header, from, fileno)) {
- // we do not add this entry to store_table so core will not updateSize
- updateSize(from.swap_file_sz, +1);
- return true;
+ sfileno newLocation = 0;
+ if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
+ if (fileno == newLocation) {
+ slot->set(from);
+ map->header(fileno) = header;
+ // core will not updateSize: we do not add the entry to store_table
+ updateSize(from.swap_file_sz, +1);
+ } // else some other, newer entry got into our cell
+ map->closeForWriting(newLocation, false);
+ return fileno == newLocation;
}
return false;
assert(payloadEnd <= max_objsize);
sfileno fileno;
- StoreEntryBasics *const basics =
+ Ipc::StoreMapSlot *const slot =
map->openForWriting(reinterpret_cast<const cache_key *>(e.key), fileno);
- if (!basics) {
+ if (!slot) {
debugs(47, 5, HERE << "Rock::SwapDir::createStoreIO: map->add failed");
return NULL;
}
e.swap_file_sz = header.payloadSize; // and will be copied to the map
- basics->set(header, e);
+ slot->set(e);
+ map->header(fileno) = header;
// XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
// If that does not happen, the entry will not decrement the read level!
// The are two ways an entry can get swap_filen: our get() locked it for
// reading or our storeSwapOutStart() locked it for writing. Peeking at our
// locked entry is safe, but no support for reading a filling entry.
- const StoreEntryBasics *basics = map->peekAtReader(e.swap_filen);
- if (!basics)
+ const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
+ if (!slot)
return NULL; // we were writing afterall
IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
sio->swap_dirn = index;
sio->swap_filen = e.swap_filen;
- sio->payloadEnd = sizeof(DbCellHeader) + basics->header.payloadSize;
+ sio->payloadEnd = sizeof(DbCellHeader) + map->header(e.swap_filen).payloadSize;
assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
debugs(47,5, HERE << "dir " << index << " has old fileno: " <<
std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
sio->swap_filen);
- assert(basics->swap_file_sz > 0);
- assert(basics->swap_file_sz == e.swap_file_sz);
+ assert(slot->basics.swap_file_sz > 0);
+ assert(slot->basics.swap_file_sz == e.swap_file_sz);
sio->diskOffset = diskOffset(sio->swap_filen);
assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
if (errflag == DISK_OK) {
// close, assuming we only write once; the entry gets the read lock
- map->closeForWriting(sio.swap_filen);
+ map->closeForWriting(sio.swap_filen, true);
// do not increment sio.offset_ because we do it in sio->write()
} else {
// Do not abortWriting here. The entry should keep the write lock
entryCount, (100.0 * entryCount / limit));
if (limit < 100) { // XXX: otherwise too expensive to count
- MapStats stats;
+ Ipc::ReadWriteLockStats stats;
map->updateStats(stats);
stats.dump(e);
}
Messages.h \
Queue.cc \
Queue.h \
+ ReadWriteLock.cc \
+ ReadWriteLock.h \
StartListening.cc \
StartListening.h \
+ StoreMap.cc \
+ StoreMap.h \
StrandCoord.cc \
StrandCoord.h \
StrandCoords.h \
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ */
+
+#include "squid.h"
+
+#include "Store.h"
+#include "ipc/ReadWriteLock.h"
+
+bool
+Ipc::ReadWriteLock::lockShared()
+{
+ ++readers; // this locks "new" writers out
+ if (!writers) // there are no old writers
+ return true;
+ --readers;
+ return false;
+}
+
+bool
+Ipc::ReadWriteLock::lockExclusive()
+{
+ if (!writers++) { // we are the first writer + this locks "new" readers out
+ if (!readers) // there are no old readers
+ return true;
+ }
+ --writers;
+ return false;
+}
+
+void
+Ipc::ReadWriteLock::unlockShared()
+{
+ assert(readers-- > 0);
+}
+
+void
+Ipc::ReadWriteLock::unlockExclusive()
+{
+ assert(writers-- > 0);
+}
+
+void
+Ipc::ReadWriteLock::switchExclusiveToShared()
+{
+ ++readers; // must be done before we release exclusive control
+ unlockExclusive();
+}
+
+void
+Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const
+{
+ if (readers) {
+ ++stats.readable;
+ stats.readers += readers;
+ } else if (writers) {
+ ++stats.writeable;
+ stats.writers += writers;
+ } else {
+ ++stats.idle;
+ }
+ ++stats.count;
+}
+
+
+/* Ipc::ReadWriteLockStats */
+
+Ipc::ReadWriteLockStats::ReadWriteLockStats()
+{
+ memset(this, 0, sizeof(*this));
+}
+
+void
+Ipc::ReadWriteLockStats::dump(StoreEntry &e) const
+{
+ storeAppendPrintf(&e, "Available locks: %9d\n", count);
+
+ if (!count)
+ return;
+
+ storeAppendPrintf(&e, "Reading: %9d %6.2f%%\n",
+ readable, (100.0 * readable / count));
+ storeAppendPrintf(&e, "Writing: %9d %6.2f%%\n",
+ writeable, (100.0 * writeable / count));
+ storeAppendPrintf(&e, "Idle: %9d %6.2f%%\n",
+ idle, (100.0 * idle / count));
+
+ if (readers || writers) {
+ const int locked = readers + writers;
+ storeAppendPrintf(&e, "Readers: %9d %6.2f%%\n",
+ readers, (100.0 * readers / locked));
+ storeAppendPrintf(&e, "Writers: %9d %6.2f%%\n",
+ writers, (100.0 * writers / locked));
+ }
+}
--- /dev/null
+#ifndef SQUID_IPC_READ_WRITE_LOCK_H
+#define SQUID_IPC_READ_WRITE_LOCK_H
+
+#include "ipc/AtomicWord.h"
+
+class StoreEntry;
+
+namespace Ipc {
+
+class ReadWriteLockStats;
+
+/// an atomic readers-writer or shared-exclusive lock suitable for maps/tables
+class ReadWriteLock {
+public:
+ // default constructor is OK because of shared memory zero-initialization
+
+ bool lockShared(); ///< lock for reading or return false
+ bool lockExclusive(); ///< lock for modification or return false
+ void unlockShared(); ///< undo successful sharedLock()
+ void unlockExclusive(); ///< undo successful exclusiveLock()
+ void switchExclusiveToShared(); ///< stop writing, start reading
+
+ /// adds approximate current stats to the supplied ones
+ void updateStats(ReadWriteLockStats &stats) const;
+
+public:
+ mutable AtomicWord readers; ///< number of users trying to read
+ AtomicWord writers; ///< number of writers trying to modify protected data
+};
+
+
+/// approximate stats of a set of ReadWriteLocks
+class ReadWriteLockStats {
+public:
+ ReadWriteLockStats();
+
+ void dump(StoreEntry &e) const;
+
+ int count; ///< the total number of locks
+ int readable; ///< number of locks locked for reading
+ int writeable; ///< number of locks locked for writing
+ int idle; ///< number of unlocked locks
+ int readers; ///< sum of lock.readers
+ int writers; ///< sum of lock.writers
+};
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_READ_WRITE_LOCK_H */
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ */
+
+#include "squid.h"
+
+#include "Store.h"
+#include "ipc/StoreMap.h"
+
+Ipc::StoreMap::StoreMap(const char *const aPath, const int limit,
+ size_t sharedSizeExtra):
+ path(aPath), shm(aPath), shared(NULL)
+{
+ const size_t mySharedSize = Shared::MemSize(limit);
+ shm.create(mySharedSize + sharedSizeExtra);
+ shared = new (shm.reserve(mySharedSize)) Shared(limit);
+ debugs(54, 5, HERE << "new map [" << path << "] created");
+}
+
+Ipc::StoreMap::StoreMap(const char *const aPath):
+ path(aPath), shm(aPath), shared(NULL)
+{
+ shm.open();
+ assert(shm.mem());
+ shared = reinterpret_cast<Shared *>(shm.mem());
+ // check that nobody used our segment chunk and that shared->limit is sane
+ assert(shared == reinterpret_cast<Shared *>(shm.reserve(Shared::MemSize(shared->limit))));
+ debugs(54, 5, HERE << "attached map [" << path << "] created");
+}
+
+Ipc::StoreMap::Slot *
+Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno)
+{
+ debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+ << " for writing in map [" << path << ']');
+ const int idx = slotIndexByKey(key);
+
+ Slot &s = shared->slots[idx];
+ ReadWriteLock &lock = s.lock;
+
+ if (lock.lockExclusive()) {
+ assert(s.state != Slot::Writeable); // until we start breaking locks
+
+ // free if the entry was dirty, keeping the entry locked
+ if (s.waitingToBeFreed == true)
+ freeLocked(s, true);
+
+ if (s.state == Slot::Empty) // we may also overwrite a Readable slot
+ ++shared->count;
+ s.state = Slot::Writeable;
+ fileno = idx;
+ //s.setKey(key); // XXX: the caller should do that
+ debugs(54, 5, HERE << " opened slot at " << idx <<
+ " for writing in map [" << path << ']');
+ return &s; // and keep the entry locked
+ }
+
+ debugs(54, 5, HERE << " failed to open slot at " << idx <<
+ " for writing in map [" << path << ']');
+ return NULL;
+}
+
+void
+Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading)
+{
+ debugs(54, 5, HERE << " closing slot at " << fileno << " for writing and "
+ "openning for reading in map [" << path << ']');
+ assert(valid(fileno));
+ Slot &s = shared->slots[fileno];
+ assert(s.state == Slot::Writeable);
+ s.state = Slot::Readable;
+ if (lockForReading)
+ s.lock.switchExclusiveToShared();
+ else
+ s.lock.unlockExclusive();
+}
+
+/// terminate writing the entry, freeing its slot for others to use
+void
+Ipc::StoreMap::abortWriting(const sfileno fileno)
+{
+ debugs(54, 5, HERE << " abort writing slot at " << fileno <<
+ " in map [" << path << ']');
+ assert(valid(fileno));
+ Slot &s = shared->slots[fileno];
+ assert(s.state == Slot::Writeable);
+ freeLocked(s, false);
+}
+
+void
+Ipc::StoreMap::abortIo(const sfileno fileno)
+{
+ debugs(54, 5, HERE << " abort I/O for slot at " << fileno <<
+ " in map [" << path << ']');
+ assert(valid(fileno));
+ Slot &s = shared->slots[fileno];
+
+ // The caller is a lock holder. Thus, if we are Writeable, then the
+ // caller must be the writer; otherwise the caller must be the reader.
+ if (s.state == Slot::Writeable)
+ abortWriting(fileno);
+ else
+ closeForReading(fileno);
+}
+
+const Ipc::StoreMap::Slot *
+Ipc::StoreMap::peekAtReader(const sfileno fileno) const
+{
+ assert(valid(fileno));
+ const Slot &s = shared->slots[fileno];
+ switch (s.state) {
+ case Slot::Readable:
+ return &s; // immediate access by lock holder so no locking
+ case Slot::Writeable:
+ return NULL; // cannot read the slot when it is being written
+ case Slot::Empty:
+ assert(false); // must be locked for reading or writing
+ }
+ assert(false); // not reachable
+ return NULL;
+}
+
+void
+Ipc::StoreMap::free(const sfileno fileno)
+{
+ debugs(54, 5, HERE << " marking slot at " << fileno << " to be freed in"
+ " map [" << path << ']');
+
+ assert(valid(fileno));
+ Slot &s = shared->slots[fileno];
+
+ if (s.lock.lockExclusive())
+ freeLocked(s, false);
+ else
+ s.waitingToBeFreed = true; // mark to free it later
+}
+
+const Ipc::StoreMap::Slot *
+Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno)
+{
+ debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+ << " for reading in map [" << path << ']');
+ const int idx = slotIndexByKey(key);
+ if (const Slot *slot = openForReadingAt(idx)) {
+ if (slot->sameKey(key)) {
+ fileno = idx;
+ debugs(54, 5, HERE << " opened slot at " << fileno << " for key "
+ << storeKeyText(key) << " for reading in map [" << path <<
+ ']');
+ return slot; // locked for reading
+ }
+ slot->lock.unlockShared();
+ }
+ debugs(54, 5, HERE << " failed to open slot for key " << storeKeyText(key)
+ << " for reading in map [" << path << ']');
+ return NULL;
+}
+
+const Ipc::StoreMap::Slot *
+Ipc::StoreMap::openForReadingAt(const sfileno fileno)
+{
+ debugs(54, 5, HERE << " trying to open slot at " << fileno << " for "
+ "reading in map [" << path << ']');
+ assert(valid(fileno));
+ Slot &s = shared->slots[fileno];
+
+ if (!s.lock.lockShared()) {
+ debugs(54, 5, HERE << " failed to lock slot at " << fileno << " for "
+ "reading in map [" << path << ']');
+ return NULL;
+ }
+
+ if (s.state == Slot::Empty) {
+ s.lock.unlockShared();
+ debugs(54, 7, HERE << " empty slot at " << fileno << " for "
+ "reading in map [" << path << ']');
+ return NULL;
+ }
+
+ if (s.waitingToBeFreed) {
+ s.lock.unlockShared();
+ debugs(54, 7, HERE << " dirty slot at " << fileno << " for "
+ "reading in map [" << path << ']');
+ return NULL;
+ }
+
+ // cannot be Writing here if we got shared lock and checked Empty above
+ assert(s.state == Slot::Readable);
+ debugs(54, 5, HERE << " opened slot at " << fileno << " for reading in"
+ " map [" << path << ']');
+ return &s;
+}
+
+void
+Ipc::StoreMap::closeForReading(const sfileno fileno)
+{
+ debugs(54, 5, HERE << " closing slot at " << fileno << " for reading in "
+ "map [" << path << ']');
+ assert(valid(fileno));
+ Slot &s = shared->slots[fileno];
+ assert(s.state == Slot::Readable);
+ s.lock.unlockShared();
+}
+
+int
+Ipc::StoreMap::entryLimit() const
+{
+ return shared->limit;
+}
+
+int
+Ipc::StoreMap::entryCount() const
+{
+ return shared->count;
+}
+
+bool
+Ipc::StoreMap::full() const
+{
+ return entryCount() >= entryLimit();
+}
+
+void
+Ipc::StoreMap::updateStats(ReadWriteLockStats &stats) const
+{
+ for (int i = 0; i < shared->limit; ++i)
+ shared->slots[i].lock.updateStats(stats);
+}
+
+bool
+Ipc::StoreMap::valid(const int pos) const
+{
+ return 0 <= pos && pos < entryLimit();
+}
+
+int
+Ipc::StoreMap::slotIndexByKey(const cache_key *const key) const
+{
+ const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
+ // TODO: use a better hash function
+ return (k[0] + k[1]) % shared->limit;
+}
+
+Ipc::StoreMap::Slot &
+Ipc::StoreMap::slotByKey(const cache_key *const key)
+{
+ return shared->slots[slotIndexByKey(key)];
+}
+
+/// unconditionally frees the already exclusively locked slot and releases lock
+void
+Ipc::StoreMap::freeLocked(Slot &s, bool keepLocked)
+{
+ s.waitingToBeFreed = false;
+ s.state = Slot::Empty;
+ if (!keepLocked)
+ s.lock.unlockExclusive();
+ --shared->count;
+ debugs(54, 5, HERE << " freed slot at " << (&s - shared->slots) <<
+ " in map [" << path << ']');
+}
+
+
+/* Ipc::StoreMapSlot */
+
+Ipc::StoreMapSlot::StoreMapSlot(): state(Empty)
+{
+ xmemset(&key, 0, sizeof(key));
+ xmemset(&basics, 0, sizeof(basics));
+}
+
+void
+Ipc::StoreMapSlot::setKey(const cache_key *const aKey)
+{
+ memcpy(key, aKey, sizeof(key));
+}
+
+bool
+Ipc::StoreMapSlot::sameKey(const cache_key *const aKey) const
+{
+ const uint64_t *const k = reinterpret_cast<const uint64_t *>(aKey);
+ return k[0] == key[0] && k[1] == key[1];
+}
+
+void
+Ipc::StoreMapSlot::set(const StoreEntry &from)
+{
+ memcpy(key, from.key, sizeof(key));
+ // XXX: header = aHeader;
+ basics.timestamp = from.timestamp;
+ basics.lastref = from.lastref;
+ basics.expires = from.expires;
+ basics.lastmod = from.lastmod;
+ basics.swap_file_sz = from.swap_file_sz;
+ basics.refcount = from.refcount;
+ basics.flags = from.flags;
+}
+
+/* Ipc::StoreMap::Shared */
+
+Ipc::StoreMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
+{
+}
+
+size_t
+Ipc::StoreMap::Shared::MemSize(int limit)
+{
+ return sizeof(Shared) + limit * sizeof(Slot);
+}
+
--- /dev/null
+#ifndef SQUID_IPC_STORE_MAP_H
+#define SQUID_IPC_STORE_MAP_H
+
+#include "ipc/ReadWriteLock.h"
+#include "ipc/mem/Segment.h"
+
+namespace Ipc {
+
+/// a StoreMap element, holding basic shareable StoreEntry info
+class StoreMapSlot {
+public:
+ StoreMapSlot();
+
+ /// store StoreEntry key and basics
+ void set(const StoreEntry &anEntry);
+
+ void setKey(const cache_key *const aKey);
+ bool sameKey(const cache_key *const aKey) const;
+
+public:
+ mutable ReadWriteLock lock; ///< protects slot data below
+ AtomicWordT<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
+
+ uint64_t key[2]; ///< StoreEntry key
+
+ // STORE_META_STD TLV field from StoreEntry
+ struct Basics {
+ time_t timestamp;
+ time_t lastref;
+ time_t expires;
+ time_t lastmod;
+ uint64_t swap_file_sz;
+ u_short refcount;
+ u_short flags;
+ } basics;
+
+ /// possible persistent states
+ typedef enum {
+ Empty, ///< ready for writing, with nothing of value
+ Writeable, ///< transitions from Empty to Readable
+ Readable, ///< ready for reading
+ } State;
+ State state; ///< current state
+};
+
+/// map of StoreMapSlots indexed by their keys, with read/write slot locking
+/// kids extend to store custom data
+class StoreMap
+{
+public:
+ typedef StoreMapSlot Slot;
+
+ StoreMap(const char *const aPath, const int limit, size_t sharedSizeExtra); ///< create a new shared StoreMap
+ explicit StoreMap(const char *const aPath); ///< open an existing shared StoreMap
+
+ /// finds, reservers space for writing a new entry or returns nil
+ Slot *openForWriting(const cache_key *const key, sfileno &fileno);
+ /// successfully finish writing the entry
+ void closeForWriting(const sfileno fileno, bool lockForReading = false);
+
+ /// only works on locked entries; returns nil unless the slot is readable
+ const Slot *peekAtReader(const sfileno fileno) const;
+
+ /// mark the slot as waiting to be freed and, if possible, free it
+ void free(const sfileno fileno);
+
+ /// open slot for reading, increments read level
+ const Slot *openForReading(const cache_key *const key, sfileno &fileno);
+ /// open slot for reading, increments read level
+ const Slot *openForReadingAt(const sfileno fileno);
+ /// close slot after reading, decrements read level
+ void closeForReading(const sfileno fileno);
+
+ /// called by lock holder to terminate either slot writing or reading
+ void abortIo(const sfileno fileno);
+
+ bool full() const; ///< there are no empty slots left
+ bool valid(int n) const; ///< whether n is a valid slot coordinate
+ int entryCount() const; ///< number of used slots
+ int entryLimit() const; ///< maximum number of slots that can be used
+
+ /// adds approximate current stats to the supplied ones
+ void updateStats(ReadWriteLockStats &stats) const;
+
+protected:
+ class Shared {
+ public:
+ static size_t MemSize(int limit);
+
+ Shared(const int aLimit);
+
+ const AtomicWord limit; ///< maximum number of map slots
+ AtomicWord count; ///< current number of map slots
+
+ Slot slots[]; ///< slots storage
+ };
+
+protected:
+ const String path; ///< cache_dir path, used for logging
+ Ipc::Mem::Segment shm; ///< shared memory segment
+
+private:
+ int slotIndexByKey(const cache_key *const key) const;
+ Slot &slotByKey(const cache_key *const key);
+
+ Slot *openForReading(Slot &s);
+ void abortWriting(const sfileno fileno);
+ void freeIfNeeded(Slot &s);
+ void freeLocked(Slot &s, bool keepLocked);
+ String sharedMemoryName();
+
+ Shared *shared; ///< pointer to shared memory
+};
+
+} // namespace Ipc
+
+// We do not reuse struct _fileMap because we cannot control its size,
+// resulting in sfilenos that are pointing beyond the database.
+
+#endif /* SQUID_IPC_STORE_MAP_H */